- Remove "borrow-from-ref" mode - Improve flow checker to support both embedded server mode and client mode Signed-off-by: Eric Zhao <sczyh16@gmail.com>master
@@ -15,8 +15,6 @@ | |||
*/ | |||
package com.alibaba.csp.sentinel.cluster.log; | |||
import java.io.File; | |||
import com.alibaba.csp.sentinel.eagleeye.EagleEye; | |||
import com.alibaba.csp.sentinel.eagleeye.StatLogger; | |||
import com.alibaba.csp.sentinel.log.LogBase; | |||
@@ -26,16 +24,16 @@ import com.alibaba.csp.sentinel.log.LogBase; | |||
* @author Eric Zhao | |||
* @since 1.4.0 | |||
*/ | |||
public final class ClusterStatLogUtil { | |||
public final class ClusterClientStatLogUtil { | |||
private static final String FILE_NAME = "sentinel-cluster.log"; | |||
private static final String FILE_NAME = "sentinel-cluster-client.log"; | |||
private static StatLogger statLogger; | |||
static { | |||
String path = LogBase.getLogBaseDir() + FILE_NAME; | |||
statLogger = EagleEye.statLoggerBuilder("sentinel-cluster-record") | |||
statLogger = EagleEye.statLoggerBuilder("sentinel-cluster-client-record") | |||
.intervalSeconds(1) | |||
.entryDelimiter('|') | |||
.keyDelimiter(',') | |||
@@ -55,5 +53,5 @@ public final class ClusterStatLogUtil { | |||
statLogger.stat(msg).count(count); | |||
} | |||
private ClusterStatLogUtil() {} | |||
private ClusterClientStatLogUtil() {} | |||
} |
@@ -22,7 +22,7 @@ package com.alibaba.csp.sentinel.slots.block; | |||
public final class ClusterRuleConstant { | |||
public static final int FLOW_CLUSTER_STRATEGY_NORMAL = 0; | |||
public static final int FLOW_CLUSTER_STRATEGY_REF = 1; | |||
public static final int FLOW_CLUSTER_STRATEGY_BORROW_REF = 1; | |||
public static final int FLOW_THRESHOLD_AVG_LOCAL = 0; | |||
public static final int FLOW_THRESHOLD_GLOBAL = 1; | |||
@@ -34,17 +34,13 @@ public class ClusterFlowConfig { | |||
* Threshold type (average by local value or global value). | |||
*/ | |||
private int thresholdType = ClusterRuleConstant.FLOW_THRESHOLD_AVG_LOCAL; | |||
private boolean fallbackToLocalWhenFail; | |||
private boolean fallbackToLocalWhenFail = true; | |||
/** | |||
* 0: normal; 1: using reference (borrow from reference). | |||
* 0: normal. | |||
*/ | |||
private int strategy = ClusterRuleConstant.FLOW_CLUSTER_STRATEGY_NORMAL; | |||
private Long refFlowId; | |||
private int refSampleCount = 10; | |||
private double refRatio = 1d; | |||
public Long getFlowId() { | |||
return flowId; | |||
} | |||
@@ -72,33 +68,6 @@ public class ClusterFlowConfig { | |||
return this; | |||
} | |||
public Long getRefFlowId() { | |||
return refFlowId; | |||
} | |||
public ClusterFlowConfig setRefFlowId(Long refFlowId) { | |||
this.refFlowId = refFlowId; | |||
return this; | |||
} | |||
public int getRefSampleCount() { | |||
return refSampleCount; | |||
} | |||
public ClusterFlowConfig setRefSampleCount(int refSampleCount) { | |||
this.refSampleCount = refSampleCount; | |||
return this; | |||
} | |||
public double getRefRatio() { | |||
return refRatio; | |||
} | |||
public ClusterFlowConfig setRefRatio(double refRatio) { | |||
this.refRatio = refRatio; | |||
return this; | |||
} | |||
public boolean isFallbackToLocalWhenFail() { | |||
return fallbackToLocalWhenFail; | |||
} | |||
@@ -118,24 +87,15 @@ public class ClusterFlowConfig { | |||
if (thresholdType != that.thresholdType) { return false; } | |||
if (fallbackToLocalWhenFail != that.fallbackToLocalWhenFail) { return false; } | |||
if (strategy != that.strategy) { return false; } | |||
if (refSampleCount != that.refSampleCount) { return false; } | |||
if (Double.compare(that.refRatio, refRatio) != 0) { return false; } | |||
if (flowId != null ? !flowId.equals(that.flowId) : that.flowId != null) { return false; } | |||
return refFlowId != null ? refFlowId.equals(that.refFlowId) : that.refFlowId == null; | |||
return flowId != null ? flowId.equals(that.flowId) : that.flowId == null; | |||
} | |||
@Override | |||
public int hashCode() { | |||
int result; | |||
long temp; | |||
result = flowId != null ? flowId.hashCode() : 0; | |||
int result = flowId != null ? flowId.hashCode() : 0; | |||
result = 31 * result + thresholdType; | |||
result = 31 * result + (fallbackToLocalWhenFail ? 1 : 0); | |||
result = 31 * result + strategy; | |||
result = 31 * result + (refFlowId != null ? refFlowId.hashCode() : 0); | |||
result = 31 * result + refSampleCount; | |||
temp = Double.doubleToLongBits(refRatio); | |||
result = 31 * result + (int)(temp ^ (temp >>> 32)); | |||
return result; | |||
} | |||
@@ -146,9 +106,6 @@ public class ClusterFlowConfig { | |||
", thresholdType=" + thresholdType + | |||
", fallbackToLocalWhenFail=" + fallbackToLocalWhenFail + | |||
", strategy=" + strategy + | |||
", refFlowId=" + refFlowId + | |||
", refSampleCount=" + refSampleCount + | |||
", refRatio=" + refRatio + | |||
'}'; | |||
} | |||
} |
@@ -15,10 +15,12 @@ | |||
*/ | |||
package com.alibaba.csp.sentinel.slots.block.flow; | |||
import com.alibaba.csp.sentinel.cluster.ClusterTokenClient; | |||
import com.alibaba.csp.sentinel.cluster.TokenClientProvider; | |||
import com.alibaba.csp.sentinel.cluster.ClusterStateManager; | |||
import com.alibaba.csp.sentinel.cluster.server.EmbeddedClusterTokenServerProvider; | |||
import com.alibaba.csp.sentinel.cluster.client.TokenClientProvider; | |||
import com.alibaba.csp.sentinel.cluster.TokenResultStatus; | |||
import com.alibaba.csp.sentinel.cluster.TokenResult; | |||
import com.alibaba.csp.sentinel.cluster.TokenService; | |||
import com.alibaba.csp.sentinel.context.Context; | |||
import com.alibaba.csp.sentinel.log.RecordLog; | |||
import com.alibaba.csp.sentinel.node.DefaultNode; | |||
@@ -62,38 +64,6 @@ final class FlowRuleChecker { | |||
return rule.getRater().canPass(selectedNode, acquireCount); | |||
} | |||
static boolean passClusterCheck(FlowRule rule, Context context, DefaultNode node, int acquireCount, | |||
boolean prioritized) { | |||
try { | |||
ClusterTokenClient client = TokenClientProvider.getClient(); | |||
if (client != null) { | |||
TokenResult result = client.requestToken(rule.getClusterConfig().getFlowId(), acquireCount, | |||
prioritized); | |||
switch (result.getStatus()) { | |||
case TokenResultStatus.OK: | |||
return true; | |||
case TokenResultStatus.SHOULD_WAIT: | |||
// Wait for next tick. | |||
Thread.sleep(result.getWaitInMs()); | |||
return true; | |||
case TokenResultStatus.NO_RULE_EXISTS: | |||
case TokenResultStatus.BAD_REQUEST: | |||
case TokenResultStatus.FAIL: | |||
return passLocalCheck(rule, context, node, acquireCount, prioritized); | |||
case TokenResultStatus.BLOCKED: | |||
default: | |||
return false; | |||
} | |||
} | |||
// If client is absent, then fallback to local mode. | |||
} catch (Throwable ex) { | |||
RecordLog.warn("[FlowRuleChecker] Request cluster token unexpected failed", ex); | |||
} | |||
// TODO: choose whether fallback to local or inactivate the rule. | |||
// Downgrade to local flow control when token client or server for this rule is not available. | |||
return passLocalCheck(rule, context, node, acquireCount, prioritized); | |||
} | |||
static Node selectReferenceNode(FlowRule rule, Context context, DefaultNode node) { | |||
String refResource = rule.getRefResource(); | |||
int strategy = rule.getStrategy(); | |||
@@ -153,5 +123,67 @@ final class FlowRuleChecker { | |||
return null; | |||
} | |||
private static boolean passClusterCheck(FlowRule rule, Context context, DefaultNode node, int acquireCount, | |||
boolean prioritized) { | |||
try { | |||
TokenService clusterService = pickClusterService(); | |||
if (clusterService == null) { | |||
return fallbackToLocalOrPass(rule, context, node, acquireCount, prioritized); | |||
} | |||
long flowId = rule.getClusterConfig().getFlowId(); | |||
TokenResult result = clusterService.requestToken(flowId, acquireCount, prioritized); | |||
return applyTokenResult(result, rule, context, node, acquireCount, prioritized); | |||
// If client is absent, then fallback to local mode. | |||
} catch (Throwable ex) { | |||
RecordLog.warn("[FlowRuleChecker] Request cluster token unexpected failed", ex); | |||
} | |||
// Fallback to local flow control when token client or server for this rule is not available. | |||
// If fallback is not enabled, then directly pass. | |||
return fallbackToLocalOrPass(rule, context, node, acquireCount, prioritized); | |||
} | |||
private static boolean fallbackToLocalOrPass(FlowRule rule, Context context, DefaultNode node, int acquireCount, | |||
boolean prioritized) { | |||
if (rule.getClusterConfig().isFallbackToLocalWhenFail()) { | |||
return passLocalCheck(rule, context, node, acquireCount, prioritized); | |||
} else { | |||
// The rule won't be activated, just pass. | |||
return true; | |||
} | |||
} | |||
private static TokenService pickClusterService() { | |||
if (ClusterStateManager.isClient()) { | |||
return TokenClientProvider.getClient(); | |||
} | |||
if (ClusterStateManager.isServer()) { | |||
return EmbeddedClusterTokenServerProvider.getServer(); | |||
} | |||
return null; | |||
} | |||
private static boolean applyTokenResult(/*@NonNull*/ TokenResult result, FlowRule rule, Context context, DefaultNode node, | |||
int acquireCount, boolean prioritized) { | |||
switch (result.getStatus()) { | |||
case TokenResultStatus.OK: | |||
return true; | |||
case TokenResultStatus.SHOULD_WAIT: | |||
// Wait for next tick. | |||
try { | |||
Thread.sleep(result.getWaitInMs()); | |||
} catch (InterruptedException e) { | |||
e.printStackTrace(); | |||
} | |||
return true; | |||
case TokenResultStatus.NO_RULE_EXISTS: | |||
case TokenResultStatus.BAD_REQUEST: | |||
case TokenResultStatus.FAIL: | |||
return fallbackToLocalOrPass(rule, context, node, acquireCount, prioritized); | |||
case TokenResultStatus.BLOCKED: | |||
default: | |||
return false; | |||
} | |||
} | |||
private FlowRuleChecker() {} | |||
} |
@@ -189,10 +189,8 @@ public final class FlowRuleUtil { | |||
switch (rule.getStrategy()) { | |||
case ClusterRuleConstant.FLOW_CLUSTER_STRATEGY_NORMAL: | |||
return true; | |||
case ClusterRuleConstant.FLOW_CLUSTER_STRATEGY_REF: | |||
return validClusterRuleId(clusterConfig.getRefFlowId()); | |||
default: | |||
return true; | |||
return false; | |||
} | |||
} | |||
@@ -211,16 +211,16 @@ public abstract class LeapArray<T> { | |||
if (timeMillis < 0) { | |||
return null; | |||
} | |||
int idx = calculateTimeIdx(timeMillis); | |||
long previousTime = timeMillis - windowLengthInMs; | |||
long timeId = (timeMillis - windowLengthInMs) / windowLengthInMs; | |||
int idx = (int)(timeId % array.length()); | |||
timeMillis = timeMillis - windowLengthInMs; | |||
WindowWrap<T> wrap = array.get(idx); | |||
if (wrap == null || isWindowDeprecated(wrap)) { | |||
return null; | |||
} | |||
if (wrap.windowStart() + windowLengthInMs < previousTime) { | |||
if (wrap.windowStart() + windowLengthInMs < (timeMillis)) { | |||
return null; | |||
} | |||
@@ -0,0 +1,29 @@ | |||
/* | |||
* Copyright 1999-2018 Alibaba Group Holding Ltd. | |||
* | |||
* Licensed under the Apache License, Version 2.0 (the "License"); | |||
* you may not use this file except in compliance with the License. | |||
* You may obtain a copy of the License at | |||
* | |||
* http://www.apache.org/licenses/LICENSE-2.0 | |||
* | |||
* Unless required by applicable law or agreed to in writing, software | |||
* distributed under the License is distributed on an "AS IS" BASIS, | |||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |||
* See the License for the specific language governing permissions and | |||
* limitations under the License. | |||
*/ | |||
package com.alibaba.csp.sentinel.util.function; | |||
/** | |||
* Supplier functional interface from JDK 8. | |||
*/ | |||
public interface Supplier<T> { | |||
/** | |||
* Gets a result. | |||
* | |||
* @return a result | |||
*/ | |||
T get(); | |||
} |