/*
|
* Copyright 1999-2018 Alibaba Group Holding Ltd.
|
*
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
* you may not use this file except in compliance with the License.
|
* You may obtain a copy of the License at
|
*
|
* http://www.apache.org/licenses/LICENSE-2.0
|
*
|
* Unless required by applicable law or agreed to in writing, software
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
* See the License for the specific language governing permissions and
|
* limitations under the License.
|
*/
|
package com.alibaba.csp.sentinel.dashboard.client;
|
|
import com.alibaba.csp.sentinel.adapter.gateway.common.rule.GatewayFlowRule;
|
import com.alibaba.csp.sentinel.command.CommandConstants;
|
import com.alibaba.csp.sentinel.command.vo.NodeVo;
|
import com.alibaba.csp.sentinel.config.SentinelConfig;
|
import com.alibaba.csp.sentinel.dashboard.datasource.entity.SentinelVersion;
|
import com.alibaba.csp.sentinel.dashboard.datasource.entity.gateway.ApiDefinitionEntity;
|
import com.alibaba.csp.sentinel.dashboard.datasource.entity.gateway.GatewayFlowRuleEntity;
|
import com.alibaba.csp.sentinel.dashboard.datasource.entity.rule.*;
|
import com.alibaba.csp.sentinel.dashboard.discovery.AppManagement;
|
import com.alibaba.csp.sentinel.dashboard.domain.cluster.ClusterClientInfoVO;
|
import com.alibaba.csp.sentinel.dashboard.domain.cluster.config.ClusterClientConfig;
|
import com.alibaba.csp.sentinel.dashboard.domain.cluster.config.ServerFlowConfig;
|
import com.alibaba.csp.sentinel.dashboard.domain.cluster.config.ServerTransportConfig;
|
import com.alibaba.csp.sentinel.dashboard.domain.cluster.state.ClusterServerStateVO;
|
import com.alibaba.csp.sentinel.dashboard.domain.cluster.state.ClusterStateSimpleEntity;
|
import com.alibaba.csp.sentinel.dashboard.util.AsyncUtils;
|
import com.alibaba.csp.sentinel.dashboard.util.VersionUtils;
|
import com.alibaba.csp.sentinel.slots.block.Rule;
|
import com.alibaba.csp.sentinel.slots.block.authority.AuthorityRule;
|
import com.alibaba.csp.sentinel.slots.block.degrade.DegradeRule;
|
import com.alibaba.csp.sentinel.slots.block.flow.FlowRule;
|
import com.alibaba.csp.sentinel.slots.block.flow.param.ParamFlowRule;
|
import com.alibaba.csp.sentinel.slots.system.SystemRule;
|
import com.alibaba.csp.sentinel.util.AssertUtil;
|
import com.alibaba.csp.sentinel.util.StringUtil;
|
import com.alibaba.fastjson.JSON;
|
import org.apache.http.Consts;
|
import org.apache.http.HttpResponse;
|
import org.apache.http.NameValuePair;
|
import org.apache.http.client.entity.UrlEncodedFormEntity;
|
import org.apache.http.client.methods.HttpGet;
|
import org.apache.http.client.methods.HttpPost;
|
import org.apache.http.client.methods.HttpUriRequest;
|
import org.apache.http.client.utils.URLEncodedUtils;
|
import org.apache.http.concurrent.FutureCallback;
|
import org.apache.http.conn.util.InetAddressUtils;
|
import org.apache.http.entity.ContentType;
|
import org.apache.http.impl.client.DefaultRedirectStrategy;
|
import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
|
import org.apache.http.impl.nio.client.HttpAsyncClients;
|
import org.apache.http.impl.nio.reactor.IOReactorConfig;
|
import org.apache.http.message.BasicNameValuePair;
|
import org.apache.http.util.EntityUtils;
|
import org.slf4j.Logger;
|
import org.slf4j.LoggerFactory;
|
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.lang.Nullable;
|
import org.springframework.stereotype.Component;
|
|
import java.io.UnsupportedEncodingException;
|
import java.net.URLEncoder;
|
import java.nio.charset.Charset;
|
import java.util.*;
|
import java.util.Map.Entry;
|
import java.util.concurrent.CompletableFuture;
|
import java.util.concurrent.ExecutionException;
|
import java.util.stream.Collectors;
|
|
/**
|
* Communicate with Sentinel client.
|
*
|
* @author leyou
|
*/
|
@Component
|
public class SentinelApiClient {
|
private static Logger logger = LoggerFactory.getLogger(SentinelApiClient.class);
|
|
private static final Charset DEFAULT_CHARSET = Charset.forName(SentinelConfig.charset());
|
private static final String HTTP_HEADER_CONTENT_TYPE = "Content-Type";
|
private static final String HTTP_HEADER_CONTENT_TYPE_URLENCODED = ContentType.create(URLEncodedUtils.CONTENT_TYPE).toString();
|
|
private static final String RESOURCE_URL_PATH = "jsonTree";
|
private static final String CLUSTER_NODE_PATH = "clusterNode";
|
private static final String GET_RULES_PATH = "getRules";
|
private static final String SET_RULES_PATH = "setRules";
|
private static final String GET_PARAM_RULE_PATH = "getParamFlowRules";
|
private static final String SET_PARAM_RULE_PATH = "setParamFlowRules";
|
|
private static final String FETCH_CLUSTER_MODE_PATH = "getClusterMode";
|
private static final String MODIFY_CLUSTER_MODE_PATH = "setClusterMode";
|
private static final String FETCH_CLUSTER_CLIENT_CONFIG_PATH = "cluster/client/fetchConfig";
|
private static final String MODIFY_CLUSTER_CLIENT_CONFIG_PATH = "cluster/client/modifyConfig";
|
|
private static final String FETCH_CLUSTER_SERVER_BASIC_INFO_PATH = "cluster/server/info";
|
|
private static final String MODIFY_CLUSTER_SERVER_TRANSPORT_CONFIG_PATH = "cluster/server/modifyTransportConfig";
|
private static final String MODIFY_CLUSTER_SERVER_FLOW_CONFIG_PATH = "cluster/server/modifyFlowConfig";
|
private static final String MODIFY_CLUSTER_SERVER_NAMESPACE_SET_PATH = "cluster/server/modifyNamespaceSet";
|
|
private static final String FETCH_GATEWAY_API_PATH = "gateway/getApiDefinitions";
|
private static final String MODIFY_GATEWAY_API_PATH = "gateway/updateApiDefinitions";
|
|
private static final String FETCH_GATEWAY_FLOW_RULE_PATH = "gateway/getRules";
|
private static final String MODIFY_GATEWAY_FLOW_RULE_PATH = "gateway/updateRules";
|
|
private static final String FLOW_RULE_TYPE = "flow";
|
private static final String DEGRADE_RULE_TYPE = "degrade";
|
private static final String SYSTEM_RULE_TYPE = "system";
|
private static final String AUTHORITY_TYPE = "authority";
|
|
private CloseableHttpAsyncClient httpClient;
|
|
private static final SentinelVersion version160 = new SentinelVersion(1, 6, 0);
|
private static final SentinelVersion version171 = new SentinelVersion(1, 7, 1);
|
|
@Autowired
|
private AppManagement appManagement;
|
|
public SentinelApiClient() {
|
IOReactorConfig ioConfig = IOReactorConfig.custom().setConnectTimeout(3000).setSoTimeout(10000)
|
.setIoThreadCount(Runtime.getRuntime().availableProcessors() * 2).build();
|
httpClient = HttpAsyncClients.custom().setRedirectStrategy(new DefaultRedirectStrategy() {
|
@Override
|
protected boolean isRedirectable(final String method) {
|
return false;
|
}
|
}).setMaxConnTotal(4000).setMaxConnPerRoute(1000).setDefaultIOReactorConfig(ioConfig).build();
|
httpClient.start();
|
}
|
|
private boolean isSuccess(int statusCode) {
|
return statusCode >= 200 && statusCode < 300;
|
}
|
|
private boolean isCommandNotFound(int statusCode, String body) {
|
return statusCode == 400 && StringUtil.isNotEmpty(body) && body.contains(CommandConstants.MSG_UNKNOWN_COMMAND_PREFIX);
|
}
|
|
protected boolean isSupportPost(String app, String ip, int port) {
|
return StringUtil.isNotEmpty(app) && Optional.ofNullable(appManagement.getDetailApp(app))
|
.flatMap(e -> e.getMachine(ip, port))
|
.flatMap(m -> VersionUtils.parseVersion(m.getVersion())
|
.map(v -> v.greaterOrEqual(version160)))
|
.orElse(false);
|
}
|
|
/**
|
* Check whether target instance (identified by tuple of app-ip:port)
|
* supports the form of "xxxxx; xx=xx" in "Content-Type" header.
|
*
|
* @param app target app name
|
* @param ip target node's address
|
* @param port target node's port
|
*/
|
protected boolean isSupportEnhancedContentType(String app, String ip, int port) {
|
return StringUtil.isNotEmpty(app) && Optional.ofNullable(appManagement.getDetailApp(app))
|
.flatMap(e -> e.getMachine(ip, port))
|
.flatMap(m -> VersionUtils.parseVersion(m.getVersion())
|
.map(v -> v.greaterOrEqual(version171)))
|
.orElse(false);
|
}
|
|
private StringBuilder queryString(Map<String, String> params) {
|
StringBuilder queryStringBuilder = new StringBuilder();
|
for (Entry<String, String> entry : params.entrySet()) {
|
if (StringUtil.isEmpty(entry.getValue())) {
|
continue;
|
}
|
String name = urlEncode(entry.getKey());
|
String value = urlEncode(entry.getValue());
|
if (name != null && value != null) {
|
if (queryStringBuilder.length() > 0) {
|
queryStringBuilder.append('&');
|
}
|
queryStringBuilder.append(name).append('=').append(value);
|
}
|
}
|
return queryStringBuilder;
|
}
|
|
/**
|
* Build an `HttpUriRequest` in POST way.
|
*
|
* @param url
|
* @param params
|
* @param supportEnhancedContentType see {@link #isSupportEnhancedContentType(String, String, int)}
|
* @return
|
*/
|
protected static HttpUriRequest postRequest(String url, Map<String, String> params, boolean supportEnhancedContentType) {
|
HttpPost httpPost = new HttpPost(url);
|
if (params != null && params.size() > 0) {
|
List<NameValuePair> list = new ArrayList<>(params.size());
|
for (Entry<String, String> entry : params.entrySet()) {
|
list.add(new BasicNameValuePair(entry.getKey(), entry.getValue()));
|
}
|
httpPost.setEntity(new UrlEncodedFormEntity(list, Consts.UTF_8));
|
if (!supportEnhancedContentType) {
|
httpPost.setHeader(HTTP_HEADER_CONTENT_TYPE, HTTP_HEADER_CONTENT_TYPE_URLENCODED);
|
}
|
}
|
return httpPost;
|
}
|
|
private String urlEncode(String str) {
|
try {
|
return URLEncoder.encode(str, DEFAULT_CHARSET.name());
|
} catch (UnsupportedEncodingException e) {
|
logger.info("encode string error: {}", str, e);
|
return null;
|
}
|
}
|
|
private String getBody(HttpResponse response) throws Exception {
|
Charset charset = null;
|
try {
|
String contentTypeStr = response.getFirstHeader(HTTP_HEADER_CONTENT_TYPE).getValue();
|
if (StringUtil.isNotEmpty(contentTypeStr)) {
|
ContentType contentType = ContentType.parse(contentTypeStr);
|
charset = contentType.getCharset();
|
}
|
} catch (Exception ignore) {
|
}
|
return EntityUtils.toString(response.getEntity(), charset != null ? charset : DEFAULT_CHARSET);
|
}
|
|
/**
|
* With no param
|
*
|
* @param ip
|
* @param port
|
* @param api
|
* @return
|
*/
|
private CompletableFuture<String> executeCommand(String ip, int port, String api, boolean useHttpPost) {
|
return executeCommand(ip, port, api, null, useHttpPost);
|
}
|
|
/**
|
* No app specified, force to GET
|
*
|
* @param ip
|
* @param port
|
* @param api
|
* @param params
|
* @return
|
*/
|
private CompletableFuture<String> executeCommand(String ip, int port, String api, Map<String, String> params, boolean useHttpPost) {
|
return executeCommand(null, ip, port, api, params, useHttpPost);
|
}
|
|
/**
|
* Prefer to execute request using POST
|
*
|
* @param app
|
* @param ip
|
* @param port
|
* @param api
|
* @param params
|
* @return
|
*/
|
private CompletableFuture<String> executeCommand(String app, String ip, int port, String api, Map<String, String> params, boolean useHttpPost) {
|
CompletableFuture<String> future = new CompletableFuture<>();
|
if (StringUtil.isBlank(ip) || StringUtil.isBlank(api)) {
|
future.completeExceptionally(new IllegalArgumentException("Bad URL or command name"));
|
return future;
|
}
|
if (!InetAddressUtils.isIPv4Address(ip) && !InetAddressUtils.isIPv6Address(ip)) {
|
future.completeExceptionally(new IllegalArgumentException("Bad IP"));
|
return future;
|
}
|
if (!StringUtil.isEmpty(app) && !appManagement.isValidMachineOfApp(app, ip)) {
|
future.completeExceptionally(new IllegalArgumentException("Given ip does not belong to given app"));
|
return future;
|
}
|
StringBuilder urlBuilder = new StringBuilder();
|
urlBuilder.append("http://");
|
urlBuilder.append(ip).append(':').append(port).append('/').append(api);
|
if (params == null) {
|
params = Collections.emptyMap();
|
}
|
if (!useHttpPost || !isSupportPost(app, ip, port)) {
|
// Using GET in older versions, append parameters after url
|
if (!params.isEmpty()) {
|
if (urlBuilder.indexOf("?") == -1) {
|
urlBuilder.append('?');
|
} else {
|
urlBuilder.append('&');
|
}
|
urlBuilder.append(queryString(params));
|
}
|
return executeCommand(new HttpGet(urlBuilder.toString()));
|
} else {
|
// Using POST
|
return executeCommand(
|
postRequest(urlBuilder.toString(), params, isSupportEnhancedContentType(app, ip, port)));
|
}
|
}
|
|
private CompletableFuture<String> executeCommand(HttpUriRequest request) {
|
CompletableFuture<String> future = new CompletableFuture<>();
|
httpClient.execute(request, new FutureCallback<HttpResponse>() {
|
@Override
|
public void completed(final HttpResponse response) {
|
int statusCode = response.getStatusLine().getStatusCode();
|
try {
|
String value = getBody(response);
|
if (isSuccess(statusCode)) {
|
future.complete(value);
|
} else {
|
if (isCommandNotFound(statusCode, value)) {
|
future.completeExceptionally(new CommandNotFoundException(request.getURI().getPath()));
|
} else {
|
future.completeExceptionally(new CommandFailedException(value));
|
}
|
}
|
|
} catch (Exception ex) {
|
future.completeExceptionally(ex);
|
logger.error("HTTP request failed: {}", request.getURI().toString(), ex);
|
}
|
}
|
|
@Override
|
public void failed(final Exception ex) {
|
future.completeExceptionally(ex);
|
logger.error("HTTP request failed: {}", request.getURI().toString(), ex);
|
}
|
|
@Override
|
public void cancelled() {
|
future.complete(null);
|
}
|
});
|
return future;
|
}
|
|
public void close() throws Exception {
|
httpClient.close();
|
}
|
|
@Nullable
|
private <T> CompletableFuture<List<T>> fetchItemsAsync(String ip, int port, String api, String type, Class<T> ruleType) {
|
AssertUtil.notEmpty(ip, "Bad machine IP");
|
AssertUtil.isTrue(port > 0, "Bad machine port");
|
Map<String, String> params = null;
|
if (StringUtil.isNotEmpty(type)) {
|
params = new HashMap<>(1);
|
params.put("type", type);
|
}
|
return executeCommand(ip, port, api, params, false)
|
.thenApply(json -> JSON.parseArray(json, ruleType));
|
}
|
|
@Nullable
|
private <T> List<T> fetchItems(String ip, int port, String api, String type, Class<T> ruleType) {
|
try {
|
AssertUtil.notEmpty(ip, "Bad machine IP");
|
AssertUtil.isTrue(port > 0, "Bad machine port");
|
Map<String, String> params = null;
|
if (StringUtil.isNotEmpty(type)) {
|
params = new HashMap<>(1);
|
params.put("type", type);
|
}
|
return fetchItemsAsync(ip, port, api, type, ruleType).get();
|
} catch (InterruptedException | ExecutionException e) {
|
logger.error("Error when fetching items from api: {} -> {}", api, type, e);
|
return null;
|
} catch (Exception e) {
|
logger.error("Error when fetching items: {} -> {}", api, type, e);
|
return null;
|
}
|
}
|
|
private <T extends Rule> List<T> fetchRules(String ip, int port, String type, Class<T> ruleType) {
|
return fetchItems(ip, port, GET_RULES_PATH, type, ruleType);
|
}
|
|
private boolean setRules(String app, String ip, int port, String type, List<? extends RuleEntity> entities) {
|
if (entities == null) {
|
return true;
|
}
|
try {
|
AssertUtil.notEmpty(app, "Bad app name");
|
AssertUtil.notEmpty(ip, "Bad machine IP");
|
AssertUtil.isTrue(port > 0, "Bad machine port");
|
String data = JSON.toJSONString(
|
entities.stream().map(r -> r.toRule()).collect(Collectors.toList()));
|
Map<String, String> params = new HashMap<>(2);
|
params.put("type", type);
|
params.put("data", data);
|
String result = executeCommand(app, ip, port, SET_RULES_PATH, params, true).get();
|
logger.info("setRules result: {}, type={}", result, type);
|
return true;
|
} catch (InterruptedException e) {
|
logger.warn("setRules API failed: {}", type, e);
|
return false;
|
} catch (ExecutionException e) {
|
logger.warn("setRules API failed: {}", type, e.getCause());
|
return false;
|
} catch (Exception e) {
|
logger.error("setRules API failed, type={}", type, e);
|
return false;
|
}
|
}
|
|
private CompletableFuture<Void> setRulesAsync(String app, String ip, int port, String type, List<? extends RuleEntity> entities) {
|
try {
|
AssertUtil.notNull(entities, "rules cannot be null");
|
AssertUtil.notEmpty(app, "Bad app name");
|
AssertUtil.notEmpty(ip, "Bad machine IP");
|
AssertUtil.isTrue(port > 0, "Bad machine port");
|
String data = JSON.toJSONString(
|
entities.stream().map(r -> r.toRule()).collect(Collectors.toList()));
|
Map<String, String> params = new HashMap<>(2);
|
params.put("type", type);
|
params.put("data", data);
|
return executeCommand(app, ip, port, SET_RULES_PATH, params, true)
|
.thenCompose(r -> {
|
if ("success".equalsIgnoreCase(r.trim())) {
|
return CompletableFuture.completedFuture(null);
|
}
|
return AsyncUtils.newFailedFuture(new CommandFailedException(r));
|
});
|
} catch (Exception e) {
|
logger.error("setRulesAsync API failed, type={}", type, e);
|
return AsyncUtils.newFailedFuture(e);
|
}
|
}
|
|
public List<NodeVo> fetchResourceOfMachine(String ip, int port, String type) {
|
return fetchItems(ip, port, RESOURCE_URL_PATH, type, NodeVo.class);
|
}
|
|
/**
|
* Fetch cluster node.
|
*
|
* @param ip ip to fetch
|
* @param port port of the ip
|
* @param includeZero whether zero value should in the result list.
|
* @return
|
*/
|
public List<NodeVo> fetchClusterNodeOfMachine(String ip, int port, boolean includeZero) {
|
String type = "notZero";
|
if (includeZero) {
|
type = "zero";
|
}
|
return fetchItems(ip, port, CLUSTER_NODE_PATH, type, NodeVo.class);
|
}
|
|
public List<FlowRuleEntity> fetchFlowRuleOfMachine(String app, String ip, int port) {
|
List<FlowRule> rules = fetchRules(ip, port, FLOW_RULE_TYPE, FlowRule.class);
|
if (rules != null) {
|
return rules.stream().map(rule -> FlowRuleEntity.fromFlowRule(app, ip, port, rule))
|
.collect(Collectors.toList());
|
} else {
|
return null;
|
}
|
}
|
|
public List<DegradeRuleEntity> fetchDegradeRuleOfMachine(String app, String ip, int port) {
|
List<DegradeRule> rules = fetchRules(ip, port, DEGRADE_RULE_TYPE, DegradeRule.class);
|
if (rules != null) {
|
return rules.stream().map(rule -> DegradeRuleEntity.fromDegradeRule(app, ip, port, rule))
|
.collect(Collectors.toList());
|
} else {
|
return null;
|
}
|
}
|
|
public List<SystemRuleEntity> fetchSystemRuleOfMachine(String app, String ip, int port) {
|
List<SystemRule> rules = fetchRules(ip, port, SYSTEM_RULE_TYPE, SystemRule.class);
|
if (rules != null) {
|
return rules.stream().map(rule -> SystemRuleEntity.fromSystemRule(app, ip, port, rule))
|
.collect(Collectors.toList());
|
} else {
|
return null;
|
}
|
}
|
|
/**
|
* Fetch all parameter flow rules from provided machine.
|
*
|
* @param app application name
|
* @param ip machine client IP
|
* @param port machine client port
|
* @return all retrieved parameter flow rules
|
* @since 0.2.1
|
*/
|
public CompletableFuture<List<ParamFlowRuleEntity>> fetchParamFlowRulesOfMachine(String app, String ip, int port) {
|
try {
|
AssertUtil.notEmpty(app, "Bad app name");
|
AssertUtil.notEmpty(ip, "Bad machine IP");
|
AssertUtil.isTrue(port > 0, "Bad machine port");
|
return fetchItemsAsync(ip, port, GET_PARAM_RULE_PATH, null, ParamFlowRule.class)
|
.thenApply(rules -> rules.stream()
|
.map(e -> ParamFlowRuleEntity.fromParamFlowRule(app, ip, port, e))
|
.collect(Collectors.toList())
|
);
|
} catch (Exception e) {
|
logger.error("Error when fetching parameter flow rules", e);
|
return AsyncUtils.newFailedFuture(e);
|
}
|
}
|
|
/**
|
* Fetch all authority rules from provided machine.
|
*
|
* @param app application name
|
* @param ip machine client IP
|
* @param port machine client port
|
* @return all retrieved authority rules
|
* @since 0.2.1
|
*/
|
public List<AuthorityRuleEntity> fetchAuthorityRulesOfMachine(String app, String ip, int port) {
|
AssertUtil.notEmpty(app, "Bad app name");
|
AssertUtil.notEmpty(ip, "Bad machine IP");
|
AssertUtil.isTrue(port > 0, "Bad machine port");
|
Map<String, String> params = new HashMap<>(1);
|
params.put("type", AUTHORITY_TYPE);
|
List<AuthorityRule> rules = fetchRules(ip, port, AUTHORITY_TYPE, AuthorityRule.class);
|
return Optional.ofNullable(rules).map(r -> r.stream()
|
.map(e -> AuthorityRuleEntity.fromAuthorityRule(app, ip, port, e))
|
.collect(Collectors.toList())
|
).orElse(null);
|
}
|
|
/**
|
* set rules of the machine. rules == null will return immediately;
|
* rules.isEmpty() means setting the rules to empty.
|
*
|
* @param app
|
* @param ip
|
* @param port
|
* @param rules
|
* @return whether successfully set the rules.
|
*/
|
public boolean setFlowRuleOfMachine(String app, String ip, int port, List<FlowRuleEntity> rules) {
|
return setRules(app, ip, port, FLOW_RULE_TYPE, rules);
|
}
|
|
public CompletableFuture<Void> setFlowRuleOfMachineAsync(String app, String ip, int port, List<FlowRuleEntity> rules) {
|
return setRulesAsync(app, ip, port, FLOW_RULE_TYPE, rules);
|
}
|
|
/**
|
* set rules of the machine. rules == null will return immediately;
|
* rules.isEmpty() means setting the rules to empty.
|
*
|
* @param app
|
* @param ip
|
* @param port
|
* @param rules
|
* @return whether successfully set the rules.
|
*/
|
public boolean setDegradeRuleOfMachine(String app, String ip, int port, List<DegradeRuleEntity> rules) {
|
return setRules(app, ip, port, DEGRADE_RULE_TYPE, rules);
|
}
|
|
/**
|
* set rules of the machine. rules == null will return immediately;
|
* rules.isEmpty() means setting the rules to empty.
|
*
|
* @param app
|
* @param ip
|
* @param port
|
* @param rules
|
* @return whether successfully set the rules.
|
*/
|
public boolean setSystemRuleOfMachine(String app, String ip, int port, List<SystemRuleEntity> rules) {
|
return setRules(app, ip, port, SYSTEM_RULE_TYPE, rules);
|
}
|
|
public boolean setAuthorityRuleOfMachine(String app, String ip, int port, List<AuthorityRuleEntity> rules) {
|
return setRules(app, ip, port, AUTHORITY_TYPE, rules);
|
}
|
|
public CompletableFuture<Void> setParamFlowRuleOfMachine(String app, String ip, int port, List<ParamFlowRuleEntity> rules) {
|
if (rules == null) {
|
return CompletableFuture.completedFuture(null);
|
}
|
if (StringUtil.isBlank(ip) || port <= 0) {
|
return AsyncUtils.newFailedFuture(new IllegalArgumentException("Invalid parameter"));
|
}
|
try {
|
String data = JSON.toJSONString(
|
rules.stream().map(ParamFlowRuleEntity::getRule).collect(Collectors.toList())
|
);
|
Map<String, String> params = new HashMap<>(1);
|
params.put("data", data);
|
return executeCommand(app, ip, port, SET_PARAM_RULE_PATH, params, true)
|
.thenCompose(e -> {
|
if (CommandConstants.MSG_SUCCESS.equals(e)) {
|
return CompletableFuture.completedFuture(null);
|
} else {
|
logger.warn("Push parameter flow rules to client failed: " + e);
|
return AsyncUtils.newFailedFuture(new RuntimeException(e));
|
}
|
});
|
} catch (Exception ex) {
|
logger.warn("Error when setting parameter flow rule", ex);
|
return AsyncUtils.newFailedFuture(ex);
|
}
|
}
|
|
// Cluster related
|
|
public CompletableFuture<ClusterStateSimpleEntity> fetchClusterMode(String ip, int port) {
|
if (StringUtil.isBlank(ip) || port <= 0) {
|
return AsyncUtils.newFailedFuture(new IllegalArgumentException("Invalid parameter"));
|
}
|
try {
|
return executeCommand(ip, port, FETCH_CLUSTER_MODE_PATH, false)
|
.thenApply(r -> JSON.parseObject(r, ClusterStateSimpleEntity.class));
|
} catch (Exception ex) {
|
logger.warn("Error when fetching cluster mode", ex);
|
return AsyncUtils.newFailedFuture(ex);
|
}
|
}
|
|
public CompletableFuture<Void> modifyClusterMode(String ip, int port, int mode) {
|
if (StringUtil.isBlank(ip) || port <= 0) {
|
return AsyncUtils.newFailedFuture(new IllegalArgumentException("Invalid parameter"));
|
}
|
try {
|
Map<String, String> params = new HashMap<>(1);
|
params.put("mode", String.valueOf(mode));
|
return executeCommand(ip, port, MODIFY_CLUSTER_MODE_PATH, params, false)
|
.thenCompose(e -> {
|
if (CommandConstants.MSG_SUCCESS.equals(e)) {
|
return CompletableFuture.completedFuture(null);
|
} else {
|
logger.warn("Error when modifying cluster mode: " + e);
|
return AsyncUtils.newFailedFuture(new RuntimeException(e));
|
}
|
});
|
} catch (Exception ex) {
|
logger.warn("Error when modifying cluster mode", ex);
|
return AsyncUtils.newFailedFuture(ex);
|
}
|
}
|
|
public CompletableFuture<ClusterClientInfoVO> fetchClusterClientInfoAndConfig(String ip, int port) {
|
if (StringUtil.isBlank(ip) || port <= 0) {
|
return AsyncUtils.newFailedFuture(new IllegalArgumentException("Invalid parameter"));
|
}
|
try {
|
return executeCommand(ip, port, FETCH_CLUSTER_CLIENT_CONFIG_PATH, false)
|
.thenApply(r -> JSON.parseObject(r, ClusterClientInfoVO.class));
|
} catch (Exception ex) {
|
logger.warn("Error when fetching cluster client config", ex);
|
return AsyncUtils.newFailedFuture(ex);
|
}
|
}
|
|
public CompletableFuture<Void> modifyClusterClientConfig(String app, String ip, int port, ClusterClientConfig config) {
|
if (StringUtil.isBlank(ip) || port <= 0) {
|
return AsyncUtils.newFailedFuture(new IllegalArgumentException("Invalid parameter"));
|
}
|
try {
|
Map<String, String> params = new HashMap<>(1);
|
params.put("data", JSON.toJSONString(config));
|
return executeCommand(app, ip, port, MODIFY_CLUSTER_CLIENT_CONFIG_PATH, params, true)
|
.thenCompose(e -> {
|
if (CommandConstants.MSG_SUCCESS.equals(e)) {
|
return CompletableFuture.completedFuture(null);
|
} else {
|
logger.warn("Error when modifying cluster client config: " + e);
|
return AsyncUtils.newFailedFuture(new RuntimeException(e));
|
}
|
});
|
} catch (Exception ex) {
|
logger.warn("Error when modifying cluster client config", ex);
|
return AsyncUtils.newFailedFuture(ex);
|
}
|
}
|
|
public CompletableFuture<Void> modifyClusterServerFlowConfig(String app, String ip, int port, ServerFlowConfig config) {
|
if (StringUtil.isBlank(ip) || port <= 0) {
|
return AsyncUtils.newFailedFuture(new IllegalArgumentException("Invalid parameter"));
|
}
|
try {
|
Map<String, String> params = new HashMap<>(1);
|
params.put("data", JSON.toJSONString(config));
|
return executeCommand(app, ip, port, MODIFY_CLUSTER_SERVER_FLOW_CONFIG_PATH, params, true)
|
.thenCompose(e -> {
|
if (CommandConstants.MSG_SUCCESS.equals(e)) {
|
return CompletableFuture.completedFuture(null);
|
} else {
|
logger.warn("Error when modifying cluster server flow config: " + e);
|
return AsyncUtils.newFailedFuture(new RuntimeException(e));
|
}
|
});
|
} catch (Exception ex) {
|
logger.warn("Error when modifying cluster server flow config", ex);
|
return AsyncUtils.newFailedFuture(ex);
|
}
|
}
|
|
public CompletableFuture<Void> modifyClusterServerTransportConfig(String app, String ip, int port, ServerTransportConfig config) {
|
if (StringUtil.isBlank(ip) || port <= 0) {
|
return AsyncUtils.newFailedFuture(new IllegalArgumentException("Invalid parameter"));
|
}
|
try {
|
Map<String, String> params = new HashMap<>(2);
|
params.put("port", config.getPort().toString());
|
params.put("idleSeconds", config.getIdleSeconds().toString());
|
return executeCommand(app, ip, port, MODIFY_CLUSTER_SERVER_TRANSPORT_CONFIG_PATH, params, false)
|
.thenCompose(e -> {
|
if (CommandConstants.MSG_SUCCESS.equals(e)) {
|
return CompletableFuture.completedFuture(null);
|
} else {
|
logger.warn("Error when modifying cluster server transport config: " + e);
|
return AsyncUtils.newFailedFuture(new RuntimeException(e));
|
}
|
});
|
} catch (Exception ex) {
|
logger.warn("Error when modifying cluster server transport config", ex);
|
return AsyncUtils.newFailedFuture(ex);
|
}
|
}
|
|
public CompletableFuture<Void> modifyClusterServerNamespaceSet(String app, String ip, int port, Set<String> set) {
|
if (StringUtil.isBlank(ip) || port <= 0) {
|
return AsyncUtils.newFailedFuture(new IllegalArgumentException("Invalid parameter"));
|
}
|
try {
|
Map<String, String> params = new HashMap<>(1);
|
params.put("data", JSON.toJSONString(set));
|
return executeCommand(app, ip, port, MODIFY_CLUSTER_SERVER_NAMESPACE_SET_PATH, params, true)
|
.thenCompose(e -> {
|
if (CommandConstants.MSG_SUCCESS.equals(e)) {
|
return CompletableFuture.completedFuture(null);
|
} else {
|
logger.warn("Error when modifying cluster server NamespaceSet", e);
|
return AsyncUtils.newFailedFuture(new RuntimeException(e));
|
}
|
});
|
} catch (Exception ex) {
|
logger.warn("Error when modifying cluster server NamespaceSet", ex);
|
return AsyncUtils.newFailedFuture(ex);
|
}
|
}
|
|
public CompletableFuture<ClusterServerStateVO> fetchClusterServerBasicInfo(String ip, int port) {
|
if (StringUtil.isBlank(ip) || port <= 0) {
|
return AsyncUtils.newFailedFuture(new IllegalArgumentException("Invalid parameter"));
|
}
|
try {
|
return executeCommand(ip, port, FETCH_CLUSTER_SERVER_BASIC_INFO_PATH, false)
|
.thenApply(r -> JSON.parseObject(r, ClusterServerStateVO.class));
|
} catch (Exception ex) {
|
logger.warn("Error when fetching cluster sever all config and basic info", ex);
|
return AsyncUtils.newFailedFuture(ex);
|
}
|
}
|
|
public CompletableFuture<List<ApiDefinitionEntity>> fetchApis(String app, String ip, int port) {
|
if (StringUtil.isBlank(ip) || port <= 0) {
|
return AsyncUtils.newFailedFuture(new IllegalArgumentException("Invalid parameter"));
|
}
|
|
try {
|
return executeCommand(ip, port, FETCH_GATEWAY_API_PATH, false)
|
.thenApply(r -> {
|
List<ApiDefinitionEntity> entities = JSON.parseArray(r, ApiDefinitionEntity.class);
|
if (entities != null) {
|
for (ApiDefinitionEntity entity : entities) {
|
entity.setApp(app);
|
entity.setIp(ip);
|
entity.setPort(port);
|
}
|
}
|
return entities;
|
});
|
} catch (Exception ex) {
|
logger.warn("Error when fetching gateway apis", ex);
|
return AsyncUtils.newFailedFuture(ex);
|
}
|
}
|
|
public boolean modifyApis(String app, String ip, int port, List<ApiDefinitionEntity> apis) {
|
if (apis == null) {
|
return true;
|
}
|
|
try {
|
AssertUtil.notEmpty(app, "Bad app name");
|
AssertUtil.notEmpty(ip, "Bad machine IP");
|
AssertUtil.isTrue(port > 0, "Bad machine port");
|
String data = JSON.toJSONString(
|
apis.stream().map(r -> r.toApiDefinition()).collect(Collectors.toList()));
|
Map<String, String> params = new HashMap<>(2);
|
params.put("data", data);
|
String result = executeCommand(app, ip, port, MODIFY_GATEWAY_API_PATH, params, true).get();
|
logger.info("Modify gateway apis: {}", result);
|
return true;
|
} catch (Exception e) {
|
logger.warn("Error when modifying gateway apis", e);
|
return false;
|
}
|
}
|
|
public CompletableFuture<List<GatewayFlowRuleEntity>> fetchGatewayFlowRules(String app, String ip, int port) {
|
if (StringUtil.isBlank(ip) || port <= 0) {
|
return AsyncUtils.newFailedFuture(new IllegalArgumentException("Invalid parameter"));
|
}
|
|
try {
|
return executeCommand(ip, port, FETCH_GATEWAY_FLOW_RULE_PATH, false)
|
.thenApply(r -> {
|
List<GatewayFlowRule> gatewayFlowRules = JSON.parseArray(r, GatewayFlowRule.class);
|
List<GatewayFlowRuleEntity> entities = gatewayFlowRules.stream().map(rule -> GatewayFlowRuleEntity.fromGatewayFlowRule(app, ip, port, rule)).collect(Collectors.toList());
|
return entities;
|
});
|
} catch (Exception ex) {
|
logger.warn("Error when fetching gateway flow rules", ex);
|
return AsyncUtils.newFailedFuture(ex);
|
}
|
}
|
|
public boolean modifyGatewayFlowRules(String app, String ip, int port, List<GatewayFlowRuleEntity> rules) {
|
if (rules == null) {
|
return true;
|
}
|
|
try {
|
AssertUtil.notEmpty(app, "Bad app name");
|
AssertUtil.notEmpty(ip, "Bad machine IP");
|
AssertUtil.isTrue(port > 0, "Bad machine port");
|
String data = JSON.toJSONString(
|
rules.stream().map(r -> r.toGatewayFlowRule()).collect(Collectors.toList()));
|
Map<String, String> params = new HashMap<>(2);
|
params.put("data", data);
|
String result = executeCommand(app, ip, port, MODIFY_GATEWAY_FLOW_RULE_PATH, params, true).get();
|
logger.info("Modify gateway flow rules: {}", result);
|
return true;
|
} catch (Exception e) {
|
logger.warn("Error when modifying gateway apis", e);
|
return false;
|
}
|
}
|
}
|