package com.walker.store.support;

import com.walker.connector.Address;
import com.walker.connector.support.DatabaseConnector;
import com.walker.connector.util.ConnectorUtils;
import com.walker.db.DatabaseException;
import com.walker.db.DatabaseExistException;
import com.walker.db.DatabaseType;
import com.walker.dbmeta.util.DatabaseUtils;
import com.walker.infrastructure.utils.StringUtils;
import com.walker.store.AbstractStore;
import com.walker.store.StoreStrategy;
import com.walker.store.strategy.DatabaseStoreStrategy;

import java.util.List;
import java.util.Locale;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

/**
 * Base class for stores that write into a relational database.
 *
 * <p>On every {@link #write} the store asks its {@link DatabaseStoreStrategy} for the target
 * database address and table name, creates the database and the table (plus indexes) when the
 * metadata engine does not know them yet, and then delegates the actual row insertion to
 * {@link #onWriteDataToDB}.
 */
public abstract class DatabaseStore extends AbstractStore {

    /** Working connectors, one per target database: key = Address, value = DatabaseConnector. */
    private final ConcurrentHashMap<Address, DatabaseConnector> cachedConnectors =
            new ConcurrentHashMap<Address, DatabaseConnector>(8);

    /**
     * Management connectors (used to create databases), one per server endpoint:
     * key = "url:port", value = DatabaseConnector.
     */
    private final ConcurrentHashMap<String, DatabaseConnector> manageConnectors =
            new ConcurrentHashMap<String, DatabaseConnector>(8);

    private DatabaseType databaseType = DatabaseType.MYSQL;

    /** Serialises the "ensure database / ensure table" phase of {@link #write}. */
    private final Object schemaLock = new Object();

    /** Guards creation of working connectors so that only one is ever built per address. */
    private final Object connectorLock = new Object();

    /** Whether working connectors are wrapped in a transaction proxy. */
    private boolean transaction = true;

    /** Whether writes may update existing rows. */
    private boolean supportUpdate = false;

    /**
     * Tells whether the store supports updating data while writing.
     *
     * <p>When supported, the writer is expected to check first whether the target row exists
     * and, if it does, update it by primary key.
     *
     * @return {@code true} if update-on-write is enabled
     */
    public boolean isSupportUpdate() {
        return supportUpdate;
    }

    public void setSupportUpdate(boolean supportUpdate) {
        this.supportUpdate = supportUpdate;
    }

    /**
     * @return {@code true} if newly created working connectors are wrapped in a transaction proxy
     */
    public boolean isTransaction() {
        return transaction;
    }

    public void setTransaction(boolean transaction) {
        this.transaction = transaction;
    }

    public void setDatabaseType(DatabaseType databaseType) {
        this.databaseType = databaseType;
    }

    /** Runs the parent initialisation and then validates this store's configuration. */
    @Override
    public void initialize() {
        super.initialize();
        this.doPrepare();
    }

    /**
     * Validates the configuration: the strategy must define at least one address and the
     * database type must be one this store can handle.
     *
     * @throws IllegalStateException if the strategy defines no addresses
     * @throws UnsupportedOperationException for Oracle or any unknown database type
     */
    private void doPrepare() {
        StoreStrategy repository = (StoreStrategy) this.storeStrategy;
        // Element type is not needed here; only presence is checked.
        List<?> addresses = repository.getDefineAddressList();
        if (addresses == null || addresses.size() == 0) {
            throw new IllegalStateException("请先调用setRepository()方法");
        }

        if (databaseType == DatabaseType.MYSQL) {
            // Nothing to prepare for MySQL.
        } else if (databaseType == DatabaseType.POSTGRES) {
            logger.info("存储使用了postgreSQL数据库");
        } else if (databaseType == DatabaseType.ORACLE) {
            throw new UnsupportedOperationException("当前不支持oracle存储方式");
        } else {
            throw new UnsupportedOperationException("databaseType = " + databaseType);
        }
    }

    /** Destroys every cached connector (working and management) and empties both caches. */
    @Override
    public void destroy() {
        super.destroy();
        for (DatabaseConnector conn : cachedConnectors.values()) {
            conn.destroy();
        }
        for (DatabaseConnector conn : manageConnectors.values()) {
            conn.destroy();
        }
        cachedConnectors.clear();
        manageConnectors.clear();
    }

    /**
     * Returns the connector for the database address this store is currently using, creating
     * and caching it if it does not exist yet.
     *
     * @return the connector, or {@code null} when the metadata engine has no "using" address
     *         for this store (for example before the first write)
     */
    public DatabaseConnector getCurrentStoreConnector() {
        Address address = this.getMetaDataEngine().getUsingAddress(this.getId());
        if (address == null) {
            logger.warn("当前存储缓存的address对象不存在,可能是第一次执行还没有创建。storeId = "
                    + this.getId() + ", " + this.getDescription());
            return null;
        }
        return this.doCreateConnector(address);
    }

    /**
     * Writes a batch of rows, creating the target database and table first when necessary.
     *
     * @param srcName        name of the source table; must not be empty
     * @param createTableSQL DDL (one or more {@code ;}-separated create-table / create-index
     *                       statements) used when the target table has to be created; it is
     *                       trimmed and lower-cased before use and is only required when the
     *                       table does not exist yet
     * @param parameter      business parameter handed to the strategy and to the writer
     * @param datas          rows to write; when {@code null} only the schema is ensured
     * @throws IllegalArgumentException if {@code srcName} is empty, or the table must be created
     *                                  and no DDL was supplied
     * @throws DatabaseException        if no field metadata is found for the target table
     */
    @Override
    public void write(String srcName, String createTableSQL, Object parameter, List datas)
            throws Exception {
        if (StringUtils.isEmpty(srcName)) {
            throw new IllegalArgumentException("必须提供写入的原始表名:srcName!");
        }

        // Normalise null-safely: a missing DDL is only an error if the table really has to be
        // created, which doCheckTable decides. Locale.ROOT keeps SQL keywords intact under
        // locales with special casing rules (e.g. Turkish dotless i).
        String normalizedCreateSql = (createTableSQL == null)
                ? null
                : createTableSQL.trim().toLowerCase(Locale.ROOT);

        Address addr;
        String destTableName;
        DatabaseConnector connector;

        // This phase must run sequentially so that the database and table are chosen and
        // created correctly; the data write below may run concurrently.
        synchronized (schemaLock) {
            DatabaseStoreStrategy strategy = (DatabaseStoreStrategy) this.storeStrategy;
            addr = strategy.getDatabaseAddress(parameter);
            this.doCheckDatabase(addr);
            destTableName = strategy.getTableName(addr, srcName, parameter);
            connector = this.doCheckTable(addr, destTableName, normalizedCreateSql, parameter);
        }

        // 1. Look up the field metadata of the target table.
        //    (Element type is not visible from this file, hence the raw List.)
        List fields = this.getMetaDataEngine().getFields(addr, destTableName);
        if (StringUtils.isEmptyList(fields)) {
            throw new DatabaseException("未找到表字段信息,无法执行写入SQL语句。tableName = " + destTableName);
        }

        // 2. Let the subclass combine data and field metadata into SQL and execute it.
        if (datas != null) {
            this.onWriteDataToDB(addr, connector, fields, destTableName, datas, srcName, parameter);
        }
    }

    /**
     * Ensures the target table exists, creating it (and its indexes) from {@code createSQL}
     * when the metadata engine does not know it, and returns the working connector.
     *
     * <p>Table and index names inside the DDL are rewritten to the strategy-generated names.
     *
     * @param addr          target database address
     * @param destTableName strategy-generated target table name; must not be empty
     * @param createSQL     normalised DDL, {@code ;}-separated; required only if the table is new
     * @param parameter     business parameter, forwarded to the strategy for index naming
     * @return the working connector for {@code addr}
     * @throws IllegalArgumentException if the table name is empty, or the table is new and
     *                                  {@code createSQL} is empty
     * @throws DatabaseException        if {@code createSQL} contains no statements
     */
    private DatabaseConnector doCheckTable(Address addr, String destTableName, String createSQL,
            Object parameter) throws DatabaseException {
        if (StringUtils.isEmpty(destTableName)) {
            throw new IllegalArgumentException("策略生成的表名不存在,无法继续执行");
        }

        DatabaseConnector connector = doCreateConnector(addr);
        if (this.getMetaDataEngine().isExistTable(getId(), addr, destTableName)) {
            return connector;
        }

        logger.debug("元数据中没有表,需要自动创建:" + destTableName);
        if (StringUtils.isEmpty(createSQL)) {
            throw new IllegalArgumentException("SQL语句中未找到表名,destTableName = " + destTableName);
        }

        // The DDL may hold several statements: the table itself plus its indexes.
        String[] sqlList = createSQL.split(StringUtils.SEPARATOR_SEMI_COLON);
        if (sqlList.length == 0) {
            throw new DatabaseException("存储引擎未发现任何创建表或者索引的SQL语句,无法继续执行写入数据任务。createSQL = "
                    + createSQL + ", id = " + this.getId());
        }

        for (String statement : sqlList) {
            String sql = statement.trim(); // strip line breaks and surrounding blanks
            if (DatabaseUtils.isCreateTableSQL(sql)) {
                // Replace the table name in the DDL when it differs from the target name.
                String tname = DatabaseUtils.findTableNameInSQL(sql);
                if (!tname.equals(destTableName)) {
                    sql = sql.replace(tname, destTableName);
                }
                connector.exeCreateTable(sql);
            } else if (DatabaseUtils.isCreateIndexSQL(sql)) {
                String iname = DatabaseUtils.findIndexNameInSQL(sql);
                // The table-name strategy is reused to derive the index name, so that no
                // extra strategy method is needed.
                String indexName = ((DatabaseStoreStrategy) this.storeStrategy)
                        .getTableName(addr, iname, parameter);
                if (!iname.equals(indexName)) {
                    sql = sql.replace(iname, indexName);
                    // Point the index at the target table as well.
                    sql = sql.replace(DatabaseUtils.findTableNameInIndex(sql), destTableName);
                }
                connector.exeCreateTable(sql);
            }
        }

        // Record the new table in the metadata.
        this.getMetaDataEngine().saveNewTable(getId(), addr, destTableName);
        return connector;
    }

    /**
     * Returns the cached working connector for {@code addr}, creating it on first use.
     *
     * <p>Creation is guarded by a lock and re-checked inside it, because this method is reached
     * both from {@link #write} and from the public {@link #getCurrentStoreConnector()}; without
     * the guard two threads could each build a connector and one would never be destroyed.
     *
     * @param addr target database address
     * @return the working connector, wrapped in a transaction proxy when transactions are enabled
     */
    private DatabaseConnector doCreateConnector(Address addr) {
        DatabaseConnector connector = cachedConnectors.get(addr);
        if (connector != null) {
            return connector;
        }
        synchronized (connectorLock) {
            connector = cachedConnectors.get(addr);
            if (connector == null) {
                connector = ConnectorUtils.getDbConnectorByType(addr, false, this.databaseType);
                if (transaction) {
                    connector = ConnectorUtils.acquireTransactionProxyConnector(connector);
                    logger.debug("创建支持事务的数据库连接对象:" + connector);
                }
                cachedConnectors.put(addr, connector);
            }
            return connector;
        }
    }

    /**
     * Ensures the target database is recorded in the metadata, creating the database when the
     * metadata engine does not know it yet.
     *
     * <p>A "database already exists" failure is tolerated; any other failure is rethrown.
     *
     * @param addr target database address
     * @throws DatabaseException if creating the database fails for a reason other than
     *                           "already exists"
     */
    private void doCheckDatabase(Address addr) throws DatabaseException {
        if (this.getMetaDataEngine().isExistDatabase(this.getId(), addr)) {
            return;
        }

        logger.info("元数据中没有数据库:" + addr);
        DatabaseConnector connector = doCreateManageConnector(addr);
        try {
            connector.exeCreateDatabase(addr.getServiceName());
            // Pause briefly after creating a database; per the original author, creating
            // several databases back-to-back under load otherwise fails.
            TimeUnit.MILLISECONDS.sleep(100);
        } catch (DatabaseException e) {
            if (e instanceof DatabaseExistException) {
                logger.info("数据库已经存在,不再创建:" + addr.getServiceName());
            } else {
                throw e;
            }
        } catch (InterruptedException ex) {
            // Only the pause was interrupted; the database itself was created. Restore the
            // interrupt status so callers can observe it, and still record the metadata.
            Thread.currentThread().interrupt();
        }

        // Record the database in the metadata store.
        this.getMetaDataEngine().saveNewAddress(getId(), addr);
    }

    /**
     * Returns the cached management connector for the server behind {@code addr}, creating it
     * on first use. Only called while holding the schema lock.
     *
     * <p>The cache key is the full {@code url:port} text rather than a sum of hash codes, so
     * that two different endpoints can never be mapped to the same connector.
     *
     * @param addr target database address
     * @return the management connector for that server
     */
    private DatabaseConnector doCreateManageConnector(Address addr) {
        String endpointKey = addr.getUrl() + ":" + addr.getPort();
        DatabaseConnector connector = manageConnectors.get(endpointKey);
        if (connector == null) {
            connector = ConnectorUtils.getDbConnectorByType(addr, true, this.databaseType);
            manageConnectors.put(endpointKey, connector);
        }
        return connector;
    }

    /**
     * Writes the data into the given fields of the target table and commits it through the
     * connector.
     *
     * @param addr          address of the database being written to
     * @param connector     working connector for {@code addr}
     * @param fields        metadata of the fields to write
     * @param destTableName name of the target table
     * @param datas         rows to write; never {@code null} when called from {@link #write}
     * @param srcName       name of the source table
     * @param parameter     business parameter, e.g. a customer code
     * @return an implementation-defined count (presumably the number of rows written; confirm
     *         against the concrete subclasses)
     * @throws DatabaseException if the write fails
     */
    protected abstract int onWriteDataToDB(Address addr, DatabaseConnector connector,
            List fields, String destTableName, List datas,
            String srcName, Object parameter) throws DatabaseException;
}