package com.walker.dbmeta;

import com.walker.connector.Address;
import com.walker.connector.support.DatabaseConnector;
import com.walker.db.DatabaseException;
import com.walker.db.TableInfo;
import com.walker.dbmeta.util.DatabaseMetaEngineUtils;
import com.walker.infrastructure.utils.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Map;

/**
 * Abstract definition of a database metadata engine.
 *
 * <p>This engine only manages database metadata, which is distinct from system metadata.
 * System metadata covers the management of business tables and databases and does not
 * contain the details of a concrete database; it also relies on this engine.
 * (Created 2015-12-21.)
 *
 * <p>Revision 2019-05-16 (时克英): each engine object only ever holds a single
 * {@link DatabaseConnector} reference, so the former connector collection was removed and a
 * single connector field is kept instead.
 *
 * <p>NOTE(review): the generic type arguments in this class were reconstructed from how the
 * values are used inside this file; confirm them against the {@code DatabaseMetaEngine}
 * interface and the concrete subclasses.
 *
 * @author shikeying
 */
public abstract class AbstractDatabaseMetaEngine implements DatabaseMetaEngine {

    protected final transient Logger logger = LoggerFactory.getLogger(getClass());

    /** The single connector used by this engine; created lazily or injected via the setter. */
    private DatabaseConnector connector = null;

    /**
     * Returns the field objects of a table.
     *
     * @param address   database address; its service name must not be {@code null}
     * @param tableName table name
     * @return whatever {@link #loadFieldsObject(DatabaseConnector, String)} returns
     * @throws IllegalStateException if the address has no service name
     */
    public List<FieldInfo> getFieldsObject(Address address, String tableName) {
        DatabaseConnector conn = connectorFor(address);
        return loadFieldsObject(conn, tableName);
    }

    /**
     * Loads the field objects of the given table.
     *
     * @param connector connector to query with
     * @param tableName table name
     * @return field objects of the table
     */
    protected abstract List<FieldInfo> loadFieldsObject(DatabaseConnector connector, String tableName);

    @Override
    public List<String> getFields(Address address, String tableName) {
        DatabaseConnector conn = connectorFor(address);
        return loadFields(conn, tableName);
    }

    @Override
    public int getTableSize(Address address) {
        // Note (from the original author): this abstract class should not reference
        // MySQL-specific details here; to be optimized later.
        DatabaseConnector conn = connectorFor(address);
        return loadSchemaTableSize(conn);
    }

    @Override
    public Map<String, TableInfo> getTableRows(Address address, List<String> tableNameList) {
        DatabaseConnector conn = connectorFor(address);
        return this.loadTablesRow(address, conn, tableNameList);
    }

    @Override
    public long getTableRow(Address address, String tableName) {
        DatabaseConnector conn = connectorFor(address);
        return this.loadTableRow(conn, tableName);
    }

    /**
     * Loads rows by delegating to {@link #loadDatas(DatabaseConnector, String, String)}.
     *
     * @return the loaded rows, or {@code null} if loading threw an exception (the failure is
     *         logged; {@code null} is kept for compatibility with existing callers)
     */
    @Override
    public List<Map<String, Object>> loadTableDatas(Address address, String tableName, String sql) {
        DatabaseConnector conn = connectorFor(address);
        try {
            return this.loadDatas(conn, tableName, sql);
        } catch (Exception e) {
            // Report through the logger (with the cause) instead of printStackTrace().
            logger.error("Failed to load table data, tableName = " + tableName, e);
            return null;
        }
    }

    /**
     * Returns the engine's connector, creating it on first use through
     * {@link #createDbConnector(Address)}.
     *
     * @param address address passed to {@link #createDbConnector(Address)} when no connector exists yet
     * @return the connector held by this engine
     * @throws UnsupportedOperationException if {@link #createDbConnector(Address)} returns {@code null}
     */
    protected DatabaseConnector getConnector(Address address) {
        if (this.connector == null) {
            this.connector = createDbConnector(address);
            if (this.connector == null) {
                throw new UnsupportedOperationException("请实现方法'createDbConnector()'!");
            }
        }
        return this.connector;
    }

    /**
     * Obtains the connector first and then verifies that the address carries a schema
     * (service name) — the same order every public query method used before.
     */
    private DatabaseConnector connectorFor(Address address) {
        DatabaseConnector conn = getConnector(address);
        checkSchema(address.getServiceName());
        return conn;
    }

    /** Rejects a missing schema name, since metadata queries cannot run without it. */
    private void checkSchema(String schema) {
        if (schema == null) {
            throw new IllegalStateException("数据库名字不存在,无法执行元数据查询");
        }
    }

    @Override
    public void initialize() {
    }

    /** Destroys the held connector, if any. */
    @Override
    public void destroy() {
        if (this.connector != null) {
            this.connector.destroy();
        }
    }

    /**
     * Creates the connector object appropriate for the concrete database.
     *
     * @param address database address
     * @return the new connector; {@link #getConnector(Address)} rejects a {@code null} result
     */
    protected abstract DatabaseConnector createDbConnector(Address address);

    /**
     * Loads the number of user tables that exist in the given database.
     *
     * @param connnector connector to query with
     * @return number of user tables
     */
    protected abstract int loadSchemaTableSize(DatabaseConnector connnector);

    /**
     * Loads the names of all fields of a table.
     *
     * @param connector connector to query with
     * @param tableName table name
     * @return field names
     */
    protected abstract List<String> loadFields(DatabaseConnector connector, String tableName);

    /**
     * Loads row information for each of the given tables.
     *
     * @param address       database address
     * @param connector     connector to query with
     * @param tableNameList names of the tables to inspect
     * @return per-table information; presumably keyed by table name — verify against subclasses
     */
    protected abstract Map<String, TableInfo> loadTablesRow(Address address,
            DatabaseConnector connector, List<String> tableNameList);

    /**
     * Loads the row count of a single table.
     *
     * @param connector connector to query with
     * @param tableName table name
     * @return row count
     */
    protected abstract long loadTableRow(DatabaseConnector connector, String tableName);

    /**
     * Loads rows for {@link #loadTableDatas(Address, String, String)}.
     *
     * @param connector connector to query with
     * @param tableName table name
     * @param sql       statement supplied by the caller
     * @return loaded rows
     * @throws Exception on any failure; the caller logs it and returns {@code null}
     */
    protected abstract List<Map<String, Object>> loadDatas(DatabaseConnector connector,
            String tableName, String sql) throws Exception;

    /**
     * Default implementation: always returns {@code null}. Subclasses may override.
     */
    public List<String> getTableNamesByLike(Address address, String tableNameLike) {
        return null;
    }

    /**
     * Creates a table structure dynamically from sample data.
     *
     * <p>The field list is derived from the first row of {@code datas}. If {@code datas} is
     * empty, or no fields can be derived, a warning is logged and nothing is created.
     *
     * @param address          database address, passed through to the subclass
     * @param datas            sample rows; only the first row is inspected
     * @param dataVersionField optional data-version field name; when given it must be numeric
     * @param tableName        table name; lower-cased before use
     * @throws DatabaseException if the table name is empty, a field cannot be resolved, or the
     *                           data-version field is not numeric
     */
    public void createTableDynamic(Address address, List<Map<String, Object>> datas,
            String dataVersionField, String tableName) throws DatabaseException {
        if (StringUtils.isEmptyList(datas)) {
            logger.warn("未找到任何数据集合,无法动态创建表结构。tableName = " + tableName);
            return;
        }
        if (StringUtils.isEmpty(tableName)) {
            throw new DatabaseException("未提供表名,无法动态创建表结构");
        }
        // Locale.ROOT keeps the lower-casing independent of the JVM default locale
        // (e.g. Turkish would otherwise map 'I' to a dotless 'ı').
        String lowerTableName = tableName.toLowerCase(Locale.ROOT);

        // Derive the field types from the first row.
        List<FieldInfo> fieldList = this.doAquireFieldList(datas.get(0), lowerTableName);
        if (StringUtils.isEmptyList(fieldList)) {
            logger.warn("this.doAquireFieldList()返回的字段集合为空,不创建表");
            return;
        }

        // If a data-version field is given, it has to be a numeric field.
        if (StringUtils.isNotEmpty(dataVersionField)) {
            if (!DatabaseMetaEngineUtils.isNumberField(fieldList, dataVersionField)) {
                throw new DatabaseException("数据版本字段必须是long类型:" + dataVersionField);
            }
        } else {
            logger.info("dataVersionField字段不存在,无法增量采集,只能全量更新。table = " + lowerTableName);
        }

        // Let the subclass create the structure (DDL, or the NoSQL equivalent).
        this.doCreateTableAction(address, fieldList, dataVersionField, lowerTableName);
    }

    /**
     * Builds one {@link FieldInfo} per entry of a sample row.
     *
     * @return the field list, or {@code null} when the row is {@code null} or empty
     * @throws DatabaseException if no {@link FieldInfo} can be created for an entry
     */
    private List<FieldInfo> doAquireFieldList(Map<String, Object> data, String tableName)
            throws DatabaseException {
        // A null first row is treated like an empty one instead of failing with an NPE.
        if (data == null || data.isEmpty()) {
            return null;
        }
        List<FieldInfo> fieldList = new ArrayList<>();
        for (Map.Entry<String, Object> entry : data.entrySet()) {
            FieldInfo fi = DatabaseMetaEngineUtils.getFieldInfo(entry.getKey(), entry.getValue(), tableName);
            if (fi == null) {
                throw new DatabaseException("fieldInfo对象创建失败:" + entry.getKey() + ", table = " + tableName);
            }
            fieldList.add(fi);
        }
        return fieldList;
    }

    /**
     * Performs the actual creation of the table structure; implemented by subclasses, e.g.
     * DDL for relational databases or a schema for NoSQL databases.
     *
     * @param address          database address
     * @param fieldList        fields of the table
     * @param dataVersionField data-version field name (validated as numeric when non-empty)
     * @param tableName        table name (already lower-cased)
     * @throws DatabaseException checked exception signalling a creation failure
     */
    protected abstract void doCreateTableAction(Address address, List<FieldInfo> fieldList,
            String dataVersionField, String tableName) throws DatabaseException;

    @Override
    public DatabaseConnector getConnector() {
        return connector;
    }

    /**
     * Injects the connector.
     *
     * @throws IllegalStateException if a connector is already present
     */
    @Override
    public void setConnector(DatabaseConnector connector) {
        if (this.connector != null) {
            throw new IllegalStateException("this.connector已经存在,不能重复设置");
        }
        this.connector = connector;
    }
}