shikeying
2024-01-11 3b67e947e36133e2a40eb2737b15ea375e157ea0
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
package com.walker.scheduler;
 
import com.walker.infrastructure.arguments.ArgumentsManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
 
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
//import com.walkersoft.datagather.store.DatabaseStore;
 
public abstract class AbstractScheduler implements Scheduler {
 
    protected transient final Logger logger = LoggerFactory.getLogger(getClass());
 
    private int id;
    private String name;
    public void setId(int id) {
        this.id = id;
    }
 
    public void setName(String name) {
        this.name = name;
    }
 
    private Option option = null;
 
    // 内部时钟频率,默认:10秒
    private long timeInterval = 5000 * 1;
 
    protected long startTime = 0;
    private long restartTime = 0;
 
    // 记录线程引用,这样可以调用让线程中断睡眠
    // 2016-11-28 时克英
    private Thread kernelThread = null;
 
    public int nThreads = 1;
    public int nThreadQueue = 16;
 
    private ExecutorService executorService = null;
    private InternalTimerRunner timerRunner = new InternalTimerRunner();
 
    protected boolean started = false; // 是否已经启动运行
 
    // 如果任务执行没有返回值(调用请求没有得到数据),是否让任务终止
    private boolean taskTerminateCondition = false;
 
    // 任务调用失败最大值,默认:0表示不限制最大值
    private int maxFailedTimes = 0;
    private int currentFailedTime = 0;
 
    protected ScheduleEngine scheduleEngine = null;
 
    public ScheduleEngine getScheduleEngine() {
        return scheduleEngine;
    }
 
//    DatabaseStore store = null;
 
    // 是否核心调度器
    private boolean kernelScheduler = false;
 
    public AbstractScheduler(){}
 
    public AbstractScheduler(int id, String name
//            , DatabaseStore store
    ){
        this.id = id;
        this.name = name;
//        this.store = store;
        if(this.executorService == null){
            ThreadPoolExecutor threadPoolExecutor =  new ThreadPoolExecutor(nThreads, nThreads,
                    0L, TimeUnit.MILLISECONDS,
                    new LinkedBlockingQueue<Runnable>(nThreadQueue),
                    new ThreadFactory() {
                        @Override
                        public Thread newThread(Runnable r) {
                            Thread t = new Thread(r);
                            t.setDaemon(true);
                            return t;
                        }
                    });
            this.executorService = threadPoolExecutor;
            logger.info("初始化了线程池对象'executorService', nThreads=" + nThreads + ", nThreadQueue=" + nThreadQueue);
        }
    }
 
    @Override
    public String getStoreId(){
//        if(store != null){
//            return store.getId();
//        }
//        return null;
        throw new UnsupportedOperationException("该属性废弃");
    }
 
    /**
     * 返回采集请求数据发生错误的次数,无论是http或者数据库请求失败都会计数。<br>
     * 当该值超过最大整数时,自动清零,并重新计数。
     * @return
     */
    public int getCurrentFailedTime() {
        return currentFailedTime;
    }
 
//    public DatabaseStore getStore() {
//        return store;
//    }
 
    @Override
    public int getId() {
        return this.id;
    }
 
    @Override
    public String getName() {
        return this.name;
    }
 
    @Override
    public Option getOption(){
        return this.option;
    }
 
    public long getTimeInterval() {
        return timeInterval;
    }
 
    @Override
    public long getStartTime() {
        return startTime;
    }
 
    public long getRestartTime() {
        return restartTime;
    }
 
    @Override
    public boolean isStarted() {
        return started;
    }
 
    @Override
    public boolean isPause(){
        if(timerRunner != null){
            return timerRunner.isPause();
        }
        return false;
    }
 
    @Override
    public boolean isTaskTerminateCondition() {
        return taskTerminateCondition;
    }
 
    @Override
    public boolean isKernelScheduler(){
        return kernelScheduler;
    }
 
    /**
     * 中断执行线程,通常是在它间隔睡眠时,执行该方法唤醒。
     */
    public void interruptKernelThread(){
        logger.debug("线程被唤醒一次:" + this.getName());
        if(this.kernelThread != null){
            kernelThread.interrupt();
        }
    }
 
    public boolean getStart(){
        return started;
    }
 
    public boolean getPause(){
        return isPause();
    }
 
    @Override
    public void start() {
        if(!started){
            this.checkData();
            executorService.execute(timerRunner);
            logger.info("调度器'" + name + "'启动......");
            started = true;
            if(scheduleEngine != null){
                scheduleEngine.setStatusStarted(id);
            }
        } else {
            throw new IllegalStateException("调度器已启动,调用状态错误。id = " + id);
        }
        startTime = System.currentTimeMillis();
    }
 
    @Override
    public void restart(){
        if(started && !timerRunner.isPause()){
            logger.warn("调度器正在执行,不能重复执行启动操作。id = " + id);
            throw new IllegalStateException("调度器正在执行,不能重复执行启动操作。id = " + id);
        }
        logger.info("调度器'" + name + "'暂停后,被重新运行......");
        timerRunner.setPause(false);
 
        this.checkData();
        executorService.execute(timerRunner);
 
        if(scheduleEngine != null){
            scheduleEngine.setStatusRestarted(id);
        }
        restartTime = System.currentTimeMillis();
    }
 
    private void checkData(){
        if(this.option == null){
            throw new IllegalStateException("调度器'" + this.name + "'无法启动: option未设置");
        }
    }
 
    @Override
    public void stop() {
        timerRunner.setStop();
        executorService.shutdown();
        started = false;
        if(scheduleEngine != null){
            scheduleEngine.setStatusStoped(id);
        }
        logger.info("调度器'" + name + "'被终止运行......");
    }
 
    @Override
    public void pause() {
        timerRunner.setPause(true);
        if(scheduleEngine != null){
            scheduleEngine.setStatusPaused(id);
        }
        logger.info("调度器'" + name + "'暂停......");
    }
 
    protected void setCurrentFailedTime(int currentFailedTime) {
        this.currentFailedTime = currentFailedTime;
    }
 
    public void setTimeInterval(long timeInterval) {
        this.timeInterval = timeInterval;
    }
 
    @Override
    public void setTaskTerminateCondition(boolean boo) {
        this.taskTerminateCondition = boo;
    }
 
    /**
     * 设置调度任务失败最大次数,超过该次数,自动终止任务。
     * @param maxFailedTimes
     */
    @Override
    public void setMaxFailedTimes(int maxFailedTimes) {
        this.maxFailedTimes = maxFailedTimes;
    }
 
    @Override
    public void setOption(Option option) {
        if(option == null){
            throw new IllegalArgumentException("创建调度器失败:设置的option并不存在");
        }
        this.option = option;
    }
 
    @Override
    public void setScheduleEngine(ScheduleEngine scheduleEngine) {
        this.scheduleEngine = scheduleEngine;
    }
 
    /**
     * 运行任务,由子类执行。因为输入任务参数只有子类才知道。
     * @return
     * @throws Exception
     */
    protected abstract Object runTask() throws Exception;
 
    private class InternalTimerRunner implements Runnable{
 
        private boolean pause = false;
        private boolean stop = false;
 
        public void setStop() {
            this.stop = true;
        }
 
        public void setPause(boolean pause) {
//            if(pause){
//                // 暂停
//                try {
//                    this.pause = true;
//                    Thread.currentThread().wait();
//                } catch (InterruptedException e) {
//                    e.printStackTrace();
//                    this.pause = false;
//                }
//            } else {
//                // 恢复
//                Thread.currentThread().notify();
//                this.pause = false;
//            }
            this.pause = pause;
        }
 
        public boolean isPause() {
            return pause;
        }
 
        @Override
        public void run() {
            while(!stop){
                if(maxFailedTimes >= 1 && currentFailedTime >= maxFailedTimes){
                    logger.info("调度被迫终止'" + name + "': 因为调用失败超过最大值(" + maxFailedTimes + ")");
//                    stop();
//                    this.stop = true;
                    break;
                }
 
                // 采用新策略,暂停后,直接停止调用它的线程,恢复后重新执行线程任务
                // 2015-1-5 by shikeying
                if(this.pause){
                    break;
                }
 
                try{
                    Option.TimeObject to = option.isAvailableTime(System.currentTimeMillis());
                    if(to.isAvailable()){
//                    if(option.isAvailable(System.currentTimeMillis())){
                        logger.debug("======== 满足时间要求,执行任务调用一次");
                        onBeforeSchedule(to);
                        Object result = runTask();
 
                        //~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
                        // 时克英添加,2019-01-02,如果是周期性任务,每次执行完都必须切换到下个时间
                        if(option.isCycleTask()){
                            option.scheduleToNext(to);
                        }
                        //~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 
                        if(result == null && taskTerminateCondition){
                            logger.info("调度被迫终止'" + name + "': 因为调用未返回任何数据");
//                                stop();
                            break;
                        }
                    }
                } catch(Exception ex){
                    if(ex instanceof NotFoundGatherDataException){
                        logger.debug("数据源没有可采集数据,不属于业务错误。");
                    } else {
                        if(currentFailedTime >= Integer.MAX_VALUE){
                            currentFailedTime = 0;
                        }
                        currentFailedTime ++;
                        logger.error("任务调用失败一次,scheduler = " + id, ex);
                    }
                } finally {
                    try {
                        if(timeInterval > 0){
//                            logger.debug("*************** sleep: " + timeInterval);
                            TimeUnit.MILLISECONDS.sleep(timeInterval);
                        }
                    } catch (InterruptedException e) {}
                }
            }
            if(!pause && !stop){
                // 只有没有暂停的情况下而且没有手动停止,才算调度执行完成
                if(scheduleEngine != null){
                    scheduleEngine.setStatusDone(id);
                }
                started = false;
            }
        }
    }
 
    /**
     * 每次执行调度之前,调用该方法。
     * @param to
     */
    protected void onBeforeSchedule(Option.TimeObject to){
 
    }
 
    private ArgumentsManager argumentManager;
 
    @Override
    public ArgumentsManager getArgumentManager() {
        return argumentManager;
    }
 
    public void setArgumentManager(ArgumentsManager argumentManager){
        this.argumentManager = argumentManager;
    }
}