- Add `UnaryLeapArray` and `RequestLimiter` to enable simple QPS limit - Improve cluster rule manager and server config manager to support request limiter - Support `TOO_MANY_REQUEST` status in client side - Also improve the automatic namespace register of embedded server mode Signed-off-by: Eric Zhao <sczyh16@gmail.com>master
@@ -19,6 +19,7 @@ import com.alibaba.csp.sentinel.cluster.TokenResultStatus; | |||||
import com.alibaba.csp.sentinel.cluster.TokenResult; | import com.alibaba.csp.sentinel.cluster.TokenResult; | ||||
import com.alibaba.csp.sentinel.cluster.flow.rule.ClusterFlowRuleManager; | import com.alibaba.csp.sentinel.cluster.flow.rule.ClusterFlowRuleManager; | ||||
import com.alibaba.csp.sentinel.cluster.flow.statistic.ClusterMetricStatistics; | import com.alibaba.csp.sentinel.cluster.flow.statistic.ClusterMetricStatistics; | ||||
import com.alibaba.csp.sentinel.cluster.flow.statistic.limit.GlobalRequestLimiter; | |||||
import com.alibaba.csp.sentinel.cluster.server.config.ClusterServerConfigManager; | import com.alibaba.csp.sentinel.cluster.server.config.ClusterServerConfigManager; | ||||
import com.alibaba.csp.sentinel.cluster.flow.statistic.data.ClusterFlowEvent; | import com.alibaba.csp.sentinel.cluster.flow.statistic.data.ClusterFlowEvent; | ||||
import com.alibaba.csp.sentinel.cluster.flow.statistic.metric.ClusterMetric; | import com.alibaba.csp.sentinel.cluster.flow.statistic.metric.ClusterMetric; | ||||
@@ -46,8 +47,18 @@ final class ClusterFlowChecker { | |||||
} | } | ||||
} | } | ||||
static boolean allowProceed(long flowId) { | |||||
String namespace = ClusterFlowRuleManager.getNamespace(flowId); | |||||
return GlobalRequestLimiter.tryPass(namespace); | |||||
} | |||||
static TokenResult acquireClusterToken(/*@Valid*/ FlowRule rule, int acquireCount, boolean prioritized) { | static TokenResult acquireClusterToken(/*@Valid*/ FlowRule rule, int acquireCount, boolean prioritized) { | ||||
Long id = rule.getClusterConfig().getFlowId(); | Long id = rule.getClusterConfig().getFlowId(); | ||||
if (!allowProceed(id)) { | |||||
return new TokenResult(TokenResultStatus.TOO_MANY_REQUEST); | |||||
} | |||||
ClusterMetric metric = ClusterMetricStatistics.getMetric(id); | ClusterMetric metric = ClusterMetricStatistics.getMetric(id); | ||||
if (metric == null) { | if (metric == null) { | ||||
return new TokenResult(TokenResultStatus.FAIL); | return new TokenResult(TokenResultStatus.FAIL); | ||||
@@ -21,6 +21,7 @@ import com.alibaba.csp.sentinel.cluster.TokenResult; | |||||
import com.alibaba.csp.sentinel.cluster.TokenResultStatus; | import com.alibaba.csp.sentinel.cluster.TokenResultStatus; | ||||
import com.alibaba.csp.sentinel.cluster.flow.rule.ClusterParamFlowRuleManager; | import com.alibaba.csp.sentinel.cluster.flow.rule.ClusterParamFlowRuleManager; | ||||
import com.alibaba.csp.sentinel.cluster.flow.statistic.ClusterParamMetricStatistics; | import com.alibaba.csp.sentinel.cluster.flow.statistic.ClusterParamMetricStatistics; | ||||
import com.alibaba.csp.sentinel.cluster.flow.statistic.limit.GlobalRequestLimiter; | |||||
import com.alibaba.csp.sentinel.cluster.flow.statistic.metric.ClusterParamMetric; | import com.alibaba.csp.sentinel.cluster.flow.statistic.metric.ClusterParamMetric; | ||||
import com.alibaba.csp.sentinel.cluster.server.log.ClusterServerStatLogUtil; | import com.alibaba.csp.sentinel.cluster.server.log.ClusterServerStatLogUtil; | ||||
import com.alibaba.csp.sentinel.slots.block.ClusterRuleConstant; | import com.alibaba.csp.sentinel.slots.block.ClusterRuleConstant; | ||||
@@ -33,8 +34,18 @@ import com.alibaba.csp.sentinel.slots.block.flow.param.ParamFlowRule; | |||||
*/ | */ | ||||
public final class ClusterParamFlowChecker { | public final class ClusterParamFlowChecker { | ||||
static boolean allowProceed(long flowId) { | |||||
String namespace = ClusterParamFlowRuleManager.getNamespace(flowId); | |||||
return GlobalRequestLimiter.tryPass(namespace); | |||||
} | |||||
static TokenResult acquireClusterToken(ParamFlowRule rule, int count, Collection<Object> values) { | static TokenResult acquireClusterToken(ParamFlowRule rule, int count, Collection<Object> values) { | ||||
Long id = rule.getClusterConfig().getFlowId(); | Long id = rule.getClusterConfig().getFlowId(); | ||||
if (!allowProceed(id)) { | |||||
return new TokenResult(TokenResultStatus.TOO_MANY_REQUEST); | |||||
} | |||||
ClusterParamMetric metric = ClusterParamMetricStatistics.getMetric(id); | ClusterParamMetric metric = ClusterParamMetricStatistics.getMetric(id); | ||||
if (metric == null) { | if (metric == null) { | ||||
// Unexpected state, return FAIL. | // Unexpected state, return FAIL. | ||||
@@ -33,6 +33,7 @@ import com.alibaba.csp.sentinel.property.DynamicSentinelProperty; | |||||
import com.alibaba.csp.sentinel.property.PropertyListener; | import com.alibaba.csp.sentinel.property.PropertyListener; | ||||
import com.alibaba.csp.sentinel.property.SentinelProperty; | import com.alibaba.csp.sentinel.property.SentinelProperty; | ||||
import com.alibaba.csp.sentinel.slots.block.RuleConstant; | import com.alibaba.csp.sentinel.slots.block.RuleConstant; | ||||
import com.alibaba.csp.sentinel.slots.block.flow.ClusterFlowConfig; | |||||
import com.alibaba.csp.sentinel.slots.block.flow.FlowRule; | import com.alibaba.csp.sentinel.slots.block.flow.FlowRule; | ||||
import com.alibaba.csp.sentinel.slots.block.flow.FlowRuleUtil; | import com.alibaba.csp.sentinel.slots.block.flow.FlowRuleUtil; | ||||
import com.alibaba.csp.sentinel.util.AssertUtil; | import com.alibaba.csp.sentinel.util.AssertUtil; | ||||
@@ -208,6 +209,17 @@ public final class ClusterFlowRuleManager { | |||||
return FLOW_RULES.get(id); | return FLOW_RULES.get(id); | ||||
} | } | ||||
public static Set<Long> getFlowIdSet(String namespace) { | |||||
if (StringUtil.isEmpty(namespace)) { | |||||
return new HashSet<>(); | |||||
} | |||||
Set<Long> set = NAMESPACE_FLOW_ID_MAP.get(namespace); | |||||
if (set == null) { | |||||
return new HashSet<>(); | |||||
} | |||||
return new HashSet<>(set); | |||||
} | |||||
public static List<FlowRule> getAllFlowRules() { | public static List<FlowRule> getAllFlowRules() { | ||||
return new ArrayList<>(FLOW_RULES.values()); | return new ArrayList<>(FLOW_RULES.values()); | ||||
} | } | ||||
@@ -303,6 +315,10 @@ public final class ClusterFlowRuleManager { | |||||
return ConnectionManager.getConnectedCount(namespace); | return ConnectionManager.getConnectedCount(namespace); | ||||
} | } | ||||
public static String getNamespace(long flowId) { | |||||
return FLOW_NAMESPACE_MAP.get(flowId); | |||||
} | |||||
private static void applyClusterFlowRule(List<FlowRule> list, /*@Valid*/ String namespace) { | private static void applyClusterFlowRule(List<FlowRule> list, /*@Valid*/ String namespace) { | ||||
if (list == null || list.isEmpty()) { | if (list == null || list.isEmpty()) { | ||||
clearAndResetRulesFor(namespace); | clearAndResetRulesFor(namespace); | ||||
@@ -326,7 +342,8 @@ public final class ClusterFlowRuleManager { | |||||
} | } | ||||
// Flow id should not be null after filtered. | // Flow id should not be null after filtered. | ||||
Long flowId = rule.getClusterConfig().getFlowId(); | |||||
ClusterFlowConfig clusterConfig = rule.getClusterConfig(); | |||||
Long flowId = clusterConfig.getFlowId(); | |||||
if (flowId == null) { | if (flowId == null) { | ||||
continue; | continue; | ||||
} | } | ||||
@@ -336,8 +353,7 @@ public final class ClusterFlowRuleManager { | |||||
// Prepare cluster metric from valid flow ID. | // Prepare cluster metric from valid flow ID. | ||||
ClusterMetricStatistics.putMetricIfAbsent(flowId, | ClusterMetricStatistics.putMetricIfAbsent(flowId, | ||||
new ClusterMetric(ClusterServerConfigManager.getSampleCount(), | |||||
ClusterServerConfigManager.getIntervalMs())); | |||||
new ClusterMetric(clusterConfig.getSampleCount(), clusterConfig.getWindowIntervalMs())); | |||||
} | } | ||||
// Cleanup unused cluster metrics. | // Cleanup unused cluster metrics. | ||||
@@ -25,7 +25,6 @@ import java.util.concurrent.ConcurrentHashMap; | |||||
import com.alibaba.csp.sentinel.cluster.flow.statistic.ClusterParamMetricStatistics; | import com.alibaba.csp.sentinel.cluster.flow.statistic.ClusterParamMetricStatistics; | ||||
import com.alibaba.csp.sentinel.cluster.flow.statistic.metric.ClusterParamMetric; | import com.alibaba.csp.sentinel.cluster.flow.statistic.metric.ClusterParamMetric; | ||||
import com.alibaba.csp.sentinel.cluster.server.ServerConstants; | import com.alibaba.csp.sentinel.cluster.server.ServerConstants; | ||||
import com.alibaba.csp.sentinel.cluster.server.config.ClusterServerConfigManager; | |||||
import com.alibaba.csp.sentinel.cluster.server.connection.ConnectionManager; | import com.alibaba.csp.sentinel.cluster.server.connection.ConnectionManager; | ||||
import com.alibaba.csp.sentinel.cluster.server.util.ClusterRuleUtil; | import com.alibaba.csp.sentinel.cluster.server.util.ClusterRuleUtil; | ||||
import com.alibaba.csp.sentinel.log.RecordLog; | import com.alibaba.csp.sentinel.log.RecordLog; | ||||
@@ -33,6 +32,7 @@ import com.alibaba.csp.sentinel.property.DynamicSentinelProperty; | |||||
import com.alibaba.csp.sentinel.property.PropertyListener; | import com.alibaba.csp.sentinel.property.PropertyListener; | ||||
import com.alibaba.csp.sentinel.property.SentinelProperty; | import com.alibaba.csp.sentinel.property.SentinelProperty; | ||||
import com.alibaba.csp.sentinel.slots.block.RuleConstant; | import com.alibaba.csp.sentinel.slots.block.RuleConstant; | ||||
import com.alibaba.csp.sentinel.slots.block.flow.param.ParamFlowClusterConfig; | |||||
import com.alibaba.csp.sentinel.slots.block.flow.param.ParamFlowRule; | import com.alibaba.csp.sentinel.slots.block.flow.param.ParamFlowRule; | ||||
import com.alibaba.csp.sentinel.slots.block.flow.param.ParamFlowRuleUtil; | import com.alibaba.csp.sentinel.slots.block.flow.param.ParamFlowRuleUtil; | ||||
import com.alibaba.csp.sentinel.util.AssertUtil; | import com.alibaba.csp.sentinel.util.AssertUtil; | ||||
@@ -100,6 +100,10 @@ public final class ClusterParamFlowRuleManager { | |||||
ClusterParamFlowRuleManager.propertySupplier = propertySupplier; | ClusterParamFlowRuleManager.propertySupplier = propertySupplier; | ||||
} | } | ||||
public static String getNamespace(long flowId) { | |||||
return FLOW_NAMESPACE_MAP.get(flowId); | |||||
} | |||||
/** | /** | ||||
* Listen to the {@link SentinelProperty} for cluster {@link ParamFlowRule}s. | * Listen to the {@link SentinelProperty} for cluster {@link ParamFlowRule}s. | ||||
* The property is the source of cluster {@link ParamFlowRule}s for a specific namespace. | * The property is the source of cluster {@link ParamFlowRule}s for a specific namespace. | ||||
@@ -218,6 +222,17 @@ public final class ClusterParamFlowRuleManager { | |||||
return PARAM_RULES.get(id); | return PARAM_RULES.get(id); | ||||
} | } | ||||
public static Set<Long> getFlowIdSet(String namespace) { | |||||
if (StringUtil.isEmpty(namespace)) { | |||||
return new HashSet<>(); | |||||
} | |||||
Set<Long> set = NAMESPACE_FLOW_ID_MAP.get(namespace); | |||||
if (set == null) { | |||||
return new HashSet<>(); | |||||
} | |||||
return new HashSet<>(set); | |||||
} | |||||
public static List<ParamFlowRule> getAllParamRules() { | public static List<ParamFlowRule> getAllParamRules() { | ||||
return new ArrayList<>(PARAM_RULES.values()); | return new ArrayList<>(PARAM_RULES.values()); | ||||
} | } | ||||
@@ -325,8 +340,9 @@ public final class ClusterParamFlowRuleManager { | |||||
ParamFlowRuleUtil.fillExceptionFlowItems(rule); | ParamFlowRuleUtil.fillExceptionFlowItems(rule); | ||||
ParamFlowClusterConfig clusterConfig = rule.getClusterConfig(); | |||||
// Flow id should not be null after filtered. | // Flow id should not be null after filtered. | ||||
Long flowId = rule.getClusterConfig().getFlowId(); | |||||
Long flowId = clusterConfig.getFlowId(); | |||||
if (flowId == null) { | if (flowId == null) { | ||||
continue; | continue; | ||||
} | } | ||||
@@ -336,8 +352,7 @@ public final class ClusterParamFlowRuleManager { | |||||
// Prepare cluster parameter metric from valid rule ID. | // Prepare cluster parameter metric from valid rule ID. | ||||
ClusterParamMetricStatistics.putMetricIfAbsent(flowId, | ClusterParamMetricStatistics.putMetricIfAbsent(flowId, | ||||
new ClusterParamMetric(ClusterServerConfigManager.getSampleCount(), | |||||
ClusterServerConfigManager.getIntervalMs())); | |||||
new ClusterParamMetric(clusterConfig.getSampleCount(), clusterConfig.getWindowIntervalMs())); | |||||
} | } | ||||
// Cleanup unused cluster parameter metrics. | // Cleanup unused cluster parameter metrics. | ||||
@@ -0,0 +1,83 @@ | |||||
/* | |||||
* Copyright 1999-2018 Alibaba Group Holding Ltd. | |||||
* | |||||
* Licensed under the Apache License, Version 2.0 (the "License"); | |||||
* you may not use this file except in compliance with the License. | |||||
* You may obtain a copy of the License at | |||||
* | |||||
* http://www.apache.org/licenses/LICENSE-2.0 | |||||
* | |||||
* Unless required by applicable law or agreed to in writing, software | |||||
* distributed under the License is distributed on an "AS IS" BASIS, | |||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |||||
* See the License for the specific language governing permissions and | |||||
* limitations under the License. | |||||
*/ | |||||
package com.alibaba.csp.sentinel.cluster.flow.statistic.limit; | |||||
import java.util.Map; | |||||
import java.util.concurrent.ConcurrentHashMap; | |||||
import com.alibaba.csp.sentinel.cluster.server.config.ClusterServerConfigManager; | |||||
import com.alibaba.csp.sentinel.util.AssertUtil; | |||||
/** | |||||
* @author Eric Zhao | |||||
* @since 1.4.1 | |||||
*/ | |||||
public final class GlobalRequestLimiter { | |||||
private static final Map<String, RequestLimiter> GLOBAL_QPS_LIMITER_MAP = new ConcurrentHashMap<>(); | |||||
public static void initIfAbsent(String namespace) { | |||||
AssertUtil.notEmpty(namespace, "namespace cannot be empty"); | |||||
if (!GLOBAL_QPS_LIMITER_MAP.containsKey(namespace)) { | |||||
GLOBAL_QPS_LIMITER_MAP.put(namespace, new RequestLimiter(ClusterServerConfigManager.getMaxAllowedQps(namespace))); | |||||
} | |||||
} | |||||
public static RequestLimiter getRequestLimiter(String namespace) { | |||||
if (namespace == null) { | |||||
return null; | |||||
} | |||||
return GLOBAL_QPS_LIMITER_MAP.get(namespace); | |||||
} | |||||
public static boolean tryPass(String namespace) { | |||||
if (namespace == null) { | |||||
return false; | |||||
} | |||||
RequestLimiter limiter = GLOBAL_QPS_LIMITER_MAP.get(namespace); | |||||
if (limiter == null) { | |||||
return true; | |||||
} | |||||
return limiter.tryPass(); | |||||
} | |||||
public static double getCurrentQps(String namespace) { | |||||
RequestLimiter limiter = getRequestLimiter(namespace); | |||||
if (limiter == null) { | |||||
return 0; | |||||
} | |||||
return limiter.getQps(); | |||||
} | |||||
public static double getMaxAllowedQps(String namespace) { | |||||
RequestLimiter limiter = getRequestLimiter(namespace); | |||||
if (limiter == null) { | |||||
return 0; | |||||
} | |||||
return limiter.getQpsAllowed(); | |||||
} | |||||
public static void applyMaxQpsChange(double maxAllowedQps) { | |||||
AssertUtil.isTrue(maxAllowedQps >= 0, "max allowed QPS should > 0"); | |||||
for (RequestLimiter limiter : GLOBAL_QPS_LIMITER_MAP.values()) { | |||||
if (limiter != null) { | |||||
limiter.setQpsAllowed(maxAllowedQps); | |||||
} | |||||
} | |||||
} | |||||
private GlobalRequestLimiter() {} | |||||
} |
@@ -0,0 +1,88 @@ | |||||
/* | |||||
* Copyright 1999-2018 Alibaba Group Holding Ltd. | |||||
* | |||||
* Licensed under the Apache License, Version 2.0 (the "License"); | |||||
* you may not use this file except in compliance with the License. | |||||
* You may obtain a copy of the License at | |||||
* | |||||
* http://www.apache.org/licenses/LICENSE-2.0 | |||||
* | |||||
* Unless required by applicable law or agreed to in writing, software | |||||
* distributed under the License is distributed on an "AS IS" BASIS, | |||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |||||
* See the License for the specific language governing permissions and | |||||
* limitations under the License. | |||||
*/ | |||||
package com.alibaba.csp.sentinel.cluster.flow.statistic.limit; | |||||
import java.util.List; | |||||
import com.alibaba.csp.sentinel.slots.statistic.base.LeapArray; | |||||
import com.alibaba.csp.sentinel.slots.statistic.base.LongAdder; | |||||
import com.alibaba.csp.sentinel.slots.statistic.base.UnaryLeapArray; | |||||
import com.alibaba.csp.sentinel.util.AssertUtil; | |||||
/** | |||||
* @author Eric Zhao | |||||
* @since 1.4.1 | |||||
*/ | |||||
public class RequestLimiter { | |||||
private double qpsAllowed; | |||||
private final LeapArray<LongAdder> data; | |||||
public RequestLimiter(double qpsAllowed) { | |||||
this(new UnaryLeapArray(10, 1000), qpsAllowed); | |||||
} | |||||
RequestLimiter(LeapArray<LongAdder> data, double qpsAllowed) { | |||||
AssertUtil.isTrue(qpsAllowed >= 0, "max allowed QPS should > 0"); | |||||
this.data = data; | |||||
this.qpsAllowed = qpsAllowed; | |||||
} | |||||
public void increment() { | |||||
data.currentWindow().value().increment(); | |||||
} | |||||
public void add(int x) { | |||||
data.currentWindow().value().add(x); | |||||
} | |||||
public long getSum() { | |||||
data.currentWindow(); | |||||
long success = 0; | |||||
List<LongAdder> list = data.values(); | |||||
for (LongAdder window : list) { | |||||
success += window.sum(); | |||||
} | |||||
return success; | |||||
} | |||||
public double getQps() { | |||||
return getSum() / data.getIntervalInSecond(); | |||||
} | |||||
public double getQpsAllowed() { | |||||
return qpsAllowed; | |||||
} | |||||
public boolean canPass() { | |||||
return getQps() + 1 <= qpsAllowed; | |||||
} | |||||
public RequestLimiter setQpsAllowed(double qpsAllowed) { | |||||
this.qpsAllowed = qpsAllowed; | |||||
return this; | |||||
} | |||||
public boolean tryPass() { | |||||
if (canPass()) { | |||||
add(1); | |||||
return true; | |||||
} | |||||
return false; | |||||
} | |||||
} |
@@ -17,6 +17,7 @@ package com.alibaba.csp.sentinel.cluster.server.command.handler; | |||||
import java.util.Set; | import java.util.Set; | ||||
import com.alibaba.csp.sentinel.cluster.flow.statistic.limit.GlobalRequestLimiter; | |||||
import com.alibaba.csp.sentinel.cluster.server.config.ClusterServerConfigManager; | import com.alibaba.csp.sentinel.cluster.server.config.ClusterServerConfigManager; | ||||
import com.alibaba.csp.sentinel.cluster.server.config.ServerFlowConfig; | import com.alibaba.csp.sentinel.cluster.server.config.ServerFlowConfig; | ||||
import com.alibaba.csp.sentinel.cluster.server.config.ServerTransportConfig; | import com.alibaba.csp.sentinel.cluster.server.config.ServerTransportConfig; | ||||
@@ -55,15 +56,32 @@ public class FetchClusterServerInfoCommandHandler implements CommandHandler<Stri | |||||
.setExceedCount(ClusterServerConfigManager.getExceedCount()) | .setExceedCount(ClusterServerConfigManager.getExceedCount()) | ||||
.setMaxOccupyRatio(ClusterServerConfigManager.getMaxOccupyRatio()) | .setMaxOccupyRatio(ClusterServerConfigManager.getMaxOccupyRatio()) | ||||
.setIntervalMs(ClusterServerConfigManager.getIntervalMs()) | .setIntervalMs(ClusterServerConfigManager.getIntervalMs()) | ||||
.setSampleCount(ClusterServerConfigManager.getSampleCount()); | |||||
.setSampleCount(ClusterServerConfigManager.getSampleCount()) | |||||
.setMaxAllowedQps(ClusterServerConfigManager.getMaxAllowedQps()); | |||||
JSONArray requestLimitData = buildRequestLimitData(namespaceSet); | |||||
info.fluentPut("port", ClusterServerConfigManager.getPort()) | info.fluentPut("port", ClusterServerConfigManager.getPort()) | ||||
.fluentPut("connection", connectionGroups) | .fluentPut("connection", connectionGroups) | ||||
.fluentPut("requestLimitData", requestLimitData) | |||||
.fluentPut("transport", transportConfig) | .fluentPut("transport", transportConfig) | ||||
.fluentPut("flow", flowConfig) | .fluentPut("flow", flowConfig) | ||||
.fluentPut("namespaceSet", ClusterServerConfigManager.getNamespaceSet()); | |||||
.fluentPut("namespaceSet", namespaceSet) | |||||
.fluentPut("embedded", ClusterServerConfigManager.isEmbedded()); | |||||
return CommandResponse.ofSuccess(info.toJSONString()); | return CommandResponse.ofSuccess(info.toJSONString()); | ||||
} | } | ||||
private JSONArray buildRequestLimitData(Set<String> namespaceSet) { | |||||
JSONArray array = new JSONArray(); | |||||
for (String namespace : namespaceSet) { | |||||
array.add(new JSONObject() | |||||
.fluentPut("namespace", namespace) | |||||
.fluentPut("currentQps", GlobalRequestLimiter.getCurrentQps(namespace)) | |||||
.fluentPut("maxAllowedQps", GlobalRequestLimiter.getMaxAllowedQps(namespace)) | |||||
); | |||||
} | |||||
return array; | |||||
} | |||||
} | } | ||||
@@ -48,10 +48,16 @@ public class ModifyClusterServerFlowConfigHandler implements CommandHandler<Stri | |||||
if (StringUtil.isEmpty(namespace)) { | if (StringUtil.isEmpty(namespace)) { | ||||
RecordLog.info("[ModifyClusterServerFlowConfigHandler] Receiving cluster server global flow config: " + data); | RecordLog.info("[ModifyClusterServerFlowConfigHandler] Receiving cluster server global flow config: " + data); | ||||
ServerFlowConfig config = JSON.parseObject(data, ServerFlowConfig.class); | ServerFlowConfig config = JSON.parseObject(data, ServerFlowConfig.class); | ||||
if (!ClusterServerConfigManager.isValidFlowConfig(config)) { | |||||
CommandResponse.ofFailure(new IllegalArgumentException("Bad flow config")); | |||||
} | |||||
ClusterServerConfigManager.loadGlobalFlowConfig(config); | ClusterServerConfigManager.loadGlobalFlowConfig(config); | ||||
} else { | } else { | ||||
RecordLog.info("[ModifyClusterServerFlowConfigHandler] Receiving cluster server flow config for namespace <{0}>: {1}", namespace, data); | RecordLog.info("[ModifyClusterServerFlowConfigHandler] Receiving cluster server flow config for namespace <{0}>: {1}", namespace, data); | ||||
ServerFlowConfig config = JSON.parseObject(data, ServerFlowConfig.class); | ServerFlowConfig config = JSON.parseObject(data, ServerFlowConfig.class); | ||||
if (!ClusterServerConfigManager.isValidFlowConfig(config)) { | |||||
CommandResponse.ofFailure(new IllegalArgumentException("Bad flow config")); | |||||
} | |||||
ClusterServerConfigManager.loadFlowConfig(namespace, config); | ClusterServerConfigManager.loadFlowConfig(namespace, config); | ||||
} | } | ||||
@@ -23,10 +23,13 @@ import java.util.Map; | |||||
import java.util.Set; | import java.util.Set; | ||||
import java.util.concurrent.ConcurrentHashMap; | import java.util.concurrent.ConcurrentHashMap; | ||||
import com.alibaba.csp.sentinel.cluster.ClusterConstants; | |||||
import com.alibaba.csp.sentinel.cluster.flow.rule.ClusterFlowRuleManager; | import com.alibaba.csp.sentinel.cluster.flow.rule.ClusterFlowRuleManager; | ||||
import com.alibaba.csp.sentinel.cluster.flow.rule.ClusterParamFlowRuleManager; | import com.alibaba.csp.sentinel.cluster.flow.rule.ClusterParamFlowRuleManager; | ||||
import com.alibaba.csp.sentinel.cluster.flow.statistic.ClusterMetricStatistics; | import com.alibaba.csp.sentinel.cluster.flow.statistic.ClusterMetricStatistics; | ||||
import com.alibaba.csp.sentinel.cluster.flow.statistic.ClusterParamMetricStatistics; | import com.alibaba.csp.sentinel.cluster.flow.statistic.ClusterParamMetricStatistics; | ||||
import com.alibaba.csp.sentinel.cluster.flow.statistic.limit.GlobalRequestLimiter; | |||||
import com.alibaba.csp.sentinel.cluster.registry.ConfigSupplierRegistry; | |||||
import com.alibaba.csp.sentinel.cluster.server.ServerConstants; | import com.alibaba.csp.sentinel.cluster.server.ServerConstants; | ||||
import com.alibaba.csp.sentinel.log.RecordLog; | import com.alibaba.csp.sentinel.log.RecordLog; | ||||
import com.alibaba.csp.sentinel.property.DynamicSentinelProperty; | import com.alibaba.csp.sentinel.property.DynamicSentinelProperty; | ||||
@@ -40,10 +43,12 @@ import com.alibaba.csp.sentinel.util.AssertUtil; | |||||
*/ | */ | ||||
public final class ClusterServerConfigManager { | public final class ClusterServerConfigManager { | ||||
private static boolean embedded = false; | |||||
/** | /** | ||||
* Server global transport and scope config. | * Server global transport and scope config. | ||||
*/ | */ | ||||
private static volatile int port = ServerTransportConfig.DEFAULT_PORT; | |||||
private static volatile int port = ClusterConstants.DEFAULT_CLUSTER_SERVER_PORT; | |||||
private static volatile int idleSeconds = ServerTransportConfig.DEFAULT_IDLE_SECONDS; | private static volatile int idleSeconds = ServerTransportConfig.DEFAULT_IDLE_SECONDS; | ||||
private static volatile Set<String> namespaceSet = Collections.singleton(ServerConstants.DEFAULT_NAMESPACE); | private static volatile Set<String> namespaceSet = Collections.singleton(ServerConstants.DEFAULT_NAMESPACE); | ||||
@@ -54,6 +59,7 @@ public final class ClusterServerConfigManager { | |||||
private static volatile double maxOccupyRatio = ServerFlowConfig.DEFAULT_MAX_OCCUPY_RATIO; | private static volatile double maxOccupyRatio = ServerFlowConfig.DEFAULT_MAX_OCCUPY_RATIO; | ||||
private static volatile int intervalMs = ServerFlowConfig.DEFAULT_INTERVAL_MS; | private static volatile int intervalMs = ServerFlowConfig.DEFAULT_INTERVAL_MS; | ||||
private static volatile int sampleCount = ServerFlowConfig.DEFAULT_SAMPLE_COUNT; | private static volatile int sampleCount = ServerFlowConfig.DEFAULT_SAMPLE_COUNT; | ||||
private static volatile double maxAllowedQps = ServerFlowConfig.DEFAULT_MAX_ALLOWED_QPS; | |||||
/** | /** | ||||
* Namespace-specific flow config for token server. | * Namespace-specific flow config for token server. | ||||
@@ -170,6 +176,11 @@ public final class ClusterServerConfigManager { | |||||
newSet = new HashSet<>(newSet); | newSet = new HashSet<>(newSet); | ||||
newSet.add(ServerConstants.DEFAULT_NAMESPACE); | newSet.add(ServerConstants.DEFAULT_NAMESPACE); | ||||
if (embedded) { | |||||
// In embedded server mode, the server itself is also a part of service, | |||||
// so it should be added to namespace set. | |||||
newSet.add(ConfigSupplierRegistry.getNamespaceSupplier().get()); | |||||
} | |||||
Set<String> oldSet = ClusterServerConfigManager.namespaceSet; | Set<String> oldSet = ClusterServerConfigManager.namespaceSet; | ||||
if (oldSet != null && !oldSet.isEmpty()) { | if (oldSet != null && !oldSet.isEmpty()) { | ||||
@@ -185,6 +196,7 @@ public final class ClusterServerConfigManager { | |||||
for (String ns : newSet) { | for (String ns : newSet) { | ||||
ClusterFlowRuleManager.registerPropertyIfAbsent(ns); | ClusterFlowRuleManager.registerPropertyIfAbsent(ns); | ||||
ClusterParamFlowRuleManager.registerPropertyIfAbsent(ns); | ClusterParamFlowRuleManager.registerPropertyIfAbsent(ns); | ||||
GlobalRequestLimiter.initIfAbsent(ns); | |||||
} | } | ||||
} | } | ||||
@@ -256,6 +268,10 @@ public final class ClusterServerConfigManager { | |||||
if (config.getMaxOccupyRatio() != maxOccupyRatio) { | if (config.getMaxOccupyRatio() != maxOccupyRatio) { | ||||
maxOccupyRatio = config.getMaxOccupyRatio(); | maxOccupyRatio = config.getMaxOccupyRatio(); | ||||
} | } | ||||
if (config.getMaxAllowedQps() != maxAllowedQps) { | |||||
maxAllowedQps = config.getMaxAllowedQps(); | |||||
GlobalRequestLimiter.applyMaxQpsChange(maxAllowedQps); | |||||
} | |||||
int newIntervalMs = config.getIntervalMs(); | int newIntervalMs = config.getIntervalMs(); | ||||
int newSampleCount = config.getSampleCount(); | int newSampleCount = config.getSampleCount(); | ||||
if (newIntervalMs != intervalMs || newSampleCount != sampleCount) { | if (newIntervalMs != intervalMs || newSampleCount != sampleCount) { | ||||
@@ -277,7 +293,8 @@ public final class ClusterServerConfigManager { | |||||
} | } | ||||
public static boolean isValidFlowConfig(ServerFlowConfig config) { | public static boolean isValidFlowConfig(ServerFlowConfig config) { | ||||
return config != null && config.getMaxOccupyRatio() >= 0 && config.getExceedCount() >= 0; | |||||
return config != null && config.getMaxOccupyRatio() >= 0 && config.getExceedCount() >= 0 | |||||
&& config.getMaxAllowedQps() >= 0; | |||||
} | } | ||||
public static double getExceedCount(String namespace) { | public static double getExceedCount(String namespace) { | ||||
@@ -322,6 +339,19 @@ public final class ClusterServerConfigManager { | |||||
return sampleCount; | return sampleCount; | ||||
} | } | ||||
public static double getMaxAllowedQps() { | |||||
return maxAllowedQps; | |||||
} | |||||
public static double getMaxAllowedQps(String namespace) { | |||||
AssertUtil.notEmpty(namespace, "namespace cannot be empty"); | |||||
ServerFlowConfig config = NAMESPACE_CONF.get(namespace); | |||||
if (config != null) { | |||||
return config.getMaxAllowedQps(); | |||||
} | |||||
return maxAllowedQps; | |||||
} | |||||
public static double getExceedCount() { | public static double getExceedCount() { | ||||
return exceedCount; | return exceedCount; | ||||
} | } | ||||
@@ -354,5 +384,17 @@ public final class ClusterServerConfigManager { | |||||
applyNamespaceSetChange(namespaceSet); | applyNamespaceSetChange(namespaceSet); | ||||
} | } | ||||
public static boolean isEmbedded() { | |||||
return embedded; | |||||
} | |||||
public static void setEmbedded(boolean embedded) { | |||||
ClusterServerConfigManager.embedded = embedded; | |||||
} | |||||
public static void setMaxAllowedQps(double maxAllowedQps) { | |||||
ClusterServerConfigManager.maxAllowedQps = maxAllowedQps; | |||||
} | |||||
private ClusterServerConfigManager() {} | private ClusterServerConfigManager() {} | ||||
} | } |
@@ -28,6 +28,7 @@ public class ServerFlowConfig { | |||||
public static final int DEFAULT_INTERVAL_MS = 1000; | public static final int DEFAULT_INTERVAL_MS = 1000; | ||||
public static final int DEFAULT_SAMPLE_COUNT= 10; | public static final int DEFAULT_SAMPLE_COUNT= 10; | ||||
public static final double DEFAULT_MAX_ALLOWED_QPS= 30000; | |||||
private final String namespace; | private final String namespace; | ||||
@@ -36,6 +37,8 @@ public class ServerFlowConfig { | |||||
private int intervalMs = DEFAULT_INTERVAL_MS; | private int intervalMs = DEFAULT_INTERVAL_MS; | ||||
private int sampleCount = DEFAULT_SAMPLE_COUNT; | private int sampleCount = DEFAULT_SAMPLE_COUNT; | ||||
private double maxAllowedQps = DEFAULT_MAX_ALLOWED_QPS; | |||||
public ServerFlowConfig() { | public ServerFlowConfig() { | ||||
this(ServerConstants.DEFAULT_NAMESPACE); | this(ServerConstants.DEFAULT_NAMESPACE); | ||||
} | } | ||||
@@ -84,6 +87,15 @@ public class ServerFlowConfig { | |||||
return this; | return this; | ||||
} | } | ||||
public double getMaxAllowedQps() { | |||||
return maxAllowedQps; | |||||
} | |||||
public ServerFlowConfig setMaxAllowedQps(double maxAllowedQps) { | |||||
this.maxAllowedQps = maxAllowedQps; | |||||
return this; | |||||
} | |||||
@Override | @Override | ||||
public String toString() { | public String toString() { | ||||
return "ServerFlowConfig{" + | return "ServerFlowConfig{" + | ||||
@@ -92,6 +104,7 @@ public class ServerFlowConfig { | |||||
", maxOccupyRatio=" + maxOccupyRatio + | ", maxOccupyRatio=" + maxOccupyRatio + | ||||
", intervalMs=" + intervalMs + | ", intervalMs=" + intervalMs + | ||||
", sampleCount=" + sampleCount + | ", sampleCount=" + sampleCount + | ||||
", maxAllowedQps=" + maxAllowedQps + | |||||
'}'; | '}'; | ||||
} | } | ||||
} | } |
@@ -25,6 +25,10 @@ public final class TokenResultStatus { | |||||
* Bad client request. | * Bad client request. | ||||
*/ | */ | ||||
public static final int BAD_REQUEST = -4; | public static final int BAD_REQUEST = -4; | ||||
/** | |||||
* Too many request in server. | |||||
*/ | |||||
public static final int TOO_MANY_REQUEST = -2; | |||||
/** | /** | ||||
* Server or client unexpected failure (due to transport or serialization failure). | * Server or client unexpected failure (due to transport or serialization failure). | ||||
*/ | */ | ||||
@@ -178,6 +178,7 @@ final class FlowRuleChecker { | |||||
case TokenResultStatus.NO_RULE_EXISTS: | case TokenResultStatus.NO_RULE_EXISTS: | ||||
case TokenResultStatus.BAD_REQUEST: | case TokenResultStatus.BAD_REQUEST: | ||||
case TokenResultStatus.FAIL: | case TokenResultStatus.FAIL: | ||||
case TokenResultStatus.TOO_MANY_REQUEST: | |||||
return fallbackToLocalOrPass(rule, context, node, acquireCount, prioritized); | return fallbackToLocalOrPass(rule, context, node, acquireCount, prioritized); | ||||
case TokenResultStatus.BLOCKED: | case TokenResultStatus.BLOCKED: | ||||
default: | default: | ||||
@@ -0,0 +1,38 @@ | |||||
/* | |||||
* Copyright 1999-2018 Alibaba Group Holding Ltd. | |||||
* | |||||
* Licensed under the Apache License, Version 2.0 (the "License"); | |||||
* you may not use this file except in compliance with the License. | |||||
* You may obtain a copy of the License at | |||||
* | |||||
* http://www.apache.org/licenses/LICENSE-2.0 | |||||
* | |||||
* Unless required by applicable law or agreed to in writing, software | |||||
* distributed under the License is distributed on an "AS IS" BASIS, | |||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |||||
* See the License for the specific language governing permissions and | |||||
* limitations under the License. | |||||
*/ | |||||
package com.alibaba.csp.sentinel.slots.statistic.base; | |||||
/** | |||||
* @author Eric Zhao | |||||
*/ | |||||
public class UnaryLeapArray extends LeapArray<LongAdder> { | |||||
public UnaryLeapArray(int sampleCount, int intervalInMs) { | |||||
super(sampleCount, intervalInMs); | |||||
} | |||||
@Override | |||||
public LongAdder newEmptyBucket() { | |||||
return new LongAdder(); | |||||
} | |||||
@Override | |||||
protected WindowWrap<LongAdder> resetWindowTo(WindowWrap<LongAdder> windowWrap, long startTime) { | |||||
windowWrap.resetTo(startTime); | |||||
windowWrap.value().reset(); | |||||
return windowWrap; | |||||
} | |||||
} |