@@ -16,6 +16,7 @@ | |||
package com.alibaba.csp.sentinel.node; | |||
import java.util.HashMap; | |||
import java.util.Map; | |||
import java.util.concurrent.locks.ReentrantLock; | |||
import com.alibaba.csp.sentinel.context.ContextUtil; | |||
@@ -31,7 +32,7 @@ import com.alibaba.csp.sentinel.slots.block.BlockException; | |||
* To distinguish invocation from different origin (declared in | |||
* {@link ContextUtil#enter(String name, String origin)}), | |||
* one {@link ClusterNode} holds an {@link #originCountMap}, this map holds {@link StatisticNode} | |||
* of different origin. Use {@link #getOriginNode(String)} to get {@link Node} of the specific | |||
* of different origin. Use {@link #getOrCreateOriginNode(String)} to get {@link Node} of the specific | |||
* origin.<br/> | |||
* Note that 'origin' usually is Service Consumer's app name. | |||
* </p> | |||
@@ -42,30 +43,31 @@ import com.alibaba.csp.sentinel.slots.block.BlockException; | |||
public class ClusterNode extends StatisticNode { | |||
/** | |||
* <p> | |||
* the longer the application runs, the more stable this mapping will | |||
* The longer the application runs, the more stable this mapping will | |||
* become. so we don't concurrent map but a lock. as this lock only happens | |||
* at the very beginning while concurrent map will hold the lock all the | |||
* time | |||
* </p> | |||
* at the very beginning while concurrent map will hold the lock all the time. | |||
*/ | |||
private HashMap<String, StatisticNode> originCountMap = new HashMap<String, StatisticNode>(); | |||
private ReentrantLock lock = new ReentrantLock(); | |||
private Map<String, StatisticNode> originCountMap = new HashMap<String, StatisticNode>(); | |||
private final ReentrantLock lock = new ReentrantLock(); | |||
/** | |||
* Get {@link Node} of the specific origin. Usually the origin is the Service Consumer's app name. | |||
* <p>Get {@link Node} of the specific origin. Usually the origin is the Service Consumer's app name.</p> | |||
* <p>If the origin node for given origin is absent, then a new {@link StatisticNode} | |||
* for the origin will be created and returned.</p> | |||
* | |||
* @param origin The caller's name. It is declared in the | |||
* @param origin The caller's name, which is designated in the {@code parameter} parameter | |||
* {@link ContextUtil#enter(String name, String origin)}. | |||
* @return the {@link Node} of the specific origin. | |||
* @return the {@link Node} of the specific origin | |||
*/ | |||
public Node getOriginNode(String origin) { | |||
public Node getOrCreateOriginNode(String origin) { | |||
StatisticNode statisticNode = originCountMap.get(origin); | |||
if (statisticNode == null) { | |||
try { | |||
lock.lock(); | |||
statisticNode = originCountMap.get(origin); | |||
if (statisticNode == null) { | |||
// The node is absent, create a new node for the origin. | |||
statisticNode = new StatisticNode(); | |||
HashMap<String, StatisticNode> newMap = new HashMap<String, StatisticNode>( | |||
originCountMap.size() + 1); | |||
@@ -80,15 +82,15 @@ public class ClusterNode extends StatisticNode { | |||
return statisticNode; | |||
} | |||
public synchronized HashMap<String, StatisticNode> getOriginCountMap() { | |||
public synchronized Map<String, StatisticNode> getOriginCountMap() { | |||
return originCountMap; | |||
} | |||
/** | |||
* Add exception count only when {@code throwable} is not {@link BlockException#isBlockException(Throwable)} | |||
* Add exception count only when given {@code throwable} is not a {@link BlockException}. | |||
* | |||
* @param throwable | |||
* @param count count to add. | |||
* @param throwable target exception | |||
* @param count count to add | |||
*/ | |||
public void trace(Throwable throwable, int count) { | |||
if (!BlockException.isBlockException(throwable)) { | |||
@@ -40,10 +40,19 @@ import com.alibaba.csp.sentinel.slots.nodeselector.NodeSelectorSlot; | |||
*/ | |||
public class DefaultNode extends StatisticNode { | |||
/** | |||
* The resource associated with the node. | |||
*/ | |||
private ResourceWrapper id; | |||
private volatile HashSet<Node> childList = new HashSet<Node>(); | |||
/** | |||
* The list of all child nodes. | |||
*/ | |||
private volatile Set<Node> childList = new HashSet<Node>(); | |||
/** | |||
* Associated cluster node. | |||
*/ | |||
private ClusterNode clusterNode; | |||
public DefaultNode(ResourceWrapper id, ClusterNode clusterNode) { | |||
@@ -63,22 +72,32 @@ public class DefaultNode extends StatisticNode { | |||
this.clusterNode = clusterNode; | |||
} | |||
/** | |||
* Add child node to current node. | |||
* | |||
* @param node valid child node | |||
*/ | |||
public void addChild(Node node) { | |||
if (node == null) { | |||
RecordLog.warn("Trying to add null child to node <{0}>, ignored", id.getName()); | |||
return; | |||
} | |||
if (!childList.contains(node)) { | |||
synchronized (this) { | |||
if (!childList.contains(node)) { | |||
HashSet<Node> newSet = new HashSet<Node>(childList.size() + 1); | |||
Set<Node> newSet = new HashSet<Node>(childList.size() + 1); | |||
newSet.addAll(childList); | |||
newSet.add(node); | |||
childList = newSet; | |||
} | |||
} | |||
RecordLog.info(String.format("Add child %s to %s", ((DefaultNode)node).id.getName(), id.getName())); | |||
RecordLog.info("Add child <{0}> to node <{1}>", ((DefaultNode)node).id.getName(), id.getName()); | |||
} | |||
} | |||
/** | |||
* Reset the child node list. | |||
*/ | |||
public void removeChildList() { | |||
this.childList = new HashSet<Node>(); | |||
} | |||
@@ -18,6 +18,8 @@ package com.alibaba.csp.sentinel.node; | |||
import com.alibaba.csp.sentinel.slotchain.ResourceWrapper; | |||
/** | |||
* Default implementation of {@link NodeBuilder}. | |||
* | |||
* @author qinan.qn | |||
*/ | |||
public class DefaultNodeBuilder implements NodeBuilder { | |||
@@ -25,6 +25,7 @@ import com.alibaba.csp.sentinel.node.metric.MetricNode; | |||
* | |||
* @author qinan.qn | |||
* @author leyou | |||
* @author Eric Zhao | |||
*/ | |||
public interface Node { | |||
@@ -70,6 +71,11 @@ public interface Node { | |||
*/ | |||
long successQps(); | |||
/** | |||
* Get estimated max success QPS till now. | |||
* | |||
* @return max success QPS | |||
*/ | |||
long maxSuccessQps(); | |||
/** | |||
@@ -79,9 +85,16 @@ public interface Node { | |||
/** | |||
* Get average rt per second. | |||
* | |||
* @return average response time per second | |||
*/ | |||
long avgRt(); | |||
/** | |||
* Get minimal response time. | |||
* | |||
* @return recorded minimal response time | |||
*/ | |||
long minRt(); | |||
/** | |||
@@ -99,23 +112,43 @@ public interface Node { | |||
*/ | |||
long previousPassQps(); | |||
/** | |||
* Fetch all valid metric nodes of resources. | |||
* | |||
* @return valid metric nodes of resources | |||
*/ | |||
Map<Long, MetricNode> metrics(); | |||
/** | |||
* Add pass count. | |||
*/ | |||
void addPassRequest(); | |||
/** | |||
* Add rt and success count. | |||
* | |||
* @param rt | |||
* @param rt response time | |||
*/ | |||
void rt(long rt); | |||
/** | |||
* Increase the block count. | |||
*/ | |||
void increaseBlockQps(); | |||
/** | |||
* Increase the biz exception count. | |||
*/ | |||
void increaseExceptionQps(); | |||
/** | |||
* Increase current thread count. | |||
*/ | |||
void increaseThreadNum(); | |||
/** | |||
* Increase current thread count. | |||
*/ | |||
void decreaseThreadNum(); | |||
/** | |||
@@ -24,7 +24,19 @@ import com.alibaba.csp.sentinel.slotchain.ResourceWrapper; | |||
*/ | |||
public interface NodeBuilder { | |||
/** | |||
* Create a new {@link DefaultNode} as tree node. | |||
* | |||
* @param id resource | |||
* @param clusterNode the cluster node of the provided resource | |||
* @return new created tree node | |||
*/ | |||
DefaultNode buildTreeNode(ResourceWrapper id, ClusterNode clusterNode); | |||
/** | |||
* Create a new {@link ClusterNode} as universal statistic node for a single resource. | |||
* | |||
* @return new created cluster node | |||
*/ | |||
ClusterNode buildClusterNode(); | |||
} |
@@ -106,25 +106,24 @@ public class StatisticNode implements Node { | |||
*/ | |||
private AtomicInteger curThreadNum = new AtomicInteger(0); | |||
/** | |||
* The last timestamp when metrics were fetched. | |||
*/ | |||
private long lastFetchTime = -1; | |||
@Override | |||
public Map<Long, MetricNode> metrics() { | |||
// The fetch operation is thread-safe under a single-thread scheduler pool. | |||
long currentTime = TimeUtil.currentTimeMillis(); | |||
currentTime = currentTime - currentTime % 1000; | |||
Map<Long, MetricNode> metrics = new ConcurrentHashMap<Long, MetricNode>(); | |||
List<MetricNode> nodesOfEverySecond = rollingCounterInMinute.details(); | |||
long newLastFetchTime = lastFetchTime; | |||
// Iterate metrics of all resources, filter valid metrics (not-empty and up-to-date). | |||
for (MetricNode node : nodesOfEverySecond) { | |||
if (node.getTimestamp() > lastFetchTime && node.getTimestamp() < currentTime) { | |||
if (node.getPassQps() != 0 | |||
|| node.getBlockQps() != 0 | |||
|| node.getSuccessQps() != 0 | |||
|| node.getExceptionQps() != 0 | |||
|| node.getRt() != 0) { | |||
metrics.put(node.getTimestamp(), node); | |||
newLastFetchTime = Math.max(newLastFetchTime, node.getTimestamp()); | |||
} | |||
if (isNodeInTime(node, currentTime) && isValidMetricNode(node)) { | |||
metrics.put(node.getTimestamp(), node); | |||
newLastFetchTime = Math.max(newLastFetchTime, node.getTimestamp()); | |||
} | |||
} | |||
lastFetchTime = newLastFetchTime; | |||
@@ -132,6 +131,15 @@ public class StatisticNode implements Node { | |||
return metrics; | |||
} | |||
private boolean isNodeInTime(MetricNode node, long currentTime) { | |||
return node.getTimestamp() > lastFetchTime && node.getTimestamp() < currentTime; | |||
} | |||
private boolean isValidMetricNode(MetricNode node) { | |||
return node.getPassQps() > 0 || node.getBlockQps() > 0 || node.getSuccessQps() > 0 | |||
|| node.getExceptionQps() > 0 || node.getRt() > 0; | |||
} | |||
@Override | |||
public void reset() { | |||
rollingCounterInSecond = new ArrayMetric(1000 / SampleCountProperty.SAMPLE_COUNT, IntervalProperty.INTERVAL); | |||
@@ -19,6 +19,12 @@ import java.text.DateFormat; | |||
import java.text.SimpleDateFormat; | |||
import java.util.Date; | |||
/** | |||
* Metrics data for a specific resource at given {@code timestamp}. | |||
* | |||
* @author jialiang.linjl | |||
* @author Carpenter Lee | |||
*/ | |||
public class MetricNode { | |||
private long timestamp; | |||
@@ -27,6 +27,9 @@ import com.alibaba.csp.sentinel.node.ClusterNode; | |||
import com.alibaba.csp.sentinel.slotchain.ResourceWrapper; | |||
import com.alibaba.csp.sentinel.slots.clusterbuilder.ClusterBuilderSlot; | |||
/** | |||
* @author jialiang.linjl | |||
*/ | |||
public class MetricTimerListener implements Runnable { | |||
private static final MetricWriter metricWriter = new MetricWriter(SentinelConfig.singleMetricFileSize(), | |||
@@ -57,11 +60,10 @@ public class MetricTimerListener implements Runnable { | |||
try { | |||
metricWriter.write(entry.getKey(), entry.getValue()); | |||
} catch (Exception e) { | |||
RecordLog.info("write metric error: ", e); | |||
RecordLog.warn("[MetricTimerListener] Write metric error", e); | |||
} | |||
} | |||
} | |||
} | |||
} |
@@ -22,12 +22,17 @@ import java.nio.charset.Charset; | |||
import java.util.ArrayList; | |||
import java.util.List; | |||
/** | |||
* Reads metrics data from log file. | |||
*/ | |||
class MetricsReader { | |||
/** | |||
* avoid OOM in any case | |||
* Avoid OOM in any cases. | |||
*/ | |||
private static final int maxLinesReturn = 100000; | |||
private Charset charset; | |||
private static final int MAX_LINES_RETURN = 100000; | |||
private final Charset charset; | |||
public MetricsReader(Charset charset) { | |||
this.charset = charset; | |||
@@ -58,7 +63,7 @@ class MetricsReader { | |||
} else { | |||
return false; | |||
} | |||
if (list.size() >= maxLinesReturn) { | |||
if (list.size() >= MAX_LINES_RETURN) { | |||
return false; | |||
} | |||
} | |||
@@ -28,7 +28,7 @@ import com.alibaba.csp.sentinel.slots.statistic.StatisticSlot; | |||
import com.alibaba.csp.sentinel.slots.system.SystemSlot; | |||
/** | |||
* Helper class to create {@link ProcessorSlotChain}. | |||
* Builder for a default {@link ProcessorSlotChain}. | |||
* | |||
* @author qinan.qn | |||
* @author leyou | |||
@@ -29,6 +29,7 @@ public class RateLimiterController implements TrafficShapingController { | |||
private final int maxQueueingTimeMs; | |||
private final double count; | |||
private final AtomicLong latestPassedTime = new AtomicLong(-1); | |||
public RateLimiterController(int timeOut, double count) { | |||
@@ -38,21 +39,19 @@ public class RateLimiterController implements TrafficShapingController { | |||
@Override | |||
public boolean canPass(Node node, int acquireCount) { | |||
// 按照斜率来计算计划中应该什么时候通过 | |||
long currentTime = TimeUtil.currentTimeMillis(); | |||
// Calculate the interval between every two requests. | |||
long costTime = Math.round(1.0 * (acquireCount) / count * 1000); | |||
//期待时间 | |||
// Expected pass time of this request. | |||
long expectedTime = costTime + latestPassedTime.get(); | |||
if (expectedTime <= currentTime) { | |||
//这里会有冲突,然而冲突就冲突吧. | |||
// Contention may exist here, but it's okay. | |||
latestPassedTime.set(currentTime); | |||
return true; | |||
} else { | |||
// 计算自己需要的等待时间 | |||
// Calculate the time to wait. | |||
long waitTime = costTime + latestPassedTime.get() - TimeUtil.currentTimeMillis(); | |||
if (waitTime >= maxQueueingTimeMs) { | |||
return false; | |||
@@ -70,7 +69,6 @@ public class RateLimiterController implements TrafficShapingController { | |||
} | |||
} | |||
} | |||
return false; | |||
} | |||
@@ -51,7 +51,7 @@ public class ClusterBuilderSlot extends AbstractLinkedProcessorSlot<DefaultNode> | |||
* <p> | |||
* Remember that same resource({@link ResourceWrapper#equals(Object)}) will share | |||
* the same {@link ProcessorSlotChain} globally, no matter in witch context. So if | |||
* code goes into {@link #entry(Context, ResourceWrapper, DefaultNode, int, Object...)}, | |||
* code goes into {@link #entry(Context, ResourceWrapper, DefaultNode, int, boolean, Object...)}, | |||
* the resource name must be same but context name may not. | |||
* </p> | |||
* <p> | |||
@@ -62,8 +62,7 @@ public class ClusterBuilderSlot extends AbstractLinkedProcessorSlot<DefaultNode> | |||
* <p> | |||
* The longer the application runs, the more stable this mapping will | |||
* become. so we don't concurrent map but a lock. as this lock only happens | |||
* at the very beginning while concurrent map will hold the lock all the | |||
* time | |||
* at the very beginning while concurrent map will hold the lock all the time. | |||
* </p> | |||
*/ | |||
private static volatile Map<ResourceWrapper, ClusterNode> clusterNodeMap | |||
@@ -74,7 +73,8 @@ public class ClusterBuilderSlot extends AbstractLinkedProcessorSlot<DefaultNode> | |||
private ClusterNode clusterNode = null; | |||
@Override | |||
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args) | |||
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, | |||
boolean prioritized, Object... args) | |||
throws Throwable { | |||
if (clusterNode == null) { | |||
synchronized (lock) { | |||
@@ -96,7 +96,7 @@ public class ClusterBuilderSlot extends AbstractLinkedProcessorSlot<DefaultNode> | |||
* the specific origin. | |||
*/ | |||
if (!"".equals(context.getOrigin())) { | |||
Node originNode = node.getClusterNode().getOriginNode(context.getOrigin()); | |||
Node originNode = node.getClusterNode().getOrCreateOriginNode(context.getOrigin()); | |||
context.getCurEntry().setOriginNode(originNode); | |||
} | |||
@@ -45,6 +45,9 @@ public abstract class LeapArray<T> { | |||
protected final AtomicReferenceArray<WindowWrap<T>> array; | |||
/** | |||
* The fine-grained update lock is used only when current bucket is deprecated. | |||
*/ | |||
private final ReentrantLock updateLock = new ReentrantLock(); | |||
/** | |||
@@ -15,6 +15,8 @@ | |||
*/ | |||
package com.alibaba.csp.sentinel.slots.clusterbuilder; | |||
import static org.junit.Assert.assertNotSame; | |||
import static org.junit.Assert.assertSame; | |||
import static org.junit.Assert.assertTrue; | |||
import org.junit.Test; | |||
@@ -28,7 +30,7 @@ import com.alibaba.csp.sentinel.node.Node; | |||
/** | |||
* @author jialiang.linjl | |||
*/ | |||
public class ClusterNodeBuilder { | |||
public class ClusterNodeBuilderTest { | |||
@Test | |||
public void clusterNodeBuilder_normal() throws Exception { | |||
@@ -37,10 +39,10 @@ public class ClusterNodeBuilder { | |||
Entry nodeA = SphU.entry("nodeA"); | |||
Node curNode = nodeA.getCurNode(); | |||
assertTrue(curNode.getClass() == DefaultNode.class); | |||
assertSame(curNode.getClass(), DefaultNode.class); | |||
DefaultNode dN = (DefaultNode)curNode; | |||
assertTrue(dN.getClusterNode().getOriginCountMap().containsKey("caller1")); | |||
assertTrue(nodeA.getOriginNode() == dN.getClusterNode().getOriginNode("caller1")); | |||
assertSame(nodeA.getOriginNode(), dN.getClusterNode().getOrCreateOriginNode("caller1")); | |||
if (nodeA != null) { | |||
nodeA.exit(); | |||
@@ -52,10 +54,10 @@ public class ClusterNodeBuilder { | |||
nodeA = SphU.entry("nodeA"); | |||
curNode = nodeA.getCurNode(); | |||
assertTrue(curNode.getClass() == DefaultNode.class); | |||
assertSame(curNode.getClass(), DefaultNode.class); | |||
DefaultNode dN1 = (DefaultNode)curNode; | |||
assertTrue(dN1.getClusterNode().getOriginCountMap().containsKey("caller2")); | |||
assertTrue(dN1 != dN); | |||
assertNotSame(dN1, dN); | |||
if (nodeA != null) { | |||
nodeA.exit(); |