package tech.powerjob.samples.processors;

import tech.powerjob.common.serialize.JsonUtils;
import tech.powerjob.samples.MysteryService;
import tech.powerjob.worker.core.processor.ProcessResult;
import tech.powerjob.worker.core.processor.TaskContext;
import tech.powerjob.worker.core.processor.sdk.MapProcessor;
import com.google.common.collect.Lists;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;

/**
 * Sample Map processor.
 *
 * <p>When invoked as the root task it builds {@link #BATCH_NUM} sub-tasks, each carrying
 * {@link #BATCH_SIZE} item IDs, and hands them to {@code map(...)}. When invoked for a
 * sub-task it walks the item IDs, simulates some work for each one, appends a value to the
 * workflow context and reports a random result (always successful on a retry).
 *
 * <p>NOTE(review): {@code log}, {@code isRootTask()} and {@code map(...)} are not declared in
 * this file; they are presumably supplied by {@link MapProcessor} - confirm.
 *
 * @author tjq
 * @since 2020/4/18
 */
@Component
public class MapProcessorDemo implements MapProcessor {

    @Resource
    private MysteryService mysteryService;

    /**
     * Number of item IDs placed in each sub-task.
     */
    private static final int BATCH_SIZE = 100;

    /**
     * Number of sub-tasks created by the root task.
     */
    private static final int BATCH_NUM = 5;

    /**
     * Upper bound of the busy-loop counter used to simulate a time-consuming operation.
     */
    private static final int SPIN_LIMIT = Integer.MAX_VALUE >> 7;

    /**
     * Entry point: maps sub-tasks when running as the root task, otherwise processes the
     * sub-task attached to the context.
     *
     * @param context task context; its sub-task is expected to be a {@link SubTask} when this
     *                is not the root task
     * @return {@code (true, "map successfully")} for the root task; for a sub-task, a random
     *         result on the first attempt and a successful one when
     *         {@code context.getCurrentRetryTimes() >= 1}
     * @throws Exception propagated from the calls made while mapping or processing
     */
    @Override
    public ProcessResult process(TaskContext context) throws Exception {

        log.info("============== MapProcessorDemo#process ==============");
        log.info("isRootTask:{}", isRootTask());
        log.info("taskContext:{}", JsonUtils.toJSONString(context));
        log.info("{}", mysteryService.hasaki());

        if (isRootTask()) {
            log.info("==== MAP ====");
            map(buildSubTasks(), "MAP_TEST_TASK");
            return new ProcessResult(true, "map successfully");
        }

        log.info("==== PROCESS ====");
        handleSubTask((SubTask) context.getSubTask());

        // Exercise appending data to the workflow context from within a Map task.
        context.getWorkflowContext().appendData2WfContext("Yasuo", "A sword's poor company for a long road.");

        boolean success = ThreadLocalRandom.current().nextBoolean();
        if (context.getCurrentRetryTimes() >= 1) {
            // A retried sub-task always succeeds.
            success = true;
        }
        return new ProcessResult(success, "RESULT:" + success);
    }

    /**
     * Builds {@link #BATCH_NUM} sub-tasks; sub-task {@code j} holds the item IDs
     * {@code j * BATCH_SIZE .. j * BATCH_SIZE + BATCH_SIZE - 1}.
     */
    private static List<SubTask> buildSubTasks() {
        List<SubTask> subTasks = Lists.newLinkedList();
        for (int batch = 0; batch < BATCH_NUM; batch++) {
            SubTask subTask = new SubTask();
            subTask.siteId = batch;
            subTask.itemIds = Lists.newLinkedList();
            subTasks.add(subTask);
            for (int offset = 0; offset < BATCH_SIZE; offset++) {
                // Use BATCH_SIZE (not a literal) so ID ranges stay contiguous if the size changes.
                subTask.itemIds.add(offset + batch * BATCH_SIZE);
            }
        }
        return subTasks;
    }

    /**
     * Processes every item ID of the sub-task in order, stopping early if the current thread
     * has been interrupted.
     */
    private void handleSubTask(SubTask subTask) {
        for (Integer itemId : subTask.getItemIds()) {
            // Thread.interrupted() also clears the interrupt flag of the current thread.
            if (Thread.interrupted()) {
                // The task was interrupted: stop working on the remaining items.
                log.info("job has been stop! so stop to process subTask: {} => {}", subTask.getSiteId(), itemId);
                break;
            }
            log.info("processing subTask: {} => {}", subTask.getSiteId(), itemId);
            simulateWork();
        }
    }

    /**
     * Simulates a time-consuming operation by counting from 0 past {@link #SPIN_LIMIT}.
     */
    private static void simulateWork() {
        for (int i = 0; i <= SPIN_LIMIT; i++) {
            // intentionally empty: busy loop
        }
    }

    /**
     * Payload of one mapped sub-task: a site ID and the item IDs belonging to it.
     */
    @Getter
    @NoArgsConstructor
    @AllArgsConstructor
    public static class SubTask {
        private Integer siteId;
        private List<Integer> itemIds;
    }
}