浏览代码

Fix the bug that numeric overflow might occur when refilling tokens in ParamFlowChecker (#838)

- use AtomicLong to replace AtomicInteger

Signed-off-by: Eric Zhao <sczyh16@gmail.com>
master
Eric Zhao GitHub 5 年前
父节点
当前提交
d59beaec66
找不到此签名对应的密钥 GPG 密钥 ID: 4AEE18F83AFDEB23
共有 5 个文件被更改,包括 61 次插入21 次删除
  1. +10
    -10
      sentinel-extension/sentinel-parameter-flow-control/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/param/ParamFlowChecker.java
  2. +4
    -4
      sentinel-extension/sentinel-parameter-flow-control/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/param/ParameterMetric.java
  3. +1
    -1
      sentinel-extension/sentinel-parameter-flow-control/src/test/java/com/alibaba/csp/sentinel/slots/block/flow/param/ParamFlowCheckerTest.java
  4. +44
    -4
      sentinel-extension/sentinel-parameter-flow-control/src/test/java/com/alibaba/csp/sentinel/slots/block/flow/param/ParamFlowDefaultCheckerTest.java
  5. +2
    -2
      sentinel-extension/sentinel-parameter-flow-control/src/test/java/com/alibaba/csp/sentinel/slots/block/flow/param/ParamFlowSlotTest.java

+ 10
- 10
sentinel-extension/sentinel-parameter-flow-control/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/param/ParamFlowChecker.java 查看文件

@@ -121,7 +121,7 @@ public final class ParamFlowChecker {
static boolean passDefaultLocalCheck(ResourceWrapper resourceWrapper, ParamFlowRule rule, int acquireCount,
Object value) {
ParameterMetric metric = getParameterMetric(resourceWrapper);
CacheMap<Object, AtomicInteger> tokenCounters = metric == null ? null : metric.getRuleTokenCounter(rule);
CacheMap<Object, AtomicLong> tokenCounters = metric == null ? null : metric.getRuleTokenCounter(rule);
CacheMap<Object, AtomicLong> timeCounters = metric == null ? null : metric.getRuleTimeCounter(rule);

if (tokenCounters == null || timeCounters == null) {
@@ -130,7 +130,7 @@ public final class ParamFlowChecker {

// Calculate max token count (threshold)
Set<Object> exclusionItems = rule.getParsedHotItems().keySet();
int tokenCount = (int)rule.getCount();
long tokenCount = (long)rule.getCount();
if (exclusionItems.contains(value)) {
tokenCount = rule.getParsedHotItems().get(value);
}
@@ -139,7 +139,7 @@ public final class ParamFlowChecker {
return false;
}

int maxCount = tokenCount + rule.getBurstCount();
long maxCount = tokenCount + rule.getBurstCount();
if (acquireCount > maxCount) {
return false;
}
@@ -150,7 +150,7 @@ public final class ParamFlowChecker {
AtomicLong lastAddTokenTime = timeCounters.putIfAbsent(value, new AtomicLong(currentTime));
if (lastAddTokenTime == null) {
// Token never added, just replenish the tokens and consume {@code acquireCount} immediately.
tokenCounters.putIfAbsent(value, new AtomicInteger(maxCount - acquireCount));
tokenCounters.putIfAbsent(value, new AtomicLong(maxCount - acquireCount));
return true;
}

@@ -158,15 +158,15 @@ public final class ParamFlowChecker {
long passTime = currentTime - lastAddTokenTime.get();
// A simplified token bucket algorithm that will replenish the tokens only when statistic window has passed.
if (passTime > rule.getDurationInSec() * 1000) {
AtomicInteger oldQps = tokenCounters.putIfAbsent(value, new AtomicInteger(maxCount - acquireCount));
AtomicLong oldQps = tokenCounters.putIfAbsent(value, new AtomicLong(maxCount - acquireCount));
if (oldQps == null) {
// Might not be accurate here.
lastAddTokenTime.set(currentTime);
return true;
} else {
int restQps = oldQps.get();
int toAddCount = (int)((passTime * tokenCount) / (rule.getDurationInSec() * 1000));
int newQps = (restQps + toAddCount) > maxCount ? (maxCount - acquireCount)
long restQps = oldQps.get();
long toAddCount = (passTime * tokenCount) / (rule.getDurationInSec() * 1000);
long newQps = toAddCount + restQps > maxCount ? (maxCount - acquireCount)
: (restQps + toAddCount - acquireCount);

if (newQps < 0) {
@@ -179,9 +179,9 @@ public final class ParamFlowChecker {
Thread.yield();
}
} else {
AtomicInteger oldQps = tokenCounters.get(value);
AtomicLong oldQps = tokenCounters.get(value);
if (oldQps != null) {
int oldQpsValue = oldQps.get();
long oldQpsValue = oldQps.get();
if (oldQpsValue - acquireCount >= 0) {
if (oldQps.compareAndSet(oldQpsValue, oldQpsValue - acquireCount)) {
return true;


+ 4
- 4
sentinel-extension/sentinel-parameter-flow-control/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/param/ParameterMetric.java 查看文件

@@ -51,7 +51,7 @@ public class ParameterMetric {
*
* @since 1.6.0
*/
private final Map<ParamFlowRule, CacheMap<Object, AtomicInteger>> ruleTokenCounter = new HashMap<>();
private final Map<ParamFlowRule, CacheMap<Object, AtomicLong>> ruleTokenCounter = new HashMap<>();
private final Map<Integer, CacheMap<Object, AtomicInteger>> threadCountMap = new HashMap<>();

/**
@@ -61,7 +61,7 @@ public class ParameterMetric {
* @return the associated token counter
* @since 1.6.0
*/
public CacheMap<Object, AtomicInteger> getRuleTokenCounter(ParamFlowRule rule) {
public CacheMap<Object, AtomicLong> getRuleTokenCounter(ParamFlowRule rule) {
return ruleTokenCounter.get(rule);
}

@@ -98,7 +98,7 @@ public class ParameterMetric {
synchronized (lock) {
if (ruleTokenCounter.get(rule) == null) {
long size = Math.min(BASE_PARAM_MAX_CAPACITY * rule.getDurationInSec(), TOTAL_MAX_CAPACITY);
ruleTokenCounter.put(rule, new ConcurrentLinkedHashMapWrapper<Object, AtomicInteger>(size));
ruleTokenCounter.put(rule, new ConcurrentLinkedHashMapWrapper<Object, AtomicLong>(size));
}
}
}
@@ -245,7 +245,7 @@ public class ParameterMetric {
*
* @return the token counter map
*/
Map<ParamFlowRule, CacheMap<Object, AtomicInteger>> getRuleTokenCounterMap() {
Map<ParamFlowRule, CacheMap<Object, AtomicLong>> getRuleTokenCounterMap() {
return ruleTokenCounter;
}



+ 1
- 1
sentinel-extension/sentinel-parameter-flow-control/src/test/java/com/alibaba/csp/sentinel/slots/block/flow/param/ParamFlowCheckerTest.java 查看文件

@@ -160,7 +160,7 @@ public class ParamFlowCheckerTest {
ParameterMetric metric = new ParameterMetric();
ParameterMetricStorage.getMetricsMap().put(resourceWrapper.getName(), metric);
metric.getRuleTimeCounterMap().put(rule, new ConcurrentLinkedHashMapWrapper<Object, AtomicLong>(4000));
metric.getRuleTokenCounterMap().put(rule, new ConcurrentLinkedHashMapWrapper<Object, AtomicInteger>(4000));
metric.getRuleTokenCounterMap().put(rule, new ConcurrentLinkedHashMapWrapper<Object, AtomicLong>(4000));

assertTrue(ParamFlowChecker.passCheck(resourceWrapper, rule, 1, list));
assertFalse(ParamFlowChecker.passCheck(resourceWrapper, rule, 1, list));


+ 44
- 4
sentinel-extension/sentinel-parameter-flow-control/src/test/java/com/alibaba/csp/sentinel/slots/block/flow/param/ParamFlowDefaultCheckerTest.java 查看文件

@@ -38,9 +38,49 @@ import com.alibaba.csp.sentinel.util.TimeUtil;
/**
* @author jialiang.linjl
* @author Eric Zhao
*/
public class ParamFlowDefaultCheckerTest extends AbstractTimeBasedTest {
@Test
public void testCheckQpsWithLongIntervalAndHighThreshold() {
// This test case is intended to avoid number overflow.
final String resourceName = "testCheckQpsWithLongIntervalAndHighThreshold";
final ResourceWrapper resourceWrapper = new StringResourceWrapper(resourceName, EntryType.IN);
int paramIdx = 0;
// Set a large threshold.
long threshold = 25000L;
ParamFlowRule rule = new ParamFlowRule(resourceName)
.setCount(threshold)
.setParamIdx(paramIdx);
String valueA = "valueA";
ParameterMetric metric = new ParameterMetric();
ParameterMetricStorage.getMetricsMap().put(resourceWrapper.getName(), metric);
metric.getRuleTimeCounterMap().put(rule, new ConcurrentLinkedHashMapWrapper<Object, AtomicLong>(4000));
metric.getRuleTokenCounterMap().put(rule,
new ConcurrentLinkedHashMapWrapper<Object, AtomicLong>(4000));
// We mock the time directly to avoid unstable behaviour.
setCurrentMillis(System.currentTimeMillis());
assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
// 24 hours passed.
// This can make `toAddCount` larger that Integer.MAX_VALUE.
sleep(1000 * 60 * 60 * 24);
assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
// 48 hours passed.
sleep(1000 * 60 * 60 * 48);
assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
}
@Test
public void testParamFlowDefaultCheckSingleQps() {
final String resourceName = "testParamFlowDefaultCheckSingleQps";
@@ -59,7 +99,7 @@ public class ParamFlowDefaultCheckerTest extends AbstractTimeBasedTest {
ParameterMetricStorage.getMetricsMap().put(resourceWrapper.getName(), metric);
metric.getRuleTimeCounterMap().put(rule, new ConcurrentLinkedHashMapWrapper<Object, AtomicLong>(4000));
metric.getRuleTokenCounterMap().put(rule,
new ConcurrentLinkedHashMapWrapper<Object, AtomicInteger>(4000));
new ConcurrentLinkedHashMapWrapper<Object, AtomicLong>(4000));
// We mock the time directly to avoid unstable behaviour.
setCurrentMillis(System.currentTimeMillis());
@@ -99,7 +139,7 @@ public class ParamFlowDefaultCheckerTest extends AbstractTimeBasedTest {
ParameterMetricStorage.getMetricsMap().put(resourceWrapper.getName(), metric);
metric.getRuleTimeCounterMap().put(rule, new ConcurrentLinkedHashMapWrapper<Object, AtomicLong>(4000));
metric.getRuleTokenCounterMap().put(rule,
new ConcurrentLinkedHashMapWrapper<Object, AtomicInteger>(4000));
new ConcurrentLinkedHashMapWrapper<Object, AtomicLong>(4000));
// We mock the time directly to avoid unstable behaviour.
setCurrentMillis(System.currentTimeMillis());
@@ -169,7 +209,7 @@ public class ParamFlowDefaultCheckerTest extends AbstractTimeBasedTest {
ParameterMetricStorage.getMetricsMap().put(resourceWrapper.getName(), metric);
metric.getRuleTimeCounterMap().put(rule, new ConcurrentLinkedHashMapWrapper<Object, AtomicLong>(4000));
metric.getRuleTokenCounterMap().put(rule,
new ConcurrentLinkedHashMapWrapper<Object, AtomicInteger>(4000));
new ConcurrentLinkedHashMapWrapper<Object, AtomicLong>(4000));
// We mock the time directly to avoid unstable behaviour.
setCurrentMillis(System.currentTimeMillis());
@@ -222,7 +262,7 @@ public class ParamFlowDefaultCheckerTest extends AbstractTimeBasedTest {
ParameterMetricStorage.getMetricsMap().put(resourceWrapper.getName(), metric);
metric.getRuleTimeCounterMap().put(rule, new ConcurrentLinkedHashMapWrapper<Object, AtomicLong>(4000));
metric.getRuleTokenCounterMap().put(rule,
new ConcurrentLinkedHashMapWrapper<Object, AtomicInteger>(4000));
new ConcurrentLinkedHashMapWrapper<Object, AtomicLong>(4000));
int threadCount = 40;
final CountDownLatch waitLatch = new CountDownLatch(threadCount);


+ 2
- 2
sentinel-extension/sentinel-parameter-flow-control/src/test/java/com/alibaba/csp/sentinel/slots/block/flow/param/ParamFlowSlotTest.java 查看文件

@@ -99,8 +99,8 @@ public class ParamFlowSlotTest {

ParameterMetric metric = mock(ParameterMetric.class);

CacheMap<Object, AtomicLong> map = new ConcurrentLinkedHashMapWrapper<Object, AtomicLong>(4000);
CacheMap<Object, AtomicInteger> map2 = new ConcurrentLinkedHashMapWrapper<Object, AtomicInteger>(4000);
CacheMap<Object, AtomicLong> map = new ConcurrentLinkedHashMapWrapper<>(4000);
CacheMap<Object, AtomicLong> map2 = new ConcurrentLinkedHashMapWrapper<>(4000);
when(metric.getRuleTimeCounter(rule)).thenReturn(map);
when(metric.getRuleTokenCounter(rule)).thenReturn(map2);
map.put(argToGo, new AtomicLong(TimeUtil.currentTimeMillis()));


正在加载...
取消
保存