ソースを参照

Add occupy mechanism for future buckets of sliding window to support "prioritized requests final pass" (#568)

* Rename: MetricsLeapArray -> BucketLeapArray
* Add implementation for `FutureBucketLeapArray`, a kind of `BucketLeapArray` that only reserves for future buckets, which is used for calculating occupied future tokens.
* Add OccupiableBucketLeapArray that combines common BucketLeapArray with FutureBucketLeapArray. The rollingNumberInSecond in StatisticNode now uses OccupiableBucketLeapArray by default.
* Add OccupySupport interface. Node now implements OccupySupport interface.
* Add occupy-related methods in Metric and ArrayMetric.
* Handle prioritized requests in default traffic shaping controller.
* Update default occupyTimeout to 500ms

Signed-off-by: Eric Zhao <sczyh16@gmail.com>
master
Eric Zhao GitHub 5年前
コミット
044cdbb1bf
この署名に対応する既知のキーがデータベースに存在しません GPGキーID: 4AEE18F83AFDEB23
28個のファイルの変更957行の追加132行の削除
  1. +1
    -1
      sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/statistic/metric/ClusterMetricLeapArray.java
  2. +1
    -2
      sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/statistic/metric/ClusterParameterLeapArray.java
  3. +1
    -1
      sentinel-core/src/main/java/com/alibaba/csp/sentinel/node/Node.java
  4. +70
    -0
      sentinel-core/src/main/java/com/alibaba/csp/sentinel/node/OccupySupport.java
  5. +79
    -0
      sentinel-core/src/main/java/com/alibaba/csp/sentinel/node/OccupyTimeoutProperty.java
  6. +65
    -10
      sentinel-core/src/main/java/com/alibaba/csp/sentinel/node/StatisticNode.java
  7. +34
    -18
      sentinel-core/src/main/java/com/alibaba/csp/sentinel/node/metric/MetricNode.java
  8. +1
    -1
      sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/FlowRuleChecker.java
  9. +40
    -0
      sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/PriorityWaitException.java
  10. +20
    -3
      sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/controller/DefaultController.java
  11. +5
    -3
      sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/statistic/MetricEvent.java
  12. +16
    -0
      sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/statistic/StatisticSlot.java
  13. +76
    -19
      sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/statistic/base/LeapArray.java
  14. +1
    -1
      sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/statistic/base/UnaryLeapArray.java
  15. +11
    -0
      sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/statistic/base/WindowWrap.java
  16. +22
    -0
      sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/statistic/data/MetricBucket.java
  17. +97
    -22
      sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/statistic/metric/ArrayMetric.java
  18. +5
    -5
      sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/statistic/metric/BucketLeapArray.java
  19. +68
    -8
      sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/statistic/metric/Metric.java
  20. +53
    -0
      sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/statistic/metric/occupy/FutureBucketLeapArray.java
  21. +101
    -0
      sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/statistic/metric/occupy/OccupiableBucketLeapArray.java
  22. +5
    -5
      sentinel-core/src/test/java/com/alibaba/csp/sentinel/node/StatisticNodeTest.java
  23. +1
    -1
      sentinel-core/src/test/java/com/alibaba/csp/sentinel/slots/statistic/base/LeapArrayTest.java
  24. +7
    -9
      sentinel-core/src/test/java/com/alibaba/csp/sentinel/slots/statistic/metric/ArrayMetricTest.java
  25. +17
    -22
      sentinel-core/src/test/java/com/alibaba/csp/sentinel/slots/statistic/metric/BucketLeapArrayTest.java
  26. +32
    -0
      sentinel-core/src/test/java/com/alibaba/csp/sentinel/slots/statistic/metric/FutureBucketLeapArrayTest.java
  27. +127
    -0
      sentinel-core/src/test/java/com/alibaba/csp/sentinel/slots/statistic/metric/OccupiableBucketLeapArrayTest.java
  28. +1
    -1
      sentinel-extension/sentinel-parameter-flow-control/src/main/java/com/alibaba/csp/sentinel/slots/statistic/metric/HotParameterLeapArray.java

+ 1
- 1
sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/statistic/metric/ClusterMetricLeapArray.java ファイルの表示

@@ -40,7 +40,7 @@ public class ClusterMetricLeapArray extends LeapArray<ClusterMetricBucket> {
}

@Override
public ClusterMetricBucket newEmptyBucket() {
public ClusterMetricBucket newEmptyBucket(long timeMillis) {
return new ClusterMetricBucket();
}



+ 1
- 2
sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/statistic/metric/ClusterParameterLeapArray.java ファイルの表示

@@ -37,7 +37,7 @@ public class ClusterParameterLeapArray<C> extends LeapArray<CacheMap<Object, C>>
}

@Override
public CacheMap<Object, C> newEmptyBucket() {
public CacheMap<Object, C> newEmptyBucket(long timeMillis) {
return new ConcurrentLinkedHashMapWrapper<>(maxCapacity);
}

@@ -48,5 +48,4 @@ public class ClusterParameterLeapArray<C> extends LeapArray<CacheMap<Object, C>>
return w;
}


}

+ 1
- 1
sentinel-core/src/main/java/com/alibaba/csp/sentinel/node/Node.java ファイルの表示

@@ -28,7 +28,7 @@ import com.alibaba.csp.sentinel.slots.statistic.metric.DebugSupport;
* @author leyou
* @author Eric Zhao
*/
public interface Node extends DebugSupport {
public interface Node extends OccupySupport, DebugSupport {

/**
* Get incoming request per minute ({@code pass + block}).


+ 70
- 0
sentinel-core/src/main/java/com/alibaba/csp/sentinel/node/OccupySupport.java ファイルの表示

@@ -0,0 +1,70 @@
/*
* Copyright 1999-2019 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.csp.sentinel.node;

/**
* @author Eric Zhao
* @since 1.5.0
*/
public interface OccupySupport {

/**
* Try to occupy latter time windows' tokens. If occupy success, a value less than
* {@code occupyTimeout} in {@link OccupyTimeoutProperty} will be return.
*
* <p>
* Each time we occupy tokens of the future window, current thread should sleep for the
* corresponding time for smoothing QPS. We can't occupy tokens of the future with unlimited,
* the sleep time limit is {@code occupyTimeout} in {@link OccupyTimeoutProperty}.
* </p>
*
* @param currentTime current time millis.
* @param acquireCount tokens count to acquire.
* @param threshold qps threshold.
* @return time should sleep. Time >= {@code occupyTimeout} in {@link OccupyTimeoutProperty} means
* occupy fail, in this case, the request should be rejected immediately.
*/
long tryOccupyNext(long currentTime, int acquireCount, double threshold);

/**
* Get current waiting amount. Useful for debug.
*
* @return current waiting amount
*/
long waiting();

/**
* Add request that occupied.
*
* @param futureTime future timestamp that the acquireCount should be added on.
* @param acquireCount tokens count.
*/
void addWaitingRequest(long futureTime, int acquireCount);

/**
* Add occupied pass request, which represents pass requests that borrow the latter windows' token.
*
* @param acquireCount tokens count.
*/
void addOccupiedPass(int acquireCount);

/**
* Get current occupied pass QPS.
*
* @return current occupied pass QPS
*/
double occupiedPassQps();
}

+ 79
- 0
sentinel-core/src/main/java/com/alibaba/csp/sentinel/node/OccupyTimeoutProperty.java ファイルの表示

@@ -0,0 +1,79 @@
/*
* Copyright 1999-2019 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.csp.sentinel.node;

import com.alibaba.csp.sentinel.log.RecordLog;
import com.alibaba.csp.sentinel.property.SentinelProperty;
import com.alibaba.csp.sentinel.property.SimplePropertyListener;

/**
* @author jialiang.linjl
* @author Carpenter Lee
* @since 1.5.0
*/
public class OccupyTimeoutProperty {

/**
* <p>
* Max occupy timeout in milliseconds. Requests with priority can occupy tokens of the future statistic
* window, and {@code occupyTimeout} limit the max time length that can be occupied.
* </p>
* <p>
* Note that the timeout value should never be greeter than {@link IntervalProperty#INTERVAL}.
* </p>
* DO NOT MODIFY this value directly, use {@link #updateTimeout(int)},
* otherwise the modification will not take effect.
*/
private static volatile int occupyTimeout = 500;

public static void register2Property(SentinelProperty<Integer> property) {
property.addListener(new SimplePropertyListener<Integer>() {
@Override
public void configUpdate(Integer value) {
if (value != null) {
updateTimeout(value);
}
}
});
}

public static int getOccupyTimeout() {
return occupyTimeout;
}

/**
* Update the timeout value.</br>
* Note that the time out should never greeter than {@link IntervalProperty#INTERVAL},
* or it will be ignored.
*
* @param newInterval new value.
*/
public static void updateTimeout(int newInterval) {
if (newInterval < 0) {
RecordLog.warn("[OccupyTimeoutProperty] Illegal timeout value will be ignored: " + occupyTimeout);
return;
}
if (newInterval > IntervalProperty.INTERVAL) {
RecordLog.warn("[OccupyTimeoutProperty] Illegal timeout value will be ignored: " + occupyTimeout
+ ", should <= " + IntervalProperty.INTERVAL);
return;
}
if (newInterval != occupyTimeout) {
occupyTimeout = newInterval;
}
RecordLog.info("[OccupyTimeoutProperty] occupyTimeout updated to: " + occupyTimeout);
}
}

+ 65
- 10
sentinel-core/src/main/java/com/alibaba/csp/sentinel/node/StatisticNode.java ファイルの表示

@@ -99,7 +99,7 @@ public class StatisticNode implements Node {
* Holds statistics of the recent 60 seconds. The windowLengthInMs is deliberately set to 1000 milliseconds,
* meaning each bucket per second, in this way we can get accurate statistics of each second.
*/
private transient Metric rollingCounterInMinute = new ArrayMetric(60, 60 * 1000);
private transient Metric rollingCounterInMinute = new ArrayMetric(60, 60 * 1000, false);

/**
* The counter for thread count.
@@ -116,7 +116,7 @@ public class StatisticNode implements Node {
// The fetch operation is thread-safe under a single-thread scheduler pool.
long currentTime = TimeUtil.currentTimeMillis();
currentTime = currentTime - currentTime % 1000;
Map<Long, MetricNode> metrics = new ConcurrentHashMap<Long, MetricNode>();
Map<Long, MetricNode> metrics = new ConcurrentHashMap<>();
List<MetricNode> nodesOfEverySecond = rollingCounterInMinute.details();
long newLastFetchTime = lastFetchTime;
// Iterate metrics of all resources, filter valid metrics (not-empty and up-to-date).
@@ -137,7 +137,7 @@ public class StatisticNode implements Node {

private boolean isValidMetricNode(MetricNode node) {
return node.getPassQps() > 0 || node.getBlockQps() > 0 || node.getSuccessQps() > 0
|| node.getExceptionQps() > 0 || node.getRt() > 0;
|| node.getExceptionQps() > 0 || node.getRt() > 0 || node.getOccupiedPassQps() > 0;
}

@Override
@@ -151,11 +151,6 @@ public class StatisticNode implements Node {
return totalRequest;
}

@Override
public long totalPass() {
return rollingCounterInMinute.pass();
}

@Override
public long blockRequest() {
return rollingCounterInMinute.block();
@@ -201,6 +196,11 @@ public class StatisticNode implements Node {
return rollingCounterInSecond.pass() / rollingCounterInSecond.getWindowIntervalInSec();
}

@Override
public long totalPass() {
return rollingCounterInMinute.pass();
}

@Override
public double successQps() {
return rollingCounterInSecond.success() / rollingCounterInSecond.getWindowIntervalInSec();
@@ -211,6 +211,11 @@ public class StatisticNode implements Node {
return rollingCounterInSecond.maxSuccess() * rollingCounterInSecond.getSampleCount();
}

@Override
public double occupiedPassQps() {
return rollingCounterInSecond.occupiedPass() / rollingCounterInSecond.getWindowIntervalInSec();
}

@Override
public double avgRt() {
long successCount = rollingCounterInSecond.success();
@@ -256,7 +261,6 @@ public class StatisticNode implements Node {
public void increaseExceptionQps(int count) {
rollingCounterInSecond.addException(count);
rollingCounterInMinute.addException(count);

}

@Override
@@ -271,6 +275,57 @@ public class StatisticNode implements Node {

@Override
public void debug() {
rollingCounterInSecond.debugQps();
rollingCounterInSecond.debug();
}

@Override
public long tryOccupyNext(long currentTime, int acquireCount, double threshold) {
double maxCount = threshold * IntervalProperty.INTERVAL / 1000;
long currentBorrow = rollingCounterInSecond.waiting();
if (currentBorrow >= maxCount) {
return OccupyTimeoutProperty.getOccupyTimeout();
}

int windowLength = IntervalProperty.INTERVAL / SampleCountProperty.SAMPLE_COUNT;
long earliestTime = currentTime - currentTime % windowLength + windowLength - IntervalProperty.INTERVAL;

int idx = 0;
/*
* Note: here {@code currentPass} may be less than it really is NOW, because time difference
* since call rollingCounterInSecond.pass(). So in high concurrency, the following code may
* lead more tokens be borrowed.
*/
long currentPass = rollingCounterInSecond.pass();
while (earliestTime < currentTime) {
long waitInMs = idx * windowLength + windowLength - currentTime % windowLength;
if (waitInMs >= OccupyTimeoutProperty.getOccupyTimeout()) {
break;
}
long windowPass = rollingCounterInSecond.getWindowPass(earliestTime);
if (currentPass + currentBorrow + acquireCount - windowPass <= maxCount) {
return waitInMs;
}
earliestTime += windowLength;
currentPass -= windowPass;
idx++;
}

return OccupyTimeoutProperty.getOccupyTimeout();
}

@Override
public long waiting() {
return rollingCounterInSecond.waiting();
}

@Override
public void addWaitingRequest(long futureTime, int acquireCount) {
rollingCounterInSecond.addWaiting(futureTime, acquireCount);
}

@Override
public void addOccupiedPass(int acquireCount) {
rollingCounterInMinute.addOccupiedPass(acquireCount);
rollingCounterInMinute.addPass(acquireCount);
}
}

+ 34
- 18
sentinel-core/src/main/java/com/alibaba/csp/sentinel/node/metric/MetricNode.java ファイルの表示

@@ -34,12 +34,25 @@ public class MetricNode {
private long exceptionQps;
private long rt;

/**
* @since 1.5.0
*/
private long occupiedPassQps;

private String resource;

public long getTimestamp() {
return timestamp;
}

public long getOccupiedPassQps() {
return occupiedPassQps;
}

public void setOccupiedPassQps(long occupiedPassQps) {
this.occupiedPassQps = occupiedPassQps;
}

public void setTimestamp(long timestamp) {
this.timestamp = timestamp;
}
@@ -94,22 +107,17 @@ public class MetricNode {

@Override
public String toString() {
return "MetricNode{" +
"timestamp=" + timestamp +
", passQps=" + passQps +
", blockQps=" + blockQps +
", successQps=" + successQps +
", exceptionQps=" + exceptionQps +
", rt=" + rt +
", resource='" + resource + '\'' +
'}';
return "MetricNode{" + "timestamp=" + timestamp + ", passQps=" + passQps + ", blockQps=" + blockQps
+ ", successQps=" + successQps + ", exceptionQps=" + exceptionQps + ", rt=" + rt
+ ", occupiedPassQps=" + occupiedPassQps + ", resource='"
+ resource + '\'' + '}';
}

/**
* To formatting string. All "|" in {@link #resource} will be replaced with "_", format is:
* <br/>
* To formatting string. All "|" in {@link #resource} will be replaced with
* "_", format is: <br/>
* <code>
* timestamp|resource|passQps|blockQps|successQps|exceptionQps|rt
* timestamp|resource|passQps|blockQps|successQps|exceptionQps|rt|occupiedPassQps
* </code>
*
* @return string format of this.
@@ -123,12 +131,13 @@ public class MetricNode {
sb.append(blockQps).append("|");
sb.append(successQps).append("|");
sb.append(exceptionQps).append("|");
sb.append(rt);
sb.append(rt).append("|");
sb.append(occupiedPassQps);
return sb.toString();
}

/**
* Parse {@link MetricNode} from thin string, see {@link #toThinString()} ()}
* Parse {@link MetricNode} from thin string, see {@link #toThinString()}
*
* @param line
* @return
@@ -143,14 +152,17 @@ public class MetricNode {
node.setSuccessQps(Long.parseLong(strs[4]));
node.setExceptionQps(Long.parseLong(strs[5]));
node.setRt(Long.parseLong(strs[6]));
if (strs.length == 8) {
node.setOccupiedPassQps(Long.parseLong(strs[7]));
}
return node;
}

/**
* To formatting string. All "|" in {@link MetricNode#resource} will be replaced with "_", format is:
* <br/>
* To formatting string. All "|" in {@link MetricNode#resource} will be
* replaced with "_", format is: <br/>
* <code>
* timestamp|yyyy-MM-dd HH:mm:ss|resource|passQps|blockQps|successQps|exceptionQps|rt\n
* timestamp|yyyy-MM-dd HH:mm:ss|resource|passQps|blockQps|successQps|exceptionQps|rt|occupiedPassQps\n
* </code>
*
* @return string format of this.
@@ -167,7 +179,8 @@ public class MetricNode {
sb.append(getBlockQps()).append("|");
sb.append(getSuccessQps()).append("|");
sb.append(getExceptionQps()).append("|");
sb.append(getRt());
sb.append(getRt()).append("|");
sb.append(getOccupiedPassQps());
sb.append('\n');
return sb.toString();
}
@@ -189,6 +202,9 @@ public class MetricNode {
node.setSuccessQps(Long.parseLong(strs[5]));
node.setExceptionQps(Long.parseLong(strs[6]));
node.setRt(Long.parseLong(strs[7]));
if (strs.length == 9) {
node.setOccupiedPassQps(Long.parseLong(strs[8]));
}
return node;
}



+ 1
- 1
sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/FlowRuleChecker.java ファイルの表示

@@ -61,7 +61,7 @@ final class FlowRuleChecker {
return true;
}

return rule.getRater().canPass(selectedNode, acquireCount);
return rule.getRater().canPass(selectedNode, acquireCount, prioritized);
}

static Node selectReferenceNode(FlowRule rule, Context context, DefaultNode node) {


+ 40
- 0
sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/PriorityWaitException.java ファイルの表示

@@ -0,0 +1,40 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.csp.sentinel.slots.block.flow;

/**
* An exception that marks previous prioritized request has been waiting till now, then should pass.
*
* @author jialiang.linjl
* @since 1.5.0
*/
public class PriorityWaitException extends RuntimeException {

private final long waitInMs;

public PriorityWaitException(long waitInMs) {
this.waitInMs = waitInMs;
}

public long getWaitInMs() {
return waitInMs;
}

@Override
public Throwable fillInStackTrace() {
return this;
}
}

+ 20
- 3
sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/controller/DefaultController.java ファイルの表示

@@ -16,13 +16,17 @@
package com.alibaba.csp.sentinel.slots.block.flow.controller;

import com.alibaba.csp.sentinel.node.Node;
import com.alibaba.csp.sentinel.node.OccupyTimeoutProperty;
import com.alibaba.csp.sentinel.slots.block.RuleConstant;
import com.alibaba.csp.sentinel.slots.block.flow.PriorityWaitException;
import com.alibaba.csp.sentinel.slots.block.flow.TrafficShapingController;
import com.alibaba.csp.sentinel.util.TimeUtil;

/**
* Default throttling controller (immediately reject strategy).
*
* @author jialiang.linjl
* @author Eric Zhao
*/
public class DefaultController implements TrafficShapingController {

@@ -45,9 +49,22 @@ public class DefaultController implements TrafficShapingController {
public boolean canPass(Node node, int acquireCount, boolean prioritized) {
int curCount = avgUsedTokens(node);
if (curCount + acquireCount > count) {
if (prioritized && grade == RuleConstant.FLOW_GRADE_QPS) {
long currentTime;
long waitInMs;
currentTime = TimeUtil.currentTimeMillis();
waitInMs = node.tryOccupyNext(currentTime, acquireCount, count);
if (waitInMs < OccupyTimeoutProperty.getOccupyTimeout()) {
node.addWaitingRequest(currentTime + waitInMs, acquireCount);
node.addOccupiedPass(acquireCount);
sleep(waitInMs);

// PriorityWaitException indicates that the request will pass after waiting for {@link @waitInMs}.
throw new PriorityWaitException(waitInMs);
}
}
return false;
}

return true;
}

@@ -55,10 +72,10 @@ public class DefaultController implements TrafficShapingController {
if (node == null) {
return DEFAULT_AVG_USED_TOKENS;
}
return grade == RuleConstant.FLOW_GRADE_THREAD ? node.curThreadNum() : (int) node.passQps();
return grade == RuleConstant.FLOW_GRADE_THREAD ? node.curThreadNum() : (int)(node.passQps());
}

private void sleep(int timeMillis) {
private void sleep(long timeMillis) {
try {
Thread.sleep(timeMillis);
} catch (InterruptedException e) {


+ 5
- 3
sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/statistic/MetricEvent.java ファイルの表示

@@ -31,7 +31,9 @@ public enum MetricEvent {
EXCEPTION,
SUCCESS,
RT,
OCCUPIED_PASS,
OCCUPIED_BLOCK,
WAITING

/**
* Passed in future quota (pre-occupied, since 1.5.0).
*/
OCCUPIED_PASS
}

+ 16
- 0
sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/statistic/StatisticSlot.java ファイルの表示

@@ -19,6 +19,7 @@ import java.util.Collection;

import com.alibaba.csp.sentinel.slotchain.ProcessorSlotEntryCallback;
import com.alibaba.csp.sentinel.slotchain.ProcessorSlotExitCallback;
import com.alibaba.csp.sentinel.slots.block.flow.PriorityWaitException;
import com.alibaba.csp.sentinel.util.TimeUtil;
import com.alibaba.csp.sentinel.Constants;
import com.alibaba.csp.sentinel.EntryType;
@@ -70,6 +71,21 @@ public class StatisticSlot extends AbstractLinkedProcessorSlot<DefaultNode> {
Constants.ENTRY_NODE.addPassRequest(count);
}

// Handle pass event with registered entry callback handlers.
for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
handler.onPass(context, resourceWrapper, node, count, args);
}
} catch (PriorityWaitException ex) {
node.increaseThreadNum();
if (context.getCurEntry().getOriginNode() != null) {
// Add count for origin node.
context.getCurEntry().getOriginNode().increaseThreadNum();
}

if (resourceWrapper.getType() == EntryType.IN) {
// Add count for global inbound entry node for global statistics.
Constants.ENTRY_NODE.increaseThreadNum();
}
// Handle pass event with registered entry callback handlers.
for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
handler.onPass(context, resourceWrapper, node, count, args);


+ 76
- 19
sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/statistic/base/LeapArray.java ファイルの表示

@@ -28,7 +28,7 @@ import com.alibaba.csp.sentinel.util.TimeUtil;
* Basic data structure for statistic metrics in Sentinel.
* </p>
* <p>
* Leap array use sliding window algorithm to count data. Each bucket cover {code windowLengthInMs} time span,
* Leap array use sliding window algorithm to count data. Each bucket cover {@code windowLengthInMs} time span,
* and the total time span is {@link #intervalInMs}, so the total bucket amount is:
* {@code sampleCount = intervalInMs / windowLengthInMs}.
* </p>
@@ -54,8 +54,8 @@ public abstract class LeapArray<T> {
/**
* The total bucket count is: {@code sampleCount = intervalInMs / windowLengthInMs}.
*
* @param sampleCount bucket count of the sliding window
* @param intervalInMs the total time interval of this {@link LeapArray} in milliseconds
* @param sampleCount bucket count of the sliding window
* @param intervalInMs the total time interval of this {@link LeapArray} in milliseconds
*/
public LeapArray(int sampleCount, int intervalInMs) {
AssertUtil.isTrue(sampleCount > 0, "bucket count is invalid: " + sampleCount);
@@ -66,7 +66,7 @@ public abstract class LeapArray<T> {
this.intervalInMs = intervalInMs;
this.sampleCount = sampleCount;

this.array = new AtomicReferenceArray<WindowWrap<T>>(sampleCount);
this.array = new AtomicReferenceArray<>(sampleCount);
}

/**
@@ -81,9 +81,10 @@ public abstract class LeapArray<T> {
/**
* Create a new statistic value for bucket.
*
* @param timeMillis current time in milliseconds
* @return the new empty bucket
*/
public abstract T newEmptyBucket();
public abstract T newEmptyBucket(long timeMillis);

/**
* Reset given bucket to provided start time and reset the value.
@@ -94,7 +95,7 @@ public abstract class LeapArray<T> {
*/
protected abstract WindowWrap<T> resetWindowTo(WindowWrap<T> windowWrap, long startTime);

protected int calculateTimeIdx(/*@Valid*/ long timeMillis) {
private int calculateTimeIdx(/*@Valid*/ long timeMillis) {
long timeId = timeMillis / windowLengthInMs;
// Calculate current index so we can map the timestamp to the leap array.
return (int)(timeId % array.length());
@@ -141,7 +142,7 @@ public abstract class LeapArray<T> {
* then try to update circular array via a CAS operation. Only one thread can
* succeed to update, while other threads yield its time slice.
*/
WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket());
WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
if (array.compareAndSet(idx, null, window)) {
// Successfully updated, return the created bucket.
return window;
@@ -193,7 +194,7 @@ public abstract class LeapArray<T> {
}
} else if (windowStart < old.windowStart()) {
// Should not go through here, as the provided time is already behind.
return new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket());
return new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
}
}
}
@@ -236,21 +237,22 @@ public abstract class LeapArray<T> {
/**
* Get statistic value from bucket for provided timestamp.
*
* @param time a valid timestamp in milliseconds
* @param timeMillis a valid timestamp in milliseconds
* @return the statistic value if bucket for provided timestamp is up-to-date; otherwise null
*/
public T getWindowValue(long time) {
if (time < 0) {
public T getWindowValue(long timeMillis) {
if (timeMillis < 0) {
return null;
}
int idx = calculateTimeIdx(time);
int idx = calculateTimeIdx(timeMillis);

WindowWrap<T> old = array.get(idx);
if (old == null || isWindowDeprecated(old)) {
WindowWrap<T> bucket = array.get(idx);

if (bucket == null || !bucket.isTimeInWindow(timeMillis)) {
return null;
}

return old.value();
return bucket.value();
}

/**
@@ -260,8 +262,12 @@ public abstract class LeapArray<T> {
* @param windowWrap a non-null bucket
* @return true if the bucket is deprecated; otherwise false
*/
protected boolean isWindowDeprecated(/*@NonNull*/ WindowWrap<T> windowWrap) {
return TimeUtil.currentTimeMillis() - windowWrap.windowStart() >= intervalInMs;
public boolean isWindowDeprecated(/*@NonNull*/ WindowWrap<T> windowWrap) {
return isWindowDeprecated(TimeUtil.currentTimeMillis(), windowWrap);
}

public boolean isWindowDeprecated(long time, WindowWrap<T> windowWrap) {
return time - windowWrap.windowStart() > intervalInMs;
}

/**
@@ -271,12 +277,36 @@ public abstract class LeapArray<T> {
* @return valid bucket list for entire sliding window.
*/
public List<WindowWrap<T>> list() {
return list(TimeUtil.currentTimeMillis());
}

public List<WindowWrap<T>> list(long validTime) {
int size = array.length();
List<WindowWrap<T>> result = new ArrayList<WindowWrap<T>>(size);

for (int i = 0; i < size; i++) {
WindowWrap<T> windowWrap = array.get(i);
if (windowWrap == null || isWindowDeprecated(windowWrap)) {
if (windowWrap == null || isWindowDeprecated(validTime, windowWrap)) {
continue;
}
result.add(windowWrap);
}

return result;
}

/**
* Get all buckets for entire sliding window including deprecated buckets.
*
* @return all buckets for entire sliding window
*/
public List<WindowWrap<T>> listAll() {
int size = array.length();
List<WindowWrap<T>> result = new ArrayList<WindowWrap<T>>(size);

for (int i = 0; i < size; i++) {
WindowWrap<T> windowWrap = array.get(i);
if (windowWrap == null) {
continue;
}
result.add(windowWrap);
@@ -292,12 +322,19 @@ public abstract class LeapArray<T> {
* @return aggregated value list for entire sliding window
*/
public List<T> values() {
return values(TimeUtil.currentTimeMillis());
}

public List<T> values(long timeMillis) {
if (timeMillis < 0) {
return new ArrayList<T>();
}
int size = array.length();
List<T> result = new ArrayList<T>(size);

for (int i = 0; i < size; i++) {
WindowWrap<T> windowWrap = array.get(i);
if (windowWrap == null || isWindowDeprecated(windowWrap)) {
if (windowWrap == null || isWindowDeprecated(timeMillis, windowWrap)) {
continue;
}
result.add(windowWrap.value());
@@ -359,4 +396,24 @@ public abstract class LeapArray<T> {
public double getIntervalInSecond() {
return intervalInMs / 1000.0;
}

public void debug(long time) {
StringBuilder sb = new StringBuilder();
List<WindowWrap<T>> lists = list(time);
sb.append("Thread_").append(Thread.currentThread().getId()).append("_");
for (WindowWrap<T> window : lists) {
sb.append(window.windowStart()).append(":").append(window.value().toString());
}
System.out.println(sb.toString());
}

public long currentWaiting() {
// TODO: default method. Should remove this later.
return 0;
}

public void addWaiting(long time, int acquireCount) {
// Do nothing by default.
throw new UnsupportedOperationException();
}
}

+ 1
- 1
sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/statistic/base/UnaryLeapArray.java ファイルの表示

@@ -25,7 +25,7 @@ public class UnaryLeapArray extends LeapArray<LongAdder> {
}

@Override
public LongAdder newEmptyBucket() {
public LongAdder newEmptyBucket(long time) {
return new LongAdder();
}



+ 11
- 0
sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/statistic/base/WindowWrap.java ファイルの表示

@@ -77,6 +77,17 @@ public class WindowWrap<T> {
return this;
}

/**
* Check whether given timestamp is in current bucket.
*
* @param timeMillis valid timestamp in ms
* @return true if the given time is in current bucket, otherwise false
* @since 1.5.0
*/
public boolean isTimeInWindow(long timeMillis) {
return windowStart <= timeMillis && timeMillis < windowStart + windowLengthInMs;
}

@Override
public String toString() {
return "WindowWrap{" +


+ 22
- 0
sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/statistic/data/MetricBucket.java ファイルの表示

@@ -40,6 +40,15 @@ public class MetricBucket {
initMinRt();
}

public MetricBucket reset(MetricBucket bucket) {
for (MetricEvent event : MetricEvent.values()) {
counters[event.ordinal()].reset();
counters[event.ordinal()].add(bucket.get(event));
}
initMinRt();
return this;
}

private void initMinRt() {
this.minRt = Constants.TIME_DROP_VALVE;
}
@@ -70,6 +79,10 @@ public class MetricBucket {
return get(MetricEvent.PASS);
}

public long occupiedPass() {
return get(MetricEvent.OCCUPIED_PASS);
}

public long block() {
return get(MetricEvent.BLOCK);
}
@@ -94,6 +107,10 @@ public class MetricBucket {
add(MetricEvent.PASS, n);
}

public void addOccupiedPass(int n) {
add(MetricEvent.OCCUPIED_PASS, n);
}

public void addException(int n) {
add(MetricEvent.EXCEPTION, n);
}
@@ -114,4 +131,9 @@ public class MetricBucket {
minRt = rt;
}
}

@Override
public String toString() {
return "p: " + pass() + ", b: " + block() + ", w: " + occupiedPass();
}
}

+ 97
- 22
sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/statistic/metric/ArrayMetric.java ファイルの表示

@@ -20,27 +20,38 @@ import java.util.List;

import com.alibaba.csp.sentinel.Constants;
import com.alibaba.csp.sentinel.node.metric.MetricNode;
import com.alibaba.csp.sentinel.slots.statistic.MetricEvent;
import com.alibaba.csp.sentinel.slots.statistic.base.LeapArray;
import com.alibaba.csp.sentinel.slots.statistic.data.MetricBucket;
import com.alibaba.csp.sentinel.slots.statistic.base.WindowWrap;
import com.alibaba.csp.sentinel.slots.statistic.metric.occupy.OccupiableBucketLeapArray;

/**
* The basic metric class in Sentinel using a {@link MetricsLeapArray} internal.
* The basic metric class in Sentinel using a {@link BucketLeapArray} internal.
*
* @author jialiang.linjl
* @author Eric Zhao
*/
public class ArrayMetric implements Metric {

private final MetricsLeapArray data;
private final LeapArray<MetricBucket> data;

public ArrayMetric(int sampleCount, int intervalInMs) {
this.data = new MetricsLeapArray(sampleCount, intervalInMs);
this.data = new OccupiableBucketLeapArray(sampleCount, intervalInMs);
}

public ArrayMetric(int sampleCount, int intervalInMs, boolean enableOccupy) {
if (enableOccupy) {
this.data = new OccupiableBucketLeapArray(sampleCount, intervalInMs);
} else {
this.data = new BucketLeapArray(sampleCount, intervalInMs);
}
}

/**
* For unit test.
*/
public ArrayMetric(MetricsLeapArray array) {
public ArrayMetric(LeapArray<MetricBucket> array) {
this.data = array;
}

@@ -104,6 +115,17 @@ public class ArrayMetric implements Metric {
return pass;
}

@Override
public long occupiedPass() {
data.currentWindow();
long pass = 0;
List<MetricBucket> list = data.values();
for (MetricBucket window : list) {
pass += window.occupiedPass();
}
return pass;
}

@Override
public long rt() {
data.currentWindow();
@@ -133,7 +155,8 @@ public class ArrayMetric implements Metric {
public List<MetricNode> details() {
List<MetricNode> details = new ArrayList<MetricNode>();
data.currentWindow();
for (WindowWrap<MetricBucket> window : data.list()) {
List<WindowWrap<MetricBucket>> list = data.list();
for (WindowWrap<MetricBucket> window : list) {
if (window == null) {
continue;
}
@@ -141,14 +164,16 @@ public class ArrayMetric implements Metric {
node.setBlockQps(window.value().block());
node.setExceptionQps(window.value().exception());
node.setPassQps(window.value().pass());
long passQps = window.value().success();
node.setSuccessQps(passQps);
if (passQps != 0) {
node.setRt(window.value().rt() / passQps);
long successQps = window.value().success();
node.setSuccessQps(successQps);
if (successQps != 0) {
node.setRt(window.value().rt() / successQps);
} else {
node.setRt(window.value().rt());
}
node.setTimestamp(window.windowStart());
node.setOccupiedPassQps(window.value().occupiedPass());

details.add(node);
}

@@ -158,7 +183,7 @@ public class ArrayMetric implements Metric {
@Override
public MetricBucket[] windows() {
data.currentWindow();
return data.values().toArray(new MetricBucket[data.values().size()]);
return data.values().toArray(new MetricBucket[0]);
}

@Override
@@ -173,6 +198,17 @@ public class ArrayMetric implements Metric {
wrap.value().addBlock(count);
}

@Override
public void addWaiting(long time, int acquireCount) {
data.addWaiting(time, acquireCount);
}

@Override
public void addOccupiedPass(int acquireCount) {
WindowWrap<MetricBucket> wrap = data.currentWindow();
wrap.value().addOccupiedPass(acquireCount);
}

@Override
public void addSuccess(int count) {
WindowWrap<MetricBucket> wrap = data.currentWindow();
@@ -192,18 +228,8 @@ public class ArrayMetric implements Metric {
}

@Override
public void debugQps() {
data.currentWindow();
StringBuilder sb = new StringBuilder();
sb.append(Thread.currentThread().getId()).append("_");
for (WindowWrap<MetricBucket> windowWrap : data.list()) {

sb.append(windowWrap.windowStart()).append(":").append(windowWrap.value().pass()).append(":")
.append(windowWrap.value().block());
sb.append(",");

}
System.out.println(sb);
public void debug() {
data.debug(System.currentTimeMillis());
}

@Override
@@ -226,6 +252,55 @@ public class ArrayMetric implements Metric {
return wrap.value().pass();
}

public void add(MetricEvent event, long count) {
data.currentWindow().value().add(event, count);
}

public long getCurrentCount(MetricEvent event) {
return data.currentWindow().value().get(event);
}

/**
* Get total sum for provided event in {@code intervalInSec}.
*
* @param event event to calculate
* @return total sum for event
*/
public long getSum(MetricEvent event) {
data.currentWindow();
long sum = 0;

List<MetricBucket> buckets = data.values();
for (MetricBucket bucket : buckets) {
sum += bucket.get(event);
}
return sum;
}

/**
* Get average count for provided event per second.
*
* @param event event to calculate
* @return average count per second for event
*/
public double getAvg(MetricEvent event) {
return getSum(event) / data.getIntervalInSecond();
}

@Override
public long getWindowPass(long timeMillis) {
MetricBucket bucket = data.getWindowValue(timeMillis);
if (bucket == null) {
return 0L;
}
return bucket.pass();
}

@Override
public long waiting() {
return data.currentWaiting();
}

@Override
public double getWindowIntervalInSec() {
return data.getIntervalInSecond();


sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/statistic/metric/MetricsLeapArray.java → sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/statistic/metric/BucketLeapArray.java ファイルの表示

@@ -16,24 +16,24 @@
package com.alibaba.csp.sentinel.slots.statistic.metric;

import com.alibaba.csp.sentinel.slots.statistic.base.LeapArray;
import com.alibaba.csp.sentinel.slots.statistic.data.MetricBucket;
import com.alibaba.csp.sentinel.slots.statistic.base.WindowWrap;
import com.alibaba.csp.sentinel.slots.statistic.data.MetricBucket;

/**
* The fundamental data structure for metric statistics in a time span.
*
* @see LeapArray
* @author jialiang.linjl
* @author Eric Zhao
* @see LeapArray
*/
public class MetricsLeapArray extends LeapArray<MetricBucket> {
public class BucketLeapArray extends LeapArray<MetricBucket> {

public MetricsLeapArray(int sampleCount, int intervalInMs) {
public BucketLeapArray(int sampleCount, int intervalInMs) {
super(sampleCount, intervalInMs);
}

@Override
public MetricBucket newEmptyBucket() {
public MetricBucket newEmptyBucket(long time) {
return new MetricBucket();
}


+ 68
- 8
sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/statistic/metric/Metric.java ファイルの表示

@@ -26,7 +26,7 @@ import com.alibaba.csp.sentinel.slots.statistic.data.MetricBucket;
* @author jialiang.linjl
* @author Eric Zhao
*/
public interface Metric {
public interface Metric extends DebugSupport {

/**
* Get total success count.
@@ -57,7 +57,7 @@ public interface Metric {
long block();

/**
* Get total pass count.
* Get total pass count. not include {@link #occupiedPass()}
*
* @return pass count
*/
@@ -92,22 +92,30 @@ public interface Metric {
MetricBucket[] windows();

/**
* Increment by one the current exception count.
* Add current exception count.
*
* @param n count to add
*/
void addException(int n);

/**
* Increment by one the current block count.
* Add current block count.
*
* @param n count to add
*/
void addBlock(int n);

/**
* Increment by one the current success count.
* Add current completed count.
*
* @param n count to add
*/
void addSuccess(int n);

/**
* Increment by one the current pass count.
* Add current pass count.
*
* @param n count to add
*/
void addPass(int n);

@@ -118,13 +126,65 @@ public interface Metric {
*/
void addRT(long rt);

/**
* Get the sliding window length in seconds.
*
* @return the sliding window length
*/
double getWindowIntervalInSec();

/**
* Get sample count of the sliding window.
*
* @return sample count of the sliding window.
*/
int getSampleCount();

// Tool methods.
/**
* Note: this operation will not perform refreshing, so will not generate new buckets.
*
* @param timeMillis valid time in ms
* @return pass count of the bucket exactly associated to provided timestamp, or 0 if the timestamp is invalid
* @since 1.5.0
*/
long getWindowPass(long timeMillis);

// Occupy-based (@since 1.5.0)

/**
* Add occupied pass, which represents pass requests that borrow the latter windows' token.
*
* @param acquireCount tokens count.
* @since 1.5.0
*/
void addOccupiedPass(int acquireCount);

/**
* Add request that occupied.
*
* @param futureTime future timestamp that the acquireCount should be added on.
* @param acquireCount tokens count.
* @since 1.5.0
*/
void addWaiting(long futureTime, int acquireCount);

/**
* Get waiting pass account
*
* @return waiting pass count
* @since 1.5.0
*/
long waiting();

/**
* Get occupied pass count.
*
* @return occupied pass count
* @since 1.5.0
*/
long occupiedPass();

void debugQps();
// Tool methods.

long previousWindowBlock();



+ 53
- 0
sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/statistic/metric/occupy/FutureBucketLeapArray.java ファイルの表示

@@ -0,0 +1,53 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.csp.sentinel.slots.statistic.metric.occupy;

import com.alibaba.csp.sentinel.slots.statistic.base.LeapArray;
import com.alibaba.csp.sentinel.slots.statistic.base.WindowWrap;
import com.alibaba.csp.sentinel.slots.statistic.data.MetricBucket;

/**
* A kind of {@code BucketLeapArray} that only reserves for future buckets.
*
* @author jialiang.linjl
* @since 1.5.0
*/
public class FutureBucketLeapArray extends LeapArray<MetricBucket> {

public FutureBucketLeapArray(int sampleCount, int intervalInMs) {
// This class is the original "BorrowBucketArray".
super(sampleCount, intervalInMs);
}

@Override
public MetricBucket newEmptyBucket(long time) {
return new MetricBucket();
}

@Override
protected WindowWrap<MetricBucket> resetWindowTo(WindowWrap<MetricBucket> w, long startTime) {
// Update the start time and reset value.
w.resetTo(startTime);
w.value().reset();
return w;
}

@Override
public boolean isWindowDeprecated(long time, WindowWrap<MetricBucket> windowWrap) {
// Tricky: will only calculate for future.
return time >= windowWrap.windowStart();
}
}

+ 101
- 0
sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/statistic/metric/occupy/OccupiableBucketLeapArray.java ファイルの表示

@@ -0,0 +1,101 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.csp.sentinel.slots.statistic.metric.occupy;

import java.util.List;

import com.alibaba.csp.sentinel.slots.statistic.MetricEvent;
import com.alibaba.csp.sentinel.slots.statistic.base.LeapArray;
import com.alibaba.csp.sentinel.slots.statistic.base.WindowWrap;
import com.alibaba.csp.sentinel.slots.statistic.data.MetricBucket;

/**
* @author jialiang.linjl
* @since 1.5.0
*/
public class OccupiableBucketLeapArray extends LeapArray<MetricBucket> {

private final FutureBucketLeapArray borrowArray;

public OccupiableBucketLeapArray(int sampleCount, int intervalInMs) {
// This class is the original "CombinedBucketArray".
super(sampleCount, intervalInMs);
this.borrowArray = new FutureBucketLeapArray(sampleCount, intervalInMs);
}

@Override
public MetricBucket newEmptyBucket(long time) {
MetricBucket newBucket = new MetricBucket();

MetricBucket borrowBucket = borrowArray.getWindowValue(time);
if (borrowBucket != null) {
newBucket.reset(borrowBucket);
}

return newBucket;
}

@Override
protected WindowWrap<MetricBucket> resetWindowTo(WindowWrap<MetricBucket> w, long time) {
// Update the start time and reset value.
w.resetTo(time);
MetricBucket borrowBucket = borrowArray.getWindowValue(time);
if (borrowBucket != null) {
w.value().reset();
w.value().addPass((int)borrowBucket.pass());
} else {
w.value().reset();
}

return w;
}

@Override
public long currentWaiting() {
borrowArray.currentWindow();
long currentWaiting = 0;
List<MetricBucket> list = borrowArray.values();

for (MetricBucket window : list) {
currentWaiting += window.pass();
}
return currentWaiting;
}

@Override
public void addWaiting(long time, int acquireCount) {
WindowWrap<MetricBucket> window = borrowArray.currentWindow(time);
window.value().add(MetricEvent.PASS, acquireCount);
}

@Override
public void debug(long time) {
StringBuilder sb = new StringBuilder();
List<WindowWrap<MetricBucket>> lists = listAll();
sb.append("a_Thread_").append(Thread.currentThread().getId()).append(" time=").append(time).append("; ");
for (WindowWrap<MetricBucket> window : lists) {
sb.append(window.windowStart()).append(":").append(window.value().toString()).append(";");
}
sb.append("\n");

lists = borrowArray.listAll();
sb.append("b_Thread_").append(Thread.currentThread().getId()).append(" time=").append(time).append("; ");
for (WindowWrap<MetricBucket> window : lists) {
sb.append(window.windowStart()).append(":").append(window.value().toString()).append(";");
}
System.out.println(sb.toString());
}
}

+ 5
- 5
sentinel-core/src/test/java/com/alibaba/csp/sentinel/node/StatisticNodeTest.java ファイルの表示

@@ -39,7 +39,7 @@ import static org.junit.Assert.assertTrue;
*/
public class StatisticNodeTest {

private static final String LOG_PREFIX = "[StatisticNodeTest]";
private static final String LOG_PREFIX = "[StatisticNodeTest] ";

private static final SimpleDateFormat SDF = new SimpleDateFormat("yyyy-HH-dd HH:mm:ss");

@@ -74,7 +74,7 @@ public class StatisticNodeTest {

tickEs.submit(new TickTask(node));

List<BizTask> bizTasks = new ArrayList<BizTask>(taskBizExecuteCount);
List<BizTask> bizTasks = new ArrayList<>(taskBizExecuteCount);
for (int i = 0; i < taskCount; i++) {
bizTasks.add(new BizTask(node, taskBizExecuteCount));
}
@@ -88,8 +88,8 @@ public class StatisticNodeTest {
log("all biz task done, waiting 3 second to exit");
sleep(3000);

bizEs.shutdown();
tickEs.shutdown();
bizEs.shutdownNow();
tickEs.shutdownNow();

// now no biz method execute, so there is no curThreadNum,passQps,successQps
assertEquals(0, node.curThreadNum(), 0.01);
@@ -192,7 +192,7 @@ public class StatisticNodeTest {
log(SDF.format(new Date()) + " curThreadNum=" + node.curThreadNum() + ",passQps=" + node.passQps()
+ ",successQps=" + node.successQps() + ",maxSuccessQps=" + node.maxSuccessQps()
+ ",totalRequest=" + node.totalRequest() + ",totalSuccess=" + node.totalSuccess()
+ ",avgRt=" + node.avgRt() + ",minRt=" + node.minRt());
+ ", avgRt=" + String.format("%.2f", node.avgRt()) + ", minRt=" + node.minRt());
}

private static void log(Object obj) {


+ 1
- 1
sentinel-core/src/test/java/com/alibaba/csp/sentinel/slots/statistic/base/LeapArrayTest.java ファイルの表示

@@ -35,7 +35,7 @@ public class LeapArrayTest extends AbstractTimeBasedTest {
int sampleCount = intervalInMs / windowLengthInMs;
LeapArray<AtomicInteger> leapArray = new LeapArray<AtomicInteger>(sampleCount, intervalInMs) {
@Override
public AtomicInteger newEmptyBucket() {
public AtomicInteger newEmptyBucket(long time) {
return new AtomicInteger(0);
}



sentinel-core/src/test/java/com/alibaba/csp/sentinel/base/metric/ArrayMetricTest.java → sentinel-core/src/test/java/com/alibaba/csp/sentinel/slots/statistic/metric/ArrayMetricTest.java ファイルの表示

@@ -13,20 +13,18 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.csp.sentinel.base.metric;
package com.alibaba.csp.sentinel.slots.statistic.metric;

import java.util.ArrayList;

import org.junit.Test;

import com.alibaba.csp.sentinel.slots.statistic.data.MetricBucket;
import com.alibaba.csp.sentinel.slots.statistic.base.WindowWrap;
import com.alibaba.csp.sentinel.slots.statistic.metric.ArrayMetric;
import com.alibaba.csp.sentinel.slots.statistic.metric.MetricsLeapArray;
import com.alibaba.csp.sentinel.slots.statistic.data.MetricBucket;

import static org.junit.Assert.*;
import org.junit.Test;

import static org.mockito.Mockito.*;
import static org.junit.Assert.*;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

/**
* Test cases for {@link ArrayMetric}.
@@ -39,7 +37,7 @@ public class ArrayMetricTest {

@Test
public void testOperateArrayMetric() {
MetricsLeapArray leapArray = mock(MetricsLeapArray.class);
BucketLeapArray leapArray = mock(BucketLeapArray.class);
final WindowWrap<MetricBucket> windowWrap = new WindowWrap<MetricBucket>(windowLengthInMs, 0, new MetricBucket());
when(leapArray.currentWindow()).thenReturn(windowWrap);
when(leapArray.values()).thenReturn(new ArrayList<MetricBucket>() {{ add(windowWrap.value()); }});

sentinel-core/src/test/java/com/alibaba/csp/sentinel/base/metric/MetricsLeapArrayTest.java → sentinel-core/src/test/java/com/alibaba/csp/sentinel/slots/statistic/metric/BucketLeapArrayTest.java ファイルの表示

@@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.csp.sentinel.base.metric;
package com.alibaba.csp.sentinel.slots.statistic.metric;

import java.util.ArrayList;
import java.util.HashSet;
@@ -21,22 +21,20 @@ import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;

import com.alibaba.csp.sentinel.slots.statistic.base.WindowWrap;
import com.alibaba.csp.sentinel.slots.statistic.data.MetricBucket;
import com.alibaba.csp.sentinel.util.TimeUtil;
import com.alibaba.csp.sentinel.slots.statistic.base.WindowWrap;
import com.alibaba.csp.sentinel.slots.statistic.metric.MetricsLeapArray;
import com.alibaba.csp.sentinel.test.AbstractTimeBasedTest;

import org.junit.Test;

import static org.junit.Assert.*;

/**
* Test cases for {@link MetricsLeapArray}.
* Test cases for {@link BucketLeapArray}.
*
* @author Eric Zhao
*/
public class MetricsLeapArrayTest extends AbstractTimeBasedTest {
public class BucketLeapArrayTest {

private final int windowLengthInMs = 1000;
private final int intervalInSec = 2;
@@ -45,7 +43,7 @@ public class MetricsLeapArrayTest extends AbstractTimeBasedTest {

@Test
public void testNewWindow() {
MetricsLeapArray leapArray = new MetricsLeapArray(sampleCount, intervalInMs);
BucketLeapArray leapArray = new BucketLeapArray(sampleCount, intervalInMs);
long time = TimeUtil.currentTimeMillis();
WindowWrap<MetricBucket> window = leapArray.currentWindow(time);

@@ -57,7 +55,7 @@ public class MetricsLeapArrayTest extends AbstractTimeBasedTest {

@Test
public void testLeapArrayWindowStart() {
MetricsLeapArray leapArray = new MetricsLeapArray(sampleCount, intervalInMs);
BucketLeapArray leapArray = new BucketLeapArray(sampleCount, intervalInMs);
long firstTime = TimeUtil.currentTimeMillis();
long previousWindowStart = firstTime - firstTime % windowLengthInMs;

@@ -69,7 +67,7 @@ public class MetricsLeapArrayTest extends AbstractTimeBasedTest {

@Test
public void testWindowAfterOneInterval() {
MetricsLeapArray leapArray = new MetricsLeapArray(sampleCount, intervalInMs);
BucketLeapArray leapArray = new BucketLeapArray(sampleCount, intervalInMs);
long firstTime = TimeUtil.currentTimeMillis();
long previousWindowStart = firstTime - firstTime % windowLengthInMs;
WindowWrap<MetricBucket> window = leapArray.currentWindow(previousWindowStart);
@@ -109,7 +107,7 @@ public class MetricsLeapArrayTest extends AbstractTimeBasedTest {

@Deprecated
public void testWindowDeprecatedRefresh() {
MetricsLeapArray leapArray = new MetricsLeapArray(sampleCount, intervalInMs);
BucketLeapArray leapArray = new BucketLeapArray(sampleCount, intervalInMs);
final int len = sampleCount;
long firstTime = TimeUtil.currentTimeMillis();
List<WindowWrap<MetricBucket>> firstIterWindowList = new ArrayList<WindowWrap<MetricBucket>>(len);
@@ -129,7 +127,7 @@ public class MetricsLeapArrayTest extends AbstractTimeBasedTest {
public void testMultiThreadUpdateEmptyWindow() throws Exception {
final long time = TimeUtil.currentTimeMillis();
final int nThreads = 16;
final MetricsLeapArray leapArray = new MetricsLeapArray(sampleCount, intervalInMs);
final BucketLeapArray leapArray = new BucketLeapArray(sampleCount, intervalInMs);
final CountDownLatch latch = new CountDownLatch(nThreads);
Runnable task = new Runnable() {
@Override
@@ -150,8 +148,7 @@ public class MetricsLeapArrayTest extends AbstractTimeBasedTest {

@Test
public void testGetPreviousWindow() {
setCurrentMillis(System.currentTimeMillis());
MetricsLeapArray leapArray = new MetricsLeapArray(sampleCount, intervalInMs);
BucketLeapArray leapArray = new BucketLeapArray(sampleCount, intervalInMs);
long time = TimeUtil.currentTimeMillis();
WindowWrap<MetricBucket> previousWindow = leapArray.currentWindow(time);
assertNull(leapArray.getPreviousWindow(time));
@@ -168,10 +165,8 @@ public class MetricsLeapArrayTest extends AbstractTimeBasedTest {
final int windowLengthInMs = 100;
final int intervalInMs = 1000;
final int sampleCount = intervalInMs / windowLengthInMs;
setCurrentMillis(System.currentTimeMillis());
MetricsLeapArray leapArray = new MetricsLeapArray(sampleCount, intervalInMs);

BucketLeapArray leapArray = new BucketLeapArray(sampleCount, intervalInMs);
long time = TimeUtil.currentTimeMillis();

Set<WindowWrap<MetricBucket>> windowWraps = new HashSet<WindowWrap<MetricBucket>>();
@@ -184,7 +179,7 @@ public class MetricsLeapArrayTest extends AbstractTimeBasedTest {
assertTrue(windowWraps.contains(wrap));
}

sleep(windowLengthInMs + intervalInMs);
Thread.sleep(windowLengthInMs + intervalInMs);

// This will replace the deprecated bucket, so all deprecated buckets will be reset.
leapArray.currentWindow(time + windowLengthInMs + intervalInMs).value().addPass(1);
@@ -198,8 +193,8 @@ public class MetricsLeapArrayTest extends AbstractTimeBasedTest {
final int intervalInSec = 1;
final int intervalInMs = intervalInSec * 1000;
final int sampleCount = intervalInMs / windowLengthInMs;
MetricsLeapArray leapArray = new MetricsLeapArray(sampleCount, intervalInMs);
BucketLeapArray leapArray = new BucketLeapArray(sampleCount, intervalInMs);
long time = TimeUtil.currentTimeMillis();

Set<WindowWrap<MetricBucket>> windowWraps = new HashSet<WindowWrap<MetricBucket>>();
@@ -207,7 +202,7 @@ public class MetricsLeapArrayTest extends AbstractTimeBasedTest {
windowWraps.add(leapArray.currentWindow(time));
windowWraps.add(leapArray.currentWindow(time + windowLengthInMs));

sleep(intervalInMs + windowLengthInMs * 3);
Thread.sleep(intervalInMs + windowLengthInMs * 3);

List<WindowWrap<MetricBucket>> list = leapArray.list();
for (WindowWrap<MetricBucket> wrap : list) {
@@ -220,4 +215,4 @@ public class MetricsLeapArrayTest extends AbstractTimeBasedTest {

assertEquals(1, leapArray.list().size());
}
}
}

+ 32
- 0
sentinel-core/src/test/java/com/alibaba/csp/sentinel/slots/statistic/metric/FutureBucketLeapArrayTest.java ファイルの表示

@@ -0,0 +1,32 @@
package com.alibaba.csp.sentinel.slots.statistic.metric;

import com.alibaba.csp.sentinel.slots.statistic.metric.occupy.FutureBucketLeapArray;
import com.alibaba.csp.sentinel.util.TimeUtil;

import org.junit.Test;

import static org.junit.Assert.assertEquals;

/**
* Test cases for {@link FutureBucketLeapArray}.
*
* @author jialiang.linjl
*/
public class FutureBucketLeapArrayTest {

private final int windowLengthInMs = 200;
private final int intervalInSec = 2;
private final int intervalInMs = intervalInSec * 1000;
private final int sampleCount = intervalInMs / windowLengthInMs;

@Test
public void testFutureMetricLeapArray() {
FutureBucketLeapArray array = new FutureBucketLeapArray(sampleCount, intervalInMs);

long currentTime = TimeUtil.currentTimeMillis();
for (int i = 0; i < intervalInSec * 1000; i = i + windowLengthInMs) {
array.currentWindow(i + currentTime).value().addPass(1);
assertEquals(array.values(i + currentTime).size(), 0);
}
}
}

+ 127
- 0
sentinel-core/src/test/java/com/alibaba/csp/sentinel/slots/statistic/metric/OccupiableBucketLeapArrayTest.java ファイルの表示

@@ -0,0 +1,127 @@
package com.alibaba.csp.sentinel.slots.statistic.metric;

import java.util.List;
import java.util.concurrent.CountDownLatch;

import com.alibaba.csp.sentinel.slots.statistic.base.WindowWrap;
import com.alibaba.csp.sentinel.slots.statistic.data.MetricBucket;
import com.alibaba.csp.sentinel.slots.statistic.metric.occupy.OccupiableBucketLeapArray;
import com.alibaba.csp.sentinel.util.TimeUtil;

import org.junit.Test;

import static org.junit.Assert.assertEquals;

/**
* Test cases for {@link OccupiableBucketLeapArray}.
*
* @author jialiang.linjl
*/
public class OccupiableBucketLeapArrayTest {

private final int windowLengthInMs = 200;
private final int intervalInSec = 2;
private final int intervalInMs = intervalInSec * 1000;
private final int sampleCount = intervalInMs / windowLengthInMs;

@Test
public void testNewWindow() {
long currentTime = TimeUtil.currentTimeMillis();
OccupiableBucketLeapArray leapArray = new OccupiableBucketLeapArray(sampleCount, intervalInMs);

WindowWrap<MetricBucket> currentWindow = leapArray.currentWindow(currentTime);
currentWindow.value().addPass(1);
assertEquals(currentWindow.value().pass(), 1L);

leapArray.addWaiting(currentTime + windowLengthInMs, 1);
assertEquals(leapArray.currentWaiting(), 1);
assertEquals(currentWindow.value().pass(), 1L);

}

@Test
public void testWindowInOneInterval() {
OccupiableBucketLeapArray leapArray = new OccupiableBucketLeapArray(sampleCount, intervalInMs);
long currentTime = TimeUtil.currentTimeMillis();

WindowWrap<MetricBucket> currentWindow = leapArray.currentWindow(currentTime);
currentWindow.value().addPass(1);
assertEquals(currentWindow.value().pass(), 1L);

leapArray.addWaiting(currentTime + windowLengthInMs, 2);
assertEquals(leapArray.currentWaiting(), 2);
assertEquals(currentWindow.value().pass(), 1L);

leapArray.currentWindow(currentTime + windowLengthInMs);
List<MetricBucket> values = leapArray.values(currentTime + windowLengthInMs);
assertEquals(values.size(), 2);

long sum = 0;
for (MetricBucket bucket : values) {
sum += bucket.pass();
}
assertEquals(sum, 3);
}

@Test
public void testMultiThreadUpdateEmptyWindow() throws Exception {
final long time = TimeUtil.currentTimeMillis();
final int nThreads = 16;
final OccupiableBucketLeapArray leapArray = new OccupiableBucketLeapArray(sampleCount, intervalInMs);
final CountDownLatch latch = new CountDownLatch(nThreads);
Runnable task = new Runnable() {
@Override
public void run() {
leapArray.currentWindow(time).value().addPass(1);
leapArray.addWaiting(time + windowLengthInMs, 1);
latch.countDown();
}
};

for (int i = 0; i < nThreads; i++) {
new Thread(task).start();
}

latch.await();

assertEquals(nThreads, leapArray.currentWindow(time).value().pass());
assertEquals(nThreads, leapArray.currentWaiting());

leapArray.currentWindow(time + windowLengthInMs);
long sum = 0;
List<MetricBucket> values = leapArray.values(time + windowLengthInMs);
for (MetricBucket bucket : values) {
sum += bucket.pass();
}
assertEquals(values.size(), 2);
assertEquals(sum, nThreads * 2);
}

@Test
public void testWindowAfterOneInterval() {
OccupiableBucketLeapArray leapArray = new OccupiableBucketLeapArray(sampleCount, intervalInMs);
long currentTime = TimeUtil.currentTimeMillis();

System.out.println(currentTime);
for (int i = 0; i < intervalInSec * 1000 / windowLengthInMs; i++) {
WindowWrap<MetricBucket> currentWindow = leapArray.currentWindow(currentTime + i * windowLengthInMs);
currentWindow.value().addPass(1);
leapArray.addWaiting(currentTime + (i + 1) * windowLengthInMs, 1);
System.out.println(currentTime + i * windowLengthInMs);
leapArray.debug(currentTime + i * windowLengthInMs);
}

System.out.println(currentTime + intervalInSec * 1000);
List<MetricBucket> values = leapArray
.values(currentTime - currentTime % windowLengthInMs + intervalInSec * 1000);
leapArray.debug(currentTime + intervalInSec * 1000);
assertEquals(values.size(), intervalInSec * 1000 / windowLengthInMs);

long sum = 0;
for (MetricBucket bucket : values) {
sum += bucket.pass();
}
assertEquals(sum, 2 * intervalInSec * 1000 / windowLengthInMs - 1);
assertEquals(leapArray.currentWaiting(), 10);
}
}

+ 1
- 1
sentinel-extension/sentinel-parameter-flow-control/src/main/java/com/alibaba/csp/sentinel/slots/statistic/metric/HotParameterLeapArray.java ファイルの表示

@@ -42,7 +42,7 @@ public class HotParameterLeapArray extends LeapArray<ParamMapBucket> {
}

@Override
public ParamMapBucket newEmptyBucket() {
public ParamMapBucket newEmptyBucket(long timeMillis) {
return new ParamMapBucket();
}



読み込み中…
キャンセル
保存