- Refactor the slot interface to support prioritized entry - Add `entryWithPriority` in SphU Signed-off-by: Eric Zhao <sczyh16@gmail.com>master
@@ -61,7 +61,8 @@ public class CtSph implements Sph { | |||
return entry; | |||
} | |||
private AsyncEntry asyncEntryInternal(ResourceWrapper resourceWrapper, int count, Object... args) throws BlockException { | |||
private AsyncEntry asyncEntryWithPriorityInternal(ResourceWrapper resourceWrapper, int count, boolean prioritized, | |||
Object... args) throws BlockException { | |||
Context context = ContextUtil.getContext(); | |||
if (context instanceof NullContext) { | |||
// The {@link NullContext} indicates that the amount of context has exceeded the threshold, | |||
@@ -87,7 +88,7 @@ public class CtSph implements Sph { | |||
AsyncEntry asyncEntry = new AsyncEntry(resourceWrapper, chain, context); | |||
try { | |||
chain.entry(context, resourceWrapper, null, count, args); | |||
chain.entry(context, resourceWrapper, null, count, prioritized, args); | |||
// Initiate the async context only when the entry successfully passed the slot chain. | |||
asyncEntry.initAsyncContext(); | |||
// The asynchronous call may take time in background, and current context should not be hanged on it. | |||
@@ -108,23 +109,12 @@ public class CtSph implements Sph { | |||
return asyncEntry; | |||
} | |||
/** | |||
* Do all {@link Rule}s checking about the resource. | |||
* | |||
* <p>Each distinct resource will use a {@link ProcessorSlot} to do rules checking. Same resource will use | |||
* same {@link ProcessorSlot} globally. </p> | |||
* | |||
* <p>Note that total {@link ProcessorSlot} count must not exceed {@link Constants#MAX_SLOT_CHAIN_SIZE}, | |||
* otherwise no rules checking will do. In this condition, all requests will pass directly, with no checking | |||
* or exception.</p> | |||
* | |||
* @param resourceWrapper resource name | |||
* @param count tokens needed | |||
* @param args arguments of user method call | |||
* @return {@link Entry} represents this call | |||
* @throws BlockException if any rule's threshold is exceeded | |||
*/ | |||
public Entry entry(ResourceWrapper resourceWrapper, int count, Object... args) throws BlockException { | |||
private AsyncEntry asyncEntryInternal(ResourceWrapper resourceWrapper, int count, Object... args) throws BlockException { | |||
return asyncEntryWithPriorityInternal(resourceWrapper, count, false, args); | |||
} | |||
private Entry entryWithPriority(ResourceWrapper resourceWrapper, int count, boolean prioritized, Object... args) | |||
throws BlockException { | |||
Context context = ContextUtil.getContext(); | |||
if (context instanceof NullContext) { | |||
// The {@link NullContext} indicates that the amount of context has exceeded the threshold, | |||
@@ -154,7 +144,7 @@ public class CtSph implements Sph { | |||
Entry e = new CtEntry(resourceWrapper, chain, context); | |||
try { | |||
chain.entry(context, resourceWrapper, null, count, args); | |||
chain.entry(context, resourceWrapper, null, count, prioritized, args); | |||
} catch (BlockException e1) { | |||
e.exit(count, args); | |||
throw e1; | |||
@@ -165,6 +155,26 @@ public class CtSph implements Sph { | |||
return e; | |||
} | |||
/** | |||
* Do all {@link Rule}s checking about the resource. | |||
* | |||
* <p>Each distinct resource will use a {@link ProcessorSlot} to do rules checking. Same resource will use | |||
* same {@link ProcessorSlot} globally. </p> | |||
* | |||
* <p>Note that total {@link ProcessorSlot} count must not exceed {@link Constants#MAX_SLOT_CHAIN_SIZE}, | |||
* otherwise no rules checking will do. In this condition, all requests will pass directly, with no checking | |||
* or exception.</p> | |||
* | |||
* @param resourceWrapper resource name | |||
* @param count tokens needed | |||
* @param args arguments of user method call | |||
* @return {@link Entry} represents this call | |||
* @throws BlockException if any rule's threshold is exceeded | |||
*/ | |||
public Entry entry(ResourceWrapper resourceWrapper, int count, Object... args) throws BlockException { | |||
return entryWithPriority(resourceWrapper, count, false, args); | |||
} | |||
/** | |||
* Get {@link ProcessorSlotChain} of the resource. new {@link ProcessorSlotChain} will | |||
* be created if the resource doesn't relate one. | |||
@@ -305,4 +315,10 @@ public class CtSph implements Sph { | |||
StringResourceWrapper resource = new StringResourceWrapper(name, type); | |||
return asyncEntryInternal(resource, count, args); | |||
} | |||
@Override | |||
public Entry entryWithPriority(String name, EntryType type, int count, boolean prioritized) throws BlockException { | |||
StringResourceWrapper resource = new StringResourceWrapper(name, type); | |||
return entryWithPriority(resource, count, prioritized); | |||
} | |||
} |
@@ -155,4 +155,18 @@ public interface Sph { | |||
* @since 0.2.0 | |||
*/ | |||
AsyncEntry asyncEntry(String name, EntryType type, int count, Object... args) throws BlockException; | |||
/** | |||
* Create a protected resource with priority. | |||
* | |||
* @param name the unique name for the protected resource | |||
* @param type the resource is an inbound or an outbound method. This is used | |||
* to mark whether it can be blocked when the system is unstable | |||
* @param count the count that the resource requires | |||
* @param prioritized whether the entry is prioritized | |||
* @return entry get | |||
* @throws BlockException if the block criteria is met | |||
* @since 1.4.0 | |||
*/ | |||
Entry entryWithPriority(String name, EntryType type, int count, boolean prioritized) throws BlockException; | |||
} |
@@ -242,4 +242,29 @@ public class SphU { | |||
public static AsyncEntry asyncEntry(String name, EntryType type, int count, Object... args) throws BlockException { | |||
return Env.sph.asyncEntry(name, type, count, args); | |||
} | |||
/** | |||
* Checking all {@link Rule}s related the resource. The entry is prioritized. | |||
* | |||
* @param name the unique name for the protected resource | |||
* @throws BlockException if the block criteria is met, eg. when any rule's threshold is exceeded. | |||
* @since 1.4.0 | |||
*/ | |||
public static Entry entryWithPriority(String name) throws BlockException { | |||
return Env.sph.entryWithPriority(name, EntryType.OUT, 1, true); | |||
} | |||
/** | |||
* Checking all {@link Rule}s related the resource. The entry is prioritized. | |||
* | |||
* @param name the unique name for the protected resource | |||
* @param type the resource is an inbound or an outbound method. This is used | |||
* to mark whether it can be blocked when the system is unstable, | |||
* only inbound traffic could be blocked by {@link SystemRule} | |||
* @throws BlockException if the block criteria is met, eg. when any rule's threshold is exceeded. | |||
* @since 1.4.0 | |||
*/ | |||
public static Entry entryWithPriority(String name, EntryType type) throws BlockException { | |||
return Env.sph.entryWithPriority(name, type, 1, true); | |||
} | |||
} |
@@ -26,18 +26,18 @@ public abstract class AbstractLinkedProcessorSlot<T> implements ProcessorSlot<T> | |||
private AbstractLinkedProcessorSlot<?> next = null; | |||
@Override | |||
public void fireEntry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, Object... args) | |||
public void fireEntry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object... args) | |||
throws Throwable { | |||
if (next != null) { | |||
next.transformEntry(context, resourceWrapper, obj, count, args); | |||
next.transformEntry(context, resourceWrapper, obj, count, prioritized, args); | |||
} | |||
} | |||
@SuppressWarnings("unchecked") | |||
void transformEntry(Context context, ResourceWrapper resourceWrapper, Object o, int count, Object... args) | |||
void transformEntry(Context context, ResourceWrapper resourceWrapper, Object o, int count, boolean prioritized, Object... args) | |||
throws Throwable { | |||
T t = (T)o; | |||
entry(context, resourceWrapper, t, count, args); | |||
entry(context, resourceWrapper, t, count, prioritized, args); | |||
} | |||
@Override | |||
@@ -26,9 +26,9 @@ public class DefaultProcessorSlotChain extends ProcessorSlotChain { | |||
AbstractLinkedProcessorSlot<?> first = new AbstractLinkedProcessorSlot<Object>() { | |||
@Override | |||
public void entry(Context context, ResourceWrapper resourceWrapper, Object t, int count, Object... args) | |||
public void entry(Context context, ResourceWrapper resourceWrapper, Object t, int count, boolean prioritized, Object... args) | |||
throws Throwable { | |||
super.fireEntry(context, resourceWrapper, t, count, args); | |||
super.fireEntry(context, resourceWrapper, t, count, prioritized, args); | |||
} | |||
@Override | |||
@@ -70,9 +70,9 @@ public class DefaultProcessorSlotChain extends ProcessorSlotChain { | |||
} | |||
@Override | |||
public void entry(Context context, ResourceWrapper resourceWrapper, Object t, int count, Object... args) | |||
public void entry(Context context, ResourceWrapper resourceWrapper, Object t, int count, boolean prioritized, Object... args) | |||
throws Throwable { | |||
first.transformEntry(context, resourceWrapper, t, count, args); | |||
first.transformEntry(context, resourceWrapper, t, count, prioritized, args); | |||
} | |||
@Override | |||
@@ -23,6 +23,7 @@ import com.alibaba.csp.sentinel.context.Context; | |||
* @author qinan.qn | |||
* @author jialiang.linjl | |||
* @author leyou(lihao) | |||
* @author Eric Zhao | |||
*/ | |||
public interface ProcessorSlot<T> { | |||
@@ -31,26 +32,28 @@ public interface ProcessorSlot<T> { | |||
* | |||
* @param context current {@link Context} | |||
* @param resourceWrapper current resource | |||
* @param param Generics parameter, usually is a {@link com.alibaba.csp.sentinel.node.Node} | |||
* @param param generics parameter, usually is a {@link com.alibaba.csp.sentinel.node.Node} | |||
* @param count tokens needed | |||
* @param prioritized whether the entry is prioritized | |||
* @param args parameters of the original call | |||
* @throws Throwable blocked exception or unexpected error | |||
*/ | |||
void entry(Context context, ResourceWrapper resourceWrapper, T param, int count, Object... args) | |||
throws Throwable; | |||
void entry(Context context, ResourceWrapper resourceWrapper, T param, int count, boolean prioritized, | |||
Object... args) throws Throwable; | |||
/** | |||
* Means finish of {@link #entry(Context, ResourceWrapper, Object, int, Object...)}. | |||
* Means finish of {@link #entry(Context, ResourceWrapper, Object, int, boolean, Object...)}. | |||
* | |||
* @param context current {@link Context} | |||
* @param resourceWrapper current resource | |||
* @param obj relevant object (e.g. Node) | |||
* @param count tokens needed | |||
* @param prioritized whether the entry is prioritized | |||
* @param args parameters of the original call | |||
* @throws Throwable blocked exception or unexpected error | |||
*/ | |||
void fireEntry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, Object... args) | |||
throws Throwable; | |||
void fireEntry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, | |||
Object... args) throws Throwable; | |||
/** | |||
* Exit of this slot. | |||
@@ -17,7 +17,7 @@ package com.alibaba.csp.sentinel.slots.block; | |||
import com.alibaba.csp.sentinel.node.IntervalProperty; | |||
/*** | |||
/** | |||
* @author youji.zj | |||
* @author jialiang.linjl | |||
*/ | |||
@@ -33,10 +33,10 @@ import com.alibaba.csp.sentinel.slotchain.ResourceWrapper; | |||
public class AuthoritySlot extends AbstractLinkedProcessorSlot<DefaultNode> { | |||
@Override | |||
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, Object... args) | |||
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args) | |||
throws Throwable { | |||
checkBlackWhiteAuthority(resourceWrapper, context); | |||
fireEntry(context, resourceWrapper, node, count, args); | |||
fireEntry(context, resourceWrapper, node, count, prioritized, args); | |||
} | |||
@Override | |||
@@ -29,10 +29,10 @@ import com.alibaba.csp.sentinel.slotchain.ResourceWrapper; | |||
public class DegradeSlot extends AbstractLinkedProcessorSlot<DefaultNode> { | |||
@Override | |||
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, Object... args) | |||
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args) | |||
throws Throwable { | |||
DegradeRuleManager.checkDegrade(resourceWrapper, context, node, count); | |||
fireEntry(context, resourceWrapper, node, count, args); | |||
fireEntry(context, resourceWrapper, node, count, prioritized, args); | |||
} | |||
@Override | |||
@@ -136,11 +136,11 @@ import com.alibaba.csp.sentinel.slots.block.BlockException; | |||
public class FlowSlot extends AbstractLinkedProcessorSlot<DefaultNode> { | |||
@Override | |||
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, Object... args) | |||
throws Throwable { | |||
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, | |||
boolean prioritized, Object... args) throws Throwable { | |||
checkFlow(resourceWrapper, context, node, count); | |||
fireEntry(context, resourceWrapper, node, count, args); | |||
fireEntry(context, resourceWrapper, node, count, prioritized, args); | |||
} | |||
void checkFlow(ResourceWrapper resource, Context context, DefaultNode node, int count) throws BlockException { | |||
@@ -74,7 +74,7 @@ public class ClusterBuilderSlot extends AbstractLinkedProcessorSlot<DefaultNode> | |||
private ClusterNode clusterNode = null; | |||
@Override | |||
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, Object... args) | |||
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args) | |||
throws Throwable { | |||
if (clusterNode == null) { | |||
synchronized (lock) { | |||
@@ -100,7 +100,7 @@ public class ClusterBuilderSlot extends AbstractLinkedProcessorSlot<DefaultNode> | |||
context.getCurEntry().setOriginNode(originNode); | |||
} | |||
fireEntry(context, resourceWrapper, node, count, args); | |||
fireEntry(context, resourceWrapper, node, count, prioritized, args); | |||
} | |||
@Override | |||
@@ -29,10 +29,10 @@ import com.alibaba.csp.sentinel.slots.block.BlockException; | |||
public class LogSlot extends AbstractLinkedProcessorSlot<DefaultNode> { | |||
@Override | |||
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode obj, int count, Object... args) | |||
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode obj, int count, boolean prioritized, Object... args) | |||
throws Throwable { | |||
try { | |||
fireEntry(context, resourceWrapper, obj, count, args); | |||
fireEntry(context, resourceWrapper, obj, count, prioritized, args); | |||
} catch (BlockException e) { | |||
EagleEyeLogUtil.log(resourceWrapper.getName(), e.getClass().getSimpleName(), e.getRuleLimitApp(), | |||
context.getOrigin(), count); | |||
@@ -131,7 +131,7 @@ public class NodeSelectorSlot extends AbstractLinkedProcessorSlot<Object> { | |||
private volatile Map<String, DefaultNode> map = new HashMap<String, DefaultNode>(10); | |||
@Override | |||
public void entry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, Object... args) | |||
public void entry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object... args) | |||
throws Throwable { | |||
/* | |||
* It's interesting that we use context name rather resource name as the map key. | |||
@@ -168,7 +168,7 @@ public class NodeSelectorSlot extends AbstractLinkedProcessorSlot<Object> { | |||
} | |||
context.setCurNode(node); | |||
fireEntry(context, resourceWrapper, node, count, args); | |||
fireEntry(context, resourceWrapper, node, count, prioritized, args); | |||
} | |||
@Override | |||
@@ -48,11 +48,11 @@ import com.alibaba.csp.sentinel.slots.block.BlockException; | |||
public class StatisticSlot extends AbstractLinkedProcessorSlot<DefaultNode> { | |||
@Override | |||
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, Object... args) | |||
throws Throwable { | |||
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, | |||
boolean prioritized, Object... args) throws Throwable { | |||
try { | |||
// Do some checking. | |||
fireEntry(context, resourceWrapper, node, count, args); | |||
fireEntry(context, resourceWrapper, node, count, prioritized, args); | |||
// Request passed, add thread count and pass count. | |||
node.increaseThreadNum(); | |||
@@ -30,10 +30,10 @@ import com.alibaba.csp.sentinel.slotchain.ResourceWrapper; | |||
public class SystemSlot extends AbstractLinkedProcessorSlot<DefaultNode> { | |||
@Override | |||
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, Object... args) | |||
throws Throwable { | |||
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, | |||
boolean prioritized, Object... args) throws Throwable { | |||
SystemRuleManager.checkSystem(resourceWrapper); | |||
fireEntry(context, resourceWrapper, node, count, args); | |||
fireEntry(context, resourceWrapper, node, count, prioritized, args); | |||
} | |||
@Override | |||
@@ -308,7 +308,7 @@ public class CtSphTest { | |||
private class ShouldNotPassSlot extends AbstractLinkedProcessorSlot<DefaultNode> { | |||
@Override | |||
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode param, int count, | |||
Object... args) { | |||
boolean prioritized, Object... args) { | |||
throw new IllegalStateException("Should not enter this slot!"); | |||
} | |||
@@ -323,7 +323,7 @@ public class CtSphTest { | |||
@Override | |||
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode param, int count, | |||
Object... args) throws Throwable { | |||
boolean prioritized, Object... args) throws Throwable { | |||
throw new BlockException("custom") {}; | |||
} | |||
@@ -339,7 +339,7 @@ public class CtSphTest { | |||
@Override | |||
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode param, int count, | |||
Object... args) { | |||
boolean prioritized, Object... args) { | |||
entered = true; | |||
} | |||
@@ -28,12 +28,12 @@ import com.alibaba.csp.sentinel.slotchain.ResourceWrapper; | |||
public class DemoSlot extends AbstractLinkedProcessorSlot<DefaultNode> { | |||
@Override | |||
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, Object... args) | |||
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args) | |||
throws Throwable { | |||
System.out.println("Current context: " + context.getName()); | |||
System.out.println("Current entry resource: " + context.getCurEntry().getResourceWrapper().getName()); | |||
fireEntry(context, resourceWrapper, node, count, args); | |||
fireEntry(context, resourceWrapper, node, count, prioritized, args); | |||
} | |||
@Override | |||
@@ -47,16 +47,16 @@ public class ParamFlowSlot extends AbstractLinkedProcessorSlot<DefaultNode> { | |||
private final Object LOCK = new Object(); | |||
@Override | |||
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, Object... args) | |||
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args) | |||
throws Throwable { | |||
if (!ParamFlowRuleManager.hasRules(resourceWrapper.getName())) { | |||
fireEntry(context, resourceWrapper, node, count, args); | |||
fireEntry(context, resourceWrapper, node, count, prioritized, args); | |||
return; | |||
} | |||
checkFlow(resourceWrapper, count, args); | |||
fireEntry(context, resourceWrapper, node, count, args); | |||
fireEntry(context, resourceWrapper, node, count, prioritized, args); | |||
} | |||
@Override | |||
@@ -43,7 +43,7 @@ public class ParamFlowSlotTest { | |||
public void testEntryWhenParamFlowRuleNotExists() throws Throwable { | |||
String resourceName = "testEntryWhenParamFlowRuleNotExists"; | |||
ResourceWrapper resourceWrapper = new StringResourceWrapper(resourceName, EntryType.IN); | |||
paramFlowSlot.entry(null, resourceWrapper, null, 1, "abc"); | |||
paramFlowSlot.entry(null, resourceWrapper, null, 1, false, "abc"); | |||
// The parameter metric instance will not be created. | |||
assertNull(ParamFlowSlot.getParamMetric(resourceWrapper)); | |||
} | |||
@@ -68,10 +68,10 @@ public class ParamFlowSlotTest { | |||
ParamFlowSlot.getMetricsMap().put(resourceWrapper, metric); | |||
// The first entry will pass. | |||
paramFlowSlot.entry(null, resourceWrapper, null, 1, argToGo); | |||
paramFlowSlot.entry(null, resourceWrapper, null, 1, false, argToGo); | |||
// The second entry will be blocked. | |||
try { | |||
paramFlowSlot.entry(null, resourceWrapper, null, 1, argToGo); | |||
paramFlowSlot.entry(null, resourceWrapper, null, 1, false, argToGo); | |||
} catch (ParamFlowException ex) { | |||
assertEquals(String.valueOf(argToGo), ex.getMessage()); | |||
assertEquals(resourceName, ex.getResourceName()); | |||