diff --git a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/AsyncEntry.java b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/AsyncEntry.java new file mode 100644 index 00000000..82b49267 --- /dev/null +++ b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/AsyncEntry.java @@ -0,0 +1,84 @@ +/* + * Copyright 1999-2018 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.csp.sentinel; + +import com.alibaba.csp.sentinel.context.Context; +import com.alibaba.csp.sentinel.slotchain.ProcessorSlot; +import com.alibaba.csp.sentinel.slotchain.ResourceWrapper; + +/** + * The entry for asynchronous resources. + * + * @author Eric Zhao + * @since 0.2.0 + */ +public class AsyncEntry extends CtEntry { + + private Context asyncContext; + + AsyncEntry(ResourceWrapper resourceWrapper, ProcessorSlot chain, Context context) { + super(resourceWrapper, chain, context); + } + + /** + * Remove current entry from local context, but does not exit. + */ + void cleanCurrentEntryInLocal() { + Context originalContext = context; + if (originalContext != null) { + Entry curEntry = originalContext.getCurEntry(); + if (curEntry == this) { + Entry parent = this.parent; + originalContext.setCurEntry(parent); + if (parent != null) { + ((CtEntry)parent).child = null; + } + } else { + throw new IllegalStateException("Bad async context state"); + } + } + } + + public Context getAsyncContext() { + return asyncContext; + } + + /** + * The async context should not be initialized until the node for current resource has been set to current entry. + */ + void initAsyncContext() { + if (asyncContext == null) { + this.asyncContext = Context.newAsyncContext(context.getEntranceNode(), context.getName()) + .setOrigin(context.getOrigin()) + .setCurEntry(this); + } else { + throw new IllegalStateException("Duplicate initialize of async context"); + } + } + + @Override + protected void clearEntryContext() { + super.clearEntryContext(); + this.asyncContext = null; + } + + @Override + protected Entry trueExit(int count, Object... args) throws ErrorEntryFreeException { + exitForContext(asyncContext, count, args); + + return parent; + } +} diff --git a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/CtEntry.java b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/CtEntry.java new file mode 100644 index 00000000..b01f983c --- /dev/null +++ b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/CtEntry.java @@ -0,0 +1,103 @@ +/* + * Copyright 1999-2018 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.csp.sentinel; + +import com.alibaba.csp.sentinel.context.Context; +import com.alibaba.csp.sentinel.context.ContextUtil; +import com.alibaba.csp.sentinel.node.Node; +import com.alibaba.csp.sentinel.slotchain.ProcessorSlot; +import com.alibaba.csp.sentinel.slotchain.ResourceWrapper; + +/** + * Linked entry within current context. + * + * @author jialiang.linjl + * @author Eric Zhao + */ +class CtEntry extends Entry { + + protected Entry parent = null; + protected Entry child = null; + + protected ProcessorSlot chain; + protected Context context; + + CtEntry(ResourceWrapper resourceWrapper, ProcessorSlot chain, Context context) { + super(resourceWrapper); + this.chain = chain; + this.context = context; + + setUpEntryFor(context); + } + + private void setUpEntryFor(Context context) { + this.parent = context.getCurEntry(); + if (parent != null) { + ((CtEntry)parent).child = this; + } + context.setCurEntry(this); + } + + @Override + public void exit(int count, Object... args) throws ErrorEntryFreeException { + trueExit(count, args); + } + + protected void exitForContext(Context context, int count, Object... args) throws ErrorEntryFreeException { + if (context != null) { + if (context.getCurEntry() != this) { + // Clean previous call stack. + CtEntry e = (CtEntry)context.getCurEntry(); + while (e != null) { + e.exit(count, args); + e = (CtEntry)e.parent; + } + throw new ErrorEntryFreeException("The order of entry free is can't be paired with the order of entry"); + } else { + if (chain != null) { + chain.exit(context, resourceWrapper, count, args); + } + // Restore the call stack. + context.setCurEntry(parent); + if (parent != null) { + ((CtEntry)parent).child = null; + } + if (parent == null) { + // Auto-created entry indicates immediate exit. + ContextUtil.exit(); + } + // Clean the reference of context in current entry to avoid duplicate exit. + clearEntryContext(); + } + } + } + + protected void clearEntryContext() { + this.context = null; + } + + @Override + protected Entry trueExit(int count, Object... args) throws ErrorEntryFreeException { + exitForContext(context, count, args); + + return parent; + } + + @Override + public Node getLastNode() { + return parent == null ? null : parent.getCurNode(); + } +} \ No newline at end of file diff --git a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/CtSph.java b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/CtSph.java index 8c0385bb..18e38693 100755 --- a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/CtSph.java +++ b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/CtSph.java @@ -23,7 +23,6 @@ import com.alibaba.csp.sentinel.log.RecordLog; import com.alibaba.csp.sentinel.context.Context; import com.alibaba.csp.sentinel.context.ContextUtil; import com.alibaba.csp.sentinel.context.NullContext; -import com.alibaba.csp.sentinel.node.Node; import com.alibaba.csp.sentinel.slotchain.MethodResourceWrapper; import com.alibaba.csp.sentinel.slotchain.ProcessorSlot; import com.alibaba.csp.sentinel.slotchain.ProcessorSlotChain; @@ -54,6 +53,46 @@ public class CtSph implements Sph { private static final Object LOCK = new Object(); + private AsyncEntry asyncEntryInternal(ResourceWrapper resourceWrapper, int count, Object... args) throws BlockException { + Context context = ContextUtil.getContext(); + if (context instanceof NullContext) { + // Init the entry only. No rule checking will occur. + return new AsyncEntry(resourceWrapper, null, context); + } + if (context == null) { + context = MyContextUtil.myEnter(Constants.CONTEXT_DEFAULT_NAME, "", resourceWrapper.getType()); + } + + // Global switch is turned off, so no rule checking will be done. + if (!Constants.ON) { + return new AsyncEntry(resourceWrapper, null, context); + } + + ProcessorSlot chain = lookProcessChain(resourceWrapper); + + // Means processor cache size exceeds {@link Constants.MAX_SLOT_CHAIN_SIZE}, so no rule checking will be done. + if (chain == null) { + return new AsyncEntry(resourceWrapper, null, context); + } + + AsyncEntry asyncEntry = new AsyncEntry(resourceWrapper, chain, context); + try { + chain.entry(context, resourceWrapper, null, count, args); + // Initiate the async context. + asyncEntry.initAsyncContext(); + } catch (BlockException e1) { + asyncEntry.exit(count, args); + throw e1; + } catch (Throwable e1) { + RecordLog.info("Sentinel unexpected exception", e1); + } finally { + // The asynchronous call may take time in background, and current context should not be hanged on it. + // So we need to remove current async entry from current context. + asyncEntry.cleanCurrentEntryInLocal(); + } + return asyncEntry; + } + /** * Do all {@link Rule}s checking about the resource. * @@ -146,68 +185,6 @@ public class CtSph implements Sph { return chain; } - private static class CtEntry extends Entry { - - protected Entry parent = null; - protected Entry child = null; - private ProcessorSlot chain; - private Context context; - - CtEntry(ResourceWrapper resourceWrapper, ProcessorSlot chain, Context context) { - super(resourceWrapper); - this.chain = chain; - this.context = context; - parent = context.getCurEntry(); - if (parent != null) { - ((CtEntry)parent).child = this; - } - context.setCurEntry(this); - } - - @Override - public void exit(int count, Object... args) throws ErrorEntryFreeException { - trueExit(count, args); - } - - @Override - protected Entry trueExit(int count, Object... args) throws ErrorEntryFreeException { - if (context != null) { - if (context.getCurEntry() != this) { - // Clean previous call stack. - CtEntry e = (CtEntry)context.getCurEntry(); - while (e != null) { - e.exit(count, args); - e = (CtEntry)e.parent; - } - throw new ErrorEntryFreeException( - "The order of entry free is can't be paired with the order of entry"); - } else { - if (chain != null) { - chain.exit(context, resourceWrapper, count, args); - } - // Modify the call stack. - context.setCurEntry(parent); - if (parent != null) { - ((CtEntry)parent).child = null; - } - if (parent == null) { - // Auto-created entry indicates immediate exit. - ContextUtil.exit(); - } - // Clean the reference of context in current entry to avoid duplicate exit. - context = null; - } - } - return parent; - - } - - @Override - public Node getLastNode() { - return parent == null ? null : parent.getCurNode(); - } - } - /** * This class is used for skip context name checking. */ @@ -276,4 +253,10 @@ public class CtSph implements Sph { StringResourceWrapper resource = new StringResourceWrapper(name, type); return entry(resource, count, args); } + + @Override + public AsyncEntry asyncEntry(String name, EntryType type, int count, Object... args) throws BlockException { + StringResourceWrapper resource = new StringResourceWrapper(name, type); + return asyncEntryInternal(resource, count, args); + } } diff --git a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/Sph.java b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/Sph.java index 0d8977f8..8564db56 100755 --- a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/Sph.java +++ b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/Sph.java @@ -27,6 +27,7 @@ import com.alibaba.csp.sentinel.slots.block.BlockException; * @author qinan.qn * @author jialiang.linjl * @author leyou + * @author Eric Zhao */ public interface Sph { @@ -135,11 +136,23 @@ public interface Sph { * @param type the resource is an inbound or an outbound method. This is used * to mark whether it can be blocked when the system is unstable * @param count the count that the resource requires - * @param args the parameters of the method. It can also be counted by setting - * hot parameter rule - * @return entry get. + * @param args the parameters of the method. It can also be counted by setting hot parameter rule + * @return entry get * @throws BlockException if the block criteria is met */ Entry entry(String name, EntryType type, int count, Object... args) throws BlockException; + /** + * Create a protected asynchronous resource. + * + * @param name the unique name for the protected resource + * @param type the resource is an inbound or an outbound method. This is used + * to mark whether it can be blocked when the system is unstable + * @param count the count that the resource requires + * @param args the parameters of the method. It can also be counted by setting hot parameter rule + * @return created asynchronous entry + * @throws BlockException if the block criteria is met + * @since 0.2.0 + */ + AsyncEntry asyncEntry(String name, EntryType type, int count, Object... args) throws BlockException; } diff --git a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/SphU.java b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/SphU.java index 7a8c97e0..ee3f769d 100755 --- a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/SphU.java +++ b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/SphU.java @@ -69,6 +69,7 @@ import com.alibaba.csp.sentinel.slots.system.SystemRuleManager; *

* * @author jialiang.linjl + * @author Eric Zhao * @see SphO */ public class SphU { @@ -200,4 +201,45 @@ public class SphU { public static Entry entry(String name, EntryType type, int count, Object... args) throws BlockException { return Env.sph.entry(name, type, count, args); } + + /** + * Checking all rules about the asynchronous resource. + * + * @param name the unique name of the protected resource + * @throws BlockException if the block criteria is met, eg. when any rule's threshold is exceeded + * @since 0.2.0 + */ + public static AsyncEntry asyncEntry(String name) throws BlockException { + return Env.sph.asyncEntry(name, EntryType.OUT, 1, OBJECTS0); + } + + /** + * Checking all {@link Rule}s about the asynchronous resource. + * + * @param name the unique name for the protected resource + * @param type the resource is an inbound or an outbound method. This is used + * to mark whether it can be blocked when the system is unstable, + * only inbound traffic could be blocked by {@link SystemRule} + * @throws BlockException if the block criteria is met, eg. when any rule's threshold is exceeded + * @since 0.2.0 + */ + public static AsyncEntry asyncEntry(String name, EntryType type) throws BlockException { + return Env.sph.asyncEntry(name, type, 1, OBJECTS0); + } + + /** + * Checking all {@link Rule}s about the asynchronous resource. + * + * @param name the unique name for the protected resource + * @param type the resource is an inbound or an outbound method. This is used + * to mark whether it can be blocked when the system is unstable, + * only inbound traffic could be blocked by {@link SystemRule} + * @param count tokens required + * @param args extra parameters + * @throws BlockException if the block criteria is met, eg. when any rule's threshold is exceeded + * @since 0.2.0 + */ + public static AsyncEntry asyncEntry(String name, EntryType type, int count, Object... args) throws BlockException { + return Env.sph.asyncEntry(name, type, count, args); + } } diff --git a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/context/Context.java b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/context/Context.java index 165c98a3..d970fb52 100755 --- a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/context/Context.java +++ b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/context/Context.java @@ -32,8 +32,9 @@ import com.alibaba.csp.sentinel.slots.nodeselector.NodeSelectorSlot; *
  • the current {@link Entry}: the current invocation point.
  • *
  • the current {@link Node}: the statistics related to the * {@link Entry}.
  • - *
  • the origin:The origin is useful when we want to control different - * invoker/consumer separately. Usually the origin could be the Service Consumer's app name.
  • + *
  • the origin: The origin is useful when we want to control different + * invoker/consumer separately. Usually the origin could be the Service Consumer's app name + * or origin IP.
  • * *

    * Each {@link SphU}#entry() or {@link SphO}#entry() should be in a {@link Context}, @@ -58,7 +59,7 @@ public class Context { /** * Context name. */ - private String name; + private final String name; /** * The entrance node of current invocation tree. @@ -71,14 +72,36 @@ public class Context { private Entry curEntry; /** - * the origin of this context, usually the origin is the Service Consumer's app name. + * The origin of this context (usually indicate different invokers, e.g. service consumer name or origin IP). */ private String origin = ""; + private final boolean async; + + /** + * Create a new async context. + * + * @param entranceNode entrance node of the context + * @param name context name + * @return the new created context + * @since 0.2.0 + */ + public static Context newAsyncContext(DefaultNode entranceNode, String name) { + return new Context(name, entranceNode, true); + } + public Context(DefaultNode entranceNode, String name) { - super(); + this(name, entranceNode, false); + } + + public Context(String name, DefaultNode entranceNode, boolean async) { this.name = name; this.entranceNode = entranceNode; + this.async = async; + } + + public boolean isAsync() { + return async; } public String getName() { @@ -89,24 +112,27 @@ public class Context { return curEntry.getCurNode(); } - public void setCurNode(Node node) { + public Context setCurNode(Node node) { this.curEntry.setCurNode(node); + return this; } public Entry getCurEntry() { return curEntry; } - public void setCurEntry(Entry curEntry) { + public Context setCurEntry(Entry curEntry) { this.curEntry = curEntry; + return this; } public String getOrigin() { return origin; } - public void setOrigin(String origin) { + public Context setOrigin(String origin) { this.origin = origin; + return this; } public double getOriginTotalQps() { diff --git a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/context/ContextUtil.java b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/context/ContextUtil.java index 3b582262..752ea34e 100755 --- a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/context/ContextUtil.java +++ b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/context/ContextUtil.java @@ -175,4 +175,45 @@ public class ContextUtil { public static Context getContext() { return contextHolder.get(); } + + /** + *

    + * Replace current context with the provided context. + * This is mainly designed for context switching (e.g. in asynchronous invocation). + *

    + *

    + * Note: When switching context manually, remember to restore the original context. + * For common scenarios, you can use {@link #runOnContext(Context, Runnable)}. + *

    + * + * @param newContext new context to set + * @return old context + * @since 0.2.0 + */ + static Context replaceContext(Context newContext) { + Context backupContext = contextHolder.get(); + if (newContext == null) { + contextHolder.remove(); + } else { + contextHolder.set(newContext); + } + return backupContext; + } + + /** + * Execute the code within provided context. + * This is mainly designed for context switching (e.g. in asynchronous invocation). + * + * @param context the context + * @param f lambda to run within the context + * @since 0.2.0 + */ + public static void runOnContext(Context context, Runnable f) { + Context curContext = replaceContext(context); + try { + f.run(); + } finally { + replaceContext(curContext); + } + } } diff --git a/sentinel-core/src/test/java/com/alibaba/csp/sentinel/AsyncEntryIntegrationTest.java b/sentinel-core/src/test/java/com/alibaba/csp/sentinel/AsyncEntryIntegrationTest.java new file mode 100644 index 00000000..589ac19a --- /dev/null +++ b/sentinel-core/src/test/java/com/alibaba/csp/sentinel/AsyncEntryIntegrationTest.java @@ -0,0 +1,224 @@ +/* + * Copyright 1999-2018 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.csp.sentinel; + +import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +import com.alibaba.csp.sentinel.context.ContextUtil; +import com.alibaba.csp.sentinel.node.DefaultNode; +import com.alibaba.csp.sentinel.node.EntranceNode; +import com.alibaba.csp.sentinel.node.Node; +import com.alibaba.csp.sentinel.slots.block.BlockException; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import static org.junit.Assert.*; + +/** + * Integration test for asynchronous entry, including common scenarios. + * + * @author Eric Zhao + */ +public class AsyncEntryIntegrationTest { + + @Before + public void clearContext() { + if (ContextUtil.getContext() != null) { + ContextUtil.getContext().setCurEntry(null); + ContextUtil.exit(); + } + } + + private final ExecutorService pool = Executors.newFixedThreadPool(10); + + private void anotherAsync() { + try { + final AsyncEntry entry = SphU.asyncEntry("test-another-async"); + + runAsync(new Runnable() { + @Override + public void run() { + ContextUtil.runOnContext(entry.getAsyncContext(), new Runnable() { + @Override + public void run() { + try { + TimeUnit.SECONDS.sleep(2); + anotherSyncInAsync(); + System.out.println("Async result: 666"); + } catch (InterruptedException e) { + // Ignore. + } finally { + entry.exit(); + } + } + }); + } + }); + } catch (BlockException ex) { + ex.printStackTrace(); + } + } + + private void fetchSync() { + Entry entry = null; + try { + entry = SphU.entry("test-sync"); + } catch (BlockException ex) { + ex.printStackTrace(); + } finally { + if (entry != null) { + entry.exit(); + } + } + } + + private void fetchSyncInAsync() { + Entry entry = null; + try { + entry = SphU.entry("test-sync-in-async"); + } catch (BlockException ex) { + ex.printStackTrace(); + } finally { + if (entry != null) { + entry.exit(); + } + } + } + + public void anotherSyncInAsync() { + Entry entry = null; + try { + entry = SphU.entry("test-another-in-async"); + } catch (BlockException ex) { + ex.printStackTrace(); + } finally { + if (entry != null) { + entry.exit(); + } + } + } + + private void doAsyncThenSync() { + try { + // First we call an asynchronous resource. + final AsyncEntry entry = SphU.asyncEntry("test-async"); + this.invoke("abc", new Consumer() { + @Override + public void accept(final String resp) { + // The thread is different from original caller thread for async entry. + // So we need to wrap in the async context so that nested sync invocation entry + // can be linked to the parent asynchronous entry. + ContextUtil.runOnContext(entry.getAsyncContext(), new Runnable() { + @Override + public void run() { + try { + // In the callback, we do another async invocation under the async context. + anotherAsync(); + + System.out.println(resp); + + // Then we do a sync entry under current async context. + fetchSyncInAsync(); + } finally { + // Exit the async entry. + entry.exit(); + } + } + }); + } + }); + // Then we call a sync resource. + fetchSync(); + } catch (BlockException ex) { + // Request blocked, handle the exception. + ex.printStackTrace(); + } + } + + @Test + public void testAsyncEntryUnderSyncEntry() throws Exception { + // Expected invocation chain: + // EntranceNode: machine-root + // -EntranceNode: async-context + // --test-top + // ---test-async + // ----test-sync-in-async + // ----test-another-async + // -----test-another-in-async + // ---test-sync + ContextUtil.enter(contextName, origin); + Entry entry = null; + try { + entry = SphU.entry("test-top"); + doAsyncThenSync(); + } catch (BlockException ex) { + ex.printStackTrace(); + } finally { + if (entry != null) { + entry.exit(); + } + ContextUtil.exit(); + } + + TimeUnit.SECONDS.sleep(10); + testTreeCorrect(); + } + + private void testTreeCorrect() { + DefaultNode root = Constants.ROOT; + Set childListForRoot = root.getChildList(); + // TODO: check child tree + } + + @After + public void shutdown() { + pool.shutdownNow(); + ContextUtil.exit(); + } + + private void runAsync(Runnable f) { + // In Java 8, we can use CompletableFuture.runAsync(f) instead. + pool.submit(f); + } + + private void invoke(final String arg, final Consumer handler) { + runAsync(new Runnable() { + @Override + public void run() { + try { + TimeUnit.SECONDS.sleep(3); + String resp = arg + ": " + System.currentTimeMillis(); + handler.accept(resp); + + } catch (Exception ex) { + ex.printStackTrace(); + } + } + }); + } + + private interface Consumer { + void accept(T t); + } + + private final String contextName = "async-context"; + private final String origin = "originA"; +} diff --git a/sentinel-core/src/test/java/com/alibaba/csp/sentinel/AsyncEntryTest.java b/sentinel-core/src/test/java/com/alibaba/csp/sentinel/AsyncEntryTest.java new file mode 100644 index 00000000..c09dfb29 --- /dev/null +++ b/sentinel-core/src/test/java/com/alibaba/csp/sentinel/AsyncEntryTest.java @@ -0,0 +1,76 @@ +package com.alibaba.csp.sentinel; + +import com.alibaba.csp.sentinel.context.Context; +import com.alibaba.csp.sentinel.context.ContextUtil; +import com.alibaba.csp.sentinel.slotchain.StringResourceWrapper; + +import org.junit.Test; + +import static org.junit.Assert.*; + +/** + * Test cases for {@link AsyncEntry}. + * + * @author Eric Zhao + * @since 0.2.0 + */ +public class AsyncEntryTest { + + @Test + public void testCleanCurrentEntryInLocal() { + final String contextName = "abc"; + try { + ContextUtil.enter(contextName); + Context curContext = ContextUtil.getContext(); + AsyncEntry entry = new AsyncEntry(new StringResourceWrapper("testCleanCurrentEntryInLocal", EntryType.OUT), + null, curContext); + + assertSame(entry, curContext.getCurEntry()); + + entry.cleanCurrentEntryInLocal(); + assertNotSame(entry, curContext.getCurEntry()); + } finally { + ContextUtil.getContext().setCurEntry(null); + ContextUtil.exit(); + } + } + + @Test + public void testInitAndGetAsyncContext() { + final String contextName = "abc"; + final String origin = "xxx"; + try { + ContextUtil.enter(contextName, origin); + Context curContext = ContextUtil.getContext(); + AsyncEntry entry = new AsyncEntry(new StringResourceWrapper("testInitAndGetAsyncContext", EntryType.OUT), + null, curContext); + assertNull(entry.getAsyncContext()); + + entry.initAsyncContext(); + System.out.println(curContext.getName()); + System.out.println(curContext.getOrigin()); + + Context asyncContext = entry.getAsyncContext(); + assertNotNull(asyncContext); + assertEquals(contextName, asyncContext.getName()); + assertEquals(origin, asyncContext.getOrigin()); + assertSame(curContext.getEntranceNode(), asyncContext.getEntranceNode()); + assertSame(entry, asyncContext.getCurEntry()); + assertTrue(asyncContext.isAsync()); + } finally { + ContextUtil.getContext().setCurEntry(null); + ContextUtil.exit(); + } + } + + @Test(expected = IllegalStateException.class) + public void testDuplicateInitAsyncContext() { + Context context = new Context(null, "abc"); + AsyncEntry entry = new AsyncEntry(new StringResourceWrapper("testDuplicateInitAsyncContext", EntryType.OUT), + null, context); + entry.initAsyncContext(); + + // Duplicate init. + entry.initAsyncContext(); + } +} \ No newline at end of file diff --git a/sentinel-core/src/test/java/com/alibaba/csp/sentinel/ContextTest.java b/sentinel-core/src/test/java/com/alibaba/csp/sentinel/ContextTest.java deleted file mode 100755 index cb09f44a..00000000 --- a/sentinel-core/src/test/java/com/alibaba/csp/sentinel/ContextTest.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Copyright 1999-2018 Alibaba Group Holding Ltd. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alibaba.csp.sentinel; - -import org.junit.Test; - -import com.alibaba.csp.sentinel.context.ContextUtil; - -/** - * @author jialiang.linjl - */ -public class ContextTest { - - @Test - public void testEnterContext() { - // TODO: rewrite this unit test - ContextUtil.enter("entry", "origin"); - - ContextUtil.exit(); - } - -} diff --git a/sentinel-core/src/test/java/com/alibaba/csp/sentinel/context/ContextTest.java b/sentinel-core/src/test/java/com/alibaba/csp/sentinel/context/ContextTest.java new file mode 100755 index 00000000..09656a61 --- /dev/null +++ b/sentinel-core/src/test/java/com/alibaba/csp/sentinel/context/ContextTest.java @@ -0,0 +1,93 @@ +/* + * Copyright 1999-2018 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.csp.sentinel.context; + +import org.junit.After; +import org.junit.Test; + +import static org.junit.Assert.*; + +/** + * Test cases for {@link Context} and {@link ContextUtil}. + * + * @author jialiang.linjl + * @author Eric Zhao + */ +public class ContextTest { + + @After + public void cleanUp() { + ContextUtil.exit(); + } + + @Test + public void testEnterContext() { + final String contextName = "contextA"; + final String origin = "originA"; + ContextUtil.enter(contextName, origin); + + Context curContext = ContextUtil.getContext(); + assertEquals(contextName, curContext.getName()); + assertEquals(origin, curContext.getOrigin()); + assertFalse(curContext.isAsync()); + + ContextUtil.exit(); + assertNull(ContextUtil.getContext()); + } + + @Test + public void testReplaceContext() { + final String contextName = "contextA"; + final String origin = "originA"; + ContextUtil.enter(contextName, origin); + + Context contextB = Context.newAsyncContext(null, "contextB") + .setOrigin("originA"); + Context contextA = ContextUtil.replaceContext(contextB); + assertEquals(contextName, contextA.getName()); + assertEquals(origin, contextA.getOrigin()); + assertFalse(contextA.isAsync()); + + Context curContextAfterReplace = ContextUtil.getContext(); + assertEquals(contextB.getName(), curContextAfterReplace.getName()); + assertEquals(contextB.getOrigin(), curContextAfterReplace.getOrigin()); + assertTrue(curContextAfterReplace.isAsync()); + + ContextUtil.replaceContext(null); + assertNull(ContextUtil.getContext()); + } + + @Test + public void testRunOnContext() { + final String contextName = "contextA"; + final String origin = "originA"; + ContextUtil.enter(contextName, origin); + + final Context contextB = Context.newAsyncContext(null, "contextB") + .setOrigin("originB"); + assertEquals(contextName, ContextUtil.getContext().getName()); + ContextUtil.runOnContext(contextB, new Runnable() { + @Override + public void run() { + Context curContext = ContextUtil.getContext(); + assertEquals(contextB.getName(), curContext.getName()); + assertEquals(contextB.getOrigin(), curContext.getOrigin()); + assertTrue(curContext.isAsync()); + } + }); + assertEquals(contextName, ContextUtil.getContext().getName()); + } +} diff --git a/sentinel-demo/pom.xml b/sentinel-demo/pom.xml index 31cb99a4..1ca9b159 100755 --- a/sentinel-demo/pom.xml +++ b/sentinel-demo/pom.xml @@ -12,6 +12,11 @@ pom sentinel-demo + + 1.8 + 1.8 + + sentinel-demo-basic sentinel-demo-dynamic-file-rule diff --git a/sentinel-demo/sentinel-demo-basic/src/main/java/com/alibaba/csp/sentinel/demo/AsyncEntryDemo.java b/sentinel-demo/sentinel-demo-basic/src/main/java/com/alibaba/csp/sentinel/demo/AsyncEntryDemo.java new file mode 100644 index 00000000..d9f5c391 --- /dev/null +++ b/sentinel-demo/sentinel-demo-basic/src/main/java/com/alibaba/csp/sentinel/demo/AsyncEntryDemo.java @@ -0,0 +1,218 @@ +/* + * Copyright 1999-2018 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.csp.sentinel.demo; + +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; + +import com.alibaba.csp.sentinel.AsyncEntry; +import com.alibaba.csp.sentinel.Entry; +import com.alibaba.csp.sentinel.SphU; +import com.alibaba.csp.sentinel.context.ContextUtil; +import com.alibaba.csp.sentinel.slots.block.BlockException; +import com.alibaba.csp.sentinel.slots.block.RuleConstant; +import com.alibaba.csp.sentinel.slots.block.flow.FlowRule; +import com.alibaba.csp.sentinel.slots.block.flow.FlowRuleManager; + +/** + * An example for asynchronous entry in Sentinel. + * + * @author Eric Zhao + * @since 0.2.0 + */ +public class AsyncEntryDemo { + + private void invoke(String arg, Consumer handler) { + CompletableFuture.runAsync(() -> { + try { + TimeUnit.SECONDS.sleep(3); + String resp = arg + ": " + System.currentTimeMillis(); + handler.accept(resp); + } catch (Exception ex) { + ex.printStackTrace(); + } + }); + } + + private void anotherAsync() { + try { + final AsyncEntry entry = SphU.asyncEntry("test-another-async"); + + CompletableFuture.runAsync(() -> { + ContextUtil.runOnContext(entry.getAsyncContext(), () -> { + try { + TimeUnit.SECONDS.sleep(2); + anotherSyncInAsync(); + System.out.println("Async result: 666"); + } catch (InterruptedException e) { + // Ignore. + } finally { + entry.exit(); + } + }); + }); + } catch (BlockException ex) { + ex.printStackTrace(); + } + } + + private void fetchSync() { + Entry entry = null; + try { + entry = SphU.entry("test-sync"); + } catch (BlockException ex) { + ex.printStackTrace(); + } finally { + if (entry != null) { + entry.exit(); + } + } + } + + private void fetchSyncInAsync() { + Entry entry = null; + try { + entry = SphU.entry("test-sync-in-async"); + } catch (BlockException ex) { + ex.printStackTrace(); + } finally { + if (entry != null) { + entry.exit(); + } + } + } + + private void anotherSyncInAsync() { + Entry entry = null; + try { + entry = SphU.entry("test-another-sync-in-async"); + } catch (BlockException ex) { + ex.printStackTrace(); + } finally { + if (entry != null) { + entry.exit(); + } + } + } + + private void directlyAsync() { + try { + final AsyncEntry entry = SphU.asyncEntry("test-async-not-nested"); + + CompletableFuture.runAsync(() -> { + // If no nested entry later, we don't have to wrap in `ContextUtil.runOnContext()`. + try { + TimeUnit.SECONDS.sleep(1); + } catch (InterruptedException ex) { + // Ignore. + } finally { + // Exit the async entry. + entry.exit(); + } + }); + } catch (BlockException e) { + // Request blocked, handle the exception. + e.printStackTrace(); + } + } + + private void doAsyncThenSync() { + try { + // First we call an asynchronous resource. + final AsyncEntry entry = SphU.asyncEntry("test-async"); + this.invoke("abc", resp -> { + // The thread is different from original caller thread for async entry. + // So we need to wrap in the async context so that nested sync invocation entry + // can be linked to the parent asynchronous entry. + ContextUtil.runOnContext(entry.getAsyncContext(), () -> { + try { + // In the callback, we do another async invocation several times under the async context. + for (int i = 0; i < 7; i++) { + anotherAsync(); + } + + System.out.println(resp); + + // Then we do a sync entry under current async context. + fetchSyncInAsync(); + } finally { + // Exit the async entry. + entry.exit(); + } + }); + }); + // Then we call a sync resource. + fetchSync(); + } catch (BlockException ex) { + // Request blocked, handle the exception. + ex.printStackTrace(); + } + } + + public static void main(String[] args) { + initFlowRule(); + + AsyncEntryDemo service = new AsyncEntryDemo(); + + // Expected invocation chain: + // + // EntranceNode: machine-root + // -EntranceNode: async-context + // --test-top + // ---test-sync + // ---test-async + // ----test-another-async + // -----test-another-sync-in-async + // ----test-sync-in-async + ContextUtil.enter("async-context", "originA"); + Entry entry = null; + try { + entry = SphU.entry("test-top"); + System.out.println("Do something..."); + service.doAsyncThenSync(); + } catch (BlockException ex) { + // Request blocked, handle the exception. + ex.printStackTrace(); + } finally { + if (entry != null) { + entry.exit(); + } + ContextUtil.exit(); + } + } + + private static void initFlowRule() { + // Rule 1 won't take effect as the limitApp doesn't match. + FlowRule rule1 = new FlowRule() + .setResource("test-another-sync-in-async") + .setLimitApp("originB") + .as(FlowRule.class) + .setCount(4) + .setGrade(RuleConstant.FLOW_GRADE_QPS); + // Rule 2 will take effect. + FlowRule rule2 = new FlowRule() + .setResource("test-another-async") + .setLimitApp("default") + .as(FlowRule.class) + .setCount(5) + .setGrade(RuleConstant.FLOW_GRADE_QPS); + List ruleList = Arrays.asList(rule1, rule2); + FlowRuleManager.loadRules(ruleList); + } +}