WangHan
2024-09-12 d5855a4926926698b740bc6c7ba489de47adb68b
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
package tech.powerjob.worker.persistence.fs.impl;
 
import com.google.common.collect.Lists;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import tech.powerjob.common.serialize.JsonUtils;
import tech.powerjob.common.utils.CollectionUtils;
import tech.powerjob.common.utils.CommonUtils;
import tech.powerjob.worker.persistence.TaskDO;
import tech.powerjob.worker.persistence.fs.ExternalTaskPersistenceService;
import tech.powerjob.worker.persistence.fs.FsService;
 
import java.io.IOException;
import java.util.Collections;
import java.util.List;
 
/**
 * 外部文件存储服务
 *
 * @author tjq
 * @since 2024/2/22
 */
@Slf4j
public class ExternalTaskFileSystemPersistenceService implements ExternalTaskPersistenceService {
    private final Long instanceId;
 
    private final FsService pendingFsService;
 
    private final FsService resultFsService;
 
    private static final String PENDING_FILE_NAME = "%d-pending";
    private static final String RESULT_FILE_NAME = "%d-result";
 
    public ExternalTaskFileSystemPersistenceService(Long instanceId, boolean needResult) {
        this.instanceId = instanceId;
 
        this.pendingFsService = new LocalDiskFsService(String.format(PENDING_FILE_NAME, instanceId));
        if (needResult) {
            this.resultFsService = new LocalDiskFsService(String.format(RESULT_FILE_NAME, instanceId));
        } else {
            this.resultFsService = new FsService() {
                @Override
                public void writeLine(String content) throws IOException {
                }
 
                @Override
                public String readLine() throws IOException {
                    return null;
                }
                @Override
                public void close() {
                }
            };
        }
    }
 
    @Override
    public boolean persistPendingTask(List<TaskDO> tasks) {
        if (CollectionUtils.isEmpty(tasks)) {
            return true;
        }
        try {
            String content = JsonUtils.toJSONString(tasks);
            pendingFsService.writeLine(content);
            return true;
        } catch (Exception e) {
            log.error("[ExternalTaskPersistenceService] [{}] persistPendingTask failed: {}", instanceId, tasks);
        }
        return false;
    }
 
    @Override
    @SneakyThrows
    public List<TaskDO> readPendingTask() {
        String pendingTaskStr = pendingFsService.readLine();
        return str2TaskDoList(pendingTaskStr);
    }
 
    @Override
    public boolean persistFinishedTask(List<TaskDO> tasks) {
 
        if (CollectionUtils.isEmpty(tasks)) {
            return true;
        }
 
        // 移除无用的参数列
        tasks.forEach(t -> t.setTaskContent(null));
 
        try {
            String content = JsonUtils.toJSONString(tasks);
            resultFsService.writeLine(content);
            return true;
        } catch (Exception e) {
            log.error("[ExternalTaskPersistenceService] [{}] persistPendingTask failed: {}", instanceId, tasks);
        }
        return false;
    }
 
    @Override
    @SneakyThrows
    public List<TaskDO> readFinishedTask() {
        String finishedStr = resultFsService.readLine();
        return str2TaskDoList(finishedStr);
    }
 
 
    private static List<TaskDO> str2TaskDoList(String finishedStr) throws Exception {
        if (StringUtils.isEmpty(finishedStr)) {
            return Collections.emptyList();
        }
        TaskDO[] taskDOS = JsonUtils.parseObject(finishedStr, TaskDO[].class);
        if (taskDOS != null) {
            return Lists.newArrayList(taskDOS);
        }
        return Collections.emptyList();
    }
 
    @Override
    public void close()  {
        CommonUtils.executeIgnoreException(() -> {
            if (pendingFsService != null) {
                pendingFsService.close();
            }
        });
 
        CommonUtils.executeIgnoreException(() -> {
            if (resultFsService != null) {
                resultFsService.close();
            }
        });
    }
}