package com.walker.scheduler; import com.walker.infrastructure.utils.StringUtils; /** * 抽象的核心调度器实现。

* 核心调度器不能够被人为管理(启动、停止等)。
* 它只能被超级管理员控制,通常启动后不会被终止(除非超管干预)。 * @author shikeying * @date 2016年8月24日 * */ public abstract class AbstractKernelScheduler extends AbstractScheduler { // 上次调用返回的结果,标记下以便在后面的调用处理中分析是否要切换输入参数 private Object previousInvokeResult = StringUtils.EMPTY_STRING; // 计数调用次数,即:提供参数的次数,第一次为0 private int invokeCount = 0; /** * 延长的时钟间隔默认值,该值在没有数据可采集时让系统进入简单休眠模式。 * 默认20分钟:20 * 60 * 1000 */ private static final long MORE_INTERVAL_TIME = 10 * 60 * 1000; private long originalTimeInterval = 0; // 时钟间隔时间是否被修改过 private boolean timeIntervalChanged = false; // 休眠时间,该变量主要让用户修改,默认使用:MORE_INTERVAL_TIME private long waitSleepTime = 0; // 是否允许'未采集到数据'时线程进入睡眠一段时间? private boolean allowIdleSleep = true; public boolean isAllowIdleSleep() { return allowIdleSleep; } public void setAllowIdleSleep(boolean allowIdleSleep) { this.allowIdleSleep = allowIdleSleep; } /** * 休眠时间,该变量主要让用户修改,如果为0,默认使用:MORE_INTERVAL_TIME * @param waitSleepTime */ public void setWaitSleepTime(long waitSleepTime) { this.waitSleepTime = waitSleepTime; } // public AbstractKernelScheduler(){ // } public AbstractKernelScheduler(int id, String name // , DatabaseStore store ) { super(id, name); } @Override protected Object runTask() throws Exception { if(this.originalTimeInterval == 0){ // 初始化时先记录时钟间隔时间,后面还要修改 originalTimeInterval = this.getTimeInterval(); } // 开启选项才能设置休眠功能 if(this.allowIdleSleep){ // 如果某个表上次没有采集到数据,本次将会切换表继续采集,按照顺序切换 if(this.getInvokeCount() > 0){ if(this.isTimeIntervalChanged()){ // 更新过时钟,并且上次采集有数据,让时钟复位 if(previousInvokeResult != null){ // 如果在时钟间隔被修改过的情况下,采集返回了数据,应当复位时钟按照正常时间调度采集。 this.doResetIntervalTime(); } else { // doSwitchWaitingTable(true); this.doChangeIntervalTime(); } } else { // 没有更新过时钟 if(previousInvokeResult == null){ logger.debug("后续任务调用中,上次调用没有返回数据,调度睡眠。currentGather = "); // doSwitchWaitingTable(true); this.doChangeIntervalTime(); } } } } Object[] inputParams = getRunParameters(previousInvokeResult); // if(inputParams == null){ // throw new IllegalStateException("请实现方法:getRunParameters();"); // } previousInvokeResult = doRunOnce(inputParams); if(invokeCount >= Integer.MAX_VALUE){ invokeCount = 0; } invokeCount++; // 如果是运行一次的调度器,执行完一次任务后,就直接结束掉 // 时克英 2019-01-02 if(this.getOption().getPeriodType() == Option.PeriodType.NONE){ this.stop(); return null; } return previousInvokeResult; } /** * 修改时钟间隔时间,让它能更缓慢的工作(休息) */ protected void doChangeIntervalTime(){ if(waitSleepTime == 0){ this.setTimeInterval(MORE_INTERVAL_TIME); } else { this.setTimeInterval(waitSleepTime); } this.timeIntervalChanged = true; } /** * 复位时钟间隔时间到默认值。 */ protected void doResetIntervalTime(){ this.setTimeInterval(originalTimeInterval); this.timeIntervalChanged = false; logger.debug("...........采集调度时钟间隔被复位,reset interval time:" + originalTimeInterval); } @Override public String getStoreId(){ throw new UnsupportedOperationException(); } // @Override // public DatabaseStore getStore() { // throw new UnsupportedOperationException(); // } @Override public boolean isKernelScheduler(){ return true; } public boolean isTimeIntervalChanged() { return timeIntervalChanged; } /** * 返回运行任务的动态参数,每次调用任务时,参数可能都不一样。

* 例如:一卡通数据采集中,每次都会依次选择一个学校的一个表来采集;
* 此时还有配合参数maxFailedTimes * @param previousInvokeResult 上次请求采集是否返回数据,如果该值为null,表示没有返回 * @return */ protected abstract Object[] getRunParameters(Object previousInvokeResult); /** * 执行一次具体任务动作,由子类实现 * @return */ protected abstract Object doRunOnce(Object[] inputParams) throws Exception; /** * 返回采集当前调用的次数 * @return */ public int getInvokeCount() { return invokeCount; } }