/*
|
* Copyright 1999-2019 Seata.io Group.
|
*
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
* you may not use this file except in compliance with the License.
|
* You may obtain a copy of the License at
|
*
|
* http://www.apache.org/licenses/LICENSE-2.0
|
*
|
* Unless required by applicable law or agreed to in writing, software
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
* See the License for the specific language governing permissions and
|
* limitations under the License.
|
*/
|
package io.seata.server.storage.file.session;
|
|
import io.seata.common.exception.ShouldNeverHappenException;
|
import io.seata.common.loader.LoadLevel;
|
import io.seata.common.loader.Scope;
|
import io.seata.common.util.CollectionUtils;
|
import io.seata.common.util.StringUtils;
|
import io.seata.config.ConfigurationFactory;
|
import io.seata.core.constants.ConfigurationKeys;
|
import io.seata.core.exception.TransactionException;
|
import io.seata.core.model.GlobalStatus;
|
import io.seata.server.session.*;
|
import io.seata.server.storage.file.ReloadableStore;
|
import io.seata.server.storage.file.TransactionWriteStore;
|
import io.seata.server.storage.file.store.FileTransactionStoreManager;
|
import io.seata.server.store.AbstractTransactionStoreManager;
|
import io.seata.server.store.SessionStorable;
|
import io.seata.server.store.TransactionStoreManager;
|
|
import java.io.File;
|
import java.io.IOException;
|
import java.util.*;
|
import java.util.concurrent.ConcurrentHashMap;
|
|
import static io.seata.common.DefaultValues.DEFAULT_SERVICE_SESSION_RELOAD_READ_SIZE;
|
|
|
/**
|
* The type File based session manager.
|
*
|
* @author slievrly
|
*/
|
@LoadLevel(name = "file", scope = Scope.PROTOTYPE)
|
public class FileSessionManager extends AbstractSessionManager implements Reloadable {
|
|
private static final int READ_SIZE = ConfigurationFactory.getInstance().getInt(
|
ConfigurationKeys.SERVICE_SESSION_RELOAD_READ_SIZE, DEFAULT_SERVICE_SESSION_RELOAD_READ_SIZE);
|
/**
|
* The Session map.
|
*/
|
private Map<String, GlobalSession> sessionMap = new ConcurrentHashMap<>();
|
|
/**
|
* Instantiates a new File based session manager.
|
*
|
* @param name the name
|
* @param sessionStoreFilePath the session store file path
|
* @throws IOException the io exception
|
*/
|
public FileSessionManager(String name, String sessionStoreFilePath) throws IOException {
|
super(name);
|
if (StringUtils.isNotBlank(sessionStoreFilePath)) {
|
transactionStoreManager = new FileTransactionStoreManager(
|
sessionStoreFilePath + File.separator + name, this);
|
} else {
|
transactionStoreManager = new AbstractTransactionStoreManager() {
|
@Override
|
public boolean writeSession(LogOperation logOperation, SessionStorable session) {
|
return true;
|
}
|
};
|
}
|
}
|
|
@Override
|
public void reload() {
|
restoreSessions();
|
}
|
|
@Override
|
public void addGlobalSession(GlobalSession session) throws TransactionException {
|
CollectionUtils.computeIfAbsent(sessionMap, session.getXid(), k -> {
|
try {
|
super.addGlobalSession(session);
|
} catch (TransactionException e) {
|
LOGGER.error("addGlobalSession fail, msg: {}", e.getMessage());
|
}
|
return session;
|
});
|
}
|
|
@Override
|
public GlobalSession findGlobalSession(String xid) {
|
return sessionMap.get(xid);
|
}
|
|
@Override
|
public GlobalSession findGlobalSession(String xid, boolean withBranchSessions) {
|
// withBranchSessions without process in memory
|
return sessionMap.get(xid);
|
}
|
|
@Override
|
public void removeGlobalSession(GlobalSession session) throws TransactionException {
|
if (sessionMap.remove(session.getXid()) != null) {
|
super.removeGlobalSession(session);
|
}
|
}
|
|
@Override
|
public Collection<GlobalSession> allSessions() {
|
return sessionMap.values();
|
}
|
|
@Override
|
public List<GlobalSession> findGlobalSessions(SessionCondition condition) {
|
List<GlobalSession> found = new ArrayList<>();
|
|
List<GlobalStatus> globalStatuses = null;
|
if (null != condition.getStatuses() && condition.getStatuses().length > 0) {
|
globalStatuses = Arrays.asList(condition.getStatuses());
|
}
|
for (GlobalSession globalSession : sessionMap.values()) {
|
if (null != condition.getOverTimeAliveMills() && condition.getOverTimeAliveMills() > 0) {
|
if (System.currentTimeMillis() - globalSession.getBeginTime() <= condition.getOverTimeAliveMills()) {
|
continue;
|
}
|
}
|
|
if (!StringUtils.isEmpty(condition.getXid())) {
|
if (Objects.equals(condition.getXid(), globalSession.getXid())) {
|
// Only one will be found, just add and return
|
found.add(globalSession);
|
return found;
|
} else {
|
continue;
|
}
|
}
|
|
if (null != condition.getTransactionId() && condition.getTransactionId() > 0) {
|
if (Objects.equals(condition.getTransactionId(), globalSession.getTransactionId())) {
|
// Only one will be found, just add and return
|
found.add(globalSession);
|
return found;
|
} else {
|
continue;
|
}
|
}
|
|
if (null != globalStatuses) {
|
if (!globalStatuses.contains(globalSession.getStatus())) {
|
continue;
|
}
|
}
|
|
// All test pass, add to resp
|
found.add(globalSession);
|
}
|
return found;
|
}
|
|
@Override
|
public <T> T lockAndExecute(GlobalSession globalSession, GlobalSession.LockCallable<T> lockCallable)
|
throws TransactionException {
|
globalSession.lock();
|
try {
|
return lockCallable.call();
|
} finally {
|
globalSession.unlock();
|
}
|
}
|
|
private void restoreSessions() {
|
final Set<String> removedGlobalBuffer = new HashSet<>();
|
final Map<String, Map<Long, BranchSession>> unhandledBranchBuffer = new HashMap<>();
|
|
restoreSessions(true, removedGlobalBuffer, unhandledBranchBuffer);
|
restoreSessions(false, removedGlobalBuffer, unhandledBranchBuffer);
|
|
if (!unhandledBranchBuffer.isEmpty()) {
|
unhandledBranchBuffer.values().forEach(unhandledBranchSessions -> {
|
unhandledBranchSessions.values().forEach(branchSession -> {
|
String xid = branchSession.getXid();
|
if (removedGlobalBuffer.contains(xid)) {
|
return;
|
}
|
|
long bid = branchSession.getBranchId();
|
GlobalSession found = sessionMap.get(xid);
|
if (found == null) {
|
// Ignore
|
if (LOGGER.isInfoEnabled()) {
|
LOGGER.info("GlobalSession Does Not Exists For BranchSession [" + bid + "/" + xid + "]");
|
}
|
} else {
|
BranchSession existingBranch = found.getBranch(branchSession.getBranchId());
|
if (existingBranch == null) {
|
found.add(branchSession);
|
} else {
|
existingBranch.setStatus(branchSession.getStatus());
|
}
|
}
|
});
|
});
|
}
|
}
|
|
private boolean checkSessionStatus(GlobalSession globalSession) {
|
GlobalStatus globalStatus = globalSession.getStatus();
|
switch (globalStatus) {
|
case UnKnown:
|
case Committed:
|
case CommitFailed:
|
case Rollbacked:
|
case RollbackFailed:
|
case TimeoutRollbacked:
|
case TimeoutRollbackFailed:
|
case RollbackRetryTimeout:
|
case Finished:
|
return false;
|
default:
|
return true;
|
}
|
}
|
|
private void restoreSessions(boolean isHistory, Set<String> removedGlobalBuffer, Map<String,
|
Map<Long, BranchSession>> unhandledBranchBuffer) {
|
if (!(transactionStoreManager instanceof ReloadableStore)) {
|
return;
|
}
|
while (((ReloadableStore)transactionStoreManager).hasRemaining(isHistory)) {
|
List<TransactionWriteStore> stores = ((ReloadableStore)transactionStoreManager).readWriteStore(READ_SIZE,
|
isHistory);
|
restore(stores, removedGlobalBuffer, unhandledBranchBuffer);
|
}
|
}
|
|
private void restore(List<TransactionWriteStore> stores, Set<String> removedGlobalBuffer,
|
Map<String, Map<Long, BranchSession>> unhandledBranchBuffer) {
|
for (TransactionWriteStore store : stores) {
|
TransactionStoreManager.LogOperation logOperation = store.getOperate();
|
SessionStorable sessionStorable = store.getSessionRequest();
|
switch (logOperation) {
|
case GLOBAL_ADD:
|
case GLOBAL_UPDATE: {
|
GlobalSession globalSession = (GlobalSession)sessionStorable;
|
if (globalSession.getTransactionId() == 0) {
|
LOGGER.error(
|
"Restore globalSession from file failed, the transactionId is zero , xid:" + globalSession
|
.getXid());
|
break;
|
}
|
if (removedGlobalBuffer.contains(globalSession.getXid())) {
|
break;
|
}
|
GlobalSession foundGlobalSession = sessionMap.get(globalSession.getXid());
|
if (foundGlobalSession == null) {
|
if (this.checkSessionStatus(globalSession)) {
|
sessionMap.put(globalSession.getXid(), globalSession);
|
} else {
|
removedGlobalBuffer.add(globalSession.getXid());
|
unhandledBranchBuffer.remove(globalSession.getXid());
|
}
|
} else {
|
if (this.checkSessionStatus(globalSession)) {
|
foundGlobalSession.setStatus(globalSession.getStatus());
|
} else {
|
sessionMap.remove(globalSession.getXid());
|
removedGlobalBuffer.add(globalSession.getXid());
|
unhandledBranchBuffer.remove(globalSession.getXid());
|
}
|
}
|
break;
|
}
|
case GLOBAL_REMOVE: {
|
GlobalSession globalSession = (GlobalSession)sessionStorable;
|
if (globalSession.getTransactionId() == 0) {
|
LOGGER.error(
|
"Restore globalSession from file failed, the transactionId is zero , xid:" + globalSession
|
.getXid());
|
break;
|
}
|
if (removedGlobalBuffer.contains(globalSession.getXid())) {
|
break;
|
}
|
if (sessionMap.remove(globalSession.getXid()) == null) {
|
if (LOGGER.isInfoEnabled()) {
|
LOGGER.info("GlobalSession To Be Removed Does Not Exists [" + globalSession.getXid() + "]");
|
}
|
}
|
removedGlobalBuffer.add(globalSession.getXid());
|
unhandledBranchBuffer.remove(globalSession.getXid());
|
break;
|
}
|
case BRANCH_ADD:
|
case BRANCH_UPDATE: {
|
BranchSession branchSession = (BranchSession)sessionStorable;
|
if (branchSession.getTransactionId() == 0) {
|
LOGGER.error(
|
"Restore branchSession from file failed, the transactionId is zero , xid:" + branchSession
|
.getXid());
|
break;
|
}
|
if (removedGlobalBuffer.contains(branchSession.getXid())) {
|
break;
|
}
|
GlobalSession foundGlobalSession = sessionMap.get(branchSession.getXid());
|
if (foundGlobalSession == null) {
|
unhandledBranchBuffer.computeIfAbsent(branchSession.getXid(), key -> new HashMap<>())
|
.put(branchSession.getBranchId(), branchSession);
|
} else {
|
BranchSession existingBranch = foundGlobalSession.getBranch(branchSession.getBranchId());
|
if (existingBranch == null) {
|
foundGlobalSession.add(branchSession);
|
} else {
|
existingBranch.setStatus(branchSession.getStatus());
|
}
|
}
|
break;
|
}
|
case BRANCH_REMOVE: {
|
BranchSession branchSession = (BranchSession)sessionStorable;
|
String xid = branchSession.getXid();
|
if (removedGlobalBuffer.contains(xid)) {
|
break;
|
}
|
long bid = branchSession.getBranchId();
|
if (branchSession.getTransactionId() == 0) {
|
LOGGER.error(
|
"Restore branchSession from file failed, the transactionId is zero , xid:" + branchSession
|
.getXid());
|
break;
|
}
|
GlobalSession found = sessionMap.get(xid);
|
if (found == null) {
|
if (LOGGER.isInfoEnabled()) {
|
LOGGER.info(
|
"GlobalSession To Be Updated (Remove Branch) Does Not Exists [" + bid + "/" + xid
|
+ "]");
|
}
|
} else {
|
BranchSession theBranch = found.getBranch(bid);
|
if (theBranch == null) {
|
if (LOGGER.isInfoEnabled()) {
|
LOGGER.info("BranchSession To Be Updated Does Not Exists [" + bid + "/" + xid + "]");
|
}
|
} else {
|
found.remove(theBranch);
|
}
|
}
|
break;
|
}
|
default:
|
throw new ShouldNeverHappenException("Unknown Operation: " + logOperation);
|
}
|
}
|
|
}
|
|
@Override
|
public void destroy() {
|
transactionStoreManager.shutdown();
|
}
|
|
}
|