Signed-off-by: Eric Zhao <sczyh16@gmail.com>master
@@ -17,111 +17,22 @@ package com.alibaba.csp.sentinel.cluster.flow; | |||||
import com.alibaba.csp.sentinel.cluster.TokenResultStatus; | import com.alibaba.csp.sentinel.cluster.TokenResultStatus; | ||||
import com.alibaba.csp.sentinel.cluster.TokenResult; | import com.alibaba.csp.sentinel.cluster.TokenResult; | ||||
import com.alibaba.csp.sentinel.cluster.flow.rule.ClusterFlowRuleManager; | |||||
import com.alibaba.csp.sentinel.cluster.flow.statistic.ClusterMetricStatistics; | import com.alibaba.csp.sentinel.cluster.flow.statistic.ClusterMetricStatistics; | ||||
import com.alibaba.csp.sentinel.cluster.server.config.ClusterServerConfigManager; | import com.alibaba.csp.sentinel.cluster.server.config.ClusterServerConfigManager; | ||||
import com.alibaba.csp.sentinel.cluster.flow.statistic.data.ClusterFlowEvent; | import com.alibaba.csp.sentinel.cluster.flow.statistic.data.ClusterFlowEvent; | ||||
import com.alibaba.csp.sentinel.cluster.flow.statistic.metric.ClusterMetric; | import com.alibaba.csp.sentinel.cluster.flow.statistic.metric.ClusterMetric; | ||||
import com.alibaba.csp.sentinel.cluster.server.log.ClusterServerStatLogUtil; | |||||
import com.alibaba.csp.sentinel.slots.block.ClusterRuleConstant; | import com.alibaba.csp.sentinel.slots.block.ClusterRuleConstant; | ||||
import com.alibaba.csp.sentinel.slots.block.flow.FlowRule; | import com.alibaba.csp.sentinel.slots.block.flow.FlowRule; | ||||
import com.alibaba.csp.sentinel.util.TimeUtil; | |||||
/** | /** | ||||
* Flow checker for cluster flow rules. | |||||
* | |||||
* @author Eric Zhao | * @author Eric Zhao | ||||
* @since 1.4.0 | * @since 1.4.0 | ||||
*/ | */ | ||||
public final class ClusterFlowChecker { | |||||
static TokenResult tryAcquireOrBorrowFromRefResource(FlowRule rule, int acquireCount, boolean prioritized) { | |||||
// 1. First try acquire its own count. | |||||
// TokenResult ownResult = acquireClusterToken(rule, acquireCount, prioritized); | |||||
ClusterMetric metric = ClusterMetricStatistics.getMetric(rule.getClusterConfig().getFlowId()); | |||||
if (metric == null) { | |||||
return new TokenResult(TokenResultStatus.FAIL); | |||||
} | |||||
double latestQps = metric.getAvg(ClusterFlowEvent.PASS_REQUEST); | |||||
double globalThreshold = calcGlobalThreshold(rule) * ClusterServerConfigManager.exceedCount; | |||||
double nextRemaining = globalThreshold - latestQps - acquireCount; | |||||
if (nextRemaining >= 0) { | |||||
// TODO: checking logic and metric operation should be separated. | |||||
metric.add(ClusterFlowEvent.PASS, acquireCount); | |||||
metric.add(ClusterFlowEvent.PASS_REQUEST, 1); | |||||
if (prioritized) { | |||||
// Add prioritized pass. | |||||
metric.add(ClusterFlowEvent.OCCUPIED_PASS, acquireCount); | |||||
} | |||||
// Remaining count is cut down to a smaller integer. | |||||
return new TokenResult(TokenResultStatus.OK) | |||||
.setRemaining((int) nextRemaining) | |||||
.setWaitInMs(0); | |||||
} | |||||
if (prioritized) { | |||||
double occupyAvg = metric.getAvg(ClusterFlowEvent.WAITING); | |||||
if (occupyAvg <= ClusterServerConfigManager.maxOccupyRatio * globalThreshold) { | |||||
int waitInMs = metric.tryOccupyNext(ClusterFlowEvent.PASS, acquireCount, globalThreshold); | |||||
if (waitInMs > 0) { | |||||
return new TokenResult(TokenResultStatus.SHOULD_WAIT) | |||||
.setRemaining(0) | |||||
.setWaitInMs(waitInMs); | |||||
} | |||||
// Or else occupy failed, should be blocked. | |||||
} | |||||
} | |||||
// 2. If failed, try to borrow from reference resource. | |||||
// Assume it's valid as checked before. | |||||
if (!ClusterServerConfigManager.borrowRefEnabled) { | |||||
return new TokenResult(TokenResultStatus.NOT_AVAILABLE); | |||||
} | |||||
Long refFlowId = rule.getClusterConfig().getRefFlowId(); | |||||
FlowRule refFlowRule = ClusterFlowRuleManager.getFlowRuleById(refFlowId); | |||||
if (refFlowRule == null) { | |||||
return new TokenResult(TokenResultStatus.NO_REF_RULE_EXISTS); | |||||
} | |||||
// TODO: check here | |||||
ClusterMetric refMetric = ClusterMetricStatistics.getMetric(refFlowId); | |||||
if (refMetric == null) { | |||||
return new TokenResult(TokenResultStatus.FAIL); | |||||
} | |||||
double refOrders = refMetric.getAvg(ClusterFlowEvent.PASS); | |||||
double refQps = refMetric.getAvg(ClusterFlowEvent.PASS_REQUEST); | |||||
double splitRatio = refQps > 0 ? refOrders / refQps : 1; | |||||
double selfGlobalThreshold = ClusterServerConfigManager.exceedCount * calcGlobalThreshold(rule); | |||||
double refGlobalThreshold = ClusterServerConfigManager.exceedCount * calcGlobalThreshold(refFlowRule); | |||||
long currentTime = TimeUtil.currentTimeMillis(); | |||||
long latestRefTime = 0 /*refFlowRule.clusterQps.getStableWindowStartTime()*/; | |||||
int sampleCount = 10; | |||||
if (currentTime > latestRefTime | |||||
&& (refOrders / refGlobalThreshold + 1.0d / sampleCount >= ((double)(currentTime - latestRefTime)) / 1000) | |||||
|| refOrders == refGlobalThreshold) { | |||||
return blockedResult(); | |||||
} | |||||
// double latestQps = metric.getAvg(ClusterFlowEvent.PASS); | |||||
double refRatio = rule.getClusterConfig().getRefRatio(); | |||||
if (refOrders / splitRatio + (acquireCount + latestQps) * refRatio | |||||
<= refGlobalThreshold / splitRatio + selfGlobalThreshold * refRatio) { | |||||
metric.add(ClusterFlowEvent.PASS, acquireCount); | |||||
metric.add(ClusterFlowEvent.PASS_REQUEST, 1); | |||||
return new TokenResult(TokenResultStatus.OK); | |||||
} | |||||
// TODO: log here? | |||||
metric.add(ClusterFlowEvent.BLOCK, acquireCount); | |||||
return blockedResult(); | |||||
} | |||||
final class ClusterFlowChecker { | |||||
private static double calcGlobalThreshold(FlowRule rule) { | private static double calcGlobalThreshold(FlowRule rule) { | ||||
double count = rule.getCount(); | double count = rule.getCount(); | ||||
@@ -130,20 +41,20 @@ public final class ClusterFlowChecker { | |||||
return count; | return count; | ||||
case ClusterRuleConstant.FLOW_THRESHOLD_AVG_LOCAL: | case ClusterRuleConstant.FLOW_THRESHOLD_AVG_LOCAL: | ||||
default: | default: | ||||
// TODO: get real connected count grouped. | |||||
int connectedCount = 1; | |||||
int connectedCount = ClusterFlowRuleManager.getConnectedCount(rule.getClusterConfig().getFlowId()); | |||||
return count * connectedCount; | return count * connectedCount; | ||||
} | } | ||||
} | } | ||||
static TokenResult acquireClusterToken(/*@Valid*/ FlowRule rule, int acquireCount, boolean prioritized) { | static TokenResult acquireClusterToken(/*@Valid*/ FlowRule rule, int acquireCount, boolean prioritized) { | ||||
ClusterMetric metric = ClusterMetricStatistics.getMetric(rule.getClusterConfig().getFlowId()); | |||||
Long id = rule.getClusterConfig().getFlowId(); | |||||
ClusterMetric metric = ClusterMetricStatistics.getMetric(id); | |||||
if (metric == null) { | if (metric == null) { | ||||
return new TokenResult(TokenResultStatus.FAIL); | return new TokenResult(TokenResultStatus.FAIL); | ||||
} | } | ||||
double latestQps = metric.getAvg(ClusterFlowEvent.PASS_REQUEST); | double latestQps = metric.getAvg(ClusterFlowEvent.PASS_REQUEST); | ||||
double globalThreshold = calcGlobalThreshold(rule) * ClusterServerConfigManager.exceedCount; | |||||
double globalThreshold = calcGlobalThreshold(rule) * ClusterServerConfigManager.getExceedCount(); | |||||
double nextRemaining = globalThreshold - latestQps - acquireCount; | double nextRemaining = globalThreshold - latestQps - acquireCount; | ||||
if (nextRemaining >= 0) { | if (nextRemaining >= 0) { | ||||
@@ -160,10 +71,13 @@ public final class ClusterFlowChecker { | |||||
.setWaitInMs(0); | .setWaitInMs(0); | ||||
} else { | } else { | ||||
if (prioritized) { | if (prioritized) { | ||||
// Try to occupy incoming buckets. | |||||
double occupyAvg = metric.getAvg(ClusterFlowEvent.WAITING); | double occupyAvg = metric.getAvg(ClusterFlowEvent.WAITING); | ||||
if (occupyAvg <= ClusterServerConfigManager.maxOccupyRatio * globalThreshold) { | |||||
if (occupyAvg <= ClusterServerConfigManager.getMaxOccupyRatio() * globalThreshold) { | |||||
int waitInMs = metric.tryOccupyNext(ClusterFlowEvent.PASS, acquireCount, globalThreshold); | int waitInMs = metric.tryOccupyNext(ClusterFlowEvent.PASS, acquireCount, globalThreshold); | ||||
// waitInMs > 0 indicates pre-occupy incoming buckets successfully. | |||||
if (waitInMs > 0) { | if (waitInMs > 0) { | ||||
ClusterServerStatLogUtil.log("flow|waiting|" + id); | |||||
return new TokenResult(TokenResultStatus.SHOULD_WAIT) | return new TokenResult(TokenResultStatus.SHOULD_WAIT) | ||||
.setRemaining(0) | .setRemaining(0) | ||||
.setWaitInMs(waitInMs); | .setWaitInMs(waitInMs); | ||||
@@ -174,9 +88,12 @@ public final class ClusterFlowChecker { | |||||
// Blocked. | // Blocked. | ||||
metric.add(ClusterFlowEvent.BLOCK, acquireCount); | metric.add(ClusterFlowEvent.BLOCK, acquireCount); | ||||
metric.add(ClusterFlowEvent.BLOCK_REQUEST, 1); | metric.add(ClusterFlowEvent.BLOCK_REQUEST, 1); | ||||
ClusterServerStatLogUtil.log("flow|block|" + id, acquireCount); | |||||
ClusterServerStatLogUtil.log("flow|block_request|" + id, 1); | |||||
if (prioritized) { | if (prioritized) { | ||||
// Add prioritized block. | // Add prioritized block. | ||||
metric.add(ClusterFlowEvent.OCCUPIED_BLOCK, acquireCount); | metric.add(ClusterFlowEvent.OCCUPIED_BLOCK, acquireCount); | ||||
ClusterServerStatLogUtil.log("flow|occupied_block|" + id, 1); | |||||
} | } | ||||
return blockedResult(); | return blockedResult(); | ||||
@@ -1,137 +0,0 @@ | |||||
/* | |||||
* Copyright 1999-2018 Alibaba Group Holding Ltd. | |||||
* | |||||
* Licensed under the Apache License, Version 2.0 (the "License"); | |||||
* you may not use this file except in compliance with the License. | |||||
* You may obtain a copy of the License at | |||||
* | |||||
* http://www.apache.org/licenses/LICENSE-2.0 | |||||
* | |||||
* Unless required by applicable law or agreed to in writing, software | |||||
* distributed under the License is distributed on an "AS IS" BASIS, | |||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |||||
* See the License for the specific language governing permissions and | |||||
* limitations under the License. | |||||
*/ | |||||
package com.alibaba.csp.sentinel.cluster.flow; | |||||
import java.util.List; | |||||
import java.util.Map; | |||||
import java.util.Set; | |||||
import java.util.concurrent.ConcurrentHashMap; | |||||
import com.alibaba.csp.sentinel.cluster.flow.statistic.ClusterMetricStatistics; | |||||
import com.alibaba.csp.sentinel.cluster.flow.statistic.metric.ClusterMetric; | |||||
import com.alibaba.csp.sentinel.cluster.server.util.ClusterRuleUtil; | |||||
import com.alibaba.csp.sentinel.log.RecordLog; | |||||
import com.alibaba.csp.sentinel.property.DynamicSentinelProperty; | |||||
import com.alibaba.csp.sentinel.property.PropertyListener; | |||||
import com.alibaba.csp.sentinel.property.SentinelProperty; | |||||
import com.alibaba.csp.sentinel.slots.block.RuleConstant; | |||||
import com.alibaba.csp.sentinel.slots.block.flow.FlowRule; | |||||
import com.alibaba.csp.sentinel.slots.block.flow.FlowRuleUtil; | |||||
import com.alibaba.csp.sentinel.util.StringUtil; | |||||
/** | |||||
* @author Eric Zhao | |||||
* @since 1.4.0 | |||||
*/ | |||||
public final class ClusterFlowRuleManager { | |||||
private static final Map<Long, FlowRule> FLOW_RULES = new ConcurrentHashMap<>(); | |||||
private static final PropertyListener<List<FlowRule>> PROPERTY_LISTENER = new FlowRulePropertyListener(); | |||||
private static SentinelProperty<List<FlowRule>> currentProperty = new DynamicSentinelProperty<>(); | |||||
static { | |||||
currentProperty.addListener(PROPERTY_LISTENER); | |||||
} | |||||
/** | |||||
* Listen to the {@link SentinelProperty} for {@link FlowRule}s. | |||||
* The property is the source of cluster {@link FlowRule}s. | |||||
* | |||||
* @param property the property to listen. | |||||
*/ | |||||
public static void register2Property(SentinelProperty<List<FlowRule>> property) { | |||||
synchronized (PROPERTY_LISTENER) { | |||||
RecordLog.info("[ClusterFlowRuleManager] Registering new property to cluster flow rule manager"); | |||||
currentProperty.removeListener(PROPERTY_LISTENER); | |||||
property.addListener(PROPERTY_LISTENER); | |||||
currentProperty = property; | |||||
} | |||||
} | |||||
public static FlowRule getFlowRuleById(Long id) { | |||||
if (!ClusterRuleUtil.validId(id)) { | |||||
return null; | |||||
} | |||||
return FLOW_RULES.get(id); | |||||
} | |||||
private static Map<Long, FlowRule> buildClusterFlowRuleMap(List<FlowRule> list) { | |||||
Map<Long, FlowRule> ruleMap = new ConcurrentHashMap<>(); | |||||
if (list == null || list.isEmpty()) { | |||||
return ruleMap; | |||||
} | |||||
for (FlowRule rule : list) { | |||||
if (!rule.isClusterMode()) { | |||||
continue; | |||||
} | |||||
if (!FlowRuleUtil.isValidRule(rule)) { | |||||
RecordLog.warn( | |||||
"[ClusterFlowRuleManager] Ignoring invalid flow rule when loading new flow rules: " + rule); | |||||
continue; | |||||
} | |||||
if (StringUtil.isBlank(rule.getLimitApp())) { | |||||
rule.setLimitApp(RuleConstant.LIMIT_APP_DEFAULT); | |||||
} | |||||
// Flow id should not be null after filtered. | |||||
Long flowId = rule.getClusterConfig().getFlowId(); | |||||
if (flowId == null) { | |||||
continue; | |||||
} | |||||
ruleMap.put(flowId, rule); | |||||
// Prepare cluster metric from valid flow ID. | |||||
ClusterMetricStatistics.putMetricIfAbsent(flowId, new ClusterMetric(100, 1)); | |||||
} | |||||
// Cleanup unused cluster metrics. | |||||
Set<Long> previousSet = FLOW_RULES.keySet(); | |||||
for (Long id : previousSet) { | |||||
if (!ruleMap.containsKey(id)) { | |||||
ClusterMetricStatistics.removeMetric(id); | |||||
} | |||||
} | |||||
return ruleMap; | |||||
} | |||||
private static final class FlowRulePropertyListener implements PropertyListener<List<FlowRule>> { | |||||
@Override | |||||
public void configUpdate(List<FlowRule> conf) { | |||||
Map<Long, FlowRule> rules = buildClusterFlowRuleMap(conf); | |||||
if (rules != null) { | |||||
FLOW_RULES.clear(); | |||||
FLOW_RULES.putAll(rules); | |||||
} | |||||
RecordLog.info("[ClusterFlowRuleManager] Cluster flow rules received: " + FLOW_RULES); | |||||
} | |||||
@Override | |||||
public void configLoad(List<FlowRule> conf) { | |||||
Map<Long, FlowRule> rules = buildClusterFlowRuleMap(conf); | |||||
if (rules != null) { | |||||
FLOW_RULES.clear(); | |||||
FLOW_RULES.putAll(rules); | |||||
} | |||||
RecordLog.info("[ClusterFlowRuleManager] Cluster flow rules loaded: " + FLOW_RULES); | |||||
} | |||||
} | |||||
private ClusterFlowRuleManager() {} | |||||
} |
@@ -19,30 +19,36 @@ import java.util.Collection; | |||||
import com.alibaba.csp.sentinel.cluster.TokenResult; | import com.alibaba.csp.sentinel.cluster.TokenResult; | ||||
import com.alibaba.csp.sentinel.cluster.TokenResultStatus; | import com.alibaba.csp.sentinel.cluster.TokenResultStatus; | ||||
import com.alibaba.csp.sentinel.cluster.flow.rule.ClusterParamFlowRuleManager; | |||||
import com.alibaba.csp.sentinel.cluster.flow.statistic.ClusterParamMetricStatistics; | import com.alibaba.csp.sentinel.cluster.flow.statistic.ClusterParamMetricStatistics; | ||||
import com.alibaba.csp.sentinel.cluster.flow.statistic.metric.ClusterParamMetric; | import com.alibaba.csp.sentinel.cluster.flow.statistic.metric.ClusterParamMetric; | ||||
import com.alibaba.csp.sentinel.cluster.server.log.ClusterServerStatLogUtil; | |||||
import com.alibaba.csp.sentinel.slots.block.ClusterRuleConstant; | import com.alibaba.csp.sentinel.slots.block.ClusterRuleConstant; | ||||
import com.alibaba.csp.sentinel.slots.block.flow.param.ParamFlowRule; | import com.alibaba.csp.sentinel.slots.block.flow.param.ParamFlowRule; | ||||
/** | /** | ||||
* @author jialiang.linjl | |||||
* @author Eric Zhao | * @author Eric Zhao | ||||
* @since 1.4.0 | |||||
*/ | */ | ||||
public final class ClusterParamFlowChecker { | public final class ClusterParamFlowChecker { | ||||
static TokenResult acquireClusterToken(ParamFlowRule rule, int count, Collection<Object> values) { | static TokenResult acquireClusterToken(ParamFlowRule rule, int count, Collection<Object> values) { | ||||
ClusterParamMetric metric = ClusterParamMetricStatistics.getMetric(rule.getClusterConfig().getFlowId()); | |||||
Long id = rule.getClusterConfig().getFlowId(); | |||||
ClusterParamMetric metric = ClusterParamMetricStatistics.getMetric(id); | |||||
if (metric == null) { | if (metric == null) { | ||||
// Unexpected state, return FAIL. | // Unexpected state, return FAIL. | ||||
return new TokenResult(TokenResultStatus.FAIL); | return new TokenResult(TokenResultStatus.FAIL); | ||||
} | } | ||||
double remaining = -1; | |||||
boolean hasPassed = true; | boolean hasPassed = true; | ||||
Object blockObject = null; | Object blockObject = null; | ||||
for (Object value : values) { | for (Object value : values) { | ||||
// TODO: origin is int * int, but current double! | |||||
double curCount = metric.getAvg(value); | |||||
double threshold = calcGlobalThreshold(rule); | |||||
if (++curCount > threshold) { | |||||
double latestQps = metric.getAvg(value); | |||||
double threshold = calcGlobalThreshold(rule, value); | |||||
double nextRemaining = threshold - latestQps - count; | |||||
remaining = nextRemaining; | |||||
if (nextRemaining < 0) { | |||||
hasPassed = false; | hasPassed = false; | ||||
blockObject = value; | blockObject = value; | ||||
break; | break; | ||||
@@ -53,30 +59,50 @@ public final class ClusterParamFlowChecker { | |||||
for (Object value : values) { | for (Object value : values) { | ||||
metric.addValue(value, count); | metric.addValue(value, count); | ||||
} | } | ||||
ClusterServerStatLogUtil.log(String.format("param|pass|%d", id)); | |||||
} else { | } else { | ||||
// TODO: log <blocked object> here? | |||||
ClusterServerStatLogUtil.log(String.format("param|block|%d|%s", id, blockObject)); | |||||
} | |||||
if (values.size() > 1) { | |||||
// Remaining field is unsupported for multi-values. | |||||
remaining = -1; | |||||
} | } | ||||
return hasPassed ? newRawResponse(TokenResultStatus.OK): newRawResponse(TokenResultStatus.BLOCKED); | |||||
return hasPassed ? newPassResponse((int)remaining): newBlockResponse(); | |||||
} | |||||
private static TokenResult newPassResponse(int remaining) { | |||||
return new TokenResult(TokenResultStatus.OK) | |||||
.setRemaining(remaining) | |||||
.setWaitInMs(0); | |||||
} | } | ||||
private static TokenResult newRawResponse(int status) { | |||||
return new TokenResult(status) | |||||
private static TokenResult newBlockResponse() { | |||||
return new TokenResult(TokenResultStatus.BLOCKED) | |||||
.setRemaining(0) | .setRemaining(0) | ||||
.setWaitInMs(0); | .setWaitInMs(0); | ||||
} | } | ||||
private static double calcGlobalThreshold(ParamFlowRule rule) { | |||||
double count = rule.getCount(); | |||||
private static double calcGlobalThreshold(ParamFlowRule rule, Object value) { | |||||
double count = getRawThreshold(rule, value); | |||||
switch (rule.getClusterConfig().getThresholdType()) { | switch (rule.getClusterConfig().getThresholdType()) { | ||||
case ClusterRuleConstant.FLOW_THRESHOLD_GLOBAL: | case ClusterRuleConstant.FLOW_THRESHOLD_GLOBAL: | ||||
return count; | return count; | ||||
case ClusterRuleConstant.FLOW_THRESHOLD_AVG_LOCAL: | case ClusterRuleConstant.FLOW_THRESHOLD_AVG_LOCAL: | ||||
default: | default: | ||||
int connectedCount = 1; // TODO: get real connected count grouped. | |||||
int connectedCount = ClusterParamFlowRuleManager.getConnectedCount(rule.getClusterConfig().getFlowId()); | |||||
return count * connectedCount; | return count * connectedCount; | ||||
} | } | ||||
} | } | ||||
private static double getRawThreshold(ParamFlowRule rule, Object value) { | |||||
Integer itemCount = rule.retrieveExclusiveItemCount(value); | |||||
if (itemCount == null) { | |||||
return rule.getCount(); | |||||
} else { | |||||
return itemCount; | |||||
} | |||||
} | |||||
private ClusterParamFlowChecker() {} | private ClusterParamFlowChecker() {} | ||||
} | } |
@@ -1,138 +0,0 @@ | |||||
/* | |||||
* Copyright 1999-2018 Alibaba Group Holding Ltd. | |||||
* | |||||
* Licensed under the Apache License, Version 2.0 (the "License"); | |||||
* you may not use this file except in compliance with the License. | |||||
* You may obtain a copy of the License at | |||||
* | |||||
* http://www.apache.org/licenses/LICENSE-2.0 | |||||
* | |||||
* Unless required by applicable law or agreed to in writing, software | |||||
* distributed under the License is distributed on an "AS IS" BASIS, | |||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |||||
* See the License for the specific language governing permissions and | |||||
* limitations under the License. | |||||
*/ | |||||
package com.alibaba.csp.sentinel.cluster.flow; | |||||
import java.util.List; | |||||
import java.util.Map; | |||||
import java.util.Set; | |||||
import java.util.concurrent.ConcurrentHashMap; | |||||
import com.alibaba.csp.sentinel.cluster.flow.statistic.ClusterParamMetricStatistics; | |||||
import com.alibaba.csp.sentinel.cluster.flow.statistic.metric.ClusterParamMetric; | |||||
import com.alibaba.csp.sentinel.cluster.server.util.ClusterRuleUtil; | |||||
import com.alibaba.csp.sentinel.log.RecordLog; | |||||
import com.alibaba.csp.sentinel.property.DynamicSentinelProperty; | |||||
import com.alibaba.csp.sentinel.property.PropertyListener; | |||||
import com.alibaba.csp.sentinel.property.SentinelProperty; | |||||
import com.alibaba.csp.sentinel.slots.block.RuleConstant; | |||||
import com.alibaba.csp.sentinel.slots.block.flow.param.ParamFlowRule; | |||||
import com.alibaba.csp.sentinel.slots.block.flow.param.ParamFlowRuleUtil; | |||||
import com.alibaba.csp.sentinel.util.StringUtil; | |||||
/** | |||||
* @author Eric Zhao | |||||
* @since 1.4.0 | |||||
*/ | |||||
public final class ClusterParamFlowRuleManager { | |||||
private static final Map<Long, ParamFlowRule> PARAM_RULES = new ConcurrentHashMap<>(); | |||||
private final static RulePropertyListener PROPERTY_LISTENER = new RulePropertyListener(); | |||||
private static SentinelProperty<List<ParamFlowRule>> currentProperty | |||||
= new DynamicSentinelProperty<List<ParamFlowRule>>(); | |||||
static { | |||||
currentProperty.addListener(PROPERTY_LISTENER); | |||||
} | |||||
/** | |||||
* Listen to the {@link SentinelProperty} for {@link ParamFlowRule}s. | |||||
* The property is the source of {@link ParamFlowRule}s. | |||||
* | |||||
* @param property the property to listen | |||||
*/ | |||||
public static void register2Property(SentinelProperty<List<ParamFlowRule>> property) { | |||||
synchronized (PROPERTY_LISTENER) { | |||||
currentProperty.removeListener(PROPERTY_LISTENER); | |||||
property.addListener(PROPERTY_LISTENER); | |||||
currentProperty = property; | |||||
RecordLog.info("[ClusterParamFlowRuleManager] New property has been registered to cluster param rule manager"); | |||||
} | |||||
} | |||||
public static ParamFlowRule getParamFlowRuleById(Long id) { | |||||
if (!ClusterRuleUtil.validId(id)) { | |||||
return null; | |||||
} | |||||
return PARAM_RULES.get(id); | |||||
} | |||||
static class RulePropertyListener implements PropertyListener<List<ParamFlowRule>> { | |||||
@Override | |||||
public void configUpdate(List<ParamFlowRule> conf) { | |||||
Map<Long, ParamFlowRule> rules = buildClusterRuleMap(conf); | |||||
if (rules != null) { | |||||
PARAM_RULES.clear(); | |||||
PARAM_RULES.putAll(rules); | |||||
} | |||||
RecordLog.info("[ClusterFlowRuleManager] Cluster param flow rules received: " + PARAM_RULES); | |||||
} | |||||
@Override | |||||
public void configLoad(List<ParamFlowRule> conf) { | |||||
Map<Long, ParamFlowRule> rules = buildClusterRuleMap(conf); | |||||
if (rules != null) { | |||||
PARAM_RULES.clear(); | |||||
PARAM_RULES.putAll(rules); | |||||
} | |||||
RecordLog.info("[ClusterFlowRuleManager] Cluster param flow rules received: " + PARAM_RULES); | |||||
} | |||||
} | |||||
private static Map<Long, ParamFlowRule> buildClusterRuleMap(List<ParamFlowRule> list) { | |||||
Map<Long, ParamFlowRule> ruleMap = new ConcurrentHashMap<>(); | |||||
if (list == null || list.isEmpty()) { | |||||
return ruleMap; | |||||
} | |||||
for (ParamFlowRule rule : list) { | |||||
if (!rule.isClusterMode()) { | |||||
continue; | |||||
} | |||||
if (!ParamFlowRuleUtil.isValidRule(rule)) { | |||||
RecordLog.warn( | |||||
"[ClusterParamFlowRuleManager] Ignoring invalid param flow rule when loading new flow rules: " + rule); | |||||
continue; | |||||
} | |||||
if (StringUtil.isBlank(rule.getLimitApp())) { | |||||
rule.setLimitApp(RuleConstant.LIMIT_APP_DEFAULT); | |||||
} | |||||
// Flow id should not be null after filtered. | |||||
Long flowId = rule.getClusterConfig().getFlowId(); | |||||
if (flowId == null) { | |||||
continue; | |||||
} | |||||
ruleMap.put(flowId, rule); | |||||
// Prepare cluster metric from valid flow ID. | |||||
ClusterParamMetricStatistics.putMetricIfAbsent(flowId, new ClusterParamMetric(100, 1)); | |||||
} | |||||
// Cleanup unused cluster metrics. | |||||
Set<Long> previousSet = PARAM_RULES.keySet(); | |||||
for (Long id : previousSet) { | |||||
if (!ruleMap.containsKey(id)) { | |||||
ClusterParamMetricStatistics.removeMetric(id); | |||||
} | |||||
} | |||||
return ruleMap; | |||||
} | |||||
private ClusterParamFlowRuleManager() {} | |||||
} |
@@ -20,7 +20,8 @@ import java.util.Collection; | |||||
import com.alibaba.csp.sentinel.cluster.TokenResultStatus; | import com.alibaba.csp.sentinel.cluster.TokenResultStatus; | ||||
import com.alibaba.csp.sentinel.cluster.TokenResult; | import com.alibaba.csp.sentinel.cluster.TokenResult; | ||||
import com.alibaba.csp.sentinel.cluster.TokenService; | import com.alibaba.csp.sentinel.cluster.TokenService; | ||||
import com.alibaba.csp.sentinel.slots.block.ClusterRuleConstant; | |||||
import com.alibaba.csp.sentinel.cluster.flow.rule.ClusterFlowRuleManager; | |||||
import com.alibaba.csp.sentinel.cluster.flow.rule.ClusterParamFlowRuleManager; | |||||
import com.alibaba.csp.sentinel.slots.block.flow.FlowRule; | import com.alibaba.csp.sentinel.slots.block.flow.FlowRule; | ||||
import com.alibaba.csp.sentinel.slots.block.flow.param.ParamFlowRule; | import com.alibaba.csp.sentinel.slots.block.flow.param.ParamFlowRule; | ||||
@@ -42,24 +43,17 @@ public class DefaultTokenService implements TokenService { | |||||
if (rule == null) { | if (rule == null) { | ||||
return new TokenResult(TokenResultStatus.NO_RULE_EXISTS); | return new TokenResult(TokenResultStatus.NO_RULE_EXISTS); | ||||
} | } | ||||
if (isUsingReference(rule)) { | |||||
return ClusterFlowChecker.tryAcquireOrBorrowFromRefResource(rule, acquireCount, prioritized); | |||||
} | |||||
return ClusterFlowChecker.acquireClusterToken(rule, acquireCount, prioritized); | return ClusterFlowChecker.acquireClusterToken(rule, acquireCount, prioritized); | ||||
} | } | ||||
private boolean isUsingReference(FlowRule rule) { | |||||
return rule.getClusterConfig().getStrategy() == ClusterRuleConstant.FLOW_CLUSTER_STRATEGY_REF; | |||||
} | |||||
@Override | @Override | ||||
public TokenResult requestParamToken(Long ruleId, int acquireCount, Collection<Object> params) { | public TokenResult requestParamToken(Long ruleId, int acquireCount, Collection<Object> params) { | ||||
if (notValidRequest(ruleId, acquireCount) || params == null || params.isEmpty()) { | if (notValidRequest(ruleId, acquireCount) || params == null || params.isEmpty()) { | ||||
return badRequest(); | return badRequest(); | ||||
} | } | ||||
// The rule should be valid. | // The rule should be valid. | ||||
ParamFlowRule rule = ClusterParamFlowRuleManager.getParamFlowRuleById(ruleId); | |||||
ParamFlowRule rule = ClusterParamFlowRuleManager.getParamRuleById(ruleId); | |||||
if (rule == null) { | if (rule == null) { | ||||
return new TokenResult(TokenResultStatus.NO_RULE_EXISTS); | return new TokenResult(TokenResultStatus.NO_RULE_EXISTS); | ||||
} | } | ||||
@@ -0,0 +1,320 @@ | |||||
/* | |||||
* Copyright 1999-2018 Alibaba Group Holding Ltd. | |||||
* | |||||
* Licensed under the Apache License, Version 2.0 (the "License"); | |||||
* you may not use this file except in compliance with the License. | |||||
* You may obtain a copy of the License at | |||||
* | |||||
* http://www.apache.org/licenses/LICENSE-2.0 | |||||
* | |||||
* Unless required by applicable law or agreed to in writing, software | |||||
* distributed under the License is distributed on an "AS IS" BASIS, | |||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |||||
* See the License for the specific language governing permissions and | |||||
* limitations under the License. | |||||
*/ | |||||
package com.alibaba.csp.sentinel.cluster.flow.rule; | |||||
import java.util.HashSet; | |||||
import java.util.List; | |||||
import java.util.Map; | |||||
import java.util.Set; | |||||
import java.util.concurrent.ConcurrentHashMap; | |||||
import com.alibaba.csp.sentinel.cluster.flow.statistic.ClusterMetricStatistics; | |||||
import com.alibaba.csp.sentinel.cluster.flow.statistic.metric.ClusterMetric; | |||||
import com.alibaba.csp.sentinel.cluster.server.ServerConstants; | |||||
import com.alibaba.csp.sentinel.cluster.server.config.ClusterServerConfigManager; | |||||
import com.alibaba.csp.sentinel.cluster.server.connection.ConnectionManager; | |||||
import com.alibaba.csp.sentinel.cluster.server.util.ClusterRuleUtil; | |||||
import com.alibaba.csp.sentinel.log.RecordLog; | |||||
import com.alibaba.csp.sentinel.property.DynamicSentinelProperty; | |||||
import com.alibaba.csp.sentinel.property.PropertyListener; | |||||
import com.alibaba.csp.sentinel.property.SentinelProperty; | |||||
import com.alibaba.csp.sentinel.slots.block.RuleConstant; | |||||
import com.alibaba.csp.sentinel.slots.block.flow.FlowRule; | |||||
import com.alibaba.csp.sentinel.slots.block.flow.FlowRuleUtil; | |||||
import com.alibaba.csp.sentinel.util.AssertUtil; | |||||
import com.alibaba.csp.sentinel.util.StringUtil; | |||||
import com.alibaba.csp.sentinel.util.function.Function; | |||||
import com.alibaba.csp.sentinel.util.function.Predicate; | |||||
/** | |||||
* Manager for cluster flow rules. | |||||
* | |||||
* @author Eric Zhao | |||||
* @since 1.4.0 | |||||
*/ | |||||
public final class ClusterFlowRuleManager { | |||||
/** | |||||
* The default cluster flow rule property supplier that creates a new dynamic property | |||||
* for a specific namespace to do rule management manually. | |||||
*/ | |||||
public static final Function<String, SentinelProperty<List<FlowRule>>> DEFAULT_PROPERTY_SUPPLIER = | |||||
new Function<String, SentinelProperty<List<FlowRule>>>() { | |||||
@Override | |||||
public SentinelProperty<List<FlowRule>> apply(String namespace) { | |||||
return new DynamicSentinelProperty<>(); | |||||
} | |||||
}; | |||||
/** | |||||
* (flowId, clusterRule) | |||||
*/ | |||||
private static final Map<Long, FlowRule> FLOW_RULES = new ConcurrentHashMap<>(); | |||||
/** | |||||
* (namespace, [flowId...]) | |||||
*/ | |||||
private static final Map<String, Set<Long>> NAMESPACE_FLOW_ID_MAP = new ConcurrentHashMap<>(); | |||||
/** | |||||
* <p>This map (flowId, namespace) is used for getting connected count | |||||
* when checking a specific rule in {@code ruleId}:</p> | |||||
* | |||||
* <pre> | |||||
* ruleId -> namespace -> connection group -> connected count | |||||
* </pre> | |||||
*/ | |||||
private static final Map<Long, String> FLOW_NAMESPACE_MAP = new ConcurrentHashMap<>(); | |||||
/** | |||||
* (namespace, property-listener wrapper) | |||||
*/ | |||||
private static final Map<String, NamespaceFlowProperty<FlowRule>> PROPERTY_MAP = new ConcurrentHashMap<>(); | |||||
/** | |||||
* Cluster flow rule property supplier for a specific namespace. | |||||
*/ | |||||
private static volatile Function<String, SentinelProperty<List<FlowRule>>> propertySupplier | |||||
= DEFAULT_PROPERTY_SUPPLIER; | |||||
private static final Object UPDATE_LOCK = new Object(); | |||||
static { | |||||
initDefaultProperty(); | |||||
} | |||||
private static void initDefaultProperty() { | |||||
// The server should always support default namespace, | |||||
// so register a default property for default namespace. | |||||
SentinelProperty<List<FlowRule>> defaultProperty = new DynamicSentinelProperty<>(); | |||||
String defaultNamespace = ServerConstants.DEFAULT_NAMESPACE; | |||||
registerPropertyInternal(defaultNamespace, defaultProperty); | |||||
} | |||||
public static void setPropertySupplier(Function<String, SentinelProperty<List<FlowRule>>> propertySupplier) { | |||||
ClusterFlowRuleManager.propertySupplier = propertySupplier; | |||||
} | |||||
/** | |||||
* Listen to the {@link SentinelProperty} for cluster {@link FlowRule}s. | |||||
* The property is the source of cluster {@link FlowRule}s for a specific namespace. | |||||
* | |||||
* @param namespace namespace to register | |||||
*/ | |||||
public static void register2Property(String namespace) { | |||||
AssertUtil.notEmpty(namespace, "namespace cannot be empty"); | |||||
if (propertySupplier == null) { | |||||
RecordLog.warn( | |||||
"[ClusterFlowRuleManager] Cluster flow property supplier is absent, cannot register property"); | |||||
return; | |||||
} | |||||
SentinelProperty<List<FlowRule>> property = propertySupplier.apply(namespace); | |||||
if (property == null) { | |||||
RecordLog.warn( | |||||
"[ClusterFlowRuleManager] Wrong created property from cluster flow property supplier, ignoring"); | |||||
return; | |||||
} | |||||
synchronized (UPDATE_LOCK) { | |||||
RecordLog.info("[ClusterFlowRuleManager] Registering new property to cluster flow rule manager" | |||||
+ " for namespace <{0}>", namespace); | |||||
registerPropertyInternal(namespace, property); | |||||
} | |||||
} | |||||
/** | |||||
* Listen to the {@link SentinelProperty} for cluster {@link FlowRule}s if current property for namespace is absent. | |||||
* The property is the source of cluster {@link FlowRule}s for a specific namespace. | |||||
* | |||||
* @param namespace namespace to register | |||||
*/ | |||||
public static void registerPropertyIfAbsent(String namespace) { | |||||
AssertUtil.notEmpty(namespace, "namespace cannot be empty"); | |||||
if (!PROPERTY_MAP.containsKey(namespace)) { | |||||
synchronized (UPDATE_LOCK) { | |||||
if (!PROPERTY_MAP.containsKey(namespace)) { | |||||
register2Property(namespace); | |||||
} | |||||
} | |||||
} | |||||
} | |||||
private static void registerPropertyInternal(/*@NonNull*/ String namespace, /*@Valid*/ | |||||
SentinelProperty<List<FlowRule>> property) { | |||||
NamespaceFlowProperty<FlowRule> oldProperty = PROPERTY_MAP.get(namespace); | |||||
if (oldProperty != null) { | |||||
oldProperty.getProperty().removeListener(oldProperty.getListener()); | |||||
} | |||||
PropertyListener<List<FlowRule>> listener = new FlowRulePropertyListener(namespace); | |||||
property.addListener(listener); | |||||
PROPERTY_MAP.put(namespace, new NamespaceFlowProperty<>(namespace, property, listener)); | |||||
Set<Long> flowIdSet = NAMESPACE_FLOW_ID_MAP.get(namespace); | |||||
if (flowIdSet == null) { | |||||
resetNamespaceFlowIdMapFor(namespace); | |||||
} | |||||
} | |||||
public static void removeProperty(String namespace) { | |||||
AssertUtil.notEmpty(namespace, "namespace cannot be empty"); | |||||
synchronized (UPDATE_LOCK) { | |||||
NamespaceFlowProperty<FlowRule> property = PROPERTY_MAP.get(namespace); | |||||
if (property != null) { | |||||
property.getProperty().removeListener(property.getListener()); | |||||
PROPERTY_MAP.remove(namespace); | |||||
} | |||||
RecordLog.info("[ClusterFlowRuleManager] Removing property from cluster flow rule manager" | |||||
+ " for namespace <{0}>", namespace); | |||||
} | |||||
} | |||||
private static void removePropertyListeners() { | |||||
for (NamespaceFlowProperty<FlowRule> property : PROPERTY_MAP.values()) { | |||||
property.getProperty().removeListener(property.getListener()); | |||||
} | |||||
} | |||||
private static void restorePropertyListeners() { | |||||
for (NamespaceFlowProperty<FlowRule> p : PROPERTY_MAP.values()) { | |||||
p.getProperty().removeListener(p.getListener()); | |||||
p.getProperty().addListener(p.getListener()); | |||||
} | |||||
} | |||||
public static FlowRule getFlowRuleById(Long id) { | |||||
if (!ClusterRuleUtil.validId(id)) { | |||||
return null; | |||||
} | |||||
return FLOW_RULES.get(id); | |||||
} | |||||
private static void resetNamespaceFlowIdMapFor(/*@Valid*/ String namespace) { | |||||
NAMESPACE_FLOW_ID_MAP.put(namespace, new HashSet<Long>()); | |||||
} | |||||
private static void clearAndResetRulesFor(/*@Valid*/ String namespace) { | |||||
Set<Long> flowIdSet = NAMESPACE_FLOW_ID_MAP.get(namespace); | |||||
if (flowIdSet != null && !flowIdSet.isEmpty()) { | |||||
for (Long flowId : flowIdSet) { | |||||
FLOW_RULES.remove(flowId); | |||||
FLOW_NAMESPACE_MAP.remove(flowId); | |||||
} | |||||
flowIdSet.clear(); | |||||
} else { | |||||
resetNamespaceFlowIdMapFor(namespace); | |||||
} | |||||
} | |||||
private static void clearAndResetRulesConditional(/*@Valid*/ String namespace, Predicate<Long> predicate) { | |||||
Set<Long> oldIdSet = NAMESPACE_FLOW_ID_MAP.get(namespace); | |||||
if (oldIdSet != null && !oldIdSet.isEmpty()) { | |||||
for (Long flowId : oldIdSet) { | |||||
if (predicate.test(flowId)) { | |||||
FLOW_RULES.remove(flowId); | |||||
FLOW_NAMESPACE_MAP.remove(flowId); | |||||
ClusterMetricStatistics.removeMetric(flowId); | |||||
} | |||||
} | |||||
oldIdSet.clear(); | |||||
} | |||||
} | |||||
/** | |||||
* Get connected count for associated namespace of given {@code flowId}. | |||||
* | |||||
* @param flowId unique flow ID | |||||
* @return connected count | |||||
*/ | |||||
public static int getConnectedCount(long flowId) { | |||||
if (flowId <= 0) { | |||||
return 0; | |||||
} | |||||
String namespace = FLOW_NAMESPACE_MAP.get(flowId); | |||||
if (namespace == null) { | |||||
return 0; | |||||
} | |||||
return ConnectionManager.getConnectedCount(namespace); | |||||
} | |||||
private static void applyClusterFlowRule(List<FlowRule> list, /*@Valid*/ String namespace) { | |||||
if (list == null || list.isEmpty()) { | |||||
clearAndResetRulesFor(namespace); | |||||
return; | |||||
} | |||||
final ConcurrentHashMap<Long, FlowRule> ruleMap = new ConcurrentHashMap<>(); | |||||
Set<Long> flowIdSet = new HashSet<>(); | |||||
for (FlowRule rule : list) { | |||||
if (!rule.isClusterMode()) { | |||||
continue; | |||||
} | |||||
if (!FlowRuleUtil.isValidRule(rule)) { | |||||
RecordLog.warn( | |||||
"[ClusterFlowRuleManager] Ignoring invalid flow rule when loading new flow rules: " + rule); | |||||
continue; | |||||
} | |||||
if (StringUtil.isBlank(rule.getLimitApp())) { | |||||
rule.setLimitApp(RuleConstant.LIMIT_APP_DEFAULT); | |||||
} | |||||
// Flow id should not be null after filtered. | |||||
Long flowId = rule.getClusterConfig().getFlowId(); | |||||
if (flowId == null) { | |||||
continue; | |||||
} | |||||
ruleMap.put(flowId, rule); | |||||
FLOW_NAMESPACE_MAP.put(flowId, namespace); | |||||
flowIdSet.add(flowId); | |||||
// Prepare cluster metric from valid flow ID. | |||||
ClusterMetricStatistics.putMetricIfAbsent(flowId, | |||||
new ClusterMetric(ClusterServerConfigManager.getSampleCount(), | |||||
ClusterServerConfigManager.getIntervalMs())); | |||||
} | |||||
// Cleanup unused cluster metrics. | |||||
clearAndResetRulesConditional(namespace, new Predicate<Long>() { | |||||
@Override | |||||
public boolean test(Long flowId) { | |||||
return !ruleMap.containsKey(flowId); | |||||
} | |||||
}); | |||||
FLOW_RULES.putAll(ruleMap); | |||||
NAMESPACE_FLOW_ID_MAP.put(namespace, flowIdSet); | |||||
} | |||||
private static final class FlowRulePropertyListener implements PropertyListener<List<FlowRule>> { | |||||
private final String namespace; | |||||
public FlowRulePropertyListener(String namespace) { | |||||
this.namespace = namespace; | |||||
} | |||||
@Override | |||||
public synchronized void configUpdate(List<FlowRule> conf) { | |||||
applyClusterFlowRule(conf, namespace); | |||||
RecordLog.info("[ClusterFlowRuleManager] Cluster flow rules received for namespace <{0}>: {1}", | |||||
namespace, FLOW_RULES); | |||||
} | |||||
@Override | |||||
public synchronized void configLoad(List<FlowRule> conf) { | |||||
applyClusterFlowRule(conf, namespace); | |||||
RecordLog.info("[ClusterFlowRuleManager] Cluster flow rules loaded for namespace <{0}>: {1}", | |||||
namespace, FLOW_RULES); | |||||
} | |||||
} | |||||
private ClusterFlowRuleManager() {} | |||||
} |
@@ -0,0 +1,307 @@ | |||||
/* | |||||
* Copyright 1999-2018 Alibaba Group Holding Ltd. | |||||
* | |||||
* Licensed under the Apache License, Version 2.0 (the "License"); | |||||
* you may not use this file except in compliance with the License. | |||||
* You may obtain a copy of the License at | |||||
* | |||||
* http://www.apache.org/licenses/LICENSE-2.0 | |||||
* | |||||
* Unless required by applicable law or agreed to in writing, software | |||||
* distributed under the License is distributed on an "AS IS" BASIS, | |||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |||||
* See the License for the specific language governing permissions and | |||||
* limitations under the License. | |||||
*/ | |||||
package com.alibaba.csp.sentinel.cluster.flow.rule; | |||||
import java.util.HashSet; | |||||
import java.util.List; | |||||
import java.util.Map; | |||||
import java.util.Set; | |||||
import java.util.concurrent.ConcurrentHashMap; | |||||
import com.alibaba.csp.sentinel.cluster.flow.statistic.ClusterParamMetricStatistics; | |||||
import com.alibaba.csp.sentinel.cluster.flow.statistic.metric.ClusterParamMetric; | |||||
import com.alibaba.csp.sentinel.cluster.server.ServerConstants; | |||||
import com.alibaba.csp.sentinel.cluster.server.config.ClusterServerConfigManager; | |||||
import com.alibaba.csp.sentinel.cluster.server.connection.ConnectionManager; | |||||
import com.alibaba.csp.sentinel.cluster.server.util.ClusterRuleUtil; | |||||
import com.alibaba.csp.sentinel.log.RecordLog; | |||||
import com.alibaba.csp.sentinel.property.DynamicSentinelProperty; | |||||
import com.alibaba.csp.sentinel.property.PropertyListener; | |||||
import com.alibaba.csp.sentinel.property.SentinelProperty; | |||||
import com.alibaba.csp.sentinel.slots.block.RuleConstant; | |||||
import com.alibaba.csp.sentinel.slots.block.flow.param.ParamFlowRule; | |||||
import com.alibaba.csp.sentinel.slots.block.flow.param.ParamFlowRuleUtil; | |||||
import com.alibaba.csp.sentinel.util.AssertUtil; | |||||
import com.alibaba.csp.sentinel.util.StringUtil; | |||||
import com.alibaba.csp.sentinel.util.function.Function; | |||||
import com.alibaba.csp.sentinel.util.function.Predicate; | |||||
/** | |||||
* Manager for cluster parameter flow rules. | |||||
* | |||||
* @author Eric Zhao | |||||
* @since 1.4.0 | |||||
*/ | |||||
public final class ClusterParamFlowRuleManager { | |||||
/** | |||||
* The default cluster parameter flow rule property supplier that creates a new | |||||
* dynamic property for a specific namespace to manually do rule management. | |||||
*/ | |||||
public static final Function<String, SentinelProperty<List<ParamFlowRule>>> DEFAULT_PROPERTY_SUPPLIER = | |||||
new Function<String, SentinelProperty<List<ParamFlowRule>>>() { | |||||
@Override | |||||
public SentinelProperty<List<ParamFlowRule>> apply(String namespace) { | |||||
return new DynamicSentinelProperty<>(); | |||||
} | |||||
}; | |||||
/** | |||||
* (id, clusterParamRule) | |||||
*/ | |||||
private static final Map<Long, ParamFlowRule> PARAM_RULES = new ConcurrentHashMap<>(); | |||||
/** | |||||
* (namespace, [flowId...]) | |||||
*/ | |||||
private static final Map<String, Set<Long>> NAMESPACE_FLOW_ID_MAP = new ConcurrentHashMap<>(); | |||||
/** | |||||
* (flowId, namespace) | |||||
*/ | |||||
private static final Map<Long, String> FLOW_NAMESPACE_MAP = new ConcurrentHashMap<>(); | |||||
/** | |||||
* (namespace, property-listener wrapper) | |||||
*/ | |||||
private static final Map<String, NamespaceFlowProperty<ParamFlowRule>> PROPERTY_MAP = new ConcurrentHashMap<>(); | |||||
/** | |||||
* Cluster parameter flow rule property supplier for a specific namespace. | |||||
*/ | |||||
private static volatile Function<String, SentinelProperty<List<ParamFlowRule>>> propertySupplier | |||||
= DEFAULT_PROPERTY_SUPPLIER; | |||||
private static final Object UPDATE_LOCK = new Object(); | |||||
static { | |||||
initDefaultProperty(); | |||||
} | |||||
private static void initDefaultProperty() { | |||||
SentinelProperty<List<ParamFlowRule>> defaultProperty = new DynamicSentinelProperty<>(); | |||||
String defaultNamespace = ServerConstants.DEFAULT_NAMESPACE; | |||||
registerPropertyInternal(defaultNamespace, defaultProperty); | |||||
} | |||||
public static void setPropertySupplier( | |||||
Function<String, SentinelProperty<List<ParamFlowRule>>> propertySupplier) { | |||||
ClusterParamFlowRuleManager.propertySupplier = propertySupplier; | |||||
} | |||||
/** | |||||
* Listen to the {@link SentinelProperty} for cluster {@link ParamFlowRule}s. | |||||
* The property is the source of cluster {@link ParamFlowRule}s for a specific namespace. | |||||
* | |||||
* @param namespace namespace to register | |||||
*/ | |||||
public static void register2Property(String namespace) { | |||||
AssertUtil.notEmpty(namespace, "namespace cannot be empty"); | |||||
if (propertySupplier == null) { | |||||
RecordLog.warn( | |||||
"[ClusterParamFlowRuleManager] Cluster param rule property supplier is absent, cannot register " | |||||
+ "property"); | |||||
return; | |||||
} | |||||
SentinelProperty<List<ParamFlowRule>> property = propertySupplier.apply(namespace); | |||||
if (property == null) { | |||||
RecordLog.warn( | |||||
"[ClusterParamFlowRuleManager] Wrong created property from cluster param rule property supplier, " | |||||
+ "ignoring"); | |||||
return; | |||||
} | |||||
synchronized (UPDATE_LOCK) { | |||||
RecordLog.info("[ClusterParamFlowRuleManager] Registering new property to cluster param rule manager" | |||||
+ " for namespace <{0}>", namespace); | |||||
registerPropertyInternal(namespace, property); | |||||
} | |||||
} | |||||
public static void registerPropertyIfAbsent(String namespace) { | |||||
AssertUtil.notEmpty(namespace, "namespace cannot be empty"); | |||||
if (!PROPERTY_MAP.containsKey(namespace)) { | |||||
synchronized (UPDATE_LOCK) { | |||||
if (!PROPERTY_MAP.containsKey(namespace)) { | |||||
register2Property(namespace); | |||||
} | |||||
} | |||||
} | |||||
} | |||||
private static void registerPropertyInternal(/*@NonNull*/ String namespace, /*@Valid*/ | |||||
SentinelProperty<List<ParamFlowRule>> property) { | |||||
NamespaceFlowProperty<ParamFlowRule> oldProperty = PROPERTY_MAP.get(namespace); | |||||
if (oldProperty != null) { | |||||
oldProperty.getProperty().removeListener(oldProperty.getListener()); | |||||
} | |||||
PropertyListener<List<ParamFlowRule>> listener = new ParamRulePropertyListener(namespace); | |||||
property.addListener(listener); | |||||
PROPERTY_MAP.put(namespace, new NamespaceFlowProperty<>(namespace, property, listener)); | |||||
Set<Long> flowIdSet = NAMESPACE_FLOW_ID_MAP.get(namespace); | |||||
if (flowIdSet == null) { | |||||
resetNamespaceFlowIdMapFor(namespace); | |||||
} | |||||
} | |||||
public static void removeProperty(String namespace) { | |||||
AssertUtil.notEmpty(namespace, "namespace cannot be empty"); | |||||
synchronized (UPDATE_LOCK) { | |||||
NamespaceFlowProperty<ParamFlowRule> property = PROPERTY_MAP.get(namespace); | |||||
if (property != null) { | |||||
property.getProperty().removeListener(property.getListener()); | |||||
PROPERTY_MAP.remove(namespace); | |||||
} | |||||
RecordLog.info("[ClusterParamFlowRuleManager] Removing property from cluster flow rule manager" | |||||
+ " for namespace <{0}>", namespace); | |||||
} | |||||
} | |||||
private static void removePropertyListeners() { | |||||
for (NamespaceFlowProperty<ParamFlowRule> property : PROPERTY_MAP.values()) { | |||||
property.getProperty().removeListener(property.getListener()); | |||||
} | |||||
} | |||||
private static void restorePropertyListeners() { | |||||
for (NamespaceFlowProperty<ParamFlowRule> p : PROPERTY_MAP.values()) { | |||||
p.getProperty().removeListener(p.getListener()); | |||||
p.getProperty().addListener(p.getListener()); | |||||
} | |||||
} | |||||
private static void resetNamespaceFlowIdMapFor(/*@Valid*/ String namespace) { | |||||
NAMESPACE_FLOW_ID_MAP.put(namespace, new HashSet<Long>()); | |||||
} | |||||
private static void clearAndResetRulesFor(/*@Valid*/ String namespace) { | |||||
Set<Long> flowIdSet = NAMESPACE_FLOW_ID_MAP.get(namespace); | |||||
if (flowIdSet != null && !flowIdSet.isEmpty()) { | |||||
for (Long flowId : flowIdSet) { | |||||
PARAM_RULES.remove(flowId); | |||||
FLOW_NAMESPACE_MAP.remove(flowId); | |||||
} | |||||
flowIdSet.clear(); | |||||
} else { | |||||
resetNamespaceFlowIdMapFor(namespace); | |||||
} | |||||
} | |||||
private static void clearAndResetRulesConditional(/*@Valid*/ String namespace, Predicate<Long> predicate) { | |||||
Set<Long> oldIdSet = NAMESPACE_FLOW_ID_MAP.get(namespace); | |||||
if (oldIdSet != null && !oldIdSet.isEmpty()) { | |||||
for (Long flowId : oldIdSet) { | |||||
if (predicate.test(flowId)) { | |||||
PARAM_RULES.remove(flowId); | |||||
FLOW_NAMESPACE_MAP.remove(flowId); | |||||
ClusterParamMetricStatistics.removeMetric(flowId); | |||||
} | |||||
} | |||||
oldIdSet.clear(); | |||||
} | |||||
} | |||||
public static ParamFlowRule getParamRuleById(Long id) { | |||||
if (!ClusterRuleUtil.validId(id)) { | |||||
return null; | |||||
} | |||||
return PARAM_RULES.get(id); | |||||
} | |||||
public static int getConnectedCount(long flowId) { | |||||
if (flowId <= 0) { | |||||
return 0; | |||||
} | |||||
String namespace = FLOW_NAMESPACE_MAP.get(flowId); | |||||
if (namespace == null) { | |||||
return 0; | |||||
} | |||||
return ConnectionManager.getConnectedCount(namespace); | |||||
} | |||||
private static class ParamRulePropertyListener implements PropertyListener<List<ParamFlowRule>> { | |||||
private final String namespace; | |||||
public ParamRulePropertyListener(String namespace) { | |||||
this.namespace = namespace; | |||||
} | |||||
@Override | |||||
public void configLoad(List<ParamFlowRule> conf) { | |||||
applyClusterParamRules(conf, namespace); | |||||
RecordLog.info("[ClusterParamFlowRuleManager] Cluster parameter rules loaded for namespace <{0}>: {1}", | |||||
namespace, PARAM_RULES); | |||||
} | |||||
@Override | |||||
public void configUpdate(List<ParamFlowRule> conf) { | |||||
applyClusterParamRules(conf, namespace); | |||||
RecordLog.info("[ClusterParamFlowRuleManager] Cluster parameter rules received for namespace <{0}>: {1}", | |||||
namespace, PARAM_RULES); | |||||
} | |||||
} | |||||
private static void applyClusterParamRules(List<ParamFlowRule> list, /*@Valid*/ String namespace) { | |||||
if (list == null || list.isEmpty()) { | |||||
clearAndResetRulesFor(namespace); | |||||
return; | |||||
} | |||||
final ConcurrentHashMap<Long, ParamFlowRule> ruleMap = new ConcurrentHashMap<>(); | |||||
Set<Long> flowIdSet = new HashSet<>(); | |||||
for (ParamFlowRule rule : list) { | |||||
if (!rule.isClusterMode()) { | |||||
continue; | |||||
} | |||||
if (!ParamFlowRuleUtil.isValidRule(rule)) { | |||||
RecordLog.warn( | |||||
"[ClusterParamFlowRuleManager] Ignoring invalid param flow rule when loading new flow rules: " | |||||
+ rule); | |||||
continue; | |||||
} | |||||
if (StringUtil.isBlank(rule.getLimitApp())) { | |||||
rule.setLimitApp(RuleConstant.LIMIT_APP_DEFAULT); | |||||
} | |||||
ParamFlowRuleUtil.fillExceptionFlowItems(rule); | |||||
// Flow id should not be null after filtered. | |||||
Long flowId = rule.getClusterConfig().getFlowId(); | |||||
if (flowId == null) { | |||||
continue; | |||||
} | |||||
ruleMap.put(flowId, rule); | |||||
FLOW_NAMESPACE_MAP.put(flowId, namespace); | |||||
flowIdSet.add(flowId); | |||||
// Prepare cluster parameter metric from valid rule ID. | |||||
ClusterParamMetricStatistics.putMetricIfAbsent(flowId, | |||||
new ClusterParamMetric(ClusterServerConfigManager.getSampleCount(), | |||||
ClusterServerConfigManager.getIntervalMs())); | |||||
} | |||||
// Cleanup unused cluster parameter metrics. | |||||
clearAndResetRulesConditional(namespace, new Predicate<Long>() { | |||||
@Override | |||||
public boolean test(Long flowId) { | |||||
return !ruleMap.containsKey(flowId); | |||||
} | |||||
}); | |||||
PARAM_RULES.putAll(ruleMap); | |||||
NAMESPACE_FLOW_ID_MAP.put(namespace, flowIdSet); | |||||
} | |||||
private ClusterParamFlowRuleManager() {} | |||||
} |
@@ -0,0 +1,56 @@ | |||||
/* | |||||
* Copyright 1999-2018 Alibaba Group Holding Ltd. | |||||
* | |||||
* Licensed under the Apache License, Version 2.0 (the "License"); | |||||
* you may not use this file except in compliance with the License. | |||||
* You may obtain a copy of the License at | |||||
* | |||||
* http://www.apache.org/licenses/LICENSE-2.0 | |||||
* | |||||
* Unless required by applicable law or agreed to in writing, software | |||||
* distributed under the License is distributed on an "AS IS" BASIS, | |||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |||||
* See the License for the specific language governing permissions and | |||||
* limitations under the License. | |||||
*/ | |||||
package com.alibaba.csp.sentinel.cluster.flow.rule; | |||||
import java.util.List; | |||||
import com.alibaba.csp.sentinel.property.PropertyListener; | |||||
import com.alibaba.csp.sentinel.property.SentinelProperty; | |||||
/** | |||||
* A property wrapper for list of rules of a given namespace. | |||||
* This is useful for auto-management of the property and listener. | |||||
* | |||||
* @param <T> type of the rule | |||||
* @author Eric Zhao | |||||
* @since 1.4.0 | |||||
*/ | |||||
class NamespaceFlowProperty<T> { | |||||
private final String namespace; | |||||
private final SentinelProperty<List<T>> property; | |||||
private final PropertyListener<List<T>> listener; | |||||
public NamespaceFlowProperty(String namespace, | |||||
SentinelProperty<List<T>> property, | |||||
PropertyListener<List<T>> listener) { | |||||
this.namespace = namespace; | |||||
this.property = property; | |||||
this.listener = listener; | |||||
} | |||||
public SentinelProperty<List<T>> getProperty() { | |||||
return property; | |||||
} | |||||
public String getNamespace() { | |||||
return namespace; | |||||
} | |||||
public PropertyListener<List<T>> getListener() { | |||||
return listener; | |||||
} | |||||
} |
@@ -16,9 +16,11 @@ | |||||
package com.alibaba.csp.sentinel.cluster.flow.statistic; | package com.alibaba.csp.sentinel.cluster.flow.statistic; | ||||
import java.util.Map; | import java.util.Map; | ||||
import java.util.Set; | |||||
import java.util.concurrent.ConcurrentHashMap; | import java.util.concurrent.ConcurrentHashMap; | ||||
import com.alibaba.csp.sentinel.cluster.flow.statistic.metric.ClusterMetric; | import com.alibaba.csp.sentinel.cluster.flow.statistic.metric.ClusterMetric; | ||||
import com.alibaba.csp.sentinel.cluster.server.config.ClusterServerConfigManager; | |||||
import com.alibaba.csp.sentinel.util.AssertUtil; | import com.alibaba.csp.sentinel.util.AssertUtil; | ||||
/** | /** | ||||
@@ -55,5 +57,13 @@ public final class ClusterMetricStatistics { | |||||
return METRIC_MAP.get(id); | return METRIC_MAP.get(id); | ||||
} | } | ||||
public static void resetFlowMetrics() { | |||||
Set<Long> keySet = METRIC_MAP.keySet(); | |||||
for (Long id : keySet) { | |||||
METRIC_MAP.put(id, new ClusterMetric(ClusterServerConfigManager.getSampleCount(), | |||||
ClusterServerConfigManager.getIntervalMs())); | |||||
} | |||||
} | |||||
private ClusterMetricStatistics() {} | private ClusterMetricStatistics() {} | ||||
} | } |
@@ -16,9 +16,11 @@ | |||||
package com.alibaba.csp.sentinel.cluster.flow.statistic; | package com.alibaba.csp.sentinel.cluster.flow.statistic; | ||||
import java.util.Map; | import java.util.Map; | ||||
import java.util.Set; | |||||
import java.util.concurrent.ConcurrentHashMap; | import java.util.concurrent.ConcurrentHashMap; | ||||
import com.alibaba.csp.sentinel.cluster.flow.statistic.metric.ClusterParamMetric; | import com.alibaba.csp.sentinel.cluster.flow.statistic.metric.ClusterParamMetric; | ||||
import com.alibaba.csp.sentinel.cluster.server.config.ClusterServerConfigManager; | |||||
import com.alibaba.csp.sentinel.util.AssertUtil; | import com.alibaba.csp.sentinel.util.AssertUtil; | ||||
/** | /** | ||||
@@ -55,5 +57,13 @@ public final class ClusterParamMetricStatistics { | |||||
return METRIC_MAP.get(id); | return METRIC_MAP.get(id); | ||||
} | } | ||||
public static void resetFlowMetrics() { | |||||
Set<Long> keySet = METRIC_MAP.keySet(); | |||||
for (Long id : keySet) { | |||||
METRIC_MAP.put(id, new ClusterParamMetric(ClusterServerConfigManager.getSampleCount(), | |||||
ClusterServerConfigManager.getIntervalMs())); | |||||
} | |||||
} | |||||
private ClusterParamMetricStatistics() {} | private ClusterParamMetricStatistics() {} | ||||
} | } |
@@ -17,6 +17,7 @@ package com.alibaba.csp.sentinel.cluster.flow.statistic.data; | |||||
/** | /** | ||||
* @author Eric Zhao | * @author Eric Zhao | ||||
* @since 1.4.0 | |||||
*/ | */ | ||||
public enum ClusterFlowEvent { | public enum ClusterFlowEvent { | ||||
@@ -36,7 +37,16 @@ public enum ClusterFlowEvent { | |||||
* Token request (from client) blocked. | * Token request (from client) blocked. | ||||
*/ | */ | ||||
BLOCK_REQUEST, | BLOCK_REQUEST, | ||||
/** | |||||
* Pass (pre-occupy incoming buckets). | |||||
*/ | |||||
OCCUPIED_PASS, | OCCUPIED_PASS, | ||||
/** | |||||
* Block (pre-occupy incoming buckets failed). | |||||
*/ | |||||
OCCUPIED_BLOCK, | OCCUPIED_BLOCK, | ||||
/** | |||||
* Waiting due to flow shaping or for next bucket tick. | |||||
*/ | |||||
WAITING | WAITING | ||||
} | } |
@@ -19,6 +19,7 @@ import com.alibaba.csp.sentinel.slots.statistic.base.LongAdder; | |||||
/** | /** | ||||
* @author Eric Zhao | * @author Eric Zhao | ||||
* @since 1.4.0 | |||||
*/ | */ | ||||
public class ClusterMetricBucket { | public class ClusterMetricBucket { | ||||
@@ -19,6 +19,7 @@ import java.util.List; | |||||
import com.alibaba.csp.sentinel.cluster.flow.statistic.data.ClusterFlowEvent; | import com.alibaba.csp.sentinel.cluster.flow.statistic.data.ClusterFlowEvent; | ||||
import com.alibaba.csp.sentinel.cluster.flow.statistic.data.ClusterMetricBucket; | import com.alibaba.csp.sentinel.cluster.flow.statistic.data.ClusterMetricBucket; | ||||
import com.alibaba.csp.sentinel.util.AssertUtil; | |||||
/** | /** | ||||
* @author Eric Zhao | * @author Eric Zhao | ||||
@@ -28,8 +29,12 @@ public class ClusterMetric { | |||||
private final ClusterMetricLeapArray metric; | private final ClusterMetricLeapArray metric; | ||||
public ClusterMetric(int windowLengthInMs, int intervalInSec) { | |||||
this.metric = new ClusterMetricLeapArray(windowLengthInMs, intervalInSec); | |||||
public ClusterMetric(int sampleCount, int intervalInMs) { | |||||
AssertUtil.isTrue(sampleCount > 0, "sampleCount should be positive"); | |||||
AssertUtil.isTrue(intervalInMs > 0, "interval should be positive"); | |||||
AssertUtil.isTrue(intervalInMs % sampleCount == 0, "time span needs to be evenly divided"); | |||||
int windowLengthInMs = intervalInMs / sampleCount; | |||||
this.metric = new ClusterMetricLeapArray(windowLengthInMs, intervalInMs); | |||||
} | } | ||||
public void add(ClusterFlowEvent event, long count) { | public void add(ClusterFlowEvent event, long count) { | ||||
@@ -40,6 +45,12 @@ public class ClusterMetric { | |||||
return metric.currentWindow().value().get(event); | return metric.currentWindow().value().get(event); | ||||
} | } | ||||
/** | |||||
* Get total sum for provided event in {@code intervalInSec}. | |||||
* | |||||
* @param event event to calculate | |||||
* @return total sum for event | |||||
*/ | |||||
public long getSum(ClusterFlowEvent event) { | public long getSum(ClusterFlowEvent event) { | ||||
metric.currentWindow(); | metric.currentWindow(); | ||||
long sum = 0; | long sum = 0; | ||||
@@ -51,11 +62,18 @@ public class ClusterMetric { | |||||
return sum; | return sum; | ||||
} | } | ||||
/** | |||||
* Get average count for provided event per second. | |||||
* | |||||
* @param event event to calculate | |||||
* @return average count per second for event | |||||
*/ | |||||
public double getAvg(ClusterFlowEvent event) { | public double getAvg(ClusterFlowEvent event) { | ||||
return getSum(event) / metric.getIntervalInSecond(); | return getSum(event) / metric.getIntervalInSecond(); | ||||
} | } | ||||
/** | /** | ||||
* Try to pre-occupy upcoming buckets. | |||||
* | * | ||||
* @return time to wait for next bucket (in ms); 0 if cannot occupy next buckets | * @return time to wait for next bucket (in ms); 0 if cannot occupy next buckets | ||||
*/ | */ | ||||
@@ -70,7 +88,13 @@ public class ClusterMetric { | |||||
} | } | ||||
private boolean canOccupy(ClusterFlowEvent event, int acquireCount, double latestQps, double threshold) { | private boolean canOccupy(ClusterFlowEvent event, int acquireCount, double latestQps, double threshold) { | ||||
// TODO | |||||
return metric.getOccupiedCount(event) + latestQps + acquireCount /*- xxx*/ <= threshold; | |||||
long headPass = metric.getFirstCountOfWindow(event); | |||||
long occupiedCount = metric.getOccupiedCount(event); | |||||
// bucket to occupy (= incoming bucket) | |||||
// ↓ | |||||
// | head bucket | | | | current bucket | | |||||
// +-------------+----+----+----+----------- ----+ | |||||
// (headPass) | |||||
return latestQps + (acquireCount + occupiedCount) - headPass <= threshold; | |||||
} | } | ||||
} | } |
@@ -31,13 +31,13 @@ public class ClusterMetricLeapArray extends LeapArray<ClusterMetricBucket> { | |||||
private boolean hasOccupied = false; | private boolean hasOccupied = false; | ||||
/** | /** | ||||
* The total bucket count is: {@link #sampleCount} = intervalInSec * 1000 / windowLengthInMs. | |||||
* The total bucket count is: {@link #sampleCount} = intervalInMs / windowLengthInMs. | |||||
* | * | ||||
* @param windowLengthInMs a single window bucket's time length in milliseconds. | * @param windowLengthInMs a single window bucket's time length in milliseconds. | ||||
* @param intervalInSec the total time span of this {@link LeapArray} in seconds. | |||||
* @param intervalInMs the total time span of this {@link LeapArray} in milliseconds. | |||||
*/ | */ | ||||
public ClusterMetricLeapArray(int windowLengthInMs, int intervalInSec) { | |||||
super(windowLengthInMs, intervalInSec); | |||||
public ClusterMetricLeapArray(int windowLengthInMs, int intervalInMs) { | |||||
super(windowLengthInMs, intervalInMs / 1000); | |||||
ClusterFlowEvent[] events = ClusterFlowEvent.values(); | ClusterFlowEvent[] events = ClusterFlowEvent.values(); | ||||
this.occupyCounter = new LongAdder[events.length]; | this.occupyCounter = new LongAdder[events.length]; | ||||
for (ClusterFlowEvent event : events) { | for (ClusterFlowEvent event : events) { | ||||
@@ -84,4 +84,15 @@ public class ClusterMetricLeapArray extends LeapArray<ClusterMetricBucket> { | |||||
public long getOccupiedCount(ClusterFlowEvent event) { | public long getOccupiedCount(ClusterFlowEvent event) { | ||||
return occupyCounter[event.ordinal()].sum(); | return occupyCounter[event.ordinal()].sum(); | ||||
} | } | ||||
public long getFirstCountOfWindow(ClusterFlowEvent event) { | |||||
if (event == null) { | |||||
return 0; | |||||
} | |||||
WindowWrap<ClusterMetricBucket> windowWrap = getValidHead(); | |||||
if (windowWrap == null) { | |||||
return 0; | |||||
} | |||||
return windowWrap.value().get(event); | |||||
} | |||||
} | } |
@@ -19,20 +19,28 @@ import java.util.List; | |||||
import com.alibaba.csp.sentinel.slots.statistic.base.LongAdder; | import com.alibaba.csp.sentinel.slots.statistic.base.LongAdder; | ||||
import com.alibaba.csp.sentinel.slots.statistic.cache.CacheMap; | import com.alibaba.csp.sentinel.slots.statistic.cache.CacheMap; | ||||
import com.alibaba.csp.sentinel.util.AssertUtil; | |||||
/** | /** | ||||
* @author Eric Zhao | * @author Eric Zhao | ||||
* @since 1.4.0 | |||||
*/ | */ | ||||
public class ClusterParamMetric { | public class ClusterParamMetric { | ||||
public static final int DEFAULT_CLUSTER_MAX_CAPACITY = 4000; | |||||
private final ClusterParameterLeapArray<LongAdder> metric; | private final ClusterParameterLeapArray<LongAdder> metric; | ||||
public ClusterParamMetric(int windowLengthInMs, int intervalInSec) { | |||||
this.metric = new ClusterParameterLeapArray<>(windowLengthInMs, intervalInSec); | |||||
public ClusterParamMetric(int sampleCount, int intervalInMs) { | |||||
this(sampleCount, intervalInMs, DEFAULT_CLUSTER_MAX_CAPACITY); | |||||
} | } | ||||
public ClusterParamMetric(int windowLengthInMs, int intervalInSec, int maxCapacity) { | |||||
this.metric = new ClusterParameterLeapArray<>(windowLengthInMs, intervalInSec, maxCapacity); | |||||
public ClusterParamMetric(int sampleCount, int intervalInMs, int maxCapacity) { | |||||
AssertUtil.isTrue(sampleCount > 0, "sampleCount should be positive"); | |||||
AssertUtil.isTrue(intervalInMs > 0, "interval should be positive"); | |||||
AssertUtil.isTrue(intervalInMs % sampleCount == 0, "time span needs to be evenly divided"); | |||||
int windowLengthInMs = intervalInMs / sampleCount; | |||||
this.metric = new ClusterParameterLeapArray<>(windowLengthInMs, intervalInMs, maxCapacity); | |||||
} | } | ||||
public long getSum(Object value) { | public long getSum(Object value) { | ||||
@@ -45,7 +53,8 @@ public class ClusterParamMetric { | |||||
List<CacheMap<Object, LongAdder>> buckets = metric.values(); | List<CacheMap<Object, LongAdder>> buckets = metric.values(); | ||||
for (CacheMap<Object, LongAdder> bucket : buckets) { | for (CacheMap<Object, LongAdder> bucket : buckets) { | ||||
sum += getCount(bucket.get(value)); | |||||
long count = getCount(bucket.get(value)); | |||||
sum += count; | |||||
} | } | ||||
return sum; | return sum; | ||||
} | } | ||||
@@ -22,20 +22,16 @@ import com.alibaba.csp.sentinel.slots.statistic.cache.ConcurrentLinkedHashMapWra | |||||
import com.alibaba.csp.sentinel.util.AssertUtil; | import com.alibaba.csp.sentinel.util.AssertUtil; | ||||
/** | /** | ||||
* @author Eric Zhao | |||||
* @param <C> counter type | * @param <C> counter type | ||||
* @author Eric Zhao | |||||
* @since 1.4.0 | * @since 1.4.0 | ||||
*/ | */ | ||||
public class ClusterParameterLeapArray<C> extends LeapArray<CacheMap<Object, C>> { | public class ClusterParameterLeapArray<C> extends LeapArray<CacheMap<Object, C>> { | ||||
private final int maxCapacity; | private final int maxCapacity; | ||||
public ClusterParameterLeapArray(int windowLengthInMs, int intervalInSec) { | |||||
this(windowLengthInMs, intervalInSec, DEFAULT_CLUSTER_MAX_CAPACITY); | |||||
} | |||||
public ClusterParameterLeapArray(int windowLengthInMs, int intervalInSec, int maxCapacity) { | |||||
super(windowLengthInMs, intervalInSec); | |||||
public ClusterParameterLeapArray(int windowLengthInMs, int intervalInMs, int maxCapacity) { | |||||
super(windowLengthInMs, intervalInMs / 1000); | |||||
AssertUtil.isTrue(maxCapacity > 0, "maxCapacity of LRU map should be positive"); | AssertUtil.isTrue(maxCapacity > 0, "maxCapacity of LRU map should be positive"); | ||||
this.maxCapacity = maxCapacity; | this.maxCapacity = maxCapacity; | ||||
} | } | ||||
@@ -46,11 +42,11 @@ public class ClusterParameterLeapArray<C> extends LeapArray<CacheMap<Object, C>> | |||||
} | } | ||||
@Override | @Override | ||||
protected WindowWrap<CacheMap<Object, C>> resetWindowTo(WindowWrap<CacheMap<Object, C>> w, | |||||
long startTime) { | |||||
protected WindowWrap<CacheMap<Object, C>> resetWindowTo(WindowWrap<CacheMap<Object, C>> w, long startTime) { | |||||
w.resetTo(startTime); | |||||
w.value().clear(); | w.value().clear(); | ||||
return w; | return w; | ||||
} | } | ||||
public static final int DEFAULT_CLUSTER_MAX_CAPACITY = 4000; | |||||
} | } |
@@ -0,0 +1,61 @@ | |||||
/* | |||||
* Copyright 1999-2018 Alibaba Group Holding Ltd. | |||||
* | |||||
* Licensed under the Apache License, Version 2.0 (the "License"); | |||||
* you may not use this file except in compliance with the License. | |||||
* You may obtain a copy of the License at | |||||
* | |||||
* http://www.apache.org/licenses/LICENSE-2.0 | |||||
* | |||||
* Unless required by applicable law or agreed to in writing, software | |||||
* distributed under the License is distributed on an "AS IS" BASIS, | |||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |||||
* See the License for the specific language governing permissions and | |||||
* limitations under the License. | |||||
*/ | |||||
package com.alibaba.csp.sentinel.cluster.server; | |||||
import java.util.Collection; | |||||
import com.alibaba.csp.sentinel.cluster.TokenResult; | |||||
import com.alibaba.csp.sentinel.cluster.TokenResultStatus; | |||||
import com.alibaba.csp.sentinel.cluster.TokenService; | |||||
/** | |||||
* Default embedded token server in Sentinel which wraps the {@link SentinelDefaultTokenServer} | |||||
* and the {@link TokenService} from SPI provider. | |||||
* | |||||
* @author Eric Zhao | |||||
* @since 1.4.0 | |||||
*/ | |||||
public class DefaultEmbeddedTokenServer implements EmbeddedClusterTokenServer { | |||||
private final TokenService tokenService = TokenServiceProvider.getService(); | |||||
private final ClusterTokenServer server = new SentinelDefaultTokenServer(true); | |||||
@Override | |||||
public void start() throws Exception { | |||||
server.start(); | |||||
} | |||||
@Override | |||||
public void stop() throws Exception { | |||||
server.stop(); | |||||
} | |||||
@Override | |||||
public TokenResult requestToken(Long ruleId, int acquireCount, boolean prioritized) { | |||||
if (tokenService != null) { | |||||
return tokenService.requestToken(ruleId, acquireCount, prioritized); | |||||
} | |||||
return new TokenResult(TokenResultStatus.FAIL); | |||||
} | |||||
@Override | |||||
public TokenResult requestParamToken(Long ruleId, int acquireCount, Collection<Object> params) { | |||||
if (tokenService != null) { | |||||
return tokenService.requestParamToken(ruleId, acquireCount, params); | |||||
} | |||||
return new TokenResult(TokenResultStatus.FAIL); | |||||
} | |||||
} |
@@ -46,13 +46,16 @@ import static com.alibaba.csp.sentinel.cluster.server.ServerConstants.*; | |||||
/** | /** | ||||
* @author Eric Zhao | * @author Eric Zhao | ||||
* @since 1.4.0 | |||||
*/ | */ | ||||
public class NettyTransportServer implements ClusterTokenServer { | public class NettyTransportServer implements ClusterTokenServer { | ||||
private static final int DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt( | |||||
"io.netty.eventLoopThreads", Runtime.getRuntime().availableProcessors() * 2)); | |||||
private static final int DEFAULT_EVENT_LOOP_THREADS = Math.max(1, | |||||
SystemPropertyUtil.getInt("io.netty.eventLoopThreads", Runtime.getRuntime().availableProcessors() * 2)); | |||||
private static final int MAX_RETRY_TIMES = 3; | |||||
private static final int RETRY_SLEEP_MS = 1000; | |||||
private final int port = 11111; | |||||
private final int port; | |||||
private NioEventLoopGroup bossGroup; | private NioEventLoopGroup bossGroup; | ||||
private NioEventLoopGroup workerGroup; | private NioEventLoopGroup workerGroup; | ||||
@@ -62,6 +65,10 @@ public class NettyTransportServer implements ClusterTokenServer { | |||||
private final AtomicInteger currentState = new AtomicInteger(SERVER_STATUS_OFF); | private final AtomicInteger currentState = new AtomicInteger(SERVER_STATUS_OFF); | ||||
private final AtomicInteger failedTimes = new AtomicInteger(0); | private final AtomicInteger failedTimes = new AtomicInteger(0); | ||||
public NettyTransportServer(int port) { | |||||
this.port = port; | |||||
} | |||||
@Override | @Override | ||||
public void start() { | public void start() { | ||||
if (!currentState.compareAndSet(SERVER_STATUS_OFF, SERVER_STATUS_STARTING)) { | if (!currentState.compareAndSet(SERVER_STATUS_OFF, SERVER_STATUS_STARTING)) { | ||||
@@ -92,23 +99,27 @@ public class NettyTransportServer implements ClusterTokenServer { | |||||
.childOption(ChannelOption.SO_TIMEOUT, 10) | .childOption(ChannelOption.SO_TIMEOUT, 10) | ||||
.childOption(ChannelOption.TCP_NODELAY, true) | .childOption(ChannelOption.TCP_NODELAY, true) | ||||
.childOption(ChannelOption.SO_RCVBUF, 32 * 1024); | .childOption(ChannelOption.SO_RCVBUF, 32 * 1024); | ||||
b.bind(Integer.valueOf(port)).addListener(new GenericFutureListener<ChannelFuture>() { | |||||
b.bind(port).addListener(new GenericFutureListener<ChannelFuture>() { | |||||
@Override | @Override | ||||
public void operationComplete(ChannelFuture future) { | public void operationComplete(ChannelFuture future) { | ||||
if (future.cause() != null) { | if (future.cause() != null) { | ||||
RecordLog.info("Token server start failed", future.cause()); | |||||
RecordLog.info("[NettyTransportServer] Token server start failed (port=" + port + ")", | |||||
future.cause()); | |||||
currentState.compareAndSet(SERVER_STATUS_STARTING, SERVER_STATUS_OFF); | currentState.compareAndSet(SERVER_STATUS_STARTING, SERVER_STATUS_OFF); | ||||
//try { | |||||
// Thread.sleep((failStartTimes.get() + 1) * 1000); | |||||
// start(); | |||||
//} catch (Throwable e) { | |||||
// RecordLog.info("Fail to start token server:", e); | |||||
//} | |||||
int failCount = failedTimes.incrementAndGet(); | |||||
if (failCount > MAX_RETRY_TIMES) { | |||||
return; | |||||
} | |||||
try { | |||||
Thread.sleep(failCount * RETRY_SLEEP_MS); | |||||
start(); | |||||
} catch (Throwable e) { | |||||
RecordLog.info("[NettyTransportServer] Failed to start token server when retrying", e); | |||||
} | |||||
} else { | } else { | ||||
RecordLog.info("Token server start success"); | |||||
RecordLog.info("[NettyTransportServer] Token server started success at port " + port); | |||||
currentState.compareAndSet(SERVER_STATUS_STARTING, SERVER_STATUS_STARTED); | currentState.compareAndSet(SERVER_STATUS_STARTING, SERVER_STATUS_STARTED); | ||||
//failStartTimes.set(0); | |||||
} | } | ||||
} | } | ||||
}); | }); | ||||
@@ -119,9 +130,9 @@ public class NettyTransportServer implements ClusterTokenServer { | |||||
// If still initializing, wait for ready. | // If still initializing, wait for ready. | ||||
while (currentState.get() == SERVER_STATUS_STARTING) { | while (currentState.get() == SERVER_STATUS_STARTING) { | ||||
try { | try { | ||||
Thread.sleep(1000); | |||||
Thread.sleep(500); | |||||
} catch (InterruptedException e) { | } catch (InterruptedException e) { | ||||
e.printStackTrace(); | |||||
// Ignore. | |||||
} | } | ||||
} | } | ||||
@@ -133,9 +144,9 @@ public class NettyTransportServer implements ClusterTokenServer { | |||||
failedTimes.set(0); | failedTimes.set(0); | ||||
RecordLog.info("Token server stopped"); | |||||
RecordLog.info("[NettyTransportServer] Sentinel token server stopped"); | |||||
} catch (Exception ex) { | } catch (Exception ex) { | ||||
RecordLog.warn("Failed to stop token server", ex); | |||||
RecordLog.warn("[NettyTransportServer] Failed to stop token server (port=" + port + ")", ex); | |||||
} | } | ||||
} | } | ||||
} | } | ||||
@@ -0,0 +1,142 @@ | |||||
/* | |||||
* Copyright 1999-2018 Alibaba Group Holding Ltd. | |||||
* | |||||
* Licensed under the Apache License, Version 2.0 (the "License"); | |||||
* you may not use this file except in compliance with the License. | |||||
* You may obtain a copy of the License at | |||||
* | |||||
* http://www.apache.org/licenses/LICENSE-2.0 | |||||
* | |||||
* Unless required by applicable law or agreed to in writing, software | |||||
* distributed under the License is distributed on an "AS IS" BASIS, | |||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |||||
* See the License for the specific language governing permissions and | |||||
* limitations under the License. | |||||
*/ | |||||
package com.alibaba.csp.sentinel.cluster.server; | |||||
import java.util.concurrent.atomic.AtomicBoolean; | |||||
import com.alibaba.csp.sentinel.cluster.registry.ConfigSupplierRegistry; | |||||
import com.alibaba.csp.sentinel.cluster.server.config.ClusterServerConfigManager; | |||||
import com.alibaba.csp.sentinel.cluster.server.config.ServerTransportConfig; | |||||
import com.alibaba.csp.sentinel.cluster.server.config.ServerTransportConfigObserver; | |||||
import com.alibaba.csp.sentinel.cluster.server.connection.ConnectionManager; | |||||
import com.alibaba.csp.sentinel.init.InitExecutor; | |||||
import com.alibaba.csp.sentinel.log.RecordLog; | |||||
import com.alibaba.csp.sentinel.util.HostNameUtil; | |||||
import com.alibaba.csp.sentinel.util.StringUtil; | |||||
/** | |||||
* @author Eric Zhao | |||||
* @since 1.4.0 | |||||
*/ | |||||
public class SentinelDefaultTokenServer implements ClusterTokenServer { | |||||
private final boolean embedded; | |||||
private ClusterTokenServer server; | |||||
private int port; | |||||
private final AtomicBoolean shouldStart = new AtomicBoolean(false); | |||||
static { | |||||
InitExecutor.doInit(); | |||||
} | |||||
public SentinelDefaultTokenServer() { | |||||
this(false); | |||||
} | |||||
public SentinelDefaultTokenServer(boolean embedded) { | |||||
this.embedded = embedded; | |||||
ClusterServerConfigManager.addTransportConfigChangeObserver(new ServerTransportConfigObserver() { | |||||
@Override | |||||
public void onTransportConfigChange(ServerTransportConfig config) { | |||||
changeServerConfig(config); | |||||
} | |||||
}); | |||||
initNewServer(); | |||||
} | |||||
private void initNewServer() { | |||||
if (server != null) { | |||||
return; | |||||
} | |||||
int port = ClusterServerConfigManager.getPort(); | |||||
if (port > 0) { | |||||
this.server = new NettyTransportServer(port); | |||||
this.port = port; | |||||
} | |||||
} | |||||
private synchronized void changeServerConfig(ServerTransportConfig config) { | |||||
if (config == null || config.getPort() <= 0) { | |||||
return; | |||||
} | |||||
int newPort = config.getPort(); | |||||
if (newPort == port) { | |||||
return; | |||||
} | |||||
try { | |||||
if (server != null) { | |||||
stopServerIfStarted(); | |||||
} | |||||
this.server = new NettyTransportServer(newPort); | |||||
this.port = newPort; | |||||
startServerIfScheduled(); | |||||
} catch (Exception ex) { | |||||
RecordLog.warn("[SentinelDefaultTokenServer] Failed to apply modification to token server", ex); | |||||
} | |||||
} | |||||
private void startServerIfScheduled() throws Exception { | |||||
if (shouldStart.get()) { | |||||
if (server != null) { | |||||
server.start(); | |||||
if (embedded) { | |||||
RecordLog.info("[SentinelDefaultTokenServer] Running in embedded mode"); | |||||
handleEmbeddedStart(); | |||||
} | |||||
} | |||||
} | |||||
} | |||||
private void stopServerIfStarted() throws Exception { | |||||
if (shouldStart.get()) { | |||||
if (server != null) { | |||||
server.stop(); | |||||
if (embedded) { | |||||
handleEmbeddedStop(); | |||||
} | |||||
} | |||||
} | |||||
} | |||||
private void handleEmbeddedStop() { | |||||
String namespace = ConfigSupplierRegistry.getNamespaceSupplier().get(); | |||||
if (StringUtil.isNotEmpty(namespace)) { | |||||
ConnectionManager.removeConnection(namespace, HostNameUtil.getIp()); | |||||
} | |||||
} | |||||
private void handleEmbeddedStart() { | |||||
String namespace = ConfigSupplierRegistry.getNamespaceSupplier().get(); | |||||
if (StringUtil.isNotEmpty(namespace)) { | |||||
ConnectionManager.addConnection(namespace, HostNameUtil.getIp()); | |||||
} | |||||
} | |||||
@Override | |||||
public void start() throws Exception { | |||||
if (shouldStart.compareAndSet(false, true)) { | |||||
startServerIfScheduled(); | |||||
} | |||||
} | |||||
@Override | |||||
public void stop() throws Exception { | |||||
if (shouldStart.compareAndSet(true, false)) { | |||||
stopServerIfStarted(); | |||||
} | |||||
} | |||||
} |
@@ -52,6 +52,7 @@ public final class TokenServiceProvider { | |||||
} | } | ||||
if (hasOther) { | if (hasOther) { | ||||
// Pick the first. | |||||
service = list.get(0); | service = list.get(0); | ||||
} else { | } else { | ||||
// No custom token service, using default. | // No custom token service, using default. | ||||
@@ -54,7 +54,6 @@ public class DefaultRequestEntityDecoder implements RequestEntityDecoder<ByteBuf | |||||
if (source.readableBytes() == 0) { | if (source.readableBytes() == 0) { | ||||
data = null; | data = null; | ||||
} else { | } else { | ||||
// TODO: handle decode error here. | |||||
data = dataDecoder.decode(source); | data = dataDecoder.decode(source); | ||||
} | } | ||||
@@ -44,7 +44,6 @@ public class FlowRequestDataDecoder implements EntityDecoder<ByteBuf, FlowReques | |||||
} | } | ||||
return requestData; | return requestData; | ||||
} | } | ||||
// TODO: handle null here. | |||||
return null; | return null; | ||||
} | } | ||||
} | } |
@@ -25,7 +25,9 @@ import com.alibaba.csp.sentinel.cluster.request.data.ParamFlowRequestData; | |||||
import io.netty.buffer.ByteBuf; | import io.netty.buffer.ByteBuf; | ||||
/** | /** | ||||
* @author jialiang.linjl | |||||
* @author Eric Zhao | * @author Eric Zhao | ||||
* @since 1.4.0 | |||||
*/ | */ | ||||
public class ParamFlowRequestDataDecoder implements EntityDecoder<ByteBuf, ParamFlowRequestData> { | public class ParamFlowRequestDataDecoder implements EntityDecoder<ByteBuf, ParamFlowRequestData> { | ||||
@@ -38,7 +40,6 @@ public class ParamFlowRequestDataDecoder implements EntityDecoder<ByteBuf, Param | |||||
int amount = source.readInt(); | int amount = source.readInt(); | ||||
if (amount > 0) { | if (amount > 0) { | ||||
// TODO: should check rules exist here? | |||||
List<Object> params = new ArrayList<>(amount); | List<Object> params = new ArrayList<>(amount); | ||||
for (int i = 0; i < amount; i++) { | for (int i = 0; i < amount; i++) { | ||||
decodeParam(source, params); | decodeParam(source, params); | ||||
@@ -48,7 +49,6 @@ public class ParamFlowRequestDataDecoder implements EntityDecoder<ByteBuf, Param | |||||
return requestData; | return requestData; | ||||
} | } | ||||
} | } | ||||
// TODO: handle null here. | |||||
return null; | return null; | ||||
} | } | ||||
@@ -0,0 +1,40 @@ | |||||
/* | |||||
* Copyright 1999-2018 Alibaba Group Holding Ltd. | |||||
* | |||||
* Licensed under the Apache License, Version 2.0 (the "License"); | |||||
* you may not use this file except in compliance with the License. | |||||
* You may obtain a copy of the License at | |||||
* | |||||
* http://www.apache.org/licenses/LICENSE-2.0 | |||||
* | |||||
* Unless required by applicable law or agreed to in writing, software | |||||
* distributed under the License is distributed on an "AS IS" BASIS, | |||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |||||
* See the License for the specific language governing permissions and | |||||
* limitations under the License. | |||||
*/ | |||||
package com.alibaba.csp.sentinel.cluster.server.codec.data; | |||||
import com.alibaba.csp.sentinel.cluster.codec.EntityDecoder; | |||||
import io.netty.buffer.ByteBuf; | |||||
/** | |||||
* @author Eric Zhao | |||||
* @since 1.4.0 | |||||
*/ | |||||
public class PingRequestDataDecoder implements EntityDecoder<ByteBuf, String> { | |||||
@Override | |||||
public String decode(ByteBuf source) { | |||||
if (source.readableBytes() >= 4) { | |||||
int length = source.readInt(); | |||||
if (length > 0 && source.readableBytes() > 0) { | |||||
byte[] bytes = new byte[length]; | |||||
source.readBytes(bytes); | |||||
return new String(bytes); | |||||
} | |||||
} | |||||
return null; | |||||
} | |||||
} |
@@ -0,0 +1,36 @@ | |||||
/* | |||||
* Copyright 1999-2018 Alibaba Group Holding Ltd. | |||||
* | |||||
* Licensed under the Apache License, Version 2.0 (the "License"); | |||||
* you may not use this file except in compliance with the License. | |||||
* You may obtain a copy of the License at | |||||
* | |||||
* http://www.apache.org/licenses/LICENSE-2.0 | |||||
* | |||||
* Unless required by applicable law or agreed to in writing, software | |||||
* distributed under the License is distributed on an "AS IS" BASIS, | |||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |||||
* See the License for the specific language governing permissions and | |||||
* limitations under the License. | |||||
*/ | |||||
package com.alibaba.csp.sentinel.cluster.server.codec.data; | |||||
import com.alibaba.csp.sentinel.cluster.codec.EntityWriter; | |||||
import com.alibaba.csp.sentinel.util.StringUtil; | |||||
import io.netty.buffer.ByteBuf; | |||||
/** | |||||
* @author Eric Zhao | |||||
* @since 1.4.0 | |||||
*/ | |||||
public class PingResponseDataWriter implements EntityWriter<Integer, ByteBuf> { | |||||
@Override | |||||
public void writeTo(Integer entity, ByteBuf target) { | |||||
if (entity == null || target == null) { | |||||
return; | |||||
} | |||||
target.writeByte(entity); | |||||
} | |||||
} |
@@ -36,7 +36,6 @@ public class NettyRequestDecoder extends ByteToMessageDecoder { | |||||
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { | protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { | ||||
RequestEntityDecoder<ByteBuf, Request> requestDecoder = ServerEntityCodecProvider.getRequestEntityDecoder(); | RequestEntityDecoder<ByteBuf, Request> requestDecoder = ServerEntityCodecProvider.getRequestEntityDecoder(); | ||||
if (requestDecoder == null) { | if (requestDecoder == null) { | ||||
// TODO: may need to throw exception? | |||||
RecordLog.warn("[NettyRequestDecoder] Cannot resolve the global request entity decoder, " | RecordLog.warn("[NettyRequestDecoder] Cannot resolve the global request entity decoder, " | ||||
+ "dropping the request"); | + "dropping the request"); | ||||
return; | return; | ||||
@@ -15,22 +15,317 @@ | |||||
*/ | */ | ||||
package com.alibaba.csp.sentinel.cluster.server.config; | package com.alibaba.csp.sentinel.cluster.server.config; | ||||
import java.util.ArrayList; | |||||
import java.util.Collections; | |||||
import java.util.List; | |||||
import java.util.Map; | |||||
import java.util.Set; | |||||
import java.util.concurrent.ConcurrentHashMap; | |||||
import com.alibaba.csp.sentinel.cluster.flow.rule.ClusterFlowRuleManager; | |||||
import com.alibaba.csp.sentinel.cluster.flow.rule.ClusterParamFlowRuleManager; | |||||
import com.alibaba.csp.sentinel.cluster.flow.statistic.ClusterMetricStatistics; | |||||
import com.alibaba.csp.sentinel.cluster.flow.statistic.ClusterParamMetricStatistics; | |||||
import com.alibaba.csp.sentinel.cluster.server.ServerConstants; | |||||
import com.alibaba.csp.sentinel.log.RecordLog; | |||||
import com.alibaba.csp.sentinel.property.DynamicSentinelProperty; | |||||
import com.alibaba.csp.sentinel.property.PropertyListener; | |||||
import com.alibaba.csp.sentinel.property.SentinelProperty; | |||||
import com.alibaba.csp.sentinel.util.AssertUtil; | |||||
/** | /** | ||||
* @author Eric Zhao | * @author Eric Zhao | ||||
* @since 1.4.0 | |||||
*/ | */ | ||||
public final class ClusterServerConfigManager { | public final class ClusterServerConfigManager { | ||||
private static final int DEFAULT_PORT = 8730; | |||||
private static final int DEFAULT_IDLE_SECONDS = 600; | |||||
/** | |||||
* Server global transport and scope config. | |||||
*/ | |||||
private static volatile int port = ServerTransportConfig.DEFAULT_PORT; | |||||
private static volatile int idleSeconds = ServerTransportConfig.DEFAULT_IDLE_SECONDS; | |||||
private static volatile Set<String> namespaceSet = Collections.singleton(ServerConstants.DEFAULT_NAMESPACE); | |||||
/** | |||||
* Server global flow config. | |||||
*/ | |||||
private static volatile double exceedCount = ServerFlowConfig.DEFAULT_EXCEED_COUNT; | |||||
private static volatile double maxOccupyRatio = ServerFlowConfig.DEFAULT_MAX_OCCUPY_RATIO; | |||||
private static volatile int intervalMs = ServerFlowConfig.DEFAULT_INTERVAL_MS; | |||||
private static volatile int sampleCount = ServerFlowConfig.DEFAULT_SAMPLE_COUNT; | |||||
/** | |||||
* Namespace-specific flow config for token server. | |||||
* Format: (namespace, config). | |||||
*/ | |||||
private static final Map<String, ServerFlowConfig> NAMESPACE_CONF = new ConcurrentHashMap<>(); | |||||
private static final List<ServerTransportConfigObserver> TRANSPORT_CONFIG_OBSERVERS = new ArrayList<>(); | |||||
/** | |||||
* Property for cluster server global transport configuration. | |||||
*/ | |||||
private static SentinelProperty<ServerTransportConfig> transportConfigProperty = new DynamicSentinelProperty<>(); | |||||
/** | |||||
* Property for cluster server namespace set. | |||||
*/ | |||||
private static SentinelProperty<Set<String>> namespaceSetProperty = new DynamicSentinelProperty<>(); | |||||
/** | |||||
* Property for cluster server global flow control configuration. | |||||
*/ | |||||
private static SentinelProperty<ServerFlowConfig> globalFlowProperty = new DynamicSentinelProperty<>(); | |||||
private static final PropertyListener<ServerTransportConfig> TRANSPORT_PROPERTY_LISTENER | |||||
= new ServerGlobalTransportPropertyListener(); | |||||
private static final PropertyListener<ServerFlowConfig> GLOBAL_FLOW_PROPERTY_LISTENER | |||||
= new ServerGlobalFlowPropertyListener(); | |||||
private static final PropertyListener<Set<String>> NAMESPACE_SET_PROPERTY_LISTENER | |||||
= new ServerNamespaceSetPropertyListener(); | |||||
static { | |||||
transportConfigProperty.addListener(TRANSPORT_PROPERTY_LISTENER); | |||||
globalFlowProperty.addListener(GLOBAL_FLOW_PROPERTY_LISTENER); | |||||
namespaceSetProperty.addListener(NAMESPACE_SET_PROPERTY_LISTENER); | |||||
} | |||||
public static void registerNamespaceSetProperty(SentinelProperty<Set<String>> property) { | |||||
synchronized (NAMESPACE_SET_PROPERTY_LISTENER) { | |||||
RecordLog.info( | |||||
"[ClusterServerConfigManager] Registering new namespace set dynamic property to Sentinel server " | |||||
+ "config manager"); | |||||
namespaceSetProperty.removeListener(NAMESPACE_SET_PROPERTY_LISTENER); | |||||
property.addListener(NAMESPACE_SET_PROPERTY_LISTENER); | |||||
namespaceSetProperty = property; | |||||
} | |||||
} | |||||
public static void registerServerTransportProperty(SentinelProperty<ServerTransportConfig> property) { | |||||
synchronized (TRANSPORT_PROPERTY_LISTENER) { | |||||
RecordLog.info( | |||||
"[ClusterServerConfigManager] Registering new server transport dynamic property to Sentinel server " | |||||
+ "config manager"); | |||||
transportConfigProperty.removeListener(TRANSPORT_PROPERTY_LISTENER); | |||||
property.addListener(TRANSPORT_PROPERTY_LISTENER); | |||||
transportConfigProperty = property; | |||||
} | |||||
} | |||||
private static class ServerNamespaceSetPropertyListener implements PropertyListener<Set<String>> { | |||||
@Override | |||||
public synchronized void configLoad(Set<String> set) { | |||||
if (set == null || set.isEmpty()) { | |||||
RecordLog.warn("[ClusterServerConfigManager] WARN: empty initial server namespace set"); | |||||
return; | |||||
} | |||||
applyNamespaceSetChange(set); | |||||
} | |||||
@Override | |||||
public synchronized void configUpdate(Set<String> set) { | |||||
// TODO: should debounce? | |||||
applyNamespaceSetChange(set); | |||||
} | |||||
} | |||||
private static void applyNamespaceSetChange(Set<String> newSet) { | |||||
if (newSet == null) { | |||||
return; | |||||
} | |||||
RecordLog.info("[ClusterServerConfigManager] Server namespace set will be update to: " + newSet); | |||||
if (newSet.isEmpty()) { | |||||
ClusterServerConfigManager.namespaceSet = Collections.singleton(ServerConstants.DEFAULT_NAMESPACE); | |||||
return; | |||||
} | |||||
newSet.add(ServerConstants.DEFAULT_NAMESPACE); | |||||
Set<String> oldSet = ClusterServerConfigManager.namespaceSet; | |||||
if (oldSet != null && !oldSet.isEmpty()) { | |||||
for (String ns : oldSet) { | |||||
if (!newSet.contains(ns)) { | |||||
ClusterFlowRuleManager.removeProperty(ns); | |||||
ClusterParamFlowRuleManager.removeProperty(ns); | |||||
} | |||||
} | |||||
} | |||||
ClusterServerConfigManager.namespaceSet = newSet; | |||||
for (String ns : newSet) { | |||||
ClusterFlowRuleManager.registerPropertyIfAbsent(ns); | |||||
ClusterParamFlowRuleManager.registerPropertyIfAbsent(ns); | |||||
} | |||||
} | |||||
private static class ServerGlobalTransportPropertyListener implements PropertyListener<ServerTransportConfig> { | |||||
@Override | |||||
public void configLoad(ServerTransportConfig config) { | |||||
if (config == null) { | |||||
RecordLog.warn("[ClusterServerConfigManager] Empty initial server transport config"); | |||||
return; | |||||
} | |||||
applyConfig(config); | |||||
} | |||||
@Override | |||||
public void configUpdate(ServerTransportConfig config) { | |||||
applyConfig(config); | |||||
} | |||||
private synchronized void applyConfig(ServerTransportConfig config) { | |||||
if (!isValidTransportConfig(config)) { | |||||
RecordLog.warn( | |||||
"[ClusterServerConfigManager] Invalid cluster server transport config, ignoring: " + config); | |||||
return; | |||||
} | |||||
RecordLog.info("[ClusterServerConfigManager] Updating new server transport config: " + config); | |||||
if (config.getIdleSeconds() != idleSeconds) { | |||||
idleSeconds = config.getIdleSeconds(); | |||||
} | |||||
updateTokenServer(config); | |||||
} | |||||
} | |||||
private static class ServerGlobalFlowPropertyListener implements PropertyListener<ServerFlowConfig> { | |||||
@Override | |||||
public void configUpdate(ServerFlowConfig config) { | |||||
applyGlobalFlowConfig(config); | |||||
} | |||||
@Override | |||||
public void configLoad(ServerFlowConfig config) { | |||||
applyGlobalFlowConfig(config); | |||||
} | |||||
} | |||||
private static synchronized void applyGlobalFlowConfig(ServerFlowConfig config) { | |||||
if (!isValidFlowConfig(config)) { | |||||
RecordLog.warn( | |||||
"[ClusterServerConfigManager] Invalid cluster server global flow config, ignoring: " + config); | |||||
return; | |||||
} | |||||
RecordLog.info("[ClusterServerConfigManager] Updating new server global flow config: " + config); | |||||
if (config.getExceedCount() != exceedCount) { | |||||
exceedCount = config.getExceedCount(); | |||||
} | |||||
if (config.getMaxOccupyRatio() != maxOccupyRatio) { | |||||
maxOccupyRatio = config.getMaxOccupyRatio(); | |||||
} | |||||
int newIntervalMs = config.getIntervalMs(); | |||||
int newSampleCount = config.getSampleCount(); | |||||
if (newIntervalMs != intervalMs || newSampleCount != sampleCount) { | |||||
if (newIntervalMs <= 0 || newSampleCount <= 0 || newIntervalMs % newSampleCount != 0) { | |||||
RecordLog.warn("[ClusterServerConfigManager] Ignoring invalid flow interval or sample count"); | |||||
} else { | |||||
intervalMs = newIntervalMs; | |||||
sampleCount = newSampleCount; | |||||
// Reset all the metrics. | |||||
ClusterMetricStatistics.resetFlowMetrics(); | |||||
ClusterParamMetricStatistics.resetFlowMetrics(); | |||||
} | |||||
} | |||||
} | |||||
public static void updateTokenServer(ServerTransportConfig config) { | |||||
int newPort = config.getPort(); | |||||
AssertUtil.isTrue(newPort > 0, "token server port should be valid (positive)"); | |||||
if (newPort == port) { | |||||
return; | |||||
} | |||||
ClusterServerConfigManager.port = newPort; | |||||
for (ServerTransportConfigObserver observer : TRANSPORT_CONFIG_OBSERVERS) { | |||||
observer.onTransportConfigChange(config); | |||||
} | |||||
} | |||||
public static boolean isValidTransportConfig(ServerTransportConfig config) { | |||||
return config != null && config.getPort() > 0; | |||||
} | |||||
public static boolean isValidFlowConfig(ServerFlowConfig config) { | |||||
return config != null && config.getMaxOccupyRatio() >= 0 && config.getExceedCount() >= 0; | |||||
} | |||||
public static void addTransportConfigChangeObserver(ServerTransportConfigObserver observer) { | |||||
AssertUtil.notNull(observer, "observer cannot be null"); | |||||
TRANSPORT_CONFIG_OBSERVERS.add(observer); | |||||
} | |||||
public static double getExceedCount(String namespace) { | |||||
AssertUtil.notEmpty(namespace, "namespace cannot be empty"); | |||||
ServerFlowConfig config = NAMESPACE_CONF.get(namespace); | |||||
if (config != null) { | |||||
return config.getExceedCount(); | |||||
} | |||||
return exceedCount; | |||||
} | |||||
public static double getMaxOccupyRatio(String namespace) { | |||||
AssertUtil.notEmpty(namespace, "namespace cannot be empty"); | |||||
ServerFlowConfig config = NAMESPACE_CONF.get(namespace); | |||||
if (config != null) { | |||||
return config.getMaxOccupyRatio(); | |||||
} | |||||
return maxOccupyRatio; | |||||
} | |||||
public static int getIntervalMs(String namespace) { | |||||
AssertUtil.notEmpty(namespace, "namespace cannot be empty"); | |||||
ServerFlowConfig config = NAMESPACE_CONF.get(namespace); | |||||
if (config != null) { | |||||
return config.getIntervalMs(); | |||||
} | |||||
return intervalMs; | |||||
} | |||||
/** | |||||
* Get sample count of provided namespace. | |||||
* | |||||
* @param namespace valid namespace | |||||
* @return the sample count of namespace; if the namespace does not have customized value, use the global value | |||||
*/ | |||||
public static int getSampleCount(String namespace) { | |||||
AssertUtil.notEmpty(namespace, "namespace cannot be empty"); | |||||
ServerFlowConfig config = NAMESPACE_CONF.get(namespace); | |||||
if (config != null) { | |||||
return config.getSampleCount(); | |||||
} | |||||
return sampleCount; | |||||
} | |||||
public static double getExceedCount() { | |||||
return exceedCount; | |||||
} | |||||
public static double getMaxOccupyRatio() { | |||||
return maxOccupyRatio; | |||||
} | |||||
public static Set<String> getNamespaceSet() { | |||||
return namespaceSet; | |||||
} | |||||
public static int getPort() { | |||||
return port; | |||||
} | |||||
public static int getIdleSeconds() { | |||||
return idleSeconds; | |||||
} | |||||
public static volatile int port = DEFAULT_PORT; | |||||
public static int getIntervalMs() { | |||||
return intervalMs; | |||||
} | |||||
public static volatile double exceedCount = 1.0d; | |||||
public static volatile boolean borrowRefEnabled = true; | |||||
public static volatile int idleSeconds = DEFAULT_IDLE_SECONDS; | |||||
public static volatile double maxOccupyRatio = 1.0d; | |||||
public static int getSampleCount() { | |||||
return sampleCount; | |||||
} | |||||
// TODO: implement here. | |||||
public static void setNamespaceSet(Set<String> namespaceSet) { | |||||
applyNamespaceSetChange(namespaceSet); | |||||
} | |||||
private ClusterServerConfigManager() {} | private ClusterServerConfigManager() {} | ||||
} | } |
@@ -0,0 +1,97 @@ | |||||
/* | |||||
* Copyright 1999-2018 Alibaba Group Holding Ltd. | |||||
* | |||||
* Licensed under the Apache License, Version 2.0 (the "License"); | |||||
* you may not use this file except in compliance with the License. | |||||
* You may obtain a copy of the License at | |||||
* | |||||
* http://www.apache.org/licenses/LICENSE-2.0 | |||||
* | |||||
* Unless required by applicable law or agreed to in writing, software | |||||
* distributed under the License is distributed on an "AS IS" BASIS, | |||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |||||
* See the License for the specific language governing permissions and | |||||
* limitations under the License. | |||||
*/ | |||||
package com.alibaba.csp.sentinel.cluster.server.config; | |||||
import com.alibaba.csp.sentinel.cluster.server.ServerConstants; | |||||
/** | |||||
* @author Eric Zhao | |||||
* @since 1.4.0 | |||||
*/ | |||||
public class ServerFlowConfig { | |||||
public static final double DEFAULT_EXCEED_COUNT = 1.0d; | |||||
public static final double DEFAULT_MAX_OCCUPY_RATIO = 1.0d; | |||||
public static final int DEFAULT_INTERVAL_MS = 1000; | |||||
public static final int DEFAULT_SAMPLE_COUNT= 10; | |||||
private final String namespace; | |||||
private double exceedCount = DEFAULT_EXCEED_COUNT; | |||||
private double maxOccupyRatio = DEFAULT_MAX_OCCUPY_RATIO; | |||||
private int intervalMs = DEFAULT_INTERVAL_MS; | |||||
private int sampleCount = DEFAULT_SAMPLE_COUNT; | |||||
public ServerFlowConfig() { | |||||
this(ServerConstants.DEFAULT_NAMESPACE); | |||||
} | |||||
public ServerFlowConfig(String namespace) { | |||||
this.namespace = namespace; | |||||
} | |||||
public String getNamespace() { | |||||
return namespace; | |||||
} | |||||
public double getExceedCount() { | |||||
return exceedCount; | |||||
} | |||||
public ServerFlowConfig setExceedCount(double exceedCount) { | |||||
this.exceedCount = exceedCount; | |||||
return this; | |||||
} | |||||
public double getMaxOccupyRatio() { | |||||
return maxOccupyRatio; | |||||
} | |||||
public ServerFlowConfig setMaxOccupyRatio(double maxOccupyRatio) { | |||||
this.maxOccupyRatio = maxOccupyRatio; | |||||
return this; | |||||
} | |||||
public int getIntervalMs() { | |||||
return intervalMs; | |||||
} | |||||
public ServerFlowConfig setIntervalMs(int intervalMs) { | |||||
this.intervalMs = intervalMs; | |||||
return this; | |||||
} | |||||
public int getSampleCount() { | |||||
return sampleCount; | |||||
} | |||||
public ServerFlowConfig setSampleCount(int sampleCount) { | |||||
this.sampleCount = sampleCount; | |||||
return this; | |||||
} | |||||
@Override | |||||
public String toString() { | |||||
return "ServerFlowConfig{" + | |||||
"namespace='" + namespace + '\'' + | |||||
", exceedCount=" + exceedCount + | |||||
", maxOccupyRatio=" + maxOccupyRatio + | |||||
", intervalMs=" + intervalMs + | |||||
", sampleCount=" + sampleCount + | |||||
'}'; | |||||
} | |||||
} |
@@ -0,0 +1,64 @@ | |||||
/* | |||||
* Copyright 1999-2018 Alibaba Group Holding Ltd. | |||||
* | |||||
* Licensed under the Apache License, Version 2.0 (the "License"); | |||||
* you may not use this file except in compliance with the License. | |||||
* You may obtain a copy of the License at | |||||
* | |||||
* http://www.apache.org/licenses/LICENSE-2.0 | |||||
* | |||||
* Unless required by applicable law or agreed to in writing, software | |||||
* distributed under the License is distributed on an "AS IS" BASIS, | |||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |||||
* See the License for the specific language governing permissions and | |||||
* limitations under the License. | |||||
*/ | |||||
package com.alibaba.csp.sentinel.cluster.server.config; | |||||
/** | |||||
* @author Eric Zhao | |||||
* @since 1.4.0 | |||||
*/ | |||||
public class ServerTransportConfig { | |||||
public static final int DEFAULT_PORT = 8730; | |||||
public static final int DEFAULT_IDLE_SECONDS = 600; | |||||
private int port; | |||||
private int idleSeconds; | |||||
public ServerTransportConfig() { | |||||
this(DEFAULT_PORT, DEFAULT_IDLE_SECONDS); | |||||
} | |||||
public ServerTransportConfig(int port, int idleSeconds) { | |||||
this.port = port; | |||||
this.idleSeconds = idleSeconds; | |||||
} | |||||
public int getPort() { | |||||
return port; | |||||
} | |||||
public ServerTransportConfig setPort(int port) { | |||||
this.port = port; | |||||
return this; | |||||
} | |||||
public int getIdleSeconds() { | |||||
return idleSeconds; | |||||
} | |||||
public ServerTransportConfig setIdleSeconds(int idleSeconds) { | |||||
this.idleSeconds = idleSeconds; | |||||
return this; | |||||
} | |||||
@Override | |||||
public String toString() { | |||||
return "ServerTransportConfig{" + | |||||
"port=" + port + | |||||
", idleSeconds=" + idleSeconds + | |||||
'}'; | |||||
} | |||||
} |
@@ -0,0 +1,30 @@ | |||||
/* | |||||
* Copyright 1999-2018 Alibaba Group Holding Ltd. | |||||
* | |||||
* Licensed under the Apache License, Version 2.0 (the "License"); | |||||
* you may not use this file except in compliance with the License. | |||||
* You may obtain a copy of the License at | |||||
* | |||||
* http://www.apache.org/licenses/LICENSE-2.0 | |||||
* | |||||
* Unless required by applicable law or agreed to in writing, software | |||||
* distributed under the License is distributed on an "AS IS" BASIS, | |||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |||||
* See the License for the specific language governing permissions and | |||||
* limitations under the License. | |||||
*/ | |||||
package com.alibaba.csp.sentinel.cluster.server.config; | |||||
/** | |||||
* @author Eric Zhao | |||||
* @since 1.4.0 | |||||
*/ | |||||
public interface ServerTransportConfigObserver { | |||||
/** | |||||
* Callback on server transport config (e.g. port) change. | |||||
* | |||||
* @param config new server transport config | |||||
*/ | |||||
void onTransportConfigChange(ServerTransportConfig config); | |||||
} |
@@ -20,6 +20,7 @@ import java.net.SocketAddress; | |||||
/** | /** | ||||
* @author xuyue | * @author xuyue | ||||
* @author Eric Zhao | * @author Eric Zhao | ||||
* @since 1.4.0 | |||||
*/ | */ | ||||
public interface Connection extends AutoCloseable { | public interface Connection extends AutoCloseable { | ||||
@@ -0,0 +1,69 @@ | |||||
/* | |||||
* Copyright 1999-2018 Alibaba Group Holding Ltd. | |||||
* | |||||
* Licensed under the Apache License, Version 2.0 (the "License"); | |||||
* you may not use this file except in compliance with the License. | |||||
* You may obtain a copy of the License at | |||||
* | |||||
* http://www.apache.org/licenses/LICENSE-2.0 | |||||
* | |||||
* Unless required by applicable law or agreed to in writing, software | |||||
* distributed under the License is distributed on an "AS IS" BASIS, | |||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |||||
* See the License for the specific language governing permissions and | |||||
* limitations under the License. | |||||
*/ | |||||
package com.alibaba.csp.sentinel.cluster.server.connection; | |||||
import java.util.Objects; | |||||
/** | |||||
* @author Eric Zhao | |||||
* @since 1.4.0 | |||||
*/ | |||||
public class ConnectionDescriptor { | |||||
private String address; | |||||
private String host; | |||||
public String getAddress() { | |||||
return address; | |||||
} | |||||
public ConnectionDescriptor setAddress(String address) { | |||||
this.address = address; | |||||
return this; | |||||
} | |||||
public String getHost() { | |||||
return host; | |||||
} | |||||
public ConnectionDescriptor setHost(String host) { | |||||
this.host = host; | |||||
return this; | |||||
} | |||||
@Override | |||||
public boolean equals(Object o) { | |||||
if (this == o) { return true; } | |||||
if (o == null || getClass() != o.getClass()) { return false; } | |||||
ConnectionDescriptor that = (ConnectionDescriptor)o; | |||||
return Objects.equals(address, that.address); | |||||
} | |||||
@Override | |||||
public int hashCode() { | |||||
return address != null ? address.hashCode() : 0; | |||||
} | |||||
@Override | |||||
public String toString() { | |||||
return "ConnectionDescriptor{" + | |||||
"address='" + address + '\'' + | |||||
", host='" + host + '\'' + | |||||
'}'; | |||||
} | |||||
} |
@@ -15,6 +15,8 @@ | |||||
*/ | */ | ||||
package com.alibaba.csp.sentinel.cluster.server.connection; | package com.alibaba.csp.sentinel.cluster.server.connection; | ||||
import java.util.Collections; | |||||
import java.util.HashSet; | |||||
import java.util.Set; | import java.util.Set; | ||||
import java.util.concurrent.ConcurrentSkipListSet; | import java.util.concurrent.ConcurrentSkipListSet; | ||||
import java.util.concurrent.atomic.AtomicInteger; | import java.util.concurrent.atomic.AtomicInteger; | ||||
@@ -23,16 +25,17 @@ import com.alibaba.csp.sentinel.cluster.server.ServerConstants; | |||||
import com.alibaba.csp.sentinel.util.AssertUtil; | import com.alibaba.csp.sentinel.util.AssertUtil; | ||||
/** | /** | ||||
* The connection group stores connection set for a specific namespace. | |||||
* | |||||
* @author Eric Zhao | * @author Eric Zhao | ||||
* @since 1.4.0 | * @since 1.4.0 | ||||
*/ | */ | ||||
public class ConnectionGroup { | public class ConnectionGroup { | ||||
private String namespace; | |||||
private final String namespace; | |||||
private Set<String> addressSet = new ConcurrentSkipListSet<>(); | |||||
private Set<String> hostSet = new ConcurrentSkipListSet<>(); | |||||
private AtomicInteger connectedCount = new AtomicInteger(); | |||||
private final Set<ConnectionDescriptor> connectionSet = Collections.synchronizedSet(new HashSet<ConnectionDescriptor>()); | |||||
private final AtomicInteger connectedCount = new AtomicInteger(); | |||||
public ConnectionGroup(String namespace) { | public ConnectionGroup(String namespace) { | ||||
AssertUtil.notEmpty(namespace, "namespace cannot be empty"); | AssertUtil.notEmpty(namespace, "namespace cannot be empty"); | ||||
@@ -46,24 +49,26 @@ public class ConnectionGroup { | |||||
public ConnectionGroup addConnection(String address) { | public ConnectionGroup addConnection(String address) { | ||||
AssertUtil.notEmpty(address, "address cannot be empty"); | AssertUtil.notEmpty(address, "address cannot be empty"); | ||||
addressSet.add(address); | |||||
String[] ip = address.split(":"); | String[] ip = address.split(":"); | ||||
String host; | |||||
if (ip != null && ip.length >= 1) { | if (ip != null && ip.length >= 1) { | ||||
hostSet.add(ip[0]); | |||||
host = ip[0]; | |||||
} else { | |||||
host = address; | |||||
} | } | ||||
connectionSet.add(new ConnectionDescriptor().setAddress(address).setHost(host)); | |||||
connectedCount.incrementAndGet(); | connectedCount.incrementAndGet(); | ||||
return this; | return this; | ||||
} | } | ||||
public ConnectionGroup removeConnection(String address) { | public ConnectionGroup removeConnection(String address) { | ||||
AssertUtil.notEmpty(address, "address cannot be empty"); | AssertUtil.notEmpty(address, "address cannot be empty"); | ||||
addressSet.remove(address); | |||||
String[] ip = address.split(":"); | |||||
if (ip != null && ip.length >= 1) { | |||||
hostSet.remove(ip[0]); | |||||
if (connectionSet.remove(new ConnectionDescriptor().setAddress(address))) { | |||||
connectedCount.decrementAndGet(); | |||||
} | } | ||||
connectedCount.decrementAndGet(); | |||||
return this; | return this; | ||||
} | } | ||||
@@ -71,12 +76,8 @@ public class ConnectionGroup { | |||||
return namespace; | return namespace; | ||||
} | } | ||||
public Set<String> getAddressSet() { | |||||
return addressSet; | |||||
} | |||||
public Set<String> getHostSet() { | |||||
return hostSet; | |||||
public Set<ConnectionDescriptor> getConnectionSet() { | |||||
return connectionSet; | |||||
} | } | ||||
public int getConnectedCount() { | public int getConnectedCount() { | ||||
@@ -18,15 +18,89 @@ package com.alibaba.csp.sentinel.cluster.server.connection; | |||||
import java.util.Map; | import java.util.Map; | ||||
import java.util.concurrent.ConcurrentHashMap; | import java.util.concurrent.ConcurrentHashMap; | ||||
import com.alibaba.csp.sentinel.log.RecordLog; | |||||
import com.alibaba.csp.sentinel.util.AssertUtil; | |||||
/** | /** | ||||
* Manager for namespace-scope {@link ConnectionGroup}. | |||||
* | |||||
* @author Eric Zhao | * @author Eric Zhao | ||||
* @since 1.4.0 | * @since 1.4.0 | ||||
*/ | */ | ||||
public final class ConnectionManager { | public final class ConnectionManager { | ||||
/** | |||||
* Connection map (namespace, connection). | |||||
*/ | |||||
private static final Map<String, ConnectionGroup> CONN_MAP = new ConcurrentHashMap<>(); | private static final Map<String, ConnectionGroup> CONN_MAP = new ConcurrentHashMap<>(); | ||||
/** | |||||
* namespace map (address, namespace). | |||||
*/ | |||||
private static final Map<String, String> NAMESPACE_MAP = new ConcurrentHashMap<>(); | |||||
/** | |||||
* Get connected count for specific namespace. | |||||
* | |||||
* @param namespace namespace to check | |||||
* @return connected count for specific namespace | |||||
*/ | |||||
public static int getConnectedCount(String namespace) { | |||||
AssertUtil.notEmpty(namespace, "namespace should not be empty"); | |||||
ConnectionGroup group = CONN_MAP.get(namespace); | |||||
return group == null ? 0 : group.getConnectedCount(); | |||||
} | |||||
public static ConnectionGroup getOrCreateGroup(String namespace) { | |||||
AssertUtil.assertNotBlank(namespace, "namespace should not be empty"); | |||||
ConnectionGroup group = CONN_MAP.get(namespace); | |||||
if (group == null) { | |||||
synchronized (CREATE_LOCK) { | |||||
if (CONN_MAP.get(namespace) == null) { | |||||
group = new ConnectionGroup(namespace); | |||||
CONN_MAP.put(namespace, group); | |||||
} | |||||
} | |||||
} | |||||
return group; | |||||
} | |||||
public static void removeConnection(String address) { | |||||
AssertUtil.assertNotBlank(address, "address should not be empty"); | |||||
String namespace = NAMESPACE_MAP.get(address); | |||||
if (namespace != null) { | |||||
ConnectionGroup group = CONN_MAP.get(namespace); | |||||
if (group == null) { | |||||
return; | |||||
} | |||||
group.removeConnection(address); | |||||
RecordLog.info("[ConnectionManager] Client <{0}> disconnected and removed from namespace <{1}>", address, namespace); | |||||
} | |||||
NAMESPACE_MAP.remove(address); | |||||
} | |||||
public static void removeConnection(String namespace, String address) { | |||||
AssertUtil.assertNotBlank(namespace, "namespace should not be empty"); | |||||
AssertUtil.assertNotBlank(address, "address should not be empty"); | |||||
ConnectionGroup group = CONN_MAP.get(namespace); | |||||
if (group == null) { | |||||
return; | |||||
} | |||||
group.removeConnection(address); | |||||
NAMESPACE_MAP.remove(address); | |||||
RecordLog.info("[ConnectionManager] Client <{0}> disconnected and removed from namespace <{1}>", address, namespace); | |||||
} | |||||
public static ConnectionGroup addConnection(String namespace, String address) { | |||||
AssertUtil.assertNotBlank(namespace, "namespace should not be empty"); | |||||
AssertUtil.assertNotBlank(address, "address should not be empty"); | |||||
ConnectionGroup group = getOrCreateGroup(namespace); | |||||
group.addConnection(address); | |||||
NAMESPACE_MAP.put(address, namespace); | |||||
RecordLog.info("[ConnectionManager] Client <{0}> registered with namespace <{1}>", address, namespace); | |||||
return group; | |||||
} | |||||
private static final Object CREATE_LOCK = new Object(); | |||||
private ConnectionManager() {} | private ConnectionManager() {} | ||||
} | } |
@@ -50,11 +50,6 @@ public class ConnectionPool { | |||||
*/ | */ | ||||
private ScheduledFuture scanTaskFuture = null; | private ScheduledFuture scanTaskFuture = null; | ||||
/** | |||||
* 创建一个connection,并放入连接池中 | |||||
* | |||||
* @param channel | |||||
*/ | |||||
public void createConnection(Channel channel) { | public void createConnection(Channel channel) { | ||||
if (channel != null) { | if (channel != null) { | ||||
Connection connection = new NettyConnection(channel, this); | Connection connection = new NettyConnection(channel, this); | ||||
@@ -83,7 +78,7 @@ public class ConnectionPool { | |||||
* @return formatted key | * @return formatted key | ||||
*/ | */ | ||||
private String getConnectionKey(Channel channel) { | private String getConnectionKey(Channel channel) { | ||||
InetSocketAddress socketAddress = (InetSocketAddress) channel.remoteAddress(); | |||||
InetSocketAddress socketAddress = (InetSocketAddress)channel.remoteAddress(); | |||||
String remoteIp = socketAddress.getAddress().getHostAddress(); | String remoteIp = socketAddress.getAddress().getHostAddress(); | ||||
int remotePort = socketAddress.getPort(); | int remotePort = socketAddress.getPort(); | ||||
return remoteIp + ":" + remotePort; | return remoteIp + ":" + remotePort; | ||||
@@ -93,16 +88,10 @@ public class ConnectionPool { | |||||
return ip + ":" + port; | return ip + ":" + port; | ||||
} | } | ||||
/** | |||||
* 刷新一个连接上的最新read时间 | |||||
* | |||||
* @param channel | |||||
*/ | |||||
public void refreshLastReadTime(Channel channel) { | public void refreshLastReadTime(Channel channel) { | ||||
if (channel != null) { | if (channel != null) { | ||||
String connKey = getConnectionKey(channel); | String connKey = getConnectionKey(channel); | ||||
Connection connection = CONNECTION_MAP.get(connKey); | Connection connection = CONNECTION_MAP.get(connKey); | ||||
//不应该为null,需要处理这种情况吗? | |||||
if (connection != null) { | if (connection != null) { | ||||
connection.refreshLastReadTime(System.currentTimeMillis()); | connection.refreshLastReadTime(System.currentTimeMillis()); | ||||
} | } | ||||
@@ -124,7 +113,7 @@ public class ConnectionPool { | |||||
return connections; | return connections; | ||||
} | } | ||||
public int count(){ | |||||
public int count() { | |||||
return CONNECTION_MAP.size(); | return CONNECTION_MAP.size(); | ||||
} | } | ||||
@@ -141,7 +130,7 @@ public class ConnectionPool { | |||||
public void refreshIdleTask() { | public void refreshIdleTask() { | ||||
if (scanTaskFuture == null || scanTaskFuture.cancel(false)) { | if (scanTaskFuture == null || scanTaskFuture.cancel(false)) { | ||||
startScan(); | startScan(); | ||||
}else { | |||||
} else { | |||||
RecordLog.info("The result of canceling scanTask is error."); | RecordLog.info("The result of canceling scanTask is error."); | ||||
} | } | ||||
} | } | ||||
@@ -22,6 +22,7 @@ import io.netty.channel.Channel; | |||||
/** | /** | ||||
* @author xuyue | * @author xuyue | ||||
* @since 1.4.0 | |||||
*/ | */ | ||||
public class NettyConnection implements Connection { | public class NettyConnection implements Connection { | ||||
@@ -3,15 +3,17 @@ package com.alibaba.csp.sentinel.cluster.server.connection; | |||||
import java.util.List; | import java.util.List; | ||||
import com.alibaba.csp.sentinel.cluster.server.config.ClusterServerConfigManager; | import com.alibaba.csp.sentinel.cluster.server.config.ClusterServerConfigManager; | ||||
import com.alibaba.csp.sentinel.cluster.server.config.ServerTransportConfig; | |||||
import com.alibaba.csp.sentinel.log.RecordLog; | import com.alibaba.csp.sentinel.log.RecordLog; | ||||
/** | /** | ||||
* @author xuyue | * @author xuyue | ||||
* @author Eric Zhao | * @author Eric Zhao | ||||
* @since 1.4.0 | |||||
*/ | */ | ||||
public class ScanIdleConnectionTask implements Runnable { | public class ScanIdleConnectionTask implements Runnable { | ||||
private ConnectionPool connectionPool; | |||||
private final ConnectionPool connectionPool; | |||||
public ScanIdleConnectionTask(ConnectionPool connectionPool) { | public ScanIdleConnectionTask(ConnectionPool connectionPool) { | ||||
this.connectionPool = connectionPool; | this.connectionPool = connectionPool; | ||||
@@ -20,15 +22,15 @@ public class ScanIdleConnectionTask implements Runnable { | |||||
@Override | @Override | ||||
public void run() { | public void run() { | ||||
try { | try { | ||||
int idleSeconds = ClusterServerConfigManager.idleSeconds; | |||||
long idleTime = idleSeconds * 1000; | |||||
if (idleTime < 0) { | |||||
idleTime = 600 * 1000; | |||||
int idleSeconds = ClusterServerConfigManager.getIdleSeconds(); | |||||
long idleTimeMillis = idleSeconds * 1000; | |||||
if (idleTimeMillis < 0) { | |||||
idleTimeMillis = ServerTransportConfig.DEFAULT_IDLE_SECONDS * 1000; | |||||
} | } | ||||
long now = System.currentTimeMillis(); | long now = System.currentTimeMillis(); | ||||
List<Connection> connections = connectionPool.listAllConnection(); | List<Connection> connections = connectionPool.listAllConnection(); | ||||
for (Connection conn : connections) { | for (Connection conn : connections) { | ||||
if ((now - conn.getLastReadTime()) > idleTime) { | |||||
if ((now - conn.getLastReadTime()) > idleTimeMillis) { | |||||
RecordLog.info( | RecordLog.info( | ||||
String.format("[ScanIdleConnectionTask] The connection <%s:%d> has been idle for <%d>s. " | String.format("[ScanIdleConnectionTask] The connection <%s:%d> has been idle for <%d>s. " | ||||
+ "It will be closed now.", conn.getRemoteIP(), conn.getRemotePort(), idleSeconds) | + "It will be closed now.", conn.getRemoteIP(), conn.getRemotePort(), idleSeconds) | ||||
@@ -37,7 +39,7 @@ public class ScanIdleConnectionTask implements Runnable { | |||||
} | } | ||||
} | } | ||||
} catch (Throwable t) { | } catch (Throwable t) { | ||||
// TODO: should log here. | |||||
RecordLog.warn("[ScanIdleConnectionTask] Failed to clean-up idle tasks", t); | |||||
} | } | ||||
} | } | ||||
} | } |
@@ -18,11 +18,12 @@ package com.alibaba.csp.sentinel.cluster.server.handler; | |||||
import com.alibaba.csp.sentinel.cluster.ClusterConstants; | import com.alibaba.csp.sentinel.cluster.ClusterConstants; | ||||
import com.alibaba.csp.sentinel.cluster.request.ClusterRequest; | import com.alibaba.csp.sentinel.cluster.request.ClusterRequest; | ||||
import com.alibaba.csp.sentinel.cluster.response.ClusterResponse; | import com.alibaba.csp.sentinel.cluster.response.ClusterResponse; | ||||
import com.alibaba.csp.sentinel.cluster.response.data.FlowTokenResponseData; | |||||
import com.alibaba.csp.sentinel.cluster.server.connection.ConnectionManager; | |||||
import com.alibaba.csp.sentinel.cluster.server.connection.ConnectionPool; | import com.alibaba.csp.sentinel.cluster.server.connection.ConnectionPool; | ||||
import com.alibaba.csp.sentinel.cluster.server.processor.RequestProcessor; | import com.alibaba.csp.sentinel.cluster.server.processor.RequestProcessor; | ||||
import com.alibaba.csp.sentinel.cluster.server.processor.RequestProcessorRegistry; | |||||
import com.alibaba.csp.sentinel.cluster.server.processor.RequestProcessorProvider; | |||||
import com.alibaba.csp.sentinel.log.RecordLog; | import com.alibaba.csp.sentinel.log.RecordLog; | ||||
import com.alibaba.csp.sentinel.util.StringUtil; | |||||
import io.netty.channel.ChannelHandlerContext; | import io.netty.channel.ChannelHandlerContext; | ||||
import io.netty.channel.ChannelInboundHandlerAdapter; | import io.netty.channel.ChannelInboundHandlerAdapter; | ||||
@@ -35,36 +36,46 @@ import io.netty.channel.ChannelInboundHandlerAdapter; | |||||
*/ | */ | ||||
public class TokenServerHandler extends ChannelInboundHandlerAdapter { | public class TokenServerHandler extends ChannelInboundHandlerAdapter { | ||||
private final ConnectionPool connectionPool; | |||||
private final ConnectionPool globalConnectionPool; | |||||
public TokenServerHandler(ConnectionPool connectionPool) { | |||||
this.connectionPool = connectionPool; | |||||
public TokenServerHandler(ConnectionPool globalConnectionPool) { | |||||
this.globalConnectionPool = globalConnectionPool; | |||||
} | } | ||||
@Override | @Override | ||||
public void channelActive(ChannelHandlerContext ctx) throws Exception { | public void channelActive(ChannelHandlerContext ctx) throws Exception { | ||||
System.out.println("[TokenServerHandler] Connection established"); | |||||
super.channelActive(ctx); | |||||
globalConnectionPool.createConnection(ctx.channel()); | |||||
String remoteAddress = getRemoteAddress(ctx); | |||||
System.out.println("[TokenServerHandler] Connection established, remote client address: " + remoteAddress); //TODO: DEBUG | |||||
} | } | ||||
@Override | @Override | ||||
public void channelInactive(ChannelHandlerContext ctx) throws Exception { | public void channelInactive(ChannelHandlerContext ctx) throws Exception { | ||||
System.out.println("[TokenServerHandler] Connection inactive"); | |||||
super.channelInactive(ctx); | |||||
String remoteAddress = getRemoteAddress(ctx); | |||||
System.out.println("[TokenServerHandler] Connection inactive, remote client address: " + remoteAddress); //TODO: DEBUG | |||||
globalConnectionPool.remove(ctx.channel()); | |||||
ConnectionManager.removeConnection(remoteAddress); | |||||
} | } | ||||
@Override | @Override | ||||
@SuppressWarnings("unchecked") | @SuppressWarnings("unchecked") | ||||
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { | public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { | ||||
connectionPool.refreshLastReadTime(ctx.channel()); | |||||
System.out.println(String.format("[%s] Server message recv: %s", System.currentTimeMillis(), msg)); | |||||
globalConnectionPool.refreshLastReadTime(ctx.channel()); | |||||
System.out.println(String.format("[%s] Server message recv: %s", System.currentTimeMillis(), msg)); //TODO: DEBUG | |||||
if (msg instanceof ClusterRequest) { | if (msg instanceof ClusterRequest) { | ||||
ClusterRequest request = (ClusterRequest)msg; | ClusterRequest request = (ClusterRequest)msg; | ||||
RequestProcessor<?, ?> processor = RequestProcessorRegistry.getProcessor(request.getType()); | |||||
// Client ping with its namespace, add to connection manager. | |||||
if (request.getType() == ClusterConstants.MSG_TYPE_PING) { | |||||
handlePingRequest(ctx, request); | |||||
return; | |||||
} | |||||
// Pick request processor for request type. | |||||
RequestProcessor<?, ?> processor = RequestProcessorProvider.getProcessor(request.getType()); | |||||
if (processor == null) { | if (processor == null) { | ||||
System.out.println("[TokenServerHandler] No processor for request type: " + request.getType()); | |||||
writeNoProcessorResponse(ctx, request); | |||||
RecordLog.warn("[TokenServerHandler] No processor for request type: " + request.getType()); | |||||
writeBadResponse(ctx, request); | |||||
} else { | } else { | ||||
ClusterResponse<?> response = processor.processRequest(request); | ClusterResponse<?> response = processor.processRequest(request); | ||||
writeResponse(ctx, response); | writeResponse(ctx, response); | ||||
@@ -72,7 +83,7 @@ public class TokenServerHandler extends ChannelInboundHandlerAdapter { | |||||
} | } | ||||
} | } | ||||
private void writeNoProcessorResponse(ChannelHandlerContext ctx, ClusterRequest request) { | |||||
private void writeBadResponse(ChannelHandlerContext ctx, ClusterRequest request) { | |||||
ClusterResponse<?> response = new ClusterResponse<>(request.getId(), request.getType(), | ClusterResponse<?> response = new ClusterResponse<>(request.getId(), request.getType(), | ||||
ClusterConstants.RESPONSE_STATUS_BAD, null); | ClusterConstants.RESPONSE_STATUS_BAD, null); | ||||
writeResponse(ctx, response); | writeResponse(ctx, response); | ||||
@@ -81,4 +92,24 @@ public class TokenServerHandler extends ChannelInboundHandlerAdapter { | |||||
private void writeResponse(ChannelHandlerContext ctx, ClusterResponse response) { | private void writeResponse(ChannelHandlerContext ctx, ClusterResponse response) { | ||||
ctx.writeAndFlush(response); | ctx.writeAndFlush(response); | ||||
} | } | ||||
private void handlePingRequest(ChannelHandlerContext ctx, ClusterRequest request) { | |||||
if (request.getData() == null || StringUtil.isBlank((String)request.getData())) { | |||||
writeBadResponse(ctx, request); | |||||
return; | |||||
} | |||||
String namespace = (String)request.getData(); | |||||
String clientAddress = getRemoteAddress(ctx); | |||||
// Add the remote namespace to connection manager. | |||||
int curCount = ConnectionManager.addConnection(namespace, clientAddress).getConnectedCount(); | |||||
int status = ClusterConstants.RESPONSE_STATUS_OK; | |||||
ClusterResponse<Integer> response = new ClusterResponse<>(request.getId(), request.getType(), status, curCount); | |||||
writeResponse(ctx, response); | |||||
RecordLog.info("[TokenServerHandler] Client <{0}> registered with namespace <{1}>", clientAddress, namespace); | |||||
} | |||||
private String getRemoteAddress(ChannelHandlerContext ctx) { | |||||
return ctx.channel().remoteAddress().toString(); | |||||
} | |||||
} | } |
@@ -0,0 +1,66 @@ | |||||
/* | |||||
* Copyright 1999-2018 Alibaba Group Holding Ltd. | |||||
* | |||||
* Licensed under the Apache License, Version 2.0 (the "License"); | |||||
* you may not use this file except in compliance with the License. | |||||
* You may obtain a copy of the License at | |||||
* | |||||
* http://www.apache.org/licenses/LICENSE-2.0 | |||||
* | |||||
* Unless required by applicable law or agreed to in writing, software | |||||
* distributed under the License is distributed on an "AS IS" BASIS, | |||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |||||
* See the License for the specific language governing permissions and | |||||
* limitations under the License. | |||||
*/ | |||||
package com.alibaba.csp.sentinel.cluster.server.init; | |||||
import com.alibaba.csp.sentinel.cluster.ClusterConstants; | |||||
import com.alibaba.csp.sentinel.cluster.server.TokenServiceProvider; | |||||
import com.alibaba.csp.sentinel.cluster.server.codec.data.FlowRequestDataDecoder; | |||||
import com.alibaba.csp.sentinel.cluster.server.codec.data.FlowResponseDataWriter; | |||||
import com.alibaba.csp.sentinel.cluster.server.codec.data.ParamFlowRequestDataDecoder; | |||||
import com.alibaba.csp.sentinel.cluster.server.codec.data.PingRequestDataDecoder; | |||||
import com.alibaba.csp.sentinel.cluster.server.codec.data.PingResponseDataWriter; | |||||
import com.alibaba.csp.sentinel.cluster.server.codec.registry.RequestDataDecodeRegistry; | |||||
import com.alibaba.csp.sentinel.cluster.server.codec.registry.ResponseDataWriterRegistry; | |||||
import com.alibaba.csp.sentinel.cluster.server.processor.RequestProcessorProvider; | |||||
import com.alibaba.csp.sentinel.init.InitFunc; | |||||
import com.alibaba.csp.sentinel.log.RecordLog; | |||||
/** | |||||
* @author Eric Zhao | |||||
* @since 1.4.0 | |||||
*/ | |||||
public class DefaultClusterServerInitFunc implements InitFunc { | |||||
@Override | |||||
public void init() throws Exception { | |||||
initDefaultEntityDecoders(); | |||||
initDefaultEntityWriters(); | |||||
initDefaultProcessors(); | |||||
// Eagerly-trigger the SPI pre-load of token service. | |||||
TokenServiceProvider.getService(); | |||||
RecordLog.info("[DefaultClusterServerInitFunc] Default entity codec and processors registered"); | |||||
} | |||||
private void initDefaultEntityWriters() { | |||||
ResponseDataWriterRegistry.addWriter(ClusterConstants.MSG_TYPE_PING, new PingResponseDataWriter()); | |||||
ResponseDataWriterRegistry.addWriter(ClusterConstants.MSG_TYPE_FLOW, new FlowResponseDataWriter()); | |||||
ResponseDataWriterRegistry.addWriter(ClusterConstants.MSG_TYPE_PARAM_FLOW, new FlowResponseDataWriter()); | |||||
} | |||||
private void initDefaultEntityDecoders() { | |||||
RequestDataDecodeRegistry.addDecoder(ClusterConstants.MSG_TYPE_PING, new PingRequestDataDecoder()); | |||||
RequestDataDecodeRegistry.addDecoder(ClusterConstants.MSG_TYPE_FLOW, new FlowRequestDataDecoder()); | |||||
RequestDataDecodeRegistry.addDecoder(ClusterConstants.MSG_TYPE_PARAM_FLOW, new ParamFlowRequestDataDecoder()); | |||||
} | |||||
private void initDefaultProcessors() { | |||||
// Eagerly-trigger the SPI pre-load. | |||||
RequestProcessorProvider.getProcessor(0); | |||||
} | |||||
} |
@@ -0,0 +1,56 @@ | |||||
/* | |||||
* Copyright 1999-2018 Alibaba Group Holding Ltd. | |||||
* | |||||
* Licensed under the Apache License, Version 2.0 (the "License"); | |||||
* you may not use this file except in compliance with the License. | |||||
* You may obtain a copy of the License at | |||||
* | |||||
* http://www.apache.org/licenses/LICENSE-2.0 | |||||
* | |||||
* Unless required by applicable law or agreed to in writing, software | |||||
* distributed under the License is distributed on an "AS IS" BASIS, | |||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |||||
* See the License for the specific language governing permissions and | |||||
* limitations under the License. | |||||
*/ | |||||
package com.alibaba.csp.sentinel.cluster.server.log; | |||||
import com.alibaba.csp.sentinel.eagleeye.EagleEye; | |||||
import com.alibaba.csp.sentinel.eagleeye.StatLogger; | |||||
import com.alibaba.csp.sentinel.log.LogBase; | |||||
/** | |||||
* @author Eric Zhao | |||||
* @since 1.4.0 | |||||
*/ | |||||
public final class ClusterServerStatLogUtil { | |||||
private static final String FILE_NAME = "sentinel-server.log"; | |||||
private static StatLogger statLogger; | |||||
static { | |||||
String path = LogBase.getLogBaseDir() + FILE_NAME; | |||||
statLogger = EagleEye.statLoggerBuilder("sentinel-cluster-server-record") | |||||
.intervalSeconds(1) | |||||
.entryDelimiter('|') | |||||
.keyDelimiter(',') | |||||
.valueDelimiter(',') | |||||
.maxEntryCount(5000) | |||||
.configLogFilePath(path) | |||||
.maxFileSizeMB(300) | |||||
.maxBackupIndex(3) | |||||
.buildSingleton(); | |||||
} | |||||
public static void log(String msg) { | |||||
statLogger.stat(msg).count(); | |||||
} | |||||
public static void log(String msg, int count) { | |||||
statLogger.stat(msg).count(count); | |||||
} | |||||
private ClusterServerStatLogUtil() {} | |||||
} |
@@ -15,8 +15,10 @@ | |||||
*/ | */ | ||||
package com.alibaba.csp.sentinel.cluster.server.processor; | package com.alibaba.csp.sentinel.cluster.server.processor; | ||||
import com.alibaba.csp.sentinel.cluster.ClusterConstants; | |||||
import com.alibaba.csp.sentinel.cluster.TokenResult; | import com.alibaba.csp.sentinel.cluster.TokenResult; | ||||
import com.alibaba.csp.sentinel.cluster.TokenService; | import com.alibaba.csp.sentinel.cluster.TokenService; | ||||
import com.alibaba.csp.sentinel.cluster.annotation.RequestType; | |||||
import com.alibaba.csp.sentinel.cluster.request.ClusterRequest; | import com.alibaba.csp.sentinel.cluster.request.ClusterRequest; | ||||
import com.alibaba.csp.sentinel.cluster.request.data.FlowRequestData; | import com.alibaba.csp.sentinel.cluster.request.data.FlowRequestData; | ||||
import com.alibaba.csp.sentinel.cluster.response.ClusterResponse; | import com.alibaba.csp.sentinel.cluster.response.ClusterResponse; | ||||
@@ -27,6 +29,7 @@ import com.alibaba.csp.sentinel.cluster.server.TokenServiceProvider; | |||||
* @author Eric Zhao | * @author Eric Zhao | ||||
* @since 1.4.0 | * @since 1.4.0 | ||||
*/ | */ | ||||
@RequestType(ClusterConstants.MSG_TYPE_FLOW) | |||||
public class FlowRequestProcessor implements RequestProcessor<FlowRequestData, FlowTokenResponseData> { | public class FlowRequestProcessor implements RequestProcessor<FlowRequestData, FlowTokenResponseData> { | ||||
@Override | @Override | ||||
@@ -17,8 +17,10 @@ package com.alibaba.csp.sentinel.cluster.server.processor; | |||||
import java.util.Collection; | import java.util.Collection; | ||||
import com.alibaba.csp.sentinel.cluster.ClusterConstants; | |||||
import com.alibaba.csp.sentinel.cluster.TokenResult; | import com.alibaba.csp.sentinel.cluster.TokenResult; | ||||
import com.alibaba.csp.sentinel.cluster.TokenService; | import com.alibaba.csp.sentinel.cluster.TokenService; | ||||
import com.alibaba.csp.sentinel.cluster.annotation.RequestType; | |||||
import com.alibaba.csp.sentinel.cluster.request.ClusterRequest; | import com.alibaba.csp.sentinel.cluster.request.ClusterRequest; | ||||
import com.alibaba.csp.sentinel.cluster.request.data.ParamFlowRequestData; | import com.alibaba.csp.sentinel.cluster.request.data.ParamFlowRequestData; | ||||
import com.alibaba.csp.sentinel.cluster.response.ClusterResponse; | import com.alibaba.csp.sentinel.cluster.response.ClusterResponse; | ||||
@@ -29,6 +31,7 @@ import com.alibaba.csp.sentinel.cluster.server.TokenServiceProvider; | |||||
* @author Eric Zhao | * @author Eric Zhao | ||||
* @since 1.4.0 | * @since 1.4.0 | ||||
*/ | */ | ||||
@RequestType(ClusterConstants.MSG_TYPE_PARAM_FLOW) | |||||
public class ParamFlowRequestProcessor implements RequestProcessor<ParamFlowRequestData, FlowTokenResponseData> { | public class ParamFlowRequestProcessor implements RequestProcessor<ParamFlowRequestData, FlowTokenResponseData> { | ||||
@Override | @Override | ||||
@@ -16,23 +16,49 @@ | |||||
package com.alibaba.csp.sentinel.cluster.server.processor; | package com.alibaba.csp.sentinel.cluster.server.processor; | ||||
import java.util.Map; | import java.util.Map; | ||||
import java.util.ServiceLoader; | |||||
import java.util.concurrent.ConcurrentHashMap; | import java.util.concurrent.ConcurrentHashMap; | ||||
import com.alibaba.csp.sentinel.cluster.annotation.RequestType; | |||||
import com.alibaba.csp.sentinel.util.AssertUtil; | import com.alibaba.csp.sentinel.util.AssertUtil; | ||||
/** | /** | ||||
* @author Eric Zhao | * @author Eric Zhao | ||||
* @since 1.4.0 | * @since 1.4.0 | ||||
*/ | */ | ||||
public final class RequestProcessorRegistry { | |||||
public final class RequestProcessorProvider { | |||||
private static final Map<Integer, RequestProcessor> PROCESSOR_MAP = new ConcurrentHashMap<>(); | private static final Map<Integer, RequestProcessor> PROCESSOR_MAP = new ConcurrentHashMap<>(); | ||||
private static final ServiceLoader<RequestProcessor> SERVICE_LOADER = ServiceLoader.load(RequestProcessor.class); | |||||
static { | |||||
loadAndInit(); | |||||
} | |||||
private static void loadAndInit() { | |||||
for (RequestProcessor processor : SERVICE_LOADER) { | |||||
Integer type = parseRequestType(processor); | |||||
if (type != null) { | |||||
PROCESSOR_MAP.put(type, processor); | |||||
} | |||||
} | |||||
} | |||||
private static Integer parseRequestType(RequestProcessor processor) { | |||||
RequestType requestType = processor.getClass().getAnnotation(RequestType.class); | |||||
if (requestType != null) { | |||||
return requestType.value(); | |||||
} else { | |||||
return null; | |||||
} | |||||
} | |||||
public static RequestProcessor getProcessor(int type) { | public static RequestProcessor getProcessor(int type) { | ||||
return PROCESSOR_MAP.get(type); | return PROCESSOR_MAP.get(type); | ||||
} | } | ||||
public static void addProcessorIfAbsent(int type, RequestProcessor processor) { | |||||
static void addProcessorIfAbsent(int type, RequestProcessor processor) { | |||||
// TBD: use putIfAbsent in JDK 1.8. | // TBD: use putIfAbsent in JDK 1.8. | ||||
if (PROCESSOR_MAP.containsKey(type)) { | if (PROCESSOR_MAP.containsKey(type)) { | ||||
return; | return; | ||||
@@ -40,10 +66,11 @@ public final class RequestProcessorRegistry { | |||||
PROCESSOR_MAP.put(type, processor); | PROCESSOR_MAP.put(type, processor); | ||||
} | } | ||||
public static void addProcessor(int type, RequestProcessor processor) { | |||||
static void addProcessor(int type, RequestProcessor processor) { | |||||
AssertUtil.notNull(processor, "processor cannot be null"); | AssertUtil.notNull(processor, "processor cannot be null"); | ||||
PROCESSOR_MAP.put(type, processor); | PROCESSOR_MAP.put(type, processor); | ||||
} | } | ||||
private RequestProcessorRegistry() {} | |||||
private RequestProcessorProvider() {} | |||||
} | } |
@@ -17,6 +17,7 @@ package com.alibaba.csp.sentinel.cluster.server.util; | |||||
/** | /** | ||||
* @author Eric Zhao | * @author Eric Zhao | ||||
* @since 1.4.0 | |||||
*/ | */ | ||||
public final class ClusterRuleUtil { | public final class ClusterRuleUtil { | ||||
@@ -0,0 +1 @@ | |||||
com.alibaba.csp.sentinel.cluster.server.DefaultEmbeddedTokenServer |
@@ -0,0 +1,2 @@ | |||||
com.alibaba.csp.sentinel.cluster.server.processor.FlowRequestProcessor | |||||
com.alibaba.csp.sentinel.cluster.server.processor.ParamFlowRequestProcessor |
@@ -0,0 +1 @@ | |||||
com.alibaba.csp.sentinel.cluster.server.init.DefaultClusterServerInitFunc |
@@ -0,0 +1,54 @@ | |||||
/* | |||||
* Copyright 1999-2018 Alibaba Group Holding Ltd. | |||||
* | |||||
* Licensed under the Apache License, Version 2.0 (the "License"); | |||||
* you may not use this file except in compliance with the License. | |||||
* You may obtain a copy of the License at | |||||
* | |||||
* http://www.apache.org/licenses/LICENSE-2.0 | |||||
* | |||||
* Unless required by applicable law or agreed to in writing, software | |||||
* distributed under the License is distributed on an "AS IS" BASIS, | |||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |||||
* See the License for the specific language governing permissions and | |||||
* limitations under the License. | |||||
*/ | |||||
package com.alibaba.csp.sentinel.cluster; | |||||
import static org.junit.Assert.*; | |||||
/** | |||||
* Useful for testing clustered flow control. | |||||
* Only used for test. | |||||
* | |||||
* @author Eric Zhao | |||||
* @since 1.4.0 | |||||
*/ | |||||
public final class ClusterFlowTestUtil { | |||||
public static void assertResultPass(TokenResult result) { | |||||
assertNotNull(result); | |||||
assertEquals(TokenResultStatus.OK, (int) result.getStatus()); | |||||
} | |||||
public static void assertResultBlock(TokenResult result) { | |||||
assertNotNull(result); | |||||
assertEquals(TokenResultStatus.BLOCKED, (int) result.getStatus()); | |||||
} | |||||
public static void assertResultWait(TokenResult result, int waitInMs) { | |||||
assertNotNull(result); | |||||
assertEquals(TokenResultStatus.SHOULD_WAIT, (int) result.getStatus()); | |||||
assertEquals(waitInMs, result.getWaitInMs()); | |||||
} | |||||
public static void sleep(int t) { | |||||
try { | |||||
Thread.sleep(t); | |||||
} catch (InterruptedException e) { | |||||
e.printStackTrace(); | |||||
} | |||||
} | |||||
private ClusterFlowTestUtil() {} | |||||
} |
@@ -0,0 +1,79 @@ | |||||
/* | |||||
* Copyright 1999-2018 Alibaba Group Holding Ltd. | |||||
* | |||||
* Licensed under the Apache License, Version 2.0 (the "License"); | |||||
* you may not use this file except in compliance with the License. | |||||
* You may obtain a copy of the License at | |||||
* | |||||
* http://www.apache.org/licenses/LICENSE-2.0 | |||||
* | |||||
* Unless required by applicable law or agreed to in writing, software | |||||
* distributed under the License is distributed on an "AS IS" BASIS, | |||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |||||
* See the License for the specific language governing permissions and | |||||
* limitations under the License. | |||||
*/ | |||||
package com.alibaba.csp.sentinel.cluster.flow; | |||||
import com.alibaba.csp.sentinel.cluster.TokenResult; | |||||
import com.alibaba.csp.sentinel.cluster.flow.statistic.ClusterMetricStatistics; | |||||
import com.alibaba.csp.sentinel.cluster.flow.statistic.metric.ClusterMetric; | |||||
import com.alibaba.csp.sentinel.slots.block.ClusterRuleConstant; | |||||
import com.alibaba.csp.sentinel.slots.block.flow.ClusterFlowConfig; | |||||
import com.alibaba.csp.sentinel.slots.block.flow.FlowRule; | |||||
import org.junit.Ignore; | |||||
import org.junit.Test; | |||||
import static org.junit.Assert.*; | |||||
import static com.alibaba.csp.sentinel.cluster.ClusterFlowTestUtil.*; | |||||
/** | |||||
* @author Eric Zhao | |||||
* @since 1.4.0 | |||||
*/ | |||||
@Ignore | |||||
public class ClusterFlowCheckerTest { | |||||
@Test | |||||
public void testAcquireClusterTokenOccupyPass() { | |||||
long flowId = 98765L; | |||||
final int threshold = 5; | |||||
FlowRule clusterRule = new FlowRule("abc") | |||||
.setCount(threshold) | |||||
.setClusterMode(true) | |||||
.setClusterConfig(new ClusterFlowConfig() | |||||
.setFlowId(flowId) | |||||
.setThresholdType(ClusterRuleConstant.FLOW_THRESHOLD_GLOBAL)); | |||||
int sampleCount = 5; | |||||
int intervalInMs = 1000; | |||||
int bucketLength = intervalInMs / sampleCount; | |||||
ClusterMetric metric = new ClusterMetric(sampleCount, intervalInMs); | |||||
ClusterMetricStatistics.putMetric(flowId, metric); | |||||
System.out.println(System.currentTimeMillis()); | |||||
assertResultPass(tryAcquire(clusterRule, false)); | |||||
assertResultPass(tryAcquire(clusterRule, false)); | |||||
sleep(bucketLength); | |||||
assertResultPass(tryAcquire(clusterRule, false)); | |||||
sleep(bucketLength); | |||||
assertResultPass(tryAcquire(clusterRule, true)); | |||||
assertResultPass(tryAcquire(clusterRule, false)); | |||||
assertResultBlock(tryAcquire(clusterRule, true)); | |||||
sleep(bucketLength); | |||||
assertResultBlock(tryAcquire(clusterRule, false)); | |||||
assertResultBlock(tryAcquire(clusterRule, false)); | |||||
sleep(bucketLength); | |||||
assertResultBlock(tryAcquire(clusterRule, false)); | |||||
assertResultWait(tryAcquire(clusterRule, true), bucketLength); | |||||
assertResultBlock(tryAcquire(clusterRule, false)); | |||||
sleep(bucketLength); | |||||
assertResultPass(tryAcquire(clusterRule, false)); | |||||
ClusterMetricStatistics.removeMetric(flowId); | |||||
} | |||||
private TokenResult tryAcquire(FlowRule clusterRule, boolean occupy) { | |||||
return ClusterFlowChecker.acquireClusterToken(clusterRule, 1, occupy); | |||||
} | |||||
} |