ソースを参照

Add basic cluster concurrency limiting impl in token server module

Signed-off-by: yunfeiyanggzq <yunfeiyang@buaa.edu.cn>
master
yunfeiyanggzq Eric Zhao 4年前
コミット
6314caab80
11個のファイルの変更696行の追加28行の削除
  1. +3
    -0
      sentinel-cluster/sentinel-cluster-common-default/src/main/java/com/alibaba/csp/sentinel/cluster/ClusterConstants.java
  2. +102
    -0
      sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/ConcurrentClusterFlowChecker.java
  3. +28
    -3
      sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/DefaultTokenService.java
  4. +31
    -25
      sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/rule/ClusterFlowRuleManager.java
  5. +55
    -0
      sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/statistic/concurrent/ClusterConcurrentCheckerLogListener.java
  6. +98
    -0
      sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/statistic/concurrent/CurrentConcurrencyManager.java
  7. +131
    -0
      sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/statistic/concurrent/TokenCacheNode.java
  8. +82
    -0
      sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/statistic/concurrent/TokenCacheNodeManager.java
  9. +26
    -0
      sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/statistic/concurrent/expire/ExpireStrategy.java
  10. +137
    -0
      sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/statistic/concurrent/expire/RegularExpireStrategy.java
  11. +3
    -0
      sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/connection/ConnectionManager.java

+ 3
- 0
sentinel-cluster/sentinel-cluster-common-default/src/main/java/com/alibaba/csp/sentinel/cluster/ClusterConstants.java ファイルの表示

@@ -24,6 +24,9 @@ public final class ClusterConstants {
public static final int MSG_TYPE_PING = 0;
public static final int MSG_TYPE_FLOW = 1;
public static final int MSG_TYPE_PARAM_FLOW = 2;
public static final int MSG_TYPE_CONCURRENT_FLOW_ACQUIRE = 3;
public static final int MSG_TYPE_CONCURRENT_FLOW_RELEASE = 4;


public static final int RESPONSE_STATUS_BAD = -1;
public static final int RESPONSE_STATUS_OK = 0;


+ 102
- 0
sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/ConcurrentClusterFlowChecker.java ファイルの表示

@@ -0,0 +1,102 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.csp.sentinel.cluster.flow;

import com.alibaba.csp.sentinel.cluster.TokenResult;
import com.alibaba.csp.sentinel.cluster.TokenResultStatus;
import com.alibaba.csp.sentinel.cluster.flow.rule.ClusterFlowRuleManager;
import com.alibaba.csp.sentinel.cluster.flow.statistic.concurrent.CurrentConcurrencyManager;
import com.alibaba.csp.sentinel.cluster.flow.statistic.concurrent.TokenCacheNode;
import com.alibaba.csp.sentinel.cluster.flow.statistic.concurrent.TokenCacheNodeManager;
import com.alibaba.csp.sentinel.cluster.server.log.ClusterServerStatLogUtil;
import com.alibaba.csp.sentinel.log.RecordLog;
import com.alibaba.csp.sentinel.slots.block.ClusterRuleConstant;
import com.alibaba.csp.sentinel.slots.block.flow.FlowRule;

import java.util.concurrent.atomic.AtomicInteger;

/**
* @author yunfeiyanggzq
*/
final public class ConcurrentClusterFlowChecker {

public static double calcGlobalThreshold(FlowRule rule) {
double count = rule.getCount();
switch (rule.getClusterConfig().getThresholdType()) {
case ClusterRuleConstant.FLOW_THRESHOLD_GLOBAL:
return count;
case ClusterRuleConstant.FLOW_THRESHOLD_AVG_LOCAL:
default:
int connectedCount = ClusterFlowRuleManager.getConnectedCount(rule.getClusterConfig().getFlowId());
return count * connectedCount;
}
}

public static TokenResult acquireConcurrentToken(/*@Valid*/ String clientAddress, FlowRule rule, int acquireCount) {
long flowId = rule.getClusterConfig().getFlowId();
AtomicInteger nowCalls = CurrentConcurrencyManager.get(flowId);
if (nowCalls == null) {
RecordLog.warn("[ConcurrentClusterFlowChecker] Fail to get nowCalls by flowId<{}>", flowId);
return new TokenResult(TokenResultStatus.FAIL);
}

// check before enter the lock to improve the efficiency
if (nowCalls.get() + acquireCount > calcGlobalThreshold(rule)) {
ClusterServerStatLogUtil.log("concurrent|block|" + flowId, acquireCount);
return new TokenResult(TokenResultStatus.BLOCKED);
}

// ensure the atomicity of operations
// lock different nowCalls to improve the efficiency
synchronized (nowCalls) {
// check again whether the request can pass.
if (nowCalls.get() + acquireCount > calcGlobalThreshold(rule)) {
ClusterServerStatLogUtil.log("concurrent|block|" + flowId, acquireCount);
return new TokenResult(TokenResultStatus.BLOCKED);
} else {
nowCalls.getAndAdd(acquireCount);
}
}
ClusterServerStatLogUtil.log("concurrent|pass|" + flowId, acquireCount);
TokenCacheNode node = TokenCacheNode.generateTokenCacheNode(rule, acquireCount, clientAddress);
TokenCacheNodeManager.putTokenCacheNode(node.getTokenId(), node);
TokenResult tokenResult = new TokenResult(TokenResultStatus.OK);
tokenResult.setTokenId(node.getTokenId());
return tokenResult;
}

public static TokenResult releaseConcurrentToken(/*@Valid*/ long tokenId) {
TokenCacheNode node = TokenCacheNodeManager.getTokenCacheNode(tokenId);
if (node == null) {
RecordLog.info("[ConcurrentClusterFlowChecker] Token<{}> is already released", tokenId);
return new TokenResult(TokenResultStatus.ALREADY_RELEASE);
}
FlowRule rule = ClusterFlowRuleManager.getFlowRuleById(node.getFlowId());
if (rule == null) {
RecordLog.info("[ConcurrentClusterFlowChecker] Fail to get rule by flowId<{}>", node.getFlowId());
return new TokenResult(TokenResultStatus.NO_RULE_EXISTS);
}
if (TokenCacheNodeManager.removeTokenCacheNode(tokenId) == null) {
RecordLog.info("[ConcurrentClusterFlowChecker] Token<{}> is already released for flowId<{}>", tokenId, node.getFlowId());
return new TokenResult(TokenResultStatus.ALREADY_RELEASE);
}
int acquireCount = node.getAcquireCount();
AtomicInteger nowCalls = CurrentConcurrencyManager.get(node.getFlowId());
nowCalls.getAndAdd(-1 * acquireCount);
ClusterServerStatLogUtil.log("concurrent|release|" + rule.getClusterConfig().getFlowId(), acquireCount);
return new TokenResult(TokenResultStatus.RELEASE_OK);
}
}

+ 28
- 3
sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/DefaultTokenService.java ファイルの表示

@@ -15,16 +15,16 @@
*/
package com.alibaba.csp.sentinel.cluster.flow;

import java.util.Collection;

import com.alibaba.csp.sentinel.cluster.TokenResultStatus;
import com.alibaba.csp.sentinel.cluster.TokenResult;
import com.alibaba.csp.sentinel.cluster.TokenResultStatus;
import com.alibaba.csp.sentinel.cluster.TokenService;
import com.alibaba.csp.sentinel.cluster.flow.rule.ClusterFlowRuleManager;
import com.alibaba.csp.sentinel.cluster.flow.rule.ClusterParamFlowRuleManager;
import com.alibaba.csp.sentinel.slots.block.flow.FlowRule;
import com.alibaba.csp.sentinel.slots.block.flow.param.ParamFlowRule;

import java.util.Collection;

/**
* Default implementation for cluster {@link TokenService}.
*
@@ -61,10 +61,35 @@ public class DefaultTokenService implements TokenService {
return ClusterParamFlowChecker.acquireClusterToken(rule, acquireCount, params);
}

@Override
public TokenResult requestConcurrentToken(String clientAddress, Long ruleId, int acquireCount) {
if (notValidRequest(clientAddress, ruleId, acquireCount)) {
return badRequest();
}
// The rule should be valid.
FlowRule rule = ClusterFlowRuleManager.getFlowRuleById(ruleId);
if (rule == null) {
return new TokenResult(TokenResultStatus.NO_RULE_EXISTS);
}
return ConcurrentClusterFlowChecker.acquireConcurrentToken(clientAddress, rule, acquireCount);
}

@Override
public void releaseConcurrentToken(Long tokenId) {
if (tokenId == null) {
return;
}
ConcurrentClusterFlowChecker.releaseConcurrentToken(tokenId);
}

private boolean notValidRequest(Long id, int count) {
return id == null || id <= 0 || count <= 0;
}

private boolean notValidRequest(String address, Long id, int count) {
return address == null || "".equals(address) || id == null || id <= 0 || count <= 0;
}

private TokenResult badRequest() {
return new TokenResult(TokenResultStatus.BAD_REQUEST);
}


+ 31
- 25
sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/rule/ClusterFlowRuleManager.java ファイルの表示

@@ -15,17 +15,10 @@
*/
package com.alibaba.csp.sentinel.cluster.flow.rule;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

import com.alibaba.csp.sentinel.cluster.flow.statistic.ClusterMetricStatistics;
import com.alibaba.csp.sentinel.cluster.flow.statistic.concurrent.CurrentConcurrencyManager;
import com.alibaba.csp.sentinel.cluster.flow.statistic.metric.ClusterMetric;
import com.alibaba.csp.sentinel.cluster.server.ServerConstants;
import com.alibaba.csp.sentinel.cluster.server.config.ClusterServerConfigManager;
import com.alibaba.csp.sentinel.cluster.server.connection.ConnectionManager;
import com.alibaba.csp.sentinel.cluster.server.util.ClusterRuleUtil;
import com.alibaba.csp.sentinel.log.RecordLog;
@@ -41,6 +34,9 @@ import com.alibaba.csp.sentinel.util.StringUtil;
import com.alibaba.csp.sentinel.util.function.Function;
import com.alibaba.csp.sentinel.util.function.Predicate;

import java.util.*;
import java.util.concurrent.ConcurrentHashMap;

/**
* Manager for cluster flow rules.
*
@@ -54,12 +50,12 @@ public final class ClusterFlowRuleManager {
* for a specific namespace to do rule management manually.
*/
public static final Function<String, SentinelProperty<List<FlowRule>>> DEFAULT_PROPERTY_SUPPLIER =
new Function<String, SentinelProperty<List<FlowRule>>>() {
@Override
public SentinelProperty<List<FlowRule>> apply(String namespace) {
return new DynamicSentinelProperty<>();
}
};
new Function<String, SentinelProperty<List<FlowRule>>>() {
@Override
public SentinelProperty<List<FlowRule>> apply(String namespace) {
return new DynamicSentinelProperty<>();
}
};

/**
* (flowId, clusterRule)
@@ -87,7 +83,7 @@ public final class ClusterFlowRuleManager {
* Cluster flow rule property supplier for a specific namespace.
*/
private static volatile Function<String, SentinelProperty<List<FlowRule>>> propertySupplier
= DEFAULT_PROPERTY_SUPPLIER;
= DEFAULT_PROPERTY_SUPPLIER;

private static final Object UPDATE_LOCK = new Object();

@@ -118,18 +114,18 @@ public final class ClusterFlowRuleManager {
AssertUtil.notEmpty(namespace, "namespace cannot be empty");
if (propertySupplier == null) {
RecordLog.warn(
"[ClusterFlowRuleManager] Cluster flow property supplier is absent, cannot register property");
"[ClusterFlowRuleManager] Cluster flow property supplier is absent, cannot register property");
return;
}
SentinelProperty<List<FlowRule>> property = propertySupplier.apply(namespace);
if (property == null) {
RecordLog.warn(
"[ClusterFlowRuleManager] Wrong created property from cluster flow property supplier, ignoring");
"[ClusterFlowRuleManager] Wrong created property from cluster flow property supplier, ignoring");
return;
}
synchronized (UPDATE_LOCK) {
RecordLog.info("[ClusterFlowRuleManager] Registering new property to cluster flow rule manager"
+ " for namespace <{}>", namespace);
+ " for namespace <{}>", namespace);
registerPropertyInternal(namespace, property);
}
}
@@ -180,7 +176,7 @@ public final class ClusterFlowRuleManager {
PROPERTY_MAP.remove(namespace);
}
RecordLog.info("[ClusterFlowRuleManager] Removing property from cluster flow rule manager"
+ " for namespace <{}>", namespace);
+ " for namespace <{}>", namespace);
}
}

@@ -253,7 +249,7 @@ public final class ClusterFlowRuleManager {
* Load flow rules for a specific namespace. The former rules of the namespace will be replaced.
*
* @param namespace a valid namespace
* @param rules rule list
* @param rules rule list
*/
public static void loadRules(String namespace, List<FlowRule> rules) {
AssertUtil.notEmpty(namespace, "namespace cannot be empty");
@@ -278,6 +274,9 @@ public final class ClusterFlowRuleManager {
for (Long flowId : flowIdSet) {
FLOW_RULES.remove(flowId);
FLOW_NAMESPACE_MAP.remove(flowId);
if (CurrentConcurrencyManager.containsFlowId(flowId)) {
CurrentConcurrencyManager.remove(flowId);
}
}
flowIdSet.clear();
} else {
@@ -293,6 +292,9 @@ public final class ClusterFlowRuleManager {
FLOW_RULES.remove(flowId);
FLOW_NAMESPACE_MAP.remove(flowId);
ClusterMetricStatistics.removeMetric(flowId);
if (CurrentConcurrencyManager.containsFlowId(flowId)) {
CurrentConcurrencyManager.remove(flowId);
}
}
}
oldIdSet.clear();
@@ -335,7 +337,7 @@ public final class ClusterFlowRuleManager {
}
if (!FlowRuleUtil.isValidRule(rule)) {
RecordLog.warn(
"[ClusterFlowRuleManager] Ignoring invalid flow rule when loading new flow rules: " + rule);
"[ClusterFlowRuleManager] Ignoring invalid flow rule when loading new flow rules: " + rule);
continue;
}
if (StringUtil.isBlank(rule.getLimitApp())) {
@@ -351,10 +353,13 @@ public final class ClusterFlowRuleManager {
ruleMap.put(flowId, rule);
FLOW_NAMESPACE_MAP.put(flowId, namespace);
flowIdSet.add(flowId);
if (!CurrentConcurrencyManager.containsFlowId(flowId)) {
CurrentConcurrencyManager.put(flowId, 0);
}

// Prepare cluster metric from valid flow ID.
ClusterMetricStatistics.putMetricIfAbsent(flowId,
new ClusterMetric(clusterConfig.getSampleCount(), clusterConfig.getWindowIntervalMs()));
new ClusterMetric(clusterConfig.getSampleCount(), clusterConfig.getWindowIntervalMs()));
}

// Cleanup unused cluster metrics.
@@ -381,16 +386,17 @@ public final class ClusterFlowRuleManager {
public synchronized void configUpdate(List<FlowRule> conf) {
applyClusterFlowRule(conf, namespace);
RecordLog.info("[ClusterFlowRuleManager] Cluster flow rules received for namespace <{}>: {}",
namespace, FLOW_RULES);
namespace, FLOW_RULES);
}

@Override
public synchronized void configLoad(List<FlowRule> conf) {
applyClusterFlowRule(conf, namespace);
RecordLog.info("[ClusterFlowRuleManager] Cluster flow rules loaded for namespace <{}>: {}",
namespace, FLOW_RULES);
namespace, FLOW_RULES);
}
}

private ClusterFlowRuleManager() {}
private ClusterFlowRuleManager() {
}
}

+ 55
- 0
sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/statistic/concurrent/ClusterConcurrentCheckerLogListener.java ファイルの表示

@@ -0,0 +1,55 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.csp.sentinel.cluster.flow.statistic.concurrent;

import com.alibaba.csp.sentinel.cluster.flow.ConcurrentClusterFlowChecker;
import com.alibaba.csp.sentinel.cluster.flow.rule.ClusterFlowRuleManager;
import com.alibaba.csp.sentinel.cluster.server.log.ClusterServerStatLogUtil;
import com.alibaba.csp.sentinel.log.RecordLog;
import com.alibaba.csp.sentinel.slots.block.flow.FlowRule;

import java.util.Set;

/**
* @author yunfeiyanggzq
*/
public class ClusterConcurrentCheckerLogListener implements Runnable {
@Override
public void run() {
try {
collectInformation();
} catch (Exception e) {
RecordLog.warn("[ClusterConcurrentCheckerLogListener] Failed to record concurrent flow control regularly", e);
}
}

private void collectInformation() {
Set<Long> keySet = CurrentConcurrencyManager.getConcurrencyMapKeySet();
for (long flowId : keySet) {
FlowRule rule = ClusterFlowRuleManager.getFlowRuleById(flowId);
if (rule == null || CurrentConcurrencyManager.get(flowId).get() == 0) {
continue;
}
double concurrencyLevel = ConcurrentClusterFlowChecker.calcGlobalThreshold(rule);
String resource = rule.getResource();
ClusterServerStatLogUtil.log(String.format("concurrent|resource:%s|flowId:%dl|concurrencyLevel:%fl|currentConcurrency", resource, flowId,concurrencyLevel),CurrentConcurrencyManager.get(flowId).get());
}
if (TokenCacheNodeManager.getSize() != 0){
ClusterServerStatLogUtil.log("flow|totalTokenSize", TokenCacheNodeManager.getSize());
}

}
}

+ 98
- 0
sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/statistic/concurrent/CurrentConcurrencyManager.java ファイルの表示

@@ -0,0 +1,98 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.csp.sentinel.cluster.flow.statistic.concurrent;

import com.alibaba.csp.sentinel.concurrent.NamedThreadFactory;

import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
* We use a ConcurrentHashMap<long, AtomicInteger> type structure to store nowCalls corresponding to
* rules, where the key is flowId and the value is nowCalls. Because nowCalls may be accessed and
* modified by multiple threads, we consider to design it as an AtomicInteger class . Each newly
* created rule will add a nowCalls object to this map. If the concurrency corresponding to a rule changes,
* we will update the corresponding nowCalls in real time. Each request to obtain a token will increase the nowCalls;
* and the request to release the token will reduce the nowCalls.
*
* @author yunfeiyanggzq
*/
public final class CurrentConcurrencyManager {
/**
* use ConcurrentHashMap to store the nowCalls of rules.
*/
private static final ConcurrentHashMap<Long, AtomicInteger> NOW_CALLS_MAP = new ConcurrentHashMap<Long, AtomicInteger>();

@SuppressWarnings("PMD.ThreadPoolCreationRule")
private static final ScheduledExecutorService SCHEDULER = Executors.newScheduledThreadPool(1,
new NamedThreadFactory("sentinel-cluster-concurrency-record-task", true));

static {
ClusterConcurrentCheckerLogListener logTask = new ClusterConcurrentCheckerLogListener();
SCHEDULER.scheduleAtFixedRate(logTask, 0, 1, TimeUnit.SECONDS);
}

/**
* add current concurrency.
*/
public static void addConcurrency(Long flowId, Integer acquireCount) {

AtomicInteger nowCalls = NOW_CALLS_MAP.get(flowId);
if (nowCalls == null) {
return;
}
nowCalls.getAndAdd(acquireCount);
}

/**
* get the current concurrency.
*/
public static AtomicInteger get(Long flowId) {
return NOW_CALLS_MAP.get(flowId);
}

/**
* delete the current concurrency.
*/
public static void remove(Long flowId) {
NOW_CALLS_MAP.remove(flowId);
}

/**
* put the current concurrency.
*/
public static void put(Long flowId, Integer nowCalls) {
NOW_CALLS_MAP.put(flowId, new AtomicInteger(nowCalls));
}

/**
* check flow id.
*/
public static boolean containsFlowId(Long flowId) {
return NOW_CALLS_MAP.containsKey(flowId);
}

/**
* get NOW_CALLS_MAP.
*/
public static Set<Long> getConcurrencyMapKeySet() {
return NOW_CALLS_MAP.keySet();
}
}

+ 131
- 0
sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/statistic/concurrent/TokenCacheNode.java ファイルの表示

@@ -0,0 +1,131 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.csp.sentinel.cluster.flow.statistic.concurrent;

import com.alibaba.csp.sentinel.slots.block.flow.FlowRule;

import java.util.UUID;

/**
* We use TokenCacheNodeManager to store the tokenId, whose the underlying storage structure
* is ConcurrentLinkedHashMap, Its storage node is TokenCacheNode. In order to operate the nowCalls value when
* the expired tokenId is deleted regularly, we need to store the flowId in TokenCacheNode.
*
* @author yunfeiyanggzq
*/
public class TokenCacheNode {
/**
* the TokenId of the token
*/
private Long tokenId;
/**
* the client goes offline detection time
*/
private Long clientTimeout;
/**
* the resource called over time detection time
*/
private Long resourceTimeout;
/**
* the flow rule id corresponding to the token
*/
private Long flowId;
/**
* the number this token occupied
*/
private int acquireCount;

/**
* the address of the client holds the token.
*/
private String clientAddress;

public TokenCacheNode() {
}

public static TokenCacheNode generateTokenCacheNode(FlowRule rule, int acquireCount, String clientAddress) {
TokenCacheNode node = new TokenCacheNode();
// getMostSignificantBits() returns the most significant 64 bits of this UUID's 128 bit value.
// The probability of collision is extremely low.
node.setTokenId(UUID.randomUUID().getMostSignificantBits());
node.setFlowId(rule.getClusterConfig().getFlowId());
node.setClientTimeout(rule.getClusterConfig().getClientOfflineTime());
node.setResourceTimeout(rule.getClusterConfig().getResourceTimeout());
node.setAcquireCount(acquireCount);
node.setClientAddress(clientAddress);
return node;
}

public Long getTokenId() {
return tokenId;
}

public void setTokenId(Long tokenId) {
this.tokenId = tokenId;
}

public Long getClientTimeout() {
return clientTimeout;
}

public void setClientTimeout(Long clientTimeout) {
this.clientTimeout = clientTimeout + System.currentTimeMillis();
}

public Long getResourceTimeout() {
return this.resourceTimeout;
}

public void setResourceTimeout(Long resourceTimeout) {
this.resourceTimeout = resourceTimeout + System.currentTimeMillis();
}

public Long getFlowId() {
return flowId;
}

public void setFlowId(Long flowId) {
this.flowId = flowId;
}

public int getAcquireCount() {
return acquireCount;
}

public void setAcquireCount(int acquireCount) {
this.acquireCount = acquireCount;
}

public String getClientAddress() {
return clientAddress;
}

public void setClientAddress(String clientAddress) {
this.clientAddress = clientAddress;
}

@Override
public String toString() {
return "TokenCacheNode{" +
"tokenId=" + tokenId +
", clientTimeout=" + clientTimeout +
", resourceTimeout=" + resourceTimeout +
", flowId=" + flowId +
", acquireCount=" + acquireCount +
", clientAddress='" + clientAddress + '\'' +
'}';
}
}

+ 82
- 0
sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/statistic/concurrent/TokenCacheNodeManager.java ファイルの表示

@@ -0,0 +1,82 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.csp.sentinel.cluster.flow.statistic.concurrent;

import com.alibaba.csp.sentinel.cluster.flow.statistic.concurrent.expire.RegularExpireStrategy;
import com.alibaba.csp.sentinel.util.AssertUtil;
import com.googlecode.concurrentlinkedhashmap.ConcurrentLinkedHashMap;
import com.googlecode.concurrentlinkedhashmap.Weighers;

import java.util.Set;

/**
* @author yunfeiyanggzq
*/
public class TokenCacheNodeManager {
private static ConcurrentLinkedHashMap<Long, TokenCacheNode> TOKEN_CACHE_NODE_MAP;


private static final int DEFAULT_CONCURRENCY_LEVEL = 16;
private static final int DEFAULT_CAPACITY = Integer.MAX_VALUE;

static {
prepare(DEFAULT_CONCURRENCY_LEVEL, DEFAULT_CAPACITY);
}

public static void prepare(int concurrencyLevel, int maximumWeightedCapacity) {
AssertUtil.isTrue(concurrencyLevel > 0, "concurrencyLevel must be positive");
AssertUtil.isTrue(maximumWeightedCapacity > 0, "maximumWeightedCapacity must be positive");

TOKEN_CACHE_NODE_MAP = new ConcurrentLinkedHashMap.Builder<Long, TokenCacheNode>()
.concurrencyLevel(concurrencyLevel)
.maximumWeightedCapacity(maximumWeightedCapacity)
.weigher(Weighers.singleton())
.build();
// Start the task of regularly clearing expired keys
RegularExpireStrategy strategy = new RegularExpireStrategy(TOKEN_CACHE_NODE_MAP);
strategy.startClearTaskRegularly();
}


public static TokenCacheNode getTokenCacheNode(long tokenId) {
//use getQuietly to prevent disorder
return TOKEN_CACHE_NODE_MAP.getQuietly(tokenId);
}

public static void putTokenCacheNode(long tokenId, TokenCacheNode cacheNode) {
TOKEN_CACHE_NODE_MAP.put(tokenId, cacheNode);
}

public static boolean isContainsTokenId(long tokenId) {
return TOKEN_CACHE_NODE_MAP.containsKey(tokenId);
}

public static TokenCacheNode removeTokenCacheNode(long tokenId) {
return TOKEN_CACHE_NODE_MAP.remove(tokenId);
}

public static int getSize() {
return TOKEN_CACHE_NODE_MAP.size();
}

public static Set<Long> getCacheKeySet() {
return TOKEN_CACHE_NODE_MAP.keySet();
}

public static boolean validToken(TokenCacheNode cacheNode) {
return cacheNode.getTokenId() != null && cacheNode.getFlowId() != null && cacheNode.getClientTimeout() >= 0 && cacheNode.getResourceTimeout() >= 0;
}
}

+ 26
- 0
sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/statistic/concurrent/expire/ExpireStrategy.java ファイルの表示

@@ -0,0 +1,26 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.csp.sentinel.cluster.flow.statistic.concurrent.expire;

/**
* @author yunfeiyagnggzq
*/
public interface ExpireStrategy {
/**
* clean expired token regularly.
*/
void startClearTaskRegularly();
}

+ 137
- 0
sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/statistic/concurrent/expire/RegularExpireStrategy.java ファイルの表示

@@ -0,0 +1,137 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.csp.sentinel.cluster.flow.statistic.concurrent.expire;

import com.alibaba.csp.sentinel.cluster.flow.rule.ClusterFlowRuleManager;
import com.alibaba.csp.sentinel.cluster.flow.statistic.concurrent.CurrentConcurrencyManager;
import com.alibaba.csp.sentinel.cluster.flow.statistic.concurrent.TokenCacheNode;
import com.alibaba.csp.sentinel.cluster.server.connection.ConnectionManager;
import com.alibaba.csp.sentinel.concurrent.NamedThreadFactory;
import com.alibaba.csp.sentinel.log.RecordLog;
import com.alibaba.csp.sentinel.util.AssertUtil;
import com.googlecode.concurrentlinkedhashmap.ConcurrentLinkedHashMap;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
* We need to consider the situation that the token client goes offline
* or the resource call times out. It can be detected by sourceTimeout
* and clientTimeout. The resource calls timeout detection is triggered
* on the token client. If the resource is called over time, the token
* client will request the token server to release token or refresh the
* token. The client offline detection is triggered on the token server.
* If the offline detection time is exceeded, token server will trigger
* the detection token client’s status. If the token client is offline,
* token server will delete the corresponding tokenId. If it is not offline,
* token server will continue to save it.
*
* @author yunfeiyanggzq
**/
public class RegularExpireStrategy implements ExpireStrategy {
/**
* The max number of token deleted each time,
* the number of expired key-value pairs deleted each time does not exceed this number
*/
private long executeCount = 1000;
/**
* Length of time for task execution
*/
private long executeDuration = 800;
/**
* Frequency of task execution
*/
private long executeRate = 1000;
/**
* the local cache of tokenId
*/
private ConcurrentLinkedHashMap<Long, TokenCacheNode> localCache;

@SuppressWarnings("PMD.ThreadPoolCreationRule")
private static ScheduledExecutorService executor = Executors.newScheduledThreadPool(1,
new NamedThreadFactory("regular clear expired token thread"));


public RegularExpireStrategy(ConcurrentLinkedHashMap<Long, TokenCacheNode> localCache) {
AssertUtil.isTrue(localCache != null, " local cache can't be null");
this.localCache = localCache;
}

@Override
public void startClearTaskRegularly() {
executor.scheduleAtFixedRate(new ClearExpiredTokenTask(), 0, executeRate, TimeUnit.MILLISECONDS);
}

private class ClearExpiredTokenTask implements Runnable {
@Override
public void run() {
try {
clearToken();
} catch (Throwable e) {
e.printStackTrace();
RecordLog.warn("[RegularExpireStrategy] undefined throwable during clear token: ", e);
}
}
}

private void clearToken() {
long start = System.currentTimeMillis();
List<Long> keyList = new ArrayList<>(localCache.keySet());
for (int i = 0; i < executeCount && i < keyList.size(); i++) {
// time out execution exit
if (System.currentTimeMillis() - start > executeDuration) {
RecordLog.info("[RegularExpireStrategy] End the process of expired token detection because of execute time is more than executeDuration:", executeDuration);
break;
}
Long key = keyList.get(i);
TokenCacheNode node = localCache.get(key);
if (node == null) {
continue;
}

// remove the token whose client is offline and saved for more than clientTimeout
if (!ConnectionManager.isClientOnline(node.getClientAddress()) && node.getClientTimeout() - System.currentTimeMillis() < 0) {
removeToken(key, node);
RecordLog.info("[RegularExpireStrategy] Delete the expired token<{}> because of client offline for ruleId<{}>", node.getTokenId(), node.getFlowId());
continue;
}

// If we find that token's save time is more than 2 times of the client's call resource timeout time,
// the token will be determined to timeout.
long resourceTimeout = ClusterFlowRuleManager.getFlowRuleById(node.getFlowId()).getClusterConfig().getResourceTimeout();
if (System.currentTimeMillis() - node.getResourceTimeout() > resourceTimeout) {
removeToken(key, node);
RecordLog.info("[RegularExpireStrategy] Delete the expired token<{}> because of resource timeout for ruleId<{}>", node.getTokenId(), node.getFlowId());
}
}
}

private void removeToken(long tokenId, TokenCacheNode node) {
if (localCache.remove(tokenId) == null) {
RecordLog.info("[RegularExpireStrategy] Token<{}> is already released for ruleId<{}>", tokenId, node.getFlowId());
return;
}
AtomicInteger nowCalls = CurrentConcurrencyManager.get(node.getFlowId());
if (nowCalls == null) {
return;
}
nowCalls.getAndAdd(node.getAcquireCount() * -1);
}
}

+ 3
- 0
sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/connection/ConnectionManager.java ファイルの表示

@@ -112,6 +112,9 @@ public final class ConnectionManager {
return group;
}

public static boolean isClientOnline(String address){
return NAMESPACE_MAP.containsKey(address);
}
static void clear() {
CONN_MAP.clear();
NAMESPACE_MAP.clear();


読み込み中…
キャンセル
保存