package com.walker.dbmeta;
|
|
import com.walker.connector.Address;
|
import com.walker.connector.support.DatabaseConnector;
|
import com.walker.db.DatabaseException;
|
import com.walker.db.TableInfo;
|
import com.walker.dbmeta.util.DatabaseMetaEngineUtils;
|
import com.walker.infrastructure.utils.StringUtils;
|
import org.slf4j.Logger;
|
import org.slf4j.LoggerFactory;
|
|
import java.util.ArrayList;
|
import java.util.List;
|
import java.util.Map;
|
|
/**
|
* 抽象的数据库元数据引擎定义。</p>
|
* 该引擎仅作为数据库元数据管理,与系统元数据有区别。<br>
|
* 系统元数据包含了对于业务表和库的管理,不包含具体数据库的详细信息,他也需要此引擎。
|
* @author shikeying
|
* @date 2015年12月21日
|
*
|
*</p>
|
* 修改对象,因为每个引擎对象只会有一个<code>DatabaseConnector</code>引用,所以去掉connector集合对象,<br>
|
* 保持单例状态。
|
* @category 时克英
|
* @date 2019-05-16
|
*/
|
public abstract class AbstractDatabaseMetaEngine implements DatabaseMetaEngine {
|
|
protected final transient Logger logger = LoggerFactory.getLogger(getClass());
|
|
// /* 数据库连接器缓存,key=Address, value=DatabaseConnector */
|
// private ConcurrentHashMap<Address, DatabaseConnector> connectors = new ConcurrentHashMap<Address, DatabaseConnector>(8);
|
|
private DatabaseConnector connector = null;
|
|
/**
|
* 返回表中字段对象集合
|
* @param address
|
* @param tableName
|
* @return
|
*/
|
public List<FieldInfo> getFieldsObject(Address address, String tableName){
|
DatabaseConnector conn = getConnector(address);
|
checkSchema(address.getServiceName());
|
return loadFieldsObject(conn, tableName);
|
}
|
|
/**
|
* 加载某个表的所有字段名字集合
|
// * @param schema
|
* @param tableName 表名
|
* @return 字段名集合
|
*/
|
protected abstract List<FieldInfo> loadFieldsObject(DatabaseConnector connector, String tableName);
|
|
|
@Override
|
public List<String> getFields(Address address, String tableName){
|
DatabaseConnector conn = getConnector(address);
|
checkSchema(address.getServiceName());
|
return loadFields(conn, tableName);
|
}
|
|
@Override
|
public int getTableSize(Address address) {
|
DatabaseConnector conn = getConnector(address);
|
// 注意:此地不应当直接引用Mysql相关信息,这是抽象类,后续优化
|
checkSchema(address.getServiceName());
|
return loadSchemaTableSize(conn);
|
}
|
|
@Override
|
public Map<String, TableInfo> getTableRows(Address address, List<String> tableNameList){
|
DatabaseConnector conn = getConnector(address);
|
checkSchema(address.getServiceName());
|
return this.loadTablesRow(address, conn, tableNameList);
|
}
|
|
@Override
|
public long getTableRow(Address address, String tableName){
|
DatabaseConnector conn = getConnector(address);
|
checkSchema(address.getServiceName());
|
return this.loadTableRow(conn, tableName);
|
}
|
|
@Override
|
public List<Map<String, Object>> loadTableDatas(Address address
|
, String tableName, String sql){
|
DatabaseConnector conn = getConnector(address);
|
checkSchema(address.getServiceName());
|
try {
|
return this.loadDatas(conn, tableName, sql);
|
} catch (Exception e) {
|
e.printStackTrace();
|
return null;
|
}
|
}
|
|
protected DatabaseConnector getConnector(Address address){
|
// DatabaseConnector conn = connectors.get(address);
|
// if(conn == null){
|
// conn = createDbConnector(address);
|
// if(conn == null){
|
// throw new UnsupportedOperationException("请实现方法'createDbConnector()'!");
|
// }
|
// connectors.putIfAbsent(address, conn);
|
// }
|
// return conn;
|
if(this.connector == null){
|
this.connector = createDbConnector(address);
|
if(this.connector == null){
|
throw new UnsupportedOperationException("请实现方法'createDbConnector()'!");
|
}
|
}
|
return this.connector;
|
}
|
|
private void checkSchema(String schema){
|
if(schema == null)
|
throw new IllegalStateException("数据库名字不存在,无法执行元数据查询");
|
}
|
|
@Override
|
public void initialize() {
|
|
}
|
|
@Override
|
public void destroy() {
|
// for(DatabaseConnector conn : connectors.values()){
|
// conn.destroy();
|
// }
|
if(this.connector != null){
|
this.connector.destroy();
|
}
|
}
|
|
/**
|
* 根据不同数据库,创建不同的连接对象
|
* @param address
|
* @return
|
*/
|
protected abstract DatabaseConnector createDbConnector(Address address);
|
|
/**
|
* 加载给定数据库中存在多少个用户表数量
|
* @param connnector
|
* @return
|
*/
|
protected abstract int loadSchemaTableSize(DatabaseConnector connnector);
|
|
/**
|
* 加载某个表的所有字段名字集合
|
// * @param schema
|
* @param tableName 表名
|
* @return 字段名集合
|
*/
|
protected abstract List<String> loadFields(DatabaseConnector connector, String tableName);
|
|
/**
|
* 加载给定表名集合,有多少条数据信息
|
* @param connector
|
* @param tableNameList
|
* @return
|
*/
|
protected abstract Map<String, TableInfo> loadTablesRow(Address address, DatabaseConnector connector, List<String> tableNameList);
|
|
protected abstract long loadTableRow(DatabaseConnector connector, String tableName);
|
|
protected abstract List<Map<String, Object>> loadDatas(DatabaseConnector connector
|
, String tableName, String sql) throws Exception;
|
|
public List<String> getTableNamesByLike(Address address, String tableNameLike) {
|
return null;
|
}
|
|
public void createTableDynamic(Address address
|
, List<Map<String, Object>> datas, String dataVersionField, String tableName) throws DatabaseException {
|
if(StringUtils.isEmptyList(datas)){
|
logger.warn("未找到任何数据集合,无法动态创建表结构。tableName = " + tableName);
|
return;
|
}
|
if(StringUtils.isEmpty(tableName)){
|
throw new DatabaseException("未提供表名,无法动态创建表结构");
|
}
|
tableName = tableName.toLowerCase();
|
|
// 搜索字段类型
|
List<FieldInfo> fieldList = this.doAquireFieldList(datas.get(0), tableName);
|
if(StringUtils.isEmptyList(fieldList)){
|
logger.warn("this.doAquireFieldList()返回的字段集合为空,不创建表");
|
return;
|
}
|
|
// 检查数据版本字段是否数值类型(如果存在)
|
if(StringUtils.isNotEmpty(dataVersionField)){
|
if(!DatabaseMetaEngineUtils.isNumberField(fieldList, dataVersionField)){
|
throw new DatabaseException("数据版本字段必须是long类型:" + dataVersionField);
|
}
|
} else {
|
logger.info("dataVersionField字段不存在,无法增量采集,只能全量更新。table = " + tableName);
|
}
|
|
// 调用子类,创建表结构ddl(或者nosql操作)
|
this.doCreateTableAction(address, fieldList, dataVersionField, tableName);
|
}
|
|
private List<FieldInfo> doAquireFieldList(Map<String, Object> data, String tableName) throws DatabaseException{
|
if(data.size() == 0){
|
return null;
|
}
|
|
List<FieldInfo> fieldList = new ArrayList<>();
|
// Map<String, FieldInfo> cachedFieldMap = new HashMap<>();
|
FieldInfo fi = null;
|
for(Map.Entry<String, Object> entry : data.entrySet()){
|
fi = DatabaseMetaEngineUtils.getFieldInfo(entry.getKey(), entry.getValue(), tableName);
|
if(fi == null){
|
throw new DatabaseException("fieldInfo对象创建失败:" + entry.getKey() + ", table = " + tableName);
|
}
|
fieldList.add(fi);
|
}
|
|
return fieldList;
|
}
|
|
/**
|
* 具体创建表结构过程,由子类实现,如:关系数据库ddl,nosql数据库的schema等
|
* @param fieldList 字段集合
|
* @param dataVersionField 数据版本字段名称(一定是数值)
|
* @param tableName 表名
|
* @throws DatabaseException 抛出检查异常
|
*/
|
protected abstract void doCreateTableAction(Address address
|
, List<FieldInfo> fieldList
|
, String dataVersionField, String tableName) throws DatabaseException;
|
|
@Override
|
public DatabaseConnector getConnector() {
|
return connector;
|
}
|
|
@Override
|
public void setConnector(DatabaseConnector connector) {
|
if(this.connector != null){
|
throw new IllegalStateException("this.connector已经存在,不能重复设置");
|
}
|
this.connector = connector;
|
}
|
}
|