/*
|
* Copyright 1999-2019 Seata.io Group.
|
*
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
* you may not use this file except in compliance with the License.
|
* You may obtain a copy of the License at
|
*
|
* http://www.apache.org/licenses/LICENSE-2.0
|
*
|
* Unless required by applicable law or agreed to in writing, software
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
* See the License for the specific language governing permissions and
|
* limitations under the License.
|
*/
|
package io.seata.server.coordinator;
|
|
import io.seata.common.DefaultValues;
|
import io.seata.common.exception.NotSupportYetException;
|
import io.seata.common.loader.EnhancedServiceLoader;
|
import io.seata.common.util.CollectionUtils;
|
import io.seata.config.ConfigurationFactory;
|
import io.seata.core.context.RootContext;
|
import io.seata.core.exception.TransactionException;
|
import io.seata.core.logger.StackTraceLogger;
|
import io.seata.core.model.BranchStatus;
|
import io.seata.core.model.BranchType;
|
import io.seata.core.model.GlobalStatus;
|
import io.seata.core.rpc.RemotingServer;
|
import io.seata.server.metrics.MetricsPublisher;
|
import io.seata.server.session.BranchSession;
|
import io.seata.server.session.GlobalSession;
|
import io.seata.server.session.SessionHelper;
|
import io.seata.server.session.SessionHolder;
|
import org.slf4j.Logger;
|
import org.slf4j.LoggerFactory;
|
import org.slf4j.MDC;
|
|
import java.util.List;
|
import java.util.Map;
|
import java.util.concurrent.ConcurrentHashMap;
|
|
import static io.seata.core.constants.ConfigurationKeys.XAER_NOTA_RETRY_TIMEOUT;
|
import static io.seata.server.session.BranchSessionHandler.CONTINUE;
|
|
/**
|
* The type Default core.
|
*
|
* @author sharajava
|
*/
|
public class DefaultCore implements Core {
|
|
private static final Logger LOGGER = LoggerFactory.getLogger(DefaultCore.class);
|
|
private static final int RETRY_XAER_NOTA_TIMEOUT = ConfigurationFactory.getInstance().getInt(XAER_NOTA_RETRY_TIMEOUT,
|
DefaultValues.DEFAULT_XAER_NOTA_RETRY_TIMEOUT);
|
|
private static Map<BranchType, AbstractCore> coreMap = new ConcurrentHashMap<>();
|
|
/**
|
* get the Default core.
|
*
|
* @param remotingServer the remoting server
|
*/
|
public DefaultCore(RemotingServer remotingServer) {
|
List<AbstractCore> allCore = EnhancedServiceLoader.loadAll(AbstractCore.class,
|
new Class[] {RemotingServer.class}, new Object[] {remotingServer});
|
if (CollectionUtils.isNotEmpty(allCore)) {
|
for (AbstractCore core : allCore) {
|
coreMap.put(core.getHandleBranchType(), core);
|
}
|
}
|
}
|
|
/**
|
* get core
|
*
|
* @param branchType the branchType
|
* @return the core
|
*/
|
public AbstractCore getCore(BranchType branchType) {
|
AbstractCore core = coreMap.get(branchType);
|
if (core == null) {
|
throw new NotSupportYetException("unsupported type:" + branchType.name());
|
}
|
return core;
|
}
|
|
/**
|
* only for mock
|
*
|
* @param branchType the branchType
|
* @param core the core
|
*/
|
public void mockCore(BranchType branchType, AbstractCore core) {
|
coreMap.put(branchType, core);
|
}
|
|
@Override
|
public Long branchRegister(BranchType branchType, String resourceId, String clientId, String xid,
|
String applicationData, String lockKeys) throws TransactionException {
|
return getCore(branchType).branchRegister(branchType, resourceId, clientId, xid,
|
applicationData, lockKeys);
|
}
|
|
@Override
|
public void branchReport(BranchType branchType, String xid, long branchId, BranchStatus status,
|
String applicationData) throws TransactionException {
|
getCore(branchType).branchReport(branchType, xid, branchId, status, applicationData);
|
}
|
|
@Override
|
public boolean lockQuery(BranchType branchType, String resourceId, String xid, String lockKeys)
|
throws TransactionException {
|
return getCore(branchType).lockQuery(branchType, resourceId, xid, lockKeys);
|
}
|
|
@Override
|
public BranchStatus branchCommit(GlobalSession globalSession, BranchSession branchSession) throws TransactionException {
|
return getCore(branchSession.getBranchType()).branchCommit(globalSession, branchSession);
|
}
|
|
@Override
|
public BranchStatus branchRollback(GlobalSession globalSession, BranchSession branchSession) throws TransactionException {
|
return getCore(branchSession.getBranchType()).branchRollback(globalSession, branchSession);
|
}
|
|
@Override
|
public String begin(String applicationId, String transactionServiceGroup, String name, int timeout)
|
throws TransactionException {
|
GlobalSession session = GlobalSession.createGlobalSession(applicationId, transactionServiceGroup, name, timeout);
|
MDC.put(RootContext.MDC_KEY_XID, session.getXid());
|
session.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
|
|
session.begin();
|
|
// transaction start event
|
MetricsPublisher.postSessionDoingEvent(session, false);
|
|
return session.getXid();
|
}
|
|
|
|
@Override
|
public GlobalStatus commit(String xid) throws TransactionException {
|
GlobalSession globalSession = SessionHolder.findGlobalSession(xid);
|
if (globalSession == null) {
|
return GlobalStatus.Finished;
|
}
|
|
if (globalSession.isTimeout()) {
|
LOGGER.info("TC detected timeout, xid = {}", globalSession.getXid());
|
return GlobalStatus.TimeoutRollbacking;
|
}
|
|
globalSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
|
// just lock changeStatus
|
|
boolean shouldCommit = SessionHolder.lockAndExecute(globalSession, () -> {
|
if (globalSession.getStatus() == GlobalStatus.Begin) {
|
// Highlight: Firstly, close the session, then no more branch can be registered.
|
globalSession.closeAndClean();
|
if (globalSession.canBeCommittedAsync()) {
|
globalSession.asyncCommit();
|
MetricsPublisher.postSessionDoneEvent(globalSession, GlobalStatus.Committed, false, false);
|
return false;
|
} else {
|
globalSession.changeGlobalStatus(GlobalStatus.Committing);
|
return true;
|
}
|
}
|
return false;
|
});
|
|
if (shouldCommit) {
|
boolean success = doGlobalCommit(globalSession, false);
|
//If successful and all remaining branches can be committed asynchronously, do async commit.
|
if (success && globalSession.hasBranch() && globalSession.canBeCommittedAsync()) {
|
globalSession.asyncCommit();
|
return GlobalStatus.Committed;
|
} else {
|
return globalSession.getStatus();
|
}
|
} else {
|
return globalSession.getStatus() == GlobalStatus.AsyncCommitting ? GlobalStatus.Committed : globalSession.getStatus();
|
}
|
}
|
|
@Override
|
public boolean doGlobalCommit(GlobalSession globalSession, boolean retrying) throws TransactionException {
|
boolean success = true;
|
// start committing event
|
MetricsPublisher.postSessionDoingEvent(globalSession, retrying);
|
|
if (globalSession.isSaga()) {
|
success = getCore(BranchType.SAGA).doGlobalCommit(globalSession, retrying);
|
} else {
|
Boolean result = SessionHelper.forEach(globalSession.getSortedBranches(), branchSession -> {
|
// if not retrying, skip the canBeCommittedAsync branches
|
if (!retrying && branchSession.canBeCommittedAsync()) {
|
return CONTINUE;
|
}
|
|
BranchStatus currentStatus = branchSession.getStatus();
|
if (currentStatus == BranchStatus.PhaseOne_Failed) {
|
SessionHelper.removeBranch(globalSession, branchSession, !retrying);
|
return CONTINUE;
|
}
|
try {
|
BranchStatus branchStatus = getCore(branchSession.getBranchType()).branchCommit(globalSession, branchSession);
|
if (isXaerNotaTimeout(globalSession,branchStatus)) {
|
LOGGER.info("Commit branch XAER_NOTA retry timeout, xid = {} branchId = {}", globalSession.getXid(), branchSession.getBranchId());
|
branchStatus = BranchStatus.PhaseTwo_Committed;
|
}
|
switch (branchStatus) {
|
case PhaseTwo_Committed:
|
SessionHelper.removeBranch(globalSession, branchSession, !retrying);
|
LOGGER.info("Commit branch transaction successfully, xid = {} branchId = {}", globalSession.getXid(), branchSession.getBranchId());
|
return CONTINUE;
|
case PhaseTwo_CommitFailed_Unretryable:
|
//not at branch
|
SessionHelper.endCommitFailed(globalSession, retrying);
|
LOGGER.error("Committing global transaction[{}] finally failed, caused by branch transaction[{}] commit failed.", globalSession.getXid(), branchSession.getBranchId());
|
return false;
|
|
default:
|
if (!retrying) {
|
globalSession.queueToRetryCommit();
|
return false;
|
}
|
if (globalSession.canBeCommittedAsync()) {
|
LOGGER.error("Committing branch transaction[{}], status:{} and will retry later",
|
branchSession.getBranchId(), branchStatus);
|
return CONTINUE;
|
} else {
|
LOGGER.error(
|
"Committing global transaction[{}] failed, caused by branch transaction[{}] commit failed, will retry later.", globalSession.getXid(), branchSession.getBranchId());
|
return false;
|
}
|
}
|
} catch (Exception ex) {
|
StackTraceLogger.error(LOGGER, ex, "Committing branch transaction exception: {}",
|
new String[] {branchSession.toString()});
|
if (!retrying) {
|
globalSession.queueToRetryCommit();
|
throw new TransactionException(ex);
|
}
|
}
|
return CONTINUE;
|
});
|
// Return if the result is not null
|
if (result != null) {
|
return result;
|
}
|
//If has branch and not all remaining branches can be committed asynchronously,
|
//do print log and return false
|
if (globalSession.hasBranch() && !globalSession.canBeCommittedAsync()) {
|
LOGGER.info("Committing global transaction is NOT done, xid = {}.", globalSession.getXid());
|
return false;
|
}
|
}
|
// if it succeeds and there is no branch, retrying=true is the asynchronous state when retrying. EndCommitted is
|
// executed to improve concurrency performance, and the global transaction ends..
|
if (success && globalSession.getBranchSessions().isEmpty()) {
|
if (!retrying) {
|
//contains not AT branch
|
globalSession.setStatus(GlobalStatus.Committed);
|
}
|
SessionHelper.endCommitted(globalSession, retrying);
|
LOGGER.info("Committing global transaction is successfully done, xid = {}.", globalSession.getXid());
|
}
|
return success;
|
}
|
|
@Override
|
public GlobalStatus rollback(String xid) throws TransactionException {
|
GlobalSession globalSession = SessionHolder.findGlobalSession(xid);
|
if (globalSession == null) {
|
return GlobalStatus.Finished;
|
}
|
globalSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
|
// just lock changeStatus
|
boolean shouldRollBack = SessionHolder.lockAndExecute(globalSession, () -> {
|
globalSession.close(); // Highlight: Firstly, close the session, then no more branch can be registered.
|
if (globalSession.getStatus() == GlobalStatus.Begin) {
|
globalSession.changeGlobalStatus(GlobalStatus.Rollbacking);
|
return true;
|
}
|
return false;
|
});
|
if (!shouldRollBack) {
|
return globalSession.getStatus();
|
}
|
|
boolean rollbackSuccess = doGlobalRollback(globalSession, false);
|
return rollbackSuccess ? GlobalStatus.Rollbacked : globalSession.getStatus();
|
}
|
|
@Override
|
public boolean doGlobalRollback(GlobalSession globalSession, boolean retrying) throws TransactionException {
|
boolean success = true;
|
// start rollback event
|
MetricsPublisher.postSessionDoingEvent(globalSession, retrying);
|
|
if (globalSession.isSaga()) {
|
success = getCore(BranchType.SAGA).doGlobalRollback(globalSession, retrying);
|
} else {
|
Boolean result = SessionHelper.forEach(globalSession.getReverseSortedBranches(), branchSession -> {
|
BranchStatus currentBranchStatus = branchSession.getStatus();
|
if (currentBranchStatus == BranchStatus.PhaseOne_Failed) {
|
SessionHelper.removeBranch(globalSession, branchSession, !retrying);
|
return CONTINUE;
|
}
|
try {
|
BranchStatus branchStatus = branchRollback(globalSession, branchSession);
|
if (isXaerNotaTimeout(globalSession, branchStatus)) {
|
LOGGER.info("Rollback branch XAER_NOTA retry timeout, xid = {} branchId = {}", globalSession.getXid(), branchSession.getBranchId());
|
branchStatus = BranchStatus.PhaseTwo_Rollbacked;
|
}
|
switch (branchStatus) {
|
case PhaseTwo_Rollbacked:
|
SessionHelper.removeBranch(globalSession, branchSession, !retrying);
|
LOGGER.info("Rollback branch transaction successfully, xid = {} branchId = {}", globalSession.getXid(), branchSession.getBranchId());
|
return CONTINUE;
|
case PhaseTwo_RollbackFailed_Unretryable:
|
SessionHelper.endRollbackFailed(globalSession, retrying);
|
LOGGER.error("Rollback branch transaction fail and stop retry, xid = {} branchId = {}", globalSession.getXid(), branchSession.getBranchId());
|
return false;
|
default:
|
LOGGER.error("Rollback branch transaction fail and will retry, xid = {} branchId = {}", globalSession.getXid(), branchSession.getBranchId());
|
if (!retrying) {
|
globalSession.queueToRetryRollback();
|
}
|
return false;
|
}
|
} catch (Exception ex) {
|
StackTraceLogger.error(LOGGER, ex,
|
"Rollback branch transaction exception, xid = {} branchId = {} exception = {}",
|
new String[] {globalSession.getXid(), String.valueOf(branchSession.getBranchId()), ex.getMessage()});
|
if (!retrying) {
|
globalSession.queueToRetryRollback();
|
}
|
throw new TransactionException(ex);
|
}
|
});
|
// Return if the result is not null
|
if (result != null) {
|
return result;
|
}
|
}
|
|
// In db mode, lock and branch data residual problems may occur.
|
// Therefore, execution needs to be delayed here and cannot be executed synchronously.
|
if (success) {
|
SessionHelper.endRollbacked(globalSession, retrying);
|
LOGGER.info("Rollback global transaction successfully, xid = {}.", globalSession.getXid());
|
}
|
return success;
|
}
|
|
@Override
|
public GlobalStatus getStatus(String xid) throws TransactionException {
|
GlobalSession globalSession = SessionHolder.findGlobalSession(xid, false);
|
if (globalSession == null) {
|
return GlobalStatus.Finished;
|
} else {
|
return globalSession.getStatus();
|
}
|
}
|
|
@Override
|
public GlobalStatus globalReport(String xid, GlobalStatus globalStatus) throws TransactionException {
|
GlobalSession globalSession = SessionHolder.findGlobalSession(xid);
|
if (globalSession == null) {
|
return globalStatus;
|
}
|
globalSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
|
doGlobalReport(globalSession, xid, globalStatus);
|
return globalSession.getStatus();
|
}
|
|
@Override
|
public void doGlobalReport(GlobalSession globalSession, String xid, GlobalStatus globalStatus) throws TransactionException {
|
if (globalSession.isSaga()) {
|
getCore(BranchType.SAGA).doGlobalReport(globalSession, xid, globalStatus);
|
}
|
}
|
|
private boolean isXaerNotaTimeout(GlobalSession globalSession, BranchStatus branchStatus) {
|
if (BranchStatus.PhaseTwo_CommitFailed_XAER_NOTA_Retryable.equals(branchStatus) ||
|
BranchStatus.PhaseTwo_RollbackFailed_XAER_NOTA_Retryable.equals(branchStatus)) {
|
return System.currentTimeMillis() > globalSession.getBeginTime() + globalSession.getTimeout() +
|
Math.max(RETRY_XAER_NOTA_TIMEOUT, globalSession.getTimeout());
|
} else {
|
return false;
|
}
|
}
|
}
|