/* * Copyright 1999-2019 Seata.io Group. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package io.seata.server.storage.file.session; import io.seata.common.exception.ShouldNeverHappenException; import io.seata.common.loader.LoadLevel; import io.seata.common.loader.Scope; import io.seata.common.util.CollectionUtils; import io.seata.common.util.StringUtils; import io.seata.config.ConfigurationFactory; import io.seata.core.constants.ConfigurationKeys; import io.seata.core.exception.TransactionException; import io.seata.core.model.GlobalStatus; import io.seata.server.session.*; import io.seata.server.storage.file.ReloadableStore; import io.seata.server.storage.file.TransactionWriteStore; import io.seata.server.storage.file.store.FileTransactionStoreManager; import io.seata.server.store.AbstractTransactionStoreManager; import io.seata.server.store.SessionStorable; import io.seata.server.store.TransactionStoreManager; import java.io.File; import java.io.IOException; import java.util.*; import java.util.concurrent.ConcurrentHashMap; import static io.seata.common.DefaultValues.DEFAULT_SERVICE_SESSION_RELOAD_READ_SIZE; /** * The type File based session manager. * * @author slievrly */ @LoadLevel(name = "file", scope = Scope.PROTOTYPE) public class FileSessionManager extends AbstractSessionManager implements Reloadable { private static final int READ_SIZE = ConfigurationFactory.getInstance().getInt( ConfigurationKeys.SERVICE_SESSION_RELOAD_READ_SIZE, DEFAULT_SERVICE_SESSION_RELOAD_READ_SIZE); /** * The Session map. */ private Map sessionMap = new ConcurrentHashMap<>(); /** * Instantiates a new File based session manager. * * @param name the name * @param sessionStoreFilePath the session store file path * @throws IOException the io exception */ public FileSessionManager(String name, String sessionStoreFilePath) throws IOException { super(name); if (StringUtils.isNotBlank(sessionStoreFilePath)) { transactionStoreManager = new FileTransactionStoreManager( sessionStoreFilePath + File.separator + name, this); } else { transactionStoreManager = new AbstractTransactionStoreManager() { @Override public boolean writeSession(LogOperation logOperation, SessionStorable session) { return true; } }; } } @Override public void reload() { restoreSessions(); } @Override public void addGlobalSession(GlobalSession session) throws TransactionException { CollectionUtils.computeIfAbsent(sessionMap, session.getXid(), k -> { try { super.addGlobalSession(session); } catch (TransactionException e) { LOGGER.error("addGlobalSession fail, msg: {}", e.getMessage()); } return session; }); } @Override public GlobalSession findGlobalSession(String xid) { return sessionMap.get(xid); } @Override public GlobalSession findGlobalSession(String xid, boolean withBranchSessions) { // withBranchSessions without process in memory return sessionMap.get(xid); } @Override public void removeGlobalSession(GlobalSession session) throws TransactionException { if (sessionMap.remove(session.getXid()) != null) { super.removeGlobalSession(session); } } @Override public Collection allSessions() { return sessionMap.values(); } @Override public List findGlobalSessions(SessionCondition condition) { List found = new ArrayList<>(); List globalStatuses = null; if (null != condition.getStatuses() && condition.getStatuses().length > 0) { globalStatuses = Arrays.asList(condition.getStatuses()); } for (GlobalSession globalSession : sessionMap.values()) { if (null != condition.getOverTimeAliveMills() && condition.getOverTimeAliveMills() > 0) { if (System.currentTimeMillis() - globalSession.getBeginTime() <= condition.getOverTimeAliveMills()) { continue; } } if (!StringUtils.isEmpty(condition.getXid())) { if (Objects.equals(condition.getXid(), globalSession.getXid())) { // Only one will be found, just add and return found.add(globalSession); return found; } else { continue; } } if (null != condition.getTransactionId() && condition.getTransactionId() > 0) { if (Objects.equals(condition.getTransactionId(), globalSession.getTransactionId())) { // Only one will be found, just add and return found.add(globalSession); return found; } else { continue; } } if (null != globalStatuses) { if (!globalStatuses.contains(globalSession.getStatus())) { continue; } } // All test pass, add to resp found.add(globalSession); } return found; } @Override public T lockAndExecute(GlobalSession globalSession, GlobalSession.LockCallable lockCallable) throws TransactionException { globalSession.lock(); try { return lockCallable.call(); } finally { globalSession.unlock(); } } private void restoreSessions() { final Set removedGlobalBuffer = new HashSet<>(); final Map> unhandledBranchBuffer = new HashMap<>(); restoreSessions(true, removedGlobalBuffer, unhandledBranchBuffer); restoreSessions(false, removedGlobalBuffer, unhandledBranchBuffer); if (!unhandledBranchBuffer.isEmpty()) { unhandledBranchBuffer.values().forEach(unhandledBranchSessions -> { unhandledBranchSessions.values().forEach(branchSession -> { String xid = branchSession.getXid(); if (removedGlobalBuffer.contains(xid)) { return; } long bid = branchSession.getBranchId(); GlobalSession found = sessionMap.get(xid); if (found == null) { // Ignore if (LOGGER.isInfoEnabled()) { LOGGER.info("GlobalSession Does Not Exists For BranchSession [" + bid + "/" + xid + "]"); } } else { BranchSession existingBranch = found.getBranch(branchSession.getBranchId()); if (existingBranch == null) { found.add(branchSession); } else { existingBranch.setStatus(branchSession.getStatus()); } } }); }); } } private boolean checkSessionStatus(GlobalSession globalSession) { GlobalStatus globalStatus = globalSession.getStatus(); switch (globalStatus) { case UnKnown: case Committed: case CommitFailed: case Rollbacked: case RollbackFailed: case TimeoutRollbacked: case TimeoutRollbackFailed: case RollbackRetryTimeout: case Finished: return false; default: return true; } } private void restoreSessions(boolean isHistory, Set removedGlobalBuffer, Map> unhandledBranchBuffer) { if (!(transactionStoreManager instanceof ReloadableStore)) { return; } while (((ReloadableStore)transactionStoreManager).hasRemaining(isHistory)) { List stores = ((ReloadableStore)transactionStoreManager).readWriteStore(READ_SIZE, isHistory); restore(stores, removedGlobalBuffer, unhandledBranchBuffer); } } private void restore(List stores, Set removedGlobalBuffer, Map> unhandledBranchBuffer) { for (TransactionWriteStore store : stores) { TransactionStoreManager.LogOperation logOperation = store.getOperate(); SessionStorable sessionStorable = store.getSessionRequest(); switch (logOperation) { case GLOBAL_ADD: case GLOBAL_UPDATE: { GlobalSession globalSession = (GlobalSession)sessionStorable; if (globalSession.getTransactionId() == 0) { LOGGER.error( "Restore globalSession from file failed, the transactionId is zero , xid:" + globalSession .getXid()); break; } if (removedGlobalBuffer.contains(globalSession.getXid())) { break; } GlobalSession foundGlobalSession = sessionMap.get(globalSession.getXid()); if (foundGlobalSession == null) { if (this.checkSessionStatus(globalSession)) { sessionMap.put(globalSession.getXid(), globalSession); } else { removedGlobalBuffer.add(globalSession.getXid()); unhandledBranchBuffer.remove(globalSession.getXid()); } } else { if (this.checkSessionStatus(globalSession)) { foundGlobalSession.setStatus(globalSession.getStatus()); } else { sessionMap.remove(globalSession.getXid()); removedGlobalBuffer.add(globalSession.getXid()); unhandledBranchBuffer.remove(globalSession.getXid()); } } break; } case GLOBAL_REMOVE: { GlobalSession globalSession = (GlobalSession)sessionStorable; if (globalSession.getTransactionId() == 0) { LOGGER.error( "Restore globalSession from file failed, the transactionId is zero , xid:" + globalSession .getXid()); break; } if (removedGlobalBuffer.contains(globalSession.getXid())) { break; } if (sessionMap.remove(globalSession.getXid()) == null) { if (LOGGER.isInfoEnabled()) { LOGGER.info("GlobalSession To Be Removed Does Not Exists [" + globalSession.getXid() + "]"); } } removedGlobalBuffer.add(globalSession.getXid()); unhandledBranchBuffer.remove(globalSession.getXid()); break; } case BRANCH_ADD: case BRANCH_UPDATE: { BranchSession branchSession = (BranchSession)sessionStorable; if (branchSession.getTransactionId() == 0) { LOGGER.error( "Restore branchSession from file failed, the transactionId is zero , xid:" + branchSession .getXid()); break; } if (removedGlobalBuffer.contains(branchSession.getXid())) { break; } GlobalSession foundGlobalSession = sessionMap.get(branchSession.getXid()); if (foundGlobalSession == null) { unhandledBranchBuffer.computeIfAbsent(branchSession.getXid(), key -> new HashMap<>()) .put(branchSession.getBranchId(), branchSession); } else { BranchSession existingBranch = foundGlobalSession.getBranch(branchSession.getBranchId()); if (existingBranch == null) { foundGlobalSession.add(branchSession); } else { existingBranch.setStatus(branchSession.getStatus()); } } break; } case BRANCH_REMOVE: { BranchSession branchSession = (BranchSession)sessionStorable; String xid = branchSession.getXid(); if (removedGlobalBuffer.contains(xid)) { break; } long bid = branchSession.getBranchId(); if (branchSession.getTransactionId() == 0) { LOGGER.error( "Restore branchSession from file failed, the transactionId is zero , xid:" + branchSession .getXid()); break; } GlobalSession found = sessionMap.get(xid); if (found == null) { if (LOGGER.isInfoEnabled()) { LOGGER.info( "GlobalSession To Be Updated (Remove Branch) Does Not Exists [" + bid + "/" + xid + "]"); } } else { BranchSession theBranch = found.getBranch(bid); if (theBranch == null) { if (LOGGER.isInfoEnabled()) { LOGGER.info("BranchSession To Be Updated Does Not Exists [" + bid + "/" + xid + "]"); } } else { found.remove(theBranch); } } break; } default: throw new ShouldNeverHappenException("Unknown Operation: " + logOperation); } } } @Override public void destroy() { transactionStoreManager.shutdown(); } }