- Add new field `clusterMode` and `clusterConfig` for cluster mode - Add a `ClusterFlowConfig` class for specific items for cluster flow control - Update FlowRuleChecker to support cluster mode - Extract valid rule checking and rule map generating logic to FlowRuleUtil Signed-off-by: Eric Zhao <sczyh16@gmail.com>master
@@ -0,0 +1,31 @@ | |||
/* | |||
* Copyright 1999-2018 Alibaba Group Holding Ltd. | |||
* | |||
* Licensed under the Apache License, Version 2.0 (the "License"); | |||
* you may not use this file except in compliance with the License. | |||
* You may obtain a copy of the License at | |||
* | |||
* http://www.apache.org/licenses/LICENSE-2.0 | |||
* | |||
* Unless required by applicable law or agreed to in writing, software | |||
* distributed under the License is distributed on an "AS IS" BASIS, | |||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |||
* See the License for the specific language governing permissions and | |||
* limitations under the License. | |||
*/ | |||
package com.alibaba.csp.sentinel.slots.block; | |||
/** | |||
* @author Eric Zhao | |||
* @since 1.4.0 | |||
*/ | |||
public final class ClusterRuleConstant { | |||
public static final int FLOW_CLUSTER_STRATEGY_NORMAL = 0; | |||
public static final int FLOW_CLUSTER_STRATEGY_REF = 1; | |||
public static final int FLOW_THRESHOLD_AVG_LOCAL = 0; | |||
public static final int FLOW_THRESHOLD_GLOBAL = 1; | |||
private ClusterRuleConstant() {} | |||
} |
@@ -0,0 +1,154 @@ | |||
/* | |||
* Copyright 1999-2018 Alibaba Group Holding Ltd. | |||
* | |||
* Licensed under the Apache License, Version 2.0 (the "License"); | |||
* you may not use this file except in compliance with the License. | |||
* You may obtain a copy of the License at | |||
* | |||
* http://www.apache.org/licenses/LICENSE-2.0 | |||
* | |||
* Unless required by applicable law or agreed to in writing, software | |||
* distributed under the License is distributed on an "AS IS" BASIS, | |||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |||
* See the License for the specific language governing permissions and | |||
* limitations under the License. | |||
*/ | |||
package com.alibaba.csp.sentinel.slots.block.flow; | |||
import com.alibaba.csp.sentinel.slots.block.ClusterRuleConstant; | |||
/** | |||
* Flow rule config in cluster mode. | |||
* | |||
* @author Eric Zhao | |||
* @since 1.4.0 | |||
*/ | |||
public class ClusterFlowConfig { | |||
/** | |||
* Global unique ID. | |||
*/ | |||
private Integer flowId; | |||
/** | |||
* Threshold type (average by local value or global value). | |||
*/ | |||
private int thresholdType = ClusterRuleConstant.FLOW_THRESHOLD_AVG_LOCAL; | |||
private boolean fallbackToLocalWhenFail; | |||
/** | |||
* 0: normal; 1: using reference (borrow from reference). | |||
*/ | |||
private int strategy = ClusterRuleConstant.FLOW_CLUSTER_STRATEGY_NORMAL; | |||
private Integer refFlowId; | |||
private int refSampleCount = 10; | |||
private double refRatio = 1d; | |||
public Integer getFlowId() { | |||
return flowId; | |||
} | |||
public ClusterFlowConfig setFlowId(Integer flowId) { | |||
this.flowId = flowId; | |||
return this; | |||
} | |||
public int getThresholdType() { | |||
return thresholdType; | |||
} | |||
public ClusterFlowConfig setThresholdType(int thresholdType) { | |||
this.thresholdType = thresholdType; | |||
return this; | |||
} | |||
public int getStrategy() { | |||
return strategy; | |||
} | |||
public ClusterFlowConfig setStrategy(int strategy) { | |||
this.strategy = strategy; | |||
return this; | |||
} | |||
public Integer getRefFlowId() { | |||
return refFlowId; | |||
} | |||
public ClusterFlowConfig setRefFlowId(Integer refFlowId) { | |||
this.refFlowId = refFlowId; | |||
return this; | |||
} | |||
public int getRefSampleCount() { | |||
return refSampleCount; | |||
} | |||
public ClusterFlowConfig setRefSampleCount(int refSampleCount) { | |||
this.refSampleCount = refSampleCount; | |||
return this; | |||
} | |||
public double getRefRatio() { | |||
return refRatio; | |||
} | |||
public ClusterFlowConfig setRefRatio(double refRatio) { | |||
this.refRatio = refRatio; | |||
return this; | |||
} | |||
public boolean isFallbackToLocalWhenFail() { | |||
return fallbackToLocalWhenFail; | |||
} | |||
public ClusterFlowConfig setFallbackToLocalWhenFail(boolean fallbackToLocalWhenFail) { | |||
this.fallbackToLocalWhenFail = fallbackToLocalWhenFail; | |||
return this; | |||
} | |||
@Override | |||
public boolean equals(Object o) { | |||
if (this == o) { return true; } | |||
if (o == null || getClass() != o.getClass()) { return false; } | |||
ClusterFlowConfig that = (ClusterFlowConfig)o; | |||
if (thresholdType != that.thresholdType) { return false; } | |||
if (fallbackToLocalWhenFail != that.fallbackToLocalWhenFail) { return false; } | |||
if (strategy != that.strategy) { return false; } | |||
if (refSampleCount != that.refSampleCount) { return false; } | |||
if (Double.compare(that.refRatio, refRatio) != 0) { return false; } | |||
if (flowId != null ? !flowId.equals(that.flowId) : that.flowId != null) { return false; } | |||
return refFlowId != null ? refFlowId.equals(that.refFlowId) : that.refFlowId == null; | |||
} | |||
@Override | |||
public int hashCode() { | |||
int result; | |||
long temp; | |||
result = flowId != null ? flowId.hashCode() : 0; | |||
result = 31 * result + thresholdType; | |||
result = 31 * result + (fallbackToLocalWhenFail ? 1 : 0); | |||
result = 31 * result + strategy; | |||
result = 31 * result + (refFlowId != null ? refFlowId.hashCode() : 0); | |||
result = 31 * result + refSampleCount; | |||
temp = Double.doubleToLongBits(refRatio); | |||
result = 31 * result + (int)(temp ^ (temp >>> 32)); | |||
return result; | |||
} | |||
@Override | |||
public String toString() { | |||
return "ClusterFlowConfig{" + | |||
"flowId=" + flowId + | |||
", thresholdType=" + thresholdType + | |||
", fallbackToLocalWhenFail=" + fallbackToLocalWhenFail + | |||
", strategy=" + strategy + | |||
", refFlowId=" + refFlowId + | |||
", refSampleCount=" + refSampleCount + | |||
", refRatio=" + refRatio + | |||
'}'; | |||
} | |||
} |
@@ -85,6 +85,12 @@ public class FlowRule extends AbstractRule { | |||
*/ | |||
private int maxQueueingTimeMs = 500; | |||
private boolean clusterMode; | |||
/** | |||
* Flow rule config for cluster mode. | |||
*/ | |||
private ClusterFlowConfig clusterConfig; | |||
/** | |||
* The traffic shaping (throttling) controller. | |||
*/ | |||
@@ -162,6 +168,24 @@ public class FlowRule extends AbstractRule { | |||
return this; | |||
} | |||
public boolean isClusterMode() { | |||
return clusterMode; | |||
} | |||
public FlowRule setClusterMode(boolean clusterMode) { | |||
this.clusterMode = clusterMode; | |||
return this; | |||
} | |||
public ClusterFlowConfig getClusterConfig() { | |||
return clusterConfig; | |||
} | |||
public FlowRule setClusterConfig(ClusterFlowConfig clusterConfig) { | |||
this.clusterConfig = clusterConfig; | |||
return this; | |||
} | |||
@Override | |||
public boolean passCheck(Context context, DefaultNode node, int acquireCount, Object... args) { | |||
return true; | |||
@@ -169,43 +193,21 @@ public class FlowRule extends AbstractRule { | |||
@Override | |||
public boolean equals(Object o) { | |||
if (this == o) { | |||
return true; | |||
} | |||
if (!(o instanceof FlowRule)) { | |||
return false; | |||
} | |||
if (!super.equals(o)) { | |||
return false; | |||
} | |||
FlowRule flowRule = (FlowRule)o; | |||
if (grade != flowRule.grade) { | |||
return false; | |||
} | |||
if (Double.compare(flowRule.count, count) != 0) { | |||
return false; | |||
} | |||
if (strategy != flowRule.strategy) { | |||
return false; | |||
} | |||
if (refResource != null ? !refResource.equals(flowRule.refResource) : flowRule.refResource != null) { | |||
return false; | |||
} | |||
if (this.controlBehavior != flowRule.controlBehavior) { | |||
return false; | |||
} | |||
if (warmUpPeriodSec != flowRule.warmUpPeriodSec) { | |||
return false; | |||
} | |||
if (maxQueueingTimeMs != flowRule.maxQueueingTimeMs) { | |||
return false; | |||
} | |||
if (this == o) { return true; } | |||
if (o == null || getClass() != o.getClass()) { return false; } | |||
if (!super.equals(o)) { return false; } | |||
return true; | |||
FlowRule rule = (FlowRule)o; | |||
if (grade != rule.grade) { return false; } | |||
if (Double.compare(rule.count, count) != 0) { return false; } | |||
if (strategy != rule.strategy) { return false; } | |||
if (controlBehavior != rule.controlBehavior) { return false; } | |||
if (warmUpPeriodSec != rule.warmUpPeriodSec) { return false; } | |||
if (maxQueueingTimeMs != rule.maxQueueingTimeMs) { return false; } | |||
if (clusterMode != rule.clusterMode) { return false; } | |||
if (refResource != null ? !refResource.equals(rule.refResource) : rule.refResource != null) { return false; } | |||
return clusterConfig != null ? clusterConfig.equals(rule.clusterConfig) : rule.clusterConfig == null; | |||
} | |||
@Override | |||
@@ -217,10 +219,11 @@ public class FlowRule extends AbstractRule { | |||
result = 31 * result + (int)(temp ^ (temp >>> 32)); | |||
result = 31 * result + strategy; | |||
result = 31 * result + (refResource != null ? refResource.hashCode() : 0); | |||
result = 31 * result + (int)(temp ^ (temp >>> 32)); | |||
result = 31 * result + warmUpPeriodSec; | |||
result = 31 * result + controlBehavior; | |||
result = 31 * result + warmUpPeriodSec; | |||
result = 31 * result + maxQueueingTimeMs; | |||
result = 31 * result + (clusterMode ? 1 : 0); | |||
result = 31 * result + (clusterConfig != null ? clusterConfig.hashCode() : 0); | |||
return result; | |||
} | |||
@@ -236,7 +239,9 @@ public class FlowRule extends AbstractRule { | |||
", controlBehavior=" + controlBehavior + | |||
", warmUpPeriodSec=" + warmUpPeriodSec + | |||
", maxQueueingTimeMs=" + maxQueueingTimeMs + | |||
", clusterMode=" + clusterMode + | |||
", clusterConfig=" + clusterConfig + | |||
", controller=" + controller + | |||
"}"; | |||
'}'; | |||
} | |||
} |
@@ -15,7 +15,12 @@ | |||
*/ | |||
package com.alibaba.csp.sentinel.slots.block.flow; | |||
import com.alibaba.csp.sentinel.cluster.ClusterTokenClient; | |||
import com.alibaba.csp.sentinel.cluster.TokenClientProvider; | |||
import com.alibaba.csp.sentinel.cluster.TokenResultStatus; | |||
import com.alibaba.csp.sentinel.cluster.TokenResult; | |||
import com.alibaba.csp.sentinel.context.Context; | |||
import com.alibaba.csp.sentinel.log.RecordLog; | |||
import com.alibaba.csp.sentinel.node.DefaultNode; | |||
import com.alibaba.csp.sentinel.node.Node; | |||
import com.alibaba.csp.sentinel.slots.block.RuleConstant; | |||
@@ -30,11 +35,25 @@ import com.alibaba.csp.sentinel.util.StringUtil; | |||
final class FlowRuleChecker { | |||
static boolean passCheck(/*@NonNull*/ FlowRule rule, Context context, DefaultNode node, int acquireCount) { | |||
return passCheck(rule, context, node, acquireCount, false); | |||
} | |||
static boolean passCheck(/*@NonNull*/ FlowRule rule, Context context, DefaultNode node, int acquireCount, | |||
boolean prioritized) { | |||
String limitApp = rule.getLimitApp(); | |||
if (limitApp == null) { | |||
return true; | |||
} | |||
if (rule.isClusterMode()) { | |||
return passClusterCheck(rule, context, node, acquireCount, prioritized); | |||
} | |||
return passLocalCheck(rule, context, node, acquireCount, prioritized); | |||
} | |||
private static boolean passLocalCheck(FlowRule rule, Context context, DefaultNode node, int acquireCount, | |||
boolean prioritized) { | |||
Node selectedNode = selectNodeByRequesterAndStrategy(rule, context, node); | |||
if (selectedNode == null) { | |||
return true; | |||
@@ -43,6 +62,38 @@ final class FlowRuleChecker { | |||
return rule.getRater().canPass(selectedNode, acquireCount); | |||
} | |||
static boolean passClusterCheck(FlowRule rule, Context context, DefaultNode node, int acquireCount, | |||
boolean prioritized) { | |||
try { | |||
ClusterTokenClient client = TokenClientProvider.getClient(); | |||
if (client != null) { | |||
TokenResult result = client.requestToken(rule.getClusterConfig().getFlowId(), acquireCount, | |||
prioritized); | |||
switch (result.getStatus()) { | |||
case TokenResultStatus.OK: | |||
return true; | |||
case TokenResultStatus.SHOULD_WAIT: | |||
// Wait for next tick. | |||
Thread.sleep(result.getWaitInMs()); | |||
return true; | |||
case TokenResultStatus.NO_RULE_EXISTS: | |||
case TokenResultStatus.BAD_REQUEST: | |||
case TokenResultStatus.FAIL: | |||
return passLocalCheck(rule, context, node, acquireCount, prioritized); | |||
case TokenResultStatus.BLOCKED: | |||
default: | |||
return false; | |||
} | |||
} | |||
// If client is absent, then fallback to local mode. | |||
} catch (Throwable ex) { | |||
RecordLog.warn("[FlowRuleChecker] Request cluster token unexpected failed", ex); | |||
} | |||
// TODO: choose whether fallback to local or inactivate the rule. | |||
// Downgrade to local flow control when token client or server for this rule is not available. | |||
return passLocalCheck(rule, context, node, acquireCount, prioritized); | |||
} | |||
static Node selectReferenceNode(FlowRule rule, Context context, DefaultNode node) { | |||
String refResource = rule.getRefResource(); | |||
int strategy = rule.getStrategy(); | |||
@@ -103,4 +154,4 @@ final class FlowRuleChecker { | |||
} | |||
private FlowRuleChecker() {} | |||
} | |||
} |
@@ -19,10 +19,23 @@ import java.util.Comparator; | |||
import com.alibaba.csp.sentinel.slots.block.RuleConstant; | |||
/** | |||
* Comparator for flow rules. | |||
* | |||
* @author jialiang.linjl | |||
*/ | |||
public class FlowRuleComparator implements Comparator<FlowRule> { | |||
@Override | |||
public int compare(FlowRule o1, FlowRule o2) { | |||
// Clustered mode will be on the top. | |||
if (o1.isClusterMode() && !o2.isClusterMode()) { | |||
return 1; | |||
} | |||
if (!o1.isClusterMode() && o2.isClusterMode()) { | |||
return -1; | |||
} | |||
if (o1.getLimitApp() == null) { | |||
return 0; | |||
@@ -16,8 +16,6 @@ | |||
package com.alibaba.csp.sentinel.slots.block.flow; | |||
import java.util.ArrayList; | |||
import java.util.Collections; | |||
import java.util.Comparator; | |||
import java.util.List; | |||
import java.util.Map; | |||
import java.util.concurrent.ConcurrentHashMap; | |||
@@ -32,16 +30,10 @@ import com.alibaba.csp.sentinel.node.metric.MetricTimerListener; | |||
import com.alibaba.csp.sentinel.property.DynamicSentinelProperty; | |||
import com.alibaba.csp.sentinel.property.PropertyListener; | |||
import com.alibaba.csp.sentinel.property.SentinelProperty; | |||
import com.alibaba.csp.sentinel.slots.block.RuleConstant; | |||
import com.alibaba.csp.sentinel.slots.block.flow.controller.DefaultController; | |||
import com.alibaba.csp.sentinel.slots.block.flow.controller.RateLimiterController; | |||
import com.alibaba.csp.sentinel.slots.block.flow.controller.WarmUpController; | |||
import com.alibaba.csp.sentinel.slots.block.flow.controller.WarmUpRateLimiterController; | |||
/** | |||
* <p> | |||
* One resources can have multiple rules. And these rules take effects in the | |||
* following order: | |||
* One resources can have multiple rules. And these rules take effects in the following order: | |||
* <ol> | |||
* <li>requests from specified caller</li> | |||
* <li>no specified caller</li> | |||
@@ -49,18 +41,21 @@ import com.alibaba.csp.sentinel.slots.block.flow.controller.WarmUpRateLimiterCon | |||
* </p> | |||
* | |||
* @author jialiang.linjl | |||
* @author Eric Zhao | |||
*/ | |||
public class FlowRuleManager { | |||
private static final Map<String, List<FlowRule>> flowRules = new ConcurrentHashMap<String, List<FlowRule>>(); | |||
private final static ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1, | |||
new NamedThreadFactory("sentinel-metrics-record-task", true)); | |||
private final static FlowPropertyListener listener = new FlowPropertyListener(); | |||
private static final FlowPropertyListener LISTENER = new FlowPropertyListener(); | |||
private static SentinelProperty<List<FlowRule>> currentProperty = new DynamicSentinelProperty<List<FlowRule>>(); | |||
private static final ScheduledExecutorService SCHEDULER = Executors.newScheduledThreadPool(1, | |||
new NamedThreadFactory("sentinel-metrics-record-task", true)); | |||
static { | |||
currentProperty.addListener(listener); | |||
scheduler.scheduleAtFixedRate(new MetricTimerListener(), 0, 1, TimeUnit.SECONDS); | |||
currentProperty.addListener(LISTENER); | |||
SCHEDULER.scheduleAtFixedRate(new MetricTimerListener(), 0, 1, TimeUnit.SECONDS); | |||
} | |||
/** | |||
@@ -70,10 +65,10 @@ public class FlowRuleManager { | |||
* @param property the property to listen. | |||
*/ | |||
public static void register2Property(SentinelProperty<List<FlowRule>> property) { | |||
synchronized (listener) { | |||
synchronized (LISTENER) { | |||
RecordLog.info("[FlowRuleManager] Registering new property to flow rule manager"); | |||
currentProperty.removeListener(listener); | |||
property.addListener(listener); | |||
currentProperty.removeListener(LISTENER); | |||
property.addListener(LISTENER); | |||
currentProperty = property; | |||
} | |||
} | |||
@@ -100,65 +95,7 @@ public class FlowRuleManager { | |||
currentProperty.updateValue(rules); | |||
} | |||
private static Map<String, List<FlowRule>> loadFlowConf(List<FlowRule> list) { | |||
Map<String, List<FlowRule>> newRuleMap = new ConcurrentHashMap<String, List<FlowRule>>(); | |||
if (list == null || list.isEmpty()) { | |||
return newRuleMap; | |||
} | |||
for (FlowRule rule : list) { | |||
if (!isValidRule(rule)) { | |||
RecordLog.warn("[FlowRuleManager] Ignoring invalid flow rule when loading new flow rules: " + rule); | |||
continue; | |||
} | |||
if (StringUtil.isBlank(rule.getLimitApp())) { | |||
rule.setLimitApp(RuleConstant.LIMIT_APP_DEFAULT); | |||
} | |||
TrafficShapingController rater = new DefaultController(rule.getCount(), rule.getGrade()); | |||
if (rule.getGrade() == RuleConstant.FLOW_GRADE_QPS | |||
&& rule.getControlBehavior() == RuleConstant.CONTROL_BEHAVIOR_WARM_UP | |||
&& rule.getWarmUpPeriodSec() > 0) { | |||
rater = new WarmUpController(rule.getCount(), rule.getWarmUpPeriodSec(), ColdFactorProperty.coldFactor); | |||
} else if (rule.getGrade() == RuleConstant.FLOW_GRADE_QPS | |||
&& rule.getControlBehavior() == RuleConstant.CONTROL_BEHAVIOR_RATE_LIMITER | |||
&& rule.getMaxQueueingTimeMs() > 0) { | |||
rater = new RateLimiterController(rule.getMaxQueueingTimeMs(), rule.getCount()); | |||
} else if (rule.getGrade() == RuleConstant.FLOW_GRADE_QPS | |||
&& rule.getControlBehavior() == RuleConstant.CONTROL_BEHAVIOR_WARM_UP_RATE_LIMITER | |||
&& rule.getMaxQueueingTimeMs() > 0 && rule.getWarmUpPeriodSec() > 0) { | |||
rater = new WarmUpRateLimiterController(rule.getCount(), rule.getWarmUpPeriodSec(), | |||
rule.getMaxQueueingTimeMs(), ColdFactorProperty.coldFactor); | |||
} | |||
rule.setRater(rater); | |||
String identity = rule.getResource(); | |||
List<FlowRule> ruleM = newRuleMap.get(identity); | |||
if (ruleM == null) { | |||
ruleM = new ArrayList<FlowRule>(); | |||
newRuleMap.put(identity, ruleM); | |||
} | |||
ruleM.add(rule); | |||
} | |||
if (!newRuleMap.isEmpty()) { | |||
Comparator<FlowRule> comparator = new FlowRuleComparator(); | |||
// Sort the rules. | |||
for (List<FlowRule> rules : newRuleMap.values()) { | |||
Collections.sort(rules, comparator); | |||
} | |||
} | |||
return newRuleMap; | |||
} | |||
static Map<String, List<FlowRule>> getFlowRules() { | |||
static Map<String, List<FlowRule>> getFlowRuleMap() { | |||
return flowRules; | |||
} | |||
@@ -188,7 +125,7 @@ public class FlowRuleManager { | |||
@Override | |||
public void configUpdate(List<FlowRule> value) { | |||
Map<String, List<FlowRule>> rules = loadFlowConf(value); | |||
Map<String, List<FlowRule>> rules = FlowRuleUtil.buildFlowRuleMap(value); | |||
if (rules != null) { | |||
flowRules.clear(); | |||
flowRules.putAll(rules); | |||
@@ -198,43 +135,13 @@ public class FlowRuleManager { | |||
@Override | |||
public void configLoad(List<FlowRule> conf) { | |||
Map<String, List<FlowRule>> rules = loadFlowConf(conf); | |||
Map<String, List<FlowRule>> rules = FlowRuleUtil.buildFlowRuleMap(conf); | |||
if (rules != null) { | |||
flowRules.clear(); | |||
flowRules.putAll(rules); | |||
} | |||
RecordLog.info("[FlowRuleManager] Flow rules loaded: " + flowRules); | |||
} | |||
} | |||
public static boolean isValidRule(FlowRule rule) { | |||
boolean baseValid = rule != null && !StringUtil.isBlank(rule.getResource()) && rule.getCount() >= 0 | |||
&& rule.getGrade() >= 0 && rule.getStrategy() >= 0 && rule.getControlBehavior() >= 0; | |||
if (!baseValid) { | |||
return false; | |||
} | |||
// Check strategy and control (shaping) behavior. | |||
return checkStrategyField(rule) && checkControlBehaviorField(rule); | |||
} | |||
private static boolean checkStrategyField(/*@NonNull*/ FlowRule rule) { | |||
if (rule.getStrategy() == RuleConstant.STRATEGY_RELATE || rule.getStrategy() == RuleConstant.STRATEGY_CHAIN) { | |||
return StringUtil.isNotBlank(rule.getRefResource()); | |||
} | |||
return true; | |||
} | |||
private static boolean checkControlBehaviorField(/*@NonNull*/ FlowRule rule) { | |||
switch (rule.getControlBehavior()) { | |||
case RuleConstant.CONTROL_BEHAVIOR_WARM_UP: | |||
return rule.getWarmUpPeriodSec() > 0; | |||
case RuleConstant.CONTROL_BEHAVIOR_RATE_LIMITER: | |||
return rule.getMaxQueueingTimeMs() > 0; | |||
case RuleConstant.CONTROL_BEHAVIOR_WARM_UP_RATE_LIMITER: | |||
return rule.getWarmUpPeriodSec() > 0 && rule.getMaxQueueingTimeMs() > 0; | |||
default: | |||
return true; | |||
} | |||
} | |||
} |
@@ -0,0 +1,227 @@ | |||
/* | |||
* Copyright 1999-2018 Alibaba Group Holding Ltd. | |||
* | |||
* Licensed under the Apache License, Version 2.0 (the "License"); | |||
* you may not use this file except in compliance with the License. | |||
* You may obtain a copy of the License at | |||
* | |||
* http://www.apache.org/licenses/LICENSE-2.0 | |||
* | |||
* Unless required by applicable law or agreed to in writing, software | |||
* distributed under the License is distributed on an "AS IS" BASIS, | |||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |||
* See the License for the specific language governing permissions and | |||
* limitations under the License. | |||
*/ | |||
package com.alibaba.csp.sentinel.slots.block.flow; | |||
import java.util.ArrayList; | |||
import java.util.Collections; | |||
import java.util.Comparator; | |||
import java.util.List; | |||
import java.util.Map; | |||
import java.util.concurrent.ConcurrentHashMap; | |||
import com.alibaba.csp.sentinel.log.RecordLog; | |||
import com.alibaba.csp.sentinel.slots.block.ClusterRuleConstant; | |||
import com.alibaba.csp.sentinel.slots.block.RuleConstant; | |||
import com.alibaba.csp.sentinel.slots.block.flow.controller.DefaultController; | |||
import com.alibaba.csp.sentinel.slots.block.flow.controller.RateLimiterController; | |||
import com.alibaba.csp.sentinel.slots.block.flow.controller.WarmUpController; | |||
import com.alibaba.csp.sentinel.slots.block.flow.controller.WarmUpRateLimiterController; | |||
import com.alibaba.csp.sentinel.util.StringUtil; | |||
import com.alibaba.csp.sentinel.util.function.Function; | |||
import com.alibaba.csp.sentinel.util.function.Predicate; | |||
/** | |||
* @author Eric Zhao | |||
* @since 1.4.0 | |||
*/ | |||
public final class FlowRuleUtil { | |||
/** | |||
* Build the flow rule map from raw list of flow rules, grouping by resource name. | |||
* | |||
* @param list raw list of flow rules | |||
* @return constructed new flow rule map; empty map if list is null or empty, or no valid rules | |||
*/ | |||
public static Map<String, List<FlowRule>> buildFlowRuleMap(List<FlowRule> list) { | |||
return buildFlowRuleMap(list, null); | |||
} | |||
/** | |||
* Build the flow rule map from raw list of flow rules, grouping by resource name. | |||
* | |||
* @param list raw list of flow rules | |||
* @param filter rule filter | |||
* @return constructed new flow rule map; empty map if list is null or empty, or no wanted rules | |||
*/ | |||
public static Map<String, List<FlowRule>> buildFlowRuleMap(List<FlowRule> list, Predicate<FlowRule> filter) { | |||
return buildFlowRuleMap(list, filter, true); | |||
} | |||
/** | |||
* Build the flow rule map from raw list of flow rules, grouping by resource name. | |||
* | |||
* @param list raw list of flow rules | |||
* @param filter rule filter | |||
* @param shouldSort whether the rules should be sorted | |||
* @return constructed new flow rule map; empty map if list is null or empty, or no wanted rules | |||
*/ | |||
public static Map<String, List<FlowRule>> buildFlowRuleMap(List<FlowRule> list, Predicate<FlowRule> filter, | |||
boolean shouldSort) { | |||
return buildFlowRuleMap(list, extractResource, filter, shouldSort); | |||
} | |||
/** | |||
* Build the flow rule map from raw list of flow rules, grouping by provided group function. | |||
* | |||
* @param list raw list of flow rules | |||
* @param groupFunction grouping function of the map (by key) | |||
* @param filter rule filter | |||
* @param shouldSort whether the rules should be sorted | |||
* @param <K> type of key | |||
* @return constructed new flow rule map; empty map if list is null or empty, or no wanted rules | |||
*/ | |||
public static <K> Map<K, List<FlowRule>> buildFlowRuleMap(List<FlowRule> list, Function<FlowRule, K> groupFunction, | |||
Predicate<FlowRule> filter, boolean shouldSort) { | |||
Map<K, List<FlowRule>> newRuleMap = new ConcurrentHashMap<K, List<FlowRule>>(); | |||
if (list == null || list.isEmpty()) { | |||
return newRuleMap; | |||
} | |||
for (FlowRule rule : list) { | |||
if (!isValidRule(rule)) { | |||
RecordLog.warn("[FlowRuleManager] Ignoring invalid flow rule when loading new flow rules: " + rule); | |||
continue; | |||
} | |||
if (filter != null && !filter.test(rule)) { | |||
continue; | |||
} | |||
if (StringUtil.isBlank(rule.getLimitApp())) { | |||
rule.setLimitApp(RuleConstant.LIMIT_APP_DEFAULT); | |||
} | |||
TrafficShapingController rater = generateRater(rule); | |||
rule.setRater(rater); | |||
K key = groupFunction.apply(rule); | |||
if (key == null) { | |||
continue; | |||
} | |||
List<FlowRule> flowRules = newRuleMap.get(key); | |||
if (flowRules == null) { | |||
flowRules = new ArrayList<FlowRule>(); | |||
newRuleMap.put(key, flowRules); | |||
} | |||
flowRules.add(rule); | |||
} | |||
if (shouldSort && !newRuleMap.isEmpty()) { | |||
Comparator<FlowRule> comparator = new FlowRuleComparator(); | |||
// Sort the rules. | |||
for (List<FlowRule> rules : newRuleMap.values()) { | |||
Collections.sort(rules, comparator); | |||
} | |||
} | |||
return newRuleMap; | |||
} | |||
private static TrafficShapingController generateRater(/*@Valid*/ FlowRule rule) { | |||
if (rule.getGrade() == RuleConstant.FLOW_GRADE_QPS) { | |||
switch (rule.getControlBehavior()) { | |||
case RuleConstant.CONTROL_BEHAVIOR_WARM_UP: | |||
return new WarmUpController(rule.getCount(), rule.getWarmUpPeriodSec(), | |||
ColdFactorProperty.coldFactor); | |||
case RuleConstant.CONTROL_BEHAVIOR_RATE_LIMITER: | |||
return new RateLimiterController(rule.getMaxQueueingTimeMs(), rule.getCount()); | |||
case RuleConstant.CONTROL_BEHAVIOR_WARM_UP_RATE_LIMITER: | |||
return new WarmUpRateLimiterController(rule.getCount(), rule.getWarmUpPeriodSec(), | |||
rule.getMaxQueueingTimeMs(), ColdFactorProperty.coldFactor); | |||
case RuleConstant.CONTROL_BEHAVIOR_DEFAULT: | |||
default: | |||
// Default mode or unknown mode: default traffic shaping controller (fast-reject). | |||
} | |||
} | |||
return new DefaultController(rule.getCount(), rule.getGrade()); | |||
} | |||
/** | |||
* Check whether provided ID can be a valid cluster flow ID. | |||
* | |||
* @param id flow ID to check | |||
* @return true if valid, otherwise false | |||
*/ | |||
public static boolean validClusterRuleId(Integer id) { | |||
return id != null && id > 0; | |||
} | |||
/** | |||
* Check whether provided flow rule is valid. | |||
* | |||
* @param rule flow rule to check | |||
* @return true if valid, otherwise false | |||
*/ | |||
public static boolean isValidRule(FlowRule rule) { | |||
boolean baseValid = rule != null && !StringUtil.isBlank(rule.getResource()) && rule.getCount() >= 0 | |||
&& rule.getGrade() >= 0 && rule.getStrategy() >= 0 && rule.getControlBehavior() >= 0; | |||
if (!baseValid) { | |||
return false; | |||
} | |||
// Check strategy and control (shaping) behavior. | |||
return checkClusterField(rule) && checkStrategyField(rule) && checkControlBehaviorField(rule); | |||
} | |||
private static boolean checkClusterField(/*@NonNull*/ FlowRule rule) { | |||
if (!rule.isClusterMode()) { | |||
return true; | |||
} | |||
ClusterFlowConfig clusterConfig = rule.getClusterConfig(); | |||
if (clusterConfig == null) { | |||
return false; | |||
} | |||
if (!validClusterRuleId(clusterConfig.getFlowId())) { | |||
return false; | |||
} | |||
switch (rule.getStrategy()) { | |||
case ClusterRuleConstant.FLOW_CLUSTER_STRATEGY_NORMAL: | |||
return true; | |||
case ClusterRuleConstant.FLOW_CLUSTER_STRATEGY_REF: | |||
return validClusterRuleId(clusterConfig.getRefFlowId()); | |||
default: | |||
return true; | |||
} | |||
} | |||
private static boolean checkStrategyField(/*@NonNull*/ FlowRule rule) { | |||
if (rule.getStrategy() == RuleConstant.STRATEGY_RELATE || rule.getStrategy() == RuleConstant.STRATEGY_CHAIN) { | |||
return StringUtil.isNotBlank(rule.getRefResource()); | |||
} | |||
return true; | |||
} | |||
private static boolean checkControlBehaviorField(/*@NonNull*/ FlowRule rule) { | |||
switch (rule.getControlBehavior()) { | |||
case RuleConstant.CONTROL_BEHAVIOR_WARM_UP: | |||
return rule.getWarmUpPeriodSec() > 0; | |||
case RuleConstant.CONTROL_BEHAVIOR_RATE_LIMITER: | |||
return rule.getMaxQueueingTimeMs() > 0; | |||
case RuleConstant.CONTROL_BEHAVIOR_WARM_UP_RATE_LIMITER: | |||
return rule.getWarmUpPeriodSec() > 0 && rule.getMaxQueueingTimeMs() > 0; | |||
default: | |||
return true; | |||
} | |||
} | |||
private static final Function<FlowRule, String> extractResource = new Function<FlowRule, String>() { | |||
@Override | |||
public String apply(FlowRule rule) { | |||
return rule.getResource(); | |||
} | |||
}; | |||
private FlowRuleUtil() {} | |||
} |
@@ -138,27 +138,27 @@ public class FlowSlot extends AbstractLinkedProcessorSlot<DefaultNode> { | |||
@Override | |||
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, | |||
boolean prioritized, Object... args) throws Throwable { | |||
checkFlow(resourceWrapper, context, node, count); | |||
checkFlow(resourceWrapper, context, node, count, prioritized); | |||
fireEntry(context, resourceWrapper, node, count, prioritized, args); | |||
} | |||
void checkFlow(ResourceWrapper resource, Context context, DefaultNode node, int count) throws BlockException { | |||
void checkFlow(ResourceWrapper resource, Context context, DefaultNode node, int count, boolean prioritized) throws BlockException { | |||
// Flow rule map cannot be null. | |||
Map<String, List<FlowRule>> flowRules = FlowRuleManager.getFlowRules(); | |||
Map<String, List<FlowRule>> flowRules = FlowRuleManager.getFlowRuleMap(); | |||
List<FlowRule> rules = flowRules.get(resource.getName()); | |||
if (rules != null) { | |||
for (FlowRule rule : rules) { | |||
if (!canPassCheck(rule, context, node, count)) { | |||
if (!canPassCheck(rule, context, node, count, prioritized)) { | |||
throw new FlowException(rule.getLimitApp()); | |||
} | |||
} | |||
} | |||
} | |||
boolean canPassCheck(FlowRule rule, Context context, DefaultNode node, int count) { | |||
return FlowRuleChecker.passCheck(rule, context, node, count); | |||
boolean canPassCheck(FlowRule rule, Context context, DefaultNode node, int count, boolean prioritized) { | |||
return FlowRuleChecker.passCheck(rule, context, node, count, prioritized); | |||
} | |||
@Override | |||