package tech.powerjob.official.processors.impl; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.JSONValidator; import tech.powerjob.worker.core.processor.ProcessResult; import tech.powerjob.worker.core.processor.TaskContext; import tech.powerjob.worker.log.OmsLogger; import lombok.Data; import okhttp3.*; import org.apache.commons.lang3.StringUtils; import tech.powerjob.official.processors.CommonBasicProcessor; import tech.powerjob.official.processors.util.CommonUtils; import java.time.Duration; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; /** * common http processor * * @author tjq * @author Jiang Jining * @since 2021/1/30 */ public class HttpProcessor extends CommonBasicProcessor { /** * Default timeout is 60 seconds. */ private static final int DEFAULT_TIMEOUT = 60; private static final int HTTP_SUCCESS_CODE = 200; private static final Map CLIENT_STORE = new ConcurrentHashMap<>(); @Override public ProcessResult process0(TaskContext taskContext) throws Exception { OmsLogger omsLogger = taskContext.getOmsLogger(); HttpParams httpParams = JSON.parseObject(CommonUtils.parseParams(taskContext), HttpParams.class); if (httpParams == null) { String message = "httpParams is null, please check jobParam configuration."; omsLogger.warn(message); return new ProcessResult(false, message); } if (StringUtils.isEmpty(httpParams.url)) { return new ProcessResult(false, "url can't be empty!"); } if (!httpParams.url.startsWith("http")) { httpParams.url = "http://" + httpParams.url; } omsLogger.info("request url: {}", httpParams.url); // set default method if (StringUtils.isEmpty(httpParams.method)) { httpParams.method = "GET"; omsLogger.info("using default request method: GET"); } else { httpParams.method = httpParams.method.toUpperCase(); omsLogger.info("request method: {}", httpParams.method); } // set default mediaType if (!"GET".equals(httpParams.method)) { // set default request body if (StringUtils.isEmpty(httpParams.body)) { httpParams.body = new JSONObject().toJSONString(); omsLogger.warn("try to use default request body:{}", httpParams.body); } if (JSONValidator.from(httpParams.body).validate() && StringUtils.isEmpty(httpParams.mediaType)) { httpParams.mediaType = "application/json"; omsLogger.warn("try to use 'application/json' as media type"); } } // set default timeout if (httpParams.timeout == null) { httpParams.timeout = DEFAULT_TIMEOUT; } omsLogger.info("request timeout: {} seconds", httpParams.timeout); OkHttpClient client = getClient(httpParams.timeout); Request.Builder builder = new Request.Builder().url(httpParams.url); if (httpParams.headers != null) { httpParams.headers.forEach((k, v) -> { builder.addHeader(k, v); omsLogger.info("add header {}:{}", k, v); }); } switch (httpParams.method) { case "PUT": case "DELETE": case "POST": MediaType mediaType = MediaType.parse(httpParams.mediaType); omsLogger.info("mediaType: {}", mediaType); RequestBody requestBody = RequestBody.create(mediaType, httpParams.body); builder.method(httpParams.method, requestBody); break; default: builder.get(); } Response response = client.newCall(builder.build()).execute(); omsLogger.info("response: {}", response); String msgBody = ""; if (response.body() != null) { msgBody = response.body().string(); } int responseCode = response.code(); String res = String.format("code:%d, body:%s", responseCode, msgBody); boolean success = true; if (responseCode != HTTP_SUCCESS_CODE) { success = false; omsLogger.warn("{} url: {} failed, response code is {}, response body is {}", httpParams.method, httpParams.url, responseCode, msgBody); } return new ProcessResult(success, res); } @Data public static class HttpParams { /** * POST / GET / PUT / DELETE */ private String method; /** * the request url */ private String url; /** * application/json * application/xml * image/png * image/jpeg * image/gif */ private String mediaType; private String body; private Map headers; /** * timeout for complete calls */ private Integer timeout; } private static OkHttpClient getClient(Integer timeout) { return CLIENT_STORE.computeIfAbsent(timeout, ignore -> new OkHttpClient.Builder() .connectTimeout(Duration.ZERO) .readTimeout(Duration.ZERO) .writeTimeout(Duration.ZERO) .callTimeout(timeout, TimeUnit.SECONDS) .build()); } }