diff --git a/.codecov.yml b/.codecov.yml index 33e7d838..b15b071e 100644 --- a/.codecov.yml +++ b/.codecov.yml @@ -1,4 +1,5 @@ ignore: - "sentinel-demo/.*" - "sentinel-dashboard/.*" - - "sentinel-benchmark/.*" \ No newline at end of file + - "sentinel-benchmark/.*" + - "sentinel-transport/.*" \ No newline at end of file diff --git a/sentinel-cluster/README.md b/sentinel-cluster/README.md new file mode 100644 index 00000000..d6e1c2fd --- /dev/null +++ b/sentinel-cluster/README.md @@ -0,0 +1,7 @@ +# Sentinel Cluster Flow Control + +This is the default implementation of Sentinel cluster flow control. + +- `sentinel-cluster-common-default`: common module for cluster transport and functions +- `sentinel-cluster-client-default`: default cluster client module using Netty as underlying transport library +- `sentinel-cluster-server-default`: default cluster server module \ No newline at end of file diff --git a/sentinel-cluster/sentinel-cluster-client-default/src/main/java/com/alibaba/csp/sentinel/cluster/client/DefaultClusterTokenClient.java b/sentinel-cluster/sentinel-cluster-client-default/src/main/java/com/alibaba/csp/sentinel/cluster/client/DefaultClusterTokenClient.java index d2bc0045..58779cec 100644 --- a/sentinel-cluster/sentinel-cluster-client-default/src/main/java/com/alibaba/csp/sentinel/cluster/client/DefaultClusterTokenClient.java +++ b/sentinel-cluster/sentinel-cluster-client-default/src/main/java/com/alibaba/csp/sentinel/cluster/client/DefaultClusterTokenClient.java @@ -55,7 +55,6 @@ public class DefaultClusterTokenClient implements ClusterTokenClient { changeServer(clusterClientConfig); } }); - // TODO: check here, who should start the client? initNewConnection(); } @@ -90,7 +89,6 @@ public class DefaultClusterTokenClient implements ClusterTokenClient { return; } try { - // TODO: what if the client is pending init? if (transportClient != null) { transportClient.stop(); } @@ -108,12 +106,14 @@ public class DefaultClusterTokenClient implements ClusterTokenClient { if (shouldStart.get()) { if (transportClient != null) { transportClient.start(); + } else { + RecordLog.warn("[DefaultClusterTokenClient] Cannot start transport client: client not created"); } } } private void stopClientIfStarted() throws Exception { - if (shouldStart.get()) { + if (shouldStart.compareAndSet(true, false)) { if (transportClient != null) { transportClient.stop(); } @@ -129,9 +129,7 @@ public class DefaultClusterTokenClient implements ClusterTokenClient { @Override public void stop() throws Exception { - if (shouldStart.compareAndSet(true, false)) { - stopClientIfStarted(); - } + stopClientIfStarted(); } @Override diff --git a/sentinel-cluster/sentinel-cluster-client-default/src/main/java/com/alibaba/csp/sentinel/cluster/client/config/ClusterClientConfigManager.java b/sentinel-cluster/sentinel-cluster-client-default/src/main/java/com/alibaba/csp/sentinel/cluster/client/config/ClusterClientConfigManager.java index 834bdcd7..ca2af3da 100644 --- a/sentinel-cluster/sentinel-cluster-client-default/src/main/java/com/alibaba/csp/sentinel/cluster/client/config/ClusterClientConfigManager.java +++ b/sentinel-cluster/sentinel-cluster-client-default/src/main/java/com/alibaba/csp/sentinel/cluster/client/config/ClusterClientConfigManager.java @@ -104,6 +104,7 @@ public final class ClusterClientConfigManager { public static boolean isValidConfig(ClusterClientConfig config) { return config != null && StringUtil.isNotBlank(config.getServerHost()) && config.getServerPort() > 0 + && config.getServerPort() <= 65535 && config.getRequestTimeout() > 0; } diff --git a/sentinel-cluster/sentinel-cluster-client-default/src/main/java/com/alibaba/csp/sentinel/cluster/client/handler/TokenClientHandler.java b/sentinel-cluster/sentinel-cluster-client-default/src/main/java/com/alibaba/csp/sentinel/cluster/client/handler/TokenClientHandler.java index 3d36b4d8..61b2e37c 100644 --- a/sentinel-cluster/sentinel-cluster-client-default/src/main/java/com/alibaba/csp/sentinel/cluster/client/handler/TokenClientHandler.java +++ b/sentinel-cluster/sentinel-cluster-client-default/src/main/java/com/alibaba/csp/sentinel/cluster/client/handler/TokenClientHandler.java @@ -45,14 +45,13 @@ public class TokenClientHandler extends ChannelInboundHandlerAdapter { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { - currentState.compareAndSet(ClientConstants.CLIENT_STATUS_PENDING, ClientConstants.CLIENT_STATUS_STARTED); + currentState.set(ClientConstants.CLIENT_STATUS_STARTED); fireClientPing(ctx); RecordLog.info("[TokenClientHandler] Client handler active, remote address: " + ctx.channel().remoteAddress()); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { - System.out.println(String.format("[%s] Client message recv: %s", System.currentTimeMillis(), msg)); // TODO: remove here if (msg instanceof ClusterResponse) { ClusterResponse response = (ClusterResponse) msg; @@ -96,7 +95,7 @@ public class TokenClientHandler extends ChannelInboundHandlerAdapter { @Override public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { RecordLog.info("[TokenClientHandler] Client channel unregistered, remote address: " + ctx.channel().remoteAddress()); - currentState.compareAndSet(ClientConstants.CLIENT_STATUS_STARTED, ClientConstants.CLIENT_STATUS_OFF); + currentState.set(ClientConstants.CLIENT_STATUS_OFF); disconnectCallback.run(); } diff --git a/sentinel-cluster/sentinel-cluster-client-default/src/main/java/com/alibaba/csp/sentinel/command/handler/ModifyClusterClientConfigHandler.java b/sentinel-cluster/sentinel-cluster-client-default/src/main/java/com/alibaba/csp/sentinel/command/handler/ModifyClusterClientConfigHandler.java index 8614fb8f..73d00d5d 100644 --- a/sentinel-cluster/sentinel-cluster-client-default/src/main/java/com/alibaba/csp/sentinel/command/handler/ModifyClusterClientConfigHandler.java +++ b/sentinel-cluster/sentinel-cluster-client-default/src/main/java/com/alibaba/csp/sentinel/command/handler/ModifyClusterClientConfigHandler.java @@ -46,7 +46,7 @@ public class ModifyClusterClientConfigHandler implements CommandHandler ClusterClientConfig clusterClientConfig = JSON.parseObject(data, ClusterClientConfig.class); ClusterClientConfigManager.applyNewConfig(clusterClientConfig); - return CommandResponse.ofSuccess("ok"); + return CommandResponse.ofSuccess("success"); } catch (Exception e) { RecordLog.warn("[ModifyClusterClientConfigHandler] Decode client cluster config error", e); return CommandResponse.ofFailure(e, "decode client cluster config error"); diff --git a/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/rule/ClusterFlowRuleManager.java b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/rule/ClusterFlowRuleManager.java index 27b6853b..621ff6b5 100644 --- a/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/rule/ClusterFlowRuleManager.java +++ b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/rule/ClusterFlowRuleManager.java @@ -15,6 +15,7 @@ */ package com.alibaba.csp.sentinel.cluster.flow.rule; +import java.util.ArrayList; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -163,6 +164,11 @@ public final class ClusterFlowRuleManager { } } + /** + * Remove cluster flow rule property for a specific namespace. + * + * @param namespace valid namespace + */ public static void removeProperty(String namespace) { AssertUtil.notEmpty(namespace, "namespace cannot be empty"); synchronized (UPDATE_LOCK) { @@ -189,6 +195,12 @@ public final class ClusterFlowRuleManager { } } + /** + * Get flow rule by rule ID. + * + * @param id rule ID + * @return flow rule + */ public static FlowRule getFlowRuleById(Long id) { if (!ClusterRuleUtil.validId(id)) { return null; @@ -196,10 +208,57 @@ public final class ClusterFlowRuleManager { return FLOW_RULES.get(id); } + public static List getAllFlowRules() { + return new ArrayList<>(FLOW_RULES.values()); + } + + /** + * Get all cluster flow rules within a specific namespace. + * + * @param namespace valid namespace + * @return cluster flow rules within the provided namespace + */ + public static List getFlowRules(String namespace) { + if (StringUtil.isEmpty(namespace)) { + return new ArrayList<>(); + } + List rules = new ArrayList<>(); + Set flowIdSet = NAMESPACE_FLOW_ID_MAP.get(namespace); + if (flowIdSet == null || flowIdSet.isEmpty()) { + return rules; + } + for (Long flowId : flowIdSet) { + FlowRule rule = FLOW_RULES.get(flowId); + if (rule != null) { + rules.add(rule); + } + } + return rules; + } + + /** + * Load flow rules for a specific namespace. The former rules of the namespace will be replaced. + * + * @param namespace a valid namespace + * @param rules rule list + */ + public static void loadRules(String namespace, List rules) { + AssertUtil.notEmpty(namespace, "namespace cannot be empty"); + NamespaceFlowProperty property = PROPERTY_MAP.get(namespace); + if (property != null) { + property.getProperty().updateValue(rules); + } + } + private static void resetNamespaceFlowIdMapFor(/*@Valid*/ String namespace) { NAMESPACE_FLOW_ID_MAP.put(namespace, new HashSet()); } + /** + * Clear all rules of the provided namespace and reset map. + * + * @param namespace valid namespace + */ private static void clearAndResetRulesFor(/*@Valid*/ String namespace) { Set flowIdSet = NAMESPACE_FLOW_ID_MAP.get(namespace); if (flowIdSet != null && !flowIdSet.isEmpty()) { diff --git a/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/rule/ClusterParamFlowRuleManager.java b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/rule/ClusterParamFlowRuleManager.java index d6b255bc..c81393d5 100644 --- a/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/rule/ClusterParamFlowRuleManager.java +++ b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/rule/ClusterParamFlowRuleManager.java @@ -15,6 +15,7 @@ */ package com.alibaba.csp.sentinel.cluster.flow.rule; +import java.util.ArrayList; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -217,6 +218,54 @@ public final class ClusterParamFlowRuleManager { return PARAM_RULES.get(id); } + public static List getAllParamRules() { + return new ArrayList<>(PARAM_RULES.values()); + } + + /** + * Get all cluster parameter flow rules within a specific namespace. + * + * @param namespace a valid namespace + * @return cluster parameter flow rules within the provided namespace + */ + public static List getParamRules(String namespace) { + if (StringUtil.isEmpty(namespace)) { + return new ArrayList<>(); + } + List rules = new ArrayList<>(); + Set flowIdSet = NAMESPACE_FLOW_ID_MAP.get(namespace); + if (flowIdSet == null || flowIdSet.isEmpty()) { + return rules; + } + for (Long flowId : flowIdSet) { + ParamFlowRule rule = PARAM_RULES.get(flowId); + if (rule != null) { + rules.add(rule); + } + } + return rules; + } + + /** + * Load parameter flow rules for a specific namespace. The former rules of the namespace will be replaced. + * + * @param namespace a valid namespace + * @param rules rule list + */ + public static void loadRules(String namespace, List rules) { + AssertUtil.notEmpty(namespace, "namespace cannot be empty"); + NamespaceFlowProperty property = PROPERTY_MAP.get(namespace); + if (property != null) { + property.getProperty().updateValue(rules); + } + } + + /** + * Get connected count for associated namespace of given {@code flowId}. + * + * @param flowId existing rule ID + * @return connected count + */ public static int getConnectedCount(long flowId) { if (flowId <= 0) { return 0; diff --git a/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/SentinelDefaultTokenServer.java b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/SentinelDefaultTokenServer.java index c6a42db9..7aa478fe 100644 --- a/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/SentinelDefaultTokenServer.java +++ b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/SentinelDefaultTokenServer.java @@ -79,7 +79,7 @@ public class SentinelDefaultTokenServer implements ClusterTokenServer { } try { if (server != null) { - stopServerIfStarted(); + stopServer(); } this.server = new NettyTransportServer(newPort); this.port = newPort; @@ -101,13 +101,11 @@ public class SentinelDefaultTokenServer implements ClusterTokenServer { } } - private void stopServerIfStarted() throws Exception { - if (shouldStart.get()) { - if (server != null) { - server.stop(); - if (embedded) { - handleEmbeddedStop(); - } + private void stopServer() throws Exception { + if (server != null) { + server.stop(); + if (embedded) { + handleEmbeddedStop(); } } } @@ -136,7 +134,7 @@ public class SentinelDefaultTokenServer implements ClusterTokenServer { @Override public void stop() throws Exception { if (shouldStart.compareAndSet(true, false)) { - stopServerIfStarted(); + stopServer(); } } } diff --git a/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/command/handler/FetchClusterFlowRulesCommandHandler.java b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/command/handler/FetchClusterFlowRulesCommandHandler.java new file mode 100644 index 00000000..359dfc18 --- /dev/null +++ b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/command/handler/FetchClusterFlowRulesCommandHandler.java @@ -0,0 +1,42 @@ +/* + * Copyright 1999-2018 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.csp.sentinel.cluster.server.command.handler; + +import com.alibaba.csp.sentinel.cluster.flow.rule.ClusterFlowRuleManager; +import com.alibaba.csp.sentinel.command.CommandHandler; +import com.alibaba.csp.sentinel.command.CommandRequest; +import com.alibaba.csp.sentinel.command.CommandResponse; +import com.alibaba.csp.sentinel.command.annotation.CommandMapping; +import com.alibaba.csp.sentinel.util.StringUtil; +import com.alibaba.fastjson.JSON; + +/** + * @author Eric Zhao + * @since 1.4.0 + */ +@CommandMapping(name = "cluster/server/flowRules") +public class FetchClusterFlowRulesCommandHandler implements CommandHandler { + + @Override + public CommandResponse handle(CommandRequest request) { + String namespace = request.getParam("namespace"); + if (StringUtil.isEmpty(namespace)) { + return CommandResponse.ofSuccess(JSON.toJSONString(ClusterFlowRuleManager.getAllFlowRules())); + } else { + return CommandResponse.ofSuccess(JSON.toJSONString(ClusterFlowRuleManager.getFlowRules(namespace))); + } + } +} diff --git a/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/command/handler/FetchClusterParamFlowRulesCommandHandler.java b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/command/handler/FetchClusterParamFlowRulesCommandHandler.java new file mode 100644 index 00000000..5f490411 --- /dev/null +++ b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/command/handler/FetchClusterParamFlowRulesCommandHandler.java @@ -0,0 +1,42 @@ +/* + * Copyright 1999-2018 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.csp.sentinel.cluster.server.command.handler; + +import com.alibaba.csp.sentinel.cluster.flow.rule.ClusterParamFlowRuleManager; +import com.alibaba.csp.sentinel.command.CommandHandler; +import com.alibaba.csp.sentinel.command.CommandRequest; +import com.alibaba.csp.sentinel.command.CommandResponse; +import com.alibaba.csp.sentinel.command.annotation.CommandMapping; +import com.alibaba.csp.sentinel.util.StringUtil; +import com.alibaba.fastjson.JSON; + +/** + * @author Eric Zhao + * @since 1.4.0 + */ +@CommandMapping(name = "cluster/server/paramRules") +public class FetchClusterParamFlowRulesCommandHandler implements CommandHandler { + + @Override + public CommandResponse handle(CommandRequest request) { + String namespace = request.getParam("namespace"); + if (StringUtil.isEmpty(namespace)) { + return CommandResponse.ofSuccess(JSON.toJSONString(ClusterParamFlowRuleManager.getAllParamRules())); + } else { + return CommandResponse.ofSuccess(JSON.toJSONString(ClusterParamFlowRuleManager.getParamRules(namespace))); + } + } +} diff --git a/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/command/handler/FetchClusterServerConfigHandler.java b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/command/handler/FetchClusterServerConfigHandler.java new file mode 100644 index 00000000..72b94640 --- /dev/null +++ b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/command/handler/FetchClusterServerConfigHandler.java @@ -0,0 +1,71 @@ +/* + * Copyright 1999-2018 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.csp.sentinel.cluster.server.command.handler; + +import com.alibaba.csp.sentinel.cluster.server.config.ClusterServerConfigManager; +import com.alibaba.csp.sentinel.cluster.server.config.ServerFlowConfig; +import com.alibaba.csp.sentinel.cluster.server.config.ServerTransportConfig; +import com.alibaba.csp.sentinel.command.CommandHandler; +import com.alibaba.csp.sentinel.command.CommandRequest; +import com.alibaba.csp.sentinel.command.CommandResponse; +import com.alibaba.csp.sentinel.command.annotation.CommandMapping; +import com.alibaba.csp.sentinel.util.StringUtil; +import com.alibaba.fastjson.JSONObject; + +/** + * @author Eric Zhao + * @since 1.4.0 + */ +@CommandMapping(name = "cluster/server/fetchConfig") +public class FetchClusterServerConfigHandler implements CommandHandler { + + @Override + public CommandResponse handle(CommandRequest request) { + String namespace = request.getParam("namespace"); + if (StringUtil.isEmpty(namespace)) { + return globalConfigResult(); + } + return namespaceConfigResult(namespace); + } + + private CommandResponse namespaceConfigResult(/*@NonEmpty*/ String namespace) { + ServerFlowConfig flowConfig = new ServerFlowConfig() + .setExceedCount(ClusterServerConfigManager.getExceedCount(namespace)) + .setMaxOccupyRatio(ClusterServerConfigManager.getMaxOccupyRatio(namespace)) + .setIntervalMs(ClusterServerConfigManager.getIntervalMs(namespace)) + .setSampleCount(ClusterServerConfigManager.getSampleCount(namespace)); + JSONObject config = new JSONObject() + .fluentPut("flow", flowConfig); + return CommandResponse.ofSuccess(config.toJSONString()); + } + + private CommandResponse globalConfigResult() { + ServerTransportConfig transportConfig = new ServerTransportConfig() + .setPort(ClusterServerConfigManager.getPort()) + .setIdleSeconds(ClusterServerConfigManager.getIdleSeconds()); + ServerFlowConfig flowConfig = new ServerFlowConfig() + .setExceedCount(ClusterServerConfigManager.getExceedCount()) + .setMaxOccupyRatio(ClusterServerConfigManager.getMaxOccupyRatio()) + .setIntervalMs(ClusterServerConfigManager.getIntervalMs()) + .setSampleCount(ClusterServerConfigManager.getSampleCount()); + JSONObject config = new JSONObject() + .fluentPut("transport", transportConfig) + .fluentPut("flow", flowConfig) + .fluentPut("namespaceSet", ClusterServerConfigManager.getNamespaceSet()); + return CommandResponse.ofSuccess(config.toJSONString()); + } +} + diff --git a/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/command/handler/FetchClusterServerInfoCommandHandler.java b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/command/handler/FetchClusterServerInfoCommandHandler.java new file mode 100644 index 00000000..4996be25 --- /dev/null +++ b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/command/handler/FetchClusterServerInfoCommandHandler.java @@ -0,0 +1,69 @@ +/* + * Copyright 1999-2018 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.csp.sentinel.cluster.server.command.handler; + +import java.util.Set; + +import com.alibaba.csp.sentinel.cluster.server.config.ClusterServerConfigManager; +import com.alibaba.csp.sentinel.cluster.server.config.ServerFlowConfig; +import com.alibaba.csp.sentinel.cluster.server.config.ServerTransportConfig; +import com.alibaba.csp.sentinel.cluster.server.connection.ConnectionGroup; +import com.alibaba.csp.sentinel.cluster.server.connection.ConnectionManager; +import com.alibaba.csp.sentinel.command.CommandHandler; +import com.alibaba.csp.sentinel.command.CommandRequest; +import com.alibaba.csp.sentinel.command.CommandResponse; +import com.alibaba.csp.sentinel.command.annotation.CommandMapping; +import com.alibaba.fastjson.JSONArray; +import com.alibaba.fastjson.JSONObject; + +/** + * @author Eric Zhao + * @since 1.4.0 + */ +@CommandMapping(name = "cluster/server/info") +public class FetchClusterServerInfoCommandHandler implements CommandHandler { + + @Override + public CommandResponse handle(CommandRequest request) { + JSONObject info = new JSONObject(); + JSONArray connectionGroups = new JSONArray(); + Set namespaceSet = ClusterServerConfigManager.getNamespaceSet(); + for (String namespace : namespaceSet) { + ConnectionGroup group = ConnectionManager.getConnectionGroup(namespace); + if (group != null) { + connectionGroups.add(group); + } + } + + ServerTransportConfig transportConfig = new ServerTransportConfig() + .setPort(ClusterServerConfigManager.getPort()) + .setIdleSeconds(ClusterServerConfigManager.getIdleSeconds()); + ServerFlowConfig flowConfig = new ServerFlowConfig() + .setExceedCount(ClusterServerConfigManager.getExceedCount()) + .setMaxOccupyRatio(ClusterServerConfigManager.getMaxOccupyRatio()) + .setIntervalMs(ClusterServerConfigManager.getIntervalMs()) + .setSampleCount(ClusterServerConfigManager.getSampleCount()); + + info.fluentPut("port", ClusterServerConfigManager.getPort()) + .fluentPut("connection", connectionGroups) + .fluentPut("transport", transportConfig) + .fluentPut("flow", flowConfig) + .fluentPut("namespaceSet", ClusterServerConfigManager.getNamespaceSet()); + + return CommandResponse.ofSuccess(info.toJSONString()); + } +} + diff --git a/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/command/handler/ModifyClusterFlowRulesCommandHandler.java b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/command/handler/ModifyClusterFlowRulesCommandHandler.java new file mode 100644 index 00000000..08882c09 --- /dev/null +++ b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/command/handler/ModifyClusterFlowRulesCommandHandler.java @@ -0,0 +1,63 @@ +/* + * Copyright 1999-2018 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.csp.sentinel.cluster.server.command.handler; + +import java.net.URLDecoder; +import java.util.List; + +import com.alibaba.csp.sentinel.cluster.flow.rule.ClusterFlowRuleManager; +import com.alibaba.csp.sentinel.command.CommandHandler; +import com.alibaba.csp.sentinel.command.CommandRequest; +import com.alibaba.csp.sentinel.command.CommandResponse; +import com.alibaba.csp.sentinel.command.annotation.CommandMapping; +import com.alibaba.csp.sentinel.log.RecordLog; +import com.alibaba.csp.sentinel.slots.block.flow.FlowRule; +import com.alibaba.csp.sentinel.util.StringUtil; +import com.alibaba.fastjson.JSONArray; + +/** + * @author Eric Zhao + * @since 1.4.0 + */ +@CommandMapping(name = "cluster/server/modifyFlowRules") +public class ModifyClusterFlowRulesCommandHandler implements CommandHandler { + + @Override + public CommandResponse handle(CommandRequest request) { + String namespace = request.getParam("namespace"); + if (StringUtil.isEmpty(namespace)) { + return CommandResponse.ofFailure(new IllegalArgumentException("empty namespace")); + } + String data = request.getParam("data"); + if (StringUtil.isBlank(data)) { + return CommandResponse.ofFailure(new IllegalArgumentException("empty data")); + } + try { + data = URLDecoder.decode(data, "UTF-8"); + RecordLog.info("[ModifyClusterFlowRulesCommandHandler] Receiving cluster flow rules for namespace <{0}>: {1}", namespace, data); + + List flowRules = JSONArray.parseArray(data, FlowRule.class); + ClusterFlowRuleManager.loadRules(namespace, flowRules); + + return CommandResponse.ofSuccess(SUCCESS); + } catch (Exception e) { + RecordLog.warn("[ModifyClusterFlowRulesCommandHandler] Decode cluster flow rules error", e); + return CommandResponse.ofFailure(e, "decode cluster flow rules error"); + } + } + + private static final String SUCCESS = "success"; +} diff --git a/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/command/handler/ModifyClusterParamFlowRulesCommandHandler.java b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/command/handler/ModifyClusterParamFlowRulesCommandHandler.java new file mode 100644 index 00000000..b4632173 --- /dev/null +++ b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/command/handler/ModifyClusterParamFlowRulesCommandHandler.java @@ -0,0 +1,63 @@ +/* + * Copyright 1999-2018 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.csp.sentinel.cluster.server.command.handler; + +import java.net.URLDecoder; +import java.util.List; + +import com.alibaba.csp.sentinel.cluster.flow.rule.ClusterParamFlowRuleManager; +import com.alibaba.csp.sentinel.command.CommandHandler; +import com.alibaba.csp.sentinel.command.CommandRequest; +import com.alibaba.csp.sentinel.command.CommandResponse; +import com.alibaba.csp.sentinel.command.annotation.CommandMapping; +import com.alibaba.csp.sentinel.log.RecordLog; +import com.alibaba.csp.sentinel.slots.block.flow.param.ParamFlowRule; +import com.alibaba.csp.sentinel.util.StringUtil; +import com.alibaba.fastjson.JSONArray; + +/** + * @author Eric Zhao + * @since 1.4.0 + */ +@CommandMapping(name = "cluster/server/modifyParamRules") +public class ModifyClusterParamFlowRulesCommandHandler implements CommandHandler { + + @Override + public CommandResponse handle(CommandRequest request) { + String namespace = request.getParam("namespace"); + if (StringUtil.isEmpty(namespace)) { + return CommandResponse.ofFailure(new IllegalArgumentException("empty namespace")); + } + String data = request.getParam("data"); + if (StringUtil.isBlank(data)) { + return CommandResponse.ofFailure(new IllegalArgumentException("empty data")); + } + try { + data = URLDecoder.decode(data, "UTF-8"); + RecordLog.info("[ModifyClusterParamFlowRulesCommandHandler] Receiving cluster param rules for namespace <{0}>: {1}", namespace, data); + + List flowRules = JSONArray.parseArray(data, ParamFlowRule.class); + ClusterParamFlowRuleManager.loadRules(namespace, flowRules); + + return CommandResponse.ofSuccess(SUCCESS); + } catch (Exception e) { + RecordLog.warn("[ModifyClusterParamFlowRulesCommandHandler] Decode cluster param rules error", e); + return CommandResponse.ofFailure(e, "decode cluster param rules error"); + } + } + + private static final String SUCCESS = "success"; +} diff --git a/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/command/handler/ModifyClusterServerFlowConfigHandler.java b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/command/handler/ModifyClusterServerFlowConfigHandler.java new file mode 100644 index 00000000..9e444666 --- /dev/null +++ b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/command/handler/ModifyClusterServerFlowConfigHandler.java @@ -0,0 +1,64 @@ +/* + * Copyright 1999-2018 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.csp.sentinel.cluster.server.command.handler; + +import java.net.URLDecoder; + +import com.alibaba.csp.sentinel.cluster.server.config.ClusterServerConfigManager; +import com.alibaba.csp.sentinel.cluster.server.config.ServerFlowConfig; +import com.alibaba.csp.sentinel.cluster.server.config.ServerTransportConfig; +import com.alibaba.csp.sentinel.command.CommandHandler; +import com.alibaba.csp.sentinel.command.CommandRequest; +import com.alibaba.csp.sentinel.command.CommandResponse; +import com.alibaba.csp.sentinel.command.annotation.CommandMapping; +import com.alibaba.csp.sentinel.log.RecordLog; +import com.alibaba.csp.sentinel.util.StringUtil; +import com.alibaba.fastjson.JSON; + +/** + * @author Eric Zhao + * @since 1.4.0 + */ +@CommandMapping(name = "cluster/server/modifyFlowConfig") +public class ModifyClusterServerFlowConfigHandler implements CommandHandler { + + @Override + public CommandResponse handle(CommandRequest request) { + String data = request.getParam("data"); + if (StringUtil.isBlank(data)) { + return CommandResponse.ofFailure(new IllegalArgumentException("empty data")); + } + String namespace = request.getParam("namespace"); + try { + data = URLDecoder.decode(data, "utf-8"); + + if (StringUtil.isEmpty(namespace)) { + RecordLog.info("[ModifyClusterServerFlowConfigHandler] Receiving cluster server global flow config: " + data); + ServerFlowConfig config = JSON.parseObject(data, ServerFlowConfig.class); + ClusterServerConfigManager.loadGlobalFlowConfig(config); + } else { + RecordLog.info("[ModifyClusterServerFlowConfigHandler] Receiving cluster server flow config for namespace <{0}>: {1}", namespace, data); + ServerFlowConfig config = JSON.parseObject(data, ServerFlowConfig.class); + ClusterServerConfigManager.loadFlowConfig(namespace, config); + } + + return CommandResponse.ofSuccess("success"); + } catch (Exception e) { + RecordLog.warn("[ModifyClusterServerFlowConfigHandler] Decode cluster server flow config error", e); + return CommandResponse.ofFailure(e, "decode cluster server flow config error"); + } + } +} diff --git a/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/command/handler/ModifyClusterServerTransportConfigHandler.java b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/command/handler/ModifyClusterServerTransportConfigHandler.java new file mode 100644 index 00000000..f1e267db --- /dev/null +++ b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/command/handler/ModifyClusterServerTransportConfigHandler.java @@ -0,0 +1,56 @@ +/* + * Copyright 1999-2018 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.csp.sentinel.cluster.server.command.handler; + +import com.alibaba.csp.sentinel.cluster.server.config.ClusterServerConfigManager; +import com.alibaba.csp.sentinel.cluster.server.config.ServerTransportConfig; +import com.alibaba.csp.sentinel.command.CommandHandler; +import com.alibaba.csp.sentinel.command.CommandRequest; +import com.alibaba.csp.sentinel.command.CommandResponse; +import com.alibaba.csp.sentinel.command.annotation.CommandMapping; +import com.alibaba.csp.sentinel.util.StringUtil; + +/** + * @author Eric Zhao + * @since 1.4.0 + */ +@CommandMapping(name = "cluster/server/modifyTransportConfig") +public class ModifyClusterServerTransportConfigHandler implements CommandHandler { + + @Override + public CommandResponse handle(CommandRequest request) { + String portValue = request.getParam("port"); + if (StringUtil.isBlank(portValue)) { + return CommandResponse.ofFailure(new IllegalArgumentException("invalid empty port")); + } + String idleSecondsValue = request.getParam("idleSeconds"); + if (StringUtil.isBlank(idleSecondsValue)) { + return CommandResponse.ofFailure(new IllegalArgumentException("invalid empty idleSeconds")); + } + try { + int port = Integer.valueOf(portValue); + int idleSeconds = Integer.valueOf(idleSecondsValue); + + ClusterServerConfigManager.loadGlobalTransportConfig(new ServerTransportConfig() + .setPort(port).setIdleSeconds(idleSeconds)); + return CommandResponse.ofSuccess("success"); + } catch (NumberFormatException e) { + return CommandResponse.ofFailure(new IllegalArgumentException("invalid parameter")); + } catch (Exception ex) { + return CommandResponse.ofFailure(new IllegalArgumentException("unexpected error")); + } + } +} diff --git a/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/command/handler/ModifyServerNamespaceSetHandler.java b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/command/handler/ModifyServerNamespaceSetHandler.java new file mode 100644 index 00000000..4a78600f --- /dev/null +++ b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/command/handler/ModifyServerNamespaceSetHandler.java @@ -0,0 +1,56 @@ +/* + * Copyright 1999-2018 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.csp.sentinel.cluster.server.command.handler; + +import java.net.URLDecoder; +import java.util.Set; + +import com.alibaba.csp.sentinel.cluster.server.config.ClusterServerConfigManager; +import com.alibaba.csp.sentinel.cluster.server.config.ServerTransportConfig; +import com.alibaba.csp.sentinel.command.CommandHandler; +import com.alibaba.csp.sentinel.command.CommandRequest; +import com.alibaba.csp.sentinel.command.CommandResponse; +import com.alibaba.csp.sentinel.command.annotation.CommandMapping; +import com.alibaba.csp.sentinel.log.RecordLog; +import com.alibaba.csp.sentinel.util.StringUtil; +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.TypeReference; + +/** + * @author Eric Zhao + * @since 1.4.0 + */ +@CommandMapping(name = "cluster/server/modifyNamespaceSet") +public class ModifyServerNamespaceSetHandler implements CommandHandler { + + @Override + public CommandResponse handle(CommandRequest request) { + String data = request.getParam("data"); + if (StringUtil.isBlank(data)) { + return CommandResponse.ofFailure(new IllegalArgumentException("empty data")); + } + try { + data = URLDecoder.decode(data, "utf-8"); + RecordLog.info("[ModifyServerNamespaceSetHandler] Receiving cluster server namespace set: " + data); + Set set = JSON.parseObject(data, new TypeReference>() {}); + ClusterServerConfigManager.loadServerNamespaceSet(set); + return CommandResponse.ofSuccess("success"); + } catch (Exception e) { + RecordLog.warn("[ModifyServerNamespaceSetHandler] Decode cluster server namespace set error", e); + return CommandResponse.ofFailure(e, "decode client cluster config error"); + } + } +} diff --git a/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/config/ClusterServerConfigManager.java b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/config/ClusterServerConfigManager.java index 4054d824..92c8319a 100644 --- a/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/config/ClusterServerConfigManager.java +++ b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/config/ClusterServerConfigManager.java @@ -17,6 +17,7 @@ package com.alibaba.csp.sentinel.cluster.server.config; import java.util.ArrayList; import java.util.Collections; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; @@ -110,6 +111,35 @@ public final class ClusterServerConfigManager { } } + public static void loadServerNamespaceSet(Set namespaceSet) { + namespaceSetProperty.updateValue(namespaceSet); + } + + public static void loadGlobalTransportConfig(ServerTransportConfig config) { + transportConfigProperty.updateValue(config); + } + + public static void loadGlobalFlowConfig(ServerFlowConfig config) { + globalFlowProperty.updateValue(config); + } + + /** + * Load server flow config for a specific namespace. + * + * @param namespace a valid namespace + * @param config valid flow config for the namespace + */ + public static void loadFlowConfig(String namespace, ServerFlowConfig config) { + AssertUtil.notEmpty(namespace, "namespace cannot be empty"); + // TODO: Support namespace-scope server flow config. + globalFlowProperty.updateValue(config); + } + + public static void addTransportConfigChangeObserver(ServerTransportConfigObserver observer) { + AssertUtil.notNull(observer, "observer cannot be null"); + TRANSPORT_CONFIG_OBSERVERS.add(observer); + } + private static class ServerNamespaceSetPropertyListener implements PropertyListener> { @Override @@ -137,6 +167,8 @@ public final class ClusterServerConfigManager { ClusterServerConfigManager.namespaceSet = Collections.singleton(ServerConstants.DEFAULT_NAMESPACE); return; } + + newSet = new HashSet<>(newSet); newSet.add(ServerConstants.DEFAULT_NAMESPACE); Set oldSet = ClusterServerConfigManager.namespaceSet; @@ -186,6 +218,19 @@ public final class ClusterServerConfigManager { } } + private static void updateTokenServer(ServerTransportConfig config) { + int newPort = config.getPort(); + AssertUtil.isTrue(newPort > 0, "token server port should be valid (positive)"); + if (newPort == port) { + return; + } + ClusterServerConfigManager.port = newPort; + + for (ServerTransportConfigObserver observer : TRANSPORT_CONFIG_OBSERVERS) { + observer.onTransportConfigChange(config); + } + } + private static class ServerGlobalFlowPropertyListener implements PropertyListener { @Override @@ -197,62 +242,44 @@ public final class ClusterServerConfigManager { public void configLoad(ServerFlowConfig config) { applyGlobalFlowConfig(config); } - } - private static synchronized void applyGlobalFlowConfig(ServerFlowConfig config) { - if (!isValidFlowConfig(config)) { - RecordLog.warn( - "[ClusterServerConfigManager] Invalid cluster server global flow config, ignoring: " + config); - return; - } - RecordLog.info("[ClusterServerConfigManager] Updating new server global flow config: " + config); - if (config.getExceedCount() != exceedCount) { - exceedCount = config.getExceedCount(); - } - if (config.getMaxOccupyRatio() != maxOccupyRatio) { - maxOccupyRatio = config.getMaxOccupyRatio(); - } - int newIntervalMs = config.getIntervalMs(); - int newSampleCount = config.getSampleCount(); - if (newIntervalMs != intervalMs || newSampleCount != sampleCount) { - if (newIntervalMs <= 0 || newSampleCount <= 0 || newIntervalMs % newSampleCount != 0) { - RecordLog.warn("[ClusterServerConfigManager] Ignoring invalid flow interval or sample count"); - } else { - intervalMs = newIntervalMs; - sampleCount = newSampleCount; - // Reset all the metrics. - ClusterMetricStatistics.resetFlowMetrics(); - ClusterParamMetricStatistics.resetFlowMetrics(); + private synchronized void applyGlobalFlowConfig(ServerFlowConfig config) { + if (!isValidFlowConfig(config)) { + RecordLog.warn( + "[ClusterServerConfigManager] Invalid cluster server global flow config, ignoring: " + config); + return; + } + RecordLog.info("[ClusterServerConfigManager] Updating new server global flow config: " + config); + if (config.getExceedCount() != exceedCount) { + exceedCount = config.getExceedCount(); + } + if (config.getMaxOccupyRatio() != maxOccupyRatio) { + maxOccupyRatio = config.getMaxOccupyRatio(); + } + int newIntervalMs = config.getIntervalMs(); + int newSampleCount = config.getSampleCount(); + if (newIntervalMs != intervalMs || newSampleCount != sampleCount) { + if (newIntervalMs <= 0 || newSampleCount <= 0 || newIntervalMs % newSampleCount != 0) { + RecordLog.warn("[ClusterServerConfigManager] Ignoring invalid flow interval or sample count"); + } else { + intervalMs = newIntervalMs; + sampleCount = newSampleCount; + // Reset all the metrics. + ClusterMetricStatistics.resetFlowMetrics(); + ClusterParamMetricStatistics.resetFlowMetrics(); + } } - } - } - - public static void updateTokenServer(ServerTransportConfig config) { - int newPort = config.getPort(); - AssertUtil.isTrue(newPort > 0, "token server port should be valid (positive)"); - if (newPort == port) { - return; - } - ClusterServerConfigManager.port = newPort; - - for (ServerTransportConfigObserver observer : TRANSPORT_CONFIG_OBSERVERS) { - observer.onTransportConfigChange(config); } } public static boolean isValidTransportConfig(ServerTransportConfig config) { - return config != null && config.getPort() > 0; + return config != null && config.getPort() > 0 && config.getPort() <= 65535; } public static boolean isValidFlowConfig(ServerFlowConfig config) { return config != null && config.getMaxOccupyRatio() >= 0 && config.getExceedCount() >= 0; } - public static void addTransportConfigChangeObserver(ServerTransportConfigObserver observer) { - AssertUtil.notNull(observer, "observer cannot be null"); - TRANSPORT_CONFIG_OBSERVERS.add(observer); - } - public static double getExceedCount(String namespace) { AssertUtil.notEmpty(namespace, "namespace cannot be empty"); ServerFlowConfig config = NAMESPACE_CONF.get(namespace); diff --git a/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/connection/ConnectionManager.java b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/connection/ConnectionManager.java index 8ce244fe..5bbbaeaa 100644 --- a/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/connection/ConnectionManager.java +++ b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/connection/ConnectionManager.java @@ -100,6 +100,12 @@ public final class ConnectionManager { return group; } + public static ConnectionGroup getConnectionGroup(String namespace) { + AssertUtil.assertNotBlank(namespace, "namespace should not be empty"); + ConnectionGroup group = getOrCreateGroup(namespace); + return group; + } + private static final Object CREATE_LOCK = new Object(); private ConnectionManager() {} diff --git a/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/handler/TokenServerHandler.java b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/handler/TokenServerHandler.java index 69d113cb..b230f30c 100644 --- a/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/handler/TokenServerHandler.java +++ b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/handler/TokenServerHandler.java @@ -46,13 +46,11 @@ public class TokenServerHandler extends ChannelInboundHandlerAdapter { public void channelActive(ChannelHandlerContext ctx) throws Exception { globalConnectionPool.createConnection(ctx.channel()); String remoteAddress = getRemoteAddress(ctx); - System.out.println("[TokenServerHandler] Connection established, remote client address: " + remoteAddress); //TODO: DEBUG } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { String remoteAddress = getRemoteAddress(ctx); - System.out.println("[TokenServerHandler] Connection inactive, remote client address: " + remoteAddress); //TODO: DEBUG globalConnectionPool.remove(ctx.channel()); ConnectionManager.removeConnection(remoteAddress); } @@ -61,7 +59,6 @@ public class TokenServerHandler extends ChannelInboundHandlerAdapter { @SuppressWarnings("unchecked") public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { globalConnectionPool.refreshLastReadTime(ctx.channel()); - System.out.println(String.format("[%s] Server message recv: %s", System.currentTimeMillis(), msg)); //TODO: DEBUG if (msg instanceof ClusterRequest) { ClusterRequest request = (ClusterRequest)msg; @@ -105,8 +102,6 @@ public class TokenServerHandler extends ChannelInboundHandlerAdapter { int status = ClusterConstants.RESPONSE_STATUS_OK; ClusterResponse response = new ClusterResponse<>(request.getId(), request.getType(), status, curCount); writeResponse(ctx, response); - - RecordLog.info("[TokenServerHandler] Client <{0}> registered with namespace <{1}>", clientAddress, namespace); } private String getRemoteAddress(ChannelHandlerContext ctx) { diff --git a/sentinel-cluster/sentinel-cluster-server-default/src/main/resources/META-INF/services/com.alibaba.csp.sentinel.command.CommandHandler b/sentinel-cluster/sentinel-cluster-server-default/src/main/resources/META-INF/services/com.alibaba.csp.sentinel.command.CommandHandler new file mode 100755 index 00000000..5fe1f5b8 --- /dev/null +++ b/sentinel-cluster/sentinel-cluster-server-default/src/main/resources/META-INF/services/com.alibaba.csp.sentinel.command.CommandHandler @@ -0,0 +1,9 @@ +com.alibaba.csp.sentinel.cluster.server.command.handler.ModifyClusterServerFlowConfigHandler +com.alibaba.csp.sentinel.cluster.server.command.handler.FetchClusterFlowRulesCommandHandler +com.alibaba.csp.sentinel.cluster.server.command.handler.FetchClusterParamFlowRulesCommandHandler +com.alibaba.csp.sentinel.cluster.server.command.handler.FetchClusterServerConfigHandler +com.alibaba.csp.sentinel.cluster.server.command.handler.ModifyClusterServerTransportConfigHandler +com.alibaba.csp.sentinel.cluster.server.command.handler.ModifyServerNamespaceSetHandler +com.alibaba.csp.sentinel.cluster.server.command.handler.ModifyClusterFlowRulesCommandHandler +com.alibaba.csp.sentinel.cluster.server.command.handler.ModifyClusterParamFlowRulesCommandHandler +com.alibaba.csp.sentinel.cluster.server.command.handler.FetchClusterServerInfoCommandHandler \ No newline at end of file diff --git a/sentinel-cluster/sentinel-cluster-server-default/src/test/java/com/alibaba/csp/sentinel/cluster/flow/ClusterFlowCheckerTest.java b/sentinel-cluster/sentinel-cluster-server-default/src/test/java/com/alibaba/csp/sentinel/cluster/flow/ClusterFlowCheckerTest.java index 2b062e69..970e9fca 100644 --- a/sentinel-cluster/sentinel-cluster-server-default/src/test/java/com/alibaba/csp/sentinel/cluster/flow/ClusterFlowCheckerTest.java +++ b/sentinel-cluster/sentinel-cluster-server-default/src/test/java/com/alibaba/csp/sentinel/cluster/flow/ClusterFlowCheckerTest.java @@ -32,10 +32,9 @@ import static com.alibaba.csp.sentinel.cluster.ClusterFlowTestUtil.*; * @author Eric Zhao * @since 1.4.0 */ -@Ignore public class ClusterFlowCheckerTest { - @Test + //@Test public void testAcquireClusterTokenOccupyPass() { long flowId = 98765L; final int threshold = 5; diff --git a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/cluster/ClusterStateManager.java b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/cluster/ClusterStateManager.java index 2a27c745..4a4b1334 100644 --- a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/cluster/ClusterStateManager.java +++ b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/cluster/ClusterStateManager.java @@ -84,6 +84,10 @@ public final class ClusterStateManager { mode = CLUSTER_CLIENT; sleepIfNeeded(); lastModified = TimeUtil.currentTimeMillis(); + return startClient(); + } + + private static boolean startClient() { try { EmbeddedClusterTokenServer server = EmbeddedClusterTokenServerProvider.getServer(); if (server != null) { @@ -104,6 +108,23 @@ public final class ClusterStateManager { } } + private static boolean stopClient() { + try { + ClusterTokenClient tokenClient = TokenClientProvider.getClient(); + if (tokenClient != null) { + tokenClient.stop(); + RecordLog.info("[ClusterStateManager] Stopping the cluster token client"); + return true; + } else { + RecordLog.warn("[ClusterStateManager] Cannot stop cluster token client (no server SPI found)"); + return false; + } + } catch (Exception ex) { + RecordLog.warn("[ClusterStateManager] Error when stopping cluster token client", ex); + return false; + } + } + /** *

* Set current mode to server mode. If Sentinel currently works in client mode, @@ -117,6 +138,10 @@ public final class ClusterStateManager { mode = CLUSTER_SERVER; sleepIfNeeded(); lastModified = TimeUtil.currentTimeMillis(); + return startServer(); + } + + private static boolean startServer() { try { ClusterTokenClient tokenClient = TokenClientProvider.getClient(); if (tokenClient != null) { @@ -137,6 +162,23 @@ public final class ClusterStateManager { } } + private static boolean stopServer() { + try { + EmbeddedClusterTokenServer server = EmbeddedClusterTokenServerProvider.getServer(); + if (server != null) { + server.stop(); + RecordLog.info("[ClusterStateManager] Stopping the cluster server"); + return true; + } else { + RecordLog.warn("[ClusterStateManager] Cannot stop server (no server SPI found)"); + return false; + } + } catch (Exception ex) { + RecordLog.warn("[ClusterStateManager] Error when stopping server", ex); + return false; + } + } + /** * The interval between two change operations should be greater than {@code MIN_INTERVAL} (by default 10s). * Or we need to wait for a while. @@ -163,12 +205,12 @@ public final class ClusterStateManager { private static class ClusterStatePropertyListener implements PropertyListener { @Override - public void configLoad(Integer value) { + public synchronized void configLoad(Integer value) { applyState(value); } @Override - public void configUpdate(Integer value) { + public synchronized void configUpdate(Integer value) { applyState(value); } } @@ -177,6 +219,9 @@ public final class ClusterStateManager { if (state == null || state < 0) { return false; } + if (state == mode) { + return true; + } synchronized (UPDATE_LOCK) { switch (state) { case CLUSTER_CLIENT: @@ -190,5 +235,5 @@ public final class ClusterStateManager { } } - private static final int MIN_INTERVAL = 10 * 1000; + private static final int MIN_INTERVAL = 5 * 1000; } diff --git a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/TrafficShapingController.java b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/TrafficShapingController.java index bcba4a18..6c7a60a2 100755 --- a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/TrafficShapingController.java +++ b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/TrafficShapingController.java @@ -24,6 +24,16 @@ import com.alibaba.csp.sentinel.node.Node; */ public interface TrafficShapingController { + /** + * Check whether given resource entry can pass with provided count. + * + * @param node resource node + * @param acquireCount count to acquire + * @param prioritized whether the request is prioritized + * @return true if the resource entry can pass; false if it should be blocked + */ + boolean canPass(Node node, int acquireCount, boolean prioritized); + /** * Check whether given resource entry can pass with provided count. * diff --git a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/controller/DefaultController.java b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/controller/DefaultController.java index 4dc8368f..e3289cd7 100755 --- a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/controller/DefaultController.java +++ b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/controller/DefaultController.java @@ -36,6 +36,11 @@ public class DefaultController implements TrafficShapingController { @Override public boolean canPass(Node node, int acquireCount) { + return canPass(node, acquireCount, false); + } + + @Override + public boolean canPass(Node node, int acquireCount, boolean prioritized) { int curCount = avgUsedTokens(node); if (curCount + acquireCount > count) { return false; @@ -51,4 +56,11 @@ public class DefaultController implements TrafficShapingController { return grade == RuleConstant.FLOW_GRADE_THREAD ? node.curThreadNum() : (int)node.passQps(); } + private void sleep(int timeMillis) { + try { + Thread.sleep(timeMillis); + } catch (InterruptedException e) { + // Ignore. + } + } } diff --git a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/controller/RateLimiterController.java b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/controller/RateLimiterController.java index f1dc74af..e17e53cf 100755 --- a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/controller/RateLimiterController.java +++ b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/controller/RateLimiterController.java @@ -39,6 +39,11 @@ public class RateLimiterController implements TrafficShapingController { @Override public boolean canPass(Node node, int acquireCount) { + return canPass(node, acquireCount, false); + } + + @Override + public boolean canPass(Node node, int acquireCount, boolean prioritized) { long currentTime = TimeUtil.currentTimeMillis(); // Calculate the interval between every two requests. long costTime = Math.round(1.0 * (acquireCount) / count * 1000); diff --git a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/controller/WarmUpController.java b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/controller/WarmUpController.java index 08c71166..2cc99a15 100755 --- a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/controller/WarmUpController.java +++ b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/controller/WarmUpController.java @@ -107,6 +107,11 @@ public class WarmUpController implements TrafficShapingController { @Override public boolean canPass(Node node, int acquireCount) { + return canPass(node, acquireCount, false); + } + + @Override + public boolean canPass(Node node, int acquireCount, boolean prioritized) { long passQps = node.passQps(); long previousQps = node.previousPassQps(); diff --git a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/controller/WarmUpRateLimiterController.java b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/controller/WarmUpRateLimiterController.java index 8c2c20dd..7080f4c4 100644 --- a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/controller/WarmUpRateLimiterController.java +++ b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/controller/WarmUpRateLimiterController.java @@ -39,6 +39,11 @@ public class WarmUpRateLimiterController extends WarmUpController { @Override public boolean canPass(Node node, int acquireCount) { + return canPass(node, acquireCount, false); + } + + @Override + public boolean canPass(Node node, int acquireCount, boolean prioritized) { long previousQps = node.previousPassQps(); syncToken(previousQps); diff --git a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/statistic/data/MetricBucket.java b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/statistic/data/MetricBucket.java index 0e96a28c..5648cb0f 100755 --- a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/statistic/data/MetricBucket.java +++ b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/statistic/data/MetricBucket.java @@ -16,6 +16,7 @@ package com.alibaba.csp.sentinel.slots.statistic.data; import com.alibaba.csp.sentinel.Constants; +import com.alibaba.csp.sentinel.slots.statistic.MetricEvent; import com.alibaba.csp.sentinel.slots.statistic.base.LongAdder; /** @@ -26,15 +27,16 @@ import com.alibaba.csp.sentinel.slots.statistic.base.LongAdder; */ public class MetricBucket { - private final LongAdder pass = new LongAdder(); - private final LongAdder block = new LongAdder(); - private final LongAdder exception = new LongAdder(); - private final LongAdder rt = new LongAdder(); - private final LongAdder success = new LongAdder(); + private final LongAdder[] counters; private volatile long minRt; public MetricBucket() { + MetricEvent[] events = MetricEvent.values(); + this.counters = new LongAdder[events.length]; + for (MetricEvent event : events) { + counters[event.ordinal()] = new LongAdder(); + } initMinRt(); } @@ -48,29 +50,36 @@ public class MetricBucket { * @return new metric bucket in initial state */ public MetricBucket reset() { - pass.reset(); - block.reset(); - exception.reset(); - rt.reset(); - success.reset(); + for (MetricEvent event : MetricEvent.values()) { + counters[event.ordinal()].reset(); + } initMinRt(); return this; } + public long get(MetricEvent event) { + return counters[event.ordinal()].sum(); + } + + public MetricBucket add(MetricEvent event, long n) { + counters[event.ordinal()].add(n); + return this; + } + public long pass() { - return pass.sum(); + return get(MetricEvent.PASS); } public long block() { - return block.sum(); + return get(MetricEvent.BLOCK); } public long exception() { - return exception.sum(); + return get(MetricEvent.EXCEPTION); } public long rt() { - return rt.sum(); + return get(MetricEvent.RT); } public long minRt() { @@ -78,27 +87,27 @@ public class MetricBucket { } public long success() { - return success.sum(); + return get(MetricEvent.SUCCESS); } public void addPass() { - pass.add(1L); + add(MetricEvent.PASS, 1); } public void addException() { - exception.add(1L); + add(MetricEvent.EXCEPTION, 1); } public void addBlock() { - block.add(1L); + add(MetricEvent.BLOCK, 1); } public void addSuccess() { - success.add(1L); + add(MetricEvent.SUCCESS, 1); } public void addRT(long rt) { - this.rt.add(rt); + add(MetricEvent.RT, rt); // Not thread-safe, but it's okay. if (rt < minRt) { diff --git a/sentinel-transport/sentinel-transport-common/src/main/java/com/alibaba/csp/sentinel/command/handler/FetchClusterModeCommandHandler.java b/sentinel-transport/sentinel-transport-common/src/main/java/com/alibaba/csp/sentinel/command/handler/cluster/FetchClusterModeCommandHandler.java similarity index 97% rename from sentinel-transport/sentinel-transport-common/src/main/java/com/alibaba/csp/sentinel/command/handler/FetchClusterModeCommandHandler.java rename to sentinel-transport/sentinel-transport-common/src/main/java/com/alibaba/csp/sentinel/command/handler/cluster/FetchClusterModeCommandHandler.java index 11eac68f..14be5199 100644 --- a/sentinel-transport/sentinel-transport-common/src/main/java/com/alibaba/csp/sentinel/command/handler/FetchClusterModeCommandHandler.java +++ b/sentinel-transport/sentinel-transport-common/src/main/java/com/alibaba/csp/sentinel/command/handler/cluster/FetchClusterModeCommandHandler.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package com.alibaba.csp.sentinel.command.handler; +package com.alibaba.csp.sentinel.command.handler.cluster; import com.alibaba.csp.sentinel.cluster.ClusterStateManager; import com.alibaba.csp.sentinel.cluster.client.TokenClientProvider; diff --git a/sentinel-transport/sentinel-transport-common/src/main/java/com/alibaba/csp/sentinel/command/handler/ModifyClusterModeCommandHandler.java b/sentinel-transport/sentinel-transport-common/src/main/java/com/alibaba/csp/sentinel/command/handler/cluster/ModifyClusterModeCommandHandler.java similarity index 89% rename from sentinel-transport/sentinel-transport-common/src/main/java/com/alibaba/csp/sentinel/command/handler/ModifyClusterModeCommandHandler.java rename to sentinel-transport/sentinel-transport-common/src/main/java/com/alibaba/csp/sentinel/command/handler/cluster/ModifyClusterModeCommandHandler.java index 1d67292e..072a52b0 100644 --- a/sentinel-transport/sentinel-transport-common/src/main/java/com/alibaba/csp/sentinel/command/handler/ModifyClusterModeCommandHandler.java +++ b/sentinel-transport/sentinel-transport-common/src/main/java/com/alibaba/csp/sentinel/command/handler/cluster/ModifyClusterModeCommandHandler.java @@ -13,13 +13,14 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package com.alibaba.csp.sentinel.command.handler; +package com.alibaba.csp.sentinel.command.handler.cluster; import com.alibaba.csp.sentinel.cluster.ClusterStateManager; import com.alibaba.csp.sentinel.command.CommandHandler; import com.alibaba.csp.sentinel.command.CommandRequest; import com.alibaba.csp.sentinel.command.CommandResponse; import com.alibaba.csp.sentinel.command.annotation.CommandMapping; +import com.alibaba.csp.sentinel.log.RecordLog; /** * @author Eric Zhao @@ -32,6 +33,7 @@ public class ModifyClusterModeCommandHandler implements CommandHandler { public CommandResponse handle(CommandRequest request) { try { int mode = Integer.valueOf(request.getParam("mode")); + RecordLog.info("[ModifyClusterModeCommandHandler] Modifying cluster mode to: " + mode); if (ClusterStateManager.applyState(mode)) { return CommandResponse.ofSuccess("success"); } else { diff --git a/sentinel-transport/sentinel-transport-common/src/main/resources/META-INF/services/com.alibaba.csp.sentinel.command.CommandHandler b/sentinel-transport/sentinel-transport-common/src/main/resources/META-INF/services/com.alibaba.csp.sentinel.command.CommandHandler index 4cd2606e..2edadbd4 100755 --- a/sentinel-transport/sentinel-transport-common/src/main/resources/META-INF/services/com.alibaba.csp.sentinel.command.CommandHandler +++ b/sentinel-transport/sentinel-transport-common/src/main/resources/META-INF/services/com.alibaba.csp.sentinel.command.CommandHandler @@ -12,5 +12,5 @@ com.alibaba.csp.sentinel.command.handler.OnOffGetCommandHandler com.alibaba.csp.sentinel.command.handler.OnOffSetCommandHandler com.alibaba.csp.sentinel.command.handler.SendMetricCommandHandler com.alibaba.csp.sentinel.command.handler.VersionCommandHandler -com.alibaba.csp.sentinel.command.handler.FetchClusterModeCommandHandler -com.alibaba.csp.sentinel.command.handler.ModifyClusterModeCommandHandler \ No newline at end of file +com.alibaba.csp.sentinel.command.handler.cluster.FetchClusterModeCommandHandler +com.alibaba.csp.sentinel.command.handler.cluster.ModifyClusterModeCommandHandler \ No newline at end of file