/*
 *  Copyright 1999-2019 Seata.io Group.
 *
 *  Licensed under the Apache License, Version 2.0 (the "License");
 *  you may not use this file except in compliance with the License.
 *  You may obtain a copy of the License at
 *
 *       http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing, software
 *  distributed under the License is distributed on an "AS IS" BASIS,
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  See the License for the specific language governing permissions and
 *  limitations under the License.
 */
package io.seata.server.storage.redis.store;

import com.google.common.collect.ImmutableMap;
import io.seata.common.XID;
import io.seata.common.exception.RedisException;
import io.seata.common.exception.StoreException;
import io.seata.common.util.BeanUtils;
import io.seata.common.util.CollectionUtils;
import io.seata.common.util.StringUtils;
import io.seata.config.Configuration;
import io.seata.config.ConfigurationFactory;
import io.seata.core.model.GlobalStatus;
import io.seata.core.store.BranchTransactionDO;
import io.seata.core.store.GlobalTransactionDO;
import io.seata.server.console.param.GlobalSessionParam;
import io.seata.server.session.GlobalSession;
import io.seata.server.session.SessionCondition;
import io.seata.server.session.SessionStatusValidator;
import io.seata.server.storage.SessionConverter;
import io.seata.server.storage.redis.JedisPooledFactory;
import io.seata.server.store.AbstractTransactionStoreManager;
import io.seata.server.store.SessionStorable;
import io.seata.server.store.TransactionStoreManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.Pipeline;
import redis.clients.jedis.Transaction;

import java.util.*;
import java.util.function.Function;
import java.util.stream.Collectors;

import static io.seata.common.ConfigurationKeys.STORE_REDIS_QUERY_LIMIT;
import static io.seata.common.DefaultValues.DEFAULT_QUERY_LIMIT;
import static io.seata.core.constants.RedisKeyConstants.*;

/**
 * The redis transaction store manager.
 *
 * <p>Persists global and branch transactions in Redis: each transaction is a hash, the branches of
 * one global transaction are tracked in a list keyed by xid, and xids are additionally indexed in
 * one list per global status.
 *
 * @author funkye
 * @author wangzhongxiang
 * @author doubleDimple
 */
public class RedisTransactionStoreManager extends AbstractTransactionStoreManager implements TransactionStoreManager {

    private static final Logger LOGGER = LoggerFactory.getLogger(RedisTransactionStoreManager.class);

    /** Prefix of the list key that holds the branch keys of one global transaction; the suffix is the xid. */
    private static final String REDIS_SEATA_BRANCHES_PREFIX = "SEATA_BRANCHES_";

    /** Prefix of the hash key of a single branch transaction; the suffix is the branch id. */
    private static final String REDIS_SEATA_BRANCH_PREFIX = "SEATA_BRANCH_";

    /** Prefix of the hash key of a global transaction; the suffix is the transaction id. */
    private static final String REDIS_SEATA_GLOBAL_PREFIX = "SEATA_GLOBAL_";

    /** Prefix of the per-status list key that holds xids; the suffix is the status code. */
    private static final String REDIS_SEATA_STATUS_PREFIX = "SEATA_STATUS_";

    /**
     * Key of the sorted set of global keys that are inserted on global add, scored by
     * {@code beginTime + timeout}.
     */
    private static final String REDIS_SEATA_BEGIN_TRANSACTIONS_KEY = "SEATA_BEGIN_TRANSACTIONS";

    /** Lazily created singleton, see {@code getInstance()}. */
    private static volatile RedisTransactionStoreManager instance;

    /** Reply text that the hmset result inside a MULTI/EXEC block is compared against. */
    private static final String OK = "OK";

    /**
     * The constant CONFIG.
     */
    private static final Configuration CONFIG = ConfigurationFactory.getInstance();

    /**
     * The Log query limit: upper bound on the number of sessions fetched by the status based queries.
     * Initialised from {@code STORE_REDIS_QUERY_LIMIT}, falling back to {@code DEFAULT_QUERY_LIMIT}.
     */
    private int logQueryLimit;

    /**
     * Get the instance.
*/ public static RedisTransactionStoreManager getInstance() { if (instance == null) { synchronized (RedisTransactionStoreManager.class) { if (instance == null) { instance = new RedisTransactionStoreManager(); } } } return instance; } /** * init map to constructor */ public RedisTransactionStoreManager() { super(); initGlobalMap(); initBranchMap(); logQueryLimit = CONFIG.getInt(STORE_REDIS_QUERY_LIMIT, DEFAULT_QUERY_LIMIT); } /** * Map for LogOperation Global Operation */ public static volatile ImmutableMap> globalMap; /** * Map for LogOperation Branch Operation */ public static volatile ImmutableMap> branchMap; /** * init globalMap * */ public void initGlobalMap() { if (CollectionUtils.isEmpty(branchMap)) { globalMap = ImmutableMap.>builder() .put(LogOperation.GLOBAL_ADD, this::insertGlobalTransactionDO) .put(LogOperation.GLOBAL_UPDATE, this::updateGlobalTransactionDO) .put(LogOperation.GLOBAL_REMOVE, this::deleteGlobalTransactionDO) .build(); } } /** * init branchMap * */ public void initBranchMap() { if (CollectionUtils.isEmpty(branchMap)) { branchMap = ImmutableMap.>builder() .put(LogOperation.BRANCH_ADD, this::insertBranchTransactionDO) .put(LogOperation.BRANCH_UPDATE, this::updateBranchTransactionDO) .put(LogOperation.BRANCH_REMOVE, this::deleteBranchTransactionDO) .build(); } } @Override public boolean writeSession(LogOperation logOperation, SessionStorable session) { if (globalMap.containsKey(logOperation) || branchMap.containsKey(logOperation)) { return globalMap.containsKey(logOperation) ? 
globalMap.get(logOperation).apply(SessionConverter.convertGlobalTransactionDO(session)) : branchMap.get(logOperation).apply(SessionConverter.convertBranchTransactionDO(session)); } else { throw new StoreException("Unknown LogOperation:" + logOperation.name()); } } /** * Insert branch transaction * @param branchTransactionDO * @return the boolean */ private boolean insertBranchTransactionDO(BranchTransactionDO branchTransactionDO) { String branchKey = buildBranchKey(branchTransactionDO.getBranchId()); String branchListKey = buildBranchListKeyByXid(branchTransactionDO.getXid()); try (Jedis jedis = JedisPooledFactory.getJedisInstance(); Pipeline pipelined = jedis.pipelined()) { Date now = new Date(); branchTransactionDO.setGmtCreate(now); branchTransactionDO.setGmtModified(now); pipelined.hmset(branchKey, BeanUtils.objectToMap(branchTransactionDO)); pipelined.rpush(branchListKey, branchKey); pipelined.sync(); return true; } catch (Exception ex) { throw new RedisException(ex); } } /** * Delete the branch transaction * @param branchTransactionDO * @return */ private boolean deleteBranchTransactionDO(BranchTransactionDO branchTransactionDO) { String branchKey = buildBranchKey(branchTransactionDO.getBranchId()); try (Jedis jedis = JedisPooledFactory.getJedisInstance()) { String xid = jedis.hget(branchKey, REDIS_KEY_BRANCH_XID); if (StringUtils.isEmpty(xid)) { return true; } String branchListKey = buildBranchListKeyByXid(branchTransactionDO.getXid()); try (Pipeline pipelined = jedis.pipelined()) { pipelined.lrem(branchListKey, 0, branchKey); pipelined.del(branchKey); pipelined.sync(); } return true; } catch (Exception ex) { throw new RedisException(ex); } } /** * Update the branch transaction * @param branchTransactionDO * @return */ private boolean updateBranchTransactionDO(BranchTransactionDO branchTransactionDO) { String branchKey = buildBranchKey(branchTransactionDO.getBranchId()); try (Jedis jedis = JedisPooledFactory.getJedisInstance()) { String previousBranchStatus 
= jedis.hget(branchKey, REDIS_KEY_BRANCH_STATUS); if (StringUtils.isEmpty(previousBranchStatus)) { throw new StoreException("Branch transaction is not exist, update branch transaction failed."); } Map map = new HashMap<>(3, 1); map.put(REDIS_KEY_BRANCH_STATUS, String.valueOf(branchTransactionDO.getStatus())); map.put(REDIS_KEY_BRANCH_GMT_MODIFIED, String.valueOf((new Date()).getTime())); if (StringUtils.isNotBlank(branchTransactionDO.getApplicationData())) { map.put(REDIS_KEY_BRANCH_APPLICATION_DATA, String.valueOf(branchTransactionDO.getApplicationData())); } jedis.hmset(branchKey, map); return true; } catch (Exception ex) { throw new RedisException(ex); } } /** * Insert the global transaction. * @param globalTransactionDO * @return */ private boolean insertGlobalTransactionDO(GlobalTransactionDO globalTransactionDO) { String globalKey = buildGlobalKeyByTransactionId(globalTransactionDO.getTransactionId()); try (Jedis jedis = JedisPooledFactory.getJedisInstance(); Pipeline pipelined = jedis.pipelined()) { Date now = new Date(); globalTransactionDO.setGmtCreate(now); globalTransactionDO.setGmtModified(now); pipelined.hmset(globalKey, BeanUtils.objectToMap(globalTransactionDO)); String xid = globalTransactionDO.getXid(); pipelined.rpush(buildGlobalStatus(globalTransactionDO.getStatus()), xid); pipelined.zadd(REDIS_SEATA_BEGIN_TRANSACTIONS_KEY, globalTransactionDO.getBeginTime() + globalTransactionDO.getTimeout(), globalKey); pipelined.sync(); return true; } catch (Exception ex) { throw new RedisException(ex); } } /** * Delete the global transaction. 
* It will operate two parts: * 1.delete the global session map * 2.remove the xid from the global status list * If the operate failed,the succeed operates will rollback * @param globalTransactionDO * @return */ private boolean deleteGlobalTransactionDO(GlobalTransactionDO globalTransactionDO) { String globalKey = buildGlobalKeyByTransactionId(globalTransactionDO.getTransactionId()); try (Jedis jedis = JedisPooledFactory.getJedisInstance()) { String xid = jedis.hget(globalKey, REDIS_KEY_GLOBAL_XID); if (StringUtils.isEmpty(xid)) { LOGGER.warn("Global transaction is not exist,xid = {}.Maybe has been deleted by another tc server", globalTransactionDO.getXid()); return true; } try (Pipeline pipelined = jedis.pipelined()) { pipelined.lrem(buildGlobalStatus(globalTransactionDO.getStatus()), 0, globalTransactionDO.getXid()); pipelined.del(globalKey); if (GlobalStatus.Begin.getCode() == globalTransactionDO.getStatus() || GlobalStatus.UnKnown.getCode() == globalTransactionDO.getStatus()) { pipelined.zrem(REDIS_SEATA_BEGIN_TRANSACTIONS_KEY, globalKey); } pipelined.sync(); } return true; } catch (Exception ex) { throw new RedisException(ex); } } /** * Update the global transaction. 
* It will update two parts: * 1.the global session map * 2.the global status list * If the update failed,the succeed operates will rollback * @param globalTransactionDO * @return */ private boolean updateGlobalTransactionDO(GlobalTransactionDO globalTransactionDO) { String xid = globalTransactionDO.getXid(); String globalKey = buildGlobalKeyByTransactionId(globalTransactionDO.getTransactionId()); try (Jedis jedis = JedisPooledFactory.getJedisInstance()) { // Defensive watch to prevent other TC server operating concurrently,Fail fast jedis.watch(globalKey); List statusAndGmtModified = jedis.hmget(globalKey, REDIS_KEY_GLOBAL_STATUS, REDIS_KEY_GLOBAL_GMT_MODIFIED); String previousStatus = statusAndGmtModified.get(0); if (StringUtils.isEmpty(previousStatus)) { jedis.unwatch(); throw new StoreException("Global transaction is not exist, update global transaction failed."); } if (previousStatus.equals(String.valueOf(globalTransactionDO.getStatus()))) { jedis.unwatch(); return true; } GlobalStatus before = GlobalStatus.get(Integer.parseInt(previousStatus)); GlobalStatus after = GlobalStatus.get(globalTransactionDO.getStatus()); if (!SessionStatusValidator.validateUpdateStatus(before, after)) { throw new StoreException("Illegal changing of global status, update global transaction failed." 
+ " beforeStatus[" + before.name() + "] cannot be changed to afterStatus[" + after.name() + "]"); } String previousGmtModified = statusAndGmtModified.get(1); Transaction multi = jedis.multi(); Map map = new HashMap<>(2); map.put(REDIS_KEY_GLOBAL_STATUS,String.valueOf(globalTransactionDO.getStatus())); map.put(REDIS_KEY_GLOBAL_GMT_MODIFIED,String.valueOf((new Date()).getTime())); multi.hmset(globalKey, map); multi.lrem(buildGlobalStatus(Integer.valueOf(previousStatus)), 0, xid); multi.rpush(buildGlobalStatus(globalTransactionDO.getStatus()), xid); multi.zrem(REDIS_SEATA_BEGIN_TRANSACTIONS_KEY, globalKey); List exec = multi.exec(); if (CollectionUtils.isEmpty(exec)) { //The data has changed by another tc, so we still think the modification is successful. LOGGER.warn("The global transaction xid = {}, maybe changed by another TC. It does not affect the results",globalTransactionDO.getXid()); return true; } String hmset = exec.get(0).toString(); long lrem = (long)exec.get(1); long rpush = (long)exec.get(2); if (OK.equalsIgnoreCase(hmset) && lrem > 0 && rpush > 0) { return true; } else { // If someone failed, the succeed operations need rollback if (OK.equalsIgnoreCase(hmset)) { // Defensive watch to prevent other TC server operating concurrently,give up this operate jedis.watch(globalKey); String xid2 = jedis.hget(globalKey, REDIS_KEY_GLOBAL_XID); if (StringUtils.isNotEmpty(xid2)) { Map mapPrevious = new HashMap<>(2,1); mapPrevious.put(REDIS_KEY_GLOBAL_STATUS,previousStatus); mapPrevious.put(REDIS_KEY_GLOBAL_GMT_MODIFIED,previousGmtModified); Transaction multi2 = jedis.multi(); multi2.hmset(globalKey,mapPrevious); multi2.exec(); } } if (lrem > 0) { jedis.rpush(buildGlobalStatus(Integer.valueOf(previousStatus)), xid); } if (rpush > 0) { jedis.lrem(buildGlobalStatus(globalTransactionDO.getStatus()), 0, xid); } return false; } } catch (Exception ex) { throw new RedisException(ex); } } /** * Read session global session. 
* * @param xid the xid * @param withBranchSessions the withBranchSessions * @return the global session */ @Override public GlobalSession readSession(String xid, boolean withBranchSessions) { String transactionId = String.valueOf(XID.getTransactionId(xid)); String globalKey = buildGlobalKeyByTransactionId(transactionId); try (Jedis jedis = JedisPooledFactory.getJedisInstance()) { Map map = jedis.hgetAll(globalKey); if (CollectionUtils.isEmpty(map)) { return null; } GlobalTransactionDO globalTransactionDO = (GlobalTransactionDO)BeanUtils.mapToObject(map, GlobalTransactionDO.class); List branchTransactionDOs = null; if (withBranchSessions) { branchTransactionDOs = this.readBranchSessionByXid(jedis, xid); } GlobalSession session = getGlobalSession(globalTransactionDO, branchTransactionDOs, withBranchSessions); return session; } } /** * Read session global session. * * @param xid * the xid * @return the global session */ @Override public GlobalSession readSession(String xid) { return this.readSession(xid, true); } /** * Read globalSession list by global status * * @param statuses the statuses * @return the list */ @Override public List readSession(GlobalStatus[] statuses, boolean withBranchSessions) { List globalSessions = Collections.synchronizedList(new ArrayList<>()); List statusKeys = convertStatusKeys(statuses); Map targetMap = calculateStatuskeysHasData(statusKeys); if (targetMap.size() == 0 || logQueryLimit <= 0) { return globalSessions; } int perStatusLimit = resetLogQueryLimit(targetMap); final long countGlobalSessions = targetMap.values().stream().collect(Collectors.summarizingInt(Integer::intValue)).getSum(); // queryCount final long queryCount = Math.min(logQueryLimit, countGlobalSessions); List> list = new ArrayList<>(); dogetXidsForTargetMapRecursive(targetMap, 0L, perStatusLimit - 1, queryCount, list); if (CollectionUtils.isNotEmpty(list)) { List xids = list.stream().flatMap(Collection::stream).collect(Collectors.toList()); 
xids.parallelStream().forEach(xid -> { GlobalSession globalSession = this.readSession(xid, withBranchSessions); if (globalSession != null) { globalSessions.add(globalSession); } }); } return globalSessions; } @Override public List readSortByTimeoutBeginSessions(boolean withBranchSessions) { List list = Collections.emptyList(); List statusKeys = convertStatusKeys(GlobalStatus.Begin); Map targetMap = calculateStatuskeysHasData(statusKeys); if (targetMap.size() == 0 || logQueryLimit <= 0) { return list; } final long countGlobalSessions = targetMap.values().stream().collect(Collectors.summarizingInt(Integer::intValue)).getSum(); // queryCount final long queryCount = Math.min(logQueryLimit, countGlobalSessions); try (Jedis jedis = JedisPooledFactory.getJedisInstance()) { Set values = jedis.zrangeByScore(REDIS_SEATA_BEGIN_TRANSACTIONS_KEY, 0, System.currentTimeMillis(), 0, (int) queryCount); List> rep; try (Pipeline pipeline = jedis.pipelined()) { for (String value : values) { pipeline.hgetAll(value); } rep = (List>) (List) pipeline.syncAndReturnAll(); } list = rep.stream().map(map -> { GlobalTransactionDO globalTransactionDO = (GlobalTransactionDO) BeanUtils.mapToObject(map, GlobalTransactionDO.class); if (globalTransactionDO != null) { String xid = globalTransactionDO.getXid(); List branchTransactionDOs = new ArrayList<>(); if (withBranchSessions) { branchTransactionDOs = this.readBranchSessionByXid(jedis, xid); } return getGlobalSession(globalTransactionDO, branchTransactionDOs, withBranchSessions); } return null; }).filter(Objects::nonNull).collect(Collectors.toList()); } return list; } /** * get everyone keys limit * * @param targetMap * @return */ private int resetLogQueryLimit(Map targetMap) { int resetLimitQuery = logQueryLimit; if (targetMap.size() > 1) { int size = targetMap.size(); resetLimitQuery = (logQueryLimit / size) == 0 ? 
1 : (logQueryLimit / size); } return resetLimitQuery; } /** * read the global session list by different condition * @param sessionCondition the session condition * @return the global sessions */ @Override public List readSession(SessionCondition sessionCondition) { List globalSessions = new ArrayList<>(); if (StringUtils.isNotEmpty(sessionCondition.getXid())) { GlobalSession globalSession = this.readSession(sessionCondition.getXid(), !sessionCondition.isLazyLoadBranch()); if (globalSession != null) { globalSessions.add(globalSession); } return globalSessions; } else if (sessionCondition.getTransactionId() != null) { GlobalSession globalSession = this .readSessionByTransactionId(sessionCondition.getTransactionId().toString(), !sessionCondition.isLazyLoadBranch()); if (globalSession != null) { globalSessions.add(globalSession); } return globalSessions; } else if (CollectionUtils.isNotEmpty(sessionCondition.getStatuses())) { if (sessionCondition.getStatuses().length == 1 && sessionCondition.getStatuses()[0] == GlobalStatus.Begin) { return this.readSortByTimeoutBeginSessions(!sessionCondition.isLazyLoadBranch()); } else { return readSession(sessionCondition.getStatuses(), !sessionCondition.isLazyLoadBranch()); } } return null; } /** * query GlobalSession by status with page * * @param param * @return List */ public List readSessionStatusByPage(GlobalSessionParam param) { List globalSessions = new ArrayList<>(); int pageNum = param.getPageNum(); int pageSize = param.getPageSize(); int start = Math.max((pageNum - 1) * pageSize, 0); int end = pageNum * pageSize - 1; if (param.getStatus() != null) { String statusKey = buildGlobalStatus(GlobalStatus.get(param.getStatus()).getCode()); try (Jedis jedis = JedisPooledFactory.getJedisInstance()) { final List xids = jedis.lrange(statusKey, start, end); xids.forEach(xid -> { GlobalSession globalSession = this.readSession(xid, param.isWithBranch()); if (globalSession != null) { globalSessions.add(globalSession); } }); } } return 
globalSessions; } /** * assemble the global session and branch session * @param globalTransactionDO the global transactionDo * @param branchTransactionDOs the branch transactionDos * @param withBranchSessions if read branch sessions * @return the global session with branch session */ private GlobalSession getGlobalSession(GlobalTransactionDO globalTransactionDO, List branchTransactionDOs, boolean withBranchSessions) { GlobalSession globalSession = SessionConverter.convertGlobalSession(globalTransactionDO, !withBranchSessions); if (CollectionUtils.isNotEmpty(branchTransactionDOs)) { for (BranchTransactionDO branchTransactionDO : branchTransactionDOs) { globalSession.add(SessionConverter.convertBranchSession(branchTransactionDO)); } } return globalSession; } /** * read the global session by transactionId * @param transactionId the transaction id * @param withBranchSessions if read branch sessions * @return the global session */ private GlobalSession readSessionByTransactionId(String transactionId, boolean withBranchSessions) { String globalKey = buildGlobalKeyByTransactionId(transactionId); String xid = null; try (Jedis jedis = JedisPooledFactory.getJedisInstance()) { Map map = jedis.hgetAll(globalKey); if (CollectionUtils.isEmpty(map)) { return null; } GlobalTransactionDO globalTransactionDO = (GlobalTransactionDO)BeanUtils.mapToObject(map, GlobalTransactionDO.class); if (globalTransactionDO != null) { xid = globalTransactionDO.getXid(); } List branchTransactionDOs = new ArrayList<>(); if (withBranchSessions) { branchTransactionDOs = this.readBranchSessionByXid(jedis, xid); } return getGlobalSession(globalTransactionDO, branchTransactionDOs, withBranchSessions); } } /** * Read the branch session list by xid * @param jedis the jedis * @param xid the xid * @return the branch transactionDo list */ private List readBranchSessionByXid(Jedis jedis, String xid) { List branchTransactionDOs = new ArrayList<>(); String branchListKey = buildBranchListKeyByXid(xid); List 
branchKeys = lRange(jedis, branchListKey); if (CollectionUtils.isNotEmpty(branchKeys)) { try (Pipeline pipeline = jedis.pipelined()) { branchKeys.stream().forEach(branchKey -> pipeline.hgetAll(branchKey)); List branchInfos = pipeline.syncAndReturnAll(); for (Object branchInfo : branchInfos) { if (branchInfo != null) { Map branchInfoMap = (Map)branchInfo; Optional branchTransactionDO = Optional.ofNullable( (BranchTransactionDO)BeanUtils.mapToObject(branchInfoMap, BranchTransactionDO.class)); branchTransactionDO.ifPresent(branchTransactionDOs::add); } } } } if (CollectionUtils.isNotEmpty(branchTransactionDOs)) { Collections.sort(branchTransactionDOs); } return branchTransactionDOs; } private List lRange(Jedis jedis, String key) { List keys = new ArrayList<>(); List values; int limit = 20; int start = 0; int stop = limit; for (;;) { values = jedis.lrange(key, start, stop); keys.addAll(values); if (CollectionUtils.isEmpty(values) || values.size() < limit) { break; } start = keys.size(); stop = start + limit; } return keys; } public List findBranchSessionByXid(String xid) { try (Jedis jedis = JedisPooledFactory.getJedisInstance()) { return readBranchSessionByXid(jedis, xid); } } /** * query globalSession by page * * @param pageNum * @param pageSize * @param withBranchSessions * @return List */ public List findGlobalSessionByPage(int pageNum, int pageSize, boolean withBranchSessions) { List globalSessions = new ArrayList<>(); int start = Math.max((pageNum - 1) * pageSize, 0); int end = pageNum * pageSize - 1; List statusKeys = convertStatusKeys(GlobalStatus.values()); Map stringLongMap = calculateStatuskeysHasData(statusKeys); List> list = dogetXidsForTargetMap(stringLongMap, start, end, pageSize); if (CollectionUtils.isNotEmpty(list)) { List xids = list.stream().flatMap(Collection::stream).collect(Collectors.toList()); xids.forEach(xid -> { if (globalSessions.size() < pageSize) { GlobalSession globalSession = this.readSession(xid, withBranchSessions); if (globalSession 
!= null) { globalSessions.add(globalSession); } } }); } return globalSessions; } /** * query and sort existing values * * @param statusKeys * @return */ private Map calculateStatuskeysHasData(List statusKeys) { Map resultMap = new LinkedHashMap<>(); Map keysMap = new HashMap<>(statusKeys.size()); try (Jedis jedis = JedisPooledFactory.getJedisInstance(); Pipeline pipelined = jedis.pipelined()) { statusKeys.forEach(key -> pipelined.llen(key)); List counts = (List) pipelined.syncAndReturnAll(); for (int i = 0; i < counts.size(); i++) { if (counts.get(i) > 0) { keysMap.put(statusKeys.get(i), counts.get(i).intValue()); } } } //sort List> list = new ArrayList<>(keysMap.entrySet()); list.sort((o1, o2) -> o2.getValue() - o1.getValue()); list.forEach(e -> resultMap.put(e.getKey(), e.getValue())); return resultMap; } /** * count GlobalSession total by status * * @param values * @return Long */ public Long countByGlobalSessions(GlobalStatus[] values) { List statusKeys = new ArrayList<>(); Long total = 0L; for (GlobalStatus status : values) { statusKeys.add(buildGlobalStatus(status.getCode())); } try (Jedis jedis = JedisPooledFactory.getJedisInstance(); Pipeline pipelined = jedis.pipelined()) { statusKeys.stream().forEach(statusKey -> pipelined.llen(statusKey)); List list = (List)(List)pipelined.syncAndReturnAll(); if (list.size() > 0) { total = list.stream().mapToLong(value -> value).sum(); } return total; } } private List convertStatusKeys(GlobalStatus... 
statuses) { List statusKeys = new ArrayList<>(); for (int i = 0; i < statuses.length; i++) { statusKeys.add(buildGlobalStatus(statuses[i].getCode())); } return statusKeys; } private void dogetXidsForTargetMapRecursive(Map targetMap, long start, long end, long queryCount, List> listList) { long total = listList.stream().mapToLong(List::size).sum(); if (total >= queryCount) { return; } // when start greater than offset(totalCount) if (start >= queryCount) { return; } if (targetMap.size() == 0) { return; } try (Jedis jedis = JedisPooledFactory.getJedisInstance()) { Iterator> iterator = targetMap.entrySet().iterator(); while (iterator.hasNext()) { String key = iterator.next().getKey(); final long sum = listList.stream().mapToLong(List::size).sum(); final long diffCount = queryCount - sum; if (diffCount <= 0) { return; } List list; if (end - start >= diffCount) { long endNew = start + diffCount - 1; list = jedis.lrange(key, start, endNew); } else { list = jedis.lrange(key, start, end); } if (list.size() > 0) { listList.add(list); } else { iterator.remove(); } } } long startNew = end + 1; long endNew = startNew + end - start; dogetXidsForTargetMapRecursive(targetMap, startNew, endNew, queryCount, listList); } private List> dogetXidsForTargetMap(Map targetMap, int start, int end, int totalCount) { List> listList = new ArrayList<>(); try (Jedis jedis = JedisPooledFactory.getJedisInstance()) { for (String key : targetMap.keySet()) { final List list = jedis.lrange(key, start, end); final long sum = listList.stream().mapToLong(List::size).sum(); if (list.size() > 0 && sum < totalCount) { listList.add(list); } else { start = 0; end = totalCount - 1; } } } return listList; } private String buildBranchListKeyByXid(String xid) { return REDIS_SEATA_BRANCHES_PREFIX + xid; } private String buildGlobalKeyByTransactionId(Object transactionId) { return REDIS_SEATA_GLOBAL_PREFIX + transactionId; } private String buildBranchKey(Long branchId) { return REDIS_SEATA_BRANCH_PREFIX + 
branchId; } private String buildGlobalStatus(Integer status) { return REDIS_SEATA_STATUS_PREFIX + status; } /** * Sets log query limit. * * @param logQueryLimit the log query limit */ public void setLogQueryLimit(int logQueryLimit) { this.logQueryLimit = logQueryLimit; } }