Add Sentinel MetricExtension, which provides extension to Sentinel internal statistics. This extension provides callbacks when a request passes rule checking, blocked by flow control, successfully end or exception occurred. Accompanied by these events, response time and current thread number will be recorded too. Signed-off-by: Carpenter Lee <hooleeucas@163.com>master
@@ -17,9 +17,11 @@ package com.alibaba.csp.sentinel; | |||||
import com.alibaba.csp.sentinel.context.Context; | import com.alibaba.csp.sentinel.context.Context; | ||||
import com.alibaba.csp.sentinel.context.ContextUtil; | import com.alibaba.csp.sentinel.context.ContextUtil; | ||||
import com.alibaba.csp.sentinel.metric.extension.MetricExtensionProvider; | |||||
import com.alibaba.csp.sentinel.node.ClusterNode; | import com.alibaba.csp.sentinel.node.ClusterNode; | ||||
import com.alibaba.csp.sentinel.node.DefaultNode; | import com.alibaba.csp.sentinel.node.DefaultNode; | ||||
import com.alibaba.csp.sentinel.slots.block.BlockException; | import com.alibaba.csp.sentinel.slots.block.BlockException; | ||||
import com.alibaba.csp.sentinel.metric.extension.MetricExtension; | |||||
/** | /** | ||||
* This class is used to record other exceptions except block exception. | * This class is used to record other exceptions except block exception. | ||||
@@ -55,7 +57,7 @@ public final class Tracer { | |||||
} | } | ||||
DefaultNode curNode = (DefaultNode)context.getCurNode(); | DefaultNode curNode = (DefaultNode)context.getCurNode(); | ||||
traceExceptionToNode(e, count, curNode); | |||||
traceExceptionToNode(e, count, context.getCurEntry(), curNode); | |||||
} | } | ||||
/** | /** | ||||
@@ -74,7 +76,7 @@ public final class Tracer { | |||||
} | } | ||||
DefaultNode curNode = (DefaultNode)context.getCurNode(); | DefaultNode curNode = (DefaultNode)context.getCurNode(); | ||||
traceExceptionToNode(e, count, curNode); | |||||
traceExceptionToNode(e, count, context.getCurEntry(), curNode); | |||||
} | } | ||||
/** | /** | ||||
@@ -103,13 +105,16 @@ public final class Tracer { | |||||
} | } | ||||
DefaultNode curNode = (DefaultNode)entry.getCurNode(); | DefaultNode curNode = (DefaultNode)entry.getCurNode(); | ||||
traceExceptionToNode(e, count, curNode); | |||||
traceExceptionToNode(e, count, entry, curNode); | |||||
} | } | ||||
private static void traceExceptionToNode(Throwable t, int count, DefaultNode curNode) { | |||||
private static void traceExceptionToNode(Throwable t, int count, Entry entry, DefaultNode curNode) { | |||||
if (curNode == null) { | if (curNode == null) { | ||||
return; | return; | ||||
} | } | ||||
for (MetricExtension m : MetricExtensionProvider.getMetricExtensions()) { | |||||
m.addException(entry.getResourceWrapper().getName(), count, t); | |||||
} | |||||
// clusterNode can be null when Constants.ON is false. | // clusterNode can be null when Constants.ON is false. | ||||
ClusterNode clusterNode = curNode.getClusterNode(); | ClusterNode clusterNode = curNode.getClusterNode(); | ||||
@@ -0,0 +1,22 @@ | |||||
package com.alibaba.csp.sentinel.metric.extension; | |||||
import com.alibaba.csp.sentinel.init.InitFunc; | |||||
import com.alibaba.csp.sentinel.metric.extension.callback.MetricEntryCallback; | |||||
import com.alibaba.csp.sentinel.metric.extension.callback.MetricExitCallback; | |||||
import com.alibaba.csp.sentinel.slots.statistic.StatisticSlotCallbackRegistry; | |||||
/** | |||||
* Register callbacks for metric extension. | |||||
* | |||||
* @author Carpenter Lee | |||||
* @since 1.6.1 | |||||
*/ | |||||
public class MetricCallbackInit implements InitFunc { | |||||
@Override | |||||
public void init() throws Exception { | |||||
StatisticSlotCallbackRegistry.addEntryCallback(MetricEntryCallback.class.getCanonicalName(), | |||||
new MetricEntryCallback()); | |||||
StatisticSlotCallbackRegistry.addExitCallback(MetricExitCallback.class.getCanonicalName(), | |||||
new MetricExitCallback()); | |||||
} | |||||
} |
@@ -0,0 +1,86 @@ | |||||
package com.alibaba.csp.sentinel.metric.extension; | |||||
import com.alibaba.csp.sentinel.slots.block.BlockException; | |||||
/** | |||||
* This interface provides extension to Sentinel internal statistics. | |||||
* <p> | |||||
* Please note that all method in this class will invoke in the same thread of biz logic. | |||||
* It's necessary to not do time-consuming operation in any of the interface's method, | |||||
* otherwise biz logic will be blocked. | |||||
* </p> | |||||
* | |||||
* @author Carpenter Lee | |||||
* @since 1.6.1 | |||||
*/ | |||||
public interface MetricExtension { | |||||
/** | |||||
* Add current pass count of the resource name. | |||||
* | |||||
* @param n count to add | |||||
* @param resource resource name | |||||
* @param args additional arguments of the resource, eg. if the resource is a method name, | |||||
* the args will be the parameters of the method. | |||||
*/ | |||||
void addPass(String resource, int n, Object... args); | |||||
/** | |||||
* Add current block count of the resource name. | |||||
* | |||||
* @param n count to add | |||||
* @param resource resource name | |||||
* @param origin the original invoker. | |||||
* @param blockException block exception related. | |||||
* @param args additional arguments of the resource, eg. if the resource is a method name, | |||||
* the args will be the parameters of the method. | |||||
*/ | |||||
void addBlock(String resource, int n, String origin, BlockException blockException, Object... args); | |||||
/** | |||||
* Add current completed count of the resource name. | |||||
* | |||||
* @param n count to add | |||||
* @param resource resource name | |||||
* @param args additional arguments of the resource, eg. if the resource is a method name, | |||||
* the args will be the parameters of the method. | |||||
*/ | |||||
void addSuccess(String resource, int n, Object... args); | |||||
/** | |||||
* Add current exception count of the resource name. | |||||
* | |||||
* @param n count to add | |||||
* @param resource resource name | |||||
* @param throwable exception related. | |||||
*/ | |||||
void addException(String resource, int n, Throwable throwable); | |||||
/** | |||||
* Add response time of the resource name. | |||||
* | |||||
* @param rt response time in millisecond | |||||
* @param resource resource name | |||||
* @param args additional arguments of the resource, eg. if the resource is a method name, | |||||
* the args will be the parameters of the method. | |||||
*/ | |||||
void addRt(String resource, long rt, Object... args); | |||||
/** | |||||
* Increase current thread count of the resource name. | |||||
* | |||||
* @param resource resource name | |||||
* @param args additional arguments of the resource, eg. if the resource is a method name, | |||||
* the args will be the parameters of the method. | |||||
*/ | |||||
void increaseThreadNum(String resource, Object... args); | |||||
/** | |||||
* Decrease current thread count of the resource name. | |||||
* | |||||
* @param resource resource name | |||||
* @param args additional arguments of the resource, eg. if the resource is a method name, | |||||
* the args will be the parameters of the method. | |||||
*/ | |||||
void decreaseThreadNum(String resource, Object... args); | |||||
} |
@@ -0,0 +1,54 @@ | |||||
package com.alibaba.csp.sentinel.metric.extension; | |||||
import java.util.ArrayList; | |||||
import java.util.List; | |||||
import com.alibaba.csp.sentinel.log.RecordLog; | |||||
import com.alibaba.csp.sentinel.util.SpiLoader; | |||||
/** | |||||
* Get all {@link MetricExtension}s via SPI. | |||||
* | |||||
* @author Carpenter Lee | |||||
* @since 1.6.1 | |||||
*/ | |||||
public class MetricExtensionProvider { | |||||
private static List<MetricExtension> metricExtensions = new ArrayList<>(); | |||||
static { | |||||
resolveInstance(); | |||||
} | |||||
private static void resolveInstance() { | |||||
List<MetricExtension> extensions = SpiLoader.loadInstanceList(MetricExtension.class); | |||||
if (extensions == null) { | |||||
RecordLog.warn("[MetricExtensionProvider] WARN: No existing MetricExtension found"); | |||||
} else { | |||||
metricExtensions.addAll(extensions); | |||||
RecordLog.info("[MetricExtensionProvider] MetricExtension resolved, size=" + extensions.size()); | |||||
} | |||||
} | |||||
/** | |||||
* Get all metric extensions. DO NOT MODIFY the returned list, use {@link #addMetricExtension(MetricExtension)}. | |||||
* | |||||
* @return all metric extensions. | |||||
*/ | |||||
public static List<MetricExtension> getMetricExtensions() { | |||||
return metricExtensions; | |||||
} | |||||
/** | |||||
* Add metric extension. | |||||
* <p> | |||||
* Not that this method is NOT thread safe. | |||||
* </p> | |||||
* | |||||
* @param metricExtension the metric extension to add. | |||||
*/ | |||||
public static void addMetricExtension(MetricExtension metricExtension) { | |||||
metricExtensions.add(metricExtension); | |||||
} | |||||
} |
@@ -0,0 +1,34 @@ | |||||
package com.alibaba.csp.sentinel.metric.extension.callback; | |||||
import com.alibaba.csp.sentinel.context.Context; | |||||
import com.alibaba.csp.sentinel.metric.extension.MetricExtensionProvider; | |||||
import com.alibaba.csp.sentinel.metric.extension.MetricExtension; | |||||
import com.alibaba.csp.sentinel.node.DefaultNode; | |||||
import com.alibaba.csp.sentinel.slotchain.ProcessorSlotEntryCallback; | |||||
import com.alibaba.csp.sentinel.slotchain.ResourceWrapper; | |||||
import com.alibaba.csp.sentinel.slots.block.BlockException; | |||||
/** | |||||
* Metric extension entry callback. | |||||
* | |||||
* @author Carpenter Lee | |||||
* @since 1.6.1 | |||||
*/ | |||||
public class MetricEntryCallback implements ProcessorSlotEntryCallback<DefaultNode> { | |||||
@Override | |||||
public void onPass(Context context, ResourceWrapper resourceWrapper, DefaultNode param, | |||||
int count, Object... args) throws Exception { | |||||
for (MetricExtension m : MetricExtensionProvider.getMetricExtensions()) { | |||||
m.increaseThreadNum(resourceWrapper.getName(), args); | |||||
m.addPass(resourceWrapper.getName(), count, args); | |||||
} | |||||
} | |||||
@Override | |||||
public void onBlocked(BlockException ex, Context context, ResourceWrapper resourceWrapper, | |||||
DefaultNode param, int count, Object... args) { | |||||
for (MetricExtension m : MetricExtensionProvider.getMetricExtensions()) { | |||||
m.addBlock(resourceWrapper.getName(), count, context.getOrigin(), ex, args); | |||||
} | |||||
} | |||||
} |
@@ -0,0 +1,28 @@ | |||||
package com.alibaba.csp.sentinel.metric.extension.callback; | |||||
import com.alibaba.csp.sentinel.context.Context; | |||||
import com.alibaba.csp.sentinel.metric.extension.MetricExtensionProvider; | |||||
import com.alibaba.csp.sentinel.metric.extension.MetricExtension; | |||||
import com.alibaba.csp.sentinel.slotchain.ProcessorSlotExitCallback; | |||||
import com.alibaba.csp.sentinel.slotchain.ResourceWrapper; | |||||
import com.alibaba.csp.sentinel.util.TimeUtil; | |||||
/** | |||||
* Metric extension exit callback. | |||||
* | |||||
* @author Carpenter Lee | |||||
* @since 1.6.1 | |||||
*/ | |||||
public class MetricExitCallback implements ProcessorSlotExitCallback { | |||||
@Override | |||||
public void onExit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) { | |||||
for (MetricExtension m : MetricExtensionProvider.getMetricExtensions()) { | |||||
if (context.getCurEntry().getError() == null) { | |||||
long realRt = TimeUtil.currentTimeMillis() - context.getCurEntry().getCreateTime(); | |||||
m.addRt(resourceWrapper.getName(), realRt, args); | |||||
m.addSuccess(resourceWrapper.getName(), count, args); | |||||
m.decreaseThreadNum(resourceWrapper.getName(), args); | |||||
} | |||||
} | |||||
} | |||||
} |
@@ -0,0 +1 @@ | |||||
com.alibaba.csp.sentinel.metric.extension.MetricCallbackInit |
@@ -0,0 +1,51 @@ | |||||
package com.alibaba.csp.sentinel.metric.extension.callback; | |||||
import com.alibaba.csp.sentinel.metric.extension.MetricExtension; | |||||
import com.alibaba.csp.sentinel.slots.block.BlockException; | |||||
/** | |||||
* @author Carpenter Lee | |||||
*/ | |||||
class FakeMetricExtension implements MetricExtension { | |||||
long pass = 0; | |||||
long block = 0; | |||||
long success = 0; | |||||
long exception = 0; | |||||
long rt = 0; | |||||
long thread = 0; | |||||
@Override | |||||
public void addPass(String resource, int n, Object... args) { | |||||
pass += n; | |||||
} | |||||
@Override | |||||
public void addBlock(String resource, int n, String origin, BlockException ex, Object... args) { | |||||
block += n; | |||||
} | |||||
@Override | |||||
public void addSuccess(String resource, int n, Object... args) { | |||||
success += n; | |||||
} | |||||
@Override | |||||
public void addException(String resource, int n, Throwable t) { | |||||
exception += n; | |||||
} | |||||
@Override | |||||
public void addRt(String resource, long rt, Object... args) { | |||||
this.rt += rt; | |||||
} | |||||
@Override | |||||
public void increaseThreadNum(String resource, Object... args) { | |||||
thread++; | |||||
} | |||||
@Override | |||||
public void decreaseThreadNum(String resource, Object... args) { | |||||
thread--; | |||||
} | |||||
} |
@@ -0,0 +1,48 @@ | |||||
package com.alibaba.csp.sentinel.metric.extension.callback; | |||||
import com.alibaba.csp.sentinel.EntryType; | |||||
import com.alibaba.csp.sentinel.context.Context; | |||||
import com.alibaba.csp.sentinel.metric.extension.MetricExtensionProvider; | |||||
import com.alibaba.csp.sentinel.slotchain.StringResourceWrapper; | |||||
import com.alibaba.csp.sentinel.slots.block.flow.FlowException; | |||||
import org.junit.Assert; | |||||
import org.junit.Test; | |||||
import static org.mockito.Mockito.mock; | |||||
import static org.mockito.Mockito.when; | |||||
/** | |||||
* @author Carpenter Lee | |||||
*/ | |||||
public class MetricEntryCallbackTest { | |||||
@Test | |||||
public void onPass() throws Exception { | |||||
FakeMetricExtension extension = new FakeMetricExtension(); | |||||
MetricExtensionProvider.addMetricExtension(extension); | |||||
MetricEntryCallback entryCallback = new MetricEntryCallback(); | |||||
StringResourceWrapper resourceWrapper = new StringResourceWrapper("resource", EntryType.OUT); | |||||
int count = 2; | |||||
Object[] args = {"args1", "args2"}; | |||||
entryCallback.onPass(null, resourceWrapper, null, count, args); | |||||
Assert.assertEquals(extension.pass, count); | |||||
Assert.assertEquals(extension.thread, 1); | |||||
} | |||||
@Test | |||||
public void onBlocked() throws Exception { | |||||
FakeMetricExtension extension = new FakeMetricExtension(); | |||||
MetricExtensionProvider.addMetricExtension(extension); | |||||
MetricEntryCallback entryCallback = new MetricEntryCallback(); | |||||
StringResourceWrapper resourceWrapper = new StringResourceWrapper("resource", EntryType.OUT); | |||||
Context context = mock(Context.class); | |||||
when(context.getOrigin()).thenReturn("origin1"); | |||||
int count = 2; | |||||
Object[] args = {"args1", "args2"}; | |||||
entryCallback.onBlocked(new FlowException("xx"), context, resourceWrapper, null, count, args); | |||||
Assert.assertEquals(extension.block, count); | |||||
} | |||||
} |
@@ -0,0 +1,43 @@ | |||||
package com.alibaba.csp.sentinel.metric.extension.callback; | |||||
import com.alibaba.csp.sentinel.Entry; | |||||
import com.alibaba.csp.sentinel.EntryType; | |||||
import com.alibaba.csp.sentinel.context.Context; | |||||
import com.alibaba.csp.sentinel.metric.extension.MetricExtensionProvider; | |||||
import com.alibaba.csp.sentinel.slotchain.StringResourceWrapper; | |||||
import com.alibaba.csp.sentinel.util.TimeUtil; | |||||
import org.junit.Assert; | |||||
import org.junit.Test; | |||||
import static org.mockito.Mockito.mock; | |||||
import static org.mockito.Mockito.when; | |||||
/** | |||||
* @author Carpenter Lee | |||||
*/ | |||||
public class MetricExitCallbackTest { | |||||
@Test | |||||
public void onExit() { | |||||
FakeMetricExtension extension = new FakeMetricExtension(); | |||||
MetricExtensionProvider.addMetricExtension(extension); | |||||
MetricExitCallback exitCallback = new MetricExitCallback(); | |||||
StringResourceWrapper resourceWrapper = new StringResourceWrapper("resource", EntryType.OUT); | |||||
int count = 2; | |||||
Object[] args = {"args1", "args2"}; | |||||
extension.rt = 20; | |||||
extension.success = 6; | |||||
extension.thread = 10; | |||||
Context context = mock(Context.class); | |||||
Entry entry = mock(Entry.class); | |||||
when(entry.getError()).thenReturn(null); | |||||
when(entry.getCreateTime()).thenReturn(TimeUtil.currentTimeMillis() - 100); | |||||
when(context.getCurEntry()).thenReturn(entry); | |||||
exitCallback.onExit(context, resourceWrapper, count, args); | |||||
Assert.assertEquals(120, extension.rt, 10); | |||||
Assert.assertEquals(extension.success, 6 + count); | |||||
Assert.assertEquals(extension.thread, 10 - 1); | |||||
} | |||||
} |