shikeyin
2024-01-11 65da8373531677b1c37a98f53eaa30c892f35e5a
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
package com.iplatform.gather;
 
import com.walker.infrastructure.utils.StringUtils;
import com.walker.scheduler.impl.TimedScheduler;
import com.walker.scheduler.util.OptionUtils;
import com.walker.store.AbstractStore;
import com.walker.store.task.GatherTask;
 
/**
 * 抽象的采集调度器默认实现,提供了创建调度的模板。
 * 所有采集数据任务都应继承该模板使用。
 * @author 时克英
 * @date 2022-09-09
 */
public abstract class GatherScheduler extends TimedScheduler {
 
    private AbstractStore store;
    private GatherTask gatherTask;
 
    // 检测采集存储的目标表是否存在,2022-09-12
    private boolean checkDestTableExist = true;
 
    public GatherScheduler(int id, String name, AbstractStore store){
        super(id, name);
//        if(id <= 0){
//            throw new IllegalArgumentException("GatherScheduler: id is required!");
//        }
//        if(StringUtils.isEmpty(name)){
//            throw new IllegalArgumentException("GatherScheduler: name is required!");
//        }
        if(store == null){
            throw new IllegalArgumentException("GatherScheduler: store is required!");
        }
        this.store = store;
        // 设置时间选项为永远,同:ForeverScheduler
        this.setOption(OptionUtils.combineEveryDay24HourOption());
    }
 
    @Override
    public void stop() {
        super.stop();
        this.store.destroy();
    }
 
    @Override
    protected Object onProcess(Object[] inputParams) throws Exception {
        if(this.gatherTask == null){
            this.gatherTask = this.providerTask(this.store);
            if(this.gatherTask == null){
                throw new IllegalArgumentException("未提供任务对象,调度无法执行:" + this.getName());
            }
        }
        // srcName, createTableSQL, parameter, params[]
        // 1因为无法更新私服,因此这个抛出异常先不粗粝,注释掉,回单位要加上
        return this.gatherTask.run((String)inputParams[0], (String)inputParams[1], inputParams[2], (Object[])inputParams[3]);
    }
 
    @Override
    public String getStoreId(){
        return this.store.getId();
    }
 
    protected AbstractStore getStore(){
        return this.store;
    }
 
    /**
     * 获取一个特定的采集任务,采集任务主要处理数据获取的具体操作。
     * @param store
     * @return
     */
    protected abstract GatherTask providerTask(AbstractStore store);
 
    /**
     * 终止调度器运行,在重要检查中,例如:参数配置或采集数据等设置不正确的时候无法启动调度器,此时需要终止。
     * <p>该方法会调用调度管理器,更新调度器状态。</p>
     * @param schedulerId
     * @date 2022-09-13
     */
    protected abstract void terminateGatherScheduler(int schedulerId);
 
    /**
     * 是否检查目标表是否存在
     * @return
     */
    public boolean isCheckDestTableExist() {
        return checkDestTableExist;
    }
 
    /**
     * 设置是否检查目标表是否存在
     * @param checkDestTableExist
     */
    public void setCheckDestTableExist(boolean checkDestTableExist) {
        this.checkDestTableExist = checkDestTableExist;
    }
}