From 99bdb9cf3c3820b0dc334ed508209ad9ff054bce Mon Sep 17 00:00:00 2001 From: Eric Zhao Date: Fri, 4 Jan 2019 14:09:15 +0800 Subject: [PATCH] Add total QPS limit control for specific namespace in cluster flow control (#382) - Add `UnaryLeapArray` and `RequestLimiter` to enable simple QPS limit - Improve cluster rule manager and server config manager to support request limiter - Support `TOO_MANY_REQUEST` status in client side - Also improve the automatic namespace register of embedded server mode Signed-off-by: Eric Zhao --- .../cluster/flow/ClusterFlowChecker.java | 11 +++ .../cluster/flow/ClusterParamFlowChecker.java | 11 +++ .../flow/rule/ClusterFlowRuleManager.java | 22 ++++- .../rule/ClusterParamFlowRuleManager.java | 23 ++++- .../statistic/limit/GlobalRequestLimiter.java | 83 +++++++++++++++++ .../flow/statistic/limit/RequestLimiter.java | 88 +++++++++++++++++++ .../FetchClusterServerInfoCommandHandler.java | 22 ++++- .../ModifyClusterServerFlowConfigHandler.java | 6 ++ .../config/ClusterServerConfigManager.java | 46 +++++++++- .../server/config/ServerFlowConfig.java | 13 +++ .../sentinel/cluster/TokenResultStatus.java | 4 + .../slots/block/flow/FlowRuleChecker.java | 1 + .../slots/statistic/base/UnaryLeapArray.java | 38 ++++++++ 13 files changed, 357 insertions(+), 11 deletions(-) create mode 100644 sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/statistic/limit/GlobalRequestLimiter.java create mode 100644 sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/statistic/limit/RequestLimiter.java create mode 100644 sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/statistic/base/UnaryLeapArray.java diff --git a/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/ClusterFlowChecker.java b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/ClusterFlowChecker.java index f9fb0108..ab114799 100644 --- a/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/ClusterFlowChecker.java +++ b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/ClusterFlowChecker.java @@ -19,6 +19,7 @@ import com.alibaba.csp.sentinel.cluster.TokenResultStatus; import com.alibaba.csp.sentinel.cluster.TokenResult; import com.alibaba.csp.sentinel.cluster.flow.rule.ClusterFlowRuleManager; import com.alibaba.csp.sentinel.cluster.flow.statistic.ClusterMetricStatistics; +import com.alibaba.csp.sentinel.cluster.flow.statistic.limit.GlobalRequestLimiter; import com.alibaba.csp.sentinel.cluster.server.config.ClusterServerConfigManager; import com.alibaba.csp.sentinel.cluster.flow.statistic.data.ClusterFlowEvent; import com.alibaba.csp.sentinel.cluster.flow.statistic.metric.ClusterMetric; @@ -46,8 +47,18 @@ final class ClusterFlowChecker { } } + static boolean allowProceed(long flowId) { + String namespace = ClusterFlowRuleManager.getNamespace(flowId); + return GlobalRequestLimiter.tryPass(namespace); + } + static TokenResult acquireClusterToken(/*@Valid*/ FlowRule rule, int acquireCount, boolean prioritized) { Long id = rule.getClusterConfig().getFlowId(); + + if (!allowProceed(id)) { + return new TokenResult(TokenResultStatus.TOO_MANY_REQUEST); + } + ClusterMetric metric = ClusterMetricStatistics.getMetric(id); if (metric == null) { return new TokenResult(TokenResultStatus.FAIL); diff --git a/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/ClusterParamFlowChecker.java b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/ClusterParamFlowChecker.java index 798ecf84..20d26845 100644 --- a/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/ClusterParamFlowChecker.java +++ b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/ClusterParamFlowChecker.java @@ -21,6 +21,7 @@ import com.alibaba.csp.sentinel.cluster.TokenResult; import com.alibaba.csp.sentinel.cluster.TokenResultStatus; import com.alibaba.csp.sentinel.cluster.flow.rule.ClusterParamFlowRuleManager; import com.alibaba.csp.sentinel.cluster.flow.statistic.ClusterParamMetricStatistics; +import com.alibaba.csp.sentinel.cluster.flow.statistic.limit.GlobalRequestLimiter; import com.alibaba.csp.sentinel.cluster.flow.statistic.metric.ClusterParamMetric; import com.alibaba.csp.sentinel.cluster.server.log.ClusterServerStatLogUtil; import com.alibaba.csp.sentinel.slots.block.ClusterRuleConstant; @@ -33,8 +34,18 @@ import com.alibaba.csp.sentinel.slots.block.flow.param.ParamFlowRule; */ public final class ClusterParamFlowChecker { + static boolean allowProceed(long flowId) { + String namespace = ClusterParamFlowRuleManager.getNamespace(flowId); + return GlobalRequestLimiter.tryPass(namespace); + } + static TokenResult acquireClusterToken(ParamFlowRule rule, int count, Collection values) { Long id = rule.getClusterConfig().getFlowId(); + + if (!allowProceed(id)) { + return new TokenResult(TokenResultStatus.TOO_MANY_REQUEST); + } + ClusterParamMetric metric = ClusterParamMetricStatistics.getMetric(id); if (metric == null) { // Unexpected state, return FAIL. diff --git a/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/rule/ClusterFlowRuleManager.java b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/rule/ClusterFlowRuleManager.java index 621ff6b5..13bee56a 100644 --- a/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/rule/ClusterFlowRuleManager.java +++ b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/rule/ClusterFlowRuleManager.java @@ -33,6 +33,7 @@ import com.alibaba.csp.sentinel.property.DynamicSentinelProperty; import com.alibaba.csp.sentinel.property.PropertyListener; import com.alibaba.csp.sentinel.property.SentinelProperty; import com.alibaba.csp.sentinel.slots.block.RuleConstant; +import com.alibaba.csp.sentinel.slots.block.flow.ClusterFlowConfig; import com.alibaba.csp.sentinel.slots.block.flow.FlowRule; import com.alibaba.csp.sentinel.slots.block.flow.FlowRuleUtil; import com.alibaba.csp.sentinel.util.AssertUtil; @@ -208,6 +209,17 @@ public final class ClusterFlowRuleManager { return FLOW_RULES.get(id); } + public static Set getFlowIdSet(String namespace) { + if (StringUtil.isEmpty(namespace)) { + return new HashSet<>(); + } + Set set = NAMESPACE_FLOW_ID_MAP.get(namespace); + if (set == null) { + return new HashSet<>(); + } + return new HashSet<>(set); + } + public static List getAllFlowRules() { return new ArrayList<>(FLOW_RULES.values()); } @@ -303,6 +315,10 @@ public final class ClusterFlowRuleManager { return ConnectionManager.getConnectedCount(namespace); } + public static String getNamespace(long flowId) { + return FLOW_NAMESPACE_MAP.get(flowId); + } + private static void applyClusterFlowRule(List list, /*@Valid*/ String namespace) { if (list == null || list.isEmpty()) { clearAndResetRulesFor(namespace); @@ -326,7 +342,8 @@ public final class ClusterFlowRuleManager { } // Flow id should not be null after filtered. - Long flowId = rule.getClusterConfig().getFlowId(); + ClusterFlowConfig clusterConfig = rule.getClusterConfig(); + Long flowId = clusterConfig.getFlowId(); if (flowId == null) { continue; } @@ -336,8 +353,7 @@ public final class ClusterFlowRuleManager { // Prepare cluster metric from valid flow ID. ClusterMetricStatistics.putMetricIfAbsent(flowId, - new ClusterMetric(ClusterServerConfigManager.getSampleCount(), - ClusterServerConfigManager.getIntervalMs())); + new ClusterMetric(clusterConfig.getSampleCount(), clusterConfig.getWindowIntervalMs())); } // Cleanup unused cluster metrics. diff --git a/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/rule/ClusterParamFlowRuleManager.java b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/rule/ClusterParamFlowRuleManager.java index c81393d5..ec5eca65 100644 --- a/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/rule/ClusterParamFlowRuleManager.java +++ b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/rule/ClusterParamFlowRuleManager.java @@ -25,7 +25,6 @@ import java.util.concurrent.ConcurrentHashMap; import com.alibaba.csp.sentinel.cluster.flow.statistic.ClusterParamMetricStatistics; import com.alibaba.csp.sentinel.cluster.flow.statistic.metric.ClusterParamMetric; import com.alibaba.csp.sentinel.cluster.server.ServerConstants; -import com.alibaba.csp.sentinel.cluster.server.config.ClusterServerConfigManager; import com.alibaba.csp.sentinel.cluster.server.connection.ConnectionManager; import com.alibaba.csp.sentinel.cluster.server.util.ClusterRuleUtil; import com.alibaba.csp.sentinel.log.RecordLog; @@ -33,6 +32,7 @@ import com.alibaba.csp.sentinel.property.DynamicSentinelProperty; import com.alibaba.csp.sentinel.property.PropertyListener; import com.alibaba.csp.sentinel.property.SentinelProperty; import com.alibaba.csp.sentinel.slots.block.RuleConstant; +import com.alibaba.csp.sentinel.slots.block.flow.param.ParamFlowClusterConfig; import com.alibaba.csp.sentinel.slots.block.flow.param.ParamFlowRule; import com.alibaba.csp.sentinel.slots.block.flow.param.ParamFlowRuleUtil; import com.alibaba.csp.sentinel.util.AssertUtil; @@ -100,6 +100,10 @@ public final class ClusterParamFlowRuleManager { ClusterParamFlowRuleManager.propertySupplier = propertySupplier; } + public static String getNamespace(long flowId) { + return FLOW_NAMESPACE_MAP.get(flowId); + } + /** * Listen to the {@link SentinelProperty} for cluster {@link ParamFlowRule}s. * The property is the source of cluster {@link ParamFlowRule}s for a specific namespace. @@ -218,6 +222,17 @@ public final class ClusterParamFlowRuleManager { return PARAM_RULES.get(id); } + public static Set getFlowIdSet(String namespace) { + if (StringUtil.isEmpty(namespace)) { + return new HashSet<>(); + } + Set set = NAMESPACE_FLOW_ID_MAP.get(namespace); + if (set == null) { + return new HashSet<>(); + } + return new HashSet<>(set); + } + public static List getAllParamRules() { return new ArrayList<>(PARAM_RULES.values()); } @@ -325,8 +340,9 @@ public final class ClusterParamFlowRuleManager { ParamFlowRuleUtil.fillExceptionFlowItems(rule); + ParamFlowClusterConfig clusterConfig = rule.getClusterConfig(); // Flow id should not be null after filtered. - Long flowId = rule.getClusterConfig().getFlowId(); + Long flowId = clusterConfig.getFlowId(); if (flowId == null) { continue; } @@ -336,8 +352,7 @@ public final class ClusterParamFlowRuleManager { // Prepare cluster parameter metric from valid rule ID. ClusterParamMetricStatistics.putMetricIfAbsent(flowId, - new ClusterParamMetric(ClusterServerConfigManager.getSampleCount(), - ClusterServerConfigManager.getIntervalMs())); + new ClusterParamMetric(clusterConfig.getSampleCount(), clusterConfig.getWindowIntervalMs())); } // Cleanup unused cluster parameter metrics. diff --git a/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/statistic/limit/GlobalRequestLimiter.java b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/statistic/limit/GlobalRequestLimiter.java new file mode 100644 index 00000000..3cecfca0 --- /dev/null +++ b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/statistic/limit/GlobalRequestLimiter.java @@ -0,0 +1,83 @@ +/* + * Copyright 1999-2018 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.csp.sentinel.cluster.flow.statistic.limit; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import com.alibaba.csp.sentinel.cluster.server.config.ClusterServerConfigManager; +import com.alibaba.csp.sentinel.util.AssertUtil; + +/** + * @author Eric Zhao + * @since 1.4.1 + */ +public final class GlobalRequestLimiter { + + private static final Map GLOBAL_QPS_LIMITER_MAP = new ConcurrentHashMap<>(); + + public static void initIfAbsent(String namespace) { + AssertUtil.notEmpty(namespace, "namespace cannot be empty"); + if (!GLOBAL_QPS_LIMITER_MAP.containsKey(namespace)) { + GLOBAL_QPS_LIMITER_MAP.put(namespace, new RequestLimiter(ClusterServerConfigManager.getMaxAllowedQps(namespace))); + } + } + + public static RequestLimiter getRequestLimiter(String namespace) { + if (namespace == null) { + return null; + } + return GLOBAL_QPS_LIMITER_MAP.get(namespace); + } + + public static boolean tryPass(String namespace) { + if (namespace == null) { + return false; + } + RequestLimiter limiter = GLOBAL_QPS_LIMITER_MAP.get(namespace); + if (limiter == null) { + return true; + } + return limiter.tryPass(); + } + + public static double getCurrentQps(String namespace) { + RequestLimiter limiter = getRequestLimiter(namespace); + if (limiter == null) { + return 0; + } + return limiter.getQps(); + } + + public static double getMaxAllowedQps(String namespace) { + RequestLimiter limiter = getRequestLimiter(namespace); + if (limiter == null) { + return 0; + } + return limiter.getQpsAllowed(); + } + + public static void applyMaxQpsChange(double maxAllowedQps) { + AssertUtil.isTrue(maxAllowedQps >= 0, "max allowed QPS should > 0"); + for (RequestLimiter limiter : GLOBAL_QPS_LIMITER_MAP.values()) { + if (limiter != null) { + limiter.setQpsAllowed(maxAllowedQps); + } + } + } + + private GlobalRequestLimiter() {} +} diff --git a/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/statistic/limit/RequestLimiter.java b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/statistic/limit/RequestLimiter.java new file mode 100644 index 00000000..2aec82d5 --- /dev/null +++ b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/statistic/limit/RequestLimiter.java @@ -0,0 +1,88 @@ +/* + * Copyright 1999-2018 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.csp.sentinel.cluster.flow.statistic.limit; + +import java.util.List; + +import com.alibaba.csp.sentinel.slots.statistic.base.LeapArray; +import com.alibaba.csp.sentinel.slots.statistic.base.LongAdder; +import com.alibaba.csp.sentinel.slots.statistic.base.UnaryLeapArray; +import com.alibaba.csp.sentinel.util.AssertUtil; + +/** + * @author Eric Zhao + * @since 1.4.1 + */ +public class RequestLimiter { + + private double qpsAllowed; + + private final LeapArray data; + + public RequestLimiter(double qpsAllowed) { + this(new UnaryLeapArray(10, 1000), qpsAllowed); + } + + RequestLimiter(LeapArray data, double qpsAllowed) { + AssertUtil.isTrue(qpsAllowed >= 0, "max allowed QPS should > 0"); + this.data = data; + this.qpsAllowed = qpsAllowed; + } + + public void increment() { + data.currentWindow().value().increment(); + } + + public void add(int x) { + data.currentWindow().value().add(x); + } + + public long getSum() { + data.currentWindow(); + long success = 0; + + List list = data.values(); + for (LongAdder window : list) { + success += window.sum(); + } + return success; + } + + public double getQps() { + return getSum() / data.getIntervalInSecond(); + } + + public double getQpsAllowed() { + return qpsAllowed; + } + + public boolean canPass() { + return getQps() + 1 <= qpsAllowed; + } + + public RequestLimiter setQpsAllowed(double qpsAllowed) { + this.qpsAllowed = qpsAllowed; + return this; + } + + public boolean tryPass() { + if (canPass()) { + add(1); + return true; + } + return false; + } +} diff --git a/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/command/handler/FetchClusterServerInfoCommandHandler.java b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/command/handler/FetchClusterServerInfoCommandHandler.java index 4996be25..c5510c94 100644 --- a/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/command/handler/FetchClusterServerInfoCommandHandler.java +++ b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/command/handler/FetchClusterServerInfoCommandHandler.java @@ -17,6 +17,7 @@ package com.alibaba.csp.sentinel.cluster.server.command.handler; import java.util.Set; +import com.alibaba.csp.sentinel.cluster.flow.statistic.limit.GlobalRequestLimiter; import com.alibaba.csp.sentinel.cluster.server.config.ClusterServerConfigManager; import com.alibaba.csp.sentinel.cluster.server.config.ServerFlowConfig; import com.alibaba.csp.sentinel.cluster.server.config.ServerTransportConfig; @@ -55,15 +56,32 @@ public class FetchClusterServerInfoCommandHandler implements CommandHandler namespaceSet) { + JSONArray array = new JSONArray(); + for (String namespace : namespaceSet) { + array.add(new JSONObject() + .fluentPut("namespace", namespace) + .fluentPut("currentQps", GlobalRequestLimiter.getCurrentQps(namespace)) + .fluentPut("maxAllowedQps", GlobalRequestLimiter.getMaxAllowedQps(namespace)) + ); + } + return array; + } } diff --git a/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/command/handler/ModifyClusterServerFlowConfigHandler.java b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/command/handler/ModifyClusterServerFlowConfigHandler.java index 9e444666..38ed2572 100644 --- a/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/command/handler/ModifyClusterServerFlowConfigHandler.java +++ b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/command/handler/ModifyClusterServerFlowConfigHandler.java @@ -48,10 +48,16 @@ public class ModifyClusterServerFlowConfigHandler implements CommandHandler: {1}", namespace, data); ServerFlowConfig config = JSON.parseObject(data, ServerFlowConfig.class); + if (!ClusterServerConfigManager.isValidFlowConfig(config)) { + CommandResponse.ofFailure(new IllegalArgumentException("Bad flow config")); + } ClusterServerConfigManager.loadFlowConfig(namespace, config); } diff --git a/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/config/ClusterServerConfigManager.java b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/config/ClusterServerConfigManager.java index 92c8319a..8b158755 100644 --- a/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/config/ClusterServerConfigManager.java +++ b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/config/ClusterServerConfigManager.java @@ -23,10 +23,13 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import com.alibaba.csp.sentinel.cluster.ClusterConstants; import com.alibaba.csp.sentinel.cluster.flow.rule.ClusterFlowRuleManager; import com.alibaba.csp.sentinel.cluster.flow.rule.ClusterParamFlowRuleManager; import com.alibaba.csp.sentinel.cluster.flow.statistic.ClusterMetricStatistics; import com.alibaba.csp.sentinel.cluster.flow.statistic.ClusterParamMetricStatistics; +import com.alibaba.csp.sentinel.cluster.flow.statistic.limit.GlobalRequestLimiter; +import com.alibaba.csp.sentinel.cluster.registry.ConfigSupplierRegistry; import com.alibaba.csp.sentinel.cluster.server.ServerConstants; import com.alibaba.csp.sentinel.log.RecordLog; import com.alibaba.csp.sentinel.property.DynamicSentinelProperty; @@ -40,10 +43,12 @@ import com.alibaba.csp.sentinel.util.AssertUtil; */ public final class ClusterServerConfigManager { + private static boolean embedded = false; + /** * Server global transport and scope config. */ - private static volatile int port = ServerTransportConfig.DEFAULT_PORT; + private static volatile int port = ClusterConstants.DEFAULT_CLUSTER_SERVER_PORT; private static volatile int idleSeconds = ServerTransportConfig.DEFAULT_IDLE_SECONDS; private static volatile Set namespaceSet = Collections.singleton(ServerConstants.DEFAULT_NAMESPACE); @@ -54,6 +59,7 @@ public final class ClusterServerConfigManager { private static volatile double maxOccupyRatio = ServerFlowConfig.DEFAULT_MAX_OCCUPY_RATIO; private static volatile int intervalMs = ServerFlowConfig.DEFAULT_INTERVAL_MS; private static volatile int sampleCount = ServerFlowConfig.DEFAULT_SAMPLE_COUNT; + private static volatile double maxAllowedQps = ServerFlowConfig.DEFAULT_MAX_ALLOWED_QPS; /** * Namespace-specific flow config for token server. @@ -170,6 +176,11 @@ public final class ClusterServerConfigManager { newSet = new HashSet<>(newSet); newSet.add(ServerConstants.DEFAULT_NAMESPACE); + if (embedded) { + // In embedded server mode, the server itself is also a part of service, + // so it should be added to namespace set. + newSet.add(ConfigSupplierRegistry.getNamespaceSupplier().get()); + } Set oldSet = ClusterServerConfigManager.namespaceSet; if (oldSet != null && !oldSet.isEmpty()) { @@ -185,6 +196,7 @@ public final class ClusterServerConfigManager { for (String ns : newSet) { ClusterFlowRuleManager.registerPropertyIfAbsent(ns); ClusterParamFlowRuleManager.registerPropertyIfAbsent(ns); + GlobalRequestLimiter.initIfAbsent(ns); } } @@ -256,6 +268,10 @@ public final class ClusterServerConfigManager { if (config.getMaxOccupyRatio() != maxOccupyRatio) { maxOccupyRatio = config.getMaxOccupyRatio(); } + if (config.getMaxAllowedQps() != maxAllowedQps) { + maxAllowedQps = config.getMaxAllowedQps(); + GlobalRequestLimiter.applyMaxQpsChange(maxAllowedQps); + } int newIntervalMs = config.getIntervalMs(); int newSampleCount = config.getSampleCount(); if (newIntervalMs != intervalMs || newSampleCount != sampleCount) { @@ -277,7 +293,8 @@ public final class ClusterServerConfigManager { } public static boolean isValidFlowConfig(ServerFlowConfig config) { - return config != null && config.getMaxOccupyRatio() >= 0 && config.getExceedCount() >= 0; + return config != null && config.getMaxOccupyRatio() >= 0 && config.getExceedCount() >= 0 + && config.getMaxAllowedQps() >= 0; } public static double getExceedCount(String namespace) { @@ -322,6 +339,19 @@ public final class ClusterServerConfigManager { return sampleCount; } + public static double getMaxAllowedQps() { + return maxAllowedQps; + } + + public static double getMaxAllowedQps(String namespace) { + AssertUtil.notEmpty(namespace, "namespace cannot be empty"); + ServerFlowConfig config = NAMESPACE_CONF.get(namespace); + if (config != null) { + return config.getMaxAllowedQps(); + } + return maxAllowedQps; + } + public static double getExceedCount() { return exceedCount; } @@ -354,5 +384,17 @@ public final class ClusterServerConfigManager { applyNamespaceSetChange(namespaceSet); } + public static boolean isEmbedded() { + return embedded; + } + + public static void setEmbedded(boolean embedded) { + ClusterServerConfigManager.embedded = embedded; + } + + public static void setMaxAllowedQps(double maxAllowedQps) { + ClusterServerConfigManager.maxAllowedQps = maxAllowedQps; + } + private ClusterServerConfigManager() {} } diff --git a/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/config/ServerFlowConfig.java b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/config/ServerFlowConfig.java index 00991605..633609b0 100644 --- a/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/config/ServerFlowConfig.java +++ b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/config/ServerFlowConfig.java @@ -28,6 +28,7 @@ public class ServerFlowConfig { public static final int DEFAULT_INTERVAL_MS = 1000; public static final int DEFAULT_SAMPLE_COUNT= 10; + public static final double DEFAULT_MAX_ALLOWED_QPS= 30000; private final String namespace; @@ -36,6 +37,8 @@ public class ServerFlowConfig { private int intervalMs = DEFAULT_INTERVAL_MS; private int sampleCount = DEFAULT_SAMPLE_COUNT; + private double maxAllowedQps = DEFAULT_MAX_ALLOWED_QPS; + public ServerFlowConfig() { this(ServerConstants.DEFAULT_NAMESPACE); } @@ -84,6 +87,15 @@ public class ServerFlowConfig { return this; } + public double getMaxAllowedQps() { + return maxAllowedQps; + } + + public ServerFlowConfig setMaxAllowedQps(double maxAllowedQps) { + this.maxAllowedQps = maxAllowedQps; + return this; + } + @Override public String toString() { return "ServerFlowConfig{" + @@ -92,6 +104,7 @@ public class ServerFlowConfig { ", maxOccupyRatio=" + maxOccupyRatio + ", intervalMs=" + intervalMs + ", sampleCount=" + sampleCount + + ", maxAllowedQps=" + maxAllowedQps + '}'; } } diff --git a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/cluster/TokenResultStatus.java b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/cluster/TokenResultStatus.java index 275d761e..ff5dddf5 100644 --- a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/cluster/TokenResultStatus.java +++ b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/cluster/TokenResultStatus.java @@ -25,6 +25,10 @@ public final class TokenResultStatus { * Bad client request. */ public static final int BAD_REQUEST = -4; + /** + * Too many request in server. + */ + public static final int TOO_MANY_REQUEST = -2; /** * Server or client unexpected failure (due to transport or serialization failure). */ diff --git a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/FlowRuleChecker.java b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/FlowRuleChecker.java index 08cf5dd2..7d552ddb 100644 --- a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/FlowRuleChecker.java +++ b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/FlowRuleChecker.java @@ -178,6 +178,7 @@ final class FlowRuleChecker { case TokenResultStatus.NO_RULE_EXISTS: case TokenResultStatus.BAD_REQUEST: case TokenResultStatus.FAIL: + case TokenResultStatus.TOO_MANY_REQUEST: return fallbackToLocalOrPass(rule, context, node, acquireCount, prioritized); case TokenResultStatus.BLOCKED: default: diff --git a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/statistic/base/UnaryLeapArray.java b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/statistic/base/UnaryLeapArray.java new file mode 100644 index 00000000..01ab8cf3 --- /dev/null +++ b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/statistic/base/UnaryLeapArray.java @@ -0,0 +1,38 @@ +/* + * Copyright 1999-2018 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.csp.sentinel.slots.statistic.base; + +/** + * @author Eric Zhao + */ +public class UnaryLeapArray extends LeapArray { + + public UnaryLeapArray(int sampleCount, int intervalInMs) { + super(sampleCount, intervalInMs); + } + + @Override + public LongAdder newEmptyBucket() { + return new LongAdder(); + } + + @Override + protected WindowWrap resetWindowTo(WindowWrap windowWrap, long startTime) { + windowWrap.resetTo(startTime); + windowWrap.value().reset(); + return windowWrap; + } +}