shikeying
2024-01-11 3b67e947e36133e2a40eb2737b15ea375e157ea0
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
package com.walker.scheduler;
 
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
 
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
 
/**
 * 采集调度器管理器,默认实现。
 * @author shikeying
 * @date 2015年12月30日
 *
 */
public abstract class GatherSchedulerManager {
 
    protected transient final Logger logger = LoggerFactory.getLogger(this.getClass());
    
//    private final ConcurrentHashMap<Integer, GatherScheduler> cached = new ConcurrentHashMap<Integer, GatherScheduler>(8);
    private final ConcurrentHashMap<Integer, AbstractScheduler> cached = new ConcurrentHashMap<Integer, AbstractScheduler>(8);
    
    protected ConcurrentHashMap<Integer, AbstractScheduler> getCachedData(){
        return cached;
    }
    
    public void startScheduler(AbstractScheduler gatherScheduler){
        if(gatherScheduler == null){
            return;
        }
        if(cached.get(gatherScheduler.getId()) != null){
            throw new IllegalStateException("已经存在调度器实现,不能重复。id = " + gatherScheduler.getId());
        }
        cached.putIfAbsent(gatherScheduler.getId(), gatherScheduler);
        gatherScheduler.start();
        logger.info("[GatherSchedulerManager]-启动了一个调度器:" + gatherScheduler);
    }
    
    @Deprecated
    public void restartScheduler(AbstractScheduler gatherScheduler){
        if(gatherScheduler == null){
            return;
        }
        Scheduler gs = cached.get(gatherScheduler.getId());
        if(gs != null){
            gs.restart();
            logger.debug("缓存中存在该调度器,重新启动。id = " + gs.getId());
            return;
        }
        // 缓存中没有,直接调用启动方法
//        startScheduler(gatherScheduler);
        cached.putIfAbsent(gatherScheduler.getId(), gatherScheduler);
        gatherScheduler.restart();
        logger.info("[GatherSchedulerManager]-重新启动该调度器:" + gatherScheduler);
    }
    
    public void restartScheduler(int id){
        Scheduler gs = cached.get(id);
        if(gs != null){
            gs.restart();
            logger.debug("缓存中存在该调度器,重新启动。id = " + gs.getId());
            return;
        }
    }
    
    public void stopScheduler(int schedulerId){
        Scheduler scheduler = cached.get(schedulerId);
        if(scheduler == null){
            throw new IllegalStateException("GatherSchedulerManager管理器中尚未存在该调度器:" + schedulerId);
        }
        try{
            scheduler.stop();
            cached.remove(schedulerId);
        } catch(Exception ex){
            logger.error("停止调度器出现异常:" + ex.getMessage(), ex);
        }
    }
    
    public void pauseScheduler(int schedulerId){
        Scheduler scheduler = cached.get(schedulerId);
        if(scheduler == null){
            throw new IllegalStateException("GatherSchedulerManager管理器中尚未存在该调度器:" + schedulerId);
        }
        scheduler.pause();
    }
    
    /**
     * 列出所有正在运行,包含:暂停的,所有调度器
     * @return
     */
    public List<AbstractScheduler> listSchedulers(){
        List<AbstractScheduler> result = new ArrayList<AbstractScheduler>(cached.size());
        for(AbstractScheduler gs : cached.values()){
            result.add(gs);
        }
        return result;
    }
    
    public AbstractScheduler getOneGatherScheduler(int schedulerId){
        AbstractScheduler scheduler = cached.get(schedulerId);
//        if(scheduler == null){
//            throw new IllegalStateException("缓存中不存在该调度器:" + schedulerId);
//        }
        return scheduler;
    }
    
    /**
     * 初始化方法,子类实现。如:启动时从数据库加载任务列表
     */
    protected abstract void initialize();
}