Signed-off-by: Eric Zhao <sczyh16@gmail.com>master
@@ -52,7 +52,7 @@ The description for fields of `ParamFlowRule`: | |||
| :----: | :----| :----| | |||
| resource| resource name (**required**) || | |||
| count | flow control threshold (**required**) || | |||
| grade | flow control mode (only QPS mode is supported) | QPS mode | | |||
| grade | metric type (QPS or thread count) | QPS mode | | |||
| paramIdx | the index of provided parameter in `SphU.entry(xxx, args)` (**required**) || | |||
| paramFlowItemList | the exception items of parameter; you can set threshold to a specific parameter value || | |||
@@ -16,6 +16,7 @@ | |||
package com.alibaba.csp.sentinel.init; | |||
import com.alibaba.csp.sentinel.slots.statistic.ParamFlowStatisticEntryCallback; | |||
import com.alibaba.csp.sentinel.slots.statistic.ParamFlowStatisticExitCallback; | |||
import com.alibaba.csp.sentinel.slots.statistic.StatisticSlotCallbackRegistry; | |||
/** | |||
@@ -31,5 +32,7 @@ public class ParamFlowStatisticSlotCallbackInit implements InitFunc { | |||
public void init() { | |||
StatisticSlotCallbackRegistry.addEntryCallback(ParamFlowStatisticEntryCallback.class.getName(), | |||
new ParamFlowStatisticEntryCallback()); | |||
StatisticSlotCallbackRegistry.addExitCallback(ParamFlowStatisticExitCallback.class.getName(), | |||
new ParamFlowStatisticExitCallback()); | |||
} | |||
} |
@@ -57,7 +57,7 @@ final class ParamFlowChecker { | |||
return true; | |||
} | |||
if (rule.isClusterMode()) { | |||
if (rule.isClusterMode() && rule.getGrade() == RuleConstant.FLOW_GRADE_QPS) { | |||
return passClusterCheck(resourceWrapper, rule, count, value); | |||
} | |||
@@ -106,6 +106,14 @@ final class ParamFlowChecker { | |||
} | |||
return false; | |||
} | |||
} else if (rule.getGrade() == RuleConstant.FLOW_GRADE_THREAD) { | |||
long threadCount = getHotParameters(resourceWrapper).getThreadCount(rule.getParamIdx(), value); | |||
if (exclusionItems.contains(value)) { | |||
int itemThreshold = rule.getParsedHotItems().get(value); | |||
return ++threadCount <= itemThreshold; | |||
} | |||
long threshold = (long) rule.getCount(); | |||
return ++threadCount <= threshold; | |||
} | |||
return true; | |||
@@ -41,7 +41,7 @@ public class ParamFlowRule extends AbstractRule { | |||
} | |||
/** | |||
* The threshold type of flow control (1: QPS). | |||
* The threshold type of flow control (0: thread count, 1: QPS). | |||
*/ | |||
private int grade = RuleConstant.FLOW_GRADE_QPS; | |||
@@ -31,7 +31,7 @@ public final class ParamFlowRuleUtil { | |||
public static boolean isValidRule(ParamFlowRule rule) { | |||
return rule != null && !StringUtil.isBlank(rule.getResource()) && rule.getCount() >= 0 | |||
&& rule.getParamIdx() != null && rule.getParamIdx() >= 0 && checkCluster(rule); | |||
&& rule.getGrade() >= 0 && rule.getParamIdx() != null && rule.getParamIdx() >= 0 && checkCluster(rule); | |||
} | |||
private static boolean checkCluster(/*@PreChecked*/ ParamFlowRule rule) { | |||
@@ -20,10 +20,13 @@ import java.util.Collection; | |||
import java.util.HashMap; | |||
import java.util.Map; | |||
import java.util.concurrent.ConcurrentHashMap; | |||
import java.util.concurrent.atomic.AtomicInteger; | |||
import com.alibaba.csp.sentinel.log.RecordLog; | |||
import com.alibaba.csp.sentinel.node.IntervalProperty; | |||
import com.alibaba.csp.sentinel.node.SampleCountProperty; | |||
import com.alibaba.csp.sentinel.slots.statistic.cache.CacheMap; | |||
import com.alibaba.csp.sentinel.slots.statistic.cache.ConcurrentLinkedHashMapWrapper; | |||
import com.alibaba.csp.sentinel.slots.statistic.metric.HotParameterLeapArray; | |||
import com.alibaba.csp.sentinel.util.AssertUtil; | |||
@@ -52,13 +55,20 @@ public class ParameterMetric { | |||
private Map<Integer, HotParameterLeapArray> rollingParameters = | |||
new ConcurrentHashMap<Integer, HotParameterLeapArray>(); | |||
private Map<Integer, CacheMap<Object, AtomicInteger>> threadCountMap = | |||
new ConcurrentHashMap<Integer, CacheMap<Object, AtomicInteger>>(); | |||
public Map<Integer, HotParameterLeapArray> getRollingParameters() { | |||
return rollingParameters; | |||
} | |||
public Map<Integer, CacheMap<Object, AtomicInteger>> getThreadCountMap() { | |||
return threadCountMap; | |||
} | |||
public synchronized void clear() { | |||
rollingParameters.clear(); | |||
threadCountMap.clear(); | |||
} | |||
public void initializeForIndex(int index) { | |||
@@ -68,7 +78,129 @@ public class ParameterMetric { | |||
if (rollingParameters.get(index) == null) { | |||
rollingParameters.put(index, new HotParameterLeapArray(sampleCount, intervalMs)); | |||
} | |||
if (threadCountMap.get(index) == null) { | |||
threadCountMap.put(index, | |||
new ConcurrentLinkedHashMapWrapper<Object, AtomicInteger>(THREAD_COUNT_MAX_CAPACITY)); | |||
} | |||
} | |||
} | |||
} | |||
@SuppressWarnings("rawtypes") | |||
public void decreaseThreadCount(Object... args) { | |||
if (args == null) { | |||
return; | |||
} | |||
try { | |||
for (int index = 0; index < args.length; index++) { | |||
CacheMap<Object, AtomicInteger> threadCount = threadCountMap.get(index); | |||
if (threadCount == null) { | |||
continue; | |||
} | |||
Object arg = args[index]; | |||
if (arg == null) { | |||
continue; | |||
} | |||
if (Collection.class.isAssignableFrom(arg.getClass())) { | |||
for (Object value : ((Collection)arg)) { | |||
AtomicInteger oldValue = threadCount.putIfAbsent(value, new AtomicInteger()); | |||
if (oldValue != null) { | |||
int currentValue = oldValue.decrementAndGet(); | |||
if (currentValue <= 0) { | |||
threadCount.remove(value); | |||
} | |||
} | |||
} | |||
} else if (arg.getClass().isArray()) { | |||
int length = Array.getLength(arg); | |||
for (int i = 0; i < length; i++) { | |||
Object value = Array.get(arg, i); | |||
AtomicInteger oldValue = threadCount.putIfAbsent(value, new AtomicInteger()); | |||
if (oldValue != null) { | |||
int currentValue = oldValue.decrementAndGet(); | |||
if (currentValue <= 0) { | |||
threadCount.remove(value); | |||
} | |||
} | |||
} | |||
} else { | |||
AtomicInteger oldValue = threadCount.putIfAbsent(arg, new AtomicInteger()); | |||
if (oldValue != null) { | |||
int currentValue = oldValue.decrementAndGet(); | |||
if (currentValue <= 0) { | |||
threadCount.remove(arg); | |||
} | |||
} | |||
} | |||
} | |||
} catch (Throwable e) { | |||
RecordLog.warn("[ParameterMetric] Param exception", e); | |||
} | |||
} | |||
@SuppressWarnings("rawtypes") | |||
public void addThreadCount(Object... args) { | |||
if (args == null) { | |||
return; | |||
} | |||
try { | |||
for (int index = 0; index < args.length; index++) { | |||
CacheMap<Object, AtomicInteger> threadCount = threadCountMap.get(index); | |||
if (threadCount == null) { | |||
continue; | |||
} | |||
Object arg = args[index]; | |||
if (arg == null) { | |||
continue; | |||
} | |||
if (Collection.class.isAssignableFrom(arg.getClass())) { | |||
for (Object value : ((Collection)arg)) { | |||
AtomicInteger oldValue = threadCount.putIfAbsent(value, new AtomicInteger()); | |||
if (oldValue != null) { | |||
oldValue.incrementAndGet(); | |||
} else { | |||
threadCount.put(value, new AtomicInteger(1)); | |||
} | |||
} | |||
} else if (arg.getClass().isArray()) { | |||
int length = Array.getLength(arg); | |||
for (int i = 0; i < length; i++) { | |||
Object value = Array.get(arg, i); | |||
AtomicInteger oldValue = threadCount.putIfAbsent(value, new AtomicInteger()); | |||
if (oldValue != null) { | |||
oldValue.incrementAndGet(); | |||
} else { | |||
threadCount.put(value, new AtomicInteger(1)); | |||
} | |||
} | |||
} else { | |||
AtomicInteger oldValue = threadCount.putIfAbsent(arg, new AtomicInteger()); | |||
if (oldValue != null) { | |||
oldValue.incrementAndGet(); | |||
} else { | |||
threadCount.put(arg, new AtomicInteger(1)); | |||
} | |||
} | |||
} | |||
} catch (Throwable e) { | |||
RecordLog.warn("[ParameterMetric] Param exception", e); | |||
} | |||
} | |||
@@ -159,4 +291,15 @@ public class ParameterMetric { | |||
return new HashMap<Object, Double>(); | |||
} | |||
public long getThreadCount(int index, Object value) { | |||
if (threadCountMap.get(index) == null) { | |||
return 0; | |||
} | |||
AtomicInteger count = threadCountMap.get(index).get(value); | |||
return count == null ? 0L : count.get(); | |||
} | |||
private static final long THREAD_COUNT_MAX_CAPACITY = 4000; | |||
} |
@@ -37,6 +37,7 @@ public class ParamFlowStatisticEntryCallback implements ProcessorSlotEntryCallba | |||
if (parameterMetric != null) { | |||
parameterMetric.addPass(count, args); | |||
parameterMetric.addThreadCount(args); | |||
} | |||
} | |||
@@ -0,0 +1,40 @@ | |||
/* | |||
* Copyright 1999-2018 Alibaba Group Holding Ltd. | |||
* | |||
* Licensed under the Apache License, Version 2.0 (the "License"); | |||
* you may not use this file except in compliance with the License. | |||
* You may obtain a copy of the License at | |||
* | |||
* http://www.apache.org/licenses/LICENSE-2.0 | |||
* | |||
* Unless required by applicable law or agreed to in writing, software | |||
* distributed under the License is distributed on an "AS IS" BASIS, | |||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |||
* See the License for the specific language governing permissions and | |||
* limitations under the License. | |||
*/ | |||
package com.alibaba.csp.sentinel.slots.statistic; | |||
import com.alibaba.csp.sentinel.context.Context; | |||
import com.alibaba.csp.sentinel.slotchain.ProcessorSlotExitCallback; | |||
import com.alibaba.csp.sentinel.slotchain.ResourceWrapper; | |||
import com.alibaba.csp.sentinel.slots.block.flow.param.ParamFlowSlot; | |||
import com.alibaba.csp.sentinel.slots.block.flow.param.ParameterMetric; | |||
/** | |||
* @author Eric Zhao | |||
* @since 0.2.0 | |||
*/ | |||
public class ParamFlowStatisticExitCallback implements ProcessorSlotExitCallback { | |||
@Override | |||
public void onExit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) { | |||
if (context.getCurEntry().getError() == null) { | |||
ParameterMetric parameterMetric = ParamFlowSlot.getParamMetric(resourceWrapper); | |||
if (parameterMetric != null) { | |||
parameterMetric.decreaseThreadCount(args); | |||
} | |||
} | |||
} | |||
} |
@@ -23,6 +23,7 @@ import java.util.Map; | |||
import com.alibaba.csp.sentinel.EntryType; | |||
import com.alibaba.csp.sentinel.slotchain.ResourceWrapper; | |||
import com.alibaba.csp.sentinel.slotchain.StringResourceWrapper; | |||
import com.alibaba.csp.sentinel.slots.block.RuleConstant; | |||
import org.junit.After; | |||
import org.junit.Before; | |||
@@ -129,6 +130,57 @@ public class ParamFlowCheckerTest { | |||
assertFalse(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueD)); | |||
} | |||
@Test | |||
public void testSingleValueCheckThreadCountWithExceptionItems() { | |||
final String resourceName = "testSingleValueCheckThreadCountWithExceptionItems"; | |||
final ResourceWrapper resourceWrapper = new StringResourceWrapper(resourceName, EntryType.IN); | |||
int paramIdx = 0; | |||
long globalThreshold = 5L; | |||
int thresholdB = 3; | |||
int thresholdD = 7; | |||
ParamFlowRule rule = new ParamFlowRule(resourceName) | |||
.setCount(globalThreshold) | |||
.setParamIdx(paramIdx) | |||
.setGrade(RuleConstant.FLOW_GRADE_THREAD); | |||
String valueA = "valueA"; | |||
String valueB = "valueB"; | |||
String valueC = "valueC"; | |||
String valueD = "valueD"; | |||
// Directly set parsed map for test. | |||
Map<Object, Integer> map = new HashMap<Object, Integer>(); | |||
map.put(valueB, thresholdB); | |||
map.put(valueD, thresholdD); | |||
rule.setParsedHotItems(map); | |||
ParameterMetric metric = mock(ParameterMetric.class); | |||
when(metric.getThreadCount(paramIdx, valueA)).thenReturn(globalThreshold - 1); | |||
when(metric.getThreadCount(paramIdx, valueB)).thenReturn(globalThreshold - 1); | |||
when(metric.getThreadCount(paramIdx, valueC)).thenReturn(globalThreshold - 1); | |||
when(metric.getThreadCount(paramIdx, valueD)).thenReturn(globalThreshold + 1); | |||
ParamFlowSlot.getMetricsMap().put(resourceWrapper, metric); | |||
assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); | |||
assertFalse(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueB)); | |||
assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueC)); | |||
assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueD)); | |||
when(metric.getThreadCount(paramIdx, valueA)).thenReturn(globalThreshold); | |||
when(metric.getThreadCount(paramIdx, valueB)).thenReturn(thresholdB - 1L); | |||
when(metric.getThreadCount(paramIdx, valueC)).thenReturn(globalThreshold + 1); | |||
when(metric.getThreadCount(paramIdx, valueD)).thenReturn(globalThreshold - 1) | |||
.thenReturn((long)thresholdD); | |||
assertFalse(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA)); | |||
assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueB)); | |||
assertFalse(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueC)); | |||
assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueD)); | |||
assertFalse(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueD)); | |||
} | |||
@Test | |||
public void testPassLocalCheckForCollection() { | |||
final String resourceName = "testPassLocalCheckForCollection"; | |||
@@ -79,7 +79,7 @@ public class ParamFlowRuleManagerTest { | |||
ParamFlowRule ruleC = new ParamFlowRule(resA) | |||
.setCount(8) | |||
.setParamIdx(1) | |||
.setGrade(RuleConstant.FLOW_GRADE_QPS); | |||
.setGrade(RuleConstant.FLOW_GRADE_THREAD); | |||
// Rule D is for resource B. | |||
ParamFlowRule ruleD = new ParamFlowRule(resB) | |||
.setCount(9) | |||
@@ -97,6 +97,7 @@ public class ParamFlowSlotTest { | |||
ParameterMetric metric = ParamFlowSlot.getParamMetric(resourceWrapper); | |||
assertNotNull(metric); | |||
assertNotNull(metric.getRollingParameters().get(index)); | |||
assertNotNull(metric.getThreadCountMap().get(index)); | |||
// Duplicate init. | |||
paramFlowSlot.initHotParamMetricsFor(resourceWrapper, index); | |||
@@ -15,9 +15,12 @@ | |||
*/ | |||
package com.alibaba.csp.sentinel.slots.block.flow.param; | |||
import java.util.Arrays; | |||
import java.util.HashMap; | |||
import java.util.Map; | |||
import java.util.concurrent.atomic.AtomicInteger; | |||
import com.alibaba.csp.sentinel.slots.statistic.cache.CacheMap; | |||
import com.alibaba.csp.sentinel.slots.statistic.metric.HotParameterLeapArray; | |||
import org.junit.Test; | |||
@@ -60,13 +63,119 @@ public class ParameterMetricTest { | |||
int index = 1; | |||
metric.initializeForIndex(index); | |||
HotParameterLeapArray leapArray = metric.getRollingParameters().get(index); | |||
CacheMap cacheMap = metric.getThreadCountMap().get(index); | |||
assertNotNull(leapArray); | |||
assertNotNull(cacheMap); | |||
metric.initializeForIndex(index); | |||
assertSame(leapArray, metric.getRollingParameters().get(index)); | |||
assertSame(cacheMap, metric.getThreadCountMap().get(index)); | |||
metric.clear(); | |||
assertEquals(0, metric.getRollingParameters().size()); | |||
assertEquals(0, metric.getThreadCountMap().size()); | |||
} | |||
@Test | |||
public void testAddAndDecreaseThreadCountCommon() { | |||
testAddAndDecreaseThreadCount(PARAM_TYPE_NORMAL); | |||
testAddAndDecreaseThreadCount(PARAM_TYPE_ARRAY); | |||
testAddAndDecreaseThreadCount(PARAM_TYPE_COLLECTION); | |||
} | |||
private void testAddAndDecreaseThreadCount(int paramType) { | |||
int paramIdx = 0; | |||
int n = 3; | |||
long[] v = new long[] {19L, 3L, 8L}; | |||
ParameterMetric metric = new ParameterMetric(); | |||
metric.initializeForIndex(paramIdx); | |||
assertTrue(metric.getThreadCountMap().containsKey(paramIdx)); | |||
switch (paramType) { | |||
case PARAM_TYPE_ARRAY: | |||
metric.addThreadCount((Object)v); | |||
break; | |||
case PARAM_TYPE_COLLECTION: | |||
metric.addThreadCount(Arrays.asList(v[0], v[1], v[2])); | |||
break; | |||
case PARAM_TYPE_NORMAL: | |||
default: | |||
metric.addThreadCount(v[0]); | |||
metric.addThreadCount(v[1]); | |||
metric.addThreadCount(v[2]); | |||
break; | |||
} | |||
assertEquals(1, metric.getThreadCountMap().size()); | |||
CacheMap<Object, AtomicInteger> threadCountMap = metric.getThreadCountMap().get(paramIdx); | |||
assertEquals(v.length, threadCountMap.size()); | |||
for (long vs : v) { | |||
assertEquals(1, threadCountMap.get(vs).get()); | |||
} | |||
for (int i = 1 ; i < n; i++) { | |||
switch (paramType) { | |||
case PARAM_TYPE_ARRAY: | |||
metric.addThreadCount((Object)v); | |||
break; | |||
case PARAM_TYPE_COLLECTION: | |||
metric.addThreadCount(Arrays.asList(v[0], v[1], v[2])); | |||
break; | |||
case PARAM_TYPE_NORMAL: | |||
default: | |||
metric.addThreadCount(v[0]); | |||
metric.addThreadCount(v[1]); | |||
metric.addThreadCount(v[2]); | |||
break; | |||
} | |||
} | |||
assertEquals(1, metric.getThreadCountMap().size()); | |||
threadCountMap = metric.getThreadCountMap().get(paramIdx); | |||
assertEquals(v.length, threadCountMap.size()); | |||
for (long vs : v) { | |||
assertEquals(n, threadCountMap.get(vs).get()); | |||
} | |||
for (int i = 1 ; i < n; i++) { | |||
switch (paramType) { | |||
case PARAM_TYPE_ARRAY: | |||
metric.decreaseThreadCount((Object)v); | |||
break; | |||
case PARAM_TYPE_COLLECTION: | |||
metric.decreaseThreadCount(Arrays.asList(v[0], v[1], v[2])); | |||
break; | |||
case PARAM_TYPE_NORMAL: | |||
default: | |||
metric.decreaseThreadCount(v[0]); | |||
metric.decreaseThreadCount(v[1]); | |||
metric.decreaseThreadCount(v[2]); | |||
break; | |||
} | |||
} | |||
assertEquals(1, metric.getThreadCountMap().size()); | |||
threadCountMap = metric.getThreadCountMap().get(paramIdx); | |||
assertEquals(v.length, threadCountMap.size()); | |||
for (long vs : v) { | |||
assertEquals(1, threadCountMap.get(vs).get()); | |||
} | |||
switch (paramType) { | |||
case PARAM_TYPE_ARRAY: | |||
metric.decreaseThreadCount((Object)v); | |||
break; | |||
case PARAM_TYPE_COLLECTION: | |||
metric.decreaseThreadCount(Arrays.asList(v[0], v[1], v[2])); | |||
break; | |||
case PARAM_TYPE_NORMAL: | |||
default: | |||
metric.decreaseThreadCount(v[0]); | |||
metric.decreaseThreadCount(v[1]); | |||
metric.decreaseThreadCount(v[2]); | |||
break; | |||
} | |||
assertEquals(1, metric.getThreadCountMap().size()); | |||
threadCountMap = metric.getThreadCountMap().get(paramIdx); | |||
assertEquals(0, threadCountMap.size()); | |||
} | |||
private static final int PARAM_TYPE_NORMAL = 0; | |||