浏览代码

Some bug fixes and enhancement for async entry support

Signed-off-by: Eric Zhao <sczyh16@gmail.com>
master
Eric Zhao 6 年前
父节点
当前提交
62decf0464
共有 2 个文件被更改,包括 55 次插入13 次删除
  1. +10
    -1
      sentinel-core/src/main/java/com/alibaba/csp/sentinel/AsyncEntry.java
  2. +45
    -12
      sentinel-core/src/main/java/com/alibaba/csp/sentinel/CtSph.java

+ 10
- 1
sentinel-core/src/main/java/com/alibaba/csp/sentinel/AsyncEntry.java 查看文件

@@ -16,6 +16,8 @@
package com.alibaba.csp.sentinel;

import com.alibaba.csp.sentinel.context.Context;
import com.alibaba.csp.sentinel.context.NullContext;
import com.alibaba.csp.sentinel.log.RecordLog;
import com.alibaba.csp.sentinel.slotchain.ProcessorSlot;
import com.alibaba.csp.sentinel.slotchain.ResourceWrapper;

@@ -37,6 +39,9 @@ public class AsyncEntry extends CtEntry {
* Remove current entry from local context, but does not exit.
*/
void cleanCurrentEntryInLocal() {
if (context instanceof NullContext) {
return;
}
Context originalContext = context;
if (originalContext != null) {
Entry curEntry = originalContext.getCurEntry();
@@ -61,11 +66,15 @@ public class AsyncEntry extends CtEntry {
*/
void initAsyncContext() {
if (asyncContext == null) {
if (context instanceof NullContext) {
asyncContext = context;
return;
}
this.asyncContext = Context.newAsyncContext(context.getEntranceNode(), context.getName())
.setOrigin(context.getOrigin())
.setCurEntry(this);
} else {
throw new IllegalStateException("Duplicate initialize of async context");
RecordLog.warn("[AsyncEntry] Duplicate initialize of async context for entry: " + resourceWrapper.getName());
}
}



+ 45
- 12
sentinel-core/src/main/java/com/alibaba/csp/sentinel/CtSph.java 查看文件

@@ -53,42 +53,56 @@ public class CtSph implements Sph {

private static final Object LOCK = new Object();

private AsyncEntry asyncEntryWithNoChain(ResourceWrapper resourceWrapper, Context context) {
AsyncEntry entry = new AsyncEntry(resourceWrapper, null, context);
entry.initAsyncContext();
// The async entry will be removed from current context as soon as it has been created.
entry.cleanCurrentEntryInLocal();
return entry;
}

private AsyncEntry asyncEntryInternal(ResourceWrapper resourceWrapper, int count, Object... args) throws BlockException {
Context context = ContextUtil.getContext();
if (context instanceof NullContext) {
// The {@link NullContext} indicates that the amount of context has exceeded the threshold,
// so here init the entry only. No rule checking will be done.
return new AsyncEntry(resourceWrapper, null, context);
return asyncEntryWithNoChain(resourceWrapper, context);
}
if (context == null) {
// Using default context.
context = MyContextUtil.myEnter(Constants.CONTEXT_DEFAULT_NAME, "", resourceWrapper.getType());
}

// Global switch is turned off, so no rule checking will be done.
if (!Constants.ON) {
return new AsyncEntry(resourceWrapper, null, context);
return asyncEntryWithNoChain(resourceWrapper, context);
}

ProcessorSlot<Object> chain = lookProcessChain(resourceWrapper);

// Means processor cache size exceeds {@link Constants.MAX_SLOT_CHAIN_SIZE}, so no rule checking will be done.
if (chain == null) {
return new AsyncEntry(resourceWrapper, null, context);
return asyncEntryWithNoChain(resourceWrapper, context);
}

AsyncEntry asyncEntry = new AsyncEntry(resourceWrapper, chain, context);
try {
chain.entry(context, resourceWrapper, null, count, args);
// Initiate the async context.
// Initiate the async context only when the entry successfully passed the slot chain.
asyncEntry.initAsyncContext();
// The asynchronous call may take time in background, and current context should not be hanged on it.
// So we need to remove current async entry from current context.
asyncEntry.cleanCurrentEntryInLocal();
} catch (BlockException e1) {
asyncEntry.exit(count, args);
// When blocked, the async entry will be exited on current context.
// The async context will not be initialized.
asyncEntry.exitForContext(context, count, args);
throw e1;
} catch (Throwable e1) {
RecordLog.info("Sentinel unexpected exception", e1);
} finally {
// The asynchronous call may take time in background, and current context should not be hanged on it.
// So we need to remove current async entry from current context.
// This should not happen, unless there are errors existing in Sentinel internal.
// When this happens, async context is not initialized.
RecordLog.warn("Sentinel unexpected exception in asyncEntryInternal", e1);
asyncEntry.cleanCurrentEntryInLocal();
}
return asyncEntry;
@@ -119,6 +133,7 @@ public class CtSph implements Sph {
}

if (context == null) {
// Using default context.
context = MyContextUtil.myEnter(Constants.CONTEXT_DEFAULT_NAME, "", resourceWrapper.getType());
}

@@ -130,8 +145,8 @@ public class CtSph implements Sph {
ProcessorSlot<Object> chain = lookProcessChain(resourceWrapper);

/*
* Means processor cache size exceeds {@link Constants.MAX_SLOT_CHAIN_SIZE}, so no
* rule checking will be done.
* Means amount of resources (slot chain) exceeds {@link Constants.MAX_SLOT_CHAIN_SIZE},
* so no rule checking will be done.
*/
if (chain == null) {
return new CtEntry(resourceWrapper, null, context);
@@ -165,7 +180,7 @@ public class CtSph implements Sph {
* @param resourceWrapper target resource
* @return {@link ProcessorSlotChain} of the resource
*/
private ProcessorSlot<Object> lookProcessChain(ResourceWrapper resourceWrapper) {
ProcessorSlot<Object> lookProcessChain(ResourceWrapper resourceWrapper) {
ProcessorSlotChain chain = chainMap.get(resourceWrapper);
if (chain == null) {
synchronized (LOCK) {
@@ -188,6 +203,24 @@ public class CtSph implements Sph {
return chain;
}

/**
* Reset the slot chain map. Only for internal test.
*
* @since 0.2.0
*/
static void resetChainMap() {
chainMap.clear();
}

/**
* Only for internal test.
*
* @since 0.2.0
*/
static Map<ResourceWrapper, ProcessorSlotChain> getChainMap() {
return chainMap;
}

/**
* This class is used for skip context name checking.
*/


正在加载...
取消
保存