From d798794ab3c4a78b0ea3e4c76f61101bb7868b47 Mon Sep 17 00:00:00 2001 From: Eric Zhao Date: Wed, 12 Sep 2018 11:46:21 +0800 Subject: [PATCH] Refactor the context and entry to support asynchronous invocation chain Signed-off-by: Eric Zhao --- .../com/alibaba/csp/sentinel/AsyncEntry.java | 84 ++++++++++++++ .../com/alibaba/csp/sentinel/CtEntry.java | 103 +++++++++++++++++ .../java/com/alibaba/csp/sentinel/CtSph.java | 109 ++++++++---------- .../java/com/alibaba/csp/sentinel/Sph.java | 19 ++- .../java/com/alibaba/csp/sentinel/SphU.java | 42 +++++++ .../alibaba/csp/sentinel/context/Context.java | 42 +++++-- .../csp/sentinel/context/ContextUtil.java | 41 +++++++ 7 files changed, 366 insertions(+), 74 deletions(-) create mode 100644 sentinel-core/src/main/java/com/alibaba/csp/sentinel/AsyncEntry.java create mode 100644 sentinel-core/src/main/java/com/alibaba/csp/sentinel/CtEntry.java diff --git a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/AsyncEntry.java b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/AsyncEntry.java new file mode 100644 index 00000000..82b49267 --- /dev/null +++ b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/AsyncEntry.java @@ -0,0 +1,84 @@ +/* + * Copyright 1999-2018 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.csp.sentinel; + +import com.alibaba.csp.sentinel.context.Context; +import com.alibaba.csp.sentinel.slotchain.ProcessorSlot; +import com.alibaba.csp.sentinel.slotchain.ResourceWrapper; + +/** + * The entry for asynchronous resources. + * + * @author Eric Zhao + * @since 0.2.0 + */ +public class AsyncEntry extends CtEntry { + + private Context asyncContext; + + AsyncEntry(ResourceWrapper resourceWrapper, ProcessorSlot chain, Context context) { + super(resourceWrapper, chain, context); + } + + /** + * Remove current entry from local context, but does not exit. + */ + void cleanCurrentEntryInLocal() { + Context originalContext = context; + if (originalContext != null) { + Entry curEntry = originalContext.getCurEntry(); + if (curEntry == this) { + Entry parent = this.parent; + originalContext.setCurEntry(parent); + if (parent != null) { + ((CtEntry)parent).child = null; + } + } else { + throw new IllegalStateException("Bad async context state"); + } + } + } + + public Context getAsyncContext() { + return asyncContext; + } + + /** + * The async context should not be initialized until the node for current resource has been set to current entry. + */ + void initAsyncContext() { + if (asyncContext == null) { + this.asyncContext = Context.newAsyncContext(context.getEntranceNode(), context.getName()) + .setOrigin(context.getOrigin()) + .setCurEntry(this); + } else { + throw new IllegalStateException("Duplicate initialize of async context"); + } + } + + @Override + protected void clearEntryContext() { + super.clearEntryContext(); + this.asyncContext = null; + } + + @Override + protected Entry trueExit(int count, Object... args) throws ErrorEntryFreeException { + exitForContext(asyncContext, count, args); + + return parent; + } +} diff --git a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/CtEntry.java b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/CtEntry.java new file mode 100644 index 00000000..b01f983c --- /dev/null +++ b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/CtEntry.java @@ -0,0 +1,103 @@ +/* + * Copyright 1999-2018 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.csp.sentinel; + +import com.alibaba.csp.sentinel.context.Context; +import com.alibaba.csp.sentinel.context.ContextUtil; +import com.alibaba.csp.sentinel.node.Node; +import com.alibaba.csp.sentinel.slotchain.ProcessorSlot; +import com.alibaba.csp.sentinel.slotchain.ResourceWrapper; + +/** + * Linked entry within current context. + * + * @author jialiang.linjl + * @author Eric Zhao + */ +class CtEntry extends Entry { + + protected Entry parent = null; + protected Entry child = null; + + protected ProcessorSlot chain; + protected Context context; + + CtEntry(ResourceWrapper resourceWrapper, ProcessorSlot chain, Context context) { + super(resourceWrapper); + this.chain = chain; + this.context = context; + + setUpEntryFor(context); + } + + private void setUpEntryFor(Context context) { + this.parent = context.getCurEntry(); + if (parent != null) { + ((CtEntry)parent).child = this; + } + context.setCurEntry(this); + } + + @Override + public void exit(int count, Object... args) throws ErrorEntryFreeException { + trueExit(count, args); + } + + protected void exitForContext(Context context, int count, Object... args) throws ErrorEntryFreeException { + if (context != null) { + if (context.getCurEntry() != this) { + // Clean previous call stack. + CtEntry e = (CtEntry)context.getCurEntry(); + while (e != null) { + e.exit(count, args); + e = (CtEntry)e.parent; + } + throw new ErrorEntryFreeException("The order of entry free is can't be paired with the order of entry"); + } else { + if (chain != null) { + chain.exit(context, resourceWrapper, count, args); + } + // Restore the call stack. + context.setCurEntry(parent); + if (parent != null) { + ((CtEntry)parent).child = null; + } + if (parent == null) { + // Auto-created entry indicates immediate exit. + ContextUtil.exit(); + } + // Clean the reference of context in current entry to avoid duplicate exit. + clearEntryContext(); + } + } + } + + protected void clearEntryContext() { + this.context = null; + } + + @Override + protected Entry trueExit(int count, Object... args) throws ErrorEntryFreeException { + exitForContext(context, count, args); + + return parent; + } + + @Override + public Node getLastNode() { + return parent == null ? null : parent.getCurNode(); + } +} \ No newline at end of file diff --git a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/CtSph.java b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/CtSph.java index 43f3379a..73830416 100755 --- a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/CtSph.java +++ b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/CtSph.java @@ -23,7 +23,6 @@ import com.alibaba.csp.sentinel.log.RecordLog; import com.alibaba.csp.sentinel.context.Context; import com.alibaba.csp.sentinel.context.ContextUtil; import com.alibaba.csp.sentinel.context.NullContext; -import com.alibaba.csp.sentinel.node.Node; import com.alibaba.csp.sentinel.slotchain.MethodResourceWrapper; import com.alibaba.csp.sentinel.slotchain.ProcessorSlot; import com.alibaba.csp.sentinel.slotchain.ProcessorSlotChain; @@ -53,6 +52,46 @@ public class CtSph implements Sph { private static final Object LOCK = new Object(); + private AsyncEntry asyncEntryInternal(ResourceWrapper resourceWrapper, int count, Object... args) throws BlockException { + Context context = ContextUtil.getContext(); + if (context instanceof NullContext) { + // Init the entry only. No rule checking will occur. + return new AsyncEntry(resourceWrapper, null, context); + } + if (context == null) { + context = MyContextUtil.myEnter(Constants.CONTEXT_DEFAULT_NAME, "", resourceWrapper.getType()); + } + + // Global switch is turned off, so no rule checking will be done. + if (!Constants.ON) { + return new AsyncEntry(resourceWrapper, null, context); + } + + ProcessorSlot chain = lookProcessChain(resourceWrapper); + + // Means processor cache size exceeds {@link Constants.MAX_SLOT_CHAIN_SIZE}, so no rule checking will be done. + if (chain == null) { + return new AsyncEntry(resourceWrapper, null, context); + } + + AsyncEntry asyncEntry = new AsyncEntry(resourceWrapper, chain, context); + try { + chain.entry(context, resourceWrapper, null, count, args); + // Initiate the async context. + asyncEntry.initAsyncContext(); + } catch (BlockException e1) { + asyncEntry.exit(count, args); + throw e1; + } catch (Throwable e1) { + RecordLog.info("Sentinel unexpected exception", e1); + } finally { + // The asynchronous call may take time in background, and current context should not be hanged on it. + // So we need to remove current async entry from current context. + asyncEntry.cleanCurrentEntryInLocal(); + } + return asyncEntry; + } + /** * Do all {@link Rule}s checking about the resource. * @@ -145,68 +184,6 @@ public class CtSph implements Sph { return chain; } - private static class CtEntry extends Entry { - - protected Entry parent = null; - protected Entry child = null; - private ProcessorSlot chain; - private Context context; - - CtEntry(ResourceWrapper resourceWrapper, ProcessorSlot chain, Context context) { - super(resourceWrapper); - this.chain = chain; - this.context = context; - parent = context.getCurEntry(); - if (parent != null) { - ((CtEntry)parent).child = this; - } - context.setCurEntry(this); - } - - @Override - public void exit(int count, Object... args) throws ErrorEntryFreeException { - trueExit(count, args); - } - - @Override - protected Entry trueExit(int count, Object... args) throws ErrorEntryFreeException { - if (context != null) { - if (context.getCurEntry() != this) { - // Clean previous call stack. - CtEntry e = (CtEntry)context.getCurEntry(); - while (e != null) { - e.exit(count, args); - e = (CtEntry)e.parent; - } - throw new ErrorEntryFreeException( - "The order of entry free is can't be paired with the order of entry"); - } else { - if (chain != null) { - chain.exit(context, resourceWrapper, count, args); - } - // Modify the call stack. - context.setCurEntry(parent); - if (parent != null) { - ((CtEntry)parent).child = null; - } - if (parent == null) { - // Auto-created entry indicates immediate exit. - ContextUtil.exit(); - } - // Clean the reference of context in current entry to avoid duplicate exit. - context = null; - } - } - return parent; - - } - - @Override - public Node getLastNode() { - return parent == null ? null : parent.getCurNode(); - } - } - /** * This class is used for skip context name checking. */ @@ -275,4 +252,10 @@ public class CtSph implements Sph { StringResourceWrapper resource = new StringResourceWrapper(name, type); return entry(resource, count, args); } + + @Override + public AsyncEntry asyncEntry(String name, EntryType type, int count, Object... args) throws BlockException { + StringResourceWrapper resource = new StringResourceWrapper(name, type); + return asyncEntryInternal(resource, count, args); + } } diff --git a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/Sph.java b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/Sph.java index 0d8977f8..8564db56 100755 --- a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/Sph.java +++ b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/Sph.java @@ -27,6 +27,7 @@ import com.alibaba.csp.sentinel.slots.block.BlockException; * @author qinan.qn * @author jialiang.linjl * @author leyou + * @author Eric Zhao */ public interface Sph { @@ -135,11 +136,23 @@ public interface Sph { * @param type the resource is an inbound or an outbound method. This is used * to mark whether it can be blocked when the system is unstable * @param count the count that the resource requires - * @param args the parameters of the method. It can also be counted by setting - * hot parameter rule - * @return entry get. + * @param args the parameters of the method. It can also be counted by setting hot parameter rule + * @return entry get * @throws BlockException if the block criteria is met */ Entry entry(String name, EntryType type, int count, Object... args) throws BlockException; + /** + * Create a protected asynchronous resource. + * + * @param name the unique name for the protected resource + * @param type the resource is an inbound or an outbound method. This is used + * to mark whether it can be blocked when the system is unstable + * @param count the count that the resource requires + * @param args the parameters of the method. It can also be counted by setting hot parameter rule + * @return created asynchronous entry + * @throws BlockException if the block criteria is met + * @since 0.2.0 + */ + AsyncEntry asyncEntry(String name, EntryType type, int count, Object... args) throws BlockException; } diff --git a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/SphU.java b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/SphU.java index 7a8c97e0..ee3f769d 100755 --- a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/SphU.java +++ b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/SphU.java @@ -69,6 +69,7 @@ import com.alibaba.csp.sentinel.slots.system.SystemRuleManager; *

* * @author jialiang.linjl + * @author Eric Zhao * @see SphO */ public class SphU { @@ -200,4 +201,45 @@ public class SphU { public static Entry entry(String name, EntryType type, int count, Object... args) throws BlockException { return Env.sph.entry(name, type, count, args); } + + /** + * Checking all rules about the asynchronous resource. + * + * @param name the unique name of the protected resource + * @throws BlockException if the block criteria is met, eg. when any rule's threshold is exceeded + * @since 0.2.0 + */ + public static AsyncEntry asyncEntry(String name) throws BlockException { + return Env.sph.asyncEntry(name, EntryType.OUT, 1, OBJECTS0); + } + + /** + * Checking all {@link Rule}s about the asynchronous resource. + * + * @param name the unique name for the protected resource + * @param type the resource is an inbound or an outbound method. This is used + * to mark whether it can be blocked when the system is unstable, + * only inbound traffic could be blocked by {@link SystemRule} + * @throws BlockException if the block criteria is met, eg. when any rule's threshold is exceeded + * @since 0.2.0 + */ + public static AsyncEntry asyncEntry(String name, EntryType type) throws BlockException { + return Env.sph.asyncEntry(name, type, 1, OBJECTS0); + } + + /** + * Checking all {@link Rule}s about the asynchronous resource. + * + * @param name the unique name for the protected resource + * @param type the resource is an inbound or an outbound method. This is used + * to mark whether it can be blocked when the system is unstable, + * only inbound traffic could be blocked by {@link SystemRule} + * @param count tokens required + * @param args extra parameters + * @throws BlockException if the block criteria is met, eg. when any rule's threshold is exceeded + * @since 0.2.0 + */ + public static AsyncEntry asyncEntry(String name, EntryType type, int count, Object... args) throws BlockException { + return Env.sph.asyncEntry(name, type, count, args); + } } diff --git a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/context/Context.java b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/context/Context.java index 165c98a3..d970fb52 100755 --- a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/context/Context.java +++ b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/context/Context.java @@ -32,8 +32,9 @@ import com.alibaba.csp.sentinel.slots.nodeselector.NodeSelectorSlot; *
  • the current {@link Entry}: the current invocation point.
  • *
  • the current {@link Node}: the statistics related to the * {@link Entry}.
  • - *
  • the origin:The origin is useful when we want to control different - * invoker/consumer separately. Usually the origin could be the Service Consumer's app name.
  • + *
  • the origin: The origin is useful when we want to control different + * invoker/consumer separately. Usually the origin could be the Service Consumer's app name + * or origin IP.
  • * *

    * Each {@link SphU}#entry() or {@link SphO}#entry() should be in a {@link Context}, @@ -58,7 +59,7 @@ public class Context { /** * Context name. */ - private String name; + private final String name; /** * The entrance node of current invocation tree. @@ -71,14 +72,36 @@ public class Context { private Entry curEntry; /** - * the origin of this context, usually the origin is the Service Consumer's app name. + * The origin of this context (usually indicate different invokers, e.g. service consumer name or origin IP). */ private String origin = ""; + private final boolean async; + + /** + * Create a new async context. + * + * @param entranceNode entrance node of the context + * @param name context name + * @return the new created context + * @since 0.2.0 + */ + public static Context newAsyncContext(DefaultNode entranceNode, String name) { + return new Context(name, entranceNode, true); + } + public Context(DefaultNode entranceNode, String name) { - super(); + this(name, entranceNode, false); + } + + public Context(String name, DefaultNode entranceNode, boolean async) { this.name = name; this.entranceNode = entranceNode; + this.async = async; + } + + public boolean isAsync() { + return async; } public String getName() { @@ -89,24 +112,27 @@ public class Context { return curEntry.getCurNode(); } - public void setCurNode(Node node) { + public Context setCurNode(Node node) { this.curEntry.setCurNode(node); + return this; } public Entry getCurEntry() { return curEntry; } - public void setCurEntry(Entry curEntry) { + public Context setCurEntry(Entry curEntry) { this.curEntry = curEntry; + return this; } public String getOrigin() { return origin; } - public void setOrigin(String origin) { + public Context setOrigin(String origin) { this.origin = origin; + return this; } public double getOriginTotalQps() { diff --git a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/context/ContextUtil.java b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/context/ContextUtil.java index 3b582262..a6175350 100755 --- a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/context/ContextUtil.java +++ b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/context/ContextUtil.java @@ -175,4 +175,45 @@ public class ContextUtil { public static Context getContext() { return contextHolder.get(); } + + /** + *

    + * Replace current context with the provided context. + * This is mainly designed for context switching (e.g. in asynchronous invocation). + *

    + *

    + * Note: When switching context manually, remember to restore the original context. + * For common scenarios, you can use {@link #runOnContext(Context, Runnable)}. + *

    + * + * @param newContext new context to set + * @return old context + * @since 0.2.0 + */ + private static Context replaceContext(Context newContext) { + Context backupContext = contextHolder.get(); + if (newContext == null) { + contextHolder.remove(); + } else { + contextHolder.set(newContext); + } + return backupContext; + } + + /** + * Execute the code within provided context. + * This is mainly designed for context switching (e.g. in asynchronous invocation). + * + * @param context the context + * @param f lambda to run within the context + * @since 0.2.0 + */ + public static void runOnContext(Context context, Runnable f) { + Context curContext = replaceContext(context); + try { + f.run(); + } finally { + replaceContext(curContext); + } + } }