From e9719d32ecfbaf64f0d620b6211c8c32d1c37711 Mon Sep 17 00:00:00 2001 From: Eric Zhao Date: Mon, 22 Apr 2019 11:06:26 +0800 Subject: [PATCH] Refactor flow algorithm for parameter flow control and support more traffic shaping mode (#677) * support burst count, statistic interval per rule and throttling mode. * Add fields in ParamFlowRule to support burst count, throttle, statistic interval per rule. * Deprecate HotParamLeapArray and use token bucket algorithm directly. --- .../demo/flow/param/ParamFlowQpsDemo.java | 16 +- .../demo/flow/param/ParamFlowQpsRunner.java | 46 +-- .../sentinel-parameter-flow-control/pom.xml | 10 + .../handler/FetchTopParamsCommandHandler.java | 64 ---- .../GetParamFlowRulesCommandHandler.java | 2 +- .../ModifyParamFlowRulesCommandHandler.java | 2 +- .../block/flow/param/ParamFlowChecker.java | 168 +++++++++-- .../slots/block/flow/param/ParamFlowRule.java | 81 +++-- .../flow/param/ParamFlowRuleManager.java | 10 +- .../block/flow/param/ParamFlowRuleUtil.java | 5 +- .../slots/block/flow/param/ParamFlowSlot.java | 20 +- .../block/flow/param/ParameterMetric.java | 210 +++++-------- .../ParamFlowStatisticEntryCallback.java | 4 +- ...libaba.csp.sentinel.command.CommandHandler | 3 +- .../flow/param/ParamFlowCheckerTest.java | 119 +++----- .../param/ParamFlowDefaultCheckerTest.java | 280 ++++++++++++++++++ .../block/flow/param/ParamFlowSlotTest.java | 51 +++- ...amFlowThrottleRateLimitingCheckerTest.java | 164 ++++++++++ .../block/flow/param/ParameterMetricTest.java | 99 ++++--- .../sentinel/test/AbstractTimeBasedTest.java | 58 ++++ 20 files changed, 986 insertions(+), 426 deletions(-) delete mode 100644 sentinel-extension/sentinel-parameter-flow-control/src/main/java/com/alibaba/csp/sentinel/command/handler/FetchTopParamsCommandHandler.java create mode 100644 sentinel-extension/sentinel-parameter-flow-control/src/test/java/com/alibaba/csp/sentinel/slots/block/flow/param/ParamFlowDefaultCheckerTest.java create mode 100644 sentinel-extension/sentinel-parameter-flow-control/src/test/java/com/alibaba/csp/sentinel/slots/block/flow/param/ParamFlowThrottleRateLimitingCheckerTest.java create mode 100644 sentinel-extension/sentinel-parameter-flow-control/src/test/java/com/alibaba/csp/sentinel/test/AbstractTimeBasedTest.java diff --git a/sentinel-demo/sentinel-demo-parameter-flow-control/src/main/java/com/alibaba/csp/sentinel/demo/flow/param/ParamFlowQpsDemo.java b/sentinel-demo/sentinel-demo-parameter-flow-control/src/main/java/com/alibaba/csp/sentinel/demo/flow/param/ParamFlowQpsDemo.java index 4e51de55..e3b29d7c 100644 --- a/sentinel-demo/sentinel-demo-parameter-flow-control/src/main/java/com/alibaba/csp/sentinel/demo/flow/param/ParamFlowQpsDemo.java +++ b/sentinel-demo/sentinel-demo-parameter-flow-control/src/main/java/com/alibaba/csp/sentinel/demo/flow/param/ParamFlowQpsDemo.java @@ -42,21 +42,27 @@ public class ParamFlowQpsDemo { private static final String RESOURCE_KEY = "resA"; - public static void main(String[] args) { - initHotParamFlowRules(); + public static void main(String[] args) throws Exception { + initParamFlowRules(); - final int threadCount = 8; + final int threadCount = 20; ParamFlowQpsRunner runner = new ParamFlowQpsRunner<>(PARAMS, RESOURCE_KEY, threadCount, 120); - runner.simulateTraffic(); runner.tick(); + + Thread.sleep(1000); + runner.simulateTraffic(); } - private static void initHotParamFlowRules() { + private static void initParamFlowRules() { // QPS mode, threshold is 5 for every frequent "hot spot" parameter in index 0 (the first arg). ParamFlowRule rule = new ParamFlowRule(RESOURCE_KEY) .setParamIdx(0) .setGrade(RuleConstant.FLOW_GRADE_QPS) + //.setDurationInSec(3) + //.setControlBehavior(RuleConstant.CONTROL_BEHAVIOR_RATE_LIMITER) + //.setMaxQueueingTimeMs(600) .setCount(5); + // We can set threshold count for specific parameter value individually. // Here we add an exception item. That means: QPS threshold of entries with parameter `PARAM_B` (type: int) // in index 0 will be 10, rather than the global threshold (5). diff --git a/sentinel-demo/sentinel-demo-parameter-flow-control/src/main/java/com/alibaba/csp/sentinel/demo/flow/param/ParamFlowQpsRunner.java b/sentinel-demo/sentinel-demo-parameter-flow-control/src/main/java/com/alibaba/csp/sentinel/demo/flow/param/ParamFlowQpsRunner.java index fc213b2e..7dd00170 100644 --- a/sentinel-demo/sentinel-demo-parameter-flow-control/src/main/java/com/alibaba/csp/sentinel/demo/flow/param/ParamFlowQpsRunner.java +++ b/sentinel-demo/sentinel-demo-parameter-flow-control/src/main/java/com/alibaba/csp/sentinel/demo/flow/param/ParamFlowQpsRunner.java @@ -43,6 +43,7 @@ class ParamFlowQpsRunner { private final int threadCount; private final Map passCountMap = new ConcurrentHashMap<>(); + private final Map blockCountMap = new ConcurrentHashMap<>(); private volatile boolean stop = false; @@ -59,6 +60,7 @@ class ParamFlowQpsRunner { for (T param : params) { assertTrue(param != null, "Parameters should not be null"); passCountMap.putIfAbsent(param, new AtomicLong()); + blockCountMap.putIfAbsent(param, new AtomicLong()); } } @@ -94,11 +96,18 @@ class ParamFlowQpsRunner { private void passFor(T param) { passCountMap.get(param).incrementAndGet(); + // System.out.println(String.format("Parameter <%s> passed at: %d", param, TimeUtil.currentTimeMillis())); + } + + private void blockFor(T param) { + blockCountMap.get(param).incrementAndGet(); } final class RunTask implements Runnable { + @Override public void run() { + while (!stop) { Entry entry = null; T param = generateParam(); @@ -106,8 +115,9 @@ class ParamFlowQpsRunner { entry = SphU.entry(resourceName, EntryType.IN, 1, param); // Add pass for parameter. passFor(param); - } catch (BlockException e1) { + } catch (BlockException e) { // block.incrementAndGet(); + blockFor(param); } catch (Exception ex) { // biz exception ex.printStackTrace(); @@ -118,15 +128,19 @@ class ParamFlowQpsRunner { } } - try { - TimeUnit.MILLISECONDS.sleep(ThreadLocalRandom.current().nextInt(0, 10)); - } catch (InterruptedException e) { - // ignore - } + sleep(ThreadLocalRandom.current().nextInt(0, 10)); } } } + private void sleep(int timeMs) { + try { + TimeUnit.MILLISECONDS.sleep(timeMs); + } catch (InterruptedException e) { + // ignore + } + } + final class TimerTask implements Runnable { @Override public void run() { @@ -139,21 +153,19 @@ class ParamFlowQpsRunner { map.putIfAbsent(param, 0L); } while (!stop) { - try { - TimeUnit.SECONDS.sleep(1); - } catch (InterruptedException e) { - } + sleep(1000); + // There may be a mismatch for time window of internal sliding window. // See corresponding `metrics.log` for accurate statistic log. for (T param : params) { - long globalPass = passCountMap.get(param).get(); - long oldPass = map.get(param); - long oneSecondPass = globalPass - oldPass; - map.put(param, globalPass); - System.out.println(String.format("[%d][%d] Parameter flow metrics for resource %s: " - + "pass count for param <%s> is %d", - seconds, TimeUtil.currentTimeMillis(), resourceName, param, oneSecondPass)); + + System.out.println(String.format( + "[%d][%d] Parameter flow metrics for resource %s: " + + "pass count for param <%s> is %d, block count: %d", + seconds, TimeUtil.currentTimeMillis(), resourceName, param, + passCountMap.get(param).getAndSet(0), blockCountMap.get(param).getAndSet(0))); } + System.out.println("============================="); if (seconds-- <= 0) { stop = true; } diff --git a/sentinel-extension/sentinel-parameter-flow-control/pom.xml b/sentinel-extension/sentinel-parameter-flow-control/pom.xml index 140334f9..6fe3c712 100644 --- a/sentinel-extension/sentinel-parameter-flow-control/pom.xml +++ b/sentinel-extension/sentinel-parameter-flow-control/pom.xml @@ -39,5 +39,15 @@ mockito-core test + + org.powermock + powermock-module-junit4 + test + + + org.powermock + powermock-api-mockito2 + test + \ No newline at end of file diff --git a/sentinel-extension/sentinel-parameter-flow-control/src/main/java/com/alibaba/csp/sentinel/command/handler/FetchTopParamsCommandHandler.java b/sentinel-extension/sentinel-parameter-flow-control/src/main/java/com/alibaba/csp/sentinel/command/handler/FetchTopParamsCommandHandler.java deleted file mode 100644 index bcd7ce53..00000000 --- a/sentinel-extension/sentinel-parameter-flow-control/src/main/java/com/alibaba/csp/sentinel/command/handler/FetchTopParamsCommandHandler.java +++ /dev/null @@ -1,64 +0,0 @@ -/* - * Copyright 1999-2018 Alibaba Group Holding Ltd. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alibaba.csp.sentinel.command.handler; - -import java.util.Map; - -import com.alibaba.csp.sentinel.command.CommandHandler; -import com.alibaba.csp.sentinel.command.CommandRequest; -import com.alibaba.csp.sentinel.command.CommandResponse; -import com.alibaba.csp.sentinel.command.annotation.CommandMapping; -import com.alibaba.csp.sentinel.slots.block.flow.param.ParamFlowSlot; -import com.alibaba.csp.sentinel.slots.block.flow.param.ParameterMetric; -import com.alibaba.csp.sentinel.util.StringUtil; -import com.alibaba.fastjson.JSON; - -/** - * @author Eric Zhao - * @since 0.2.0 - */ -@CommandMapping(name = "topParams", desc = "get topN param in specified resource, accept param: res={resourceName}&idx={paramIndex}&n={topN}") -public class FetchTopParamsCommandHandler implements CommandHandler { - - @Override - public CommandResponse handle(CommandRequest request) { - String resourceName = request.getParam("res"); - if (StringUtil.isBlank(resourceName)) { - return CommandResponse.ofFailure(new IllegalArgumentException("Invalid parameter: res")); - } - String idx = request.getParam("idx"); - int index; - try { - index = Integer.valueOf(idx); - } catch (Exception ex) { - return CommandResponse.ofFailure(ex, "Invalid parameter: idx"); - } - String n = request.getParam("n"); - int amount; - try { - amount = Integer.valueOf(n); - } catch (Exception ex) { - return CommandResponse.ofFailure(ex, "Invalid parameter: n"); - } - ParameterMetric metric = ParamFlowSlot.getHotParamMetricForName(resourceName); - if (metric == null) { - return CommandResponse.ofSuccess("{}"); - } - Map values = metric.getTopPassParamCount(index, amount); - - return CommandResponse.ofSuccess(JSON.toJSONString(values)); - } -} diff --git a/sentinel-extension/sentinel-parameter-flow-control/src/main/java/com/alibaba/csp/sentinel/command/handler/GetParamFlowRulesCommandHandler.java b/sentinel-extension/sentinel-parameter-flow-control/src/main/java/com/alibaba/csp/sentinel/command/handler/GetParamFlowRulesCommandHandler.java index 9d3c918c..3f9d1436 100644 --- a/sentinel-extension/sentinel-parameter-flow-control/src/main/java/com/alibaba/csp/sentinel/command/handler/GetParamFlowRulesCommandHandler.java +++ b/sentinel-extension/sentinel-parameter-flow-control/src/main/java/com/alibaba/csp/sentinel/command/handler/GetParamFlowRulesCommandHandler.java @@ -26,7 +26,7 @@ import com.alibaba.fastjson.JSON; * @author Eric Zhao * @since 0.2.0 */ -@CommandMapping(name = "getParamFlowRules", desc = "get param flow rules") +@CommandMapping(name = "getParamFlowRules", desc = "Get all parameter flow rules") public class GetParamFlowRulesCommandHandler implements CommandHandler { @Override diff --git a/sentinel-extension/sentinel-parameter-flow-control/src/main/java/com/alibaba/csp/sentinel/command/handler/ModifyParamFlowRulesCommandHandler.java b/sentinel-extension/sentinel-parameter-flow-control/src/main/java/com/alibaba/csp/sentinel/command/handler/ModifyParamFlowRulesCommandHandler.java index e859db38..b8428aee 100644 --- a/sentinel-extension/sentinel-parameter-flow-control/src/main/java/com/alibaba/csp/sentinel/command/handler/ModifyParamFlowRulesCommandHandler.java +++ b/sentinel-extension/sentinel-parameter-flow-control/src/main/java/com/alibaba/csp/sentinel/command/handler/ModifyParamFlowRulesCommandHandler.java @@ -33,7 +33,7 @@ import com.alibaba.fastjson.JSONArray; * @author Eric Zhao * @since 0.2.0 */ -@CommandMapping(name = "setParamFlowRules", desc = "set param flow rules, accept param: data={paramFlowRule Json}") +@CommandMapping(name = "setParamFlowRules", desc = "Set parameter flow rules, while previous rules will be replaced.") public class ModifyParamFlowRulesCommandHandler implements CommandHandler { private static WritableDataSource> paramFlowWds = null; diff --git a/sentinel-extension/sentinel-parameter-flow-control/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/param/ParamFlowChecker.java b/sentinel-extension/sentinel-parameter-flow-control/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/param/ParamFlowChecker.java index fc6f499e..b61c2491 100644 --- a/sentinel-extension/sentinel-parameter-flow-control/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/param/ParamFlowChecker.java +++ b/sentinel-extension/sentinel-parameter-flow-control/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/param/ParamFlowChecker.java @@ -21,16 +21,21 @@ import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import com.alibaba.csp.sentinel.cluster.ClusterStateManager; -import com.alibaba.csp.sentinel.cluster.TokenService; -import com.alibaba.csp.sentinel.cluster.client.TokenClientProvider; import com.alibaba.csp.sentinel.cluster.TokenResult; import com.alibaba.csp.sentinel.cluster.TokenResultStatus; +import com.alibaba.csp.sentinel.cluster.TokenService; +import com.alibaba.csp.sentinel.cluster.client.TokenClientProvider; import com.alibaba.csp.sentinel.cluster.server.EmbeddedClusterTokenServerProvider; import com.alibaba.csp.sentinel.log.RecordLog; import com.alibaba.csp.sentinel.slotchain.ResourceWrapper; import com.alibaba.csp.sentinel.slots.block.RuleConstant; +import com.alibaba.csp.sentinel.slots.statistic.cache.CacheMap; +import com.alibaba.csp.sentinel.util.TimeUtil; /** * Rule checker for parameter flow control. @@ -91,23 +96,17 @@ final class ParamFlowChecker { return true; } - static boolean passSingleValueCheck(ResourceWrapper resourceWrapper, ParamFlowRule rule, int count, Object value) { - Set exclusionItems = rule.getParsedHotItems().keySet(); + static boolean passSingleValueCheck(ResourceWrapper resourceWrapper, ParamFlowRule rule, int acquireCount, + Object value) { if (rule.getGrade() == RuleConstant.FLOW_GRADE_QPS) { - double curCount = getHotParameters(resourceWrapper).getPassParamQps(rule.getParamIdx(), value); - - if (exclusionItems.contains(value)) { - // Pass check for exclusion items. - int itemQps = rule.getParsedHotItems().get(value); - return curCount + count <= itemQps; - } else if (curCount + count > rule.getCount()) { - if ((curCount - rule.getCount()) < 1 && (curCount - rule.getCount()) > 0) { - return true; - } - return false; + if (rule.getControlBehavior() == RuleConstant.CONTROL_BEHAVIOR_RATE_LIMITER) { + return passThrottleLocalCheck(resourceWrapper, rule, acquireCount, value); + } else { + return passDefaultLocalCheck(resourceWrapper, rule, acquireCount, value); } } else if (rule.getGrade() == RuleConstant.FLOW_GRADE_THREAD) { - long threadCount = getHotParameters(resourceWrapper).getThreadCount(rule.getParamIdx(), value); + Set exclusionItems = rule.getParsedHotItems().keySet(); + long threadCount = getParameterMetric(resourceWrapper).getThreadCount(rule.getParamIdx(), value); if (exclusionItems.contains(value)) { int itemThreshold = rule.getParsedHotItems().get(value); return ++threadCount <= itemThreshold; @@ -119,7 +118,136 @@ final class ParamFlowChecker { return true; } - private static ParameterMetric getHotParameters(ResourceWrapper resourceWrapper) { + static boolean passDefaultLocalCheck(ResourceWrapper resourceWrapper, ParamFlowRule rule, int acquireCount, + Object value) { + ParameterMetric metric = getParameterMetric(resourceWrapper); + CacheMap tokenCounters = metric == null ? null : metric.getRuleTokenCounter(rule); + CacheMap timeCounters = metric == null ? null : metric.getRuleTimeCounter(rule); + + if (tokenCounters == null || timeCounters == null) { + return true; + } + + // Calculate max token count (threshold) + Set exclusionItems = rule.getParsedHotItems().keySet(); + int tokenCount = (int)rule.getCount(); + if (exclusionItems.contains(value)) { + tokenCount = rule.getParsedHotItems().get(value); + } + + if (tokenCount == 0) { + return false; + } + + int maxCount = tokenCount + rule.getBurstCount(); + if (acquireCount > maxCount) { + return false; + } + + while (true) { + long currentTime = TimeUtil.currentTimeMillis(); + + AtomicLong lastAddTokenTime = timeCounters.putIfAbsent(value, new AtomicLong(currentTime)); + if (lastAddTokenTime == null) { + // Token never added, just replenish the tokens and consume {@code acquireCount} immediately. + tokenCounters.putIfAbsent(value, new AtomicInteger(maxCount - acquireCount)); + return true; + } + + // Calculate the time duration since last token was added. + long passTime = currentTime - lastAddTokenTime.get(); + // A simplified token bucket algorithm that will replenish the tokens only when statistic window has passed. + if (passTime > rule.getDurationInSec() * 1000) { + AtomicInteger oldQps = tokenCounters.putIfAbsent(value, new AtomicInteger(maxCount - acquireCount)); + if (oldQps == null) { + // Might not be accurate here. + lastAddTokenTime.set(currentTime); + return true; + } else { + int restQps = oldQps.get(); + int toAddCount = (int)((passTime * tokenCount) / (rule.getDurationInSec() * 1000)); + int newQps = (restQps + toAddCount) > maxCount ? (maxCount - acquireCount) + : (restQps + toAddCount - acquireCount); + + if (newQps < 0) { + return false; + } + if (oldQps.compareAndSet(restQps, newQps)) { + lastAddTokenTime.set(currentTime); + return true; + } + Thread.yield(); + } + } else { + AtomicInteger oldQps = tokenCounters.get(value); + if (oldQps != null) { + int oldQpsValue = oldQps.get(); + if (oldQpsValue - acquireCount >= 0) { + if (oldQps.compareAndSet(oldQpsValue, oldQpsValue - acquireCount)) { + return true; + } + } else { + return false; + } + } + Thread.yield(); + } + } + } + + static boolean passThrottleLocalCheck(ResourceWrapper resourceWrapper, ParamFlowRule rule, int acquireCount, + Object value) { + ParameterMetric metric = getParameterMetric(resourceWrapper); + CacheMap timeRecorderMap = metric == null ? null : metric.getRuleTimeCounter(rule); + if (timeRecorderMap == null) { + return true; + } + + // Calculate max token count (threshold) + Set exclusionItems = rule.getParsedHotItems().keySet(); + long tokenCount = (long)rule.getCount(); + if (exclusionItems.contains(value)) { + tokenCount = rule.getParsedHotItems().get(value); + } + + if (tokenCount == 0) { + return false; + } + + long costTime = Math.round(1.0 * 1000 * acquireCount * rule.getDurationInSec() / tokenCount); + while (true) { + long currentTime = TimeUtil.currentTimeMillis(); + AtomicLong timeRecorder = timeRecorderMap.putIfAbsent(value, new AtomicLong(currentTime)); + if (timeRecorder == null) { + return true; + } + //AtomicLong timeRecorder = timeRecorderMap.get(value); + long lastPassTime = timeRecorder.get(); + long expectedTime = lastPassTime + costTime; + + if (expectedTime <= currentTime || expectedTime - currentTime < rule.getMaxQueueingTimeMs()) { + AtomicLong lastPastTimeRef = timeRecorderMap.get(value); + if (lastPastTimeRef.compareAndSet(lastPassTime, currentTime)) { + long waitTime = expectedTime - currentTime; + if (waitTime > 0) { + lastPastTimeRef.set(expectedTime); + try { + TimeUnit.MILLISECONDS.sleep(waitTime); + } catch (InterruptedException e) { + RecordLog.warn("passThrottleLocalCheck: wait interrupted", e); + } + } + return true; + } else { + Thread.yield(); + } + } else { + return false; + } + } + } + + private static ParameterMetric getParameterMetric(ResourceWrapper resourceWrapper) { // Should not be null. return ParamFlowSlot.getParamMetric(resourceWrapper); } @@ -148,7 +276,8 @@ final class ParamFlowChecker { TokenService clusterService = pickClusterService(); if (clusterService == null) { - // No available cluster client or server, fallback to local or pass in need. + // No available cluster client or server, fallback to local or + // pass in need. return fallbackToLocalOrPass(resourceWrapper, rule, count, params); } @@ -187,5 +316,6 @@ final class ParamFlowChecker { return null; } - private ParamFlowChecker() {} + private ParamFlowChecker() { + } } diff --git a/sentinel-extension/sentinel-parameter-flow-control/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/param/ParamFlowRule.java b/sentinel-extension/sentinel-parameter-flow-control/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/param/ParamFlowRule.java index 5b2747fc..ff460429 100644 --- a/sentinel-extension/sentinel-parameter-flow-control/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/param/ParamFlowRule.java +++ b/sentinel-extension/sentinel-parameter-flow-control/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/param/ParamFlowRule.java @@ -19,6 +19,7 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Objects; import com.alibaba.csp.sentinel.context.Context; import com.alibaba.csp.sentinel.node.DefaultNode; @@ -55,6 +56,15 @@ public class ParamFlowRule extends AbstractRule { */ private double count; + /** + * Traffic shaping behavior (since 1.6.0). + */ + private int controlBehavior = RuleConstant.CONTROL_BEHAVIOR_DEFAULT; + + private int maxQueueingTimeMs = 0; + private int burstCount = 0; + private long durationInSec = 1; + /** * Original exclusion items of parameters. */ @@ -74,6 +84,42 @@ public class ParamFlowRule extends AbstractRule { */ private ParamFlowClusterConfig clusterConfig; + public int getControlBehavior() { + return controlBehavior; + } + + public ParamFlowRule setControlBehavior(int controlBehavior) { + this.controlBehavior = controlBehavior; + return this; + } + + public int getMaxQueueingTimeMs() { + return maxQueueingTimeMs; + } + + public ParamFlowRule setMaxQueueingTimeMs(int maxQueueingTimeMs) { + this.maxQueueingTimeMs = maxQueueingTimeMs; + return this; + } + + public int getBurstCount() { + return burstCount; + } + + public ParamFlowRule setBurstCount(int burstCount) { + this.burstCount = burstCount; + return this; + } + + public long getDurationInSec() { + return durationInSec; + } + + public ParamFlowRule setDurationInSec(long durationInSec) { + this.durationInSec = durationInSec; + return this; + } + public int getGrade() { return grade; } @@ -156,15 +202,18 @@ public class ParamFlowRule extends AbstractRule { if (o == null || getClass() != o.getClass()) { return false; } if (!super.equals(o)) { return false; } - ParamFlowRule rule = (ParamFlowRule)o; + ParamFlowRule that = (ParamFlowRule)o; - if (grade != rule.grade) { return false; } - if (Double.compare(rule.count, count) != 0) { return false; } - if (clusterMode != rule.clusterMode) { return false; } - if (paramIdx != null ? !paramIdx.equals(rule.paramIdx) : rule.paramIdx != null) { return false; } - if (paramFlowItemList != null ? !paramFlowItemList.equals(rule.paramFlowItemList) - : rule.paramFlowItemList != null) { return false; } - return clusterConfig != null ? clusterConfig.equals(rule.clusterConfig) : rule.clusterConfig == null; + if (grade != that.grade) { return false; } + if (Double.compare(that.count, count) != 0) { return false; } + if (controlBehavior != that.controlBehavior) { return false; } + if (maxQueueingTimeMs != that.maxQueueingTimeMs) { return false; } + if (burstCount != that.burstCount) { return false; } + if (durationInSec != that.durationInSec) { return false; } + if (clusterMode != that.clusterMode) { return false; } + if (!Objects.equals(paramIdx, that.paramIdx)) { return false; } + if (!Objects.equals(paramFlowItemList, that.paramFlowItemList)) { return false; } + return Objects.equals(clusterConfig, that.clusterConfig); } @Override @@ -175,21 +224,13 @@ public class ParamFlowRule extends AbstractRule { result = 31 * result + (paramIdx != null ? paramIdx.hashCode() : 0); temp = Double.doubleToLongBits(count); result = 31 * result + (int)(temp ^ (temp >>> 32)); + result = 31 * result + controlBehavior; + result = 31 * result + maxQueueingTimeMs; + result = 31 * result + burstCount; + result = 31 * result + (int)(durationInSec ^ (durationInSec >>> 32)); result = 31 * result + (paramFlowItemList != null ? paramFlowItemList.hashCode() : 0); result = 31 * result + (clusterMode ? 1 : 0); result = 31 * result + (clusterConfig != null ? clusterConfig.hashCode() : 0); return result; } - - @Override - public String toString() { - return "ParamFlowRule{" + - "grade=" + grade + - ", paramIdx=" + paramIdx + - ", count=" + count + - ", paramFlowItemList=" + paramFlowItemList + - ", clusterMode=" + clusterMode + - ", clusterConfig=" + clusterConfig + - '}'; - } } diff --git a/sentinel-extension/sentinel-parameter-flow-control/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/param/ParamFlowRuleManager.java b/sentinel-extension/sentinel-parameter-flow-control/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/param/ParamFlowRuleManager.java index 40c923b5..2481c943 100644 --- a/sentinel-extension/sentinel-parameter-flow-control/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/param/ParamFlowRuleManager.java +++ b/sentinel-extension/sentinel-parameter-flow-control/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/param/ParamFlowRuleManager.java @@ -62,8 +62,9 @@ public final class ParamFlowRuleManager { } /** - * Listen to the {@link SentinelProperty} for {@link ParamFlowRule}s. The property is the source - * of {@link ParamFlowRule}s. Parameter flow rules can also be set by {@link #loadRules(List)} directly. + * Listen to the {@link SentinelProperty} for {@link ParamFlowRule}s. The + * property is the source of {@link ParamFlowRule}s. Parameter flow rules + * can also be set by {@link #loadRules(List)} directly. * * @param property the property to listen */ @@ -149,6 +150,7 @@ public final class ParamFlowRuleManager { ruleSet = new HashSet<>(); newRuleMap.put(resourceName, ruleSet); } + ruleSet.add(rule); } @@ -164,6 +166,6 @@ public final class ParamFlowRuleManager { } } - private ParamFlowRuleManager() {} + private ParamFlowRuleManager() { + } } - diff --git a/sentinel-extension/sentinel-parameter-flow-control/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/param/ParamFlowRuleUtil.java b/sentinel-extension/sentinel-parameter-flow-control/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/param/ParamFlowRuleUtil.java index 356ec050..a8809a15 100644 --- a/sentinel-extension/sentinel-parameter-flow-control/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/param/ParamFlowRuleUtil.java +++ b/sentinel-extension/sentinel-parameter-flow-control/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/param/ParamFlowRuleUtil.java @@ -31,7 +31,10 @@ public final class ParamFlowRuleUtil { public static boolean isValidRule(ParamFlowRule rule) { return rule != null && !StringUtil.isBlank(rule.getResource()) && rule.getCount() >= 0 - && rule.getGrade() >= 0 && rule.getParamIdx() != null && checkCluster(rule); + && rule.getGrade() >= 0 && rule.getParamIdx() != null + && rule.getBurstCount() >= 0 && rule.getControlBehavior() >= 0 + && rule.getDurationInSec() > 0 && rule.getMaxQueueingTimeMs() >= 0 + && checkCluster(rule); } private static boolean checkCluster(/*@PreChecked*/ ParamFlowRule rule) { diff --git a/sentinel-extension/sentinel-parameter-flow-control/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/param/ParamFlowSlot.java b/sentinel-extension/sentinel-parameter-flow-control/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/param/ParamFlowSlot.java index a57b6636..001f7d3f 100644 --- a/sentinel-extension/sentinel-parameter-flow-control/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/param/ParamFlowSlot.java +++ b/sentinel-extension/sentinel-parameter-flow-control/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/param/ParamFlowSlot.java @@ -87,13 +87,9 @@ public class ParamFlowSlot extends AbstractLinkedProcessorSlot { applyRealParamIdx(rule, args.length); // Initialize the parameter metrics. - initHotParamMetricsFor(resourceWrapper, rule.getParamIdx()); + initHotParamMetricsFor(resourceWrapper, rule); if (!ParamFlowChecker.passCheck(resourceWrapper, rule, count, args)) { - - // Here we add the block count. - addBlockCount(resourceWrapper, count, args); - String triggeredParam = ""; if (args.length > rule.getParamIdx()) { Object value = args[rule.getParamIdx()]; @@ -104,22 +100,14 @@ public class ParamFlowSlot extends AbstractLinkedProcessorSlot { } } - private void addBlockCount(ResourceWrapper resourceWrapper, int count, Object... args) { - ParameterMetric parameterMetric = ParamFlowSlot.getParamMetric(resourceWrapper); - - if (parameterMetric != null) { - parameterMetric.addBlock(count, args); - } - } - /** * Init the parameter metric and index map for given resource. * Package-private for test. * * @param resourceWrapper resource to init - * @param index index to initialize, which must be valid + * @param rule relevant rule */ - void initHotParamMetricsFor(ResourceWrapper resourceWrapper, /*@Valid*/ int index) { + void initHotParamMetricsFor(ResourceWrapper resourceWrapper, /*@Valid*/ ParamFlowRule rule) { ParameterMetric metric; // Assume that the resource is valid. if ((metric = metricsMap.get(resourceWrapper)) == null) { @@ -131,7 +119,7 @@ public class ParamFlowSlot extends AbstractLinkedProcessorSlot { } } } - metric.initializeForIndex(index); + metric.initialize(rule); } public static ParameterMetric getParamMetric(ResourceWrapper resourceWrapper) { diff --git a/sentinel-extension/sentinel-parameter-flow-control/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/param/ParameterMetric.java b/sentinel-extension/sentinel-parameter-flow-control/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/param/ParameterMetric.java index 0f2a8003..4cb9d53f 100644 --- a/sentinel-extension/sentinel-parameter-flow-control/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/param/ParameterMetric.java +++ b/sentinel-extension/sentinel-parameter-flow-control/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/param/ParameterMetric.java @@ -19,16 +19,12 @@ import java.lang.reflect.Array; import java.util.Collection; import java.util.HashMap; import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import com.alibaba.csp.sentinel.log.RecordLog; -import com.alibaba.csp.sentinel.node.IntervalProperty; -import com.alibaba.csp.sentinel.node.SampleCountProperty; import com.alibaba.csp.sentinel.slots.statistic.cache.CacheMap; import com.alibaba.csp.sentinel.slots.statistic.cache.ConcurrentLinkedHashMapWrapper; -import com.alibaba.csp.sentinel.slots.statistic.metric.HotParameterLeapArray; -import com.alibaba.csp.sentinel.util.AssertUtil; /** * Metrics for frequent ("hot spot") parameters. @@ -38,49 +34,79 @@ import com.alibaba.csp.sentinel.util.AssertUtil; */ public class ParameterMetric { - private final int sampleCount; - private final int intervalMs; - - public ParameterMetric() { - this(SampleCountProperty.SAMPLE_COUNT, IntervalProperty.INTERVAL); + private static final int THREAD_COUNT_MAX_CAPACITY = 4000; + private static final int BASE_PARAM_MAX_CAPACITY = 4000; + private static final int TOTAL_MAX_CAPACITY = 20_0000; + + private final Object lock = new Object(); + + /** + * Format: (rule, (value, timeRecorder)) + * + * @since 1.6.0 + */ + private final Map> ruleTimeCounters = new HashMap<>(); + /** + * Format: (rule, (value, tokenCounter)) + * + * @since 1.6.0 + */ + private final Map> ruleTokenCounter = new HashMap<>(); + private final Map> threadCountMap = new HashMap<>(); + + /** + * Get the token counter for given parameter rule. + * + * @param rule valid parameter rule + * @return the associated token counter + * @since 1.6.0 + */ + public CacheMap getRuleTokenCounter(ParamFlowRule rule) { + return ruleTokenCounter.get(rule); } - public ParameterMetric(int sampleCount, int intervalInMs) { - AssertUtil.isTrue(sampleCount > 0, "sampleCount should be positive"); - AssertUtil.isTrue(intervalInMs > 0, "window interval should be positive"); - AssertUtil.isTrue(intervalInMs % sampleCount == 0, "time span needs to be evenly divided"); - this.sampleCount = sampleCount; - this.intervalMs = intervalInMs; + /** + * Get the time record counter for given parameter rule. + * + * @param rule valid parameter rule + * @return the associated time counter + * @since 1.6.0 + */ + public CacheMap getRuleTimeCounter(ParamFlowRule rule) { + return ruleTimeCounters.get(rule); } - private Map rollingParameters = - new ConcurrentHashMap(); - private Map> threadCountMap = - new ConcurrentHashMap>(); - - public Map getRollingParameters() { - return rollingParameters; - } - - public Map> getThreadCountMap() { - return threadCountMap; + public void clear() { + synchronized (lock) { + threadCountMap.clear(); + ruleTimeCounters.clear(); + ruleTokenCounter.clear(); + } } - public synchronized void clear() { - rollingParameters.clear(); - threadCountMap.clear(); - } + public void initialize(ParamFlowRule rule) { + if (!ruleTimeCounters.containsKey(rule)) { + synchronized (lock) { + if (ruleTimeCounters.get(rule) == null) { + long size = Math.min(BASE_PARAM_MAX_CAPACITY * rule.getDurationInSec(), TOTAL_MAX_CAPACITY); + ruleTimeCounters.put(rule, new ConcurrentLinkedHashMapWrapper(size)); + } + } + } - public void initializeForIndex(int index) { - if (!rollingParameters.containsKey(index)) { - synchronized (this) { - // putIfAbsent - if (rollingParameters.get(index) == null) { - rollingParameters.put(index, new HotParameterLeapArray(sampleCount, intervalMs)); + if (!ruleTokenCounter.containsKey(rule)) { + synchronized (lock) { + if (ruleTokenCounter.get(rule) == null) { + long size = Math.min(BASE_PARAM_MAX_CAPACITY * rule.getDurationInSec(), TOTAL_MAX_CAPACITY); + ruleTokenCounter.put(rule, new ConcurrentLinkedHashMapWrapper(size)); } + } + } - if (threadCountMap.get(index) == null) { - threadCountMap.put(index, + if (!threadCountMap.containsKey(rule.getParamIdx())) { + synchronized (lock) { + if (threadCountMap.get(rule.getParamIdx()) == null) { + threadCountMap.put(rule.getParamIdx(), new ConcurrentLinkedHashMapWrapper(THREAD_COUNT_MAX_CAPACITY)); } } @@ -204,102 +230,30 @@ public class ParameterMetric { } } - public void addPass(int count, Object... args) { - add(RollingParamEvent.REQUEST_PASSED, count, args); - } - - public void addBlock(int count, Object... args) { - add(RollingParamEvent.REQUEST_BLOCKED, count, args); - } - - @SuppressWarnings("rawtypes") - private void add(RollingParamEvent event, int count, Object... args) { - if (args == null) { - return; - } - try { - for (int index = 0; index < args.length; index++) { - HotParameterLeapArray param = rollingParameters.get(index); - if (param == null) { - continue; - } - - Object arg = args[index]; - if (arg == null) { - continue; - } - if (Collection.class.isAssignableFrom(arg.getClass())) { - for (Object value : ((Collection)arg)) { - param.addValue(event, count, value); - } - } else if (arg.getClass().isArray()) { - int length = Array.getLength(arg); - for (int i = 0; i < length; i++) { - Object value = Array.get(arg, i); - param.addValue(event, count, value); - } - } else { - param.addValue(event, count, arg); - } - - } - } catch (Throwable e) { - RecordLog.warn("[ParameterMetric] Param exception", e); - } - } - - public double getPassParamQps(int index, Object value) { - try { - HotParameterLeapArray parameter = rollingParameters.get(index); - if (parameter == null || value == null) { - return -1; - } - return parameter.getRollingAvg(RollingParamEvent.REQUEST_PASSED, value); - } catch (Throwable e) { - RecordLog.info(e.getMessage(), e); + public long getThreadCount(int index, Object value) { + CacheMap cacheMap = threadCountMap.get(index); + if (cacheMap == null) { + return 0; } - return -1; + AtomicInteger count = cacheMap.get(value); + return count == null ? 0L : count.get(); } - public long getBlockParamQps(int index, Object value) { - try { - HotParameterLeapArray parameter = rollingParameters.get(index); - if (parameter == null || value == null) { - return -1; - } - - return (long)rollingParameters.get(index).getRollingAvg(RollingParamEvent.REQUEST_BLOCKED, value); - } catch (Throwable e) { - RecordLog.info(e.getMessage(), e); - } - - return -1; + /** + * Get the token counter map. Package-private for test. + * + * @return the token counter map + */ + Map> getRuleTokenCounterMap() { + return ruleTokenCounter; } - public Map getTopPassParamCount(int index, int number) { - try { - HotParameterLeapArray parameter = rollingParameters.get(index); - if (parameter == null) { - return new HashMap(); - } - - return parameter.getTopValues(RollingParamEvent.REQUEST_PASSED, number); - } catch (Throwable e) { - RecordLog.info(e.getMessage(), e); - } - - return new HashMap(); + Map> getThreadCountMap() { + return threadCountMap; } - public long getThreadCount(int index, Object value) { - if (threadCountMap.get(index) == null) { - return 0; - } - - AtomicInteger count = threadCountMap.get(index).get(value); - return count == null ? 0L : count.get(); + Map> getRuleTimeCounterMap() { + return ruleTimeCounters; } - - private static final long THREAD_COUNT_MAX_CAPACITY = 4000; } diff --git a/sentinel-extension/sentinel-parameter-flow-control/src/main/java/com/alibaba/csp/sentinel/slots/statistic/ParamFlowStatisticEntryCallback.java b/sentinel-extension/sentinel-parameter-flow-control/src/main/java/com/alibaba/csp/sentinel/slots/statistic/ParamFlowStatisticEntryCallback.java index 0aee2155..b7c09d81 100644 --- a/sentinel-extension/sentinel-parameter-flow-control/src/main/java/com/alibaba/csp/sentinel/slots/statistic/ParamFlowStatisticEntryCallback.java +++ b/sentinel-extension/sentinel-parameter-flow-control/src/main/java/com/alibaba/csp/sentinel/slots/statistic/ParamFlowStatisticEntryCallback.java @@ -30,13 +30,11 @@ import com.alibaba.csp.sentinel.slots.block.flow.param.ParameterMetric; public class ParamFlowStatisticEntryCallback implements ProcessorSlotEntryCallback { @Override - public void onPass(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, Object... args) - throws Exception { + public void onPass(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, Object... args) { // The "hot spot" parameter metric is present only if parameter flow rules for the resource exist. ParameterMetric parameterMetric = ParamFlowSlot.getParamMetric(resourceWrapper); if (parameterMetric != null) { - parameterMetric.addPass(count, args); parameterMetric.addThreadCount(args); } } diff --git a/sentinel-extension/sentinel-parameter-flow-control/src/main/resources/META-INF/services/com.alibaba.csp.sentinel.command.CommandHandler b/sentinel-extension/sentinel-parameter-flow-control/src/main/resources/META-INF/services/com.alibaba.csp.sentinel.command.CommandHandler index 4c8e6938..e97b4775 100755 --- a/sentinel-extension/sentinel-parameter-flow-control/src/main/resources/META-INF/services/com.alibaba.csp.sentinel.command.CommandHandler +++ b/sentinel-extension/sentinel-parameter-flow-control/src/main/resources/META-INF/services/com.alibaba.csp.sentinel.command.CommandHandler @@ -1,3 +1,2 @@ com.alibaba.csp.sentinel.command.handler.GetParamFlowRulesCommandHandler -com.alibaba.csp.sentinel.command.handler.ModifyParamFlowRulesCommandHandler -com.alibaba.csp.sentinel.command.handler.FetchTopParamsCommandHandler \ No newline at end of file +com.alibaba.csp.sentinel.command.handler.ModifyParamFlowRulesCommandHandler \ No newline at end of file diff --git a/sentinel-extension/sentinel-parameter-flow-control/src/test/java/com/alibaba/csp/sentinel/slots/block/flow/param/ParamFlowCheckerTest.java b/sentinel-extension/sentinel-parameter-flow-control/src/test/java/com/alibaba/csp/sentinel/slots/block/flow/param/ParamFlowCheckerTest.java index b22bd77c..628b859f 100644 --- a/sentinel-extension/sentinel-parameter-flow-control/src/test/java/com/alibaba/csp/sentinel/slots/block/flow/param/ParamFlowCheckerTest.java +++ b/sentinel-extension/sentinel-parameter-flow-control/src/test/java/com/alibaba/csp/sentinel/slots/block/flow/param/ParamFlowCheckerTest.java @@ -15,23 +15,29 @@ */ package com.alibaba.csp.sentinel.slots.block.flow.param; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; - -import com.alibaba.csp.sentinel.EntryType; -import com.alibaba.csp.sentinel.slotchain.ResourceWrapper; -import com.alibaba.csp.sentinel.slotchain.StringResourceWrapper; -import com.alibaba.csp.sentinel.slots.block.RuleConstant; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import org.junit.After; import org.junit.Before; import org.junit.Test; -import static org.junit.Assert.*; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; +import com.alibaba.csp.sentinel.EntryType; +import com.alibaba.csp.sentinel.slotchain.ResourceWrapper; +import com.alibaba.csp.sentinel.slotchain.StringResourceWrapper; +import com.alibaba.csp.sentinel.slots.block.RuleConstant; +import com.alibaba.csp.sentinel.slots.statistic.cache.ConcurrentLinkedHashMapWrapper; +import com.alibaba.csp.sentinel.util.TimeUtil; /** * Test cases for {@link ParamFlowChecker}. @@ -56,43 +62,21 @@ public class ParamFlowCheckerTest { } @Test - public void testSingleValueCheckQpsWithoutExceptionItems() { - final String resourceName = "testSingleValueCheckQpsWithoutExceptionItems"; - final ResourceWrapper resourceWrapper = new StringResourceWrapper(resourceName, EntryType.IN); - int paramIdx = 0; - - long threshold = 5L; - - ParamFlowRule rule = new ParamFlowRule(); - rule.setResource(resourceName); - rule.setCount(threshold); - rule.setParamIdx(paramIdx); - - String valueA = "valueA"; - String valueB = "valueB"; - ParameterMetric metric = mock(ParameterMetric.class); - when(metric.getPassParamQps(paramIdx, valueA)).thenReturn((double)threshold - 1); - when(metric.getPassParamQps(paramIdx, valueB)).thenReturn((double)threshold + 1); - ParamFlowSlot.getMetricsMap().put(resourceWrapper, metric); - - assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); - assertFalse(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueB)); - } - - @Test - public void testSingleValueCheckQpsWithExceptionItems() { + public void testSingleValueCheckQpsWithExceptionItems() throws InterruptedException { final String resourceName = "testSingleValueCheckQpsWithExceptionItems"; final ResourceWrapper resourceWrapper = new StringResourceWrapper(resourceName, EntryType.IN); + TimeUtil.currentTimeMillis(); int paramIdx = 0; long globalThreshold = 5L; - int thresholdB = 3; + int thresholdB = 0; int thresholdD = 7; ParamFlowRule rule = new ParamFlowRule(); rule.setResource(resourceName); rule.setCount(globalThreshold); rule.setParamIdx(paramIdx); + rule.setControlBehavior(RuleConstant.CONTROL_BEHAVIOR_RATE_LIMITER); String valueA = "valueA"; String valueB = "valueB"; @@ -105,29 +89,13 @@ public class ParamFlowCheckerTest { map.put(valueD, thresholdD); rule.setParsedHotItems(map); - ParameterMetric metric = mock(ParameterMetric.class); - when(metric.getPassParamQps(paramIdx, valueA)).thenReturn((double)globalThreshold - 1); - when(metric.getPassParamQps(paramIdx, valueB)).thenReturn((double)globalThreshold - 1); - when(metric.getPassParamQps(paramIdx, valueC)).thenReturn((double)globalThreshold - 1); - when(metric.getPassParamQps(paramIdx, valueD)).thenReturn((double)globalThreshold + 1); + ParameterMetric metric = new ParameterMetric(); ParamFlowSlot.getMetricsMap().put(resourceWrapper, metric); + metric.getRuleTimeCounterMap().put(rule, new ConcurrentLinkedHashMapWrapper(4000)); assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); assertFalse(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueB)); - assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueC)); - assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueD)); - - when(metric.getPassParamQps(paramIdx, valueA)).thenReturn((double)globalThreshold); - when(metric.getPassParamQps(paramIdx, valueB)).thenReturn((double)thresholdB - 1L); - when(metric.getPassParamQps(paramIdx, valueC)).thenReturn((double)globalThreshold + 1); - when(metric.getPassParamQps(paramIdx, valueD)).thenReturn((double)globalThreshold - 1) - .thenReturn((double)thresholdD); - - assertFalse(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); - assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueB)); - assertFalse(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueC)); - assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueD)); - assertFalse(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueD)); + TimeUnit.SECONDS.sleep(3); } @Test @@ -140,9 +108,7 @@ public class ParamFlowCheckerTest { int thresholdB = 3; int thresholdD = 7; - ParamFlowRule rule = new ParamFlowRule(resourceName) - .setCount(globalThreshold) - .setParamIdx(paramIdx) + ParamFlowRule rule = new ParamFlowRule(resourceName).setCount(globalThreshold).setParamIdx(paramIdx) .setGrade(RuleConstant.FLOW_GRADE_THREAD); String valueA = "valueA"; @@ -171,8 +137,7 @@ public class ParamFlowCheckerTest { when(metric.getThreadCount(paramIdx, valueA)).thenReturn(globalThreshold); when(metric.getThreadCount(paramIdx, valueB)).thenReturn(thresholdB - 1L); when(metric.getThreadCount(paramIdx, valueC)).thenReturn(globalThreshold + 1); - when(metric.getThreadCount(paramIdx, valueD)).thenReturn(globalThreshold - 1) - .thenReturn((long)thresholdD); + when(metric.getThreadCount(paramIdx, valueD)).thenReturn(globalThreshold - 1).thenReturn((long)thresholdD); assertFalse(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueB)); @@ -182,52 +147,42 @@ public class ParamFlowCheckerTest { } @Test - public void testPassLocalCheckForCollection() { + public void testPassLocalCheckForCollection() throws InterruptedException { final String resourceName = "testPassLocalCheckForCollection"; final ResourceWrapper resourceWrapper = new StringResourceWrapper(resourceName, EntryType.IN); int paramIdx = 0; - double globalThreshold = 10; + double globalThreshold = 1; - ParamFlowRule rule = new ParamFlowRule(resourceName) - .setParamIdx(paramIdx) - .setCount(globalThreshold); + ParamFlowRule rule = new ParamFlowRule(resourceName).setParamIdx(paramIdx).setCount(globalThreshold); String v1 = "a", v2 = "B", v3 = "Cc"; List list = Arrays.asList(v1, v2, v3); - ParameterMetric metric = mock(ParameterMetric.class); - when(metric.getPassParamQps(paramIdx, v1)).thenReturn(globalThreshold - 2) - .thenReturn(globalThreshold - 1); - when(metric.getPassParamQps(paramIdx, v2)).thenReturn(globalThreshold - 2) - .thenReturn(globalThreshold - 1); - when(metric.getPassParamQps(paramIdx, v3)).thenReturn(globalThreshold - 1) - .thenReturn(globalThreshold); + ParameterMetric metric = new ParameterMetric(); ParamFlowSlot.getMetricsMap().put(resourceWrapper, metric); + metric.getRuleTimeCounterMap().put(rule, new ConcurrentLinkedHashMapWrapper(4000)); + metric.getRuleTokenCounterMap().put(rule, new ConcurrentLinkedHashMapWrapper(4000)); assertTrue(ParamFlowChecker.passCheck(resourceWrapper, rule, 1, list)); assertFalse(ParamFlowChecker.passCheck(resourceWrapper, rule, 1, list)); } @Test - public void testPassLocalCheckForArray() { + public void testPassLocalCheckForArray() throws InterruptedException { final String resourceName = "testPassLocalCheckForArray"; final ResourceWrapper resourceWrapper = new StringResourceWrapper(resourceName, EntryType.IN); int paramIdx = 0; - double globalThreshold = 10; + double globalThreshold = 1; - ParamFlowRule rule = new ParamFlowRule(resourceName) - .setParamIdx(paramIdx) - .setCount(globalThreshold); + ParamFlowRule rule = new ParamFlowRule(resourceName).setParamIdx(paramIdx) + .setControlBehavior(RuleConstant.CONTROL_BEHAVIOR_RATE_LIMITER).setCount(globalThreshold); + + TimeUtil.currentTimeMillis(); String v1 = "a", v2 = "B", v3 = "Cc"; Object arr = new String[] {v1, v2, v3}; - ParameterMetric metric = mock(ParameterMetric.class); - when(metric.getPassParamQps(paramIdx, v1)).thenReturn(globalThreshold - 2) - .thenReturn(globalThreshold - 1); - when(metric.getPassParamQps(paramIdx, v2)).thenReturn(globalThreshold - 2) - .thenReturn(globalThreshold - 1); - when(metric.getPassParamQps(paramIdx, v3)).thenReturn(globalThreshold - 1) - .thenReturn(globalThreshold); + ParameterMetric metric = new ParameterMetric(); ParamFlowSlot.getMetricsMap().put(resourceWrapper, metric); + metric.getRuleTimeCounterMap().put(rule, new ConcurrentLinkedHashMapWrapper(4000)); assertTrue(ParamFlowChecker.passCheck(resourceWrapper, rule, 1, arr)); assertFalse(ParamFlowChecker.passCheck(resourceWrapper, rule, 1, arr)); diff --git a/sentinel-extension/sentinel-parameter-flow-control/src/test/java/com/alibaba/csp/sentinel/slots/block/flow/param/ParamFlowDefaultCheckerTest.java b/sentinel-extension/sentinel-parameter-flow-control/src/test/java/com/alibaba/csp/sentinel/slots/block/flow/param/ParamFlowDefaultCheckerTest.java new file mode 100644 index 00000000..f28bd1c0 --- /dev/null +++ b/sentinel-extension/sentinel-parameter-flow-control/src/test/java/com/alibaba/csp/sentinel/slots/block/flow/param/ParamFlowDefaultCheckerTest.java @@ -0,0 +1,280 @@ +package com.alibaba.csp.sentinel.slots.block.flow.param; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import com.alibaba.csp.sentinel.EntryType; +import com.alibaba.csp.sentinel.slotchain.ResourceWrapper; +import com.alibaba.csp.sentinel.slotchain.StringResourceWrapper; +import com.alibaba.csp.sentinel.slots.statistic.cache.ConcurrentLinkedHashMapWrapper; +import com.alibaba.csp.sentinel.test.AbstractTimeBasedTest; +import com.alibaba.csp.sentinel.util.TimeUtil; + +/** + * @author jialiang.linjl + */ +public class ParamFlowDefaultCheckerTest extends AbstractTimeBasedTest { + + @Test + public void testParamFlowDefaultCheckSingleQps() { + final String resourceName = "testParamFlowDefaultCheckSingleQps"; + final ResourceWrapper resourceWrapper = new StringResourceWrapper(resourceName, EntryType.IN); + int paramIdx = 0; + + long threshold = 5L; + + ParamFlowRule rule = new ParamFlowRule(); + rule.setResource(resourceName); + rule.setCount(threshold); + rule.setParamIdx(paramIdx); + + String valueA = "valueA"; + ParameterMetric metric = new ParameterMetric(); + ParamFlowSlot.getMetricsMap().put(resourceWrapper, metric); + metric.getRuleTimeCounterMap().put(rule, new ConcurrentLinkedHashMapWrapper(4000)); + metric.getRuleTokenCounterMap().put(rule, + new ConcurrentLinkedHashMapWrapper(4000)); + + // We mock the time directly to avoid unstable behaviour. + setCurrentMillis(System.currentTimeMillis()); + + assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); + assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); + assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); + assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); + assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); + assertFalse(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); + + sleep(3000); + assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); + assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); + assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); + assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); + assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); + assertFalse(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); + } + + @Test + public void testParamFlowDefaultCheckSingleQpsWithBurst() throws InterruptedException { + final String resourceName = "testParamFlowDefaultCheckSingleQpsWithBurst"; + final ResourceWrapper resourceWrapper = new StringResourceWrapper(resourceName, EntryType.IN); + int paramIdx = 0; + + long threshold = 5L; + + ParamFlowRule rule = new ParamFlowRule(); + rule.setResource(resourceName); + rule.setCount(threshold); + rule.setParamIdx(paramIdx); + rule.setBurstCount(3); + + String valueA = "valueA"; + ParameterMetric metric = new ParameterMetric(); + ParamFlowSlot.getMetricsMap().put(resourceWrapper, metric); + metric.getRuleTimeCounterMap().put(rule, new ConcurrentLinkedHashMapWrapper(4000)); + metric.getRuleTokenCounterMap().put(rule, + new ConcurrentLinkedHashMapWrapper(4000)); + + // We mock the time directly to avoid unstable behaviour. + setCurrentMillis(System.currentTimeMillis()); + + assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); + assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); + assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); + assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); + assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); + assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); + assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); + assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); + assertFalse(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); + + sleep(1002); + assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); + assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); + assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); + assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); + assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); + assertFalse(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); + + sleep(1002); + assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); + assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); + assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); + assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); + assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); + assertFalse(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); + + sleep(2000); + assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); + assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); + assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); + assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); + assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); + assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); + assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); + assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); + assertFalse(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); + + sleep(1002); + assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); + assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); + assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); + assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); + assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); + assertFalse(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); + } + + @Test + public void testParamFlowDefaultCheckQpsInDifferentDuration() throws InterruptedException { + final String resourceName = "testParamFlowDefaultCheckQpsInDifferentDuration"; + final ResourceWrapper resourceWrapper = new StringResourceWrapper(resourceName, EntryType.IN); + int paramIdx = 0; + + long threshold = 5L; + + ParamFlowRule rule = new ParamFlowRule(); + rule.setResource(resourceName); + rule.setCount(threshold); + rule.setParamIdx(paramIdx); + rule.setDurationInSec(60); + + String valueA = "helloWorld"; + ParameterMetric metric = new ParameterMetric(); + ParamFlowSlot.getMetricsMap().put(resourceWrapper, metric); + metric.getRuleTimeCounterMap().put(rule, new ConcurrentLinkedHashMapWrapper(4000)); + metric.getRuleTokenCounterMap().put(rule, + new ConcurrentLinkedHashMapWrapper(4000)); + + // We mock the time directly to avoid unstable behaviour. + setCurrentMillis(System.currentTimeMillis()); + + assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); + assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); + assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); + assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); + assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); + + assertFalse(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); + + sleepSecond(1); + assertFalse(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); + + sleepSecond(10); + assertFalse(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); + + sleepSecond(30); + assertFalse(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); + + sleepSecond(30); + assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); + assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); + assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); + assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); + assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); + + assertFalse(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); + } + + @Test + public void testParamFlowDefaultCheckSingleValueCheckQpsMultipleThreads() throws Exception { + // In this test case we use the actual time. + useActualTime(); + + final String resourceName = "testParamFlowDefaultCheckSingleValueCheckQpsMultipleThreads"; + final ResourceWrapper resourceWrapper = new StringResourceWrapper(resourceName, EntryType.IN); + int paramIdx = 0; + + long threshold = 5L; + + final ParamFlowRule rule = new ParamFlowRule(); + rule.setResource(resourceName); + rule.setCount(threshold); + rule.setParamIdx(paramIdx); + + final String valueA = "valueA"; + ParameterMetric metric = new ParameterMetric(); + ParamFlowSlot.getMetricsMap().put(resourceWrapper, metric); + metric.getRuleTimeCounterMap().put(rule, new ConcurrentLinkedHashMapWrapper(4000)); + metric.getRuleTokenCounterMap().put(rule, + new ConcurrentLinkedHashMapWrapper(4000)); + int threadCount = 40; + + final CountDownLatch waitLatch = new CountDownLatch(threadCount); + final AtomicInteger successCount = new AtomicInteger(); + for (int i = 0; i < threadCount; i++) { + Thread t = new Thread(new Runnable() { + @Override + public void run() { + if (ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)) { + successCount.incrementAndGet(); + } + waitLatch.countDown(); + } + + }); + t.setName("sentinel-simulate-traffic-task-" + i); + t.start(); + } + waitLatch.await(); + + assertEquals(successCount.get(), threshold); + successCount.set(0); + + System.out.println("testParamFlowDefaultCheckSingleValueCheckQpsMultipleThreads: sleep for 3 seconds"); + TimeUnit.SECONDS.sleep(3); + + successCount.set(0); + final CountDownLatch waitLatch1 = new CountDownLatch(threadCount); + final long currentTime = TimeUtil.currentTimeMillis(); + final long endTime = currentTime + rule.getDurationInSec() * 1000 - 1; + for (int i = 0; i < threadCount; i++) { + Thread t = new Thread(new Runnable() { + @Override + public void run() { + long currentTime1 = currentTime; + while (currentTime1 <= endTime) { + if (ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)) { + successCount.incrementAndGet(); + } + + try { + TimeUnit.MILLISECONDS.sleep(ThreadLocalRandom.current().nextInt(20)); + } catch (InterruptedException e) { + e.printStackTrace(); + } + currentTime1 = TimeUtil.currentTimeMillis(); + } + + waitLatch1.countDown(); + } + + }); + t.setName("sentinel-simulate-traffic-task-" + i); + t.start(); + } + waitLatch1.await(); + + assertEquals(successCount.get(), threshold); + } + + @Before + public void setUp() throws Exception { + ParamFlowSlot.getMetricsMap().clear(); + } + + @After + public void tearDown() throws Exception { + ParamFlowSlot.getMetricsMap().clear(); + } +} diff --git a/sentinel-extension/sentinel-parameter-flow-control/src/test/java/com/alibaba/csp/sentinel/slots/block/flow/param/ParamFlowSlotTest.java b/sentinel-extension/sentinel-parameter-flow-control/src/test/java/com/alibaba/csp/sentinel/slots/block/flow/param/ParamFlowSlotTest.java index 817e8703..df59432b 100644 --- a/sentinel-extension/sentinel-parameter-flow-control/src/test/java/com/alibaba/csp/sentinel/slots/block/flow/param/ParamFlowSlotTest.java +++ b/sentinel-extension/sentinel-parameter-flow-control/src/test/java/com/alibaba/csp/sentinel/slots/block/flow/param/ParamFlowSlotTest.java @@ -15,19 +15,28 @@ */ package com.alibaba.csp.sentinel.slots.block.flow.param; -import java.util.Collections; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.fail; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; -import com.alibaba.csp.sentinel.EntryType; -import com.alibaba.csp.sentinel.slotchain.ResourceWrapper; -import com.alibaba.csp.sentinel.slotchain.StringResourceWrapper; +import java.util.Collections; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import org.junit.After; import org.junit.Before; import org.junit.Test; -import static org.junit.Assert.*; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; +import com.alibaba.csp.sentinel.EntryType; +import com.alibaba.csp.sentinel.slotchain.ResourceWrapper; +import com.alibaba.csp.sentinel.slotchain.StringResourceWrapper; +import com.alibaba.csp.sentinel.slots.statistic.cache.CacheMap; +import com.alibaba.csp.sentinel.slots.statistic.cache.ConcurrentLinkedHashMapWrapper; +import com.alibaba.csp.sentinel.util.TimeUtil; /** * Test cases for {@link ParamFlowSlot}. @@ -81,17 +90,21 @@ public class ParamFlowSlotTest { String resourceName = "testEntryWhenParamFlowExists"; ResourceWrapper resourceWrapper = new StringResourceWrapper(resourceName, EntryType.IN); long argToGo = 1L; - double count = 10; + double count = 1; ParamFlowRule rule = new ParamFlowRule(resourceName) .setCount(count) + .setBurstCount(0) .setParamIdx(0); ParamFlowRuleManager.loadRules(Collections.singletonList(rule)); ParameterMetric metric = mock(ParameterMetric.class); - // First pass, then blocked. - when(metric.getPassParamQps(rule.getParamIdx(), argToGo)) - .thenReturn(count - 1) - .thenReturn(count); + + CacheMap map = new ConcurrentLinkedHashMapWrapper(4000); + CacheMap map2 = new ConcurrentLinkedHashMapWrapper(4000); + when(metric.getRuleTimeCounter(rule)).thenReturn(map); + when(metric.getRuleTokenCounter(rule)).thenReturn(map2); + map.put(argToGo, new AtomicLong(TimeUtil.currentTimeMillis())); + // Insert the mock metric to control pass or block. ParamFlowSlot.getMetricsMap().put(resourceWrapper, metric); @@ -115,21 +128,29 @@ public class ParamFlowSlotTest { @Test public void testInitParamMetrics() { + + ParamFlowRule rule = new ParamFlowRule(); + rule.setParamIdx(1); int index = 1; String resourceName = "res-" + System.currentTimeMillis(); ResourceWrapper resourceWrapper = new StringResourceWrapper(resourceName, EntryType.IN); assertNull(ParamFlowSlot.getParamMetric(resourceWrapper)); - paramFlowSlot.initHotParamMetricsFor(resourceWrapper, index); + paramFlowSlot.initHotParamMetricsFor(resourceWrapper, rule); ParameterMetric metric = ParamFlowSlot.getParamMetric(resourceWrapper); assertNotNull(metric); - assertNotNull(metric.getRollingParameters().get(index)); + assertNotNull(metric.getRuleTimeCounterMap().get(rule)); assertNotNull(metric.getThreadCountMap().get(index)); // Duplicate init. - paramFlowSlot.initHotParamMetricsFor(resourceWrapper, index); + paramFlowSlot.initHotParamMetricsFor(resourceWrapper, rule); + assertSame(metric, ParamFlowSlot.getParamMetric(resourceWrapper)); + + ParamFlowRule rule2 = new ParamFlowRule(); + rule2.setParamIdx(1); assertSame(metric, ParamFlowSlot.getParamMetric(resourceWrapper)); + } @Before diff --git a/sentinel-extension/sentinel-parameter-flow-control/src/test/java/com/alibaba/csp/sentinel/slots/block/flow/param/ParamFlowThrottleRateLimitingCheckerTest.java b/sentinel-extension/sentinel-parameter-flow-control/src/test/java/com/alibaba/csp/sentinel/slots/block/flow/param/ParamFlowThrottleRateLimitingCheckerTest.java new file mode 100644 index 00000000..2f2fc3f8 --- /dev/null +++ b/sentinel-extension/sentinel-parameter-flow-control/src/test/java/com/alibaba/csp/sentinel/slots/block/flow/param/ParamFlowThrottleRateLimitingCheckerTest.java @@ -0,0 +1,164 @@ +package com.alibaba.csp.sentinel.slots.block.flow.param; + +import static org.junit.Assert.assertEquals; + +import java.util.Random; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import com.alibaba.csp.sentinel.EntryType; +import com.alibaba.csp.sentinel.slotchain.ResourceWrapper; +import com.alibaba.csp.sentinel.slotchain.StringResourceWrapper; +import com.alibaba.csp.sentinel.slots.block.RuleConstant; +import com.alibaba.csp.sentinel.slots.statistic.cache.ConcurrentLinkedHashMapWrapper; +import com.alibaba.csp.sentinel.util.TimeUtil; + +/** + * @author jialiang.linjl + */ +public class ParamFlowThrottleRateLimitingCheckerTest { + + @Test + public void testSingleValueThrottleCheckQps() throws Exception { + final String resourceName = "testSingleValueThrottleCheckQps"; + final ResourceWrapper resourceWrapper = new StringResourceWrapper(resourceName, EntryType.IN); + int paramIdx = 0; + TimeUtil.currentTimeMillis(); + + long threshold = 5L; + + ParamFlowRule rule = new ParamFlowRule(); + rule.setResource(resourceName); + rule.setCount(threshold); + rule.setParamIdx(paramIdx); + rule.setControlBehavior(RuleConstant.CONTROL_BEHAVIOR_RATE_LIMITER); + + String valueA = "valueA"; + ParameterMetric metric = new ParameterMetric(); + ParamFlowSlot.getMetricsMap().put(resourceWrapper, metric); + metric.getRuleTimeCounterMap().put(rule, new ConcurrentLinkedHashMapWrapper(4000)); + + long currentTime = TimeUtil.currentTimeMillis(); + long endTime = currentTime + rule.getDurationInSec() * 1000; + int successCount = 0; + while (currentTime <= endTime - 10) { + if (ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)) { + successCount++; + } + currentTime = TimeUtil.currentTimeMillis(); + } + assertEquals(successCount, threshold); + + System.out.println("testSingleValueThrottleCheckQps: sleep for 3 seconds"); + TimeUnit.SECONDS.sleep(3); + + currentTime = TimeUtil.currentTimeMillis(); + endTime = currentTime + rule.getDurationInSec() * 1000; + successCount = 0; + while (currentTime <= endTime - 10) { + if (ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)) { + successCount++; + } + currentTime = TimeUtil.currentTimeMillis(); + } + assertEquals(successCount, threshold); + } + + @Test + public void testSingleValueThrottleCheckQpsMultipleThreads() throws Exception { + final String resourceName = "testSingleValueThrottleCheckQpsMultipleThreads"; + final ResourceWrapper resourceWrapper = new StringResourceWrapper(resourceName, EntryType.IN); + int paramIdx = 0; + + long threshold = 5L; + + final ParamFlowRule rule = new ParamFlowRule(resourceName) + .setCount(threshold) + .setParamIdx(paramIdx) + .setControlBehavior(RuleConstant.CONTROL_BEHAVIOR_RATE_LIMITER); + + final String valueA = "valueA"; + ParameterMetric metric = new ParameterMetric(); + ParamFlowSlot.getMetricsMap().put(resourceWrapper, metric); + metric.getRuleTimeCounterMap().put(rule, new ConcurrentLinkedHashMapWrapper(4000)); + + int threadCount = 40; + System.out.println(metric.getRuleTimeCounter(rule)); + + final CountDownLatch waitLatch = new CountDownLatch(threadCount); + final AtomicInteger successCount = new AtomicInteger(); + for (int i = 0; i < threadCount; i++) { + Thread t = new Thread(new Runnable() { + @Override + public void run() { + if (ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)) { + successCount.incrementAndGet(); + } + waitLatch.countDown(); + } + + }); + t.setName("sentinel-simulate-traffic-task-" + i); + t.start(); + } + waitLatch.await(); + + assertEquals(successCount.get(), 1); + System.out.println(threadCount); + successCount.set(0); + + System.out.println("testSingleValueThrottleCheckQpsMultipleThreads: sleep for 3 seconds"); + TimeUnit.SECONDS.sleep(3); + + successCount.set(0); + final CountDownLatch waitLatch1 = new CountDownLatch(threadCount); + final long currentTime = TimeUtil.currentTimeMillis(); + final long endTime = currentTime + rule.getDurationInSec() * 1000 - 1; + for (int i = 0; i < threadCount; i++) { + Thread t = new Thread(new Runnable() { + @Override + public void run() { + long currentTime1 = currentTime; + while (currentTime1 <= endTime) { + if (ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)) { + successCount.incrementAndGet(); + } + + Random random = new Random(); + + try { + TimeUnit.MILLISECONDS.sleep(random.nextInt(20)); + } catch (InterruptedException e) { + e.printStackTrace(); + } + currentTime1 = TimeUtil.currentTimeMillis(); + } + + waitLatch1.countDown(); + } + + }); + t.setName("sentinel-simulate-traffic-task-" + i); + t.start(); + } + waitLatch1.await(); + + assertEquals(successCount.get(), threshold); + } + + @Before + public void setUp() throws Exception { + ParamFlowSlot.getMetricsMap().clear(); + } + + @After + public void tearDown() throws Exception { + ParamFlowSlot.getMetricsMap().clear(); + } +} diff --git a/sentinel-extension/sentinel-parameter-flow-control/src/test/java/com/alibaba/csp/sentinel/slots/block/flow/param/ParameterMetricTest.java b/sentinel-extension/sentinel-parameter-flow-control/src/test/java/com/alibaba/csp/sentinel/slots/block/flow/param/ParameterMetricTest.java index 0450574b..8c7e7d8c 100644 --- a/sentinel-extension/sentinel-parameter-flow-control/src/test/java/com/alibaba/csp/sentinel/slots/block/flow/param/ParameterMetricTest.java +++ b/sentinel-extension/sentinel-parameter-flow-control/src/test/java/com/alibaba/csp/sentinel/slots/block/flow/param/ParameterMetricTest.java @@ -15,19 +15,20 @@ */ package com.alibaba.csp.sentinel.slots.block.flow.param; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNotSame; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertTrue; + import java.util.Arrays; -import java.util.HashMap; -import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; - -import com.alibaba.csp.sentinel.slots.statistic.cache.CacheMap; -import com.alibaba.csp.sentinel.slots.statistic.metric.HotParameterLeapArray; +import java.util.concurrent.atomic.AtomicLong; import org.junit.Test; -import static org.junit.Assert.*; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; +import com.alibaba.csp.sentinel.slots.block.RuleConstant; +import com.alibaba.csp.sentinel.slots.statistic.cache.CacheMap; /** * Test cases for {@link ParameterMetric}. @@ -38,42 +39,41 @@ import static org.mockito.Mockito.when; public class ParameterMetricTest { @Test - public void testGetTopParamCount() { - ParameterMetric metric = new ParameterMetric(); - int index = 1; - int n = 10; - RollingParamEvent event = RollingParamEvent.REQUEST_PASSED; - HotParameterLeapArray leapArray = mock(HotParameterLeapArray.class); - Map topValues = new HashMap() {{ - put("a", 3d); - put("b", 7d); - }}; - when(leapArray.getTopValues(event, n)).thenReturn(topValues); - - // Get when not initialized. - assertEquals(0, metric.getTopPassParamCount(index, n).size()); - - metric.getRollingParameters().put(index, leapArray); - assertEquals(topValues, metric.getTopPassParamCount(index, n)); - } - - @Test - public void testInitAndClearHotParameterMetric() { + public void testInitAndClearParameterMetric() { + // Create a parameter metric for resource "abc". ParameterMetric metric = new ParameterMetric(); - int index = 1; - metric.initializeForIndex(index); - HotParameterLeapArray leapArray = metric.getRollingParameters().get(index); - CacheMap cacheMap = metric.getThreadCountMap().get(index); - assertNotNull(leapArray); - assertNotNull(cacheMap); - metric.initializeForIndex(index); - assertSame(leapArray, metric.getRollingParameters().get(index)); - assertSame(cacheMap, metric.getThreadCountMap().get(index)); + ParamFlowRule rule = new ParamFlowRule("abc") + .setParamIdx(1); + metric.initialize(rule); + CacheMap threadCountMap = metric.getThreadCountMap().get(rule.getParamIdx()); + assertNotNull(threadCountMap); + CacheMap timeRecordMap = metric.getRuleTimeCounter(rule); + assertNotNull(timeRecordMap); + metric.initialize(rule); + assertSame(threadCountMap, metric.getThreadCountMap().get(rule.getParamIdx())); + assertSame(timeRecordMap, metric.getRuleTimeCounter(rule)); + + ParamFlowRule rule2 = new ParamFlowRule("abc") + .setParamIdx(1); + metric.initialize(rule2); + CacheMap timeRecordMap2 = metric.getRuleTimeCounter(rule2); + assertSame(timeRecordMap, timeRecordMap2); + + rule2.setParamIdx(2); + metric.initialize(rule2); + assertNotSame(timeRecordMap2, metric.getRuleTimeCounter(rule2)); + + ParamFlowRule rule3 = new ParamFlowRule("abc") + .setParamIdx(1) + .setControlBehavior(RuleConstant.CONTROL_BEHAVIOR_RATE_LIMITER); + metric.initialize(rule3); + assertNotSame(timeRecordMap, metric.getRuleTimeCounter(rule3)); metric.clear(); - assertEquals(0, metric.getRollingParameters().size()); assertEquals(0, metric.getThreadCountMap().size()); + assertEquals(0, metric.getRuleTimeCounterMap().size()); + assertEquals(0, metric.getRuleTokenCounterMap().size()); } @Test @@ -84,12 +84,15 @@ public class ParameterMetricTest { } private void testAddAndDecreaseThreadCount(int paramType) { - int paramIdx = 0; + + ParamFlowRule rule = new ParamFlowRule(); + rule.setParamIdx(0); + int n = 3; long[] v = new long[] {19L, 3L, 8L}; ParameterMetric metric = new ParameterMetric(); - metric.initializeForIndex(paramIdx); - assertTrue(metric.getThreadCountMap().containsKey(paramIdx)); + metric.initialize(rule); + assertTrue(metric.getThreadCountMap().containsKey(rule.getParamIdx())); switch (paramType) { case PARAM_TYPE_ARRAY: @@ -107,13 +110,13 @@ public class ParameterMetricTest { } assertEquals(1, metric.getThreadCountMap().size()); - CacheMap threadCountMap = metric.getThreadCountMap().get(paramIdx); + CacheMap threadCountMap = metric.getThreadCountMap().get(rule.getParamIdx()); assertEquals(v.length, threadCountMap.size()); for (long vs : v) { assertEquals(1, threadCountMap.get(vs).get()); } - for (int i = 1 ; i < n; i++) { + for (int i = 1; i < n; i++) { switch (paramType) { case PARAM_TYPE_ARRAY: metric.addThreadCount((Object)v); @@ -130,13 +133,13 @@ public class ParameterMetricTest { } } assertEquals(1, metric.getThreadCountMap().size()); - threadCountMap = metric.getThreadCountMap().get(paramIdx); + threadCountMap = metric.getThreadCountMap().get(rule.getParamIdx()); assertEquals(v.length, threadCountMap.size()); for (long vs : v) { assertEquals(n, threadCountMap.get(vs).get()); } - for (int i = 1 ; i < n; i++) { + for (int i = 1; i < n; i++) { switch (paramType) { case PARAM_TYPE_ARRAY: metric.decreaseThreadCount((Object)v); @@ -153,7 +156,7 @@ public class ParameterMetricTest { } } assertEquals(1, metric.getThreadCountMap().size()); - threadCountMap = metric.getThreadCountMap().get(paramIdx); + threadCountMap = metric.getThreadCountMap().get(rule.getParamIdx()); assertEquals(v.length, threadCountMap.size()); for (long vs : v) { assertEquals(1, threadCountMap.get(vs).get()); @@ -174,7 +177,7 @@ public class ParameterMetricTest { break; } assertEquals(1, metric.getThreadCountMap().size()); - threadCountMap = metric.getThreadCountMap().get(paramIdx); + threadCountMap = metric.getThreadCountMap().get(rule.getParamIdx()); assertEquals(0, threadCountMap.size()); } diff --git a/sentinel-extension/sentinel-parameter-flow-control/src/test/java/com/alibaba/csp/sentinel/test/AbstractTimeBasedTest.java b/sentinel-extension/sentinel-parameter-flow-control/src/test/java/com/alibaba/csp/sentinel/test/AbstractTimeBasedTest.java new file mode 100644 index 00000000..5ce74712 --- /dev/null +++ b/sentinel-extension/sentinel-parameter-flow-control/src/test/java/com/alibaba/csp/sentinel/test/AbstractTimeBasedTest.java @@ -0,0 +1,58 @@ +/* + * Copyright 1999-2018 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.csp.sentinel.test; + +import com.alibaba.csp.sentinel.util.TimeUtil; + +import org.junit.runner.RunWith; +import org.powermock.api.mockito.PowerMockito; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; + +/** + * Mock support for {@link TimeUtil} + * + * @author jason + */ +@RunWith(PowerMockRunner.class) +@PrepareForTest({TimeUtil.class}) +public abstract class AbstractTimeBasedTest { + + private long currentMillis = 0; + + { + PowerMockito.mockStatic(TimeUtil.class); + PowerMockito.when(TimeUtil.currentTimeMillis()).thenReturn(currentMillis); + } + + protected final void useActualTime() { + PowerMockito.when(TimeUtil.currentTimeMillis()).thenCallRealMethod(); + } + + protected final void setCurrentMillis(long cur) { + currentMillis = cur; + PowerMockito.when(TimeUtil.currentTimeMillis()).thenReturn(currentMillis); + } + + protected final void sleep(int t) { + currentMillis += t; + PowerMockito.when(TimeUtil.currentTimeMillis()).thenReturn(currentMillis); + } + + protected final void sleepSecond(int timeSec) { + sleep(timeSec * 1000); + } +}