/* * Copyright 1999-2019 Seata.io Group. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package io.seata.server.transaction.saga; import io.netty.channel.Channel; import io.seata.common.util.CollectionUtils; import io.seata.core.exception.GlobalTransactionException; import io.seata.core.exception.TransactionException; import io.seata.core.model.BranchStatus; import io.seata.core.model.BranchType; import io.seata.core.model.GlobalStatus; import io.seata.core.protocol.transaction.BranchCommitRequest; import io.seata.core.protocol.transaction.BranchCommitResponse; import io.seata.core.protocol.transaction.BranchRollbackRequest; import io.seata.core.protocol.transaction.BranchRollbackResponse; import io.seata.core.rpc.RemotingServer; import io.seata.core.rpc.netty.ChannelManager; import io.seata.server.coordinator.AbstractCore; import io.seata.server.session.BranchSession; import io.seata.server.session.GlobalSession; import io.seata.server.session.SessionHelper; import io.seata.server.session.SessionHolder; import java.io.IOException; import java.util.Map; import java.util.concurrent.TimeoutException; /** * The type saga core. * * @author ph3636 */ public class SagaCore extends AbstractCore { public SagaCore(RemotingServer remotingServer) { super(remotingServer); } @Override public BranchType getHandleBranchType() { return BranchType.SAGA; } @Override public void globalSessionStatusCheck(GlobalSession globalSession) throws GlobalTransactionException { // SAGA type accept forward(retry) operation on timeout or commit fail, forward operation will register remaining branches } @Override public BranchStatus branchCommitSend(BranchCommitRequest request, GlobalSession globalSession, BranchSession branchSession) throws IOException, TimeoutException { Map channels = ChannelManager.getRmChannels(); if (CollectionUtils.isEmpty(channels)) { LOGGER.error("Failed to commit SAGA global[" + globalSession.getXid() + ", RM channels is empty."); return BranchStatus.PhaseTwo_CommitFailed_Retryable; } String sagaResourceId = getSagaResourceId(globalSession); Channel sagaChannel = channels.get(sagaResourceId); if (sagaChannel == null) { LOGGER.error("Failed to commit SAGA global[" + globalSession.getXid() + ", cannot find channel by resourceId[" + sagaResourceId + "]"); return BranchStatus.PhaseTwo_CommitFailed_Retryable; } BranchCommitResponse response = (BranchCommitResponse) remotingServer.sendSyncRequest(sagaChannel, request); return response.getBranchStatus(); } @Override public BranchStatus branchRollbackSend(BranchRollbackRequest request, GlobalSession globalSession, BranchSession branchSession) throws IOException, TimeoutException { Map channels = ChannelManager.getRmChannels(); if (CollectionUtils.isEmpty(channels)) { LOGGER.error("Failed to rollback SAGA global[" + globalSession.getXid() + ", RM channels is empty."); return BranchStatus.PhaseTwo_RollbackFailed_Retryable; } String sagaResourceId = getSagaResourceId(globalSession); Channel sagaChannel = channels.get(sagaResourceId); if (sagaChannel == null) { LOGGER.error("Failed to rollback SAGA global[" + globalSession.getXid() + ", cannot find channel by resourceId[" + sagaResourceId + "]"); return BranchStatus.PhaseTwo_RollbackFailed_Retryable; } BranchRollbackResponse response = (BranchRollbackResponse) remotingServer.sendSyncRequest(sagaChannel, request); return response.getBranchStatus(); } @Override public boolean doGlobalCommit(GlobalSession globalSession, boolean retrying) throws TransactionException { try { BranchStatus branchStatus = branchCommit(globalSession, SessionHelper.newBranch(BranchType.SAGA, globalSession.getXid(), -1, getSagaResourceId(globalSession), globalSession.getStatus().name())); switch (branchStatus) { case PhaseTwo_Committed: SessionHelper.removeAllBranch(globalSession, !retrying); LOGGER.info("Successfully committed SAGA global[" + globalSession.getXid() + "]"); break; case PhaseTwo_Rollbacked: LOGGER.info("Successfully rollbacked SAGA global[" + globalSession.getXid() + "]"); SessionHelper.removeAllBranch(globalSession, !retrying); SessionHelper.endRollbacked(globalSession, retrying); return false; case PhaseTwo_RollbackFailed_Retryable: LOGGER.error("By [{}], failed to rollback SAGA global [{}], will retry later.", branchStatus, globalSession.getXid()); SessionHolder.getRetryCommittingSessionManager().removeGlobalSession(globalSession); globalSession.queueToRetryRollback(); return false; case PhaseOne_Failed: LOGGER.error("By [{}], finish SAGA global [{}]", branchStatus, globalSession.getXid()); SessionHelper.removeAllBranch(globalSession, !retrying); globalSession.changeGlobalStatus(GlobalStatus.Finished); globalSession.end(); return false; case PhaseTwo_CommitFailed_Unretryable: if (globalSession.canBeCommittedAsync()) { LOGGER.error("By [{}], failed to commit SAGA global [{}]", branchStatus, globalSession.getXid()); break; } else { SessionHelper.endCommitFailed(globalSession,retrying); LOGGER.error("Finally, failed to commit SAGA global[{}]", globalSession.getXid()); return false; } default: if (!retrying) { globalSession.queueToRetryCommit(); } else { LOGGER.error("Failed to commit SAGA global[{}], will retry later.", globalSession.getXid()); } return false; } } catch (Exception ex) { LOGGER.error("Failed to commit global[" + globalSession.getXid() + "]", ex); if (!retrying) { globalSession.queueToRetryRollback(); } throw new TransactionException(ex); } return true; } @Override public boolean doGlobalRollback(GlobalSession globalSession, boolean retrying) throws TransactionException { try { BranchStatus branchStatus = branchRollback(globalSession, SessionHelper.newBranch(BranchType.SAGA, globalSession.getXid(), -1, getSagaResourceId(globalSession), globalSession.getStatus().name())); switch (branchStatus) { case PhaseTwo_Rollbacked: SessionHelper.removeAllBranch(globalSession, !retrying); LOGGER.info("Successfully rollbacked SAGA global[{}]",globalSession.getXid()); break; case PhaseTwo_RollbackFailed_Unretryable: SessionHelper.endRollbackFailed(globalSession, retrying); LOGGER.error("Failed to rollback SAGA global[{}]", globalSession.getXid()); return false; case PhaseTwo_CommitFailed_Retryable: SessionHolder.getRetryRollbackingSessionManager().removeGlobalSession(globalSession); globalSession.queueToRetryCommit(); LOGGER.warn("Retry by custom recover strategy [Forward] on timeout, SAGA global[{}]", globalSession.getXid()); return false; default: LOGGER.error("Failed to rollback SAGA global[{}]", globalSession.getXid()); if (!retrying) { globalSession.queueToRetryRollback(); } return false; } } catch (Exception ex) { LOGGER.error("Failed to rollback global[{}]", globalSession.getXid(), ex); if (!retrying) { globalSession.queueToRetryRollback(); } throw new TransactionException(ex); } return true; } @Override public void doGlobalReport(GlobalSession globalSession, String xid, GlobalStatus globalStatus) throws TransactionException { if (GlobalStatus.Committed.equals(globalStatus)) { SessionHelper.removeAllBranch(globalSession, false); SessionHelper.endCommitted(globalSession, false); LOGGER.info("Global[{}] committed", globalSession.getXid()); } else if (GlobalStatus.Rollbacked.equals(globalStatus) || GlobalStatus.Finished.equals(globalStatus)) { SessionHelper.removeAllBranch(globalSession, false); SessionHelper.endRollbacked(globalSession, false); LOGGER.info("Global[{}] rollbacked", globalSession.getXid()); } else { globalSession.changeGlobalStatus(globalStatus); LOGGER.info("Global[{}] reporting is successfully done. status[{}]", globalSession.getXid(), globalSession.getStatus()); if (GlobalStatus.RollbackRetrying.equals(globalStatus) || GlobalStatus.TimeoutRollbackRetrying.equals(globalStatus) || GlobalStatus.UnKnown.equals(globalStatus)) { globalSession.queueToRetryRollback(); LOGGER.info("Global[{}] will retry rollback", globalSession.getXid()); } else if (GlobalStatus.CommitRetrying.equals(globalStatus)) { globalSession.queueToRetryCommit(); LOGGER.info("Global[{}] will retry commit", globalSession.getXid()); } } } /** * get saga ResourceId * * @param globalSession the globalSession * @return sagaResourceId */ private String getSagaResourceId(GlobalSession globalSession) { return globalSession.getApplicationId() + "#" + globalSession.getTransactionServiceGroup(); } }