* Refactor the workflow to fix the bug that circuit breaker may remain half-open state forever when the request is blocked by upcoming rules: revert the state change in exit handler (as a temporary workaround) * Add exit handler in Entry as a per-invocation hook.master
@@ -15,12 +15,17 @@ | |||||
*/ | */ | ||||
package com.alibaba.csp.sentinel; | package com.alibaba.csp.sentinel; | ||||
import java.util.Iterator; | |||||
import java.util.LinkedList; | |||||
import com.alibaba.csp.sentinel.context.Context; | import com.alibaba.csp.sentinel.context.Context; | ||||
import com.alibaba.csp.sentinel.context.ContextUtil; | import com.alibaba.csp.sentinel.context.ContextUtil; | ||||
import com.alibaba.csp.sentinel.context.NullContext; | import com.alibaba.csp.sentinel.context.NullContext; | ||||
import com.alibaba.csp.sentinel.log.RecordLog; | |||||
import com.alibaba.csp.sentinel.node.Node; | import com.alibaba.csp.sentinel.node.Node; | ||||
import com.alibaba.csp.sentinel.slotchain.ProcessorSlot; | import com.alibaba.csp.sentinel.slotchain.ProcessorSlot; | ||||
import com.alibaba.csp.sentinel.slotchain.ResourceWrapper; | import com.alibaba.csp.sentinel.slotchain.ResourceWrapper; | ||||
import com.alibaba.csp.sentinel.util.function.BiConsumer; | |||||
/** | /** | ||||
* Linked entry within current context. | * Linked entry within current context. | ||||
@@ -35,6 +40,8 @@ class CtEntry extends Entry { | |||||
protected ProcessorSlot<Object> chain; | protected ProcessorSlot<Object> chain; | ||||
protected Context context; | protected Context context; | ||||
protected LinkedList<BiConsumer<Context, Entry>> exitHandlers; | |||||
CtEntry(ResourceWrapper resourceWrapper, ProcessorSlot<Object> chain, Context context) { | CtEntry(ResourceWrapper resourceWrapper, ProcessorSlot<Object> chain, Context context) { | ||||
super(resourceWrapper); | super(resourceWrapper); | ||||
@@ -102,10 +109,32 @@ class CtEntry extends Entry { | |||||
protected void clearEntryContext() { | protected void clearEntryContext() { | ||||
this.context = null; | this.context = null; | ||||
} | } | ||||
@Override | |||||
public void whenComplete(BiConsumer<Context, Entry> consumer) { | |||||
if (this.exitHandlers == null) { | |||||
this.exitHandlers = new LinkedList<>(); | |||||
} | |||||
this.exitHandlers.add(consumer); | |||||
} | |||||
@Override | @Override | ||||
protected Entry trueExit(int count, Object... args) throws ErrorEntryFreeException { | protected Entry trueExit(int count, Object... args) throws ErrorEntryFreeException { | ||||
exitForContext(context, count, args); | exitForContext(context, count, args); | ||||
if (this.exitHandlers != null) { | |||||
Iterator<BiConsumer<Context, Entry>> it = this.exitHandlers.iterator(); | |||||
BiConsumer<Context, Entry> cur; | |||||
while (it.hasNext()) { | |||||
cur = it.next(); | |||||
try { | |||||
cur.accept(this.context, this); | |||||
} catch (Exception e) { | |||||
RecordLog.warn("Error invoking exit handler", e); | |||||
} | |||||
} | |||||
this.exitHandlers = null; | |||||
} | |||||
return parent; | return parent; | ||||
} | } | ||||
@@ -17,6 +17,7 @@ package com.alibaba.csp.sentinel; | |||||
import com.alibaba.csp.sentinel.slots.block.BlockException; | import com.alibaba.csp.sentinel.slots.block.BlockException; | ||||
import com.alibaba.csp.sentinel.util.TimeUtil; | import com.alibaba.csp.sentinel.util.TimeUtil; | ||||
import com.alibaba.csp.sentinel.util.function.BiConsumer; | |||||
import com.alibaba.csp.sentinel.context.ContextUtil; | import com.alibaba.csp.sentinel.context.ContextUtil; | ||||
import com.alibaba.csp.sentinel.node.Node; | import com.alibaba.csp.sentinel.node.Node; | ||||
import com.alibaba.csp.sentinel.slotchain.ResourceWrapper; | import com.alibaba.csp.sentinel.slotchain.ResourceWrapper; | ||||
@@ -178,4 +179,13 @@ public abstract class Entry implements AutoCloseable { | |||||
this.originNode = originNode; | this.originNode = originNode; | ||||
} | } | ||||
/** | |||||
* Like `CompletableFuture` since JDK8 it guarantees specified consumer | |||||
* is invoked when this entry exited. | |||||
* Use it when you did some STATEFUL operations on entries. | |||||
* | |||||
* @param consumer | |||||
*/ | |||||
public abstract void whenComplete(BiConsumer<Context, Entry> consumer); | |||||
} | } |
@@ -26,7 +26,6 @@ import com.alibaba.csp.sentinel.slotchain.ResourceWrapper; | |||||
import com.alibaba.csp.sentinel.slots.block.BlockException; | import com.alibaba.csp.sentinel.slots.block.BlockException; | ||||
import com.alibaba.csp.sentinel.slots.block.degrade.circuitbreaker.CircuitBreaker; | import com.alibaba.csp.sentinel.slots.block.degrade.circuitbreaker.CircuitBreaker; | ||||
import com.alibaba.csp.sentinel.spi.SpiOrder; | import com.alibaba.csp.sentinel.spi.SpiOrder; | ||||
import com.alibaba.csp.sentinel.util.TimeUtil; | |||||
/** | /** | ||||
* A {@link ProcessorSlot} dedicates to circuit breaking. | * A {@link ProcessorSlot} dedicates to circuit breaking. | ||||
@@ -40,18 +39,18 @@ public class DegradeSlot extends AbstractLinkedProcessorSlot<DefaultNode> { | |||||
@Override | @Override | ||||
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, | public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, | ||||
boolean prioritized, Object... args) throws Throwable { | boolean prioritized, Object... args) throws Throwable { | ||||
performChecking(resourceWrapper); | |||||
performChecking(context, resourceWrapper); | |||||
fireEntry(context, resourceWrapper, node, count, prioritized, args); | fireEntry(context, resourceWrapper, node, count, prioritized, args); | ||||
} | } | ||||
void performChecking(ResourceWrapper r) throws BlockException { | |||||
void performChecking(Context context, ResourceWrapper r) throws BlockException { | |||||
List<CircuitBreaker> circuitBreakers = DegradeRuleManager.getCircuitBreakers(r.getName()); | List<CircuitBreaker> circuitBreakers = DegradeRuleManager.getCircuitBreakers(r.getName()); | ||||
if (circuitBreakers == null || circuitBreakers.isEmpty()) { | if (circuitBreakers == null || circuitBreakers.isEmpty()) { | ||||
return; | return; | ||||
} | } | ||||
for (CircuitBreaker cb : circuitBreakers) { | for (CircuitBreaker cb : circuitBreakers) { | ||||
if (!cb.tryPass()) { | |||||
if (!cb.tryPass(context, r)) { | |||||
throw new DegradeException(cb.getRule().getLimitApp(), cb.getRule()); | throw new DegradeException(cb.getRule().getLimitApp(), cb.getRule()); | ||||
} | } | ||||
} | } | ||||
@@ -71,14 +70,9 @@ public class DegradeSlot extends AbstractLinkedProcessorSlot<DefaultNode> { | |||||
} | } | ||||
if (curEntry.getBlockError() == null) { | if (curEntry.getBlockError() == null) { | ||||
long completeTime = curEntry.getCompleteTimestamp(); | |||||
if (completeTime <= 0) { | |||||
completeTime = TimeUtil.currentTimeMillis(); | |||||
} | |||||
long rt = completeTime - curEntry.getCreateTimestamp(); | |||||
Throwable error = curEntry.getError(); | |||||
// passed request | |||||
for (CircuitBreaker circuitBreaker : circuitBreakers) { | for (CircuitBreaker circuitBreaker : circuitBreakers) { | ||||
circuitBreaker.onRequestComplete(rt, error); | |||||
circuitBreaker.onRequestComplete(context, r); | |||||
} | } | ||||
} | } | ||||
@@ -17,10 +17,14 @@ package com.alibaba.csp.sentinel.slots.block.degrade.circuitbreaker; | |||||
import java.util.concurrent.atomic.AtomicReference; | import java.util.concurrent.atomic.AtomicReference; | ||||
import com.alibaba.csp.sentinel.Entry; | |||||
import com.alibaba.csp.sentinel.context.Context; | |||||
import com.alibaba.csp.sentinel.slotchain.ResourceWrapper; | |||||
import com.alibaba.csp.sentinel.slots.block.degrade.DegradeRule; | import com.alibaba.csp.sentinel.slots.block.degrade.DegradeRule; | ||||
import com.alibaba.csp.sentinel.slots.block.degrade.DegradeRuleManager; | import com.alibaba.csp.sentinel.slots.block.degrade.DegradeRuleManager; | ||||
import com.alibaba.csp.sentinel.util.AssertUtil; | import com.alibaba.csp.sentinel.util.AssertUtil; | ||||
import com.alibaba.csp.sentinel.util.TimeUtil; | import com.alibaba.csp.sentinel.util.TimeUtil; | ||||
import com.alibaba.csp.sentinel.util.function.BiConsumer; | |||||
/** | /** | ||||
* @author Eric Zhao | * @author Eric Zhao | ||||
@@ -61,14 +65,14 @@ public abstract class AbstractCircuitBreaker implements CircuitBreaker { | |||||
} | } | ||||
@Override | @Override | ||||
public boolean tryPass() { | |||||
public boolean tryPass(Context context, ResourceWrapper r) { | |||||
// Template implementation. | // Template implementation. | ||||
if (currentState.get() == State.CLOSED) { | if (currentState.get() == State.CLOSED) { | ||||
return true; | return true; | ||||
} | } | ||||
if (currentState.get() == State.OPEN) { | if (currentState.get() == State.OPEN) { | ||||
// For half-open state we allow a request for trial. | // For half-open state we allow a request for trial. | ||||
return retryTimeoutArrived() && fromOpenToHalfOpen(); | |||||
return retryTimeoutArrived() && fromOpenToHalfOpen(context); | |||||
} | } | ||||
return false; | return false; | ||||
} | } | ||||
@@ -91,30 +95,44 @@ public abstract class AbstractCircuitBreaker implements CircuitBreaker { | |||||
if (currentState.compareAndSet(prev, State.OPEN)) { | if (currentState.compareAndSet(prev, State.OPEN)) { | ||||
updateNextRetryTimestamp(); | updateNextRetryTimestamp(); | ||||
for (CircuitBreakerStateChangeObserver observer : observerRegistry.getStateChangeObservers()) { | |||||
observer.onStateChange(prev, State.OPEN, rule, snapshotValue); | |||||
} | |||||
notifyObservers(prev, State.OPEN, snapshotValue); | |||||
return true; | return true; | ||||
} | } | ||||
return false; | return false; | ||||
} | } | ||||
protected boolean fromOpenToHalfOpen() { | |||||
protected boolean fromOpenToHalfOpen(Context context) { | |||||
if (currentState.compareAndSet(State.OPEN, State.HALF_OPEN)) { | if (currentState.compareAndSet(State.OPEN, State.HALF_OPEN)) { | ||||
for (CircuitBreakerStateChangeObserver observer : observerRegistry.getStateChangeObservers()) { | |||||
observer.onStateChange(State.OPEN, State.HALF_OPEN, rule, null); | |||||
} | |||||
notifyObservers(State.OPEN, State.HALF_OPEN, null); | |||||
Entry entry = context.getCurEntry(); | |||||
entry.whenComplete(new BiConsumer<Context, Entry>() { | |||||
@Override | |||||
public void accept(Context context, Entry entry) { | |||||
if (entry.getBlockError() != null) { | |||||
// Fallback to OPEN due to detecting request is blocked | |||||
currentState.compareAndSet(State.HALF_OPEN, State.OPEN); | |||||
notifyObservers(State.HALF_OPEN, State.OPEN, 1.0d); | |||||
return; | |||||
} | |||||
} | |||||
}); | |||||
return true; | return true; | ||||
} | } | ||||
return false; | return false; | ||||
} | } | ||||
private void notifyObservers(CircuitBreaker.State prevState, CircuitBreaker.State newState, Double snapshotValue) { | |||||
for (CircuitBreakerStateChangeObserver observer : observerRegistry.getStateChangeObservers()) { | |||||
observer.onStateChange(prevState, newState, rule, snapshotValue); | |||||
} | |||||
} | |||||
protected boolean fromHalfOpenToOpen(double snapshotValue) { | protected boolean fromHalfOpenToOpen(double snapshotValue) { | ||||
if (currentState.compareAndSet(State.HALF_OPEN, State.OPEN)) { | if (currentState.compareAndSet(State.HALF_OPEN, State.OPEN)) { | ||||
updateNextRetryTimestamp(); | updateNextRetryTimestamp(); | ||||
for (CircuitBreakerStateChangeObserver observer : observerRegistry.getStateChangeObservers()) { | |||||
observer.onStateChange(State.HALF_OPEN, State.OPEN, rule, snapshotValue); | |||||
} | |||||
notifyObservers(State.HALF_OPEN, State.OPEN, snapshotValue); | |||||
return true; | return true; | ||||
} | } | ||||
return false; | return false; | ||||
@@ -123,9 +141,7 @@ public abstract class AbstractCircuitBreaker implements CircuitBreaker { | |||||
protected boolean fromHalfOpenToClose() { | protected boolean fromHalfOpenToClose() { | ||||
if (currentState.compareAndSet(State.HALF_OPEN, State.CLOSED)) { | if (currentState.compareAndSet(State.HALF_OPEN, State.CLOSED)) { | ||||
resetStat(); | resetStat(); | ||||
for (CircuitBreakerStateChangeObserver observer : observerRegistry.getStateChangeObservers()) { | |||||
observer.onStateChange(State.HALF_OPEN, State.CLOSED, rule, null); | |||||
} | |||||
notifyObservers(State.HALF_OPEN, State.CLOSED, null); | |||||
return true; | return true; | ||||
} | } | ||||
return false; | return false; | ||||
@@ -15,6 +15,8 @@ | |||||
*/ | */ | ||||
package com.alibaba.csp.sentinel.slots.block.degrade.circuitbreaker; | package com.alibaba.csp.sentinel.slots.block.degrade.circuitbreaker; | ||||
import com.alibaba.csp.sentinel.context.Context; | |||||
import com.alibaba.csp.sentinel.slotchain.ResourceWrapper; | |||||
import com.alibaba.csp.sentinel.slots.block.degrade.DegradeRule; | import com.alibaba.csp.sentinel.slots.block.degrade.DegradeRule; | ||||
/** | /** | ||||
@@ -32,11 +34,13 @@ public interface CircuitBreaker { | |||||
DegradeRule getRule(); | DegradeRule getRule(); | ||||
/** | /** | ||||
* Acquires permission of an invocation only if it is available at the time of invocation. | |||||
* Acquires permission of an invocation only if it is available at the time of invoking. | |||||
* | * | ||||
* @param context | |||||
* @param r | |||||
* @return {@code true} if permission was acquired and {@code false} otherwise | * @return {@code true} if permission was acquired and {@code false} otherwise | ||||
*/ | */ | ||||
boolean tryPass(); | |||||
boolean tryPass(Context context, ResourceWrapper r); | |||||
/** | /** | ||||
* Get current state of the circuit breaker. | * Get current state of the circuit breaker. | ||||
@@ -46,13 +50,12 @@ public interface CircuitBreaker { | |||||
State currentState(); | State currentState(); | ||||
/** | /** | ||||
* Record a completed request with the given response time and error (if present) and | |||||
* handle state transformation of the circuit breaker. | |||||
* Called when a `passed` invocation finished. | |||||
* | * | ||||
* @param rt the response time of this entry | |||||
* @param error the error of this entry (if present) | |||||
* @param context context of current invocation | |||||
* @param wrapper current resource | |||||
*/ | */ | ||||
void onRequestComplete(long rt, Throwable error); | |||||
void onRequestComplete(Context context, ResourceWrapper wrapper); | |||||
/** | /** | ||||
* Circuit breaker state. | * Circuit breaker state. | ||||
@@ -17,6 +17,9 @@ package com.alibaba.csp.sentinel.slots.block.degrade.circuitbreaker; | |||||
import java.util.List; | import java.util.List; | ||||
import com.alibaba.csp.sentinel.Entry; | |||||
import com.alibaba.csp.sentinel.context.Context; | |||||
import com.alibaba.csp.sentinel.slotchain.ResourceWrapper; | |||||
import com.alibaba.csp.sentinel.slots.block.degrade.DegradeRule; | import com.alibaba.csp.sentinel.slots.block.degrade.DegradeRule; | ||||
import com.alibaba.csp.sentinel.slots.statistic.base.LeapArray; | import com.alibaba.csp.sentinel.slots.statistic.base.LeapArray; | ||||
import com.alibaba.csp.sentinel.slots.statistic.base.LongAdder; | import com.alibaba.csp.sentinel.slots.statistic.base.LongAdder; | ||||
@@ -60,7 +63,12 @@ public class ExceptionCircuitBreaker extends AbstractCircuitBreaker { | |||||
} | } | ||||
@Override | @Override | ||||
public void onRequestComplete(long rt, Throwable error) { | |||||
public void onRequestComplete(Context context, ResourceWrapper r) { | |||||
Entry entry = context.getCurEntry(); | |||||
if (entry == null) { | |||||
return; | |||||
} | |||||
Throwable error = entry.getError(); | |||||
SimpleErrorCounter counter = stat.currentWindow().value(); | SimpleErrorCounter counter = stat.currentWindow().value(); | ||||
if (error != null) { | if (error != null) { | ||||
counter.getErrorCount().add(1); | counter.getErrorCount().add(1); | ||||
@@ -74,7 +82,9 @@ public class ExceptionCircuitBreaker extends AbstractCircuitBreaker { | |||||
if (currentState.get() == State.OPEN) { | if (currentState.get() == State.OPEN) { | ||||
return; | return; | ||||
} | } | ||||
if (currentState.get() == State.HALF_OPEN) { | if (currentState.get() == State.HALF_OPEN) { | ||||
// In detecting request | |||||
if (error == null) { | if (error == null) { | ||||
fromHalfOpenToClose(); | fromHalfOpenToClose(); | ||||
} else { | } else { | ||||
@@ -82,6 +92,7 @@ public class ExceptionCircuitBreaker extends AbstractCircuitBreaker { | |||||
} | } | ||||
return; | return; | ||||
} | } | ||||
List<SimpleErrorCounter> counters = stat.values(); | List<SimpleErrorCounter> counters = stat.values(); | ||||
long errCount = 0; | long errCount = 0; | ||||
long totalCount = 0; | long totalCount = 0; | ||||
@@ -17,12 +17,16 @@ package com.alibaba.csp.sentinel.slots.block.degrade.circuitbreaker; | |||||
import java.util.List; | import java.util.List; | ||||
import com.alibaba.csp.sentinel.Entry; | |||||
import com.alibaba.csp.sentinel.context.Context; | |||||
import com.alibaba.csp.sentinel.slotchain.ResourceWrapper; | |||||
import com.alibaba.csp.sentinel.slots.block.RuleConstant; | import com.alibaba.csp.sentinel.slots.block.RuleConstant; | ||||
import com.alibaba.csp.sentinel.slots.block.degrade.DegradeRule; | import com.alibaba.csp.sentinel.slots.block.degrade.DegradeRule; | ||||
import com.alibaba.csp.sentinel.slots.statistic.base.LeapArray; | import com.alibaba.csp.sentinel.slots.statistic.base.LeapArray; | ||||
import com.alibaba.csp.sentinel.slots.statistic.base.LongAdder; | import com.alibaba.csp.sentinel.slots.statistic.base.LongAdder; | ||||
import com.alibaba.csp.sentinel.slots.statistic.base.WindowWrap; | import com.alibaba.csp.sentinel.slots.statistic.base.WindowWrap; | ||||
import com.alibaba.csp.sentinel.util.AssertUtil; | import com.alibaba.csp.sentinel.util.AssertUtil; | ||||
import com.alibaba.csp.sentinel.util.TimeUtil; | |||||
/** | /** | ||||
* @author Eric Zhao | * @author Eric Zhao | ||||
@@ -57,8 +61,17 @@ public class ResponseTimeCircuitBreaker extends AbstractCircuitBreaker { | |||||
} | } | ||||
@Override | @Override | ||||
public void onRequestComplete(long rt, Throwable error) { | |||||
public void onRequestComplete(Context context, ResourceWrapper wrapper) { | |||||
SlowRequestCounter counter = slidingCounter.currentWindow().value(); | SlowRequestCounter counter = slidingCounter.currentWindow().value(); | ||||
Entry entry = context.getCurEntry(); | |||||
if (entry == null) { | |||||
return; | |||||
} | |||||
long completeTime = entry.getCompleteTimestamp(); | |||||
if (completeTime <= 0) { | |||||
completeTime = TimeUtil.currentTimeMillis(); | |||||
} | |||||
long rt = completeTime - entry.getCreateTimestamp(); | |||||
if (rt > maxAllowedRt) { | if (rt > maxAllowedRt) { | ||||
counter.slowCount.add(1); | counter.slowCount.add(1); | ||||
} | } | ||||
@@ -71,7 +84,9 @@ public class ResponseTimeCircuitBreaker extends AbstractCircuitBreaker { | |||||
if (currentState.get() == State.OPEN) { | if (currentState.get() == State.OPEN) { | ||||
return; | return; | ||||
} | } | ||||
if (currentState.get() == State.HALF_OPEN) { | if (currentState.get() == State.HALF_OPEN) { | ||||
// In detecting request | |||||
// TODO: improve logic for half-open recovery | // TODO: improve logic for half-open recovery | ||||
if (rt > maxAllowedRt) { | if (rt > maxAllowedRt) { | ||||
fromHalfOpenToOpen(1.0d); | fromHalfOpenToOpen(1.0d); | ||||
@@ -0,0 +1,24 @@ | |||||
/* | |||||
* Copyright 1999-2019 Alibaba Group Holding Ltd. | |||||
* | |||||
* Licensed under the Apache License, Version 2.0 (the "License"); | |||||
* you may not use this file except in compliance with the License. | |||||
* You may obtain a copy of the License at | |||||
* | |||||
* https://www.apache.org/licenses/LICENSE-2.0 | |||||
* | |||||
* Unless required by applicable law or agreed to in writing, software | |||||
* distributed under the License is distributed on an "AS IS" BASIS, | |||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |||||
* See the License for the specific language governing permissions and | |||||
* limitations under the License. | |||||
*/ | |||||
package com.alibaba.csp.sentinel.util.function; | |||||
/** | |||||
* BiConsumer interface from JDK 8. | |||||
*/ | |||||
public interface BiConsumer<T, U> { | |||||
void accept(T t, U u); | |||||
} |
@@ -1,8 +1,10 @@ | |||||
package com.alibaba.csp.sentinel; | package com.alibaba.csp.sentinel; | ||||
import com.alibaba.csp.sentinel.context.Context; | |||||
import com.alibaba.csp.sentinel.node.Node; | import com.alibaba.csp.sentinel.node.Node; | ||||
import com.alibaba.csp.sentinel.slotchain.ResourceWrapper; | import com.alibaba.csp.sentinel.slotchain.ResourceWrapper; | ||||
import com.alibaba.csp.sentinel.slotchain.StringResourceWrapper; | import com.alibaba.csp.sentinel.slotchain.StringResourceWrapper; | ||||
import com.alibaba.csp.sentinel.util.function.BiConsumer; | |||||
import org.junit.Test; | import org.junit.Test; | ||||
@@ -64,5 +66,10 @@ public class EntryTest { | |||||
public Node getLastNode() { | public Node getLastNode() { | ||||
return null; | return null; | ||||
} | } | ||||
@Override | |||||
public void whenComplete(BiConsumer<Context, Entry> consumer) { | |||||
// do nothing | |||||
} | |||||
} | } | ||||
} | |||||
} |
@@ -15,11 +15,8 @@ | |||||
*/ | */ | ||||
package com.alibaba.csp.sentinel.slots.block.degrade; | package com.alibaba.csp.sentinel.slots.block.degrade; | ||||
import com.alibaba.csp.sentinel.Entry; | |||||
import com.alibaba.csp.sentinel.SphU; | |||||
import com.alibaba.csp.sentinel.Tracer; | |||||
import com.alibaba.csp.sentinel.slots.block.BlockException; | |||||
import com.alibaba.csp.sentinel.slots.block.RuleConstant; | import com.alibaba.csp.sentinel.slots.block.RuleConstant; | ||||
import com.alibaba.csp.sentinel.slots.block.degrade.circuitbreaker.CircuitBreaker; | |||||
import com.alibaba.csp.sentinel.slots.block.degrade.circuitbreaker.CircuitBreaker.State; | import com.alibaba.csp.sentinel.slots.block.degrade.circuitbreaker.CircuitBreaker.State; | ||||
import com.alibaba.csp.sentinel.slots.block.degrade.circuitbreaker.CircuitBreakerStateChangeObserver; | import com.alibaba.csp.sentinel.slots.block.degrade.circuitbreaker.CircuitBreakerStateChangeObserver; | ||||
import com.alibaba.csp.sentinel.slots.block.degrade.circuitbreaker.EventObserverRegistry; | import com.alibaba.csp.sentinel.slots.block.degrade.circuitbreaker.EventObserverRegistry; | ||||
@@ -31,6 +28,7 @@ import org.junit.Test; | |||||
import java.util.ArrayList; | import java.util.ArrayList; | ||||
import java.util.Arrays; | import java.util.Arrays; | ||||
import java.util.List; | |||||
import java.util.concurrent.ThreadLocalRandom; | import java.util.concurrent.ThreadLocalRandom; | ||||
import static org.junit.Assert.assertEquals; | import static org.junit.Assert.assertEquals; | ||||
@@ -54,41 +52,6 @@ public class CircuitBreakingIntegrationTest extends AbstractTimeBasedTest { | |||||
DegradeRuleManager.loadRules(new ArrayList<DegradeRule>()); | DegradeRuleManager.loadRules(new ArrayList<DegradeRule>()); | ||||
} | } | ||||
private boolean entryAndSleepFor(String res, int sleepMs) { | |||||
Entry entry = null; | |||||
try { | |||||
entry = SphU.entry(res); | |||||
sleep(sleepMs); | |||||
} catch (BlockException ex) { | |||||
return false; | |||||
} catch (Exception ex) { | |||||
Tracer.traceEntry(ex, entry); | |||||
} finally { | |||||
if (entry != null) { | |||||
entry.exit(); | |||||
} | |||||
} | |||||
return true; | |||||
} | |||||
private boolean entryWithErrorIfPresent(String res, Exception ex) { | |||||
Entry entry = null; | |||||
try { | |||||
entry = SphU.entry(res); | |||||
if (ex != null) { | |||||
Tracer.traceEntry(ex, entry); | |||||
} | |||||
sleep(ThreadLocalRandom.current().nextInt(5, 10)); | |||||
} catch (BlockException b) { | |||||
return false; | |||||
} finally { | |||||
if (entry != null) { | |||||
entry.exit(); | |||||
} | |||||
} | |||||
return true; | |||||
} | |||||
@Test | @Test | ||||
public void testSlowRequestMode() throws Exception { | public void testSlowRequestMode() throws Exception { | ||||
CircuitBreakerStateChangeObserver observer = mock(CircuitBreakerStateChangeObserver.class); | CircuitBreakerStateChangeObserver observer = mock(CircuitBreakerStateChangeObserver.class); | ||||
@@ -209,5 +172,62 @@ public class CircuitBreakingIntegrationTest extends AbstractTimeBasedTest { | |||||
public void testExceptionCountMode() throws Throwable { | public void testExceptionCountMode() throws Throwable { | ||||
// TODO | // TODO | ||||
} | } | ||||
private void verifyState(List<CircuitBreaker> breakers, int target) { | |||||
int state = 0; | |||||
for (CircuitBreaker breaker : breakers) { | |||||
if (breaker.currentState() == State.OPEN) { | |||||
state ++; | |||||
} else if (breaker.currentState() == State.HALF_OPEN) { | |||||
state --; | |||||
} else { | |||||
state -= 2; | |||||
} | |||||
} | |||||
assertEquals(target, state); | |||||
} | |||||
@Test | |||||
public void testMultipleHalfOpenedBreaders() throws Exception { | |||||
CircuitBreakerStateChangeObserver observer = mock(CircuitBreakerStateChangeObserver.class); | |||||
setCurrentMillis(System.currentTimeMillis() / 1000 * 1000); | |||||
int retryTimeoutSec = 2; | |||||
int maxRt = 50; | |||||
int statIntervalMs = 20000; | |||||
int minRequestAmount = 1; | |||||
String res = "CircuitBreakingIntegrationTest_testMultipleHalfOpenedBreaders"; | |||||
EventObserverRegistry.getInstance().addStateChangeObserver(res, observer); | |||||
// initial two rules | |||||
DegradeRuleManager.loadRules(Arrays.asList( | |||||
new DegradeRule(res).setTimeWindow(retryTimeoutSec).setCount(maxRt) | |||||
.setStatIntervalMs(statIntervalMs).setMinRequestAmount(minRequestAmount) | |||||
.setSlowRatioThreshold(0.8d).setGrade(0), | |||||
new DegradeRule(res).setTimeWindow(retryTimeoutSec * 2).setCount(maxRt) | |||||
.setStatIntervalMs(statIntervalMs).setMinRequestAmount(minRequestAmount) | |||||
.setSlowRatioThreshold(0.8d).setGrade(0) | |||||
)); | |||||
assertTrue(entryAndSleepFor(res, 100)); | |||||
// they are open now | |||||
for (CircuitBreaker breaker : DegradeRuleManager.getCircuitBreakers(res)) { | |||||
assertEquals(CircuitBreaker.State.OPEN, breaker.currentState()); | |||||
} | |||||
sleepSecond(3); | |||||
for (int i = 0; i < 10; i ++) { | |||||
assertFalse(entryAndSleepFor(res, 100)); | |||||
} | |||||
// Now one is in open state while the other experiences open -> half-open -> open | |||||
verifyState(DegradeRuleManager.getCircuitBreakers(res), 2); | |||||
sleepSecond(3); | |||||
// They will all recover | |||||
for (int i = 0; i < 10; i ++) { | |||||
assertTrue(entryAndSleepFor(res, 1)); | |||||
} | |||||
verifyState(DegradeRuleManager.getCircuitBreakers(res), -4); | |||||
} | |||||
} | } |
@@ -15,97 +15,71 @@ | |||||
*/ | */ | ||||
package com.alibaba.csp.sentinel.slots.block.degrade.circuitbreaker; | package com.alibaba.csp.sentinel.slots.block.degrade.circuitbreaker; | ||||
import com.alibaba.csp.sentinel.slots.block.RuleConstant; | |||||
import com.alibaba.csp.sentinel.slots.block.degrade.DegradeRule; | |||||
import com.alibaba.csp.sentinel.slots.block.degrade.circuitbreaker.CircuitBreaker.State; | |||||
import com.alibaba.csp.sentinel.slots.block.degrade.circuitbreaker.ExceptionCircuitBreaker.SimpleErrorCounter; | |||||
import com.alibaba.csp.sentinel.slots.statistic.base.LeapArray; | |||||
import com.alibaba.csp.sentinel.slots.statistic.base.WindowWrap; | |||||
import com.alibaba.csp.sentinel.test.AbstractTimeBasedTest; | |||||
import static org.junit.Assert.assertFalse; | |||||
import static org.junit.Assert.assertTrue; | |||||
import java.util.ArrayList; | |||||
import java.util.Arrays; | |||||
import org.junit.After; | |||||
import org.junit.Before; | |||||
import org.junit.Test; | import org.junit.Test; | ||||
import static org.junit.Assert.*; | |||||
import static org.mockito.Mockito.mock; | |||||
import static org.mockito.Mockito.when; | |||||
import com.alibaba.csp.sentinel.slots.block.BlockException; | |||||
import com.alibaba.csp.sentinel.slots.block.RuleConstant; | |||||
import com.alibaba.csp.sentinel.slots.block.degrade.DegradeRule; | |||||
import com.alibaba.csp.sentinel.slots.block.degrade.DegradeRuleManager; | |||||
import com.alibaba.csp.sentinel.test.AbstractTimeBasedTest; | |||||
/** | /** | ||||
* @author Eric Zhao | * @author Eric Zhao | ||||
*/ | */ | ||||
public class ExceptionCircuitBreakerTest extends AbstractTimeBasedTest { | public class ExceptionCircuitBreakerTest extends AbstractTimeBasedTest { | ||||
@Before | |||||
public void setUp() { | |||||
DegradeRuleManager.loadRules(new ArrayList<DegradeRule>()); | |||||
} | |||||
@Test | |||||
@SuppressWarnings("unchecked") | |||||
public void testStateChangeAndTryAcquire() { | |||||
int retryTimeout = 10; | |||||
DegradeRule rule = new DegradeRule("abc") | |||||
.setCount(0.5d) | |||||
.setGrade(RuleConstant.DEGRADE_GRADE_EXCEPTION_RATIO) | |||||
.setStatIntervalMs(20 * 1000) | |||||
.setTimeWindow(retryTimeout) | |||||
.setMinRequestAmount(10); | |||||
LeapArray<SimpleErrorCounter> stat = mock(LeapArray.class); | |||||
SimpleErrorCounter counter = new SimpleErrorCounter(); | |||||
WindowWrap<SimpleErrorCounter> bucket = new WindowWrap<>(20000, 0, counter); | |||||
when(stat.currentWindow()).thenReturn(bucket); | |||||
ExceptionCircuitBreaker cb = new ExceptionCircuitBreaker(rule, stat); | |||||
assertTrue(cb.tryPass()); | |||||
assertTrue(cb.tryPass()); | |||||
setCurrentMillis(System.currentTimeMillis()); | |||||
cb.fromCloseToOpen(0.52d); | |||||
assertEquals(State.OPEN, cb.currentState()); | |||||
assertFalse(cb.tryPass()); | |||||
assertFalse(cb.tryPass()); | |||||
// Wait for next retry checkpoint. | |||||
sleepSecond(retryTimeout); | |||||
sleep(100); | |||||
// Try a request to trigger state transformation. | |||||
assertTrue(cb.tryPass()); | |||||
assertEquals(State.HALF_OPEN, cb.currentState()); | |||||
// Mark this request as error | |||||
cb.onRequestComplete(20, new IllegalArgumentException()); | |||||
assertEquals(State.OPEN, cb.currentState()); | |||||
// Wait for next retry checkpoint. | |||||
sleepSecond(retryTimeout); | |||||
sleep(100); | |||||
assertTrue(cb.tryPass()); | |||||
assertEquals(State.HALF_OPEN, cb.currentState()); | |||||
setCurrentMillis(System.currentTimeMillis()); | |||||
// Mark this request as success. | |||||
cb.onRequestComplete(20, null); | |||||
assertEquals(State.CLOSED, cb.currentState()); | |||||
@After | |||||
public void tearDown() throws Exception { | |||||
DegradeRuleManager.loadRules(new ArrayList<DegradeRule>()); | |||||
} | } | ||||
@Test | @Test | ||||
@SuppressWarnings("unchecked") | |||||
public void testRecordErrorOrSuccess() { | |||||
public void testRecordErrorOrSuccess() throws BlockException { | |||||
String resource = "testRecordErrorOrSuccess"; | |||||
int retryTimeoutMillis = 10 * 1000; | |||||
int retryTimeout = retryTimeoutMillis / 1000; | |||||
DegradeRule rule = new DegradeRule("abc") | DegradeRule rule = new DegradeRule("abc") | ||||
.setCount(0.5d) | |||||
.setCount(0.2d) | |||||
.setGrade(RuleConstant.DEGRADE_GRADE_EXCEPTION_RATIO) | .setGrade(RuleConstant.DEGRADE_GRADE_EXCEPTION_RATIO) | ||||
.setStatIntervalMs(20 * 1000) | .setStatIntervalMs(20 * 1000) | ||||
.setTimeWindow(10) | |||||
.setMinRequestAmount(10); | |||||
LeapArray<SimpleErrorCounter> stat = mock(LeapArray.class); | |||||
SimpleErrorCounter counter = new SimpleErrorCounter(); | |||||
WindowWrap<SimpleErrorCounter> bucket = new WindowWrap<>(20000, 0, counter); | |||||
when(stat.currentWindow()).thenReturn(bucket); | |||||
CircuitBreaker cb = new ExceptionCircuitBreaker(rule, stat); | |||||
cb.onRequestComplete(15, null); | |||||
assertEquals(1L, counter.getTotalCount().longValue()); | |||||
assertEquals(0L, counter.getErrorCount().longValue()); | |||||
cb.onRequestComplete(15, new IllegalArgumentException()); | |||||
assertEquals(2L, counter.getTotalCount().longValue()); | |||||
assertEquals(1L, counter.getErrorCount().longValue()); | |||||
.setTimeWindow(retryTimeout) | |||||
.setMinRequestAmount(1); | |||||
rule.setResource(resource); | |||||
DegradeRuleManager.loadRules(Arrays.asList(rule)); | |||||
assertTrue(entryAndSleepFor(resource, 10)); | |||||
assertTrue(entryWithErrorIfPresent(resource, new IllegalArgumentException())); // -> open | |||||
assertFalse(entryWithErrorIfPresent(resource, new IllegalArgumentException())); | |||||
assertFalse(entryAndSleepFor(resource, 100)); | |||||
sleep(retryTimeoutMillis / 2); | |||||
assertFalse(entryAndSleepFor(resource, 100)); | |||||
sleep(retryTimeoutMillis / 2); | |||||
assertTrue(entryWithErrorIfPresent(resource, new IllegalArgumentException())); // -> half -> open | |||||
assertFalse(entryAndSleepFor(resource, 100)); | |||||
assertFalse(entryAndSleepFor(resource, 100)); | |||||
sleep(retryTimeoutMillis); | |||||
assertTrue(entryAndSleepFor(resource, 100)); // -> half -> closed | |||||
assertTrue(entryAndSleepFor(resource, 100)); | |||||
assertTrue(entryAndSleepFor(resource, 100)); | |||||
assertTrue(entryAndSleepFor(resource, 100)); | |||||
assertTrue(entryAndSleepFor(resource, 100)); | |||||
assertTrue(entryAndSleepFor(resource, 100)); | |||||
assertTrue(entryAndSleepFor(resource, 100)); | |||||
assertTrue(entryWithErrorIfPresent(resource, new IllegalArgumentException())); | |||||
assertTrue(entryAndSleepFor(resource, 100)); | |||||
} | } | ||||
} | } |
@@ -15,11 +15,17 @@ | |||||
*/ | */ | ||||
package com.alibaba.csp.sentinel.test; | package com.alibaba.csp.sentinel.test; | ||||
import java.util.concurrent.ThreadLocalRandom; | |||||
import org.junit.runner.RunWith; | import org.junit.runner.RunWith; | ||||
import org.powermock.api.mockito.PowerMockito; | import org.powermock.api.mockito.PowerMockito; | ||||
import org.powermock.core.classloader.annotations.PrepareForTest; | import org.powermock.core.classloader.annotations.PrepareForTest; | ||||
import org.powermock.modules.junit4.PowerMockRunner; | import org.powermock.modules.junit4.PowerMockRunner; | ||||
import com.alibaba.csp.sentinel.Entry; | |||||
import com.alibaba.csp.sentinel.SphU; | |||||
import com.alibaba.csp.sentinel.Tracer; | |||||
import com.alibaba.csp.sentinel.slots.block.BlockException; | |||||
import com.alibaba.csp.sentinel.util.TimeUtil; | import com.alibaba.csp.sentinel.util.TimeUtil; | ||||
/** | /** | ||||
@@ -55,4 +61,39 @@ public abstract class AbstractTimeBasedTest { | |||||
protected final void sleepSecond(int timeSec) { | protected final void sleepSecond(int timeSec) { | ||||
sleep(timeSec * 1000); | sleep(timeSec * 1000); | ||||
} | } | ||||
protected final boolean entryAndSleepFor(String res, int sleepMs) { | |||||
Entry entry = null; | |||||
try { | |||||
entry = SphU.entry(res); | |||||
sleep(sleepMs); | |||||
} catch (BlockException ex) { | |||||
return false; | |||||
} catch (Exception ex) { | |||||
Tracer.traceEntry(ex, entry); | |||||
} finally { | |||||
if (entry != null) { | |||||
entry.exit(); | |||||
} | |||||
} | |||||
return true; | |||||
} | |||||
protected final boolean entryWithErrorIfPresent(String res, Exception ex) { | |||||
Entry entry = null; | |||||
try { | |||||
entry = SphU.entry(res); | |||||
if (ex != null) { | |||||
Tracer.traceEntry(ex, entry); | |||||
} | |||||
sleep(ThreadLocalRandom.current().nextInt(5, 10)); | |||||
} catch (BlockException b) { | |||||
return false; | |||||
} finally { | |||||
if (entry != null) { | |||||
entry.exit(); | |||||
} | |||||
} | |||||
return true; | |||||
} | |||||
} | } |