package com.walker.store.task;
|
|
import com.walker.infrastructure.utils.StringUtils;
|
import com.walker.store.Storeable;
|
|
import java.util.List;
|
import java.util.concurrent.TimeUnit;
|
|
/**
|
* 监听接收数据类型的任务,例如:接收异步消息
|
* @author shikeying
|
* @date 2016年1月18日
|
*
|
*/
|
public class ListeningTask extends AbstractTask {
|
|
public ListeningTask(Storeable store, String name) {
|
super(store, name);
|
}
|
|
@Override
|
public Object run(String srcName, String createTableSQL, Object parameter, Object[] params) throws Exception {
|
// parameter作为传入的业务(监听接收到的)数据集合
|
if(parameter == null){
|
logger.debug("ListeningTask输入参数不存在:parameter,要采集的数据未提供: " + srcName);
|
this.doPauseOnReceivedEmptyData();
|
return null;
|
}
|
if(StringUtils.isEmpty(createTableSQL)){
|
logger.warn("采集的数据未提供SQL语句,可能无法完成存储:" + srcName);
|
}
|
|
if(!(parameter instanceof List)){
|
throw new IllegalArgumentException("ListeningTask: parameter参数必须是集合");
|
}
|
|
// long startTime = System.currentTimeMillis();
|
List<Object> datas = (List<Object>)parameter;
|
|
logger.debug(".......采集返回的结果数据:" + datas);
|
store.write(srcName, createTableSQL, parameter, datas);
|
|
if(datas.size() > 0){
|
logger.debug("........采集执行onAfterWrite一次,srcName = " + srcName);
|
// LogInfo logInfo = new LogInfo();
|
// logInfo.setSpanTime(System.currentTimeMillis()-startTime).setWriteCount(datas.size());
|
// this.onAfterWrite(srcName, parameter, logInfo);
|
this.onAfterWrite(srcName, parameter);
|
}
|
return DEFAULT_RESULT;
|
}
|
|
private void doPauseOnReceivedEmptyData(){
|
try {
|
TimeUnit.SECONDS.sleep(2);
|
} catch (InterruptedException e) {
|
// e.printStackTrace();
|
}
|
}
|
|
protected void onAfterWrite(String srcName, Object parameter){}
|
|
}
|