/*
|
* Copyright 1999-2019 Seata.io Group.
|
*
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
* you may not use this file except in compliance with the License.
|
* You may obtain a copy of the License at
|
*
|
* http://www.apache.org/licenses/LICENSE-2.0
|
*
|
* Unless required by applicable law or agreed to in writing, software
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
* See the License for the specific language governing permissions and
|
* limitations under the License.
|
*/
|
package io.seata.server.transaction.saga;
|
|
import io.netty.channel.Channel;
|
import io.seata.common.util.CollectionUtils;
|
import io.seata.core.exception.GlobalTransactionException;
|
import io.seata.core.exception.TransactionException;
|
import io.seata.core.model.BranchStatus;
|
import io.seata.core.model.BranchType;
|
import io.seata.core.model.GlobalStatus;
|
import io.seata.core.protocol.transaction.BranchCommitRequest;
|
import io.seata.core.protocol.transaction.BranchCommitResponse;
|
import io.seata.core.protocol.transaction.BranchRollbackRequest;
|
import io.seata.core.protocol.transaction.BranchRollbackResponse;
|
import io.seata.core.rpc.RemotingServer;
|
import io.seata.core.rpc.netty.ChannelManager;
|
import io.seata.server.coordinator.AbstractCore;
|
import io.seata.server.session.BranchSession;
|
import io.seata.server.session.GlobalSession;
|
import io.seata.server.session.SessionHelper;
|
import io.seata.server.session.SessionHolder;
|
|
import java.io.IOException;
|
import java.util.Map;
|
import java.util.concurrent.TimeoutException;
|
|
/**
|
* The type saga core.
|
*
|
* @author ph3636
|
*/
|
public class SagaCore extends AbstractCore {
|
|
public SagaCore(RemotingServer remotingServer) {
|
super(remotingServer);
|
}
|
|
@Override
|
public BranchType getHandleBranchType() {
|
return BranchType.SAGA;
|
}
|
|
@Override
|
public void globalSessionStatusCheck(GlobalSession globalSession) throws GlobalTransactionException {
|
// SAGA type accept forward(retry) operation on timeout or commit fail, forward operation will register remaining branches
|
}
|
|
@Override
|
public BranchStatus branchCommitSend(BranchCommitRequest request, GlobalSession globalSession,
|
BranchSession branchSession) throws IOException, TimeoutException {
|
Map<String, Channel> channels = ChannelManager.getRmChannels();
|
if (CollectionUtils.isEmpty(channels)) {
|
LOGGER.error("Failed to commit SAGA global[" + globalSession.getXid() + ", RM channels is empty.");
|
return BranchStatus.PhaseTwo_CommitFailed_Retryable;
|
}
|
String sagaResourceId = getSagaResourceId(globalSession);
|
Channel sagaChannel = channels.get(sagaResourceId);
|
if (sagaChannel == null) {
|
LOGGER.error("Failed to commit SAGA global[" + globalSession.getXid()
|
+ ", cannot find channel by resourceId[" + sagaResourceId + "]");
|
return BranchStatus.PhaseTwo_CommitFailed_Retryable;
|
}
|
BranchCommitResponse response = (BranchCommitResponse) remotingServer.sendSyncRequest(sagaChannel, request);
|
return response.getBranchStatus();
|
}
|
|
@Override
|
public BranchStatus branchRollbackSend(BranchRollbackRequest request, GlobalSession globalSession,
|
BranchSession branchSession) throws IOException, TimeoutException {
|
Map<String, Channel> channels = ChannelManager.getRmChannels();
|
if (CollectionUtils.isEmpty(channels)) {
|
LOGGER.error("Failed to rollback SAGA global[" + globalSession.getXid() + ", RM channels is empty.");
|
return BranchStatus.PhaseTwo_RollbackFailed_Retryable;
|
}
|
String sagaResourceId = getSagaResourceId(globalSession);
|
Channel sagaChannel = channels.get(sagaResourceId);
|
if (sagaChannel == null) {
|
LOGGER.error("Failed to rollback SAGA global[" + globalSession.getXid()
|
+ ", cannot find channel by resourceId[" + sagaResourceId + "]");
|
return BranchStatus.PhaseTwo_RollbackFailed_Retryable;
|
}
|
BranchRollbackResponse response = (BranchRollbackResponse) remotingServer.sendSyncRequest(sagaChannel, request);
|
return response.getBranchStatus();
|
}
|
|
@Override
|
public boolean doGlobalCommit(GlobalSession globalSession, boolean retrying) throws TransactionException {
|
try {
|
BranchStatus branchStatus = branchCommit(globalSession, SessionHelper.newBranch(BranchType.SAGA,
|
globalSession.getXid(), -1, getSagaResourceId(globalSession), globalSession.getStatus().name()));
|
|
switch (branchStatus) {
|
case PhaseTwo_Committed:
|
SessionHelper.removeAllBranch(globalSession, !retrying);
|
LOGGER.info("Successfully committed SAGA global[" + globalSession.getXid() + "]");
|
break;
|
case PhaseTwo_Rollbacked:
|
LOGGER.info("Successfully rollbacked SAGA global[" + globalSession.getXid() + "]");
|
SessionHelper.removeAllBranch(globalSession, !retrying);
|
SessionHelper.endRollbacked(globalSession, retrying);
|
return false;
|
case PhaseTwo_RollbackFailed_Retryable:
|
LOGGER.error("By [{}], failed to rollback SAGA global [{}], will retry later.", branchStatus,
|
globalSession.getXid());
|
SessionHolder.getRetryCommittingSessionManager().removeGlobalSession(globalSession);
|
globalSession.queueToRetryRollback();
|
return false;
|
case PhaseOne_Failed:
|
LOGGER.error("By [{}], finish SAGA global [{}]", branchStatus, globalSession.getXid());
|
SessionHelper.removeAllBranch(globalSession, !retrying);
|
globalSession.changeGlobalStatus(GlobalStatus.Finished);
|
globalSession.end();
|
return false;
|
case PhaseTwo_CommitFailed_Unretryable:
|
if (globalSession.canBeCommittedAsync()) {
|
LOGGER.error("By [{}], failed to commit SAGA global [{}]", branchStatus,
|
globalSession.getXid());
|
break;
|
} else {
|
SessionHelper.endCommitFailed(globalSession,retrying);
|
LOGGER.error("Finally, failed to commit SAGA global[{}]", globalSession.getXid());
|
return false;
|
}
|
default:
|
if (!retrying) {
|
globalSession.queueToRetryCommit();
|
} else {
|
LOGGER.error("Failed to commit SAGA global[{}], will retry later.", globalSession.getXid());
|
}
|
return false;
|
}
|
} catch (Exception ex) {
|
LOGGER.error("Failed to commit global[" + globalSession.getXid() + "]", ex);
|
|
if (!retrying) {
|
globalSession.queueToRetryRollback();
|
}
|
throw new TransactionException(ex);
|
}
|
return true;
|
}
|
|
@Override
|
public boolean doGlobalRollback(GlobalSession globalSession, boolean retrying) throws TransactionException {
|
try {
|
BranchStatus branchStatus = branchRollback(globalSession, SessionHelper.newBranch(BranchType.SAGA,
|
globalSession.getXid(), -1, getSagaResourceId(globalSession), globalSession.getStatus().name()));
|
|
switch (branchStatus) {
|
case PhaseTwo_Rollbacked:
|
SessionHelper.removeAllBranch(globalSession, !retrying);
|
LOGGER.info("Successfully rollbacked SAGA global[{}]",globalSession.getXid());
|
break;
|
case PhaseTwo_RollbackFailed_Unretryable:
|
SessionHelper.endRollbackFailed(globalSession, retrying);
|
LOGGER.error("Failed to rollback SAGA global[{}]", globalSession.getXid());
|
return false;
|
case PhaseTwo_CommitFailed_Retryable:
|
SessionHolder.getRetryRollbackingSessionManager().removeGlobalSession(globalSession);
|
globalSession.queueToRetryCommit();
|
LOGGER.warn("Retry by custom recover strategy [Forward] on timeout, SAGA global[{}]", globalSession.getXid());
|
return false;
|
default:
|
LOGGER.error("Failed to rollback SAGA global[{}]", globalSession.getXid());
|
if (!retrying) {
|
globalSession.queueToRetryRollback();
|
}
|
return false;
|
}
|
} catch (Exception ex) {
|
LOGGER.error("Failed to rollback global[{}]", globalSession.getXid(), ex);
|
if (!retrying) {
|
globalSession.queueToRetryRollback();
|
}
|
throw new TransactionException(ex);
|
}
|
return true;
|
}
|
|
@Override
|
public void doGlobalReport(GlobalSession globalSession, String xid, GlobalStatus globalStatus) throws TransactionException {
|
if (GlobalStatus.Committed.equals(globalStatus)) {
|
SessionHelper.removeAllBranch(globalSession, false);
|
SessionHelper.endCommitted(globalSession, false);
|
LOGGER.info("Global[{}] committed", globalSession.getXid());
|
} else if (GlobalStatus.Rollbacked.equals(globalStatus)
|
|| GlobalStatus.Finished.equals(globalStatus)) {
|
SessionHelper.removeAllBranch(globalSession, false);
|
SessionHelper.endRollbacked(globalSession, false);
|
LOGGER.info("Global[{}] rollbacked", globalSession.getXid());
|
} else {
|
globalSession.changeGlobalStatus(globalStatus);
|
LOGGER.info("Global[{}] reporting is successfully done. status[{}]", globalSession.getXid(), globalSession.getStatus());
|
|
if (GlobalStatus.RollbackRetrying.equals(globalStatus)
|
|| GlobalStatus.TimeoutRollbackRetrying.equals(globalStatus)
|
|| GlobalStatus.UnKnown.equals(globalStatus)) {
|
globalSession.queueToRetryRollback();
|
LOGGER.info("Global[{}] will retry rollback", globalSession.getXid());
|
} else if (GlobalStatus.CommitRetrying.equals(globalStatus)) {
|
globalSession.queueToRetryCommit();
|
LOGGER.info("Global[{}] will retry commit", globalSession.getXid());
|
}
|
}
|
}
|
|
/**
|
* get saga ResourceId
|
*
|
* @param globalSession the globalSession
|
* @return sagaResourceId
|
*/
|
private String getSagaResourceId(GlobalSession globalSession) {
|
return globalSession.getApplicationId() + "#" + globalSession.getTransactionServiceGroup();
|
}
|
}
|