diff --git a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/ClusterRuleConstant.java b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/ClusterRuleConstant.java new file mode 100644 index 00000000..78cacf34 --- /dev/null +++ b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/ClusterRuleConstant.java @@ -0,0 +1,31 @@ +/* + * Copyright 1999-2018 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.csp.sentinel.slots.block; + +/** + * @author Eric Zhao + * @since 1.4.0 + */ +public final class ClusterRuleConstant { + + public static final int FLOW_CLUSTER_STRATEGY_NORMAL = 0; + public static final int FLOW_CLUSTER_STRATEGY_REF = 1; + + public static final int FLOW_THRESHOLD_AVG_LOCAL = 0; + public static final int FLOW_THRESHOLD_GLOBAL = 1; + + private ClusterRuleConstant() {} +} diff --git a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/ClusterFlowConfig.java b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/ClusterFlowConfig.java new file mode 100644 index 00000000..506701e2 --- /dev/null +++ b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/ClusterFlowConfig.java @@ -0,0 +1,154 @@ +/* + * Copyright 1999-2018 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.csp.sentinel.slots.block.flow; + +import com.alibaba.csp.sentinel.slots.block.ClusterRuleConstant; + +/** + * Flow rule config in cluster mode. + * + * @author Eric Zhao + * @since 1.4.0 + */ +public class ClusterFlowConfig { + + /** + * Global unique ID. + */ + private Integer flowId; + + /** + * Threshold type (average by local value or global value). + */ + private int thresholdType = ClusterRuleConstant.FLOW_THRESHOLD_AVG_LOCAL; + private boolean fallbackToLocalWhenFail; + + /** + * 0: normal; 1: using reference (borrow from reference). + */ + private int strategy = ClusterRuleConstant.FLOW_CLUSTER_STRATEGY_NORMAL; + + private Integer refFlowId; + private int refSampleCount = 10; + private double refRatio = 1d; + + public Integer getFlowId() { + return flowId; + } + + public ClusterFlowConfig setFlowId(Integer flowId) { + this.flowId = flowId; + return this; + } + + public int getThresholdType() { + return thresholdType; + } + + public ClusterFlowConfig setThresholdType(int thresholdType) { + this.thresholdType = thresholdType; + return this; + } + + public int getStrategy() { + return strategy; + } + + public ClusterFlowConfig setStrategy(int strategy) { + this.strategy = strategy; + return this; + } + + public Integer getRefFlowId() { + return refFlowId; + } + + public ClusterFlowConfig setRefFlowId(Integer refFlowId) { + this.refFlowId = refFlowId; + return this; + } + + public int getRefSampleCount() { + return refSampleCount; + } + + public ClusterFlowConfig setRefSampleCount(int refSampleCount) { + this.refSampleCount = refSampleCount; + return this; + } + + public double getRefRatio() { + return refRatio; + } + + public ClusterFlowConfig setRefRatio(double refRatio) { + this.refRatio = refRatio; + return this; + } + + public boolean isFallbackToLocalWhenFail() { + return fallbackToLocalWhenFail; + } + + public ClusterFlowConfig setFallbackToLocalWhenFail(boolean fallbackToLocalWhenFail) { + this.fallbackToLocalWhenFail = fallbackToLocalWhenFail; + return this; + } + + @Override + public boolean equals(Object o) { + if (this == o) { return true; } + if (o == null || getClass() != o.getClass()) { return false; } + + ClusterFlowConfig that = (ClusterFlowConfig)o; + + if (thresholdType != that.thresholdType) { return false; } + if (fallbackToLocalWhenFail != that.fallbackToLocalWhenFail) { return false; } + if (strategy != that.strategy) { return false; } + if (refSampleCount != that.refSampleCount) { return false; } + if (Double.compare(that.refRatio, refRatio) != 0) { return false; } + if (flowId != null ? !flowId.equals(that.flowId) : that.flowId != null) { return false; } + return refFlowId != null ? refFlowId.equals(that.refFlowId) : that.refFlowId == null; + } + + @Override + public int hashCode() { + int result; + long temp; + result = flowId != null ? flowId.hashCode() : 0; + result = 31 * result + thresholdType; + result = 31 * result + (fallbackToLocalWhenFail ? 1 : 0); + result = 31 * result + strategy; + result = 31 * result + (refFlowId != null ? refFlowId.hashCode() : 0); + result = 31 * result + refSampleCount; + temp = Double.doubleToLongBits(refRatio); + result = 31 * result + (int)(temp ^ (temp >>> 32)); + return result; + } + + @Override + public String toString() { + return "ClusterFlowConfig{" + + "flowId=" + flowId + + ", thresholdType=" + thresholdType + + ", fallbackToLocalWhenFail=" + fallbackToLocalWhenFail + + ", strategy=" + strategy + + ", refFlowId=" + refFlowId + + ", refSampleCount=" + refSampleCount + + ", refRatio=" + refRatio + + '}'; + } +} diff --git a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/FlowRule.java b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/FlowRule.java index dc42a89e..e7413ea2 100755 --- a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/FlowRule.java +++ b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/FlowRule.java @@ -85,6 +85,12 @@ public class FlowRule extends AbstractRule { */ private int maxQueueingTimeMs = 500; + private boolean clusterMode; + /** + * Flow rule config for cluster mode. + */ + private ClusterFlowConfig clusterConfig; + /** * The traffic shaping (throttling) controller. */ @@ -162,6 +168,24 @@ public class FlowRule extends AbstractRule { return this; } + public boolean isClusterMode() { + return clusterMode; + } + + public FlowRule setClusterMode(boolean clusterMode) { + this.clusterMode = clusterMode; + return this; + } + + public ClusterFlowConfig getClusterConfig() { + return clusterConfig; + } + + public FlowRule setClusterConfig(ClusterFlowConfig clusterConfig) { + this.clusterConfig = clusterConfig; + return this; + } + @Override public boolean passCheck(Context context, DefaultNode node, int acquireCount, Object... args) { return true; @@ -169,43 +193,21 @@ public class FlowRule extends AbstractRule { @Override public boolean equals(Object o) { - if (this == o) { - return true; - } - if (!(o instanceof FlowRule)) { - return false; - } - if (!super.equals(o)) { - return false; - } - - FlowRule flowRule = (FlowRule)o; - - if (grade != flowRule.grade) { - return false; - } - if (Double.compare(flowRule.count, count) != 0) { - return false; - } - if (strategy != flowRule.strategy) { - return false; - } - if (refResource != null ? !refResource.equals(flowRule.refResource) : flowRule.refResource != null) { - return false; - } - if (this.controlBehavior != flowRule.controlBehavior) { - return false; - } - - if (warmUpPeriodSec != flowRule.warmUpPeriodSec) { - return false; - } - - if (maxQueueingTimeMs != flowRule.maxQueueingTimeMs) { - return false; - } + if (this == o) { return true; } + if (o == null || getClass() != o.getClass()) { return false; } + if (!super.equals(o)) { return false; } - return true; + FlowRule rule = (FlowRule)o; + + if (grade != rule.grade) { return false; } + if (Double.compare(rule.count, count) != 0) { return false; } + if (strategy != rule.strategy) { return false; } + if (controlBehavior != rule.controlBehavior) { return false; } + if (warmUpPeriodSec != rule.warmUpPeriodSec) { return false; } + if (maxQueueingTimeMs != rule.maxQueueingTimeMs) { return false; } + if (clusterMode != rule.clusterMode) { return false; } + if (refResource != null ? !refResource.equals(rule.refResource) : rule.refResource != null) { return false; } + return clusterConfig != null ? clusterConfig.equals(rule.clusterConfig) : rule.clusterConfig == null; } @Override @@ -217,10 +219,11 @@ public class FlowRule extends AbstractRule { result = 31 * result + (int)(temp ^ (temp >>> 32)); result = 31 * result + strategy; result = 31 * result + (refResource != null ? refResource.hashCode() : 0); - result = 31 * result + (int)(temp ^ (temp >>> 32)); - result = 31 * result + warmUpPeriodSec; result = 31 * result + controlBehavior; + result = 31 * result + warmUpPeriodSec; result = 31 * result + maxQueueingTimeMs; + result = 31 * result + (clusterMode ? 1 : 0); + result = 31 * result + (clusterConfig != null ? clusterConfig.hashCode() : 0); return result; } @@ -236,7 +239,9 @@ public class FlowRule extends AbstractRule { ", controlBehavior=" + controlBehavior + ", warmUpPeriodSec=" + warmUpPeriodSec + ", maxQueueingTimeMs=" + maxQueueingTimeMs + + ", clusterMode=" + clusterMode + + ", clusterConfig=" + clusterConfig + ", controller=" + controller + - "}"; + '}'; } } diff --git a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/FlowRuleChecker.java b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/FlowRuleChecker.java index 197a3bbf..7518d7c1 100644 --- a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/FlowRuleChecker.java +++ b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/FlowRuleChecker.java @@ -15,7 +15,12 @@ */ package com.alibaba.csp.sentinel.slots.block.flow; +import com.alibaba.csp.sentinel.cluster.ClusterTokenClient; +import com.alibaba.csp.sentinel.cluster.TokenClientProvider; +import com.alibaba.csp.sentinel.cluster.TokenResultStatus; +import com.alibaba.csp.sentinel.cluster.TokenResult; import com.alibaba.csp.sentinel.context.Context; +import com.alibaba.csp.sentinel.log.RecordLog; import com.alibaba.csp.sentinel.node.DefaultNode; import com.alibaba.csp.sentinel.node.Node; import com.alibaba.csp.sentinel.slots.block.RuleConstant; @@ -30,11 +35,25 @@ import com.alibaba.csp.sentinel.util.StringUtil; final class FlowRuleChecker { static boolean passCheck(/*@NonNull*/ FlowRule rule, Context context, DefaultNode node, int acquireCount) { + return passCheck(rule, context, node, acquireCount, false); + } + + static boolean passCheck(/*@NonNull*/ FlowRule rule, Context context, DefaultNode node, int acquireCount, + boolean prioritized) { String limitApp = rule.getLimitApp(); if (limitApp == null) { return true; } + if (rule.isClusterMode()) { + return passClusterCheck(rule, context, node, acquireCount, prioritized); + } + + return passLocalCheck(rule, context, node, acquireCount, prioritized); + } + + private static boolean passLocalCheck(FlowRule rule, Context context, DefaultNode node, int acquireCount, + boolean prioritized) { Node selectedNode = selectNodeByRequesterAndStrategy(rule, context, node); if (selectedNode == null) { return true; @@ -43,6 +62,38 @@ final class FlowRuleChecker { return rule.getRater().canPass(selectedNode, acquireCount); } + static boolean passClusterCheck(FlowRule rule, Context context, DefaultNode node, int acquireCount, + boolean prioritized) { + try { + ClusterTokenClient client = TokenClientProvider.getClient(); + if (client != null) { + TokenResult result = client.requestToken(rule.getClusterConfig().getFlowId(), acquireCount, + prioritized); + switch (result.getStatus()) { + case TokenResultStatus.OK: + return true; + case TokenResultStatus.SHOULD_WAIT: + // Wait for next tick. + Thread.sleep(result.getWaitInMs()); + return true; + case TokenResultStatus.NO_RULE_EXISTS: + case TokenResultStatus.BAD_REQUEST: + case TokenResultStatus.FAIL: + return passLocalCheck(rule, context, node, acquireCount, prioritized); + case TokenResultStatus.BLOCKED: + default: + return false; + } + } + // If client is absent, then fallback to local mode. + } catch (Throwable ex) { + RecordLog.warn("[FlowRuleChecker] Request cluster token unexpected failed", ex); + } + // TODO: choose whether fallback to local or inactivate the rule. + // Downgrade to local flow control when token client or server for this rule is not available. + return passLocalCheck(rule, context, node, acquireCount, prioritized); + } + static Node selectReferenceNode(FlowRule rule, Context context, DefaultNode node) { String refResource = rule.getRefResource(); int strategy = rule.getStrategy(); @@ -103,4 +154,4 @@ final class FlowRuleChecker { } private FlowRuleChecker() {} -} +} \ No newline at end of file diff --git a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/FlowRuleComparator.java b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/FlowRuleComparator.java index 688bed96..ed1b22c6 100755 --- a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/FlowRuleComparator.java +++ b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/FlowRuleComparator.java @@ -19,10 +19,23 @@ import java.util.Comparator; import com.alibaba.csp.sentinel.slots.block.RuleConstant; +/** + * Comparator for flow rules. + * + * @author jialiang.linjl + */ public class FlowRuleComparator implements Comparator { @Override public int compare(FlowRule o1, FlowRule o2) { + // Clustered mode will be on the top. + if (o1.isClusterMode() && !o2.isClusterMode()) { + return 1; + } + + if (!o1.isClusterMode() && o2.isClusterMode()) { + return -1; + } if (o1.getLimitApp() == null) { return 0; diff --git a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/FlowRuleManager.java b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/FlowRuleManager.java index 230b4679..9dd23026 100755 --- a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/FlowRuleManager.java +++ b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/FlowRuleManager.java @@ -16,8 +16,6 @@ package com.alibaba.csp.sentinel.slots.block.flow; import java.util.ArrayList; -import java.util.Collections; -import java.util.Comparator; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -32,16 +30,10 @@ import com.alibaba.csp.sentinel.node.metric.MetricTimerListener; import com.alibaba.csp.sentinel.property.DynamicSentinelProperty; import com.alibaba.csp.sentinel.property.PropertyListener; import com.alibaba.csp.sentinel.property.SentinelProperty; -import com.alibaba.csp.sentinel.slots.block.RuleConstant; -import com.alibaba.csp.sentinel.slots.block.flow.controller.DefaultController; -import com.alibaba.csp.sentinel.slots.block.flow.controller.RateLimiterController; -import com.alibaba.csp.sentinel.slots.block.flow.controller.WarmUpController; -import com.alibaba.csp.sentinel.slots.block.flow.controller.WarmUpRateLimiterController; /** *

- * One resources can have multiple rules. And these rules take effects in the - * following order: + * One resources can have multiple rules. And these rules take effects in the following order: *

    *
  1. requests from specified caller
  2. *
  3. no specified caller
  4. @@ -49,18 +41,21 @@ import com.alibaba.csp.sentinel.slots.block.flow.controller.WarmUpRateLimiterCon *

    * * @author jialiang.linjl + * @author Eric Zhao */ public class FlowRuleManager { private static final Map> flowRules = new ConcurrentHashMap>(); - private final static ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1, - new NamedThreadFactory("sentinel-metrics-record-task", true)); - private final static FlowPropertyListener listener = new FlowPropertyListener(); + + private static final FlowPropertyListener LISTENER = new FlowPropertyListener(); private static SentinelProperty> currentProperty = new DynamicSentinelProperty>(); + private static final ScheduledExecutorService SCHEDULER = Executors.newScheduledThreadPool(1, + new NamedThreadFactory("sentinel-metrics-record-task", true)); + static { - currentProperty.addListener(listener); - scheduler.scheduleAtFixedRate(new MetricTimerListener(), 0, 1, TimeUnit.SECONDS); + currentProperty.addListener(LISTENER); + SCHEDULER.scheduleAtFixedRate(new MetricTimerListener(), 0, 1, TimeUnit.SECONDS); } /** @@ -70,10 +65,10 @@ public class FlowRuleManager { * @param property the property to listen. */ public static void register2Property(SentinelProperty> property) { - synchronized (listener) { + synchronized (LISTENER) { RecordLog.info("[FlowRuleManager] Registering new property to flow rule manager"); - currentProperty.removeListener(listener); - property.addListener(listener); + currentProperty.removeListener(LISTENER); + property.addListener(LISTENER); currentProperty = property; } } @@ -100,65 +95,7 @@ public class FlowRuleManager { currentProperty.updateValue(rules); } - private static Map> loadFlowConf(List list) { - Map> newRuleMap = new ConcurrentHashMap>(); - - if (list == null || list.isEmpty()) { - return newRuleMap; - } - - for (FlowRule rule : list) { - if (!isValidRule(rule)) { - RecordLog.warn("[FlowRuleManager] Ignoring invalid flow rule when loading new flow rules: " + rule); - continue; - } - if (StringUtil.isBlank(rule.getLimitApp())) { - rule.setLimitApp(RuleConstant.LIMIT_APP_DEFAULT); - } - - TrafficShapingController rater = new DefaultController(rule.getCount(), rule.getGrade()); - if (rule.getGrade() == RuleConstant.FLOW_GRADE_QPS - && rule.getControlBehavior() == RuleConstant.CONTROL_BEHAVIOR_WARM_UP - && rule.getWarmUpPeriodSec() > 0) { - rater = new WarmUpController(rule.getCount(), rule.getWarmUpPeriodSec(), ColdFactorProperty.coldFactor); - - } else if (rule.getGrade() == RuleConstant.FLOW_GRADE_QPS - && rule.getControlBehavior() == RuleConstant.CONTROL_BEHAVIOR_RATE_LIMITER - && rule.getMaxQueueingTimeMs() > 0) { - rater = new RateLimiterController(rule.getMaxQueueingTimeMs(), rule.getCount()); - } else if (rule.getGrade() == RuleConstant.FLOW_GRADE_QPS - && rule.getControlBehavior() == RuleConstant.CONTROL_BEHAVIOR_WARM_UP_RATE_LIMITER - && rule.getMaxQueueingTimeMs() > 0 && rule.getWarmUpPeriodSec() > 0) { - rater = new WarmUpRateLimiterController(rule.getCount(), rule.getWarmUpPeriodSec(), - rule.getMaxQueueingTimeMs(), ColdFactorProperty.coldFactor); - } - - rule.setRater(rater); - - String identity = rule.getResource(); - List ruleM = newRuleMap.get(identity); - - if (ruleM == null) { - ruleM = new ArrayList(); - newRuleMap.put(identity, ruleM); - } - - ruleM.add(rule); - - } - - if (!newRuleMap.isEmpty()) { - Comparator comparator = new FlowRuleComparator(); - // Sort the rules. - for (List rules : newRuleMap.values()) { - Collections.sort(rules, comparator); - } - } - - return newRuleMap; - } - - static Map> getFlowRules() { + static Map> getFlowRuleMap() { return flowRules; } @@ -188,7 +125,7 @@ public class FlowRuleManager { @Override public void configUpdate(List value) { - Map> rules = loadFlowConf(value); + Map> rules = FlowRuleUtil.buildFlowRuleMap(value); if (rules != null) { flowRules.clear(); flowRules.putAll(rules); @@ -198,43 +135,13 @@ public class FlowRuleManager { @Override public void configLoad(List conf) { - Map> rules = loadFlowConf(conf); + Map> rules = FlowRuleUtil.buildFlowRuleMap(conf); if (rules != null) { flowRules.clear(); flowRules.putAll(rules); } RecordLog.info("[FlowRuleManager] Flow rules loaded: " + flowRules); } - } - public static boolean isValidRule(FlowRule rule) { - boolean baseValid = rule != null && !StringUtil.isBlank(rule.getResource()) && rule.getCount() >= 0 - && rule.getGrade() >= 0 && rule.getStrategy() >= 0 && rule.getControlBehavior() >= 0; - if (!baseValid) { - return false; - } - // Check strategy and control (shaping) behavior. - return checkStrategyField(rule) && checkControlBehaviorField(rule); - } - - private static boolean checkStrategyField(/*@NonNull*/ FlowRule rule) { - if (rule.getStrategy() == RuleConstant.STRATEGY_RELATE || rule.getStrategy() == RuleConstant.STRATEGY_CHAIN) { - return StringUtil.isNotBlank(rule.getRefResource()); - } - return true; - } - - private static boolean checkControlBehaviorField(/*@NonNull*/ FlowRule rule) { - switch (rule.getControlBehavior()) { - case RuleConstant.CONTROL_BEHAVIOR_WARM_UP: - return rule.getWarmUpPeriodSec() > 0; - case RuleConstant.CONTROL_BEHAVIOR_RATE_LIMITER: - return rule.getMaxQueueingTimeMs() > 0; - case RuleConstant.CONTROL_BEHAVIOR_WARM_UP_RATE_LIMITER: - return rule.getWarmUpPeriodSec() > 0 && rule.getMaxQueueingTimeMs() > 0; - default: - return true; - } - } } diff --git a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/FlowRuleUtil.java b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/FlowRuleUtil.java new file mode 100644 index 00000000..aa5a58c6 --- /dev/null +++ b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/FlowRuleUtil.java @@ -0,0 +1,227 @@ +/* + * Copyright 1999-2018 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.csp.sentinel.slots.block.flow; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import com.alibaba.csp.sentinel.log.RecordLog; +import com.alibaba.csp.sentinel.slots.block.ClusterRuleConstant; +import com.alibaba.csp.sentinel.slots.block.RuleConstant; +import com.alibaba.csp.sentinel.slots.block.flow.controller.DefaultController; +import com.alibaba.csp.sentinel.slots.block.flow.controller.RateLimiterController; +import com.alibaba.csp.sentinel.slots.block.flow.controller.WarmUpController; +import com.alibaba.csp.sentinel.slots.block.flow.controller.WarmUpRateLimiterController; +import com.alibaba.csp.sentinel.util.StringUtil; +import com.alibaba.csp.sentinel.util.function.Function; +import com.alibaba.csp.sentinel.util.function.Predicate; + +/** + * @author Eric Zhao + * @since 1.4.0 + */ +public final class FlowRuleUtil { + + /** + * Build the flow rule map from raw list of flow rules, grouping by resource name. + * + * @param list raw list of flow rules + * @return constructed new flow rule map; empty map if list is null or empty, or no valid rules + */ + public static Map> buildFlowRuleMap(List list) { + return buildFlowRuleMap(list, null); + } + + /** + * Build the flow rule map from raw list of flow rules, grouping by resource name. + * + * @param list raw list of flow rules + * @param filter rule filter + * @return constructed new flow rule map; empty map if list is null or empty, or no wanted rules + */ + public static Map> buildFlowRuleMap(List list, Predicate filter) { + return buildFlowRuleMap(list, filter, true); + } + + /** + * Build the flow rule map from raw list of flow rules, grouping by resource name. + * + * @param list raw list of flow rules + * @param filter rule filter + * @param shouldSort whether the rules should be sorted + * @return constructed new flow rule map; empty map if list is null or empty, or no wanted rules + */ + public static Map> buildFlowRuleMap(List list, Predicate filter, + boolean shouldSort) { + return buildFlowRuleMap(list, extractResource, filter, shouldSort); + } + + /** + * Build the flow rule map from raw list of flow rules, grouping by provided group function. + * + * @param list raw list of flow rules + * @param groupFunction grouping function of the map (by key) + * @param filter rule filter + * @param shouldSort whether the rules should be sorted + * @param type of key + * @return constructed new flow rule map; empty map if list is null or empty, or no wanted rules + */ + public static Map> buildFlowRuleMap(List list, Function groupFunction, + Predicate filter, boolean shouldSort) { + Map> newRuleMap = new ConcurrentHashMap>(); + if (list == null || list.isEmpty()) { + return newRuleMap; + } + + for (FlowRule rule : list) { + if (!isValidRule(rule)) { + RecordLog.warn("[FlowRuleManager] Ignoring invalid flow rule when loading new flow rules: " + rule); + continue; + } + if (filter != null && !filter.test(rule)) { + continue; + } + if (StringUtil.isBlank(rule.getLimitApp())) { + rule.setLimitApp(RuleConstant.LIMIT_APP_DEFAULT); + } + + TrafficShapingController rater = generateRater(rule); + rule.setRater(rater); + + K key = groupFunction.apply(rule); + if (key == null) { + continue; + } + List flowRules = newRuleMap.get(key); + + if (flowRules == null) { + flowRules = new ArrayList(); + newRuleMap.put(key, flowRules); + } + + flowRules.add(rule); + } + + if (shouldSort && !newRuleMap.isEmpty()) { + Comparator comparator = new FlowRuleComparator(); + // Sort the rules. + for (List rules : newRuleMap.values()) { + Collections.sort(rules, comparator); + } + } + + return newRuleMap; + } + + private static TrafficShapingController generateRater(/*@Valid*/ FlowRule rule) { + if (rule.getGrade() == RuleConstant.FLOW_GRADE_QPS) { + switch (rule.getControlBehavior()) { + case RuleConstant.CONTROL_BEHAVIOR_WARM_UP: + return new WarmUpController(rule.getCount(), rule.getWarmUpPeriodSec(), + ColdFactorProperty.coldFactor); + case RuleConstant.CONTROL_BEHAVIOR_RATE_LIMITER: + return new RateLimiterController(rule.getMaxQueueingTimeMs(), rule.getCount()); + case RuleConstant.CONTROL_BEHAVIOR_WARM_UP_RATE_LIMITER: + return new WarmUpRateLimiterController(rule.getCount(), rule.getWarmUpPeriodSec(), + rule.getMaxQueueingTimeMs(), ColdFactorProperty.coldFactor); + case RuleConstant.CONTROL_BEHAVIOR_DEFAULT: + default: + // Default mode or unknown mode: default traffic shaping controller (fast-reject). + } + } + return new DefaultController(rule.getCount(), rule.getGrade()); + } + + /** + * Check whether provided ID can be a valid cluster flow ID. + * + * @param id flow ID to check + * @return true if valid, otherwise false + */ + public static boolean validClusterRuleId(Integer id) { + return id != null && id > 0; + } + + /** + * Check whether provided flow rule is valid. + * + * @param rule flow rule to check + * @return true if valid, otherwise false + */ + public static boolean isValidRule(FlowRule rule) { + boolean baseValid = rule != null && !StringUtil.isBlank(rule.getResource()) && rule.getCount() >= 0 + && rule.getGrade() >= 0 && rule.getStrategy() >= 0 && rule.getControlBehavior() >= 0; + if (!baseValid) { + return false; + } + // Check strategy and control (shaping) behavior. + return checkClusterField(rule) && checkStrategyField(rule) && checkControlBehaviorField(rule); + } + + private static boolean checkClusterField(/*@NonNull*/ FlowRule rule) { + if (!rule.isClusterMode()) { + return true; + } + ClusterFlowConfig clusterConfig = rule.getClusterConfig(); + if (clusterConfig == null) { + return false; + } + if (!validClusterRuleId(clusterConfig.getFlowId())) { + return false; + } + switch (rule.getStrategy()) { + case ClusterRuleConstant.FLOW_CLUSTER_STRATEGY_NORMAL: + return true; + case ClusterRuleConstant.FLOW_CLUSTER_STRATEGY_REF: + return validClusterRuleId(clusterConfig.getRefFlowId()); + default: + return true; + } + } + + private static boolean checkStrategyField(/*@NonNull*/ FlowRule rule) { + if (rule.getStrategy() == RuleConstant.STRATEGY_RELATE || rule.getStrategy() == RuleConstant.STRATEGY_CHAIN) { + return StringUtil.isNotBlank(rule.getRefResource()); + } + return true; + } + + private static boolean checkControlBehaviorField(/*@NonNull*/ FlowRule rule) { + switch (rule.getControlBehavior()) { + case RuleConstant.CONTROL_BEHAVIOR_WARM_UP: + return rule.getWarmUpPeriodSec() > 0; + case RuleConstant.CONTROL_BEHAVIOR_RATE_LIMITER: + return rule.getMaxQueueingTimeMs() > 0; + case RuleConstant.CONTROL_BEHAVIOR_WARM_UP_RATE_LIMITER: + return rule.getWarmUpPeriodSec() > 0 && rule.getMaxQueueingTimeMs() > 0; + default: + return true; + } + } + + private static final Function extractResource = new Function() { + @Override + public String apply(FlowRule rule) { + return rule.getResource(); + } + }; + + private FlowRuleUtil() {} +} diff --git a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/FlowSlot.java b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/FlowSlot.java index 4bd8ca5d..3f392db1 100755 --- a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/FlowSlot.java +++ b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/FlowSlot.java @@ -138,27 +138,27 @@ public class FlowSlot extends AbstractLinkedProcessorSlot { @Override public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args) throws Throwable { - checkFlow(resourceWrapper, context, node, count); + checkFlow(resourceWrapper, context, node, count, prioritized); fireEntry(context, resourceWrapper, node, count, prioritized, args); } - void checkFlow(ResourceWrapper resource, Context context, DefaultNode node, int count) throws BlockException { + void checkFlow(ResourceWrapper resource, Context context, DefaultNode node, int count, boolean prioritized) throws BlockException { // Flow rule map cannot be null. - Map> flowRules = FlowRuleManager.getFlowRules(); + Map> flowRules = FlowRuleManager.getFlowRuleMap(); List rules = flowRules.get(resource.getName()); if (rules != null) { for (FlowRule rule : rules) { - if (!canPassCheck(rule, context, node, count)) { + if (!canPassCheck(rule, context, node, count, prioritized)) { throw new FlowException(rule.getLimitApp()); } } } } - boolean canPassCheck(FlowRule rule, Context context, DefaultNode node, int count) { - return FlowRuleChecker.passCheck(rule, context, node, count); + boolean canPassCheck(FlowRule rule, Context context, DefaultNode node, int count, boolean prioritized) { + return FlowRuleChecker.passCheck(rule, context, node, count, prioritized); } @Override