shikeying
2024-01-11 3b67e947e36133e2a40eb2737b15ea375e157ea0
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
package com.walker.store.support;
 
import com.walker.connector.Address;
import com.walker.connector.support.DatabaseConnector;
import com.walker.connector.util.ConnectorUtils;
import com.walker.db.DatabaseException;
import com.walker.db.DatabaseExistException;
import com.walker.db.DatabaseType;
import com.walker.dbmeta.util.DatabaseUtils;
import com.walker.infrastructure.utils.StringUtils;
import com.walker.store.AbstractStore;
import com.walker.store.StoreStrategy;
import com.walker.store.strategy.DatabaseStoreStrategy;
 
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
 
public abstract class DatabaseStore extends AbstractStore {
 
    /* 多库情况下执行连接器,key=Address, value=DatabaseConnector */
    private ConcurrentHashMap<Address, DatabaseConnector> cachedConnectors = new ConcurrentHashMap<Address, DatabaseConnector>(8);
    
    /* 管理用的连接器缓存,key=ip.hashCode + port, value=DatabaseConnector */
    private ConcurrentHashMap<Integer, DatabaseConnector> manageConnectors = new ConcurrentHashMap<Integer, DatabaseConnector>(8);
    
    private DatabaseType databaseType = DatabaseType.MYSQL;
    
    // 创建数据库、表结构时引用锁
    private final Object _lock = new Object();
    
    private boolean transaction = true; // 是否支持数据库事务
    
    // 是否支持数据更新
    private boolean supportUpdate = false;
    
    /**
     * 存储在写入时,是否支持数据更新操作。</p>
     * 如果支持,就意味着,需要先检查目标数据是否存在,如果存在要根据主键更新
     * @return
     */
    public boolean isSupportUpdate() {
        return supportUpdate;
    }
 
    public void setSupportUpdate(boolean supportUpdate) {
        this.supportUpdate = supportUpdate;
    }
 
    public boolean isTransaction() {
        return transaction;
    }
 
    public void setTransaction(boolean transaction) {
        this.transaction = transaction;
    }
 
    public void setDatabaseType(DatabaseType databaseType) {
        this.databaseType = databaseType;
    }
 
    @Override
    public void initialize() {
        super.initialize();
        this.doPrepare();
    }
    
    private void doPrepare(){
        StoreStrategy repository = (StoreStrategy)this.storeStrategy;
        List<Address> addresses = repository.getDefineAddressList();
        if(addresses == null || addresses.size() == 0){
            throw new IllegalStateException("请先调用setRepository()方法");
        }
        
        if(databaseType == DatabaseType.MYSQL){
            // 初始化已经存在的数据库连接器
//            Repository repo = this.getMetaDataEngine().getExistRepository(getId());
//            if(repo != null){
//                existAddressList = repo.getAddressList();
//            }
        } else if(databaseType == DatabaseType.POSTGRES){
            logger.info("存储使用了postgreSQL数据库");
            
        } else if(databaseType == DatabaseType.ORACLE){
            throw new UnsupportedOperationException("当前不支持oracle存储方式");
        } else {
            throw new UnsupportedOperationException("databaseType = " + databaseType);
        }
    }
    
    @Override
    public void destroy() {
        super.destroy();
        for(DatabaseConnector conn : cachedConnectors.values()){
            conn.destroy();
        }
        for(DatabaseConnector conn : manageConnectors.values()){
            conn.destroy();
        }
        cachedConnectors.clear();
        manageConnectors.clear();
    }
    
    /**
     * 返回当前存储正在使用的数据库连接器对象<code>DatabaseConnector</code>。</p>
     * 如果还没有创建会返回<code>null</code><br>
//     * @param storeId
     * @return
     */
    public DatabaseConnector getCurrentStoreConnector(){
        Address address = this.getMetaDataEngine().getUsingAddress(this.getId());
        if(address == null){
            logger.warn("当前存储缓存的address对象不存在,可能是第一次执行还没有创建。storeId = " + this.getId() + ", " + this.getDescription());
            return null;
        }
//        return this.cachedConnectors.get(address);
        return this.doCreateConnector(address);
    }
    
    @Override
    public void write(String srcName, String createTableSQL
            , Object parameter, List<Object> datas) throws Exception {
        if(StringUtils.isEmpty(srcName)){
            throw new IllegalArgumentException("必须提供写入的原始表名:srcName!");
        }
        
        /* 注意:此处必须顺序执行,先保证库、表选择正确,后面的写数据可以并发 */
        Address addr = null;
        String destTableName = null;
        DatabaseConnector connector = null;
        
        synchronized (_lock) {
            DatabaseStoreStrategy strategy = (DatabaseStoreStrategy)this.storeStrategy;
            addr = strategy.getDatabaseAddress(parameter);
            // 处理数据库
            this.doCheckDatabase(addr);
            // 处理表
            destTableName = strategy.getTableName(addr, srcName, parameter);
            connector = this.doCheckTable(addr, destTableName
                    , createTableSQL.trim().toLowerCase(), parameter);
        }
        
        // 向表中写入数据
        // 1、找出原始表的字段信息
        List<String> fields = this.getMetaDataEngine().getFields(addr, destTableName);
        if(StringUtils.isEmptyList(fields)){
            throw new DatabaseException("未找到表字段信息,无法执行写入SQL语句。tableName = " + destTableName);
        }
        // 2、把数据与字段信息拼接成sql语句执行入库
        if(datas != null){
            this.onWriteDataToDB(addr, connector, fields, destTableName, datas, srcName, parameter);
        }
    }
    
//    /**
//     * 返回一个数据库连接器对象
//     * @param addr 数据库连接地址
//     * @param manage 是否管理连接,管理连接没有数据库名
//     * @return
//     */
//    private DatabaseConnector getDbConnectorByType(Address addr, boolean manage){
//        if(databaseType == DatabaseType.MYSQL){
//            if(manage){
//                return ConnectorUtils.createMySQLManageConnector(addr);
//            } else {
//                return ConnectorUtils.createMySQLConnector(addr);
//            }
//        } else if(databaseType == DatabaseType.POSTGRES){
//            if(manage){
//                return ConnectorUtils.createPostgresManageConnector(addr);
//            } else {
//                return ConnectorUtils.createPostgresConnector(addr);
//            }
//        } else if(databaseType == DatabaseType.ORACLE){
//            if(manage){
//                return ConnectorUtils.createOracleManageConnector(addr);
//            } else {
//                return ConnectorUtils.createOracleConnector(addr);
//            }
//        } else {
//            throw new UnsupportedOperationException("未实现对其他数据库的支持:DatabaseConnector");
//        }
//    }
    
    private DatabaseConnector doCheckTable(Address addr
            , String destTableName, String createSQL, Object parameter) 
            throws DatabaseException{
        if(StringUtils.isEmpty(destTableName)){
            throw new IllegalArgumentException("策略生成的表名不存在,无法继续执行");
        }
        
        DatabaseConnector connector = doCreateConnector(addr);
        
        if(!this.getMetaDataEngine().isExistTable(getId(), addr, destTableName)){
            logger.debug("元数据中没有表,需要自动创建:" + destTableName);
            
            // 找出sql中表名,如果与给定的目的表名不一致,要用目的表名替换
            if(StringUtils.isEmpty(createSQL)){
                throw new IllegalArgumentException("SQL语句中未找到表名,destTableName = " + destTableName);
            }
            
            // 处理给定的SQL语句,因为里面可能会存在多个,包括:创建表结构和索引的!
            String[] sqlList = createSQL.split(StringUtils.SEPARATOR_SEMI_COLON);
            if(sqlList == null || sqlList.length == 0){
                throw new DatabaseException("存储引擎未发现任何创建表或者索引的SQL语句,无法继续执行写入数据任务。createSQL = " + createSQL + ", id = " + this.getId());
            }
            for(String sql : sqlList){
                sql = sql.trim(); // 去掉回车、换行、空格等
                if(DatabaseUtils.isCreateTableSQL(sql)){
                    String tname = DatabaseUtils.findTableNameInSQL(sql);
                    if(!tname.equals(destTableName)){
                        sql = sql.replace(tname, destTableName);
//                        logger.debug("替换后的SQL = " + sql);
                    }
                    connector.exeCreateTable(sql);
                    
                } else if(DatabaseUtils.isCreateIndexSQL(sql)){
                    String iname = DatabaseUtils.findIndexNameInSQL(sql);
                    // 重用生成表名的策略来生成索引名字,它两个其实一样,不想再加入新接口了。
                    String indexName = ((DatabaseStoreStrategy)this.storeStrategy).getTableName(addr, iname, parameter);
                    if(!iname.equals(indexName)){
                        // 替换索引名字
                        sql = sql.replace(iname, indexName);
                        // 替换索引中表名
                        sql = sql.replace(DatabaseUtils.findTableNameInIndex(sql), destTableName);
//                        logger.debug("替换后的索引SQL = " + sql);
                    }
                    connector.exeCreateTable(sql);
                }
            }
            
            // 通过引擎保存表的元数据
            this.getMetaDataEngine().saveNewTable(getId(), addr, destTableName);
        }
        return connector;
    }
    
    private DatabaseConnector doCreateConnector(Address addr){
        DatabaseConnector connector = cachedConnectors.get(addr);
        if(connector == null){
            connector = ConnectorUtils.getDbConnectorByType(addr, false, this.databaseType);
            if(transaction){
                connector = ConnectorUtils.acquireTransactionProxyConnector(connector);
                logger.debug("创建支持事务的数据库连接对象:" + connector);
            }
            cachedConnectors.put(addr, connector);
        }
        return connector;
    }
    
//    private DatabaseConnector aquireTransactionProxyFactoryBean(DatabaseConnector connector){
//        PlatformTransactionManager transactionManager = new DataSourceTransactionManager(connector.getDataSource());
//        
//        // 定义事务规则
//        Properties transactionAttributes = new Properties();
//        transactionAttributes.put("query*", "PROPAGATION_SUPPORTS,readOnly");
//        transactionAttributes.put("exec*", "PROPAGATION_REQUIRED, -Exception");
//        
//        TransactionProxyFactoryBean transactionProxy = new TransactionProxyFactoryBean();
//        transactionProxy.setTransactionManager(transactionManager);
//        transactionProxy.setTransactionAttributes(transactionAttributes);
//        transactionProxy.setProxyTargetClass(true);
//        transactionProxy.setTarget(connector);
//        transactionProxy.afterPropertiesSet();
//        return (DatabaseConnector)transactionProxy.getObject();
//    }
    
    /**
     * 检查要写入的数据库是否已经正确保存到元数据中。<br>
     * 该方法会自动创建数据库,如果出现'数据库已存在'之外的其他异常将会抛出。
     * @param addr
     * @throws DatabaseException
     */
    private void doCheckDatabase(Address addr) throws DatabaseException{
        if(!this.getMetaDataEngine().isExistDatabase(this.getId(), addr)){
            logger.info("元数据中没有数据库:" + addr);
            DatabaseConnector connector = doCreateManageConnector(addr);
            try {
                connector.exeCreateDatabase(addr.getServiceName());
                // 创建完数据库,间隔一段时间,否则并发打会导致多数据库无法创建
                TimeUnit.MILLISECONDS.sleep(100);
            } catch (DatabaseException e) {
                if(e instanceof DatabaseExistException){
                    logger.info("数据库已经存在,不再创建:" + addr.getServiceName());
                } else {
                    throw e;
                }
            } catch(InterruptedException ex){
                
            }
            
            // 调用引擎,保存该数据库到元数据库中
            this.getMetaDataEngine().saveNewAddress(getId(), addr);
        }
    }
    
    private DatabaseConnector doCreateManageConnector(Address addr){
        int hashCode = addr.getUrl().hashCode() + addr.getPort();
        DatabaseConnector connector = manageConnectors.get(hashCode);
        if(connector == null){
            connector = ConnectorUtils.getDbConnectorByType(addr, true, this.databaseType);
            manageConnectors.put(hashCode, connector);
        }
        return connector;
    }
    
    /**
     * 写入数据到对应的字段中,然后通过connector对象提交数据。
     * @param addr 连接数据库地址对象
     * @param connector
     * @param fields 写入的字段元数据
     * @param destTableName 要写入的目的表名
     * @param datas
     * @param srcName 源表名字
     * @param parameter 业务传递参数,如:customerCode
     * @return
     * @throws DatabaseException
     */
    protected abstract int onWriteDataToDB(Address addr, DatabaseConnector connector
            , List<String> fields
            , String destTableName, List<Object> datas
            , String srcName, Object parameter) throws DatabaseException;
    
}