/* * Copyright 1999-2019 Seata.io Group. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package io.seata.server.storage.file.lock; import io.seata.common.exception.FrameworkException; import io.seata.common.exception.StoreException; import io.seata.common.util.CollectionUtils; import io.seata.core.exception.BranchTransactionException; import io.seata.core.exception.TransactionException; import io.seata.core.lock.AbstractLocker; import io.seata.core.lock.RowLock; import io.seata.core.model.LockStatus; import io.seata.server.session.BranchSession; import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import static io.seata.core.exception.TransactionExceptionCode.LockKeyConflictFailFast; /** * The type Memory locker. * * @author zhangsen */ public class FileLocker extends AbstractLocker { private static final int BUCKET_PER_TABLE = 128; private static final ConcurrentMap>> LOCK_MAP = new ConcurrentHashMap<>(); /** * The Branch session. */ protected BranchSession branchSession; /** * Instantiates a new Memory locker. * * @param branchSession the branch session */ public FileLocker(BranchSession branchSession) { this.branchSession = branchSession; } @Override public boolean acquireLock(List rowLocks) { return acquireLock(rowLocks, true, false); } @Override public boolean acquireLock(List rowLocks, boolean autoCommit, boolean skipCheckLock) { if (CollectionUtils.isEmpty(rowLocks)) { // no lock return true; } String resourceId = branchSession.getResourceId(); long transactionId = branchSession.getTransactionId(); ConcurrentMap> bucketHolder = branchSession.getLockHolder(); ConcurrentMap> dbLockMap = CollectionUtils.computeIfAbsent( LOCK_MAP, resourceId, key -> new ConcurrentHashMap<>()); boolean failFast = false; boolean canLock = true; for (RowLock lock : rowLocks) { String tableName = lock.getTableName(); String pk = lock.getPk(); ConcurrentMap tableLockMap = CollectionUtils.computeIfAbsent(dbLockMap, tableName, key -> new ConcurrentHashMap<>()); int bucketId = pk.hashCode() % BUCKET_PER_TABLE; BucketLockMap bucketLockMap = CollectionUtils.computeIfAbsent(tableLockMap, bucketId, key -> new BucketLockMap()); BranchSession previousLockBranchSession = bucketLockMap.get().putIfAbsent(pk, branchSession); if (previousLockBranchSession == null) { // No existing lock, and now locked by myself Set keysInHolder = CollectionUtils.computeIfAbsent(bucketHolder, bucketLockMap, key -> ConcurrentHashMap.newKeySet()); keysInHolder.add(pk); } else if (previousLockBranchSession.getTransactionId() == transactionId) { // Locked by me before continue; } else { LOGGER.info("Global lock on [" + tableName + ":" + pk + "] is holding by " + previousLockBranchSession.getBranchId()); try { // Release all acquired locks. branchSession.unlock(); } catch (TransactionException e) { throw new FrameworkException(e); } if (!autoCommit && previousLockBranchSession.getLockStatus() == LockStatus.Rollbacking) { failFast = true; break; } if (canLock) { canLock = false; if (autoCommit) { break; } } } } if (failFast) { throw new StoreException(new BranchTransactionException(LockKeyConflictFailFast)); } return canLock; } @Override public boolean releaseLock(List rowLock) { if (CollectionUtils.isEmpty(rowLock)) { //no lock return true; } ConcurrentMap> lockHolder = branchSession.getLockHolder(); if (CollectionUtils.isEmpty(lockHolder)) { return true; } for (Map.Entry> entry : lockHolder.entrySet()) { BucketLockMap bucket = entry.getKey(); Set keys = entry.getValue(); for (String key : keys) { // remove lock only if it locked by myself bucket.get().remove(key, branchSession); } } lockHolder.clear(); return true; } @Override public boolean isLockable(List rowLocks) { if (CollectionUtils.isEmpty(rowLocks)) { //no lock return true; } Long transactionId = rowLocks.get(0).getTransactionId(); String resourceId = rowLocks.get(0).getResourceId(); ConcurrentMap> dbLockMap = LOCK_MAP.get(resourceId); if (dbLockMap == null) { return true; } for (RowLock rowLock : rowLocks) { String tableName = rowLock.getTableName(); String pk = rowLock.getPk(); ConcurrentMap tableLockMap = dbLockMap.get(tableName); if (tableLockMap == null) { continue; } int bucketId = pk.hashCode() % BUCKET_PER_TABLE; BucketLockMap bucketLockMap = tableLockMap.get(bucketId); if (bucketLockMap == null) { continue; } BranchSession branchSession = bucketLockMap.get().get(pk); Long lockingTransactionId = branchSession != null ? branchSession.getTransactionId() : null; if (lockingTransactionId == null || lockingTransactionId.longValue() == transactionId) { // Locked by me continue; } else { LOGGER.info("Global lock on [" + tableName + ":" + pk + "] is holding by " + lockingTransactionId); return false; } } return true; } @Override public void updateLockStatus(String xid, LockStatus lockStatus) { } @Override public void cleanAllLocks() { LOCK_MAP.clear(); } /** * Because bucket lock map will be key of HashMap(lockHolder), however {@link ConcurrentHashMap} overwrites * {@link Object#hashCode()} and {@link Object#equals(Object)}, that leads to hash key conflict in lockHolder. * We define a {@link BucketLockMap} to hold the ConcurrentHashMap(bucketLockMap) and replace it as key of * HashMap(lockHolder). */ public static class BucketLockMap { private final ConcurrentHashMap bucketLockMap = new ConcurrentHashMap<>(); ConcurrentHashMap get() { return bucketLockMap; } @Override public int hashCode() { return super.hashCode(); } @Override public boolean equals(Object o) { return super.equals(o); } } }