package com.walker.scheduler;
|
|
import com.walker.infrastructure.arguments.ArgumentsManager;
|
import org.slf4j.Logger;
|
import org.slf4j.LoggerFactory;
|
|
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.LinkedBlockingQueue;
|
import java.util.concurrent.ThreadFactory;
|
import java.util.concurrent.ThreadPoolExecutor;
|
import java.util.concurrent.TimeUnit;
|
//import com.walkersoft.datagather.store.DatabaseStore;
|
|
public abstract class AbstractScheduler implements Scheduler {
|
|
protected transient final Logger logger = LoggerFactory.getLogger(getClass());
|
|
private int id;
|
private String name;
|
public void setId(int id) {
|
this.id = id;
|
}
|
|
public void setName(String name) {
|
this.name = name;
|
}
|
|
private Option option = null;
|
|
// 内部时钟频率,默认:10秒
|
private long timeInterval = 5000 * 1;
|
|
protected long startTime = 0;
|
private long restartTime = 0;
|
|
// 记录线程引用,这样可以调用让线程中断睡眠
|
// 2016-11-28 时克英
|
private Thread kernelThread = null;
|
|
public int nThreads = 1;
|
public int nThreadQueue = 16;
|
|
private ExecutorService executorService = null;
|
private InternalTimerRunner timerRunner = new InternalTimerRunner();
|
|
protected boolean started = false; // 是否已经启动运行
|
|
// 如果任务执行没有返回值(调用请求没有得到数据),是否让任务终止
|
private boolean taskTerminateCondition = false;
|
|
// 任务调用失败最大值,默认:0表示不限制最大值
|
private int maxFailedTimes = 0;
|
private int currentFailedTime = 0;
|
|
protected ScheduleEngine scheduleEngine = null;
|
|
public ScheduleEngine getScheduleEngine() {
|
return scheduleEngine;
|
}
|
|
// DatabaseStore store = null;
|
|
// 是否核心调度器
|
private boolean kernelScheduler = false;
|
|
public AbstractScheduler(){}
|
|
public AbstractScheduler(int id, String name
|
// , DatabaseStore store
|
){
|
this.id = id;
|
this.name = name;
|
// this.store = store;
|
if(this.executorService == null){
|
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(nThreads, nThreads,
|
0L, TimeUnit.MILLISECONDS,
|
new LinkedBlockingQueue<Runnable>(nThreadQueue),
|
new ThreadFactory() {
|
@Override
|
public Thread newThread(Runnable r) {
|
Thread t = new Thread(r);
|
t.setDaemon(true);
|
return t;
|
}
|
});
|
this.executorService = threadPoolExecutor;
|
logger.info("初始化了线程池对象'executorService', nThreads=" + nThreads + ", nThreadQueue=" + nThreadQueue);
|
}
|
}
|
|
@Override
|
public String getStoreId(){
|
// if(store != null){
|
// return store.getId();
|
// }
|
// return null;
|
throw new UnsupportedOperationException("该属性废弃");
|
}
|
|
/**
|
* 返回采集请求数据发生错误的次数,无论是http或者数据库请求失败都会计数。<br>
|
* 当该值超过最大整数时,自动清零,并重新计数。
|
* @return
|
*/
|
public int getCurrentFailedTime() {
|
return currentFailedTime;
|
}
|
|
// public DatabaseStore getStore() {
|
// return store;
|
// }
|
|
@Override
|
public int getId() {
|
return this.id;
|
}
|
|
@Override
|
public String getName() {
|
return this.name;
|
}
|
|
@Override
|
public Option getOption(){
|
return this.option;
|
}
|
|
public long getTimeInterval() {
|
return timeInterval;
|
}
|
|
@Override
|
public long getStartTime() {
|
return startTime;
|
}
|
|
public long getRestartTime() {
|
return restartTime;
|
}
|
|
@Override
|
public boolean isStarted() {
|
return started;
|
}
|
|
@Override
|
public boolean isPause(){
|
if(timerRunner != null){
|
return timerRunner.isPause();
|
}
|
return false;
|
}
|
|
@Override
|
public boolean isTaskTerminateCondition() {
|
return taskTerminateCondition;
|
}
|
|
@Override
|
public boolean isKernelScheduler(){
|
return kernelScheduler;
|
}
|
|
/**
|
* 中断执行线程,通常是在它间隔睡眠时,执行该方法唤醒。
|
*/
|
public void interruptKernelThread(){
|
logger.debug("线程被唤醒一次:" + this.getName());
|
if(this.kernelThread != null){
|
kernelThread.interrupt();
|
}
|
}
|
|
public boolean getStart(){
|
return started;
|
}
|
|
public boolean getPause(){
|
return isPause();
|
}
|
|
@Override
|
public void start() {
|
if(!started){
|
this.checkData();
|
executorService.execute(timerRunner);
|
logger.info("调度器'" + name + "'启动......");
|
started = true;
|
if(scheduleEngine != null){
|
scheduleEngine.setStatusStarted(id);
|
}
|
} else {
|
throw new IllegalStateException("调度器已启动,调用状态错误。id = " + id);
|
}
|
startTime = System.currentTimeMillis();
|
}
|
|
@Override
|
public void restart(){
|
if(started && !timerRunner.isPause()){
|
logger.warn("调度器正在执行,不能重复执行启动操作。id = " + id);
|
throw new IllegalStateException("调度器正在执行,不能重复执行启动操作。id = " + id);
|
}
|
logger.info("调度器'" + name + "'暂停后,被重新运行......");
|
timerRunner.setPause(false);
|
|
this.checkData();
|
executorService.execute(timerRunner);
|
|
if(scheduleEngine != null){
|
scheduleEngine.setStatusRestarted(id);
|
}
|
restartTime = System.currentTimeMillis();
|
}
|
|
private void checkData(){
|
if(this.option == null){
|
throw new IllegalStateException("调度器'" + this.name + "'无法启动: option未设置");
|
}
|
}
|
|
@Override
|
public void stop() {
|
timerRunner.setStop();
|
executorService.shutdown();
|
started = false;
|
if(scheduleEngine != null){
|
scheduleEngine.setStatusStoped(id);
|
}
|
logger.info("调度器'" + name + "'被终止运行......");
|
}
|
|
@Override
|
public void pause() {
|
timerRunner.setPause(true);
|
if(scheduleEngine != null){
|
scheduleEngine.setStatusPaused(id);
|
}
|
logger.info("调度器'" + name + "'暂停......");
|
}
|
|
protected void setCurrentFailedTime(int currentFailedTime) {
|
this.currentFailedTime = currentFailedTime;
|
}
|
|
public void setTimeInterval(long timeInterval) {
|
this.timeInterval = timeInterval;
|
}
|
|
@Override
|
public void setTaskTerminateCondition(boolean boo) {
|
this.taskTerminateCondition = boo;
|
}
|
|
/**
|
* 设置调度任务失败最大次数,超过该次数,自动终止任务。
|
* @param maxFailedTimes
|
*/
|
@Override
|
public void setMaxFailedTimes(int maxFailedTimes) {
|
this.maxFailedTimes = maxFailedTimes;
|
}
|
|
@Override
|
public void setOption(Option option) {
|
if(option == null){
|
throw new IllegalArgumentException("创建调度器失败:设置的option并不存在");
|
}
|
this.option = option;
|
}
|
|
@Override
|
public void setScheduleEngine(ScheduleEngine scheduleEngine) {
|
this.scheduleEngine = scheduleEngine;
|
}
|
|
/**
|
* 运行任务,由子类执行。因为输入任务参数只有子类才知道。
|
* @return
|
* @throws Exception
|
*/
|
protected abstract Object runTask() throws Exception;
|
|
private class InternalTimerRunner implements Runnable{
|
|
private boolean pause = false;
|
private boolean stop = false;
|
|
public void setStop() {
|
this.stop = true;
|
}
|
|
public void setPause(boolean pause) {
|
// if(pause){
|
// // 暂停
|
// try {
|
// this.pause = true;
|
// Thread.currentThread().wait();
|
// } catch (InterruptedException e) {
|
// e.printStackTrace();
|
// this.pause = false;
|
// }
|
// } else {
|
// // 恢复
|
// Thread.currentThread().notify();
|
// this.pause = false;
|
// }
|
this.pause = pause;
|
}
|
|
public boolean isPause() {
|
return pause;
|
}
|
|
@Override
|
public void run() {
|
while(!stop){
|
if(maxFailedTimes >= 1 && currentFailedTime >= maxFailedTimes){
|
logger.info("调度被迫终止'" + name + "': 因为调用失败超过最大值(" + maxFailedTimes + ")");
|
// stop();
|
// this.stop = true;
|
break;
|
}
|
|
// 采用新策略,暂停后,直接停止调用它的线程,恢复后重新执行线程任务
|
// 2015-1-5 by shikeying
|
if(this.pause){
|
break;
|
}
|
|
try{
|
Option.TimeObject to = option.isAvailableTime(System.currentTimeMillis());
|
if(to.isAvailable()){
|
// if(option.isAvailable(System.currentTimeMillis())){
|
logger.debug("======== 满足时间要求,执行任务调用一次");
|
onBeforeSchedule(to);
|
Object result = runTask();
|
|
//~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
// 时克英添加,2019-01-02,如果是周期性任务,每次执行完都必须切换到下个时间
|
if(option.isCycleTask()){
|
option.scheduleToNext(to);
|
}
|
//~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
|
if(result == null && taskTerminateCondition){
|
logger.info("调度被迫终止'" + name + "': 因为调用未返回任何数据");
|
// stop();
|
break;
|
}
|
}
|
} catch(Exception ex){
|
if(ex instanceof NotFoundGatherDataException){
|
logger.debug("数据源没有可采集数据,不属于业务错误。");
|
} else {
|
if(currentFailedTime >= Integer.MAX_VALUE){
|
currentFailedTime = 0;
|
}
|
currentFailedTime ++;
|
logger.error("任务调用失败一次,scheduler = " + id, ex);
|
}
|
} finally {
|
try {
|
if(timeInterval > 0){
|
// logger.debug("*************** sleep: " + timeInterval);
|
TimeUnit.MILLISECONDS.sleep(timeInterval);
|
}
|
} catch (InterruptedException e) {}
|
}
|
}
|
if(!pause && !stop){
|
// 只有没有暂停的情况下而且没有手动停止,才算调度执行完成
|
if(scheduleEngine != null){
|
scheduleEngine.setStatusDone(id);
|
}
|
started = false;
|
}
|
}
|
}
|
|
/**
|
* 每次执行调度之前,调用该方法。
|
* @param to
|
*/
|
protected void onBeforeSchedule(Option.TimeObject to){
|
|
}
|
|
private ArgumentsManager argumentManager;
|
|
@Override
|
public ArgumentsManager getArgumentManager() {
|
return argumentManager;
|
}
|
|
public void setArgumentManager(ArgumentsManager argumentManager){
|
this.argumentManager = argumentManager;
|
}
|
}
|