package tech.powerjob.client.service.impl; import lombok.extern.slf4j.Slf4j; import tech.powerjob.client.ClientConfig; import tech.powerjob.client.extension.ClientExtension; import tech.powerjob.client.extension.ExtensionContext; import tech.powerjob.client.service.HttpResponse; import tech.powerjob.client.service.PowerRequestBody; import tech.powerjob.client.service.RequestService; import tech.powerjob.common.OpenAPIConstant; import tech.powerjob.common.exception.PowerJobException; import tech.powerjob.common.utils.CollectionUtils; import javax.net.ssl.X509TrustManager; import java.io.IOException; import java.security.cert.X509Certificate; import java.util.List; import java.util.Objects; /** * 集群请求服务 * 封装网络相关通用逻辑 * * @author tjq * @since 2024/2/21 */ @Slf4j abstract class ClusterRequestService implements RequestService { protected final ClientConfig config; /** * 当前地址(上次请求成功的地址) */ protected String currentAddress; /** * 地址格式 * 协议://域名/OpenAPI/子路径 */ protected static final String URL_PATTERN = "%s://%s%s%s"; /** * 默认超时时间 */ protected static final Integer DEFAULT_TIMEOUT_SECONDS = 2; protected static final int HTTP_SUCCESS_CODE = 200; public ClusterRequestService(ClientConfig config) { this.config = config; this.currentAddress = config.getAddressList().get(0); } /** * 具体某一次 HTTP 请求的实现 * @param url 完整请求地址 * @param body 请求体 * @return 响应 * @throws IOException 异常 */ protected abstract HttpResponse sendHttpRequest(String url, PowerRequestBody body) throws IOException; /** * 封装集群请求能力 * @param path 请求 PATH * @param powerRequestBody 请求体 * @return 响应 */ protected HttpResponse clusterHaRequest(String path, PowerRequestBody powerRequestBody) { // 先尝试默认地址 String url = getUrl(path, currentAddress); try { return sendHttpRequest(url, powerRequestBody); } catch (IOException e) { log.warn("[ClusterRequestService] request url:{} failed, reason is {}.", url, e.toString()); } List addressList = fetchAddressList(); // 失败,开始重试 for (String addr : addressList) { if (Objects.equals(addr, currentAddress)) { continue; } url = getUrl(path, addr); try { HttpResponse res = sendHttpRequest(url, powerRequestBody); log.warn("[ClusterRequestService] server change: from({}) -> to({}).", currentAddress, addr); currentAddress = addr; return res; } catch (IOException e) { log.warn("[ClusterRequestService] request url:{} failed, reason is {}.", url, e.toString()); } } log.error("[ClusterRequestService] do post for path: {} failed because of no server available in {}.", path, addressList); throw new PowerJobException("no server available when send post request"); } private List fetchAddressList() { ClientExtension clientExtension = config.getClientExtension(); if (clientExtension != null) { List addressList = clientExtension.addressProvider(new ExtensionContext()); if (!CollectionUtils.isEmpty(addressList)) { return addressList; } } return config.getAddressList(); } /** * 不验证证书 * X.509 是一个国际标准,定义了公钥证书的格式。这个标准是由国际电信联盟(ITU-T)制定的,用于公钥基础设施(PKI)中数字证书的创建和分发。X.509证书主要用于在公开网络上验证实体的身份,如服务器或客户端的身份验证过程中,确保通信双方是可信的。X.509证书广泛应用于多种安全协议中,包括SSL/TLS,它是实现HTTPS的基础。 */ protected static class NoVerifyX509TrustManager implements X509TrustManager { @Override public void checkClientTrusted(X509Certificate[] arg0, String arg1) { } @Override public void checkServerTrusted(X509Certificate[] arg0, String arg1) { // 不验证 } @Override public X509Certificate[] getAcceptedIssuers() { return new X509Certificate[0]; } } private String getUrl(String path, String address) { String protocol = config.getProtocol().getProtocol(); return String.format(URL_PATTERN, protocol, address, OpenAPIConstant.WEB_PATH, path); } }