diff --git a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/cluster/log/ClusterStatLogUtil.java b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/cluster/log/ClusterClientStatLogUtil.java similarity index 89% rename from sentinel-core/src/main/java/com/alibaba/csp/sentinel/cluster/log/ClusterStatLogUtil.java rename to sentinel-core/src/main/java/com/alibaba/csp/sentinel/cluster/log/ClusterClientStatLogUtil.java index 16f914c6..03b1655c 100644 --- a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/cluster/log/ClusterStatLogUtil.java +++ b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/cluster/log/ClusterClientStatLogUtil.java @@ -15,8 +15,6 @@ */ package com.alibaba.csp.sentinel.cluster.log; -import java.io.File; - import com.alibaba.csp.sentinel.eagleeye.EagleEye; import com.alibaba.csp.sentinel.eagleeye.StatLogger; import com.alibaba.csp.sentinel.log.LogBase; @@ -26,16 +24,16 @@ import com.alibaba.csp.sentinel.log.LogBase; * @author Eric Zhao * @since 1.4.0 */ -public final class ClusterStatLogUtil { +public final class ClusterClientStatLogUtil { - private static final String FILE_NAME = "sentinel-cluster.log"; + private static final String FILE_NAME = "sentinel-cluster-client.log"; private static StatLogger statLogger; static { String path = LogBase.getLogBaseDir() + FILE_NAME; - statLogger = EagleEye.statLoggerBuilder("sentinel-cluster-record") + statLogger = EagleEye.statLoggerBuilder("sentinel-cluster-client-record") .intervalSeconds(1) .entryDelimiter('|') .keyDelimiter(',') @@ -55,5 +53,5 @@ public final class ClusterStatLogUtil { statLogger.stat(msg).count(count); } - private ClusterStatLogUtil() {} + private ClusterClientStatLogUtil() {} } diff --git a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/ClusterRuleConstant.java b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/ClusterRuleConstant.java index 78cacf34..c04fa554 100644 --- a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/ClusterRuleConstant.java +++ b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/ClusterRuleConstant.java @@ -22,7 +22,7 @@ package com.alibaba.csp.sentinel.slots.block; public final class ClusterRuleConstant { public static final int FLOW_CLUSTER_STRATEGY_NORMAL = 0; - public static final int FLOW_CLUSTER_STRATEGY_REF = 1; + public static final int FLOW_CLUSTER_STRATEGY_BORROW_REF = 1; public static final int FLOW_THRESHOLD_AVG_LOCAL = 0; public static final int FLOW_THRESHOLD_GLOBAL = 1; diff --git a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/ClusterFlowConfig.java b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/ClusterFlowConfig.java index b12a0743..7e3c53c4 100644 --- a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/ClusterFlowConfig.java +++ b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/ClusterFlowConfig.java @@ -34,17 +34,13 @@ public class ClusterFlowConfig { * Threshold type (average by local value or global value). */ private int thresholdType = ClusterRuleConstant.FLOW_THRESHOLD_AVG_LOCAL; - private boolean fallbackToLocalWhenFail; + private boolean fallbackToLocalWhenFail = true; /** - * 0: normal; 1: using reference (borrow from reference). + * 0: normal. */ private int strategy = ClusterRuleConstant.FLOW_CLUSTER_STRATEGY_NORMAL; - private Long refFlowId; - private int refSampleCount = 10; - private double refRatio = 1d; - public Long getFlowId() { return flowId; } @@ -72,33 +68,6 @@ public class ClusterFlowConfig { return this; } - public Long getRefFlowId() { - return refFlowId; - } - - public ClusterFlowConfig setRefFlowId(Long refFlowId) { - this.refFlowId = refFlowId; - return this; - } - - public int getRefSampleCount() { - return refSampleCount; - } - - public ClusterFlowConfig setRefSampleCount(int refSampleCount) { - this.refSampleCount = refSampleCount; - return this; - } - - public double getRefRatio() { - return refRatio; - } - - public ClusterFlowConfig setRefRatio(double refRatio) { - this.refRatio = refRatio; - return this; - } - public boolean isFallbackToLocalWhenFail() { return fallbackToLocalWhenFail; } @@ -118,24 +87,15 @@ public class ClusterFlowConfig { if (thresholdType != that.thresholdType) { return false; } if (fallbackToLocalWhenFail != that.fallbackToLocalWhenFail) { return false; } if (strategy != that.strategy) { return false; } - if (refSampleCount != that.refSampleCount) { return false; } - if (Double.compare(that.refRatio, refRatio) != 0) { return false; } - if (flowId != null ? !flowId.equals(that.flowId) : that.flowId != null) { return false; } - return refFlowId != null ? refFlowId.equals(that.refFlowId) : that.refFlowId == null; + return flowId != null ? flowId.equals(that.flowId) : that.flowId == null; } @Override public int hashCode() { - int result; - long temp; - result = flowId != null ? flowId.hashCode() : 0; + int result = flowId != null ? flowId.hashCode() : 0; result = 31 * result + thresholdType; result = 31 * result + (fallbackToLocalWhenFail ? 1 : 0); result = 31 * result + strategy; - result = 31 * result + (refFlowId != null ? refFlowId.hashCode() : 0); - result = 31 * result + refSampleCount; - temp = Double.doubleToLongBits(refRatio); - result = 31 * result + (int)(temp ^ (temp >>> 32)); return result; } @@ -146,9 +106,6 @@ public class ClusterFlowConfig { ", thresholdType=" + thresholdType + ", fallbackToLocalWhenFail=" + fallbackToLocalWhenFail + ", strategy=" + strategy + - ", refFlowId=" + refFlowId + - ", refSampleCount=" + refSampleCount + - ", refRatio=" + refRatio + '}'; } } diff --git a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/FlowRuleChecker.java b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/FlowRuleChecker.java index 7518d7c1..08cf5dd2 100644 --- a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/FlowRuleChecker.java +++ b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/FlowRuleChecker.java @@ -15,10 +15,12 @@ */ package com.alibaba.csp.sentinel.slots.block.flow; -import com.alibaba.csp.sentinel.cluster.ClusterTokenClient; -import com.alibaba.csp.sentinel.cluster.TokenClientProvider; +import com.alibaba.csp.sentinel.cluster.ClusterStateManager; +import com.alibaba.csp.sentinel.cluster.server.EmbeddedClusterTokenServerProvider; +import com.alibaba.csp.sentinel.cluster.client.TokenClientProvider; import com.alibaba.csp.sentinel.cluster.TokenResultStatus; import com.alibaba.csp.sentinel.cluster.TokenResult; +import com.alibaba.csp.sentinel.cluster.TokenService; import com.alibaba.csp.sentinel.context.Context; import com.alibaba.csp.sentinel.log.RecordLog; import com.alibaba.csp.sentinel.node.DefaultNode; @@ -62,38 +64,6 @@ final class FlowRuleChecker { return rule.getRater().canPass(selectedNode, acquireCount); } - static boolean passClusterCheck(FlowRule rule, Context context, DefaultNode node, int acquireCount, - boolean prioritized) { - try { - ClusterTokenClient client = TokenClientProvider.getClient(); - if (client != null) { - TokenResult result = client.requestToken(rule.getClusterConfig().getFlowId(), acquireCount, - prioritized); - switch (result.getStatus()) { - case TokenResultStatus.OK: - return true; - case TokenResultStatus.SHOULD_WAIT: - // Wait for next tick. - Thread.sleep(result.getWaitInMs()); - return true; - case TokenResultStatus.NO_RULE_EXISTS: - case TokenResultStatus.BAD_REQUEST: - case TokenResultStatus.FAIL: - return passLocalCheck(rule, context, node, acquireCount, prioritized); - case TokenResultStatus.BLOCKED: - default: - return false; - } - } - // If client is absent, then fallback to local mode. - } catch (Throwable ex) { - RecordLog.warn("[FlowRuleChecker] Request cluster token unexpected failed", ex); - } - // TODO: choose whether fallback to local or inactivate the rule. - // Downgrade to local flow control when token client or server for this rule is not available. - return passLocalCheck(rule, context, node, acquireCount, prioritized); - } - static Node selectReferenceNode(FlowRule rule, Context context, DefaultNode node) { String refResource = rule.getRefResource(); int strategy = rule.getStrategy(); @@ -153,5 +123,67 @@ final class FlowRuleChecker { return null; } + private static boolean passClusterCheck(FlowRule rule, Context context, DefaultNode node, int acquireCount, + boolean prioritized) { + try { + TokenService clusterService = pickClusterService(); + if (clusterService == null) { + return fallbackToLocalOrPass(rule, context, node, acquireCount, prioritized); + } + long flowId = rule.getClusterConfig().getFlowId(); + TokenResult result = clusterService.requestToken(flowId, acquireCount, prioritized); + return applyTokenResult(result, rule, context, node, acquireCount, prioritized); + // If client is absent, then fallback to local mode. + } catch (Throwable ex) { + RecordLog.warn("[FlowRuleChecker] Request cluster token unexpected failed", ex); + } + // Fallback to local flow control when token client or server for this rule is not available. + // If fallback is not enabled, then directly pass. + return fallbackToLocalOrPass(rule, context, node, acquireCount, prioritized); + } + + private static boolean fallbackToLocalOrPass(FlowRule rule, Context context, DefaultNode node, int acquireCount, + boolean prioritized) { + if (rule.getClusterConfig().isFallbackToLocalWhenFail()) { + return passLocalCheck(rule, context, node, acquireCount, prioritized); + } else { + // The rule won't be activated, just pass. + return true; + } + } + + private static TokenService pickClusterService() { + if (ClusterStateManager.isClient()) { + return TokenClientProvider.getClient(); + } + if (ClusterStateManager.isServer()) { + return EmbeddedClusterTokenServerProvider.getServer(); + } + return null; + } + + private static boolean applyTokenResult(/*@NonNull*/ TokenResult result, FlowRule rule, Context context, DefaultNode node, + int acquireCount, boolean prioritized) { + switch (result.getStatus()) { + case TokenResultStatus.OK: + return true; + case TokenResultStatus.SHOULD_WAIT: + // Wait for next tick. + try { + Thread.sleep(result.getWaitInMs()); + } catch (InterruptedException e) { + e.printStackTrace(); + } + return true; + case TokenResultStatus.NO_RULE_EXISTS: + case TokenResultStatus.BAD_REQUEST: + case TokenResultStatus.FAIL: + return fallbackToLocalOrPass(rule, context, node, acquireCount, prioritized); + case TokenResultStatus.BLOCKED: + default: + return false; + } + } + private FlowRuleChecker() {} } \ No newline at end of file diff --git a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/FlowRuleUtil.java b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/FlowRuleUtil.java index bea269f1..63d8ba28 100644 --- a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/FlowRuleUtil.java +++ b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/FlowRuleUtil.java @@ -189,10 +189,8 @@ public final class FlowRuleUtil { switch (rule.getStrategy()) { case ClusterRuleConstant.FLOW_CLUSTER_STRATEGY_NORMAL: return true; - case ClusterRuleConstant.FLOW_CLUSTER_STRATEGY_REF: - return validClusterRuleId(clusterConfig.getRefFlowId()); default: - return true; + return false; } } diff --git a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/statistic/base/LeapArray.java b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/statistic/base/LeapArray.java index 38061dc3..d3044f3c 100755 --- a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/statistic/base/LeapArray.java +++ b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/statistic/base/LeapArray.java @@ -211,16 +211,16 @@ public abstract class LeapArray { if (timeMillis < 0) { return null; } - int idx = calculateTimeIdx(timeMillis); - - long previousTime = timeMillis - windowLengthInMs; + long timeId = (timeMillis - windowLengthInMs) / windowLengthInMs; + int idx = (int)(timeId % array.length()); + timeMillis = timeMillis - windowLengthInMs; WindowWrap wrap = array.get(idx); if (wrap == null || isWindowDeprecated(wrap)) { return null; } - if (wrap.windowStart() + windowLengthInMs < previousTime) { + if (wrap.windowStart() + windowLengthInMs < (timeMillis)) { return null; } diff --git a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/util/function/Supplier.java b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/util/function/Supplier.java new file mode 100644 index 00000000..02b3208c --- /dev/null +++ b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/util/function/Supplier.java @@ -0,0 +1,29 @@ +/* + * Copyright 1999-2018 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.csp.sentinel.util.function; + +/** + * Supplier functional interface from JDK 8. + */ +public interface Supplier { + + /** + * Gets a result. + * + * @return a result + */ + T get(); +}