2. Add more java doc for LeapArray.master
@@ -16,8 +16,8 @@ | |||
package com.alibaba.csp.sentinel.node; | |||
import com.alibaba.csp.sentinel.log.RecordLog; | |||
import com.alibaba.csp.sentinel.property.PropertyListener; | |||
import com.alibaba.csp.sentinel.property.SentinelProperty; | |||
import com.alibaba.csp.sentinel.property.SimplePropertyListener; | |||
import com.alibaba.csp.sentinel.slots.clusterbuilder.ClusterBuilderSlot; | |||
/*** | |||
@@ -25,36 +25,42 @@ import com.alibaba.csp.sentinel.slots.clusterbuilder.ClusterBuilderSlot; | |||
* | |||
* @author youji.zj | |||
* @author jialiang.linjl | |||
* @author CarpenterLee | |||
*/ | |||
public class IntervalProperty { | |||
/** | |||
* <p> | |||
* Interval in seconds. This variable determines sensitivity of the QPS calculation. | |||
* </p> | |||
* DO NOT MODIFY this value directly, use {@link #updateInterval(int)}, otherwise the modification will not | |||
* take effect. | |||
*/ | |||
public static volatile int INTERVAL = 1; | |||
public static void init(SentinelProperty<Integer> dataSource) { | |||
dataSource.addListener(new FlowIntervalPropertyListener()); | |||
} | |||
private static class FlowIntervalPropertyListener implements PropertyListener<Integer> { | |||
@Override | |||
public void configUpdate(Integer value) { | |||
if (value == null) { | |||
value = 1; | |||
public static void register2Property(SentinelProperty<Integer> property) { | |||
property.addListener(new SimplePropertyListener<Integer>() { | |||
@Override | |||
public void configUpdate(Integer value) { | |||
if (value != null) { | |||
updateInterval(value); | |||
} | |||
} | |||
INTERVAL = value; | |||
RecordLog.info("Init flow interval: " + INTERVAL); | |||
} | |||
}); | |||
} | |||
@Override | |||
public void configLoad(Integer value) { | |||
if (value == null) { | |||
value = 1; | |||
} | |||
INTERVAL = value; | |||
for (ClusterNode node : ClusterBuilderSlot.getClusterNodeMap().values()) { | |||
node.reset(); | |||
} | |||
RecordLog.info("Flow interval change received: " + INTERVAL); | |||
/** | |||
* Update the {@link #INTERVAL}, All {@link ClusterNode}s will be reset if newInterval is | |||
* different from {@link #INTERVAL} | |||
* | |||
* @param newInterval New interval to set. | |||
*/ | |||
public static void updateInterval(int newInterval) { | |||
if (newInterval != INTERVAL) { | |||
INTERVAL = newInterval; | |||
ClusterBuilderSlot.resetClusterNodes(); | |||
} | |||
RecordLog.info("INTERVAL updated to: " + INTERVAL); | |||
} | |||
} |
@@ -119,7 +119,8 @@ public interface Node { | |||
void decreaseThreadNum(); | |||
/** | |||
* Reset the internal counter. | |||
* Reset the internal counter. Reset is needed when {@link IntervalProperty#INTERVAL} or | |||
* {@link SampleCountProperty#SAMPLE_COUNT} is changed. | |||
*/ | |||
void reset(); | |||
@@ -21,31 +21,45 @@ import com.alibaba.csp.sentinel.property.SimplePropertyListener; | |||
import com.alibaba.csp.sentinel.slots.clusterbuilder.ClusterBuilderSlot; | |||
/** | |||
* Holds statistic buckets count per second. | |||
* | |||
* @author jialiang.linjl | |||
* @author CarpenterLee | |||
*/ | |||
public class SampleCountProperty { | |||
public static volatile int sampleCount = 2; | |||
public static void init(SentinelProperty<Integer> property) { | |||
/** | |||
* <p> | |||
* Statistic buckets count per second. This variable determines sensitivity of the QPS calculation. | |||
* DO NOT MODIFY this value directly, use {@link #updateSampleCount(int)}, otherwise the modification will not | |||
* take effect. | |||
* </p> | |||
* Node that this value must be divisor of 1000. | |||
*/ | |||
public static volatile int SAMPLE_COUNT = 2; | |||
try { | |||
property.addListener(new SimplePropertyListener<Integer>() { | |||
@Override | |||
public void configUpdate(Integer value) { | |||
if (value != null) { | |||
sampleCount = value; | |||
// Reset the value. | |||
for (ClusterNode node : ClusterBuilderSlot.getClusterNodeMap().values()) { | |||
node.reset(); | |||
} | |||
} | |||
RecordLog.info("Current SampleCount: " + sampleCount); | |||
public static void register2Property(SentinelProperty<Integer> property) { | |||
property.addListener(new SimplePropertyListener<Integer>() { | |||
@Override | |||
public void configUpdate(Integer value) { | |||
if (value != null) { | |||
updateSampleCount(value); | |||
} | |||
} | |||
}); | |||
} | |||
}); | |||
} catch (Exception e) { | |||
RecordLog.info(e.getMessage(), e); | |||
/** | |||
* Update the {@link #SAMPLE_COUNT}. All {@link ClusterNode}s will be reset if newSampleCount | |||
* is different from {@link #SAMPLE_COUNT}. | |||
* | |||
* @param newSampleCount New sample count to set. This value must be divisor of 1000. | |||
*/ | |||
public static void updateSampleCount(int newSampleCount) { | |||
if (newSampleCount != SAMPLE_COUNT) { | |||
SAMPLE_COUNT = newSampleCount; | |||
ClusterBuilderSlot.resetClusterNodes(); | |||
} | |||
RecordLog.info("SAMPLE_COUNT updated to: " + SAMPLE_COUNT); | |||
} | |||
} |
@@ -31,7 +31,7 @@ import com.alibaba.csp.sentinel.slots.statistic.metric.Metric; | |||
*/ | |||
public class StatisticNode implements Node { | |||
private transient Metric rollingCounterInSecond = new ArrayMetric(1000 / SampleCountProperty.sampleCount, | |||
private transient volatile Metric rollingCounterInSecond = new ArrayMetric(1000 / SampleCountProperty.SAMPLE_COUNT, | |||
IntervalProperty.INTERVAL); | |||
/** | |||
@@ -70,7 +70,7 @@ public class StatisticNode implements Node { | |||
@Override | |||
public void reset() { | |||
rollingCounterInSecond = new ArrayMetric(1000 / SampleCountProperty.sampleCount, IntervalProperty.INTERVAL); | |||
rollingCounterInSecond = new ArrayMetric(1000 / SampleCountProperty.SAMPLE_COUNT, IntervalProperty.INTERVAL); | |||
} | |||
@Override | |||
@@ -131,7 +131,7 @@ public class StatisticNode implements Node { | |||
@Override | |||
public long maxSuccessQps() { | |||
return rollingCounterInSecond.maxSuccess() * SampleCountProperty.sampleCount; | |||
return rollingCounterInSecond.maxSuccess() * SampleCountProperty.SAMPLE_COUNT; | |||
} | |||
@Override | |||
@@ -36,7 +36,6 @@ public class MetricTimerListener implements Runnable { | |||
public void run() { | |||
Map<Long, List<MetricNode>> maps = new TreeMap<Long, List<MetricNode>>(); | |||
// 每5秒打印一次,把丢弃的seconds都给丢掉。 | |||
for (Entry<ResourceWrapper, ClusterNode> e : ClusterBuilderSlot.getClusterNodeMap().entrySet()) { | |||
String name = e.getKey().getName(); | |||
ClusterNode node = e.getValue(); | |||
@@ -24,7 +24,9 @@ import com.alibaba.csp.sentinel.context.Context; | |||
import com.alibaba.csp.sentinel.context.ContextUtil; | |||
import com.alibaba.csp.sentinel.node.ClusterNode; | |||
import com.alibaba.csp.sentinel.node.DefaultNode; | |||
import com.alibaba.csp.sentinel.node.IntervalProperty; | |||
import com.alibaba.csp.sentinel.node.Node; | |||
import com.alibaba.csp.sentinel.node.SampleCountProperty; | |||
import com.alibaba.csp.sentinel.slotchain.AbstractLinkedProcessorSlot; | |||
import com.alibaba.csp.sentinel.slotchain.ProcessorSlotChain; | |||
import com.alibaba.csp.sentinel.slotchain.ResourceWrapper; | |||
@@ -150,4 +152,13 @@ public class ClusterBuilderSlot extends AbstractLinkedProcessorSlot<DefaultNode> | |||
return clusterNodeMap; | |||
} | |||
/** | |||
* Reset all {@link ClusterNode}s. Reset is needed when {@link IntervalProperty#INTERVAL} or | |||
* {@link SampleCountProperty#SAMPLE_COUNT} is changed. | |||
*/ | |||
public static void resetClusterNodes() { | |||
for (ClusterNode node : clusterNodeMap.values()) { | |||
node.reset(); | |||
} | |||
} | |||
} |
@@ -23,15 +23,23 @@ import java.util.concurrent.locks.ReentrantLock; | |||
import com.alibaba.csp.sentinel.util.TimeUtil; | |||
/** | |||
* <p> | |||
* Basic data structure for statistic metrics. | |||
* </p> | |||
* <p> | |||
* Using sliding window algorithm to count data. Each bucket cover {@link #windowLengthInMs} time span, | |||
* and the total time span is {@link #intervalInMs}, so the total bucket count is: | |||
* {@link #sampleCount} = intervalInMs / windowLengthInMs. | |||
* </p> | |||
* | |||
* @param <T> type of data wrapper | |||
* @param <T> type of data bucket. | |||
* @author jialiang.linjl | |||
* @author Eric Zhao | |||
* @author CarpenterLee | |||
*/ | |||
public abstract class LeapArray<T> { | |||
protected int windowLength; | |||
protected int windowLengthInMs; | |||
protected int sampleCount; | |||
protected int intervalInMs; | |||
@@ -39,10 +47,15 @@ public abstract class LeapArray<T> { | |||
private final ReentrantLock updateLock = new ReentrantLock(); | |||
public LeapArray(int windowLength, int intervalInSec) { | |||
this.windowLength = windowLength; | |||
/** | |||
* The total bucket count is: {@link #sampleCount} = intervalInSec * 1000 / windowLengthInMs. | |||
* @param windowLengthInMs a single window bucket's time length in milliseconds. | |||
* @param intervalInSec the total time span of this {@link LeapArray} in seconds. | |||
*/ | |||
public LeapArray(int windowLengthInMs, int intervalInSec) { | |||
this.windowLengthInMs = windowLengthInMs; | |||
this.intervalInMs = intervalInSec * 1000; | |||
this.sampleCount = intervalInMs / windowLength; | |||
this.sampleCount = intervalInMs / windowLengthInMs; | |||
this.array = new AtomicReferenceArray<WindowWrap<T>>(sampleCount); | |||
} | |||
@@ -79,17 +92,17 @@ public abstract class LeapArray<T> { | |||
* @return the window at provided timestamp | |||
*/ | |||
public WindowWrap<T> currentWindow(long time) { | |||
long timeId = time / windowLength; | |||
long timeId = time / windowLengthInMs; | |||
// Calculate current index. | |||
int idx = (int)(timeId % array.length()); | |||
// Cut the time to current window start. | |||
time = time - time % windowLength; | |||
time = time - time % windowLengthInMs; | |||
while (true) { | |||
WindowWrap<T> old = array.get(idx); | |||
if (old == null) { | |||
WindowWrap<T> window = new WindowWrap<T>(windowLength, time, newEmptyBucket()); | |||
WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs, time, newEmptyBucket()); | |||
if (array.compareAndSet(idx, null, window)) { | |||
return window; | |||
} else { | |||
@@ -111,22 +124,22 @@ public abstract class LeapArray<T> { | |||
} else if (time < old.windowStart()) { | |||
// Cannot go through here. | |||
return new WindowWrap<T>(windowLength, time, newEmptyBucket()); | |||
return new WindowWrap<T>(windowLengthInMs, time, newEmptyBucket()); | |||
} | |||
} | |||
} | |||
public WindowWrap<T> getPreviousWindow(long time) { | |||
long timeId = (time - windowLength) / windowLength; | |||
long timeId = (time - windowLengthInMs) / windowLengthInMs; | |||
int idx = (int)(timeId % array.length()); | |||
time = time - windowLength; | |||
time = time - windowLengthInMs; | |||
WindowWrap<T> wrap = array.get(idx); | |||
if (wrap == null || isWindowDeprecated(wrap)) { | |||
return null; | |||
} | |||
if (wrap.windowStart() + windowLength < (time)) { | |||
if (wrap.windowStart() + windowLengthInMs < (time)) { | |||
return null; | |||
} | |||
@@ -138,7 +151,7 @@ public abstract class LeapArray<T> { | |||
} | |||
public T getWindowValue(long time) { | |||
long timeId = time / windowLength; | |||
long timeId = time / windowLengthInMs; | |||
int idx = (int)(timeId % array.length()); | |||
WindowWrap<T> old = array.get(idx); | |||
@@ -25,9 +25,9 @@ package com.alibaba.csp.sentinel.slots.statistic.base; | |||
public class WindowWrap<T> { | |||
/** | |||
* The length of the window. | |||
* a single window bucket's time length in milliseconds. | |||
*/ | |||
private final long windowLength; | |||
private final long windowLengthInMs; | |||
/** | |||
* Start time of the window in milliseconds. | |||
@@ -40,18 +40,18 @@ public class WindowWrap<T> { | |||
private T value; | |||
/** | |||
* @param windowLength the time length of the window | |||
* @param windowLengthInMs a single window bucket's time length in milliseconds. | |||
* @param windowStart the start timestamp of the window | |||
* @param value window data | |||
*/ | |||
public WindowWrap(long windowLength, long windowStart, T value) { | |||
this.windowLength = windowLength; | |||
public WindowWrap(long windowLengthInMs, long windowStart, T value) { | |||
this.windowLengthInMs = windowLengthInMs; | |||
this.windowStart = windowStart; | |||
this.value = value; | |||
} | |||
public long windowLength() { | |||
return windowLength; | |||
return windowLengthInMs; | |||
} | |||
public long windowStart() { | |||
@@ -74,7 +74,7 @@ public class WindowWrap<T> { | |||
@Override | |||
public String toString() { | |||
return "WindowWrap{" + | |||
"windowLength=" + windowLength + | |||
"windowLengthInMs=" + windowLengthInMs + | |||
", windowStart=" + windowStart + | |||
", value=" + value + | |||
'}'; | |||
@@ -33,8 +33,14 @@ public class ArrayMetric implements Metric { | |||
private final MetricsLeapArray data; | |||
public ArrayMetric(int windowLength, int interval) { | |||
this.data = new MetricsLeapArray(windowLength, interval); | |||
/** | |||
* Constructor | |||
* | |||
* @param windowLengthInMs a single window bucket's time length in milliseconds. | |||
* @param intervalInSec the total time span of this {@link ArrayMetric} in seconds. | |||
*/ | |||
public ArrayMetric(int windowLengthInMs, int intervalInSec) { | |||
this.data = new MetricsLeapArray(windowLengthInMs, intervalInSec); | |||
} | |||
/** | |||
@@ -22,11 +22,18 @@ import com.alibaba.csp.sentinel.slots.statistic.base.WindowWrap; | |||
/** | |||
* The fundamental data structure for metric statistics in a time window. | |||
* | |||
* @see LeapArray | |||
* @author jialiang.linjl | |||
* @author Eric Zhao | |||
*/ | |||
public class MetricsLeapArray extends LeapArray<MetricBucket> { | |||
/** | |||
* Constructor | |||
* | |||
* @param windowLengthInMs a single window bucket's time length in milliseconds. | |||
* @param intervalInSec the total time span of this {@link MetricsLeapArray} in seconds. | |||
*/ | |||
public MetricsLeapArray(int windowLengthInMs, int intervalInSec) { | |||
super(windowLengthInMs, intervalInSec); | |||
} | |||