shikeying
2024-01-11 3b67e947e36133e2a40eb2737b15ea375e157ea0
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
package com.walker.scheduler;
 
import com.walker.infrastructure.utils.StringUtils;
 
/**
 * 抽象的核心调度器实现。</p>
 * 核心调度器不能够被人为管理(启动、停止等)。<br>
 * 它只能被超级管理员控制,通常启动后不会被终止(除非超管干预)。
 * @author shikeying
 * @date 2016年8月24日
 *
 */
public abstract class AbstractKernelScheduler extends AbstractScheduler {
 
    // 上次调用返回的结果,标记下以便在后面的调用处理中分析是否要切换输入参数
    private Object previousInvokeResult = StringUtils.EMPTY_STRING;
    
    // 计数调用次数,即:提供参数的次数,第一次为0
    private int invokeCount = 0;
    
    /**
     * 延长的时钟间隔默认值,该值在没有数据可采集时让系统进入简单休眠模式。
     * 默认20分钟:20 * 60 * 1000
     */
    private static final long MORE_INTERVAL_TIME = 10 * 60 * 1000;
    private long originalTimeInterval = 0;
    // 时钟间隔时间是否被修改过
    private boolean timeIntervalChanged = false;
    
    // 休眠时间,该变量主要让用户修改,默认使用:MORE_INTERVAL_TIME
    private long waitSleepTime = 0;
 
    // 是否允许'未采集到数据'时线程进入睡眠一段时间?
    private boolean allowIdleSleep = true;
 
    public boolean isAllowIdleSleep() {
        return allowIdleSleep;
    }
 
    public void setAllowIdleSleep(boolean allowIdleSleep) {
        this.allowIdleSleep = allowIdleSleep;
    }
 
    /**
     * 休眠时间,该变量主要让用户修改,如果为0,默认使用:MORE_INTERVAL_TIME
     * @param waitSleepTime
     */
    public void setWaitSleepTime(long waitSleepTime) {
        this.waitSleepTime = waitSleepTime;
    }
 
//    public AbstractKernelScheduler(){
//    }
    
    public AbstractKernelScheduler(int id, String name
//            , DatabaseStore store
    ) {
        super(id, name);
    }
 
    @Override
    protected Object runTask() throws Exception {
        if(this.originalTimeInterval == 0){
            // 初始化时先记录时钟间隔时间,后面还要修改
            originalTimeInterval = this.getTimeInterval();
        }
 
        // 开启选项才能设置休眠功能
        if(this.allowIdleSleep){
            // 如果某个表上次没有采集到数据,本次将会切换表继续采集,按照顺序切换
            if(this.getInvokeCount() > 0){
                if(this.isTimeIntervalChanged()){
                    // 更新过时钟,并且上次采集有数据,让时钟复位
                    if(previousInvokeResult != null){
                        // 如果在时钟间隔被修改过的情况下,采集返回了数据,应当复位时钟按照正常时间调度采集。
                        this.doResetIntervalTime();
                    } else {
//                        doSwitchWaitingTable(true);
                        this.doChangeIntervalTime();
                    }
                } else {
                    // 没有更新过时钟
                    if(previousInvokeResult == null){
                        logger.debug("后续任务调用中,上次调用没有返回数据,调度睡眠。currentGather = ");
//                        doSwitchWaitingTable(true);
                        this.doChangeIntervalTime();
                    }
                }
            }
        }
 
        Object[] inputParams = getRunParameters(previousInvokeResult);
//        if(inputParams == null){
//            throw new IllegalStateException("请实现方法:getRunParameters();");
//        }
        previousInvokeResult = doRunOnce(inputParams);
        if(invokeCount >= Integer.MAX_VALUE){
            invokeCount = 0;
        }
        invokeCount++;
        
        // 如果是运行一次的调度器,执行完一次任务后,就直接结束掉
        // 时克英 2019-01-02
        if(this.getOption().getPeriodType() == Option.PeriodType.NONE){
            this.stop();
            return null;
        }
        return previousInvokeResult;
    }
    
    /**
     * 修改时钟间隔时间,让它能更缓慢的工作(休息)
     */
    protected void doChangeIntervalTime(){
        if(waitSleepTime == 0){
            this.setTimeInterval(MORE_INTERVAL_TIME);
        } else {
            this.setTimeInterval(waitSleepTime);
        }
        this.timeIntervalChanged = true;
    }
    
    /**
     * 复位时钟间隔时间到默认值。
     */
    protected void doResetIntervalTime(){
        this.setTimeInterval(originalTimeInterval);
        this.timeIntervalChanged = false;
        logger.debug("...........采集调度时钟间隔被复位,reset interval time:" + originalTimeInterval);
    }
 
    @Override
    public String getStoreId(){
        throw new UnsupportedOperationException();
    }
    
//    @Override
//    public DatabaseStore getStore() {
//        throw new UnsupportedOperationException();
//    }
    
    @Override
    public boolean isKernelScheduler(){
        return true;
    }
    
    public boolean isTimeIntervalChanged() {
        return timeIntervalChanged;
    }
    
    /**
     * 返回运行任务的动态参数,每次调用任务时,参数可能都不一样。</p>
     * 例如:一卡通数据采集中,每次都会依次选择一个学校的一个表来采集;<br>
     * 此时还有配合参数maxFailedTimes
     * @param previousInvokeResult 上次请求采集是否返回数据,如果该值为<code>null</code>,表示没有返回
     * @return
     */
    protected abstract Object[] getRunParameters(Object previousInvokeResult);
    
    /**
     * 执行一次具体任务动作,由子类实现
     * @return
     */
    protected abstract Object doRunOnce(Object[] inputParams) throws Exception;
    
    /**
     * 返回采集当前调用的次数
     * @return
     */
    public int getInvokeCount() {
        return invokeCount;
    }
}