WangHan
2024-09-12 d5855a4926926698b740bc6c7ba489de47adb68b
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
package tech.powerjob.server.remote.transporter.impl;
 
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.core.env.Environment;
import org.springframework.stereotype.Service;
import tech.powerjob.common.OmsConstant;
import tech.powerjob.common.PowerSerializable;
import tech.powerjob.common.enums.Protocol;
import tech.powerjob.common.utils.NetUtils;
import tech.powerjob.remote.framework.actor.Actor;
import tech.powerjob.remote.framework.base.Address;
import tech.powerjob.remote.framework.base.RemotingException;
import tech.powerjob.remote.framework.base.ServerType;
import tech.powerjob.remote.framework.base.URL;
import tech.powerjob.remote.framework.engine.EngineConfig;
import tech.powerjob.remote.framework.engine.EngineOutput;
import tech.powerjob.remote.framework.engine.RemoteEngine;
import tech.powerjob.remote.framework.engine.impl.PowerJobRemoteEngine;
import tech.powerjob.server.remote.transporter.ProtocolInfo;
import tech.powerjob.server.remote.transporter.TransportService;

import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.CompletionStage;
 
/**
 * server 数据传输服务
 *
 * @author tjq
 * @since 2023/1/21
 */
@Slf4j
@Service
public class PowerTransportService implements TransportService, InitializingBean, DisposableBean, ApplicationContextAware {
 
    /**
     * server 需要激活的通讯协议,建议激活全部支持的协议
     */
    @Value("${oms.transporter.active.protocols}")
    private String activeProtocols;
 
    /**
     * 主要通讯协议,用于 server 与 server 之间的通讯,用户必须保证该协议可用(端口开放)!
     */
    @Value("${oms.transporter.main.protocol}")
    private String mainProtocol;
 
    private static final String PROTOCOL_PORT_CONFIG = "oms.%s.port";
 
    private final Environment environment;
 
    private ProtocolInfo defaultProtocol;
    private final Map<String, ProtocolInfo> protocolName2Info = Maps.newHashMap();
 
    private final List<RemoteEngine> engines = Lists.newArrayList();
 
    private ApplicationContext applicationContext;
 
    public PowerTransportService(Environment environment) {
        this.environment = environment;
    }
 
    @Override
    public ProtocolInfo defaultProtocol() {
        return defaultProtocol;
    }
 
    @Override
    public Map<String, ProtocolInfo> allProtocols() {
        return protocolName2Info;
    }
 
    private ProtocolInfo fetchProtocolInfo(String protocol) {
        // 兼容老版 worker 未上报 protocol 的情况
        protocol = compatibleProtocol(protocol);
        final ProtocolInfo protocolInfo = protocolName2Info.get(protocol);
        if (protocolInfo == null) {
            throw new IllegalArgumentException("can't find Transporter by protocol :" + protocol);
        }
        return protocolInfo;
    }
 
    @Override
    public void tell(String protocol, URL url, PowerSerializable request) {
        fetchProtocolInfo(protocol).getTransporter().tell(url, request);
    }
 
    @Override
    public <T> CompletionStage<T> ask(String protocol, URL url, PowerSerializable request, Class<T> clz) throws RemotingException {
        return fetchProtocolInfo(protocol).getTransporter().ask(url, request, clz);
    }
 
    private void initRemoteFrameWork(String protocol, int port) {
 
        // 从构造器注入改为从 applicationContext 获取来避免循环依赖
        final Map<String, Object> beansWithAnnotation = applicationContext.getBeansWithAnnotation(Actor.class);
        log.info("[PowerTransportService] [{}] find Actor num={},names={}", protocol, beansWithAnnotation.size(), beansWithAnnotation.keySet());
 
        Address address = new Address()
                .setHost(NetUtils.getLocalHost())
                .setPort(port);
 
        ProtocolInfo protocolInfo = new ProtocolInfo(protocol, address.getHost(), address.getPort(), null);
 
        EngineConfig engineConfig = new EngineConfig()
                .setServerType(ServerType.SERVER)
                .setType(protocol.toUpperCase())
                .setBindAddress(address)
                .setActorList(Lists.newArrayList(beansWithAnnotation.values()));
 
        if (!StringUtils.equalsIgnoreCase(protocolInfo.getExternalAddress(), protocolInfo.getAddress())) {
            Address externalAddress = Address.fromIpv4(protocolInfo.getExternalAddress());
            engineConfig.setExternalAddress(externalAddress);
            log.info("[PowerTransportService] [{}] exist externalAddress: {}", protocol, externalAddress);
        }
 
        log.info("[PowerTransportService] [{}] start to initialize RemoteEngine[address={}]", protocol, address);
        RemoteEngine re = new PowerJobRemoteEngine();
        final EngineOutput engineOutput = re.start(engineConfig);
        log.info("[PowerTransportService] [{}] start RemoteEngine[address={}] successfully", protocol, address);
 
        this.engines.add(re);
 
        protocolInfo.setTransporter(engineOutput.getTransporter());
        this.protocolName2Info.put(protocol, protocolInfo);
    }
 
    @Override
    public void afterPropertiesSet() throws Exception {
 
        log.info("[PowerTransportService] start to initialize whole PowerTransportService!");
        log.info("[PowerTransportService] activeProtocols: {}", activeProtocols);
 
        if (StringUtils.isEmpty(activeProtocols)) {
            throw new IllegalArgumentException("activeProtocols can't be empty!");
        }
 
        for (String protocol : activeProtocols.split(OmsConstant.COMMA)) {
            try {
                final int port = parseProtocolPort(protocol);
                initRemoteFrameWork(protocol, port);
            } catch (Throwable t) {
                log.error("[PowerTransportService] initialize protocol[{}] failed. If you don't need to use this protocol, you can turn it off by 'oms.transporter.active.protocols'", protocol);
                ExceptionUtils.rethrow(t);
            }
        }
 
        choseDefault();
 
        log.info("[PowerTransportService] initialize successfully!");
        log.info("[PowerTransportService] ALL_PROTOCOLS: {}", protocolName2Info);
    }
 
    /**
     * 获取协议端口,考虑兼容性 & 用户仔细扩展的场景,选择动态从 env 获取 port
     * @return port
     */
    private int parseProtocolPort(String protocol) {
        final String key1 = String.format(PROTOCOL_PORT_CONFIG, protocol.toLowerCase());
        final String key2 = String.format(PROTOCOL_PORT_CONFIG, protocol.toUpperCase());
        String portStr = environment.getProperty(key1);
        if (StringUtils.isEmpty(portStr)) {
            portStr = environment.getProperty(key2);
        }
        log.info("[PowerTransportService] fetch port for protocol[{}], key={}, value={}", protocol, key1, portStr);
 
        if (StringUtils.isEmpty(portStr)) {
            throw new IllegalArgumentException(String.format("can't find protocol config by key: %s, please check your spring config!", key1));
        }
 
        return Integer.parseInt(portStr);
    }
 
    private String compatibleProtocol(String p) {
        if (p == null) {
            return Protocol.AKKA.name();
        }
        return p;
    }
 
    /**
     * HTTP 优先,否则默认取第一个协议
     */
    private void choseDefault() {
 
 
        this.defaultProtocol = this.protocolName2Info.get(mainProtocol);
        log.info("[PowerTransportService] chose [{}] as the default protocol, make sure this protocol can work!", mainProtocol);
 
        if (this.defaultProtocol == null) {
            throw new IllegalArgumentException("can't find default protocol, please check your config!");
        }
    }
 
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = applicationContext;
    }
 
    @Override
    public void destroy() throws Exception {
        engines.forEach(e -> {
            try {
                e.close();
            } catch (Exception ignore) {
            }
        });
    }
}