shikeyin
2024-01-11 65da8373531677b1c37a98f53eaa30c892f35e5a
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
package com.iplatform.gather;
 
import com.walker.connector.Address;
import com.walker.connector.support.DatabaseConnector;
import com.walker.store.strategy.DatabaseStoreStrategy;
import com.walker.store.support.DatabaseStore;
 
/**
 * 支持增量采集的多表采集(数据库)调度器实现。
 * @author 时克英
 * @date 2022-09-12
 */
public abstract class IncreaseMultiTableScheduler extends MultiTableStoreScheduler{
 
    public IncreaseMultiTableScheduler(int id, String name, DatabaseStore store){
        super(id, name, store);
    }
 
    @Override
    protected Long acquireGatheredMaxVersion() {
        Long verValue = null;
        DatabaseStore databaseStore = (DatabaseStore) this.getStore();
        DatabaseConnector connector = databaseStore.getCurrentStoreConnector();
        if(connector == null){
            throw new IllegalArgumentException("IncreaseMultiTableScheduler 未设置: DatabaseStore");
        }
        String destTableName = ((DatabaseStoreStrategy)this.getStore().getStoreStrategy()).getTableName(null
                , this.getCurrentTableInfo().getTableName(), this.getCurrentTableInfo().getCustomerCode());
        logger.debug("查找表数据最大版本号,存在可用连接器,查找表名:" + destTableName);
 
        if(this.isCheckDestTableExist()){
            // 因为在切换新的表采集时,可能还没有创建该表,因此需要先判断是否存在
            Address addr = new Address();
            addr.setUrl(connector.getUrl());
            addr.setPort(connector.getPort());
            addr.setService(connector.getServiceName());
 
            boolean existQueryTable = this.getStore().getMetaDataEngine().isExistTable(this.getStoreId(), addr, destTableName);
            if(!existQueryTable){
                logger.debug("----------> 不存在表,不能查找最大版本号,默认version = 0,table = " + destTableName);
                return Long.valueOf(0);
            }
        }
 
        StringBuilder sql = new StringBuilder("select max(");
        sql.append(this.getCurrentTableInfo().getVersionName());
        sql.append(") verValue from ");
        sql.append(destTableName);
        verValue = connector.queryForLong(sql.toString(), null);
        logger.debug("找到业务表版本号最大值,verValue = " + verValue);
        return verValue;
    }
}