/* * Copyright 1999-2018 Alibaba Group Holding Ltd. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package com.alibaba.csp.sentinel.dashboard.client; import com.alibaba.csp.sentinel.adapter.gateway.common.rule.GatewayFlowRule; import com.alibaba.csp.sentinel.command.CommandConstants; import com.alibaba.csp.sentinel.command.vo.NodeVo; import com.alibaba.csp.sentinel.config.SentinelConfig; import com.alibaba.csp.sentinel.dashboard.datasource.entity.SentinelVersion; import com.alibaba.csp.sentinel.dashboard.datasource.entity.gateway.ApiDefinitionEntity; import com.alibaba.csp.sentinel.dashboard.datasource.entity.gateway.GatewayFlowRuleEntity; import com.alibaba.csp.sentinel.dashboard.datasource.entity.rule.*; import com.alibaba.csp.sentinel.dashboard.discovery.AppManagement; import com.alibaba.csp.sentinel.dashboard.domain.cluster.ClusterClientInfoVO; import com.alibaba.csp.sentinel.dashboard.domain.cluster.config.ClusterClientConfig; import com.alibaba.csp.sentinel.dashboard.domain.cluster.config.ServerFlowConfig; import com.alibaba.csp.sentinel.dashboard.domain.cluster.config.ServerTransportConfig; import com.alibaba.csp.sentinel.dashboard.domain.cluster.state.ClusterServerStateVO; import com.alibaba.csp.sentinel.dashboard.domain.cluster.state.ClusterStateSimpleEntity; import com.alibaba.csp.sentinel.dashboard.util.AsyncUtils; import com.alibaba.csp.sentinel.dashboard.util.VersionUtils; import com.alibaba.csp.sentinel.slots.block.Rule; import 
com.alibaba.csp.sentinel.slots.block.authority.AuthorityRule; import com.alibaba.csp.sentinel.slots.block.degrade.DegradeRule; import com.alibaba.csp.sentinel.slots.block.flow.FlowRule; import com.alibaba.csp.sentinel.slots.block.flow.param.ParamFlowRule; import com.alibaba.csp.sentinel.slots.system.SystemRule; import com.alibaba.csp.sentinel.util.AssertUtil; import com.alibaba.csp.sentinel.util.StringUtil; import com.alibaba.fastjson.JSON; import org.apache.http.Consts; import org.apache.http.HttpResponse; import org.apache.http.NameValuePair; import org.apache.http.client.entity.UrlEncodedFormEntity; import org.apache.http.client.methods.HttpGet; import org.apache.http.client.methods.HttpPost; import org.apache.http.client.methods.HttpUriRequest; import org.apache.http.client.utils.URLEncodedUtils; import org.apache.http.concurrent.FutureCallback; import org.apache.http.conn.util.InetAddressUtils; import org.apache.http.entity.ContentType; import org.apache.http.impl.client.DefaultRedirectStrategy; import org.apache.http.impl.nio.client.CloseableHttpAsyncClient; import org.apache.http.impl.nio.client.HttpAsyncClients; import org.apache.http.impl.nio.reactor.IOReactorConfig; import org.apache.http.message.BasicNameValuePair; import org.apache.http.util.EntityUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.lang.Nullable; import org.springframework.stereotype.Component; import java.io.UnsupportedEncodingException; import java.net.URLEncoder; import java.nio.charset.Charset; import java.util.*; import java.util.Map.Entry; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.stream.Collectors; /** * Communicate with Sentinel client. 
*
 * <p>Builds HTTP requests against the command API exposed by a Sentinel client
 * ({@code http://ip:port/<command>}) and executes them through a shared
 * asynchronous HTTP client. Redirects are never followed.</p>
 *
 * @author leyou
 */
@Component
public class SentinelApiClient {

    private static final Logger logger = LoggerFactory.getLogger(SentinelApiClient.class);

    private static final Charset DEFAULT_CHARSET = Charset.forName(SentinelConfig.charset());
    private static final String HTTP_HEADER_CONTENT_TYPE = "Content-Type";
    private static final String HTTP_HEADER_CONTENT_TYPE_URLENCODED =
        ContentType.create(URLEncodedUtils.CONTENT_TYPE).toString();

    private static final String RESOURCE_URL_PATH = "jsonTree";
    private static final String CLUSTER_NODE_PATH = "clusterNode";
    private static final String GET_RULES_PATH = "getRules";
    private static final String SET_RULES_PATH = "setRules";
    private static final String GET_PARAM_RULE_PATH = "getParamFlowRules";
    private static final String SET_PARAM_RULE_PATH = "setParamFlowRules";

    private static final String FETCH_CLUSTER_MODE_PATH = "getClusterMode";
    private static final String MODIFY_CLUSTER_MODE_PATH = "setClusterMode";
    private static final String FETCH_CLUSTER_CLIENT_CONFIG_PATH = "cluster/client/fetchConfig";
    private static final String MODIFY_CLUSTER_CLIENT_CONFIG_PATH = "cluster/client/modifyConfig";

    private static final String FETCH_CLUSTER_SERVER_BASIC_INFO_PATH = "cluster/server/info";

    private static final String MODIFY_CLUSTER_SERVER_TRANSPORT_CONFIG_PATH = "cluster/server/modifyTransportConfig";
    private static final String MODIFY_CLUSTER_SERVER_FLOW_CONFIG_PATH = "cluster/server/modifyFlowConfig";
    private static final String MODIFY_CLUSTER_SERVER_NAMESPACE_SET_PATH = "cluster/server/modifyNamespaceSet";

    private static final String FETCH_GATEWAY_API_PATH = "gateway/getApiDefinitions";
    private static final String MODIFY_GATEWAY_API_PATH = "gateway/updateApiDefinitions";

    private static final String FETCH_GATEWAY_FLOW_RULE_PATH = "gateway/getRules";
    private static final String MODIFY_GATEWAY_FLOW_RULE_PATH = "gateway/updateRules";

    private static final String FLOW_RULE_TYPE = "flow";
    private static final String DEGRADE_RULE_TYPE = "degrade";
    private static final String SYSTEM_RULE_TYPE = "system";
    private static final String AUTHORITY_TYPE = "authority";

    /** Minimum client version for which commands are sent with POST. */
    private static final SentinelVersion version160 = new SentinelVersion(1, 6, 0);
    /** Minimum client version for which the "xxxxx; xx=xx" Content-Type form is sent. */
    private static final SentinelVersion version171 = new SentinelVersion(1, 7, 1);

    private final CloseableHttpAsyncClient httpClient;

    @Autowired
    private AppManagement appManagement;

    /**
     * Creates and starts the shared asynchronous HTTP client
     * (3s connect timeout, 10s socket timeout, redirects disabled).
     */
    public SentinelApiClient() {
        IOReactorConfig ioConfig = IOReactorConfig.custom().setConnectTimeout(3000).setSoTimeout(10000)
            .setIoThreadCount(Runtime.getRuntime().availableProcessors() * 2).build();
        httpClient = HttpAsyncClients.custom().setRedirectStrategy(new DefaultRedirectStrategy() {
            @Override
            protected boolean isRedirectable(final String method) {
                return false;
            }
        }).setMaxConnTotal(4000).setMaxConnPerRoute(1000).setDefaultIOReactorConfig(ioConfig).build();
        httpClient.start();
    }

    /** Returns whether the status code is in the 2xx range. */
    private boolean isSuccess(int statusCode) {
        return statusCode >= 200 && statusCode < 300;
    }

    /** Returns whether a 400 response carries the "unknown command" marker in its body. */
    private boolean isCommandNotFound(int statusCode, String body) {
        return statusCode == 400 && StringUtil.isNotEmpty(body)
            && body.contains(CommandConstants.MSG_UNKNOWN_COMMAND_PREFIX);
    }

    /**
     * Check whether target instance (identified by tuple of app-ip:port)
     * reports a version of at least 1.6.0, in which case POST is used.
     *
     * @param app  target app name; an empty app yields {@code false}
     * @param ip   target node's address
     * @param port target node's port
     * @return true if the machine is registered and its version is at least 1.6.0
     */
    protected boolean isSupportPost(String app, String ip, int port) {
        return isVersionAtLeast(app, ip, port, version160);
    }

    /**
     * Check whether target instance (identified by tuple of app-ip:port)
     * supports the form of "xxxxx; xx=xx" in "Content-Type" header.
     *
     * @param app  target app name; an empty app yields {@code false}
     * @param ip   target node's address
     * @param port target node's port
     * @return true if the machine is registered and its version is at least 1.7.1
     */
    protected boolean isSupportEnhancedContentType(String app, String ip, int port) {
        return isVersionAtLeast(app, ip, port, version171);
    }

    /**
     * Looks up the machine of the given app and compares its parsed version with {@code minimum}.
     * Returns false when the app is empty, unknown, the machine is not found,
     * or the version cannot be parsed.
     */
    private boolean isVersionAtLeast(String app, String ip, int port, SentinelVersion minimum) {
        return StringUtil.isNotEmpty(app) && Optional.ofNullable(appManagement.getDetailApp(app))
            .flatMap(e -> e.getMachine(ip, port))
            .flatMap(m -> VersionUtils.parseVersion(m.getVersion())
                .map(v -> v.greaterOrEqual(minimum)))
            .orElse(false);
    }

    /**
     * Builds a URL-encoded query string ({@code k1=v1&k2=v2}) from the given parameters.
     * Entries with an empty value, or whose key/value cannot be encoded, are skipped.
     */
    private StringBuilder queryString(Map<String, String> params) {
        StringBuilder queryStringBuilder = new StringBuilder();
        for (Entry<String, String> entry : params.entrySet()) {
            if (StringUtil.isEmpty(entry.getValue())) {
                continue;
            }
            String name = urlEncode(entry.getKey());
            String value = urlEncode(entry.getValue());
            if (name != null && value != null) {
                if (queryStringBuilder.length() > 0) {
                    queryStringBuilder.append('&');
                }
                queryStringBuilder.append(name).append('=').append(value);
            }
        }
        return queryStringBuilder;
    }

    /**
     * Build an `HttpUriRequest` in POST way.
     *
     * @param url                        target URL
     * @param params                     form parameters; may be null or empty, in which case no entity is set
     * @param supportEnhancedContentType see {@link #isSupportEnhancedContentType(String, String, int)};
     *                                   when false the Content-Type header is overridden with the plain
     *                                   url-encoded type (no charset suffix)
     * @return the POST request
     */
    protected static HttpUriRequest postRequest(String url, Map<String, String> params,
                                                boolean supportEnhancedContentType) {
        HttpPost httpPost = new HttpPost(url);
        if (params != null && params.size() > 0) {
            List<NameValuePair> list = new ArrayList<>(params.size());
            for (Entry<String, String> entry : params.entrySet()) {
                list.add(new BasicNameValuePair(entry.getKey(), entry.getValue()));
            }
            httpPost.setEntity(new UrlEncodedFormEntity(list, Consts.UTF_8));
            if (!supportEnhancedContentType) {
                httpPost.setHeader(HTTP_HEADER_CONTENT_TYPE, HTTP_HEADER_CONTENT_TYPE_URLENCODED);
            }
        }
        return httpPost;
    }

    /**
     * URL-encodes the string using the configured default charset.
     *
     * @return the encoded string, or null if the charset is not supported
     */
    private String urlEncode(String str) {
        try {
            return URLEncoder.encode(str, DEFAULT_CHARSET.name());
        } catch (UnsupportedEncodingException e) {
            logger.info("encode string error: {}", str, e);
            return null;
        }
    }

    /**
     * Reads the response body as text. The charset is taken from the response's
     * Content-Type header when present and parseable; otherwise the default charset is used.
     */
    private String getBody(HttpResponse response) throws Exception {
        Charset charset = null;
        try {
            // The header may be absent; handle that explicitly instead of relying on a swallowed NPE.
            String contentTypeStr = Optional.ofNullable(response.getFirstHeader(HTTP_HEADER_CONTENT_TYPE))
                .map(header -> header.getValue())
                .orElse(null);
            if (StringUtil.isNotEmpty(contentTypeStr)) {
                charset = ContentType.parse(contentTypeStr).getCharset();
            }
        } catch (Exception ex) {
            // Best effort: a malformed header or unknown charset falls back to the default charset.
            logger.debug("Cannot resolve charset from Content-Type header, using default", ex);
        }
        return EntityUtils.toString(response.getEntity(), charset != null ? charset : DEFAULT_CHARSET);
    }

    /**
     * Executes a command with no parameters and no app specified.
     *
     * @param ip          target machine IP
     * @param port        target machine command port
     * @param api         command name (URL path)
     * @param useHttpPost whether POST is preferred
     * @return future of the response body
     */
    private CompletableFuture<String> executeCommand(String ip, int port, String api, boolean useHttpPost) {
        return executeCommand(ip, port, api, null, useHttpPost);
    }

    /**
     * Executes a command with no app specified. Because the app is null,
     * {@link #isSupportPost(String, String, int)} is false and the request is sent with GET.
     *
     * @param ip          target machine IP
     * @param port        target machine command port
     * @param api         command name (URL path)
     * @param params      request parameters, may be null
     * @param useHttpPost whether POST is preferred
     * @return future of the response body
     */
    private CompletableFuture<String> executeCommand(String ip, int port, String api,
                                                     Map<String, String> params, boolean useHttpPost) {
        return executeCommand(null, ip, port, api, params, useHttpPost);
    }

    /**
     * Prefer to execute request using POST. POST is used only when {@code useHttpPost} is true
     * and the target machine supports it; otherwise parameters are appended to the URL and GET is used.
     *
     * @param app         app name, may be empty; when given, the IP must belong to the app
     * @param ip          target machine IP (IPv4 or IPv6 literal)
     * @param port        target machine command port
     * @param api         command name (URL path)
     * @param params      request parameters, may be null
     * @param useHttpPost whether POST is preferred
     * @return future of the response body; completed exceptionally with
     *         {@link IllegalArgumentException} on invalid arguments
     */
    private CompletableFuture<String> executeCommand(String app, String ip, int port, String api,
                                                     Map<String, String> params, boolean useHttpPost) {
        if (StringUtil.isBlank(ip) || StringUtil.isBlank(api)) {
            return AsyncUtils.newFailedFuture(new IllegalArgumentException("Bad URL or command name"));
        }
        boolean ipv6 = InetAddressUtils.isIPv6Address(ip);
        if (!ipv6 && !InetAddressUtils.isIPv4Address(ip)) {
            return AsyncUtils.newFailedFuture(new IllegalArgumentException("Bad IP"));
        }
        if (!StringUtil.isEmpty(app) && !appManagement.isValidMachineOfApp(app, ip)) {
            return AsyncUtils.newFailedFuture(
                new IllegalArgumentException("Given ip does not belong to given app"));
        }
        StringBuilder urlBuilder = new StringBuilder("http://");
        if (ipv6) {
            // An IPv6 literal must be enclosed in brackets inside a URI authority.
            urlBuilder.append('[').append(ip).append(']');
        } else {
            urlBuilder.append(ip);
        }
        urlBuilder.append(':').append(port).append('/').append(api);
        if (params == null) {
            params = Collections.emptyMap();
        }
        if (!useHttpPost || !isSupportPost(app, ip, port)) {
            // Using GET in older versions, append parameters after url
            if (!params.isEmpty()) {
                if (urlBuilder.indexOf("?") == -1) {
                    urlBuilder.append('?');
                } else {
                    urlBuilder.append('&');
                }
                urlBuilder.append(queryString(params));
            }
            return executeCommand(new HttpGet(urlBuilder.toString()));
        } else {
            // Using POST
            return executeCommand(
                postRequest(urlBuilder.toString(), params, isSupportEnhancedContentType(app, ip, port)));
        }
    }

    /**
     * Executes the request asynchronously.
     *
     * <p>The returned future completes with the body on a 2xx status, with
     * {@link CommandNotFoundException} when the client reports an unknown command, with
     * {@link CommandFailedException} on any other status, with the transport exception on failure,
     * and with {@code null} when the request is cancelled.</p>
     */
    private CompletableFuture<String> executeCommand(HttpUriRequest request) {
        CompletableFuture<String> future = new CompletableFuture<>();
        httpClient.execute(request, new FutureCallback<HttpResponse>() {
            @Override
            public void completed(final HttpResponse response) {
                int statusCode = response.getStatusLine().getStatusCode();
                try {
                    String value = getBody(response);
                    if (isSuccess(statusCode)) {
                        future.complete(value);
                    } else {
                        if (isCommandNotFound(statusCode, value)) {
                            future.completeExceptionally(
                                new CommandNotFoundException(request.getURI().getPath()));
                        } else {
                            future.completeExceptionally(new CommandFailedException(value));
                        }
                    }
                } catch (Exception ex) {
                    future.completeExceptionally(ex);
                    logger.error("HTTP request failed: {}", request.getURI().toString(), ex);
                }
            }

            @Override
            public void failed(final Exception ex) {
                future.completeExceptionally(ex);
                logger.error("HTTP request failed: {}", request.getURI().toString(), ex);
            }

            @Override
            public void cancelled() {
                future.complete(null);
            }
        });
        return future;
    }

    /** Closes the underlying HTTP client. */
    public void close() throws Exception {
        httpClient.close();
    }

    /**
     * Maps a command response to a completed future when it equals
     * {@link CommandConstants#MSG_SUCCESS}; otherwise logs the response and returns a future
     * failed with a {@link RuntimeException} carrying the response text.
     */
    private static CompletableFuture<Void> expectSuccess(String result, String failureMessage) {
        if (CommandConstants.MSG_SUCCESS.equals(result)) {
            return CompletableFuture.completedFuture(null);
        }
        logger.warn("{}: {}", failureMessage, result);
        return AsyncUtils.newFailedFuture(new RuntimeException(result));
    }

    /**
     * Fetches a JSON array from the given command (always via GET) and parses it into
     * a list of {@code ruleType}.
     *
     * @param type optional value for the "type" parameter; omitted when empty
     * @throws IllegalArgumentException if ip is empty or port is not positive
     */
    private <T> CompletableFuture<List<T>> fetchItemsAsync(String ip, int port, String api, String type,
                                                           Class<T> ruleType) {
        AssertUtil.notEmpty(ip, "Bad machine IP");
        AssertUtil.isTrue(port > 0, "Bad machine port");
        Map<String, String> params = null;
        if (StringUtil.isNotEmpty(type)) {
            params = new HashMap<>(1);
            params.put("type", type);
        }
        return executeCommand(ip, port, api, params, false)
            .thenApply(json -> JSON.parseArray(json, ruleType));
    }

    /**
     * Blocking variant of {@link #fetchItemsAsync(String, int, String, String, Class)}.
     *
     * @return the parsed items, or null if the arguments are invalid or the request fails
     */
    @Nullable
    private <T> List<T> fetchItems(String ip, int port, String api, String type, Class<T> ruleType) {
        try {
            // Argument validation happens inside fetchItemsAsync and is reported via the generic catch.
            return fetchItemsAsync(ip, port, api, type, ruleType).get();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers up the stack can observe it.
            Thread.currentThread().interrupt();
            logger.error("Error when fetching items from api: {} -> {}", api, type, e);
            return null;
        } catch (ExecutionException e) {
            logger.error("Error when fetching items from api: {} -> {}", api, type, e);
            return null;
        } catch (Exception e) {
            logger.error("Error when fetching items: {} -> {}", api, type, e);
            return null;
        }
    }

    /** Fetches rules of the given type from the machine; returns null on failure. */
    private <T extends Rule> List<T> fetchRules(String ip, int port, String type, Class<T> ruleType) {
        return fetchItems(ip, port, GET_RULES_PATH, type, ruleType);
    }

    /**
     * Pushes rules of the given type to the machine and waits for the response.
     *
     * @param entities rules to push; null returns true immediately, empty clears the rules
     * @return true if entities is null or the command completed without an exception;
     *         false on invalid arguments or request failure
     */
    private boolean setRules(String app, String ip, int port, String type,
                             List<? extends RuleEntity> entities) {
        if (entities == null) {
            return true;
        }
        try {
            AssertUtil.notEmpty(app, "Bad app name");
            AssertUtil.notEmpty(ip, "Bad machine IP");
            AssertUtil.isTrue(port > 0, "Bad machine port");
            String data = JSON.toJSONString(
                entities.stream().map(RuleEntity::toRule).collect(Collectors.toList()));
            Map<String, String> params = new HashMap<>(2);
            params.put("type", type);
            params.put("data", data);
            String result = executeCommand(app, ip, port, SET_RULES_PATH, params, true).get();
            logger.info("setRules result: {}, type={}", result, type);
            return true;
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers up the stack can observe it.
            Thread.currentThread().interrupt();
            logger.warn("setRules API failed: {}", type, e);
            return false;
        } catch (ExecutionException e) {
            logger.warn("setRules API failed: {}", type, e.getCause());
            return false;
        } catch (Exception e) {
            logger.error("setRules API failed, type={}", type, e);
            return false;
        }
    }

    /**
     * Asynchronously pushes rules of the given type to the machine.
     *
     * @param entities rules to push; must not be null
     * @return a future that completes normally when the response is "success" (case-insensitive,
     *         trimmed) and fails with {@link CommandFailedException} otherwise; invalid arguments
     *         yield a failed future
     */
    private CompletableFuture<Void> setRulesAsync(String app, String ip, int port, String type,
                                                  List<? extends RuleEntity> entities) {
        try {
            AssertUtil.notNull(entities, "rules cannot be null");
            AssertUtil.notEmpty(app, "Bad app name");
            AssertUtil.notEmpty(ip, "Bad machine IP");
            AssertUtil.isTrue(port > 0, "Bad machine port");
            String data = JSON.toJSONString(
                entities.stream().map(RuleEntity::toRule).collect(Collectors.toList()));
            Map<String, String> params = new HashMap<>(2);
            params.put("type", type);
            params.put("data", data);
            return executeCommand(app, ip, port, SET_RULES_PATH, params, true)
                .thenCompose(r -> {
                    // r is null when the request was cancelled; report that as a command failure
                    // instead of a NullPointerException.
                    if (r != null && "success".equalsIgnoreCase(r.trim())) {
                        return CompletableFuture.completedFuture(null);
                    }
                    return AsyncUtils.newFailedFuture(new CommandFailedException(r));
                });
        } catch (Exception e) {
            logger.error("setRulesAsync API failed, type={}", type, e);
            return AsyncUtils.newFailedFuture(e);
        }
    }

    /**
     * Fetches the resource tree of the machine.
     *
     * @return the nodes, or null on failure
     */
    public List<NodeVo> fetchResourceOfMachine(String ip, int port, String type) {
        return fetchItems(ip, port, RESOURCE_URL_PATH, type, NodeVo.class);
    }

    /**
     * Fetch cluster node.
     *
     * @param ip          ip to fetch
     * @param port        port of the ip
     * @param includeZero whether zero value should in the result list.
     * @return the cluster nodes, or null on failure
     */
    public List<NodeVo> fetchClusterNodeOfMachine(String ip, int port, boolean includeZero) {
        String type = includeZero ? "zero" : "notZero";
        return fetchItems(ip, port, CLUSTER_NODE_PATH, type, NodeVo.class);
    }

    /**
     * Fetches flow rules from the machine.
     *
     * @return the rules as entities, or null on failure
     */
    public List<FlowRuleEntity> fetchFlowRuleOfMachine(String app, String ip, int port) {
        List<FlowRule> rules = fetchRules(ip, port, FLOW_RULE_TYPE, FlowRule.class);
        if (rules == null) {
            return null;
        }
        return rules.stream().map(rule -> FlowRuleEntity.fromFlowRule(app, ip, port, rule))
            .collect(Collectors.toList());
    }

    /**
     * Fetches degrade rules from the machine.
     *
     * @return the rules as entities, or null on failure
     */
    public List<DegradeRuleEntity> fetchDegradeRuleOfMachine(String app, String ip, int port) {
        List<DegradeRule> rules = fetchRules(ip, port, DEGRADE_RULE_TYPE, DegradeRule.class);
        if (rules == null) {
            return null;
        }
        return rules.stream().map(rule -> DegradeRuleEntity.fromDegradeRule(app, ip, port, rule))
            .collect(Collectors.toList());
    }

    /**
     * Fetches system rules from the machine.
     *
     * @return the rules as entities, or null on failure
     */
    public List<SystemRuleEntity> fetchSystemRuleOfMachine(String app, String ip, int port) {
        List<SystemRule> rules = fetchRules(ip, port, SYSTEM_RULE_TYPE, SystemRule.class);
        if (rules == null) {
            return null;
        }
        return rules.stream().map(rule -> SystemRuleEntity.fromSystemRule(app, ip, port, rule))
            .collect(Collectors.toList());
    }

    /**
     * Fetch all parameter flow rules from provided machine.
     *
     * @param app  application name
     * @param ip   machine client IP
     * @param port machine client port
     * @return all retrieved parameter flow rules
     * @since 0.2.1
     */
    public CompletableFuture<List<ParamFlowRuleEntity>> fetchParamFlowRulesOfMachine(String app, String ip,
                                                                                     int port) {
        try {
            AssertUtil.notEmpty(app, "Bad app name");
            AssertUtil.notEmpty(ip, "Bad machine IP");
            AssertUtil.isTrue(port > 0, "Bad machine port");
            return fetchItemsAsync(ip, port, GET_PARAM_RULE_PATH, null, ParamFlowRule.class)
                .thenApply(rules -> rules.stream()
                    .map(e -> ParamFlowRuleEntity.fromParamFlowRule(app, ip, port, e))
                    .collect(Collectors.toList())
                );
        } catch (Exception e) {
            logger.error("Error when fetching parameter flow rules", e);
            return AsyncUtils.newFailedFuture(e);
        }
    }

    /**
     * Fetch all authority rules from provided machine.
     *
     * @param app  application name
     * @param ip   machine client IP
     * @param port machine client port
     * @return all retrieved authority rules, or null on request failure
     * @throws IllegalArgumentException if app or ip is empty, or port is not positive
     * @since 0.2.1
     */
    public List<AuthorityRuleEntity> fetchAuthorityRulesOfMachine(String app, String ip, int port) {
        AssertUtil.notEmpty(app, "Bad app name");
        AssertUtil.notEmpty(ip, "Bad machine IP");
        AssertUtil.isTrue(port > 0, "Bad machine port");
        List<AuthorityRule> rules = fetchRules(ip, port, AUTHORITY_TYPE, AuthorityRule.class);
        return Optional.ofNullable(rules).map(r -> r.stream()
            .map(e -> AuthorityRuleEntity.fromAuthorityRule(app, ip, port, e))
            .collect(Collectors.toList())
        ).orElse(null);
    }

    /**
     * set rules of the machine. rules == null will return immediately;
     * rules.isEmpty() means setting the rules to empty.
     *
     * @param app   application name
     * @param ip    machine client IP
     * @param port  machine client port
     * @param rules flow rules to push
     * @return whether successfully set the rules.
     */
    public boolean setFlowRuleOfMachine(String app, String ip, int port, List<FlowRuleEntity> rules) {
        return setRules(app, ip, port, FLOW_RULE_TYPE, rules);
    }

    /**
     * Asynchronously sets flow rules of the machine. Unlike the blocking variant,
     * null rules yield a failed future.
     */
    public CompletableFuture<Void> setFlowRuleOfMachineAsync(String app, String ip, int port,
                                                             List<FlowRuleEntity> rules) {
        return setRulesAsync(app, ip, port, FLOW_RULE_TYPE, rules);
    }

    /**
     * set rules of the machine. rules == null will return immediately;
     * rules.isEmpty() means setting the rules to empty.
     *
     * @param app   application name
     * @param ip    machine client IP
     * @param port  machine client port
     * @param rules degrade rules to push
     * @return whether successfully set the rules.
     */
    public boolean setDegradeRuleOfMachine(String app, String ip, int port, List<DegradeRuleEntity> rules) {
        return setRules(app, ip, port, DEGRADE_RULE_TYPE, rules);
    }

    /**
     * set rules of the machine. rules == null will return immediately;
     * rules.isEmpty() means setting the rules to empty.
     *
     * @param app   application name
     * @param ip    machine client IP
     * @param port  machine client port
     * @param rules system rules to push
     * @return whether successfully set the rules.
     */
    public boolean setSystemRuleOfMachine(String app, String ip, int port, List<SystemRuleEntity> rules) {
        return setRules(app, ip, port, SYSTEM_RULE_TYPE, rules);
    }

    /**
     * set rules of the machine. rules == null will return immediately;
     * rules.isEmpty() means setting the rules to empty.
     *
     * @return whether successfully set the rules.
     */
    public boolean setAuthorityRuleOfMachine(String app, String ip, int port,
                                             List<AuthorityRuleEntity> rules) {
        return setRules(app, ip, port, AUTHORITY_TYPE, rules);
    }

    /**
     * Pushes parameter flow rules to the machine.
     *
     * @param rules rules to push; null completes immediately without sending anything
     * @return a future that fails if the arguments are invalid or the client does not answer
     *         with {@link CommandConstants#MSG_SUCCESS}
     */
    public CompletableFuture<Void> setParamFlowRuleOfMachine(String app, String ip, int port,
                                                             List<ParamFlowRuleEntity> rules) {
        if (rules == null) {
            return CompletableFuture.completedFuture(null);
        }
        if (StringUtil.isBlank(ip) || port <= 0) {
            return AsyncUtils.newFailedFuture(new IllegalArgumentException("Invalid parameter"));
        }
        try {
            String data = JSON.toJSONString(
                rules.stream().map(ParamFlowRuleEntity::getRule).collect(Collectors.toList())
            );
            Map<String, String> params = new HashMap<>(1);
            params.put("data", data);
            return executeCommand(app, ip, port, SET_PARAM_RULE_PATH, params, true)
                .thenCompose(e -> expectSuccess(e, "Push parameter flow rules to client failed"));
        } catch (Exception ex) {
            logger.warn("Error when setting parameter flow rule", ex);
            return AsyncUtils.newFailedFuture(ex);
        }
    }

    // Cluster related

    /** Fetches the cluster mode state of the machine. */
    public CompletableFuture<ClusterStateSimpleEntity> fetchClusterMode(String ip, int port) {
        if (StringUtil.isBlank(ip) || port <= 0) {
            return AsyncUtils.newFailedFuture(new IllegalArgumentException("Invalid parameter"));
        }
        try {
            return executeCommand(ip, port, FETCH_CLUSTER_MODE_PATH, false)
                .thenApply(r -> JSON.parseObject(r, ClusterStateSimpleEntity.class));
        } catch (Exception ex) {
            logger.warn("Error when fetching cluster mode", ex);
            return AsyncUtils.newFailedFuture(ex);
        }
    }

    /** Sets the cluster mode of the machine; fails unless the client answers with success. */
    public CompletableFuture<Void> modifyClusterMode(String ip, int port, int mode) {
        if (StringUtil.isBlank(ip) || port <= 0) {
            return AsyncUtils.newFailedFuture(new IllegalArgumentException("Invalid parameter"));
        }
        try {
            Map<String, String> params = new HashMap<>(1);
            params.put("mode", String.valueOf(mode));
            return executeCommand(ip, port, MODIFY_CLUSTER_MODE_PATH, params, false)
                .thenCompose(e -> expectSuccess(e, "Error when modifying cluster mode"));
        } catch (Exception ex) {
            logger.warn("Error when modifying cluster mode", ex);
            return AsyncUtils.newFailedFuture(ex);
        }
    }

    /** Fetches the cluster client info and config of the machine. */
    public CompletableFuture<ClusterClientInfoVO> fetchClusterClientInfoAndConfig(String ip, int port) {
        if (StringUtil.isBlank(ip) || port <= 0) {
            return AsyncUtils.newFailedFuture(new IllegalArgumentException("Invalid parameter"));
        }
        try {
            return executeCommand(ip, port, FETCH_CLUSTER_CLIENT_CONFIG_PATH, false)
                .thenApply(r -> JSON.parseObject(r, ClusterClientInfoVO.class));
        } catch (Exception ex) {
            logger.warn("Error when fetching cluster client config", ex);
            return AsyncUtils.newFailedFuture(ex);
        }
    }

    /** Pushes the cluster client config (as JSON) to the machine. */
    public CompletableFuture<Void> modifyClusterClientConfig(String app, String ip, int port,
                                                             ClusterClientConfig config) {
        if (StringUtil.isBlank(ip) || port <= 0) {
            return AsyncUtils.newFailedFuture(new IllegalArgumentException("Invalid parameter"));
        }
        try {
            Map<String, String> params = new HashMap<>(1);
            params.put("data", JSON.toJSONString(config));
            return executeCommand(app, ip, port, MODIFY_CLUSTER_CLIENT_CONFIG_PATH, params, true)
                .thenCompose(e -> expectSuccess(e, "Error when modifying cluster client config"));
        } catch (Exception ex) {
            logger.warn("Error when modifying cluster client config", ex);
            return AsyncUtils.newFailedFuture(ex);
        }
    }

    /** Pushes the cluster server flow config (as JSON) to the machine. */
    public CompletableFuture<Void> modifyClusterServerFlowConfig(String app, String ip, int port,
                                                                 ServerFlowConfig config) {
        if (StringUtil.isBlank(ip) || port <= 0) {
            return AsyncUtils.newFailedFuture(new IllegalArgumentException("Invalid parameter"));
        }
        try {
            Map<String, String> params = new HashMap<>(1);
            params.put("data", JSON.toJSONString(config));
            return executeCommand(app, ip, port, MODIFY_CLUSTER_SERVER_FLOW_CONFIG_PATH, params, true)
                .thenCompose(e -> expectSuccess(e, "Error when modifying cluster server flow config"));
        } catch (Exception ex) {
            logger.warn("Error when modifying cluster server flow config", ex);
            return AsyncUtils.newFailedFuture(ex);
        }
    }

    /**
     * Pushes the cluster server transport config ("port" and "idleSeconds") to the machine.
     * A null config or null field results in a failed future.
     */
    public CompletableFuture<Void> modifyClusterServerTransportConfig(String app, String ip, int port,
                                                                      ServerTransportConfig config) {
        if (StringUtil.isBlank(ip) || port <= 0) {
            return AsyncUtils.newFailedFuture(new IllegalArgumentException("Invalid parameter"));
        }
        try {
            Map<String, String> params = new HashMap<>(2);
            params.put("port", config.getPort().toString());
            params.put("idleSeconds", config.getIdleSeconds().toString());
            return executeCommand(app, ip, port, MODIFY_CLUSTER_SERVER_TRANSPORT_CONFIG_PATH, params, false)
                .thenCompose(e -> expectSuccess(e, "Error when modifying cluster server transport config"));
        } catch (Exception ex) {
            logger.warn("Error when modifying cluster server transport config", ex);
            return AsyncUtils.newFailedFuture(ex);
        }
    }

    /** Pushes the cluster server namespace set (as JSON) to the machine. */
    public CompletableFuture<Void> modifyClusterServerNamespaceSet(String app, String ip, int port,
                                                                   Set<String> set) {
        if (StringUtil.isBlank(ip) || port <= 0) {
            return AsyncUtils.newFailedFuture(new IllegalArgumentException("Invalid parameter"));
        }
        try {
            Map<String, String> params = new HashMap<>(1);
            params.put("data", JSON.toJSONString(set));
            return executeCommand(app, ip, port, MODIFY_CLUSTER_SERVER_NAMESPACE_SET_PATH, params, true)
                .thenCompose(e -> expectSuccess(e, "Error when modifying cluster server NamespaceSet"));
        } catch (Exception ex) {
            logger.warn("Error when modifying cluster server NamespaceSet", ex);
            return AsyncUtils.newFailedFuture(ex);
        }
    }

    /** Fetches the cluster server state of the machine. */
    public CompletableFuture<ClusterServerStateVO> fetchClusterServerBasicInfo(String ip, int port) {
        if (StringUtil.isBlank(ip) || port <= 0) {
            return AsyncUtils.newFailedFuture(new IllegalArgumentException("Invalid parameter"));
        }
        try {
            return executeCommand(ip, port, FETCH_CLUSTER_SERVER_BASIC_INFO_PATH, false)
                .thenApply(r -> JSON.parseObject(r, ClusterServerStateVO.class));
        } catch (Exception ex) {
            logger.warn("Error when fetching cluster sever all config and basic info", ex);
            return AsyncUtils.newFailedFuture(ex);
        }
    }

    /**
     * Fetches gateway API definitions from the machine and stamps each entity with
     * the given app, ip and port. The resulting list may be null if the response parses to null.
     */
    public CompletableFuture<List<ApiDefinitionEntity>> fetchApis(String app, String ip, int port) {
        if (StringUtil.isBlank(ip) || port <= 0) {
            return AsyncUtils.newFailedFuture(new IllegalArgumentException("Invalid parameter"));
        }
        try {
            return executeCommand(ip, port, FETCH_GATEWAY_API_PATH, false)
                .thenApply(r -> {
                    List<ApiDefinitionEntity> entities = JSON.parseArray(r, ApiDefinitionEntity.class);
                    if (entities != null) {
                        for (ApiDefinitionEntity entity : entities) {
                            entity.setApp(app);
                            entity.setIp(ip);
                            entity.setPort(port);
                        }
                    }
                    return entities;
                });
        } catch (Exception ex) {
            logger.warn("Error when fetching gateway apis", ex);
            return AsyncUtils.newFailedFuture(ex);
        }
    }

    /**
     * Pushes gateway API definitions to the machine and waits for the response.
     *
     * @param apis definitions to push; null returns true immediately
     * @return true if apis is null or the command completed without an exception; false otherwise
     */
    public boolean modifyApis(String app, String ip, int port, List<ApiDefinitionEntity> apis) {
        if (apis == null) {
            return true;
        }
        try {
            AssertUtil.notEmpty(app, "Bad app name");
            AssertUtil.notEmpty(ip, "Bad machine IP");
            AssertUtil.isTrue(port > 0, "Bad machine port");
            String data = JSON.toJSONString(
                apis.stream().map(ApiDefinitionEntity::toApiDefinition).collect(Collectors.toList()));
            Map<String, String> params = new HashMap<>(2);
            params.put("data", data);
            String result = executeCommand(app, ip, port, MODIFY_GATEWAY_API_PATH, params, true).get();
            logger.info("Modify gateway apis: {}", result);
            return true;
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers up the stack can observe it.
            Thread.currentThread().interrupt();
            logger.warn("Error when modifying gateway apis", e);
            return false;
        } catch (Exception e) {
            logger.warn("Error when modifying gateway apis", e);
            return false;
        }
    }

    /** Fetches gateway flow rules from the machine and converts them to entities. */
    public CompletableFuture<List<GatewayFlowRuleEntity>> fetchGatewayFlowRules(String app, String ip,
                                                                                int port) {
        if (StringUtil.isBlank(ip) || port <= 0) {
            return AsyncUtils.newFailedFuture(new IllegalArgumentException("Invalid parameter"));
        }
        try {
            return executeCommand(ip, port, FETCH_GATEWAY_FLOW_RULE_PATH, false)
                .thenApply(r -> {
                    List<GatewayFlowRule> gatewayFlowRules = JSON.parseArray(r, GatewayFlowRule.class);
                    List<GatewayFlowRuleEntity> entities = gatewayFlowRules.stream()
                        .map(rule -> GatewayFlowRuleEntity.fromGatewayFlowRule(app, ip, port, rule))
                        .collect(Collectors.toList());
                    return entities;
                });
        } catch (Exception ex) {
            logger.warn("Error when fetching gateway flow rules", ex);
            return AsyncUtils.newFailedFuture(ex);
        }
    }

    /**
     * Pushes gateway flow rules to the machine and waits for the response.
     *
     * @param rules rules to push; null returns true immediately
     * @return true if rules is null or the command completed without an exception; false otherwise
     */
    public boolean modifyGatewayFlowRules(String app, String ip, int port,
                                          List<GatewayFlowRuleEntity> rules) {
        if (rules == null) {
            return true;
        }
        try {
            AssertUtil.notEmpty(app, "Bad app name");
            AssertUtil.notEmpty(ip, "Bad machine IP");
            AssertUtil.isTrue(port > 0, "Bad machine port");
            String data = JSON.toJSONString(
                rules.stream().map(GatewayFlowRuleEntity::toGatewayFlowRule).collect(Collectors.toList()));
            Map<String, String> params = new HashMap<>(2);
            params.put("data", data);
            String result = executeCommand(app, ip, port, MODIFY_GATEWAY_FLOW_RULE_PATH, params, true).get();
            logger.info("Modify gateway flow rules: {}", result);
            return true;
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers up the stack can observe it.
            Thread.currentThread().interrupt();
            logger.warn("Error when modifying gateway flow rules", e);
            return false;
        } catch (Exception e) {
            logger.warn("Error when modifying gateway flow rules", e);
            return false;
        }
    }
}