package com.iplatform.gather;
|
|
import com.walker.connector.Address;
|
import com.walker.connector.support.DatabaseConnector;
|
import com.walker.store.strategy.DatabaseStoreStrategy;
|
import com.walker.store.support.DatabaseStore;
|
|
/**
|
* 支持增量采集的多表采集(数据库)调度器实现。
|
* @author 时克英
|
* @date 2022-09-12
|
*/
|
public abstract class IncreaseMultiTableScheduler extends MultiTableStoreScheduler{
|
|
public IncreaseMultiTableScheduler(int id, String name, DatabaseStore store){
|
super(id, name, store);
|
}
|
|
@Override
|
protected Long acquireGatheredMaxVersion() {
|
Long verValue = null;
|
DatabaseStore databaseStore = (DatabaseStore) this.getStore();
|
DatabaseConnector connector = databaseStore.getCurrentStoreConnector();
|
if(connector == null){
|
throw new IllegalArgumentException("IncreaseMultiTableScheduler 未设置: DatabaseStore");
|
}
|
String destTableName = ((DatabaseStoreStrategy)this.getStore().getStoreStrategy()).getTableName(null
|
, this.getCurrentTableInfo().getTableName(), this.getCurrentTableInfo().getCustomerCode());
|
logger.debug("查找表数据最大版本号,存在可用连接器,查找表名:" + destTableName);
|
|
if(this.isCheckDestTableExist()){
|
// 因为在切换新的表采集时,可能还没有创建该表,因此需要先判断是否存在
|
Address addr = new Address();
|
addr.setUrl(connector.getUrl());
|
addr.setPort(connector.getPort());
|
addr.setService(connector.getServiceName());
|
|
boolean existQueryTable = this.getStore().getMetaDataEngine().isExistTable(this.getStoreId(), addr, destTableName);
|
if(!existQueryTable){
|
logger.debug("----------> 不存在表,不能查找最大版本号,默认version = 0,table = " + destTableName);
|
return Long.valueOf(0);
|
}
|
}
|
|
StringBuilder sql = new StringBuilder("select max(");
|
sql.append(this.getCurrentTableInfo().getVersionName());
|
sql.append(") verValue from ");
|
sql.append(destTableName);
|
verValue = connector.queryForLong(sql.toString(), null);
|
logger.debug("找到业务表版本号最大值,verValue = " + verValue);
|
return verValue;
|
}
|
}
|