shikeying
2024-01-11 3b67e947e36133e2a40eb2737b15ea375e157ea0
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
package com.walker.dbmeta;
 
import com.walker.connector.Address;
import com.walker.connector.support.DatabaseConnector;
import com.walker.db.DatabaseException;
import com.walker.db.TableInfo;
import com.walker.dbmeta.util.DatabaseMetaEngineUtils;
import com.walker.infrastructure.utils.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
 
/**
 * 抽象的数据库元数据引擎定义。</p>
 * 该引擎仅作为数据库元数据管理,与系统元数据有区别。<br>
 * 系统元数据包含了对于业务表和库的管理,不包含具体数据库的详细信息,他也需要此引擎。
 * @author shikeying
 * @date 2015年12月21日
 *
 *</p>
 * 修改对象,因为每个引擎对象只会有一个<code>DatabaseConnector</code>引用,所以去掉connector集合对象,<br>
 * 保持单例状态。
 * @category 时克英
 * @date 2019-05-16
 */
public abstract class AbstractDatabaseMetaEngine implements DatabaseMetaEngine {
 
    protected final transient Logger logger = LoggerFactory.getLogger(getClass());
 
//    /* 数据库连接器缓存,key=Address, value=DatabaseConnector */
//    private ConcurrentHashMap<Address, DatabaseConnector> connectors = new ConcurrentHashMap<Address, DatabaseConnector>(8);
 
    private DatabaseConnector connector = null;
 
    /**
     * 返回表中字段对象集合
     * @param address
     * @param tableName
     * @return
     */
    public List<FieldInfo> getFieldsObject(Address address, String tableName){
        DatabaseConnector conn = getConnector(address);
        checkSchema(address.getServiceName());
        return loadFieldsObject(conn, tableName);
    }
 
    /**
     * 加载某个表的所有字段名字集合
//     * @param schema
     * @param tableName 表名
     * @return 字段名集合
     */
    protected abstract List<FieldInfo> loadFieldsObject(DatabaseConnector connector, String tableName);
 
 
    @Override
    public List<String> getFields(Address address, String tableName){
        DatabaseConnector conn = getConnector(address);
        checkSchema(address.getServiceName());
        return loadFields(conn, tableName);
    }
 
    @Override
    public int getTableSize(Address address) {
        DatabaseConnector conn = getConnector(address);
        // 注意:此地不应当直接引用Mysql相关信息,这是抽象类,后续优化
        checkSchema(address.getServiceName());
        return loadSchemaTableSize(conn);
    }
 
    @Override
    public Map<String, TableInfo> getTableRows(Address address, List<String> tableNameList){
        DatabaseConnector conn = getConnector(address);
        checkSchema(address.getServiceName());
        return this.loadTablesRow(address, conn, tableNameList);
    }
 
    @Override
    public long getTableRow(Address address, String tableName){
        DatabaseConnector conn = getConnector(address);
        checkSchema(address.getServiceName());
        return this.loadTableRow(conn, tableName);
    }
 
    @Override
    public List<Map<String, Object>> loadTableDatas(Address address
            , String tableName, String sql){
        DatabaseConnector conn = getConnector(address);
        checkSchema(address.getServiceName());
        try {
            return this.loadDatas(conn, tableName, sql);
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    }
 
    protected DatabaseConnector getConnector(Address address){
//        DatabaseConnector conn = connectors.get(address);
//        if(conn == null){
//            conn = createDbConnector(address);
//            if(conn == null){
//                throw new UnsupportedOperationException("请实现方法'createDbConnector()'!");
//            }
//            connectors.putIfAbsent(address, conn);
//        }
//        return conn;
        if(this.connector == null){
            this.connector = createDbConnector(address);
            if(this.connector == null){
                throw new UnsupportedOperationException("请实现方法'createDbConnector()'!");
            }
        }
        return this.connector;
    }
 
    private void checkSchema(String schema){
        if(schema == null)
            throw new IllegalStateException("数据库名字不存在,无法执行元数据查询");
    }
 
    @Override
    public void initialize() {
 
    }
 
    @Override
    public void destroy() {
//        for(DatabaseConnector conn : connectors.values()){
//            conn.destroy();
//        }
        if(this.connector != null){
            this.connector.destroy();
        }
    }
 
    /**
     * 根据不同数据库,创建不同的连接对象
     * @param address
     * @return
     */
    protected abstract DatabaseConnector createDbConnector(Address address);
 
    /**
     * 加载给定数据库中存在多少个用户表数量
     * @param connnector
     * @return
     */
    protected abstract int loadSchemaTableSize(DatabaseConnector connnector);
 
    /**
     * 加载某个表的所有字段名字集合
//     * @param schema
     * @param tableName 表名
     * @return 字段名集合
     */
    protected abstract List<String> loadFields(DatabaseConnector connector, String tableName);
 
    /**
     * 加载给定表名集合,有多少条数据信息
     * @param connector
     * @param tableNameList
     * @return
     */
    protected abstract Map<String, TableInfo> loadTablesRow(Address address, DatabaseConnector connector, List<String> tableNameList);
 
    protected abstract long loadTableRow(DatabaseConnector connector, String tableName);
 
    protected abstract List<Map<String, Object>> loadDatas(DatabaseConnector connector
            , String tableName, String sql) throws Exception;
 
    public List<String> getTableNamesByLike(Address address, String tableNameLike) {
        return null;
    }
 
    public void createTableDynamic(Address address
            , List<Map<String, Object>> datas, String dataVersionField, String tableName) throws DatabaseException {
        if(StringUtils.isEmptyList(datas)){
            logger.warn("未找到任何数据集合,无法动态创建表结构。tableName = " + tableName);
            return;
        }
        if(StringUtils.isEmpty(tableName)){
            throw new DatabaseException("未提供表名,无法动态创建表结构");
        }
        tableName = tableName.toLowerCase();
 
        // 搜索字段类型
        List<FieldInfo> fieldList = this.doAquireFieldList(datas.get(0), tableName);
        if(StringUtils.isEmptyList(fieldList)){
            logger.warn("this.doAquireFieldList()返回的字段集合为空,不创建表");
            return;
        }
 
        // 检查数据版本字段是否数值类型(如果存在)
        if(StringUtils.isNotEmpty(dataVersionField)){
            if(!DatabaseMetaEngineUtils.isNumberField(fieldList, dataVersionField)){
                throw new DatabaseException("数据版本字段必须是long类型:" + dataVersionField);
            }
        } else {
            logger.info("dataVersionField字段不存在,无法增量采集,只能全量更新。table = " + tableName);
        }
 
        // 调用子类,创建表结构ddl(或者nosql操作)
        this.doCreateTableAction(address, fieldList, dataVersionField, tableName);
    }
 
    private List<FieldInfo> doAquireFieldList(Map<String, Object> data, String tableName) throws DatabaseException{
        if(data.size() == 0){
            return null;
        }
 
        List<FieldInfo> fieldList = new ArrayList<>();
//        Map<String, FieldInfo> cachedFieldMap = new HashMap<>();
        FieldInfo fi = null;
        for(Map.Entry<String, Object> entry : data.entrySet()){
            fi = DatabaseMetaEngineUtils.getFieldInfo(entry.getKey(), entry.getValue(), tableName);
            if(fi == null){
                throw new DatabaseException("fieldInfo对象创建失败:" + entry.getKey() + ", table = " + tableName);
            }
            fieldList.add(fi);
        }
 
        return fieldList;
    }
 
    /**
     * 具体创建表结构过程,由子类实现,如:关系数据库ddl,nosql数据库的schema等
     * @param fieldList 字段集合
     * @param dataVersionField 数据版本字段名称(一定是数值)
     * @param tableName 表名
     * @throws DatabaseException 抛出检查异常
     */
    protected abstract void doCreateTableAction(Address address
            , List<FieldInfo> fieldList
            , String dataVersionField, String tableName) throws DatabaseException;
 
    @Override
    public DatabaseConnector getConnector() {
        return connector;
    }
 
    @Override
    public void setConnector(DatabaseConnector connector) {
        if(this.connector != null){
            throw new IllegalStateException("this.connector已经存在,不能重复设置");
        }
        this.connector = connector;
    }
}