shikeyin
2024-01-11 65da8373531677b1c37a98f53eaa30c892f35e5a
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
package com.iplatform.gather;
 
import com.walker.scheduler.NotFoundGatherDataException;
import com.walker.scheduler.Option;
import com.walker.store.support.DatabaseStore;
 
import java.util.HashMap;
import java.util.List;
import java.util.Map;
 
/**
 * 同时采集并存储多个表,调度任务实现。
 * @author 时克英
 * @date 2022-09-10
 */
public abstract class MultiTableStoreScheduler extends GatherScheduler{
 
    // 当前正在采集的表对象
    private WaitingTableInfo currentTableInfo;
 
    // 缓存等待采集的表信息
    Map<Integer, WaitingTableInfo> cachedWaitingTables = null;
 
    // 初始化加载的采集表数量
    private int totalInitialize = 0;
 
    /**
     * 平衡参数,如果大于1会按照总数百分比加载,
     * 主要考虑后续使用多个调度器来实现并发采集。(暂未使用)
     */
    private int balance = 0;
 
    //
    // 调用失败次数超过该值,会强制切换表进行采集
    private static final int maxFailedTimesForSwitch = 4;
    // 调用次数超过该值,也会强制切换表进行采集
    private static final int maxInvokeTimesForSwitch = 1024;
 
    // 上一次获得'那一天'
    private int previousDay = 0;
 
    public MultiTableStoreScheduler(int id, String name, DatabaseStore store){
        super(id, name, store);
        // 禁止父类处理睡眠功能,由子类自定义控制
        this.setAllowIdleSleep(false);
        store.setSupportUpdate(this.isSupportUpdateData());
    }
 
    /**
     * 获取等待采集的表信息集合。
     * @param supportUpdate
     * @return
     * @date 2022-09-12
     */
    protected abstract List<WaitingTableInfo> acquireWaitingTableList(boolean supportUpdate);
 
    /**
     * 获取当前已经采集的表数据版本号最大值,在增量采集中会使用。<p></p>
     * 但在全量采集表则返回空。
     * @return
     */
    protected abstract Long acquireGatheredMaxVersion();
 
    public WaitingTableInfo getCurrentTableInfo() {
        return currentTableInfo;
    }
 
    /**
     * 是否支持更新数据
     * @return
     */
    protected boolean isSupportUpdateData(){
        return false;
    }
 
    @Override
    protected Object[] getRunParameters(Object previousInvokeResult) {
        /*
         * 找出学校信息来提供参数。
         * 因为学校较多而且每个学校都可能有许多表,因此这里需要能支持几种提供策略。
         *
         * 方案一、优先加载完成一个学校
         * 方案二、各个学校平均加载,
         * */
 
        // 从持久化数据中,检查等待采集的表集合信息
        if(cachedWaitingTables == null || cachedWaitingTables.size() == 0){
            this.doInitializeWaitingList();
        }
        // 如果某个表上次没有采集到数据,本次将会切换表继续采集,按照顺序切换
        if(this.getInvokeCount() > 0){
            if(this.isTimeIntervalChanged()){
                // 更新过时钟,并且上次采集有数据,让时钟复位
                if(previousInvokeResult != null){
                    // 如果在时钟间隔被修改过的情况下,采集返回了数据,应当复位时钟按照正常时间调度采集。
                    this.doResetIntervalTime();
                } else {
                    doSwitchWaitingTable(true);
                }
            } else {
                // 没有更新过时钟
                if(previousInvokeResult == null){
                    logger.debug("后续任务调用中,上次调用没有返回数据,切换采集表。currentTableInfo = " + this.currentTableInfo);
                    doSwitchWaitingTable(true);
                }
            }
        }
 
        if(this.getCurrentFailedTime() >= maxFailedTimesForSwitch){
            logger.debug("采集请求失败超过'" + maxFailedTimesForSwitch + "'次,强制切换采集表");
            doSwitchWaitingTable(true);
            // 这句很重要,因为这是有状态;不这样做会导致每次执行都切换采集表
            this.setCurrentFailedTime(0); // 失败次数回0
        }
        // 调用总次数超过最大值,也会导致切换表采集
        if(this.getInvokeCount() > 0 && this.getInvokeCount() % maxInvokeTimesForSwitch == 0){
            logger.debug("调用次数超过最大值,自动切换表采集。currentTableInfo = " + this.currentTableInfo);
//            doSwitchWaitingTable(false);
            doSwitchWaitingTable(true);
        }
 
        // 查找数据表中的最大版本号,如果是全量则返回:null
        Long maxVersion = this.acquireGatheredMaxVersion();
 
        return new Object[]{this.currentTableInfo.getTableName()
                , this.currentTableInfo.getSqlContent()
                , this.currentTableInfo, new Object[]{maxVersion}};
    }
 
    @Override
    protected void onBeforeSchedule(Option.TimeObject to){
        // 为了测试方便,开发时使用小时作为重建条件;正式可能会改成按天计算
        rebuildWaitingTableByHour(to);
    }
 
    private void rebuildWaitingTableByHour(Option.TimeObject to){
        this.rebuildWaitingTableByTime(1, to);
    }
 
    private void rebuildWaitingTableByDay(Option.TimeObject to){
        this.rebuildWaitingTableByTime(2, to);
    }
 
    /**
     * 每次调度执行先检查是否新的一天,如果是就需要评估是否需要重新加载采集表信息
     * 因为后续采集表数量可能会达到上万,因此需要采用必要的缓存来分析采集日志以确定策略。
     * @param type 时间类型:1_小时, 2_天
     * @param to 传入的时间对象<code>TimeObject</code>
     */
    private void rebuildWaitingTableByTime(int type, Option.TimeObject to){
        if(previousDay == 0){
            if(type == 1){
                previousDay = to.getHour();
            } else if(type == 2){
                previousDay = to.getDay();
            } else if(type == 3){
//                previousDay = to.getMonth();
                throw new UnsupportedOperationException();
            } else {
                throw new UnsupportedOperationException();
            }
        }
        boolean _changed = false;
        if(type == 1){
            _changed = to.getHour() != previousDay;
        } else if(type == 2){
            _changed = to.getDay() != previousDay;
        } else {
            throw new UnsupportedOperationException();
        }
 
        if(_changed){
            logger.info("时间改变,开始重建采集表信息。time:" + to + ", 重新初始化采集列表");
            // 每天强制重新初始化采集表集合
            this.doInitializeWaitingList();
            // 同时复位调度时钟的间隔时间
            this.doResetIntervalTime();
            if(type == 1){
                previousDay = to.getHour();
            } else if(type == 2){
                previousDay = to.getDay();
            } else {
                throw new UnsupportedOperationException();
            }
        }
    }
 
    //~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    //~ 多表切换相关
    //~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    /**
     * 切换采集表,采集过程中会根据各种情况切换表,避免一个采集过多而其他表无法采集。
     * @param removeCurrentTable 是否把当前采集表从采集队列移除?
     */
    private void doSwitchWaitingTable(boolean removeCurrentTable){
        if(this.cachedWaitingTables.size() == 1){
            logger.debug("只有一个表,无法切换采集。");
            return;
        }
        int index = this.currentTableInfo.getIndex();
        if(removeCurrentTable){
            cachedWaitingTables.remove(index);
        }
 
        if(cachedWaitingTables.isEmpty()){
            logger.info("===========> 采集表已经没有数据,可能是没有数据可采集,增加休息时间。(之后重新初始化加载)");
            this.doChangeIntervalTime();
            this.doInitializeWaitingList();
            return;
        }
        if(index >= totalInitialize-1){
            index = 0;
        }
        this.searchNextTable(index);
    }
 
    private void searchNextTable(int index){
        WaitingTableInfo wti = cachedWaitingTables.get(index);
        if(wti == null){
            index++;
            searchNextTable(index);
        } else {
            this.currentTableInfo = wti;
            logger.debug("================> 找到了下一个采集表信息:" + wti);
        }
    }
 
    /**
     * 初始化加载等待采集的表集合信息
//     * @param balance 平衡参数,如果大于1会按照总数百分比加载,主要考虑后续使用多个调度器来实现并发采集。
     */
    private void doInitializeWaitingList(){
//        List<WaitingTableInfo> loaded = gatherTask.aquireWaitingTableList(this.isSupportUpdateData());
        List<WaitingTableInfo> loaded = this.acquireWaitingTableList(this.isSupportUpdateData());
        logger.debug("+++++++++++++++++++ 本次获得的采集表都包含:" + loaded);
 
        if(loaded != null && loaded.size() > 0){
            totalInitialize = loaded.size();
            cachedWaitingTables = new HashMap<Integer, WaitingTableInfo>(totalInitialize);
            int i = 0;
            for(WaitingTableInfo wti : loaded){
                wti.setIndex(i);
                cachedWaitingTables.put(i, wti);
                i++;
            }
        } else {
//            logger.warn("没有发现任何采集表定义信息,该任务启动毫无意义! scheduler = " + this.getId());
            this.terminateGatherScheduler(this.getId());
            throw new NotFoundGatherDataException("没有发现任何采集表定义信息,该任务启动毫无意义! scheduler = " + this.getId());
        }
        this.currentTableInfo = cachedWaitingTables.get(0);
        if(this.cachedWaitingTables.size() == 1){
            logger.warn("采集表数量只有一个,请使用: SingleTableStoreScheduler 调度器实现!");
        }
    }
 
//    private DatabaseGatherTask task = null;
    /*private void terminateGatherScheduler(){
        // 根据调度器ID获得采集ID
        SchedulerUtils.terminateGatherScheduler(this.getId());
        logger.info("调度器 '" + this.getId() + "'(" + this.getName() + "被强制终止),因为未找到定义的学校表信息");
    }*/
}