From 6314caab80e758a8f0eaa9e3f1fa157d57e92de1 Mon Sep 17 00:00:00 2001 From: yunfeiyanggzq Date: Fri, 4 Sep 2020 08:56:52 +0800 Subject: [PATCH] Add basic cluster concurrency limiting impl in token server module Signed-off-by: yunfeiyanggzq --- .../sentinel/cluster/ClusterConstants.java | 3 + .../flow/ConcurrentClusterFlowChecker.java | 102 +++++++++++++ .../cluster/flow/DefaultTokenService.java | 31 +++- .../flow/rule/ClusterFlowRuleManager.java | 56 +++---- .../ClusterConcurrentCheckerLogListener.java | 55 +++++++ .../concurrent/CurrentConcurrencyManager.java | 98 +++++++++++++ .../statistic/concurrent/TokenCacheNode.java | 131 +++++++++++++++++ .../concurrent/TokenCacheNodeManager.java | 82 +++++++++++ .../concurrent/expire/ExpireStrategy.java | 26 ++++ .../expire/RegularExpireStrategy.java | 137 ++++++++++++++++++ .../server/connection/ConnectionManager.java | 3 + 11 files changed, 696 insertions(+), 28 deletions(-) create mode 100644 sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/ConcurrentClusterFlowChecker.java create mode 100644 sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/statistic/concurrent/ClusterConcurrentCheckerLogListener.java create mode 100644 sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/statistic/concurrent/CurrentConcurrencyManager.java create mode 100644 sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/statistic/concurrent/TokenCacheNode.java create mode 100644 sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/statistic/concurrent/TokenCacheNodeManager.java create mode 100644 sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/statistic/concurrent/expire/ExpireStrategy.java create mode 100644 sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/statistic/concurrent/expire/RegularExpireStrategy.java diff --git a/sentinel-cluster/sentinel-cluster-common-default/src/main/java/com/alibaba/csp/sentinel/cluster/ClusterConstants.java b/sentinel-cluster/sentinel-cluster-common-default/src/main/java/com/alibaba/csp/sentinel/cluster/ClusterConstants.java index c523413d..0930fda9 100644 --- a/sentinel-cluster/sentinel-cluster-common-default/src/main/java/com/alibaba/csp/sentinel/cluster/ClusterConstants.java +++ b/sentinel-cluster/sentinel-cluster-common-default/src/main/java/com/alibaba/csp/sentinel/cluster/ClusterConstants.java @@ -24,6 +24,9 @@ public final class ClusterConstants { public static final int MSG_TYPE_PING = 0; public static final int MSG_TYPE_FLOW = 1; public static final int MSG_TYPE_PARAM_FLOW = 2; + public static final int MSG_TYPE_CONCURRENT_FLOW_ACQUIRE = 3; + public static final int MSG_TYPE_CONCURRENT_FLOW_RELEASE = 4; + public static final int RESPONSE_STATUS_BAD = -1; public static final int RESPONSE_STATUS_OK = 0; diff --git a/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/ConcurrentClusterFlowChecker.java b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/ConcurrentClusterFlowChecker.java new file mode 100644 index 00000000..9083bf92 --- /dev/null +++ b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/ConcurrentClusterFlowChecker.java @@ -0,0 +1,102 @@ +/* + * Copyright 1999-2018 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.csp.sentinel.cluster.flow; + +import com.alibaba.csp.sentinel.cluster.TokenResult; +import com.alibaba.csp.sentinel.cluster.TokenResultStatus; +import com.alibaba.csp.sentinel.cluster.flow.rule.ClusterFlowRuleManager; +import com.alibaba.csp.sentinel.cluster.flow.statistic.concurrent.CurrentConcurrencyManager; +import com.alibaba.csp.sentinel.cluster.flow.statistic.concurrent.TokenCacheNode; +import com.alibaba.csp.sentinel.cluster.flow.statistic.concurrent.TokenCacheNodeManager; +import com.alibaba.csp.sentinel.cluster.server.log.ClusterServerStatLogUtil; +import com.alibaba.csp.sentinel.log.RecordLog; +import com.alibaba.csp.sentinel.slots.block.ClusterRuleConstant; +import com.alibaba.csp.sentinel.slots.block.flow.FlowRule; + +import java.util.concurrent.atomic.AtomicInteger; + +/** + * @author yunfeiyanggzq + */ +final public class ConcurrentClusterFlowChecker { + + public static double calcGlobalThreshold(FlowRule rule) { + double count = rule.getCount(); + switch (rule.getClusterConfig().getThresholdType()) { + case ClusterRuleConstant.FLOW_THRESHOLD_GLOBAL: + return count; + case ClusterRuleConstant.FLOW_THRESHOLD_AVG_LOCAL: + default: + int connectedCount = ClusterFlowRuleManager.getConnectedCount(rule.getClusterConfig().getFlowId()); + return count * connectedCount; + } + } + + public static TokenResult acquireConcurrentToken(/*@Valid*/ String clientAddress, FlowRule rule, int acquireCount) { + long flowId = rule.getClusterConfig().getFlowId(); + AtomicInteger nowCalls = CurrentConcurrencyManager.get(flowId); + if (nowCalls == null) { + RecordLog.warn("[ConcurrentClusterFlowChecker] Fail to get nowCalls by flowId<{}>", flowId); + return new TokenResult(TokenResultStatus.FAIL); + } + + // check before enter the lock to improve the efficiency + if (nowCalls.get() + acquireCount > calcGlobalThreshold(rule)) { + ClusterServerStatLogUtil.log("concurrent|block|" + flowId, acquireCount); + return new TokenResult(TokenResultStatus.BLOCKED); + } + + // ensure the atomicity of operations + // lock different nowCalls to improve the efficiency + synchronized (nowCalls) { + // check again whether the request can pass. + if (nowCalls.get() + acquireCount > calcGlobalThreshold(rule)) { + ClusterServerStatLogUtil.log("concurrent|block|" + flowId, acquireCount); + return new TokenResult(TokenResultStatus.BLOCKED); + } else { + nowCalls.getAndAdd(acquireCount); + } + } + ClusterServerStatLogUtil.log("concurrent|pass|" + flowId, acquireCount); + TokenCacheNode node = TokenCacheNode.generateTokenCacheNode(rule, acquireCount, clientAddress); + TokenCacheNodeManager.putTokenCacheNode(node.getTokenId(), node); + TokenResult tokenResult = new TokenResult(TokenResultStatus.OK); + tokenResult.setTokenId(node.getTokenId()); + return tokenResult; + } + + public static TokenResult releaseConcurrentToken(/*@Valid*/ long tokenId) { + TokenCacheNode node = TokenCacheNodeManager.getTokenCacheNode(tokenId); + if (node == null) { + RecordLog.info("[ConcurrentClusterFlowChecker] Token<{}> is already released", tokenId); + return new TokenResult(TokenResultStatus.ALREADY_RELEASE); + } + FlowRule rule = ClusterFlowRuleManager.getFlowRuleById(node.getFlowId()); + if (rule == null) { + RecordLog.info("[ConcurrentClusterFlowChecker] Fail to get rule by flowId<{}>", node.getFlowId()); + return new TokenResult(TokenResultStatus.NO_RULE_EXISTS); + } + if (TokenCacheNodeManager.removeTokenCacheNode(tokenId) == null) { + RecordLog.info("[ConcurrentClusterFlowChecker] Token<{}> is already released for flowId<{}>", tokenId, node.getFlowId()); + return new TokenResult(TokenResultStatus.ALREADY_RELEASE); + } + int acquireCount = node.getAcquireCount(); + AtomicInteger nowCalls = CurrentConcurrencyManager.get(node.getFlowId()); + nowCalls.getAndAdd(-1 * acquireCount); + ClusterServerStatLogUtil.log("concurrent|release|" + rule.getClusterConfig().getFlowId(), acquireCount); + return new TokenResult(TokenResultStatus.RELEASE_OK); + } +} diff --git a/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/DefaultTokenService.java b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/DefaultTokenService.java index 2953a135..4e78a50d 100644 --- a/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/DefaultTokenService.java +++ b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/DefaultTokenService.java @@ -15,16 +15,16 @@ */ package com.alibaba.csp.sentinel.cluster.flow; -import java.util.Collection; - -import com.alibaba.csp.sentinel.cluster.TokenResultStatus; import com.alibaba.csp.sentinel.cluster.TokenResult; +import com.alibaba.csp.sentinel.cluster.TokenResultStatus; import com.alibaba.csp.sentinel.cluster.TokenService; import com.alibaba.csp.sentinel.cluster.flow.rule.ClusterFlowRuleManager; import com.alibaba.csp.sentinel.cluster.flow.rule.ClusterParamFlowRuleManager; import com.alibaba.csp.sentinel.slots.block.flow.FlowRule; import com.alibaba.csp.sentinel.slots.block.flow.param.ParamFlowRule; +import java.util.Collection; + /** * Default implementation for cluster {@link TokenService}. * @@ -61,10 +61,35 @@ public class DefaultTokenService implements TokenService { return ClusterParamFlowChecker.acquireClusterToken(rule, acquireCount, params); } + @Override + public TokenResult requestConcurrentToken(String clientAddress, Long ruleId, int acquireCount) { + if (notValidRequest(clientAddress, ruleId, acquireCount)) { + return badRequest(); + } + // The rule should be valid. + FlowRule rule = ClusterFlowRuleManager.getFlowRuleById(ruleId); + if (rule == null) { + return new TokenResult(TokenResultStatus.NO_RULE_EXISTS); + } + return ConcurrentClusterFlowChecker.acquireConcurrentToken(clientAddress, rule, acquireCount); + } + + @Override + public void releaseConcurrentToken(Long tokenId) { + if (tokenId == null) { + return; + } + ConcurrentClusterFlowChecker.releaseConcurrentToken(tokenId); + } + private boolean notValidRequest(Long id, int count) { return id == null || id <= 0 || count <= 0; } + private boolean notValidRequest(String address, Long id, int count) { + return address == null || "".equals(address) || id == null || id <= 0 || count <= 0; + } + private TokenResult badRequest() { return new TokenResult(TokenResultStatus.BAD_REQUEST); } diff --git a/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/rule/ClusterFlowRuleManager.java b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/rule/ClusterFlowRuleManager.java index 00fa4b88..58982eb6 100644 --- a/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/rule/ClusterFlowRuleManager.java +++ b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/rule/ClusterFlowRuleManager.java @@ -15,17 +15,10 @@ */ package com.alibaba.csp.sentinel.cluster.flow.rule; -import java.util.ArrayList; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; - import com.alibaba.csp.sentinel.cluster.flow.statistic.ClusterMetricStatistics; +import com.alibaba.csp.sentinel.cluster.flow.statistic.concurrent.CurrentConcurrencyManager; import com.alibaba.csp.sentinel.cluster.flow.statistic.metric.ClusterMetric; import com.alibaba.csp.sentinel.cluster.server.ServerConstants; -import com.alibaba.csp.sentinel.cluster.server.config.ClusterServerConfigManager; import com.alibaba.csp.sentinel.cluster.server.connection.ConnectionManager; import com.alibaba.csp.sentinel.cluster.server.util.ClusterRuleUtil; import com.alibaba.csp.sentinel.log.RecordLog; @@ -41,6 +34,9 @@ import com.alibaba.csp.sentinel.util.StringUtil; import com.alibaba.csp.sentinel.util.function.Function; import com.alibaba.csp.sentinel.util.function.Predicate; +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; + /** * Manager for cluster flow rules. * @@ -54,12 +50,12 @@ public final class ClusterFlowRuleManager { * for a specific namespace to do rule management manually. */ public static final Function>> DEFAULT_PROPERTY_SUPPLIER = - new Function>>() { - @Override - public SentinelProperty> apply(String namespace) { - return new DynamicSentinelProperty<>(); - } - }; + new Function>>() { + @Override + public SentinelProperty> apply(String namespace) { + return new DynamicSentinelProperty<>(); + } + }; /** * (flowId, clusterRule) @@ -87,7 +83,7 @@ public final class ClusterFlowRuleManager { * Cluster flow rule property supplier for a specific namespace. */ private static volatile Function>> propertySupplier - = DEFAULT_PROPERTY_SUPPLIER; + = DEFAULT_PROPERTY_SUPPLIER; private static final Object UPDATE_LOCK = new Object(); @@ -118,18 +114,18 @@ public final class ClusterFlowRuleManager { AssertUtil.notEmpty(namespace, "namespace cannot be empty"); if (propertySupplier == null) { RecordLog.warn( - "[ClusterFlowRuleManager] Cluster flow property supplier is absent, cannot register property"); + "[ClusterFlowRuleManager] Cluster flow property supplier is absent, cannot register property"); return; } SentinelProperty> property = propertySupplier.apply(namespace); if (property == null) { RecordLog.warn( - "[ClusterFlowRuleManager] Wrong created property from cluster flow property supplier, ignoring"); + "[ClusterFlowRuleManager] Wrong created property from cluster flow property supplier, ignoring"); return; } synchronized (UPDATE_LOCK) { RecordLog.info("[ClusterFlowRuleManager] Registering new property to cluster flow rule manager" - + " for namespace <{}>", namespace); + + " for namespace <{}>", namespace); registerPropertyInternal(namespace, property); } } @@ -180,7 +176,7 @@ public final class ClusterFlowRuleManager { PROPERTY_MAP.remove(namespace); } RecordLog.info("[ClusterFlowRuleManager] Removing property from cluster flow rule manager" - + " for namespace <{}>", namespace); + + " for namespace <{}>", namespace); } } @@ -253,7 +249,7 @@ public final class ClusterFlowRuleManager { * Load flow rules for a specific namespace. The former rules of the namespace will be replaced. * * @param namespace a valid namespace - * @param rules rule list + * @param rules rule list */ public static void loadRules(String namespace, List rules) { AssertUtil.notEmpty(namespace, "namespace cannot be empty"); @@ -278,6 +274,9 @@ public final class ClusterFlowRuleManager { for (Long flowId : flowIdSet) { FLOW_RULES.remove(flowId); FLOW_NAMESPACE_MAP.remove(flowId); + if (CurrentConcurrencyManager.containsFlowId(flowId)) { + CurrentConcurrencyManager.remove(flowId); + } } flowIdSet.clear(); } else { @@ -293,6 +292,9 @@ public final class ClusterFlowRuleManager { FLOW_RULES.remove(flowId); FLOW_NAMESPACE_MAP.remove(flowId); ClusterMetricStatistics.removeMetric(flowId); + if (CurrentConcurrencyManager.containsFlowId(flowId)) { + CurrentConcurrencyManager.remove(flowId); + } } } oldIdSet.clear(); @@ -335,7 +337,7 @@ public final class ClusterFlowRuleManager { } if (!FlowRuleUtil.isValidRule(rule)) { RecordLog.warn( - "[ClusterFlowRuleManager] Ignoring invalid flow rule when loading new flow rules: " + rule); + "[ClusterFlowRuleManager] Ignoring invalid flow rule when loading new flow rules: " + rule); continue; } if (StringUtil.isBlank(rule.getLimitApp())) { @@ -351,10 +353,13 @@ public final class ClusterFlowRuleManager { ruleMap.put(flowId, rule); FLOW_NAMESPACE_MAP.put(flowId, namespace); flowIdSet.add(flowId); + if (!CurrentConcurrencyManager.containsFlowId(flowId)) { + CurrentConcurrencyManager.put(flowId, 0); + } // Prepare cluster metric from valid flow ID. ClusterMetricStatistics.putMetricIfAbsent(flowId, - new ClusterMetric(clusterConfig.getSampleCount(), clusterConfig.getWindowIntervalMs())); + new ClusterMetric(clusterConfig.getSampleCount(), clusterConfig.getWindowIntervalMs())); } // Cleanup unused cluster metrics. @@ -381,16 +386,17 @@ public final class ClusterFlowRuleManager { public synchronized void configUpdate(List conf) { applyClusterFlowRule(conf, namespace); RecordLog.info("[ClusterFlowRuleManager] Cluster flow rules received for namespace <{}>: {}", - namespace, FLOW_RULES); + namespace, FLOW_RULES); } @Override public synchronized void configLoad(List conf) { applyClusterFlowRule(conf, namespace); RecordLog.info("[ClusterFlowRuleManager] Cluster flow rules loaded for namespace <{}>: {}", - namespace, FLOW_RULES); + namespace, FLOW_RULES); } } - private ClusterFlowRuleManager() {} + private ClusterFlowRuleManager() { + } } diff --git a/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/statistic/concurrent/ClusterConcurrentCheckerLogListener.java b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/statistic/concurrent/ClusterConcurrentCheckerLogListener.java new file mode 100644 index 00000000..4f13b403 --- /dev/null +++ b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/statistic/concurrent/ClusterConcurrentCheckerLogListener.java @@ -0,0 +1,55 @@ +/* + * Copyright 1999-2018 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.csp.sentinel.cluster.flow.statistic.concurrent; + +import com.alibaba.csp.sentinel.cluster.flow.ConcurrentClusterFlowChecker; +import com.alibaba.csp.sentinel.cluster.flow.rule.ClusterFlowRuleManager; +import com.alibaba.csp.sentinel.cluster.server.log.ClusterServerStatLogUtil; +import com.alibaba.csp.sentinel.log.RecordLog; +import com.alibaba.csp.sentinel.slots.block.flow.FlowRule; + +import java.util.Set; + +/** + * @author yunfeiyanggzq + */ +public class ClusterConcurrentCheckerLogListener implements Runnable { + @Override + public void run() { + try { + collectInformation(); + } catch (Exception e) { + RecordLog.warn("[ClusterConcurrentCheckerLogListener] Failed to record concurrent flow control regularly", e); + } + } + + private void collectInformation() { + Set keySet = CurrentConcurrencyManager.getConcurrencyMapKeySet(); + for (long flowId : keySet) { + FlowRule rule = ClusterFlowRuleManager.getFlowRuleById(flowId); + if (rule == null || CurrentConcurrencyManager.get(flowId).get() == 0) { + continue; + } + double concurrencyLevel = ConcurrentClusterFlowChecker.calcGlobalThreshold(rule); + String resource = rule.getResource(); + ClusterServerStatLogUtil.log(String.format("concurrent|resource:%s|flowId:%dl|concurrencyLevel:%fl|currentConcurrency", resource, flowId,concurrencyLevel),CurrentConcurrencyManager.get(flowId).get()); + } + if (TokenCacheNodeManager.getSize() != 0){ + ClusterServerStatLogUtil.log("flow|totalTokenSize", TokenCacheNodeManager.getSize()); + } + + } +} diff --git a/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/statistic/concurrent/CurrentConcurrencyManager.java b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/statistic/concurrent/CurrentConcurrencyManager.java new file mode 100644 index 00000000..de91ab2f --- /dev/null +++ b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/statistic/concurrent/CurrentConcurrencyManager.java @@ -0,0 +1,98 @@ +/* + * Copyright 1999-2018 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.csp.sentinel.cluster.flow.statistic.concurrent; + +import com.alibaba.csp.sentinel.concurrent.NamedThreadFactory; + +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * We use a ConcurrentHashMap type structure to store nowCalls corresponding to + * rules, where the key is flowId and the value is nowCalls. Because nowCalls may be accessed and + * modified by multiple threads, we consider to design it as an AtomicInteger class . Each newly + * created rule will add a nowCalls object to this map. If the concurrency corresponding to a rule changes, + * we will update the corresponding nowCalls in real time. Each request to obtain a token will increase the nowCalls; + * and the request to release the token will reduce the nowCalls. + * + * @author yunfeiyanggzq + */ +public final class CurrentConcurrencyManager { + /** + * use ConcurrentHashMap to store the nowCalls of rules. + */ + private static final ConcurrentHashMap NOW_CALLS_MAP = new ConcurrentHashMap(); + + @SuppressWarnings("PMD.ThreadPoolCreationRule") + private static final ScheduledExecutorService SCHEDULER = Executors.newScheduledThreadPool(1, + new NamedThreadFactory("sentinel-cluster-concurrency-record-task", true)); + + static { + ClusterConcurrentCheckerLogListener logTask = new ClusterConcurrentCheckerLogListener(); + SCHEDULER.scheduleAtFixedRate(logTask, 0, 1, TimeUnit.SECONDS); + } + + /** + * add current concurrency. + */ + public static void addConcurrency(Long flowId, Integer acquireCount) { + + AtomicInteger nowCalls = NOW_CALLS_MAP.get(flowId); + if (nowCalls == null) { + return; + } + nowCalls.getAndAdd(acquireCount); + } + + /** + * get the current concurrency. + */ + public static AtomicInteger get(Long flowId) { + return NOW_CALLS_MAP.get(flowId); + } + + /** + * delete the current concurrency. + */ + public static void remove(Long flowId) { + NOW_CALLS_MAP.remove(flowId); + } + + /** + * put the current concurrency. + */ + public static void put(Long flowId, Integer nowCalls) { + NOW_CALLS_MAP.put(flowId, new AtomicInteger(nowCalls)); + } + + /** + * check flow id. + */ + public static boolean containsFlowId(Long flowId) { + return NOW_CALLS_MAP.containsKey(flowId); + } + + /** + * get NOW_CALLS_MAP. + */ + public static Set getConcurrencyMapKeySet() { + return NOW_CALLS_MAP.keySet(); + } +} diff --git a/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/statistic/concurrent/TokenCacheNode.java b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/statistic/concurrent/TokenCacheNode.java new file mode 100644 index 00000000..be734ad3 --- /dev/null +++ b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/statistic/concurrent/TokenCacheNode.java @@ -0,0 +1,131 @@ +/* + * Copyright 1999-2018 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.csp.sentinel.cluster.flow.statistic.concurrent; + +import com.alibaba.csp.sentinel.slots.block.flow.FlowRule; + +import java.util.UUID; + +/** + * We use TokenCacheNodeManager to store the tokenId, whose the underlying storage structure + * is ConcurrentLinkedHashMap, Its storage node is TokenCacheNode. In order to operate the nowCalls value when + * the expired tokenId is deleted regularly, we need to store the flowId in TokenCacheNode. + * + * @author yunfeiyanggzq + */ +public class TokenCacheNode { + /** + * the TokenId of the token + */ + private Long tokenId; + /** + * the client goes offline detection time + */ + private Long clientTimeout; + /** + * the resource called over time detection time + */ + private Long resourceTimeout; + /** + * the flow rule id corresponding to the token + */ + private Long flowId; + /** + * the number this token occupied + */ + private int acquireCount; + + /** + * the address of the client holds the token. + */ + private String clientAddress; + + public TokenCacheNode() { + } + + public static TokenCacheNode generateTokenCacheNode(FlowRule rule, int acquireCount, String clientAddress) { + TokenCacheNode node = new TokenCacheNode(); + // getMostSignificantBits() returns the most significant 64 bits of this UUID's 128 bit value. + // The probability of collision is extremely low. + node.setTokenId(UUID.randomUUID().getMostSignificantBits()); + node.setFlowId(rule.getClusterConfig().getFlowId()); + node.setClientTimeout(rule.getClusterConfig().getClientOfflineTime()); + node.setResourceTimeout(rule.getClusterConfig().getResourceTimeout()); + node.setAcquireCount(acquireCount); + node.setClientAddress(clientAddress); + return node; + } + + public Long getTokenId() { + return tokenId; + } + + public void setTokenId(Long tokenId) { + this.tokenId = tokenId; + } + + public Long getClientTimeout() { + return clientTimeout; + } + + public void setClientTimeout(Long clientTimeout) { + this.clientTimeout = clientTimeout + System.currentTimeMillis(); + } + + public Long getResourceTimeout() { + return this.resourceTimeout; + } + + public void setResourceTimeout(Long resourceTimeout) { + this.resourceTimeout = resourceTimeout + System.currentTimeMillis(); + } + + public Long getFlowId() { + return flowId; + } + + public void setFlowId(Long flowId) { + this.flowId = flowId; + } + + public int getAcquireCount() { + return acquireCount; + } + + public void setAcquireCount(int acquireCount) { + this.acquireCount = acquireCount; + } + + public String getClientAddress() { + return clientAddress; + } + + public void setClientAddress(String clientAddress) { + this.clientAddress = clientAddress; + } + + @Override + public String toString() { + return "TokenCacheNode{" + + "tokenId=" + tokenId + + ", clientTimeout=" + clientTimeout + + ", resourceTimeout=" + resourceTimeout + + ", flowId=" + flowId + + ", acquireCount=" + acquireCount + + ", clientAddress='" + clientAddress + '\'' + + '}'; + } +} diff --git a/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/statistic/concurrent/TokenCacheNodeManager.java b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/statistic/concurrent/TokenCacheNodeManager.java new file mode 100644 index 00000000..890f62e6 --- /dev/null +++ b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/statistic/concurrent/TokenCacheNodeManager.java @@ -0,0 +1,82 @@ +/* + * Copyright 1999-2018 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.csp.sentinel.cluster.flow.statistic.concurrent; + +import com.alibaba.csp.sentinel.cluster.flow.statistic.concurrent.expire.RegularExpireStrategy; +import com.alibaba.csp.sentinel.util.AssertUtil; +import com.googlecode.concurrentlinkedhashmap.ConcurrentLinkedHashMap; +import com.googlecode.concurrentlinkedhashmap.Weighers; + +import java.util.Set; + +/** + * @author yunfeiyanggzq + */ +public class TokenCacheNodeManager { + private static ConcurrentLinkedHashMap TOKEN_CACHE_NODE_MAP; + + + private static final int DEFAULT_CONCURRENCY_LEVEL = 16; + private static final int DEFAULT_CAPACITY = Integer.MAX_VALUE; + + static { + prepare(DEFAULT_CONCURRENCY_LEVEL, DEFAULT_CAPACITY); + } + + public static void prepare(int concurrencyLevel, int maximumWeightedCapacity) { + AssertUtil.isTrue(concurrencyLevel > 0, "concurrencyLevel must be positive"); + AssertUtil.isTrue(maximumWeightedCapacity > 0, "maximumWeightedCapacity must be positive"); + + TOKEN_CACHE_NODE_MAP = new ConcurrentLinkedHashMap.Builder() + .concurrencyLevel(concurrencyLevel) + .maximumWeightedCapacity(maximumWeightedCapacity) + .weigher(Weighers.singleton()) + .build(); + // Start the task of regularly clearing expired keys + RegularExpireStrategy strategy = new RegularExpireStrategy(TOKEN_CACHE_NODE_MAP); + strategy.startClearTaskRegularly(); + } + + + public static TokenCacheNode getTokenCacheNode(long tokenId) { + //use getQuietly to prevent disorder + return TOKEN_CACHE_NODE_MAP.getQuietly(tokenId); + } + + public static void putTokenCacheNode(long tokenId, TokenCacheNode cacheNode) { + TOKEN_CACHE_NODE_MAP.put(tokenId, cacheNode); + } + + public static boolean isContainsTokenId(long tokenId) { + return TOKEN_CACHE_NODE_MAP.containsKey(tokenId); + } + + public static TokenCacheNode removeTokenCacheNode(long tokenId) { + return TOKEN_CACHE_NODE_MAP.remove(tokenId); + } + + public static int getSize() { + return TOKEN_CACHE_NODE_MAP.size(); + } + + public static Set getCacheKeySet() { + return TOKEN_CACHE_NODE_MAP.keySet(); + } + + public static boolean validToken(TokenCacheNode cacheNode) { + return cacheNode.getTokenId() != null && cacheNode.getFlowId() != null && cacheNode.getClientTimeout() >= 0 && cacheNode.getResourceTimeout() >= 0; + } +} diff --git a/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/statistic/concurrent/expire/ExpireStrategy.java b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/statistic/concurrent/expire/ExpireStrategy.java new file mode 100644 index 00000000..f26710af --- /dev/null +++ b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/statistic/concurrent/expire/ExpireStrategy.java @@ -0,0 +1,26 @@ +/* + * Copyright 1999-2018 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.csp.sentinel.cluster.flow.statistic.concurrent.expire; + +/** + * @author yunfeiyagnggzq + */ +public interface ExpireStrategy { + /** + * clean expired token regularly. + */ + void startClearTaskRegularly(); +} diff --git a/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/statistic/concurrent/expire/RegularExpireStrategy.java b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/statistic/concurrent/expire/RegularExpireStrategy.java new file mode 100644 index 00000000..5222e7b3 --- /dev/null +++ b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/statistic/concurrent/expire/RegularExpireStrategy.java @@ -0,0 +1,137 @@ +/* + * Copyright 1999-2018 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.csp.sentinel.cluster.flow.statistic.concurrent.expire; + +import com.alibaba.csp.sentinel.cluster.flow.rule.ClusterFlowRuleManager; +import com.alibaba.csp.sentinel.cluster.flow.statistic.concurrent.CurrentConcurrencyManager; +import com.alibaba.csp.sentinel.cluster.flow.statistic.concurrent.TokenCacheNode; +import com.alibaba.csp.sentinel.cluster.server.connection.ConnectionManager; +import com.alibaba.csp.sentinel.concurrent.NamedThreadFactory; +import com.alibaba.csp.sentinel.log.RecordLog; +import com.alibaba.csp.sentinel.util.AssertUtil; +import com.googlecode.concurrentlinkedhashmap.ConcurrentLinkedHashMap; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * We need to consider the situation that the token client goes offline + * or the resource call times out. It can be detected by sourceTimeout + * and clientTimeout. The resource calls timeout detection is triggered + * on the token client. If the resource is called over time, the token + * client will request the token server to release token or refresh the + * token. The client offline detection is triggered on the token server. + * If the offline detection time is exceeded, token server will trigger + * the detection token client’s status. If the token client is offline, + * token server will delete the corresponding tokenId. If it is not offline, + * token server will continue to save it. + * + * @author yunfeiyanggzq + **/ +public class RegularExpireStrategy implements ExpireStrategy { + /** + * The max number of token deleted each time, + * the number of expired key-value pairs deleted each time does not exceed this number + */ + private long executeCount = 1000; + /** + * Length of time for task execution + */ + private long executeDuration = 800; + /** + * Frequency of task execution + */ + private long executeRate = 1000; + /** + * the local cache of tokenId + */ + private ConcurrentLinkedHashMap localCache; + + @SuppressWarnings("PMD.ThreadPoolCreationRule") + private static ScheduledExecutorService executor = Executors.newScheduledThreadPool(1, + new NamedThreadFactory("regular clear expired token thread")); + + + public RegularExpireStrategy(ConcurrentLinkedHashMap localCache) { + AssertUtil.isTrue(localCache != null, " local cache can't be null"); + this.localCache = localCache; + } + + @Override + public void startClearTaskRegularly() { + executor.scheduleAtFixedRate(new ClearExpiredTokenTask(), 0, executeRate, TimeUnit.MILLISECONDS); + } + + private class ClearExpiredTokenTask implements Runnable { + @Override + public void run() { + try { + clearToken(); + } catch (Throwable e) { + e.printStackTrace(); + RecordLog.warn("[RegularExpireStrategy] undefined throwable during clear token: ", e); + } + } + } + + private void clearToken() { + long start = System.currentTimeMillis(); + List keyList = new ArrayList<>(localCache.keySet()); + for (int i = 0; i < executeCount && i < keyList.size(); i++) { + // time out execution exit + if (System.currentTimeMillis() - start > executeDuration) { + RecordLog.info("[RegularExpireStrategy] End the process of expired token detection because of execute time is more than executeDuration:", executeDuration); + break; + } + Long key = keyList.get(i); + TokenCacheNode node = localCache.get(key); + if (node == null) { + continue; + } + + // remove the token whose client is offline and saved for more than clientTimeout + if (!ConnectionManager.isClientOnline(node.getClientAddress()) && node.getClientTimeout() - System.currentTimeMillis() < 0) { + removeToken(key, node); + RecordLog.info("[RegularExpireStrategy] Delete the expired token<{}> because of client offline for ruleId<{}>", node.getTokenId(), node.getFlowId()); + continue; + } + + // If we find that token's save time is more than 2 times of the client's call resource timeout time, + // the token will be determined to timeout. + long resourceTimeout = ClusterFlowRuleManager.getFlowRuleById(node.getFlowId()).getClusterConfig().getResourceTimeout(); + if (System.currentTimeMillis() - node.getResourceTimeout() > resourceTimeout) { + removeToken(key, node); + RecordLog.info("[RegularExpireStrategy] Delete the expired token<{}> because of resource timeout for ruleId<{}>", node.getTokenId(), node.getFlowId()); + } + } + } + + private void removeToken(long tokenId, TokenCacheNode node) { + if (localCache.remove(tokenId) == null) { + RecordLog.info("[RegularExpireStrategy] Token<{}> is already released for ruleId<{}>", tokenId, node.getFlowId()); + return; + } + AtomicInteger nowCalls = CurrentConcurrencyManager.get(node.getFlowId()); + if (nowCalls == null) { + return; + } + nowCalls.getAndAdd(node.getAcquireCount() * -1); + } +} diff --git a/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/connection/ConnectionManager.java b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/connection/ConnectionManager.java index 5f6a42d1..aa3a87be 100644 --- a/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/connection/ConnectionManager.java +++ b/sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/connection/ConnectionManager.java @@ -112,6 +112,9 @@ public final class ConnectionManager { return group; } + public static boolean isClientOnline(String address){ + return NAMESPACE_MAP.containsKey(address); + } static void clear() { CONN_MAP.clear(); NAMESPACE_MAP.clear();