package com.iplatform.gather;
|
|
import com.walker.infrastructure.utils.StringUtils;
|
import com.walker.scheduler.impl.TimedScheduler;
|
import com.walker.scheduler.util.OptionUtils;
|
import com.walker.store.AbstractStore;
|
import com.walker.store.task.GatherTask;
|
|
/**
|
* 抽象的采集调度器默认实现,提供了创建调度的模板。
|
* 所有采集数据任务都应继承该模板使用。
|
* @author 时克英
|
* @date 2022-09-09
|
*/
|
public abstract class GatherScheduler extends TimedScheduler {
|
|
private AbstractStore store;
|
private GatherTask gatherTask;
|
|
// 检测采集存储的目标表是否存在,2022-09-12
|
private boolean checkDestTableExist = true;
|
|
public GatherScheduler(int id, String name, AbstractStore store){
|
super(id, name);
|
// if(id <= 0){
|
// throw new IllegalArgumentException("GatherScheduler: id is required!");
|
// }
|
// if(StringUtils.isEmpty(name)){
|
// throw new IllegalArgumentException("GatherScheduler: name is required!");
|
// }
|
if(store == null){
|
throw new IllegalArgumentException("GatherScheduler: store is required!");
|
}
|
this.store = store;
|
// 设置时间选项为永远,同:ForeverScheduler
|
this.setOption(OptionUtils.combineEveryDay24HourOption());
|
}
|
|
@Override
|
public void stop() {
|
super.stop();
|
this.store.destroy();
|
}
|
|
@Override
|
protected Object onProcess(Object[] inputParams) throws Exception {
|
if(this.gatherTask == null){
|
this.gatherTask = this.providerTask(this.store);
|
if(this.gatherTask == null){
|
throw new IllegalArgumentException("未提供任务对象,调度无法执行:" + this.getName());
|
}
|
}
|
// srcName, createTableSQL, parameter, params[]
|
// 1因为无法更新私服,因此这个抛出异常先不粗粝,注释掉,回单位要加上
|
return this.gatherTask.run((String)inputParams[0], (String)inputParams[1], inputParams[2], (Object[])inputParams[3]);
|
}
|
|
@Override
|
public String getStoreId(){
|
return this.store.getId();
|
}
|
|
protected AbstractStore getStore(){
|
return this.store;
|
}
|
|
/**
|
* 获取一个特定的采集任务,采集任务主要处理数据获取的具体操作。
|
* @param store
|
* @return
|
*/
|
protected abstract GatherTask providerTask(AbstractStore store);
|
|
/**
|
* 终止调度器运行,在重要检查中,例如:参数配置或采集数据等设置不正确的时候无法启动调度器,此时需要终止。
|
* <p>该方法会调用调度管理器,更新调度器状态。</p>
|
* @param schedulerId
|
* @date 2022-09-13
|
*/
|
protected abstract void terminateGatherScheduler(int schedulerId);
|
|
/**
|
* 是否检查目标表是否存在
|
* @return
|
*/
|
public boolean isCheckDestTableExist() {
|
return checkDestTableExist;
|
}
|
|
/**
|
* 设置是否检查目标表是否存在
|
* @param checkDestTableExist
|
*/
|
public void setCheckDestTableExist(boolean checkDestTableExist) {
|
this.checkDestTableExist = checkDestTableExist;
|
}
|
}
|