package com.walker.store.task;
|
|
import com.walker.connector.AbstractConnector;
|
import com.walker.store.Storeable;
|
|
import java.util.List;
|
|
/**
|
* 抽象的采集任务定义。</p>
|
* 注意:这里让connector对象缓存了,也就意味着一个采集任务只能对应一个远程地址了
|
* @author shikeying
|
* @date 2015年12月24日
|
*
|
*/
|
public abstract class GatherTask extends AbstractTask{
|
|
/* 注意:这里让connector对象缓存了,也就意味着一个采集任务只能对应一个远程地址了 */
|
protected AbstractConnector connector = null;
|
|
/* 采集源可参考的数据量,大致原始数据量,仅作参考 */
|
private int srcPreferenceSize = 0;
|
|
public GatherTask(Storeable store, String name){
|
super(store, name);
|
}
|
|
/**
|
* 采集任务定义中,执行任务的返回值表示:是否请求到远程响应数据,如果没有就返回<code>null</code>
|
*/
|
@Override
|
public Object run(String srcName, String createTableSQL, Object parameter
|
, Object[] params) throws Exception{
|
if(connector == null){
|
connector = getConnector();
|
}
|
if(connector == null){
|
throw new IllegalArgumentException("GatherTask参数错误:connector必须存在!");
|
}
|
|
// long startTime = System.currentTimeMillis();
|
|
List<Object> datas = this.invokeRequest(connector, params);
|
if(datas == null){
|
logger.debug("GatherTask调用请求返回空数据");
|
return null;
|
}
|
|
logger.debug(".......采集返回的结果数据:" + datas);
|
store.write(srcName, createTableSQL, parameter, datas);
|
|
if(datas.size() > 0){
|
logger.debug("........采集执行onAfterWrite一次,srcName = " + srcName);
|
// LogInfo logInfo = new LogInfo();
|
// 每次请求并写入的处理总时间(秒)
|
// logInfo.setSpanTime((System.currentTimeMillis()-startTime)/1000).setWriteCount(datas.size());
|
// this.onAfterWrite(srcName, parameter, srcPreferenceSize, logInfo);
|
this.onAfterWrite(srcName, parameter, srcPreferenceSize);
|
}
|
return DEFAULT_RESULT;
|
}
|
|
public int getSrcPreferenceSize() {
|
return srcPreferenceSize;
|
}
|
|
/**
|
* 返回连接器对象,不同子类可以返回各种具体连接器,如:http、db等
|
* @return
|
*/
|
protected abstract AbstractConnector getConnector();
|
|
public void setSrcPreferenceSize(int srcPreferenceSize) {
|
this.srcPreferenceSize = srcPreferenceSize;
|
}
|
|
/**
|
* 调用请求的具体过程,由子类来实现。
|
* @param connector 连接器
|
* @param params 业务要传递的请求参数
|
* @return
|
* @throws Exception
|
*/
|
protected abstract List<Object> invokeRequest(AbstractConnector connector
|
, Object[] params) throws Exception;
|
|
/**
|
* 成功写入数据之后,调用该方法。
|
* @param srcName 原始表名称
|
* @param parameter 业务参数
|
* @param srcPreferenceSize 数据源返回的参考数据量
|
*/
|
protected void onAfterWrite(String srcName, Object parameter, int srcPreferenceSize){}
|
|
}
|