package tech.powerjob.samples.processors;
|
|
import com.alibaba.fastjson.JSONObject;
|
import com.google.common.collect.Lists;
|
import lombok.AllArgsConstructor;
|
import lombok.Data;
|
import lombok.extern.slf4j.Slf4j;
|
import org.apache.commons.lang3.StringUtils;
|
import org.springframework.stereotype.Component;
|
import tech.powerjob.common.serialize.JsonUtils;
|
import tech.powerjob.common.utils.MapUtils;
|
import tech.powerjob.worker.core.processor.ProcessResult;
|
import tech.powerjob.worker.core.processor.TaskContext;
|
import tech.powerjob.worker.core.processor.TaskResult;
|
import tech.powerjob.worker.core.processor.sdk.MapReduceProcessor;
|
import tech.powerjob.worker.log.OmsLogger;
|
|
import java.io.Serializable;
|
import java.util.List;
|
import java.util.Optional;
|
import java.util.concurrent.ThreadLocalRandom;
|
import java.util.concurrent.atomic.AtomicLong;
|
|
/**
|
* MapReduce 处理器示例
|
* 控制台参数:{"batchSize": 100, "batchNum": 2}
|
*
|
* @author tjq
|
* @since 2020/4/17
|
*/
|
@Slf4j
|
@Component("demoMapReduceProcessor")
|
public class MapReduceProcessorDemo implements MapReduceProcessor {
|
|
@Override
|
public ProcessResult process(TaskContext context) throws Exception {
|
|
// PowerJob 提供的日志 API,可支持在控制台指定多种日志模式(在线查看 / 本地打印)。最佳实践:全部使用 OmsLogger 打印日志,开发阶段控制台配置为 在线日志方便开发;上线后调整为本地日志,与直接使用 SLF4J 无异
|
OmsLogger omsLogger = context.getOmsLogger();
|
|
// 是否为根任务,一般根任务进行任务的分发
|
boolean isRootTask = isRootTask();
|
// Task 名称,除了 MAP 任务其他 taskName 均由开发者自己创建,某种意义上也可以按参数理解(比如多层 MAP 的情况下,taskName 可以命名为,Map_Level1, Map_Level2,最终按 taskName 判断层级进不同的执行分支)
|
String taskName = context.getTaskName();
|
// 任务参数,控制台任务配置中直接填写的参数
|
String jobParamsStr = context.getJobParams();
|
// 任务示例参数,运行任务时手动填写的参数(等同于 OpenAPI runJob 的携带的参数)
|
String instanceParamsStr = context.getInstanceParams();
|
|
omsLogger.info("[MapReduceDemo] [startExecuteNewTask] jobId:{}, instanceId:{}, taskId:{}, taskName: {}, RetryTimes: {}, isRootTask:{}, jobParams:{}, instanceParams:{}", context.getJobId(), context.getInstanceId(), context.getTaskId(), taskName, context.getCurrentRetryTimes(), isRootTask, jobParamsStr, instanceParamsStr);
|
|
// 常见写法,优先从 InstanceParams 获取参数,取不到再从 JobParams 中获取,灵活性最佳(相当于实现了实例参数重载任务参数)
|
String finalParams = StringUtils.isEmpty(instanceParamsStr) ? jobParamsStr : instanceParamsStr;
|
final JSONObject params = Optional.ofNullable(finalParams).map(JSONObject::parseObject).orElse(new JSONObject());
|
|
if (isRootTask) {
|
|
omsLogger.info("[MapReduceDemo] [RootTask] start execute root task~");
|
|
/*
|
* rootTask 内的核心逻辑,即为按自己的业务需求拆分子任务。比如
|
* - 从数据库/数仓拉一批任务出来做计算,那 MAP 任务就可以 stream 读全库,每 N 个 ID 作为一个 SubTask 对外分发
|
* - 需要读取几千万个文件进行解析,那么 MAP 任务就可以将 N 个文件名作为一个 SubTask 对外分发,每个子任务接收到文件名称进行文件处理
|
*
|
* eg. 现在需要从文件中读取100W个ID,并处理数据库中这些ID对应的数据,那么步骤如下:
|
* 1. 根任务(RootTask)读取文件,流式拉取100W个ID,并按100个一批的大小组装成子任务进行派发
|
* 2. 非根任务获取子任务,完成业务逻辑的处理
|
*
|
* 以下 demo 进行该逻辑的模拟
|
*/
|
|
|
// 构造子任务
|
|
// 需要读取的文件总数
|
Long num = MapUtils.getLong(params, "num", 100000L);
|
// 每个子任务携带多少个文件ID(此参数越大,每个子任务就“越大”,如果失败的重试成本就越高。参数越小,每个子任务就越轻,当相应的分片数量会提升,会让 PowerJob 计算开销增大,建议按业务需求合理调配)
|
Long batchSize = MapUtils.getLong(params, "batchSize", 100L);
|
|
// 此处模拟从文件读取 num 个 ID,每个子任务携带 batchSize 个 ID 作为一个分片
|
List<Long> ids = Lists.newArrayList();
|
for (long i = 0; i < num; i++) {
|
ids.add(i);
|
|
if (ids.size() >= batchSize) {
|
|
// 构造自己的子任务,自行传递所有需要的参数
|
SubTask subTask = new SubTask(ThreadLocalRandom.current().nextLong(), Lists.newArrayList(ids), "extra");
|
ids.clear();
|
|
try {
|
/*
|
第一个参数:List<子任务>,map 支持批量操作以减少网络 IO 提升性能,简单起见此处不再示例,开发者可自行优化性能
|
第二个参数:子任务名称,即后续 Task 执行时从 TaskContext#taskName 拿到的值。某种意义上也可以按参数理解(比如多层 MAP 的情况下,taskName 可以命名为,Map_Level1, Map_Level2,最终按 taskName 判断层级进不同的执行分支)
|
*/
|
map(Lists.newArrayList(subTask), "L1_FILE_PROCESS");
|
} catch (Exception e) {
|
// 注意 MAP 操作可能抛出异常,建议进行捕获并按需处理
|
omsLogger.error("[MapReduceDemo] map task failed!", e);
|
throw e;
|
}
|
}
|
}
|
|
if (!ids.isEmpty()) {
|
map(Lists.newArrayList(new SubTask()), "L1_FILE_PROCESS");
|
}
|
|
// map 阶段的结果,由于前置逻辑为异常直接抛出,执行到这里一定成功,所以无脑设置为 success。开发者可自行调整逻辑
|
return new ProcessResult(true, "MAP_SUCCESS,totalNum:" + num);
|
|
}
|
|
// 如果是简单的二层结构(ROOT - SubTASK),此处一定是子 Task,无需再次判断。否则可使用 TaskContext#taskName 字符串匹配 或 TaskContext#SubTask 对象内自定义参数匹配,进入目标执行分支
|
|
// 获取前置节点 map 传递过来的参数,进行业务处理
|
SubTask subTask = (SubTask) context.getSubTask();
|
log.info("[MapReduceDemo] [SubTask] taskId:{}, taskName: {}, subTask: {}", context.getTaskId(), taskName, JsonUtils.toJSONString(subTask));
|
Thread.sleep(MapUtils.getLong(params, "bizProcessCost", 233L));
|
|
// 模拟有成功有失败的情况,开发者按真实业务执行情况判断即可
|
long successRate = MapUtils.getLong(params, "successRate", 80L);
|
long randomNum = ThreadLocalRandom.current().nextLong(100);
|
if (successRate > randomNum) {
|
return new ProcessResult(true, "PROCESS_SUCCESS:" + randomNum);
|
} else {
|
return new ProcessResult(false, "PROCESS_FAILED:" + randomNum);
|
}
|
}
|
|
@Override
|
public ProcessResult reduce(TaskContext context, List<TaskResult> taskResults) {
|
|
// 子任务结果太大,上报在线日志会有 IO 问题,直接使用本地日志打
|
log.info("List<TaskResult>: {}", JSONObject.toJSONString(taskResults));
|
|
OmsLogger omsLogger = context.getOmsLogger();
|
omsLogger.info("================ MapReduceProcessorDemo#reduce ================");
|
|
// 所有 Task 执行结束后,reduce 将会被执行,taskResults 保存了所有子任务的执行结果。(注意 reduce 由于保存了所有子任务的执行结果,在子任务规模巨大时对内存有极大开销,超大型计算任务慎用或使用流式 reduce(开发中))
|
|
// 用法举例:统计执行结果
|
AtomicLong successCnt = new AtomicLong(0);
|
AtomicLong failedCnt = new AtomicLong(0);
|
taskResults.forEach(tr -> {
|
if (tr.isSuccess()) {
|
successCnt.incrementAndGet();
|
} else {
|
failedCnt.incrementAndGet();
|
}
|
});
|
|
|
double successRate = 1.0 * successCnt.get() / (successCnt.get() + failedCnt.get());
|
|
String resultMsg = String.format("succeedTaskNum:%d,failedTaskNum:%d,successRate:%f", successCnt.get(), failedCnt.get(), successRate);
|
omsLogger.info("[MapReduceDemo] [Reduce] {}", resultMsg);
|
|
// reduce 阶段的结果,将作为任务真正执行结果
|
if (successRate > 0.8) {
|
return new ProcessResult(true, resultMsg);
|
} else {
|
return new ProcessResult(false, resultMsg);
|
}
|
|
}
|
|
|
/**
|
* 自定义的子任务,按自己的业务需求定义即可
|
* 注意:代表子任务参数的类:一定要有无参构造方法!一定要有无参构造方法!一定要有无参构造方法!
|
* 最好把 GET / SET 方法也加上,减少序列化问题的概率
|
*/
|
@Data
|
@AllArgsConstructor
|
public static class SubTask implements Serializable {
|
|
/**
|
* 再次强调,一定要有无参构造方法
|
*/
|
public SubTask() {
|
}
|
|
private Long siteId;
|
|
private List<Long> idList;
|
|
private String extra;
|
}
|
}
|