package tech.powerjob.server.remote.transporter.impl; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.exception.ExceptionUtils; import org.springframework.beans.BeansException; import org.springframework.beans.factory.DisposableBean; import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; import org.springframework.core.env.Environment; import org.springframework.stereotype.Service; import tech.powerjob.common.OmsConstant; import tech.powerjob.common.PowerSerializable; import tech.powerjob.common.enums.Protocol; import tech.powerjob.common.utils.NetUtils; import tech.powerjob.remote.framework.actor.Actor; import tech.powerjob.remote.framework.base.Address; import tech.powerjob.remote.framework.base.RemotingException; import tech.powerjob.remote.framework.base.ServerType; import tech.powerjob.remote.framework.base.URL; import tech.powerjob.remote.framework.engine.EngineConfig; import tech.powerjob.remote.framework.engine.EngineOutput; import tech.powerjob.remote.framework.engine.RemoteEngine; import tech.powerjob.remote.framework.engine.impl.PowerJobRemoteEngine; import tech.powerjob.server.remote.transporter.ProtocolInfo; import tech.powerjob.server.remote.transporter.TransportService; import java.util.List; import java.util.Map; import java.util.concurrent.CompletionStage; /** * server 数据传输服务 * * @author tjq * @since 2023/1/21 */ @Slf4j @Service public class PowerTransportService implements TransportService, InitializingBean, DisposableBean, ApplicationContextAware { /** * server 需要激活的通讯协议,建议激活全部支持的协议 */ @Value("${oms.transporter.active.protocols}") private String activeProtocols; /** * 主要通讯协议,用于 server 与 server 之间的通讯,用户必须保证该协议可用(端口开放)! */ @Value("${oms.transporter.main.protocol}") private String mainProtocol; private static final String PROTOCOL_PORT_CONFIG = "oms.%s.port"; private final Environment environment; private ProtocolInfo defaultProtocol; private final Map protocolName2Info = Maps.newHashMap(); private final List engines = Lists.newArrayList(); private ApplicationContext applicationContext; public PowerTransportService(Environment environment) { this.environment = environment; } @Override public ProtocolInfo defaultProtocol() { return defaultProtocol; } @Override public Map allProtocols() { return protocolName2Info; } private ProtocolInfo fetchProtocolInfo(String protocol) { // 兼容老版 worker 未上报 protocol 的情况 protocol = compatibleProtocol(protocol); final ProtocolInfo protocolInfo = protocolName2Info.get(protocol); if (protocolInfo == null) { throw new IllegalArgumentException("can't find Transporter by protocol :" + protocol); } return protocolInfo; } @Override public void tell(String protocol, URL url, PowerSerializable request) { fetchProtocolInfo(protocol).getTransporter().tell(url, request); } @Override public CompletionStage ask(String protocol, URL url, PowerSerializable request, Class clz) throws RemotingException { return fetchProtocolInfo(protocol).getTransporter().ask(url, request, clz); } private void initRemoteFrameWork(String protocol, int port) { // 从构造器注入改为从 applicationContext 获取来避免循环依赖 final Map beansWithAnnotation = applicationContext.getBeansWithAnnotation(Actor.class); log.info("[PowerTransportService] [{}] find Actor num={},names={}", protocol, beansWithAnnotation.size(), beansWithAnnotation.keySet()); Address address = new Address() .setHost(NetUtils.getLocalHost()) .setPort(port); ProtocolInfo protocolInfo = new ProtocolInfo(protocol, address.getHost(), address.getPort(), null); EngineConfig engineConfig = new EngineConfig() .setServerType(ServerType.SERVER) .setType(protocol.toUpperCase()) .setBindAddress(address) .setActorList(Lists.newArrayList(beansWithAnnotation.values())); if (!StringUtils.equalsIgnoreCase(protocolInfo.getExternalAddress(), protocolInfo.getAddress())) { Address externalAddress = Address.fromIpv4(protocolInfo.getExternalAddress()); engineConfig.setExternalAddress(externalAddress); log.info("[PowerTransportService] [{}] exist externalAddress: {}", protocol, externalAddress); } log.info("[PowerTransportService] [{}] start to initialize RemoteEngine[address={}]", protocol, address); RemoteEngine re = new PowerJobRemoteEngine(); final EngineOutput engineOutput = re.start(engineConfig); log.info("[PowerTransportService] [{}] start RemoteEngine[address={}] successfully", protocol, address); this.engines.add(re); protocolInfo.setTransporter(engineOutput.getTransporter()); this.protocolName2Info.put(protocol, protocolInfo); } @Override public void afterPropertiesSet() throws Exception { log.info("[PowerTransportService] start to initialize whole PowerTransportService!"); log.info("[PowerTransportService] activeProtocols: {}", activeProtocols); if (StringUtils.isEmpty(activeProtocols)) { throw new IllegalArgumentException("activeProtocols can't be empty!"); } for (String protocol : activeProtocols.split(OmsConstant.COMMA)) { try { final int port = parseProtocolPort(protocol); initRemoteFrameWork(protocol, port); } catch (Throwable t) { log.error("[PowerTransportService] initialize protocol[{}] failed. If you don't need to use this protocol, you can turn it off by 'oms.transporter.active.protocols'", protocol); ExceptionUtils.rethrow(t); } } choseDefault(); log.info("[PowerTransportService] initialize successfully!"); log.info("[PowerTransportService] ALL_PROTOCOLS: {}", protocolName2Info); } /** * 获取协议端口,考虑兼容性 & 用户仔细扩展的场景,选择动态从 env 获取 port * @return port */ private int parseProtocolPort(String protocol) { final String key1 = String.format(PROTOCOL_PORT_CONFIG, protocol.toLowerCase()); final String key2 = String.format(PROTOCOL_PORT_CONFIG, protocol.toUpperCase()); String portStr = environment.getProperty(key1); if (StringUtils.isEmpty(portStr)) { portStr = environment.getProperty(key2); } log.info("[PowerTransportService] fetch port for protocol[{}], key={}, value={}", protocol, key1, portStr); if (StringUtils.isEmpty(portStr)) { throw new IllegalArgumentException(String.format("can't find protocol config by key: %s, please check your spring config!", key1)); } return Integer.parseInt(portStr); } private String compatibleProtocol(String p) { if (p == null) { return Protocol.AKKA.name(); } return p; } /** * HTTP 优先,否则默认取第一个协议 */ private void choseDefault() { this.defaultProtocol = this.protocolName2Info.get(mainProtocol); log.info("[PowerTransportService] chose [{}] as the default protocol, make sure this protocol can work!", mainProtocol); if (this.defaultProtocol == null) { throw new IllegalArgumentException("can't find default protocol, please check your config!"); } } @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { this.applicationContext = applicationContext; } @Override public void destroy() throws Exception { engines.forEach(e -> { try { e.close(); } catch (Exception ignore) { } }); } }