package com.walker.store.support;
|
|
import com.walker.connector.Address;
|
import com.walker.connector.support.DatabaseConnector;
|
import com.walker.connector.util.ConnectorUtils;
|
import com.walker.db.DatabaseException;
|
import com.walker.db.DatabaseExistException;
|
import com.walker.db.DatabaseType;
|
import com.walker.dbmeta.util.DatabaseUtils;
|
import com.walker.infrastructure.utils.StringUtils;
|
import com.walker.store.AbstractStore;
|
import com.walker.store.StoreStrategy;
|
import com.walker.store.strategy.DatabaseStoreStrategy;
|
|
import java.util.List;
|
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.TimeUnit;
|
|
public abstract class DatabaseStore extends AbstractStore {
|
|
/* 多库情况下执行连接器,key=Address, value=DatabaseConnector */
|
private ConcurrentHashMap<Address, DatabaseConnector> cachedConnectors = new ConcurrentHashMap<Address, DatabaseConnector>(8);
|
|
/* 管理用的连接器缓存,key=ip.hashCode + port, value=DatabaseConnector */
|
private ConcurrentHashMap<Integer, DatabaseConnector> manageConnectors = new ConcurrentHashMap<Integer, DatabaseConnector>(8);
|
|
private DatabaseType databaseType = DatabaseType.MYSQL;
|
|
// 创建数据库、表结构时引用锁
|
private final Object _lock = new Object();
|
|
private boolean transaction = true; // 是否支持数据库事务
|
|
// 是否支持数据更新
|
private boolean supportUpdate = false;
|
|
/**
|
* 存储在写入时,是否支持数据更新操作。</p>
|
* 如果支持,就意味着,需要先检查目标数据是否存在,如果存在要根据主键更新
|
* @return
|
*/
|
public boolean isSupportUpdate() {
|
return supportUpdate;
|
}
|
|
public void setSupportUpdate(boolean supportUpdate) {
|
this.supportUpdate = supportUpdate;
|
}
|
|
public boolean isTransaction() {
|
return transaction;
|
}
|
|
public void setTransaction(boolean transaction) {
|
this.transaction = transaction;
|
}
|
|
public void setDatabaseType(DatabaseType databaseType) {
|
this.databaseType = databaseType;
|
}
|
|
@Override
|
public void initialize() {
|
super.initialize();
|
this.doPrepare();
|
}
|
|
private void doPrepare(){
|
StoreStrategy repository = (StoreStrategy)this.storeStrategy;
|
List<Address> addresses = repository.getDefineAddressList();
|
if(addresses == null || addresses.size() == 0){
|
throw new IllegalStateException("请先调用setRepository()方法");
|
}
|
|
if(databaseType == DatabaseType.MYSQL){
|
// 初始化已经存在的数据库连接器
|
// Repository repo = this.getMetaDataEngine().getExistRepository(getId());
|
// if(repo != null){
|
// existAddressList = repo.getAddressList();
|
// }
|
} else if(databaseType == DatabaseType.POSTGRES){
|
logger.info("存储使用了postgreSQL数据库");
|
|
} else if(databaseType == DatabaseType.ORACLE){
|
throw new UnsupportedOperationException("当前不支持oracle存储方式");
|
} else {
|
throw new UnsupportedOperationException("databaseType = " + databaseType);
|
}
|
}
|
|
@Override
|
public void destroy() {
|
super.destroy();
|
for(DatabaseConnector conn : cachedConnectors.values()){
|
conn.destroy();
|
}
|
for(DatabaseConnector conn : manageConnectors.values()){
|
conn.destroy();
|
}
|
cachedConnectors.clear();
|
manageConnectors.clear();
|
}
|
|
/**
|
* 返回当前存储正在使用的数据库连接器对象<code>DatabaseConnector</code>。</p>
|
* 如果还没有创建会返回<code>null</code><br>
|
// * @param storeId
|
* @return
|
*/
|
public DatabaseConnector getCurrentStoreConnector(){
|
Address address = this.getMetaDataEngine().getUsingAddress(this.getId());
|
if(address == null){
|
logger.warn("当前存储缓存的address对象不存在,可能是第一次执行还没有创建。storeId = " + this.getId() + ", " + this.getDescription());
|
return null;
|
}
|
// return this.cachedConnectors.get(address);
|
return this.doCreateConnector(address);
|
}
|
|
@Override
|
public void write(String srcName, String createTableSQL
|
, Object parameter, List<Object> datas) throws Exception {
|
if(StringUtils.isEmpty(srcName)){
|
throw new IllegalArgumentException("必须提供写入的原始表名:srcName!");
|
}
|
|
/* 注意:此处必须顺序执行,先保证库、表选择正确,后面的写数据可以并发 */
|
Address addr = null;
|
String destTableName = null;
|
DatabaseConnector connector = null;
|
|
synchronized (_lock) {
|
DatabaseStoreStrategy strategy = (DatabaseStoreStrategy)this.storeStrategy;
|
addr = strategy.getDatabaseAddress(parameter);
|
// 处理数据库
|
this.doCheckDatabase(addr);
|
// 处理表
|
destTableName = strategy.getTableName(addr, srcName, parameter);
|
connector = this.doCheckTable(addr, destTableName
|
, createTableSQL.trim().toLowerCase(), parameter);
|
}
|
|
// 向表中写入数据
|
// 1、找出原始表的字段信息
|
List<String> fields = this.getMetaDataEngine().getFields(addr, destTableName);
|
if(StringUtils.isEmptyList(fields)){
|
throw new DatabaseException("未找到表字段信息,无法执行写入SQL语句。tableName = " + destTableName);
|
}
|
// 2、把数据与字段信息拼接成sql语句执行入库
|
if(datas != null){
|
this.onWriteDataToDB(addr, connector, fields, destTableName, datas, srcName, parameter);
|
}
|
}
|
|
// /**
|
// * 返回一个数据库连接器对象
|
// * @param addr 数据库连接地址
|
// * @param manage 是否管理连接,管理连接没有数据库名
|
// * @return
|
// */
|
// private DatabaseConnector getDbConnectorByType(Address addr, boolean manage){
|
// if(databaseType == DatabaseType.MYSQL){
|
// if(manage){
|
// return ConnectorUtils.createMySQLManageConnector(addr);
|
// } else {
|
// return ConnectorUtils.createMySQLConnector(addr);
|
// }
|
// } else if(databaseType == DatabaseType.POSTGRES){
|
// if(manage){
|
// return ConnectorUtils.createPostgresManageConnector(addr);
|
// } else {
|
// return ConnectorUtils.createPostgresConnector(addr);
|
// }
|
// } else if(databaseType == DatabaseType.ORACLE){
|
// if(manage){
|
// return ConnectorUtils.createOracleManageConnector(addr);
|
// } else {
|
// return ConnectorUtils.createOracleConnector(addr);
|
// }
|
// } else {
|
// throw new UnsupportedOperationException("未实现对其他数据库的支持:DatabaseConnector");
|
// }
|
// }
|
|
private DatabaseConnector doCheckTable(Address addr
|
, String destTableName, String createSQL, Object parameter)
|
throws DatabaseException{
|
if(StringUtils.isEmpty(destTableName)){
|
throw new IllegalArgumentException("策略生成的表名不存在,无法继续执行");
|
}
|
|
DatabaseConnector connector = doCreateConnector(addr);
|
|
if(!this.getMetaDataEngine().isExistTable(getId(), addr, destTableName)){
|
logger.debug("元数据中没有表,需要自动创建:" + destTableName);
|
|
// 找出sql中表名,如果与给定的目的表名不一致,要用目的表名替换
|
if(StringUtils.isEmpty(createSQL)){
|
throw new IllegalArgumentException("SQL语句中未找到表名,destTableName = " + destTableName);
|
}
|
|
// 处理给定的SQL语句,因为里面可能会存在多个,包括:创建表结构和索引的!
|
String[] sqlList = createSQL.split(StringUtils.SEPARATOR_SEMI_COLON);
|
if(sqlList == null || sqlList.length == 0){
|
throw new DatabaseException("存储引擎未发现任何创建表或者索引的SQL语句,无法继续执行写入数据任务。createSQL = " + createSQL + ", id = " + this.getId());
|
}
|
for(String sql : sqlList){
|
sql = sql.trim(); // 去掉回车、换行、空格等
|
if(DatabaseUtils.isCreateTableSQL(sql)){
|
String tname = DatabaseUtils.findTableNameInSQL(sql);
|
if(!tname.equals(destTableName)){
|
sql = sql.replace(tname, destTableName);
|
// logger.debug("替换后的SQL = " + sql);
|
}
|
connector.exeCreateTable(sql);
|
|
} else if(DatabaseUtils.isCreateIndexSQL(sql)){
|
String iname = DatabaseUtils.findIndexNameInSQL(sql);
|
// 重用生成表名的策略来生成索引名字,它两个其实一样,不想再加入新接口了。
|
String indexName = ((DatabaseStoreStrategy)this.storeStrategy).getTableName(addr, iname, parameter);
|
if(!iname.equals(indexName)){
|
// 替换索引名字
|
sql = sql.replace(iname, indexName);
|
// 替换索引中表名
|
sql = sql.replace(DatabaseUtils.findTableNameInIndex(sql), destTableName);
|
// logger.debug("替换后的索引SQL = " + sql);
|
}
|
connector.exeCreateTable(sql);
|
}
|
}
|
|
// 通过引擎保存表的元数据
|
this.getMetaDataEngine().saveNewTable(getId(), addr, destTableName);
|
}
|
return connector;
|
}
|
|
private DatabaseConnector doCreateConnector(Address addr){
|
DatabaseConnector connector = cachedConnectors.get(addr);
|
if(connector == null){
|
connector = ConnectorUtils.getDbConnectorByType(addr, false, this.databaseType);
|
if(transaction){
|
connector = ConnectorUtils.acquireTransactionProxyConnector(connector);
|
logger.debug("创建支持事务的数据库连接对象:" + connector);
|
}
|
cachedConnectors.put(addr, connector);
|
}
|
return connector;
|
}
|
|
// private DatabaseConnector aquireTransactionProxyFactoryBean(DatabaseConnector connector){
|
// PlatformTransactionManager transactionManager = new DataSourceTransactionManager(connector.getDataSource());
|
//
|
// // 定义事务规则
|
// Properties transactionAttributes = new Properties();
|
// transactionAttributes.put("query*", "PROPAGATION_SUPPORTS,readOnly");
|
// transactionAttributes.put("exec*", "PROPAGATION_REQUIRED, -Exception");
|
//
|
// TransactionProxyFactoryBean transactionProxy = new TransactionProxyFactoryBean();
|
// transactionProxy.setTransactionManager(transactionManager);
|
// transactionProxy.setTransactionAttributes(transactionAttributes);
|
// transactionProxy.setProxyTargetClass(true);
|
// transactionProxy.setTarget(connector);
|
// transactionProxy.afterPropertiesSet();
|
// return (DatabaseConnector)transactionProxy.getObject();
|
// }
|
|
/**
|
* 检查要写入的数据库是否已经正确保存到元数据中。<br>
|
* 该方法会自动创建数据库,如果出现'数据库已存在'之外的其他异常将会抛出。
|
* @param addr
|
* @throws DatabaseException
|
*/
|
private void doCheckDatabase(Address addr) throws DatabaseException{
|
if(!this.getMetaDataEngine().isExistDatabase(this.getId(), addr)){
|
logger.info("元数据中没有数据库:" + addr);
|
DatabaseConnector connector = doCreateManageConnector(addr);
|
try {
|
connector.exeCreateDatabase(addr.getServiceName());
|
// 创建完数据库,间隔一段时间,否则并发打会导致多数据库无法创建
|
TimeUnit.MILLISECONDS.sleep(100);
|
} catch (DatabaseException e) {
|
if(e instanceof DatabaseExistException){
|
logger.info("数据库已经存在,不再创建:" + addr.getServiceName());
|
} else {
|
throw e;
|
}
|
} catch(InterruptedException ex){
|
|
}
|
|
// 调用引擎,保存该数据库到元数据库中
|
this.getMetaDataEngine().saveNewAddress(getId(), addr);
|
}
|
}
|
|
private DatabaseConnector doCreateManageConnector(Address addr){
|
int hashCode = addr.getUrl().hashCode() + addr.getPort();
|
DatabaseConnector connector = manageConnectors.get(hashCode);
|
if(connector == null){
|
connector = ConnectorUtils.getDbConnectorByType(addr, true, this.databaseType);
|
manageConnectors.put(hashCode, connector);
|
}
|
return connector;
|
}
|
|
/**
|
* 写入数据到对应的字段中,然后通过connector对象提交数据。
|
* @param addr 连接数据库地址对象
|
* @param connector
|
* @param fields 写入的字段元数据
|
* @param destTableName 要写入的目的表名
|
* @param datas
|
* @param srcName 源表名字
|
* @param parameter 业务传递参数,如:customerCode
|
* @return
|
* @throws DatabaseException
|
*/
|
protected abstract int onWriteDataToDB(Address addr, DatabaseConnector connector
|
, List<String> fields
|
, String destTableName, List<Object> datas
|
, String srcName, Object parameter) throws DatabaseException;
|
|
}
|