Browse Source

Refactor degrade hierarchy with new circuit breaker mechanism and improve strategy

* Add `CircuitBreaker` abstraction (with half-open state) and add circuit breaker state change event observer support.
* Improve circuit breaking strategy (avg RT → slow request ratio) and make statistics of each rule dependent (to support arbitrary statistic interval).
* Add simple "trial" mechanism (aka. half-open).
* Refactor mechanism of metric recording and state change handling for circuit breakers: record RT and error when requests have completed (i.e. `onExit`, based on #1420).

Signed-off-by: Eric Zhao <sczyh16@gmail.com>
master
Eric Zhao 4 years ago
parent
commit
c0c27c86b7
10 changed files with 901 additions and 206 deletions
  1. +46
    -119
      sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/degrade/DegradeRule.java
  2. +115
    -80
      sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/degrade/DegradeRuleManager.java
  3. +50
    -7
      sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/degrade/DegradeSlot.java
  4. +147
    -0
      sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/degrade/circuitbreaker/AbstractCircuitBreaker.java
  5. +79
    -0
      sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/degrade/circuitbreaker/CircuitBreaker.java
  6. +42
    -0
      sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/degrade/circuitbreaker/CircuitBreakerStateChangeObserver.java
  7. +46
    -0
      sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/degrade/circuitbreaker/CircuitBreakerStrategy.java
  8. +70
    -0
      sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/degrade/circuitbreaker/EventObserverRegistry.java
  9. +156
    -0
      sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/degrade/circuitbreaker/ExceptionCircuitBreaker.java
  10. +150
    -0
      sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/degrade/circuitbreaker/ResponseTimeCircuitBreaker.java

+ 46
- 119
sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/degrade/DegradeRule.java View File

@@ -15,19 +15,12 @@
*/
package com.alibaba.csp.sentinel.slots.block.degrade;

import com.alibaba.csp.sentinel.concurrent.NamedThreadFactory;
import com.alibaba.csp.sentinel.context.Context;
import com.alibaba.csp.sentinel.node.ClusterNode;
import com.alibaba.csp.sentinel.node.DefaultNode;
import com.alibaba.csp.sentinel.slots.block.AbstractRule;
import com.alibaba.csp.sentinel.slots.block.RuleConstant;
import com.alibaba.csp.sentinel.slots.clusterbuilder.ClusterBuilderSlot;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.Objects;

/**
* <p>
@@ -52,13 +45,10 @@ import java.util.concurrent.atomic.AtomicLong;
* </ul>
*
* @author jialiang.linjl
* @author Eric Zhao
*/
public class DegradeRule extends AbstractRule {

@SuppressWarnings("PMD.ThreadPoolCreationRule")
private static ScheduledExecutorService pool = Executors.newScheduledThreadPool(
Runtime.getRuntime().availableProcessors(), new NamedThreadFactory("sentinel-degrade-reset-task", true));

public DegradeRule() {}

public DegradeRule(String resourceName) {
@@ -66,33 +56,34 @@ public class DegradeRule extends AbstractRule {
}

/**
* RT threshold or exception ratio threshold count.
* Circuit breaking strategy (0: average RT, 1: exception ratio, 2: exception count).
*/
private double count;
private int grade = RuleConstant.DEGRADE_GRADE_RT;

/**
* Degrade recover timeout (in seconds) when degradation occurs.
* Threshold count.
*/
private int timeWindow;
private double count;

/**
* Degrade strategy (0: average RT, 1: exception ratio, 2: exception count).
* Recovery timeout (in seconds) when circuit breaker opens. After the timeout, the circuit breaker will
* transform to half-open state for trying a few requests.
*/
private int grade = RuleConstant.DEGRADE_GRADE_RT;
private int timeWindow;

/**
* Minimum number of consecutive slow requests that can trigger RT circuit breaking.
* Minimum number of requests (in an active statistic time span) that can trigger circuit breaking.
*
* @since 1.7.0
*/
private int rtSlowRequestAmount = RuleConstant.DEGRADE_DEFAULT_SLOW_REQUEST_AMOUNT;
private int minRequestAmount = RuleConstant.DEGRADE_DEFAULT_MIN_REQUEST_AMOUNT;

/**
* Minimum number of requests (in an active statistic time span) that can trigger circuit breaking.
*
* @since 1.7.0
* The threshold of slow request ratio in RT mode.
*/
private int minRequestAmount = RuleConstant.DEGRADE_DEFAULT_MIN_REQUEST_AMOUNT;
private double slowRatioThreshold = 1.0d;

private int statIntervalMs = 1000;

public int getGrade() {
return grade;
@@ -121,21 +112,30 @@ public class DegradeRule extends AbstractRule {
return this;
}

public int getRtSlowRequestAmount() {
return rtSlowRequestAmount;
public int getMinRequestAmount() {
return minRequestAmount;
}

public DegradeRule setRtSlowRequestAmount(int rtSlowRequestAmount) {
this.rtSlowRequestAmount = rtSlowRequestAmount;
public DegradeRule setMinRequestAmount(int minRequestAmount) {
this.minRequestAmount = minRequestAmount;
return this;
}

public int getMinRequestAmount() {
return minRequestAmount;
public double getSlowRatioThreshold() {
return slowRatioThreshold;
}

public DegradeRule setMinRequestAmount(int minRequestAmount) {
this.minRequestAmount = minRequestAmount;
public DegradeRule setSlowRatioThreshold(double slowRatioThreshold) {
this.slowRatioThreshold = slowRatioThreshold;
return this;
}

public int getStatIntervalMs() {
return statIntervalMs;
}

public DegradeRule setStatIntervalMs(int statIntervalMs) {
this.statIntervalMs = statIntervalMs;
return this;
}

@@ -144,23 +144,19 @@ public class DegradeRule extends AbstractRule {
if (this == o) { return true; }
if (o == null || getClass() != o.getClass()) { return false; }
if (!super.equals(o)) { return false; }
DegradeRule that = (DegradeRule) o;
return Double.compare(that.count, count) == 0 &&
timeWindow == that.timeWindow &&
grade == that.grade &&
rtSlowRequestAmount == that.rtSlowRequestAmount &&
minRequestAmount == that.minRequestAmount;
DegradeRule rule = (DegradeRule)o;
return Double.compare(rule.count, count) == 0 &&
timeWindow == rule.timeWindow &&
grade == rule.grade &&
minRequestAmount == rule.minRequestAmount &&
Double.compare(rule.slowRatioThreshold, slowRatioThreshold) == 0 &&
statIntervalMs == rule.statIntervalMs;
}

@Override
public int hashCode() {
int result = super.hashCode();
result = 31 * result + new Double(count).hashCode();
result = 31 * result + timeWindow;
result = 31 * result + grade;
result = 31 * result + rtSlowRequestAmount;
result = 31 * result + minRequestAmount;
return result;
return Objects.hash(super.hashCode(), count, timeWindow, grade, minRequestAmount,
slowRatioThreshold, statIntervalMs);
}

@Override
@@ -171,84 +167,15 @@ public class DegradeRule extends AbstractRule {
", count=" + count +
", limitApp=" + getLimitApp() +
", timeWindow=" + timeWindow +
", rtSlowRequestAmount=" + rtSlowRequestAmount +
", minRequestAmount=" + minRequestAmount +
"}";
", slowRatioThreshold=" + slowRatioThreshold +
", statIntervalMs=" + statIntervalMs +
'}';
}

// Internal implementation (will be deprecated and moved outside).

private AtomicLong passCount = new AtomicLong(0);
private final AtomicBoolean cut = new AtomicBoolean(false);

@Override
public boolean passCheck(Context context, DefaultNode node, int acquireCount, Object... args) {
if (cut.get()) {
return false;
}

ClusterNode clusterNode = ClusterBuilderSlot.getClusterNode(this.getResource());
if (clusterNode == null) {
return true;
}

if (grade == RuleConstant.DEGRADE_GRADE_RT) {
double rt = clusterNode.avgRt();
if (rt < this.count) {
passCount.set(0);
return true;
}

// Sentinel will degrade the service only if count exceeds.
if (passCount.incrementAndGet() < rtSlowRequestAmount) {
return true;
}
} else if (grade == RuleConstant.DEGRADE_GRADE_EXCEPTION_RATIO) {
double exception = clusterNode.exceptionQps();
double success = clusterNode.successQps();
double total = clusterNode.totalQps();
// If total amount is less than minRequestAmount, the request will pass.
if (total < minRequestAmount) {
return true;
}

// In the same aligned statistic time window,
// "success" (aka. completed count) = exception count + non-exception count (realSuccess)
double realSuccess = success - exception;
if (realSuccess <= 0 && exception < minRequestAmount) {
return true;
}

if (exception / success < count) {
return true;
}
} else if (grade == RuleConstant.DEGRADE_GRADE_EXCEPTION_COUNT) {
double exception = clusterNode.totalException();
if (exception < count) {
return true;
}
}

if (cut.compareAndSet(false, true)) {
ResetTask resetTask = new ResetTask(this);
pool.schedule(resetTask, timeWindow, TimeUnit.SECONDS);
}

@Deprecated
public boolean passCheck(Context context, DefaultNode node, int count, Object... args) {
return false;
}

private static final class ResetTask implements Runnable {

private DegradeRule rule;

ResetTask(DegradeRule rule) {
this.rule = rule;
}

@Override
public void run() {
rule.passCount.set(0);
rule.cut.set(false);
}
}
}

+ 115
- 80
sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/degrade/DegradeRuleManager.java View File

@@ -21,29 +21,29 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

import com.alibaba.csp.sentinel.config.SentinelConfig;
import com.alibaba.csp.sentinel.context.Context;
import com.alibaba.csp.sentinel.log.RecordLog;
import com.alibaba.csp.sentinel.node.DefaultNode;
import com.alibaba.csp.sentinel.property.DynamicSentinelProperty;
import com.alibaba.csp.sentinel.property.PropertyListener;
import com.alibaba.csp.sentinel.property.SentinelProperty;
import com.alibaba.csp.sentinel.slotchain.ResourceWrapper;
import com.alibaba.csp.sentinel.slots.block.BlockException;
import com.alibaba.csp.sentinel.slots.block.RuleConstant;
import com.alibaba.csp.sentinel.slots.block.degrade.circuitbreaker.CircuitBreaker;
import com.alibaba.csp.sentinel.slots.block.degrade.circuitbreaker.ExceptionCircuitBreaker;
import com.alibaba.csp.sentinel.slots.block.degrade.circuitbreaker.ResponseTimeCircuitBreaker;
import com.alibaba.csp.sentinel.util.AssertUtil;
import com.alibaba.csp.sentinel.util.StringUtil;

/**
* The rule manager for circuit breaking rules ({@link DegradeRule}).
*
* @author youji.zj
* @author jialiang.linjl
* @author Eric Zhao
*/
public final class DegradeRuleManager {

private static final Map<String, Set<DegradeRule>> degradeRules = new ConcurrentHashMap<>();
private static volatile Map<String, List<CircuitBreaker>> circuitBreakers = new HashMap<>();
private static volatile Map<String, Set<DegradeRule>> ruleMap = new HashMap<>();

private static final RulePropertyListener LISTENER = new RulePropertyListener();
private static SentinelProperty<List<DegradeRule>> currentProperty
@@ -69,41 +69,37 @@ public final class DegradeRuleManager {
}
}

public static void checkDegrade(ResourceWrapper resource, Context context, DefaultNode node, int count)
throws BlockException {

Set<DegradeRule> rules = degradeRules.get(resource.getName());
if (rules == null) {
return;
}

for (DegradeRule rule : rules) {
if (!rule.passCheck(context, node, count)) {
throw new DegradeException(rule.getLimitApp(), rule);
}
}
static List<CircuitBreaker> getCircuitBreakers(String resourceName) {
return circuitBreakers.get(resourceName);
}

public static boolean hasConfig(String resource) {
if (resource == null) {
return false;
}
return degradeRules.containsKey(resource);
return circuitBreakers.containsKey(resource);
}

/**
* Get a copy of the rules.
* <p>Get existing circuit breaking rules.</p>
* <p>Note: DO NOT modify the rules from the returned list directly.
* The behavior is <strong>undefined</strong>.</p>
*
* @return a new copy of the rules.
* @return list of existing circuit breaking rules, or empty list if no rules were loaded
*/
public static List<DegradeRule> getRules() {
List<DegradeRule> rules = new ArrayList<>();
for (Map.Entry<String, Set<DegradeRule>> entry : degradeRules.entrySet()) {
for (Map.Entry<String, Set<DegradeRule>> entry : ruleMap.entrySet()) {
rules.addAll(entry.getValue());
}
return rules;
}

public static Set<DegradeRule> getRulesOfResource(String resource) {
AssertUtil.assertNotBlank(resource, "resource name cannot be blank");
return ruleMap.get(resource);
}

/**
* Load {@link DegradeRule}s, former rules will be replaced.
*
@@ -113,7 +109,7 @@ public final class DegradeRuleManager {
try {
currentProperty.updateValue(rules);
} catch (Throwable e) {
RecordLog.warn("[DegradeRuleManager] Unexpected error when loading degrade rules", e);
RecordLog.error("[DegradeRuleManager] Unexpected error when loading degrade rules", e);
}
}

@@ -128,7 +124,7 @@ public final class DegradeRuleManager {
public static boolean setRulesForResource(String resourceName, Set<DegradeRule> rules) {
AssertUtil.notEmpty(resourceName, "resourceName cannot be empty");
try {
Map<String, Set<DegradeRule>> newRuleMap = new HashMap<>(degradeRules);
Map<String, Set<DegradeRule>> newRuleMap = new HashMap<>(ruleMap);
if (rules == null) {
newRuleMap.remove(resourceName);
} else {
@@ -146,88 +142,127 @@ public final class DegradeRuleManager {
}
return currentProperty.updateValue(allRules);
} catch (Throwable e) {
RecordLog.warn(
"[DegradeRuleManager] Unexpected error when setting degrade rules for resource: " + resourceName, e);
RecordLog.error("[DegradeRuleManager] Unexpected error when setting circuit breaking"
+ " rules for resource: " + resourceName, e);
return false;
}
}

private static CircuitBreaker getExistingSameCbOrNew(/*@Valid*/ DegradeRule rule) {
List<CircuitBreaker> cbs = getCircuitBreakers(rule.getResource());
if (cbs == null || cbs.isEmpty()) {
return newCircuitBreakerFrom(rule);
}
for (CircuitBreaker cb : cbs) {
if (rule.equals(cb.getRule())) {
// Reuse the circuit breaker if the rule remains unchanged.
return cb;
}
}
return newCircuitBreakerFrom(rule);
}

/**
* Create a circuit breaker instance from provided circuit breaking rule.
*
* @param rule a valid circuit breaking rule
* @return new circuit breaker based on provided rule; null if rule is invalid or unsupported type
*/
private static CircuitBreaker newCircuitBreakerFrom(/*@Valid*/ DegradeRule rule) {
switch (rule.getGrade()) {
case RuleConstant.DEGRADE_GRADE_RT:
return new ResponseTimeCircuitBreaker(rule);
case RuleConstant.DEGRADE_GRADE_EXCEPTION_RATIO:
case RuleConstant.DEGRADE_GRADE_EXCEPTION_COUNT:
return new ExceptionCircuitBreaker(rule);
default:
return null;
}
}

public static boolean isValidRule(DegradeRule rule) {
boolean baseValid = rule != null && !StringUtil.isBlank(rule.getResource())
&& rule.getCount() >= 0 && rule.getTimeWindow() > 0;
if (!baseValid) {
return false;
}
if (rule.getMinRequestAmount() <= 0 || rule.getStatIntervalMs() <= 0) {
return false;
}
switch (rule.getGrade()) {
case RuleConstant.DEGRADE_GRADE_RT:
return rule.getSlowRatioThreshold() >= 0 && rule.getSlowRatioThreshold() <= 1;
case RuleConstant.DEGRADE_GRADE_EXCEPTION_RATIO:
return rule.getCount() <= 1;
case RuleConstant.DEGRADE_GRADE_EXCEPTION_COUNT:
return true;
default:
return false;
}
}

private static class RulePropertyListener implements PropertyListener<List<DegradeRule>> {

private synchronized void reloadFrom(List<DegradeRule> list) {
Map<String, List<CircuitBreaker>> cbs = buildCircuitBreakers(list);
Map<String, Set<DegradeRule>> rm = new HashMap<>(cbs.size());

for (Map.Entry<String, List<CircuitBreaker>> e : cbs.entrySet()) {
assert e.getValue() != null && !e.getValue().isEmpty();

Set<DegradeRule> rules = new HashSet<>(e.getValue().size());
for (CircuitBreaker cb : e.getValue()) {
rules.add(cb.getRule());
}
rm.put(e.getKey(), rules);
}

DegradeRuleManager.circuitBreakers = cbs;
DegradeRuleManager.ruleMap = rm;
}

@Override
public void configUpdate(List<DegradeRule> conf) {
Map<String, Set<DegradeRule>> rules = loadDegradeConf(conf);
if (rules != null) {
degradeRules.clear();
degradeRules.putAll(rules);
}
RecordLog.info("[DegradeRuleManager] Degrade rules received: " + degradeRules);
reloadFrom(conf);
RecordLog.info("[DegradeRuleManager] Degrade rules has been updated to: " + ruleMap);
}

@Override
public void configLoad(List<DegradeRule> conf) {
Map<String, Set<DegradeRule>> rules = loadDegradeConf(conf);
if (rules != null) {
degradeRules.clear();
degradeRules.putAll(rules);
}
RecordLog.info("[DegradeRuleManager] Degrade rules loaded: " + degradeRules);
reloadFrom(conf);
RecordLog.info("[DegradeRuleManager] Degrade rules loaded: " + ruleMap);
}

private Map<String, Set<DegradeRule>> loadDegradeConf(List<DegradeRule> list) {
Map<String, Set<DegradeRule>> newRuleMap = new ConcurrentHashMap<>();

private Map<String, List<CircuitBreaker>> buildCircuitBreakers(List<DegradeRule> list) {
Map<String, List<CircuitBreaker>> cbMap = new HashMap<>(8);
if (list == null || list.isEmpty()) {
return newRuleMap;
return cbMap;
}

for (DegradeRule rule : list) {
if (!isValidRule(rule)) {
RecordLog.warn(
"[DegradeRuleManager] Ignoring invalid degrade rule when loading new rules: " + rule);
RecordLog.warn("[DegradeRuleManager] Ignoring invalid rule when loading new rules: " + rule);
continue;
}

if (StringUtil.isBlank(rule.getLimitApp())) {
rule.setLimitApp(RuleConstant.LIMIT_APP_DEFAULT);
}

String identity = rule.getResource();
Set<DegradeRule> ruleSet = newRuleMap.get(identity);
if (ruleSet == null) {
ruleSet = new HashSet<>();
newRuleMap.put(identity, ruleSet);
CircuitBreaker cb = getExistingSameCbOrNew(rule);
if (cb == null) {
RecordLog.warn("[DegradeRuleManager] Unknown circuit breaking strategy, ignoring: " + rule);
continue;
}
ruleSet.add(rule);
}

return newRuleMap;
}
}
String resourceName = rule.getResource();

public static boolean isValidRule(DegradeRule rule) {
boolean baseValid = rule != null && !StringUtil.isBlank(rule.getResource())
&& rule.getCount() >= 0 && rule.getTimeWindow() > 0;
if (!baseValid) {
return false;
}
int maxAllowedRt = SentinelConfig.statisticMaxRt();
if (rule.getGrade() == RuleConstant.DEGRADE_GRADE_RT) {
if (rule.getRtSlowRequestAmount() <= 0) {
return false;
}
// Warn for RT mode that exceeds the {@code TIME_DROP_VALVE}.
if (rule.getCount() > maxAllowedRt) {
RecordLog.warn(String.format("[DegradeRuleManager] WARN: setting large RT threshold (%.1f ms)"
+ " in RT mode will not take effect since it exceeds the max allowed value (%d ms)",
rule.getCount(), maxAllowedRt));
List<CircuitBreaker> cbList = cbMap.get(resourceName);
if (cbList == null) {
cbList = new ArrayList<>();
cbMap.put(resourceName, cbList);
}
cbList.add(cb);
}
return cbMap;
}

// Check exception ratio mode.
if (rule.getGrade() == RuleConstant.DEGRADE_GRADE_EXCEPTION_RATIO) {
return rule.getCount() <= 1 && rule.getMinRequestAmount() > 0;
}
return true;
}
}

+ 50
- 7
sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/degrade/DegradeSlot.java View File

@@ -15,30 +15,73 @@
*/
package com.alibaba.csp.sentinel.slots.block.degrade;

import java.util.List;

import com.alibaba.csp.sentinel.Entry;
import com.alibaba.csp.sentinel.context.Context;
import com.alibaba.csp.sentinel.node.DefaultNode;
import com.alibaba.csp.sentinel.slotchain.AbstractLinkedProcessorSlot;
import com.alibaba.csp.sentinel.slotchain.ProcessorSlot;
import com.alibaba.csp.sentinel.slotchain.ResourceWrapper;
import com.alibaba.csp.sentinel.slots.block.BlockException;
import com.alibaba.csp.sentinel.slots.block.degrade.circuitbreaker.CircuitBreaker;
import com.alibaba.csp.sentinel.spi.SpiOrder;
import com.alibaba.csp.sentinel.util.TimeUtil;

/**
* A {@link ProcessorSlot} dedicates to {@link DegradeRule} checking.
* A {@link ProcessorSlot} dedicates to circuit breaking.
*
* @author leyou
* @author Carpenter Lee
* @author Eric Zhao
*/
@SpiOrder(-1000)
public class DegradeSlot extends AbstractLinkedProcessorSlot<DefaultNode> {

@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args)
throws Throwable {
DegradeRuleManager.checkDegrade(resourceWrapper, context, node, count);
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
boolean prioritized, Object... args) throws Throwable {
performChecking(resourceWrapper);

fireEntry(context, resourceWrapper, node, count, prioritized, args);
}

void performChecking(ResourceWrapper r) throws BlockException {
List<CircuitBreaker> circuitBreakers = DegradeRuleManager.getCircuitBreakers(r.getName());
if (circuitBreakers == null || circuitBreakers.isEmpty()) {
return;
}
for (CircuitBreaker cb : circuitBreakers) {
if (!cb.tryPass()) {
throw new DegradeException(cb.getRule().getLimitApp(), cb.getRule());
}
}
}

@Override
public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
fireExit(context, resourceWrapper, count, args);
public void exit(Context context, ResourceWrapper r, int count, Object... args) {
Entry curEntry = context.getCurEntry();
if (curEntry.getBlockError() != null) {
fireExit(context, r, count, args);
return;
}
List<CircuitBreaker> circuitBreakers = DegradeRuleManager.getCircuitBreakers(r.getName());
if (circuitBreakers == null || circuitBreakers.isEmpty()) {
fireExit(context, r, count, args);
return;
}

if (curEntry.getBlockError() == null) {
long completeTime = curEntry.getCompleteTimestamp();
if (completeTime <= 0) {
completeTime = TimeUtil.currentTimeMillis();
}
long rt = completeTime - curEntry.getCreateTimestamp();
Throwable error = curEntry.getError();
for (CircuitBreaker circuitBreaker : circuitBreakers) {
circuitBreaker.onRequestComplete(rt, error);
}
}

fireExit(context, r, count, args);
}
}

+ 147
- 0
sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/degrade/circuitbreaker/AbstractCircuitBreaker.java View File

@@ -0,0 +1,147 @@
/*
* Copyright 1999-2019 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.csp.sentinel.slots.block.degrade.circuitbreaker;

import java.util.concurrent.atomic.AtomicReference;

import com.alibaba.csp.sentinel.slots.block.degrade.DegradeRule;
import com.alibaba.csp.sentinel.slots.block.degrade.DegradeRuleManager;
import com.alibaba.csp.sentinel.util.AssertUtil;
import com.alibaba.csp.sentinel.util.TimeUtil;

/**
* @author Eric Zhao
* @since 1.8.0
*/
public abstract class AbstractCircuitBreaker implements CircuitBreaker {

protected final DegradeRule rule;
protected final int recoveryTimeoutMs;

private final EventObserverRegistry observerRegistry;

protected final AtomicReference<State> currentState = new AtomicReference<>(State.CLOSED);
protected volatile long nextRetryTimestamp;

public AbstractCircuitBreaker(DegradeRule rule) {
this(rule, EventObserverRegistry.getInstance());
}

AbstractCircuitBreaker(DegradeRule rule, EventObserverRegistry observerRegistry) {
AssertUtil.notNull(observerRegistry, "observerRegistry cannot be null");
if (!DegradeRuleManager.isValidRule(rule)) {
throw new IllegalArgumentException("Invalid DegradeRule: " + rule);
}
this.observerRegistry = observerRegistry;
this.rule = rule;
this.recoveryTimeoutMs = rule.getTimeWindow() * 1000;
}

@Override
public DegradeRule getRule() {
return rule;
}

@Override
public State currentState() {
return currentState.get();
}

@Override
public boolean tryPass() {
// Template implementation.
if (currentState.get() == State.CLOSED) {
return true;
}
if (currentState.get() == State.OPEN) {
// For half-open state we allow a request for trial.
return retryTimeoutArrived() && fromOpenToHalfOpen();
}
return false;
}

/**
* Reset the statistic data.
*/
abstract void resetStat();

protected boolean retryTimeoutArrived() {
return TimeUtil.currentTimeMillis() >= nextRetryTimestamp;
}

protected void updateNextRetryTimestamp() {
this.nextRetryTimestamp = TimeUtil.currentTimeMillis() + recoveryTimeoutMs;
}

protected boolean fromCloseToOpen(double snapshotValue) {
State prev = State.CLOSED;
if (currentState.compareAndSet(prev, State.OPEN)) {
updateNextRetryTimestamp();

for (CircuitBreakerStateChangeObserver observer : observerRegistry.getStateChangeObservers()) {
observer.onStateChange(prev, State.OPEN, rule, snapshotValue);
}
return true;
}
return false;
}

protected boolean fromOpenToHalfOpen() {
if (currentState.compareAndSet(State.OPEN, State.HALF_OPEN)) {
for (CircuitBreakerStateChangeObserver observer : observerRegistry.getStateChangeObservers()) {
observer.onStateChange(State.OPEN, State.HALF_OPEN, rule, null);
}
return true;
}
return false;
}

protected boolean fromHalfOpenToOpen(double snapshotValue) {
if (currentState.compareAndSet(State.HALF_OPEN, State.OPEN)) {
updateNextRetryTimestamp();
for (CircuitBreakerStateChangeObserver observer : observerRegistry.getStateChangeObservers()) {
observer.onStateChange(State.HALF_OPEN, State.OPEN, rule, snapshotValue);
}
return true;
}
return false;
}

protected boolean fromHalfOpenToClose() {
if (currentState.compareAndSet(State.HALF_OPEN, State.CLOSED)) {
resetStat();
for (CircuitBreakerStateChangeObserver observer : observerRegistry.getStateChangeObservers()) {
observer.onStateChange(State.HALF_OPEN, State.CLOSED, rule, null);
}
return true;
}
return false;
}

protected void transformToOpen(double triggerValue) {
State cs = currentState.get();
switch (cs) {
case CLOSED:
fromCloseToOpen(triggerValue);
break;
case HALF_OPEN:
fromHalfOpenToOpen(triggerValue);
break;
default:
break;
}
}
}

+ 79
- 0
sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/degrade/circuitbreaker/CircuitBreaker.java View File

@@ -0,0 +1,79 @@
/*
* Copyright 1999-2019 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.csp.sentinel.slots.block.degrade.circuitbreaker;

import com.alibaba.csp.sentinel.slots.block.degrade.DegradeRule;

/**
* <p>Basic <a href="https://martinfowler.com/bliki/CircuitBreaker.html">circuit breaker</a> interface.</p>
*
* @author Eric Zhao
*/
public interface CircuitBreaker {

/**
* Get the associated circuit breaking rule.
*
* @return associated circuit breaking rule
*/
DegradeRule getRule();

/**
* Acquires permission of an invocation only if it is available at the time of invocation.
*
* @return {@code true} if permission was acquired and {@code false} otherwise
*/
boolean tryPass();

/**
* Get current state of the circuit breaker.
*
* @return current state of the circuit breaker
*/
State currentState();

/**
* Record a completed request with the given response time and error (if present) and
* handle state transformation of the circuit breaker.
*
* @param rt the response time of this entry
* @param error the error of this entry (if present)
*/
void onRequestComplete(long rt, Throwable error);

/**
* Circuit breaker state.
*/
enum State {
/**
* In {@code OPEN} state, all requests will be rejected until the next recovery time point.
*/
OPEN,
/**
* In {@code HALF_OPEN} state, the circuit breaker will allow a "probe" invocation.
* If the invocation is abnormal according to the strategy (e.g. it's slow), the circuit breaker
* will re-transform to the {@code OPEN} state and wait for the next recovery time point;
* otherwise the resource will be regarded as "recovered" and the circuit breaker
* will cease cutting off requests and transform to {@code CLOSED} state.
*/
HALF_OPEN,
/**
* In {@code CLOSED} state, all requests are permitted. When current metric value exceeds the threshold,
* the circuit breaker will transform to {@code OPEN} state.
*/
CLOSED
}
}

+ 42
- 0
sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/degrade/circuitbreaker/CircuitBreakerStateChangeObserver.java View File

@@ -0,0 +1,42 @@
/*
* Copyright 1999-2019 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.csp.sentinel.slots.block.degrade.circuitbreaker;

import com.alibaba.csp.sentinel.slots.block.degrade.DegradeRule;

/**
* @author Eric Zhao
* @since 1.8.0
*/
public interface CircuitBreakerStateChangeObserver {

/**
* <p>Observer method triggered when circuit breaker state changed. The transformation could be:</p>
* <ul>
* <li>From {@code CLOSED} to {@code OPEN} (with the triggered metric)</li>
* <li>From {@code OPEN} to {@code HALF_OPEN}</li>
* <li>From {@code OPEN} to {@code CLOSED}</li>
* <li>From {@code HALF_OPEN} to {@code OPEN} (with the triggered metric)</li>
* </ul>
*
* @param prevState previous state of the circuit breaker
* @param newState new state of the circuit breaker
* @param rule associated rule
* @param snapshotValue triggered value on circuit breaker opens (null if the new state is CLOSED or HALF_OPEN)
*/
void onStateChange(CircuitBreaker.State prevState, CircuitBreaker.State newState, DegradeRule rule,
Double snapshotValue);
}

+ 46
- 0
sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/degrade/circuitbreaker/CircuitBreakerStrategy.java View File

@@ -0,0 +1,46 @@
/*
* Copyright 1999-2020 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.csp.sentinel.slots.block.degrade.circuitbreaker;

/**
* @author Eric Zhao
* @since 1.8.0
*/
public enum CircuitBreakerStrategy {

/**
* Circuit breaker opens (cuts off) when slow request ratio exceeds the threshold.
*/
SLOW_REQUEST_RATIO(0),
/**
* Circuit breaker opens (cuts off) when error ratio exceeds the threshold.
*/
ERROR_RATIO(1),
/**
* Circuit breaker opens (cuts off) when error count exceeds the threshold.
*/
ERROR_COUNT(2);

private int type;

CircuitBreakerStrategy(int type) {
this.type = type;
}

public int getType() {
return type;
}
}

+ 70
- 0
sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/degrade/circuitbreaker/EventObserverRegistry.java View File

@@ -0,0 +1,70 @@
/*
* Copyright 1999-2020 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.csp.sentinel.slots.block.degrade.circuitbreaker;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import com.alibaba.csp.sentinel.util.AssertUtil;

/**
* <p>Registry for circuit breaker event observers.</p>
*
* @author Eric Zhao
* @since 1.8.0
*/
public class EventObserverRegistry {

private final Map<String, CircuitBreakerStateChangeObserver> stateChangeObserverMap = new HashMap<>();

/**
* Register a circuit breaker state change observer.
*
* @param name observer name
* @param observer a valid observer
*/
public void addStateChangeObserver(String name, CircuitBreakerStateChangeObserver observer) {
AssertUtil.notNull(name, "name cannot be null");
AssertUtil.notNull(observer, "observer cannot be null");
stateChangeObserverMap.put(name, observer);
}

public boolean removeStateChangeObserver(String name) {
AssertUtil.notNull(name, "name cannot be null");
return stateChangeObserverMap.remove(name) != null;
}

/**
* Get all registered state chane observers.
*
* @return all registered state chane observers
*/
public List<CircuitBreakerStateChangeObserver> getStateChangeObservers() {
return new ArrayList<>(stateChangeObserverMap.values());
}

public static EventObserverRegistry getInstance() {
return InstanceHolder.instance;
}

private static class InstanceHolder {
private static EventObserverRegistry instance = new EventObserverRegistry();
}

EventObserverRegistry() {}
}

+ 156
- 0
sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/degrade/circuitbreaker/ExceptionCircuitBreaker.java View File

@@ -0,0 +1,156 @@
/*
* Copyright 1999-2019 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.csp.sentinel.slots.block.degrade.circuitbreaker;

import java.util.List;

import com.alibaba.csp.sentinel.slots.block.degrade.DegradeRule;
import com.alibaba.csp.sentinel.slots.statistic.base.LeapArray;
import com.alibaba.csp.sentinel.slots.statistic.base.LongAdder;
import com.alibaba.csp.sentinel.slots.statistic.base.WindowWrap;
import com.alibaba.csp.sentinel.util.AssertUtil;

import static com.alibaba.csp.sentinel.slots.block.RuleConstant.DEGRADE_GRADE_EXCEPTION_RATIO;
import static com.alibaba.csp.sentinel.slots.block.RuleConstant.DEGRADE_GRADE_EXCEPTION_COUNT;

/**
* @author Eric Zhao
* @since 1.8.0
*/
public class ExceptionCircuitBreaker extends AbstractCircuitBreaker {

private final int strategy;
private final int minRequestAmount;
private final double threshold;

private final LeapArray<SimpleErrorCounter> stat;

public ExceptionCircuitBreaker(DegradeRule rule) {
this(rule, new SimpleErrorCounterLeapArray(1, rule.getStatIntervalMs()));
}

ExceptionCircuitBreaker(DegradeRule rule, LeapArray<SimpleErrorCounter> stat) {
super(rule);
this.strategy = rule.getGrade();
boolean modeOk = strategy == DEGRADE_GRADE_EXCEPTION_RATIO || strategy == DEGRADE_GRADE_EXCEPTION_COUNT;
AssertUtil.isTrue(modeOk, "rule strategy should be error-ratio or error-count");
AssertUtil.notNull(stat, "stat cannot be null");
this.minRequestAmount = rule.getMinRequestAmount();
this.threshold = rule.getCount();
this.stat = stat;
}

@Override
protected void resetStat() {
// Reset current bucket (bucket count = 1).
stat.currentWindow().value().reset();
}

@Override
public void onRequestComplete(long rt, Throwable error) {
SimpleErrorCounter counter = stat.currentWindow().value();
if (error != null) {
counter.getErrorCount().add(1);
}
counter.getTotalCount().add(1);

handleStateChangeWhenThresholdExceeded(error);
}

private void handleStateChangeWhenThresholdExceeded(Throwable error) {
if (currentState.get() == State.OPEN) {
return;
}
if (currentState.get() == State.HALF_OPEN) {
if (error == null) {
fromHalfOpenToClose();
} else {
fromHalfOpenToOpen(1.0d);
}
return;
}
List<SimpleErrorCounter> counters = stat.values();
long errCount = 0;
long totalCount = 0;
for (SimpleErrorCounter counter : counters) {
errCount += counter.errorCount.sum();
totalCount += counter.totalCount.sum();
}
if (totalCount < minRequestAmount) {
return;
}
double curCount = errCount;
if (strategy == DEGRADE_GRADE_EXCEPTION_RATIO) {
// Use errorRatio
curCount = errCount * 1.0d / totalCount;
}
if (curCount > threshold) {
transformToOpen(curCount);
}
}

static class SimpleErrorCounter {
private LongAdder errorCount;
private LongAdder totalCount;

public SimpleErrorCounter() {
this.errorCount = new LongAdder();
this.totalCount = new LongAdder();
}

public LongAdder getErrorCount() {
return errorCount;
}

public LongAdder getTotalCount() {
return totalCount;
}

public SimpleErrorCounter reset() {
errorCount.reset();
totalCount.reset();
return this;
}

@Override
public String toString() {
return "SimpleErrorCounter{" +
"errorCount=" + errorCount +
", totalCount=" + totalCount +
'}';
}
}

static class SimpleErrorCounterLeapArray extends LeapArray<SimpleErrorCounter> {

public SimpleErrorCounterLeapArray(int sampleCount, int intervalInMs) {
super(sampleCount, intervalInMs);
}

@Override
public SimpleErrorCounter newEmptyBucket(long timeMillis) {
return new SimpleErrorCounter();
}

@Override
protected WindowWrap<SimpleErrorCounter> resetWindowTo(WindowWrap<SimpleErrorCounter> w, long startTime) {
// Update the start time and reset value.
w.resetTo(startTime);
w.value().reset();
return w;
}
}
}

+ 150
- 0
sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/degrade/circuitbreaker/ResponseTimeCircuitBreaker.java View File

@@ -0,0 +1,150 @@
/*
* Copyright 1999-2019 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.csp.sentinel.slots.block.degrade.circuitbreaker;

import java.util.List;

import com.alibaba.csp.sentinel.slots.block.RuleConstant;
import com.alibaba.csp.sentinel.slots.block.degrade.DegradeRule;
import com.alibaba.csp.sentinel.slots.statistic.base.LeapArray;
import com.alibaba.csp.sentinel.slots.statistic.base.LongAdder;
import com.alibaba.csp.sentinel.slots.statistic.base.WindowWrap;
import com.alibaba.csp.sentinel.util.AssertUtil;

/**
* @author Eric Zhao
* @since 1.8.0
*/
public class ResponseTimeCircuitBreaker extends AbstractCircuitBreaker {

private final long maxAllowedRt;
private final double maxSlowRequestRatio;
private final int minRequestAmount;

private final LeapArray<SlowRequestCounter> slidingCounter;

public ResponseTimeCircuitBreaker(DegradeRule rule) {
this(rule, new SlowRequestLeapArray(1, rule.getStatIntervalMs()));
}

ResponseTimeCircuitBreaker(DegradeRule rule, LeapArray<SlowRequestCounter> stat) {
super(rule);
AssertUtil.isTrue(rule.getGrade() == RuleConstant.DEGRADE_GRADE_RT, "rule metric type should be RT");
AssertUtil.notNull(stat, "stat cannot be null");
this.maxAllowedRt = Math.round(rule.getCount());
this.maxSlowRequestRatio = rule.getSlowRatioThreshold();
this.minRequestAmount = rule.getMinRequestAmount();
this.slidingCounter = stat;
}

@Override
public void resetStat() {
// Reset current bucket (bucket count = 1).
slidingCounter.currentWindow().value().reset();
}

@Override
public void onRequestComplete(long rt, Throwable error) {
SlowRequestCounter counter = slidingCounter.currentWindow().value();
if (rt > maxAllowedRt) {
counter.slowCount.add(1);
}
counter.totalCount.add(1);

handleStateChangeWhenThresholdExceeded(rt);
}

private void handleStateChangeWhenThresholdExceeded(long rt) {
if (currentState.get() == State.OPEN) {
return;
}
if (currentState.get() == State.HALF_OPEN) {
// TODO: improve logic for half-open recovery
if (rt > maxAllowedRt) {
fromHalfOpenToOpen(1.0d);
} else {
fromHalfOpenToClose();
}
return;
}

List<SlowRequestCounter> counters = slidingCounter.values();
long slowCount = 0;
long totalCount = 0;
for (SlowRequestCounter counter : counters) {
slowCount += counter.slowCount.sum();
totalCount += counter.totalCount.sum();
}
if (totalCount < minRequestAmount) {
return;
}
double currentRatio = slowCount * 1.0d / totalCount;
if (currentRatio > maxSlowRequestRatio) {
transformToOpen(currentRatio);
}
}

static class SlowRequestCounter {
private LongAdder slowCount;
private LongAdder totalCount;

public SlowRequestCounter() {
this.slowCount = new LongAdder();
this.totalCount = new LongAdder();
}

public LongAdder getSlowCount() {
return slowCount;
}

public LongAdder getTotalCount() {
return totalCount;
}

public SlowRequestCounter reset() {
slowCount.reset();
totalCount.reset();
return this;
}

@Override
public String toString() {
return "SlowRequestCounter{" +
"slowCount=" + slowCount +
", totalCount=" + totalCount +
'}';
}
}

static class SlowRequestLeapArray extends LeapArray<SlowRequestCounter> {

public SlowRequestLeapArray(int sampleCount, int intervalInMs) {
super(sampleCount, intervalInMs);
}

@Override
public SlowRequestCounter newEmptyBucket(long timeMillis) {
return new SlowRequestCounter();
}

@Override
protected WindowWrap<SlowRequestCounter> resetWindowTo(WindowWrap<SlowRequestCounter> w, long startTime) {
w.resetTo(startTime);
w.value().reset();
return w;
}
}
}

Loading…
Cancel
Save