/* * Copyright 1999-2019 Seata.io Group. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package io.seata.server.storage.db.lock; import io.seata.common.exception.DataAccessException; import io.seata.common.exception.StoreException; import io.seata.common.util.CollectionUtils; import io.seata.common.util.IOUtil; import io.seata.common.util.LambdaUtils; import io.seata.common.util.StringUtils; import io.seata.config.Configuration; import io.seata.config.ConfigurationFactory; import io.seata.core.constants.ConfigurationKeys; import io.seata.core.constants.ServerTableColumnsName; import io.seata.core.exception.BranchTransactionException; import io.seata.core.model.LockStatus; import io.seata.core.store.LockDO; import io.seata.core.store.LockStore; import io.seata.core.store.db.sql.lock.LockStoreSqlFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.sql.DataSource; import java.sql.*; import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Set; import java.util.stream.Collectors; import static io.seata.common.DefaultValues.DEFAULT_LOCK_DB_TABLE; import static io.seata.core.exception.TransactionExceptionCode.LockKeyConflictFailFast; /** * The type Data base lock store. * * @author zhangsen */ public class LockStoreDataBaseDAO implements LockStore { private static final Logger LOGGER = LoggerFactory.getLogger(LockStoreDataBaseDAO.class); /** * The constant CONFIG. */ protected static final Configuration CONFIG = ConfigurationFactory.getInstance(); /** * The Lock store data source. */ protected DataSource lockStoreDataSource; /** * The Lock table. */ protected String lockTable; /** * The Db type. */ protected String dbType; /** * Instantiates a new Data base lock store dao. * * @param lockStoreDataSource the log store data source */ public LockStoreDataBaseDAO(DataSource lockStoreDataSource) { this.lockStoreDataSource = lockStoreDataSource; lockTable = CONFIG.getConfig(ConfigurationKeys.LOCK_DB_TABLE, DEFAULT_LOCK_DB_TABLE); dbType = CONFIG.getConfig(ConfigurationKeys.STORE_DB_TYPE); if (StringUtils.isBlank(dbType)) { throw new StoreException("there must be db type."); } if (lockStoreDataSource == null) { throw new StoreException("there must be lockStoreDataSource."); } } @Override public boolean acquireLock(LockDO lockDO) { return acquireLock(Collections.singletonList(lockDO)); } @Override public boolean acquireLock(List lockDOs) { return acquireLock(lockDOs, true, false); } @Override public boolean acquireLock(List lockDOs, boolean autoCommit, boolean skipCheckLock) { Connection conn = null; PreparedStatement ps = null; ResultSet rs = null; Set dbExistedRowKeys = new HashSet<>(); boolean originalAutoCommit = true; if (lockDOs.size() > 1) { lockDOs = lockDOs.stream().filter(LambdaUtils.distinctByKey(LockDO::getRowKey)).collect(Collectors.toList()); } try { conn = lockStoreDataSource.getConnection(); if (originalAutoCommit = conn.getAutoCommit()) { conn.setAutoCommit(false); } List unrepeatedLockDOs = lockDOs; //check lock if (!skipCheckLock) { boolean canLock = true; //query String checkLockSQL = LockStoreSqlFactory.getLogStoreSql(dbType).getCheckLockableSql(lockTable, lockDOs.size()); ps = conn.prepareStatement(checkLockSQL); for (int i = 0; i < lockDOs.size(); i++) { ps.setString(i + 1, lockDOs.get(i).getRowKey()); } rs = ps.executeQuery(); String currentXID = lockDOs.get(0).getXid(); boolean failFast = false; while (rs.next()) { String dbXID = rs.getString(ServerTableColumnsName.LOCK_TABLE_XID); if (!StringUtils.equals(dbXID, currentXID)) { if (LOGGER.isInfoEnabled()) { String dbPk = rs.getString(ServerTableColumnsName.LOCK_TABLE_PK); String dbTableName = rs.getString(ServerTableColumnsName.LOCK_TABLE_TABLE_NAME); long dbBranchId = rs.getLong(ServerTableColumnsName.LOCK_TABLE_BRANCH_ID); LOGGER.info("Global lock on [{}:{}] is holding by xid {} branchId {}", dbTableName, dbPk, dbXID, dbBranchId); } if (!autoCommit) { int status = rs.getInt(ServerTableColumnsName.LOCK_TABLE_STATUS); if (status == LockStatus.Rollbacking.getCode()) { failFast = true; } } canLock = false; break; } dbExistedRowKeys.add(rs.getString(ServerTableColumnsName.LOCK_TABLE_ROW_KEY)); } if (!canLock) { conn.rollback(); if (failFast) { throw new StoreException(new BranchTransactionException(LockKeyConflictFailFast)); } return false; } // If the lock has been exists in db, remove it from the lockDOs if (CollectionUtils.isNotEmpty(dbExistedRowKeys)) { unrepeatedLockDOs = lockDOs.stream().filter(lockDO -> !dbExistedRowKeys.contains(lockDO.getRowKey())) .collect(Collectors.toList()); } if (CollectionUtils.isEmpty(unrepeatedLockDOs)) { conn.rollback(); return true; } } // lock if (unrepeatedLockDOs.size() == 1) { LockDO lockDO = unrepeatedLockDOs.get(0); if (!doAcquireLock(conn, lockDO)) { if (LOGGER.isInfoEnabled()) { LOGGER.info("Global lock acquire failed, xid {} branchId {} pk {}", lockDO.getXid(), lockDO.getBranchId(), lockDO.getPk()); } conn.rollback(); return false; } } else { if (!doAcquireLocks(conn, unrepeatedLockDOs)) { if (LOGGER.isInfoEnabled()) { LOGGER.info("Global lock batch acquire failed, xid {} branchId {} pks {}", unrepeatedLockDOs.get(0).getXid(), unrepeatedLockDOs.get(0).getBranchId(), unrepeatedLockDOs.stream().map(lockDO -> lockDO.getPk()).collect(Collectors.toList())); } conn.rollback(); return false; } } conn.commit(); return true; } catch (SQLException e) { throw new StoreException(e); } finally { IOUtil.close(rs, ps); if (conn != null) { try { if (originalAutoCommit) { conn.setAutoCommit(true); } conn.close(); } catch (SQLException e) { } } } } @Override public boolean unLock(LockDO lockDO) { return unLock(Collections.singletonList(lockDO)); } @Override public boolean unLock(List lockDOs) { Connection conn = null; PreparedStatement ps = null; try { conn = lockStoreDataSource.getConnection(); conn.setAutoCommit(true); //batch release lock String batchDeleteSQL = LockStoreSqlFactory.getLogStoreSql(dbType).getBatchDeleteLockSql(lockTable, lockDOs.size()); ps = conn.prepareStatement(batchDeleteSQL); ps.setString(1, lockDOs.get(0).getXid()); for (int i = 0; i < lockDOs.size(); i++) { ps.setString(i + 2, lockDOs.get(i).getRowKey()); } ps.executeUpdate(); } catch (SQLException e) { throw new StoreException(e); } finally { IOUtil.close(ps, conn); } return true; } @Override public boolean unLock(String xid) { Connection conn = null; PreparedStatement ps = null; try { conn = lockStoreDataSource.getConnection(); conn.setAutoCommit(true); //batch release lock by branch list String batchDeleteSQL = LockStoreSqlFactory.getLogStoreSql(dbType).getBatchDeleteLockSqlByXid(lockTable); ps = conn.prepareStatement(batchDeleteSQL); ps.setString(1, xid); ps.executeUpdate(); } catch (SQLException e) { throw new StoreException(e); } finally { IOUtil.close(ps, conn); } return true; } @Override public boolean unLock(Long branchId) { Connection conn = null; PreparedStatement ps = null; try { conn = lockStoreDataSource.getConnection(); conn.setAutoCommit(true); //batch release lock by branchId String batchDeleteSQL = LockStoreSqlFactory.getLogStoreSql(dbType).getBatchDeleteLockSqlByBranchId(lockTable); ps = conn.prepareStatement(batchDeleteSQL); ps.setLong(1, branchId); ps.executeUpdate(); } catch (SQLException e) { throw new StoreException(e); } finally { IOUtil.close(ps, conn); } return true; } @Override public boolean isLockable(List lockDOs) { Connection conn = null; try { conn = lockStoreDataSource.getConnection(); conn.setAutoCommit(true); if (!checkLockable(conn, lockDOs)) { return false; } return true; } catch (SQLException e) { throw new DataAccessException(e); } finally { IOUtil.close(conn); } } @Override public void updateLockStatus(String xid, LockStatus lockStatus) { String updateStatusLockByGlobalSql = LockStoreSqlFactory.getLogStoreSql(dbType).getBatchUpdateStatusLockByGlobalSql(lockTable); try (Connection conn = lockStoreDataSource.getConnection(); PreparedStatement ps = conn.prepareStatement(updateStatusLockByGlobalSql)) { conn.setAutoCommit(true); ps.setInt(1, lockStatus.getCode()); ps.setString(2, xid); ps.executeUpdate(); } catch (SQLException e) { throw new DataAccessException(e); } } /** * Do acquire lock boolean. * * @param conn the conn * @param lockDO the lock do * @return the boolean */ protected boolean doAcquireLock(Connection conn, LockDO lockDO) { PreparedStatement ps = null; try { //insert String insertLockSQL = LockStoreSqlFactory.getLogStoreSql(dbType).getInsertLockSQL(lockTable); ps = conn.prepareStatement(insertLockSQL); ps.setString(1, lockDO.getXid()); ps.setLong(2, lockDO.getTransactionId()); ps.setLong(3, lockDO.getBranchId()); ps.setString(4, lockDO.getResourceId()); ps.setString(5, lockDO.getTableName()); ps.setString(6, lockDO.getPk()); ps.setString(7, lockDO.getRowKey()); ps.setInt(8, LockStatus.Locked.getCode()); return ps.executeUpdate() > 0; } catch (SQLException e) { if (e instanceof SQLIntegrityConstraintViolationException) { return false; } throw new StoreException(e); } finally { IOUtil.close(ps); } } /** * Do acquire lock boolean. * * @param conn the conn * @param lockDOs the lock do list * @return the boolean */ protected boolean doAcquireLocks(Connection conn, List lockDOs) throws SQLException { PreparedStatement ps = null; try { //insert String insertLockSQL = LockStoreSqlFactory.getLogStoreSql(dbType).getInsertLockSQL(lockTable); ps = conn.prepareStatement(insertLockSQL); for (LockDO lockDO : lockDOs) { ps.setString(1, lockDO.getXid()); ps.setLong(2, lockDO.getTransactionId()); ps.setLong(3, lockDO.getBranchId()); ps.setString(4, lockDO.getResourceId()); ps.setString(5, lockDO.getTableName()); ps.setString(6, lockDO.getPk()); ps.setString(7, lockDO.getRowKey()); ps.setInt(8, lockDO.getStatus()); ps.addBatch(); } return ps.executeBatch().length == lockDOs.size(); } catch (SQLIntegrityConstraintViolationException e) { LOGGER.error("Global lock batch acquire error: {}", e.getMessage(), e); //return false,let the caller go to conn.rollabck() return false; } catch (SQLException e) { throw e; } finally { IOUtil.close(ps); } } /** * Check lock boolean. * * @param conn the conn * @param lockDOs the lock do * @return the boolean */ protected boolean checkLockable(Connection conn, List lockDOs) { PreparedStatement ps = null; ResultSet rs = null; try { //query String checkLockSQL = LockStoreSqlFactory.getLogStoreSql(dbType).getCheckLockableSql(lockTable, lockDOs.size()); ps = conn.prepareStatement(checkLockSQL); for (int i = 0; i < lockDOs.size(); i++) { ps.setString(i + 1, lockDOs.get(i).getRowKey()); } rs = ps.executeQuery(); while (rs.next()) { String xid = rs.getString("xid"); if (!StringUtils.equals(xid, lockDOs.get(0).getXid())) { return false; } } return true; } catch (SQLException e) { throw new DataAccessException(e); } finally { IOUtil.close(rs, ps); } } /** * Sets lock table. * * @param lockTable the lock table */ public void setLockTable(String lockTable) { this.lockTable = lockTable; } /** * Sets db type. * * @param dbType the db type */ public void setDbType(String dbType) { this.dbType = dbType; } /** * Sets log store data source. * * @param lockStoreDataSource the log store data source */ public void setLogStoreDataSource(DataSource lockStoreDataSource) { this.lockStoreDataSource = lockStoreDataSource; } }