package com.walker.scheduler;

import com.walker.infrastructure.arguments.ArgumentsManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

//import com.walkersoft.datagather.store.DatabaseStore;

/**
 * Base class for schedulers that repeatedly invoke {@link #runTask()} from a
 * worker thread owned by a private thread pool.
 *
 * <p>The worker loop ({@code InternalTimerRunner}) asks the configured
 * {@link Option} whether the current time is eligible, runs the task if so,
 * and then sleeps for {@link #getTimeInterval()} milliseconds. Pausing ends the
 * worker loop; {@link #restart()} submits the loop to the pool again.</p>
 *
 * <p>Control flags shared between the calling thread and the worker thread are
 * {@code volatile} so that {@link #pause()} / {@link #stop()} become visible to
 * the loop. Compound operations ({@code start()}, {@code restart()}) are not
 * otherwise synchronized.</p>
 */
public abstract class AbstractScheduler implements Scheduler {

    protected transient final Logger logger = LoggerFactory.getLogger(getClass());

    private int id;
    private String name;

    private Option option = null;

    // Internal tick interval in milliseconds; default: 5 seconds.
    private long timeInterval = 5000 * 1;

    protected long startTime = 0;
    private long restartTime = 0;

    // Reference to the thread currently executing the worker loop, kept so that
    // its interval sleep can be interrupted (added 2016-11-28, shikeying).
    // Written by the worker thread, read by callers of interruptKernelThread().
    private volatile Thread kernelThread = null;

    /** Core and maximum pool size used when the thread pool is created. */
    public int nThreads = 1;

    /** Capacity of the pool's work queue used when the thread pool is created. */
    public int nThreadQueue = 16;

    private ExecutorService executorService = null;

    private final InternalTimerRunner timerRunner = new InternalTimerRunner();

    // Whether the scheduler has been started. Written by both the caller thread
    // and the worker thread, hence volatile.
    protected volatile boolean started = false;

    // When true, a task invocation that returns null terminates the scheduler.
    private boolean taskTerminateCondition = false;

    // Maximum number of failed invocations; 0 (the default) means unlimited.
    private int maxFailedTimes = 0;

    // Failure counter, incremented by the worker thread and read by other threads.
    private volatile int currentFailedTime = 0;

    protected ScheduleEngine scheduleEngine = null;

    // Whether this is a kernel (core) scheduler.
    private boolean kernelScheduler = false;

    private ArgumentsManager argumentManager;

    /**
     * Creates a scheduler without id and name. The thread pool is created
     * lazily on the first {@link #start()} / {@link #restart()}, using the
     * values of {@link #nThreads} and {@link #nThreadQueue} at that moment.
     */
    public AbstractScheduler() {
    }

    /**
     * Creates a scheduler and eagerly initializes its thread pool.
     *
     * @param id   scheduler identifier
     * @param name scheduler display name
     */
    public AbstractScheduler(int id, String name) {
        this.id = id;
        this.name = name;
        this.executorService = createExecutor();
    }

    /**
     * Builds the bounded, daemon-threaded pool that runs the worker loop.
     */
    private ExecutorService createExecutor() {
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
                nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>(nThreadQueue),
                new ThreadFactory() {
                    @Override
                    public Thread newThread(Runnable r) {
                        Thread t = new Thread(r);
                        t.setDaemon(true);
                        return t;
                    }
                });
        logger.info("初始化了线程池对象'executorService', nThreads=" + nThreads + ", nThreadQueue=" + nThreadQueue);
        return threadPoolExecutor;
    }

    /**
     * Returns the thread pool, creating it first if the instance was built with
     * the no-arg constructor (which would otherwise leave it {@code null}).
     */
    private ExecutorService ensureExecutor() {
        if (this.executorService == null) {
            this.executorService = createExecutor();
        }
        return this.executorService;
    }

    public void setId(int id) {
        this.id = id;
    }

    public void setName(String name) {
        this.name = name;
    }

    public ScheduleEngine getScheduleEngine() {
        return scheduleEngine;
    }

    /**
     * Deprecated property; the backing store was removed from this class.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public String getStoreId() {
        throw new UnsupportedOperationException("该属性废弃");
    }

    /**
     * Returns how many task invocations have failed so far. The counter is
     * reset to zero once it reaches {@link Integer#MAX_VALUE} and then starts
     * counting again.
     *
     * @return the current failure count
     */
    public int getCurrentFailedTime() {
        return currentFailedTime;
    }

    @Override
    public int getId() {
        return this.id;
    }

    @Override
    public String getName() {
        return this.name;
    }

    @Override
    public Option getOption() {
        return this.option;
    }

    public long getTimeInterval() {
        return timeInterval;
    }

    @Override
    public long getStartTime() {
        return startTime;
    }

    public long getRestartTime() {
        return restartTime;
    }

    @Override
    public boolean isStarted() {
        return started;
    }

    @Override
    public boolean isPause() {
        return timerRunner.isPause();
    }

    @Override
    public boolean isTaskTerminateCondition() {
        return taskTerminateCondition;
    }

    @Override
    public boolean isKernelScheduler() {
        return kernelScheduler;
    }

    /**
     * Interrupts the worker thread; typically used to wake it up while it is
     * sleeping between two ticks. Does nothing if no worker loop is running.
     */
    public void interruptKernelThread() {
        logger.debug("线程被唤醒一次:" + this.getName());
        // Copy to a local: the worker clears the field when its loop ends.
        Thread worker = this.kernelThread;
        if (worker != null) {
            worker.interrupt();
        }
    }

    public boolean getStart() {
        return started;
    }

    public boolean getPause() {
        return isPause();
    }

    /**
     * Starts the scheduler by submitting the worker loop to the thread pool.
     *
     * @throws IllegalStateException if already started, or if no option is set
     */
    @Override
    public void start() {
        if (started) {
            throw new IllegalStateException("调度器已启动,调用状态错误。id = " + id);
        }
        this.checkData();
        ensureExecutor().execute(timerRunner);
        logger.info("调度器'" + name + "'启动......");
        started = true;
        if (scheduleEngine != null) {
            scheduleEngine.setStatusStarted(id);
        }
        startTime = System.currentTimeMillis();
    }

    /**
     * Resumes a paused scheduler by clearing the pause flag and submitting the
     * worker loop to the thread pool again.
     *
     * @throws IllegalStateException if the scheduler is started and not paused,
     *                               or if no option is set
     */
    @Override
    public void restart() {
        if (started && !timerRunner.isPause()) {
            logger.warn("调度器正在执行,不能重复执行启动操作。id = " + id);
            throw new IllegalStateException("调度器正在执行,不能重复执行启动操作。id = " + id);
        }
        logger.info("调度器'" + name + "'暂停后,被重新运行......");
        timerRunner.setPause(false);
        this.checkData();
        ensureExecutor().execute(timerRunner);
        if (scheduleEngine != null) {
            scheduleEngine.setStatusRestarted(id);
        }
        restartTime = System.currentTimeMillis();
    }

    /** Verifies that the scheduler has everything it needs to run. */
    private void checkData() {
        if (this.option == null) {
            throw new IllegalStateException("调度器'" + this.name + "'无法启动: option未设置");
        }
    }

    /**
     * Stops the scheduler: flags the worker loop to end and shuts the thread
     * pool down. Because the pool is shut down, it will reject any later
     * {@link #start()} / {@link #restart()} submission.
     */
    @Override
    public void stop() {
        timerRunner.setStop();
        // The pool may not exist yet if the no-arg constructor was used and the
        // scheduler was never started.
        if (executorService != null) {
            executorService.shutdown();
        }
        started = false;
        if (scheduleEngine != null) {
            scheduleEngine.setStatusStoped(id);
        }
        logger.info("调度器'" + name + "'被终止运行......");
    }

    /**
     * Pauses the scheduler. The worker loop ends the next time it checks the
     * pause flag; use {@link #restart()} to resume.
     */
    @Override
    public void pause() {
        timerRunner.setPause(true);
        if (scheduleEngine != null) {
            scheduleEngine.setStatusPaused(id);
        }
        logger.info("调度器'" + name + "'暂停......");
    }

    protected void setCurrentFailedTime(int currentFailedTime) {
        this.currentFailedTime = currentFailedTime;
    }

    public void setTimeInterval(long timeInterval) {
        this.timeInterval = timeInterval;
    }

    @Override
    public void setTaskTerminateCondition(boolean boo) {
        this.taskTerminateCondition = boo;
    }

    /**
     * Sets the maximum number of failed task invocations; once the failure
     * count reaches it, the worker loop terminates on its own.
     *
     * @param maxFailedTimes the limit; values below 1 disable the check
     */
    @Override
    public void setMaxFailedTimes(int maxFailedTimes) {
        this.maxFailedTimes = maxFailedTimes;
    }

    /**
     * @param option the schedule option; must not be {@code null}
     * @throws IllegalArgumentException if {@code option} is {@code null}
     */
    @Override
    public void setOption(Option option) {
        if (option == null) {
            throw new IllegalArgumentException("创建调度器失败:设置的option并不存在");
        }
        this.option = option;
    }

    @Override
    public void setScheduleEngine(ScheduleEngine scheduleEngine) {
        this.scheduleEngine = scheduleEngine;
    }

    /**
     * Runs the task once. Implemented by subclasses, because only they know
     * the task's input parameters.
     *
     * @return the task result; {@code null} terminates the scheduler when
     *         {@link #isTaskTerminateCondition()} is {@code true}
     * @throws Exception if the invocation fails; counted as a failure unless it
     *                   is a {@code NotFoundGatherDataException}
     */
    protected abstract Object runTask() throws Exception;

    /**
     * Called before every scheduled task invocation. The default does nothing.
     *
     * @param to the time object that made the current tick eligible
     */
    protected void onBeforeSchedule(Option.TimeObject to) {
    }

    @Override
    public ArgumentsManager getArgumentManager() {
        return argumentManager;
    }

    public void setArgumentManager(ArgumentsManager argumentManager) {
        this.argumentManager = argumentManager;
    }

    /**
     * The worker loop. One instance is reused across start/pause/restart; each
     * submission to the pool executes {@link #run()} once.
     */
    private class InternalTimerRunner implements Runnable {

        // Both flags are set by caller threads and polled by the worker thread.
        private volatile boolean pause = false;
        private volatile boolean stop = false;

        public void setStop() {
            this.stop = true;
        }

        public void setPause(boolean pause) {
            this.pause = pause;
        }

        public boolean isPause() {
            return pause;
        }

        @Override
        public void run() {
            // Publish the worker thread so interruptKernelThread() can wake it.
            final Thread self = Thread.currentThread();
            kernelThread = self;
            try {
                runLoop();
            } finally {
                // Only clear our own registration; another run may have replaced it.
                if (kernelThread == self) {
                    kernelThread = null;
                }
            }

            // The schedule counts as "done" only when the loop ended on its own,
            // i.e. it was neither paused nor manually stopped.
            if (!pause && !stop) {
                if (scheduleEngine != null) {
                    scheduleEngine.setStatusDone(id);
                }
                started = false;
            }
        }

        /** Ticks until stopped, paused, or a termination condition is hit. */
        private void runLoop() {
            while (!stop) {
                if (maxFailedTimes >= 1 && currentFailedTime >= maxFailedTimes) {
                    logger.info("调度被迫终止'" + name + "': 因为调用失败超过最大值(" + maxFailedTimes + ")");
                    break;
                }

                // Strategy since 2015-1-5 (shikeying): on pause, simply end the
                // worker loop; restart() submits the loop again.
                if (this.pause) {
                    break;
                }

                try {
                    Option.TimeObject to = option.isAvailableTime(System.currentTimeMillis());
                    if (to.isAvailable()) {
                        logger.debug("======== 满足时间要求,执行任务调用一次");
                        onBeforeSchedule(to);
                        Object result = runTask();

                        // Added 2019-01-02 (shikeying): a cyclic task must advance
                        // to its next time slot after every execution.
                        if (option.isCycleTask()) {
                            option.scheduleToNext(to);
                        }

                        if (result == null && taskTerminateCondition) {
                            logger.info("调度被迫终止'" + name + "': 因为调用未返回任何数据");
                            // Note: the finally block below still sleeps once before exit.
                            break;
                        }
                    }
                } catch (Exception ex) {
                    if (ex instanceof NotFoundGatherDataException) {
                        logger.debug("数据源没有可采集数据,不属于业务错误。");
                    } else {
                        // Wrap around instead of overflowing into negative values.
                        if (currentFailedTime >= Integer.MAX_VALUE) {
                            currentFailedTime = 0;
                        }
                        currentFailedTime++;
                        logger.error("任务调用失败一次,scheduler = " + id, ex);
                    }
                } finally {
                    try {
                        if (timeInterval > 0) {
                            TimeUnit.MILLISECONDS.sleep(timeInterval);
                        }
                    } catch (InterruptedException ignored) {
                        // Deliberately swallowed: interruption is the wake-up signal
                        // sent by interruptKernelThread(); the loop re-checks its
                        // flags immediately on the next iteration.
                    }
                }
            }
        }
    }
}