Browse Source

Refactor flow algorithm for parameter flow control and support more traffic shaping mode (#677)

* support burst count, statistic interval per rule and throttling mode.
* Add fields in ParamFlowRule to support burst count, throttle, statistic interval per rule.
* Deprecate HotParamLeapArray and use token bucket algorithm directly.
master
Eric Zhao GitHub 5 years ago
parent
commit
e9719d32ec
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
20 changed files with 986 additions and 426 deletions
  1. +11
    -5
      sentinel-demo/sentinel-demo-parameter-flow-control/src/main/java/com/alibaba/csp/sentinel/demo/flow/param/ParamFlowQpsDemo.java
  2. +29
    -17
      sentinel-demo/sentinel-demo-parameter-flow-control/src/main/java/com/alibaba/csp/sentinel/demo/flow/param/ParamFlowQpsRunner.java
  3. +10
    -0
      sentinel-extension/sentinel-parameter-flow-control/pom.xml
  4. +0
    -64
      sentinel-extension/sentinel-parameter-flow-control/src/main/java/com/alibaba/csp/sentinel/command/handler/FetchTopParamsCommandHandler.java
  5. +1
    -1
      sentinel-extension/sentinel-parameter-flow-control/src/main/java/com/alibaba/csp/sentinel/command/handler/GetParamFlowRulesCommandHandler.java
  6. +1
    -1
      sentinel-extension/sentinel-parameter-flow-control/src/main/java/com/alibaba/csp/sentinel/command/handler/ModifyParamFlowRulesCommandHandler.java
  7. +149
    -19
      sentinel-extension/sentinel-parameter-flow-control/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/param/ParamFlowChecker.java
  8. +61
    -20
      sentinel-extension/sentinel-parameter-flow-control/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/param/ParamFlowRule.java
  9. +6
    -4
      sentinel-extension/sentinel-parameter-flow-control/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/param/ParamFlowRuleManager.java
  10. +4
    -1
      sentinel-extension/sentinel-parameter-flow-control/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/param/ParamFlowRuleUtil.java
  11. +4
    -16
      sentinel-extension/sentinel-parameter-flow-control/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/param/ParamFlowSlot.java
  12. +82
    -128
      sentinel-extension/sentinel-parameter-flow-control/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/param/ParameterMetric.java
  13. +1
    -3
      sentinel-extension/sentinel-parameter-flow-control/src/main/java/com/alibaba/csp/sentinel/slots/statistic/ParamFlowStatisticEntryCallback.java
  14. +1
    -2
      sentinel-extension/sentinel-parameter-flow-control/src/main/resources/META-INF/services/com.alibaba.csp.sentinel.command.CommandHandler
  15. +37
    -82
      sentinel-extension/sentinel-parameter-flow-control/src/test/java/com/alibaba/csp/sentinel/slots/block/flow/param/ParamFlowCheckerTest.java
  16. +280
    -0
      sentinel-extension/sentinel-parameter-flow-control/src/test/java/com/alibaba/csp/sentinel/slots/block/flow/param/ParamFlowDefaultCheckerTest.java
  17. +36
    -15
      sentinel-extension/sentinel-parameter-flow-control/src/test/java/com/alibaba/csp/sentinel/slots/block/flow/param/ParamFlowSlotTest.java
  18. +164
    -0
      sentinel-extension/sentinel-parameter-flow-control/src/test/java/com/alibaba/csp/sentinel/slots/block/flow/param/ParamFlowThrottleRateLimitingCheckerTest.java
  19. +51
    -48
      sentinel-extension/sentinel-parameter-flow-control/src/test/java/com/alibaba/csp/sentinel/slots/block/flow/param/ParameterMetricTest.java
  20. +58
    -0
      sentinel-extension/sentinel-parameter-flow-control/src/test/java/com/alibaba/csp/sentinel/test/AbstractTimeBasedTest.java

+ 11
- 5
sentinel-demo/sentinel-demo-parameter-flow-control/src/main/java/com/alibaba/csp/sentinel/demo/flow/param/ParamFlowQpsDemo.java View File

@@ -42,21 +42,27 @@ public class ParamFlowQpsDemo {


private static final String RESOURCE_KEY = "resA"; private static final String RESOURCE_KEY = "resA";


public static void main(String[] args) {
initHotParamFlowRules();
public static void main(String[] args) throws Exception {
initParamFlowRules();


final int threadCount = 8;
final int threadCount = 20;
ParamFlowQpsRunner<Integer> runner = new ParamFlowQpsRunner<>(PARAMS, RESOURCE_KEY, threadCount, 120); ParamFlowQpsRunner<Integer> runner = new ParamFlowQpsRunner<>(PARAMS, RESOURCE_KEY, threadCount, 120);
runner.simulateTraffic();
runner.tick(); runner.tick();

Thread.sleep(1000);
runner.simulateTraffic();
} }


private static void initHotParamFlowRules() {
private static void initParamFlowRules() {
// QPS mode, threshold is 5 for every frequent "hot spot" parameter in index 0 (the first arg). // QPS mode, threshold is 5 for every frequent "hot spot" parameter in index 0 (the first arg).
ParamFlowRule rule = new ParamFlowRule(RESOURCE_KEY) ParamFlowRule rule = new ParamFlowRule(RESOURCE_KEY)
.setParamIdx(0) .setParamIdx(0)
.setGrade(RuleConstant.FLOW_GRADE_QPS) .setGrade(RuleConstant.FLOW_GRADE_QPS)
//.setDurationInSec(3)
//.setControlBehavior(RuleConstant.CONTROL_BEHAVIOR_RATE_LIMITER)
//.setMaxQueueingTimeMs(600)
.setCount(5); .setCount(5);

// We can set threshold count for specific parameter value individually. // We can set threshold count for specific parameter value individually.
// Here we add an exception item. That means: QPS threshold of entries with parameter `PARAM_B` (type: int) // Here we add an exception item. That means: QPS threshold of entries with parameter `PARAM_B` (type: int)
// in index 0 will be 10, rather than the global threshold (5). // in index 0 will be 10, rather than the global threshold (5).


+ 29
- 17
sentinel-demo/sentinel-demo-parameter-flow-control/src/main/java/com/alibaba/csp/sentinel/demo/flow/param/ParamFlowQpsRunner.java View File

@@ -43,6 +43,7 @@ class ParamFlowQpsRunner<T> {
private final int threadCount; private final int threadCount;


private final Map<T, AtomicLong> passCountMap = new ConcurrentHashMap<>(); private final Map<T, AtomicLong> passCountMap = new ConcurrentHashMap<>();
private final Map<T, AtomicLong> blockCountMap = new ConcurrentHashMap<>();


private volatile boolean stop = false; private volatile boolean stop = false;


@@ -59,6 +60,7 @@ class ParamFlowQpsRunner<T> {
for (T param : params) { for (T param : params) {
assertTrue(param != null, "Parameters should not be null"); assertTrue(param != null, "Parameters should not be null");
passCountMap.putIfAbsent(param, new AtomicLong()); passCountMap.putIfAbsent(param, new AtomicLong());
blockCountMap.putIfAbsent(param, new AtomicLong());
} }
} }


@@ -94,11 +96,18 @@ class ParamFlowQpsRunner<T> {


private void passFor(T param) { private void passFor(T param) {
passCountMap.get(param).incrementAndGet(); passCountMap.get(param).incrementAndGet();
// System.out.println(String.format("Parameter <%s> passed at: %d", param, TimeUtil.currentTimeMillis()));
}

private void blockFor(T param) {
blockCountMap.get(param).incrementAndGet();
} }


final class RunTask implements Runnable { final class RunTask implements Runnable {

@Override @Override
public void run() { public void run() {

while (!stop) { while (!stop) {
Entry entry = null; Entry entry = null;
T param = generateParam(); T param = generateParam();
@@ -106,8 +115,9 @@ class ParamFlowQpsRunner<T> {
entry = SphU.entry(resourceName, EntryType.IN, 1, param); entry = SphU.entry(resourceName, EntryType.IN, 1, param);
// Add pass for parameter. // Add pass for parameter.
passFor(param); passFor(param);
} catch (BlockException e1) {
} catch (BlockException e) {
// block.incrementAndGet(); // block.incrementAndGet();
blockFor(param);
} catch (Exception ex) { } catch (Exception ex) {
// biz exception // biz exception
ex.printStackTrace(); ex.printStackTrace();
@@ -118,15 +128,19 @@ class ParamFlowQpsRunner<T> {
} }
} }


try {
TimeUnit.MILLISECONDS.sleep(ThreadLocalRandom.current().nextInt(0, 10));
} catch (InterruptedException e) {
// ignore
}
sleep(ThreadLocalRandom.current().nextInt(0, 10));
} }
} }
} }


private void sleep(int timeMs) {
try {
TimeUnit.MILLISECONDS.sleep(timeMs);
} catch (InterruptedException e) {
// ignore
}
}

final class TimerTask implements Runnable { final class TimerTask implements Runnable {
@Override @Override
public void run() { public void run() {
@@ -139,21 +153,19 @@ class ParamFlowQpsRunner<T> {
map.putIfAbsent(param, 0L); map.putIfAbsent(param, 0L);
} }
while (!stop) { while (!stop) {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
}
sleep(1000);

// There may be a mismatch for time window of internal sliding window. // There may be a mismatch for time window of internal sliding window.
// See corresponding `metrics.log` for accurate statistic log. // See corresponding `metrics.log` for accurate statistic log.
for (T param : params) { for (T param : params) {
long globalPass = passCountMap.get(param).get();
long oldPass = map.get(param);
long oneSecondPass = globalPass - oldPass;
map.put(param, globalPass);
System.out.println(String.format("[%d][%d] Parameter flow metrics for resource %s: "
+ "pass count for param <%s> is %d",
seconds, TimeUtil.currentTimeMillis(), resourceName, param, oneSecondPass));

System.out.println(String.format(
"[%d][%d] Parameter flow metrics for resource %s: "
+ "pass count for param <%s> is %d, block count: %d",
seconds, TimeUtil.currentTimeMillis(), resourceName, param,
passCountMap.get(param).getAndSet(0), blockCountMap.get(param).getAndSet(0)));
} }
System.out.println("=============================");
if (seconds-- <= 0) { if (seconds-- <= 0) {
stop = true; stop = true;
} }


+ 10
- 0
sentinel-extension/sentinel-parameter-flow-control/pom.xml View File

@@ -39,5 +39,15 @@
<artifactId>mockito-core</artifactId> <artifactId>mockito-core</artifactId>
<scope>test</scope> <scope>test</scope>
</dependency> </dependency>
<dependency>
<groupId>org.powermock</groupId>
<artifactId>powermock-module-junit4</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.powermock</groupId>
<artifactId>powermock-api-mockito2</artifactId>
<scope>test</scope>
</dependency>
</dependencies> </dependencies>
</project> </project>

+ 0
- 64
sentinel-extension/sentinel-parameter-flow-control/src/main/java/com/alibaba/csp/sentinel/command/handler/FetchTopParamsCommandHandler.java View File

@@ -1,64 +0,0 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.csp.sentinel.command.handler;

import java.util.Map;

import com.alibaba.csp.sentinel.command.CommandHandler;
import com.alibaba.csp.sentinel.command.CommandRequest;
import com.alibaba.csp.sentinel.command.CommandResponse;
import com.alibaba.csp.sentinel.command.annotation.CommandMapping;
import com.alibaba.csp.sentinel.slots.block.flow.param.ParamFlowSlot;
import com.alibaba.csp.sentinel.slots.block.flow.param.ParameterMetric;
import com.alibaba.csp.sentinel.util.StringUtil;
import com.alibaba.fastjson.JSON;

/**
* @author Eric Zhao
* @since 0.2.0
*/
@CommandMapping(name = "topParams", desc = "get topN param in specified resource, accept param: res={resourceName}&idx={paramIndex}&n={topN}")
public class FetchTopParamsCommandHandler implements CommandHandler<String> {

@Override
public CommandResponse<String> handle(CommandRequest request) {
String resourceName = request.getParam("res");
if (StringUtil.isBlank(resourceName)) {
return CommandResponse.ofFailure(new IllegalArgumentException("Invalid parameter: res"));
}
String idx = request.getParam("idx");
int index;
try {
index = Integer.valueOf(idx);
} catch (Exception ex) {
return CommandResponse.ofFailure(ex, "Invalid parameter: idx");
}
String n = request.getParam("n");
int amount;
try {
amount = Integer.valueOf(n);
} catch (Exception ex) {
return CommandResponse.ofFailure(ex, "Invalid parameter: n");
}
ParameterMetric metric = ParamFlowSlot.getHotParamMetricForName(resourceName);
if (metric == null) {
return CommandResponse.ofSuccess("{}");
}
Map<Object, Double> values = metric.getTopPassParamCount(index, amount);

return CommandResponse.ofSuccess(JSON.toJSONString(values));
}
}

+ 1
- 1
sentinel-extension/sentinel-parameter-flow-control/src/main/java/com/alibaba/csp/sentinel/command/handler/GetParamFlowRulesCommandHandler.java View File

@@ -26,7 +26,7 @@ import com.alibaba.fastjson.JSON;
* @author Eric Zhao * @author Eric Zhao
* @since 0.2.0 * @since 0.2.0
*/ */
@CommandMapping(name = "getParamFlowRules", desc = "get param flow rules")
@CommandMapping(name = "getParamFlowRules", desc = "Get all parameter flow rules")
public class GetParamFlowRulesCommandHandler implements CommandHandler<String> { public class GetParamFlowRulesCommandHandler implements CommandHandler<String> {


@Override @Override


+ 1
- 1
sentinel-extension/sentinel-parameter-flow-control/src/main/java/com/alibaba/csp/sentinel/command/handler/ModifyParamFlowRulesCommandHandler.java View File

@@ -33,7 +33,7 @@ import com.alibaba.fastjson.JSONArray;
* @author Eric Zhao * @author Eric Zhao
* @since 0.2.0 * @since 0.2.0
*/ */
@CommandMapping(name = "setParamFlowRules", desc = "set param flow rules, accept param: data={paramFlowRule Json}")
@CommandMapping(name = "setParamFlowRules", desc = "Set parameter flow rules, while previous rules will be replaced.")
public class ModifyParamFlowRulesCommandHandler implements CommandHandler<String> { public class ModifyParamFlowRulesCommandHandler implements CommandHandler<String> {


private static WritableDataSource<List<ParamFlowRule>> paramFlowWds = null; private static WritableDataSource<List<ParamFlowRule>> paramFlowWds = null;


+ 149
- 19
sentinel-extension/sentinel-parameter-flow-control/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/param/ParamFlowChecker.java View File

@@ -21,16 +21,21 @@ import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;


import com.alibaba.csp.sentinel.cluster.ClusterStateManager; import com.alibaba.csp.sentinel.cluster.ClusterStateManager;
import com.alibaba.csp.sentinel.cluster.TokenService;
import com.alibaba.csp.sentinel.cluster.client.TokenClientProvider;
import com.alibaba.csp.sentinel.cluster.TokenResult; import com.alibaba.csp.sentinel.cluster.TokenResult;
import com.alibaba.csp.sentinel.cluster.TokenResultStatus; import com.alibaba.csp.sentinel.cluster.TokenResultStatus;
import com.alibaba.csp.sentinel.cluster.TokenService;
import com.alibaba.csp.sentinel.cluster.client.TokenClientProvider;
import com.alibaba.csp.sentinel.cluster.server.EmbeddedClusterTokenServerProvider; import com.alibaba.csp.sentinel.cluster.server.EmbeddedClusterTokenServerProvider;
import com.alibaba.csp.sentinel.log.RecordLog; import com.alibaba.csp.sentinel.log.RecordLog;
import com.alibaba.csp.sentinel.slotchain.ResourceWrapper; import com.alibaba.csp.sentinel.slotchain.ResourceWrapper;
import com.alibaba.csp.sentinel.slots.block.RuleConstant; import com.alibaba.csp.sentinel.slots.block.RuleConstant;
import com.alibaba.csp.sentinel.slots.statistic.cache.CacheMap;
import com.alibaba.csp.sentinel.util.TimeUtil;


/** /**
* Rule checker for parameter flow control. * Rule checker for parameter flow control.
@@ -91,23 +96,17 @@ final class ParamFlowChecker {
return true; return true;
} }


static boolean passSingleValueCheck(ResourceWrapper resourceWrapper, ParamFlowRule rule, int count, Object value) {
Set<Object> exclusionItems = rule.getParsedHotItems().keySet();
static boolean passSingleValueCheck(ResourceWrapper resourceWrapper, ParamFlowRule rule, int acquireCount,
Object value) {
if (rule.getGrade() == RuleConstant.FLOW_GRADE_QPS) { if (rule.getGrade() == RuleConstant.FLOW_GRADE_QPS) {
double curCount = getHotParameters(resourceWrapper).getPassParamQps(rule.getParamIdx(), value);

if (exclusionItems.contains(value)) {
// Pass check for exclusion items.
int itemQps = rule.getParsedHotItems().get(value);
return curCount + count <= itemQps;
} else if (curCount + count > rule.getCount()) {
if ((curCount - rule.getCount()) < 1 && (curCount - rule.getCount()) > 0) {
return true;
}
return false;
if (rule.getControlBehavior() == RuleConstant.CONTROL_BEHAVIOR_RATE_LIMITER) {
return passThrottleLocalCheck(resourceWrapper, rule, acquireCount, value);
} else {
return passDefaultLocalCheck(resourceWrapper, rule, acquireCount, value);
} }
} else if (rule.getGrade() == RuleConstant.FLOW_GRADE_THREAD) { } else if (rule.getGrade() == RuleConstant.FLOW_GRADE_THREAD) {
long threadCount = getHotParameters(resourceWrapper).getThreadCount(rule.getParamIdx(), value);
Set<Object> exclusionItems = rule.getParsedHotItems().keySet();
long threadCount = getParameterMetric(resourceWrapper).getThreadCount(rule.getParamIdx(), value);
if (exclusionItems.contains(value)) { if (exclusionItems.contains(value)) {
int itemThreshold = rule.getParsedHotItems().get(value); int itemThreshold = rule.getParsedHotItems().get(value);
return ++threadCount <= itemThreshold; return ++threadCount <= itemThreshold;
@@ -119,7 +118,136 @@ final class ParamFlowChecker {
return true; return true;
} }


private static ParameterMetric getHotParameters(ResourceWrapper resourceWrapper) {
static boolean passDefaultLocalCheck(ResourceWrapper resourceWrapper, ParamFlowRule rule, int acquireCount,
Object value) {
ParameterMetric metric = getParameterMetric(resourceWrapper);
CacheMap<Object, AtomicInteger> tokenCounters = metric == null ? null : metric.getRuleTokenCounter(rule);
CacheMap<Object, AtomicLong> timeCounters = metric == null ? null : metric.getRuleTimeCounter(rule);

if (tokenCounters == null || timeCounters == null) {
return true;
}

// Calculate max token count (threshold)
Set<Object> exclusionItems = rule.getParsedHotItems().keySet();
int tokenCount = (int)rule.getCount();
if (exclusionItems.contains(value)) {
tokenCount = rule.getParsedHotItems().get(value);
}

if (tokenCount == 0) {
return false;
}

int maxCount = tokenCount + rule.getBurstCount();
if (acquireCount > maxCount) {
return false;
}

while (true) {
long currentTime = TimeUtil.currentTimeMillis();

AtomicLong lastAddTokenTime = timeCounters.putIfAbsent(value, new AtomicLong(currentTime));
if (lastAddTokenTime == null) {
// Token never added, just replenish the tokens and consume {@code acquireCount} immediately.
tokenCounters.putIfAbsent(value, new AtomicInteger(maxCount - acquireCount));
return true;
}

// Calculate the time duration since last token was added.
long passTime = currentTime - lastAddTokenTime.get();
// A simplified token bucket algorithm that will replenish the tokens only when statistic window has passed.
if (passTime > rule.getDurationInSec() * 1000) {
AtomicInteger oldQps = tokenCounters.putIfAbsent(value, new AtomicInteger(maxCount - acquireCount));
if (oldQps == null) {
// Might not be accurate here.
lastAddTokenTime.set(currentTime);
return true;
} else {
int restQps = oldQps.get();
int toAddCount = (int)((passTime * tokenCount) / (rule.getDurationInSec() * 1000));
int newQps = (restQps + toAddCount) > maxCount ? (maxCount - acquireCount)
: (restQps + toAddCount - acquireCount);

if (newQps < 0) {
return false;
}
if (oldQps.compareAndSet(restQps, newQps)) {
lastAddTokenTime.set(currentTime);
return true;
}
Thread.yield();
}
} else {
AtomicInteger oldQps = tokenCounters.get(value);
if (oldQps != null) {
int oldQpsValue = oldQps.get();
if (oldQpsValue - acquireCount >= 0) {
if (oldQps.compareAndSet(oldQpsValue, oldQpsValue - acquireCount)) {
return true;
}
} else {
return false;
}
}
Thread.yield();
}
}
}

static boolean passThrottleLocalCheck(ResourceWrapper resourceWrapper, ParamFlowRule rule, int acquireCount,
Object value) {
ParameterMetric metric = getParameterMetric(resourceWrapper);
CacheMap<Object, AtomicLong> timeRecorderMap = metric == null ? null : metric.getRuleTimeCounter(rule);
if (timeRecorderMap == null) {
return true;
}

// Calculate max token count (threshold)
Set<Object> exclusionItems = rule.getParsedHotItems().keySet();
long tokenCount = (long)rule.getCount();
if (exclusionItems.contains(value)) {
tokenCount = rule.getParsedHotItems().get(value);
}

if (tokenCount == 0) {
return false;
}

long costTime = Math.round(1.0 * 1000 * acquireCount * rule.getDurationInSec() / tokenCount);
while (true) {
long currentTime = TimeUtil.currentTimeMillis();
AtomicLong timeRecorder = timeRecorderMap.putIfAbsent(value, new AtomicLong(currentTime));
if (timeRecorder == null) {
return true;
}
//AtomicLong timeRecorder = timeRecorderMap.get(value);
long lastPassTime = timeRecorder.get();
long expectedTime = lastPassTime + costTime;

if (expectedTime <= currentTime || expectedTime - currentTime < rule.getMaxQueueingTimeMs()) {
AtomicLong lastPastTimeRef = timeRecorderMap.get(value);
if (lastPastTimeRef.compareAndSet(lastPassTime, currentTime)) {
long waitTime = expectedTime - currentTime;
if (waitTime > 0) {
lastPastTimeRef.set(expectedTime);
try {
TimeUnit.MILLISECONDS.sleep(waitTime);
} catch (InterruptedException e) {
RecordLog.warn("passThrottleLocalCheck: wait interrupted", e);
}
}
return true;
} else {
Thread.yield();
}
} else {
return false;
}
}
}

private static ParameterMetric getParameterMetric(ResourceWrapper resourceWrapper) {
// Should not be null. // Should not be null.
return ParamFlowSlot.getParamMetric(resourceWrapper); return ParamFlowSlot.getParamMetric(resourceWrapper);
} }
@@ -148,7 +276,8 @@ final class ParamFlowChecker {


TokenService clusterService = pickClusterService(); TokenService clusterService = pickClusterService();
if (clusterService == null) { if (clusterService == null) {
// No available cluster client or server, fallback to local or pass in need.
// No available cluster client or server, fallback to local or
// pass in need.
return fallbackToLocalOrPass(resourceWrapper, rule, count, params); return fallbackToLocalOrPass(resourceWrapper, rule, count, params);
} }


@@ -187,5 +316,6 @@ final class ParamFlowChecker {
return null; return null;
} }


private ParamFlowChecker() {}
private ParamFlowChecker() {
}
} }

+ 61
- 20
sentinel-extension/sentinel-parameter-flow-control/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/param/ParamFlowRule.java View File

@@ -19,6 +19,7 @@ import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Objects;


import com.alibaba.csp.sentinel.context.Context; import com.alibaba.csp.sentinel.context.Context;
import com.alibaba.csp.sentinel.node.DefaultNode; import com.alibaba.csp.sentinel.node.DefaultNode;
@@ -55,6 +56,15 @@ public class ParamFlowRule extends AbstractRule {
*/ */
private double count; private double count;


/**
* Traffic shaping behavior (since 1.6.0).
*/
private int controlBehavior = RuleConstant.CONTROL_BEHAVIOR_DEFAULT;

private int maxQueueingTimeMs = 0;
private int burstCount = 0;
private long durationInSec = 1;

/** /**
* Original exclusion items of parameters. * Original exclusion items of parameters.
*/ */
@@ -74,6 +84,42 @@ public class ParamFlowRule extends AbstractRule {
*/ */
private ParamFlowClusterConfig clusterConfig; private ParamFlowClusterConfig clusterConfig;


public int getControlBehavior() {
return controlBehavior;
}

public ParamFlowRule setControlBehavior(int controlBehavior) {
this.controlBehavior = controlBehavior;
return this;
}

public int getMaxQueueingTimeMs() {
return maxQueueingTimeMs;
}

public ParamFlowRule setMaxQueueingTimeMs(int maxQueueingTimeMs) {
this.maxQueueingTimeMs = maxQueueingTimeMs;
return this;
}

public int getBurstCount() {
return burstCount;
}

public ParamFlowRule setBurstCount(int burstCount) {
this.burstCount = burstCount;
return this;
}

public long getDurationInSec() {
return durationInSec;
}

public ParamFlowRule setDurationInSec(long durationInSec) {
this.durationInSec = durationInSec;
return this;
}

public int getGrade() { public int getGrade() {
return grade; return grade;
} }
@@ -156,15 +202,18 @@ public class ParamFlowRule extends AbstractRule {
if (o == null || getClass() != o.getClass()) { return false; } if (o == null || getClass() != o.getClass()) { return false; }
if (!super.equals(o)) { return false; } if (!super.equals(o)) { return false; }


ParamFlowRule rule = (ParamFlowRule)o;
ParamFlowRule that = (ParamFlowRule)o;


if (grade != rule.grade) { return false; }
if (Double.compare(rule.count, count) != 0) { return false; }
if (clusterMode != rule.clusterMode) { return false; }
if (paramIdx != null ? !paramIdx.equals(rule.paramIdx) : rule.paramIdx != null) { return false; }
if (paramFlowItemList != null ? !paramFlowItemList.equals(rule.paramFlowItemList)
: rule.paramFlowItemList != null) { return false; }
return clusterConfig != null ? clusterConfig.equals(rule.clusterConfig) : rule.clusterConfig == null;
if (grade != that.grade) { return false; }
if (Double.compare(that.count, count) != 0) { return false; }
if (controlBehavior != that.controlBehavior) { return false; }
if (maxQueueingTimeMs != that.maxQueueingTimeMs) { return false; }
if (burstCount != that.burstCount) { return false; }
if (durationInSec != that.durationInSec) { return false; }
if (clusterMode != that.clusterMode) { return false; }
if (!Objects.equals(paramIdx, that.paramIdx)) { return false; }
if (!Objects.equals(paramFlowItemList, that.paramFlowItemList)) { return false; }
return Objects.equals(clusterConfig, that.clusterConfig);
} }


@Override @Override
@@ -175,21 +224,13 @@ public class ParamFlowRule extends AbstractRule {
result = 31 * result + (paramIdx != null ? paramIdx.hashCode() : 0); result = 31 * result + (paramIdx != null ? paramIdx.hashCode() : 0);
temp = Double.doubleToLongBits(count); temp = Double.doubleToLongBits(count);
result = 31 * result + (int)(temp ^ (temp >>> 32)); result = 31 * result + (int)(temp ^ (temp >>> 32));
result = 31 * result + controlBehavior;
result = 31 * result + maxQueueingTimeMs;
result = 31 * result + burstCount;
result = 31 * result + (int)(durationInSec ^ (durationInSec >>> 32));
result = 31 * result + (paramFlowItemList != null ? paramFlowItemList.hashCode() : 0); result = 31 * result + (paramFlowItemList != null ? paramFlowItemList.hashCode() : 0);
result = 31 * result + (clusterMode ? 1 : 0); result = 31 * result + (clusterMode ? 1 : 0);
result = 31 * result + (clusterConfig != null ? clusterConfig.hashCode() : 0); result = 31 * result + (clusterConfig != null ? clusterConfig.hashCode() : 0);
return result; return result;
} }

@Override
public String toString() {
return "ParamFlowRule{" +
"grade=" + grade +
", paramIdx=" + paramIdx +
", count=" + count +
", paramFlowItemList=" + paramFlowItemList +
", clusterMode=" + clusterMode +
", clusterConfig=" + clusterConfig +
'}';
}
} }

+ 6
- 4
sentinel-extension/sentinel-parameter-flow-control/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/param/ParamFlowRuleManager.java View File

@@ -62,8 +62,9 @@ public final class ParamFlowRuleManager {
} }


/** /**
* Listen to the {@link SentinelProperty} for {@link ParamFlowRule}s. The property is the source
* of {@link ParamFlowRule}s. Parameter flow rules can also be set by {@link #loadRules(List)} directly.
* Listen to the {@link SentinelProperty} for {@link ParamFlowRule}s. The
* property is the source of {@link ParamFlowRule}s. Parameter flow rules
* can also be set by {@link #loadRules(List)} directly.
* *
* @param property the property to listen * @param property the property to listen
*/ */
@@ -149,6 +150,7 @@ public final class ParamFlowRuleManager {
ruleSet = new HashSet<>(); ruleSet = new HashSet<>();
newRuleMap.put(resourceName, ruleSet); newRuleMap.put(resourceName, ruleSet);
} }

ruleSet.add(rule); ruleSet.add(rule);
} }


@@ -164,6 +166,6 @@ public final class ParamFlowRuleManager {
} }
} }


private ParamFlowRuleManager() {}
private ParamFlowRuleManager() {
}
} }


+ 4
- 1
sentinel-extension/sentinel-parameter-flow-control/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/param/ParamFlowRuleUtil.java View File

@@ -31,7 +31,10 @@ public final class ParamFlowRuleUtil {


public static boolean isValidRule(ParamFlowRule rule) { public static boolean isValidRule(ParamFlowRule rule) {
return rule != null && !StringUtil.isBlank(rule.getResource()) && rule.getCount() >= 0 return rule != null && !StringUtil.isBlank(rule.getResource()) && rule.getCount() >= 0
&& rule.getGrade() >= 0 && rule.getParamIdx() != null && checkCluster(rule);
&& rule.getGrade() >= 0 && rule.getParamIdx() != null
&& rule.getBurstCount() >= 0 && rule.getControlBehavior() >= 0
&& rule.getDurationInSec() > 0 && rule.getMaxQueueingTimeMs() >= 0
&& checkCluster(rule);
} }


private static boolean checkCluster(/*@PreChecked*/ ParamFlowRule rule) { private static boolean checkCluster(/*@PreChecked*/ ParamFlowRule rule) {


+ 4
- 16
sentinel-extension/sentinel-parameter-flow-control/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/param/ParamFlowSlot.java View File

@@ -87,13 +87,9 @@ public class ParamFlowSlot extends AbstractLinkedProcessorSlot<DefaultNode> {
applyRealParamIdx(rule, args.length); applyRealParamIdx(rule, args.length);


// Initialize the parameter metrics. // Initialize the parameter metrics.
initHotParamMetricsFor(resourceWrapper, rule.getParamIdx());
initHotParamMetricsFor(resourceWrapper, rule);


if (!ParamFlowChecker.passCheck(resourceWrapper, rule, count, args)) { if (!ParamFlowChecker.passCheck(resourceWrapper, rule, count, args)) {

// Here we add the block count.
addBlockCount(resourceWrapper, count, args);

String triggeredParam = ""; String triggeredParam = "";
if (args.length > rule.getParamIdx()) { if (args.length > rule.getParamIdx()) {
Object value = args[rule.getParamIdx()]; Object value = args[rule.getParamIdx()];
@@ -104,22 +100,14 @@ public class ParamFlowSlot extends AbstractLinkedProcessorSlot<DefaultNode> {
} }
} }


private void addBlockCount(ResourceWrapper resourceWrapper, int count, Object... args) {
ParameterMetric parameterMetric = ParamFlowSlot.getParamMetric(resourceWrapper);

if (parameterMetric != null) {
parameterMetric.addBlock(count, args);
}
}

/** /**
* Init the parameter metric and index map for given resource. * Init the parameter metric and index map for given resource.
* Package-private for test. * Package-private for test.
* *
* @param resourceWrapper resource to init * @param resourceWrapper resource to init
* @param index index to initialize, which must be valid
* @param rule relevant rule
*/ */
void initHotParamMetricsFor(ResourceWrapper resourceWrapper, /*@Valid*/ int index) {
void initHotParamMetricsFor(ResourceWrapper resourceWrapper, /*@Valid*/ ParamFlowRule rule) {
ParameterMetric metric; ParameterMetric metric;
// Assume that the resource is valid. // Assume that the resource is valid.
if ((metric = metricsMap.get(resourceWrapper)) == null) { if ((metric = metricsMap.get(resourceWrapper)) == null) {
@@ -131,7 +119,7 @@ public class ParamFlowSlot extends AbstractLinkedProcessorSlot<DefaultNode> {
} }
} }
} }
metric.initializeForIndex(index);
metric.initialize(rule);
} }


public static ParameterMetric getParamMetric(ResourceWrapper resourceWrapper) { public static ParameterMetric getParamMetric(ResourceWrapper resourceWrapper) {


+ 82
- 128
sentinel-extension/sentinel-parameter-flow-control/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/param/ParameterMetric.java View File

@@ -19,16 +19,12 @@ import java.lang.reflect.Array;
import java.util.Collection; import java.util.Collection;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;


import com.alibaba.csp.sentinel.log.RecordLog; import com.alibaba.csp.sentinel.log.RecordLog;
import com.alibaba.csp.sentinel.node.IntervalProperty;
import com.alibaba.csp.sentinel.node.SampleCountProperty;
import com.alibaba.csp.sentinel.slots.statistic.cache.CacheMap; import com.alibaba.csp.sentinel.slots.statistic.cache.CacheMap;
import com.alibaba.csp.sentinel.slots.statistic.cache.ConcurrentLinkedHashMapWrapper; import com.alibaba.csp.sentinel.slots.statistic.cache.ConcurrentLinkedHashMapWrapper;
import com.alibaba.csp.sentinel.slots.statistic.metric.HotParameterLeapArray;
import com.alibaba.csp.sentinel.util.AssertUtil;


/** /**
* Metrics for frequent ("hot spot") parameters. * Metrics for frequent ("hot spot") parameters.
@@ -38,49 +34,79 @@ import com.alibaba.csp.sentinel.util.AssertUtil;
*/ */
public class ParameterMetric { public class ParameterMetric {


private final int sampleCount;
private final int intervalMs;

public ParameterMetric() {
this(SampleCountProperty.SAMPLE_COUNT, IntervalProperty.INTERVAL);
private static final int THREAD_COUNT_MAX_CAPACITY = 4000;
private static final int BASE_PARAM_MAX_CAPACITY = 4000;
private static final int TOTAL_MAX_CAPACITY = 20_0000;

private final Object lock = new Object();

/**
* Format: (rule, (value, timeRecorder))
*
* @since 1.6.0
*/
private final Map<ParamFlowRule, CacheMap<Object, AtomicLong>> ruleTimeCounters = new HashMap<>();
/**
* Format: (rule, (value, tokenCounter))
*
* @since 1.6.0
*/
private final Map<ParamFlowRule, CacheMap<Object, AtomicInteger>> ruleTokenCounter = new HashMap<>();
private final Map<Integer, CacheMap<Object, AtomicInteger>> threadCountMap = new HashMap<>();

/**
* Get the token counter for given parameter rule.
*
* @param rule valid parameter rule
* @return the associated token counter
* @since 1.6.0
*/
public CacheMap<Object, AtomicInteger> getRuleTokenCounter(ParamFlowRule rule) {
return ruleTokenCounter.get(rule);
} }


public ParameterMetric(int sampleCount, int intervalInMs) {
AssertUtil.isTrue(sampleCount > 0, "sampleCount should be positive");
AssertUtil.isTrue(intervalInMs > 0, "window interval should be positive");
AssertUtil.isTrue(intervalInMs % sampleCount == 0, "time span needs to be evenly divided");
this.sampleCount = sampleCount;
this.intervalMs = intervalInMs;
/**
* Get the time record counter for given parameter rule.
*
* @param rule valid parameter rule
* @return the associated time counter
* @since 1.6.0
*/
public CacheMap<Object, AtomicLong> getRuleTimeCounter(ParamFlowRule rule) {
return ruleTimeCounters.get(rule);
} }


private Map<Integer, HotParameterLeapArray> rollingParameters =
new ConcurrentHashMap<Integer, HotParameterLeapArray>();
private Map<Integer, CacheMap<Object, AtomicInteger>> threadCountMap =
new ConcurrentHashMap<Integer, CacheMap<Object, AtomicInteger>>();

public Map<Integer, HotParameterLeapArray> getRollingParameters() {
return rollingParameters;
}

public Map<Integer, CacheMap<Object, AtomicInteger>> getThreadCountMap() {
return threadCountMap;
public void clear() {
synchronized (lock) {
threadCountMap.clear();
ruleTimeCounters.clear();
ruleTokenCounter.clear();
}
} }


public synchronized void clear() {
rollingParameters.clear();
threadCountMap.clear();
}
public void initialize(ParamFlowRule rule) {
if (!ruleTimeCounters.containsKey(rule)) {
synchronized (lock) {
if (ruleTimeCounters.get(rule) == null) {
long size = Math.min(BASE_PARAM_MAX_CAPACITY * rule.getDurationInSec(), TOTAL_MAX_CAPACITY);
ruleTimeCounters.put(rule, new ConcurrentLinkedHashMapWrapper<Object, AtomicLong>(size));
}
}
}


public void initializeForIndex(int index) {
if (!rollingParameters.containsKey(index)) {
synchronized (this) {
// putIfAbsent
if (rollingParameters.get(index) == null) {
rollingParameters.put(index, new HotParameterLeapArray(sampleCount, intervalMs));
if (!ruleTokenCounter.containsKey(rule)) {
synchronized (lock) {
if (ruleTokenCounter.get(rule) == null) {
long size = Math.min(BASE_PARAM_MAX_CAPACITY * rule.getDurationInSec(), TOTAL_MAX_CAPACITY);
ruleTokenCounter.put(rule, new ConcurrentLinkedHashMapWrapper<Object, AtomicInteger>(size));
} }
}
}


if (threadCountMap.get(index) == null) {
threadCountMap.put(index,
if (!threadCountMap.containsKey(rule.getParamIdx())) {
synchronized (lock) {
if (threadCountMap.get(rule.getParamIdx()) == null) {
threadCountMap.put(rule.getParamIdx(),
new ConcurrentLinkedHashMapWrapper<Object, AtomicInteger>(THREAD_COUNT_MAX_CAPACITY)); new ConcurrentLinkedHashMapWrapper<Object, AtomicInteger>(THREAD_COUNT_MAX_CAPACITY));
} }
} }
@@ -204,102 +230,30 @@ public class ParameterMetric {
} }
} }


public void addPass(int count, Object... args) {
add(RollingParamEvent.REQUEST_PASSED, count, args);
}

public void addBlock(int count, Object... args) {
add(RollingParamEvent.REQUEST_BLOCKED, count, args);
}

@SuppressWarnings("rawtypes")
private void add(RollingParamEvent event, int count, Object... args) {
if (args == null) {
return;
}
try {
for (int index = 0; index < args.length; index++) {
HotParameterLeapArray param = rollingParameters.get(index);
if (param == null) {
continue;
}

Object arg = args[index];
if (arg == null) {
continue;
}
if (Collection.class.isAssignableFrom(arg.getClass())) {
for (Object value : ((Collection)arg)) {
param.addValue(event, count, value);
}
} else if (arg.getClass().isArray()) {
int length = Array.getLength(arg);
for (int i = 0; i < length; i++) {
Object value = Array.get(arg, i);
param.addValue(event, count, value);
}
} else {
param.addValue(event, count, arg);
}

}
} catch (Throwable e) {
RecordLog.warn("[ParameterMetric] Param exception", e);
}
}

public double getPassParamQps(int index, Object value) {
try {
HotParameterLeapArray parameter = rollingParameters.get(index);
if (parameter == null || value == null) {
return -1;
}
return parameter.getRollingAvg(RollingParamEvent.REQUEST_PASSED, value);
} catch (Throwable e) {
RecordLog.info(e.getMessage(), e);
public long getThreadCount(int index, Object value) {
CacheMap<Object, AtomicInteger> cacheMap = threadCountMap.get(index);
if (cacheMap == null) {
return 0;
} }


return -1;
AtomicInteger count = cacheMap.get(value);
return count == null ? 0L : count.get();
} }


public long getBlockParamQps(int index, Object value) {
try {
HotParameterLeapArray parameter = rollingParameters.get(index);
if (parameter == null || value == null) {
return -1;
}

return (long)rollingParameters.get(index).getRollingAvg(RollingParamEvent.REQUEST_BLOCKED, value);
} catch (Throwable e) {
RecordLog.info(e.getMessage(), e);
}

return -1;
/**
* Get the token counter map. Package-private for test.
*
* @return the token counter map
*/
Map<ParamFlowRule, CacheMap<Object, AtomicInteger>> getRuleTokenCounterMap() {
return ruleTokenCounter;
} }


public Map<Object, Double> getTopPassParamCount(int index, int number) {
try {
HotParameterLeapArray parameter = rollingParameters.get(index);
if (parameter == null) {
return new HashMap<Object, Double>();
}

return parameter.getTopValues(RollingParamEvent.REQUEST_PASSED, number);
} catch (Throwable e) {
RecordLog.info(e.getMessage(), e);
}

return new HashMap<Object, Double>();
Map<Integer, CacheMap<Object, AtomicInteger>> getThreadCountMap() {
return threadCountMap;
} }


public long getThreadCount(int index, Object value) {
if (threadCountMap.get(index) == null) {
return 0;
}

AtomicInteger count = threadCountMap.get(index).get(value);
return count == null ? 0L : count.get();
Map<ParamFlowRule, CacheMap<Object, AtomicLong>> getRuleTimeCounterMap() {
return ruleTimeCounters;
} }

private static final long THREAD_COUNT_MAX_CAPACITY = 4000;
} }

+ 1
- 3
sentinel-extension/sentinel-parameter-flow-control/src/main/java/com/alibaba/csp/sentinel/slots/statistic/ParamFlowStatisticEntryCallback.java View File

@@ -30,13 +30,11 @@ import com.alibaba.csp.sentinel.slots.block.flow.param.ParameterMetric;
public class ParamFlowStatisticEntryCallback implements ProcessorSlotEntryCallback<DefaultNode> { public class ParamFlowStatisticEntryCallback implements ProcessorSlotEntryCallback<DefaultNode> {


@Override @Override
public void onPass(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, Object... args)
throws Exception {
public void onPass(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, Object... args) {
// The "hot spot" parameter metric is present only if parameter flow rules for the resource exist. // The "hot spot" parameter metric is present only if parameter flow rules for the resource exist.
ParameterMetric parameterMetric = ParamFlowSlot.getParamMetric(resourceWrapper); ParameterMetric parameterMetric = ParamFlowSlot.getParamMetric(resourceWrapper);


if (parameterMetric != null) { if (parameterMetric != null) {
parameterMetric.addPass(count, args);
parameterMetric.addThreadCount(args); parameterMetric.addThreadCount(args);
} }
} }


+ 1
- 2
sentinel-extension/sentinel-parameter-flow-control/src/main/resources/META-INF/services/com.alibaba.csp.sentinel.command.CommandHandler View File

@@ -1,3 +1,2 @@
com.alibaba.csp.sentinel.command.handler.GetParamFlowRulesCommandHandler com.alibaba.csp.sentinel.command.handler.GetParamFlowRulesCommandHandler
com.alibaba.csp.sentinel.command.handler.ModifyParamFlowRulesCommandHandler
com.alibaba.csp.sentinel.command.handler.FetchTopParamsCommandHandler
com.alibaba.csp.sentinel.command.handler.ModifyParamFlowRulesCommandHandler

+ 37
- 82
sentinel-extension/sentinel-parameter-flow-control/src/test/java/com/alibaba/csp/sentinel/slots/block/flow/param/ParamFlowCheckerTest.java View File

@@ -15,23 +15,29 @@
*/ */
package com.alibaba.csp.sentinel.slots.block.flow.param; package com.alibaba.csp.sentinel.slots.block.flow.param;


import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.util.Arrays; import java.util.Arrays;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;

import com.alibaba.csp.sentinel.EntryType;
import com.alibaba.csp.sentinel.slotchain.ResourceWrapper;
import com.alibaba.csp.sentinel.slotchain.StringResourceWrapper;
import com.alibaba.csp.sentinel.slots.block.RuleConstant;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;


import org.junit.After; import org.junit.After;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;


import static org.junit.Assert.*;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import com.alibaba.csp.sentinel.EntryType;
import com.alibaba.csp.sentinel.slotchain.ResourceWrapper;
import com.alibaba.csp.sentinel.slotchain.StringResourceWrapper;
import com.alibaba.csp.sentinel.slots.block.RuleConstant;
import com.alibaba.csp.sentinel.slots.statistic.cache.ConcurrentLinkedHashMapWrapper;
import com.alibaba.csp.sentinel.util.TimeUtil;


/** /**
* Test cases for {@link ParamFlowChecker}. * Test cases for {@link ParamFlowChecker}.
@@ -56,43 +62,21 @@ public class ParamFlowCheckerTest {
} }


@Test @Test
public void testSingleValueCheckQpsWithoutExceptionItems() {
final String resourceName = "testSingleValueCheckQpsWithoutExceptionItems";
final ResourceWrapper resourceWrapper = new StringResourceWrapper(resourceName, EntryType.IN);
int paramIdx = 0;

long threshold = 5L;

ParamFlowRule rule = new ParamFlowRule();
rule.setResource(resourceName);
rule.setCount(threshold);
rule.setParamIdx(paramIdx);

String valueA = "valueA";
String valueB = "valueB";
ParameterMetric metric = mock(ParameterMetric.class);
when(metric.getPassParamQps(paramIdx, valueA)).thenReturn((double)threshold - 1);
when(metric.getPassParamQps(paramIdx, valueB)).thenReturn((double)threshold + 1);
ParamFlowSlot.getMetricsMap().put(resourceWrapper, metric);

assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
assertFalse(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueB));
}

@Test
public void testSingleValueCheckQpsWithExceptionItems() {
public void testSingleValueCheckQpsWithExceptionItems() throws InterruptedException {
final String resourceName = "testSingleValueCheckQpsWithExceptionItems"; final String resourceName = "testSingleValueCheckQpsWithExceptionItems";
final ResourceWrapper resourceWrapper = new StringResourceWrapper(resourceName, EntryType.IN); final ResourceWrapper resourceWrapper = new StringResourceWrapper(resourceName, EntryType.IN);
TimeUtil.currentTimeMillis();
int paramIdx = 0; int paramIdx = 0;


long globalThreshold = 5L; long globalThreshold = 5L;
int thresholdB = 3;
int thresholdB = 0;
int thresholdD = 7; int thresholdD = 7;


ParamFlowRule rule = new ParamFlowRule(); ParamFlowRule rule = new ParamFlowRule();
rule.setResource(resourceName); rule.setResource(resourceName);
rule.setCount(globalThreshold); rule.setCount(globalThreshold);
rule.setParamIdx(paramIdx); rule.setParamIdx(paramIdx);
rule.setControlBehavior(RuleConstant.CONTROL_BEHAVIOR_RATE_LIMITER);


String valueA = "valueA"; String valueA = "valueA";
String valueB = "valueB"; String valueB = "valueB";
@@ -105,29 +89,13 @@ public class ParamFlowCheckerTest {
map.put(valueD, thresholdD); map.put(valueD, thresholdD);
rule.setParsedHotItems(map); rule.setParsedHotItems(map);


ParameterMetric metric = mock(ParameterMetric.class);
when(metric.getPassParamQps(paramIdx, valueA)).thenReturn((double)globalThreshold - 1);
when(metric.getPassParamQps(paramIdx, valueB)).thenReturn((double)globalThreshold - 1);
when(metric.getPassParamQps(paramIdx, valueC)).thenReturn((double)globalThreshold - 1);
when(metric.getPassParamQps(paramIdx, valueD)).thenReturn((double)globalThreshold + 1);
ParameterMetric metric = new ParameterMetric();
ParamFlowSlot.getMetricsMap().put(resourceWrapper, metric); ParamFlowSlot.getMetricsMap().put(resourceWrapper, metric);
metric.getRuleTimeCounterMap().put(rule, new ConcurrentLinkedHashMapWrapper<Object, AtomicLong>(4000));


assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
assertFalse(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueB)); assertFalse(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueB));
assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueC));
assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueD));

when(metric.getPassParamQps(paramIdx, valueA)).thenReturn((double)globalThreshold);
when(metric.getPassParamQps(paramIdx, valueB)).thenReturn((double)thresholdB - 1L);
when(metric.getPassParamQps(paramIdx, valueC)).thenReturn((double)globalThreshold + 1);
when(metric.getPassParamQps(paramIdx, valueD)).thenReturn((double)globalThreshold - 1)
.thenReturn((double)thresholdD);

assertFalse(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueB));
assertFalse(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueC));
assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueD));
assertFalse(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueD));
TimeUnit.SECONDS.sleep(3);
} }


@Test @Test
@@ -140,9 +108,7 @@ public class ParamFlowCheckerTest {
int thresholdB = 3; int thresholdB = 3;
int thresholdD = 7; int thresholdD = 7;


ParamFlowRule rule = new ParamFlowRule(resourceName)
.setCount(globalThreshold)
.setParamIdx(paramIdx)
ParamFlowRule rule = new ParamFlowRule(resourceName).setCount(globalThreshold).setParamIdx(paramIdx)
.setGrade(RuleConstant.FLOW_GRADE_THREAD); .setGrade(RuleConstant.FLOW_GRADE_THREAD);


String valueA = "valueA"; String valueA = "valueA";
@@ -171,8 +137,7 @@ public class ParamFlowCheckerTest {
when(metric.getThreadCount(paramIdx, valueA)).thenReturn(globalThreshold); when(metric.getThreadCount(paramIdx, valueA)).thenReturn(globalThreshold);
when(metric.getThreadCount(paramIdx, valueB)).thenReturn(thresholdB - 1L); when(metric.getThreadCount(paramIdx, valueB)).thenReturn(thresholdB - 1L);
when(metric.getThreadCount(paramIdx, valueC)).thenReturn(globalThreshold + 1); when(metric.getThreadCount(paramIdx, valueC)).thenReturn(globalThreshold + 1);
when(metric.getThreadCount(paramIdx, valueD)).thenReturn(globalThreshold - 1)
.thenReturn((long)thresholdD);
when(metric.getThreadCount(paramIdx, valueD)).thenReturn(globalThreshold - 1).thenReturn((long)thresholdD);


assertFalse(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); assertFalse(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueB)); assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueB));
@@ -182,52 +147,42 @@ public class ParamFlowCheckerTest {
} }


@Test @Test
public void testPassLocalCheckForCollection() {
public void testPassLocalCheckForCollection() throws InterruptedException {
final String resourceName = "testPassLocalCheckForCollection"; final String resourceName = "testPassLocalCheckForCollection";
final ResourceWrapper resourceWrapper = new StringResourceWrapper(resourceName, EntryType.IN); final ResourceWrapper resourceWrapper = new StringResourceWrapper(resourceName, EntryType.IN);
int paramIdx = 0; int paramIdx = 0;
double globalThreshold = 10;
double globalThreshold = 1;


ParamFlowRule rule = new ParamFlowRule(resourceName)
.setParamIdx(paramIdx)
.setCount(globalThreshold);
ParamFlowRule rule = new ParamFlowRule(resourceName).setParamIdx(paramIdx).setCount(globalThreshold);


String v1 = "a", v2 = "B", v3 = "Cc"; String v1 = "a", v2 = "B", v3 = "Cc";
List<String> list = Arrays.asList(v1, v2, v3); List<String> list = Arrays.asList(v1, v2, v3);
ParameterMetric metric = mock(ParameterMetric.class);
when(metric.getPassParamQps(paramIdx, v1)).thenReturn(globalThreshold - 2)
.thenReturn(globalThreshold - 1);
when(metric.getPassParamQps(paramIdx, v2)).thenReturn(globalThreshold - 2)
.thenReturn(globalThreshold - 1);
when(metric.getPassParamQps(paramIdx, v3)).thenReturn(globalThreshold - 1)
.thenReturn(globalThreshold);
ParameterMetric metric = new ParameterMetric();
ParamFlowSlot.getMetricsMap().put(resourceWrapper, metric); ParamFlowSlot.getMetricsMap().put(resourceWrapper, metric);
metric.getRuleTimeCounterMap().put(rule, new ConcurrentLinkedHashMapWrapper<Object, AtomicLong>(4000));
metric.getRuleTokenCounterMap().put(rule, new ConcurrentLinkedHashMapWrapper<Object, AtomicInteger>(4000));


assertTrue(ParamFlowChecker.passCheck(resourceWrapper, rule, 1, list)); assertTrue(ParamFlowChecker.passCheck(resourceWrapper, rule, 1, list));
assertFalse(ParamFlowChecker.passCheck(resourceWrapper, rule, 1, list)); assertFalse(ParamFlowChecker.passCheck(resourceWrapper, rule, 1, list));
} }


@Test @Test
public void testPassLocalCheckForArray() {
public void testPassLocalCheckForArray() throws InterruptedException {
final String resourceName = "testPassLocalCheckForArray"; final String resourceName = "testPassLocalCheckForArray";
final ResourceWrapper resourceWrapper = new StringResourceWrapper(resourceName, EntryType.IN); final ResourceWrapper resourceWrapper = new StringResourceWrapper(resourceName, EntryType.IN);
int paramIdx = 0; int paramIdx = 0;
double globalThreshold = 10;
double globalThreshold = 1;


ParamFlowRule rule = new ParamFlowRule(resourceName)
.setParamIdx(paramIdx)
.setCount(globalThreshold);
ParamFlowRule rule = new ParamFlowRule(resourceName).setParamIdx(paramIdx)
.setControlBehavior(RuleConstant.CONTROL_BEHAVIOR_RATE_LIMITER).setCount(globalThreshold);

TimeUtil.currentTimeMillis();


String v1 = "a", v2 = "B", v3 = "Cc"; String v1 = "a", v2 = "B", v3 = "Cc";
Object arr = new String[] {v1, v2, v3}; Object arr = new String[] {v1, v2, v3};
ParameterMetric metric = mock(ParameterMetric.class);
when(metric.getPassParamQps(paramIdx, v1)).thenReturn(globalThreshold - 2)
.thenReturn(globalThreshold - 1);
when(metric.getPassParamQps(paramIdx, v2)).thenReturn(globalThreshold - 2)
.thenReturn(globalThreshold - 1);
when(metric.getPassParamQps(paramIdx, v3)).thenReturn(globalThreshold - 1)
.thenReturn(globalThreshold);
ParameterMetric metric = new ParameterMetric();
ParamFlowSlot.getMetricsMap().put(resourceWrapper, metric); ParamFlowSlot.getMetricsMap().put(resourceWrapper, metric);
metric.getRuleTimeCounterMap().put(rule, new ConcurrentLinkedHashMapWrapper<Object, AtomicLong>(4000));


assertTrue(ParamFlowChecker.passCheck(resourceWrapper, rule, 1, arr)); assertTrue(ParamFlowChecker.passCheck(resourceWrapper, rule, 1, arr));
assertFalse(ParamFlowChecker.passCheck(resourceWrapper, rule, 1, arr)); assertFalse(ParamFlowChecker.passCheck(resourceWrapper, rule, 1, arr));


+ 280
- 0
sentinel-extension/sentinel-parameter-flow-control/src/test/java/com/alibaba/csp/sentinel/slots/block/flow/param/ParamFlowDefaultCheckerTest.java View File

@@ -0,0 +1,280 @@
package com.alibaba.csp.sentinel.slots.block.flow.param;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import com.alibaba.csp.sentinel.EntryType;
import com.alibaba.csp.sentinel.slotchain.ResourceWrapper;
import com.alibaba.csp.sentinel.slotchain.StringResourceWrapper;
import com.alibaba.csp.sentinel.slots.statistic.cache.ConcurrentLinkedHashMapWrapper;
import com.alibaba.csp.sentinel.test.AbstractTimeBasedTest;
import com.alibaba.csp.sentinel.util.TimeUtil;
/**
* @author jialiang.linjl
*/
public class ParamFlowDefaultCheckerTest extends AbstractTimeBasedTest {
@Test
public void testParamFlowDefaultCheckSingleQps() {
final String resourceName = "testParamFlowDefaultCheckSingleQps";
final ResourceWrapper resourceWrapper = new StringResourceWrapper(resourceName, EntryType.IN);
int paramIdx = 0;
long threshold = 5L;
ParamFlowRule rule = new ParamFlowRule();
rule.setResource(resourceName);
rule.setCount(threshold);
rule.setParamIdx(paramIdx);
String valueA = "valueA";
ParameterMetric metric = new ParameterMetric();
ParamFlowSlot.getMetricsMap().put(resourceWrapper, metric);
metric.getRuleTimeCounterMap().put(rule, new ConcurrentLinkedHashMapWrapper<Object, AtomicLong>(4000));
metric.getRuleTokenCounterMap().put(rule,
new ConcurrentLinkedHashMapWrapper<Object, AtomicInteger>(4000));
// We mock the time directly to avoid unstable behaviour.
setCurrentMillis(System.currentTimeMillis());
assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
assertFalse(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
sleep(3000);
assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
assertFalse(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
}
@Test
public void testParamFlowDefaultCheckSingleQpsWithBurst() throws InterruptedException {
final String resourceName = "testParamFlowDefaultCheckSingleQpsWithBurst";
final ResourceWrapper resourceWrapper = new StringResourceWrapper(resourceName, EntryType.IN);
int paramIdx = 0;
long threshold = 5L;
ParamFlowRule rule = new ParamFlowRule();
rule.setResource(resourceName);
rule.setCount(threshold);
rule.setParamIdx(paramIdx);
rule.setBurstCount(3);
String valueA = "valueA";
ParameterMetric metric = new ParameterMetric();
ParamFlowSlot.getMetricsMap().put(resourceWrapper, metric);
metric.getRuleTimeCounterMap().put(rule, new ConcurrentLinkedHashMapWrapper<Object, AtomicLong>(4000));
metric.getRuleTokenCounterMap().put(rule,
new ConcurrentLinkedHashMapWrapper<Object, AtomicInteger>(4000));
// We mock the time directly to avoid unstable behaviour.
setCurrentMillis(System.currentTimeMillis());
assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
assertFalse(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
sleep(1002);
assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
assertFalse(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
sleep(1002);
assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
assertFalse(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
sleep(2000);
assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
assertFalse(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
sleep(1002);
assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
assertFalse(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
}
@Test
public void testParamFlowDefaultCheckQpsInDifferentDuration() throws InterruptedException {
final String resourceName = "testParamFlowDefaultCheckQpsInDifferentDuration";
final ResourceWrapper resourceWrapper = new StringResourceWrapper(resourceName, EntryType.IN);
int paramIdx = 0;
long threshold = 5L;
ParamFlowRule rule = new ParamFlowRule();
rule.setResource(resourceName);
rule.setCount(threshold);
rule.setParamIdx(paramIdx);
rule.setDurationInSec(60);
String valueA = "helloWorld";
ParameterMetric metric = new ParameterMetric();
ParamFlowSlot.getMetricsMap().put(resourceWrapper, metric);
metric.getRuleTimeCounterMap().put(rule, new ConcurrentLinkedHashMapWrapper<Object, AtomicLong>(4000));
metric.getRuleTokenCounterMap().put(rule,
new ConcurrentLinkedHashMapWrapper<Object, AtomicInteger>(4000));
// We mock the time directly to avoid unstable behaviour.
setCurrentMillis(System.currentTimeMillis());
assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
assertFalse(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
sleepSecond(1);
assertFalse(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
sleepSecond(10);
assertFalse(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
sleepSecond(30);
assertFalse(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
sleepSecond(30);
assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
assertFalse(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
}
@Test
public void testParamFlowDefaultCheckSingleValueCheckQpsMultipleThreads() throws Exception {
// In this test case we use the actual time.
useActualTime();
final String resourceName = "testParamFlowDefaultCheckSingleValueCheckQpsMultipleThreads";
final ResourceWrapper resourceWrapper = new StringResourceWrapper(resourceName, EntryType.IN);
int paramIdx = 0;
long threshold = 5L;
final ParamFlowRule rule = new ParamFlowRule();
rule.setResource(resourceName);
rule.setCount(threshold);
rule.setParamIdx(paramIdx);
final String valueA = "valueA";
ParameterMetric metric = new ParameterMetric();
ParamFlowSlot.getMetricsMap().put(resourceWrapper, metric);
metric.getRuleTimeCounterMap().put(rule, new ConcurrentLinkedHashMapWrapper<Object, AtomicLong>(4000));
metric.getRuleTokenCounterMap().put(rule,
new ConcurrentLinkedHashMapWrapper<Object, AtomicInteger>(4000));
int threadCount = 40;
final CountDownLatch waitLatch = new CountDownLatch(threadCount);
final AtomicInteger successCount = new AtomicInteger();
for (int i = 0; i < threadCount; i++) {
Thread t = new Thread(new Runnable() {
@Override
public void run() {
if (ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)) {
successCount.incrementAndGet();
}
waitLatch.countDown();
}
});
t.setName("sentinel-simulate-traffic-task-" + i);
t.start();
}
waitLatch.await();
assertEquals(successCount.get(), threshold);
successCount.set(0);
System.out.println("testParamFlowDefaultCheckSingleValueCheckQpsMultipleThreads: sleep for 3 seconds");
TimeUnit.SECONDS.sleep(3);
successCount.set(0);
final CountDownLatch waitLatch1 = new CountDownLatch(threadCount);
final long currentTime = TimeUtil.currentTimeMillis();
final long endTime = currentTime + rule.getDurationInSec() * 1000 - 1;
for (int i = 0; i < threadCount; i++) {
Thread t = new Thread(new Runnable() {
@Override
public void run() {
long currentTime1 = currentTime;
while (currentTime1 <= endTime) {
if (ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)) {
successCount.incrementAndGet();
}
try {
TimeUnit.MILLISECONDS.sleep(ThreadLocalRandom.current().nextInt(20));
} catch (InterruptedException e) {
e.printStackTrace();
}
currentTime1 = TimeUtil.currentTimeMillis();
}
waitLatch1.countDown();
}
});
t.setName("sentinel-simulate-traffic-task-" + i);
t.start();
}
waitLatch1.await();
assertEquals(successCount.get(), threshold);
}
@Before
public void setUp() throws Exception {
ParamFlowSlot.getMetricsMap().clear();
}
@After
public void tearDown() throws Exception {
ParamFlowSlot.getMetricsMap().clear();
}
}

+ 36
- 15
sentinel-extension/sentinel-parameter-flow-control/src/test/java/com/alibaba/csp/sentinel/slots/block/flow/param/ParamFlowSlotTest.java View File

@@ -15,19 +15,28 @@
*/ */
package com.alibaba.csp.sentinel.slots.block.flow.param; package com.alibaba.csp.sentinel.slots.block.flow.param;


import java.util.Collections;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;


import com.alibaba.csp.sentinel.EntryType;
import com.alibaba.csp.sentinel.slotchain.ResourceWrapper;
import com.alibaba.csp.sentinel.slotchain.StringResourceWrapper;
import java.util.Collections;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;


import org.junit.After; import org.junit.After;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;


import static org.junit.Assert.*;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import com.alibaba.csp.sentinel.EntryType;
import com.alibaba.csp.sentinel.slotchain.ResourceWrapper;
import com.alibaba.csp.sentinel.slotchain.StringResourceWrapper;
import com.alibaba.csp.sentinel.slots.statistic.cache.CacheMap;
import com.alibaba.csp.sentinel.slots.statistic.cache.ConcurrentLinkedHashMapWrapper;
import com.alibaba.csp.sentinel.util.TimeUtil;


/** /**
* Test cases for {@link ParamFlowSlot}. * Test cases for {@link ParamFlowSlot}.
@@ -81,17 +90,21 @@ public class ParamFlowSlotTest {
String resourceName = "testEntryWhenParamFlowExists"; String resourceName = "testEntryWhenParamFlowExists";
ResourceWrapper resourceWrapper = new StringResourceWrapper(resourceName, EntryType.IN); ResourceWrapper resourceWrapper = new StringResourceWrapper(resourceName, EntryType.IN);
long argToGo = 1L; long argToGo = 1L;
double count = 10;
double count = 1;
ParamFlowRule rule = new ParamFlowRule(resourceName) ParamFlowRule rule = new ParamFlowRule(resourceName)
.setCount(count) .setCount(count)
.setBurstCount(0)
.setParamIdx(0); .setParamIdx(0);
ParamFlowRuleManager.loadRules(Collections.singletonList(rule)); ParamFlowRuleManager.loadRules(Collections.singletonList(rule));


ParameterMetric metric = mock(ParameterMetric.class); ParameterMetric metric = mock(ParameterMetric.class);
// First pass, then blocked.
when(metric.getPassParamQps(rule.getParamIdx(), argToGo))
.thenReturn(count - 1)
.thenReturn(count);

CacheMap<Object, AtomicLong> map = new ConcurrentLinkedHashMapWrapper<Object, AtomicLong>(4000);
CacheMap<Object, AtomicInteger> map2 = new ConcurrentLinkedHashMapWrapper<Object, AtomicInteger>(4000);
when(metric.getRuleTimeCounter(rule)).thenReturn(map);
when(metric.getRuleTokenCounter(rule)).thenReturn(map2);
map.put(argToGo, new AtomicLong(TimeUtil.currentTimeMillis()));

// Insert the mock metric to control pass or block. // Insert the mock metric to control pass or block.
ParamFlowSlot.getMetricsMap().put(resourceWrapper, metric); ParamFlowSlot.getMetricsMap().put(resourceWrapper, metric);


@@ -115,21 +128,29 @@ public class ParamFlowSlotTest {


@Test @Test
public void testInitParamMetrics() { public void testInitParamMetrics() {

ParamFlowRule rule = new ParamFlowRule();
rule.setParamIdx(1);
int index = 1; int index = 1;
String resourceName = "res-" + System.currentTimeMillis(); String resourceName = "res-" + System.currentTimeMillis();
ResourceWrapper resourceWrapper = new StringResourceWrapper(resourceName, EntryType.IN); ResourceWrapper resourceWrapper = new StringResourceWrapper(resourceName, EntryType.IN);


assertNull(ParamFlowSlot.getParamMetric(resourceWrapper)); assertNull(ParamFlowSlot.getParamMetric(resourceWrapper));


paramFlowSlot.initHotParamMetricsFor(resourceWrapper, index);
paramFlowSlot.initHotParamMetricsFor(resourceWrapper, rule);
ParameterMetric metric = ParamFlowSlot.getParamMetric(resourceWrapper); ParameterMetric metric = ParamFlowSlot.getParamMetric(resourceWrapper);
assertNotNull(metric); assertNotNull(metric);
assertNotNull(metric.getRollingParameters().get(index));
assertNotNull(metric.getRuleTimeCounterMap().get(rule));
assertNotNull(metric.getThreadCountMap().get(index)); assertNotNull(metric.getThreadCountMap().get(index));


// Duplicate init. // Duplicate init.
paramFlowSlot.initHotParamMetricsFor(resourceWrapper, index);
paramFlowSlot.initHotParamMetricsFor(resourceWrapper, rule);
assertSame(metric, ParamFlowSlot.getParamMetric(resourceWrapper));

ParamFlowRule rule2 = new ParamFlowRule();
rule2.setParamIdx(1);
assertSame(metric, ParamFlowSlot.getParamMetric(resourceWrapper)); assertSame(metric, ParamFlowSlot.getParamMetric(resourceWrapper));

} }


@Before @Before


+ 164
- 0
sentinel-extension/sentinel-parameter-flow-control/src/test/java/com/alibaba/csp/sentinel/slots/block/flow/param/ParamFlowThrottleRateLimitingCheckerTest.java View File

@@ -0,0 +1,164 @@
package com.alibaba.csp.sentinel.slots.block.flow.param;

import static org.junit.Assert.assertEquals;

import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import com.alibaba.csp.sentinel.EntryType;
import com.alibaba.csp.sentinel.slotchain.ResourceWrapper;
import com.alibaba.csp.sentinel.slotchain.StringResourceWrapper;
import com.alibaba.csp.sentinel.slots.block.RuleConstant;
import com.alibaba.csp.sentinel.slots.statistic.cache.ConcurrentLinkedHashMapWrapper;
import com.alibaba.csp.sentinel.util.TimeUtil;

/**
* @author jialiang.linjl
*/
public class ParamFlowThrottleRateLimitingCheckerTest {

@Test
public void testSingleValueThrottleCheckQps() throws Exception {
final String resourceName = "testSingleValueThrottleCheckQps";
final ResourceWrapper resourceWrapper = new StringResourceWrapper(resourceName, EntryType.IN);
int paramIdx = 0;
TimeUtil.currentTimeMillis();

long threshold = 5L;

ParamFlowRule rule = new ParamFlowRule();
rule.setResource(resourceName);
rule.setCount(threshold);
rule.setParamIdx(paramIdx);
rule.setControlBehavior(RuleConstant.CONTROL_BEHAVIOR_RATE_LIMITER);

String valueA = "valueA";
ParameterMetric metric = new ParameterMetric();
ParamFlowSlot.getMetricsMap().put(resourceWrapper, metric);
metric.getRuleTimeCounterMap().put(rule, new ConcurrentLinkedHashMapWrapper<Object, AtomicLong>(4000));

long currentTime = TimeUtil.currentTimeMillis();
long endTime = currentTime + rule.getDurationInSec() * 1000;
int successCount = 0;
while (currentTime <= endTime - 10) {
if (ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)) {
successCount++;
}
currentTime = TimeUtil.currentTimeMillis();
}
assertEquals(successCount, threshold);

System.out.println("testSingleValueThrottleCheckQps: sleep for 3 seconds");
TimeUnit.SECONDS.sleep(3);

currentTime = TimeUtil.currentTimeMillis();
endTime = currentTime + rule.getDurationInSec() * 1000;
successCount = 0;
while (currentTime <= endTime - 10) {
if (ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)) {
successCount++;
}
currentTime = TimeUtil.currentTimeMillis();
}
assertEquals(successCount, threshold);
}

@Test
public void testSingleValueThrottleCheckQpsMultipleThreads() throws Exception {
final String resourceName = "testSingleValueThrottleCheckQpsMultipleThreads";
final ResourceWrapper resourceWrapper = new StringResourceWrapper(resourceName, EntryType.IN);
int paramIdx = 0;

long threshold = 5L;

final ParamFlowRule rule = new ParamFlowRule(resourceName)
.setCount(threshold)
.setParamIdx(paramIdx)
.setControlBehavior(RuleConstant.CONTROL_BEHAVIOR_RATE_LIMITER);

final String valueA = "valueA";
ParameterMetric metric = new ParameterMetric();
ParamFlowSlot.getMetricsMap().put(resourceWrapper, metric);
metric.getRuleTimeCounterMap().put(rule, new ConcurrentLinkedHashMapWrapper<Object, AtomicLong>(4000));

int threadCount = 40;
System.out.println(metric.getRuleTimeCounter(rule));

final CountDownLatch waitLatch = new CountDownLatch(threadCount);
final AtomicInteger successCount = new AtomicInteger();
for (int i = 0; i < threadCount; i++) {
Thread t = new Thread(new Runnable() {
@Override
public void run() {
if (ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)) {
successCount.incrementAndGet();
}
waitLatch.countDown();
}

});
t.setName("sentinel-simulate-traffic-task-" + i);
t.start();
}
waitLatch.await();

assertEquals(successCount.get(), 1);
System.out.println(threadCount);
successCount.set(0);

System.out.println("testSingleValueThrottleCheckQpsMultipleThreads: sleep for 3 seconds");
TimeUnit.SECONDS.sleep(3);

successCount.set(0);
final CountDownLatch waitLatch1 = new CountDownLatch(threadCount);
final long currentTime = TimeUtil.currentTimeMillis();
final long endTime = currentTime + rule.getDurationInSec() * 1000 - 1;
for (int i = 0; i < threadCount; i++) {
Thread t = new Thread(new Runnable() {
@Override
public void run() {
long currentTime1 = currentTime;
while (currentTime1 <= endTime) {
if (ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)) {
successCount.incrementAndGet();
}

Random random = new Random();

try {
TimeUnit.MILLISECONDS.sleep(random.nextInt(20));
} catch (InterruptedException e) {
e.printStackTrace();
}
currentTime1 = TimeUtil.currentTimeMillis();
}

waitLatch1.countDown();
}

});
t.setName("sentinel-simulate-traffic-task-" + i);
t.start();
}
waitLatch1.await();

assertEquals(successCount.get(), threshold);
}

@Before
public void setUp() throws Exception {
ParamFlowSlot.getMetricsMap().clear();
}

@After
public void tearDown() throws Exception {
ParamFlowSlot.getMetricsMap().clear();
}
}

+ 51
- 48
sentinel-extension/sentinel-parameter-flow-control/src/test/java/com/alibaba/csp/sentinel/slots/block/flow/param/ParameterMetricTest.java View File

@@ -15,19 +15,20 @@
*/ */
package com.alibaba.csp.sentinel.slots.block.flow.param; package com.alibaba.csp.sentinel.slots.block.flow.param;


import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;

import java.util.Arrays; import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;

import com.alibaba.csp.sentinel.slots.statistic.cache.CacheMap;
import com.alibaba.csp.sentinel.slots.statistic.metric.HotParameterLeapArray;
import java.util.concurrent.atomic.AtomicLong;


import org.junit.Test; import org.junit.Test;


import static org.junit.Assert.*;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import com.alibaba.csp.sentinel.slots.block.RuleConstant;
import com.alibaba.csp.sentinel.slots.statistic.cache.CacheMap;


/** /**
* Test cases for {@link ParameterMetric}. * Test cases for {@link ParameterMetric}.
@@ -38,42 +39,41 @@ import static org.mockito.Mockito.when;
public class ParameterMetricTest { public class ParameterMetricTest {


@Test @Test
public void testGetTopParamCount() {
ParameterMetric metric = new ParameterMetric();
int index = 1;
int n = 10;
RollingParamEvent event = RollingParamEvent.REQUEST_PASSED;
HotParameterLeapArray leapArray = mock(HotParameterLeapArray.class);
Map<Object, Double> topValues = new HashMap<Object, Double>() {{
put("a", 3d);
put("b", 7d);
}};
when(leapArray.getTopValues(event, n)).thenReturn(topValues);

// Get when not initialized.
assertEquals(0, metric.getTopPassParamCount(index, n).size());

metric.getRollingParameters().put(index, leapArray);
assertEquals(topValues, metric.getTopPassParamCount(index, n));
}

@Test
public void testInitAndClearHotParameterMetric() {
public void testInitAndClearParameterMetric() {
// Create a parameter metric for resource "abc".
ParameterMetric metric = new ParameterMetric(); ParameterMetric metric = new ParameterMetric();
int index = 1;
metric.initializeForIndex(index);
HotParameterLeapArray leapArray = metric.getRollingParameters().get(index);
CacheMap cacheMap = metric.getThreadCountMap().get(index);
assertNotNull(leapArray);
assertNotNull(cacheMap);


metric.initializeForIndex(index);
assertSame(leapArray, metric.getRollingParameters().get(index));
assertSame(cacheMap, metric.getThreadCountMap().get(index));
ParamFlowRule rule = new ParamFlowRule("abc")
.setParamIdx(1);
metric.initialize(rule);
CacheMap<Object, AtomicInteger> threadCountMap = metric.getThreadCountMap().get(rule.getParamIdx());
assertNotNull(threadCountMap);
CacheMap<Object, AtomicLong> timeRecordMap = metric.getRuleTimeCounter(rule);
assertNotNull(timeRecordMap);
metric.initialize(rule);
assertSame(threadCountMap, metric.getThreadCountMap().get(rule.getParamIdx()));
assertSame(timeRecordMap, metric.getRuleTimeCounter(rule));

ParamFlowRule rule2 = new ParamFlowRule("abc")
.setParamIdx(1);
metric.initialize(rule2);
CacheMap<Object, AtomicLong> timeRecordMap2 = metric.getRuleTimeCounter(rule2);
assertSame(timeRecordMap, timeRecordMap2);

rule2.setParamIdx(2);
metric.initialize(rule2);
assertNotSame(timeRecordMap2, metric.getRuleTimeCounter(rule2));

ParamFlowRule rule3 = new ParamFlowRule("abc")
.setParamIdx(1)
.setControlBehavior(RuleConstant.CONTROL_BEHAVIOR_RATE_LIMITER);
metric.initialize(rule3);
assertNotSame(timeRecordMap, metric.getRuleTimeCounter(rule3));


metric.clear(); metric.clear();
assertEquals(0, metric.getRollingParameters().size());
assertEquals(0, metric.getThreadCountMap().size()); assertEquals(0, metric.getThreadCountMap().size());
assertEquals(0, metric.getRuleTimeCounterMap().size());
assertEquals(0, metric.getRuleTokenCounterMap().size());
} }


@Test @Test
@@ -84,12 +84,15 @@ public class ParameterMetricTest {
} }


private void testAddAndDecreaseThreadCount(int paramType) { private void testAddAndDecreaseThreadCount(int paramType) {
int paramIdx = 0;

ParamFlowRule rule = new ParamFlowRule();
rule.setParamIdx(0);

int n = 3; int n = 3;
long[] v = new long[] {19L, 3L, 8L}; long[] v = new long[] {19L, 3L, 8L};
ParameterMetric metric = new ParameterMetric(); ParameterMetric metric = new ParameterMetric();
metric.initializeForIndex(paramIdx);
assertTrue(metric.getThreadCountMap().containsKey(paramIdx));
metric.initialize(rule);
assertTrue(metric.getThreadCountMap().containsKey(rule.getParamIdx()));


switch (paramType) { switch (paramType) {
case PARAM_TYPE_ARRAY: case PARAM_TYPE_ARRAY:
@@ -107,13 +110,13 @@ public class ParameterMetricTest {
} }


assertEquals(1, metric.getThreadCountMap().size()); assertEquals(1, metric.getThreadCountMap().size());
CacheMap<Object, AtomicInteger> threadCountMap = metric.getThreadCountMap().get(paramIdx);
CacheMap<Object, AtomicInteger> threadCountMap = metric.getThreadCountMap().get(rule.getParamIdx());
assertEquals(v.length, threadCountMap.size()); assertEquals(v.length, threadCountMap.size());
for (long vs : v) { for (long vs : v) {
assertEquals(1, threadCountMap.get(vs).get()); assertEquals(1, threadCountMap.get(vs).get());
} }


for (int i = 1 ; i < n; i++) {
for (int i = 1; i < n; i++) {
switch (paramType) { switch (paramType) {
case PARAM_TYPE_ARRAY: case PARAM_TYPE_ARRAY:
metric.addThreadCount((Object)v); metric.addThreadCount((Object)v);
@@ -130,13 +133,13 @@ public class ParameterMetricTest {
} }
} }
assertEquals(1, metric.getThreadCountMap().size()); assertEquals(1, metric.getThreadCountMap().size());
threadCountMap = metric.getThreadCountMap().get(paramIdx);
threadCountMap = metric.getThreadCountMap().get(rule.getParamIdx());
assertEquals(v.length, threadCountMap.size()); assertEquals(v.length, threadCountMap.size());
for (long vs : v) { for (long vs : v) {
assertEquals(n, threadCountMap.get(vs).get()); assertEquals(n, threadCountMap.get(vs).get());
} }


for (int i = 1 ; i < n; i++) {
for (int i = 1; i < n; i++) {
switch (paramType) { switch (paramType) {
case PARAM_TYPE_ARRAY: case PARAM_TYPE_ARRAY:
metric.decreaseThreadCount((Object)v); metric.decreaseThreadCount((Object)v);
@@ -153,7 +156,7 @@ public class ParameterMetricTest {
} }
} }
assertEquals(1, metric.getThreadCountMap().size()); assertEquals(1, metric.getThreadCountMap().size());
threadCountMap = metric.getThreadCountMap().get(paramIdx);
threadCountMap = metric.getThreadCountMap().get(rule.getParamIdx());
assertEquals(v.length, threadCountMap.size()); assertEquals(v.length, threadCountMap.size());
for (long vs : v) { for (long vs : v) {
assertEquals(1, threadCountMap.get(vs).get()); assertEquals(1, threadCountMap.get(vs).get());
@@ -174,7 +177,7 @@ public class ParameterMetricTest {
break; break;
} }
assertEquals(1, metric.getThreadCountMap().size()); assertEquals(1, metric.getThreadCountMap().size());
threadCountMap = metric.getThreadCountMap().get(paramIdx);
threadCountMap = metric.getThreadCountMap().get(rule.getParamIdx());
assertEquals(0, threadCountMap.size()); assertEquals(0, threadCountMap.size());
} }




+ 58
- 0
sentinel-extension/sentinel-parameter-flow-control/src/test/java/com/alibaba/csp/sentinel/test/AbstractTimeBasedTest.java View File

@@ -0,0 +1,58 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.csp.sentinel.test;

import com.alibaba.csp.sentinel.util.TimeUtil;

import org.junit.runner.RunWith;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;

/**
* Mock support for {@link TimeUtil}
*
* @author jason
*/
@RunWith(PowerMockRunner.class)
@PrepareForTest({TimeUtil.class})
public abstract class AbstractTimeBasedTest {

private long currentMillis = 0;

{
PowerMockito.mockStatic(TimeUtil.class);
PowerMockito.when(TimeUtil.currentTimeMillis()).thenReturn(currentMillis);
}

protected final void useActualTime() {
PowerMockito.when(TimeUtil.currentTimeMillis()).thenCallRealMethod();
}

protected final void setCurrentMillis(long cur) {
currentMillis = cur;
PowerMockito.when(TimeUtil.currentTimeMillis()).thenReturn(currentMillis);
}

protected final void sleep(int t) {
currentMillis += t;
PowerMockito.when(TimeUtil.currentTimeMillis()).thenReturn(currentMillis);
}

protected final void sleepSecond(int timeSec) {
sleep(timeSec * 1000);
}
}

Loading…
Cancel
Save