diff --git a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/CtEntry.java b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/CtEntry.java index d59e52a5..9b491743 100644 --- a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/CtEntry.java +++ b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/CtEntry.java @@ -15,12 +15,17 @@ */ package com.alibaba.csp.sentinel; +import java.util.Iterator; +import java.util.LinkedList; + import com.alibaba.csp.sentinel.context.Context; import com.alibaba.csp.sentinel.context.ContextUtil; import com.alibaba.csp.sentinel.context.NullContext; +import com.alibaba.csp.sentinel.log.RecordLog; import com.alibaba.csp.sentinel.node.Node; import com.alibaba.csp.sentinel.slotchain.ProcessorSlot; import com.alibaba.csp.sentinel.slotchain.ResourceWrapper; +import com.alibaba.csp.sentinel.util.function.BiConsumer; /** * Linked entry within current context. @@ -35,6 +40,8 @@ class CtEntry extends Entry { protected ProcessorSlot chain; protected Context context; + protected LinkedList> exitHandlers; + CtEntry(ResourceWrapper resourceWrapper, ProcessorSlot chain, Context context) { super(resourceWrapper); @@ -102,10 +109,32 @@ class CtEntry extends Entry { protected void clearEntryContext() { this.context = null; } + + @Override + public void whenComplete(BiConsumer consumer) { + if (this.exitHandlers == null) { + this.exitHandlers = new LinkedList<>(); + } + this.exitHandlers.add(consumer); + } @Override protected Entry trueExit(int count, Object... args) throws ErrorEntryFreeException { exitForContext(context, count, args); + + if (this.exitHandlers != null) { + Iterator> it = this.exitHandlers.iterator(); + BiConsumer cur; + while (it.hasNext()) { + cur = it.next(); + try { + cur.accept(this.context, this); + } catch (Exception e) { + RecordLog.warn("Error invoking exit handler", e); + } + } + this.exitHandlers = null; + } return parent; } diff --git a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/Entry.java b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/Entry.java index dd763773..fbe19a14 100755 --- a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/Entry.java +++ b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/Entry.java @@ -17,6 +17,7 @@ package com.alibaba.csp.sentinel; import com.alibaba.csp.sentinel.slots.block.BlockException; import com.alibaba.csp.sentinel.util.TimeUtil; +import com.alibaba.csp.sentinel.util.function.BiConsumer; import com.alibaba.csp.sentinel.context.ContextUtil; import com.alibaba.csp.sentinel.node.Node; import com.alibaba.csp.sentinel.slotchain.ResourceWrapper; @@ -178,4 +179,13 @@ public abstract class Entry implements AutoCloseable { this.originNode = originNode; } + /** + * Like `CompletableFuture` since JDK8 it guarantees specified consumer + * is invoked when this entry exited. + * Use it when you did some STATEFUL operations on entries. + * + * @param consumer + */ + public abstract void whenComplete(BiConsumer consumer); + } diff --git a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/degrade/DegradeSlot.java b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/degrade/DegradeSlot.java index 9bc8c793..32c05018 100755 --- a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/degrade/DegradeSlot.java +++ b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/degrade/DegradeSlot.java @@ -26,7 +26,6 @@ import com.alibaba.csp.sentinel.slotchain.ResourceWrapper; import com.alibaba.csp.sentinel.slots.block.BlockException; import com.alibaba.csp.sentinel.slots.block.degrade.circuitbreaker.CircuitBreaker; import com.alibaba.csp.sentinel.spi.SpiOrder; -import com.alibaba.csp.sentinel.util.TimeUtil; /** * A {@link ProcessorSlot} dedicates to circuit breaking. @@ -40,18 +39,18 @@ public class DegradeSlot extends AbstractLinkedProcessorSlot { @Override public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args) throws Throwable { - performChecking(resourceWrapper); + performChecking(context, resourceWrapper); fireEntry(context, resourceWrapper, node, count, prioritized, args); } - void performChecking(ResourceWrapper r) throws BlockException { + void performChecking(Context context, ResourceWrapper r) throws BlockException { List circuitBreakers = DegradeRuleManager.getCircuitBreakers(r.getName()); if (circuitBreakers == null || circuitBreakers.isEmpty()) { return; } for (CircuitBreaker cb : circuitBreakers) { - if (!cb.tryPass()) { + if (!cb.tryPass(context, r)) { throw new DegradeException(cb.getRule().getLimitApp(), cb.getRule()); } } @@ -71,14 +70,9 @@ public class DegradeSlot extends AbstractLinkedProcessorSlot { } if (curEntry.getBlockError() == null) { - long completeTime = curEntry.getCompleteTimestamp(); - if (completeTime <= 0) { - completeTime = TimeUtil.currentTimeMillis(); - } - long rt = completeTime - curEntry.getCreateTimestamp(); - Throwable error = curEntry.getError(); + // passed request for (CircuitBreaker circuitBreaker : circuitBreakers) { - circuitBreaker.onRequestComplete(rt, error); + circuitBreaker.onRequestComplete(context, r); } } diff --git a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/degrade/circuitbreaker/AbstractCircuitBreaker.java b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/degrade/circuitbreaker/AbstractCircuitBreaker.java index 2d3c2370..c516b66b 100644 --- a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/degrade/circuitbreaker/AbstractCircuitBreaker.java +++ b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/degrade/circuitbreaker/AbstractCircuitBreaker.java @@ -17,10 +17,14 @@ package com.alibaba.csp.sentinel.slots.block.degrade.circuitbreaker; import java.util.concurrent.atomic.AtomicReference; +import com.alibaba.csp.sentinel.Entry; +import com.alibaba.csp.sentinel.context.Context; +import com.alibaba.csp.sentinel.slotchain.ResourceWrapper; import com.alibaba.csp.sentinel.slots.block.degrade.DegradeRule; import com.alibaba.csp.sentinel.slots.block.degrade.DegradeRuleManager; import com.alibaba.csp.sentinel.util.AssertUtil; import com.alibaba.csp.sentinel.util.TimeUtil; +import com.alibaba.csp.sentinel.util.function.BiConsumer; /** * @author Eric Zhao @@ -61,14 +65,14 @@ public abstract class AbstractCircuitBreaker implements CircuitBreaker { } @Override - public boolean tryPass() { + public boolean tryPass(Context context, ResourceWrapper r) { // Template implementation. if (currentState.get() == State.CLOSED) { return true; } if (currentState.get() == State.OPEN) { // For half-open state we allow a request for trial. - return retryTimeoutArrived() && fromOpenToHalfOpen(); + return retryTimeoutArrived() && fromOpenToHalfOpen(context); } return false; } @@ -91,30 +95,44 @@ public abstract class AbstractCircuitBreaker implements CircuitBreaker { if (currentState.compareAndSet(prev, State.OPEN)) { updateNextRetryTimestamp(); - for (CircuitBreakerStateChangeObserver observer : observerRegistry.getStateChangeObservers()) { - observer.onStateChange(prev, State.OPEN, rule, snapshotValue); - } + notifyObservers(prev, State.OPEN, snapshotValue); return true; } return false; } - protected boolean fromOpenToHalfOpen() { + protected boolean fromOpenToHalfOpen(Context context) { if (currentState.compareAndSet(State.OPEN, State.HALF_OPEN)) { - for (CircuitBreakerStateChangeObserver observer : observerRegistry.getStateChangeObservers()) { - observer.onStateChange(State.OPEN, State.HALF_OPEN, rule, null); - } + notifyObservers(State.OPEN, State.HALF_OPEN, null); + Entry entry = context.getCurEntry(); + entry.whenComplete(new BiConsumer() { + + @Override + public void accept(Context context, Entry entry) { + if (entry.getBlockError() != null) { + // Fallback to OPEN due to detecting request is blocked + currentState.compareAndSet(State.HALF_OPEN, State.OPEN); + notifyObservers(State.HALF_OPEN, State.OPEN, 1.0d); + return; + } + + } + }); return true; } return false; } + + private void notifyObservers(CircuitBreaker.State prevState, CircuitBreaker.State newState, Double snapshotValue) { + for (CircuitBreakerStateChangeObserver observer : observerRegistry.getStateChangeObservers()) { + observer.onStateChange(prevState, newState, rule, snapshotValue); + } + } protected boolean fromHalfOpenToOpen(double snapshotValue) { if (currentState.compareAndSet(State.HALF_OPEN, State.OPEN)) { updateNextRetryTimestamp(); - for (CircuitBreakerStateChangeObserver observer : observerRegistry.getStateChangeObservers()) { - observer.onStateChange(State.HALF_OPEN, State.OPEN, rule, snapshotValue); - } + notifyObservers(State.HALF_OPEN, State.OPEN, snapshotValue); return true; } return false; @@ -123,9 +141,7 @@ public abstract class AbstractCircuitBreaker implements CircuitBreaker { protected boolean fromHalfOpenToClose() { if (currentState.compareAndSet(State.HALF_OPEN, State.CLOSED)) { resetStat(); - for (CircuitBreakerStateChangeObserver observer : observerRegistry.getStateChangeObservers()) { - observer.onStateChange(State.HALF_OPEN, State.CLOSED, rule, null); - } + notifyObservers(State.HALF_OPEN, State.CLOSED, null); return true; } return false; diff --git a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/degrade/circuitbreaker/CircuitBreaker.java b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/degrade/circuitbreaker/CircuitBreaker.java index 3e5faf43..cc4b349b 100644 --- a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/degrade/circuitbreaker/CircuitBreaker.java +++ b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/degrade/circuitbreaker/CircuitBreaker.java @@ -15,6 +15,8 @@ */ package com.alibaba.csp.sentinel.slots.block.degrade.circuitbreaker; +import com.alibaba.csp.sentinel.context.Context; +import com.alibaba.csp.sentinel.slotchain.ResourceWrapper; import com.alibaba.csp.sentinel.slots.block.degrade.DegradeRule; /** @@ -32,11 +34,13 @@ public interface CircuitBreaker { DegradeRule getRule(); /** - * Acquires permission of an invocation only if it is available at the time of invocation. + * Acquires permission of an invocation only if it is available at the time of invoking. * + * @param context + * @param r * @return {@code true} if permission was acquired and {@code false} otherwise */ - boolean tryPass(); + boolean tryPass(Context context, ResourceWrapper r); /** * Get current state of the circuit breaker. @@ -46,13 +50,12 @@ public interface CircuitBreaker { State currentState(); /** - * Record a completed request with the given response time and error (if present) and - * handle state transformation of the circuit breaker. + * Called when a `passed` invocation finished. * - * @param rt the response time of this entry - * @param error the error of this entry (if present) + * @param context context of current invocation + * @param wrapper current resource */ - void onRequestComplete(long rt, Throwable error); + void onRequestComplete(Context context, ResourceWrapper wrapper); /** * Circuit breaker state. diff --git a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/degrade/circuitbreaker/ExceptionCircuitBreaker.java b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/degrade/circuitbreaker/ExceptionCircuitBreaker.java index d57a037b..fa286f72 100644 --- a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/degrade/circuitbreaker/ExceptionCircuitBreaker.java +++ b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/degrade/circuitbreaker/ExceptionCircuitBreaker.java @@ -17,6 +17,9 @@ package com.alibaba.csp.sentinel.slots.block.degrade.circuitbreaker; import java.util.List; +import com.alibaba.csp.sentinel.Entry; +import com.alibaba.csp.sentinel.context.Context; +import com.alibaba.csp.sentinel.slotchain.ResourceWrapper; import com.alibaba.csp.sentinel.slots.block.degrade.DegradeRule; import com.alibaba.csp.sentinel.slots.statistic.base.LeapArray; import com.alibaba.csp.sentinel.slots.statistic.base.LongAdder; @@ -60,7 +63,12 @@ public class ExceptionCircuitBreaker extends AbstractCircuitBreaker { } @Override - public void onRequestComplete(long rt, Throwable error) { + public void onRequestComplete(Context context, ResourceWrapper r) { + Entry entry = context.getCurEntry(); + if (entry == null) { + return; + } + Throwable error = entry.getError(); SimpleErrorCounter counter = stat.currentWindow().value(); if (error != null) { counter.getErrorCount().add(1); @@ -74,7 +82,9 @@ public class ExceptionCircuitBreaker extends AbstractCircuitBreaker { if (currentState.get() == State.OPEN) { return; } + if (currentState.get() == State.HALF_OPEN) { + // In detecting request if (error == null) { fromHalfOpenToClose(); } else { @@ -82,6 +92,7 @@ public class ExceptionCircuitBreaker extends AbstractCircuitBreaker { } return; } + List counters = stat.values(); long errCount = 0; long totalCount = 0; diff --git a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/degrade/circuitbreaker/ResponseTimeCircuitBreaker.java b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/degrade/circuitbreaker/ResponseTimeCircuitBreaker.java index 252aaa43..7ff9e182 100644 --- a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/degrade/circuitbreaker/ResponseTimeCircuitBreaker.java +++ b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/degrade/circuitbreaker/ResponseTimeCircuitBreaker.java @@ -17,12 +17,16 @@ package com.alibaba.csp.sentinel.slots.block.degrade.circuitbreaker; import java.util.List; +import com.alibaba.csp.sentinel.Entry; +import com.alibaba.csp.sentinel.context.Context; +import com.alibaba.csp.sentinel.slotchain.ResourceWrapper; import com.alibaba.csp.sentinel.slots.block.RuleConstant; import com.alibaba.csp.sentinel.slots.block.degrade.DegradeRule; import com.alibaba.csp.sentinel.slots.statistic.base.LeapArray; import com.alibaba.csp.sentinel.slots.statistic.base.LongAdder; import com.alibaba.csp.sentinel.slots.statistic.base.WindowWrap; import com.alibaba.csp.sentinel.util.AssertUtil; +import com.alibaba.csp.sentinel.util.TimeUtil; /** * @author Eric Zhao @@ -57,8 +61,17 @@ public class ResponseTimeCircuitBreaker extends AbstractCircuitBreaker { } @Override - public void onRequestComplete(long rt, Throwable error) { + public void onRequestComplete(Context context, ResourceWrapper wrapper) { SlowRequestCounter counter = slidingCounter.currentWindow().value(); + Entry entry = context.getCurEntry(); + if (entry == null) { + return; + } + long completeTime = entry.getCompleteTimestamp(); + if (completeTime <= 0) { + completeTime = TimeUtil.currentTimeMillis(); + } + long rt = completeTime - entry.getCreateTimestamp(); if (rt > maxAllowedRt) { counter.slowCount.add(1); } @@ -71,7 +84,9 @@ public class ResponseTimeCircuitBreaker extends AbstractCircuitBreaker { if (currentState.get() == State.OPEN) { return; } + if (currentState.get() == State.HALF_OPEN) { + // In detecting request // TODO: improve logic for half-open recovery if (rt > maxAllowedRt) { fromHalfOpenToOpen(1.0d); diff --git a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/util/function/BiConsumer.java b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/util/function/BiConsumer.java new file mode 100644 index 00000000..48bdcee2 --- /dev/null +++ b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/util/function/BiConsumer.java @@ -0,0 +1,24 @@ +/* + * Copyright 1999-2019 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.csp.sentinel.util.function; + +/** + * BiConsumer interface from JDK 8. + */ +public interface BiConsumer { + + void accept(T t, U u); +} diff --git a/sentinel-core/src/test/java/com/alibaba/csp/sentinel/EntryTest.java b/sentinel-core/src/test/java/com/alibaba/csp/sentinel/EntryTest.java index 3cfb8c09..237fcbc1 100644 --- a/sentinel-core/src/test/java/com/alibaba/csp/sentinel/EntryTest.java +++ b/sentinel-core/src/test/java/com/alibaba/csp/sentinel/EntryTest.java @@ -1,8 +1,10 @@ package com.alibaba.csp.sentinel; +import com.alibaba.csp.sentinel.context.Context; import com.alibaba.csp.sentinel.node.Node; import com.alibaba.csp.sentinel.slotchain.ResourceWrapper; import com.alibaba.csp.sentinel.slotchain.StringResourceWrapper; +import com.alibaba.csp.sentinel.util.function.BiConsumer; import org.junit.Test; @@ -64,5 +66,10 @@ public class EntryTest { public Node getLastNode() { return null; } + + @Override + public void whenComplete(BiConsumer consumer) { + // do nothing + } } -} \ No newline at end of file +} diff --git a/sentinel-core/src/test/java/com/alibaba/csp/sentinel/slots/block/degrade/CircuitBreakingIntegrationTest.java b/sentinel-core/src/test/java/com/alibaba/csp/sentinel/slots/block/degrade/CircuitBreakingIntegrationTest.java index 797943fe..7f4800df 100755 --- a/sentinel-core/src/test/java/com/alibaba/csp/sentinel/slots/block/degrade/CircuitBreakingIntegrationTest.java +++ b/sentinel-core/src/test/java/com/alibaba/csp/sentinel/slots/block/degrade/CircuitBreakingIntegrationTest.java @@ -15,11 +15,8 @@ */ package com.alibaba.csp.sentinel.slots.block.degrade; -import com.alibaba.csp.sentinel.Entry; -import com.alibaba.csp.sentinel.SphU; -import com.alibaba.csp.sentinel.Tracer; -import com.alibaba.csp.sentinel.slots.block.BlockException; import com.alibaba.csp.sentinel.slots.block.RuleConstant; +import com.alibaba.csp.sentinel.slots.block.degrade.circuitbreaker.CircuitBreaker; import com.alibaba.csp.sentinel.slots.block.degrade.circuitbreaker.CircuitBreaker.State; import com.alibaba.csp.sentinel.slots.block.degrade.circuitbreaker.CircuitBreakerStateChangeObserver; import com.alibaba.csp.sentinel.slots.block.degrade.circuitbreaker.EventObserverRegistry; @@ -31,6 +28,7 @@ import org.junit.Test; import java.util.ArrayList; import java.util.Arrays; +import java.util.List; import java.util.concurrent.ThreadLocalRandom; import static org.junit.Assert.assertEquals; @@ -54,41 +52,6 @@ public class CircuitBreakingIntegrationTest extends AbstractTimeBasedTest { DegradeRuleManager.loadRules(new ArrayList()); } - private boolean entryAndSleepFor(String res, int sleepMs) { - Entry entry = null; - try { - entry = SphU.entry(res); - sleep(sleepMs); - } catch (BlockException ex) { - return false; - } catch (Exception ex) { - Tracer.traceEntry(ex, entry); - } finally { - if (entry != null) { - entry.exit(); - } - } - return true; - } - - private boolean entryWithErrorIfPresent(String res, Exception ex) { - Entry entry = null; - try { - entry = SphU.entry(res); - if (ex != null) { - Tracer.traceEntry(ex, entry); - } - sleep(ThreadLocalRandom.current().nextInt(5, 10)); - } catch (BlockException b) { - return false; - } finally { - if (entry != null) { - entry.exit(); - } - } - return true; - } - @Test public void testSlowRequestMode() throws Exception { CircuitBreakerStateChangeObserver observer = mock(CircuitBreakerStateChangeObserver.class); @@ -209,5 +172,62 @@ public class CircuitBreakingIntegrationTest extends AbstractTimeBasedTest { public void testExceptionCountMode() throws Throwable { // TODO } - + + private void verifyState(List breakers, int target) { + int state = 0; + for (CircuitBreaker breaker : breakers) { + if (breaker.currentState() == State.OPEN) { + state ++; + } else if (breaker.currentState() == State.HALF_OPEN) { + state --; + } else { + state -= 2; + } + } + assertEquals(target, state); + } + + @Test + public void testMultipleHalfOpenedBreaders() throws Exception { + CircuitBreakerStateChangeObserver observer = mock(CircuitBreakerStateChangeObserver.class); + setCurrentMillis(System.currentTimeMillis() / 1000 * 1000); + int retryTimeoutSec = 2; + int maxRt = 50; + int statIntervalMs = 20000; + int minRequestAmount = 1; + String res = "CircuitBreakingIntegrationTest_testMultipleHalfOpenedBreaders"; + EventObserverRegistry.getInstance().addStateChangeObserver(res, observer); + // initial two rules + DegradeRuleManager.loadRules(Arrays.asList( + new DegradeRule(res).setTimeWindow(retryTimeoutSec).setCount(maxRt) + .setStatIntervalMs(statIntervalMs).setMinRequestAmount(minRequestAmount) + .setSlowRatioThreshold(0.8d).setGrade(0), + new DegradeRule(res).setTimeWindow(retryTimeoutSec * 2).setCount(maxRt) + .setStatIntervalMs(statIntervalMs).setMinRequestAmount(minRequestAmount) + .setSlowRatioThreshold(0.8d).setGrade(0) + )); + assertTrue(entryAndSleepFor(res, 100)); + // they are open now + for (CircuitBreaker breaker : DegradeRuleManager.getCircuitBreakers(res)) { + assertEquals(CircuitBreaker.State.OPEN, breaker.currentState()); + } + + sleepSecond(3); + + for (int i = 0; i < 10; i ++) { + assertFalse(entryAndSleepFor(res, 100)); + } + // Now one is in open state while the other experiences open -> half-open -> open + verifyState(DegradeRuleManager.getCircuitBreakers(res), 2); + + sleepSecond(3); + + // They will all recover + for (int i = 0; i < 10; i ++) { + assertTrue(entryAndSleepFor(res, 1)); + } + + verifyState(DegradeRuleManager.getCircuitBreakers(res), -4); + } + } diff --git a/sentinel-core/src/test/java/com/alibaba/csp/sentinel/slots/block/degrade/circuitbreaker/ExceptionCircuitBreakerTest.java b/sentinel-core/src/test/java/com/alibaba/csp/sentinel/slots/block/degrade/circuitbreaker/ExceptionCircuitBreakerTest.java index 22724f27..e9d7178d 100644 --- a/sentinel-core/src/test/java/com/alibaba/csp/sentinel/slots/block/degrade/circuitbreaker/ExceptionCircuitBreakerTest.java +++ b/sentinel-core/src/test/java/com/alibaba/csp/sentinel/slots/block/degrade/circuitbreaker/ExceptionCircuitBreakerTest.java @@ -15,97 +15,71 @@ */ package com.alibaba.csp.sentinel.slots.block.degrade.circuitbreaker; -import com.alibaba.csp.sentinel.slots.block.RuleConstant; -import com.alibaba.csp.sentinel.slots.block.degrade.DegradeRule; -import com.alibaba.csp.sentinel.slots.block.degrade.circuitbreaker.CircuitBreaker.State; -import com.alibaba.csp.sentinel.slots.block.degrade.circuitbreaker.ExceptionCircuitBreaker.SimpleErrorCounter; -import com.alibaba.csp.sentinel.slots.statistic.base.LeapArray; -import com.alibaba.csp.sentinel.slots.statistic.base.WindowWrap; -import com.alibaba.csp.sentinel.test.AbstractTimeBasedTest; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.util.ArrayList; +import java.util.Arrays; +import org.junit.After; +import org.junit.Before; import org.junit.Test; -import static org.junit.Assert.*; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; +import com.alibaba.csp.sentinel.slots.block.BlockException; +import com.alibaba.csp.sentinel.slots.block.RuleConstant; +import com.alibaba.csp.sentinel.slots.block.degrade.DegradeRule; +import com.alibaba.csp.sentinel.slots.block.degrade.DegradeRuleManager; +import com.alibaba.csp.sentinel.test.AbstractTimeBasedTest; /** * @author Eric Zhao */ public class ExceptionCircuitBreakerTest extends AbstractTimeBasedTest { + + @Before + public void setUp() { + DegradeRuleManager.loadRules(new ArrayList()); + } - @Test - @SuppressWarnings("unchecked") - public void testStateChangeAndTryAcquire() { - int retryTimeout = 10; - DegradeRule rule = new DegradeRule("abc") - .setCount(0.5d) - .setGrade(RuleConstant.DEGRADE_GRADE_EXCEPTION_RATIO) - .setStatIntervalMs(20 * 1000) - .setTimeWindow(retryTimeout) - .setMinRequestAmount(10); - LeapArray stat = mock(LeapArray.class); - SimpleErrorCounter counter = new SimpleErrorCounter(); - WindowWrap bucket = new WindowWrap<>(20000, 0, counter); - when(stat.currentWindow()).thenReturn(bucket); - - ExceptionCircuitBreaker cb = new ExceptionCircuitBreaker(rule, stat); - - assertTrue(cb.tryPass()); - assertTrue(cb.tryPass()); - - setCurrentMillis(System.currentTimeMillis()); - cb.fromCloseToOpen(0.52d); - assertEquals(State.OPEN, cb.currentState()); - - assertFalse(cb.tryPass()); - assertFalse(cb.tryPass()); - - // Wait for next retry checkpoint. - sleepSecond(retryTimeout); - sleep(100); - // Try a request to trigger state transformation. - assertTrue(cb.tryPass()); - assertEquals(State.HALF_OPEN, cb.currentState()); - - // Mark this request as error - cb.onRequestComplete(20, new IllegalArgumentException()); - assertEquals(State.OPEN, cb.currentState()); - - // Wait for next retry checkpoint. - sleepSecond(retryTimeout); - sleep(100); - assertTrue(cb.tryPass()); - assertEquals(State.HALF_OPEN, cb.currentState()); - - setCurrentMillis(System.currentTimeMillis()); - // Mark this request as success. - cb.onRequestComplete(20, null); - assertEquals(State.CLOSED, cb.currentState()); + @After + public void tearDown() throws Exception { + DegradeRuleManager.loadRules(new ArrayList()); } @Test - @SuppressWarnings("unchecked") - public void testRecordErrorOrSuccess() { + public void testRecordErrorOrSuccess() throws BlockException { + String resource = "testRecordErrorOrSuccess"; + int retryTimeoutMillis = 10 * 1000; + int retryTimeout = retryTimeoutMillis / 1000; DegradeRule rule = new DegradeRule("abc") - .setCount(0.5d) + .setCount(0.2d) .setGrade(RuleConstant.DEGRADE_GRADE_EXCEPTION_RATIO) .setStatIntervalMs(20 * 1000) - .setTimeWindow(10) - .setMinRequestAmount(10); - LeapArray stat = mock(LeapArray.class); - SimpleErrorCounter counter = new SimpleErrorCounter(); - WindowWrap bucket = new WindowWrap<>(20000, 0, counter); - when(stat.currentWindow()).thenReturn(bucket); - - CircuitBreaker cb = new ExceptionCircuitBreaker(rule, stat); - cb.onRequestComplete(15, null); - - assertEquals(1L, counter.getTotalCount().longValue()); - assertEquals(0L, counter.getErrorCount().longValue()); - - cb.onRequestComplete(15, new IllegalArgumentException()); - assertEquals(2L, counter.getTotalCount().longValue()); - assertEquals(1L, counter.getErrorCount().longValue()); + .setTimeWindow(retryTimeout) + .setMinRequestAmount(1); + rule.setResource(resource); + DegradeRuleManager.loadRules(Arrays.asList(rule)); + + assertTrue(entryAndSleepFor(resource, 10)); + + assertTrue(entryWithErrorIfPresent(resource, new IllegalArgumentException())); // -> open + assertFalse(entryWithErrorIfPresent(resource, new IllegalArgumentException())); + assertFalse(entryAndSleepFor(resource, 100)); + sleep(retryTimeoutMillis / 2); + assertFalse(entryAndSleepFor(resource, 100)); + sleep(retryTimeoutMillis / 2); + assertTrue(entryWithErrorIfPresent(resource, new IllegalArgumentException())); // -> half -> open + assertFalse(entryAndSleepFor(resource, 100)); + assertFalse(entryAndSleepFor(resource, 100)); + sleep(retryTimeoutMillis); + assertTrue(entryAndSleepFor(resource, 100)); // -> half -> closed + assertTrue(entryAndSleepFor(resource, 100)); + assertTrue(entryAndSleepFor(resource, 100)); + assertTrue(entryAndSleepFor(resource, 100)); + assertTrue(entryAndSleepFor(resource, 100)); + assertTrue(entryAndSleepFor(resource, 100)); + assertTrue(entryAndSleepFor(resource, 100)); + assertTrue(entryWithErrorIfPresent(resource, new IllegalArgumentException())); + assertTrue(entryAndSleepFor(resource, 100)); } } \ No newline at end of file diff --git a/sentinel-core/src/test/java/com/alibaba/csp/sentinel/test/AbstractTimeBasedTest.java b/sentinel-core/src/test/java/com/alibaba/csp/sentinel/test/AbstractTimeBasedTest.java index 2c9f8fe4..e22d1a64 100644 --- a/sentinel-core/src/test/java/com/alibaba/csp/sentinel/test/AbstractTimeBasedTest.java +++ b/sentinel-core/src/test/java/com/alibaba/csp/sentinel/test/AbstractTimeBasedTest.java @@ -15,11 +15,17 @@ */ package com.alibaba.csp.sentinel.test; +import java.util.concurrent.ThreadLocalRandom; + import org.junit.runner.RunWith; import org.powermock.api.mockito.PowerMockito; import org.powermock.core.classloader.annotations.PrepareForTest; import org.powermock.modules.junit4.PowerMockRunner; +import com.alibaba.csp.sentinel.Entry; +import com.alibaba.csp.sentinel.SphU; +import com.alibaba.csp.sentinel.Tracer; +import com.alibaba.csp.sentinel.slots.block.BlockException; import com.alibaba.csp.sentinel.util.TimeUtil; /** @@ -55,4 +61,39 @@ public abstract class AbstractTimeBasedTest { protected final void sleepSecond(int timeSec) { sleep(timeSec * 1000); } + + protected final boolean entryAndSleepFor(String res, int sleepMs) { + Entry entry = null; + try { + entry = SphU.entry(res); + sleep(sleepMs); + } catch (BlockException ex) { + return false; + } catch (Exception ex) { + Tracer.traceEntry(ex, entry); + } finally { + if (entry != null) { + entry.exit(); + } + } + return true; + } + + protected final boolean entryWithErrorIfPresent(String res, Exception ex) { + Entry entry = null; + try { + entry = SphU.entry(res); + if (ex != null) { + Tracer.traceEntry(ex, entry); + } + sleep(ThreadLocalRandom.current().nextInt(5, 10)); + } catch (BlockException b) { + return false; + } finally { + if (entry != null) { + entry.exit(); + } + } + return true; + } }