- Add several `asyncEntry` method in `Sph` interface and `SphU` class. - Add a new `AsyncEntry` class for asynchronous entry representation. Some refactor for `CtEntry`. - Refactored `CtSph` and implement `asyncEntryInternal` for asynchronous entry. - Add `runOnContext` and `replaceContext` method in `ContextUtil` for context switching.master
@@ -0,0 +1,84 @@ | |||||
/* | |||||
* Copyright 1999-2018 Alibaba Group Holding Ltd. | |||||
* | |||||
* Licensed under the Apache License, Version 2.0 (the "License"); | |||||
* you may not use this file except in compliance with the License. | |||||
* You may obtain a copy of the License at | |||||
* | |||||
* http://www.apache.org/licenses/LICENSE-2.0 | |||||
* | |||||
* Unless required by applicable law or agreed to in writing, software | |||||
* distributed under the License is distributed on an "AS IS" BASIS, | |||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |||||
* See the License for the specific language governing permissions and | |||||
* limitations under the License. | |||||
*/ | |||||
package com.alibaba.csp.sentinel; | |||||
import com.alibaba.csp.sentinel.context.Context; | |||||
import com.alibaba.csp.sentinel.slotchain.ProcessorSlot; | |||||
import com.alibaba.csp.sentinel.slotchain.ResourceWrapper; | |||||
/** | |||||
* The entry for asynchronous resources. | |||||
* | |||||
* @author Eric Zhao | |||||
* @since 0.2.0 | |||||
*/ | |||||
public class AsyncEntry extends CtEntry { | |||||
private Context asyncContext; | |||||
AsyncEntry(ResourceWrapper resourceWrapper, ProcessorSlot<Object> chain, Context context) { | |||||
super(resourceWrapper, chain, context); | |||||
} | |||||
/** | |||||
* Remove current entry from local context, but does not exit. | |||||
*/ | |||||
void cleanCurrentEntryInLocal() { | |||||
Context originalContext = context; | |||||
if (originalContext != null) { | |||||
Entry curEntry = originalContext.getCurEntry(); | |||||
if (curEntry == this) { | |||||
Entry parent = this.parent; | |||||
originalContext.setCurEntry(parent); | |||||
if (parent != null) { | |||||
((CtEntry)parent).child = null; | |||||
} | |||||
} else { | |||||
throw new IllegalStateException("Bad async context state"); | |||||
} | |||||
} | |||||
} | |||||
public Context getAsyncContext() { | |||||
return asyncContext; | |||||
} | |||||
/** | |||||
* The async context should not be initialized until the node for current resource has been set to current entry. | |||||
*/ | |||||
void initAsyncContext() { | |||||
if (asyncContext == null) { | |||||
this.asyncContext = Context.newAsyncContext(context.getEntranceNode(), context.getName()) | |||||
.setOrigin(context.getOrigin()) | |||||
.setCurEntry(this); | |||||
} else { | |||||
throw new IllegalStateException("Duplicate initialize of async context"); | |||||
} | |||||
} | |||||
@Override | |||||
protected void clearEntryContext() { | |||||
super.clearEntryContext(); | |||||
this.asyncContext = null; | |||||
} | |||||
@Override | |||||
protected Entry trueExit(int count, Object... args) throws ErrorEntryFreeException { | |||||
exitForContext(asyncContext, count, args); | |||||
return parent; | |||||
} | |||||
} |
@@ -0,0 +1,103 @@ | |||||
/* | |||||
* Copyright 1999-2018 Alibaba Group Holding Ltd. | |||||
* | |||||
* Licensed under the Apache License, Version 2.0 (the "License"); | |||||
* you may not use this file except in compliance with the License. | |||||
* You may obtain a copy of the License at | |||||
* | |||||
* http://www.apache.org/licenses/LICENSE-2.0 | |||||
* | |||||
* Unless required by applicable law or agreed to in writing, software | |||||
* distributed under the License is distributed on an "AS IS" BASIS, | |||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |||||
* See the License for the specific language governing permissions and | |||||
* limitations under the License. | |||||
*/ | |||||
package com.alibaba.csp.sentinel; | |||||
import com.alibaba.csp.sentinel.context.Context; | |||||
import com.alibaba.csp.sentinel.context.ContextUtil; | |||||
import com.alibaba.csp.sentinel.node.Node; | |||||
import com.alibaba.csp.sentinel.slotchain.ProcessorSlot; | |||||
import com.alibaba.csp.sentinel.slotchain.ResourceWrapper; | |||||
/** | |||||
* Linked entry within current context. | |||||
* | |||||
* @author jialiang.linjl | |||||
* @author Eric Zhao | |||||
*/ | |||||
class CtEntry extends Entry { | |||||
protected Entry parent = null; | |||||
protected Entry child = null; | |||||
protected ProcessorSlot<Object> chain; | |||||
protected Context context; | |||||
CtEntry(ResourceWrapper resourceWrapper, ProcessorSlot<Object> chain, Context context) { | |||||
super(resourceWrapper); | |||||
this.chain = chain; | |||||
this.context = context; | |||||
setUpEntryFor(context); | |||||
} | |||||
private void setUpEntryFor(Context context) { | |||||
this.parent = context.getCurEntry(); | |||||
if (parent != null) { | |||||
((CtEntry)parent).child = this; | |||||
} | |||||
context.setCurEntry(this); | |||||
} | |||||
@Override | |||||
public void exit(int count, Object... args) throws ErrorEntryFreeException { | |||||
trueExit(count, args); | |||||
} | |||||
protected void exitForContext(Context context, int count, Object... args) throws ErrorEntryFreeException { | |||||
if (context != null) { | |||||
if (context.getCurEntry() != this) { | |||||
// Clean previous call stack. | |||||
CtEntry e = (CtEntry)context.getCurEntry(); | |||||
while (e != null) { | |||||
e.exit(count, args); | |||||
e = (CtEntry)e.parent; | |||||
} | |||||
throw new ErrorEntryFreeException("The order of entry free is can't be paired with the order of entry"); | |||||
} else { | |||||
if (chain != null) { | |||||
chain.exit(context, resourceWrapper, count, args); | |||||
} | |||||
// Restore the call stack. | |||||
context.setCurEntry(parent); | |||||
if (parent != null) { | |||||
((CtEntry)parent).child = null; | |||||
} | |||||
if (parent == null) { | |||||
// Auto-created entry indicates immediate exit. | |||||
ContextUtil.exit(); | |||||
} | |||||
// Clean the reference of context in current entry to avoid duplicate exit. | |||||
clearEntryContext(); | |||||
} | |||||
} | |||||
} | |||||
protected void clearEntryContext() { | |||||
this.context = null; | |||||
} | |||||
@Override | |||||
protected Entry trueExit(int count, Object... args) throws ErrorEntryFreeException { | |||||
exitForContext(context, count, args); | |||||
return parent; | |||||
} | |||||
@Override | |||||
public Node getLastNode() { | |||||
return parent == null ? null : parent.getCurNode(); | |||||
} | |||||
} |
@@ -23,7 +23,6 @@ import com.alibaba.csp.sentinel.log.RecordLog; | |||||
import com.alibaba.csp.sentinel.context.Context; | import com.alibaba.csp.sentinel.context.Context; | ||||
import com.alibaba.csp.sentinel.context.ContextUtil; | import com.alibaba.csp.sentinel.context.ContextUtil; | ||||
import com.alibaba.csp.sentinel.context.NullContext; | import com.alibaba.csp.sentinel.context.NullContext; | ||||
import com.alibaba.csp.sentinel.node.Node; | |||||
import com.alibaba.csp.sentinel.slotchain.MethodResourceWrapper; | import com.alibaba.csp.sentinel.slotchain.MethodResourceWrapper; | ||||
import com.alibaba.csp.sentinel.slotchain.ProcessorSlot; | import com.alibaba.csp.sentinel.slotchain.ProcessorSlot; | ||||
import com.alibaba.csp.sentinel.slotchain.ProcessorSlotChain; | import com.alibaba.csp.sentinel.slotchain.ProcessorSlotChain; | ||||
@@ -54,6 +53,46 @@ public class CtSph implements Sph { | |||||
private static final Object LOCK = new Object(); | private static final Object LOCK = new Object(); | ||||
private AsyncEntry asyncEntryInternal(ResourceWrapper resourceWrapper, int count, Object... args) throws BlockException { | |||||
Context context = ContextUtil.getContext(); | |||||
if (context instanceof NullContext) { | |||||
// Init the entry only. No rule checking will occur. | |||||
return new AsyncEntry(resourceWrapper, null, context); | |||||
} | |||||
if (context == null) { | |||||
context = MyContextUtil.myEnter(Constants.CONTEXT_DEFAULT_NAME, "", resourceWrapper.getType()); | |||||
} | |||||
// Global switch is turned off, so no rule checking will be done. | |||||
if (!Constants.ON) { | |||||
return new AsyncEntry(resourceWrapper, null, context); | |||||
} | |||||
ProcessorSlot<Object> chain = lookProcessChain(resourceWrapper); | |||||
// Means processor cache size exceeds {@link Constants.MAX_SLOT_CHAIN_SIZE}, so no rule checking will be done. | |||||
if (chain == null) { | |||||
return new AsyncEntry(resourceWrapper, null, context); | |||||
} | |||||
AsyncEntry asyncEntry = new AsyncEntry(resourceWrapper, chain, context); | |||||
try { | |||||
chain.entry(context, resourceWrapper, null, count, args); | |||||
// Initiate the async context. | |||||
asyncEntry.initAsyncContext(); | |||||
} catch (BlockException e1) { | |||||
asyncEntry.exit(count, args); | |||||
throw e1; | |||||
} catch (Throwable e1) { | |||||
RecordLog.info("Sentinel unexpected exception", e1); | |||||
} finally { | |||||
// The asynchronous call may take time in background, and current context should not be hanged on it. | |||||
// So we need to remove current async entry from current context. | |||||
asyncEntry.cleanCurrentEntryInLocal(); | |||||
} | |||||
return asyncEntry; | |||||
} | |||||
/** | /** | ||||
* Do all {@link Rule}s checking about the resource. | * Do all {@link Rule}s checking about the resource. | ||||
* | * | ||||
@@ -146,68 +185,6 @@ public class CtSph implements Sph { | |||||
return chain; | return chain; | ||||
} | } | ||||
private static class CtEntry extends Entry { | |||||
protected Entry parent = null; | |||||
protected Entry child = null; | |||||
private ProcessorSlot<Object> chain; | |||||
private Context context; | |||||
CtEntry(ResourceWrapper resourceWrapper, ProcessorSlot<Object> chain, Context context) { | |||||
super(resourceWrapper); | |||||
this.chain = chain; | |||||
this.context = context; | |||||
parent = context.getCurEntry(); | |||||
if (parent != null) { | |||||
((CtEntry)parent).child = this; | |||||
} | |||||
context.setCurEntry(this); | |||||
} | |||||
@Override | |||||
public void exit(int count, Object... args) throws ErrorEntryFreeException { | |||||
trueExit(count, args); | |||||
} | |||||
@Override | |||||
protected Entry trueExit(int count, Object... args) throws ErrorEntryFreeException { | |||||
if (context != null) { | |||||
if (context.getCurEntry() != this) { | |||||
// Clean previous call stack. | |||||
CtEntry e = (CtEntry)context.getCurEntry(); | |||||
while (e != null) { | |||||
e.exit(count, args); | |||||
e = (CtEntry)e.parent; | |||||
} | |||||
throw new ErrorEntryFreeException( | |||||
"The order of entry free is can't be paired with the order of entry"); | |||||
} else { | |||||
if (chain != null) { | |||||
chain.exit(context, resourceWrapper, count, args); | |||||
} | |||||
// Modify the call stack. | |||||
context.setCurEntry(parent); | |||||
if (parent != null) { | |||||
((CtEntry)parent).child = null; | |||||
} | |||||
if (parent == null) { | |||||
// Auto-created entry indicates immediate exit. | |||||
ContextUtil.exit(); | |||||
} | |||||
// Clean the reference of context in current entry to avoid duplicate exit. | |||||
context = null; | |||||
} | |||||
} | |||||
return parent; | |||||
} | |||||
@Override | |||||
public Node getLastNode() { | |||||
return parent == null ? null : parent.getCurNode(); | |||||
} | |||||
} | |||||
/** | /** | ||||
* This class is used for skip context name checking. | * This class is used for skip context name checking. | ||||
*/ | */ | ||||
@@ -276,4 +253,10 @@ public class CtSph implements Sph { | |||||
StringResourceWrapper resource = new StringResourceWrapper(name, type); | StringResourceWrapper resource = new StringResourceWrapper(name, type); | ||||
return entry(resource, count, args); | return entry(resource, count, args); | ||||
} | } | ||||
@Override | |||||
public AsyncEntry asyncEntry(String name, EntryType type, int count, Object... args) throws BlockException { | |||||
StringResourceWrapper resource = new StringResourceWrapper(name, type); | |||||
return asyncEntryInternal(resource, count, args); | |||||
} | |||||
} | } |
@@ -27,6 +27,7 @@ import com.alibaba.csp.sentinel.slots.block.BlockException; | |||||
* @author qinan.qn | * @author qinan.qn | ||||
* @author jialiang.linjl | * @author jialiang.linjl | ||||
* @author leyou | * @author leyou | ||||
* @author Eric Zhao | |||||
*/ | */ | ||||
public interface Sph { | public interface Sph { | ||||
@@ -135,11 +136,23 @@ public interface Sph { | |||||
* @param type the resource is an inbound or an outbound method. This is used | * @param type the resource is an inbound or an outbound method. This is used | ||||
* to mark whether it can be blocked when the system is unstable | * to mark whether it can be blocked when the system is unstable | ||||
* @param count the count that the resource requires | * @param count the count that the resource requires | ||||
* @param args the parameters of the method. It can also be counted by setting | |||||
* hot parameter rule | |||||
* @return entry get. | |||||
* @param args the parameters of the method. It can also be counted by setting hot parameter rule | |||||
* @return entry get | |||||
* @throws BlockException if the block criteria is met | * @throws BlockException if the block criteria is met | ||||
*/ | */ | ||||
Entry entry(String name, EntryType type, int count, Object... args) throws BlockException; | Entry entry(String name, EntryType type, int count, Object... args) throws BlockException; | ||||
/** | |||||
* Create a protected asynchronous resource. | |||||
* | |||||
* @param name the unique name for the protected resource | |||||
* @param type the resource is an inbound or an outbound method. This is used | |||||
* to mark whether it can be blocked when the system is unstable | |||||
* @param count the count that the resource requires | |||||
* @param args the parameters of the method. It can also be counted by setting hot parameter rule | |||||
* @return created asynchronous entry | |||||
* @throws BlockException if the block criteria is met | |||||
* @since 0.2.0 | |||||
*/ | |||||
AsyncEntry asyncEntry(String name, EntryType type, int count, Object... args) throws BlockException; | |||||
} | } |
@@ -69,6 +69,7 @@ import com.alibaba.csp.sentinel.slots.system.SystemRuleManager; | |||||
* </p> | * </p> | ||||
* | * | ||||
* @author jialiang.linjl | * @author jialiang.linjl | ||||
* @author Eric Zhao | |||||
* @see SphO | * @see SphO | ||||
*/ | */ | ||||
public class SphU { | public class SphU { | ||||
@@ -200,4 +201,45 @@ public class SphU { | |||||
public static Entry entry(String name, EntryType type, int count, Object... args) throws BlockException { | public static Entry entry(String name, EntryType type, int count, Object... args) throws BlockException { | ||||
return Env.sph.entry(name, type, count, args); | return Env.sph.entry(name, type, count, args); | ||||
} | } | ||||
/** | |||||
* Checking all rules about the asynchronous resource. | |||||
* | |||||
* @param name the unique name of the protected resource | |||||
* @throws BlockException if the block criteria is met, eg. when any rule's threshold is exceeded | |||||
* @since 0.2.0 | |||||
*/ | |||||
public static AsyncEntry asyncEntry(String name) throws BlockException { | |||||
return Env.sph.asyncEntry(name, EntryType.OUT, 1, OBJECTS0); | |||||
} | |||||
/** | |||||
* Checking all {@link Rule}s about the asynchronous resource. | |||||
* | |||||
* @param name the unique name for the protected resource | |||||
* @param type the resource is an inbound or an outbound method. This is used | |||||
* to mark whether it can be blocked when the system is unstable, | |||||
* only inbound traffic could be blocked by {@link SystemRule} | |||||
* @throws BlockException if the block criteria is met, eg. when any rule's threshold is exceeded | |||||
* @since 0.2.0 | |||||
*/ | |||||
public static AsyncEntry asyncEntry(String name, EntryType type) throws BlockException { | |||||
return Env.sph.asyncEntry(name, type, 1, OBJECTS0); | |||||
} | |||||
/** | |||||
* Checking all {@link Rule}s about the asynchronous resource. | |||||
* | |||||
* @param name the unique name for the protected resource | |||||
* @param type the resource is an inbound or an outbound method. This is used | |||||
* to mark whether it can be blocked when the system is unstable, | |||||
* only inbound traffic could be blocked by {@link SystemRule} | |||||
* @param count tokens required | |||||
* @param args extra parameters | |||||
* @throws BlockException if the block criteria is met, eg. when any rule's threshold is exceeded | |||||
* @since 0.2.0 | |||||
*/ | |||||
public static AsyncEntry asyncEntry(String name, EntryType type, int count, Object... args) throws BlockException { | |||||
return Env.sph.asyncEntry(name, type, count, args); | |||||
} | |||||
} | } |
@@ -32,8 +32,9 @@ import com.alibaba.csp.sentinel.slots.nodeselector.NodeSelectorSlot; | |||||
* <li>the current {@link Entry}: the current invocation point.</li> | * <li>the current {@link Entry}: the current invocation point.</li> | ||||
* <li>the current {@link Node}: the statistics related to the | * <li>the current {@link Node}: the statistics related to the | ||||
* {@link Entry}.</li> | * {@link Entry}.</li> | ||||
* <li>the origin:The origin is useful when we want to control different | |||||
* invoker/consumer separately. Usually the origin could be the Service Consumer's app name. </li> | |||||
* <li>the origin: The origin is useful when we want to control different | |||||
* invoker/consumer separately. Usually the origin could be the Service Consumer's app name | |||||
* or origin IP. </li> | |||||
* </ul> | * </ul> | ||||
* <p> | * <p> | ||||
* Each {@link SphU}#entry() or {@link SphO}#entry() should be in a {@link Context}, | * Each {@link SphU}#entry() or {@link SphO}#entry() should be in a {@link Context}, | ||||
@@ -58,7 +59,7 @@ public class Context { | |||||
/** | /** | ||||
* Context name. | * Context name. | ||||
*/ | */ | ||||
private String name; | |||||
private final String name; | |||||
/** | /** | ||||
* The entrance node of current invocation tree. | * The entrance node of current invocation tree. | ||||
@@ -71,14 +72,36 @@ public class Context { | |||||
private Entry curEntry; | private Entry curEntry; | ||||
/** | /** | ||||
* the origin of this context, usually the origin is the Service Consumer's app name. | |||||
* The origin of this context (usually indicate different invokers, e.g. service consumer name or origin IP). | |||||
*/ | */ | ||||
private String origin = ""; | private String origin = ""; | ||||
private final boolean async; | |||||
/** | |||||
* Create a new async context. | |||||
* | |||||
* @param entranceNode entrance node of the context | |||||
* @param name context name | |||||
* @return the new created context | |||||
* @since 0.2.0 | |||||
*/ | |||||
public static Context newAsyncContext(DefaultNode entranceNode, String name) { | |||||
return new Context(name, entranceNode, true); | |||||
} | |||||
public Context(DefaultNode entranceNode, String name) { | public Context(DefaultNode entranceNode, String name) { | ||||
super(); | |||||
this(name, entranceNode, false); | |||||
} | |||||
public Context(String name, DefaultNode entranceNode, boolean async) { | |||||
this.name = name; | this.name = name; | ||||
this.entranceNode = entranceNode; | this.entranceNode = entranceNode; | ||||
this.async = async; | |||||
} | |||||
public boolean isAsync() { | |||||
return async; | |||||
} | } | ||||
public String getName() { | public String getName() { | ||||
@@ -89,24 +112,27 @@ public class Context { | |||||
return curEntry.getCurNode(); | return curEntry.getCurNode(); | ||||
} | } | ||||
public void setCurNode(Node node) { | |||||
public Context setCurNode(Node node) { | |||||
this.curEntry.setCurNode(node); | this.curEntry.setCurNode(node); | ||||
return this; | |||||
} | } | ||||
public Entry getCurEntry() { | public Entry getCurEntry() { | ||||
return curEntry; | return curEntry; | ||||
} | } | ||||
public void setCurEntry(Entry curEntry) { | |||||
public Context setCurEntry(Entry curEntry) { | |||||
this.curEntry = curEntry; | this.curEntry = curEntry; | ||||
return this; | |||||
} | } | ||||
public String getOrigin() { | public String getOrigin() { | ||||
return origin; | return origin; | ||||
} | } | ||||
public void setOrigin(String origin) { | |||||
public Context setOrigin(String origin) { | |||||
this.origin = origin; | this.origin = origin; | ||||
return this; | |||||
} | } | ||||
public double getOriginTotalQps() { | public double getOriginTotalQps() { | ||||
@@ -175,4 +175,45 @@ public class ContextUtil { | |||||
public static Context getContext() { | public static Context getContext() { | ||||
return contextHolder.get(); | return contextHolder.get(); | ||||
} | } | ||||
/** | |||||
* <p> | |||||
* Replace current context with the provided context. | |||||
* This is mainly designed for context switching (e.g. in asynchronous invocation). | |||||
* </p> | |||||
* <p> | |||||
* Note: When switching context manually, remember to restore the original context. | |||||
* For common scenarios, you can use {@link #runOnContext(Context, Runnable)}. | |||||
* </p> | |||||
* | |||||
* @param newContext new context to set | |||||
* @return old context | |||||
* @since 0.2.0 | |||||
*/ | |||||
static Context replaceContext(Context newContext) { | |||||
Context backupContext = contextHolder.get(); | |||||
if (newContext == null) { | |||||
contextHolder.remove(); | |||||
} else { | |||||
contextHolder.set(newContext); | |||||
} | |||||
return backupContext; | |||||
} | |||||
/** | |||||
* Execute the code within provided context. | |||||
* This is mainly designed for context switching (e.g. in asynchronous invocation). | |||||
* | |||||
* @param context the context | |||||
* @param f lambda to run within the context | |||||
* @since 0.2.0 | |||||
*/ | |||||
public static void runOnContext(Context context, Runnable f) { | |||||
Context curContext = replaceContext(context); | |||||
try { | |||||
f.run(); | |||||
} finally { | |||||
replaceContext(curContext); | |||||
} | |||||
} | |||||
} | } |
@@ -0,0 +1,224 @@ | |||||
/* | |||||
* Copyright 1999-2018 Alibaba Group Holding Ltd. | |||||
* | |||||
* Licensed under the Apache License, Version 2.0 (the "License"); | |||||
* you may not use this file except in compliance with the License. | |||||
* You may obtain a copy of the License at | |||||
* | |||||
* http://www.apache.org/licenses/LICENSE-2.0 | |||||
* | |||||
* Unless required by applicable law or agreed to in writing, software | |||||
* distributed under the License is distributed on an "AS IS" BASIS, | |||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |||||
* See the License for the specific language governing permissions and | |||||
* limitations under the License. | |||||
*/ | |||||
package com.alibaba.csp.sentinel; | |||||
import java.util.Set; | |||||
import java.util.concurrent.ExecutorService; | |||||
import java.util.concurrent.Executors; | |||||
import java.util.concurrent.TimeUnit; | |||||
import com.alibaba.csp.sentinel.context.ContextUtil; | |||||
import com.alibaba.csp.sentinel.node.DefaultNode; | |||||
import com.alibaba.csp.sentinel.node.EntranceNode; | |||||
import com.alibaba.csp.sentinel.node.Node; | |||||
import com.alibaba.csp.sentinel.slots.block.BlockException; | |||||
import org.junit.After; | |||||
import org.junit.Before; | |||||
import org.junit.Test; | |||||
import static org.junit.Assert.*; | |||||
/** | |||||
* Integration test for asynchronous entry, including common scenarios. | |||||
* | |||||
* @author Eric Zhao | |||||
*/ | |||||
public class AsyncEntryIntegrationTest { | |||||
@Before | |||||
public void clearContext() { | |||||
if (ContextUtil.getContext() != null) { | |||||
ContextUtil.getContext().setCurEntry(null); | |||||
ContextUtil.exit(); | |||||
} | |||||
} | |||||
private final ExecutorService pool = Executors.newFixedThreadPool(10); | |||||
private void anotherAsync() { | |||||
try { | |||||
final AsyncEntry entry = SphU.asyncEntry("test-another-async"); | |||||
runAsync(new Runnable() { | |||||
@Override | |||||
public void run() { | |||||
ContextUtil.runOnContext(entry.getAsyncContext(), new Runnable() { | |||||
@Override | |||||
public void run() { | |||||
try { | |||||
TimeUnit.SECONDS.sleep(2); | |||||
anotherSyncInAsync(); | |||||
System.out.println("Async result: 666"); | |||||
} catch (InterruptedException e) { | |||||
// Ignore. | |||||
} finally { | |||||
entry.exit(); | |||||
} | |||||
} | |||||
}); | |||||
} | |||||
}); | |||||
} catch (BlockException ex) { | |||||
ex.printStackTrace(); | |||||
} | |||||
} | |||||
private void fetchSync() { | |||||
Entry entry = null; | |||||
try { | |||||
entry = SphU.entry("test-sync"); | |||||
} catch (BlockException ex) { | |||||
ex.printStackTrace(); | |||||
} finally { | |||||
if (entry != null) { | |||||
entry.exit(); | |||||
} | |||||
} | |||||
} | |||||
private void fetchSyncInAsync() { | |||||
Entry entry = null; | |||||
try { | |||||
entry = SphU.entry("test-sync-in-async"); | |||||
} catch (BlockException ex) { | |||||
ex.printStackTrace(); | |||||
} finally { | |||||
if (entry != null) { | |||||
entry.exit(); | |||||
} | |||||
} | |||||
} | |||||
public void anotherSyncInAsync() { | |||||
Entry entry = null; | |||||
try { | |||||
entry = SphU.entry("test-another-in-async"); | |||||
} catch (BlockException ex) { | |||||
ex.printStackTrace(); | |||||
} finally { | |||||
if (entry != null) { | |||||
entry.exit(); | |||||
} | |||||
} | |||||
} | |||||
private void doAsyncThenSync() { | |||||
try { | |||||
// First we call an asynchronous resource. | |||||
final AsyncEntry entry = SphU.asyncEntry("test-async"); | |||||
this.invoke("abc", new Consumer<String>() { | |||||
@Override | |||||
public void accept(final String resp) { | |||||
// The thread is different from original caller thread for async entry. | |||||
// So we need to wrap in the async context so that nested sync invocation entry | |||||
// can be linked to the parent asynchronous entry. | |||||
ContextUtil.runOnContext(entry.getAsyncContext(), new Runnable() { | |||||
@Override | |||||
public void run() { | |||||
try { | |||||
// In the callback, we do another async invocation under the async context. | |||||
anotherAsync(); | |||||
System.out.println(resp); | |||||
// Then we do a sync entry under current async context. | |||||
fetchSyncInAsync(); | |||||
} finally { | |||||
// Exit the async entry. | |||||
entry.exit(); | |||||
} | |||||
} | |||||
}); | |||||
} | |||||
}); | |||||
// Then we call a sync resource. | |||||
fetchSync(); | |||||
} catch (BlockException ex) { | |||||
// Request blocked, handle the exception. | |||||
ex.printStackTrace(); | |||||
} | |||||
} | |||||
@Test | |||||
public void testAsyncEntryUnderSyncEntry() throws Exception { | |||||
// Expected invocation chain: | |||||
// EntranceNode: machine-root | |||||
// -EntranceNode: async-context | |||||
// --test-top | |||||
// ---test-async | |||||
// ----test-sync-in-async | |||||
// ----test-another-async | |||||
// -----test-another-in-async | |||||
// ---test-sync | |||||
ContextUtil.enter(contextName, origin); | |||||
Entry entry = null; | |||||
try { | |||||
entry = SphU.entry("test-top"); | |||||
doAsyncThenSync(); | |||||
} catch (BlockException ex) { | |||||
ex.printStackTrace(); | |||||
} finally { | |||||
if (entry != null) { | |||||
entry.exit(); | |||||
} | |||||
ContextUtil.exit(); | |||||
} | |||||
TimeUnit.SECONDS.sleep(10); | |||||
testTreeCorrect(); | |||||
} | |||||
private void testTreeCorrect() { | |||||
DefaultNode root = Constants.ROOT; | |||||
Set<Node> childListForRoot = root.getChildList(); | |||||
// TODO: check child tree | |||||
} | |||||
@After | |||||
public void shutdown() { | |||||
pool.shutdownNow(); | |||||
ContextUtil.exit(); | |||||
} | |||||
private void runAsync(Runnable f) { | |||||
// In Java 8, we can use CompletableFuture.runAsync(f) instead. | |||||
pool.submit(f); | |||||
} | |||||
private void invoke(final String arg, final Consumer<String> handler) { | |||||
runAsync(new Runnable() { | |||||
@Override | |||||
public void run() { | |||||
try { | |||||
TimeUnit.SECONDS.sleep(3); | |||||
String resp = arg + ": " + System.currentTimeMillis(); | |||||
handler.accept(resp); | |||||
} catch (Exception ex) { | |||||
ex.printStackTrace(); | |||||
} | |||||
} | |||||
}); | |||||
} | |||||
private interface Consumer<T> { | |||||
void accept(T t); | |||||
} | |||||
private final String contextName = "async-context"; | |||||
private final String origin = "originA"; | |||||
} |
@@ -0,0 +1,76 @@ | |||||
package com.alibaba.csp.sentinel; | |||||
import com.alibaba.csp.sentinel.context.Context; | |||||
import com.alibaba.csp.sentinel.context.ContextUtil; | |||||
import com.alibaba.csp.sentinel.slotchain.StringResourceWrapper; | |||||
import org.junit.Test; | |||||
import static org.junit.Assert.*; | |||||
/** | |||||
* Test cases for {@link AsyncEntry}. | |||||
* | |||||
* @author Eric Zhao | |||||
* @since 0.2.0 | |||||
*/ | |||||
public class AsyncEntryTest { | |||||
@Test | |||||
public void testCleanCurrentEntryInLocal() { | |||||
final String contextName = "abc"; | |||||
try { | |||||
ContextUtil.enter(contextName); | |||||
Context curContext = ContextUtil.getContext(); | |||||
AsyncEntry entry = new AsyncEntry(new StringResourceWrapper("testCleanCurrentEntryInLocal", EntryType.OUT), | |||||
null, curContext); | |||||
assertSame(entry, curContext.getCurEntry()); | |||||
entry.cleanCurrentEntryInLocal(); | |||||
assertNotSame(entry, curContext.getCurEntry()); | |||||
} finally { | |||||
ContextUtil.getContext().setCurEntry(null); | |||||
ContextUtil.exit(); | |||||
} | |||||
} | |||||
@Test | |||||
public void testInitAndGetAsyncContext() { | |||||
final String contextName = "abc"; | |||||
final String origin = "xxx"; | |||||
try { | |||||
ContextUtil.enter(contextName, origin); | |||||
Context curContext = ContextUtil.getContext(); | |||||
AsyncEntry entry = new AsyncEntry(new StringResourceWrapper("testInitAndGetAsyncContext", EntryType.OUT), | |||||
null, curContext); | |||||
assertNull(entry.getAsyncContext()); | |||||
entry.initAsyncContext(); | |||||
System.out.println(curContext.getName()); | |||||
System.out.println(curContext.getOrigin()); | |||||
Context asyncContext = entry.getAsyncContext(); | |||||
assertNotNull(asyncContext); | |||||
assertEquals(contextName, asyncContext.getName()); | |||||
assertEquals(origin, asyncContext.getOrigin()); | |||||
assertSame(curContext.getEntranceNode(), asyncContext.getEntranceNode()); | |||||
assertSame(entry, asyncContext.getCurEntry()); | |||||
assertTrue(asyncContext.isAsync()); | |||||
} finally { | |||||
ContextUtil.getContext().setCurEntry(null); | |||||
ContextUtil.exit(); | |||||
} | |||||
} | |||||
@Test(expected = IllegalStateException.class) | |||||
public void testDuplicateInitAsyncContext() { | |||||
Context context = new Context(null, "abc"); | |||||
AsyncEntry entry = new AsyncEntry(new StringResourceWrapper("testDuplicateInitAsyncContext", EntryType.OUT), | |||||
null, context); | |||||
entry.initAsyncContext(); | |||||
// Duplicate init. | |||||
entry.initAsyncContext(); | |||||
} | |||||
} |
@@ -1,35 +0,0 @@ | |||||
/* | |||||
* Copyright 1999-2018 Alibaba Group Holding Ltd. | |||||
* | |||||
* Licensed under the Apache License, Version 2.0 (the "License"); | |||||
* you may not use this file except in compliance with the License. | |||||
* You may obtain a copy of the License at | |||||
* | |||||
* http://www.apache.org/licenses/LICENSE-2.0 | |||||
* | |||||
* Unless required by applicable law or agreed to in writing, software | |||||
* distributed under the License is distributed on an "AS IS" BASIS, | |||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |||||
* See the License for the specific language governing permissions and | |||||
* limitations under the License. | |||||
*/ | |||||
package com.alibaba.csp.sentinel; | |||||
import org.junit.Test; | |||||
import com.alibaba.csp.sentinel.context.ContextUtil; | |||||
/** | |||||
* @author jialiang.linjl | |||||
*/ | |||||
public class ContextTest { | |||||
@Test | |||||
public void testEnterContext() { | |||||
// TODO: rewrite this unit test | |||||
ContextUtil.enter("entry", "origin"); | |||||
ContextUtil.exit(); | |||||
} | |||||
} |
@@ -0,0 +1,93 @@ | |||||
/* | |||||
* Copyright 1999-2018 Alibaba Group Holding Ltd. | |||||
* | |||||
* Licensed under the Apache License, Version 2.0 (the "License"); | |||||
* you may not use this file except in compliance with the License. | |||||
* You may obtain a copy of the License at | |||||
* | |||||
* http://www.apache.org/licenses/LICENSE-2.0 | |||||
* | |||||
* Unless required by applicable law or agreed to in writing, software | |||||
* distributed under the License is distributed on an "AS IS" BASIS, | |||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |||||
* See the License for the specific language governing permissions and | |||||
* limitations under the License. | |||||
*/ | |||||
package com.alibaba.csp.sentinel.context; | |||||
import org.junit.After; | |||||
import org.junit.Test; | |||||
import static org.junit.Assert.*; | |||||
/** | |||||
* Test cases for {@link Context} and {@link ContextUtil}. | |||||
* | |||||
* @author jialiang.linjl | |||||
* @author Eric Zhao | |||||
*/ | |||||
public class ContextTest { | |||||
@After | |||||
public void cleanUp() { | |||||
ContextUtil.exit(); | |||||
} | |||||
@Test | |||||
public void testEnterContext() { | |||||
final String contextName = "contextA"; | |||||
final String origin = "originA"; | |||||
ContextUtil.enter(contextName, origin); | |||||
Context curContext = ContextUtil.getContext(); | |||||
assertEquals(contextName, curContext.getName()); | |||||
assertEquals(origin, curContext.getOrigin()); | |||||
assertFalse(curContext.isAsync()); | |||||
ContextUtil.exit(); | |||||
assertNull(ContextUtil.getContext()); | |||||
} | |||||
@Test | |||||
public void testReplaceContext() { | |||||
final String contextName = "contextA"; | |||||
final String origin = "originA"; | |||||
ContextUtil.enter(contextName, origin); | |||||
Context contextB = Context.newAsyncContext(null, "contextB") | |||||
.setOrigin("originA"); | |||||
Context contextA = ContextUtil.replaceContext(contextB); | |||||
assertEquals(contextName, contextA.getName()); | |||||
assertEquals(origin, contextA.getOrigin()); | |||||
assertFalse(contextA.isAsync()); | |||||
Context curContextAfterReplace = ContextUtil.getContext(); | |||||
assertEquals(contextB.getName(), curContextAfterReplace.getName()); | |||||
assertEquals(contextB.getOrigin(), curContextAfterReplace.getOrigin()); | |||||
assertTrue(curContextAfterReplace.isAsync()); | |||||
ContextUtil.replaceContext(null); | |||||
assertNull(ContextUtil.getContext()); | |||||
} | |||||
@Test | |||||
public void testRunOnContext() { | |||||
final String contextName = "contextA"; | |||||
final String origin = "originA"; | |||||
ContextUtil.enter(contextName, origin); | |||||
final Context contextB = Context.newAsyncContext(null, "contextB") | |||||
.setOrigin("originB"); | |||||
assertEquals(contextName, ContextUtil.getContext().getName()); | |||||
ContextUtil.runOnContext(contextB, new Runnable() { | |||||
@Override | |||||
public void run() { | |||||
Context curContext = ContextUtil.getContext(); | |||||
assertEquals(contextB.getName(), curContext.getName()); | |||||
assertEquals(contextB.getOrigin(), curContext.getOrigin()); | |||||
assertTrue(curContext.isAsync()); | |||||
} | |||||
}); | |||||
assertEquals(contextName, ContextUtil.getContext().getName()); | |||||
} | |||||
} |
@@ -12,6 +12,11 @@ | |||||
<packaging>pom</packaging> | <packaging>pom</packaging> | ||||
<name>sentinel-demo</name> | <name>sentinel-demo</name> | ||||
<properties> | |||||
<java.source.version>1.8</java.source.version> | |||||
<java.target.version>1.8</java.target.version> | |||||
</properties> | |||||
<modules> | <modules> | ||||
<module>sentinel-demo-basic</module> | <module>sentinel-demo-basic</module> | ||||
<module>sentinel-demo-dynamic-file-rule</module> | <module>sentinel-demo-dynamic-file-rule</module> | ||||
@@ -0,0 +1,218 @@ | |||||
/* | |||||
* Copyright 1999-2018 Alibaba Group Holding Ltd. | |||||
* | |||||
* Licensed under the Apache License, Version 2.0 (the "License"); | |||||
* you may not use this file except in compliance with the License. | |||||
* You may obtain a copy of the License at | |||||
* | |||||
* http://www.apache.org/licenses/LICENSE-2.0 | |||||
* | |||||
* Unless required by applicable law or agreed to in writing, software | |||||
* distributed under the License is distributed on an "AS IS" BASIS, | |||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |||||
* See the License for the specific language governing permissions and | |||||
* limitations under the License. | |||||
*/ | |||||
package com.alibaba.csp.sentinel.demo; | |||||
import java.util.Arrays; | |||||
import java.util.List; | |||||
import java.util.concurrent.CompletableFuture; | |||||
import java.util.concurrent.TimeUnit; | |||||
import java.util.function.Consumer; | |||||
import com.alibaba.csp.sentinel.AsyncEntry; | |||||
import com.alibaba.csp.sentinel.Entry; | |||||
import com.alibaba.csp.sentinel.SphU; | |||||
import com.alibaba.csp.sentinel.context.ContextUtil; | |||||
import com.alibaba.csp.sentinel.slots.block.BlockException; | |||||
import com.alibaba.csp.sentinel.slots.block.RuleConstant; | |||||
import com.alibaba.csp.sentinel.slots.block.flow.FlowRule; | |||||
import com.alibaba.csp.sentinel.slots.block.flow.FlowRuleManager; | |||||
/** | |||||
* An example for asynchronous entry in Sentinel. | |||||
* | |||||
* @author Eric Zhao | |||||
* @since 0.2.0 | |||||
*/ | |||||
public class AsyncEntryDemo { | |||||
private void invoke(String arg, Consumer<String> handler) { | |||||
CompletableFuture.runAsync(() -> { | |||||
try { | |||||
TimeUnit.SECONDS.sleep(3); | |||||
String resp = arg + ": " + System.currentTimeMillis(); | |||||
handler.accept(resp); | |||||
} catch (Exception ex) { | |||||
ex.printStackTrace(); | |||||
} | |||||
}); | |||||
} | |||||
private void anotherAsync() { | |||||
try { | |||||
final AsyncEntry entry = SphU.asyncEntry("test-another-async"); | |||||
CompletableFuture.runAsync(() -> { | |||||
ContextUtil.runOnContext(entry.getAsyncContext(), () -> { | |||||
try { | |||||
TimeUnit.SECONDS.sleep(2); | |||||
anotherSyncInAsync(); | |||||
System.out.println("Async result: 666"); | |||||
} catch (InterruptedException e) { | |||||
// Ignore. | |||||
} finally { | |||||
entry.exit(); | |||||
} | |||||
}); | |||||
}); | |||||
} catch (BlockException ex) { | |||||
ex.printStackTrace(); | |||||
} | |||||
} | |||||
private void fetchSync() { | |||||
Entry entry = null; | |||||
try { | |||||
entry = SphU.entry("test-sync"); | |||||
} catch (BlockException ex) { | |||||
ex.printStackTrace(); | |||||
} finally { | |||||
if (entry != null) { | |||||
entry.exit(); | |||||
} | |||||
} | |||||
} | |||||
private void fetchSyncInAsync() { | |||||
Entry entry = null; | |||||
try { | |||||
entry = SphU.entry("test-sync-in-async"); | |||||
} catch (BlockException ex) { | |||||
ex.printStackTrace(); | |||||
} finally { | |||||
if (entry != null) { | |||||
entry.exit(); | |||||
} | |||||
} | |||||
} | |||||
private void anotherSyncInAsync() { | |||||
Entry entry = null; | |||||
try { | |||||
entry = SphU.entry("test-another-sync-in-async"); | |||||
} catch (BlockException ex) { | |||||
ex.printStackTrace(); | |||||
} finally { | |||||
if (entry != null) { | |||||
entry.exit(); | |||||
} | |||||
} | |||||
} | |||||
private void directlyAsync() { | |||||
try { | |||||
final AsyncEntry entry = SphU.asyncEntry("test-async-not-nested"); | |||||
CompletableFuture.runAsync(() -> { | |||||
// If no nested entry later, we don't have to wrap in `ContextUtil.runOnContext()`. | |||||
try { | |||||
TimeUnit.SECONDS.sleep(1); | |||||
} catch (InterruptedException ex) { | |||||
// Ignore. | |||||
} finally { | |||||
// Exit the async entry. | |||||
entry.exit(); | |||||
} | |||||
}); | |||||
} catch (BlockException e) { | |||||
// Request blocked, handle the exception. | |||||
e.printStackTrace(); | |||||
} | |||||
} | |||||
private void doAsyncThenSync() { | |||||
try { | |||||
// First we call an asynchronous resource. | |||||
final AsyncEntry entry = SphU.asyncEntry("test-async"); | |||||
this.invoke("abc", resp -> { | |||||
// The thread is different from original caller thread for async entry. | |||||
// So we need to wrap in the async context so that nested sync invocation entry | |||||
// can be linked to the parent asynchronous entry. | |||||
ContextUtil.runOnContext(entry.getAsyncContext(), () -> { | |||||
try { | |||||
// In the callback, we do another async invocation several times under the async context. | |||||
for (int i = 0; i < 7; i++) { | |||||
anotherAsync(); | |||||
} | |||||
System.out.println(resp); | |||||
// Then we do a sync entry under current async context. | |||||
fetchSyncInAsync(); | |||||
} finally { | |||||
// Exit the async entry. | |||||
entry.exit(); | |||||
} | |||||
}); | |||||
}); | |||||
// Then we call a sync resource. | |||||
fetchSync(); | |||||
} catch (BlockException ex) { | |||||
// Request blocked, handle the exception. | |||||
ex.printStackTrace(); | |||||
} | |||||
} | |||||
public static void main(String[] args) { | |||||
initFlowRule(); | |||||
AsyncEntryDemo service = new AsyncEntryDemo(); | |||||
// Expected invocation chain: | |||||
// | |||||
// EntranceNode: machine-root | |||||
// -EntranceNode: async-context | |||||
// --test-top | |||||
// ---test-sync | |||||
// ---test-async | |||||
// ----test-another-async | |||||
// -----test-another-sync-in-async | |||||
// ----test-sync-in-async | |||||
ContextUtil.enter("async-context", "originA"); | |||||
Entry entry = null; | |||||
try { | |||||
entry = SphU.entry("test-top"); | |||||
System.out.println("Do something..."); | |||||
service.doAsyncThenSync(); | |||||
} catch (BlockException ex) { | |||||
// Request blocked, handle the exception. | |||||
ex.printStackTrace(); | |||||
} finally { | |||||
if (entry != null) { | |||||
entry.exit(); | |||||
} | |||||
ContextUtil.exit(); | |||||
} | |||||
} | |||||
private static void initFlowRule() { | |||||
// Rule 1 won't take effect as the limitApp doesn't match. | |||||
FlowRule rule1 = new FlowRule() | |||||
.setResource("test-another-sync-in-async") | |||||
.setLimitApp("originB") | |||||
.as(FlowRule.class) | |||||
.setCount(4) | |||||
.setGrade(RuleConstant.FLOW_GRADE_QPS); | |||||
// Rule 2 will take effect. | |||||
FlowRule rule2 = new FlowRule() | |||||
.setResource("test-another-async") | |||||
.setLimitApp("default") | |||||
.as(FlowRule.class) | |||||
.setCount(5) | |||||
.setGrade(RuleConstant.FLOW_GRADE_QPS); | |||||
List<FlowRule> ruleList = Arrays.asList(rule1, rule2); | |||||
FlowRuleManager.loadRules(ruleList); | |||||
} | |||||
} |