package com.iplatform.gather;
|
|
import com.walker.scheduler.NotFoundGatherDataException;
|
import com.walker.scheduler.Option;
|
import com.walker.store.support.DatabaseStore;
|
|
import java.util.HashMap;
|
import java.util.List;
|
import java.util.Map;
|
|
/**
|
* 同时采集并存储多个表,调度任务实现。
|
* @author 时克英
|
* @date 2022-09-10
|
*/
|
public abstract class MultiTableStoreScheduler extends GatherScheduler{
|
|
// 当前正在采集的表对象
|
private WaitingTableInfo currentTableInfo;
|
|
// 缓存等待采集的表信息
|
Map<Integer, WaitingTableInfo> cachedWaitingTables = null;
|
|
// 初始化加载的采集表数量
|
private int totalInitialize = 0;
|
|
/**
|
* 平衡参数,如果大于1会按照总数百分比加载,
|
* 主要考虑后续使用多个调度器来实现并发采集。(暂未使用)
|
*/
|
private int balance = 0;
|
|
//
|
// 调用失败次数超过该值,会强制切换表进行采集
|
private static final int maxFailedTimesForSwitch = 4;
|
// 调用次数超过该值,也会强制切换表进行采集
|
private static final int maxInvokeTimesForSwitch = 1024;
|
|
// 上一次获得'那一天'
|
private int previousDay = 0;
|
|
public MultiTableStoreScheduler(int id, String name, DatabaseStore store){
|
super(id, name, store);
|
// 禁止父类处理睡眠功能,由子类自定义控制
|
this.setAllowIdleSleep(false);
|
store.setSupportUpdate(this.isSupportUpdateData());
|
}
|
|
/**
|
* 获取等待采集的表信息集合。
|
* @param supportUpdate
|
* @return
|
* @date 2022-09-12
|
*/
|
protected abstract List<WaitingTableInfo> acquireWaitingTableList(boolean supportUpdate);
|
|
/**
|
* 获取当前已经采集的表数据版本号最大值,在增量采集中会使用。<p></p>
|
* 但在全量采集表则返回空。
|
* @return
|
*/
|
protected abstract Long acquireGatheredMaxVersion();
|
|
public WaitingTableInfo getCurrentTableInfo() {
|
return currentTableInfo;
|
}
|
|
/**
|
* 是否支持更新数据
|
* @return
|
*/
|
protected boolean isSupportUpdateData(){
|
return false;
|
}
|
|
@Override
|
protected Object[] getRunParameters(Object previousInvokeResult) {
|
/*
|
* 找出学校信息来提供参数。
|
* 因为学校较多而且每个学校都可能有许多表,因此这里需要能支持几种提供策略。
|
*
|
* 方案一、优先加载完成一个学校
|
* 方案二、各个学校平均加载,
|
* */
|
|
// 从持久化数据中,检查等待采集的表集合信息
|
if(cachedWaitingTables == null || cachedWaitingTables.size() == 0){
|
this.doInitializeWaitingList();
|
}
|
// 如果某个表上次没有采集到数据,本次将会切换表继续采集,按照顺序切换
|
if(this.getInvokeCount() > 0){
|
if(this.isTimeIntervalChanged()){
|
// 更新过时钟,并且上次采集有数据,让时钟复位
|
if(previousInvokeResult != null){
|
// 如果在时钟间隔被修改过的情况下,采集返回了数据,应当复位时钟按照正常时间调度采集。
|
this.doResetIntervalTime();
|
} else {
|
doSwitchWaitingTable(true);
|
}
|
} else {
|
// 没有更新过时钟
|
if(previousInvokeResult == null){
|
logger.debug("后续任务调用中,上次调用没有返回数据,切换采集表。currentTableInfo = " + this.currentTableInfo);
|
doSwitchWaitingTable(true);
|
}
|
}
|
}
|
|
if(this.getCurrentFailedTime() >= maxFailedTimesForSwitch){
|
logger.debug("采集请求失败超过'" + maxFailedTimesForSwitch + "'次,强制切换采集表");
|
doSwitchWaitingTable(true);
|
// 这句很重要,因为这是有状态;不这样做会导致每次执行都切换采集表
|
this.setCurrentFailedTime(0); // 失败次数回0
|
}
|
// 调用总次数超过最大值,也会导致切换表采集
|
if(this.getInvokeCount() > 0 && this.getInvokeCount() % maxInvokeTimesForSwitch == 0){
|
logger.debug("调用次数超过最大值,自动切换表采集。currentTableInfo = " + this.currentTableInfo);
|
// doSwitchWaitingTable(false);
|
doSwitchWaitingTable(true);
|
}
|
|
// 查找数据表中的最大版本号,如果是全量则返回:null
|
Long maxVersion = this.acquireGatheredMaxVersion();
|
|
return new Object[]{this.currentTableInfo.getTableName()
|
, this.currentTableInfo.getSqlContent()
|
, this.currentTableInfo, new Object[]{maxVersion}};
|
}
|
|
@Override
|
protected void onBeforeSchedule(Option.TimeObject to){
|
// 为了测试方便,开发时使用小时作为重建条件;正式可能会改成按天计算
|
rebuildWaitingTableByHour(to);
|
}
|
|
private void rebuildWaitingTableByHour(Option.TimeObject to){
|
this.rebuildWaitingTableByTime(1, to);
|
}
|
|
private void rebuildWaitingTableByDay(Option.TimeObject to){
|
this.rebuildWaitingTableByTime(2, to);
|
}
|
|
/**
|
* 每次调度执行先检查是否新的一天,如果是就需要评估是否需要重新加载采集表信息
|
* 因为后续采集表数量可能会达到上万,因此需要采用必要的缓存来分析采集日志以确定策略。
|
* @param type 时间类型:1_小时, 2_天
|
* @param to 传入的时间对象<code>TimeObject</code>
|
*/
|
private void rebuildWaitingTableByTime(int type, Option.TimeObject to){
|
if(previousDay == 0){
|
if(type == 1){
|
previousDay = to.getHour();
|
} else if(type == 2){
|
previousDay = to.getDay();
|
} else if(type == 3){
|
// previousDay = to.getMonth();
|
throw new UnsupportedOperationException();
|
} else {
|
throw new UnsupportedOperationException();
|
}
|
}
|
boolean _changed = false;
|
if(type == 1){
|
_changed = to.getHour() != previousDay;
|
} else if(type == 2){
|
_changed = to.getDay() != previousDay;
|
} else {
|
throw new UnsupportedOperationException();
|
}
|
|
if(_changed){
|
logger.info("时间改变,开始重建采集表信息。time:" + to + ", 重新初始化采集列表");
|
// 每天强制重新初始化采集表集合
|
this.doInitializeWaitingList();
|
// 同时复位调度时钟的间隔时间
|
this.doResetIntervalTime();
|
if(type == 1){
|
previousDay = to.getHour();
|
} else if(type == 2){
|
previousDay = to.getDay();
|
} else {
|
throw new UnsupportedOperationException();
|
}
|
}
|
}
|
|
//~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
//~ 多表切换相关
|
//~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
/**
|
* 切换采集表,采集过程中会根据各种情况切换表,避免一个采集过多而其他表无法采集。
|
* @param removeCurrentTable 是否把当前采集表从采集队列移除?
|
*/
|
private void doSwitchWaitingTable(boolean removeCurrentTable){
|
if(this.cachedWaitingTables.size() == 1){
|
logger.debug("只有一个表,无法切换采集。");
|
return;
|
}
|
int index = this.currentTableInfo.getIndex();
|
if(removeCurrentTable){
|
cachedWaitingTables.remove(index);
|
}
|
|
if(cachedWaitingTables.isEmpty()){
|
logger.info("===========> 采集表已经没有数据,可能是没有数据可采集,增加休息时间。(之后重新初始化加载)");
|
this.doChangeIntervalTime();
|
this.doInitializeWaitingList();
|
return;
|
}
|
if(index >= totalInitialize-1){
|
index = 0;
|
}
|
this.searchNextTable(index);
|
}
|
|
private void searchNextTable(int index){
|
WaitingTableInfo wti = cachedWaitingTables.get(index);
|
if(wti == null){
|
index++;
|
searchNextTable(index);
|
} else {
|
this.currentTableInfo = wti;
|
logger.debug("================> 找到了下一个采集表信息:" + wti);
|
}
|
}
|
|
/**
|
* 初始化加载等待采集的表集合信息
|
// * @param balance 平衡参数,如果大于1会按照总数百分比加载,主要考虑后续使用多个调度器来实现并发采集。
|
*/
|
private void doInitializeWaitingList(){
|
// List<WaitingTableInfo> loaded = gatherTask.aquireWaitingTableList(this.isSupportUpdateData());
|
List<WaitingTableInfo> loaded = this.acquireWaitingTableList(this.isSupportUpdateData());
|
logger.debug("+++++++++++++++++++ 本次获得的采集表都包含:" + loaded);
|
|
if(loaded != null && loaded.size() > 0){
|
totalInitialize = loaded.size();
|
cachedWaitingTables = new HashMap<Integer, WaitingTableInfo>(totalInitialize);
|
int i = 0;
|
for(WaitingTableInfo wti : loaded){
|
wti.setIndex(i);
|
cachedWaitingTables.put(i, wti);
|
i++;
|
}
|
} else {
|
// logger.warn("没有发现任何采集表定义信息,该任务启动毫无意义! scheduler = " + this.getId());
|
this.terminateGatherScheduler(this.getId());
|
throw new NotFoundGatherDataException("没有发现任何采集表定义信息,该任务启动毫无意义! scheduler = " + this.getId());
|
}
|
this.currentTableInfo = cachedWaitingTables.get(0);
|
if(this.cachedWaitingTables.size() == 1){
|
logger.warn("采集表数量只有一个,请使用: SingleTableStoreScheduler 调度器实现!");
|
}
|
}
|
|
// private DatabaseGatherTask task = null;
|
/*private void terminateGatherScheduler(){
|
// 根据调度器ID获得采集ID
|
SchedulerUtils.terminateGatherScheduler(this.getId());
|
logger.info("调度器 '" + this.getId() + "'(" + this.getName() + "被强制终止),因为未找到定义的学校表信息");
|
}*/
|
}
|