* Also polish related complete callbacks Signed-off-by: Eric Zhao <sczyh16@gmail.com>master
@@ -15,6 +15,7 @@ | |||||
*/ | */ | ||||
package com.alibaba.csp.sentinel; | package com.alibaba.csp.sentinel; | ||||
import com.alibaba.csp.sentinel.slots.block.BlockException; | |||||
import com.alibaba.csp.sentinel.util.TimeUtil; | import com.alibaba.csp.sentinel.util.TimeUtil; | ||||
import com.alibaba.csp.sentinel.context.ContextUtil; | import com.alibaba.csp.sentinel.context.ContextUtil; | ||||
import com.alibaba.csp.sentinel.node.Node; | import com.alibaba.csp.sentinel.node.Node; | ||||
@@ -44,6 +45,7 @@ import com.alibaba.csp.sentinel.context.Context; | |||||
* @author qinan.qn | * @author qinan.qn | ||||
* @author jialiang.linjl | * @author jialiang.linjl | ||||
* @author leyou(lihao) | * @author leyou(lihao) | ||||
* @author Eric Zhao | |||||
* @see SphU | * @see SphU | ||||
* @see Context | * @see Context | ||||
* @see ContextUtil | * @see ContextUtil | ||||
@@ -52,18 +54,23 @@ public abstract class Entry implements AutoCloseable { | |||||
private static final Object[] OBJECTS0 = new Object[0]; | private static final Object[] OBJECTS0 = new Object[0]; | ||||
private long createTime; | |||||
private final long createTimestamp; | |||||
private long completeTimestamp; | |||||
private Node curNode; | private Node curNode; | ||||
/** | /** | ||||
* {@link Node} of the specific origin, Usually the origin is the Service Consumer. | * {@link Node} of the specific origin, Usually the origin is the Service Consumer. | ||||
*/ | */ | ||||
private Node originNode; | private Node originNode; | ||||
private Throwable error; | private Throwable error; | ||||
protected ResourceWrapper resourceWrapper; | |||||
private BlockException blockError; | |||||
protected final ResourceWrapper resourceWrapper; | |||||
public Entry(ResourceWrapper resourceWrapper) { | public Entry(ResourceWrapper resourceWrapper) { | ||||
this.resourceWrapper = resourceWrapper; | this.resourceWrapper = resourceWrapper; | ||||
this.createTime = TimeUtil.currentTimeMillis(); | |||||
this.createTimestamp = TimeUtil.currentTimeMillis(); | |||||
} | } | ||||
public ResourceWrapper getResourceWrapper() { | public ResourceWrapper getResourceWrapper() { | ||||
@@ -119,8 +126,17 @@ public abstract class Entry implements AutoCloseable { | |||||
*/ | */ | ||||
public abstract Node getLastNode(); | public abstract Node getLastNode(); | ||||
public long getCreateTime() { | |||||
return createTime; | |||||
public long getCreateTimestamp() { | |||||
return createTimestamp; | |||||
} | |||||
public long getCompleteTimestamp() { | |||||
return completeTimestamp; | |||||
} | |||||
public Entry setCompleteTimestamp(long completeTimestamp) { | |||||
this.completeTimestamp = completeTimestamp; | |||||
return this; | |||||
} | } | ||||
public Node getCurNode() { | public Node getCurNode() { | ||||
@@ -131,6 +147,15 @@ public abstract class Entry implements AutoCloseable { | |||||
this.curNode = node; | this.curNode = node; | ||||
} | } | ||||
public BlockException getBlockError() { | |||||
return blockError; | |||||
} | |||||
public Entry setBlockError(BlockException blockError) { | |||||
this.blockError = blockError; | |||||
return this; | |||||
} | |||||
public Throwable getError() { | public Throwable getError() { | ||||
return error; | return error; | ||||
} | } | ||||
@@ -14,14 +14,22 @@ import com.alibaba.csp.sentinel.util.TimeUtil; | |||||
* @since 1.6.1 | * @since 1.6.1 | ||||
*/ | */ | ||||
public class MetricExitCallback implements ProcessorSlotExitCallback { | public class MetricExitCallback implements ProcessorSlotExitCallback { | ||||
@Override | @Override | ||||
public void onExit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) { | public void onExit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) { | ||||
for (MetricExtension m : MetricExtensionProvider.getMetricExtensions()) { | for (MetricExtension m : MetricExtensionProvider.getMetricExtensions()) { | ||||
if (context.getCurEntry().getError() == null) { | |||||
long realRt = TimeUtil.currentTimeMillis() - context.getCurEntry().getCreateTime(); | |||||
m.addRt(resourceWrapper.getName(), realRt, args); | |||||
m.addSuccess(resourceWrapper.getName(), count, args); | |||||
m.decreaseThreadNum(resourceWrapper.getName(), args); | |||||
if (context.getCurEntry().getBlockError() != null) { | |||||
continue; | |||||
} | |||||
String resource = resourceWrapper.getName(); | |||||
long realRt = TimeUtil.currentTimeMillis() - context.getCurEntry().getCreateTimestamp(); | |||||
m.addRt(resource, realRt, args); | |||||
m.addSuccess(resource, count, args); | |||||
m.decreaseThreadNum(resource, args); | |||||
Throwable ex = context.getCurEntry().getError(); | |||||
if (ex != null) { | |||||
m.addException(resource, count, ex); | |||||
} | } | ||||
} | } | ||||
} | } | ||||
@@ -17,7 +17,7 @@ package com.alibaba.csp.sentinel.slots.statistic; | |||||
import java.util.Collection; | import java.util.Collection; | ||||
import com.alibaba.csp.sentinel.config.SentinelConfig; | |||||
import com.alibaba.csp.sentinel.node.Node; | |||||
import com.alibaba.csp.sentinel.slotchain.ProcessorSlotEntryCallback; | import com.alibaba.csp.sentinel.slotchain.ProcessorSlotEntryCallback; | ||||
import com.alibaba.csp.sentinel.slotchain.ProcessorSlotExitCallback; | import com.alibaba.csp.sentinel.slotchain.ProcessorSlotExitCallback; | ||||
import com.alibaba.csp.sentinel.slots.block.flow.PriorityWaitException; | import com.alibaba.csp.sentinel.slots.block.flow.PriorityWaitException; | ||||
@@ -95,7 +95,7 @@ public class StatisticSlot extends AbstractLinkedProcessorSlot<DefaultNode> { | |||||
} | } | ||||
} catch (BlockException e) { | } catch (BlockException e) { | ||||
// Blocked, set block exception to current entry. | // Blocked, set block exception to current entry. | ||||
context.getCurEntry().setError(e); | |||||
context.getCurEntry().setBlockError(e); | |||||
// Add block count. | // Add block count. | ||||
node.increaseBlockQps(count); | node.increaseBlockQps(count); | ||||
@@ -115,52 +115,31 @@ public class StatisticSlot extends AbstractLinkedProcessorSlot<DefaultNode> { | |||||
throw e; | throw e; | ||||
} catch (Throwable e) { | } catch (Throwable e) { | ||||
// Unexpected error, set error to current entry. | |||||
// Unexpected internal error, set error to current entry. | |||||
context.getCurEntry().setError(e); | context.getCurEntry().setError(e); | ||||
// This should not happen. | |||||
node.increaseExceptionQps(count); | |||||
if (context.getCurEntry().getOriginNode() != null) { | |||||
context.getCurEntry().getOriginNode().increaseExceptionQps(count); | |||||
} | |||||
if (resourceWrapper.getEntryType() == EntryType.IN) { | |||||
Constants.ENTRY_NODE.increaseExceptionQps(count); | |||||
} | |||||
throw e; | throw e; | ||||
} | } | ||||
} | } | ||||
@Override | @Override | ||||
public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) { | public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) { | ||||
DefaultNode node = (DefaultNode)context.getCurNode(); | |||||
if (context.getCurEntry().getError() == null) { | |||||
// Calculate response time (max RT is statisticMaxRt from SentinelConfig). | |||||
long rt = TimeUtil.currentTimeMillis() - context.getCurEntry().getCreateTime(); | |||||
int maxStatisticRt = SentinelConfig.statisticMaxRt(); | |||||
if (rt > maxStatisticRt) { | |||||
rt = maxStatisticRt; | |||||
} | |||||
Node node = context.getCurNode(); | |||||
// Record response time and success count. | |||||
node.addRtAndSuccess(rt, count); | |||||
if (context.getCurEntry().getOriginNode() != null) { | |||||
context.getCurEntry().getOriginNode().addRtAndSuccess(rt, count); | |||||
} | |||||
if (context.getCurEntry().getBlockError() == null) { | |||||
// Calculate response time (use completeStatTime as the time of completion). | |||||
long completeStatTime = TimeUtil.currentTimeMillis(); | |||||
context.getCurEntry().setCompleteTimestamp(completeStatTime); | |||||
long rt = completeStatTime - context.getCurEntry().getCreateTimestamp(); | |||||
node.decreaseThreadNum(); | |||||
if (context.getCurEntry().getOriginNode() != null) { | |||||
context.getCurEntry().getOriginNode().decreaseThreadNum(); | |||||
} | |||||
Throwable error = context.getCurEntry().getError(); | |||||
// Record response time and success count. | |||||
recordCompleteFor(node, count, rt, error); | |||||
recordCompleteFor(context.getCurEntry().getOriginNode(), count, rt, error); | |||||
if (resourceWrapper.getEntryType() == EntryType.IN) { | if (resourceWrapper.getEntryType() == EntryType.IN) { | ||||
Constants.ENTRY_NODE.addRtAndSuccess(rt, count); | |||||
Constants.ENTRY_NODE.decreaseThreadNum(); | |||||
recordCompleteFor(Constants.ENTRY_NODE, count, rt, error); | |||||
} | } | ||||
} else { | |||||
// Error may happen. | |||||
} | } | ||||
// Handle exit event with registered exit callback handlers. | // Handle exit event with registered exit callback handlers. | ||||
@@ -171,4 +150,16 @@ public class StatisticSlot extends AbstractLinkedProcessorSlot<DefaultNode> { | |||||
fireExit(context, resourceWrapper, count); | fireExit(context, resourceWrapper, count); | ||||
} | } | ||||
private void recordCompleteFor(Node node, int batchCount, long rt, Throwable error) { | |||||
if (node == null) { | |||||
return; | |||||
} | |||||
node.addRtAndSuccess(rt, batchCount); | |||||
node.decreaseThreadNum(); | |||||
if (error != null && !(error instanceof BlockException)) { | |||||
node.increaseExceptionQps(batchCount); | |||||
} | |||||
} | |||||
} | } |
@@ -55,7 +55,7 @@ public class MetricExitCallbackTest extends AbstractTimeBasedTest { | |||||
int deltaMs = 100; | int deltaMs = 100; | ||||
when(entry.getError()).thenReturn(null); | when(entry.getError()).thenReturn(null); | ||||
when(entry.getCreateTime()).thenReturn(curMillis - deltaMs); | |||||
when(entry.getCreateTimestamp()).thenReturn(curMillis - deltaMs); | |||||
when(context.getCurEntry()).thenReturn(entry); | when(context.getCurEntry()).thenReturn(entry); | ||||
exitCallback.onExit(context, resourceWrapper, count, args); | exitCallback.onExit(context, resourceWrapper, count, args); | ||||
Assert.assertEquals(prevRt + deltaMs, extension.rt); | Assert.assertEquals(prevRt + deltaMs, extension.rt); | ||||
@@ -29,7 +29,7 @@ public class ParamFlowStatisticExitCallback implements ProcessorSlotExitCallback | |||||
@Override | @Override | ||||
public void onExit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) { | public void onExit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) { | ||||
if (context.getCurEntry().getError() == null) { | |||||
if (context.getCurEntry().getBlockError() == null) { | |||||
ParameterMetric parameterMetric = ParameterMetricStorage.getParamMetric(resourceWrapper); | ParameterMetric parameterMetric = ParameterMetricStorage.getParamMetric(resourceWrapper); | ||||
if (parameterMetric != null) { | if (parameterMetric != null) { | ||||