|
|
@@ -25,12 +25,12 @@ import com.alibaba.csp.sentinel.util.TimeUtil; |
|
|
|
|
|
|
|
/** |
|
|
|
* <p> |
|
|
|
* Basic data structure for statistic metrics. |
|
|
|
* Basic data structure for statistic metrics in Sentinel. |
|
|
|
* </p> |
|
|
|
* <p> |
|
|
|
* Using sliding window algorithm to count data. Each bucket cover {@link #windowLengthInMs} time span, |
|
|
|
* and the total time span is {@link #intervalInMs}, so the total bucket count is: |
|
|
|
* {@link #sampleCount} = intervalInMs / windowLengthInMs. |
|
|
|
* Leap array use sliding window algorithm to count data. Each bucket cover {code windowLengthInMs} time span, |
|
|
|
* and the total time span is {@link #intervalInMs}, so the total bucket amount is: |
|
|
|
* {@code sampleCount = intervalInMs / windowLengthInMs}. |
|
|
|
* </p> |
|
|
|
* |
|
|
|
* @param <T> type of statistic data |
|
|
@@ -47,7 +47,7 @@ public abstract class LeapArray<T> { |
|
|
|
protected final AtomicReferenceArray<WindowWrap<T>> array; |
|
|
|
|
|
|
|
/** |
|
|
|
* The fine-grained update lock is used only when current bucket is deprecated. |
|
|
|
* The conditional (predicate) update lock is used only when current bucket is deprecated. |
|
|
|
*/ |
|
|
|
private final ReentrantLock updateLock = new ReentrantLock(); |
|
|
|
|
|
|
@@ -58,9 +58,10 @@ public abstract class LeapArray<T> { |
|
|
|
* @param intervalInSec the total time span of this {@link LeapArray} in seconds. |
|
|
|
*/ |
|
|
|
public LeapArray(int windowLengthInMs, int intervalInSec) { |
|
|
|
// TODO: change `intervalInSec` to `intervalInMs` |
|
|
|
AssertUtil.isTrue(windowLengthInMs > 0, "bucket length is invalid: " + windowLengthInMs); |
|
|
|
int intervalInMs = intervalInSec * 1000; |
|
|
|
AssertUtil.isTrue(intervalInSec * 1000 > windowLengthInMs, |
|
|
|
AssertUtil.isTrue(intervalInMs > windowLengthInMs, |
|
|
|
"total time span of the window should be greater than bucket length"); |
|
|
|
AssertUtil.isTrue(intervalInMs % windowLengthInMs == 0, "time span needs to be evenly divided"); |
|
|
|
|
|
|
@@ -90,28 +91,36 @@ public abstract class LeapArray<T> { |
|
|
|
/** |
|
|
|
* Reset given bucket to provided start time and reset the value. |
|
|
|
* |
|
|
|
* @param startTime the start time of the bucket |
|
|
|
* @param startTime the start time of the bucket in milliseconds |
|
|
|
* @param windowWrap current bucket |
|
|
|
* @return new clean bucket at given start time |
|
|
|
*/ |
|
|
|
protected abstract WindowWrap<T> resetWindowTo(WindowWrap<T> windowWrap, long startTime); |
|
|
|
|
|
|
|
protected int calculateTimeIdx(/*@Valid*/ long timeMillis) { |
|
|
|
long timeId = timeMillis / windowLengthInMs; |
|
|
|
// Calculate current index so we can map the timestamp to the leap array. |
|
|
|
return (int)(timeId % array.length()); |
|
|
|
} |
|
|
|
|
|
|
|
protected long calculateWindowStart(/*@Valid*/ long timeMillis) { |
|
|
|
return timeMillis - timeMillis % windowLengthInMs; |
|
|
|
} |
|
|
|
|
|
|
|
/** |
|
|
|
* Get bucket item at provided timestamp. |
|
|
|
* |
|
|
|
* @param time a valid timestamp |
|
|
|
* @param timeMillis a valid timestamp in milliseconds |
|
|
|
* @return current bucket item at provided timestamp if the time is valid; null if time is invalid |
|
|
|
*/ |
|
|
|
public WindowWrap<T> currentWindow(long time) { |
|
|
|
if (time < 0) { |
|
|
|
public WindowWrap<T> currentWindow(long timeMillis) { |
|
|
|
if (timeMillis < 0) { |
|
|
|
return null; |
|
|
|
} |
|
|
|
long timeId = time / windowLengthInMs; |
|
|
|
// Calculate current index so we can map the timestamp to the leap array. |
|
|
|
int idx = (int)(timeId % array.length()); |
|
|
|
|
|
|
|
int idx = calculateTimeIdx(timeMillis); |
|
|
|
// Calculate current bucket start time. |
|
|
|
long windowStart = time - time % windowLengthInMs; |
|
|
|
long windowStart = calculateWindowStart(timeMillis); |
|
|
|
|
|
|
|
/* |
|
|
|
* Get bucket item at given time from the array. |
|
|
@@ -171,7 +180,7 @@ public abstract class LeapArray<T> { |
|
|
|
* Note that the reset and clean-up operations are hard to be atomic, |
|
|
|
* so we need a update lock to guarantee the correctness of bucket update. |
|
|
|
* |
|
|
|
* The update lock is fine-grained and will take effect only when |
|
|
|
* The update lock is conditional (tiny scope) and will take effect only when |
|
|
|
* bucket is deprecated, so in most cases it won't lead to performance loss. |
|
|
|
*/ |
|
|
|
if (updateLock.tryLock()) { |
|
|
@@ -195,23 +204,23 @@ public abstract class LeapArray<T> { |
|
|
|
/** |
|
|
|
* Get the previous bucket item before provided timestamp. |
|
|
|
* |
|
|
|
* @param time a valid timestamp |
|
|
|
* @param timeMillis a valid timestamp in milliseconds |
|
|
|
* @return the previous bucket item before provided timestamp |
|
|
|
*/ |
|
|
|
public WindowWrap<T> getPreviousWindow(long time) { |
|
|
|
if (time < 0) { |
|
|
|
public WindowWrap<T> getPreviousWindow(long timeMillis) { |
|
|
|
if (timeMillis < 0) { |
|
|
|
return null; |
|
|
|
} |
|
|
|
long timeId = (time - windowLengthInMs) / windowLengthInMs; |
|
|
|
int idx = (int)(timeId % array.length()); |
|
|
|
time = time - windowLengthInMs; |
|
|
|
int idx = calculateTimeIdx(timeMillis); |
|
|
|
|
|
|
|
long previousTime = timeMillis - windowLengthInMs; |
|
|
|
WindowWrap<T> wrap = array.get(idx); |
|
|
|
|
|
|
|
if (wrap == null || isWindowDeprecated(wrap)) { |
|
|
|
return null; |
|
|
|
} |
|
|
|
|
|
|
|
if (wrap.windowStart() + windowLengthInMs < (time)) { |
|
|
|
if (wrap.windowStart() + windowLengthInMs < previousTime) { |
|
|
|
return null; |
|
|
|
} |
|
|
|
|
|
|
@@ -230,15 +239,14 @@ public abstract class LeapArray<T> { |
|
|
|
/** |
|
|
|
* Get statistic value from bucket for provided timestamp. |
|
|
|
* |
|
|
|
* @param time a valid timestamp |
|
|
|
* @param time a valid timestamp in milliseconds |
|
|
|
* @return the statistic value if bucket for provided timestamp is up-to-date; otherwise null |
|
|
|
*/ |
|
|
|
public T getWindowValue(long time) { |
|
|
|
if (time < 0) { |
|
|
|
return null; |
|
|
|
} |
|
|
|
long timeId = time / windowLengthInMs; |
|
|
|
int idx = (int)(timeId % array.length()); |
|
|
|
int idx = calculateTimeIdx(time); |
|
|
|
|
|
|
|
WindowWrap<T> old = array.get(idx); |
|
|
|
if (old == null || isWindowDeprecated(old)) { |
|
|
@@ -255,7 +263,7 @@ public abstract class LeapArray<T> { |
|
|
|
* @param windowWrap a non-null bucket |
|
|
|
* @return true if the bucket is deprecated; otherwise false |
|
|
|
*/ |
|
|
|
private boolean isWindowDeprecated(/*@NonNull*/ WindowWrap<T> windowWrap) { |
|
|
|
protected boolean isWindowDeprecated(/*@NonNull*/ WindowWrap<T> windowWrap) { |
|
|
|
return TimeUtil.currentTimeMillis() - windowWrap.windowStart() >= intervalInMs; |
|
|
|
} |
|
|
|
|
|
|
@@ -266,9 +274,10 @@ public abstract class LeapArray<T> { |
|
|
|
* @return valid bucket list for entire sliding window. |
|
|
|
*/ |
|
|
|
public List<WindowWrap<T>> list() { |
|
|
|
List<WindowWrap<T>> result = new ArrayList<WindowWrap<T>>(); |
|
|
|
int size = array.length(); |
|
|
|
List<WindowWrap<T>> result = new ArrayList<WindowWrap<T>>(size); |
|
|
|
|
|
|
|
for (int i = 0; i < array.length(); i++) { |
|
|
|
for (int i = 0; i < size; i++) { |
|
|
|
WindowWrap<T> windowWrap = array.get(i); |
|
|
|
if (windowWrap == null || isWindowDeprecated(windowWrap)) { |
|
|
|
continue; |
|
|
@@ -286,9 +295,10 @@ public abstract class LeapArray<T> { |
|
|
|
* @return aggregated value list for entire sliding window |
|
|
|
*/ |
|
|
|
public List<T> values() { |
|
|
|
List<T> result = new ArrayList<T>(); |
|
|
|
int size = array.length(); |
|
|
|
List<T> result = new ArrayList<T>(size); |
|
|
|
|
|
|
|
for (int i = 0; i < array.length(); i++) { |
|
|
|
for (int i = 0; i < size; i++) { |
|
|
|
WindowWrap<T> windowWrap = array.get(i); |
|
|
|
if (windowWrap == null || isWindowDeprecated(windowWrap)) { |
|
|
|
continue; |
|
|
@@ -298,6 +308,34 @@ public abstract class LeapArray<T> { |
|
|
|
return result; |
|
|
|
} |
|
|
|
|
|
|
|
/** |
|
|
|
* Get the valid "head" bucket of the sliding window for provided timestamp. |
|
|
|
* Package-private for test. |
|
|
|
* |
|
|
|
* @param timeMillis a valid timestamp in milliseconds |
|
|
|
* @return the "head" bucket if it exists and is valid; otherwise null |
|
|
|
*/ |
|
|
|
WindowWrap<T> getValidHead(long timeMillis) { |
|
|
|
// Calculate index for expected head time. |
|
|
|
int idx = calculateTimeIdx(timeMillis + windowLengthInMs); |
|
|
|
|
|
|
|
WindowWrap<T> wrap = array.get(idx); |
|
|
|
if (wrap == null || isWindowDeprecated(wrap)) { |
|
|
|
return null; |
|
|
|
} |
|
|
|
|
|
|
|
return wrap; |
|
|
|
} |
|
|
|
|
|
|
|
/** |
|
|
|
* Get the valid "head" bucket of the sliding window at current timestamp. |
|
|
|
* |
|
|
|
* @return the "head" bucket if it exists and is valid; otherwise null |
|
|
|
*/ |
|
|
|
public WindowWrap<T> getValidHead() { |
|
|
|
return getValidHead(TimeUtil.currentTimeMillis()); |
|
|
|
} |
|
|
|
|
|
|
|
/** |
|
|
|
* Get sample count (total amount of buckets). |
|
|
|
* |
|
|
|