package com.walker.scheduler; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.List; import java.util.concurrent.ConcurrentHashMap; /** * 采集调度器管理器,默认实现。 * @author shikeying * @date 2015年12月30日 * */ public abstract class GatherSchedulerManager { protected transient final Logger logger = LoggerFactory.getLogger(this.getClass()); // private final ConcurrentHashMap cached = new ConcurrentHashMap(8); private final ConcurrentHashMap cached = new ConcurrentHashMap(8); protected ConcurrentHashMap getCachedData(){ return cached; } public void startScheduler(AbstractScheduler gatherScheduler){ if(gatherScheduler == null){ return; } if(cached.get(gatherScheduler.getId()) != null){ throw new IllegalStateException("已经存在调度器实现,不能重复。id = " + gatherScheduler.getId()); } cached.putIfAbsent(gatherScheduler.getId(), gatherScheduler); gatherScheduler.start(); logger.info("[GatherSchedulerManager]-启动了一个调度器:" + gatherScheduler); } @Deprecated public void restartScheduler(AbstractScheduler gatherScheduler){ if(gatherScheduler == null){ return; } Scheduler gs = cached.get(gatherScheduler.getId()); if(gs != null){ gs.restart(); logger.debug("缓存中存在该调度器,重新启动。id = " + gs.getId()); return; } // 缓存中没有,直接调用启动方法 // startScheduler(gatherScheduler); cached.putIfAbsent(gatherScheduler.getId(), gatherScheduler); gatherScheduler.restart(); logger.info("[GatherSchedulerManager]-重新启动该调度器:" + gatherScheduler); } public void restartScheduler(int id){ Scheduler gs = cached.get(id); if(gs != null){ gs.restart(); logger.debug("缓存中存在该调度器,重新启动。id = " + gs.getId()); return; } } public void stopScheduler(int schedulerId){ Scheduler scheduler = cached.get(schedulerId); if(scheduler == null){ throw new IllegalStateException("GatherSchedulerManager管理器中尚未存在该调度器:" + schedulerId); } try{ scheduler.stop(); cached.remove(schedulerId); } catch(Exception ex){ logger.error("停止调度器出现异常:" + ex.getMessage(), ex); } } public void pauseScheduler(int schedulerId){ Scheduler scheduler = cached.get(schedulerId); if(scheduler == null){ throw new IllegalStateException("GatherSchedulerManager管理器中尚未存在该调度器:" + schedulerId); } scheduler.pause(); } /** * 列出所有正在运行,包含:暂停的,所有调度器 * @return */ public List listSchedulers(){ List result = new ArrayList(cached.size()); for(AbstractScheduler gs : cached.values()){ result.add(gs); } return result; } public AbstractScheduler getOneGatherScheduler(int schedulerId){ AbstractScheduler scheduler = cached.get(schedulerId); // if(scheduler == null){ // throw new IllegalStateException("缓存中不存在该调度器:" + schedulerId); // } return scheduler; } /** * 初始化方法,子类实现。如:启动时从数据库加载任务列表 */ protected abstract void initialize(); }