Signed-off-by: Eric Zhao <sczyh16@gmail.com>master
@@ -190,7 +190,7 @@ public class ContextUtil { | |||
* @return old context | |||
* @since 0.2.0 | |||
*/ | |||
private static Context replaceContext(Context newContext) { | |||
static Context replaceContext(Context newContext) { | |||
Context backupContext = contextHolder.get(); | |||
if (newContext == null) { | |||
contextHolder.remove(); | |||
@@ -0,0 +1,224 @@ | |||
/* | |||
* Copyright 1999-2018 Alibaba Group Holding Ltd. | |||
* | |||
* Licensed under the Apache License, Version 2.0 (the "License"); | |||
* you may not use this file except in compliance with the License. | |||
* You may obtain a copy of the License at | |||
* | |||
* http://www.apache.org/licenses/LICENSE-2.0 | |||
* | |||
* Unless required by applicable law or agreed to in writing, software | |||
* distributed under the License is distributed on an "AS IS" BASIS, | |||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |||
* See the License for the specific language governing permissions and | |||
* limitations under the License. | |||
*/ | |||
package com.alibaba.csp.sentinel; | |||
import java.util.Set; | |||
import java.util.concurrent.ExecutorService; | |||
import java.util.concurrent.Executors; | |||
import java.util.concurrent.TimeUnit; | |||
import com.alibaba.csp.sentinel.context.ContextUtil; | |||
import com.alibaba.csp.sentinel.node.DefaultNode; | |||
import com.alibaba.csp.sentinel.node.EntranceNode; | |||
import com.alibaba.csp.sentinel.node.Node; | |||
import com.alibaba.csp.sentinel.slots.block.BlockException; | |||
import org.junit.After; | |||
import org.junit.Before; | |||
import org.junit.Test; | |||
import static org.junit.Assert.*; | |||
/** | |||
* Integration test for asynchronous entry, including common scenarios. | |||
* | |||
* @author Eric Zhao | |||
*/ | |||
public class AsyncEntryIntegrationTest { | |||
@Before | |||
public void clearContext() { | |||
if (ContextUtil.getContext() != null) { | |||
ContextUtil.getContext().setCurEntry(null); | |||
ContextUtil.exit(); | |||
} | |||
} | |||
private final ExecutorService pool = Executors.newFixedThreadPool(10); | |||
private void anotherAsync() { | |||
try { | |||
final AsyncEntry entry = SphU.asyncEntry("test-another-async"); | |||
runAsync(new Runnable() { | |||
@Override | |||
public void run() { | |||
ContextUtil.runOnContext(entry.getAsyncContext(), new Runnable() { | |||
@Override | |||
public void run() { | |||
try { | |||
TimeUnit.SECONDS.sleep(2); | |||
anotherSyncInAsync(); | |||
System.out.println("Async result: 666"); | |||
} catch (InterruptedException e) { | |||
// Ignore. | |||
} finally { | |||
entry.exit(); | |||
} | |||
} | |||
}); | |||
} | |||
}); | |||
} catch (BlockException ex) { | |||
ex.printStackTrace(); | |||
} | |||
} | |||
private void fetchSync() { | |||
Entry entry = null; | |||
try { | |||
entry = SphU.entry("test-sync"); | |||
} catch (BlockException ex) { | |||
ex.printStackTrace(); | |||
} finally { | |||
if (entry != null) { | |||
entry.exit(); | |||
} | |||
} | |||
} | |||
private void fetchSyncInAsync() { | |||
Entry entry = null; | |||
try { | |||
entry = SphU.entry("test-sync-in-async"); | |||
} catch (BlockException ex) { | |||
ex.printStackTrace(); | |||
} finally { | |||
if (entry != null) { | |||
entry.exit(); | |||
} | |||
} | |||
} | |||
public void anotherSyncInAsync() { | |||
Entry entry = null; | |||
try { | |||
entry = SphU.entry("test-another-in-async"); | |||
} catch (BlockException ex) { | |||
ex.printStackTrace(); | |||
} finally { | |||
if (entry != null) { | |||
entry.exit(); | |||
} | |||
} | |||
} | |||
private void doAsyncThenSync() { | |||
try { | |||
// First we call an asynchronous resource. | |||
final AsyncEntry entry = SphU.asyncEntry("test-async"); | |||
this.invoke("abc", new Consumer<String>() { | |||
@Override | |||
public void accept(final String resp) { | |||
// The thread is different from original caller thread for async entry. | |||
// So we need to wrap in the async context so that nested sync invocation entry | |||
// can be linked to the parent asynchronous entry. | |||
ContextUtil.runOnContext(entry.getAsyncContext(), new Runnable() { | |||
@Override | |||
public void run() { | |||
try { | |||
// In the callback, we do another async invocation under the async context. | |||
anotherAsync(); | |||
System.out.println(resp); | |||
// Then we do a sync entry under current async context. | |||
fetchSyncInAsync(); | |||
} finally { | |||
// Exit the async entry. | |||
entry.exit(); | |||
} | |||
} | |||
}); | |||
} | |||
}); | |||
// Then we call a sync resource. | |||
fetchSync(); | |||
} catch (BlockException ex) { | |||
// Request blocked, handle the exception. | |||
ex.printStackTrace(); | |||
} | |||
} | |||
@Test | |||
public void testAsyncEntryUnderSyncEntry() throws Exception { | |||
// Expected invocation chain: | |||
// EntranceNode: machine-root | |||
// -EntranceNode: async-context | |||
// --test-top | |||
// ---test-async | |||
// ----test-sync-in-async | |||
// ----test-another-async | |||
// -----test-another-in-async | |||
// ---test-sync | |||
ContextUtil.enter(contextName, origin); | |||
Entry entry = null; | |||
try { | |||
entry = SphU.entry("test-top"); | |||
doAsyncThenSync(); | |||
} catch (BlockException ex) { | |||
ex.printStackTrace(); | |||
} finally { | |||
if (entry != null) { | |||
entry.exit(); | |||
} | |||
ContextUtil.exit(); | |||
} | |||
TimeUnit.SECONDS.sleep(10); | |||
testTreeCorrect(); | |||
} | |||
private void testTreeCorrect() { | |||
DefaultNode root = Constants.ROOT; | |||
Set<Node> childListForRoot = root.getChildList(); | |||
// TODO: check child tree | |||
} | |||
@After | |||
public void shutdown() { | |||
pool.shutdownNow(); | |||
ContextUtil.exit(); | |||
} | |||
private void runAsync(Runnable f) { | |||
// In Java 8, we can use CompletableFuture.runAsync(f) instead. | |||
pool.submit(f); | |||
} | |||
private void invoke(final String arg, final Consumer<String> handler) { | |||
runAsync(new Runnable() { | |||
@Override | |||
public void run() { | |||
try { | |||
TimeUnit.SECONDS.sleep(3); | |||
String resp = arg + ": " + System.currentTimeMillis(); | |||
handler.accept(resp); | |||
} catch (Exception ex) { | |||
ex.printStackTrace(); | |||
} | |||
} | |||
}); | |||
} | |||
private interface Consumer<T> { | |||
void accept(T t); | |||
} | |||
private final String contextName = "async-context"; | |||
private final String origin = "originA"; | |||
} |
@@ -0,0 +1,76 @@ | |||
package com.alibaba.csp.sentinel; | |||
import com.alibaba.csp.sentinel.context.Context; | |||
import com.alibaba.csp.sentinel.context.ContextUtil; | |||
import com.alibaba.csp.sentinel.slotchain.StringResourceWrapper; | |||
import org.junit.Test; | |||
import static org.junit.Assert.*; | |||
/** | |||
* Test cases for {@link AsyncEntry}. | |||
* | |||
* @author Eric Zhao | |||
* @since 0.2.0 | |||
*/ | |||
public class AsyncEntryTest { | |||
@Test | |||
public void testCleanCurrentEntryInLocal() { | |||
final String contextName = "abc"; | |||
try { | |||
ContextUtil.enter(contextName); | |||
Context curContext = ContextUtil.getContext(); | |||
AsyncEntry entry = new AsyncEntry(new StringResourceWrapper("testCleanCurrentEntryInLocal", EntryType.OUT), | |||
null, curContext); | |||
assertSame(entry, curContext.getCurEntry()); | |||
entry.cleanCurrentEntryInLocal(); | |||
assertNotSame(entry, curContext.getCurEntry()); | |||
} finally { | |||
ContextUtil.getContext().setCurEntry(null); | |||
ContextUtil.exit(); | |||
} | |||
} | |||
@Test | |||
public void testInitAndGetAsyncContext() { | |||
final String contextName = "abc"; | |||
final String origin = "xxx"; | |||
try { | |||
ContextUtil.enter(contextName, origin); | |||
Context curContext = ContextUtil.getContext(); | |||
AsyncEntry entry = new AsyncEntry(new StringResourceWrapper("testInitAndGetAsyncContext", EntryType.OUT), | |||
null, curContext); | |||
assertNull(entry.getAsyncContext()); | |||
entry.initAsyncContext(); | |||
System.out.println(curContext.getName()); | |||
System.out.println(curContext.getOrigin()); | |||
Context asyncContext = entry.getAsyncContext(); | |||
assertNotNull(asyncContext); | |||
assertEquals(contextName, asyncContext.getName()); | |||
assertEquals(origin, asyncContext.getOrigin()); | |||
assertSame(curContext.getEntranceNode(), asyncContext.getEntranceNode()); | |||
assertSame(entry, asyncContext.getCurEntry()); | |||
assertTrue(asyncContext.isAsync()); | |||
} finally { | |||
ContextUtil.getContext().setCurEntry(null); | |||
ContextUtil.exit(); | |||
} | |||
} | |||
@Test(expected = IllegalStateException.class) | |||
public void testDuplicateInitAsyncContext() { | |||
Context context = new Context(null, "abc"); | |||
AsyncEntry entry = new AsyncEntry(new StringResourceWrapper("testDuplicateInitAsyncContext", EntryType.OUT), | |||
null, context); | |||
entry.initAsyncContext(); | |||
// Duplicate init. | |||
entry.initAsyncContext(); | |||
} | |||
} |
@@ -1,35 +0,0 @@ | |||
/* | |||
* Copyright 1999-2018 Alibaba Group Holding Ltd. | |||
* | |||
* Licensed under the Apache License, Version 2.0 (the "License"); | |||
* you may not use this file except in compliance with the License. | |||
* You may obtain a copy of the License at | |||
* | |||
* http://www.apache.org/licenses/LICENSE-2.0 | |||
* | |||
* Unless required by applicable law or agreed to in writing, software | |||
* distributed under the License is distributed on an "AS IS" BASIS, | |||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |||
* See the License for the specific language governing permissions and | |||
* limitations under the License. | |||
*/ | |||
package com.alibaba.csp.sentinel; | |||
import org.junit.Test; | |||
import com.alibaba.csp.sentinel.context.ContextUtil; | |||
/** | |||
* @author jialiang.linjl | |||
*/ | |||
public class ContextTest { | |||
@Test | |||
public void testEnterContext() { | |||
// TODO: rewrite this unit test | |||
ContextUtil.enter("entry", "origin"); | |||
ContextUtil.exit(); | |||
} | |||
} |
@@ -0,0 +1,93 @@ | |||
/* | |||
* Copyright 1999-2018 Alibaba Group Holding Ltd. | |||
* | |||
* Licensed under the Apache License, Version 2.0 (the "License"); | |||
* you may not use this file except in compliance with the License. | |||
* You may obtain a copy of the License at | |||
* | |||
* http://www.apache.org/licenses/LICENSE-2.0 | |||
* | |||
* Unless required by applicable law or agreed to in writing, software | |||
* distributed under the License is distributed on an "AS IS" BASIS, | |||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |||
* See the License for the specific language governing permissions and | |||
* limitations under the License. | |||
*/ | |||
package com.alibaba.csp.sentinel.context; | |||
import org.junit.After; | |||
import org.junit.Test; | |||
import static org.junit.Assert.*; | |||
/** | |||
* Test cases for {@link Context} and {@link ContextUtil}. | |||
* | |||
* @author jialiang.linjl | |||
* @author Eric Zhao | |||
*/ | |||
public class ContextTest { | |||
@After | |||
public void cleanUp() { | |||
ContextUtil.exit(); | |||
} | |||
@Test | |||
public void testEnterContext() { | |||
final String contextName = "contextA"; | |||
final String origin = "originA"; | |||
ContextUtil.enter(contextName, origin); | |||
Context curContext = ContextUtil.getContext(); | |||
assertEquals(contextName, curContext.getName()); | |||
assertEquals(origin, curContext.getOrigin()); | |||
assertFalse(curContext.isAsync()); | |||
ContextUtil.exit(); | |||
assertNull(ContextUtil.getContext()); | |||
} | |||
@Test | |||
public void testReplaceContext() { | |||
final String contextName = "contextA"; | |||
final String origin = "originA"; | |||
ContextUtil.enter(contextName, origin); | |||
Context contextB = Context.newAsyncContext(null, "contextB") | |||
.setOrigin("originA"); | |||
Context contextA = ContextUtil.replaceContext(contextB); | |||
assertEquals(contextName, contextA.getName()); | |||
assertEquals(origin, contextA.getOrigin()); | |||
assertFalse(contextA.isAsync()); | |||
Context curContextAfterReplace = ContextUtil.getContext(); | |||
assertEquals(contextB.getName(), curContextAfterReplace.getName()); | |||
assertEquals(contextB.getOrigin(), curContextAfterReplace.getOrigin()); | |||
assertTrue(curContextAfterReplace.isAsync()); | |||
ContextUtil.replaceContext(null); | |||
assertNull(ContextUtil.getContext()); | |||
} | |||
@Test | |||
public void testRunOnContext() { | |||
final String contextName = "contextA"; | |||
final String origin = "originA"; | |||
ContextUtil.enter(contextName, origin); | |||
final Context contextB = Context.newAsyncContext(null, "contextB") | |||
.setOrigin("originB"); | |||
assertEquals(contextName, ContextUtil.getContext().getName()); | |||
ContextUtil.runOnContext(contextB, new Runnable() { | |||
@Override | |||
public void run() { | |||
Context curContext = ContextUtil.getContext(); | |||
assertEquals(contextB.getName(), curContext.getName()); | |||
assertEquals(contextB.getOrigin(), curContext.getOrigin()); | |||
assertTrue(curContext.isAsync()); | |||
} | |||
}); | |||
assertEquals(contextName, ContextUtil.getContext().getName()); | |||
} | |||
} |
@@ -12,6 +12,11 @@ | |||
<packaging>pom</packaging> | |||
<name>sentinel-demo</name> | |||
<properties> | |||
<java.source.version>1.8</java.source.version> | |||
<java.target.version>1.8</java.target.version> | |||
</properties> | |||
<modules> | |||
<module>sentinel-demo-basic</module> | |||
<module>sentinel-demo-dynamic-file-rule</module> | |||
@@ -0,0 +1,218 @@ | |||
/* | |||
* Copyright 1999-2018 Alibaba Group Holding Ltd. | |||
* | |||
* Licensed under the Apache License, Version 2.0 (the "License"); | |||
* you may not use this file except in compliance with the License. | |||
* You may obtain a copy of the License at | |||
* | |||
* http://www.apache.org/licenses/LICENSE-2.0 | |||
* | |||
* Unless required by applicable law or agreed to in writing, software | |||
* distributed under the License is distributed on an "AS IS" BASIS, | |||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |||
* See the License for the specific language governing permissions and | |||
* limitations under the License. | |||
*/ | |||
package com.alibaba.csp.sentinel.demo; | |||
import java.util.Arrays; | |||
import java.util.List; | |||
import java.util.concurrent.CompletableFuture; | |||
import java.util.concurrent.TimeUnit; | |||
import java.util.function.Consumer; | |||
import com.alibaba.csp.sentinel.AsyncEntry; | |||
import com.alibaba.csp.sentinel.Entry; | |||
import com.alibaba.csp.sentinel.SphU; | |||
import com.alibaba.csp.sentinel.context.ContextUtil; | |||
import com.alibaba.csp.sentinel.slots.block.BlockException; | |||
import com.alibaba.csp.sentinel.slots.block.RuleConstant; | |||
import com.alibaba.csp.sentinel.slots.block.flow.FlowRule; | |||
import com.alibaba.csp.sentinel.slots.block.flow.FlowRuleManager; | |||
/** | |||
* An example for asynchronous entry in Sentinel. | |||
* | |||
* @author Eric Zhao | |||
* @since 0.2.0 | |||
*/ | |||
public class AsyncEntryDemo { | |||
private void invoke(String arg, Consumer<String> handler) { | |||
CompletableFuture.runAsync(() -> { | |||
try { | |||
TimeUnit.SECONDS.sleep(3); | |||
String resp = arg + ": " + System.currentTimeMillis(); | |||
handler.accept(resp); | |||
} catch (Exception ex) { | |||
ex.printStackTrace(); | |||
} | |||
}); | |||
} | |||
private void anotherAsync() { | |||
try { | |||
final AsyncEntry entry = SphU.asyncEntry("test-another-async"); | |||
CompletableFuture.runAsync(() -> { | |||
ContextUtil.runOnContext(entry.getAsyncContext(), () -> { | |||
try { | |||
TimeUnit.SECONDS.sleep(2); | |||
anotherSyncInAsync(); | |||
System.out.println("Async result: 666"); | |||
} catch (InterruptedException e) { | |||
// Ignore. | |||
} finally { | |||
entry.exit(); | |||
} | |||
}); | |||
}); | |||
} catch (BlockException ex) { | |||
ex.printStackTrace(); | |||
} | |||
} | |||
private void fetchSync() { | |||
Entry entry = null; | |||
try { | |||
entry = SphU.entry("test-sync"); | |||
} catch (BlockException ex) { | |||
ex.printStackTrace(); | |||
} finally { | |||
if (entry != null) { | |||
entry.exit(); | |||
} | |||
} | |||
} | |||
private void fetchSyncInAsync() { | |||
Entry entry = null; | |||
try { | |||
entry = SphU.entry("test-sync-in-async"); | |||
} catch (BlockException ex) { | |||
ex.printStackTrace(); | |||
} finally { | |||
if (entry != null) { | |||
entry.exit(); | |||
} | |||
} | |||
} | |||
private void anotherSyncInAsync() { | |||
Entry entry = null; | |||
try { | |||
entry = SphU.entry("test-another-sync-in-async"); | |||
} catch (BlockException ex) { | |||
ex.printStackTrace(); | |||
} finally { | |||
if (entry != null) { | |||
entry.exit(); | |||
} | |||
} | |||
} | |||
private void directlyAsync() { | |||
try { | |||
final AsyncEntry entry = SphU.asyncEntry("test-async-not-nested"); | |||
CompletableFuture.runAsync(() -> { | |||
// If no nested entry later, we don't have to wrap in `ContextUtil.runOnContext()`. | |||
try { | |||
TimeUnit.SECONDS.sleep(1); | |||
} catch (InterruptedException ex) { | |||
// Ignore. | |||
} finally { | |||
// Exit the async entry. | |||
entry.exit(); | |||
} | |||
}); | |||
} catch (BlockException e) { | |||
// Request blocked, handle the exception. | |||
e.printStackTrace(); | |||
} | |||
} | |||
private void doAsyncThenSync() { | |||
try { | |||
// First we call an asynchronous resource. | |||
final AsyncEntry entry = SphU.asyncEntry("test-async"); | |||
this.invoke("abc", resp -> { | |||
// The thread is different from original caller thread for async entry. | |||
// So we need to wrap in the async context so that nested sync invocation entry | |||
// can be linked to the parent asynchronous entry. | |||
ContextUtil.runOnContext(entry.getAsyncContext(), () -> { | |||
try { | |||
// In the callback, we do another async invocation several times under the async context. | |||
for (int i = 0; i < 7; i++) { | |||
anotherAsync(); | |||
} | |||
System.out.println(resp); | |||
// Then we do a sync entry under current async context. | |||
fetchSyncInAsync(); | |||
} finally { | |||
// Exit the async entry. | |||
entry.exit(); | |||
} | |||
}); | |||
}); | |||
// Then we call a sync resource. | |||
fetchSync(); | |||
} catch (BlockException ex) { | |||
// Request blocked, handle the exception. | |||
ex.printStackTrace(); | |||
} | |||
} | |||
public static void main(String[] args) { | |||
initFlowRule(); | |||
AsyncEntryDemo service = new AsyncEntryDemo(); | |||
// Expected invocation chain: | |||
// | |||
// EntranceNode: machine-root | |||
// -EntranceNode: async-context | |||
// --test-top | |||
// ---test-sync | |||
// ---test-async | |||
// ----test-another-async | |||
// -----test-another-sync-in-async | |||
// ----test-sync-in-async | |||
ContextUtil.enter("async-context", "originA"); | |||
Entry entry = null; | |||
try { | |||
entry = SphU.entry("test-top"); | |||
System.out.println("Do something..."); | |||
service.doAsyncThenSync(); | |||
} catch (BlockException ex) { | |||
// Request blocked, handle the exception. | |||
ex.printStackTrace(); | |||
} finally { | |||
if (entry != null) { | |||
entry.exit(); | |||
} | |||
ContextUtil.exit(); | |||
} | |||
} | |||
private static void initFlowRule() { | |||
// Rule 1 won't take effect as the limitApp doesn't match. | |||
FlowRule rule1 = new FlowRule() | |||
.setResource("test-another-sync-in-async") | |||
.setLimitApp("originB") | |||
.as(FlowRule.class) | |||
.setCount(4) | |||
.setGrade(RuleConstant.FLOW_GRADE_QPS); | |||
// Rule 2 will take effect. | |||
FlowRule rule2 = new FlowRule() | |||
.setResource("test-another-async") | |||
.setLimitApp("default") | |||
.as(FlowRule.class) | |||
.setCount(5) | |||
.setGrade(RuleConstant.FLOW_GRADE_QPS); | |||
List<FlowRule> ruleList = Arrays.asList(rule1, rule2); | |||
FlowRuleManager.loadRules(ruleList); | |||
} | |||
} |