package tech.powerjob.server.remote.server.election; import com.alibaba.fastjson.JSONObject; import com.google.common.collect.Sets; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; import tech.powerjob.common.exception.PowerJobException; import tech.powerjob.common.request.ServerDiscoveryRequest; import tech.powerjob.common.response.AskResponse; import tech.powerjob.common.serialize.JsonUtils; import tech.powerjob.remote.framework.base.URL; import tech.powerjob.server.extension.LockService; import tech.powerjob.server.persistence.remote.model.AppInfoDO; import tech.powerjob.server.persistence.remote.repository.AppInfoRepository; import tech.powerjob.server.remote.transporter.ProtocolInfo; import tech.powerjob.server.remote.transporter.TransportService; import tech.powerjob.server.remote.transporter.impl.ServerURLFactory; import java.util.Date; import java.util.Optional; import java.util.Set; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; /** * Default server election policy, first-come, first-served, no load balancing capability * * @author tjq * @since 2021/2/9 */ @Slf4j @Service public class ServerElectionService { private final LockService lockService; private final TransportService transportService; private final AppInfoRepository appInfoRepository; private final int accurateSelectServerPercentage; private static final int RETRY_TIMES = 10; private static final long PING_TIMEOUT_MS = 1000; private static final String SERVER_ELECT_LOCK = "server_elect_%d"; public ServerElectionService(LockService lockService, TransportService transportService, AppInfoRepository appInfoRepository,@Value("${oms.accurate.select.server.percentage}") int accurateSelectServerPercentage) { this.lockService = lockService; this.transportService = transportService; this.appInfoRepository = appInfoRepository; this.accurateSelectServerPercentage = accurateSelectServerPercentage; } public String elect(ServerDiscoveryRequest request) { if (!accurate()) { final String currentServer = request.getCurrentServer(); // 如果是本机,就不需要查数据库那么复杂的操作了,直接返回成功 Optional localProtocolInfoOpt = Optional.ofNullable(transportService.allProtocols().get(request.getProtocol())); if (localProtocolInfoOpt.isPresent()) { if (localProtocolInfoOpt.get().getExternalAddress().equals(currentServer) || localProtocolInfoOpt.get().getAddress().equals(currentServer)) { log.info("[ServerElection] this server[{}] is worker[appId={}]'s current server, skip check", currentServer, request.getAppId()); return currentServer; } } } return getServer0(request); } private String getServer0(ServerDiscoveryRequest discoveryRequest) { final Long appId = discoveryRequest.getAppId(); final String protocol = discoveryRequest.getProtocol(); Set downServerCache = Sets.newHashSet(); for (int i = 0; i < RETRY_TIMES; i++) { // 无锁获取当前数据库中的Server Optional appInfoOpt = appInfoRepository.findById(appId); if (!appInfoOpt.isPresent()) { throw new PowerJobException(appId + " is not registered!"); } String appName = appInfoOpt.get().getAppName(); String originServer = appInfoOpt.get().getCurrentServer(); String activeAddress = activeAddress(originServer, downServerCache, protocol); if (StringUtils.isNotEmpty(activeAddress)) { return activeAddress; } // 无可用Server,重新进行Server选举,需要加锁 String lockName = String.format(SERVER_ELECT_LOCK, appId); boolean lockStatus = lockService.tryLock(lockName, 30000); if (!lockStatus) { try { Thread.sleep(500); }catch (Exception ignore) { } continue; } try { // 可能上一台机器已经完成了Server选举,需要再次判断 AppInfoDO appInfo = appInfoRepository.findById(appId).orElseThrow(() -> new RuntimeException("impossible, unless we just lost our database.")); String address = activeAddress(appInfo.getCurrentServer(), downServerCache, protocol); if (StringUtils.isNotEmpty(address)) { return address; } // 篡位,如果本机存在协议,则作为Server调度该 worker final ProtocolInfo targetProtocolInfo = transportService.allProtocols().get(protocol); if (targetProtocolInfo != null) { // 注意,写入 AppInfoDO#currentServer 的永远是 default 的绑定地址,仅在返回的时候特殊处理为协议地址 appInfo.setCurrentServer(transportService.defaultProtocol().getAddress()); appInfo.setGmtModified(new Date()); appInfoRepository.saveAndFlush(appInfo); log.info("[ServerElection] this server({}) become the new server for app(appId={}).", appInfo.getCurrentServer(), appId); return targetProtocolInfo.getExternalAddress(); } }catch (Exception e) { log.error("[ServerElection] write new server to db failed for app {}.", appName, e); } finally { lockService.unlock(lockName); } } throw new PowerJobException("server elect failed for app " + appId); } /** * 判断指定server是否存活 * @param serverAddress 需要检测的server地址(绑定的内网地址) * @param downServerCache 缓存,防止多次发送PING(这个QPS其实还蛮爆表的...) * @param protocol 协议,用于返回指定的地址 * @return null or address(外部地址) */ private String activeAddress(String serverAddress, Set downServerCache, String protocol) { if (downServerCache.contains(serverAddress)) { return null; } if (StringUtils.isEmpty(serverAddress)) { return null; } Ping ping = new Ping(); ping.setCurrentTime(System.currentTimeMillis()); URL targetUrl = ServerURLFactory.ping2Friend(serverAddress); try { AskResponse response = transportService.ask(transportService.defaultProtocol().getProtocol(), targetUrl, ping, AskResponse.class) .toCompletableFuture() .get(PING_TIMEOUT_MS, TimeUnit.MILLISECONDS); if (response.isSuccess()) { // 检测通过的是远程 server 的暴露地址,需要返回 worker 需要的协议地址 final JSONObject protocolInfo = JsonUtils.parseObject(response.getData(), JSONObject.class).getJSONObject(protocol); if (protocolInfo != null) { downServerCache.remove(serverAddress); ProtocolInfo remoteProtocol = protocolInfo.toJavaObject(ProtocolInfo.class); log.info("[ServerElection] server[{}] is active, it will be the master, final protocol={}", serverAddress, remoteProtocol); // 4.3.3 升级 4.3.4 过程中,未升级的 server 还不存在 externalAddress,需要使用 address 兼容 return Optional.ofNullable(remoteProtocol.getExternalAddress()).orElse(remoteProtocol.getAddress()); } else { log.warn("[ServerElection] server[{}] is active but don't have target protocol", serverAddress); } } } catch (TimeoutException te) { log.warn("[ServerElection] server[{}] was down due to ping timeout!", serverAddress); } catch (Exception e) { log.warn("[ServerElection] server[{}] was down with unknown case!", serverAddress, e); } downServerCache.add(serverAddress); return null; } private boolean accurate() { return ThreadLocalRandom.current().nextInt(100) < accurateSelectServerPercentage; } }