From a731811d27bea054c13d4fcfc80cadac00c4c8ce Mon Sep 17 00:00:00 2001 From: Eric Zhao Date: Sun, 9 Dec 2018 21:55:30 +0800 Subject: [PATCH] Polish default cluster server module for initial work Signed-off-by: Eric Zhao --- .../cluster/flow/ClusterFlowChecker.java | 115 +------ .../cluster/flow/ClusterFlowRuleManager.java | 137 -------- .../cluster/flow/ClusterParamFlowChecker.java | 52 ++- .../flow/ClusterParamFlowRuleManager.java | 138 -------- .../cluster/flow/DefaultTokenService.java | 12 +- .../flow/rule/ClusterFlowRuleManager.java | 320 ++++++++++++++++++ .../rule/ClusterParamFlowRuleManager.java | 307 +++++++++++++++++ .../flow/rule/NamespaceFlowProperty.java | 56 +++ .../statistic/ClusterMetricStatistics.java | 10 + .../ClusterParamMetricStatistics.java | 10 + .../flow/statistic/data/ClusterFlowEvent.java | 10 + .../statistic/data/ClusterMetricBucket.java | 1 + .../flow/statistic/metric/ClusterMetric.java | 32 +- .../metric/ClusterMetricLeapArray.java | 19 +- .../statistic/metric/ClusterParamMetric.java | 19 +- .../metric/ClusterParameterLeapArray.java | 16 +- .../server/DefaultEmbeddedTokenServer.java | 61 ++++ .../cluster/server/NettyTransportServer.java | 47 ++- .../server/SentinelDefaultTokenServer.java | 142 ++++++++ .../cluster/server/TokenServiceProvider.java | 1 + .../codec/DefaultRequestEntityDecoder.java | 1 - .../codec/data/FlowRequestDataDecoder.java | 1 - .../data/ParamFlowRequestDataDecoder.java | 4 +- .../codec/data/PingRequestDataDecoder.java | 40 +++ .../codec/data/PingResponseDataWriter.java | 36 ++ .../codec/netty/NettyRequestDecoder.java | 1 - .../config/ClusterServerConfigManager.java | 311 ++++++++++++++++- .../server/config/ServerFlowConfig.java | 97 ++++++ .../server/config/ServerTransportConfig.java | 64 ++++ .../config/ServerTransportConfigObserver.java | 30 ++ .../cluster/server/connection/Connection.java | 1 + .../connection/ConnectionDescriptor.java | 69 ++++ .../server/connection/ConnectionGroup.java | 35 +- .../server/connection/ConnectionManager.java | 74 ++++ .../server/connection/ConnectionPool.java | 17 +- .../server/connection/NettyConnection.java | 1 + .../connection/ScanIdleConnectionTask.java | 16 +- .../server/handler/TokenServerHandler.java | 61 +++- .../init/DefaultClusterServerInitFunc.java | 66 ++++ .../server/log/ClusterServerStatLogUtil.java | 56 +++ .../processor/FlowRequestProcessor.java | 3 + .../processor/ParamFlowRequestProcessor.java | 3 + ...try.java => RequestProcessorProvider.java} | 35 +- .../cluster/server/util/ClusterRuleUtil.java | 1 + ....cluster.server.EmbeddedClusterTokenServer | 1 + ....cluster.server.processor.RequestProcessor | 2 + .../com.alibaba.csp.sentinel.init.InitFunc | 1 + .../sentinel/cluster/ClusterFlowTestUtil.java | 54 +++ .../cluster/flow/ClusterFlowCheckerTest.java | 79 +++++ 49 files changed, 2158 insertions(+), 507 deletions(-) delete mode 100644 sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/ClusterFlowRuleManager.java delete mode 100644 sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/ClusterParamFlowRuleManager.java create mode 100644 sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/rule/ClusterFlowRuleManager.java create mode 100644 sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/rule/ClusterParamFlowRuleManager.java create mode 100644 sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/rule/NamespaceFlowProperty.java create mode 100644 sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/DefaultEmbeddedTokenServer.java create mode 100644 sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/SentinelDefaultTokenServer.java create mode 100644 sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/codec/data/PingRequestDataDecoder.java create mode 100644 sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/codec/data/PingResponseDataWriter.java create mode 100644 sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/config/ServerFlowConfig.java create mode 100644 sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/config/ServerTransportConfig.java create mode 100644 sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/config/ServerTransportConfigObserver.java create mode 100644 sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/connection/ConnectionDescriptor.java create mode 100644 sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/init/DefaultClusterServerInitFunc.java create mode 100644 sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/log/ClusterServerStatLogUtil.java rename sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/processor/{RequestProcessorRegistry.java => RequestProcessorProvider.java} (55%) create mode 100644 sentinel-cluster/sentinel-cluster-server-default/src/main/resources/META-INF/services/com.alibaba.csp.sentinel.cluster.server.EmbeddedClusterTokenServer create mode 100644 sentinel-cluster/sentinel-cluster-server-default/src/main/resources/META-INF/services/com.alibaba.csp.sentinel.cluster.server.processor.RequestProcessor create mode 100755 sentinel-cluster/sentinel-cluster-server-default/src/main/resources/META-INF/services/com.alibaba.csp.sentinel.init.InitFunc create mode 100644 sentinel-cluster/sentinel-cluster-server-default/src/test/java/com/alibaba/csp/sentinel/cluster/ClusterFlowTestUtil.java create mode 100644 sentinel-cluster/sentinel-cluster-server-default/src/test/java/com/alibaba/csp/sentinel/cluster/flow/ClusterFlowCheckerTest.java diff --git a/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/ClusterFlowChecker.java b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/ClusterFlowChecker.java index 836d338f..f9fb0108 100644 --- a/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/ClusterFlowChecker.java +++ b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/ClusterFlowChecker.java @@ -17,111 +17,22 @@ package com.alibaba.csp.sentinel.cluster.flow; import com.alibaba.csp.sentinel.cluster.TokenResultStatus; import com.alibaba.csp.sentinel.cluster.TokenResult; +import com.alibaba.csp.sentinel.cluster.flow.rule.ClusterFlowRuleManager; import com.alibaba.csp.sentinel.cluster.flow.statistic.ClusterMetricStatistics; import com.alibaba.csp.sentinel.cluster.server.config.ClusterServerConfigManager; import com.alibaba.csp.sentinel.cluster.flow.statistic.data.ClusterFlowEvent; import com.alibaba.csp.sentinel.cluster.flow.statistic.metric.ClusterMetric; +import com.alibaba.csp.sentinel.cluster.server.log.ClusterServerStatLogUtil; import com.alibaba.csp.sentinel.slots.block.ClusterRuleConstant; import com.alibaba.csp.sentinel.slots.block.flow.FlowRule; -import com.alibaba.csp.sentinel.util.TimeUtil; /** + * Flow checker for cluster flow rules. + * * @author Eric Zhao * @since 1.4.0 */ -public final class ClusterFlowChecker { - - static TokenResult tryAcquireOrBorrowFromRefResource(FlowRule rule, int acquireCount, boolean prioritized) { - // 1. First try acquire its own count. - - // TokenResult ownResult = acquireClusterToken(rule, acquireCount, prioritized); - ClusterMetric metric = ClusterMetricStatistics.getMetric(rule.getClusterConfig().getFlowId()); - if (metric == null) { - return new TokenResult(TokenResultStatus.FAIL); - } - - double latestQps = metric.getAvg(ClusterFlowEvent.PASS_REQUEST); - double globalThreshold = calcGlobalThreshold(rule) * ClusterServerConfigManager.exceedCount; - double nextRemaining = globalThreshold - latestQps - acquireCount; - - if (nextRemaining >= 0) { - // TODO: checking logic and metric operation should be separated. - metric.add(ClusterFlowEvent.PASS, acquireCount); - metric.add(ClusterFlowEvent.PASS_REQUEST, 1); - if (prioritized) { - // Add prioritized pass. - metric.add(ClusterFlowEvent.OCCUPIED_PASS, acquireCount); - } - // Remaining count is cut down to a smaller integer. - return new TokenResult(TokenResultStatus.OK) - .setRemaining((int) nextRemaining) - .setWaitInMs(0); - } - - if (prioritized) { - double occupyAvg = metric.getAvg(ClusterFlowEvent.WAITING); - if (occupyAvg <= ClusterServerConfigManager.maxOccupyRatio * globalThreshold) { - int waitInMs = metric.tryOccupyNext(ClusterFlowEvent.PASS, acquireCount, globalThreshold); - if (waitInMs > 0) { - return new TokenResult(TokenResultStatus.SHOULD_WAIT) - .setRemaining(0) - .setWaitInMs(waitInMs); - } - // Or else occupy failed, should be blocked. - } - } - - // 2. If failed, try to borrow from reference resource. - - // Assume it's valid as checked before. - if (!ClusterServerConfigManager.borrowRefEnabled) { - return new TokenResult(TokenResultStatus.NOT_AVAILABLE); - } - Long refFlowId = rule.getClusterConfig().getRefFlowId(); - FlowRule refFlowRule = ClusterFlowRuleManager.getFlowRuleById(refFlowId); - if (refFlowRule == null) { - return new TokenResult(TokenResultStatus.NO_REF_RULE_EXISTS); - } - // TODO: check here - - ClusterMetric refMetric = ClusterMetricStatistics.getMetric(refFlowId); - if (refMetric == null) { - return new TokenResult(TokenResultStatus.FAIL); - } - double refOrders = refMetric.getAvg(ClusterFlowEvent.PASS); - double refQps = refMetric.getAvg(ClusterFlowEvent.PASS_REQUEST); - - double splitRatio = refQps > 0 ? refOrders / refQps : 1; - - double selfGlobalThreshold = ClusterServerConfigManager.exceedCount * calcGlobalThreshold(rule); - double refGlobalThreshold = ClusterServerConfigManager.exceedCount * calcGlobalThreshold(refFlowRule); - - long currentTime = TimeUtil.currentTimeMillis(); - long latestRefTime = 0 /*refFlowRule.clusterQps.getStableWindowStartTime()*/; - int sampleCount = 10; - - if (currentTime > latestRefTime - && (refOrders / refGlobalThreshold + 1.0d / sampleCount >= ((double)(currentTime - latestRefTime)) / 1000) - || refOrders == refGlobalThreshold) { - return blockedResult(); - } - - // double latestQps = metric.getAvg(ClusterFlowEvent.PASS); - double refRatio = rule.getClusterConfig().getRefRatio(); - - if (refOrders / splitRatio + (acquireCount + latestQps) * refRatio - <= refGlobalThreshold / splitRatio + selfGlobalThreshold * refRatio) { - metric.add(ClusterFlowEvent.PASS, acquireCount); - metric.add(ClusterFlowEvent.PASS_REQUEST, 1); - - return new TokenResult(TokenResultStatus.OK); - } - - // TODO: log here? - metric.add(ClusterFlowEvent.BLOCK, acquireCount); - - return blockedResult(); - } +final class ClusterFlowChecker { private static double calcGlobalThreshold(FlowRule rule) { double count = rule.getCount(); @@ -130,20 +41,20 @@ public final class ClusterFlowChecker { return count; case ClusterRuleConstant.FLOW_THRESHOLD_AVG_LOCAL: default: - // TODO: get real connected count grouped. - int connectedCount = 1; + int connectedCount = ClusterFlowRuleManager.getConnectedCount(rule.getClusterConfig().getFlowId()); return count * connectedCount; } } static TokenResult acquireClusterToken(/*@Valid*/ FlowRule rule, int acquireCount, boolean prioritized) { - ClusterMetric metric = ClusterMetricStatistics.getMetric(rule.getClusterConfig().getFlowId()); + Long id = rule.getClusterConfig().getFlowId(); + ClusterMetric metric = ClusterMetricStatistics.getMetric(id); if (metric == null) { return new TokenResult(TokenResultStatus.FAIL); } double latestQps = metric.getAvg(ClusterFlowEvent.PASS_REQUEST); - double globalThreshold = calcGlobalThreshold(rule) * ClusterServerConfigManager.exceedCount; + double globalThreshold = calcGlobalThreshold(rule) * ClusterServerConfigManager.getExceedCount(); double nextRemaining = globalThreshold - latestQps - acquireCount; if (nextRemaining >= 0) { @@ -160,10 +71,13 @@ public final class ClusterFlowChecker { .setWaitInMs(0); } else { if (prioritized) { + // Try to occupy incoming buckets. double occupyAvg = metric.getAvg(ClusterFlowEvent.WAITING); - if (occupyAvg <= ClusterServerConfigManager.maxOccupyRatio * globalThreshold) { + if (occupyAvg <= ClusterServerConfigManager.getMaxOccupyRatio() * globalThreshold) { int waitInMs = metric.tryOccupyNext(ClusterFlowEvent.PASS, acquireCount, globalThreshold); + // waitInMs > 0 indicates pre-occupy incoming buckets successfully. if (waitInMs > 0) { + ClusterServerStatLogUtil.log("flow|waiting|" + id); return new TokenResult(TokenResultStatus.SHOULD_WAIT) .setRemaining(0) .setWaitInMs(waitInMs); @@ -174,9 +88,12 @@ public final class ClusterFlowChecker { // Blocked. metric.add(ClusterFlowEvent.BLOCK, acquireCount); metric.add(ClusterFlowEvent.BLOCK_REQUEST, 1); + ClusterServerStatLogUtil.log("flow|block|" + id, acquireCount); + ClusterServerStatLogUtil.log("flow|block_request|" + id, 1); if (prioritized) { // Add prioritized block. metric.add(ClusterFlowEvent.OCCUPIED_BLOCK, acquireCount); + ClusterServerStatLogUtil.log("flow|occupied_block|" + id, 1); } return blockedResult(); diff --git a/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/ClusterFlowRuleManager.java b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/ClusterFlowRuleManager.java deleted file mode 100644 index 2f45ec63..00000000 --- a/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/ClusterFlowRuleManager.java +++ /dev/null @@ -1,137 +0,0 @@ -/* - * Copyright 1999-2018 Alibaba Group Holding Ltd. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alibaba.csp.sentinel.cluster.flow; - -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; - -import com.alibaba.csp.sentinel.cluster.flow.statistic.ClusterMetricStatistics; -import com.alibaba.csp.sentinel.cluster.flow.statistic.metric.ClusterMetric; -import com.alibaba.csp.sentinel.cluster.server.util.ClusterRuleUtil; -import com.alibaba.csp.sentinel.log.RecordLog; -import com.alibaba.csp.sentinel.property.DynamicSentinelProperty; -import com.alibaba.csp.sentinel.property.PropertyListener; -import com.alibaba.csp.sentinel.property.SentinelProperty; -import com.alibaba.csp.sentinel.slots.block.RuleConstant; -import com.alibaba.csp.sentinel.slots.block.flow.FlowRule; -import com.alibaba.csp.sentinel.slots.block.flow.FlowRuleUtil; -import com.alibaba.csp.sentinel.util.StringUtil; - -/** - * @author Eric Zhao - * @since 1.4.0 - */ -public final class ClusterFlowRuleManager { - - private static final Map FLOW_RULES = new ConcurrentHashMap<>(); - - private static final PropertyListener> PROPERTY_LISTENER = new FlowRulePropertyListener(); - private static SentinelProperty> currentProperty = new DynamicSentinelProperty<>(); - - static { - currentProperty.addListener(PROPERTY_LISTENER); - } - - /** - * Listen to the {@link SentinelProperty} for {@link FlowRule}s. - * The property is the source of cluster {@link FlowRule}s. - * - * @param property the property to listen. - */ - public static void register2Property(SentinelProperty> property) { - synchronized (PROPERTY_LISTENER) { - RecordLog.info("[ClusterFlowRuleManager] Registering new property to cluster flow rule manager"); - currentProperty.removeListener(PROPERTY_LISTENER); - property.addListener(PROPERTY_LISTENER); - currentProperty = property; - } - } - - public static FlowRule getFlowRuleById(Long id) { - if (!ClusterRuleUtil.validId(id)) { - return null; - } - return FLOW_RULES.get(id); - } - - private static Map buildClusterFlowRuleMap(List list) { - Map ruleMap = new ConcurrentHashMap<>(); - if (list == null || list.isEmpty()) { - return ruleMap; - } - - for (FlowRule rule : list) { - if (!rule.isClusterMode()) { - continue; - } - if (!FlowRuleUtil.isValidRule(rule)) { - RecordLog.warn( - "[ClusterFlowRuleManager] Ignoring invalid flow rule when loading new flow rules: " + rule); - continue; - } - if (StringUtil.isBlank(rule.getLimitApp())) { - rule.setLimitApp(RuleConstant.LIMIT_APP_DEFAULT); - } - - // Flow id should not be null after filtered. - Long flowId = rule.getClusterConfig().getFlowId(); - if (flowId == null) { - continue; - } - ruleMap.put(flowId, rule); - - // Prepare cluster metric from valid flow ID. - ClusterMetricStatistics.putMetricIfAbsent(flowId, new ClusterMetric(100, 1)); - } - - // Cleanup unused cluster metrics. - Set previousSet = FLOW_RULES.keySet(); - for (Long id : previousSet) { - if (!ruleMap.containsKey(id)) { - ClusterMetricStatistics.removeMetric(id); - } - } - - return ruleMap; - } - - private static final class FlowRulePropertyListener implements PropertyListener> { - - @Override - public void configUpdate(List conf) { - Map rules = buildClusterFlowRuleMap(conf); - if (rules != null) { - FLOW_RULES.clear(); - FLOW_RULES.putAll(rules); - } - RecordLog.info("[ClusterFlowRuleManager] Cluster flow rules received: " + FLOW_RULES); - } - - @Override - public void configLoad(List conf) { - Map rules = buildClusterFlowRuleMap(conf); - if (rules != null) { - FLOW_RULES.clear(); - FLOW_RULES.putAll(rules); - } - RecordLog.info("[ClusterFlowRuleManager] Cluster flow rules loaded: " + FLOW_RULES); - } - } - - private ClusterFlowRuleManager() {} -} diff --git a/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/ClusterParamFlowChecker.java b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/ClusterParamFlowChecker.java index 04d09668..798ecf84 100644 --- a/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/ClusterParamFlowChecker.java +++ b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/ClusterParamFlowChecker.java @@ -19,30 +19,36 @@ import java.util.Collection; import com.alibaba.csp.sentinel.cluster.TokenResult; import com.alibaba.csp.sentinel.cluster.TokenResultStatus; +import com.alibaba.csp.sentinel.cluster.flow.rule.ClusterParamFlowRuleManager; import com.alibaba.csp.sentinel.cluster.flow.statistic.ClusterParamMetricStatistics; import com.alibaba.csp.sentinel.cluster.flow.statistic.metric.ClusterParamMetric; +import com.alibaba.csp.sentinel.cluster.server.log.ClusterServerStatLogUtil; import com.alibaba.csp.sentinel.slots.block.ClusterRuleConstant; import com.alibaba.csp.sentinel.slots.block.flow.param.ParamFlowRule; /** + * @author jialiang.linjl * @author Eric Zhao + * @since 1.4.0 */ public final class ClusterParamFlowChecker { static TokenResult acquireClusterToken(ParamFlowRule rule, int count, Collection values) { - ClusterParamMetric metric = ClusterParamMetricStatistics.getMetric(rule.getClusterConfig().getFlowId()); + Long id = rule.getClusterConfig().getFlowId(); + ClusterParamMetric metric = ClusterParamMetricStatistics.getMetric(id); if (metric == null) { // Unexpected state, return FAIL. return new TokenResult(TokenResultStatus.FAIL); } + double remaining = -1; boolean hasPassed = true; Object blockObject = null; for (Object value : values) { - // TODO: origin is int * int, but current double! - double curCount = metric.getAvg(value); - - double threshold = calcGlobalThreshold(rule); - if (++curCount > threshold) { + double latestQps = metric.getAvg(value); + double threshold = calcGlobalThreshold(rule, value); + double nextRemaining = threshold - latestQps - count; + remaining = nextRemaining; + if (nextRemaining < 0) { hasPassed = false; blockObject = value; break; @@ -53,30 +59,50 @@ public final class ClusterParamFlowChecker { for (Object value : values) { metric.addValue(value, count); } + ClusterServerStatLogUtil.log(String.format("param|pass|%d", id)); } else { - // TODO: log here? + ClusterServerStatLogUtil.log(String.format("param|block|%d|%s", id, blockObject)); + } + if (values.size() > 1) { + // Remaining field is unsupported for multi-values. + remaining = -1; } - return hasPassed ? newRawResponse(TokenResultStatus.OK): newRawResponse(TokenResultStatus.BLOCKED); + return hasPassed ? newPassResponse((int)remaining): newBlockResponse(); + } + + private static TokenResult newPassResponse(int remaining) { + return new TokenResult(TokenResultStatus.OK) + .setRemaining(remaining) + .setWaitInMs(0); } - private static TokenResult newRawResponse(int status) { - return new TokenResult(status) + private static TokenResult newBlockResponse() { + return new TokenResult(TokenResultStatus.BLOCKED) .setRemaining(0) .setWaitInMs(0); } - private static double calcGlobalThreshold(ParamFlowRule rule) { - double count = rule.getCount(); + private static double calcGlobalThreshold(ParamFlowRule rule, Object value) { + double count = getRawThreshold(rule, value); switch (rule.getClusterConfig().getThresholdType()) { case ClusterRuleConstant.FLOW_THRESHOLD_GLOBAL: return count; case ClusterRuleConstant.FLOW_THRESHOLD_AVG_LOCAL: default: - int connectedCount = 1; // TODO: get real connected count grouped. + int connectedCount = ClusterParamFlowRuleManager.getConnectedCount(rule.getClusterConfig().getFlowId()); return count * connectedCount; } } + private static double getRawThreshold(ParamFlowRule rule, Object value) { + Integer itemCount = rule.retrieveExclusiveItemCount(value); + if (itemCount == null) { + return rule.getCount(); + } else { + return itemCount; + } + } + private ClusterParamFlowChecker() {} } diff --git a/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/ClusterParamFlowRuleManager.java b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/ClusterParamFlowRuleManager.java deleted file mode 100644 index c0a4278f..00000000 --- a/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/ClusterParamFlowRuleManager.java +++ /dev/null @@ -1,138 +0,0 @@ -/* - * Copyright 1999-2018 Alibaba Group Holding Ltd. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alibaba.csp.sentinel.cluster.flow; - -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; - -import com.alibaba.csp.sentinel.cluster.flow.statistic.ClusterParamMetricStatistics; -import com.alibaba.csp.sentinel.cluster.flow.statistic.metric.ClusterParamMetric; -import com.alibaba.csp.sentinel.cluster.server.util.ClusterRuleUtil; -import com.alibaba.csp.sentinel.log.RecordLog; -import com.alibaba.csp.sentinel.property.DynamicSentinelProperty; -import com.alibaba.csp.sentinel.property.PropertyListener; -import com.alibaba.csp.sentinel.property.SentinelProperty; -import com.alibaba.csp.sentinel.slots.block.RuleConstant; -import com.alibaba.csp.sentinel.slots.block.flow.param.ParamFlowRule; -import com.alibaba.csp.sentinel.slots.block.flow.param.ParamFlowRuleUtil; -import com.alibaba.csp.sentinel.util.StringUtil; - -/** - * @author Eric Zhao - * @since 1.4.0 - */ -public final class ClusterParamFlowRuleManager { - - private static final Map PARAM_RULES = new ConcurrentHashMap<>(); - - private final static RulePropertyListener PROPERTY_LISTENER = new RulePropertyListener(); - private static SentinelProperty> currentProperty - = new DynamicSentinelProperty>(); - - static { - currentProperty.addListener(PROPERTY_LISTENER); - } - - /** - * Listen to the {@link SentinelProperty} for {@link ParamFlowRule}s. - * The property is the source of {@link ParamFlowRule}s. - * - * @param property the property to listen - */ - public static void register2Property(SentinelProperty> property) { - synchronized (PROPERTY_LISTENER) { - currentProperty.removeListener(PROPERTY_LISTENER); - property.addListener(PROPERTY_LISTENER); - currentProperty = property; - RecordLog.info("[ClusterParamFlowRuleManager] New property has been registered to cluster param rule manager"); - } - } - - public static ParamFlowRule getParamFlowRuleById(Long id) { - if (!ClusterRuleUtil.validId(id)) { - return null; - } - return PARAM_RULES.get(id); - } - - static class RulePropertyListener implements PropertyListener> { - - @Override - public void configUpdate(List conf) { - Map rules = buildClusterRuleMap(conf); - if (rules != null) { - PARAM_RULES.clear(); - PARAM_RULES.putAll(rules); - } - RecordLog.info("[ClusterFlowRuleManager] Cluster param flow rules received: " + PARAM_RULES); - } - - @Override - public void configLoad(List conf) { - Map rules = buildClusterRuleMap(conf); - if (rules != null) { - PARAM_RULES.clear(); - PARAM_RULES.putAll(rules); - } - RecordLog.info("[ClusterFlowRuleManager] Cluster param flow rules received: " + PARAM_RULES); - } - } - - private static Map buildClusterRuleMap(List list) { - Map ruleMap = new ConcurrentHashMap<>(); - if (list == null || list.isEmpty()) { - return ruleMap; - } - - for (ParamFlowRule rule : list) { - if (!rule.isClusterMode()) { - continue; - } - if (!ParamFlowRuleUtil.isValidRule(rule)) { - RecordLog.warn( - "[ClusterParamFlowRuleManager] Ignoring invalid param flow rule when loading new flow rules: " + rule); - continue; - } - if (StringUtil.isBlank(rule.getLimitApp())) { - rule.setLimitApp(RuleConstant.LIMIT_APP_DEFAULT); - } - - // Flow id should not be null after filtered. - Long flowId = rule.getClusterConfig().getFlowId(); - if (flowId == null) { - continue; - } - ruleMap.put(flowId, rule); - - // Prepare cluster metric from valid flow ID. - ClusterParamMetricStatistics.putMetricIfAbsent(flowId, new ClusterParamMetric(100, 1)); - } - - // Cleanup unused cluster metrics. - Set previousSet = PARAM_RULES.keySet(); - for (Long id : previousSet) { - if (!ruleMap.containsKey(id)) { - ClusterParamMetricStatistics.removeMetric(id); - } - } - - return ruleMap; - } - - private ClusterParamFlowRuleManager() {} -} diff --git a/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/DefaultTokenService.java b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/DefaultTokenService.java index f12a6d64..2953a135 100644 --- a/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/DefaultTokenService.java +++ b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/DefaultTokenService.java @@ -20,7 +20,8 @@ import java.util.Collection; import com.alibaba.csp.sentinel.cluster.TokenResultStatus; import com.alibaba.csp.sentinel.cluster.TokenResult; import com.alibaba.csp.sentinel.cluster.TokenService; -import com.alibaba.csp.sentinel.slots.block.ClusterRuleConstant; +import com.alibaba.csp.sentinel.cluster.flow.rule.ClusterFlowRuleManager; +import com.alibaba.csp.sentinel.cluster.flow.rule.ClusterParamFlowRuleManager; import com.alibaba.csp.sentinel.slots.block.flow.FlowRule; import com.alibaba.csp.sentinel.slots.block.flow.param.ParamFlowRule; @@ -42,24 +43,17 @@ public class DefaultTokenService implements TokenService { if (rule == null) { return new TokenResult(TokenResultStatus.NO_RULE_EXISTS); } - if (isUsingReference(rule)) { - return ClusterFlowChecker.tryAcquireOrBorrowFromRefResource(rule, acquireCount, prioritized); - } return ClusterFlowChecker.acquireClusterToken(rule, acquireCount, prioritized); } - private boolean isUsingReference(FlowRule rule) { - return rule.getClusterConfig().getStrategy() == ClusterRuleConstant.FLOW_CLUSTER_STRATEGY_REF; - } - @Override public TokenResult requestParamToken(Long ruleId, int acquireCount, Collection params) { if (notValidRequest(ruleId, acquireCount) || params == null || params.isEmpty()) { return badRequest(); } // The rule should be valid. - ParamFlowRule rule = ClusterParamFlowRuleManager.getParamFlowRuleById(ruleId); + ParamFlowRule rule = ClusterParamFlowRuleManager.getParamRuleById(ruleId); if (rule == null) { return new TokenResult(TokenResultStatus.NO_RULE_EXISTS); } diff --git a/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/rule/ClusterFlowRuleManager.java b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/rule/ClusterFlowRuleManager.java new file mode 100644 index 00000000..27b6853b --- /dev/null +++ b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/rule/ClusterFlowRuleManager.java @@ -0,0 +1,320 @@ +/* + * Copyright 1999-2018 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.csp.sentinel.cluster.flow.rule; + +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +import com.alibaba.csp.sentinel.cluster.flow.statistic.ClusterMetricStatistics; +import com.alibaba.csp.sentinel.cluster.flow.statistic.metric.ClusterMetric; +import com.alibaba.csp.sentinel.cluster.server.ServerConstants; +import com.alibaba.csp.sentinel.cluster.server.config.ClusterServerConfigManager; +import com.alibaba.csp.sentinel.cluster.server.connection.ConnectionManager; +import com.alibaba.csp.sentinel.cluster.server.util.ClusterRuleUtil; +import com.alibaba.csp.sentinel.log.RecordLog; +import com.alibaba.csp.sentinel.property.DynamicSentinelProperty; +import com.alibaba.csp.sentinel.property.PropertyListener; +import com.alibaba.csp.sentinel.property.SentinelProperty; +import com.alibaba.csp.sentinel.slots.block.RuleConstant; +import com.alibaba.csp.sentinel.slots.block.flow.FlowRule; +import com.alibaba.csp.sentinel.slots.block.flow.FlowRuleUtil; +import com.alibaba.csp.sentinel.util.AssertUtil; +import com.alibaba.csp.sentinel.util.StringUtil; +import com.alibaba.csp.sentinel.util.function.Function; +import com.alibaba.csp.sentinel.util.function.Predicate; + +/** + * Manager for cluster flow rules. + * + * @author Eric Zhao + * @since 1.4.0 + */ +public final class ClusterFlowRuleManager { + + /** + * The default cluster flow rule property supplier that creates a new dynamic property + * for a specific namespace to do rule management manually. + */ + public static final Function>> DEFAULT_PROPERTY_SUPPLIER = + new Function>>() { + @Override + public SentinelProperty> apply(String namespace) { + return new DynamicSentinelProperty<>(); + } + }; + + /** + * (flowId, clusterRule) + */ + private static final Map FLOW_RULES = new ConcurrentHashMap<>(); + /** + * (namespace, [flowId...]) + */ + private static final Map> NAMESPACE_FLOW_ID_MAP = new ConcurrentHashMap<>(); + /** + *

This map (flowId, namespace) is used for getting connected count + * when checking a specific rule in {@code ruleId}:

+ * + *
+     * ruleId -> namespace -> connection group -> connected count
+     * 
+ */ + private static final Map FLOW_NAMESPACE_MAP = new ConcurrentHashMap<>(); + + /** + * (namespace, property-listener wrapper) + */ + private static final Map> PROPERTY_MAP = new ConcurrentHashMap<>(); + /** + * Cluster flow rule property supplier for a specific namespace. + */ + private static volatile Function>> propertySupplier + = DEFAULT_PROPERTY_SUPPLIER; + + private static final Object UPDATE_LOCK = new Object(); + + static { + initDefaultProperty(); + } + + private static void initDefaultProperty() { + // The server should always support default namespace, + // so register a default property for default namespace. + SentinelProperty> defaultProperty = new DynamicSentinelProperty<>(); + String defaultNamespace = ServerConstants.DEFAULT_NAMESPACE; + registerPropertyInternal(defaultNamespace, defaultProperty); + } + + public static void setPropertySupplier(Function>> propertySupplier) { + ClusterFlowRuleManager.propertySupplier = propertySupplier; + } + + /** + * Listen to the {@link SentinelProperty} for cluster {@link FlowRule}s. + * The property is the source of cluster {@link FlowRule}s for a specific namespace. + * + * @param namespace namespace to register + */ + public static void register2Property(String namespace) { + AssertUtil.notEmpty(namespace, "namespace cannot be empty"); + if (propertySupplier == null) { + RecordLog.warn( + "[ClusterFlowRuleManager] Cluster flow property supplier is absent, cannot register property"); + return; + } + SentinelProperty> property = propertySupplier.apply(namespace); + if (property == null) { + RecordLog.warn( + "[ClusterFlowRuleManager] Wrong created property from cluster flow property supplier, ignoring"); + return; + } + synchronized (UPDATE_LOCK) { + RecordLog.info("[ClusterFlowRuleManager] Registering new property to cluster flow rule manager" + + " for namespace <{0}>", namespace); + registerPropertyInternal(namespace, property); + } + } + + /** + * Listen to the {@link SentinelProperty} for cluster {@link FlowRule}s if current property for namespace is absent. + * The property is the source of cluster {@link FlowRule}s for a specific namespace. + * + * @param namespace namespace to register + */ + public static void registerPropertyIfAbsent(String namespace) { + AssertUtil.notEmpty(namespace, "namespace cannot be empty"); + if (!PROPERTY_MAP.containsKey(namespace)) { + synchronized (UPDATE_LOCK) { + if (!PROPERTY_MAP.containsKey(namespace)) { + register2Property(namespace); + } + } + } + } + + private static void registerPropertyInternal(/*@NonNull*/ String namespace, /*@Valid*/ + SentinelProperty> property) { + NamespaceFlowProperty oldProperty = PROPERTY_MAP.get(namespace); + if (oldProperty != null) { + oldProperty.getProperty().removeListener(oldProperty.getListener()); + } + PropertyListener> listener = new FlowRulePropertyListener(namespace); + property.addListener(listener); + PROPERTY_MAP.put(namespace, new NamespaceFlowProperty<>(namespace, property, listener)); + Set flowIdSet = NAMESPACE_FLOW_ID_MAP.get(namespace); + if (flowIdSet == null) { + resetNamespaceFlowIdMapFor(namespace); + } + } + + public static void removeProperty(String namespace) { + AssertUtil.notEmpty(namespace, "namespace cannot be empty"); + synchronized (UPDATE_LOCK) { + NamespaceFlowProperty property = PROPERTY_MAP.get(namespace); + if (property != null) { + property.getProperty().removeListener(property.getListener()); + PROPERTY_MAP.remove(namespace); + } + RecordLog.info("[ClusterFlowRuleManager] Removing property from cluster flow rule manager" + + " for namespace <{0}>", namespace); + } + } + + private static void removePropertyListeners() { + for (NamespaceFlowProperty property : PROPERTY_MAP.values()) { + property.getProperty().removeListener(property.getListener()); + } + } + + private static void restorePropertyListeners() { + for (NamespaceFlowProperty p : PROPERTY_MAP.values()) { + p.getProperty().removeListener(p.getListener()); + p.getProperty().addListener(p.getListener()); + } + } + + public static FlowRule getFlowRuleById(Long id) { + if (!ClusterRuleUtil.validId(id)) { + return null; + } + return FLOW_RULES.get(id); + } + + private static void resetNamespaceFlowIdMapFor(/*@Valid*/ String namespace) { + NAMESPACE_FLOW_ID_MAP.put(namespace, new HashSet()); + } + + private static void clearAndResetRulesFor(/*@Valid*/ String namespace) { + Set flowIdSet = NAMESPACE_FLOW_ID_MAP.get(namespace); + if (flowIdSet != null && !flowIdSet.isEmpty()) { + for (Long flowId : flowIdSet) { + FLOW_RULES.remove(flowId); + FLOW_NAMESPACE_MAP.remove(flowId); + } + flowIdSet.clear(); + } else { + resetNamespaceFlowIdMapFor(namespace); + } + } + + private static void clearAndResetRulesConditional(/*@Valid*/ String namespace, Predicate predicate) { + Set oldIdSet = NAMESPACE_FLOW_ID_MAP.get(namespace); + if (oldIdSet != null && !oldIdSet.isEmpty()) { + for (Long flowId : oldIdSet) { + if (predicate.test(flowId)) { + FLOW_RULES.remove(flowId); + FLOW_NAMESPACE_MAP.remove(flowId); + ClusterMetricStatistics.removeMetric(flowId); + } + } + oldIdSet.clear(); + } + } + + /** + * Get connected count for associated namespace of given {@code flowId}. + * + * @param flowId unique flow ID + * @return connected count + */ + public static int getConnectedCount(long flowId) { + if (flowId <= 0) { + return 0; + } + String namespace = FLOW_NAMESPACE_MAP.get(flowId); + if (namespace == null) { + return 0; + } + return ConnectionManager.getConnectedCount(namespace); + } + + private static void applyClusterFlowRule(List list, /*@Valid*/ String namespace) { + if (list == null || list.isEmpty()) { + clearAndResetRulesFor(namespace); + return; + } + final ConcurrentHashMap ruleMap = new ConcurrentHashMap<>(); + + Set flowIdSet = new HashSet<>(); + + for (FlowRule rule : list) { + if (!rule.isClusterMode()) { + continue; + } + if (!FlowRuleUtil.isValidRule(rule)) { + RecordLog.warn( + "[ClusterFlowRuleManager] Ignoring invalid flow rule when loading new flow rules: " + rule); + continue; + } + if (StringUtil.isBlank(rule.getLimitApp())) { + rule.setLimitApp(RuleConstant.LIMIT_APP_DEFAULT); + } + + // Flow id should not be null after filtered. + Long flowId = rule.getClusterConfig().getFlowId(); + if (flowId == null) { + continue; + } + ruleMap.put(flowId, rule); + FLOW_NAMESPACE_MAP.put(flowId, namespace); + flowIdSet.add(flowId); + + // Prepare cluster metric from valid flow ID. + ClusterMetricStatistics.putMetricIfAbsent(flowId, + new ClusterMetric(ClusterServerConfigManager.getSampleCount(), + ClusterServerConfigManager.getIntervalMs())); + } + + // Cleanup unused cluster metrics. + clearAndResetRulesConditional(namespace, new Predicate() { + @Override + public boolean test(Long flowId) { + return !ruleMap.containsKey(flowId); + } + }); + + FLOW_RULES.putAll(ruleMap); + NAMESPACE_FLOW_ID_MAP.put(namespace, flowIdSet); + } + + private static final class FlowRulePropertyListener implements PropertyListener> { + + private final String namespace; + + public FlowRulePropertyListener(String namespace) { + this.namespace = namespace; + } + + @Override + public synchronized void configUpdate(List conf) { + applyClusterFlowRule(conf, namespace); + RecordLog.info("[ClusterFlowRuleManager] Cluster flow rules received for namespace <{0}>: {1}", + namespace, FLOW_RULES); + } + + @Override + public synchronized void configLoad(List conf) { + applyClusterFlowRule(conf, namespace); + RecordLog.info("[ClusterFlowRuleManager] Cluster flow rules loaded for namespace <{0}>: {1}", + namespace, FLOW_RULES); + } + } + + private ClusterFlowRuleManager() {} +} diff --git a/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/rule/ClusterParamFlowRuleManager.java b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/rule/ClusterParamFlowRuleManager.java new file mode 100644 index 00000000..d6b255bc --- /dev/null +++ b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/rule/ClusterParamFlowRuleManager.java @@ -0,0 +1,307 @@ +/* + * Copyright 1999-2018 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.csp.sentinel.cluster.flow.rule; + +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +import com.alibaba.csp.sentinel.cluster.flow.statistic.ClusterParamMetricStatistics; +import com.alibaba.csp.sentinel.cluster.flow.statistic.metric.ClusterParamMetric; +import com.alibaba.csp.sentinel.cluster.server.ServerConstants; +import com.alibaba.csp.sentinel.cluster.server.config.ClusterServerConfigManager; +import com.alibaba.csp.sentinel.cluster.server.connection.ConnectionManager; +import com.alibaba.csp.sentinel.cluster.server.util.ClusterRuleUtil; +import com.alibaba.csp.sentinel.log.RecordLog; +import com.alibaba.csp.sentinel.property.DynamicSentinelProperty; +import com.alibaba.csp.sentinel.property.PropertyListener; +import com.alibaba.csp.sentinel.property.SentinelProperty; +import com.alibaba.csp.sentinel.slots.block.RuleConstant; +import com.alibaba.csp.sentinel.slots.block.flow.param.ParamFlowRule; +import com.alibaba.csp.sentinel.slots.block.flow.param.ParamFlowRuleUtil; +import com.alibaba.csp.sentinel.util.AssertUtil; +import com.alibaba.csp.sentinel.util.StringUtil; +import com.alibaba.csp.sentinel.util.function.Function; +import com.alibaba.csp.sentinel.util.function.Predicate; + +/** + * Manager for cluster parameter flow rules. + * + * @author Eric Zhao + * @since 1.4.0 + */ +public final class ClusterParamFlowRuleManager { + + /** + * The default cluster parameter flow rule property supplier that creates a new + * dynamic property for a specific namespace to manually do rule management. + */ + public static final Function>> DEFAULT_PROPERTY_SUPPLIER = + new Function>>() { + @Override + public SentinelProperty> apply(String namespace) { + return new DynamicSentinelProperty<>(); + } + }; + + /** + * (id, clusterParamRule) + */ + private static final Map PARAM_RULES = new ConcurrentHashMap<>(); + /** + * (namespace, [flowId...]) + */ + private static final Map> NAMESPACE_FLOW_ID_MAP = new ConcurrentHashMap<>(); + /** + * (flowId, namespace) + */ + private static final Map FLOW_NAMESPACE_MAP = new ConcurrentHashMap<>(); + + /** + * (namespace, property-listener wrapper) + */ + private static final Map> PROPERTY_MAP = new ConcurrentHashMap<>(); + /** + * Cluster parameter flow rule property supplier for a specific namespace. + */ + private static volatile Function>> propertySupplier + = DEFAULT_PROPERTY_SUPPLIER; + + private static final Object UPDATE_LOCK = new Object(); + + static { + initDefaultProperty(); + } + + private static void initDefaultProperty() { + SentinelProperty> defaultProperty = new DynamicSentinelProperty<>(); + String defaultNamespace = ServerConstants.DEFAULT_NAMESPACE; + registerPropertyInternal(defaultNamespace, defaultProperty); + } + + public static void setPropertySupplier( + Function>> propertySupplier) { + ClusterParamFlowRuleManager.propertySupplier = propertySupplier; + } + + /** + * Listen to the {@link SentinelProperty} for cluster {@link ParamFlowRule}s. + * The property is the source of cluster {@link ParamFlowRule}s for a specific namespace. + * + * @param namespace namespace to register + */ + public static void register2Property(String namespace) { + AssertUtil.notEmpty(namespace, "namespace cannot be empty"); + if (propertySupplier == null) { + RecordLog.warn( + "[ClusterParamFlowRuleManager] Cluster param rule property supplier is absent, cannot register " + + "property"); + return; + } + SentinelProperty> property = propertySupplier.apply(namespace); + if (property == null) { + RecordLog.warn( + "[ClusterParamFlowRuleManager] Wrong created property from cluster param rule property supplier, " + + "ignoring"); + return; + } + synchronized (UPDATE_LOCK) { + RecordLog.info("[ClusterParamFlowRuleManager] Registering new property to cluster param rule manager" + + " for namespace <{0}>", namespace); + registerPropertyInternal(namespace, property); + } + } + + public static void registerPropertyIfAbsent(String namespace) { + AssertUtil.notEmpty(namespace, "namespace cannot be empty"); + if (!PROPERTY_MAP.containsKey(namespace)) { + synchronized (UPDATE_LOCK) { + if (!PROPERTY_MAP.containsKey(namespace)) { + register2Property(namespace); + } + } + } + } + + private static void registerPropertyInternal(/*@NonNull*/ String namespace, /*@Valid*/ + SentinelProperty> property) { + NamespaceFlowProperty oldProperty = PROPERTY_MAP.get(namespace); + if (oldProperty != null) { + oldProperty.getProperty().removeListener(oldProperty.getListener()); + } + PropertyListener> listener = new ParamRulePropertyListener(namespace); + property.addListener(listener); + PROPERTY_MAP.put(namespace, new NamespaceFlowProperty<>(namespace, property, listener)); + Set flowIdSet = NAMESPACE_FLOW_ID_MAP.get(namespace); + if (flowIdSet == null) { + resetNamespaceFlowIdMapFor(namespace); + } + } + + public static void removeProperty(String namespace) { + AssertUtil.notEmpty(namespace, "namespace cannot be empty"); + synchronized (UPDATE_LOCK) { + NamespaceFlowProperty property = PROPERTY_MAP.get(namespace); + if (property != null) { + property.getProperty().removeListener(property.getListener()); + PROPERTY_MAP.remove(namespace); + } + RecordLog.info("[ClusterParamFlowRuleManager] Removing property from cluster flow rule manager" + + " for namespace <{0}>", namespace); + } + } + + private static void removePropertyListeners() { + for (NamespaceFlowProperty property : PROPERTY_MAP.values()) { + property.getProperty().removeListener(property.getListener()); + } + } + + private static void restorePropertyListeners() { + for (NamespaceFlowProperty p : PROPERTY_MAP.values()) { + p.getProperty().removeListener(p.getListener()); + p.getProperty().addListener(p.getListener()); + } + } + + private static void resetNamespaceFlowIdMapFor(/*@Valid*/ String namespace) { + NAMESPACE_FLOW_ID_MAP.put(namespace, new HashSet()); + } + + private static void clearAndResetRulesFor(/*@Valid*/ String namespace) { + Set flowIdSet = NAMESPACE_FLOW_ID_MAP.get(namespace); + if (flowIdSet != null && !flowIdSet.isEmpty()) { + for (Long flowId : flowIdSet) { + PARAM_RULES.remove(flowId); + FLOW_NAMESPACE_MAP.remove(flowId); + } + flowIdSet.clear(); + } else { + resetNamespaceFlowIdMapFor(namespace); + } + } + + private static void clearAndResetRulesConditional(/*@Valid*/ String namespace, Predicate predicate) { + Set oldIdSet = NAMESPACE_FLOW_ID_MAP.get(namespace); + if (oldIdSet != null && !oldIdSet.isEmpty()) { + for (Long flowId : oldIdSet) { + if (predicate.test(flowId)) { + PARAM_RULES.remove(flowId); + FLOW_NAMESPACE_MAP.remove(flowId); + ClusterParamMetricStatistics.removeMetric(flowId); + } + } + oldIdSet.clear(); + } + } + + public static ParamFlowRule getParamRuleById(Long id) { + if (!ClusterRuleUtil.validId(id)) { + return null; + } + return PARAM_RULES.get(id); + } + + public static int getConnectedCount(long flowId) { + if (flowId <= 0) { + return 0; + } + String namespace = FLOW_NAMESPACE_MAP.get(flowId); + if (namespace == null) { + return 0; + } + return ConnectionManager.getConnectedCount(namespace); + } + + private static class ParamRulePropertyListener implements PropertyListener> { + + private final String namespace; + + public ParamRulePropertyListener(String namespace) { + this.namespace = namespace; + } + + @Override + public void configLoad(List conf) { + applyClusterParamRules(conf, namespace); + RecordLog.info("[ClusterParamFlowRuleManager] Cluster parameter rules loaded for namespace <{0}>: {1}", + namespace, PARAM_RULES); + } + + @Override + public void configUpdate(List conf) { + applyClusterParamRules(conf, namespace); + RecordLog.info("[ClusterParamFlowRuleManager] Cluster parameter rules received for namespace <{0}>: {1}", + namespace, PARAM_RULES); + } + } + + private static void applyClusterParamRules(List list, /*@Valid*/ String namespace) { + if (list == null || list.isEmpty()) { + clearAndResetRulesFor(namespace); + return; + } + final ConcurrentHashMap ruleMap = new ConcurrentHashMap<>(); + + Set flowIdSet = new HashSet<>(); + + for (ParamFlowRule rule : list) { + if (!rule.isClusterMode()) { + continue; + } + if (!ParamFlowRuleUtil.isValidRule(rule)) { + RecordLog.warn( + "[ClusterParamFlowRuleManager] Ignoring invalid param flow rule when loading new flow rules: " + + rule); + continue; + } + if (StringUtil.isBlank(rule.getLimitApp())) { + rule.setLimitApp(RuleConstant.LIMIT_APP_DEFAULT); + } + + ParamFlowRuleUtil.fillExceptionFlowItems(rule); + + // Flow id should not be null after filtered. + Long flowId = rule.getClusterConfig().getFlowId(); + if (flowId == null) { + continue; + } + ruleMap.put(flowId, rule); + FLOW_NAMESPACE_MAP.put(flowId, namespace); + flowIdSet.add(flowId); + + // Prepare cluster parameter metric from valid rule ID. + ClusterParamMetricStatistics.putMetricIfAbsent(flowId, + new ClusterParamMetric(ClusterServerConfigManager.getSampleCount(), + ClusterServerConfigManager.getIntervalMs())); + } + + // Cleanup unused cluster parameter metrics. + clearAndResetRulesConditional(namespace, new Predicate() { + @Override + public boolean test(Long flowId) { + return !ruleMap.containsKey(flowId); + } + }); + + PARAM_RULES.putAll(ruleMap); + NAMESPACE_FLOW_ID_MAP.put(namespace, flowIdSet); + } + + private ClusterParamFlowRuleManager() {} +} diff --git a/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/rule/NamespaceFlowProperty.java b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/rule/NamespaceFlowProperty.java new file mode 100644 index 00000000..37732b43 --- /dev/null +++ b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/rule/NamespaceFlowProperty.java @@ -0,0 +1,56 @@ +/* + * Copyright 1999-2018 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.csp.sentinel.cluster.flow.rule; + +import java.util.List; + +import com.alibaba.csp.sentinel.property.PropertyListener; +import com.alibaba.csp.sentinel.property.SentinelProperty; + +/** + * A property wrapper for list of rules of a given namespace. + * This is useful for auto-management of the property and listener. + * + * @param type of the rule + * @author Eric Zhao + * @since 1.4.0 + */ +class NamespaceFlowProperty { + + private final String namespace; + private final SentinelProperty> property; + private final PropertyListener> listener; + + public NamespaceFlowProperty(String namespace, + SentinelProperty> property, + PropertyListener> listener) { + this.namespace = namespace; + this.property = property; + this.listener = listener; + } + + public SentinelProperty> getProperty() { + return property; + } + + public String getNamespace() { + return namespace; + } + + public PropertyListener> getListener() { + return listener; + } +} diff --git a/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/statistic/ClusterMetricStatistics.java b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/statistic/ClusterMetricStatistics.java index 155cc32d..e574f7dc 100644 --- a/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/statistic/ClusterMetricStatistics.java +++ b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/statistic/ClusterMetricStatistics.java @@ -16,9 +16,11 @@ package com.alibaba.csp.sentinel.cluster.flow.statistic; import java.util.Map; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import com.alibaba.csp.sentinel.cluster.flow.statistic.metric.ClusterMetric; +import com.alibaba.csp.sentinel.cluster.server.config.ClusterServerConfigManager; import com.alibaba.csp.sentinel.util.AssertUtil; /** @@ -55,5 +57,13 @@ public final class ClusterMetricStatistics { return METRIC_MAP.get(id); } + public static void resetFlowMetrics() { + Set keySet = METRIC_MAP.keySet(); + for (Long id : keySet) { + METRIC_MAP.put(id, new ClusterMetric(ClusterServerConfigManager.getSampleCount(), + ClusterServerConfigManager.getIntervalMs())); + } + } + private ClusterMetricStatistics() {} } diff --git a/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/statistic/ClusterParamMetricStatistics.java b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/statistic/ClusterParamMetricStatistics.java index 73445cda..73632493 100644 --- a/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/statistic/ClusterParamMetricStatistics.java +++ b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/statistic/ClusterParamMetricStatistics.java @@ -16,9 +16,11 @@ package com.alibaba.csp.sentinel.cluster.flow.statistic; import java.util.Map; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import com.alibaba.csp.sentinel.cluster.flow.statistic.metric.ClusterParamMetric; +import com.alibaba.csp.sentinel.cluster.server.config.ClusterServerConfigManager; import com.alibaba.csp.sentinel.util.AssertUtil; /** @@ -55,5 +57,13 @@ public final class ClusterParamMetricStatistics { return METRIC_MAP.get(id); } + public static void resetFlowMetrics() { + Set keySet = METRIC_MAP.keySet(); + for (Long id : keySet) { + METRIC_MAP.put(id, new ClusterParamMetric(ClusterServerConfigManager.getSampleCount(), + ClusterServerConfigManager.getIntervalMs())); + } + } + private ClusterParamMetricStatistics() {} } diff --git a/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/statistic/data/ClusterFlowEvent.java b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/statistic/data/ClusterFlowEvent.java index 3f674ca9..8b79094f 100644 --- a/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/statistic/data/ClusterFlowEvent.java +++ b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/statistic/data/ClusterFlowEvent.java @@ -17,6 +17,7 @@ package com.alibaba.csp.sentinel.cluster.flow.statistic.data; /** * @author Eric Zhao + * @since 1.4.0 */ public enum ClusterFlowEvent { @@ -36,7 +37,16 @@ public enum ClusterFlowEvent { * Token request (from client) blocked. */ BLOCK_REQUEST, + /** + * Pass (pre-occupy incoming buckets). + */ OCCUPIED_PASS, + /** + * Block (pre-occupy incoming buckets failed). + */ OCCUPIED_BLOCK, + /** + * Waiting due to flow shaping or for next bucket tick. + */ WAITING } diff --git a/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/statistic/data/ClusterMetricBucket.java b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/statistic/data/ClusterMetricBucket.java index 6f1eee13..a27dac5a 100644 --- a/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/statistic/data/ClusterMetricBucket.java +++ b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/statistic/data/ClusterMetricBucket.java @@ -19,6 +19,7 @@ import com.alibaba.csp.sentinel.slots.statistic.base.LongAdder; /** * @author Eric Zhao + * @since 1.4.0 */ public class ClusterMetricBucket { diff --git a/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/statistic/metric/ClusterMetric.java b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/statistic/metric/ClusterMetric.java index ba992759..eea3c459 100644 --- a/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/statistic/metric/ClusterMetric.java +++ b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/statistic/metric/ClusterMetric.java @@ -19,6 +19,7 @@ import java.util.List; import com.alibaba.csp.sentinel.cluster.flow.statistic.data.ClusterFlowEvent; import com.alibaba.csp.sentinel.cluster.flow.statistic.data.ClusterMetricBucket; +import com.alibaba.csp.sentinel.util.AssertUtil; /** * @author Eric Zhao @@ -28,8 +29,12 @@ public class ClusterMetric { private final ClusterMetricLeapArray metric; - public ClusterMetric(int windowLengthInMs, int intervalInSec) { - this.metric = new ClusterMetricLeapArray(windowLengthInMs, intervalInSec); + public ClusterMetric(int sampleCount, int intervalInMs) { + AssertUtil.isTrue(sampleCount > 0, "sampleCount should be positive"); + AssertUtil.isTrue(intervalInMs > 0, "interval should be positive"); + AssertUtil.isTrue(intervalInMs % sampleCount == 0, "time span needs to be evenly divided"); + int windowLengthInMs = intervalInMs / sampleCount; + this.metric = new ClusterMetricLeapArray(windowLengthInMs, intervalInMs); } public void add(ClusterFlowEvent event, long count) { @@ -40,6 +45,12 @@ public class ClusterMetric { return metric.currentWindow().value().get(event); } + /** + * Get total sum for provided event in {@code intervalInSec}. + * + * @param event event to calculate + * @return total sum for event + */ public long getSum(ClusterFlowEvent event) { metric.currentWindow(); long sum = 0; @@ -51,11 +62,18 @@ public class ClusterMetric { return sum; } + /** + * Get average count for provided event per second. + * + * @param event event to calculate + * @return average count per second for event + */ public double getAvg(ClusterFlowEvent event) { return getSum(event) / metric.getIntervalInSecond(); } /** + * Try to pre-occupy upcoming buckets. * * @return time to wait for next bucket (in ms); 0 if cannot occupy next buckets */ @@ -70,7 +88,13 @@ public class ClusterMetric { } private boolean canOccupy(ClusterFlowEvent event, int acquireCount, double latestQps, double threshold) { - // TODO - return metric.getOccupiedCount(event) + latestQps + acquireCount /*- xxx*/ <= threshold; + long headPass = metric.getFirstCountOfWindow(event); + long occupiedCount = metric.getOccupiedCount(event); + // bucket to occupy (= incoming bucket) + // ↓ + // | head bucket | | | | current bucket | + // +-------------+----+----+----+----------- ----+ + // (headPass) + return latestQps + (acquireCount + occupiedCount) - headPass <= threshold; } } diff --git a/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/statistic/metric/ClusterMetricLeapArray.java b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/statistic/metric/ClusterMetricLeapArray.java index 86c02d07..40d2d752 100644 --- a/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/statistic/metric/ClusterMetricLeapArray.java +++ b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/statistic/metric/ClusterMetricLeapArray.java @@ -31,13 +31,13 @@ public class ClusterMetricLeapArray extends LeapArray { private boolean hasOccupied = false; /** - * The total bucket count is: {@link #sampleCount} = intervalInSec * 1000 / windowLengthInMs. + * The total bucket count is: {@link #sampleCount} = intervalInMs / windowLengthInMs. * * @param windowLengthInMs a single window bucket's time length in milliseconds. - * @param intervalInSec the total time span of this {@link LeapArray} in seconds. + * @param intervalInMs the total time span of this {@link LeapArray} in milliseconds. */ - public ClusterMetricLeapArray(int windowLengthInMs, int intervalInSec) { - super(windowLengthInMs, intervalInSec); + public ClusterMetricLeapArray(int windowLengthInMs, int intervalInMs) { + super(windowLengthInMs, intervalInMs / 1000); ClusterFlowEvent[] events = ClusterFlowEvent.values(); this.occupyCounter = new LongAdder[events.length]; for (ClusterFlowEvent event : events) { @@ -84,4 +84,15 @@ public class ClusterMetricLeapArray extends LeapArray { public long getOccupiedCount(ClusterFlowEvent event) { return occupyCounter[event.ordinal()].sum(); } + + public long getFirstCountOfWindow(ClusterFlowEvent event) { + if (event == null) { + return 0; + } + WindowWrap windowWrap = getValidHead(); + if (windowWrap == null) { + return 0; + } + return windowWrap.value().get(event); + } } diff --git a/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/statistic/metric/ClusterParamMetric.java b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/statistic/metric/ClusterParamMetric.java index 9dd52c45..4daf7e41 100644 --- a/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/statistic/metric/ClusterParamMetric.java +++ b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/statistic/metric/ClusterParamMetric.java @@ -19,20 +19,28 @@ import java.util.List; import com.alibaba.csp.sentinel.slots.statistic.base.LongAdder; import com.alibaba.csp.sentinel.slots.statistic.cache.CacheMap; +import com.alibaba.csp.sentinel.util.AssertUtil; /** * @author Eric Zhao + * @since 1.4.0 */ public class ClusterParamMetric { + public static final int DEFAULT_CLUSTER_MAX_CAPACITY = 4000; + private final ClusterParameterLeapArray metric; - public ClusterParamMetric(int windowLengthInMs, int intervalInSec) { - this.metric = new ClusterParameterLeapArray<>(windowLengthInMs, intervalInSec); + public ClusterParamMetric(int sampleCount, int intervalInMs) { + this(sampleCount, intervalInMs, DEFAULT_CLUSTER_MAX_CAPACITY); } - public ClusterParamMetric(int windowLengthInMs, int intervalInSec, int maxCapacity) { - this.metric = new ClusterParameterLeapArray<>(windowLengthInMs, intervalInSec, maxCapacity); + public ClusterParamMetric(int sampleCount, int intervalInMs, int maxCapacity) { + AssertUtil.isTrue(sampleCount > 0, "sampleCount should be positive"); + AssertUtil.isTrue(intervalInMs > 0, "interval should be positive"); + AssertUtil.isTrue(intervalInMs % sampleCount == 0, "time span needs to be evenly divided"); + int windowLengthInMs = intervalInMs / sampleCount; + this.metric = new ClusterParameterLeapArray<>(windowLengthInMs, intervalInMs, maxCapacity); } public long getSum(Object value) { @@ -45,7 +53,8 @@ public class ClusterParamMetric { List> buckets = metric.values(); for (CacheMap bucket : buckets) { - sum += getCount(bucket.get(value)); + long count = getCount(bucket.get(value)); + sum += count; } return sum; } diff --git a/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/statistic/metric/ClusterParameterLeapArray.java b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/statistic/metric/ClusterParameterLeapArray.java index d11c64c6..dbb603b4 100644 --- a/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/statistic/metric/ClusterParameterLeapArray.java +++ b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/statistic/metric/ClusterParameterLeapArray.java @@ -22,20 +22,16 @@ import com.alibaba.csp.sentinel.slots.statistic.cache.ConcurrentLinkedHashMapWra import com.alibaba.csp.sentinel.util.AssertUtil; /** - * @author Eric Zhao * @param counter type + * @author Eric Zhao * @since 1.4.0 */ public class ClusterParameterLeapArray extends LeapArray> { private final int maxCapacity; - public ClusterParameterLeapArray(int windowLengthInMs, int intervalInSec) { - this(windowLengthInMs, intervalInSec, DEFAULT_CLUSTER_MAX_CAPACITY); - } - - public ClusterParameterLeapArray(int windowLengthInMs, int intervalInSec, int maxCapacity) { - super(windowLengthInMs, intervalInSec); + public ClusterParameterLeapArray(int windowLengthInMs, int intervalInMs, int maxCapacity) { + super(windowLengthInMs, intervalInMs / 1000); AssertUtil.isTrue(maxCapacity > 0, "maxCapacity of LRU map should be positive"); this.maxCapacity = maxCapacity; } @@ -46,11 +42,11 @@ public class ClusterParameterLeapArray extends LeapArray> } @Override - protected WindowWrap> resetWindowTo(WindowWrap> w, - long startTime) { + protected WindowWrap> resetWindowTo(WindowWrap> w, long startTime) { + w.resetTo(startTime); w.value().clear(); return w; } - public static final int DEFAULT_CLUSTER_MAX_CAPACITY = 4000; + } diff --git a/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/DefaultEmbeddedTokenServer.java b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/DefaultEmbeddedTokenServer.java new file mode 100644 index 00000000..fd95cbf2 --- /dev/null +++ b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/DefaultEmbeddedTokenServer.java @@ -0,0 +1,61 @@ +/* + * Copyright 1999-2018 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.csp.sentinel.cluster.server; + +import java.util.Collection; + +import com.alibaba.csp.sentinel.cluster.TokenResult; +import com.alibaba.csp.sentinel.cluster.TokenResultStatus; +import com.alibaba.csp.sentinel.cluster.TokenService; + +/** + * Default embedded token server in Sentinel which wraps the {@link SentinelDefaultTokenServer} + * and the {@link TokenService} from SPI provider. + * + * @author Eric Zhao + * @since 1.4.0 + */ +public class DefaultEmbeddedTokenServer implements EmbeddedClusterTokenServer { + + private final TokenService tokenService = TokenServiceProvider.getService(); + private final ClusterTokenServer server = new SentinelDefaultTokenServer(true); + + @Override + public void start() throws Exception { + server.start(); + } + + @Override + public void stop() throws Exception { + server.stop(); + } + + @Override + public TokenResult requestToken(Long ruleId, int acquireCount, boolean prioritized) { + if (tokenService != null) { + return tokenService.requestToken(ruleId, acquireCount, prioritized); + } + return new TokenResult(TokenResultStatus.FAIL); + } + + @Override + public TokenResult requestParamToken(Long ruleId, int acquireCount, Collection params) { + if (tokenService != null) { + return tokenService.requestParamToken(ruleId, acquireCount, params); + } + return new TokenResult(TokenResultStatus.FAIL); + } +} diff --git a/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/NettyTransportServer.java b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/NettyTransportServer.java index 5e3025b9..aded579a 100644 --- a/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/NettyTransportServer.java +++ b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/NettyTransportServer.java @@ -46,13 +46,16 @@ import static com.alibaba.csp.sentinel.cluster.server.ServerConstants.*; /** * @author Eric Zhao + * @since 1.4.0 */ public class NettyTransportServer implements ClusterTokenServer { - private static final int DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt( - "io.netty.eventLoopThreads", Runtime.getRuntime().availableProcessors() * 2)); + private static final int DEFAULT_EVENT_LOOP_THREADS = Math.max(1, + SystemPropertyUtil.getInt("io.netty.eventLoopThreads", Runtime.getRuntime().availableProcessors() * 2)); + private static final int MAX_RETRY_TIMES = 3; + private static final int RETRY_SLEEP_MS = 1000; - private final int port = 11111; + private final int port; private NioEventLoopGroup bossGroup; private NioEventLoopGroup workerGroup; @@ -62,6 +65,10 @@ public class NettyTransportServer implements ClusterTokenServer { private final AtomicInteger currentState = new AtomicInteger(SERVER_STATUS_OFF); private final AtomicInteger failedTimes = new AtomicInteger(0); + public NettyTransportServer(int port) { + this.port = port; + } + @Override public void start() { if (!currentState.compareAndSet(SERVER_STATUS_OFF, SERVER_STATUS_STARTING)) { @@ -92,23 +99,27 @@ public class NettyTransportServer implements ClusterTokenServer { .childOption(ChannelOption.SO_TIMEOUT, 10) .childOption(ChannelOption.TCP_NODELAY, true) .childOption(ChannelOption.SO_RCVBUF, 32 * 1024); - b.bind(Integer.valueOf(port)).addListener(new GenericFutureListener() { + b.bind(port).addListener(new GenericFutureListener() { @Override public void operationComplete(ChannelFuture future) { if (future.cause() != null) { - RecordLog.info("Token server start failed", future.cause()); + RecordLog.info("[NettyTransportServer] Token server start failed (port=" + port + ")", + future.cause()); currentState.compareAndSet(SERVER_STATUS_STARTING, SERVER_STATUS_OFF); - - //try { - // Thread.sleep((failStartTimes.get() + 1) * 1000); - // start(); - //} catch (Throwable e) { - // RecordLog.info("Fail to start token server:", e); - //} + int failCount = failedTimes.incrementAndGet(); + if (failCount > MAX_RETRY_TIMES) { + return; + } + + try { + Thread.sleep(failCount * RETRY_SLEEP_MS); + start(); + } catch (Throwable e) { + RecordLog.info("[NettyTransportServer] Failed to start token server when retrying", e); + } } else { - RecordLog.info("Token server start success"); + RecordLog.info("[NettyTransportServer] Token server started success at port " + port); currentState.compareAndSet(SERVER_STATUS_STARTING, SERVER_STATUS_STARTED); - //failStartTimes.set(0); } } }); @@ -119,9 +130,9 @@ public class NettyTransportServer implements ClusterTokenServer { // If still initializing, wait for ready. while (currentState.get() == SERVER_STATUS_STARTING) { try { - Thread.sleep(1000); + Thread.sleep(500); } catch (InterruptedException e) { - e.printStackTrace(); + // Ignore. } } @@ -133,9 +144,9 @@ public class NettyTransportServer implements ClusterTokenServer { failedTimes.set(0); - RecordLog.info("Token server stopped"); + RecordLog.info("[NettyTransportServer] Sentinel token server stopped"); } catch (Exception ex) { - RecordLog.warn("Failed to stop token server", ex); + RecordLog.warn("[NettyTransportServer] Failed to stop token server (port=" + port + ")", ex); } } } diff --git a/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/SentinelDefaultTokenServer.java b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/SentinelDefaultTokenServer.java new file mode 100644 index 00000000..c6a42db9 --- /dev/null +++ b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/SentinelDefaultTokenServer.java @@ -0,0 +1,142 @@ +/* + * Copyright 1999-2018 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.csp.sentinel.cluster.server; + +import java.util.concurrent.atomic.AtomicBoolean; + +import com.alibaba.csp.sentinel.cluster.registry.ConfigSupplierRegistry; +import com.alibaba.csp.sentinel.cluster.server.config.ClusterServerConfigManager; +import com.alibaba.csp.sentinel.cluster.server.config.ServerTransportConfig; +import com.alibaba.csp.sentinel.cluster.server.config.ServerTransportConfigObserver; +import com.alibaba.csp.sentinel.cluster.server.connection.ConnectionManager; +import com.alibaba.csp.sentinel.init.InitExecutor; +import com.alibaba.csp.sentinel.log.RecordLog; +import com.alibaba.csp.sentinel.util.HostNameUtil; +import com.alibaba.csp.sentinel.util.StringUtil; + +/** + * @author Eric Zhao + * @since 1.4.0 + */ +public class SentinelDefaultTokenServer implements ClusterTokenServer { + + private final boolean embedded; + + private ClusterTokenServer server; + private int port; + private final AtomicBoolean shouldStart = new AtomicBoolean(false); + + static { + InitExecutor.doInit(); + } + + public SentinelDefaultTokenServer() { + this(false); + } + + public SentinelDefaultTokenServer(boolean embedded) { + this.embedded = embedded; + ClusterServerConfigManager.addTransportConfigChangeObserver(new ServerTransportConfigObserver() { + @Override + public void onTransportConfigChange(ServerTransportConfig config) { + changeServerConfig(config); + } + }); + initNewServer(); + } + + private void initNewServer() { + if (server != null) { + return; + } + int port = ClusterServerConfigManager.getPort(); + if (port > 0) { + this.server = new NettyTransportServer(port); + this.port = port; + } + } + + private synchronized void changeServerConfig(ServerTransportConfig config) { + if (config == null || config.getPort() <= 0) { + return; + } + int newPort = config.getPort(); + if (newPort == port) { + return; + } + try { + if (server != null) { + stopServerIfStarted(); + } + this.server = new NettyTransportServer(newPort); + this.port = newPort; + startServerIfScheduled(); + } catch (Exception ex) { + RecordLog.warn("[SentinelDefaultTokenServer] Failed to apply modification to token server", ex); + } + } + + private void startServerIfScheduled() throws Exception { + if (shouldStart.get()) { + if (server != null) { + server.start(); + if (embedded) { + RecordLog.info("[SentinelDefaultTokenServer] Running in embedded mode"); + handleEmbeddedStart(); + } + } + } + } + + private void stopServerIfStarted() throws Exception { + if (shouldStart.get()) { + if (server != null) { + server.stop(); + if (embedded) { + handleEmbeddedStop(); + } + } + } + } + + private void handleEmbeddedStop() { + String namespace = ConfigSupplierRegistry.getNamespaceSupplier().get(); + if (StringUtil.isNotEmpty(namespace)) { + ConnectionManager.removeConnection(namespace, HostNameUtil.getIp()); + } + } + + private void handleEmbeddedStart() { + String namespace = ConfigSupplierRegistry.getNamespaceSupplier().get(); + if (StringUtil.isNotEmpty(namespace)) { + ConnectionManager.addConnection(namespace, HostNameUtil.getIp()); + } + } + + @Override + public void start() throws Exception { + if (shouldStart.compareAndSet(false, true)) { + startServerIfScheduled(); + } + } + + @Override + public void stop() throws Exception { + if (shouldStart.compareAndSet(true, false)) { + stopServerIfStarted(); + } + } +} diff --git a/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/TokenServiceProvider.java b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/TokenServiceProvider.java index 8e3c2706..969e5400 100644 --- a/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/TokenServiceProvider.java +++ b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/TokenServiceProvider.java @@ -52,6 +52,7 @@ public final class TokenServiceProvider { } if (hasOther) { + // Pick the first. service = list.get(0); } else { // No custom token service, using default. diff --git a/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/codec/DefaultRequestEntityDecoder.java b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/codec/DefaultRequestEntityDecoder.java index 9012ae3f..bdd42e64 100644 --- a/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/codec/DefaultRequestEntityDecoder.java +++ b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/codec/DefaultRequestEntityDecoder.java @@ -54,7 +54,6 @@ public class DefaultRequestEntityDecoder implements RequestEntityDecoder { @@ -38,7 +40,6 @@ public class ParamFlowRequestDataDecoder implements EntityDecoder 0) { - // TODO: should check rules exist here? List params = new ArrayList<>(amount); for (int i = 0; i < amount; i++) { decodeParam(source, params); @@ -48,7 +49,6 @@ public class ParamFlowRequestDataDecoder implements EntityDecoder { + + @Override + public String decode(ByteBuf source) { + if (source.readableBytes() >= 4) { + int length = source.readInt(); + if (length > 0 && source.readableBytes() > 0) { + byte[] bytes = new byte[length]; + source.readBytes(bytes); + return new String(bytes); + } + } + return null; + } +} diff --git a/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/codec/data/PingResponseDataWriter.java b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/codec/data/PingResponseDataWriter.java new file mode 100644 index 00000000..26e31ecd --- /dev/null +++ b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/codec/data/PingResponseDataWriter.java @@ -0,0 +1,36 @@ +/* + * Copyright 1999-2018 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.csp.sentinel.cluster.server.codec.data; + +import com.alibaba.csp.sentinel.cluster.codec.EntityWriter; +import com.alibaba.csp.sentinel.util.StringUtil; + +import io.netty.buffer.ByteBuf; + +/** + * @author Eric Zhao + * @since 1.4.0 + */ +public class PingResponseDataWriter implements EntityWriter { + + @Override + public void writeTo(Integer entity, ByteBuf target) { + if (entity == null || target == null) { + return; + } + target.writeByte(entity); + } +} diff --git a/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/codec/netty/NettyRequestDecoder.java b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/codec/netty/NettyRequestDecoder.java index d5257f19..f5c37b34 100644 --- a/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/codec/netty/NettyRequestDecoder.java +++ b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/codec/netty/NettyRequestDecoder.java @@ -36,7 +36,6 @@ public class NettyRequestDecoder extends ByteToMessageDecoder { protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) throws Exception { RequestEntityDecoder requestDecoder = ServerEntityCodecProvider.getRequestEntityDecoder(); if (requestDecoder == null) { - // TODO: may need to throw exception? RecordLog.warn("[NettyRequestDecoder] Cannot resolve the global request entity decoder, " + "dropping the request"); return; diff --git a/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/config/ClusterServerConfigManager.java b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/config/ClusterServerConfigManager.java index 299a1275..4054d824 100644 --- a/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/config/ClusterServerConfigManager.java +++ b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/config/ClusterServerConfigManager.java @@ -15,22 +15,317 @@ */ package com.alibaba.csp.sentinel.cluster.server.config; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +import com.alibaba.csp.sentinel.cluster.flow.rule.ClusterFlowRuleManager; +import com.alibaba.csp.sentinel.cluster.flow.rule.ClusterParamFlowRuleManager; +import com.alibaba.csp.sentinel.cluster.flow.statistic.ClusterMetricStatistics; +import com.alibaba.csp.sentinel.cluster.flow.statistic.ClusterParamMetricStatistics; +import com.alibaba.csp.sentinel.cluster.server.ServerConstants; +import com.alibaba.csp.sentinel.log.RecordLog; +import com.alibaba.csp.sentinel.property.DynamicSentinelProperty; +import com.alibaba.csp.sentinel.property.PropertyListener; +import com.alibaba.csp.sentinel.property.SentinelProperty; +import com.alibaba.csp.sentinel.util.AssertUtil; + /** * @author Eric Zhao + * @since 1.4.0 */ public final class ClusterServerConfigManager { - private static final int DEFAULT_PORT = 8730; - private static final int DEFAULT_IDLE_SECONDS = 600; + /** + * Server global transport and scope config. + */ + private static volatile int port = ServerTransportConfig.DEFAULT_PORT; + private static volatile int idleSeconds = ServerTransportConfig.DEFAULT_IDLE_SECONDS; + private static volatile Set namespaceSet = Collections.singleton(ServerConstants.DEFAULT_NAMESPACE); + + /** + * Server global flow config. + */ + private static volatile double exceedCount = ServerFlowConfig.DEFAULT_EXCEED_COUNT; + private static volatile double maxOccupyRatio = ServerFlowConfig.DEFAULT_MAX_OCCUPY_RATIO; + private static volatile int intervalMs = ServerFlowConfig.DEFAULT_INTERVAL_MS; + private static volatile int sampleCount = ServerFlowConfig.DEFAULT_SAMPLE_COUNT; + + /** + * Namespace-specific flow config for token server. + * Format: (namespace, config). + */ + private static final Map NAMESPACE_CONF = new ConcurrentHashMap<>(); + + private static final List TRANSPORT_CONFIG_OBSERVERS = new ArrayList<>(); + + /** + * Property for cluster server global transport configuration. + */ + private static SentinelProperty transportConfigProperty = new DynamicSentinelProperty<>(); + /** + * Property for cluster server namespace set. + */ + private static SentinelProperty> namespaceSetProperty = new DynamicSentinelProperty<>(); + /** + * Property for cluster server global flow control configuration. + */ + private static SentinelProperty globalFlowProperty = new DynamicSentinelProperty<>(); + + private static final PropertyListener TRANSPORT_PROPERTY_LISTENER + = new ServerGlobalTransportPropertyListener(); + private static final PropertyListener GLOBAL_FLOW_PROPERTY_LISTENER + = new ServerGlobalFlowPropertyListener(); + private static final PropertyListener> NAMESPACE_SET_PROPERTY_LISTENER + = new ServerNamespaceSetPropertyListener(); + + static { + transportConfigProperty.addListener(TRANSPORT_PROPERTY_LISTENER); + globalFlowProperty.addListener(GLOBAL_FLOW_PROPERTY_LISTENER); + namespaceSetProperty.addListener(NAMESPACE_SET_PROPERTY_LISTENER); + } + + public static void registerNamespaceSetProperty(SentinelProperty> property) { + synchronized (NAMESPACE_SET_PROPERTY_LISTENER) { + RecordLog.info( + "[ClusterServerConfigManager] Registering new namespace set dynamic property to Sentinel server " + + "config manager"); + namespaceSetProperty.removeListener(NAMESPACE_SET_PROPERTY_LISTENER); + property.addListener(NAMESPACE_SET_PROPERTY_LISTENER); + namespaceSetProperty = property; + } + } + + public static void registerServerTransportProperty(SentinelProperty property) { + synchronized (TRANSPORT_PROPERTY_LISTENER) { + RecordLog.info( + "[ClusterServerConfigManager] Registering new server transport dynamic property to Sentinel server " + + "config manager"); + transportConfigProperty.removeListener(TRANSPORT_PROPERTY_LISTENER); + property.addListener(TRANSPORT_PROPERTY_LISTENER); + transportConfigProperty = property; + } + } + + private static class ServerNamespaceSetPropertyListener implements PropertyListener> { + + @Override + public synchronized void configLoad(Set set) { + if (set == null || set.isEmpty()) { + RecordLog.warn("[ClusterServerConfigManager] WARN: empty initial server namespace set"); + return; + } + applyNamespaceSetChange(set); + } + + @Override + public synchronized void configUpdate(Set set) { + // TODO: should debounce? + applyNamespaceSetChange(set); + } + } + + private static void applyNamespaceSetChange(Set newSet) { + if (newSet == null) { + return; + } + RecordLog.info("[ClusterServerConfigManager] Server namespace set will be update to: " + newSet); + if (newSet.isEmpty()) { + ClusterServerConfigManager.namespaceSet = Collections.singleton(ServerConstants.DEFAULT_NAMESPACE); + return; + } + newSet.add(ServerConstants.DEFAULT_NAMESPACE); + + Set oldSet = ClusterServerConfigManager.namespaceSet; + if (oldSet != null && !oldSet.isEmpty()) { + for (String ns : oldSet) { + if (!newSet.contains(ns)) { + ClusterFlowRuleManager.removeProperty(ns); + ClusterParamFlowRuleManager.removeProperty(ns); + } + } + } + + ClusterServerConfigManager.namespaceSet = newSet; + for (String ns : newSet) { + ClusterFlowRuleManager.registerPropertyIfAbsent(ns); + ClusterParamFlowRuleManager.registerPropertyIfAbsent(ns); + } + } + + private static class ServerGlobalTransportPropertyListener implements PropertyListener { + + @Override + public void configLoad(ServerTransportConfig config) { + if (config == null) { + RecordLog.warn("[ClusterServerConfigManager] Empty initial server transport config"); + return; + } + applyConfig(config); + } + + @Override + public void configUpdate(ServerTransportConfig config) { + applyConfig(config); + } + + private synchronized void applyConfig(ServerTransportConfig config) { + if (!isValidTransportConfig(config)) { + RecordLog.warn( + "[ClusterServerConfigManager] Invalid cluster server transport config, ignoring: " + config); + return; + } + RecordLog.info("[ClusterServerConfigManager] Updating new server transport config: " + config); + if (config.getIdleSeconds() != idleSeconds) { + idleSeconds = config.getIdleSeconds(); + } + updateTokenServer(config); + } + } + + private static class ServerGlobalFlowPropertyListener implements PropertyListener { + + @Override + public void configUpdate(ServerFlowConfig config) { + applyGlobalFlowConfig(config); + } + + @Override + public void configLoad(ServerFlowConfig config) { + applyGlobalFlowConfig(config); + } + } + + private static synchronized void applyGlobalFlowConfig(ServerFlowConfig config) { + if (!isValidFlowConfig(config)) { + RecordLog.warn( + "[ClusterServerConfigManager] Invalid cluster server global flow config, ignoring: " + config); + return; + } + RecordLog.info("[ClusterServerConfigManager] Updating new server global flow config: " + config); + if (config.getExceedCount() != exceedCount) { + exceedCount = config.getExceedCount(); + } + if (config.getMaxOccupyRatio() != maxOccupyRatio) { + maxOccupyRatio = config.getMaxOccupyRatio(); + } + int newIntervalMs = config.getIntervalMs(); + int newSampleCount = config.getSampleCount(); + if (newIntervalMs != intervalMs || newSampleCount != sampleCount) { + if (newIntervalMs <= 0 || newSampleCount <= 0 || newIntervalMs % newSampleCount != 0) { + RecordLog.warn("[ClusterServerConfigManager] Ignoring invalid flow interval or sample count"); + } else { + intervalMs = newIntervalMs; + sampleCount = newSampleCount; + // Reset all the metrics. + ClusterMetricStatistics.resetFlowMetrics(); + ClusterParamMetricStatistics.resetFlowMetrics(); + } + } + } + + public static void updateTokenServer(ServerTransportConfig config) { + int newPort = config.getPort(); + AssertUtil.isTrue(newPort > 0, "token server port should be valid (positive)"); + if (newPort == port) { + return; + } + ClusterServerConfigManager.port = newPort; + + for (ServerTransportConfigObserver observer : TRANSPORT_CONFIG_OBSERVERS) { + observer.onTransportConfigChange(config); + } + } + + public static boolean isValidTransportConfig(ServerTransportConfig config) { + return config != null && config.getPort() > 0; + } + + public static boolean isValidFlowConfig(ServerFlowConfig config) { + return config != null && config.getMaxOccupyRatio() >= 0 && config.getExceedCount() >= 0; + } + + public static void addTransportConfigChangeObserver(ServerTransportConfigObserver observer) { + AssertUtil.notNull(observer, "observer cannot be null"); + TRANSPORT_CONFIG_OBSERVERS.add(observer); + } + + public static double getExceedCount(String namespace) { + AssertUtil.notEmpty(namespace, "namespace cannot be empty"); + ServerFlowConfig config = NAMESPACE_CONF.get(namespace); + if (config != null) { + return config.getExceedCount(); + } + return exceedCount; + } + + public static double getMaxOccupyRatio(String namespace) { + AssertUtil.notEmpty(namespace, "namespace cannot be empty"); + ServerFlowConfig config = NAMESPACE_CONF.get(namespace); + if (config != null) { + return config.getMaxOccupyRatio(); + } + return maxOccupyRatio; + } + + public static int getIntervalMs(String namespace) { + AssertUtil.notEmpty(namespace, "namespace cannot be empty"); + ServerFlowConfig config = NAMESPACE_CONF.get(namespace); + if (config != null) { + return config.getIntervalMs(); + } + return intervalMs; + } + + /** + * Get sample count of provided namespace. + * + * @param namespace valid namespace + * @return the sample count of namespace; if the namespace does not have customized value, use the global value + */ + public static int getSampleCount(String namespace) { + AssertUtil.notEmpty(namespace, "namespace cannot be empty"); + ServerFlowConfig config = NAMESPACE_CONF.get(namespace); + if (config != null) { + return config.getSampleCount(); + } + return sampleCount; + } + + public static double getExceedCount() { + return exceedCount; + } + + public static double getMaxOccupyRatio() { + return maxOccupyRatio; + } + + public static Set getNamespaceSet() { + return namespaceSet; + } + + public static int getPort() { + return port; + } + + public static int getIdleSeconds() { + return idleSeconds; + } - public static volatile int port = DEFAULT_PORT; + public static int getIntervalMs() { + return intervalMs; + } - public static volatile double exceedCount = 1.0d; - public static volatile boolean borrowRefEnabled = true; - public static volatile int idleSeconds = DEFAULT_IDLE_SECONDS; - public static volatile double maxOccupyRatio = 1.0d; + public static int getSampleCount() { + return sampleCount; + } - // TODO: implement here. + public static void setNamespaceSet(Set namespaceSet) { + applyNamespaceSetChange(namespaceSet); + } private ClusterServerConfigManager() {} } diff --git a/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/config/ServerFlowConfig.java b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/config/ServerFlowConfig.java new file mode 100644 index 00000000..00991605 --- /dev/null +++ b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/config/ServerFlowConfig.java @@ -0,0 +1,97 @@ +/* + * Copyright 1999-2018 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.csp.sentinel.cluster.server.config; + +import com.alibaba.csp.sentinel.cluster.server.ServerConstants; + +/** + * @author Eric Zhao + * @since 1.4.0 + */ +public class ServerFlowConfig { + + public static final double DEFAULT_EXCEED_COUNT = 1.0d; + public static final double DEFAULT_MAX_OCCUPY_RATIO = 1.0d; + + public static final int DEFAULT_INTERVAL_MS = 1000; + public static final int DEFAULT_SAMPLE_COUNT= 10; + + private final String namespace; + + private double exceedCount = DEFAULT_EXCEED_COUNT; + private double maxOccupyRatio = DEFAULT_MAX_OCCUPY_RATIO; + private int intervalMs = DEFAULT_INTERVAL_MS; + private int sampleCount = DEFAULT_SAMPLE_COUNT; + + public ServerFlowConfig() { + this(ServerConstants.DEFAULT_NAMESPACE); + } + + public ServerFlowConfig(String namespace) { + this.namespace = namespace; + } + + public String getNamespace() { + return namespace; + } + + public double getExceedCount() { + return exceedCount; + } + + public ServerFlowConfig setExceedCount(double exceedCount) { + this.exceedCount = exceedCount; + return this; + } + + public double getMaxOccupyRatio() { + return maxOccupyRatio; + } + + public ServerFlowConfig setMaxOccupyRatio(double maxOccupyRatio) { + this.maxOccupyRatio = maxOccupyRatio; + return this; + } + + public int getIntervalMs() { + return intervalMs; + } + + public ServerFlowConfig setIntervalMs(int intervalMs) { + this.intervalMs = intervalMs; + return this; + } + + public int getSampleCount() { + return sampleCount; + } + + public ServerFlowConfig setSampleCount(int sampleCount) { + this.sampleCount = sampleCount; + return this; + } + + @Override + public String toString() { + return "ServerFlowConfig{" + + "namespace='" + namespace + '\'' + + ", exceedCount=" + exceedCount + + ", maxOccupyRatio=" + maxOccupyRatio + + ", intervalMs=" + intervalMs + + ", sampleCount=" + sampleCount + + '}'; + } +} diff --git a/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/config/ServerTransportConfig.java b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/config/ServerTransportConfig.java new file mode 100644 index 00000000..74575ede --- /dev/null +++ b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/config/ServerTransportConfig.java @@ -0,0 +1,64 @@ +/* + * Copyright 1999-2018 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.csp.sentinel.cluster.server.config; + +/** + * @author Eric Zhao + * @since 1.4.0 + */ +public class ServerTransportConfig { + + public static final int DEFAULT_PORT = 8730; + public static final int DEFAULT_IDLE_SECONDS = 600; + + private int port; + private int idleSeconds; + + public ServerTransportConfig() { + this(DEFAULT_PORT, DEFAULT_IDLE_SECONDS); + } + + public ServerTransportConfig(int port, int idleSeconds) { + this.port = port; + this.idleSeconds = idleSeconds; + } + + public int getPort() { + return port; + } + + public ServerTransportConfig setPort(int port) { + this.port = port; + return this; + } + + public int getIdleSeconds() { + return idleSeconds; + } + + public ServerTransportConfig setIdleSeconds(int idleSeconds) { + this.idleSeconds = idleSeconds; + return this; + } + + @Override + public String toString() { + return "ServerTransportConfig{" + + "port=" + port + + ", idleSeconds=" + idleSeconds + + '}'; + } +} diff --git a/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/config/ServerTransportConfigObserver.java b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/config/ServerTransportConfigObserver.java new file mode 100644 index 00000000..deccf380 --- /dev/null +++ b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/config/ServerTransportConfigObserver.java @@ -0,0 +1,30 @@ +/* + * Copyright 1999-2018 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.csp.sentinel.cluster.server.config; + +/** + * @author Eric Zhao + * @since 1.4.0 + */ +public interface ServerTransportConfigObserver { + + /** + * Callback on server transport config (e.g. port) change. + * + * @param config new server transport config + */ + void onTransportConfigChange(ServerTransportConfig config); +} diff --git a/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/connection/Connection.java b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/connection/Connection.java index 29d6ecba..a409c661 100644 --- a/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/connection/Connection.java +++ b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/connection/Connection.java @@ -20,6 +20,7 @@ import java.net.SocketAddress; /** * @author xuyue * @author Eric Zhao + * @since 1.4.0 */ public interface Connection extends AutoCloseable { diff --git a/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/connection/ConnectionDescriptor.java b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/connection/ConnectionDescriptor.java new file mode 100644 index 00000000..5dd107a2 --- /dev/null +++ b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/connection/ConnectionDescriptor.java @@ -0,0 +1,69 @@ +/* + * Copyright 1999-2018 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.csp.sentinel.cluster.server.connection; + +import java.util.Objects; + +/** + * @author Eric Zhao + * @since 1.4.0 + */ +public class ConnectionDescriptor { + + private String address; + private String host; + + public String getAddress() { + return address; + } + + public ConnectionDescriptor setAddress(String address) { + this.address = address; + return this; + } + + public String getHost() { + return host; + } + + public ConnectionDescriptor setHost(String host) { + this.host = host; + return this; + } + + @Override + public boolean equals(Object o) { + if (this == o) { return true; } + if (o == null || getClass() != o.getClass()) { return false; } + + ConnectionDescriptor that = (ConnectionDescriptor)o; + + return Objects.equals(address, that.address); + } + + @Override + public int hashCode() { + return address != null ? address.hashCode() : 0; + } + + @Override + public String toString() { + return "ConnectionDescriptor{" + + "address='" + address + '\'' + + ", host='" + host + '\'' + + '}'; + } +} diff --git a/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/connection/ConnectionGroup.java b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/connection/ConnectionGroup.java index 6046a19c..c4f84719 100644 --- a/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/connection/ConnectionGroup.java +++ b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/connection/ConnectionGroup.java @@ -15,6 +15,8 @@ */ package com.alibaba.csp.sentinel.cluster.server.connection; +import java.util.Collections; +import java.util.HashSet; import java.util.Set; import java.util.concurrent.ConcurrentSkipListSet; import java.util.concurrent.atomic.AtomicInteger; @@ -23,16 +25,17 @@ import com.alibaba.csp.sentinel.cluster.server.ServerConstants; import com.alibaba.csp.sentinel.util.AssertUtil; /** + * The connection group stores connection set for a specific namespace. + * * @author Eric Zhao * @since 1.4.0 */ public class ConnectionGroup { - private String namespace; + private final String namespace; - private Set addressSet = new ConcurrentSkipListSet<>(); - private Set hostSet = new ConcurrentSkipListSet<>(); - private AtomicInteger connectedCount = new AtomicInteger(); + private final Set connectionSet = Collections.synchronizedSet(new HashSet()); + private final AtomicInteger connectedCount = new AtomicInteger(); public ConnectionGroup(String namespace) { AssertUtil.notEmpty(namespace, "namespace cannot be empty"); @@ -46,24 +49,26 @@ public class ConnectionGroup { public ConnectionGroup addConnection(String address) { AssertUtil.notEmpty(address, "address cannot be empty"); - addressSet.add(address); String[] ip = address.split(":"); + String host; if (ip != null && ip.length >= 1) { - hostSet.add(ip[0]); + host = ip[0]; + } else { + host = address; } + connectionSet.add(new ConnectionDescriptor().setAddress(address).setHost(host)); connectedCount.incrementAndGet(); + return this; } public ConnectionGroup removeConnection(String address) { AssertUtil.notEmpty(address, "address cannot be empty"); - addressSet.remove(address); - String[] ip = address.split(":"); - if (ip != null && ip.length >= 1) { - hostSet.remove(ip[0]); + if (connectionSet.remove(new ConnectionDescriptor().setAddress(address))) { + connectedCount.decrementAndGet(); } - connectedCount.decrementAndGet(); + return this; } @@ -71,12 +76,8 @@ public class ConnectionGroup { return namespace; } - public Set getAddressSet() { - return addressSet; - } - - public Set getHostSet() { - return hostSet; + public Set getConnectionSet() { + return connectionSet; } public int getConnectedCount() { diff --git a/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/connection/ConnectionManager.java b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/connection/ConnectionManager.java index 100ba684..8ce244fe 100644 --- a/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/connection/ConnectionManager.java +++ b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/connection/ConnectionManager.java @@ -18,15 +18,89 @@ package com.alibaba.csp.sentinel.cluster.server.connection; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import com.alibaba.csp.sentinel.log.RecordLog; +import com.alibaba.csp.sentinel.util.AssertUtil; + /** + * Manager for namespace-scope {@link ConnectionGroup}. + * * @author Eric Zhao * @since 1.4.0 */ public final class ConnectionManager { + /** + * Connection map (namespace, connection). + */ private static final Map CONN_MAP = new ConcurrentHashMap<>(); + /** + * namespace map (address, namespace). + */ + private static final Map NAMESPACE_MAP = new ConcurrentHashMap<>(); + + /** + * Get connected count for specific namespace. + * + * @param namespace namespace to check + * @return connected count for specific namespace + */ + public static int getConnectedCount(String namespace) { + AssertUtil.notEmpty(namespace, "namespace should not be empty"); + ConnectionGroup group = CONN_MAP.get(namespace); + return group == null ? 0 : group.getConnectedCount(); + } + + public static ConnectionGroup getOrCreateGroup(String namespace) { + AssertUtil.assertNotBlank(namespace, "namespace should not be empty"); + ConnectionGroup group = CONN_MAP.get(namespace); + if (group == null) { + synchronized (CREATE_LOCK) { + if (CONN_MAP.get(namespace) == null) { + group = new ConnectionGroup(namespace); + CONN_MAP.put(namespace, group); + } + } + } + return group; + } + + public static void removeConnection(String address) { + AssertUtil.assertNotBlank(address, "address should not be empty"); + String namespace = NAMESPACE_MAP.get(address); + if (namespace != null) { + ConnectionGroup group = CONN_MAP.get(namespace); + if (group == null) { + return; + } + group.removeConnection(address); + RecordLog.info("[ConnectionManager] Client <{0}> disconnected and removed from namespace <{1}>", address, namespace); + } + NAMESPACE_MAP.remove(address); + } + + public static void removeConnection(String namespace, String address) { + AssertUtil.assertNotBlank(namespace, "namespace should not be empty"); + AssertUtil.assertNotBlank(address, "address should not be empty"); + ConnectionGroup group = CONN_MAP.get(namespace); + if (group == null) { + return; + } + group.removeConnection(address); + NAMESPACE_MAP.remove(address); + RecordLog.info("[ConnectionManager] Client <{0}> disconnected and removed from namespace <{1}>", address, namespace); + } + public static ConnectionGroup addConnection(String namespace, String address) { + AssertUtil.assertNotBlank(namespace, "namespace should not be empty"); + AssertUtil.assertNotBlank(address, "address should not be empty"); + ConnectionGroup group = getOrCreateGroup(namespace); + group.addConnection(address); + NAMESPACE_MAP.put(address, namespace); + RecordLog.info("[ConnectionManager] Client <{0}> registered with namespace <{1}>", address, namespace); + return group; + } + private static final Object CREATE_LOCK = new Object(); private ConnectionManager() {} } diff --git a/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/connection/ConnectionPool.java b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/connection/ConnectionPool.java index 4bc15dfb..e1e320ce 100644 --- a/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/connection/ConnectionPool.java +++ b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/connection/ConnectionPool.java @@ -50,11 +50,6 @@ public class ConnectionPool { */ private ScheduledFuture scanTaskFuture = null; - /** - * 创建一个connection,并放入连接池中 - * - * @param channel - */ public void createConnection(Channel channel) { if (channel != null) { Connection connection = new NettyConnection(channel, this); @@ -83,7 +78,7 @@ public class ConnectionPool { * @return formatted key */ private String getConnectionKey(Channel channel) { - InetSocketAddress socketAddress = (InetSocketAddress) channel.remoteAddress(); + InetSocketAddress socketAddress = (InetSocketAddress)channel.remoteAddress(); String remoteIp = socketAddress.getAddress().getHostAddress(); int remotePort = socketAddress.getPort(); return remoteIp + ":" + remotePort; @@ -93,16 +88,10 @@ public class ConnectionPool { return ip + ":" + port; } - /** - * 刷新一个连接上的最新read时间 - * - * @param channel - */ public void refreshLastReadTime(Channel channel) { if (channel != null) { String connKey = getConnectionKey(channel); Connection connection = CONNECTION_MAP.get(connKey); - //不应该为null,需要处理这种情况吗? if (connection != null) { connection.refreshLastReadTime(System.currentTimeMillis()); } @@ -124,7 +113,7 @@ public class ConnectionPool { return connections; } - public int count(){ + public int count() { return CONNECTION_MAP.size(); } @@ -141,7 +130,7 @@ public class ConnectionPool { public void refreshIdleTask() { if (scanTaskFuture == null || scanTaskFuture.cancel(false)) { startScan(); - }else { + } else { RecordLog.info("The result of canceling scanTask is error."); } } diff --git a/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/connection/NettyConnection.java b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/connection/NettyConnection.java index 3463871e..a3351ced 100644 --- a/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/connection/NettyConnection.java +++ b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/connection/NettyConnection.java @@ -22,6 +22,7 @@ import io.netty.channel.Channel; /** * @author xuyue + * @since 1.4.0 */ public class NettyConnection implements Connection { diff --git a/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/connection/ScanIdleConnectionTask.java b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/connection/ScanIdleConnectionTask.java index 22af12d5..a559aeeb 100644 --- a/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/connection/ScanIdleConnectionTask.java +++ b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/connection/ScanIdleConnectionTask.java @@ -3,15 +3,17 @@ package com.alibaba.csp.sentinel.cluster.server.connection; import java.util.List; import com.alibaba.csp.sentinel.cluster.server.config.ClusterServerConfigManager; +import com.alibaba.csp.sentinel.cluster.server.config.ServerTransportConfig; import com.alibaba.csp.sentinel.log.RecordLog; /** * @author xuyue * @author Eric Zhao + * @since 1.4.0 */ public class ScanIdleConnectionTask implements Runnable { - private ConnectionPool connectionPool; + private final ConnectionPool connectionPool; public ScanIdleConnectionTask(ConnectionPool connectionPool) { this.connectionPool = connectionPool; @@ -20,15 +22,15 @@ public class ScanIdleConnectionTask implements Runnable { @Override public void run() { try { - int idleSeconds = ClusterServerConfigManager.idleSeconds; - long idleTime = idleSeconds * 1000; - if (idleTime < 0) { - idleTime = 600 * 1000; + int idleSeconds = ClusterServerConfigManager.getIdleSeconds(); + long idleTimeMillis = idleSeconds * 1000; + if (idleTimeMillis < 0) { + idleTimeMillis = ServerTransportConfig.DEFAULT_IDLE_SECONDS * 1000; } long now = System.currentTimeMillis(); List connections = connectionPool.listAllConnection(); for (Connection conn : connections) { - if ((now - conn.getLastReadTime()) > idleTime) { + if ((now - conn.getLastReadTime()) > idleTimeMillis) { RecordLog.info( String.format("[ScanIdleConnectionTask] The connection <%s:%d> has been idle for <%d>s. " + "It will be closed now.", conn.getRemoteIP(), conn.getRemotePort(), idleSeconds) @@ -37,7 +39,7 @@ public class ScanIdleConnectionTask implements Runnable { } } } catch (Throwable t) { - // TODO: should log here. + RecordLog.warn("[ScanIdleConnectionTask] Failed to clean-up idle tasks", t); } } } diff --git a/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/handler/TokenServerHandler.java b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/handler/TokenServerHandler.java index 0aed66c8..69d113cb 100644 --- a/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/handler/TokenServerHandler.java +++ b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/handler/TokenServerHandler.java @@ -18,11 +18,12 @@ package com.alibaba.csp.sentinel.cluster.server.handler; import com.alibaba.csp.sentinel.cluster.ClusterConstants; import com.alibaba.csp.sentinel.cluster.request.ClusterRequest; import com.alibaba.csp.sentinel.cluster.response.ClusterResponse; -import com.alibaba.csp.sentinel.cluster.response.data.FlowTokenResponseData; +import com.alibaba.csp.sentinel.cluster.server.connection.ConnectionManager; import com.alibaba.csp.sentinel.cluster.server.connection.ConnectionPool; import com.alibaba.csp.sentinel.cluster.server.processor.RequestProcessor; -import com.alibaba.csp.sentinel.cluster.server.processor.RequestProcessorRegistry; +import com.alibaba.csp.sentinel.cluster.server.processor.RequestProcessorProvider; import com.alibaba.csp.sentinel.log.RecordLog; +import com.alibaba.csp.sentinel.util.StringUtil; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; @@ -35,36 +36,46 @@ import io.netty.channel.ChannelInboundHandlerAdapter; */ public class TokenServerHandler extends ChannelInboundHandlerAdapter { - private final ConnectionPool connectionPool; + private final ConnectionPool globalConnectionPool; - public TokenServerHandler(ConnectionPool connectionPool) { - this.connectionPool = connectionPool; + public TokenServerHandler(ConnectionPool globalConnectionPool) { + this.globalConnectionPool = globalConnectionPool; } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { - System.out.println("[TokenServerHandler] Connection established"); - super.channelActive(ctx); + globalConnectionPool.createConnection(ctx.channel()); + String remoteAddress = getRemoteAddress(ctx); + System.out.println("[TokenServerHandler] Connection established, remote client address: " + remoteAddress); //TODO: DEBUG } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { - System.out.println("[TokenServerHandler] Connection inactive"); - super.channelInactive(ctx); + String remoteAddress = getRemoteAddress(ctx); + System.out.println("[TokenServerHandler] Connection inactive, remote client address: " + remoteAddress); //TODO: DEBUG + globalConnectionPool.remove(ctx.channel()); + ConnectionManager.removeConnection(remoteAddress); } @Override @SuppressWarnings("unchecked") public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { - connectionPool.refreshLastReadTime(ctx.channel()); - System.out.println(String.format("[%s] Server message recv: %s", System.currentTimeMillis(), msg)); + globalConnectionPool.refreshLastReadTime(ctx.channel()); + System.out.println(String.format("[%s] Server message recv: %s", System.currentTimeMillis(), msg)); //TODO: DEBUG if (msg instanceof ClusterRequest) { ClusterRequest request = (ClusterRequest)msg; - RequestProcessor processor = RequestProcessorRegistry.getProcessor(request.getType()); + // Client ping with its namespace, add to connection manager. + if (request.getType() == ClusterConstants.MSG_TYPE_PING) { + handlePingRequest(ctx, request); + return; + } + + // Pick request processor for request type. + RequestProcessor processor = RequestProcessorProvider.getProcessor(request.getType()); if (processor == null) { - System.out.println("[TokenServerHandler] No processor for request type: " + request.getType()); - writeNoProcessorResponse(ctx, request); + RecordLog.warn("[TokenServerHandler] No processor for request type: " + request.getType()); + writeBadResponse(ctx, request); } else { ClusterResponse response = processor.processRequest(request); writeResponse(ctx, response); @@ -72,7 +83,7 @@ public class TokenServerHandler extends ChannelInboundHandlerAdapter { } } - private void writeNoProcessorResponse(ChannelHandlerContext ctx, ClusterRequest request) { + private void writeBadResponse(ChannelHandlerContext ctx, ClusterRequest request) { ClusterResponse response = new ClusterResponse<>(request.getId(), request.getType(), ClusterConstants.RESPONSE_STATUS_BAD, null); writeResponse(ctx, response); @@ -81,4 +92,24 @@ public class TokenServerHandler extends ChannelInboundHandlerAdapter { private void writeResponse(ChannelHandlerContext ctx, ClusterResponse response) { ctx.writeAndFlush(response); } + + private void handlePingRequest(ChannelHandlerContext ctx, ClusterRequest request) { + if (request.getData() == null || StringUtil.isBlank((String)request.getData())) { + writeBadResponse(ctx, request); + return; + } + String namespace = (String)request.getData(); + String clientAddress = getRemoteAddress(ctx); + // Add the remote namespace to connection manager. + int curCount = ConnectionManager.addConnection(namespace, clientAddress).getConnectedCount(); + int status = ClusterConstants.RESPONSE_STATUS_OK; + ClusterResponse response = new ClusterResponse<>(request.getId(), request.getType(), status, curCount); + writeResponse(ctx, response); + + RecordLog.info("[TokenServerHandler] Client <{0}> registered with namespace <{1}>", clientAddress, namespace); + } + + private String getRemoteAddress(ChannelHandlerContext ctx) { + return ctx.channel().remoteAddress().toString(); + } } diff --git a/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/init/DefaultClusterServerInitFunc.java b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/init/DefaultClusterServerInitFunc.java new file mode 100644 index 00000000..82b3bdc3 --- /dev/null +++ b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/init/DefaultClusterServerInitFunc.java @@ -0,0 +1,66 @@ +/* + * Copyright 1999-2018 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.csp.sentinel.cluster.server.init; + +import com.alibaba.csp.sentinel.cluster.ClusterConstants; +import com.alibaba.csp.sentinel.cluster.server.TokenServiceProvider; +import com.alibaba.csp.sentinel.cluster.server.codec.data.FlowRequestDataDecoder; +import com.alibaba.csp.sentinel.cluster.server.codec.data.FlowResponseDataWriter; +import com.alibaba.csp.sentinel.cluster.server.codec.data.ParamFlowRequestDataDecoder; +import com.alibaba.csp.sentinel.cluster.server.codec.data.PingRequestDataDecoder; +import com.alibaba.csp.sentinel.cluster.server.codec.data.PingResponseDataWriter; +import com.alibaba.csp.sentinel.cluster.server.codec.registry.RequestDataDecodeRegistry; +import com.alibaba.csp.sentinel.cluster.server.codec.registry.ResponseDataWriterRegistry; +import com.alibaba.csp.sentinel.cluster.server.processor.RequestProcessorProvider; +import com.alibaba.csp.sentinel.init.InitFunc; +import com.alibaba.csp.sentinel.log.RecordLog; + +/** + * @author Eric Zhao + * @since 1.4.0 + */ +public class DefaultClusterServerInitFunc implements InitFunc { + + @Override + public void init() throws Exception { + initDefaultEntityDecoders(); + initDefaultEntityWriters(); + + initDefaultProcessors(); + + // Eagerly-trigger the SPI pre-load of token service. + TokenServiceProvider.getService(); + + RecordLog.info("[DefaultClusterServerInitFunc] Default entity codec and processors registered"); + } + + private void initDefaultEntityWriters() { + ResponseDataWriterRegistry.addWriter(ClusterConstants.MSG_TYPE_PING, new PingResponseDataWriter()); + ResponseDataWriterRegistry.addWriter(ClusterConstants.MSG_TYPE_FLOW, new FlowResponseDataWriter()); + ResponseDataWriterRegistry.addWriter(ClusterConstants.MSG_TYPE_PARAM_FLOW, new FlowResponseDataWriter()); + } + + private void initDefaultEntityDecoders() { + RequestDataDecodeRegistry.addDecoder(ClusterConstants.MSG_TYPE_PING, new PingRequestDataDecoder()); + RequestDataDecodeRegistry.addDecoder(ClusterConstants.MSG_TYPE_FLOW, new FlowRequestDataDecoder()); + RequestDataDecodeRegistry.addDecoder(ClusterConstants.MSG_TYPE_PARAM_FLOW, new ParamFlowRequestDataDecoder()); + } + + private void initDefaultProcessors() { + // Eagerly-trigger the SPI pre-load. + RequestProcessorProvider.getProcessor(0); + } +} diff --git a/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/log/ClusterServerStatLogUtil.java b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/log/ClusterServerStatLogUtil.java new file mode 100644 index 00000000..f97441ca --- /dev/null +++ b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/log/ClusterServerStatLogUtil.java @@ -0,0 +1,56 @@ +/* + * Copyright 1999-2018 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.csp.sentinel.cluster.server.log; + +import com.alibaba.csp.sentinel.eagleeye.EagleEye; +import com.alibaba.csp.sentinel.eagleeye.StatLogger; +import com.alibaba.csp.sentinel.log.LogBase; + +/** + * @author Eric Zhao + * @since 1.4.0 + */ +public final class ClusterServerStatLogUtil { + + private static final String FILE_NAME = "sentinel-server.log"; + + private static StatLogger statLogger; + + static { + String path = LogBase.getLogBaseDir() + FILE_NAME; + + statLogger = EagleEye.statLoggerBuilder("sentinel-cluster-server-record") + .intervalSeconds(1) + .entryDelimiter('|') + .keyDelimiter(',') + .valueDelimiter(',') + .maxEntryCount(5000) + .configLogFilePath(path) + .maxFileSizeMB(300) + .maxBackupIndex(3) + .buildSingleton(); + } + + public static void log(String msg) { + statLogger.stat(msg).count(); + } + + public static void log(String msg, int count) { + statLogger.stat(msg).count(count); + } + + private ClusterServerStatLogUtil() {} +} diff --git a/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/processor/FlowRequestProcessor.java b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/processor/FlowRequestProcessor.java index 827960ae..d08a84b3 100644 --- a/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/processor/FlowRequestProcessor.java +++ b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/processor/FlowRequestProcessor.java @@ -15,8 +15,10 @@ */ package com.alibaba.csp.sentinel.cluster.server.processor; +import com.alibaba.csp.sentinel.cluster.ClusterConstants; import com.alibaba.csp.sentinel.cluster.TokenResult; import com.alibaba.csp.sentinel.cluster.TokenService; +import com.alibaba.csp.sentinel.cluster.annotation.RequestType; import com.alibaba.csp.sentinel.cluster.request.ClusterRequest; import com.alibaba.csp.sentinel.cluster.request.data.FlowRequestData; import com.alibaba.csp.sentinel.cluster.response.ClusterResponse; @@ -27,6 +29,7 @@ import com.alibaba.csp.sentinel.cluster.server.TokenServiceProvider; * @author Eric Zhao * @since 1.4.0 */ +@RequestType(ClusterConstants.MSG_TYPE_FLOW) public class FlowRequestProcessor implements RequestProcessor { @Override diff --git a/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/processor/ParamFlowRequestProcessor.java b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/processor/ParamFlowRequestProcessor.java index 37bdf159..888fc2b5 100644 --- a/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/processor/ParamFlowRequestProcessor.java +++ b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/processor/ParamFlowRequestProcessor.java @@ -17,8 +17,10 @@ package com.alibaba.csp.sentinel.cluster.server.processor; import java.util.Collection; +import com.alibaba.csp.sentinel.cluster.ClusterConstants; import com.alibaba.csp.sentinel.cluster.TokenResult; import com.alibaba.csp.sentinel.cluster.TokenService; +import com.alibaba.csp.sentinel.cluster.annotation.RequestType; import com.alibaba.csp.sentinel.cluster.request.ClusterRequest; import com.alibaba.csp.sentinel.cluster.request.data.ParamFlowRequestData; import com.alibaba.csp.sentinel.cluster.response.ClusterResponse; @@ -29,6 +31,7 @@ import com.alibaba.csp.sentinel.cluster.server.TokenServiceProvider; * @author Eric Zhao * @since 1.4.0 */ +@RequestType(ClusterConstants.MSG_TYPE_PARAM_FLOW) public class ParamFlowRequestProcessor implements RequestProcessor { @Override diff --git a/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/processor/RequestProcessorRegistry.java b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/processor/RequestProcessorProvider.java similarity index 55% rename from sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/processor/RequestProcessorRegistry.java rename to sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/processor/RequestProcessorProvider.java index 87aa6bae..bd572f27 100644 --- a/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/processor/RequestProcessorRegistry.java +++ b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/processor/RequestProcessorProvider.java @@ -16,23 +16,49 @@ package com.alibaba.csp.sentinel.cluster.server.processor; import java.util.Map; +import java.util.ServiceLoader; import java.util.concurrent.ConcurrentHashMap; +import com.alibaba.csp.sentinel.cluster.annotation.RequestType; import com.alibaba.csp.sentinel.util.AssertUtil; /** * @author Eric Zhao * @since 1.4.0 */ -public final class RequestProcessorRegistry { +public final class RequestProcessorProvider { private static final Map PROCESSOR_MAP = new ConcurrentHashMap<>(); + private static final ServiceLoader SERVICE_LOADER = ServiceLoader.load(RequestProcessor.class); + + static { + loadAndInit(); + } + + private static void loadAndInit() { + for (RequestProcessor processor : SERVICE_LOADER) { + Integer type = parseRequestType(processor); + if (type != null) { + PROCESSOR_MAP.put(type, processor); + } + } + } + + private static Integer parseRequestType(RequestProcessor processor) { + RequestType requestType = processor.getClass().getAnnotation(RequestType.class); + if (requestType != null) { + return requestType.value(); + } else { + return null; + } + } + public static RequestProcessor getProcessor(int type) { return PROCESSOR_MAP.get(type); } - public static void addProcessorIfAbsent(int type, RequestProcessor processor) { + static void addProcessorIfAbsent(int type, RequestProcessor processor) { // TBD: use putIfAbsent in JDK 1.8. if (PROCESSOR_MAP.containsKey(type)) { return; @@ -40,10 +66,11 @@ public final class RequestProcessorRegistry { PROCESSOR_MAP.put(type, processor); } - public static void addProcessor(int type, RequestProcessor processor) { + static void addProcessor(int type, RequestProcessor processor) { AssertUtil.notNull(processor, "processor cannot be null"); PROCESSOR_MAP.put(type, processor); } - private RequestProcessorRegistry() {} + + private RequestProcessorProvider() {} } diff --git a/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/util/ClusterRuleUtil.java b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/util/ClusterRuleUtil.java index d226b04d..485f2adc 100644 --- a/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/util/ClusterRuleUtil.java +++ b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/util/ClusterRuleUtil.java @@ -17,6 +17,7 @@ package com.alibaba.csp.sentinel.cluster.server.util; /** * @author Eric Zhao + * @since 1.4.0 */ public final class ClusterRuleUtil { diff --git a/sentinel-cluster/sentinel-cluster-server-default/src/main/resources/META-INF/services/com.alibaba.csp.sentinel.cluster.server.EmbeddedClusterTokenServer b/sentinel-cluster/sentinel-cluster-server-default/src/main/resources/META-INF/services/com.alibaba.csp.sentinel.cluster.server.EmbeddedClusterTokenServer new file mode 100644 index 00000000..ccfb81e4 --- /dev/null +++ b/sentinel-cluster/sentinel-cluster-server-default/src/main/resources/META-INF/services/com.alibaba.csp.sentinel.cluster.server.EmbeddedClusterTokenServer @@ -0,0 +1 @@ +com.alibaba.csp.sentinel.cluster.server.DefaultEmbeddedTokenServer \ No newline at end of file diff --git a/sentinel-cluster/sentinel-cluster-server-default/src/main/resources/META-INF/services/com.alibaba.csp.sentinel.cluster.server.processor.RequestProcessor b/sentinel-cluster/sentinel-cluster-server-default/src/main/resources/META-INF/services/com.alibaba.csp.sentinel.cluster.server.processor.RequestProcessor new file mode 100644 index 00000000..991b781d --- /dev/null +++ b/sentinel-cluster/sentinel-cluster-server-default/src/main/resources/META-INF/services/com.alibaba.csp.sentinel.cluster.server.processor.RequestProcessor @@ -0,0 +1,2 @@ +com.alibaba.csp.sentinel.cluster.server.processor.FlowRequestProcessor +com.alibaba.csp.sentinel.cluster.server.processor.ParamFlowRequestProcessor \ No newline at end of file diff --git a/sentinel-cluster/sentinel-cluster-server-default/src/main/resources/META-INF/services/com.alibaba.csp.sentinel.init.InitFunc b/sentinel-cluster/sentinel-cluster-server-default/src/main/resources/META-INF/services/com.alibaba.csp.sentinel.init.InitFunc new file mode 100755 index 00000000..95b8c9db --- /dev/null +++ b/sentinel-cluster/sentinel-cluster-server-default/src/main/resources/META-INF/services/com.alibaba.csp.sentinel.init.InitFunc @@ -0,0 +1 @@ +com.alibaba.csp.sentinel.cluster.server.init.DefaultClusterServerInitFunc \ No newline at end of file diff --git a/sentinel-cluster/sentinel-cluster-server-default/src/test/java/com/alibaba/csp/sentinel/cluster/ClusterFlowTestUtil.java b/sentinel-cluster/sentinel-cluster-server-default/src/test/java/com/alibaba/csp/sentinel/cluster/ClusterFlowTestUtil.java new file mode 100644 index 00000000..cfb0e7d9 --- /dev/null +++ b/sentinel-cluster/sentinel-cluster-server-default/src/test/java/com/alibaba/csp/sentinel/cluster/ClusterFlowTestUtil.java @@ -0,0 +1,54 @@ +/* + * Copyright 1999-2018 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.csp.sentinel.cluster; + +import static org.junit.Assert.*; + +/** + * Useful for testing clustered flow control. + * Only used for test. + * + * @author Eric Zhao + * @since 1.4.0 + */ +public final class ClusterFlowTestUtil { + + public static void assertResultPass(TokenResult result) { + assertNotNull(result); + assertEquals(TokenResultStatus.OK, (int) result.getStatus()); + } + + public static void assertResultBlock(TokenResult result) { + assertNotNull(result); + assertEquals(TokenResultStatus.BLOCKED, (int) result.getStatus()); + } + + public static void assertResultWait(TokenResult result, int waitInMs) { + assertNotNull(result); + assertEquals(TokenResultStatus.SHOULD_WAIT, (int) result.getStatus()); + assertEquals(waitInMs, result.getWaitInMs()); + } + + public static void sleep(int t) { + try { + Thread.sleep(t); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + + private ClusterFlowTestUtil() {} +} diff --git a/sentinel-cluster/sentinel-cluster-server-default/src/test/java/com/alibaba/csp/sentinel/cluster/flow/ClusterFlowCheckerTest.java b/sentinel-cluster/sentinel-cluster-server-default/src/test/java/com/alibaba/csp/sentinel/cluster/flow/ClusterFlowCheckerTest.java new file mode 100644 index 00000000..2b062e69 --- /dev/null +++ b/sentinel-cluster/sentinel-cluster-server-default/src/test/java/com/alibaba/csp/sentinel/cluster/flow/ClusterFlowCheckerTest.java @@ -0,0 +1,79 @@ +/* + * Copyright 1999-2018 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.csp.sentinel.cluster.flow; + +import com.alibaba.csp.sentinel.cluster.TokenResult; +import com.alibaba.csp.sentinel.cluster.flow.statistic.ClusterMetricStatistics; +import com.alibaba.csp.sentinel.cluster.flow.statistic.metric.ClusterMetric; +import com.alibaba.csp.sentinel.slots.block.ClusterRuleConstant; +import com.alibaba.csp.sentinel.slots.block.flow.ClusterFlowConfig; +import com.alibaba.csp.sentinel.slots.block.flow.FlowRule; + +import org.junit.Ignore; +import org.junit.Test; + +import static org.junit.Assert.*; +import static com.alibaba.csp.sentinel.cluster.ClusterFlowTestUtil.*; + +/** + * @author Eric Zhao + * @since 1.4.0 + */ +@Ignore +public class ClusterFlowCheckerTest { + + @Test + public void testAcquireClusterTokenOccupyPass() { + long flowId = 98765L; + final int threshold = 5; + FlowRule clusterRule = new FlowRule("abc") + .setCount(threshold) + .setClusterMode(true) + .setClusterConfig(new ClusterFlowConfig() + .setFlowId(flowId) + .setThresholdType(ClusterRuleConstant.FLOW_THRESHOLD_GLOBAL)); + int sampleCount = 5; + int intervalInMs = 1000; + int bucketLength = intervalInMs / sampleCount; + ClusterMetric metric = new ClusterMetric(sampleCount, intervalInMs); + ClusterMetricStatistics.putMetric(flowId, metric); + + System.out.println(System.currentTimeMillis()); + assertResultPass(tryAcquire(clusterRule, false)); + assertResultPass(tryAcquire(clusterRule, false)); + sleep(bucketLength); + assertResultPass(tryAcquire(clusterRule, false)); + sleep(bucketLength); + assertResultPass(tryAcquire(clusterRule, true)); + assertResultPass(tryAcquire(clusterRule, false)); + assertResultBlock(tryAcquire(clusterRule, true)); + sleep(bucketLength); + assertResultBlock(tryAcquire(clusterRule, false)); + assertResultBlock(tryAcquire(clusterRule, false)); + sleep(bucketLength); + assertResultBlock(tryAcquire(clusterRule, false)); + assertResultWait(tryAcquire(clusterRule, true), bucketLength); + assertResultBlock(tryAcquire(clusterRule, false)); + sleep(bucketLength); + assertResultPass(tryAcquire(clusterRule, false)); + + ClusterMetricStatistics.removeMetric(flowId); + } + + private TokenResult tryAcquire(FlowRule clusterRule, boolean occupy) { + return ClusterFlowChecker.acquireClusterToken(clusterRule, 1, occupy); + } +} \ No newline at end of file