package com.iplatform.gather; import com.walker.scheduler.NotFoundGatherDataException; import com.walker.scheduler.Option; import com.walker.store.support.DatabaseStore; import java.util.HashMap; import java.util.List; import java.util.Map; /** * 同时采集并存储多个表,调度任务实现。 * @author 时克英 * @date 2022-09-10 */ public abstract class MultiTableStoreScheduler extends GatherScheduler{ // 当前正在采集的表对象 private WaitingTableInfo currentTableInfo; // 缓存等待采集的表信息 Map cachedWaitingTables = null; // 初始化加载的采集表数量 private int totalInitialize = 0; /** * 平衡参数,如果大于1会按照总数百分比加载, * 主要考虑后续使用多个调度器来实现并发采集。(暂未使用) */ private int balance = 0; // // 调用失败次数超过该值,会强制切换表进行采集 private static final int maxFailedTimesForSwitch = 4; // 调用次数超过该值,也会强制切换表进行采集 private static final int maxInvokeTimesForSwitch = 1024; // 上一次获得'那一天' private int previousDay = 0; public MultiTableStoreScheduler(int id, String name, DatabaseStore store){ super(id, name, store); // 禁止父类处理睡眠功能,由子类自定义控制 this.setAllowIdleSleep(false); store.setSupportUpdate(this.isSupportUpdateData()); } /** * 获取等待采集的表信息集合。 * @param supportUpdate * @return * @date 2022-09-12 */ protected abstract List acquireWaitingTableList(boolean supportUpdate); /** * 获取当前已经采集的表数据版本号最大值,在增量采集中会使用。

* 但在全量采集表则返回空。 * @return */ protected abstract Long acquireGatheredMaxVersion(); public WaitingTableInfo getCurrentTableInfo() { return currentTableInfo; } /** * 是否支持更新数据 * @return */ protected boolean isSupportUpdateData(){ return false; } @Override protected Object[] getRunParameters(Object previousInvokeResult) { /* * 找出学校信息来提供参数。 * 因为学校较多而且每个学校都可能有许多表,因此这里需要能支持几种提供策略。 * * 方案一、优先加载完成一个学校 * 方案二、各个学校平均加载, * */ // 从持久化数据中,检查等待采集的表集合信息 if(cachedWaitingTables == null || cachedWaitingTables.size() == 0){ this.doInitializeWaitingList(); } // 如果某个表上次没有采集到数据,本次将会切换表继续采集,按照顺序切换 if(this.getInvokeCount() > 0){ if(this.isTimeIntervalChanged()){ // 更新过时钟,并且上次采集有数据,让时钟复位 if(previousInvokeResult != null){ // 如果在时钟间隔被修改过的情况下,采集返回了数据,应当复位时钟按照正常时间调度采集。 this.doResetIntervalTime(); } else { doSwitchWaitingTable(true); } } else { // 没有更新过时钟 if(previousInvokeResult == null){ logger.debug("后续任务调用中,上次调用没有返回数据,切换采集表。currentTableInfo = " + this.currentTableInfo); doSwitchWaitingTable(true); } } } if(this.getCurrentFailedTime() >= maxFailedTimesForSwitch){ logger.debug("采集请求失败超过'" + maxFailedTimesForSwitch + "'次,强制切换采集表"); doSwitchWaitingTable(true); // 这句很重要,因为这是有状态;不这样做会导致每次执行都切换采集表 this.setCurrentFailedTime(0); // 失败次数回0 } // 调用总次数超过最大值,也会导致切换表采集 if(this.getInvokeCount() > 0 && this.getInvokeCount() % maxInvokeTimesForSwitch == 0){ logger.debug("调用次数超过最大值,自动切换表采集。currentTableInfo = " + this.currentTableInfo); // doSwitchWaitingTable(false); doSwitchWaitingTable(true); } // 查找数据表中的最大版本号,如果是全量则返回:null Long maxVersion = this.acquireGatheredMaxVersion(); return new Object[]{this.currentTableInfo.getTableName() , this.currentTableInfo.getSqlContent() , this.currentTableInfo, new Object[]{maxVersion}}; } @Override protected void onBeforeSchedule(Option.TimeObject to){ // 为了测试方便,开发时使用小时作为重建条件;正式可能会改成按天计算 rebuildWaitingTableByHour(to); } private void rebuildWaitingTableByHour(Option.TimeObject to){ this.rebuildWaitingTableByTime(1, to); } private void rebuildWaitingTableByDay(Option.TimeObject to){ this.rebuildWaitingTableByTime(2, to); } /** * 每次调度执行先检查是否新的一天,如果是就需要评估是否需要重新加载采集表信息 * 因为后续采集表数量可能会达到上万,因此需要采用必要的缓存来分析采集日志以确定策略。 * @param type 时间类型:1_小时, 2_天 * @param to 传入的时间对象TimeObject */ private void rebuildWaitingTableByTime(int type, Option.TimeObject to){ if(previousDay == 0){ if(type == 1){ previousDay = to.getHour(); } else if(type == 2){ previousDay = to.getDay(); } else if(type == 3){ // previousDay = to.getMonth(); throw new UnsupportedOperationException(); } else { throw new UnsupportedOperationException(); } } boolean _changed = false; if(type == 1){ _changed = to.getHour() != previousDay; } else if(type == 2){ _changed = to.getDay() != previousDay; } else { throw new UnsupportedOperationException(); } if(_changed){ logger.info("时间改变,开始重建采集表信息。time:" + to + ", 重新初始化采集列表"); // 每天强制重新初始化采集表集合 this.doInitializeWaitingList(); // 同时复位调度时钟的间隔时间 this.doResetIntervalTime(); if(type == 1){ previousDay = to.getHour(); } else if(type == 2){ previousDay = to.getDay(); } else { throw new UnsupportedOperationException(); } } } //~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ //~ 多表切换相关 //~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ /** * 切换采集表,采集过程中会根据各种情况切换表,避免一个采集过多而其他表无法采集。 * @param removeCurrentTable 是否把当前采集表从采集队列移除? */ private void doSwitchWaitingTable(boolean removeCurrentTable){ if(this.cachedWaitingTables.size() == 1){ logger.debug("只有一个表,无法切换采集。"); return; } int index = this.currentTableInfo.getIndex(); if(removeCurrentTable){ cachedWaitingTables.remove(index); } if(cachedWaitingTables.isEmpty()){ logger.info("===========> 采集表已经没有数据,可能是没有数据可采集,增加休息时间。(之后重新初始化加载)"); this.doChangeIntervalTime(); this.doInitializeWaitingList(); return; } if(index >= totalInitialize-1){ index = 0; } this.searchNextTable(index); } private void searchNextTable(int index){ WaitingTableInfo wti = cachedWaitingTables.get(index); if(wti == null){ index++; searchNextTable(index); } else { this.currentTableInfo = wti; logger.debug("================> 找到了下一个采集表信息:" + wti); } } /** * 初始化加载等待采集的表集合信息 // * @param balance 平衡参数,如果大于1会按照总数百分比加载,主要考虑后续使用多个调度器来实现并发采集。 */ private void doInitializeWaitingList(){ // List loaded = gatherTask.aquireWaitingTableList(this.isSupportUpdateData()); List loaded = this.acquireWaitingTableList(this.isSupportUpdateData()); logger.debug("+++++++++++++++++++ 本次获得的采集表都包含:" + loaded); if(loaded != null && loaded.size() > 0){ totalInitialize = loaded.size(); cachedWaitingTables = new HashMap(totalInitialize); int i = 0; for(WaitingTableInfo wti : loaded){ wti.setIndex(i); cachedWaitingTables.put(i, wti); i++; } } else { // logger.warn("没有发现任何采集表定义信息,该任务启动毫无意义! scheduler = " + this.getId()); this.terminateGatherScheduler(this.getId()); throw new NotFoundGatherDataException("没有发现任何采集表定义信息,该任务启动毫无意义! scheduler = " + this.getId()); } this.currentTableInfo = cachedWaitingTables.get(0); if(this.cachedWaitingTables.size() == 1){ logger.warn("采集表数量只有一个,请使用: SingleTableStoreScheduler 调度器实现!"); } } // private DatabaseGatherTask task = null; /*private void terminateGatherScheduler(){ // 根据调度器ID获得采集ID SchedulerUtils.terminateGatherScheduler(this.getId()); logger.info("调度器 '" + this.getId() + "'(" + this.getName() + "被强制终止),因为未找到定义的学校表信息"); }*/ }