package tech.powerjob.samples.mr;

import tech.powerjob.worker.core.processor.ProcessResult;
import tech.powerjob.worker.core.processor.TaskContext;
import tech.powerjob.worker.core.processor.TaskResult;
import tech.powerjob.worker.core.processor.sdk.MapReduceProcessor;
import tech.powerjob.worker.log.OmsLogger;
import com.google.common.base.Splitter;
import com.google.common.collect.Lists;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import org.springframework.stereotype.Component;

import java.util.List;
import java.util.Map;

/**
 * Simulates static sharding on top of MapReduce.
 * A textbook case of using a sledgehammer to crack a nut.
 *
 * <p>The root task parses the job parameters (expected format: {@code 1=a&2=b&3=c}) into one
 * {@link SubTask} per key/value pair and distributes them via {@code map}. Every non-root
 * invocation handles a single {@link SubTask}.
 *
 * @author tjq
 * @since 2020/5/21
 */
@Component
public class StaticSliceProcessor implements MapReduceProcessor {

    /** Name under which the slice sub-tasks are dispatched. */
    private static final String SLICE_TASK_NAME = "SLICE_TASK";

    /**
     * Splits {@code 1=a&2=b&3=c} into a key/value map. Empty segments (e.g. a trailing
     * {@code &}) are skipped and whitespace around each segment is trimmed.
     */
    private static final Splitter.MapSplitter PARAMS_SPLITTER =
            Splitter.on("&").omitEmptyStrings().trimResults().withKeyValueSeparator("=");

    /**
     * Dispatches slice sub-tasks when running as the root task, otherwise processes one sub-task.
     *
     * @param context task context supplying the job parameters, the sub-task and the logger
     * @return a successful result when the slices were dispatched or the sub-task was handled;
     *         a failed result when the job parameters are missing or malformed, or when the
     *         sub-task is not a {@link SubTask}
     * @throws Exception propagated from {@code map}
     */
    @Override
    public ProcessResult process(TaskContext context) throws Exception {

        OmsLogger omsLogger = context.getOmsLogger();

        // The root task is responsible for distributing the sub-tasks.
        if (isRootTask()) {
            // Slice parameters are passed in from the console, assumed to be KV formatted: 1=a&2=b&3=c
            String jobParams = context.getJobParams();

            List<SubTask> subTasks;
            try {
                subTasks = parseSubTasks(jobParams);
            } catch (IllegalArgumentException e) {
                // Covers malformed entries, duplicate keys and non-numeric indexes
                // (NumberFormatException is an IllegalArgumentException).
                omsLogger.warn("invalid slice params: {}", jobParams);
                return new ProcessResult(false, "invalid slice params: " + e.getMessage());
            }

            if (subTasks.isEmpty()) {
                omsLogger.warn("no slice params provided, expected format: 1=a&2=b&3=c");
                return new ProcessResult(false, "no slice params provided, expected format: 1=a&2=b&3=c");
            }

            omsLogger.info("dispatching {} slice sub tasks", subTasks.size());
            map(subTasks, SLICE_TASK_NAME);
            return new ProcessResult(true, "map successfully");
        }

        Object subTask = context.getSubTask();
        if (subTask instanceof SubTask) {
            // The actual processing goes here.
            // If the sub-task is still too large, it can of course be mapped again.
            return new ProcessResult(true, "subTask:" + ((SubTask) subTask).getIndex() + " process successfully");
        }
        return new ProcessResult(false, "UNKNOWN BUG");
    }

    /**
     * Aggregates the sub-task results. This sample performs no aggregation.
     *
     * @param context     task context of the reduce phase
     * @param taskResults results of the sub-tasks
     * @return always a successful placeholder result
     */
    @Override
    public ProcessResult reduce(TaskContext context, List<TaskResult> taskResults) {
        // Do whatever statistics are needed here... if none are needed, simply use a Map processor instead.
        return new ProcessResult(true, "xxxx");
    }

    /**
     * Converts the raw job parameters into sub-tasks, one per {@code index=params} pair.
     *
     * @param jobParams raw job parameters, may be {@code null} or blank
     * @return the parsed sub-tasks; empty when {@code jobParams} is {@code null} or blank
     * @throws IllegalArgumentException if an entry is not {@code key=value}, a key is duplicated,
     *                                  or a key is not a valid integer
     */
    private static List<SubTask> parseSubTasks(String jobParams) {
        if (jobParams == null || jobParams.trim().isEmpty()) {
            return Lists.newArrayList();
        }
        Map<String, String> paramsMap = PARAMS_SPLITTER.split(jobParams);
        List<SubTask> subTasks = Lists.newArrayListWithCapacity(paramsMap.size());
        for (Map.Entry<String, String> entry : paramsMap.entrySet()) {
            subTasks.add(new SubTask(Integer.parseInt(entry.getKey()), entry.getValue()));
        }
        return subTasks;
    }

    /**
     * One slice of work: the numeric index and its parameter string.
     * NOTE(review): the no-args constructor is presumably required by the framework's
     * sub-task (de)serialization — confirm before removing.
     */
    @Getter
    @NoArgsConstructor
    @AllArgsConstructor
    private static class SubTask {
        private int index;
        private String params;
    }
}