package tech.powerjob.server.remote.worker;

import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import tech.powerjob.common.model.DeployedContainerInfo;
import tech.powerjob.server.common.module.WorkerInfo;
import tech.powerjob.server.persistence.remote.model.JobInfoDO;
import tech.powerjob.server.remote.server.redirector.DesignateServer;
import tech.powerjob.server.remote.worker.filter.WorkerFilter;

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;

/**
 * Queries worker cluster information.
 *
 * @author tjq
 * @since 2021/2/19
 */
@Slf4j
@Service
public class WorkerClusterQueryService {

    /** Filters consulted by {@link #filterWorker(WorkerInfo, JobInfoDO)}; supplied through the constructor. */
    private final List<WorkerFilter> workerFilters;

    public WorkerClusterQueryService(List<WorkerFilter> workerFilters) {
        this.workerFilters = workerFilters;
    }

    /**
     * Gets the workers that may run the given job.
     *
     * <p>Workers rejected by any configured {@link WorkerFilter} are removed, and the result is
     * truncated to {@code jobInfo.getMaxWorkerCount()} entries when that value is positive.
     * No sorting is performed here: the order is the iteration order of the underlying worker map.
     *
     * @param jobInfo job
     * @return a new, independent list of the workers that passed every filter
     */
    public List<WorkerInfo> geAvailableWorkers(JobInfoDO jobInfo) {

        List<WorkerInfo> workers = Lists.newLinkedList(getWorkerInfosByAppId(jobInfo.getAppId()).values());

        // remove machines that do not meet the job's requirements
        workers.removeIf(workerInfo -> filterWorker(workerInfo, jobInfo));

        // limit the cluster size (0 means unlimited)
        if (!workers.isEmpty()) {
            final int maxWorkerCount = jobInfo.getMaxWorkerCount();
            if (maxWorkerCount > 0 && workers.size() > maxWorkerCount) {
                // copy instead of returning the subList view, so the result does not keep
                // the discarded workers reachable through the backing list
                workers = Lists.newArrayList(workers.subList(0, maxWorkerCount));
            }
        }
        return workers;
    }

    /**
     * Gets every known worker of an application, ordered by score from highest to lowest.
     *
     * <p>NOTE(review): the method is annotated with {@code @DesignateServer}; presumably the call is
     * routed to the server in charge of the app - confirm against the annotation's documentation.
     *
     * @param appId appId
     * @return all workers of the app, sorted by {@code getSystemMetrics().calculateScore()} descending
     */
    @DesignateServer
    public List<WorkerInfo> getAllWorkers(Long appId) {
        List<WorkerInfo> workers = Lists.newLinkedList(getWorkerInfosByAppId(appId).values());
        // Integer.compare instead of subtraction: subtracting two ints can overflow and flip the sign
        workers.sort((o1, o2) -> Integer.compare(
                o2.getSystemMetrics().calculateScore(),
                o1.getSystemMetrics().calculateScore()));
        return workers;
    }

    /**
     * get all alive workers
     *
     * @param appId appId
     * @return alive workers, i.e. the workers for which {@link WorkerInfo#timeout()} is false
     */
    @DesignateServer
    public List<WorkerInfo> getAllAliveWorkers(Long appId) {
        List<WorkerInfo> workers = Lists.newLinkedList(getWorkerInfosByAppId(appId).values());
        workers.removeIf(WorkerInfo::timeout);
        return workers;
    }

    /**
     * Gets worker info by address.
     *
     * @param appId   the app id
     * @param address the address, may be {@code null}
     * @return the worker registered under the address, or an empty {@link Optional} when the
     * address is {@code null} or no such worker is known
     */
    public Optional<WorkerInfo> getWorkerInfoByAddress(Long appId, String address) {
        final Map<String, WorkerInfo> workerInfosByAppId = getWorkerInfosByAppId(appId);
        // guard both the map and the key: some Map implementations reject null keys on lookup
        if (null != workerInfosByAppId && null != address) {
            return Optional.ofNullable(workerInfosByAppId.get(address));
        }
        return Optional.empty();
    }

    /**
     * Gets the appId to cluster status mapping held by {@link WorkerClusterManagerService}.
     *
     * @return mapping from appId to its cluster status holder
     */
    public Map<Long, ClusterStatusHolder> getAppId2ClusterStatus() {
        return WorkerClusterManagerService.getAppId2ClusterStatus();
    }

    /**
     * Gets the deployment status of a container of an application.
     *
     * @param appId       application id
     * @param containerId container id
     * @return deployment status, an empty list when no cluster status exists for the app
     */
    public List<DeployedContainerInfo> getDeployedContainerInfos(Long appId, Long containerId) {
        ClusterStatusHolder clusterStatusHolder = getAppId2ClusterStatus().get(appId);
        if (clusterStatusHolder == null) {
            return Collections.emptyList();
        }
        return clusterStatusHolder.getDeployedContainerInfos(containerId);
    }

    /**
     * Looks up the workers of an application.
     *
     * @param appId application id
     * @return the workers keyed by the value used for lookup in {@link #getWorkerInfoByAddress(Long, String)}
     * (the worker address); an empty map, plus a warning log, when the app has no cluster status yet
     */
    private Map<String, WorkerInfo> getWorkerInfosByAppId(Long appId) {
        ClusterStatusHolder clusterStatusHolder = getAppId2ClusterStatus().get(appId);
        if (clusterStatusHolder == null) {
            log.warn("[WorkerManagerService] can't find any worker for app(appId={}) yet.", appId);
            return Collections.emptyMap();
        }
        return clusterStatusHolder.getAllWorkers();
    }

    /**
     * filter invalid worker for job
     *
     * @param workerInfo worker info
     * @param jobInfo    job info
     * @return filter this worker when return true, i.e. as soon as any configured filter rejects it
     */
    private boolean filterWorker(WorkerInfo workerInfo, JobInfoDO jobInfo) {
        for (WorkerFilter filter : workerFilters) {
            if (filter.filter(workerInfo, jobInfo)) {
                return true;
            }
        }
        return false;
    }
}