package com.walker.scheduler;
|
|
import org.slf4j.Logger;
|
import org.slf4j.LoggerFactory;
|
|
import java.util.ArrayList;
|
import java.util.List;
|
import java.util.concurrent.ConcurrentHashMap;
|
|
/**
|
* 采集调度器管理器,默认实现。
|
* @author shikeying
|
* @date 2015年12月30日
|
*
|
*/
|
public abstract class GatherSchedulerManager {
|
|
protected transient final Logger logger = LoggerFactory.getLogger(this.getClass());
|
|
// private final ConcurrentHashMap<Integer, GatherScheduler> cached = new ConcurrentHashMap<Integer, GatherScheduler>(8);
|
private final ConcurrentHashMap<Integer, AbstractScheduler> cached = new ConcurrentHashMap<Integer, AbstractScheduler>(8);
|
|
protected ConcurrentHashMap<Integer, AbstractScheduler> getCachedData(){
|
return cached;
|
}
|
|
public void startScheduler(AbstractScheduler gatherScheduler){
|
if(gatherScheduler == null){
|
return;
|
}
|
if(cached.get(gatherScheduler.getId()) != null){
|
throw new IllegalStateException("已经存在调度器实现,不能重复。id = " + gatherScheduler.getId());
|
}
|
cached.putIfAbsent(gatherScheduler.getId(), gatherScheduler);
|
gatherScheduler.start();
|
logger.info("[GatherSchedulerManager]-启动了一个调度器:" + gatherScheduler);
|
}
|
|
@Deprecated
|
public void restartScheduler(AbstractScheduler gatherScheduler){
|
if(gatherScheduler == null){
|
return;
|
}
|
Scheduler gs = cached.get(gatherScheduler.getId());
|
if(gs != null){
|
gs.restart();
|
logger.debug("缓存中存在该调度器,重新启动。id = " + gs.getId());
|
return;
|
}
|
// 缓存中没有,直接调用启动方法
|
// startScheduler(gatherScheduler);
|
cached.putIfAbsent(gatherScheduler.getId(), gatherScheduler);
|
gatherScheduler.restart();
|
logger.info("[GatherSchedulerManager]-重新启动该调度器:" + gatherScheduler);
|
}
|
|
public void restartScheduler(int id){
|
Scheduler gs = cached.get(id);
|
if(gs != null){
|
gs.restart();
|
logger.debug("缓存中存在该调度器,重新启动。id = " + gs.getId());
|
return;
|
}
|
}
|
|
public void stopScheduler(int schedulerId){
|
Scheduler scheduler = cached.get(schedulerId);
|
if(scheduler == null){
|
throw new IllegalStateException("GatherSchedulerManager管理器中尚未存在该调度器:" + schedulerId);
|
}
|
try{
|
scheduler.stop();
|
cached.remove(schedulerId);
|
} catch(Exception ex){
|
logger.error("停止调度器出现异常:" + ex.getMessage(), ex);
|
}
|
}
|
|
public void pauseScheduler(int schedulerId){
|
Scheduler scheduler = cached.get(schedulerId);
|
if(scheduler == null){
|
throw new IllegalStateException("GatherSchedulerManager管理器中尚未存在该调度器:" + schedulerId);
|
}
|
scheduler.pause();
|
}
|
|
/**
|
* 列出所有正在运行,包含:暂停的,所有调度器
|
* @return
|
*/
|
public List<AbstractScheduler> listSchedulers(){
|
List<AbstractScheduler> result = new ArrayList<AbstractScheduler>(cached.size());
|
for(AbstractScheduler gs : cached.values()){
|
result.add(gs);
|
}
|
return result;
|
}
|
|
public AbstractScheduler getOneGatherScheduler(int schedulerId){
|
AbstractScheduler scheduler = cached.get(schedulerId);
|
// if(scheduler == null){
|
// throw new IllegalStateException("缓存中不存在该调度器:" + schedulerId);
|
// }
|
return scheduler;
|
}
|
|
/**
|
* 初始化方法,子类实现。如:启动时从数据库加载任务列表
|
*/
|
protected abstract void initialize();
|
}
|