package tech.powerjob.server.core.container; import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.Lists; import com.google.common.collect.Multimap; import com.google.common.collect.Sets; import lombok.extern.slf4j.Slf4j; import org.apache.commons.io.FileUtils; import org.apache.commons.io.filefilter.FileFilterUtils; import org.apache.commons.io.filefilter.IOFileFilter; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.exception.ExceptionUtils; import org.apache.commons.lang3.time.DateFormatUtils; import org.apache.maven.shared.invoker.DefaultInvocationRequest; import org.apache.maven.shared.invoker.DefaultInvoker; import org.apache.maven.shared.invoker.InvocationRequest; import org.apache.maven.shared.invoker.Invoker; import org.eclipse.jgit.api.CloneCommand; import org.eclipse.jgit.api.Git; import org.eclipse.jgit.lib.Ref; import org.eclipse.jgit.lib.Repository; import org.eclipse.jgit.transport.CredentialsProvider; import org.eclipse.jgit.transport.UsernamePasswordCredentialsProvider; import org.springframework.core.env.Environment; import org.springframework.stereotype.Service; import org.springframework.util.CollectionUtils; import org.springframework.web.multipart.MultipartFile; import tech.powerjob.common.OmsConstant; import tech.powerjob.common.exception.ImpossibleException; import tech.powerjob.common.model.DeployedContainerInfo; import tech.powerjob.common.model.GitRepoInfo; import tech.powerjob.common.request.ServerDeployContainerRequest; import tech.powerjob.common.request.ServerDestroyContainerRequest; import tech.powerjob.common.serialize.JsonUtils; import tech.powerjob.common.utils.CommonUtils; import tech.powerjob.common.utils.NetUtils; import tech.powerjob.common.utils.SegmentLock; import tech.powerjob.remote.framework.base.URL; import tech.powerjob.server.common.constants.ContainerSourceType; import tech.powerjob.common.enums.SwitchableStatus; import tech.powerjob.server.common.module.WorkerInfo; import tech.powerjob.server.common.utils.OmsFileUtils; import tech.powerjob.server.extension.LockService; import tech.powerjob.server.extension.dfs.*; import tech.powerjob.server.persistence.remote.model.ContainerInfoDO; import tech.powerjob.server.persistence.remote.repository.ContainerInfoRepository; import tech.powerjob.server.persistence.storage.Constants; import tech.powerjob.server.remote.server.redirector.DesignateServer; import tech.powerjob.server.remote.transporter.impl.ServerURLFactory; import tech.powerjob.server.remote.transporter.TransportService; import tech.powerjob.server.remote.worker.WorkerClusterQueryService; import javax.annotation.Resource; import javax.websocket.RemoteEndpoint; import javax.websocket.Session; import java.io.File; import java.io.IOException; import java.util.*; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; /** * 容器服务 * * @author tjq * @since 2020/5/16 */ @Slf4j @Service public class ContainerService { @Resource private Environment environment; @Resource private LockService lockService; @Resource private ContainerInfoRepository containerInfoRepository; @Resource private DFsService dFsService; @Resource private TransportService transportService; @Resource private WorkerClusterQueryService workerClusterQueryService; // 下载用的分段锁 private final SegmentLock segmentLock = new SegmentLock(4); // 并发部署的机器数量 private static final int DEPLOY_BATCH_NUM = 50; // 部署间隔 private static final long DEPLOY_MIN_INTERVAL = 10 * 60 * 1000L; // 最长部署时间 private static final long DEPLOY_MAX_COST_TIME = 10 * 60 * 1000L; /** * 保存容器 * @param container 容器保存请求 */ public void save(ContainerInfoDO container) { Long originId = container.getId(); if (originId != null) { // just validate containerInfoRepository.findById(originId).orElseThrow(() -> new IllegalArgumentException("can't find container by id: " + originId)); } else { container.setGmtCreate(new Date()); } container.setGmtModified(new Date()); // 文件上传形式的 sourceInfo 为该文件的 md5 值,Git形式的 md5 在部署阶段生成 if (container.getSourceType() == ContainerSourceType.FatJar.getV()) { container.setVersion(container.getSourceInfo()); }else { container.setVersion("init"); } containerInfoRepository.saveAndFlush(container); } /** * 删除容器(通知 Worker 销毁容器 & 删除数据库) * @param appId 应用ID,用于保护性判断 * @param containerId 容器ID */ public void delete(Long appId, Long containerId) { ContainerInfoDO container = containerInfoRepository.findById(containerId).orElseThrow(() -> new IllegalArgumentException("can't find container by id: " + containerId)); if (!Objects.equals(appId, container.getAppId())) { throw new RuntimeException("Permission Denied!"); } ServerDestroyContainerRequest destroyRequest = new ServerDestroyContainerRequest(container.getId()); workerClusterQueryService.getAllAliveWorkers(container.getAppId()).forEach(workerInfo -> { final URL url = ServerURLFactory.destroyContainer2Worker(workerInfo.getAddress()); transportService.tell(workerInfo.getProtocol(), url, destroyRequest); }); log.info("[ContainerService] delete container: {}.", container); // 软删除 container.setStatus(SwitchableStatus.DELETED.getV()); container.setGmtModified(new Date()); containerInfoRepository.saveAndFlush(container); } /** * 上传用于部署的容器的 Jar 文件 * @param file 接受的文件 * @return 该文件的 md5 值 * @throws IOException 异常 */ public String uploadContainerJarFile(MultipartFile file) throws IOException { log.info("[ContainerService] start to uploadContainerJarFile, fileName={},size={}", file.getName(), file.getSize()); String workerDirStr = OmsFileUtils.genTemporaryWorkPath(); String tmpFileStr = workerDirStr + "tmp.jar"; File workerDir = new File(workerDirStr); File tmpFile = new File(tmpFileStr); try { // 下载到本地 FileUtils.forceMkdirParent(tmpFile); file.transferTo(tmpFile); // 生成MD5,这兄弟耗时有点小严重 String md5 = OmsFileUtils.md5(tmpFile); String fileName = genContainerJarName(md5); // 上传到 DFS,这兄弟耗时也有点小严重,导致这个接口整体比较慢...不过也没必要开线程去处理 FileLocation fl = new FileLocation().setBucket(Constants.CONTAINER_BUCKET).setName(fileName); StoreRequest storeRequest = new StoreRequest().setLocalFile(tmpFile).setFileLocation(fl); dFsService.store(storeRequest); // 将文件拷贝到正确的路径 String finalFileStr = OmsFileUtils.genContainerJarPath() + fileName; File finalFile = new File(finalFileStr); if (finalFile.exists()) { FileUtils.forceDelete(finalFile); } FileUtils.moveFile(tmpFile, finalFile); log.info("[ContainerService] uploadContainerJarFile successfully,md5={}", md5); return md5; } catch (Throwable t) { log.error("[ContainerService] uploadContainerJarFile failed!", t); ExceptionUtils.rethrow(t); throw new ImpossibleException(); } finally { CommonUtils.executeIgnoreException(() -> FileUtils.forceDelete(workerDir)); } } /** * 获取构建容器所需要的 Jar 文件 * @param version 版本 * @return 本地Jar文件 */ public File fetchContainerJarFile(String version) { String fileName = genContainerJarName(version); String filePath = OmsFileUtils.genContainerJarPath() + fileName; File localFile = new File(filePath); if (localFile.exists()) { return localFile; } FileLocation fileLocation = new FileLocation().setBucket(Constants.CONTAINER_BUCKET).setName(fileName); try { Optional fileMetaOpt = dFsService.fetchFileMeta(fileLocation); if (fileMetaOpt.isPresent()) { dFsService.download(new DownloadRequest().setFileLocation(fileLocation).setTarget(localFile)); } } catch (Exception e) { log.warn("[ContainerService] fetchContainerJarFile from dsf failed, version: {}", version, e); } return localFile; } /** * 部署容器 * @param containerId 容器ID * @param session WebSocket Session * @throws Exception 异常 */ public void deploy(Long containerId, Session session) throws Exception { String deployLock = "containerDeployLock-" + containerId; RemoteEndpoint.Async remote = session.getAsyncRemote(); // 最长部署时间:10分钟 boolean lock = lockService.tryLock(deployLock, DEPLOY_MAX_COST_TIME); if (!lock) { remote.sendText("SYSTEM: acquire deploy lock failed, maybe other user is deploying, please wait until the running deploy task finished."); return; } try { Optional containerInfoOpt = containerInfoRepository.findById(containerId); if (!containerInfoOpt.isPresent()) { remote.sendText("SYSTEM: can't find container by id: " + containerId); return; } ContainerInfoDO container = containerInfoOpt.get(); Date lastDeployTime = container.getLastDeployTime(); if (lastDeployTime != null) { if ((System.currentTimeMillis() - lastDeployTime.getTime()) < DEPLOY_MIN_INTERVAL) { remote.sendText("SYSTEM: [warn] deploy too frequent, last deploy time is: " + DateFormatUtils.format(lastDeployTime, OmsConstant.TIME_PATTERN)); } } // 准备文件 File jarFile = prepareJarFile(container, session); if (jarFile == null) { return; } double sizeMB = 1.0 * jarFile.length() / FileUtils.ONE_MB; remote.sendText(String.format("SYSTEM: the jarFile(size=%fMB) is prepared and ready to be deployed to the worker.", sizeMB)); // 修改数据库,更新 MD5和最新部署时间 Date now = new Date(); container.setGmtModified(now); container.setLastDeployTime(now); containerInfoRepository.saveAndFlush(container); remote.sendText(String.format("SYSTEM: update current container version=%s successfully!", container.getVersion())); // 开始部署(需要分批进行) final List allAliveWorkers = workerClusterQueryService.getAllAliveWorkers(container.getAppId()); if (allAliveWorkers.isEmpty()) { remote.sendText("SYSTEM: there is no worker available now, deploy failed!"); return; } String port = environment.getProperty("local.server.port"); String downloadURL = String.format("http://%s:%s/container/downloadJar?version=%s", NetUtils.getLocalHost(), port, container.getVersion()); ServerDeployContainerRequest req = new ServerDeployContainerRequest(containerId, container.getContainerName(), container.getVersion(), downloadURL); long sleepTime = calculateSleepTime(jarFile.length()); AtomicInteger count = new AtomicInteger(); allAliveWorkers.forEach(workerInfo -> { final URL url = ServerURLFactory.deployContainer2Worker(workerInfo.getAddress()); transportService.tell(workerInfo.getProtocol(), url, req); remote.sendText("SYSTEM: send deploy request to " + url.getAddress()); if (count.incrementAndGet() % DEPLOY_BATCH_NUM == 0) { CommonUtils.executeIgnoreException(() -> Thread.sleep(sleepTime)); } }); remote.sendText("SYSTEM: deploy finished, congratulations!"); }finally { lockService.unlock(deployLock); } } /** * 获取部署信息 * @param appId 容器所属应用ID * @param containerId 容器ID * @return 拼接好的可阅读字符串 */ @DesignateServer public String fetchDeployedInfo(Long appId, Long containerId) { List infoList = workerClusterQueryService.getDeployedContainerInfos(appId, containerId); Set aliveWorkers = workerClusterQueryService.getAllAliveWorkers(appId) .stream() .map(WorkerInfo::getAddress) .collect(Collectors.toSet()); Set deployedList = Sets.newLinkedHashSet(); Multimap version2DeployedContainerInfoList = ArrayListMultimap.create(); infoList.forEach(info -> { String targetWorkerAddress = info.getWorkerAddress(); if (aliveWorkers.contains(targetWorkerAddress)) { deployedList.add(targetWorkerAddress); version2DeployedContainerInfoList.put(info.getVersion(), info); } }); Set unDeployedList = Sets.newHashSet(aliveWorkers); unDeployedList.removeAll(deployedList); StringBuilder sb = new StringBuilder("========== DeployedInfo ==========").append(System.lineSeparator()); // 集群分裂,各worker版本不统一,问题很大 if (version2DeployedContainerInfoList.keySet().size() > 1) { sb.append("WARN: there exists multi version container now, please redeploy to fix this problem").append(System.lineSeparator()); } version2DeployedContainerInfoList.asMap().forEach((version, deployedContainerInfos) -> { sb.append("[version] ").append(version).append(System.lineSeparator()); deployedContainerInfos.forEach(deployedContainerInfo -> sb.append(String.format("Address: %s, DeployedTime: %s", deployedContainerInfo.getWorkerAddress(), CommonUtils.formatTime(deployedContainerInfo.getDeployedTime()))).append(System.lineSeparator())); }); // 当前在线未部署机器 if (!CollectionUtils.isEmpty(unDeployedList)) { sb.append("WARN: there exists unDeployed worker(PowerJob will auto fix when some job need to process)").append(System.lineSeparator()); sb.append("unDeployed worker list ==> ").append(unDeployedList).append(System.lineSeparator()); } if (CollectionUtils.isEmpty(deployedList)) { sb.append("no worker deployed this container now~"); } return sb.toString(); } private File prepareJarFile(ContainerInfoDO container, Session session) throws Exception { RemoteEndpoint.Async remote = session.getAsyncRemote(); // 获取Jar,Git需要先 clone成Jar计算MD5,JarFile则直接下载 ContainerSourceType sourceType = ContainerSourceType.of(container.getSourceType()); if (sourceType == ContainerSourceType.Git) { String workerDirStr = OmsFileUtils.genTemporaryWorkPath(); File workerDir = new File(workerDirStr); FileUtils.forceMkdir(workerDir); try { // git clone remote.sendText("SYSTEM: start to git clone the code repo, using config: " + container.getSourceInfo()); GitRepoInfo gitRepoInfo = JsonUtils.parseObject(container.getSourceInfo(), GitRepoInfo.class); CloneCommand cloneCommand = Git.cloneRepository() .setDirectory(workerDir) .setURI(gitRepoInfo.getRepo()) .setBranch(gitRepoInfo.getBranch()); if (!StringUtils.isEmpty(gitRepoInfo.getUsername())) { CredentialsProvider credentialsProvider = new UsernamePasswordCredentialsProvider(gitRepoInfo.getUsername(), gitRepoInfo.getPassword()); cloneCommand.setCredentialsProvider(credentialsProvider); } cloneCommand.call(); // 获取最新的 commitId 作为版本 String oldVersion = container.getVersion(); try (Repository repository = Git.open(workerDir).getRepository()) { Ref head = repository.getRefDatabase().findRef("HEAD"); container.setVersion(head.getObjectId().getName()); } if (container.getVersion().equals(oldVersion)) { remote.sendText(String.format("SYSTEM: this commitId(%s) is the same as the last.", oldVersion)); }else { remote.sendText(String.format("SYSTEM: new version detected, from %s to %s.", oldVersion, container.getVersion())); } remote.sendText("SYSTEM: git clone successfully, star to compile the project."); // mvn clean package -DskipTests -U Invoker mvnInvoker = new DefaultInvoker(); InvocationRequest ivkReq = new DefaultInvocationRequest(); // -U:强制让Maven检查所有SNAPSHOT依赖更新,确保集成基于最新的状态 // -e:如果构建出现异常,该参数能让Maven打印完整的stack trace // -B:让Maven使用批处理模式构建项目,能够避免一些需要人工参与交互而造成的挂起状态 ivkReq.setGoals(Lists.newArrayList("clean", "package", "-DskipTests", "-U", "-e", "-B")); ivkReq.setBaseDirectory(workerDir); ivkReq.setOutputHandler(remote::sendText); ivkReq.setBatchMode(true); mvnInvoker.execute(ivkReq); String targetDirStr = workerDirStr + "/target"; File targetDir = new File(targetDirStr); IOFileFilter fileFilter = FileFilterUtils.asFileFilter((dir, name) -> name.endsWith("jar-with-dependencies.jar")); Collection jarFile = FileUtils.listFiles(targetDir, fileFilter, null); if (CollectionUtils.isEmpty(jarFile)) { remote.sendText("SYSTEM: can't find packaged jar(maybe maven build failed), so deploy failed."); return null; } File jarWithDependency = jarFile.iterator().next(); String jarFileName = genContainerJarName(container.getVersion()); FileLocation dfsFL = new FileLocation().setBucket(Constants.CONTAINER_BUCKET).setName(jarFileName); Optional dfsMetaOpt = dFsService.fetchFileMeta(dfsFL); if (dfsMetaOpt.isPresent()) { remote.sendText("SYSTEM: find the jar resource in remote successfully, so it's no need to upload anymore."); } else { remote.sendText("SYSTEM: can't find the jar resource in remote, maybe this is a new version, start to upload new version."); dFsService.store(new StoreRequest().setFileLocation(dfsFL).setLocalFile(jarWithDependency)); remote.sendText("SYSTEM: upload to GridFS successfully~"); } // 将文件从临时工作目录移动到正式目录 String localFileStr = OmsFileUtils.genContainerJarPath() + jarFileName; File localFile = new File(localFileStr); if (localFile.exists()) { FileUtils.forceDelete(localFile); } FileUtils.copyFile(jarWithDependency, localFile); return localFile; } catch (Throwable t) { log.error("[ContainerService] prepareJarFile failed for container: {}", container, t); remote.sendText("SYSTEM: [ERROR] prepare jar file failed: " + ExceptionUtils.getStackTrace(t)); } finally { // 删除工作区数据 FileUtils.forceDelete(workerDir); } } // 先查询本地是否存在目标 Jar 文件 String jarFileName = genContainerJarName(container.getVersion()); String localFileStr = OmsFileUtils.genContainerJarPath() + jarFileName; File localFile = new File(localFileStr); if (localFile.exists()) { remote.sendText("SYSTEM: find the jar file in local disk."); return localFile; } // 从 MongoDB 下载 remote.sendText(String.format("SYSTEM: try to find the jarFile(%s) in GridFS", jarFileName)); downloadJarFromGridFS(jarFileName, localFile); remote.sendText("SYSTEM: download jar file from GridFS successfully~"); return localFile; } private void downloadJarFromGridFS(String mongoFileName, File targetFile) { int lockId = mongoFileName.hashCode(); try { segmentLock.lockInterruptibleSafe(lockId); if (targetFile.exists()) { return; } try { FileLocation dfsFL = new FileLocation().setBucket(Constants.CONTAINER_BUCKET).setName(mongoFileName); Optional dfsMetaOpt = dFsService.fetchFileMeta(dfsFL); if (!dfsMetaOpt.isPresent()) { log.warn("[ContainerService] can't find container's jar file({}) in gridFS.", mongoFileName); return; } FileUtils.forceMkdirParent(targetFile); dFsService.download(new DownloadRequest().setTarget(targetFile).setFileLocation(dfsFL)); }catch (Exception e) { CommonUtils.executeIgnoreException(() -> FileUtils.forceDelete(targetFile)); ExceptionUtils.rethrow(e); } }finally { segmentLock.unlock(lockId); } } private static String genContainerJarName(String version) { return String.format("oms-container-%s.jar", version); } /** * 计算 sleep 时间(每10M睡眠1S + 1) * @param fileLength 文件的字节数 * @return sleep 时间 */ private long calculateSleepTime(long fileLength) { return (fileLength / FileUtils.ONE_MB / 10 + 1) * 1000; } }