package com.walker.store.task; import com.walker.connector.AbstractConnector; import com.walker.store.Storeable; import java.util.List; /** * 抽象的采集任务定义。

* 注意:这里让connector对象缓存了,也就意味着一个采集任务只能对应一个远程地址了 * @author shikeying * @date 2015年12月24日 * */ public abstract class GatherTask extends AbstractTask{ /* 注意:这里让connector对象缓存了,也就意味着一个采集任务只能对应一个远程地址了 */ protected AbstractConnector connector = null; /* 采集源可参考的数据量,大致原始数据量,仅作参考 */ private int srcPreferenceSize = 0; public GatherTask(Storeable store, String name){ super(store, name); } /** * 采集任务定义中,执行任务的返回值表示:是否请求到远程响应数据,如果没有就返回null */ @Override public Object run(String srcName, String createTableSQL, Object parameter , Object[] params) throws Exception{ if(connector == null){ connector = getConnector(); } if(connector == null){ throw new IllegalArgumentException("GatherTask参数错误:connector必须存在!"); } // long startTime = System.currentTimeMillis(); List datas = this.invokeRequest(connector, params); if(datas == null){ logger.debug("GatherTask调用请求返回空数据"); return null; } logger.debug(".......采集返回的结果数据:" + datas); store.write(srcName, createTableSQL, parameter, datas); if(datas.size() > 0){ logger.debug("........采集执行onAfterWrite一次,srcName = " + srcName); // LogInfo logInfo = new LogInfo(); // 每次请求并写入的处理总时间(秒) // logInfo.setSpanTime((System.currentTimeMillis()-startTime)/1000).setWriteCount(datas.size()); // this.onAfterWrite(srcName, parameter, srcPreferenceSize, logInfo); this.onAfterWrite(srcName, parameter, srcPreferenceSize); } return DEFAULT_RESULT; } public int getSrcPreferenceSize() { return srcPreferenceSize; } /** * 返回连接器对象,不同子类可以返回各种具体连接器,如:http、db等 * @return */ protected abstract AbstractConnector getConnector(); public void setSrcPreferenceSize(int srcPreferenceSize) { this.srcPreferenceSize = srcPreferenceSize; } /** * 调用请求的具体过程,由子类来实现。 * @param connector 连接器 * @param params 业务要传递的请求参数 * @return * @throws Exception */ protected abstract List invokeRequest(AbstractConnector connector , Object[] params) throws Exception; /** * 成功写入数据之后,调用该方法。 * @param srcName 原始表名称 * @param parameter 业务参数 * @param srcPreferenceSize 数据源返回的参考数据量 */ protected void onAfterWrite(String srcName, Object parameter, int srcPreferenceSize){} }