shikeyin
2024-01-11 65da8373531677b1c37a98f53eaa30c892f35e5a
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
package com.iplatform.gather;
 
import com.walker.store.support.DatabaseStore;
 
/**
 * 单表数据采集调度器默认实现。后续该对象会废弃,没太大意义,因为通常都不会只采集一个表。
 * @author shikeying
 * @date 2016年2月22日
 *
 */
public abstract class SingleTableGatherScheduler extends GatherScheduler{
 
    private String tableName;
 
    private String createTableSql;
 
    private String loadSql;
 
    private int index = 0;
 
    private int pageSize = 64;
 
    public SingleTableGatherScheduler(int id, String name, DatabaseStore store){
        super(id, name, store);
    }
 
    public void setTableName(String tableName) {
        this.tableName = tableName;
    }
 
    public void setCreateTableSql(String createTableSql) {
        this.createTableSql = createTableSql;
    }
 
    public void setLoadSql(String loadSql) {
        this.loadSql = loadSql;
    }
 
    public void setPageSize(int pageSize) {
        this.pageSize = pageSize;
    }
 
    @Override
    protected Object[] getRunParameters(Object previousInvokeResult) {
        // 从原始表加载数据,这里主要是给出加载的条件。
        Object[] params = new Object[2];
        String querySql = null;
        if(this.getInvokeCount() == 0){
            logger.debug("第一次执行:getRunParameters");
            querySql = this.loadSql + getPageSQLInfo(index, pageSize);
            params[0] = 0;
            index += pageSize;
 
        } else if(previousInvokeResult != null){
            logger.debug("第" + this.getInvokeCount() + "次调用,前一次存在数据");
            querySql = this.loadSql + getPageSQLInfo(index, pageSize);
            params[0] = index;
            index += pageSize;
 
        } else {
            logger.debug("第" + this.getInvokeCount() + "次调用,前一次未获得数据");
            querySql = this.loadSql + getPageSQLInfo(index, pageSize);
            if(index - pageSize >= 0){
                index -= pageSize;
                params[0] = index;
                index += pageSize;
            } else {
                params[0] = 0;
                index += pageSize;
            }
        }
        logger.debug("-----> SQL = " + querySql + ", index = " + params[0]);
 
//        params[0] = index;
        params[1] = pageSize;
        return new Object[]{tableName, createTableSql, null, new Object[]{querySql, params}};
    }
 
    /**
     * 子类提供分页查询的SQL代码,如:MySQL = limit ?, ?、ORACLE等
     * @param index
     * @param pageSize
     * @return
     */
    protected abstract String getPageSQLInfo(int index, int pageSize);
}