WangHan
2024-09-12 d5855a4926926698b740bc6c7ba489de47adb68b
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
package tech.powerjob.official.processors.impl.script;
 
import tech.powerjob.worker.common.utils.PowerFileUtils;
import tech.powerjob.worker.core.processor.ProcessResult;
import tech.powerjob.worker.core.processor.TaskContext;
import tech.powerjob.worker.log.OmsLogger;
import com.google.common.collect.Sets;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.SystemUtils;
import tech.powerjob.official.processors.CommonBasicProcessor;
import tech.powerjob.official.processors.util.CommonUtils;
 
import java.io.*;
import java.net.URL;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.util.Set;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
 
/**
 * 脚本处理器
 *
 * @author tjq
 * @author Jiang Jining
 * @since 2020/4/16
 */
@Slf4j
public abstract class AbstractScriptProcessor extends CommonBasicProcessor {
 
    private static final ForkJoinPool POOL = new ForkJoinPool(4 * Runtime.getRuntime().availableProcessors());
    private static final Set<String> DOWNLOAD_PROTOCOL = Sets.newHashSet("http", "https", "ftp");
    protected static final String SH_SHELL = "/bin/sh";
    protected static final String CMD_SHELL = "cmd.exe";
 
    private static final String WORKER_DIR = PowerFileUtils.workspace() + "/official_script_processor/";
 
    @Override
    protected ProcessResult process0(TaskContext context) throws Exception {
        OmsLogger omsLogger = context.getOmsLogger();
        String scriptParams = CommonUtils.parseParams(context);
        omsLogger.info("[SYSTEM] ScriptProcessor start to process, params: {}", scriptParams);
        if (scriptParams == null) {
            String message = "[SYSTEM] ScriptParams is null, please check jobParam configuration.";
            omsLogger.warn(message);
            return new ProcessResult(false, message);
        }
        String scriptPath = prepareScriptFile(context.getInstanceId(), scriptParams);
        omsLogger.info("[SYSTEM] Generate executable file successfully, path: {}", scriptPath);
        
        if (SystemUtils.IS_OS_WINDOWS) {
            if (StringUtils.equals(getRunCommand(), SH_SHELL)) {
                String message = String.format("[SYSTEM] Current OS is %s where shell scripts cannot run.", SystemUtils.OS_NAME);
                omsLogger.warn(message);
                return new ProcessResult(false, message);
            }
        }
 
        // 授权
        if  ( !SystemUtils.IS_OS_WINDOWS) {
            ProcessBuilder chmodPb = new ProcessBuilder("/bin/chmod", "755", scriptPath);
            // 等待返回,这里不可能导致死锁(shell产生大量数据可能导致死锁)
            chmodPb.start().waitFor();
            omsLogger.info("[SYSTEM] chmod 755 authorization complete, ready to start execution~");
        }
        // 2. 执行目标脚本
        ProcessBuilder pb = StringUtils.equals(getRunCommand(), CMD_SHELL) ?
                new ProcessBuilder(getRunCommand(), "/c", scriptPath)
                : new ProcessBuilder(getRunCommand(), scriptPath);
        Process process = pb.start();
 
        StringBuilder inputBuilder = new StringBuilder();
        StringBuilder errorBuilder = new StringBuilder();
 
        boolean success = true;
        String result;
 
        final Charset charset = getCharset();
        try {
            InputStream is = process.getInputStream();
            InputStream es = process.getErrorStream();
 
            ForkJoinTask<?> inputSubmit = POOL.submit(() -> copyStream(is, inputBuilder, omsLogger, charset));
            ForkJoinTask<?> errorSubmit = POOL.submit(() -> copyStream(es, errorBuilder, omsLogger, charset));
 
            success = process.waitFor() == 0;
 
            // 阻塞等待日志读取
            inputSubmit.get();
            errorSubmit.get();
 
        } catch (InterruptedException ie) {
            omsLogger.info("[SYSTEM] ScriptProcessor has been interrupted");
        } finally {
            result = String.format("[INPUT]: %s;[ERROR]: %s", inputBuilder, errorBuilder);
        }
        return new ProcessResult(success, result);
    }
 
    private String prepareScriptFile(Long instanceId, String processorInfo) throws IOException {
        String scriptPath = WORKER_DIR + getScriptName(instanceId);
        File script = new File(scriptPath);
        if (script.exists()) {
            return scriptPath;
        }
        File dir = new File(script.getParent());
        boolean success = dir.mkdirs();
        success = script.createNewFile();
        if (!success) {
            throw new RuntimeException("create script file failed");
        }
 
        // 如果是下载链接,则从网络获取
        for (String protocol : DOWNLOAD_PROTOCOL) {
            if (processorInfo.startsWith(protocol)) {
                FileUtils.copyURLToFile(new URL(processorInfo), script, 5000, 300000);
                return scriptPath;
            }
        }
 
        final Charset charset = getCharset();
 
        if(charset != null)
        {
            try (Writer fstream = new OutputStreamWriter(Files.newOutputStream(script.toPath()), charset); BufferedWriter out = new BufferedWriter(fstream)) {
                out.write(processorInfo);
                out.flush();
            }
        }
        else {
            try (FileWriter fw = new FileWriter(script); BufferedWriter bw = new BufferedWriter(fw)) {
                bw.write(processorInfo);
                bw.flush();
            }
        }
        return scriptPath;
    }
 
    private void copyStream(InputStream is, StringBuilder sb, OmsLogger omsLogger, Charset charset) {
        String line;
        try (BufferedReader br = new BufferedReader(new InputStreamReader(is, charset))) {
            while ((line = br.readLine()) != null) {
                sb.append(line).append(System.lineSeparator());
                // 同步到在线日志
                omsLogger.info(line);
            }
        } catch (Exception e) {
            log.warn("[ScriptProcessor] copyStream failed.", e);
            omsLogger.warn("[SYSTEM] copyStream failed.", e);
 
            sb.append("Exception: ").append(e);
        } finally {
            try {
                is.close();
            } catch (IOException e) {
                log.warn("[ScriptProcessor] close stream failed.", e);
                omsLogger.warn("[SYSTEM] close stream failed.", e);
            }
        }
    }
 
    /**
     * 生成脚本名称
     * @param instanceId id of instance
     * @return 文件名称
     */
    protected abstract String getScriptName(Long instanceId);
 
    /**
     * 获取运行命令(eg,shell返回 /bin/sh)
     * @return 执行脚本的命令
     */
    protected abstract String getRunCommand();
 
    /**
     * 默认不指定
     * @return Charset
     */
    protected Charset getCharset() {
        return StandardCharsets.UTF_8;
    }
}