package tech.powerjob.samples.mr; import tech.powerjob.worker.core.processor.ProcessResult; import tech.powerjob.worker.core.processor.TaskContext; import tech.powerjob.worker.core.processor.TaskResult; import tech.powerjob.worker.core.processor.sdk.MapReduceProcessor; import com.google.common.collect.Lists; import java.util.List; /** * 模拟 DAG 的处理器(在正式提供DAG支持前,可用该方法代替) * * ROOT -> A -> B -> REDUCE * -> C * * @author tjq * @since 2020/5/15 */ public class DAGSimulationProcessor implements MapReduceProcessor { @Override public ProcessResult process(TaskContext context) throws Exception { if (isRootTask()) { // L1. 执行根任务 // 执行完毕后产生子任务 A,需要传递的参数可以作为 TaskA 的属性进行传递 TaskA taskA = new TaskA(); try { map(Lists.newArrayList(taskA), "LEVEL1_TASK_A"); return new ProcessResult(true, "map success"); } catch (Exception e) { return new ProcessResult(false, "map failed"); } } if (context.getSubTask() instanceof TaskA) { // L2. 执行A任务 // 执行完成后产生子任务 B,C(并行执行) TaskB taskB = new TaskB(); TaskC taskC = new TaskC(); try { map(Lists.newArrayList(taskB, taskC), "LEVEL2_TASK_BC"); return new ProcessResult(true, "map success"); } catch (Exception ignore) { } } if (context.getSubTask() instanceof TaskB) { // L3. 执行B任务 return new ProcessResult(true, "xxx"); } if (context.getSubTask() instanceof TaskC) { // L3. 执行C任务 return new ProcessResult(true, "xxx"); } return new ProcessResult(false, "UNKNOWN_TYPE_OF_SUB_TASK"); } @Override public ProcessResult reduce(TaskContext context, List taskResults) { // L4. 执行最终 Reduce 任务,taskResults保存了之前所有任务的结果 taskResults.forEach(taskResult -> { // do something... }); return new ProcessResult(true, "reduce success"); } private static class TaskA { } private static class TaskB { } private static class TaskC { } }