package tech.powerjob.samples.tester;

import com.alibaba.fastjson.JSONObject;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import tech.powerjob.client.ClientConfig;
import tech.powerjob.client.IPowerJobClient;
import tech.powerjob.client.PowerJobClient;
import tech.powerjob.common.enums.ExecuteType;
import tech.powerjob.common.enums.ProcessorType;
import tech.powerjob.common.enums.SwitchableStatus;
import tech.powerjob.common.enums.TimeExpressionType;
import tech.powerjob.common.request.http.SaveJobInfoRequest;
import tech.powerjob.common.response.InstanceInfoDTO;
import tech.powerjob.common.response.JobInfoDTO;
import tech.powerjob.common.response.ResultDTO;
import tech.powerjob.official.processors.util.CommonUtils;
import tech.powerjob.worker.core.processor.ProcessResult;
import tech.powerjob.worker.core.processor.TaskContext;
import tech.powerjob.worker.core.processor.sdk.BasicProcessor;

import java.io.Serializable;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;

/**
 * Processor that exercises the PowerJob OpenAPI end to end: it creates a job, exports it,
 * copies it, disables/enables the copy, deletes the copy, runs the original job, queries the
 * resulting instance, and finally deletes the original job.
 *
 * <p>Every verification step throws on mismatch, so the checks are effective regardless of
 * whether the JVM was started with assertions enabled.
 *
 * @author tjq
 * @since 2024/8/11
 */
@Slf4j
@Component
public class OpenApiTester implements BasicProcessor {

    /**
     * Clients keyed by app name, password and address list. A concurrent map is used because
     * this processor is a single Spring bean that may be invoked from several task threads.
     */
    private final Map<String, IPowerJobClient> clientCache = Maps.newConcurrentMap();

    private static final String NEW_JOB_PARAMS = "{'aa':'bb'}";
    private static final String RUN_INSTANCE_PARAMS = "{'key':'value'}";
    private static final int NEW_JOB_MAX_INSTANCE_NUM = 9;

    /** How long to wait for the triggered instance before querying it, in milliseconds. */
    private static final long RUN_INSTANCE_WAIT_MS = 10000L;

    /**
     * Runs the whole OpenAPI scenario against the server(s) described by the job parameters.
     *
     * @param context task context; its parameters may carry a JSON-encoded {@link Config}
     * @return a successful {@link ProcessResult} when every call and every check passed
     * @throws RuntimeException      if any OpenAPI call reports failure
     * @throws IllegalStateException if a returned value does not match the expectation
     * @throws Exception             if the waiting thread is interrupted
     */
    @Override
    public ProcessResult process(TaskContext context) throws Exception {

        IPowerJobClient client = fetchClient(context);

        // Create a new job.
        SaveJobInfoRequest saveJobInfoRequest = buildSaveJobInfoRequest();
        context.getOmsLogger().info("[newJob] saveJobInfoRequest: {}", JSONObject.toJSONString(saveJobInfoRequest));
        ResultDTO<Long> saveJobResult = client.saveJob(saveJobInfoRequest);
        context.getOmsLogger().info("[newJob] RESPONSE: {}", JSONObject.toJSONString(saveJobResult));
        Long createdJobId = fetchResultData(saveJobResult);

        // Test export: the exported definition must match what was saved.
        ResultDTO<SaveJobInfoRequest> exportJobResult = client.exportJob(createdJobId);
        context.getOmsLogger().info("[exportJob] exportJobResult: {}", JSONObject.toJSONString(exportJobResult));
        SaveJobInfoRequest exportJobInfo = fetchResultData(exportJobResult);
        verify(Objects.equals(exportJobInfo.getJobParams(), saveJobInfoRequest.getJobParams()),
                "exported jobParams differ from the saved jobParams");
        verify(Objects.equals(exportJobInfo.getMaxInstanceNum(), saveJobInfoRequest.getMaxInstanceNum()),
                "exported maxInstanceNum differs from the saved maxInstanceNum");

        // Test copy.
        context.getOmsLogger().info("[copyJob] sourceJobId: {}", createdJobId);
        ResultDTO<Long> copyJobResult = client.copyJob(createdJobId);
        context.getOmsLogger().info("[copyJob] copyJobResult: {}", JSONObject.toJSONString(copyJobResult));
        Long copiedJobId = fetchResultData(copyJobResult);

        // Disable the copy. Log the response before validating it so a failure is still recorded.
        context.getOmsLogger().info("[disableJob] targetJobId: {}", copiedJobId);
        ResultDTO<?> disableJobResult = client.disableJob(copiedJobId);
        context.getOmsLogger().info("[disableJob] disableJobResult: {}", JSONObject.toJSONString(disableJobResult));
        fetchResultData(disableJobResult);

        // Fetch both jobs and compare them.
        ResultDTO<JobInfoDTO> createdJobInfoResult = client.fetchJob(createdJobId);
        context.getOmsLogger().info("[fetchJob] createdJobInfo: {}", JSONObject.toJSONString(createdJobInfoResult));
        ResultDTO<JobInfoDTO> copiedJobInfoResult = client.fetchJob(copiedJobId);
        context.getOmsLogger().info("[fetchJob] copiedJobInfo: {}", JSONObject.toJSONString(copiedJobInfoResult));

        JobInfoDTO createdJob = fetchResultData(createdJobInfoResult);
        JobInfoDTO copiedJob = fetchResultData(copiedJobInfoResult);
        verify(Objects.equals(createdJob.getJobParams(), copiedJob.getJobParams()),
                "copied jobParams differ from the source job");
        verify(Objects.equals(createdJob.getMaxInstanceNum(), copiedJob.getMaxInstanceNum()),
                "copied maxInstanceNum differs from the source job");
        verify(Objects.equals(copiedJob.getStatus(), SwitchableStatus.DISABLE.getV()),
                "copied job is not DISABLE after disableJob, status=" + copiedJob.getStatus());

        // Enable the copy again.
        ResultDTO<?> enableJobResult = client.enableJob(copiedJob.getId());
        context.getOmsLogger().info("[enableJob] enableJobResult: {}", JSONObject.toJSONString(enableJobResult));
        fetchResultData(enableJobResult);

        // Query once more to confirm the enable took effect.
        ResultDTO<JobInfoDTO> copiedJobInfoResult2 = client.fetchJob(copiedJobId);
        context.getOmsLogger().info("[fetchJob] copiedJobInfoResult2: {}", JSONObject.toJSONString(copiedJobInfoResult2));
        JobInfoDTO copiedJob2 = fetchResultData(copiedJobInfoResult2);
        verify(Objects.equals(copiedJob2.getStatus(), SwitchableStatus.ENABLE.getV()),
                "copied job is not ENABLE after enableJob, status=" + copiedJob2.getStatus());

        // Delete the copied job.
        ResultDTO<?> deleteJobResult = client.deleteJob(copiedJobId);
        context.getOmsLogger().info("[deleteJob] deleteJobResult: {}", JSONObject.toJSONString(deleteJobResult));
        fetchResultData(deleteJobResult);

        // Run the original job.
        ResultDTO<Long> runJobResult = client.runJob(createdJobId, RUN_INSTANCE_PARAMS, 0);
        context.getOmsLogger().info("[runJob] runJobResult: {}", JSONObject.toJSONString(runJobResult));
        Long instanceId = fetchResultData(runJobResult);

        // Wait 10 seconds; in theory the instance should have finished by then.
        try {
            Thread.sleep(RUN_INSTANCE_WAIT_MS);
        } catch (InterruptedException e) {
            // Restore the interrupt flag before propagating so callers can still observe it.
            Thread.currentThread().interrupt();
            throw e;
        }

        // Query instance detail and status; both views must agree.
        ResultDTO<InstanceInfoDTO> fetchInstanceInfoResult = client.fetchInstanceInfo(instanceId);
        context.getOmsLogger().info("[fetchInstanceInfo] fetchInstanceInfoResult: {}", JSONObject.toJSONString(fetchInstanceInfoResult));
        InstanceInfoDTO instanceInfoDTO = fetchResultData(fetchInstanceInfoResult);

        ResultDTO<Integer> fetchInstanceStatusResult = client.fetchInstanceStatus(instanceId);
        context.getOmsLogger().info("[fetchInstanceStatus] fetchInstanceStatusResult: {}", JSONObject.toJSONString(fetchInstanceStatusResult));
        Integer instanceStatus = fetchResultData(fetchInstanceStatusResult);
        verify(Objects.equals(instanceInfoDTO.getStatus(), instanceStatus),
                "instance status mismatch, fetchInstanceInfo=" + instanceInfoDTO.getStatus()
                        + ", fetchInstanceStatus=" + instanceStatus);

        // Release all remaining resources.
        ResultDTO<?> deleteCreatedJobResult = client.deleteJob(createdJobId);
        context.getOmsLogger().info("[deleteJob] deleteCreatedJobResult: {}", JSONObject.toJSONString(deleteCreatedJobResult));
        fetchResultData(deleteCreatedJobResult);

        return new ProcessResult(true);
    }

    /**
     * Unwraps the payload of a successful response.
     *
     * @param resultDTO response returned by the OpenAPI client
     * @param <T>       payload type
     * @return the payload of {@code resultDTO}
     * @throws RuntimeException carrying the response message when the response is not successful
     */
    private static <T> T fetchResultData(ResultDTO<T> resultDTO) {
        if (resultDTO.isSuccess()) {
            return resultDTO.getData();
        }
        throw new RuntimeException(resultDTO.getMessage());
    }

    /**
     * Fails the run when an expectation does not hold. Used instead of the {@code assert}
     * keyword, which is skipped unless the JVM is started with {@code -ea}.
     *
     * @param condition expectation that must be {@code true}
     * @param message   description of the failed expectation
     * @throws IllegalStateException if {@code condition} is {@code false}
     */
    private static void verify(boolean condition, String message) {
        if (!condition) {
            throw new IllegalStateException("[OpenApiTester] verification failed: " + message);
        }
    }

    /**
     * Builds the request used to create the test job.
     *
     * @return a request for an API-triggered, standalone, built-in processor job
     */
    private SaveJobInfoRequest buildSaveJobInfoRequest() {
        SaveJobInfoRequest newJobInfo = new SaveJobInfoRequest();
        newJobInfo.setJobName("JobCreateByOpenAPI");
        newJobInfo.setJobDescription("Timestamp: " + System.currentTimeMillis());
        newJobInfo.setJobParams(NEW_JOB_PARAMS);
        newJobInfo.setTimeExpressionType(TimeExpressionType.API);
        newJobInfo.setExecuteType(ExecuteType.STANDALONE);
        newJobInfo.setProcessorType(ProcessorType.BUILT_IN);
        newJobInfo.setProcessorInfo("tech.powerjob.samples.processors.StandaloneProcessorDemo");
        newJobInfo.setMaxInstanceNum(NEW_JOB_MAX_INSTANCE_NUM);
        newJobInfo.setMinCpuCores(0.01);
        newJobInfo.setMinMemorySpace(0.02);
        newJobInfo.setMinDiskSpace(0.03);
        return newJobInfo;
    }

    /**
     * Resolves the client for the connection settings found in the task parameters, creating
     * and caching one on first use. Missing settings fall back to local sample defaults.
     *
     * @param context task context whose parameters may hold a JSON-encoded {@link Config}
     * @return a cached or newly created client
     */
    private IPowerJobClient fetchClient(TaskContext context) {
        String params = CommonUtils.parseParams(context);
        Config clientConfig = Optional.ofNullable(params)
                .map(json -> JSONObject.parseObject(json, Config.class))
                .orElseGet(Config::new);

        String appName = Optional.ofNullable(clientConfig.getAppName()).orElse("powerjob-worker-samples");
        String password = Optional.ofNullable(clientConfig.getPassword()).orElse("powerjob123");
        List<String> addressList = Optional.ofNullable(clientConfig.getAddressList())
                .orElse(Lists.newArrayList("127.0.0.1:7700", "127.0.0.1:7701"));

        String key = String.format("client_%s_%s_%s", appName, password, addressList);
        return clientCache.computeIfAbsent(key, ignore -> initPowerJobClient(appName, password, addressList));
    }

    /**
     * Creates a new client for the given connection settings.
     *
     * @param appName     application name to authenticate as
     * @param password    application password
     * @param addressList server addresses
     * @return a new {@link PowerJobClient}
     */
    private IPowerJobClient initPowerJobClient(String appName, String password, List<String> addressList) {
        ClientConfig config = new ClientConfig();
        config.setAppName(appName);
        config.setPassword(password);
        config.setAddressList(addressList);

        // Log only the non-secret settings; the password is deliberately left out.
        log.info("[OpenApiTester] initPowerJobClient, appName: {}, addressList: {}", appName, addressList);
        return new PowerJobClient(config);
    }

    /**
     * Optional connection settings parsed from the task parameters. Any field left
     * {@code null} is replaced by a default in {@code fetchClient}.
     */
    @Data
    public static class Config implements Serializable {

        private String appName;

        private String password;

        private List<String> addressList;
    }
}