package tech.powerjob.worker.test.processors.demo; import tech.powerjob.worker.core.processor.ProcessResult; import tech.powerjob.worker.core.processor.TaskContext; import tech.powerjob.worker.core.processor.TaskResult; import tech.powerjob.worker.core.processor.sdk.MapReduceProcessor; import com.google.common.collect.Lists; import java.util.List; import java.util.concurrent.atomic.AtomicLong; /** * 示例-MapReduce任务处理器 * * @author tjq * @since 2020/4/15 */ public class MapReduceProcessorDemo implements MapReduceProcessor { @Override public ProcessResult process(TaskContext context) throws Exception { // 判断是否为根任务 if (isRootTask()) { // 构造子任务 List subTaskList = Lists.newLinkedList(); /* * 子任务的构造由开发者自己定义 * eg. 现在需要从文件中读取100W个ID,并处理数据库中这些ID对应的数据,那么步骤如下: * 1. 根任务(RootTask)读取文件,流式拉取100W个ID,并按1000个一批的大小组装成子任务进行派发 * 2. 非根任务获取子任务,完成业务逻辑的处理 */ // 调用 map 方法,派发子任务 map(subTaskList, "DATA_PROCESS_TASK"); return new ProcessResult(true, "map successfully"); } // 非子任务,可根据 subTask 的类型 或 TaskName 来判断分支 if (context.getSubTask() instanceof SubTask) { // 执行子任务,注:子任务人可以 map 产生新的子任务,可以构建任意级的 MapReduce 处理器 return new ProcessResult(true, "PROCESS_SUB_TASK_SUCCESS"); } return new ProcessResult(false, "UNKNOWN_BUG"); } @Override public ProcessResult reduce(TaskContext taskContext, List taskResults) { // 所有 Task 执行结束后,reduce 将会被执行 // taskResults 保存了所有子任务的执行结果 // 用法举例,统计执行结果 AtomicLong successCnt = new AtomicLong(0); taskResults.forEach(tr -> { if (tr.isSuccess()) { successCnt.incrementAndGet(); } }); return new ProcessResult(true, "success task num:" + successCnt.get()); } // 自定义的子任务 private static class SubTask { private Long siteId; private List idList; } }