WangHan
2024-09-12 d5855a4926926698b740bc6c7ba489de47adb68b
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
package tech.powerjob.worker.container;
 
import tech.powerjob.common.model.DeployedContainerInfo;
import tech.powerjob.common.request.ServerDeployContainerRequest;
import tech.powerjob.common.request.WorkerNeedDeployContainerRequest;
import tech.powerjob.common.response.AskResponse;
import tech.powerjob.common.utils.CommonUtils;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.FileUtils;
import tech.powerjob.worker.common.WorkerRuntime;
import tech.powerjob.worker.common.utils.PowerFileUtils;
import tech.powerjob.worker.common.utils.TransportUtils;
 
import java.io.File;
import java.net.URL;
import java.util.List;
import java.util.Map;
 
/**
 * 容器工厂
 *
 * @author tjq
 * @since 2020/5/16
 */
@Slf4j
public class OmsContainerFactory {
 
    private static final String CONTAINER_DIR = PowerFileUtils.workspace() + "/container/";
    private static final Map<Long, OmsContainer> CARGO = Maps.newConcurrentMap();
 
    /**
     * 获取容器
     * @param containerId 容器ID
     * @param workerRuntime 当容器不存在且 serverActor 非空时,尝试从服务端重新拉取容器
     * @return 容器示例,可能为 null
     */
    public static OmsContainer fetchContainer(Long containerId, WorkerRuntime workerRuntime) {
 
        OmsContainer omsContainer = CARGO.get(containerId);
        if (omsContainer != null) {
            return omsContainer;
        }
 
        final String currentServerAddress = workerRuntime.getServerDiscoveryService().getCurrentServerAddress();
 
        // 尝试从 server 加载
        log.info("[OmsContainer-{}] can't find the container in factory, try to deploy from server.", containerId);
        WorkerNeedDeployContainerRequest request = new WorkerNeedDeployContainerRequest(containerId);
 
        try {
 
            AskResponse askResponse = TransportUtils.reliableQueryContainerInfo(request, currentServerAddress, workerRuntime.getTransporter());
 
            if (askResponse.isSuccess()) {
                ServerDeployContainerRequest deployRequest = askResponse.getData(ServerDeployContainerRequest.class);
                log.info("[OmsContainer-{}] fetch containerInfo from server successfully.", containerId);
                deployContainer(deployRequest);
            }else {
                log.warn("[OmsContainer-{}] fetch containerInfo failed, reason is {}.", containerId, askResponse.getMessage());
            }
        }catch (Exception e) {
            log.error("[OmsContainer-{}] get container failed, exception is {}", containerId, e.toString());
        }
 
        return CARGO.get(containerId);
    }
 
 
    /**
     * 部署容器,整个过程串行进行,问题不大
     * @param request 部署容器请求
     */
    public static synchronized void deployContainer(ServerDeployContainerRequest request) {
 
        Long containerId = request.getContainerId();
        String containerName = request.getContainerName();
        String version = request.getVersion();
 
        log.info("[OmsContainer-{}] start to deploy container(name={},version={},downloadUrl={})", containerId, containerName, version, request.getDownloadURL());
 
        OmsContainer oldContainer = CARGO.get(containerId);
        if (oldContainer != null && version.equals(oldContainer.getVersion())) {
            log.info("[OmsContainer-{}] version={} already deployed, so skip this deploy task.", containerId, version);
            return;
        }
 
        String filePath = CONTAINER_DIR + containerId + "/" + version + ".jar";
        // 下载Container到本地
        File jarFile = new File(filePath);
 
        try {
            if (!jarFile.exists()) {
                log.info("[OmsContainer-{}] container not exist(path={}), try to download from server!", containerId, jarFile.getPath());
                FileUtils.forceMkdirParent(jarFile);
                FileUtils.copyURLToFile(new URL(request.getDownloadURL()), jarFile, 5000, 300000);
                log.info("[OmsContainer-{}] download jar successfully, path={}", containerId, jarFile.getPath());
            }
 
            // 创建新容器
            OmsContainer newContainer = new OmsJarContainer(containerId, containerName, version, jarFile);
            newContainer.init();
 
            // 替换容器
            CARGO.put(containerId, newContainer);
            log.info("[OmsContainer-{}] deployed new version:{} successfully!", containerId, version);
 
            if (oldContainer != null) {
                // 销毁旧容器
                log.info("[OmsContainer-{}] start to destroy old container(version={})", containerId, oldContainer.getVersion());
                oldContainer.destroy();
            }
 
        } catch (Exception e) {
            log.error("[OmsContainer-{}] deployContainer(name={},version={}) failed.", containerId, containerName, version, e);
            // 如果部署失败,则删除该 jar(本次失败可能是下载jar出错导致,不删除会导致这个版本永久无法重新部署)
            CommonUtils.executeIgnoreException(() -> FileUtils.forceDelete(jarFile));
        }
    }
 
    /**
     * 获取该Worker已部署容器的信息
     * @return 已部署容器信息
     */
    public static List<DeployedContainerInfo> getDeployedContainerInfos() {
        List<DeployedContainerInfo> info = Lists.newLinkedList();
        CARGO.forEach((name, container) -> info.add(new DeployedContainerInfo(container.getContainerId(), container.getVersion(), container.getDeployedTime(), null)));
        return info;
    }
 
    /**
     * 销毁指定容器
     * @param containerId 容器ID
     */
    public static void destroyContainer(Long containerId) {
        OmsContainer container = CARGO.remove(containerId);
        if (container == null) {
            log.info("[OmsContainer-{}] container not exists, so there is no need to destroy the container.", containerId);
            return;
        }
        try {
            container.destroy();
        }catch (Exception e) {
            log.warn("[OmsContainer-{}] destroy container failed.", containerId, e);
        }
    }
}