From 62decf04640e875610fb18bacb4a649f3e5402c8 Mon Sep 17 00:00:00 2001 From: Eric Zhao Date: Thu, 20 Sep 2018 23:20:53 +0800 Subject: [PATCH] Some bug fixes and enhancement for async entry support Signed-off-by: Eric Zhao --- .../com/alibaba/csp/sentinel/AsyncEntry.java | 11 +++- .../java/com/alibaba/csp/sentinel/CtSph.java | 57 +++++++++++++++---- 2 files changed, 55 insertions(+), 13 deletions(-) diff --git a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/AsyncEntry.java b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/AsyncEntry.java index 82b49267..84b7c7e3 100644 --- a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/AsyncEntry.java +++ b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/AsyncEntry.java @@ -16,6 +16,8 @@ package com.alibaba.csp.sentinel; import com.alibaba.csp.sentinel.context.Context; +import com.alibaba.csp.sentinel.context.NullContext; +import com.alibaba.csp.sentinel.log.RecordLog; import com.alibaba.csp.sentinel.slotchain.ProcessorSlot; import com.alibaba.csp.sentinel.slotchain.ResourceWrapper; @@ -37,6 +39,9 @@ public class AsyncEntry extends CtEntry { * Remove current entry from local context, but does not exit. */ void cleanCurrentEntryInLocal() { + if (context instanceof NullContext) { + return; + } Context originalContext = context; if (originalContext != null) { Entry curEntry = originalContext.getCurEntry(); @@ -61,11 +66,15 @@ public class AsyncEntry extends CtEntry { */ void initAsyncContext() { if (asyncContext == null) { + if (context instanceof NullContext) { + asyncContext = context; + return; + } this.asyncContext = Context.newAsyncContext(context.getEntranceNode(), context.getName()) .setOrigin(context.getOrigin()) .setCurEntry(this); } else { - throw new IllegalStateException("Duplicate initialize of async context"); + RecordLog.warn("[AsyncEntry] Duplicate initialize of async context for entry: " + resourceWrapper.getName()); } } diff --git a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/CtSph.java b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/CtSph.java index 8749d656..27e7c805 100755 --- a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/CtSph.java +++ b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/CtSph.java @@ -53,42 +53,56 @@ public class CtSph implements Sph { private static final Object LOCK = new Object(); + private AsyncEntry asyncEntryWithNoChain(ResourceWrapper resourceWrapper, Context context) { + AsyncEntry entry = new AsyncEntry(resourceWrapper, null, context); + entry.initAsyncContext(); + // The async entry will be removed from current context as soon as it has been created. + entry.cleanCurrentEntryInLocal(); + return entry; + } + private AsyncEntry asyncEntryInternal(ResourceWrapper resourceWrapper, int count, Object... args) throws BlockException { Context context = ContextUtil.getContext(); if (context instanceof NullContext) { // The {@link NullContext} indicates that the amount of context has exceeded the threshold, // so here init the entry only. No rule checking will be done. - return new AsyncEntry(resourceWrapper, null, context); + return asyncEntryWithNoChain(resourceWrapper, context); } if (context == null) { + // Using default context. context = MyContextUtil.myEnter(Constants.CONTEXT_DEFAULT_NAME, "", resourceWrapper.getType()); } // Global switch is turned off, so no rule checking will be done. if (!Constants.ON) { - return new AsyncEntry(resourceWrapper, null, context); + return asyncEntryWithNoChain(resourceWrapper, context); } ProcessorSlot chain = lookProcessChain(resourceWrapper); // Means processor cache size exceeds {@link Constants.MAX_SLOT_CHAIN_SIZE}, so no rule checking will be done. if (chain == null) { - return new AsyncEntry(resourceWrapper, null, context); + return asyncEntryWithNoChain(resourceWrapper, context); } AsyncEntry asyncEntry = new AsyncEntry(resourceWrapper, chain, context); try { chain.entry(context, resourceWrapper, null, count, args); - // Initiate the async context. + // Initiate the async context only when the entry successfully passed the slot chain. asyncEntry.initAsyncContext(); + // The asynchronous call may take time in background, and current context should not be hanged on it. + // So we need to remove current async entry from current context. + asyncEntry.cleanCurrentEntryInLocal(); } catch (BlockException e1) { - asyncEntry.exit(count, args); + // When blocked, the async entry will be exited on current context. + // The async context will not be initialized. + asyncEntry.exitForContext(context, count, args); throw e1; } catch (Throwable e1) { - RecordLog.info("Sentinel unexpected exception", e1); - } finally { - // The asynchronous call may take time in background, and current context should not be hanged on it. - // So we need to remove current async entry from current context. + // This should not happen, unless there are errors existing in Sentinel internal. + // When this happens, async context is not initialized. + RecordLog.warn("Sentinel unexpected exception in asyncEntryInternal", e1); + asyncEntry.cleanCurrentEntryInLocal(); } return asyncEntry; @@ -119,6 +133,7 @@ public class CtSph implements Sph { } if (context == null) { + // Using default context. context = MyContextUtil.myEnter(Constants.CONTEXT_DEFAULT_NAME, "", resourceWrapper.getType()); } @@ -130,8 +145,8 @@ public class CtSph implements Sph { ProcessorSlot chain = lookProcessChain(resourceWrapper); /* - * Means processor cache size exceeds {@link Constants.MAX_SLOT_CHAIN_SIZE}, so no - * rule checking will be done. + * Means amount of resources (slot chain) exceeds {@link Constants.MAX_SLOT_CHAIN_SIZE}, + * so no rule checking will be done. */ if (chain == null) { return new CtEntry(resourceWrapper, null, context); @@ -165,7 +180,7 @@ public class CtSph implements Sph { * @param resourceWrapper target resource * @return {@link ProcessorSlotChain} of the resource */ - private ProcessorSlot lookProcessChain(ResourceWrapper resourceWrapper) { + ProcessorSlot lookProcessChain(ResourceWrapper resourceWrapper) { ProcessorSlotChain chain = chainMap.get(resourceWrapper); if (chain == null) { synchronized (LOCK) { @@ -188,6 +203,24 @@ public class CtSph implements Sph { return chain; } + /** + * Reset the slot chain map. Only for internal test. + * + * @since 0.2.0 + */ + static void resetChainMap() { + chainMap.clear(); + } + + /** + * Only for internal test. + * + * @since 0.2.0 + */ + static Map getChainMap() { + return chainMap; + } + /** * This class is used for skip context name checking. */