package com.iplatform.gather; import com.walker.connector.Address; import com.walker.connector.support.DatabaseConnector; import com.walker.store.strategy.DatabaseStoreStrategy; import com.walker.store.support.DatabaseStore; /** * 支持增量采集的多表采集(数据库)调度器实现。 * @author 时克英 * @date 2022-09-12 */ public abstract class IncreaseMultiTableScheduler extends MultiTableStoreScheduler{ public IncreaseMultiTableScheduler(int id, String name, DatabaseStore store){ super(id, name, store); } @Override protected Long acquireGatheredMaxVersion() { Long verValue = null; DatabaseStore databaseStore = (DatabaseStore) this.getStore(); DatabaseConnector connector = databaseStore.getCurrentStoreConnector(); if(connector == null){ throw new IllegalArgumentException("IncreaseMultiTableScheduler 未设置: DatabaseStore"); } String destTableName = ((DatabaseStoreStrategy)this.getStore().getStoreStrategy()).getTableName(null , this.getCurrentTableInfo().getTableName(), this.getCurrentTableInfo().getCustomerCode()); logger.debug("查找表数据最大版本号,存在可用连接器,查找表名:" + destTableName); if(this.isCheckDestTableExist()){ // 因为在切换新的表采集时,可能还没有创建该表,因此需要先判断是否存在 Address addr = new Address(); addr.setUrl(connector.getUrl()); addr.setPort(connector.getPort()); addr.setService(connector.getServiceName()); boolean existQueryTable = this.getStore().getMetaDataEngine().isExistTable(this.getStoreId(), addr, destTableName); if(!existQueryTable){ logger.debug("----------> 不存在表,不能查找最大版本号,默认version = 0,table = " + destTableName); return Long.valueOf(0); } } StringBuilder sql = new StringBuilder("select max("); sql.append(this.getCurrentTableInfo().getVersionName()); sql.append(") verValue from "); sql.append(destTableName); verValue = connector.queryForLong(sql.toString(), null); logger.debug("找到业务表版本号最大值,verValue = " + verValue); return verValue; } }