WangHan
2024-09-12 d5855a4926926698b740bc6c7ba489de47adb68b
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
package tech.powerjob.client.service.impl;
 
import lombok.extern.slf4j.Slf4j;
import tech.powerjob.client.ClientConfig;
import tech.powerjob.client.extension.ClientExtension;
import tech.powerjob.client.extension.ExtensionContext;
import tech.powerjob.client.service.HttpResponse;
import tech.powerjob.client.service.PowerRequestBody;
import tech.powerjob.client.service.RequestService;
import tech.powerjob.common.OpenAPIConstant;
import tech.powerjob.common.exception.PowerJobException;
import tech.powerjob.common.utils.CollectionUtils;
 
import javax.net.ssl.X509TrustManager;
import java.io.IOException;
import java.security.cert.X509Certificate;
import java.util.List;
import java.util.Objects;
 
/**
 * 集群请求服务
 * 封装网络相关通用逻辑
 *
 * @author tjq
 * @since 2024/2/21
 */
@Slf4j
abstract class ClusterRequestService implements RequestService {
 
    protected final ClientConfig config;
 
    /**
     * 当前地址(上次请求成功的地址)
     */
    protected String currentAddress;
 
    /**
     * 地址格式
     * 协议://域名/OpenAPI/子路径
     */
    protected static final String URL_PATTERN = "%s://%s%s%s";
 
    /**
     * 默认超时时间
     */
    protected static final Integer DEFAULT_TIMEOUT_SECONDS = 2;
 
    protected static final int HTTP_SUCCESS_CODE = 200;
 
    public ClusterRequestService(ClientConfig config) {
        this.config = config;
        this.currentAddress = config.getAddressList().get(0);
    }
 
    /**
     * 具体某一次 HTTP 请求的实现
     * @param url 完整请求地址
     * @param body 请求体
     * @return 响应
     * @throws IOException 异常
     */
    protected abstract HttpResponse sendHttpRequest(String url, PowerRequestBody body) throws IOException;
 
    /**
     * 封装集群请求能力
     * @param path 请求 PATH
     * @param powerRequestBody 请求体
     * @return 响应
     */
    protected HttpResponse clusterHaRequest(String path, PowerRequestBody powerRequestBody) {
 
        // 先尝试默认地址
        String url = getUrl(path, currentAddress);
        try {
            return sendHttpRequest(url, powerRequestBody);
        } catch (IOException e) {
            log.warn("[ClusterRequestService] request url:{} failed, reason is {}.", url, e.toString());
        }
 
        List<String> addressList = fetchAddressList();
 
        // 失败,开始重试
        for (String addr : addressList) {
            if (Objects.equals(addr, currentAddress)) {
                continue;
            }
            url = getUrl(path, addr);
            try {
                HttpResponse res = sendHttpRequest(url, powerRequestBody);
                log.warn("[ClusterRequestService] server change: from({}) -> to({}).", currentAddress, addr);
                currentAddress = addr;
                return res;
            } catch (IOException e) {
                log.warn("[ClusterRequestService] request url:{} failed, reason is {}.", url, e.toString());
            }
        }
 
        log.error("[ClusterRequestService] do post for path: {} failed because of no server available in {}.", path, addressList);
        throw new PowerJobException("no server available when send post request");
    }
 
    private List<String> fetchAddressList() {
 
        ClientExtension clientExtension = config.getClientExtension();
        if (clientExtension != null) {
            List<String> addressList = clientExtension.addressProvider(new ExtensionContext());
            if (!CollectionUtils.isEmpty(addressList)) {
                return addressList;
            }
        }
 
        return config.getAddressList();
    }
 
    /**
     * 不验证证书
     * X.509 是一个国际标准,定义了公钥证书的格式。这个标准是由国际电信联盟(ITU-T)制定的,用于公钥基础设施(PKI)中数字证书的创建和分发。X.509证书主要用于在公开网络上验证实体的身份,如服务器或客户端的身份验证过程中,确保通信双方是可信的。X.509证书广泛应用于多种安全协议中,包括SSL/TLS,它是实现HTTPS的基础。
     */
    protected static class NoVerifyX509TrustManager implements X509TrustManager {
        @Override
        public void checkClientTrusted(X509Certificate[] arg0, String arg1) {
        }
 
        @Override
        public void checkServerTrusted(X509Certificate[] arg0, String arg1) {
            // 不验证
        }
 
        @Override
        public X509Certificate[] getAcceptedIssuers() {
            return new X509Certificate[0];
        }
    }
 
 
    private String getUrl(String path, String address) {
        String protocol = config.getProtocol().getProtocol();
        return String.format(URL_PATTERN, protocol, address, OpenAPIConstant.WEB_PATH, path);
    }
}