package tech.powerjob.worker.background;
|
|
import lombok.RequiredArgsConstructor;
|
import lombok.extern.slf4j.Slf4j;
|
import org.apache.commons.lang3.StringUtils;
|
import tech.powerjob.common.enhance.SafeRunnable;
|
import tech.powerjob.common.model.SystemMetrics;
|
import tech.powerjob.common.request.WorkerHeartbeat;
|
import tech.powerjob.worker.common.PowerJobWorkerVersion;
|
import tech.powerjob.worker.common.WorkerRuntime;
|
import tech.powerjob.worker.common.utils.SystemInfoUtils;
|
import tech.powerjob.worker.common.utils.TransportUtils;
|
import tech.powerjob.worker.container.OmsContainerFactory;
|
import tech.powerjob.worker.core.tracker.manager.HeavyTaskTrackerManager;
|
import tech.powerjob.worker.core.tracker.manager.LightTaskTrackerManager;
|
|
|
/**
|
* Worker健康度定时上报
|
*
|
* @author tjq
|
* @since 2020/3/25
|
*/
|
@Slf4j
|
@RequiredArgsConstructor
|
public class WorkerHealthReporter extends SafeRunnable {
|
|
private final WorkerRuntime workerRuntime;
|
|
@Override
|
public void run0() {
|
|
// 没有可用Server,无法上报
|
String currentServer = workerRuntime.getServerDiscoveryService().getCurrentServerAddress();
|
if (StringUtils.isEmpty(currentServer)) {
|
log.warn("[WorkerHealthReporter] no available server,fail to report health info!");
|
return;
|
}
|
|
SystemMetrics systemMetrics;
|
|
if (workerRuntime.getWorkerConfig().getSystemMetricsCollector() == null) {
|
systemMetrics = SystemInfoUtils.getSystemMetrics();
|
} else {
|
systemMetrics = workerRuntime.getWorkerConfig().getSystemMetricsCollector().collect();
|
}
|
|
WorkerHeartbeat heartbeat = new WorkerHeartbeat();
|
|
heartbeat.setSystemMetrics(systemMetrics);
|
heartbeat.setWorkerAddress(workerRuntime.getWorkerAddress());
|
heartbeat.setAppName(workerRuntime.getWorkerConfig().getAppName());
|
heartbeat.setAppId(workerRuntime.getAppId());
|
heartbeat.setHeartbeatTime(System.currentTimeMillis());
|
heartbeat.setVersion(PowerJobWorkerVersion.getVersion());
|
heartbeat.setProtocol(workerRuntime.getWorkerConfig().getProtocol().name());
|
heartbeat.setClient("KingPenguin");
|
heartbeat.setTag(workerRuntime.getWorkerConfig().getTag());
|
|
// 上报 Tracker 数量
|
heartbeat.setLightTaskTrackerNum(LightTaskTrackerManager.currentTaskTrackerSize());
|
heartbeat.setHeavyTaskTrackerNum(HeavyTaskTrackerManager.currentTaskTrackerSize());
|
// 是否超载
|
if (workerRuntime.getWorkerConfig().getMaxLightweightTaskNum() <= LightTaskTrackerManager.currentTaskTrackerSize() || workerRuntime.getWorkerConfig().getMaxHeavyweightTaskNum() <= HeavyTaskTrackerManager.currentTaskTrackerSize()){
|
heartbeat.setOverload(true);
|
}
|
// 获取当前加载的容器列表
|
heartbeat.setContainerInfos(OmsContainerFactory.getDeployedContainerInfos());
|
// 发送请求
|
if (StringUtils.isEmpty(currentServer)) {
|
return;
|
}
|
// log
|
log.info("[WorkerHealthReporter] report health status,appId:{},appName:{},isOverload:{},maxLightweightTaskNum:{},currentLightweightTaskNum:{},maxHeavyweightTaskNum:{},currentHeavyweightTaskNum:{}" ,
|
heartbeat.getAppId(),
|
heartbeat.getAppName(),
|
heartbeat.isOverload(),
|
workerRuntime.getWorkerConfig().getMaxLightweightTaskNum(),
|
heartbeat.getLightTaskTrackerNum(),
|
workerRuntime.getWorkerConfig().getMaxHeavyweightTaskNum(),
|
heartbeat.getHeavyTaskTrackerNum()
|
);
|
|
TransportUtils.reportWorkerHeartbeat(heartbeat, currentServer, workerRuntime.getTransporter());
|
}
|
}
|