package com.walker.store.support;
import com.walker.connector.Address;
import com.walker.connector.support.DatabaseConnector;
import com.walker.connector.util.ConnectorUtils;
import com.walker.db.DatabaseException;
import com.walker.db.DatabaseExistException;
import com.walker.db.DatabaseType;
import com.walker.dbmeta.util.DatabaseUtils;
import com.walker.infrastructure.utils.StringUtils;
import com.walker.store.AbstractStore;
import com.walker.store.StoreStrategy;
import com.walker.store.strategy.DatabaseStoreStrategy;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
public abstract class DatabaseStore extends AbstractStore {
/* 多库情况下执行连接器,key=Address, value=DatabaseConnector */
private ConcurrentHashMap
cachedConnectors = new ConcurrentHashMap(8);
/* 管理用的连接器缓存,key=ip.hashCode + port, value=DatabaseConnector */
private ConcurrentHashMap manageConnectors = new ConcurrentHashMap(8);
private DatabaseType databaseType = DatabaseType.MYSQL;
// 创建数据库、表结构时引用锁
private final Object _lock = new Object();
private boolean transaction = true; // 是否支持数据库事务
// 是否支持数据更新
private boolean supportUpdate = false;
/**
* 存储在写入时,是否支持数据更新操作。
* 如果支持,就意味着,需要先检查目标数据是否存在,如果存在要根据主键更新
* @return
*/
public boolean isSupportUpdate() {
return supportUpdate;
}
public void setSupportUpdate(boolean supportUpdate) {
this.supportUpdate = supportUpdate;
}
public boolean isTransaction() {
return transaction;
}
public void setTransaction(boolean transaction) {
this.transaction = transaction;
}
public void setDatabaseType(DatabaseType databaseType) {
this.databaseType = databaseType;
}
@Override
public void initialize() {
super.initialize();
this.doPrepare();
}
private void doPrepare(){
StoreStrategy repository = (StoreStrategy)this.storeStrategy;
List addresses = repository.getDefineAddressList();
if(addresses == null || addresses.size() == 0){
throw new IllegalStateException("请先调用setRepository()方法");
}
if(databaseType == DatabaseType.MYSQL){
// 初始化已经存在的数据库连接器
// Repository repo = this.getMetaDataEngine().getExistRepository(getId());
// if(repo != null){
// existAddressList = repo.getAddressList();
// }
} else if(databaseType == DatabaseType.POSTGRES){
logger.info("存储使用了postgreSQL数据库");
} else if(databaseType == DatabaseType.ORACLE){
throw new UnsupportedOperationException("当前不支持oracle存储方式");
} else {
throw new UnsupportedOperationException("databaseType = " + databaseType);
}
}
@Override
public void destroy() {
super.destroy();
for(DatabaseConnector conn : cachedConnectors.values()){
conn.destroy();
}
for(DatabaseConnector conn : manageConnectors.values()){
conn.destroy();
}
cachedConnectors.clear();
manageConnectors.clear();
}
/**
* 返回当前存储正在使用的数据库连接器对象DatabaseConnector
。
* 如果还没有创建会返回null
// * @param storeId
* @return
*/
public DatabaseConnector getCurrentStoreConnector(){
Address address = this.getMetaDataEngine().getUsingAddress(this.getId());
if(address == null){
logger.warn("当前存储缓存的address对象不存在,可能是第一次执行还没有创建。storeId = " + this.getId() + ", " + this.getDescription());
return null;
}
// return this.cachedConnectors.get(address);
return this.doCreateConnector(address);
}
@Override
public void write(String srcName, String createTableSQL
, Object parameter, List