WangHan
2024-09-12 d5855a4926926698b740bc6c7ba489de47adb68b
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
package tech.powerjob.worker.background;
 
import tech.powerjob.common.enhance.SafeRunnable;
import tech.powerjob.common.enums.LogLevel;
import tech.powerjob.common.model.InstanceLogContent;
import tech.powerjob.common.request.WorkerLogReportReq;
import tech.powerjob.remote.framework.transporter.Transporter;
import com.google.common.collect.Lists;
import com.google.common.collect.Queues;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import tech.powerjob.worker.background.discovery.ServerDiscoveryService;
import tech.powerjob.worker.common.utils.TransportUtils;
 
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
 
/**
 * 日志处理器
 *
 * @author tjq
 * @since 2020/4/21
 */
@Slf4j
public class OmsLogHandler {
 
    private final String workerAddress;
    private final Transporter transporter;
    private final ServerDiscoveryService serverDiscoveryService;
 
    // 处理线程,需要通过线程池启动
    public final Runnable logSubmitter = new LogSubmitter();
    // 上报锁,只需要一个线程上报即可
    private final Lock reportLock = new ReentrantLock();
    // 生产者消费者模式,异步上传日志
    private final BlockingQueue<InstanceLogContent> logQueue = Queues.newLinkedBlockingQueue(10240);
 
    // 每次上报携带的数据条数
    private static final int BATCH_SIZE = 20;
    // 本地囤积阈值
    private static final int REPORT_SIZE = 1024;
 
    public OmsLogHandler(String workerAddress, Transporter transporter, ServerDiscoveryService serverDiscoveryService) {
        this.workerAddress = workerAddress;
        this.transporter = transporter;
        this.serverDiscoveryService = serverDiscoveryService;
    }
 
    /**
     * 提交日志
     * @param instanceId 任务实例ID
     * @param logContent 日志内容
     */
    public void submitLog(long instanceId, LogLevel logLevel, String logContent) {
 
        if (logQueue.size() > REPORT_SIZE) {
            // 线程的生命周期是个不可循环的过程,一个线程对象结束了不能再次start,只能一直创建和销毁
            new Thread(logSubmitter).start();
        }
 
        InstanceLogContent tuple = new InstanceLogContent(instanceId, System.currentTimeMillis(), logLevel.getV(), logContent);
        boolean offerRet = logQueue.offer(tuple);
        if (!offerRet) {
            log.warn("[OmsLogHandler] [{}] submit log failed, maybe your log speed is too fast!", instanceId);
        }
    }
 
 
 
    private class LogSubmitter extends SafeRunnable {
 
        @Override
        public void run0() {
 
            boolean lockResult = reportLock.tryLock();
            if (!lockResult) {
                return;
            }
 
            try {
 
                final String currentServerAddress = serverDiscoveryService.getCurrentServerAddress();
                // 当前无可用 Server
                if (StringUtils.isEmpty(currentServerAddress)) {
                    if (!logQueue.isEmpty()) {
                        logQueue.clear();
                        log.warn("[OmsLogHandler] because there is no available server to report logs which leads to queue accumulation, oms discarded all logs.");
                    }
                    return;
                }
 
                List<InstanceLogContent> logs = Lists.newLinkedList();
 
                while (!logQueue.isEmpty()) {
                    try {
                        InstanceLogContent logContent = logQueue.poll(100, TimeUnit.MILLISECONDS);
                        logs.add(logContent);
 
                        if (logs.size() >= BATCH_SIZE) {
                            WorkerLogReportReq req = new WorkerLogReportReq(workerAddress, Lists.newLinkedList(logs));
                            // 不可靠请求,WEB日志不追求极致
                            TransportUtils.reportLogs(req, currentServerAddress, transporter);
                            logs.clear();
                        }
 
                    }catch (Exception ignore) {
                        break;
                    }
                }
 
                if (!logs.isEmpty()) {
                    WorkerLogReportReq req = new WorkerLogReportReq(workerAddress, logs);
                    TransportUtils.reportLogs(req, currentServerAddress, transporter);
                }
 
            }finally {
                reportLock.unlock();
            }
        }
    }
}