package com.walker.scheduler;
|
|
import com.walker.infrastructure.utils.StringUtils;
|
|
/**
|
* 抽象的核心调度器实现。</p>
|
* 核心调度器不能够被人为管理(启动、停止等)。<br>
|
* 它只能被超级管理员控制,通常启动后不会被终止(除非超管干预)。
|
* @author shikeying
|
* @date 2016年8月24日
|
*
|
*/
|
public abstract class AbstractKernelScheduler extends AbstractScheduler {
|
|
// 上次调用返回的结果,标记下以便在后面的调用处理中分析是否要切换输入参数
|
private Object previousInvokeResult = StringUtils.EMPTY_STRING;
|
|
// 计数调用次数,即:提供参数的次数,第一次为0
|
private int invokeCount = 0;
|
|
/**
|
* 延长的时钟间隔默认值,该值在没有数据可采集时让系统进入简单休眠模式。
|
* 默认20分钟:20 * 60 * 1000
|
*/
|
private static final long MORE_INTERVAL_TIME = 10 * 60 * 1000;
|
private long originalTimeInterval = 0;
|
// 时钟间隔时间是否被修改过
|
private boolean timeIntervalChanged = false;
|
|
// 休眠时间,该变量主要让用户修改,默认使用:MORE_INTERVAL_TIME
|
private long waitSleepTime = 0;
|
|
// 是否允许'未采集到数据'时线程进入睡眠一段时间?
|
private boolean allowIdleSleep = true;
|
|
public boolean isAllowIdleSleep() {
|
return allowIdleSleep;
|
}
|
|
public void setAllowIdleSleep(boolean allowIdleSleep) {
|
this.allowIdleSleep = allowIdleSleep;
|
}
|
|
/**
|
* 休眠时间,该变量主要让用户修改,如果为0,默认使用:MORE_INTERVAL_TIME
|
* @param waitSleepTime
|
*/
|
public void setWaitSleepTime(long waitSleepTime) {
|
this.waitSleepTime = waitSleepTime;
|
}
|
|
// public AbstractKernelScheduler(){
|
// }
|
|
public AbstractKernelScheduler(int id, String name
|
// , DatabaseStore store
|
) {
|
super(id, name);
|
}
|
|
@Override
|
protected Object runTask() throws Exception {
|
if(this.originalTimeInterval == 0){
|
// 初始化时先记录时钟间隔时间,后面还要修改
|
originalTimeInterval = this.getTimeInterval();
|
}
|
|
// 开启选项才能设置休眠功能
|
if(this.allowIdleSleep){
|
// 如果某个表上次没有采集到数据,本次将会切换表继续采集,按照顺序切换
|
if(this.getInvokeCount() > 0){
|
if(this.isTimeIntervalChanged()){
|
// 更新过时钟,并且上次采集有数据,让时钟复位
|
if(previousInvokeResult != null){
|
// 如果在时钟间隔被修改过的情况下,采集返回了数据,应当复位时钟按照正常时间调度采集。
|
this.doResetIntervalTime();
|
} else {
|
// doSwitchWaitingTable(true);
|
this.doChangeIntervalTime();
|
}
|
} else {
|
// 没有更新过时钟
|
if(previousInvokeResult == null){
|
logger.debug("后续任务调用中,上次调用没有返回数据,调度睡眠。currentGather = ");
|
// doSwitchWaitingTable(true);
|
this.doChangeIntervalTime();
|
}
|
}
|
}
|
}
|
|
Object[] inputParams = getRunParameters(previousInvokeResult);
|
// if(inputParams == null){
|
// throw new IllegalStateException("请实现方法:getRunParameters();");
|
// }
|
previousInvokeResult = doRunOnce(inputParams);
|
if(invokeCount >= Integer.MAX_VALUE){
|
invokeCount = 0;
|
}
|
invokeCount++;
|
|
// 如果是运行一次的调度器,执行完一次任务后,就直接结束掉
|
// 时克英 2019-01-02
|
if(this.getOption().getPeriodType() == Option.PeriodType.NONE){
|
this.stop();
|
return null;
|
}
|
return previousInvokeResult;
|
}
|
|
/**
|
* 修改时钟间隔时间,让它能更缓慢的工作(休息)
|
*/
|
protected void doChangeIntervalTime(){
|
if(waitSleepTime == 0){
|
this.setTimeInterval(MORE_INTERVAL_TIME);
|
} else {
|
this.setTimeInterval(waitSleepTime);
|
}
|
this.timeIntervalChanged = true;
|
}
|
|
/**
|
* 复位时钟间隔时间到默认值。
|
*/
|
protected void doResetIntervalTime(){
|
this.setTimeInterval(originalTimeInterval);
|
this.timeIntervalChanged = false;
|
logger.debug("...........采集调度时钟间隔被复位,reset interval time:" + originalTimeInterval);
|
}
|
|
@Override
|
public String getStoreId(){
|
throw new UnsupportedOperationException();
|
}
|
|
// @Override
|
// public DatabaseStore getStore() {
|
// throw new UnsupportedOperationException();
|
// }
|
|
@Override
|
public boolean isKernelScheduler(){
|
return true;
|
}
|
|
public boolean isTimeIntervalChanged() {
|
return timeIntervalChanged;
|
}
|
|
/**
|
* 返回运行任务的动态参数,每次调用任务时,参数可能都不一样。</p>
|
* 例如:一卡通数据采集中,每次都会依次选择一个学校的一个表来采集;<br>
|
* 此时还有配合参数maxFailedTimes
|
* @param previousInvokeResult 上次请求采集是否返回数据,如果该值为<code>null</code>,表示没有返回
|
* @return
|
*/
|
protected abstract Object[] getRunParameters(Object previousInvokeResult);
|
|
/**
|
* 执行一次具体任务动作,由子类实现
|
* @return
|
*/
|
protected abstract Object doRunOnce(Object[] inputParams) throws Exception;
|
|
/**
|
* 返回采集当前调用的次数
|
* @return
|
*/
|
public int getInvokeCount() {
|
return invokeCount;
|
}
|
}
|