WangHan
2024-09-12 d5855a4926926698b740bc6c7ba489de47adb68b
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
package tech.powerjob.server.core.instance;
 
import com.google.common.base.Stopwatch;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.commons.lang3.time.FastDateFormat;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.core.task.AsyncTaskExecutor;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import org.springframework.transaction.support.TransactionTemplate;
import org.springframework.util.CollectionUtils;
import tech.powerjob.common.OmsConstant;
import tech.powerjob.common.enums.LogLevel;
import tech.powerjob.common.enums.TimeExpressionType;
import tech.powerjob.common.model.InstanceLogContent;
import tech.powerjob.common.utils.CommonUtils;
import tech.powerjob.common.utils.NetUtils;
import tech.powerjob.common.utils.SegmentLock;
import tech.powerjob.server.common.constants.PJThreadPool;
import tech.powerjob.server.common.utils.OmsFileUtils;
import tech.powerjob.server.extension.dfs.*;
import tech.powerjob.server.persistence.StringPage;
import tech.powerjob.server.persistence.local.LocalInstanceLogDO;
import tech.powerjob.server.persistence.local.LocalInstanceLogRepository;
import tech.powerjob.server.persistence.remote.model.JobInfoDO;
import tech.powerjob.server.persistence.storage.Constants;
import tech.powerjob.server.remote.server.redirector.DesignateServer;
 
import javax.annotation.Resource;
import java.io.*;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import java.util.stream.Stream;
 
/**
 * 任务实例运行时日志服务
 *
 * @author tjq
 * @since 2020/4/27
 */
@Slf4j
@Service
public class InstanceLogService {
 
    @Value("${server.port}")
    private int port;
 
    @Resource
    private InstanceMetadataService instanceMetadataService;
 
    @Resource
    private DFsService dFsService;
    /**
     * 本地数据库操作bean
     */
    @Resource(name = "localTransactionTemplate")
    private TransactionTemplate localTransactionTemplate;
 
    @Resource
    private LocalInstanceLogRepository localInstanceLogRepository;
 
    /**
     * 本地维护了在线日志的任务实例ID
     */
    private final Map<Long, Long> instanceId2LastReportTime = Maps.newConcurrentMap();
 
    @Resource(name = PJThreadPool.BACKGROUND_POOL)
    private AsyncTaskExecutor powerJobBackgroundPool;
 
    /**
     *  分段锁
     */
    private final SegmentLock segmentLock = new SegmentLock(8);
 
    /**
     * 格式化时间戳
     */
    private static final FastDateFormat DATE_FORMAT = FastDateFormat.getInstance(OmsConstant.TIME_PATTERN_PLUS);
    /**
     * 每一个展示的行数
     */
    private static final int MAX_LINE_COUNT = 100;
    /**
     * 更新中的日志缓存时间
     */
    private static final long LOG_CACHE_TIME = 10000;
 
    /**
     * 提交日志记录,持久化到本地数据库中
     * @param workerAddress 上报机器地址
     * @param logs 任务实例运行时日志
     */
    @Async(value = PJThreadPool.LOCAL_DB_POOL)
    public void submitLogs(String workerAddress, List<InstanceLogContent> logs) {
 
        List<LocalInstanceLogDO> logList = logs.stream().map(x -> {
            instanceId2LastReportTime.put(x.getInstanceId(), System.currentTimeMillis());
 
            LocalInstanceLogDO y = new LocalInstanceLogDO();
            BeanUtils.copyProperties(x, y);
            y.setWorkerAddress(workerAddress);
            return y;
        }).collect(Collectors.toList());
 
        try {
            CommonUtils.executeWithRetry0(() -> localInstanceLogRepository.saveAll(logList));
        }catch (Exception e) {
            log.warn("[InstanceLogService] persistent instance logs failed, these logs will be dropped: {}.", logs, e);
        }
    }
 
    /**
     * 获取任务实例运行日志(默认存在本地数据,需要由生成完成请求的路由与转发)
     * @param appId appId,AOP 专用
     * @param instanceId 任务实例ID
     * @param index 页码,从0开始
     * @return 文本字符串
     */
    @DesignateServer
    public StringPage fetchInstanceLog(Long appId, Long instanceId, Long index) {
        try {
            Future<File> fileFuture = prepareLogFile(instanceId);
            // 超时并不会打断正在执行的任务
            File logFile = fileFuture.get(5, TimeUnit.SECONDS);
 
            // 分页展示数据
            long lines = 0;
            StringBuilder sb = new StringBuilder();
            String lineStr;
            long left = index * MAX_LINE_COUNT;
            long right = left + MAX_LINE_COUNT;
            try (LineNumberReader lr = new LineNumberReader(new FileReader(logFile))) {
                while ((lineStr = lr.readLine()) != null) {
 
                    // 指定范围内,读出
                    if (lines >= left && lines < right) {
                        sb.append(lineStr).append(System.lineSeparator());
                    }
                    ++lines;
                }
            }catch (Exception e) {
                log.warn("[InstanceLog-{}] read logFile from disk failed for app: {}.", instanceId, appId, e);
                return StringPage.simple("oms-server execution exception, caused by " + ExceptionUtils.getRootCauseMessage(e));
            }
 
            double totalPage = Math.ceil(1.0 * lines / MAX_LINE_COUNT);
            return new StringPage(index, (long) totalPage, sb.toString());
 
        }catch (TimeoutException te) {
            return StringPage.simple("log file is being prepared, please try again later.");
        }catch (Exception e) {
            log.warn("[InstanceLog-{}] fetch instance log failed.", instanceId, e);
            return StringPage.simple("oms-server execution exception, caused by " + ExceptionUtils.getRootCauseMessage(e));
        }
    }
 
    /**
     * 获取日志的下载链接
     * @param appId AOP 专用
     * @param instanceId 任务实例 ID
     * @return 下载链接
     */
    @DesignateServer
    public String fetchDownloadUrl(Long appId, Long instanceId) {
        String url = "http://" + NetUtils.getLocalHost() + ":" + port + "/instance/downloadLog?instanceId=" + instanceId;
        log.info("[InstanceLog-{}] downloadURL for appId[{}]: {}", instanceId, appId, url);
        return url;
    }
 
    /**
     * 下载全部的任务日志文件
     * @param instanceId 任务实例ID
     * @return 日志文件
     * @throws Exception 异常
     */
    public File downloadInstanceLog(long instanceId) throws Exception {
        Future<File> fileFuture = prepareLogFile(instanceId);
        return fileFuture.get(1, TimeUnit.MINUTES);
    }
 
    /**
     * 异步准备日志文件
     * @param instanceId 任务实例ID
     * @return 异步结果
     */
    private Future<File> prepareLogFile(long instanceId) {
        return powerJobBackgroundPool.submit(() -> {
            // 在线日志还在不断更新,需要使用本地数据库中的数据
            if (instanceId2LastReportTime.containsKey(instanceId)) {
                return genTemporaryLogFile(instanceId);
            }
            return genStableLogFile(instanceId);
        });
    }
 
    /**
     * 将本地的任务实例运行日志同步到 mongoDB 存储,在任务执行结束后异步执行
     * @param instanceId 任务实例ID
     */
    @Async(PJThreadPool.BACKGROUND_POOL)
    public void sync(Long instanceId) {
 
        Stopwatch sw = Stopwatch.createStarted();
        try {
            // 先持久化到本地文件
            File stableLogFile = genStableLogFile(instanceId);
            // 将文件推送到 MongoDB
 
            FileLocation dfsFL = new FileLocation().setBucket(Constants.LOG_BUCKET).setName(genMongoFileName(instanceId));
 
            try {
                dFsService.store(new StoreRequest().setLocalFile(stableLogFile).setFileLocation(dfsFL));
                log.info("[InstanceLog-{}] push local instanceLogs to mongoDB succeed, using: {}.", instanceId, sw.stop());
            }catch (Exception e) {
                log.warn("[InstanceLog-{}] push local instanceLogs to mongoDB failed.", instanceId, e);
            }
 
        }catch (Exception e) {
            log.warn("[InstanceLog-{}] sync local instanceLogs failed.", instanceId, e);
        }
        // 删除本地数据库数据
        try {
            instanceId2LastReportTime.remove(instanceId);
            CommonUtils.executeWithRetry0(() -> localInstanceLogRepository.deleteByInstanceId(instanceId));
            log.info("[InstanceLog-{}] delete local instanceLog successfully.", instanceId);
        }catch (Exception e) {
            log.warn("[InstanceLog-{}] delete local instanceLog failed.", instanceId, e);
        }
    }
 
    private File genTemporaryLogFile(long instanceId) {
        String path = genLogFilePath(instanceId, false);
        int lockId = ("tpFileLock-" + instanceId).hashCode();
        try {
            segmentLock.lockInterruptibleSafe(lockId);
 
            // Stream 需要在事务的包裹之下使用
            return localTransactionTemplate.execute(status -> {
                File f = new File(path);
                // 如果文件存在且有效,则不再重新构建日志文件(这个判断也需要放在锁内,否则构建到一半的文件会被返回)
                if (f.exists() && (System.currentTimeMillis() - f.lastModified()) < LOG_CACHE_TIME) {
                    return f;
                }
                try {
                    // 创建父文件夹(文件在开流时自动会被创建)
                    FileUtils.forceMkdirParent(f);
 
                    // 重新构建文件
                    try (Stream<LocalInstanceLogDO> allLogStream = localInstanceLogRepository.findByInstanceIdOrderByLogTime(instanceId)) {
                        stream2File(allLogStream, f);
                    }
                    return f;
                }catch (Exception e) {
                    CommonUtils.executeIgnoreException(() -> FileUtils.forceDelete(f));
                    throw new RuntimeException(e);
                }
            });
        }finally {
            segmentLock.unlock(lockId);
        }
    }
 
    private File genStableLogFile(long instanceId) {
        String path = genLogFilePath(instanceId, true);
        int lockId = ("stFileLock-" + instanceId).hashCode();
        try {
            segmentLock.lockInterruptibleSafe(lockId);
 
            return localTransactionTemplate.execute(status -> {
 
                File f = new File(path);
                if (f.exists()) {
                    return f;
                }
 
                try {
                    // 创建父文件夹(文件在开流时自动会被创建)
                    FileUtils.forceMkdirParent(f);
 
                    // 本地存在数据,从本地持久化(对应 SYNC 的情况)
                    if (instanceId2LastReportTime.containsKey(instanceId)) {
                        try (Stream<LocalInstanceLogDO> allLogStream = localInstanceLogRepository.findByInstanceIdOrderByLogTime(instanceId)) {
                            stream2File(allLogStream, f);
                        }
                    }else {
 
                        FileLocation dfl = new FileLocation().setBucket(Constants.LOG_BUCKET).setName(genMongoFileName(instanceId));
                        Optional<FileMeta> dflMetaOpt = dFsService.fetchFileMeta(dfl);
                        if (!dflMetaOpt.isPresent()) {
                            OmsFileUtils.string2File("SYSTEM: There is no online log for this job instance.", f);
                            return f;
                        }
 
                        dFsService.download(new DownloadRequest().setTarget(f).setFileLocation(dfl));
                    }
                    return f;
                }catch (Exception e) {
                    CommonUtils.executeIgnoreException(() -> FileUtils.forceDelete(f));
                    throw new RuntimeException(e);
                }
            });
        }finally {
            segmentLock.unlock(lockId);
        }
    }
 
    /**
     * 将数据库中存储的日志流转化为磁盘日志文件
     * @param stream 流
     * @param logFile 目标日志文件
     */
    private void stream2File(Stream<LocalInstanceLogDO> stream, File logFile) {
        try (FileWriter fw = new FileWriter(logFile); BufferedWriter bfw = new BufferedWriter(fw)) {
            stream.forEach(instanceLog -> {
                try {
                    bfw.write(convertLog(instanceLog) + System.lineSeparator());
                }catch (Exception ignore) {
                }
            });
        }catch (IOException ie) {
            ExceptionUtils.rethrow(ie);
        }
    }
 
 
 
    /**
     * 拼接日志 -> 2020-04-29 22:07:10.059 [192.168.1.1:2777] INFO XXX
     * @param instanceLog 日志对象
     * @return 字符串
     */
    private static String convertLog(LocalInstanceLogDO instanceLog) {
        return String.format("%s [%s] %s %s",
                DATE_FORMAT.format(instanceLog.getLogTime()),
                instanceLog.getWorkerAddress(),
                LogLevel.genLogLevelString(instanceLog.getLogLevel()),
                instanceLog.getLogContent());
    }
 
 
    @Async(PJThreadPool.TIMING_POOL)
    @Scheduled(fixedDelay = 120000)
    public void timingCheck() {
 
        // 定时删除秒级任务的日志
        List<Long> frequentInstanceIds = Lists.newLinkedList();
        instanceId2LastReportTime.keySet().forEach(instanceId -> {
            try {
                JobInfoDO jobInfo = instanceMetadataService.fetchJobInfoByInstanceId(instanceId);
                if (TimeExpressionType.FREQUENT_TYPES.contains(jobInfo.getTimeExpressionType())) {
                    frequentInstanceIds.add(instanceId);
                }
            }catch (Exception ignore) {
            }
        });
 
        if (!CollectionUtils.isEmpty(frequentInstanceIds)) {
            // 只保留最近10分钟的日志
            long time = System.currentTimeMillis() - 10 * 60 * 1000;
            Lists.partition(frequentInstanceIds, 100).forEach(p -> {
                try {
                    localInstanceLogRepository.deleteByInstanceIdInAndLogTimeLessThan(p, time);
                }catch (Exception e) {
                    log.warn("[InstanceLogService] delete expired logs for instance: {} failed.", p, e);
                }
            });
        }
 
        // 删除长时间未 REPORT 的日志(必要性考证中......)
    }
 
 
    private static String genLogFilePath(long instanceId, boolean stable) {
        if (stable) {
            return OmsFileUtils.genLogDirPath() + String.format("%d-stable.log", instanceId);
        }else {
            return OmsFileUtils.genLogDirPath() + String.format("%d-temporary.log", instanceId);
        }
    }
    private static String genMongoFileName(long instanceId) {
        return String.format("oms-%d.log", instanceId);
    }
 
}