瀏覽代碼

Add flow control by frequent (hot spot) parameters (#156)

- Add callback registry for statistic slot for extensions.
- Add a new module `sentinel-parameter-flow-control` under `sentinel-extension`.
- Add a `CacheMap` interface to provide abstraction for cache. We use ConcurrentLinkedHashMap as the default implementation (LRU strategy)..
- Add a `ParameterMetric` class as frequent parameter metrics for a specific resource. The metric map is located in `ParamFlowSlot` rather than `ClusterNode`.
- Implement `ParameterLeapArray` as statistic data structure for frequent parameters in a period of time window.
- Add `ParamFlowSlot` as the checker slot; Add `ParamFlowChecker` to do rule checking; Add `ParamFlowRuleManager` to do rule managing.
- The statistic metrics for frequent parameters is enabled only if the related resource has configured parameter flow rule; Parameter metrics for removed rules will be cleared automatically.
- Leverage extensible `SlotChainBuilder` to provide a `HotParamSlotChainBuilder`.
- Add command handlers for hot param rules.
- Add test cases and demo.
master
Eric Zhao GitHub 6 年之前
父節點
當前提交
88a02623ac
沒有發現已知的金鑰在資料庫的簽署中 GPG Key ID: 4AEE18F83AFDEB23
共有 40 個文件被更改,包括 2977 次插入7 次删除
  1. +10
    -0
      pom.xml
  2. +3
    -3
      sentinel-core/src/main/java/com/alibaba/csp/sentinel/slotchain/ProcessorSlot.java
  3. +32
    -0
      sentinel-core/src/main/java/com/alibaba/csp/sentinel/slotchain/ProcessorSlotEntryCallback.java
  4. +29
    -0
      sentinel-core/src/main/java/com/alibaba/csp/sentinel/slotchain/ProcessorSlotExitCallback.java
  5. +18
    -4
      sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/statistic/StatisticSlot.java
  6. +85
    -0
      sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/statistic/StatisticSlotCallbackRegistry.java
  7. +1
    -0
      sentinel-demo/pom.xml
  8. +30
    -0
      sentinel-demo/sentinel-demo-parameter-flow-control/pom.xml
  9. +69
    -0
      sentinel-demo/sentinel-demo-parameter-flow-control/src/main/java/com/alibaba/csp/sentinel/demo/flow/param/ParamFlowQpsDemo.java
  10. +167
    -0
      sentinel-demo/sentinel-demo-parameter-flow-control/src/main/java/com/alibaba/csp/sentinel/demo/flow/param/ParamFlowQpsRunner.java
  11. +1
    -0
      sentinel-extension/pom.xml
  12. +61
    -0
      sentinel-extension/sentinel-parameter-flow-control/README.md
  13. +43
    -0
      sentinel-extension/sentinel-parameter-flow-control/pom.xml
  14. +64
    -0
      sentinel-extension/sentinel-parameter-flow-control/src/main/java/com/alibaba/csp/sentinel/command/handler/FetchTopParamsCommandHandler.java
  15. +36
    -0
      sentinel-extension/sentinel-parameter-flow-control/src/main/java/com/alibaba/csp/sentinel/command/handler/GetParamFlowRulesCommandHandler.java
  16. +95
    -0
      sentinel-extension/sentinel-parameter-flow-control/src/main/java/com/alibaba/csp/sentinel/command/handler/ModifyParamFlowRulesCommandHandler.java
  17. +35
    -0
      sentinel-extension/sentinel-parameter-flow-control/src/main/java/com/alibaba/csp/sentinel/init/ParamFlowStatisticSlotCallbackInit.java
  18. +52
    -0
      sentinel-extension/sentinel-parameter-flow-control/src/main/java/com/alibaba/csp/sentinel/slots/HotParamSlotChainBuilder.java
  19. +100
    -0
      sentinel-extension/sentinel-parameter-flow-control/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/param/ParamFlowChecker.java
  20. +48
    -0
      sentinel-extension/sentinel-parameter-flow-control/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/param/ParamFlowException.java
  21. +101
    -0
      sentinel-extension/sentinel-parameter-flow-control/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/param/ParamFlowItem.java
  22. +156
    -0
      sentinel-extension/sentinel-parameter-flow-control/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/param/ParamFlowRule.java
  23. +233
    -0
      sentinel-extension/sentinel-parameter-flow-control/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/param/ParamFlowRuleManager.java
  24. +158
    -0
      sentinel-extension/sentinel-parameter-flow-control/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/param/ParamFlowSlot.java
  25. +147
    -0
      sentinel-extension/sentinel-parameter-flow-control/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/param/ParameterMetric.java
  26. +31
    -0
      sentinel-extension/sentinel-parameter-flow-control/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/param/RollingParamEvent.java
  27. +49
    -0
      sentinel-extension/sentinel-parameter-flow-control/src/main/java/com/alibaba/csp/sentinel/slots/statistic/ParamFlowStatisticEntryCallback.java
  28. +45
    -0
      sentinel-extension/sentinel-parameter-flow-control/src/main/java/com/alibaba/csp/sentinel/slots/statistic/cache/CacheMap.java
  29. +92
    -0
      sentinel-extension/sentinel-parameter-flow-control/src/main/java/com/alibaba/csp/sentinel/slots/statistic/cache/ConcurrentLinkedHashMapWrapper.java
  30. +67
    -0
      sentinel-extension/sentinel-parameter-flow-control/src/main/java/com/alibaba/csp/sentinel/slots/statistic/data/ParamMapBucket.java
  31. +127
    -0
      sentinel-extension/sentinel-parameter-flow-control/src/main/java/com/alibaba/csp/sentinel/slots/statistic/metric/HotParameterLeapArray.java
  32. +3
    -0
      sentinel-extension/sentinel-parameter-flow-control/src/main/resources/META-INF/services/com.alibaba.csp.sentinel.command.CommandHandler
  33. +1
    -0
      sentinel-extension/sentinel-parameter-flow-control/src/main/resources/META-INF/services/com.alibaba.csp.sentinel.init.InitFunc
  34. +1
    -0
      sentinel-extension/sentinel-parameter-flow-control/src/main/resources/META-INF/services/com.alibaba.csp.sentinel.slotchain.SlotChainBuilder
  35. +193
    -0
      sentinel-extension/sentinel-parameter-flow-control/src/test/java/com/alibaba/csp/sentinel/slots/block/flow/param/ParamFlowCheckerTest.java
  36. +192
    -0
      sentinel-extension/sentinel-parameter-flow-control/src/test/java/com/alibaba/csp/sentinel/slots/block/flow/param/ParamFlowRuleManagerTest.java
  37. +118
    -0
      sentinel-extension/sentinel-parameter-flow-control/src/test/java/com/alibaba/csp/sentinel/slots/block/flow/param/ParamFlowSlotTest.java
  38. +75
    -0
      sentinel-extension/sentinel-parameter-flow-control/src/test/java/com/alibaba/csp/sentinel/slots/block/flow/param/ParameterMetricTest.java
  39. +77
    -0
      sentinel-extension/sentinel-parameter-flow-control/src/test/java/com/alibaba/csp/sentinel/slots/statistic/data/ParamMapBucketTest.java
  40. +132
    -0
      sentinel-extension/sentinel-parameter-flow-control/src/test/java/com/alibaba/csp/sentinel/slots/statistic/metric/HotParameterLeapArrayTest.java

+ 10
- 0
pom.xml 查看文件

@@ -87,6 +87,11 @@
<artifactId>sentinel-annotation-aspectj</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.alibaba.csp</groupId>
<artifactId>sentinel-parameter-flow-control</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.alibaba.csp</groupId>
<artifactId>sentinel-datasource-extension</artifactId>
@@ -112,6 +117,11 @@
<artifactId>sentinel-transport-simple-http</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.alibaba.csp</groupId>
<artifactId>sentinel-transport-common</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.alibaba.csp</groupId>
<artifactId>sentinel-adapter</artifactId>


+ 3
- 3
sentinel-core/src/main/java/com/alibaba/csp/sentinel/slotchain/ProcessorSlot.java 查看文件

@@ -34,7 +34,7 @@ public interface ProcessorSlot<T> {
* @param param Generics parameter, usually is a {@link com.alibaba.csp.sentinel.node.Node}
* @param count tokens needed
* @param args parameters of the original call
* @throws Throwable
* @throws Throwable blocked exception or unexpected error
*/
void entry(Context context, ResourceWrapper resourceWrapper, T param, int count, Object... args)
throws Throwable;
@@ -44,10 +44,10 @@ public interface ProcessorSlot<T> {
*
* @param context current {@link Context}
* @param resourceWrapper current resource
* @param obj
* @param obj relevant object (e.g. Node)
* @param count tokens needed
* @param args parameters of the original call
* @throws Throwable
* @throws Throwable blocked exception or unexpected error
*/
void fireEntry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, Object... args)
throws Throwable;


+ 32
- 0
sentinel-core/src/main/java/com/alibaba/csp/sentinel/slotchain/ProcessorSlotEntryCallback.java 查看文件

@@ -0,0 +1,32 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.csp.sentinel.slotchain;

import com.alibaba.csp.sentinel.context.Context;
import com.alibaba.csp.sentinel.slots.block.BlockException;

/**
* Callback for entering {@link com.alibaba.csp.sentinel.slots.statistic.StatisticSlot} (passed and blocked).
*
* @author Eric Zhao
* @since 0.2.0
*/
public interface ProcessorSlotEntryCallback<T> {

void onPass(Context context, ResourceWrapper resourceWrapper, T param, int count, Object... args) throws Exception;

void onBlocked(BlockException ex, Context context, ResourceWrapper resourceWrapper, T param, int count, Object... args);
}

+ 29
- 0
sentinel-core/src/main/java/com/alibaba/csp/sentinel/slotchain/ProcessorSlotExitCallback.java 查看文件

@@ -0,0 +1,29 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.csp.sentinel.slotchain;

import com.alibaba.csp.sentinel.context.Context;

/**
* Callback for exiting {@link com.alibaba.csp.sentinel.slots.statistic.StatisticSlot} (passed and blocked).
*
* @author Eric Zhao
* @since 0.2.0
*/
public interface ProcessorSlotExitCallback {

void onExit(Context context, ResourceWrapper resourceWrapper, int count, Object... args);
}

+ 18
- 4
sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/statistic/StatisticSlot.java 查看文件

@@ -15,6 +15,10 @@
*/
package com.alibaba.csp.sentinel.slots.statistic;

import java.util.Collection;

import com.alibaba.csp.sentinel.slotchain.ProcessorSlotEntryCallback;
import com.alibaba.csp.sentinel.slotchain.ProcessorSlotExitCallback;
import com.alibaba.csp.sentinel.util.TimeUtil;
import com.alibaba.csp.sentinel.Constants;
import com.alibaba.csp.sentinel.EntryType;
@@ -39,13 +43,13 @@ import com.alibaba.csp.sentinel.slots.block.BlockException;
* </p>
*
* @author jialiang.linjl
* @author Eric Zhao
*/
public class StatisticSlot extends AbstractLinkedProcessorSlot<DefaultNode> {

@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, Object... args)
throws Throwable {

try {
fireEntry(context, resourceWrapper, node, count, args);
node.increaseThreadNum();
@@ -61,6 +65,9 @@ public class StatisticSlot extends AbstractLinkedProcessorSlot<DefaultNode> {
Constants.ENTRY_NODE.addPassRequest();
}

for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
handler.onPass(context, resourceWrapper, node, count, args);
}
} catch (BlockException e) {
context.getCurEntry().setError(e);

@@ -74,6 +81,10 @@ public class StatisticSlot extends AbstractLinkedProcessorSlot<DefaultNode> {
Constants.ENTRY_NODE.increaseBlockedQps();
}

for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
handler.onBlocked(e, context, resourceWrapper, node, count, args);
}

throw e;
} catch (Throwable e) {
context.getCurEntry().setError(e);
@@ -117,11 +128,14 @@ public class StatisticSlot extends AbstractLinkedProcessorSlot<DefaultNode> {
Constants.ENTRY_NODE.decreaseThreadNum();
}
} else {
// error may happen
// node.rt(-2);
// Error may happen.
}

Collection<ProcessorSlotExitCallback> exitCallbacks = StatisticSlotCallbackRegistry.getExitCallbacks();
for (ProcessorSlotExitCallback handler : exitCallbacks) {
handler.onExit(context, resourceWrapper, count, args);
}

fireExit(context, resourceWrapper, count);
}

}

+ 85
- 0
sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/statistic/StatisticSlotCallbackRegistry.java 查看文件

@@ -0,0 +1,85 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.csp.sentinel.slots.statistic;

import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import com.alibaba.csp.sentinel.node.DefaultNode;
import com.alibaba.csp.sentinel.slotchain.ProcessorSlotEntryCallback;
import com.alibaba.csp.sentinel.slotchain.ProcessorSlotExitCallback;

/**
* <p>
* Callback registry for {@link StatisticSlot}. Now two kind of callbacks are supported:
* <ul>
* <li>{@link ProcessorSlotEntryCallback}: callback for entry (passed and blocked)</li>
* <li>{@link ProcessorSlotExitCallback}: callback for exiting {@link StatisticSlot}</li>
* </ul>
* </p>
*
* @author Eric Zhao
* @since 0.2.0
*/
public final class StatisticSlotCallbackRegistry {

private static final Map<String, ProcessorSlotEntryCallback<DefaultNode>> entryCallbackMap
= new ConcurrentHashMap<String, ProcessorSlotEntryCallback<DefaultNode>>();

private static final Map<String, ProcessorSlotExitCallback> exitCallbackMap
= new ConcurrentHashMap<String, ProcessorSlotExitCallback>();

public static void clearEntryCallback() {
entryCallbackMap.clear();
}

public static void clearExitCallback() {
exitCallbackMap.clear();
}

public static void addEntryCallback(String key, ProcessorSlotEntryCallback<DefaultNode> callback) {
entryCallbackMap.put(key, callback);
}

public static void addExitCallback(String key, ProcessorSlotExitCallback callback) {
exitCallbackMap.put(key, callback);
}

public static ProcessorSlotEntryCallback<DefaultNode> removeEntryCallback(String key) {
if (key == null) {
return null;
}
return entryCallbackMap.remove(key);
}

public static ProcessorSlotExitCallback removeExitCallback(String key) {
if (key == null) {
return null;
}
return exitCallbackMap.remove(key);
}

public static Collection<ProcessorSlotEntryCallback<DefaultNode>> getEntryCallbacks() {
return entryCallbackMap.values();
}

public static Collection<ProcessorSlotExitCallback> getExitCallbacks() {
return exitCallbackMap.values();
}

private StatisticSlotCallbackRegistry() {}
}

+ 1
- 0
sentinel-demo/pom.xml 查看文件

@@ -26,6 +26,7 @@
<module>sentinel-demo-zookeeper-datasource</module>
<module>sentinel-demo-apollo-datasource</module>
<module>sentinel-demo-annotation-spring-aop</module>
<module>sentinel-demo-parameter-flow-control</module>
</modules>

<dependencies>


+ 30
- 0
sentinel-demo/sentinel-demo-parameter-flow-control/pom.xml 查看文件

@@ -0,0 +1,30 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>sentinel-demo</artifactId>
<groupId>com.alibaba.csp</groupId>
<version>0.2.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>sentinel-demo-parameter-flow-control</artifactId>

<properties>
<java.source.version>1.8</java.source.version>
<java.target.version>1.8</java.target.version>
</properties>

<dependencies>
<dependency>
<groupId>com.alibaba.csp</groupId>
<artifactId>sentinel-parameter-flow-control</artifactId>
</dependency>

<dependency>
<groupId>com.alibaba.csp</groupId>
<artifactId>sentinel-transport-simple-http</artifactId>
</dependency>
</dependencies>
</project>

+ 69
- 0
sentinel-demo/sentinel-demo-parameter-flow-control/src/main/java/com/alibaba/csp/sentinel/demo/flow/param/ParamFlowQpsDemo.java 查看文件

@@ -0,0 +1,69 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.csp.sentinel.demo.flow.param;

import java.util.Collections;

import com.alibaba.csp.sentinel.slots.block.RuleConstant;
import com.alibaba.csp.sentinel.slots.block.flow.param.ParamFlowItem;
import com.alibaba.csp.sentinel.slots.block.flow.param.ParamFlowRule;
import com.alibaba.csp.sentinel.slots.block.flow.param.ParamFlowRuleManager;

/**
* This demo demonstrates flow control by frequent ("hot spot") parameters.
*
* @author Eric Zhao
* @since 0.2.0
*/
public class ParamFlowQpsDemo {

private static final int PARAM_A = 1;
private static final int PARAM_B = 2;
private static final int PARAM_C = 3;
private static final int PARAM_D = 4;

/**
* Here we prepare different parameters to validate flow control by parameters.
*/
private static final Integer[] PARAMS = new Integer[] {PARAM_A, PARAM_B, PARAM_C, PARAM_D};

private static final String RESOURCE_KEY = "resA";

public static void main(String[] args) {
initHotParamFlowRules();

final int threadCount = 8;
ParamFlowQpsRunner<Integer> runner = new ParamFlowQpsRunner<>(PARAMS, RESOURCE_KEY, threadCount, 120);
runner.simulateTraffic();
runner.tick();
}

private static void initHotParamFlowRules() {
// QPS mode, threshold is 5 for every frequent "hot spot" parameter in index 0 (the first arg).
ParamFlowRule rule = new ParamFlowRule(RESOURCE_KEY)
.setParamIdx(0)
.setBlockGrade(RuleConstant.FLOW_GRADE_QPS)
.setCount(5);
// We can set threshold count for specific parameter value individually.
// Here we add an exception item. That means: QPS threshold of entries with parameter `PARAM_B` (type: int)
// in index 0 will be 10, rather than the global threshold (5).
ParamFlowItem item = new ParamFlowItem().setObject(String.valueOf(PARAM_B))
.setClassType(int.class.getName())
.setCount(10);
rule.setParamFlowItemList(Collections.singletonList(item));
ParamFlowRuleManager.loadRules(Collections.singletonList(rule));
}
}

+ 167
- 0
sentinel-demo/sentinel-demo-parameter-flow-control/src/main/java/com/alibaba/csp/sentinel/demo/flow/param/ParamFlowQpsRunner.java 查看文件

@@ -0,0 +1,167 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.csp.sentinel.demo.flow.param;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

import com.alibaba.csp.sentinel.Entry;
import com.alibaba.csp.sentinel.EntryType;
import com.alibaba.csp.sentinel.SphU;
import com.alibaba.csp.sentinel.slots.block.BlockException;
import com.alibaba.csp.sentinel.util.StringUtil;
import com.alibaba.csp.sentinel.util.TimeUtil;

/**
* A traffic runner to simulate flow for different parameters.
*
* @author Eric Zhao
* @since 0.2.0
*/
class ParamFlowQpsRunner<T> {

private final T[] params;
private final String resourceName;
private int seconds;
private final int threadCount;

private final Map<T, AtomicLong> passCountMap = new ConcurrentHashMap<>();

private volatile boolean stop = false;

public ParamFlowQpsRunner(T[] params, String resourceName, int threadCount, int seconds) {
assertTrue(params != null && params.length > 0, "Parameter array should not be empty");
assertTrue(StringUtil.isNotBlank(resourceName), "Resource name cannot be empty");
assertTrue(seconds > 0, "Time period should be positive");
assertTrue(threadCount > 0 && threadCount <= 1000, "Invalid thread count");
this.params = params;
this.resourceName = resourceName;
this.seconds = seconds;
this.threadCount = threadCount;

for (T param : params) {
assertTrue(param != null, "Parameters should not be null");
passCountMap.putIfAbsent(param, new AtomicLong());
}
}

private void assertTrue(boolean b, String message) {
if (!b) {
throw new IllegalArgumentException(message);
}
}

/**
* Pick one of provided parameters randomly.
*
* @return picked parameter
*/
private T generateParam() {
int i = ThreadLocalRandom.current().nextInt(0, params.length);
return params[i];
}

void simulateTraffic() {
for (int i = 0; i < threadCount; i++) {
Thread t = new Thread(new RunTask());
t.setName("sentinel-simulate-traffic-task-" + i);
t.start();
}
}

void tick() {
Thread timer = new Thread(new TimerTask());
timer.setName("sentinel-timer-task");
timer.start();
}

private void passFor(T param) {
passCountMap.get(param).incrementAndGet();
}

final class RunTask implements Runnable {
@Override
public void run() {
while (!stop) {
Entry entry = null;

try {
T param = generateParam();
entry = SphU.entry(resourceName, EntryType.IN, 1, param);
// Add pass for parameter.
passFor(param);
} catch (BlockException e1) {
// block.incrementAndGet();
} catch (Exception e2) {
// biz exception
} finally {
// total.incrementAndGet();
if (entry != null) {
entry.exit();
}
}

try {
TimeUnit.MILLISECONDS.sleep(ThreadLocalRandom.current().nextInt(0, 10));
} catch (InterruptedException e) {
// ignore
}
}
}
}

final class TimerTask implements Runnable {
@Override
public void run() {
long start = System.currentTimeMillis();
System.out.println("Begin to run! Go go go!");
System.out.println("See corresponding metrics.log for accurate statistic data");

Map<T, Long> map = new HashMap<>(params.length);
for (T param : params) {
map.putIfAbsent(param, 0L);
}
while (!stop) {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
}
// There may be a mismatch for time window of internal sliding window.
// See corresponding `metrics.log` for accurate statistic log.
for (T param : params) {
long globalPass = passCountMap.get(param).get();
long oldPass = map.get(param);
long oneSecondPass = globalPass - oldPass;
map.put(param, globalPass);
System.out.println(String.format("[%d][%d] Hot param metrics for resource %s: "
+ "pass count for param <%s> is %d",
seconds, TimeUtil.currentTimeMillis(), resourceName, param, oneSecondPass));
}
if (seconds-- <= 0) {
stop = true;
}
}

long cost = System.currentTimeMillis() - start;
System.out.println("Time cost: " + cost + " ms");
System.exit(0);
}
}
}

+ 1
- 0
sentinel-extension/pom.xml 查看文件

@@ -18,6 +18,7 @@
<module>sentinel-datasource-apollo</module>
<module>sentinel-datasource-redis</module>
<module>sentinel-annotation-aspectj</module>
<module>sentinel-parameter-flow-control</module>
</modules>

</project>

+ 61
- 0
sentinel-extension/sentinel-parameter-flow-control/README.md 查看文件

@@ -0,0 +1,61 @@
# Sentinel Parameter Flow Control

This component provides functionality of flow control by frequent ("hot spot") parameters.

## Usage

To use Sentinel Parameter Flow Control, you need to add the following dependency to `pom.xml`:

```xml
<dependency>
<groupId>com.alibaba.csp</groupId>
<artifactId>sentinel-parameter-flow-control</artifactId>
<version>x.y.z</version>
</dependency>
```

First you need to pass parameters with the following `SphU.entry` overloaded methods:

```java
public static Entry entry(String name, EntryType type, int count, Object... args) throws BlockException

public static Entry entry(Method method, EntryType type, int count, Object... args) throws BlockException
```

For example, if there are two parameters to provide, you can:

```java
// paramA in index 0, paramB in index 1.
SphU.entry(resourceName, EntryType.IN, 1, paramA, paramB);
```

Then you can configure parameter flow control rules via `loadRules` method in `ParamFlowRuleManager`:

```java
// QPS mode, threshold is 5 for every frequent "hot spot" parameter in index 0 (the first arg).
ParamFlowRule rule = new ParamFlowRule(RESOURCE_KEY)
.setParamIdx(0)
.setCount(5);
// We can set threshold count for specific parameter value individually.
// Here we add an exception item. That means: QPS threshold of entries with parameter `PARAM_B` (type: int)
// in index 0 will be 10, rather than the global threshold (5).
ParamFlowItem item = new ParamFlowItem().setObject(String.valueOf(PARAM_B))
.setClassType(int.class.getName())
.setCount(10);
rule.setParamFlowItemList(Collections.singletonList(item));
ParamFlowRuleManager.loadRules(Collections.singletonList(rule));
```

The description for fields of `ParamFlowRule`:

| Field | Description | Default |
| :----: | :----| :----|
| resource| resource name (**required**) ||
| count | flow control threshold (**required**) ||
| blockGrade | flow control mode (only QPS mode is supported) | QPS mode |
| paramIdx | the index of provided parameter in `SphU.entry(xxx, args)` (**required**) ||
| paramFlowItemList | the exception items of parameter; you can set threshold to a specific parameter value ||


Now the parameter flow control rules will take effect.


+ 43
- 0
sentinel-extension/sentinel-parameter-flow-control/pom.xml 查看文件

@@ -0,0 +1,43 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>sentinel-extension</artifactId>
<groupId>com.alibaba.csp</groupId>
<version>0.2.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>sentinel-parameter-flow-control</artifactId>
<packaging>jar</packaging>

<dependencies>
<dependency>
<groupId>com.alibaba.csp</groupId>
<artifactId>sentinel-core</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba.csp</groupId>
<artifactId>sentinel-transport-common</artifactId>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>com.googlecode.concurrentlinkedhashmap</groupId>
<artifactId>concurrentlinkedhashmap-lru</artifactId>
<version>1.4.2</version>
</dependency>

<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>

+ 64
- 0
sentinel-extension/sentinel-parameter-flow-control/src/main/java/com/alibaba/csp/sentinel/command/handler/FetchTopParamsCommandHandler.java 查看文件

@@ -0,0 +1,64 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.csp.sentinel.command.handler;

import java.util.Map;

import com.alibaba.csp.sentinel.command.CommandHandler;
import com.alibaba.csp.sentinel.command.CommandRequest;
import com.alibaba.csp.sentinel.command.CommandResponse;
import com.alibaba.csp.sentinel.command.annotation.CommandMapping;
import com.alibaba.csp.sentinel.slots.block.flow.param.ParamFlowSlot;
import com.alibaba.csp.sentinel.slots.block.flow.param.ParameterMetric;
import com.alibaba.csp.sentinel.util.StringUtil;
import com.alibaba.fastjson.JSON;

/**
* @author Eric Zhao
* @since 0.2.0
*/
@CommandMapping(name = "topParams")
public class FetchTopParamsCommandHandler implements CommandHandler<String> {

@Override
public CommandResponse<String> handle(CommandRequest request) {
String resourceName = request.getParam("res");
if (StringUtil.isBlank(resourceName)) {
return CommandResponse.ofFailure(new IllegalArgumentException("Invalid parameter: res"));
}
String idx = request.getParam("idx");
int index;
try {
index = Integer.valueOf(idx);
} catch (Exception ex) {
return CommandResponse.ofFailure(ex, "Invalid parameter: idx");
}
String n = request.getParam("n");
int amount;
try {
amount = Integer.valueOf(n);
} catch (Exception ex) {
return CommandResponse.ofFailure(ex, "Invalid parameter: n");
}
ParameterMetric metric = ParamFlowSlot.getHotParamMetricForName(resourceName);
if (metric == null) {
return CommandResponse.ofSuccess("{}");
}
Map<Object, Double> values = metric.getTopPassParamCount(index, amount);

return CommandResponse.ofSuccess(JSON.toJSONString(values));
}
}

+ 36
- 0
sentinel-extension/sentinel-parameter-flow-control/src/main/java/com/alibaba/csp/sentinel/command/handler/GetParamFlowRulesCommandHandler.java 查看文件

@@ -0,0 +1,36 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.csp.sentinel.command.handler;

import com.alibaba.csp.sentinel.command.CommandHandler;
import com.alibaba.csp.sentinel.command.CommandRequest;
import com.alibaba.csp.sentinel.command.CommandResponse;
import com.alibaba.csp.sentinel.command.annotation.CommandMapping;
import com.alibaba.csp.sentinel.slots.block.flow.param.ParamFlowRuleManager;
import com.alibaba.fastjson.JSON;

/**
* @author Eric Zhao
* @since 0.2.0
*/
@CommandMapping(name = "getParamFlowRules")
public class GetParamFlowRulesCommandHandler implements CommandHandler<String> {

@Override
public CommandResponse<String> handle(CommandRequest request) {
return CommandResponse.ofSuccess(JSON.toJSONString(ParamFlowRuleManager.getRules()));
}
}

+ 95
- 0
sentinel-extension/sentinel-parameter-flow-control/src/main/java/com/alibaba/csp/sentinel/command/handler/ModifyParamFlowRulesCommandHandler.java 查看文件

@@ -0,0 +1,95 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.csp.sentinel.command.handler;

import java.net.URLDecoder;
import java.util.List;

import com.alibaba.csp.sentinel.command.CommandHandler;
import com.alibaba.csp.sentinel.command.CommandRequest;
import com.alibaba.csp.sentinel.command.CommandResponse;
import com.alibaba.csp.sentinel.command.annotation.CommandMapping;
import com.alibaba.csp.sentinel.datasource.WritableDataSource;
import com.alibaba.csp.sentinel.log.RecordLog;
import com.alibaba.csp.sentinel.slots.block.flow.param.ParamFlowRule;
import com.alibaba.csp.sentinel.slots.block.flow.param.ParamFlowRuleManager;
import com.alibaba.csp.sentinel.util.StringUtil;
import com.alibaba.fastjson.JSONArray;

/**
* @author Eric Zhao
* @since 0.2.0
*/
@CommandMapping(name = "setParamFlowRules")
public class ModifyParamFlowRulesCommandHandler implements CommandHandler<String> {

private static WritableDataSource<List<ParamFlowRule>> paramFlowWds = null;

@Override
public CommandResponse<String> handle(CommandRequest request) {
String data = request.getParam("data");
if (StringUtil.isBlank(data)) {
return CommandResponse.ofFailure(new IllegalArgumentException("Bad data"));
}
try {
data = URLDecoder.decode(data, "utf-8");
} catch (Exception e) {
RecordLog.info("Decode rule data error", e);
return CommandResponse.ofFailure(e, "decode rule data error");
}

RecordLog.info(String.format("[API Server] Receiving rule change (type:parameter flow rule): %s", data));

String result = SUCCESS_MSG;
List<ParamFlowRule> flowRules = JSONArray.parseArray(data, ParamFlowRule.class);
ParamFlowRuleManager.loadRules(flowRules);
if (!writeToDataSource(paramFlowWds, flowRules)) {
result = WRITE_DS_FAILURE_MSG;
}
return CommandResponse.ofSuccess(result);
}

/**
* Write target value to given data source.
*
* @param dataSource writable data source
* @param value target value to save
* @param <T> value type
* @return true if write successful or data source is empty; false if error occurs
*/
private <T> boolean writeToDataSource(WritableDataSource<T> dataSource, T value) {
if (dataSource != null) {
try {
dataSource.write(value);
} catch (Exception e) {
RecordLog.warn("Write data source failed", e);
return false;
}
}
return true;
}

public synchronized static WritableDataSource<List<ParamFlowRule>> getWritableDataSource() {
return paramFlowWds;
}

public synchronized static void setWritableDataSource(WritableDataSource<List<ParamFlowRule>> hotParamWds) {
ModifyParamFlowRulesCommandHandler.paramFlowWds = hotParamWds;
}

private static final String SUCCESS_MSG = "success";
private static final String WRITE_DS_FAILURE_MSG = "partial success (write data source failed)";
}

+ 35
- 0
sentinel-extension/sentinel-parameter-flow-control/src/main/java/com/alibaba/csp/sentinel/init/ParamFlowStatisticSlotCallbackInit.java 查看文件

@@ -0,0 +1,35 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.csp.sentinel.init;

import com.alibaba.csp.sentinel.slots.statistic.ParamFlowStatisticEntryCallback;
import com.alibaba.csp.sentinel.slots.statistic.StatisticSlotCallbackRegistry;

/**
* Init function for adding callbacks to {@link StatisticSlotCallbackRegistry} to record metrics
* for frequent parameters in {@link com.alibaba.csp.sentinel.slots.statistic.StatisticSlot}.
*
* @author Eric Zhao
* @since 0.2.0
*/
public class ParamFlowStatisticSlotCallbackInit implements InitFunc {

@Override
public void init() {
StatisticSlotCallbackRegistry.addEntryCallback(ParamFlowStatisticEntryCallback.class.getName(),
new ParamFlowStatisticEntryCallback());
}
}

+ 52
- 0
sentinel-extension/sentinel-parameter-flow-control/src/main/java/com/alibaba/csp/sentinel/slots/HotParamSlotChainBuilder.java 查看文件

@@ -0,0 +1,52 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.csp.sentinel.slots;

import com.alibaba.csp.sentinel.slotchain.DefaultProcessorSlotChain;
import com.alibaba.csp.sentinel.slotchain.ProcessorSlotChain;
import com.alibaba.csp.sentinel.slotchain.SlotChainBuilder;
import com.alibaba.csp.sentinel.slots.block.authority.AuthoritySlot;
import com.alibaba.csp.sentinel.slots.block.degrade.DegradeSlot;
import com.alibaba.csp.sentinel.slots.block.flow.FlowSlot;
import com.alibaba.csp.sentinel.slots.block.flow.param.ParamFlowSlot;
import com.alibaba.csp.sentinel.slots.clusterbuilder.ClusterBuilderSlot;
import com.alibaba.csp.sentinel.slots.logger.LogSlot;
import com.alibaba.csp.sentinel.slots.nodeselector.NodeSelectorSlot;
import com.alibaba.csp.sentinel.slots.statistic.StatisticSlot;
import com.alibaba.csp.sentinel.slots.system.SystemSlot;

/**
* @author Eric Zhao
* @since 0.2.0
*/
public class HotParamSlotChainBuilder implements SlotChainBuilder {

@Override
public ProcessorSlotChain build() {
ProcessorSlotChain chain = new DefaultProcessorSlotChain();
chain.addLast(new NodeSelectorSlot());
chain.addLast(new ClusterBuilderSlot());
chain.addLast(new LogSlot());
chain.addLast(new StatisticSlot());
chain.addLast(new ParamFlowSlot());
chain.addLast(new SystemSlot());
chain.addLast(new AuthoritySlot());
chain.addLast(new FlowSlot());
chain.addLast(new DegradeSlot());

return chain;
}
}

+ 100
- 0
sentinel-extension/sentinel-parameter-flow-control/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/param/ParamFlowChecker.java 查看文件

@@ -0,0 +1,100 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.csp.sentinel.slots.block.flow.param;

import java.lang.reflect.Array;
import java.util.Collection;
import java.util.Set;

import com.alibaba.csp.sentinel.log.RecordLog;
import com.alibaba.csp.sentinel.slotchain.ResourceWrapper;
import com.alibaba.csp.sentinel.slots.block.RuleConstant;

/**
* @author Eric Zhao
* @since 0.2.0
*/
final class ParamFlowChecker {

static boolean passCheck(ResourceWrapper resourceWrapper, /*@Valid*/ ParamFlowRule rule, /*@Valid*/ int count,
Object... args) {
if (args == null) {
return true;
}

int paramIdx = rule.getParamIdx();
if (args.length <= paramIdx) {
return true;
}

Object value = args[paramIdx];

return passLocalCheck(resourceWrapper, rule, count, value);
}

private static ParameterMetric getHotParameters(ResourceWrapper resourceWrapper) {
// Should not be null.
return ParamFlowSlot.getParamMetric(resourceWrapper);
}

private static boolean passLocalCheck(ResourceWrapper resourceWrapper, ParamFlowRule rule, int count, Object value) {
try {
if (Collection.class.isAssignableFrom(value.getClass())) {
for (Object param : ((Collection)value)) {
if (!passSingleValueCheck(resourceWrapper, rule, count, param)) {
return false;
}
}
} else if (value.getClass().isArray()) {
int length = Array.getLength(value);
for (int i = 0; i < length; i++) {
Object param = Array.get(value, i);
if (!passSingleValueCheck(resourceWrapper, rule, count, param)) {
return false;
}
}
} else {
return passSingleValueCheck(resourceWrapper, rule, count, value);
}
} catch (Throwable e) {
RecordLog.info("[ParamFlowChecker] Unexpected error", e);
}

return true;
}

static boolean passSingleValueCheck(ResourceWrapper resourceWrapper, ParamFlowRule rule, int count, Object value) {
Set<Object> exclusionItems = rule.getParsedHotItems().keySet();
if (rule.getBlockGrade() == RuleConstant.FLOW_GRADE_QPS) {
double curCount = getHotParameters(resourceWrapper).getPassParamQps(rule.getParamIdx(), value);

if (exclusionItems.contains(value)) {
// Pass check for exclusion items.
int itemQps = rule.getParsedHotItems().get(value);
return curCount + count <= itemQps;
} else if (curCount + count > rule.getCount()) {
if ((curCount - rule.getCount()) < 1 && (curCount - rule.getCount()) > 0) {
return true;
}
return false;
}
}

return true;
}

private ParamFlowChecker() {}
}

+ 48
- 0
sentinel-extension/sentinel-parameter-flow-control/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/param/ParamFlowException.java 查看文件

@@ -0,0 +1,48 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.csp.sentinel.slots.block.flow.param;

import com.alibaba.csp.sentinel.slots.block.BlockException;

/**
* Block exception for frequent ("hot-spot") parameter flow control.
*
* @author jialiang.linjl
* @since 0.2.0
*/
public class ParamFlowException extends BlockException {

private final String resourceName;

public ParamFlowException(String resourceName, String message, Throwable cause) {
super(message, cause);
this.resourceName = resourceName;
}

public ParamFlowException(String resourceName, String message) {
super(message, message);
this.resourceName = resourceName;
}

public String getResourceName() {
return resourceName;
}

@Override
public Throwable fillInStackTrace() {
return this;
}
}

+ 101
- 0
sentinel-extension/sentinel-parameter-flow-control/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/param/ParamFlowItem.java 查看文件

@@ -0,0 +1,101 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.csp.sentinel.slots.block.flow.param;

/**
* A flow control item for a specific parameter value.
*
* @author jialiang.linjl
* @author Eric Zhao
* @since 0.2.0
*/
public class ParamFlowItem {

private String object;
private Integer count;
private String classType;

public ParamFlowItem() {}

public ParamFlowItem(String object, Integer count, String classType) {
this.object = object;
this.count = count;
this.classType = classType;
}

public static <T> ParamFlowItem newItem(T object, Integer count) {
if (object == null) {
throw new IllegalArgumentException("Invalid object: null");
}
return new ParamFlowItem(object.toString(), count, object.getClass().getName());
}

public String getObject() {
return object;
}

public ParamFlowItem setObject(String object) {
this.object = object;
return this;
}

public Integer getCount() {
return count;
}

public ParamFlowItem setCount(Integer count) {
this.count = count;
return this;
}

public String getClassType() {
return classType;
}

public ParamFlowItem setClassType(String classType) {
this.classType = classType;
return this;
}

@Override
public boolean equals(Object o) {
if (this == o) { return true; }
if (o == null || getClass() != o.getClass()) { return false; }

ParamFlowItem item = (ParamFlowItem)o;

if (object != null ? !object.equals(item.object) : item.object != null) { return false; }
if (count != null ? !count.equals(item.count) : item.count != null) { return false; }
return classType != null ? classType.equals(item.classType) : item.classType == null;
}

@Override
public int hashCode() {
int result = object != null ? object.hashCode() : 0;
result = 31 * result + (count != null ? count.hashCode() : 0);
result = 31 * result + (classType != null ? classType.hashCode() : 0);
return result;
}

@Override
public String toString() {
return "ParamFlowItem{" +
"object=" + object +
", count=" + count +
", classType='" + classType + '\'' +
'}';
}
}

+ 156
- 0
sentinel-extension/sentinel-parameter-flow-control/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/param/ParamFlowRule.java 查看文件

@@ -0,0 +1,156 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.csp.sentinel.slots.block.flow.param;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import com.alibaba.csp.sentinel.context.Context;
import com.alibaba.csp.sentinel.node.DefaultNode;
import com.alibaba.csp.sentinel.slots.block.AbstractRule;
import com.alibaba.csp.sentinel.slots.block.RuleConstant;

/**
* Rules for "hot-spot" frequent parameter flow control.
*
* @author jialiang.linjl
* @author Eric Zhao
* @since 0.2.0
*/
public class ParamFlowRule extends AbstractRule {

public ParamFlowRule() {}

public ParamFlowRule(String resourceName) {
setResource(resourceName);
}

/**
* The threshold type of flow control (1: QPS).
*/
private int blockGrade = RuleConstant.FLOW_GRADE_QPS;

/**
* Parameter index.
*/
private Integer paramIdx;

/**
* The threshold count.
*/
private double count;

/**
* Original exclusion items of parameters.
*/
private List<ParamFlowItem> paramFlowItemList = new ArrayList<ParamFlowItem>();

/**
* Parsed exclusion items of parameters. Only for internal use.
*/
private Map<Object, Integer> hotItems = new HashMap<Object, Integer>();

public int getBlockGrade() {
return blockGrade;
}

public ParamFlowRule setBlockGrade(int blockGrade) {
this.blockGrade = blockGrade;
return this;
}

public Integer getParamIdx() {
return paramIdx;
}

public ParamFlowRule setParamIdx(Integer paramIdx) {
this.paramIdx = paramIdx;
return this;
}

public double getCount() {
return count;
}

public ParamFlowRule setCount(double count) {
this.count = count;
return this;
}

public List<ParamFlowItem> getParamFlowItemList() {
return paramFlowItemList;
}

public ParamFlowRule setParamFlowItemList(List<ParamFlowItem> paramFlowItemList) {
this.paramFlowItemList = paramFlowItemList;
return this;
}

Map<Object, Integer> getParsedHotItems() {
return hotItems;
}

ParamFlowRule setParsedHotItems(Map<Object, Integer> hotItems) {
this.hotItems = hotItems;
return this;
}

@Override
@Deprecated
public boolean passCheck(Context context, DefaultNode node, int count, Object... args) {
return true;
}

@Override
public boolean equals(Object o) {
if (this == o) { return true; }
if (o == null || getClass() != o.getClass()) { return false; }
if (!super.equals(o)) { return false; }

ParamFlowRule rule = (ParamFlowRule)o;

if (blockGrade != rule.blockGrade) { return false; }
if (Double.compare(rule.count, count) != 0) { return false; }
if (paramIdx != null ? !paramIdx.equals(rule.paramIdx) : rule.paramIdx != null) { return false; }
return paramFlowItemList != null ? paramFlowItemList.equals(rule.paramFlowItemList) : rule.paramFlowItemList == null;
}

@Override
public int hashCode() {
int result = super.hashCode();
long temp;
result = 31 * result + blockGrade;
result = 31 * result + (paramIdx != null ? paramIdx.hashCode() : 0);
temp = Double.doubleToLongBits(count);
result = 31 * result + (int)(temp ^ (temp >>> 32));
result = 31 * result + (paramFlowItemList != null ? paramFlowItemList.hashCode() : 0);
return result;
}

@Override
public String toString() {
return "ParamFlowRule{" +
"resource=" + getResource() +
", limitApp=" + getLimitApp() +
", blockGrade=" + blockGrade +
", paramIdx=" + paramIdx +
", count=" + count +
", paramFlowItemList=" + paramFlowItemList +
'}';
}
}

+ 233
- 0
sentinel-extension/sentinel-parameter-flow-control/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/param/ParamFlowRuleManager.java 查看文件

@@ -0,0 +1,233 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.csp.sentinel.slots.block.flow.param;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

import com.alibaba.csp.sentinel.log.RecordLog;
import com.alibaba.csp.sentinel.property.DynamicSentinelProperty;
import com.alibaba.csp.sentinel.property.PropertyListener;
import com.alibaba.csp.sentinel.property.SentinelProperty;
import com.alibaba.csp.sentinel.slots.block.flow.FlowRule;
import com.alibaba.csp.sentinel.util.StringUtil;

/**
* Manager for frequent ("hot-spot") parameter flow rules.
*
* @author jialiang.linjl
* @author Eric Zhao
* @since 0.2.0
*/
public final class ParamFlowRuleManager {

private static final Map<String, List<ParamFlowRule>> paramFlowRules
= new ConcurrentHashMap<String, List<ParamFlowRule>>();

private final static RulePropertyListener PROPERTY_LISTENER = new RulePropertyListener();
private static SentinelProperty<List<ParamFlowRule>> currentProperty
= new DynamicSentinelProperty<List<ParamFlowRule>>();

static {
currentProperty.addListener(PROPERTY_LISTENER);
}

/**
* Load parameter flow rules. Former rules will be replaced.
*
* @param rules new rules to load.
*/
public static void loadRules(List<ParamFlowRule> rules) {
try {
currentProperty.updateValue(rules);
} catch (Throwable e) {
RecordLog.info("[ParamFlowRuleManager] Failed to load rules", e);
}
}

/**
* Listen to the {@link SentinelProperty} for {@link ParamFlowRule}s. The property is the source
* of {@link ParamFlowRule}s. Parameter flow rules can also be set by {@link #loadRules(List)} directly.
*
* @param property the property to listen
*/
public static void register2Property(SentinelProperty<List<ParamFlowRule>> property) {
synchronized (PROPERTY_LISTENER) {
currentProperty.removeListener(PROPERTY_LISTENER);
property.addListener(PROPERTY_LISTENER);
currentProperty = property;
RecordLog.info("[ParamFlowRuleManager] New property has been registered to hot param rule manager");
}
}

public static List<ParamFlowRule> getRulesOfResource(String resourceName) {
return paramFlowRules.get(resourceName);
}

public static boolean hasRules(String resourceName) {
List<ParamFlowRule> rules = paramFlowRules.get(resourceName);
return rules != null && !rules.isEmpty();
}

/**
* Get a copy of the rules.
*
* @return a new copy of the rules.
*/
public static List<ParamFlowRule> getRules() {
List<ParamFlowRule> rules = new ArrayList<ParamFlowRule>();
for (Map.Entry<String, List<ParamFlowRule>> entry : paramFlowRules.entrySet()) {
rules.addAll(entry.getValue());
}
return rules;
}

private static Object parseValue(String value, String classType) {
if (value == null) {
throw new IllegalArgumentException("Null value");
}
if (StringUtil.isBlank(classType)) {
// If the class type is not provided, then treat it as string.
return value;
}
// Handle primitive type.
if (int.class.toString().equals(classType) || Integer.class.getName().equals(classType)) {
return Integer.parseInt(value);
} else if (boolean.class.toString().equals(classType) || Boolean.class.getName().equals(classType)) {
return Boolean.parseBoolean(value);
} else if (long.class.toString().equals(classType) || Long.class.getName().equals(classType)) {
return Long.parseLong(value);
} else if (double.class.toString().equals(classType) || Double.class.getName().equals(classType)) {
return Double.parseDouble(value);
} else if (float.class.toString().equals(classType) || Float.class.getName().equals(classType)) {
return Float.parseFloat(value);
} else if (byte.class.toString().equals(classType) || Byte.class.getName().equals(classType)) {
return Byte.parseByte(value);
} else if (short.class.toString().equals(classType) || Short.class.getName().equals(classType)) {
return Short.parseShort(value);
} else if (char.class.toString().equals(classType)) {
char[] array = value.toCharArray();
return array.length > 0 ? array[0] : null;
}

return value;
}

static class RulePropertyListener implements PropertyListener<List<ParamFlowRule>> {

@Override
public void configUpdate(List<ParamFlowRule> list) {
Map<String, List<ParamFlowRule>> rules = aggregateHotParamRules(list);
if (rules != null) {
paramFlowRules.clear();
paramFlowRules.putAll(rules);
}
RecordLog.info("[ParamFlowRuleManager] Hot spot parameter flow rules received: " + paramFlowRules);
}

@Override
public void configLoad(List<ParamFlowRule> list) {
Map<String, List<ParamFlowRule>> rules = aggregateHotParamRules(list);
if (rules != null) {
paramFlowRules.clear();
paramFlowRules.putAll(rules);
}
RecordLog.info("[ParamFlowRuleManager] Hot spot parameter flow rules received: " + paramFlowRules);
}

private Map<String, List<ParamFlowRule>> aggregateHotParamRules(List<ParamFlowRule> list) {
Map<String, List<ParamFlowRule>> newRuleMap = new ConcurrentHashMap<String, List<ParamFlowRule>>();

if (list == null || list.isEmpty()) {
// No parameter flow rules, so clear all the metrics.
ParamFlowSlot.getMetricsMap().clear();
RecordLog.info("[ParamFlowRuleManager] No parameter flow rules, clearing all parameter metrics");
return newRuleMap;
}

for (ParamFlowRule rule : list) {
if (!isValidRule(rule)) {
RecordLog.warn("[ParamFlowRuleManager] Ignoring invalid rule when loading new rules: " + rule);
continue;
}

if (StringUtil.isBlank(rule.getLimitApp())) {
rule.setLimitApp(FlowRule.LIMIT_APP_DEFAULT);
}

if (rule.getParamFlowItemList() == null) {
rule.setParamFlowItemList(new ArrayList<ParamFlowItem>());
}

Map<Object, Integer> itemMap = parseHotItems(rule.getParamFlowItemList());
rule.setParsedHotItems(itemMap);

String resourceName = rule.getResource();
List<ParamFlowRule> ruleList = newRuleMap.get(resourceName);
if (ruleList == null) {
ruleList = new ArrayList<ParamFlowRule>();
newRuleMap.put(resourceName, ruleList);
}
ruleList.add(rule);
}

// Clear unused hot param metrics.
Set<String> previousResources = paramFlowRules.keySet();
for (String resource : previousResources) {
if (!newRuleMap.containsKey(resource)) {
ParamFlowSlot.clearHotParamMetricForName(resource);
}
}

return newRuleMap;
}
}

static Map<Object, Integer> parseHotItems(List<ParamFlowItem> items) {
Map<Object, Integer> itemMap = new HashMap<Object, Integer>();
if (items == null || items.isEmpty()) {
return itemMap;
}
for (ParamFlowItem item : items) {
// Value should not be null.
Object value;
try {
value = parseValue(item.getObject(), item.getClassType());
} catch (Exception ex) {
RecordLog.warn("[ParamFlowRuleManager] Failed to parse value for item: " + item, ex);
continue;
}
if (item.getCount() == null || item.getCount() < 0 || value == null) {
RecordLog.warn("[ParamFlowRuleManager] Ignoring invalid exclusion parameter item: " + item);
continue;
}
itemMap.put(value, item.getCount());
}
return itemMap;
}

static boolean isValidRule(ParamFlowRule rule) {
return rule != null && !StringUtil.isBlank(rule.getResource()) && rule.getCount() >= 0
&& rule.getParamIdx() != null && rule.getParamIdx() >= 0;
}

private ParamFlowRuleManager() {}
}


+ 158
- 0
sentinel-extension/sentinel-parameter-flow-control/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/param/ParamFlowSlot.java 查看文件

@@ -0,0 +1,158 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.csp.sentinel.slots.block.flow.param;

import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import com.alibaba.csp.sentinel.EntryType;
import com.alibaba.csp.sentinel.context.Context;
import com.alibaba.csp.sentinel.log.RecordLog;
import com.alibaba.csp.sentinel.node.DefaultNode;
import com.alibaba.csp.sentinel.slotchain.AbstractLinkedProcessorSlot;
import com.alibaba.csp.sentinel.slotchain.ResourceWrapper;
import com.alibaba.csp.sentinel.slotchain.StringResourceWrapper;
import com.alibaba.csp.sentinel.slots.block.BlockException;
import com.alibaba.csp.sentinel.util.StringUtil;

/**
* A processor slot that is responsible for flow control by frequent ("hot spot") parameters.
*
* @author jialiang.linjl
* @author Eric Zhao
* @since 0.2.0
*/
public class ParamFlowSlot extends AbstractLinkedProcessorSlot<DefaultNode> {

private static final Map<ResourceWrapper, ParameterMetric> metricsMap
= new ConcurrentHashMap<ResourceWrapper, ParameterMetric>();

/**
* Lock for a specific resource.
*/
private final Object LOCK = new Object();

@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, Object... args)
throws Throwable {

if (!ParamFlowRuleManager.hasRules(resourceWrapper.getName())) {
fireEntry(context, resourceWrapper, node, count, args);
return;
}

checkFlow(resourceWrapper, count, args);
fireEntry(context, resourceWrapper, node, count, args);
}

@Override
public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
fireExit(context, resourceWrapper, count, args);
}

void checkFlow(ResourceWrapper resourceWrapper, int count, Object... args)
throws BlockException {
if (ParamFlowRuleManager.hasRules(resourceWrapper.getName())) {
List<ParamFlowRule> rules = ParamFlowRuleManager.getRulesOfResource(resourceWrapper.getName());
if (rules == null) {
return;
}

for (ParamFlowRule rule : rules) {
// Initialize the parameter metrics.
initHotParamMetricsFor(resourceWrapper, rule.getParamIdx());

if (!ParamFlowChecker.passCheck(resourceWrapper, rule, count, args)) {

// Here we add the block count.
addBlockCount(resourceWrapper, count, args);

String message = "";
if (args.length > rule.getParamIdx()) {
Object value = args[rule.getParamIdx()];
message = String.valueOf(value);
}
throw new ParamFlowException(resourceWrapper.getName(), message);
}
}
}
}

private void addBlockCount(ResourceWrapper resourceWrapper, int count, Object... args) {
ParameterMetric parameterMetric = ParamFlowSlot.getParamMetric(resourceWrapper);

if (parameterMetric != null) {
parameterMetric.addBlock(count, args);
}
}

/**
* Init the parameter metric and index map for given resource.
* Package-private for test.
*
* @param resourceWrapper resource to init
* @param index index to initialize, which must be valid
*/
void initHotParamMetricsFor(ResourceWrapper resourceWrapper, /*@Valid*/ int index) {
ParameterMetric metric;
// Assume that the resource is valid.
if ((metric = metricsMap.get(resourceWrapper)) == null) {
synchronized (LOCK) {
if ((metric = metricsMap.get(resourceWrapper)) == null) {
metric = new ParameterMetric();
metricsMap.put(resourceWrapper, metric);
RecordLog.info("[ParamFlowSlot] Creating parameter metric for: " + resourceWrapper.getName());
}
}
}
metric.initializeForIndex(index);
}

public static ParameterMetric getParamMetric(ResourceWrapper resourceWrapper) {
if (resourceWrapper == null || resourceWrapper.getName() == null) {
return null;
}
return metricsMap.get(resourceWrapper);
}

public static ParameterMetric getHotParamMetricForName(String resourceName) {
if (StringUtil.isBlank(resourceName)) {
return null;
}
for (EntryType nodeType : EntryType.values()) {
ParameterMetric metric = metricsMap.get(new StringResourceWrapper(resourceName, nodeType));
if (metric != null) {
return metric;
}
}
return null;
}

static void clearHotParamMetricForName(String resourceName) {
if (StringUtil.isBlank(resourceName)) {
return;
}
for (EntryType nodeType : EntryType.values()) {
metricsMap.remove(new StringResourceWrapper(resourceName, nodeType));
}
RecordLog.info("[ParamFlowSlot] Clearing parameter metric for: " + resourceName);
}

public static Map<ResourceWrapper, ParameterMetric> getMetricsMap() {
return metricsMap;
}
}

+ 147
- 0
sentinel-extension/sentinel-parameter-flow-control/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/param/ParameterMetric.java 查看文件

@@ -0,0 +1,147 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.csp.sentinel.slots.block.flow.param;

import java.lang.reflect.Array;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import com.alibaba.csp.sentinel.log.RecordLog;
import com.alibaba.csp.sentinel.node.IntervalProperty;
import com.alibaba.csp.sentinel.node.SampleCountProperty;
import com.alibaba.csp.sentinel.slots.statistic.metric.HotParameterLeapArray;

/**
* Metrics for frequent ("hot spot") parameters.
*
* @author Eric Zhao
* @since 0.2.0
*/
public class ParameterMetric {

private Map<Integer, HotParameterLeapArray> rollingParameters =
new ConcurrentHashMap<Integer, HotParameterLeapArray>();

public Map<Integer, HotParameterLeapArray> getRollingParameters() {
return rollingParameters;
}

public synchronized void clear() {
rollingParameters.clear();
}

public void initializeForIndex(int index) {
if (!rollingParameters.containsKey(index)) {
synchronized (this) {
// putIfAbsent
if (rollingParameters.get(index) == null) {
rollingParameters.put(index, new HotParameterLeapArray(
1000 / 2, IntervalProperty.INTERVAL));
}
}
}
}

public void addPass(int count, Object... args) {
add(RollingParamEvent.REQUEST_PASSED, count, args);
}

public void addBlock(int count, Object... args) {
add(RollingParamEvent.REQUEST_BLOCKED, count, args);
}

@SuppressWarnings("rawtypes")
private void add(RollingParamEvent event, int count, Object... args) {
if (args == null) {
return;
}
try {
for (int index = 0; index < args.length; index++) {
HotParameterLeapArray param = rollingParameters.get(index);
if (param == null) {
continue;
}

Object arg = args[index];
if (arg == null) {
continue;
}
if (Collection.class.isAssignableFrom(arg.getClass())) {
for (Object value : ((Collection)arg)) {
param.addValue(event, count, value);
}
} else if (arg.getClass().isArray()) {
int length = Array.getLength(arg);
for (int i = 0; i < length; i++) {
Object value = Array.get(arg, i);
param.addValue(event, count, value);
}
} else {
param.addValue(event, count, arg);
}

}
} catch (Throwable e) {
RecordLog.warn("[ParameterMetric] Param exception", e);
}
}

public double getPassParamQps(int index, Object value) {
try {
HotParameterLeapArray parameter = rollingParameters.get(index);
if (parameter == null || value == null) {
return -1;
}
return parameter.getRollingAvg(RollingParamEvent.REQUEST_PASSED, value);
} catch (Throwable e) {
RecordLog.info(e.getMessage(), e);
}

return -1;
}

public long getBlockParamQps(int index, Object value) {
try {
HotParameterLeapArray parameter = rollingParameters.get(index);
if (parameter == null || value == null) {
return -1;
}

return (long)rollingParameters.get(index).getRollingAvg(RollingParamEvent.REQUEST_BLOCKED, value);
} catch (Throwable e) {
RecordLog.info(e.getMessage(), e);
}

return -1;
}

public Map<Object, Double> getTopPassParamCount(int index, int number) {
try {
HotParameterLeapArray parameter = rollingParameters.get(index);
if (parameter == null) {
return new HashMap<Object, Double>();
}

return parameter.getTopValues(RollingParamEvent.REQUEST_PASSED, number);
} catch (Throwable e) {
RecordLog.info(e.getMessage(), e);
}

return new HashMap<Object, Double>();
}
}

+ 31
- 0
sentinel-extension/sentinel-parameter-flow-control/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/param/RollingParamEvent.java 查看文件

@@ -0,0 +1,31 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.csp.sentinel.slots.block.flow.param;

/**
* @author Eric Zhao
* @since 0.2.0
*/
public enum RollingParamEvent {
/**
* Indicates that the request successfully passed the slot chain (entry).
*/
REQUEST_PASSED,
/**
* Indicates that the request is blocked by a specific slot.
*/
REQUEST_BLOCKED
}

+ 49
- 0
sentinel-extension/sentinel-parameter-flow-control/src/main/java/com/alibaba/csp/sentinel/slots/statistic/ParamFlowStatisticEntryCallback.java 查看文件

@@ -0,0 +1,49 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.csp.sentinel.slots.statistic;

import com.alibaba.csp.sentinel.context.Context;
import com.alibaba.csp.sentinel.node.DefaultNode;
import com.alibaba.csp.sentinel.slotchain.ProcessorSlotEntryCallback;
import com.alibaba.csp.sentinel.slotchain.ResourceWrapper;
import com.alibaba.csp.sentinel.slots.block.BlockException;
import com.alibaba.csp.sentinel.slots.block.flow.param.ParamFlowSlot;
import com.alibaba.csp.sentinel.slots.block.flow.param.ParameterMetric;

/**
* @author Eric Zhao
* @since 0.2.0
*/
public class ParamFlowStatisticEntryCallback implements ProcessorSlotEntryCallback<DefaultNode> {

@Override
public void onPass(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, Object... args)
throws Exception {
// The "hot spot" parameter metric is present only if parameter flow rules for the resource exist.
ParameterMetric parameterMetric = ParamFlowSlot.getParamMetric(resourceWrapper);

if (parameterMetric != null) {
parameterMetric.addPass(count, args);
}
}

@Override
public void onBlocked(BlockException ex, Context context, ResourceWrapper resourceWrapper, DefaultNode param,
int count, Object... args) {
// Here we don't add block count here because checking the type of block exception can affect performance.
// We add the block count when throwing the ParamFlowException instead.
}
}

+ 45
- 0
sentinel-extension/sentinel-parameter-flow-control/src/main/java/com/alibaba/csp/sentinel/slots/statistic/cache/CacheMap.java 查看文件

@@ -0,0 +1,45 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.csp.sentinel.slots.statistic.cache;

import java.util.Set;

/**
* A common cache map interface.
*
* @param <K> type of the key
* @param <V> type of the value
* @author Eric Zhao
* @since 0.2.0
*/
public interface CacheMap<K, V> {

boolean containsKey(K key);

V get(K key);

V remove(K key);

V put(K key, V value);

V putIfAbsent(K key, V value);

long size();

void clear();

Set<K> ascendingKeySet();
}

+ 92
- 0
sentinel-extension/sentinel-parameter-flow-control/src/main/java/com/alibaba/csp/sentinel/slots/statistic/cache/ConcurrentLinkedHashMapWrapper.java 查看文件

@@ -0,0 +1,92 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.csp.sentinel.slots.statistic.cache;

import java.util.Set;

import com.googlecode.concurrentlinkedhashmap.ConcurrentLinkedHashMap;
import com.googlecode.concurrentlinkedhashmap.Weighers;

/**
* A {@link ConcurrentLinkedHashMap} wrapper for the universal {@link CacheMap}.
*
* @author Eric Zhao
* @since 0.2.0
*/
public class ConcurrentLinkedHashMapWrapper<T, R> implements CacheMap<T, R> {

private static final int DEFAULT_CONCURRENCY_LEVEL = 16;

private final ConcurrentLinkedHashMap<T, R> map;

public ConcurrentLinkedHashMapWrapper(long size) {
if (size <= 0) {
throw new IllegalArgumentException("Cache max capacity should be positive: " + size);
}
this.map = new ConcurrentLinkedHashMap.Builder<T, R>()
.concurrencyLevel(DEFAULT_CONCURRENCY_LEVEL)
.maximumWeightedCapacity(size)
.weigher(Weighers.singleton())
.build();
}

public ConcurrentLinkedHashMapWrapper(ConcurrentLinkedHashMap<T, R> map) {
if (map == null) {
throw new IllegalArgumentException("Invalid map instance");
}
this.map = map;
}

@Override
public boolean containsKey(T key) {
return map.containsKey(key);
}

@Override
public R get(T key) {
return map.get(key);
}

@Override
public R remove(T key) {
return map.remove(key);
}

@Override
public R put(T key, R value) {
return map.put(key, value);
}

@Override
public R putIfAbsent(T key, R value) {
return map.putIfAbsent(key, value);
}

@Override
public long size() {
return map.weightedSize();
}

@Override
public void clear() {
map.clear();
}

@Override
public Set<T> ascendingKeySet() {
return map.ascendingKeySet();
}
}

+ 67
- 0
sentinel-extension/sentinel-parameter-flow-control/src/main/java/com/alibaba/csp/sentinel/slots/statistic/data/ParamMapBucket.java 查看文件

@@ -0,0 +1,67 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.csp.sentinel.slots.statistic.data;

import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;

import com.alibaba.csp.sentinel.slots.block.flow.param.RollingParamEvent;
import com.alibaba.csp.sentinel.slots.statistic.cache.CacheMap;
import com.alibaba.csp.sentinel.slots.statistic.cache.ConcurrentLinkedHashMapWrapper;

/**
* Represents metric bucket of frequent parameters in a period of time window.
*
* @author Eric Zhao
* @since 0.2.0
*/
public class ParamMapBucket {

private final CacheMap<Object, AtomicInteger>[] data;

@SuppressWarnings("unchecked")
public ParamMapBucket() {
RollingParamEvent[] events = RollingParamEvent.values();
this.data = new CacheMap[events.length];
for (RollingParamEvent event : events) {
data[event.ordinal()] = new ConcurrentLinkedHashMapWrapper<Object, AtomicInteger>(DEFAULT_MAX_CAPACITY);
}
}

public void reset() {
for (RollingParamEvent event : RollingParamEvent.values()) {
data[event.ordinal()].clear();
}
}

public int get(RollingParamEvent event, Object value) {
AtomicInteger counter = data[event.ordinal()].get(value);
return counter == null ? 0 : counter.intValue();
}

public ParamMapBucket add(RollingParamEvent event, int count, Object value) {
data[event.ordinal()].putIfAbsent(value, new AtomicInteger());
AtomicInteger counter = data[event.ordinal()].get(value);
counter.addAndGet(count);
return this;
}

public Set<Object> ascendingKeySet(RollingParamEvent type) {
return data[type.ordinal()].ascendingKeySet();
}

public static final int DEFAULT_MAX_CAPACITY = 200;
}

+ 127
- 0
sentinel-extension/sentinel-parameter-flow-control/src/main/java/com/alibaba/csp/sentinel/slots/statistic/metric/HotParameterLeapArray.java 查看文件

@@ -0,0 +1,127 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.csp.sentinel.slots.statistic.metric;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;

import com.alibaba.csp.sentinel.slots.block.flow.param.RollingParamEvent;
import com.alibaba.csp.sentinel.slots.statistic.base.LeapArray;
import com.alibaba.csp.sentinel.slots.statistic.base.WindowWrap;
import com.alibaba.csp.sentinel.slots.statistic.data.ParamMapBucket;

/**
* The fundamental data structure for frequent parameters statistics in a time window.
*
* @author Eric Zhao
* @since 0.2.0
*/
public class HotParameterLeapArray extends LeapArray<ParamMapBucket> {

private int intervalInSec;

public HotParameterLeapArray(int windowLengthInMs, int intervalInSec) {
super(windowLengthInMs, intervalInSec);
this.intervalInSec = intervalInSec;
}

public int getIntervalInSec() {
return intervalInSec;
}

@Override
public ParamMapBucket newEmptyBucket() {
return new ParamMapBucket();
}

@Override
protected WindowWrap<ParamMapBucket> resetWindowTo(WindowWrap<ParamMapBucket> w, long startTime) {
w.resetTo(startTime);
w.value().reset();
return w;
}

public void addValue(RollingParamEvent event, int count, Object value) {
currentWindow().value().add(event, count, value);
}

public Map<Object, Double> getTopValues(RollingParamEvent event, int number) {
currentWindow();
List<ParamMapBucket> buckets = this.values();

Map<Object, Integer> result = new HashMap<Object, Integer>();

for (ParamMapBucket b : buckets) {
Set<Object> subSet = b.ascendingKeySet(event);
for (Object o : subSet) {
Integer count = result.get(o);
if (count == null) {
count = b.get(event, o);
} else {
count += b.get(event, o);
}
result.put(o, count);
}
}

// After merge, get the top set one.
Set<Entry<Object, Integer>> set = result.entrySet();
List<Entry<Object, Integer>> list = new ArrayList<Entry<Object, Integer>>(set);
Collections.sort(list, new Comparator<Entry<Object, Integer>>() {
@Override
public int compare(Entry<Object, Integer> a,
Entry<Object, Integer> b) {
return (b.getValue() == null ? 0 : b.getValue()) - (a.getValue() == null ? 0 : a.getValue());
}
});

Map<Object, Double> doubleResult = new HashMap<Object, Double>();

int size = list.size() > number ? number : list.size();
for (int i = 0; i < size; i++) {
Map.Entry<Object, Integer> x = list.get(i);
if (x.getValue() == 0) {
break;
}
doubleResult.put(x.getKey(), ((double)x.getValue()) / getIntervalInSec());
}

return doubleResult;
}

public long getRollingSum(RollingParamEvent event, Object value) {
currentWindow();

long sum = 0;

List<ParamMapBucket> buckets = this.values();
for (ParamMapBucket b : buckets) {
sum += b.get(event, value);
}

return sum;
}

public double getRollingAvg(RollingParamEvent event, Object value) {
return ((double) getRollingSum(event, value)) / getIntervalInSec();
}
}

+ 3
- 0
sentinel-extension/sentinel-parameter-flow-control/src/main/resources/META-INF/services/com.alibaba.csp.sentinel.command.CommandHandler 查看文件

@@ -0,0 +1,3 @@
com.alibaba.csp.sentinel.command.handler.GetParamFlowRulesCommandHandler
com.alibaba.csp.sentinel.command.handler.ModifyParamFlowRulesCommandHandler
com.alibaba.csp.sentinel.command.handler.FetchTopParamsCommandHandler

+ 1
- 0
sentinel-extension/sentinel-parameter-flow-control/src/main/resources/META-INF/services/com.alibaba.csp.sentinel.init.InitFunc 查看文件

@@ -0,0 +1 @@
com.alibaba.csp.sentinel.init.ParamFlowStatisticSlotCallbackInit

+ 1
- 0
sentinel-extension/sentinel-parameter-flow-control/src/main/resources/META-INF/services/com.alibaba.csp.sentinel.slotchain.SlotChainBuilder 查看文件

@@ -0,0 +1 @@
com.alibaba.csp.sentinel.slots.HotParamSlotChainBuilder

+ 193
- 0
sentinel-extension/sentinel-parameter-flow-control/src/test/java/com/alibaba/csp/sentinel/slots/block/flow/param/ParamFlowCheckerTest.java 查看文件

@@ -0,0 +1,193 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.csp.sentinel.slots.block.flow.param;

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import com.alibaba.csp.sentinel.EntryType;
import com.alibaba.csp.sentinel.slotchain.ResourceWrapper;
import com.alibaba.csp.sentinel.slotchain.StringResourceWrapper;

import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import static org.junit.Assert.*;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

/**
* Test cases for {@link ParamFlowChecker}.
*
* @author Eric Zhao
*/
public class ParamFlowCheckerTest {

@Test
public void testHotParamCheckerPassCheckExceedArgs() {
final String resourceName = "testHotParamCheckerPassCheckExceedArgs";
final ResourceWrapper resourceWrapper = new StringResourceWrapper(resourceName, EntryType.IN);
int paramIdx = 1;

ParamFlowRule rule = new ParamFlowRule();
rule.setResource(resourceName);
rule.setCount(10);
rule.setParamIdx(paramIdx);

assertTrue("The rule will pass if the paramIdx exceeds provided args",
ParamFlowChecker.passCheck(resourceWrapper, rule, 1, "abc"));
}

@Test
public void testSingleValueCheckQpsWithoutExceptionItems() {
final String resourceName = "testSingleValueCheckQpsWithoutExceptionItems";
final ResourceWrapper resourceWrapper = new StringResourceWrapper(resourceName, EntryType.IN);
int paramIdx = 0;

long threshold = 5L;

ParamFlowRule rule = new ParamFlowRule();
rule.setResource(resourceName);
rule.setCount(threshold);
rule.setParamIdx(paramIdx);

String valueA = "valueA";
String valueB = "valueB";
ParameterMetric metric = mock(ParameterMetric.class);
when(metric.getPassParamQps(paramIdx, valueA)).thenReturn((double)threshold - 1);
when(metric.getPassParamQps(paramIdx, valueB)).thenReturn((double)threshold + 1);
ParamFlowSlot.getMetricsMap().put(resourceWrapper, metric);

assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
assertFalse(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueB));
}

@Test
public void testSingleValueCheckQpsWithExceptionItems() {
final String resourceName = "testSingleValueCheckQpsWithExceptionItems";
final ResourceWrapper resourceWrapper = new StringResourceWrapper(resourceName, EntryType.IN);
int paramIdx = 0;

long globalThreshold = 5L;
int thresholdB = 3;
int thresholdD = 7;

ParamFlowRule rule = new ParamFlowRule();
rule.setResource(resourceName);
rule.setCount(globalThreshold);
rule.setParamIdx(paramIdx);

String valueA = "valueA";
String valueB = "valueB";
String valueC = "valueC";
String valueD = "valueD";

// Directly set parsed map for test.
Map<Object, Integer> map = new HashMap<Object, Integer>();
map.put(valueB, thresholdB);
map.put(valueD, thresholdD);
rule.setParsedHotItems(map);

ParameterMetric metric = mock(ParameterMetric.class);
when(metric.getPassParamQps(paramIdx, valueA)).thenReturn((double)globalThreshold - 1);
when(metric.getPassParamQps(paramIdx, valueB)).thenReturn((double)globalThreshold - 1);
when(metric.getPassParamQps(paramIdx, valueC)).thenReturn((double)globalThreshold - 1);
when(metric.getPassParamQps(paramIdx, valueD)).thenReturn((double)globalThreshold + 1);
ParamFlowSlot.getMetricsMap().put(resourceWrapper, metric);

assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
assertFalse(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueB));
assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueC));
assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueD));

when(metric.getPassParamQps(paramIdx, valueA)).thenReturn((double)globalThreshold);
when(metric.getPassParamQps(paramIdx, valueB)).thenReturn((double)thresholdB - 1L);
when(metric.getPassParamQps(paramIdx, valueC)).thenReturn((double)globalThreshold + 1);
when(metric.getPassParamQps(paramIdx, valueD)).thenReturn((double)globalThreshold - 1)
.thenReturn((double)thresholdD);

assertFalse(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueB));
assertFalse(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueC));
assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueD));
assertFalse(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueD));
}

@Test
public void testPassLocalCheckForCollection() {
final String resourceName = "testPassLocalCheckForCollection";
final ResourceWrapper resourceWrapper = new StringResourceWrapper(resourceName, EntryType.IN);
int paramIdx = 0;
double globalThreshold = 10;

ParamFlowRule rule = new ParamFlowRule(resourceName)
.setParamIdx(paramIdx)
.setCount(globalThreshold);

String v1 = "a", v2 = "B", v3 = "Cc";
List<String> list = Arrays.asList(v1, v2, v3);
ParameterMetric metric = mock(ParameterMetric.class);
when(metric.getPassParamQps(paramIdx, v1)).thenReturn(globalThreshold - 2)
.thenReturn(globalThreshold - 1);
when(metric.getPassParamQps(paramIdx, v2)).thenReturn(globalThreshold - 2)
.thenReturn(globalThreshold - 1);
when(metric.getPassParamQps(paramIdx, v3)).thenReturn(globalThreshold - 1)
.thenReturn(globalThreshold);
ParamFlowSlot.getMetricsMap().put(resourceWrapper, metric);

assertTrue(ParamFlowChecker.passCheck(resourceWrapper, rule, 1, list));
assertFalse(ParamFlowChecker.passCheck(resourceWrapper, rule, 1, list));
}

@Test
public void testPassLocalCheckForArray() {
final String resourceName = "testPassLocalCheckForArray";
final ResourceWrapper resourceWrapper = new StringResourceWrapper(resourceName, EntryType.IN);
int paramIdx = 0;
double globalThreshold = 10;

ParamFlowRule rule = new ParamFlowRule(resourceName)
.setParamIdx(paramIdx)
.setCount(globalThreshold);

String v1 = "a", v2 = "B", v3 = "Cc";
Object arr = new String[] {v1, v2, v3};
ParameterMetric metric = mock(ParameterMetric.class);
when(metric.getPassParamQps(paramIdx, v1)).thenReturn(globalThreshold - 2)
.thenReturn(globalThreshold - 1);
when(metric.getPassParamQps(paramIdx, v2)).thenReturn(globalThreshold - 2)
.thenReturn(globalThreshold - 1);
when(metric.getPassParamQps(paramIdx, v3)).thenReturn(globalThreshold - 1)
.thenReturn(globalThreshold);
ParamFlowSlot.getMetricsMap().put(resourceWrapper, metric);

assertTrue(ParamFlowChecker.passCheck(resourceWrapper, rule, 1, arr));
assertFalse(ParamFlowChecker.passCheck(resourceWrapper, rule, 1, arr));
}

@Before
public void setUp() throws Exception {
ParamFlowSlot.getMetricsMap().clear();
}

@After
public void tearDown() throws Exception {
ParamFlowSlot.getMetricsMap().clear();
}
}

+ 192
- 0
sentinel-extension/sentinel-parameter-flow-control/src/test/java/com/alibaba/csp/sentinel/slots/block/flow/param/ParamFlowRuleManagerTest.java 查看文件

@@ -0,0 +1,192 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.csp.sentinel.slots.block.flow.param;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;

import com.alibaba.csp.sentinel.EntryType;
import com.alibaba.csp.sentinel.slotchain.StringResourceWrapper;
import com.alibaba.csp.sentinel.slots.block.RuleConstant;

import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import static org.junit.Assert.*;

/**
* Test cases for {@link ParamFlowRuleManager}.
*
* @author Eric Zhao
* @since 0.2.0
*/
public class ParamFlowRuleManagerTest {

@Before
public void setUp() {
ParamFlowRuleManager.loadRules(null);
}

@After
public void tearDown() {
ParamFlowRuleManager.loadRules(null);
}

@Test
public void testLoadHotParamRulesClearingUnusedMetrics() {
final String resA = "resA";
ParamFlowRule ruleA = new ParamFlowRule(resA)
.setCount(1)
.setParamIdx(0);
ParamFlowRuleManager.loadRules(Collections.singletonList(ruleA));
ParamFlowSlot.getMetricsMap().put(new StringResourceWrapper(resA, EntryType.IN), new ParameterMetric());
assertNotNull(ParamFlowSlot.getHotParamMetricForName(resA));

final String resB = "resB";
ParamFlowRule ruleB = new ParamFlowRule(resB)
.setCount(2)
.setParamIdx(1);
ParamFlowRuleManager.loadRules(Collections.singletonList(ruleB));
assertNull("The unused hot param metric should be cleared", ParamFlowSlot.getHotParamMetricForName(resA));
}

@Test
public void testLoadHotParamRulesAndGet() {
final String resA = "abc";
final String resB = "foo";
final String resC = "baz";
// Rule A to C is for resource A.
// Rule A is invalid.
ParamFlowRule ruleA = new ParamFlowRule(resA).setCount(10);
ParamFlowRule ruleB = new ParamFlowRule(resA)
.setCount(28)
.setParamIdx(1);
ParamFlowRule ruleC = new ParamFlowRule(resA)
.setCount(8)
.setParamIdx(1)
.setBlockGrade(RuleConstant.FLOW_GRADE_QPS);
// Rule D is for resource B.
ParamFlowRule ruleD = new ParamFlowRule(resB)
.setCount(9)
.setParamIdx(0)
.setParamFlowItemList(Arrays.asList(ParamFlowItem.newItem(7L, 6), ParamFlowItem.newItem(9L, 4)));
ParamFlowRuleManager.loadRules(Arrays.asList(ruleA, ruleB, ruleC, ruleD));

// Test for ParamFlowRuleManager#hasRules
assertTrue(ParamFlowRuleManager.hasRules(resA));
assertTrue(ParamFlowRuleManager.hasRules(resB));
assertFalse(ParamFlowRuleManager.hasRules(resC));
// Test for ParamFlowRuleManager#getRulesOfResource
List<ParamFlowRule> rulesForResA = ParamFlowRuleManager.getRulesOfResource(resA);
assertEquals(2, rulesForResA.size());
assertFalse(rulesForResA.contains(ruleA));
assertTrue(rulesForResA.contains(ruleB));
assertTrue(rulesForResA.contains(ruleC));
List<ParamFlowRule> rulesForResB = ParamFlowRuleManager.getRulesOfResource(resB);
assertEquals(1, rulesForResB.size());
assertEquals(ruleD, rulesForResB.get(0));
// Test for ParamFlowRuleManager#getRules
List<ParamFlowRule> allRules = ParamFlowRuleManager.getRules();
assertFalse(allRules.contains(ruleA));
assertTrue(allRules.contains(ruleB));
assertTrue(allRules.contains(ruleC));
assertTrue(allRules.contains(ruleD));
}

@Test
public void testParseHotParamExceptionItemsFailure() {
String valueB = "Sentinel";
Integer valueC = 6;
char valueD = 6;
float valueE = 11.11f;
// Null object will not be parsed.
ParamFlowItem itemA = new ParamFlowItem(null, 1, double.class.getName());
// Hot item with empty class type will be treated as string.
ParamFlowItem itemB = new ParamFlowItem(valueB, 3, null);
ParamFlowItem itemE = new ParamFlowItem(String.valueOf(valueE), 3, "");
// Bad count will not be parsed.
ParamFlowItem itemC = ParamFlowItem.newItem(valueC, -5);
ParamFlowItem itemD = new ParamFlowItem(String.valueOf(valueD), null, char.class.getName());

List<ParamFlowItem> badItems = Arrays.asList(itemA, itemB, itemC, itemD, itemE);
Map<Object, Integer> parsedItems = ParamFlowRuleManager.parseHotItems(badItems);

// Value B and E will be parsed, but ignoring the type.
assertEquals(2, parsedItems.size());
assertEquals(itemB.getCount(), parsedItems.get(valueB));
assertFalse(parsedItems.containsKey(valueE));
assertEquals(itemE.getCount(), parsedItems.get(String.valueOf(valueE)));
}

@Test
public void testParseHotParamExceptionItemsSuccess() {
// Test for empty list.
assertEquals(0, ParamFlowRuleManager.parseHotItems(null).size());
assertEquals(0, ParamFlowRuleManager.parseHotItems(new ArrayList<ParamFlowItem>()).size());

// Test for boxing objects and primitive types.
Double valueA = 1.1d;
String valueB = "Sentinel";
Integer valueC = 6;
char valueD = 'c';
ParamFlowItem itemA = ParamFlowItem.newItem(valueA, 1);
ParamFlowItem itemB = ParamFlowItem.newItem(valueB, 3);
ParamFlowItem itemC = ParamFlowItem.newItem(valueC, 5);
ParamFlowItem itemD = new ParamFlowItem().setObject(String.valueOf(valueD))
.setClassType(char.class.getName())
.setCount(7);
List<ParamFlowItem> items = Arrays.asList(itemA, itemB, itemC, itemD);
Map<Object, Integer> parsedItems = ParamFlowRuleManager.parseHotItems(items);
assertEquals(itemA.getCount(), parsedItems.get(valueA));
assertEquals(itemB.getCount(), parsedItems.get(valueB));
assertEquals(itemC.getCount(), parsedItems.get(valueC));
assertEquals(itemD.getCount(), parsedItems.get(valueD));
}

@Test
public void testCheckValidHotParamRule() {
// Null or empty resource;
ParamFlowRule rule1 = new ParamFlowRule();
ParamFlowRule rule2 = new ParamFlowRule("");
assertFalse(ParamFlowRuleManager.isValidRule(null));
assertFalse(ParamFlowRuleManager.isValidRule(rule1));
assertFalse(ParamFlowRuleManager.isValidRule(rule2));

// Invalid threshold count.
ParamFlowRule rule3 = new ParamFlowRule("abc")
.setCount(-1)
.setParamIdx(1);
assertFalse(ParamFlowRuleManager.isValidRule(rule3));

// Parameter index not set or invalid.
ParamFlowRule rule4 = new ParamFlowRule("abc")
.setCount(1);
ParamFlowRule rule5 = new ParamFlowRule("abc")
.setCount(1)
.setParamIdx(-1);
assertFalse(ParamFlowRuleManager.isValidRule(rule4));
assertFalse(ParamFlowRuleManager.isValidRule(rule5));

ParamFlowRule goodRule = new ParamFlowRule("abc")
.setCount(10)
.setParamIdx(1);
assertTrue(ParamFlowRuleManager.isValidRule(goodRule));
}
}

+ 118
- 0
sentinel-extension/sentinel-parameter-flow-control/src/test/java/com/alibaba/csp/sentinel/slots/block/flow/param/ParamFlowSlotTest.java 查看文件

@@ -0,0 +1,118 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.csp.sentinel.slots.block.flow.param;

import java.util.Collections;

import com.alibaba.csp.sentinel.EntryType;
import com.alibaba.csp.sentinel.slotchain.ResourceWrapper;
import com.alibaba.csp.sentinel.slotchain.StringResourceWrapper;

import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import static org.junit.Assert.*;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

/**
* Test cases for {@link ParamFlowSlot}.
*
* @author Eric Zhao
* @since 0.2.0
*/
public class ParamFlowSlotTest {

private final ParamFlowSlot paramFlowSlot = new ParamFlowSlot();

@Test
public void testEntryWhenParamFlowRuleNotExists() throws Throwable {
String resourceName = "testEntryWhenParamFlowRuleNotExists";
ResourceWrapper resourceWrapper = new StringResourceWrapper(resourceName, EntryType.IN);
paramFlowSlot.entry(null, resourceWrapper, null, 1, "abc");
// The parameter metric instance will not be created.
assertNull(ParamFlowSlot.getParamMetric(resourceWrapper));
}

@Test
public void testEntryWhenParamFlowExists() throws Throwable {
String resourceName = "testEntryWhenParamFlowExists";
ResourceWrapper resourceWrapper = new StringResourceWrapper(resourceName, EntryType.IN);
long argToGo = 1L;
double count = 10;
ParamFlowRule rule = new ParamFlowRule(resourceName)
.setCount(count)
.setParamIdx(0);
ParamFlowRuleManager.loadRules(Collections.singletonList(rule));

ParameterMetric metric = mock(ParameterMetric.class);
// First pass, then blocked.
when(metric.getPassParamQps(rule.getParamIdx(), argToGo))
.thenReturn(count - 1)
.thenReturn(count);
// Insert the mock metric to control pass or block.
ParamFlowSlot.getMetricsMap().put(resourceWrapper, metric);

// The first entry will pass.
paramFlowSlot.entry(null, resourceWrapper, null, 1, argToGo);
// The second entry will be blocked.
try {
paramFlowSlot.entry(null, resourceWrapper, null, 1, argToGo);
} catch (ParamFlowException ex) {
assertEquals(String.valueOf(argToGo), ex.getMessage());
assertEquals(resourceName, ex.getResourceName());
return;
}
fail("The second entry should be blocked");
}

@Test
public void testGetNullParamMetric() {
assertNull(ParamFlowSlot.getParamMetric(null));
}

@Test
public void testInitParamMetrics() {
int index = 1;
String resourceName = "res-" + System.currentTimeMillis();
ResourceWrapper resourceWrapper = new StringResourceWrapper(resourceName, EntryType.IN);

assertNull(ParamFlowSlot.getParamMetric(resourceWrapper));

paramFlowSlot.initHotParamMetricsFor(resourceWrapper, index);
ParameterMetric metric = ParamFlowSlot.getParamMetric(resourceWrapper);
assertNotNull(metric);
assertNotNull(metric.getRollingParameters().get(index));

// Duplicate init.
paramFlowSlot.initHotParamMetricsFor(resourceWrapper, index);
assertSame(metric, ParamFlowSlot.getParamMetric(resourceWrapper));
}

@Before
public void setUp() throws Exception {
ParamFlowRuleManager.loadRules(null);
ParamFlowSlot.getMetricsMap().clear();
}

@After
public void tearDown() throws Exception {
// Clean the metrics map.
ParamFlowSlot.getMetricsMap().clear();
ParamFlowRuleManager.loadRules(null);
}
}

+ 75
- 0
sentinel-extension/sentinel-parameter-flow-control/src/test/java/com/alibaba/csp/sentinel/slots/block/flow/param/ParameterMetricTest.java 查看文件

@@ -0,0 +1,75 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.csp.sentinel.slots.block.flow.param;

import java.util.HashMap;
import java.util.Map;

import com.alibaba.csp.sentinel.slots.statistic.metric.HotParameterLeapArray;

import org.junit.Test;

import static org.junit.Assert.*;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

/**
* Test cases for {@link ParameterMetric}.
*
* @author Eric Zhao
* @since 0.2.0
*/
public class ParameterMetricTest {

@Test
public void testGetTopParamCount() {
ParameterMetric metric = new ParameterMetric();
int index = 1;
int n = 10;
RollingParamEvent event = RollingParamEvent.REQUEST_PASSED;
HotParameterLeapArray leapArray = mock(HotParameterLeapArray.class);
Map<Object, Double> topValues = new HashMap<Object, Double>() {{
put("a", 3d);
put("b", 7d);
}};
when(leapArray.getTopValues(event, n)).thenReturn(topValues);

// Get when not initialized.
assertEquals(0, metric.getTopPassParamCount(index, n).size());

metric.getRollingParameters().put(index, leapArray);
assertEquals(topValues, metric.getTopPassParamCount(index, n));
}

@Test
public void testInitAndClearHotParameterMetric() {
ParameterMetric metric = new ParameterMetric();
int index = 1;
metric.initializeForIndex(index);
HotParameterLeapArray leapArray = metric.getRollingParameters().get(index);
assertNotNull(leapArray);

metric.initializeForIndex(index);
assertSame(leapArray, metric.getRollingParameters().get(index));

metric.clear();
assertEquals(0, metric.getRollingParameters().size());
}

private static final int PARAM_TYPE_NORMAL = 0;
private static final int PARAM_TYPE_ARRAY = 1;
private static final int PARAM_TYPE_COLLECTION = 2;
}

+ 77
- 0
sentinel-extension/sentinel-parameter-flow-control/src/test/java/com/alibaba/csp/sentinel/slots/statistic/data/ParamMapBucketTest.java 查看文件

@@ -0,0 +1,77 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.csp.sentinel.slots.statistic.data;

import com.alibaba.csp.sentinel.slots.block.flow.param.RollingParamEvent;

import org.junit.Test;

import static org.junit.Assert.*;

/**
* Test cases for {@link ParamMapBucket}.
*
* @author Eric Zhao
* @since 0.2.0
*/
public class ParamMapBucketTest {

@Test
public void testAddEviction() {
ParamMapBucket bucket = new ParamMapBucket();
for (int i = 0; i < ParamMapBucket.DEFAULT_MAX_CAPACITY; i++) {
bucket.add(RollingParamEvent.REQUEST_PASSED, 1, "param-" + i);
}
String lastParam = "param-end";
bucket.add(RollingParamEvent.REQUEST_PASSED, 1, lastParam);
assertEquals(0, bucket.get(RollingParamEvent.REQUEST_PASSED, "param-0"));
assertEquals(1, bucket.get(RollingParamEvent.REQUEST_PASSED, "param-1"));
assertEquals(1, bucket.get(RollingParamEvent.REQUEST_PASSED, lastParam));
}

@Test
public void testAddGetResetCommon() {
ParamMapBucket bucket = new ParamMapBucket();
double paramA = 1.1d;
double paramB = 2.2d;
double paramC = -19.7d;
// Block: A 5 | B 1 | C 6
// Pass: A 0 | B 1 | C 7
bucket.add(RollingParamEvent.REQUEST_BLOCKED, 3, paramA);
bucket.add(RollingParamEvent.REQUEST_PASSED, 1, paramB);
bucket.add(RollingParamEvent.REQUEST_BLOCKED, 1, paramB);
bucket.add(RollingParamEvent.REQUEST_BLOCKED, 2, paramA);
bucket.add(RollingParamEvent.REQUEST_PASSED, 6, paramC);
bucket.add(RollingParamEvent.REQUEST_BLOCKED, 4, paramC);
bucket.add(RollingParamEvent.REQUEST_PASSED, 1, paramC);
bucket.add(RollingParamEvent.REQUEST_BLOCKED, 2, paramC);

assertEquals(5, bucket.get(RollingParamEvent.REQUEST_BLOCKED, paramA));
assertEquals(1, bucket.get(RollingParamEvent.REQUEST_BLOCKED, paramB));
assertEquals(6, bucket.get(RollingParamEvent.REQUEST_BLOCKED, paramC));
assertEquals(0, bucket.get(RollingParamEvent.REQUEST_PASSED, paramA));
assertEquals(1, bucket.get(RollingParamEvent.REQUEST_PASSED, paramB));
assertEquals(7, bucket.get(RollingParamEvent.REQUEST_PASSED, paramC));

bucket.reset();
assertEquals(0, bucket.get(RollingParamEvent.REQUEST_BLOCKED, paramA));
assertEquals(0, bucket.get(RollingParamEvent.REQUEST_BLOCKED, paramB));
assertEquals(0, bucket.get(RollingParamEvent.REQUEST_BLOCKED, paramC));
assertEquals(0, bucket.get(RollingParamEvent.REQUEST_PASSED, paramA));
assertEquals(0, bucket.get(RollingParamEvent.REQUEST_PASSED, paramB));
assertEquals(0, bucket.get(RollingParamEvent.REQUEST_PASSED, paramC));
}
}

+ 132
- 0
sentinel-extension/sentinel-parameter-flow-control/src/test/java/com/alibaba/csp/sentinel/slots/statistic/metric/HotParameterLeapArrayTest.java 查看文件

@@ -0,0 +1,132 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.csp.sentinel.slots.statistic.metric;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import com.alibaba.csp.sentinel.slots.block.flow.param.RollingParamEvent;
import com.alibaba.csp.sentinel.slots.statistic.base.WindowWrap;
import com.alibaba.csp.sentinel.slots.statistic.data.ParamMapBucket;

import org.junit.Test;

import static org.junit.Assert.*;
import static org.mockito.Mockito.*;

/**
* Test cases for {@link HotParameterLeapArray}.
*
* @author Eric Zhao
* @since 0.2.0
*/
public class HotParameterLeapArrayTest {

@Test
public void testAddValueToBucket() {
HotParameterLeapArray leapArray = mock(HotParameterLeapArray.class);
String paramA = "paramA";
int initialCountA = 3;
RollingParamEvent passEvent = RollingParamEvent.REQUEST_PASSED;
final ParamMapBucket bucket = new ParamMapBucket();
bucket.add(passEvent, initialCountA, paramA);

doCallRealMethod().when(leapArray).addValue(any(RollingParamEvent.class), anyInt(), any(Object.class));
when(leapArray.currentWindow()).thenReturn(new WindowWrap<ParamMapBucket>(0, 0, bucket));
assertEquals(initialCountA, leapArray.currentWindow().value().get(passEvent, paramA));

int delta = 2;
leapArray.addValue(passEvent, delta, paramA);
assertEquals(initialCountA + delta, leapArray.currentWindow().value().get(passEvent, paramA));
}

@Test
public void testGetTopValues() {
int intervalInSec = 2;
int a1 = 3, a2 = 5;
String paramPrefix = "param-";
HotParameterLeapArray leapArray = mock(HotParameterLeapArray.class);
when(leapArray.getIntervalInSec()).thenReturn(intervalInSec);

final ParamMapBucket b1 = generateBucket(a1, paramPrefix);
final ParamMapBucket b2 = generateBucket(a2, paramPrefix);
List<ParamMapBucket> buckets = new ArrayList<ParamMapBucket>() {{
add(b1);
add(b2);
}};
when(leapArray.values()).thenReturn(buckets);
when(leapArray.getTopValues(any(RollingParamEvent.class), any(int.class))).thenCallRealMethod();

Map<Object, Double> top2Values = leapArray.getTopValues(RollingParamEvent.REQUEST_PASSED, a1 - 1);
// Top 2 should be 5 and 3
assertEquals((double)5 * 10 / intervalInSec, top2Values.get(paramPrefix + 5), 0.01);
assertEquals((double)3 * 20 / intervalInSec, top2Values.get(paramPrefix + 3), 0.01);

Map<Object, Double> top4Values = leapArray.getTopValues(RollingParamEvent.REQUEST_PASSED, a2 - 1);
assertEquals(a2 - 1, top4Values.size());
assertFalse(top4Values.containsKey(paramPrefix + 1));

Map<Object, Double> topMoreValues = leapArray.getTopValues(RollingParamEvent.REQUEST_PASSED, a2 + 1);
assertEquals("This should contain all parameters but no more than " + a2, a2, topMoreValues.size());
}

private ParamMapBucket generateBucket(int amount, String prefix) {
if (amount <= 0) {
throw new IllegalArgumentException("Bad amount");
}
ParamMapBucket bucket = new ParamMapBucket();
for (int i = 1; i <= amount; i++) {
bucket.add(RollingParamEvent.REQUEST_PASSED, i * 10, prefix + i);
bucket.add(RollingParamEvent.REQUEST_BLOCKED, i, prefix + i);
}
return bucket;
}

@Test
public void testGetRollingSum() {
HotParameterLeapArray leapArray = mock(HotParameterLeapArray.class);
String v1 = "a", v2 = "B", v3 = "Cc";
int p1a = 19, p1b = 3;
int p2a = 6, p2c = 17;
RollingParamEvent passEvent = RollingParamEvent.REQUEST_PASSED;
final ParamMapBucket b1 = new ParamMapBucket()
.add(passEvent, p1a, v1)
.add(passEvent, p1b, v2);
final ParamMapBucket b2 = new ParamMapBucket()
.add(passEvent, p2a, v1)
.add(passEvent, p2c, v3);
List<ParamMapBucket> buckets = new ArrayList<ParamMapBucket>() {{ add(b1); add(b2); }};
when(leapArray.values()).thenReturn(buckets);
when(leapArray.getRollingSum(any(RollingParamEvent.class), any(Object.class))).thenCallRealMethod();

assertEquals(p1a + p2a, leapArray.getRollingSum(passEvent, v1));
assertEquals(p1b, leapArray.getRollingSum(passEvent, v2));
assertEquals(p2c, leapArray.getRollingSum(passEvent, v3));
}

@Test
public void testGetRollingAvg() {
HotParameterLeapArray leapArray = mock(HotParameterLeapArray.class);
when(leapArray.getRollingSum(any(RollingParamEvent.class), any(Object.class))).thenReturn(15L);
when(leapArray.getIntervalInSec()).thenReturn(1)
.thenReturn(2);
when(leapArray.getRollingAvg(any(RollingParamEvent.class), any(Object.class))).thenCallRealMethod();

assertEquals(15.0d, leapArray.getRollingAvg(RollingParamEvent.REQUEST_PASSED, "abc"), 0.001);
assertEquals(15.0d / 2, leapArray.getRollingAvg(RollingParamEvent.REQUEST_PASSED, "abc"), 0.001);
}
}

Loading…
取消
儲存