/* * Copyright 1999-2019 Seata.io Group. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package io.seata.server.coordinator; import io.netty.channel.Channel; import io.seata.common.thread.NamedThreadFactory; import io.seata.common.util.CollectionUtils; import io.seata.config.ConfigurationFactory; import io.seata.core.constants.ConfigurationKeys; import io.seata.core.context.RootContext; import io.seata.core.exception.TransactionException; import io.seata.core.model.GlobalStatus; import io.seata.core.protocol.AbstractMessage; import io.seata.core.protocol.AbstractResultMessage; import io.seata.core.protocol.transaction.*; import io.seata.core.rpc.Disposable; import io.seata.core.rpc.RemotingServer; import io.seata.core.rpc.RpcContext; import io.seata.core.rpc.TransactionMessageHandler; import io.seata.core.rpc.netty.ChannelManager; import io.seata.core.rpc.netty.NettyRemotingServer; import io.seata.server.AbstractTCInboundHandler; import io.seata.server.metrics.MetricsPublisher; import io.seata.server.session.*; import io.seata.server.store.StoreConfig; import org.apache.commons.lang.time.DateFormatUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.slf4j.MDC; import java.util.Collection; import java.util.Map; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import static io.seata.common.Constants.*; import static io.seata.common.DefaultValues.*; /** * The type Default coordinator. */ public class DefaultCoordinator extends AbstractTCInboundHandler implements TransactionMessageHandler, Disposable { private static final Logger LOGGER = LoggerFactory.getLogger(DefaultCoordinator.class); private static final int TIMED_TASK_SHUTDOWN_MAX_WAIT_MILLS = 5000; /** * The constant COMMITTING_RETRY_PERIOD. */ protected static final long COMMITTING_RETRY_PERIOD = CONFIG.getLong(ConfigurationKeys.COMMITING_RETRY_PERIOD, DEFAULT_COMMITING_RETRY_PERIOD); /** * The constant ASYNC_COMMITTING_RETRY_PERIOD. */ protected static final long ASYNC_COMMITTING_RETRY_PERIOD = CONFIG.getLong( ConfigurationKeys.ASYNC_COMMITING_RETRY_PERIOD, DEFAULT_ASYNC_COMMITTING_RETRY_PERIOD); /** * The constant ROLLBACKING_RETRY_PERIOD. */ protected static final long ROLLBACKING_RETRY_PERIOD = CONFIG.getLong(ConfigurationKeys.ROLLBACKING_RETRY_PERIOD, DEFAULT_ROLLBACKING_RETRY_PERIOD); /** * The constant TIMEOUT_RETRY_PERIOD. */ protected static final long TIMEOUT_RETRY_PERIOD = CONFIG.getLong(ConfigurationKeys.TIMEOUT_RETRY_PERIOD, DEFAULT_TIMEOUT_RETRY_PERIOD); /** * The Transaction undo log delete period. */ protected static final long UNDO_LOG_DELETE_PERIOD = CONFIG.getLong( ConfigurationKeys.TRANSACTION_UNDO_LOG_DELETE_PERIOD, DEFAULT_UNDO_LOG_DELETE_PERIOD); /** * The Transaction undo log delay delete period */ protected static final long UNDO_LOG_DELAY_DELETE_PERIOD = 3 * 60 * 1000; private static final int ALWAYS_RETRY_BOUNDARY = 0; /** * default branch async queue size */ private static final int DEFAULT_BRANCH_ASYNC_QUEUE_SIZE = 5000; /** * the pool size of branch asynchronous remove thread pool */ private static final int BRANCH_ASYNC_POOL_SIZE = Runtime.getRuntime().availableProcessors() * 2; private static final long MAX_COMMIT_RETRY_TIMEOUT = ConfigurationFactory.getInstance().getLong( ConfigurationKeys.MAX_COMMIT_RETRY_TIMEOUT, DEFAULT_MAX_COMMIT_RETRY_TIMEOUT); private static final long MAX_ROLLBACK_RETRY_TIMEOUT = ConfigurationFactory.getInstance().getLong( ConfigurationKeys.MAX_ROLLBACK_RETRY_TIMEOUT, DEFAULT_MAX_ROLLBACK_RETRY_TIMEOUT); private static final boolean ROLLBACK_RETRY_TIMEOUT_UNLOCK_ENABLE = ConfigurationFactory.getInstance().getBoolean( ConfigurationKeys.ROLLBACK_RETRY_TIMEOUT_UNLOCK_ENABLE, DEFAULT_ROLLBACK_RETRY_TIMEOUT_UNLOCK_ENABLE); private final ScheduledThreadPoolExecutor retryRollbacking = new ScheduledThreadPoolExecutor(1, new NamedThreadFactory(RETRY_ROLLBACKING, 1)); private final ScheduledThreadPoolExecutor retryCommitting = new ScheduledThreadPoolExecutor(1, new NamedThreadFactory(RETRY_COMMITTING, 1)); private final ScheduledThreadPoolExecutor asyncCommitting = new ScheduledThreadPoolExecutor(1, new NamedThreadFactory(ASYNC_COMMITTING, 1)); private final ScheduledThreadPoolExecutor timeoutCheck = new ScheduledThreadPoolExecutor(1, new NamedThreadFactory(TX_TIMEOUT_CHECK, 1)); private final ScheduledThreadPoolExecutor undoLogDelete = new ScheduledThreadPoolExecutor(1, new NamedThreadFactory(UNDOLOG_DELETE, 1)); private final GlobalStatus[] rollbackingStatuses = new GlobalStatus[] {GlobalStatus.TimeoutRollbacking, GlobalStatus.TimeoutRollbackRetrying, GlobalStatus.RollbackRetrying, GlobalStatus.Rollbacking}; private final GlobalStatus[] retryCommittingStatuses = new GlobalStatus[] {GlobalStatus.Committing, GlobalStatus.CommitRetrying, GlobalStatus.Committed}; private final ThreadPoolExecutor branchRemoveExecutor; private RemotingServer remotingServer; private final DefaultCore core; private static volatile DefaultCoordinator instance; /** * Instantiates a new Default coordinator. * * @param remotingServer the remoting server */ private DefaultCoordinator(RemotingServer remotingServer) { if (remotingServer == null) { throw new IllegalArgumentException("RemotingServer not allowed be null."); } this.remotingServer = remotingServer; this.core = new DefaultCore(remotingServer); boolean enableBranchAsyncRemove = CONFIG.getBoolean( ConfigurationKeys.ENABLE_BRANCH_ASYNC_REMOVE, DEFAULT_ENABLE_BRANCH_ASYNC_REMOVE); // create branchRemoveExecutor if (enableBranchAsyncRemove && StoreConfig.getSessionMode() != StoreConfig.SessionMode.FILE) { branchRemoveExecutor = new ThreadPoolExecutor(BRANCH_ASYNC_POOL_SIZE, BRANCH_ASYNC_POOL_SIZE, Integer.MAX_VALUE, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>( CONFIG.getInt(ConfigurationKeys.SESSION_BRANCH_ASYNC_QUEUE_SIZE, DEFAULT_BRANCH_ASYNC_QUEUE_SIZE) ), new NamedThreadFactory("branchSessionRemove", BRANCH_ASYNC_POOL_SIZE), new ThreadPoolExecutor.CallerRunsPolicy()); } else { branchRemoveExecutor = null; } } public static DefaultCoordinator getInstance(RemotingServer remotingServer) { if (null == instance) { synchronized (DefaultCoordinator.class) { if (null == instance) { instance = new DefaultCoordinator(remotingServer); } } } return instance; } public static DefaultCoordinator getInstance() { if (null == instance) { throw new IllegalArgumentException("The instance has not been created."); } return instance; } /** * Asynchronous remove branch * * @param globalSession the globalSession * @param branchSession the branchSession */ public void doBranchRemoveAsync(GlobalSession globalSession, BranchSession branchSession) { if (globalSession == null) { return; } branchRemoveExecutor.execute(new BranchRemoveTask(globalSession, branchSession)); } /** * Asynchronous remove all branch * * @param globalSession the globalSession */ public void doBranchRemoveAllAsync(GlobalSession globalSession) { if (globalSession == null) { return; } branchRemoveExecutor.execute(new BranchRemoveTask(globalSession)); } @Override protected void doGlobalBegin(GlobalBeginRequest request, GlobalBeginResponse response, RpcContext rpcContext) throws TransactionException { response.setXid(core.begin(rpcContext.getApplicationId(), rpcContext.getTransactionServiceGroup(), request.getTransactionName(), request.getTimeout())); if (LOGGER.isInfoEnabled()) { LOGGER.info("Begin new global transaction applicationId: {},transactionServiceGroup: {}, transactionName: {},timeout:{},xid:{}", rpcContext.getApplicationId(), rpcContext.getTransactionServiceGroup(), request.getTransactionName(), request.getTimeout(), response.getXid()); } } @Override protected void doGlobalCommit(GlobalCommitRequest request, GlobalCommitResponse response, RpcContext rpcContext) throws TransactionException { MDC.put(RootContext.MDC_KEY_XID, request.getXid()); response.setGlobalStatus(core.commit(request.getXid())); } @Override protected void doGlobalRollback(GlobalRollbackRequest request, GlobalRollbackResponse response, RpcContext rpcContext) throws TransactionException { MDC.put(RootContext.MDC_KEY_XID, request.getXid()); response.setGlobalStatus(core.rollback(request.getXid())); } @Override protected void doGlobalStatus(GlobalStatusRequest request, GlobalStatusResponse response, RpcContext rpcContext) throws TransactionException { MDC.put(RootContext.MDC_KEY_XID, request.getXid()); response.setGlobalStatus(core.getStatus(request.getXid())); } @Override protected void doGlobalReport(GlobalReportRequest request, GlobalReportResponse response, RpcContext rpcContext) throws TransactionException { MDC.put(RootContext.MDC_KEY_XID, request.getXid()); response.setGlobalStatus(core.globalReport(request.getXid(), request.getGlobalStatus())); } @Override protected void doBranchRegister(BranchRegisterRequest request, BranchRegisterResponse response, RpcContext rpcContext) throws TransactionException { MDC.put(RootContext.MDC_KEY_XID, request.getXid()); response.setBranchId( core.branchRegister(request.getBranchType(), request.getResourceId(), rpcContext.getClientId(), request.getXid(), request.getApplicationData(), request.getLockKey())); } @Override protected void doBranchReport(BranchReportRequest request, BranchReportResponse response, RpcContext rpcContext) throws TransactionException { MDC.put(RootContext.MDC_KEY_XID, request.getXid()); MDC.put(RootContext.MDC_KEY_BRANCH_ID, String.valueOf(request.getBranchId())); core.branchReport(request.getBranchType(), request.getXid(), request.getBranchId(), request.getStatus(), request.getApplicationData()); } @Override protected void doLockCheck(GlobalLockQueryRequest request, GlobalLockQueryResponse response, RpcContext rpcContext) throws TransactionException { MDC.put(RootContext.MDC_KEY_XID, request.getXid()); response.setLockable( core.lockQuery(request.getBranchType(), request.getResourceId(), request.getXid(), request.getLockKey())); } /** * Timeout check. */ protected void timeoutCheck() { SessionCondition sessionCondition = new SessionCondition(GlobalStatus.Begin); sessionCondition.setLazyLoadBranch(true); Collection beginGlobalsessions = SessionHolder.getRootSessionManager().findGlobalSessions(sessionCondition); if (CollectionUtils.isEmpty(beginGlobalsessions)) { return; } if (!beginGlobalsessions.isEmpty() && LOGGER.isDebugEnabled()) { LOGGER.debug("Global transaction timeout check begin, size: {}", beginGlobalsessions.size()); } SessionHelper.forEach(beginGlobalsessions, globalSession -> { if (LOGGER.isDebugEnabled()) { LOGGER.debug( globalSession.getXid() + " " + globalSession.getStatus() + " " + globalSession.getBeginTime() + " " + globalSession.getTimeout()); } SessionHolder.lockAndExecute(globalSession, () -> { if (globalSession.getStatus() != GlobalStatus.Begin || !globalSession.isTimeout()) { return false; } LOGGER.warn("Global transaction[{}] is timeout and will be rollback,transaction begin time:{} and now:{}", globalSession.getXid(), DateFormatUtils.ISO_DATE_FORMAT.format(globalSession.getBeginTime()), DateFormatUtils.ISO_DATE_FORMAT.format(System.currentTimeMillis())); globalSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager()); globalSession.close(); globalSession.setStatus(GlobalStatus.TimeoutRollbacking); globalSession.addSessionLifecycleListener(SessionHolder.getRetryRollbackingSessionManager()); SessionHolder.getRetryRollbackingSessionManager().addGlobalSession(globalSession); // transaction timeout and start rollbacking event MetricsPublisher.postSessionDoingEvent(globalSession, GlobalStatus.TimeoutRollbacking.name(), false, false); return true; }); }); if (!beginGlobalsessions.isEmpty() && LOGGER.isDebugEnabled()) { LOGGER.debug("Global transaction timeout check end. "); } } /** * Handle retry rollbacking. */ protected void handleRetryRollbacking() { SessionCondition sessionCondition = new SessionCondition(rollbackingStatuses); sessionCondition.setLazyLoadBranch(true); Collection rollbackingSessions = SessionHolder.getRetryRollbackingSessionManager().findGlobalSessions(sessionCondition); if (CollectionUtils.isEmpty(rollbackingSessions)) { return; } long now = System.currentTimeMillis(); SessionHelper.forEach(rollbackingSessions, rollbackingSession -> { try { // prevent repeated rollback if (rollbackingSession.getStatus() == GlobalStatus.Rollbacking && !rollbackingSession.isDeadSession()) { // The function of this 'return' is 'continue'. return; } if (isRetryTimeout(now, MAX_ROLLBACK_RETRY_TIMEOUT, rollbackingSession.getBeginTime())) { if (ROLLBACK_RETRY_TIMEOUT_UNLOCK_ENABLE) { rollbackingSession.clean(); } SessionHelper.endRollbackFailed(rollbackingSession, true, true); //The function of this 'return' is 'continue'. return; } rollbackingSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager()); core.doGlobalRollback(rollbackingSession, true); } catch (TransactionException ex) { LOGGER.error("Failed to retry rollbacking [{}] {} {}", rollbackingSession.getXid(), ex.getCode(), ex.getMessage()); } }); } /** * Handle retry committing. */ protected void handleRetryCommitting() { SessionCondition retryCommittingSessionCondition = new SessionCondition(retryCommittingStatuses); retryCommittingSessionCondition.setLazyLoadBranch(true); Collection committingSessions = SessionHolder.getRetryCommittingSessionManager().findGlobalSessions(retryCommittingSessionCondition); if (CollectionUtils.isEmpty(committingSessions)) { return; } long now = System.currentTimeMillis(); SessionHelper.forEach(committingSessions, committingSession -> { try { // prevent repeated commit if (GlobalStatus.Committing.equals(committingSession.getStatus()) && !committingSession.isDeadSession()) { // The function of this 'return' is 'continue'. return; } if (isRetryTimeout(now, MAX_COMMIT_RETRY_TIMEOUT, committingSession.getBeginTime())) { // commit retry timeout event SessionHelper.endCommitFailed(committingSession, true, true); //The function of this 'return' is 'continue'. return; } if (GlobalStatus.Committed.equals(committingSession.getStatus()) && committingSession.getBranchSessions().isEmpty()) { SessionHelper.endCommitted(committingSession,true); } committingSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager()); core.doGlobalCommit(committingSession, true); } catch (TransactionException ex) { LOGGER.error("Failed to retry committing [{}] {} {}", committingSession.getXid(), ex.getCode(), ex.getMessage()); } }); } /** * Handle async committing. */ protected void handleAsyncCommitting() { SessionCondition sessionCondition = new SessionCondition(GlobalStatus.AsyncCommitting); Collection asyncCommittingSessions = SessionHolder.getAsyncCommittingSessionManager().findGlobalSessions(sessionCondition); if (CollectionUtils.isEmpty(asyncCommittingSessions)) { return; } SessionHelper.forEach(asyncCommittingSessions, asyncCommittingSession -> { try { asyncCommittingSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager()); core.doGlobalCommit(asyncCommittingSession, true); } catch (TransactionException ex) { LOGGER.error("Failed to async committing [{}] {} {}", asyncCommittingSession.getXid(), ex.getCode(), ex.getMessage(), ex); } }); } /** * Undo log delete. */ protected void undoLogDelete() { Map rmChannels = ChannelManager.getRmChannels(); if (rmChannels == null || rmChannels.isEmpty()) { if (LOGGER.isDebugEnabled()) { LOGGER.debug("no active rm channels to delete undo log"); } return; } short saveDays = CONFIG.getShort(ConfigurationKeys.TRANSACTION_UNDO_LOG_SAVE_DAYS, UndoLogDeleteRequest.DEFAULT_SAVE_DAYS); for (Map.Entry channelEntry : rmChannels.entrySet()) { String resourceId = channelEntry.getKey(); UndoLogDeleteRequest deleteRequest = new UndoLogDeleteRequest(); deleteRequest.setResourceId(resourceId); deleteRequest.setSaveDays(saveDays > 0 ? saveDays : UndoLogDeleteRequest.DEFAULT_SAVE_DAYS); try { remotingServer.sendAsyncRequest(channelEntry.getValue(), deleteRequest); } catch (Exception e) { LOGGER.error("Failed to async delete undo log resourceId = {}, exception: {}", resourceId, e.getMessage()); } } } private boolean isRetryTimeout(long now, long timeout, long beginTime) { return timeout >= ALWAYS_RETRY_BOUNDARY && now - beginTime > timeout; } /** * Init. */ public void init() { retryRollbacking.scheduleAtFixedRate( () -> SessionHolder.distributedLockAndExecute(RETRY_ROLLBACKING, this::handleRetryRollbacking), 0, ROLLBACKING_RETRY_PERIOD, TimeUnit.MILLISECONDS); retryCommitting.scheduleAtFixedRate( () -> SessionHolder.distributedLockAndExecute(RETRY_COMMITTING, this::handleRetryCommitting), 0, COMMITTING_RETRY_PERIOD, TimeUnit.MILLISECONDS); asyncCommitting.scheduleAtFixedRate( () -> SessionHolder.distributedLockAndExecute(ASYNC_COMMITTING, this::handleAsyncCommitting), 0, ASYNC_COMMITTING_RETRY_PERIOD, TimeUnit.MILLISECONDS); timeoutCheck.scheduleAtFixedRate( () -> SessionHolder.distributedLockAndExecute(TX_TIMEOUT_CHECK, this::timeoutCheck), 0, TIMEOUT_RETRY_PERIOD, TimeUnit.MILLISECONDS); undoLogDelete.scheduleAtFixedRate( () -> SessionHolder.distributedLockAndExecute(UNDOLOG_DELETE, this::undoLogDelete), UNDO_LOG_DELAY_DELETE_PERIOD, UNDO_LOG_DELETE_PERIOD, TimeUnit.MILLISECONDS); } @Override public AbstractResultMessage onRequest(AbstractMessage request, RpcContext context) { if (!(request instanceof AbstractTransactionRequestToTC)) { throw new IllegalArgumentException(); } AbstractTransactionRequestToTC transactionRequest = (AbstractTransactionRequestToTC) request; transactionRequest.setTCInboundHandler(this); return transactionRequest.handle(context); } @Override public void onResponse(AbstractResultMessage response, RpcContext context) { if (!(response instanceof AbstractTransactionResponse)) { throw new IllegalArgumentException(); } } @Override public void destroy() { // 1. first shutdown timed task retryRollbacking.shutdown(); retryCommitting.shutdown(); asyncCommitting.shutdown(); timeoutCheck.shutdown(); undoLogDelete.shutdown(); if (branchRemoveExecutor != null) { branchRemoveExecutor.shutdown(); } try { retryRollbacking.awaitTermination(TIMED_TASK_SHUTDOWN_MAX_WAIT_MILLS, TimeUnit.MILLISECONDS); retryCommitting.awaitTermination(TIMED_TASK_SHUTDOWN_MAX_WAIT_MILLS, TimeUnit.MILLISECONDS); asyncCommitting.awaitTermination(TIMED_TASK_SHUTDOWN_MAX_WAIT_MILLS, TimeUnit.MILLISECONDS); timeoutCheck.awaitTermination(TIMED_TASK_SHUTDOWN_MAX_WAIT_MILLS, TimeUnit.MILLISECONDS); undoLogDelete.awaitTermination(TIMED_TASK_SHUTDOWN_MAX_WAIT_MILLS, TimeUnit.MILLISECONDS); if (branchRemoveExecutor != null) { branchRemoveExecutor.awaitTermination(TIMED_TASK_SHUTDOWN_MAX_WAIT_MILLS, TimeUnit.MILLISECONDS); } } catch (InterruptedException ignore) { } // 2. second close netty flow if (remotingServer instanceof NettyRemotingServer) { ((NettyRemotingServer) remotingServer).destroy(); } // 3. third destroy SessionHolder SessionHolder.destroy(); instance = null; } /** * only used for mock test * @param remotingServer */ public void setRemotingServer(RemotingServer remotingServer) { this.remotingServer = remotingServer; } /** * the task to remove branchSession */ static class BranchRemoveTask implements Runnable { /** * the globalSession */ private final GlobalSession globalSession; /** * the branchSession */ private final BranchSession branchSession; /** * If you use this construct, the task will remove the branchSession provided by the parameter * @param globalSession the globalSession */ public BranchRemoveTask(GlobalSession globalSession, BranchSession branchSession) { this.globalSession = globalSession; if (branchSession == null) { throw new IllegalArgumentException("BranchSession can`t be null!"); } this.branchSession = branchSession; } /** * If you use this construct, the task will remove all branchSession * @param globalSession the globalSession */ public BranchRemoveTask(GlobalSession globalSession) { this.globalSession = globalSession; this.branchSession = null; } @Override public void run() { if (globalSession == null) { return; } try { MDC.put(RootContext.MDC_KEY_XID, globalSession.getXid()); if (branchSession != null) { doRemove(branchSession); } else { globalSession.getSortedBranches().forEach(this::doRemove); } } catch (Exception unKnowException) { LOGGER.error("Asynchronous delete branchSession error, xid = {}", globalSession.getXid(), unKnowException); } finally { MDC.remove(RootContext.MDC_KEY_XID); } } private void doRemove(BranchSession bt) { try { MDC.put(RootContext.MDC_KEY_BRANCH_ID, String.valueOf(bt.getBranchId())); globalSession.removeBranch(bt); LOGGER.info("Asynchronous delete branchSession successfully, xid = {}, branchId = {}", globalSession.getXid(), bt.getBranchId()); } catch (TransactionException transactionException) { LOGGER.error("Asynchronous delete branchSession error, xid = {}, branchId = {}", globalSession.getXid(), bt.getBranchId(), transactionException); } finally { MDC.remove(RootContext.MDC_KEY_BRANCH_ID); } } } }