/* * Copyright 1999-2018 Alibaba Group Holding Ltd. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package com.alibaba.csp.sentinel.dashboard.service; import java.util.HashSet; import java.util.List; import java.util.Objects; import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import com.alibaba.csp.sentinel.cluster.ClusterStateManager; import com.alibaba.csp.sentinel.dashboard.domain.cluster.state.ClusterUniversalStatePairVO; import com.alibaba.csp.sentinel.util.AssertUtil; import com.alibaba.csp.sentinel.util.function.Tuple2; import com.alibaba.csp.sentinel.dashboard.client.SentinelApiClient; import com.alibaba.csp.sentinel.dashboard.domain.cluster.ClusterAppAssignResultVO; import com.alibaba.csp.sentinel.dashboard.domain.cluster.ClusterGroupEntity; import com.alibaba.csp.sentinel.dashboard.domain.cluster.config.ClusterClientConfig; import com.alibaba.csp.sentinel.dashboard.domain.cluster.config.ServerFlowConfig; import com.alibaba.csp.sentinel.dashboard.domain.cluster.config.ServerTransportConfig; import com.alibaba.csp.sentinel.dashboard.domain.cluster.request.ClusterAppAssignMap; import com.alibaba.csp.sentinel.dashboard.util.MachineUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; /** * @author Eric Zhao * @since 1.4.1 */ @Service public class ClusterAssignServiceImpl implements ClusterAssignService { private final Logger LOGGER = LoggerFactory.getLogger(ClusterAssignServiceImpl.class); @Autowired private SentinelApiClient sentinelApiClient; @Autowired private ClusterConfigService clusterConfigService; private boolean isMachineInApp(/*@NonEmpty*/ String machineId) { return machineId.contains(":"); } private ClusterAppAssignResultVO handleUnbindClusterServerNotInApp(String app, String machineId) { Set failedSet = new HashSet<>(); try { List list = clusterConfigService.getClusterUniversalState(app) .get(10, TimeUnit.SECONDS); Set toModifySet = list.stream() .filter(e -> e.getState().getStateInfo().getMode() == ClusterStateManager.CLUSTER_CLIENT) .filter(e -> machineId.equals(e.getState().getClient().getClientConfig().getServerHost() + ':' + e.getState().getClient().getClientConfig().getServerPort())) .map(e -> e.getIp() + '@' + e.getCommandPort()) .collect(Collectors.toSet()); // Modify mode to NOT-STARTED for all associated token clients. modifyToNonStarted(toModifySet, failedSet); } catch (Exception ex) { Throwable e = ex instanceof ExecutionException ? ex.getCause() : ex; LOGGER.error("Failed to unbind machine <{}>", machineId, e); failedSet.add(machineId); } return new ClusterAppAssignResultVO() .setFailedClientSet(failedSet) .setFailedServerSet(new HashSet<>()); } private void modifyToNonStarted(Set toModifySet, Set failedSet) { toModifySet.parallelStream() .map(MachineUtils::parseCommandIpAndPort) .filter(Optional::isPresent) .map(Optional::get) .map(e -> { CompletableFuture f = modifyMode(e.r1, e.r2, ClusterStateManager.CLUSTER_NOT_STARTED); return Tuple2.of(e.r1 + '@' + e.r2, f); }) .forEach(f -> handleFutureSync(f, failedSet)); } @Override public ClusterAppAssignResultVO unbindClusterServer(String app, String machineId) { AssertUtil.assertNotBlank(app, "app cannot be blank"); AssertUtil.assertNotBlank(machineId, "machineId cannot be blank"); if (isMachineInApp(machineId)) { return handleUnbindClusterServerNotInApp(app, machineId); } Set failedSet = new HashSet<>(); try { ClusterGroupEntity entity = clusterConfigService.getClusterUniversalStateForAppMachine(app, machineId) .get(10, TimeUnit.SECONDS); Set toModifySet = new HashSet<>(); toModifySet.add(machineId); if (entity.getClientSet() != null) { toModifySet.addAll(entity.getClientSet()); } // Modify mode to NOT-STARTED for all chosen token servers and associated token clients. modifyToNonStarted(toModifySet, failedSet); } catch (Exception ex) { Throwable e = ex instanceof ExecutionException ? ex.getCause() : ex; LOGGER.error("Failed to unbind machine <{}>", machineId, e); failedSet.add(machineId); } return new ClusterAppAssignResultVO() .setFailedClientSet(failedSet) .setFailedServerSet(new HashSet<>()); } @Override public ClusterAppAssignResultVO unbindClusterServers(String app, Set machineIdSet) { AssertUtil.assertNotBlank(app, "app cannot be blank"); AssertUtil.isTrue(machineIdSet != null && !machineIdSet.isEmpty(), "machineIdSet cannot be empty"); ClusterAppAssignResultVO result = new ClusterAppAssignResultVO() .setFailedClientSet(new HashSet<>()) .setFailedServerSet(new HashSet<>()); for (String machineId : machineIdSet) { ClusterAppAssignResultVO resultVO = unbindClusterServer(app, machineId); result.getFailedClientSet().addAll(resultVO.getFailedClientSet()); result.getFailedServerSet().addAll(resultVO.getFailedServerSet()); } return result; } @Override public ClusterAppAssignResultVO applyAssignToApp(String app, List clusterMap, Set remainingSet) { AssertUtil.assertNotBlank(app, "app cannot be blank"); AssertUtil.notNull(clusterMap, "clusterMap cannot be null"); Set failedServerSet = new HashSet<>(); Set failedClientSet = new HashSet<>(); // Assign server and apply config. clusterMap.stream() .filter(Objects::nonNull) .filter(ClusterAppAssignMap::getBelongToApp) .map(e -> { String ip = e.getIp(); int commandPort = parsePort(e); CompletableFuture f = modifyMode(ip, commandPort, ClusterStateManager.CLUSTER_SERVER) .thenCompose(v -> applyServerConfigChange(app, ip, commandPort, e)); return Tuple2.of(e.getMachineId(), f); }) .forEach(t -> handleFutureSync(t, failedServerSet)); // Assign client of servers and apply config. clusterMap.parallelStream() .filter(Objects::nonNull) .forEach(e -> applyAllClientConfigChange(app, e, failedClientSet)); // Unbind remaining (unassigned) machines. applyAllRemainingMachineSet(app, remainingSet, failedClientSet); return new ClusterAppAssignResultVO() .setFailedClientSet(failedClientSet) .setFailedServerSet(failedServerSet); } private void applyAllRemainingMachineSet(String app, Set remainingSet, Set failedSet) { if (remainingSet == null || remainingSet.isEmpty()) { return; } remainingSet.parallelStream() .filter(Objects::nonNull) .map(MachineUtils::parseCommandIpAndPort) .filter(Optional::isPresent) .map(Optional::get) .map(ipPort -> { String ip = ipPort.r1; int commandPort = ipPort.r2; CompletableFuture f = modifyMode(ip, commandPort, ClusterStateManager.CLUSTER_NOT_STARTED); return Tuple2.of(ip + '@' + commandPort, f); }) .forEach(t -> handleFutureSync(t, failedSet)); } private void applyAllClientConfigChange(String app, ClusterAppAssignMap assignMap, Set failedSet) { Set clientSet = assignMap.getClientSet(); if (clientSet == null || clientSet.isEmpty()) { return; } final String serverIp = assignMap.getIp(); final int serverPort = assignMap.getPort(); clientSet.stream() .map(MachineUtils::parseCommandIpAndPort) .filter(Optional::isPresent) .map(Optional::get) .map(ipPort -> { CompletableFuture f = sentinelApiClient .modifyClusterMode(ipPort.r1, ipPort.r2, ClusterStateManager.CLUSTER_CLIENT) .thenCompose(v -> sentinelApiClient.modifyClusterClientConfig(app, ipPort.r1, ipPort.r2, new ClusterClientConfig().setRequestTimeout(20) .setServerHost(serverIp) .setServerPort(serverPort) )); return Tuple2.of(ipPort.r1 + '@' + ipPort.r2, f); }) .forEach(t -> handleFutureSync(t, failedSet)); } private void handleFutureSync(Tuple2> t, Set failedSet) { try { t.r2.get(10, TimeUnit.SECONDS); } catch (Exception ex) { if (ex instanceof ExecutionException) { LOGGER.error("Request for <{}> failed", t.r1, ex.getCause()); } else { LOGGER.error("Request for <{}> failed", t.r1, ex); } failedSet.add(t.r1); } } private CompletableFuture applyServerConfigChange(String app, String ip, int commandPort, ClusterAppAssignMap assignMap) { ServerTransportConfig transportConfig = new ServerTransportConfig() .setPort(assignMap.getPort()) .setIdleSeconds(600); return sentinelApiClient.modifyClusterServerTransportConfig(app, ip, commandPort, transportConfig) .thenCompose(v -> applyServerFlowConfigChange(app, ip, commandPort, assignMap)) .thenCompose(v -> applyServerNamespaceSetConfig(app, ip, commandPort, assignMap)); } private CompletableFuture applyServerFlowConfigChange(String app, String ip, int commandPort, ClusterAppAssignMap assignMap) { Double maxAllowedQps = assignMap.getMaxAllowedQps(); if (maxAllowedQps == null || maxAllowedQps <= 0 || maxAllowedQps > 20_0000) { return CompletableFuture.completedFuture(null); } return sentinelApiClient.modifyClusterServerFlowConfig(app, ip, commandPort, new ServerFlowConfig().setMaxAllowedQps(maxAllowedQps)); } private CompletableFuture applyServerNamespaceSetConfig(String app, String ip, int commandPort, ClusterAppAssignMap assignMap) { Set namespaceSet = assignMap.getNamespaceSet(); if (namespaceSet == null || namespaceSet.isEmpty()) { return CompletableFuture.completedFuture(null); } return sentinelApiClient.modifyClusterServerNamespaceSet(app, ip, commandPort, namespaceSet); } private CompletableFuture modifyMode(String ip, int port, int mode) { return sentinelApiClient.modifyClusterMode(ip, port, mode); } private int parsePort(ClusterAppAssignMap assignMap) { return MachineUtils.parseCommandPort(assignMap.getMachineId()) .orElse(ServerTransportConfig.DEFAULT_PORT); } }