Преглед на файлове

Add implementation for default token server module

Signed-off-by: Eric Zhao <sczyh16@gmail.com>
master
Eric Zhao преди 6 години
родител
ревизия
b973aca1f5
променени са 43 файла, в които са добавени 2762 реда и са изтрити 0 реда
  1. +193
    -0
      sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/ClusterFlowChecker.java
  2. +137
    -0
      sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/ClusterFlowRuleManager.java
  3. +82
    -0
      sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/ClusterParamFlowChecker.java
  4. +138
    -0
      sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/ClusterParamFlowRuleManager.java
  5. +77
    -0
      sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/DefaultTokenService.java
  6. +59
    -0
      sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/statistic/ClusterMetricStatistics.java
  7. +59
    -0
      sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/statistic/ClusterParamMetricStatistics.java
  8. +42
    -0
      sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/statistic/data/ClusterFlowEvent.java
  9. +49
    -0
      sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/statistic/data/ClusterMetricBucket.java
  10. +76
    -0
      sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/statistic/metric/ClusterMetric.java
  11. +87
    -0
      sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/statistic/metric/ClusterMetricLeapArray.java
  12. +74
    -0
      sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/statistic/metric/ClusterParamMetric.java
  13. +56
    -0
      sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/statistic/metric/ClusterParameterLeapArray.java
  14. +29
    -0
      sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/ClusterTokenServer.java
  15. +175
    -0
      sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/NettyTransportServer.java
  16. +31
    -0
      sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/ServerConstants.java
  17. +64
    -0
      sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/TokenServiceProvider.java
  18. +65
    -0
      sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/codec/DefaultRequestEntityDecoder.java
  19. +53
    -0
      sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/codec/DefaultResponseEntityWriter.java
  20. +66
    -0
      sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/codec/ServerEntityCodecProvider.java
  21. +50
    -0
      sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/codec/data/FlowRequestDataDecoder.java
  22. +34
    -0
      sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/codec/data/FlowResponseDataWriter.java
  23. +91
    -0
      sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/codec/data/ParamFlowRequestDataDecoder.java
  24. +51
    -0
      sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/codec/netty/NettyRequestDecoder.java
  25. +55
    -0
      sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/codec/netty/NettyResponseEncoder.java
  26. +48
    -0
      sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/codec/registry/RequestDataDecodeRegistry.java
  27. +48
    -0
      sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/codec/registry/ResponseDataWriterRegistry.java
  28. +36
    -0
      sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/config/ClusterServerConfigManager.java
  29. +37
    -0
      sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/connection/Connection.java
  30. +85
    -0
      sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/connection/ConnectionGroup.java
  31. +32
    -0
      sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/connection/ConnectionManager.java
  32. +149
    -0
      sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/connection/ConnectionPool.java
  33. +85
    -0
      sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/connection/NettyConnection.java
  34. +43
    -0
      sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/connection/ScanIdleConnectionTask.java
  35. +84
    -0
      sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/handler/TokenServerHandler.java
  36. +51
    -0
      sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/processor/FlowRequestProcessor.java
  37. +53
    -0
      sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/processor/ParamFlowRequestProcessor.java
  38. +38
    -0
      sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/processor/RequestProcessor.java
  39. +49
    -0
      sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/processor/RequestProcessorRegistry.java
  40. +28
    -0
      sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/util/ClusterRuleUtil.java
  41. +1
    -0
      sentinel-cluster/sentinel-cluster-server-default/src/main/resources/META-INF/services/com.alibaba.csp.sentinel.cluster.TokenService
  42. +1
    -0
      sentinel-cluster/sentinel-cluster-server-default/src/main/resources/META-INF/services/com.alibaba.csp.sentinel.cluster.codec.request.RequestEntityDecoder
  43. +1
    -0
      sentinel-cluster/sentinel-cluster-server-default/src/main/resources/META-INF/services/com.alibaba.csp.sentinel.cluster.codec.response.ResponseEntityWriter

+ 193
- 0
sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/ClusterFlowChecker.java Целия файл

@@ -0,0 +1,193 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.csp.sentinel.cluster.flow;

import com.alibaba.csp.sentinel.cluster.TokenResultStatus;
import com.alibaba.csp.sentinel.cluster.TokenResult;
import com.alibaba.csp.sentinel.cluster.flow.statistic.ClusterMetricStatistics;
import com.alibaba.csp.sentinel.cluster.server.config.ClusterServerConfigManager;
import com.alibaba.csp.sentinel.cluster.flow.statistic.data.ClusterFlowEvent;
import com.alibaba.csp.sentinel.cluster.flow.statistic.metric.ClusterMetric;
import com.alibaba.csp.sentinel.slots.block.ClusterRuleConstant;
import com.alibaba.csp.sentinel.slots.block.flow.FlowRule;
import com.alibaba.csp.sentinel.util.TimeUtil;

/**
* @author Eric Zhao
* @since 1.4.0
*/
public final class ClusterFlowChecker {

static TokenResult tryAcquireOrBorrowFromRefResource(FlowRule rule, int acquireCount, boolean prioritized) {
// 1. First try acquire its own count.

// TokenResult ownResult = acquireClusterToken(rule, acquireCount, prioritized);
ClusterMetric metric = ClusterMetricStatistics.getMetric(rule.getClusterConfig().getFlowId());
if (metric == null) {
return new TokenResult(TokenResultStatus.FAIL);
}

double latestQps = metric.getAvg(ClusterFlowEvent.PASS_REQUEST);
double globalThreshold = calcGlobalThreshold(rule) * ClusterServerConfigManager.exceedCount;
double nextRemaining = globalThreshold - latestQps - acquireCount;

if (nextRemaining >= 0) {
// TODO: checking logic and metric operation should be separated.
metric.add(ClusterFlowEvent.PASS, acquireCount);
metric.add(ClusterFlowEvent.PASS_REQUEST, 1);
if (prioritized) {
// Add prioritized pass.
metric.add(ClusterFlowEvent.OCCUPIED_PASS, acquireCount);
}
// Remaining count is cut down to a smaller integer.
return new TokenResult(TokenResultStatus.OK)
.setRemaining((int) nextRemaining)
.setWaitInMs(0);
}

if (prioritized) {
double occupyAvg = metric.getAvg(ClusterFlowEvent.WAITING);
if (occupyAvg <= ClusterServerConfigManager.maxOccupyRatio * globalThreshold) {
int waitInMs = metric.tryOccupyNext(ClusterFlowEvent.PASS, acquireCount, globalThreshold);
if (waitInMs > 0) {
return new TokenResult(TokenResultStatus.SHOULD_WAIT)
.setRemaining(0)
.setWaitInMs(waitInMs);
}
// Or else occupy failed, should be blocked.
}
}

// 2. If failed, try to borrow from reference resource.

// Assume it's valid as checked before.
if (!ClusterServerConfigManager.borrowRefEnabled) {
return new TokenResult(TokenResultStatus.NOT_AVAILABLE);
}
Long refFlowId = rule.getClusterConfig().getRefFlowId();
FlowRule refFlowRule = ClusterFlowRuleManager.getFlowRuleById(refFlowId);
if (refFlowRule == null) {
return new TokenResult(TokenResultStatus.NO_REF_RULE_EXISTS);
}
// TODO: check here

ClusterMetric refMetric = ClusterMetricStatistics.getMetric(refFlowId);
if (refMetric == null) {
return new TokenResult(TokenResultStatus.FAIL);
}
double refOrders = refMetric.getAvg(ClusterFlowEvent.PASS);
double refQps = refMetric.getAvg(ClusterFlowEvent.PASS_REQUEST);

double splitRatio = refQps > 0 ? refOrders / refQps : 1;

double selfGlobalThreshold = ClusterServerConfigManager.exceedCount * calcGlobalThreshold(rule);
double refGlobalThreshold = ClusterServerConfigManager.exceedCount * calcGlobalThreshold(refFlowRule);

long currentTime = TimeUtil.currentTimeMillis();
long latestRefTime = 0 /*refFlowRule.clusterQps.getStableWindowStartTime()*/;
int sampleCount = 10;

if (currentTime > latestRefTime
&& (refOrders / refGlobalThreshold + 1.0d / sampleCount >= ((double)(currentTime - latestRefTime)) / 1000)
|| refOrders == refGlobalThreshold) {
return blockedResult();
}

// double latestQps = metric.getAvg(ClusterFlowEvent.PASS);
double refRatio = rule.getClusterConfig().getRefRatio();

if (refOrders / splitRatio + (acquireCount + latestQps) * refRatio
<= refGlobalThreshold / splitRatio + selfGlobalThreshold * refRatio) {
metric.add(ClusterFlowEvent.PASS, acquireCount);
metric.add(ClusterFlowEvent.PASS_REQUEST, 1);

return new TokenResult(TokenResultStatus.OK);
}

// TODO: log here?
metric.add(ClusterFlowEvent.BLOCK, acquireCount);

return blockedResult();
}

private static double calcGlobalThreshold(FlowRule rule) {
double count = rule.getCount();
switch (rule.getClusterConfig().getThresholdType()) {
case ClusterRuleConstant.FLOW_THRESHOLD_GLOBAL:
return count;
case ClusterRuleConstant.FLOW_THRESHOLD_AVG_LOCAL:
default:
// TODO: get real connected count grouped.
int connectedCount = 1;
return count * connectedCount;
}
}

static TokenResult acquireClusterToken(/*@Valid*/ FlowRule rule, int acquireCount, boolean prioritized) {
ClusterMetric metric = ClusterMetricStatistics.getMetric(rule.getClusterConfig().getFlowId());
if (metric == null) {
return new TokenResult(TokenResultStatus.FAIL);
}

double latestQps = metric.getAvg(ClusterFlowEvent.PASS_REQUEST);
double globalThreshold = calcGlobalThreshold(rule) * ClusterServerConfigManager.exceedCount;
double nextRemaining = globalThreshold - latestQps - acquireCount;

if (nextRemaining >= 0) {
// TODO: checking logic and metric operation should be separated.
metric.add(ClusterFlowEvent.PASS, acquireCount);
metric.add(ClusterFlowEvent.PASS_REQUEST, 1);
if (prioritized) {
// Add prioritized pass.
metric.add(ClusterFlowEvent.OCCUPIED_PASS, acquireCount);
}
// Remaining count is cut down to a smaller integer.
return new TokenResult(TokenResultStatus.OK)
.setRemaining((int) nextRemaining)
.setWaitInMs(0);
} else {
if (prioritized) {
double occupyAvg = metric.getAvg(ClusterFlowEvent.WAITING);
if (occupyAvg <= ClusterServerConfigManager.maxOccupyRatio * globalThreshold) {
int waitInMs = metric.tryOccupyNext(ClusterFlowEvent.PASS, acquireCount, globalThreshold);
if (waitInMs > 0) {
return new TokenResult(TokenResultStatus.SHOULD_WAIT)
.setRemaining(0)
.setWaitInMs(waitInMs);
}
// Or else occupy failed, should be blocked.
}
}
// Blocked.
metric.add(ClusterFlowEvent.BLOCK, acquireCount);
metric.add(ClusterFlowEvent.BLOCK_REQUEST, 1);
if (prioritized) {
// Add prioritized block.
metric.add(ClusterFlowEvent.OCCUPIED_BLOCK, acquireCount);
}

return blockedResult();
}
}

private static TokenResult blockedResult() {
return new TokenResult(TokenResultStatus.BLOCKED)
.setRemaining(0)
.setWaitInMs(0);
}

private ClusterFlowChecker() {}
}

+ 137
- 0
sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/ClusterFlowRuleManager.java Целия файл

@@ -0,0 +1,137 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.csp.sentinel.cluster.flow;

import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

import com.alibaba.csp.sentinel.cluster.flow.statistic.ClusterMetricStatistics;
import com.alibaba.csp.sentinel.cluster.flow.statistic.metric.ClusterMetric;
import com.alibaba.csp.sentinel.cluster.server.util.ClusterRuleUtil;
import com.alibaba.csp.sentinel.log.RecordLog;
import com.alibaba.csp.sentinel.property.DynamicSentinelProperty;
import com.alibaba.csp.sentinel.property.PropertyListener;
import com.alibaba.csp.sentinel.property.SentinelProperty;
import com.alibaba.csp.sentinel.slots.block.RuleConstant;
import com.alibaba.csp.sentinel.slots.block.flow.FlowRule;
import com.alibaba.csp.sentinel.slots.block.flow.FlowRuleUtil;
import com.alibaba.csp.sentinel.util.StringUtil;

/**
* @author Eric Zhao
* @since 1.4.0
*/
public final class ClusterFlowRuleManager {

private static final Map<Long, FlowRule> FLOW_RULES = new ConcurrentHashMap<>();

private static final PropertyListener<List<FlowRule>> PROPERTY_LISTENER = new FlowRulePropertyListener();
private static SentinelProperty<List<FlowRule>> currentProperty = new DynamicSentinelProperty<>();

static {
currentProperty.addListener(PROPERTY_LISTENER);
}

/**
* Listen to the {@link SentinelProperty} for {@link FlowRule}s.
* The property is the source of cluster {@link FlowRule}s.
*
* @param property the property to listen.
*/
public static void register2Property(SentinelProperty<List<FlowRule>> property) {
synchronized (PROPERTY_LISTENER) {
RecordLog.info("[ClusterFlowRuleManager] Registering new property to cluster flow rule manager");
currentProperty.removeListener(PROPERTY_LISTENER);
property.addListener(PROPERTY_LISTENER);
currentProperty = property;
}
}

public static FlowRule getFlowRuleById(Long id) {
if (!ClusterRuleUtil.validId(id)) {
return null;
}
return FLOW_RULES.get(id);
}

private static Map<Long, FlowRule> buildClusterFlowRuleMap(List<FlowRule> list) {
Map<Long, FlowRule> ruleMap = new ConcurrentHashMap<>();
if (list == null || list.isEmpty()) {
return ruleMap;
}

for (FlowRule rule : list) {
if (!rule.isClusterMode()) {
continue;
}
if (!FlowRuleUtil.isValidRule(rule)) {
RecordLog.warn(
"[ClusterFlowRuleManager] Ignoring invalid flow rule when loading new flow rules: " + rule);
continue;
}
if (StringUtil.isBlank(rule.getLimitApp())) {
rule.setLimitApp(RuleConstant.LIMIT_APP_DEFAULT);
}

// Flow id should not be null after filtered.
Long flowId = rule.getClusterConfig().getFlowId();
if (flowId == null) {
continue;
}
ruleMap.put(flowId, rule);

// Prepare cluster metric from valid flow ID.
ClusterMetricStatistics.putMetricIfAbsent(flowId, new ClusterMetric(100, 1));
}

// Cleanup unused cluster metrics.
Set<Long> previousSet = FLOW_RULES.keySet();
for (Long id : previousSet) {
if (!ruleMap.containsKey(id)) {
ClusterMetricStatistics.removeMetric(id);
}
}

return ruleMap;
}

private static final class FlowRulePropertyListener implements PropertyListener<List<FlowRule>> {

@Override
public void configUpdate(List<FlowRule> conf) {
Map<Long, FlowRule> rules = buildClusterFlowRuleMap(conf);
if (rules != null) {
FLOW_RULES.clear();
FLOW_RULES.putAll(rules);
}
RecordLog.info("[ClusterFlowRuleManager] Cluster flow rules received: " + FLOW_RULES);
}

@Override
public void configLoad(List<FlowRule> conf) {
Map<Long, FlowRule> rules = buildClusterFlowRuleMap(conf);
if (rules != null) {
FLOW_RULES.clear();
FLOW_RULES.putAll(rules);
}
RecordLog.info("[ClusterFlowRuleManager] Cluster flow rules loaded: " + FLOW_RULES);
}
}

private ClusterFlowRuleManager() {}
}

+ 82
- 0
sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/ClusterParamFlowChecker.java Целия файл

@@ -0,0 +1,82 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.csp.sentinel.cluster.flow;

import java.util.Collection;

import com.alibaba.csp.sentinel.cluster.TokenResult;
import com.alibaba.csp.sentinel.cluster.TokenResultStatus;
import com.alibaba.csp.sentinel.cluster.flow.statistic.ClusterParamMetricStatistics;
import com.alibaba.csp.sentinel.cluster.flow.statistic.metric.ClusterParamMetric;
import com.alibaba.csp.sentinel.slots.block.ClusterRuleConstant;
import com.alibaba.csp.sentinel.slots.block.flow.param.ParamFlowRule;

/**
* @author Eric Zhao
*/
public final class ClusterParamFlowChecker {

static TokenResult acquireClusterToken(ParamFlowRule rule, int count, Collection<Object> values) {
ClusterParamMetric metric = ClusterParamMetricStatistics.getMetric(rule.getClusterConfig().getFlowId());
if (metric == null) {
// Unexpected state, return FAIL.
return new TokenResult(TokenResultStatus.FAIL);
}
boolean hasPassed = true;
Object blockObject = null;
for (Object value : values) {
// TODO: origin is int * int, but current double!
double curCount = metric.getAvg(value);

double threshold = calcGlobalThreshold(rule);
if (++curCount > threshold) {
hasPassed = false;
blockObject = value;
break;
}
}

if (hasPassed) {
for (Object value : values) {
metric.addValue(value, count);
}
} else {
// TODO: log <blocked object> here?
}

return hasPassed ? newRawResponse(TokenResultStatus.OK): newRawResponse(TokenResultStatus.BLOCKED);
}

private static TokenResult newRawResponse(int status) {
return new TokenResult(status)
.setRemaining(0)
.setWaitInMs(0);
}

private static double calcGlobalThreshold(ParamFlowRule rule) {
double count = rule.getCount();
switch (rule.getClusterConfig().getThresholdType()) {
case ClusterRuleConstant.FLOW_THRESHOLD_GLOBAL:
return count;
case ClusterRuleConstant.FLOW_THRESHOLD_AVG_LOCAL:
default:
int connectedCount = 1; // TODO: get real connected count grouped.
return count * connectedCount;
}
}

private ClusterParamFlowChecker() {}
}

+ 138
- 0
sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/ClusterParamFlowRuleManager.java Целия файл

@@ -0,0 +1,138 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.csp.sentinel.cluster.flow;

import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

import com.alibaba.csp.sentinel.cluster.flow.statistic.ClusterParamMetricStatistics;
import com.alibaba.csp.sentinel.cluster.flow.statistic.metric.ClusterParamMetric;
import com.alibaba.csp.sentinel.cluster.server.util.ClusterRuleUtil;
import com.alibaba.csp.sentinel.log.RecordLog;
import com.alibaba.csp.sentinel.property.DynamicSentinelProperty;
import com.alibaba.csp.sentinel.property.PropertyListener;
import com.alibaba.csp.sentinel.property.SentinelProperty;
import com.alibaba.csp.sentinel.slots.block.RuleConstant;
import com.alibaba.csp.sentinel.slots.block.flow.param.ParamFlowRule;
import com.alibaba.csp.sentinel.slots.block.flow.param.ParamFlowRuleUtil;
import com.alibaba.csp.sentinel.util.StringUtil;

/**
* @author Eric Zhao
* @since 1.4.0
*/
public final class ClusterParamFlowRuleManager {

private static final Map<Long, ParamFlowRule> PARAM_RULES = new ConcurrentHashMap<>();

private final static RulePropertyListener PROPERTY_LISTENER = new RulePropertyListener();
private static SentinelProperty<List<ParamFlowRule>> currentProperty
= new DynamicSentinelProperty<List<ParamFlowRule>>();

static {
currentProperty.addListener(PROPERTY_LISTENER);
}

/**
* Listen to the {@link SentinelProperty} for {@link ParamFlowRule}s.
* The property is the source of {@link ParamFlowRule}s.
*
* @param property the property to listen
*/
public static void register2Property(SentinelProperty<List<ParamFlowRule>> property) {
synchronized (PROPERTY_LISTENER) {
currentProperty.removeListener(PROPERTY_LISTENER);
property.addListener(PROPERTY_LISTENER);
currentProperty = property;
RecordLog.info("[ClusterParamFlowRuleManager] New property has been registered to cluster param rule manager");
}
}

public static ParamFlowRule getParamFlowRuleById(Long id) {
if (!ClusterRuleUtil.validId(id)) {
return null;
}
return PARAM_RULES.get(id);
}

static class RulePropertyListener implements PropertyListener<List<ParamFlowRule>> {

@Override
public void configUpdate(List<ParamFlowRule> conf) {
Map<Long, ParamFlowRule> rules = buildClusterRuleMap(conf);
if (rules != null) {
PARAM_RULES.clear();
PARAM_RULES.putAll(rules);
}
RecordLog.info("[ClusterFlowRuleManager] Cluster param flow rules received: " + PARAM_RULES);
}

@Override
public void configLoad(List<ParamFlowRule> conf) {
Map<Long, ParamFlowRule> rules = buildClusterRuleMap(conf);
if (rules != null) {
PARAM_RULES.clear();
PARAM_RULES.putAll(rules);
}
RecordLog.info("[ClusterFlowRuleManager] Cluster param flow rules received: " + PARAM_RULES);
}
}

private static Map<Long, ParamFlowRule> buildClusterRuleMap(List<ParamFlowRule> list) {
Map<Long, ParamFlowRule> ruleMap = new ConcurrentHashMap<>();
if (list == null || list.isEmpty()) {
return ruleMap;
}

for (ParamFlowRule rule : list) {
if (!rule.isClusterMode()) {
continue;
}
if (!ParamFlowRuleUtil.isValidRule(rule)) {
RecordLog.warn(
"[ClusterParamFlowRuleManager] Ignoring invalid param flow rule when loading new flow rules: " + rule);
continue;
}
if (StringUtil.isBlank(rule.getLimitApp())) {
rule.setLimitApp(RuleConstant.LIMIT_APP_DEFAULT);
}

// Flow id should not be null after filtered.
Long flowId = rule.getClusterConfig().getFlowId();
if (flowId == null) {
continue;
}
ruleMap.put(flowId, rule);

// Prepare cluster metric from valid flow ID.
ClusterParamMetricStatistics.putMetricIfAbsent(flowId, new ClusterParamMetric(100, 1));
}

// Cleanup unused cluster metrics.
Set<Long> previousSet = PARAM_RULES.keySet();
for (Long id : previousSet) {
if (!ruleMap.containsKey(id)) {
ClusterParamMetricStatistics.removeMetric(id);
}
}

return ruleMap;
}

private ClusterParamFlowRuleManager() {}
}

+ 77
- 0
sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/DefaultTokenService.java Целия файл

@@ -0,0 +1,77 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.csp.sentinel.cluster.flow;

import java.util.Collection;

import com.alibaba.csp.sentinel.cluster.TokenResultStatus;
import com.alibaba.csp.sentinel.cluster.TokenResult;
import com.alibaba.csp.sentinel.cluster.TokenService;
import com.alibaba.csp.sentinel.slots.block.ClusterRuleConstant;
import com.alibaba.csp.sentinel.slots.block.flow.FlowRule;
import com.alibaba.csp.sentinel.slots.block.flow.param.ParamFlowRule;

/**
* Default implementation for cluster {@link TokenService}.
*
* @author Eric Zhao
* @since 1.4.0
*/
public class DefaultTokenService implements TokenService {

@Override
public TokenResult requestToken(Long ruleId, int acquireCount, boolean prioritized) {
if (notValidRequest(ruleId, acquireCount)) {
return badRequest();
}
// The rule should be valid.
FlowRule rule = ClusterFlowRuleManager.getFlowRuleById(ruleId);
if (rule == null) {
return new TokenResult(TokenResultStatus.NO_RULE_EXISTS);
}
if (isUsingReference(rule)) {
return ClusterFlowChecker.tryAcquireOrBorrowFromRefResource(rule, acquireCount, prioritized);
}

return ClusterFlowChecker.acquireClusterToken(rule, acquireCount, prioritized);
}

private boolean isUsingReference(FlowRule rule) {
return rule.getClusterConfig().getStrategy() == ClusterRuleConstant.FLOW_CLUSTER_STRATEGY_REF;
}

@Override
public TokenResult requestParamToken(Long ruleId, int acquireCount, Collection<Object> params) {
if (notValidRequest(ruleId, acquireCount) || params == null || params.isEmpty()) {
return badRequest();
}
// The rule should be valid.
ParamFlowRule rule = ClusterParamFlowRuleManager.getParamFlowRuleById(ruleId);
if (rule == null) {
return new TokenResult(TokenResultStatus.NO_RULE_EXISTS);
}

return ClusterParamFlowChecker.acquireClusterToken(rule, acquireCount, params);
}

private boolean notValidRequest(Long id, int count) {
return id == null || id <= 0 || count <= 0;
}

private TokenResult badRequest() {
return new TokenResult(TokenResultStatus.BAD_REQUEST);
}
}

+ 59
- 0
sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/statistic/ClusterMetricStatistics.java Целия файл

@@ -0,0 +1,59 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.csp.sentinel.cluster.flow.statistic;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import com.alibaba.csp.sentinel.cluster.flow.statistic.metric.ClusterMetric;
import com.alibaba.csp.sentinel.util.AssertUtil;

/**
* @author Eric Zhao
* @since 1.4.0
*/
public final class ClusterMetricStatistics {

private static final Map<Long, ClusterMetric> METRIC_MAP = new ConcurrentHashMap<>();

public static void clear() {
METRIC_MAP.clear();
}

public static void putMetric(long id, ClusterMetric metric) {
AssertUtil.notNull(metric, "Cluster metric cannot be null");
METRIC_MAP.put(id, metric);
}

public static boolean putMetricIfAbsent(long id, ClusterMetric metric) {
AssertUtil.notNull(metric, "Cluster metric cannot be null");
if (METRIC_MAP.containsKey(id)) {
return false;
}
METRIC_MAP.put(id, metric);
return true;
}

public static void removeMetric(long id) {
METRIC_MAP.remove(id);
}

public static ClusterMetric getMetric(long id) {
return METRIC_MAP.get(id);
}

private ClusterMetricStatistics() {}
}

+ 59
- 0
sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/statistic/ClusterParamMetricStatistics.java Целия файл

@@ -0,0 +1,59 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.csp.sentinel.cluster.flow.statistic;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import com.alibaba.csp.sentinel.cluster.flow.statistic.metric.ClusterParamMetric;
import com.alibaba.csp.sentinel.util.AssertUtil;

/**
* @author Eric Zhao
* @since 1.4.0
*/
public final class ClusterParamMetricStatistics {

private static final Map<Long, ClusterParamMetric> METRIC_MAP = new ConcurrentHashMap<>();

public static void clear() {
METRIC_MAP.clear();
}

public static void putMetric(long id, ClusterParamMetric metric) {
AssertUtil.notNull(metric, "metric cannot be null");
METRIC_MAP.put(id, metric);
}

public static boolean putMetricIfAbsent(long id, ClusterParamMetric metric) {
AssertUtil.notNull(metric, "metric cannot be null");
if (METRIC_MAP.containsKey(id)) {
return false;
}
METRIC_MAP.put(id, metric);
return true;
}

public static void removeMetric(long id) {
METRIC_MAP.remove(id);
}

public static ClusterParamMetric getMetric(long id) {
return METRIC_MAP.get(id);
}

private ClusterParamMetricStatistics() {}
}

+ 42
- 0
sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/statistic/data/ClusterFlowEvent.java Целия файл

@@ -0,0 +1,42 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.csp.sentinel.cluster.flow.statistic.data;

/**
* @author Eric Zhao
*/
public enum ClusterFlowEvent {

/**
* Normal pass.
*/
PASS,
/**
* Normal block.
*/
BLOCK,
/**
* Token request (from client) passed.
*/
PASS_REQUEST,
/**
* Token request (from client) blocked.
*/
BLOCK_REQUEST,
OCCUPIED_PASS,
OCCUPIED_BLOCK,
WAITING
}

+ 49
- 0
sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/statistic/data/ClusterMetricBucket.java Целия файл

@@ -0,0 +1,49 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.csp.sentinel.cluster.flow.statistic.data;

import com.alibaba.csp.sentinel.slots.statistic.base.LongAdder;

/**
* @author Eric Zhao
*/
public class ClusterMetricBucket {

private final LongAdder[] counters;

public ClusterMetricBucket() {
ClusterFlowEvent[] events = ClusterFlowEvent.values();
this.counters = new LongAdder[events.length];
for (ClusterFlowEvent event : events) {
counters[event.ordinal()] = new LongAdder();
}
}

public void reset() {
for (ClusterFlowEvent event : ClusterFlowEvent.values()) {
counters[event.ordinal()].reset();
}
}

public long get(ClusterFlowEvent event) {
return counters[event.ordinal()].sum();
}

public ClusterMetricBucket add(ClusterFlowEvent event, long count) {
counters[event.ordinal()].add(count);
return this;
}
}

+ 76
- 0
sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/statistic/metric/ClusterMetric.java Целия файл

@@ -0,0 +1,76 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.csp.sentinel.cluster.flow.statistic.metric;

import java.util.List;

import com.alibaba.csp.sentinel.cluster.flow.statistic.data.ClusterFlowEvent;
import com.alibaba.csp.sentinel.cluster.flow.statistic.data.ClusterMetricBucket;

/**
* @author Eric Zhao
* @since 1.4.0
*/
public class ClusterMetric {

private final ClusterMetricLeapArray metric;

public ClusterMetric(int windowLengthInMs, int intervalInSec) {
this.metric = new ClusterMetricLeapArray(windowLengthInMs, intervalInSec);
}

public void add(ClusterFlowEvent event, long count) {
metric.currentWindow().value().add(event, count);
}

public long getCurrentCount(ClusterFlowEvent event) {
return metric.currentWindow().value().get(event);
}

public long getSum(ClusterFlowEvent event) {
metric.currentWindow();
long sum = 0;

List<ClusterMetricBucket> buckets = metric.values();
for (ClusterMetricBucket bucket : buckets) {
sum += bucket.get(event);
}
return sum;
}

public double getAvg(ClusterFlowEvent event) {
return getSum(event) / metric.getIntervalInSecond();
}

/**
*
* @return time to wait for next bucket (in ms); 0 if cannot occupy next buckets
*/
public int tryOccupyNext(ClusterFlowEvent event, int acquireCount, double threshold) {
double latestQps = getAvg(ClusterFlowEvent.PASS);
if (!canOccupy(event, acquireCount, latestQps, threshold)) {
return 0;
}
metric.addOccupyPass(acquireCount);
add(ClusterFlowEvent.WAITING, acquireCount);
return 1000 / metric.getSampleCount();
}

private boolean canOccupy(ClusterFlowEvent event, int acquireCount, double latestQps, double threshold) {
// TODO
return metric.getOccupiedCount(event) + latestQps + acquireCount /*- xxx*/ <= threshold;
}
}

+ 87
- 0
sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/statistic/metric/ClusterMetricLeapArray.java Целия файл

@@ -0,0 +1,87 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.csp.sentinel.cluster.flow.statistic.metric;

import com.alibaba.csp.sentinel.cluster.flow.statistic.data.ClusterFlowEvent;
import com.alibaba.csp.sentinel.cluster.flow.statistic.data.ClusterMetricBucket;
import com.alibaba.csp.sentinel.slots.statistic.base.LeapArray;
import com.alibaba.csp.sentinel.slots.statistic.base.LongAdder;
import com.alibaba.csp.sentinel.slots.statistic.base.WindowWrap;

/**
* @author Eric Zhao
* @since 1.4.0
*/
public class ClusterMetricLeapArray extends LeapArray<ClusterMetricBucket> {

private final LongAdder[] occupyCounter;
private boolean hasOccupied = false;

/**
* The total bucket count is: {@link #sampleCount} = intervalInSec * 1000 / windowLengthInMs.
*
* @param windowLengthInMs a single window bucket's time length in milliseconds.
* @param intervalInSec the total time span of this {@link LeapArray} in seconds.
*/
public ClusterMetricLeapArray(int windowLengthInMs, int intervalInSec) {
super(windowLengthInMs, intervalInSec);
ClusterFlowEvent[] events = ClusterFlowEvent.values();
this.occupyCounter = new LongAdder[events.length];
for (ClusterFlowEvent event : events) {
occupyCounter[event.ordinal()] = new LongAdder();
}
}

@Override
public ClusterMetricBucket newEmptyBucket() {
return new ClusterMetricBucket();
}

@Override
protected WindowWrap<ClusterMetricBucket> resetWindowTo(WindowWrap<ClusterMetricBucket> w, long startTime) {
w.resetTo(startTime);
w.value().reset();
transferOccupyToBucket(w.value());
return w;
}

private void transferOccupyToBucket(/*@Valid*/ ClusterMetricBucket bucket) {
if (hasOccupied) {
transferOccupiedCount(bucket, ClusterFlowEvent.PASS, ClusterFlowEvent.OCCUPIED_PASS);
transferOccupiedThenReset(bucket, ClusterFlowEvent.PASS);
transferOccupiedThenReset(bucket, ClusterFlowEvent.PASS_REQUEST);
hasOccupied = false;
}
}

private void transferOccupiedCount(ClusterMetricBucket bucket, ClusterFlowEvent source, ClusterFlowEvent target) {
bucket.add(target, occupyCounter[source.ordinal()].sum());
}

private void transferOccupiedThenReset(ClusterMetricBucket bucket, ClusterFlowEvent event) {
bucket.add(event, occupyCounter[event.ordinal()].sumThenReset());
}

public void addOccupyPass(int count) {
occupyCounter[ClusterFlowEvent.PASS.ordinal()].add(count);
occupyCounter[ClusterFlowEvent.PASS_REQUEST.ordinal()].add(1);
this.hasOccupied = true;
}

public long getOccupiedCount(ClusterFlowEvent event) {
return occupyCounter[event.ordinal()].sum();
}
}

+ 74
- 0
sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/statistic/metric/ClusterParamMetric.java Целия файл

@@ -0,0 +1,74 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.csp.sentinel.cluster.flow.statistic.metric;

import java.util.List;

import com.alibaba.csp.sentinel.slots.statistic.base.LongAdder;
import com.alibaba.csp.sentinel.slots.statistic.cache.CacheMap;

/**
* @author Eric Zhao
*/
public class ClusterParamMetric {

private final ClusterParameterLeapArray<LongAdder> metric;

public ClusterParamMetric(int windowLengthInMs, int intervalInSec) {
this.metric = new ClusterParameterLeapArray<>(windowLengthInMs, intervalInSec);
}

public ClusterParamMetric(int windowLengthInMs, int intervalInSec, int maxCapacity) {
this.metric = new ClusterParameterLeapArray<>(windowLengthInMs, intervalInSec, maxCapacity);
}

public long getSum(Object value) {
if (value == null) {
return 0;
}

metric.currentWindow();
long sum = 0;

List<CacheMap<Object, LongAdder>> buckets = metric.values();
for (CacheMap<Object, LongAdder> bucket : buckets) {
sum += getCount(bucket.get(value));
}
return sum;
}

private long getCount(/*@Nullable*/ LongAdder adder) {
return adder == null ? 0 : adder.sum();
}

public void addValue(Object value, int count) {
if (value == null) {
return;
}
CacheMap<Object, LongAdder> data = metric.currentWindow().value();
LongAdder newCounter = new LongAdder();
LongAdder currentCounter = data.putIfAbsent(value, newCounter);
if (currentCounter != null) {
currentCounter.add(count);
} else {
newCounter.add(count);
}
}

public double getAvg(Object value) {
return getSum(value) / metric.getIntervalInSecond();
}
}

+ 56
- 0
sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/flow/statistic/metric/ClusterParameterLeapArray.java Целия файл

@@ -0,0 +1,56 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.csp.sentinel.cluster.flow.statistic.metric;

import com.alibaba.csp.sentinel.slots.statistic.base.LeapArray;
import com.alibaba.csp.sentinel.slots.statistic.base.WindowWrap;
import com.alibaba.csp.sentinel.slots.statistic.cache.CacheMap;
import com.alibaba.csp.sentinel.slots.statistic.cache.ConcurrentLinkedHashMapWrapper;
import com.alibaba.csp.sentinel.util.AssertUtil;

/**
* @author Eric Zhao
* @param <C> counter type
* @since 1.4.0
*/
public class ClusterParameterLeapArray<C> extends LeapArray<CacheMap<Object, C>> {

private final int maxCapacity;

public ClusterParameterLeapArray(int windowLengthInMs, int intervalInSec) {
this(windowLengthInMs, intervalInSec, DEFAULT_CLUSTER_MAX_CAPACITY);
}

public ClusterParameterLeapArray(int windowLengthInMs, int intervalInSec, int maxCapacity) {
super(windowLengthInMs, intervalInSec);
AssertUtil.isTrue(maxCapacity > 0, "maxCapacity of LRU map should be positive");
this.maxCapacity = maxCapacity;
}

@Override
public CacheMap<Object, C> newEmptyBucket() {
return new ConcurrentLinkedHashMapWrapper<>(maxCapacity);
}

@Override
protected WindowWrap<CacheMap<Object, C>> resetWindowTo(WindowWrap<CacheMap<Object, C>> w,
long startTime) {
w.value().clear();
return w;
}

public static final int DEFAULT_CLUSTER_MAX_CAPACITY = 4000;
}

+ 29
- 0
sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/ClusterTokenServer.java Целия файл

@@ -0,0 +1,29 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.csp.sentinel.cluster.server;

/**
* Token server interface for distributed flow control.
*
* @author Eric Zhao
* @since 1.4.0
*/
public interface ClusterTokenServer {

void start() throws Exception;

void stop() throws Exception;
}

+ 175
- 0
sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/NettyTransportServer.java Целия файл

@@ -0,0 +1,175 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.csp.sentinel.cluster.server;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

import com.alibaba.csp.sentinel.cluster.server.codec.netty.NettyRequestDecoder;
import com.alibaba.csp.sentinel.cluster.server.codec.netty.NettyResponseEncoder;
import com.alibaba.csp.sentinel.cluster.server.connection.Connection;
import com.alibaba.csp.sentinel.cluster.server.connection.ConnectionPool;
import com.alibaba.csp.sentinel.cluster.server.handler.TokenServerHandler;
import com.alibaba.csp.sentinel.log.RecordLog;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.util.concurrent.GenericFutureListener;
import io.netty.util.internal.SystemPropertyUtil;

import static com.alibaba.csp.sentinel.cluster.server.ServerConstants.*;

/**
* @author Eric Zhao
*/
public class NettyTransportServer implements ClusterTokenServer {

private static final int DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
"io.netty.eventLoopThreads", Runtime.getRuntime().availableProcessors() * 2));

private final int port = 11111;

private NioEventLoopGroup bossGroup;
private NioEventLoopGroup workerGroup;

private final ConnectionPool connectionPool = new ConnectionPool();

private final AtomicInteger currentState = new AtomicInteger(SERVER_STATUS_OFF);
private final AtomicInteger failedTimes = new AtomicInteger(0);

@Override
public void start() {
if (!currentState.compareAndSet(SERVER_STATUS_OFF, SERVER_STATUS_STARTING)) {
return;
}

ServerBootstrap b = new ServerBootstrap();
this.bossGroup = new NioEventLoopGroup(1);
this.workerGroup = new NioEventLoopGroup(DEFAULT_EVENT_LOOP_THREADS);
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 128)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast(new LengthFieldBasedFrameDecoder(1024, 0, 2, 0, 2));
p.addLast(new NettyRequestDecoder());
p.addLast(new LengthFieldPrepender(2));
p.addLast(new NettyResponseEncoder());
p.addLast(new TokenServerHandler(connectionPool));
}
})
.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
.childOption(ChannelOption.SO_SNDBUF, 32 * 1024)
.childOption(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000)
.childOption(ChannelOption.SO_TIMEOUT, 10)
.childOption(ChannelOption.TCP_NODELAY, true)
.childOption(ChannelOption.SO_RCVBUF, 32 * 1024);
b.bind(Integer.valueOf(port)).addListener(new GenericFutureListener<ChannelFuture>() {
@Override
public void operationComplete(ChannelFuture future) {
if (future.cause() != null) {
RecordLog.info("Token server start failed", future.cause());
currentState.compareAndSet(SERVER_STATUS_STARTING, SERVER_STATUS_OFF);

//try {
// Thread.sleep((failStartTimes.get() + 1) * 1000);
// start();
//} catch (Throwable e) {
// RecordLog.info("Fail to start token server:", e);
//}
} else {
RecordLog.info("Token server start success");
currentState.compareAndSet(SERVER_STATUS_STARTING, SERVER_STATUS_STARTED);
//failStartTimes.set(0);
}
}
});
}

@Override
public void stop() {
// If still initializing, wait for ready.
while (currentState.get() == SERVER_STATUS_STARTING) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}

if (currentState.compareAndSet(SERVER_STATUS_STARTED, SERVER_STATUS_OFF)) {
try {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
connectionPool.shutdownAll();

failedTimes.set(0);

RecordLog.info("Token server stopped");
} catch (Exception ex) {
RecordLog.warn("Failed to stop token server", ex);
}
}
}

public void refreshRunningServer() {
connectionPool.refreshIdleTask();
}

public void closeConnection(String clientIp, int clientPort) throws Exception {
Connection connection = connectionPool.getConnection(clientIp, clientPort);
connection.close();
}

public void closeAll() throws Exception {
List<Connection> connections = connectionPool.listAllConnection();
for (Connection connection : connections) {
connection.close();
}
}

public List<String> listAllClient() {
List<String> clients = new ArrayList<String>();
List<Connection> connections = connectionPool.listAllConnection();
for (Connection conn : connections) {
clients.add(conn.getConnectionKey());
}
return clients;
}

public int getCurrentState() {
return currentState.get();
}

public int clientCount() {
return connectionPool.count();
}
}

+ 31
- 0
sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/ServerConstants.java Целия файл

@@ -0,0 +1,31 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.csp.sentinel.cluster.server;

/**
* @author Eric Zhao
* @since 1.4.0
*/
public final class ServerConstants {

public static final int SERVER_STATUS_OFF = 0;
public static final int SERVER_STATUS_STARTING = 1;
public static final int SERVER_STATUS_STARTED = 2;

public static final String DEFAULT_NAMESPACE = "default";

private ServerConstants() {}
}

+ 64
- 0
sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/TokenServiceProvider.java Целия файл

@@ -0,0 +1,64 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.csp.sentinel.cluster.server;

import java.util.ArrayList;
import java.util.List;
import java.util.ServiceLoader;

import com.alibaba.csp.sentinel.cluster.TokenService;
import com.alibaba.csp.sentinel.cluster.flow.DefaultTokenService;
import com.alibaba.csp.sentinel.log.RecordLog;

/**
* @author Eric Zhao
* @since 1.4.0
*/
public final class TokenServiceProvider {

private static TokenService service = null;

private static final ServiceLoader<TokenService> LOADER = ServiceLoader.load(TokenService.class);

static {
resolveTokenServiceSpi();
}

public static TokenService getService() {
return service;
}

private static void resolveTokenServiceSpi() {
boolean hasOther = false;
List<TokenService> list = new ArrayList<TokenService>();
for (TokenService service : LOADER) {
if (service.getClass() != DefaultTokenService.class) {
hasOther = true;
list.add(service);
}
}

if (hasOther) {
service = list.get(0);
} else {
// No custom token service, using default.
service = new DefaultTokenService();
}

RecordLog.info("[TokenServiceProvider] Global token service resolved: "
+ service.getClass().getCanonicalName());
}
}

+ 65
- 0
sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/codec/DefaultRequestEntityDecoder.java Целия файл

@@ -0,0 +1,65 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.csp.sentinel.cluster.server.codec;

import com.alibaba.csp.sentinel.cluster.codec.EntityDecoder;
import com.alibaba.csp.sentinel.cluster.codec.request.RequestEntityDecoder;
import com.alibaba.csp.sentinel.cluster.request.ClusterRequest;
import com.alibaba.csp.sentinel.cluster.server.codec.registry.RequestDataDecodeRegistry;
import com.alibaba.csp.sentinel.log.RecordLog;

import io.netty.buffer.ByteBuf;

/**
* <p>Default entity decoder for any {@link ClusterRequest} entity.</p>
*
* <p>Decode format:</p>
* <pre>
* +--------+---------+---------+
* | xid(4) | type(1) | data... |
* +--------+---------+---------+
* </pre>
*
* @author Eric Zhao
* @since 1.4.0
*/
public class DefaultRequestEntityDecoder implements RequestEntityDecoder<ByteBuf, ClusterRequest> {

@Override
public ClusterRequest decode(ByteBuf source) {
if (source.readableBytes() >= 5) {
int xid = source.readInt();
int type = source.readByte();

EntityDecoder<ByteBuf, ?> dataDecoder = RequestDataDecodeRegistry.getDecoder(type);
if (dataDecoder == null) {
RecordLog.warn("Unknown type of request data decoder: {0}", type);
return null;
}

Object data;
if (source.readableBytes() == 0) {
data = null;
} else {
// TODO: handle decode error here.
data = dataDecoder.decode(source);
}

return new ClusterRequest<>(xid, type, data);
}
return null;
}
}

+ 53
- 0
sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/codec/DefaultResponseEntityWriter.java Целия файл

@@ -0,0 +1,53 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.csp.sentinel.cluster.server.codec;

import com.alibaba.csp.sentinel.cluster.ClusterConstants;
import com.alibaba.csp.sentinel.cluster.codec.EntityWriter;
import com.alibaba.csp.sentinel.cluster.codec.response.ResponseEntityWriter;
import com.alibaba.csp.sentinel.cluster.response.ClusterResponse;
import com.alibaba.csp.sentinel.cluster.response.Response;
import com.alibaba.csp.sentinel.cluster.server.codec.registry.ResponseDataWriterRegistry;
import com.alibaba.csp.sentinel.log.RecordLog;

import io.netty.buffer.ByteBuf;

/**
* @author Eric Zhao
* @since 1.4.0
*/
public class DefaultResponseEntityWriter implements ResponseEntityWriter<ClusterResponse, ByteBuf> {

@Override
public void writeTo(ClusterResponse response, ByteBuf out) {
int type = response.getType();
EntityWriter<Object, ByteBuf> responseDataWriter = ResponseDataWriterRegistry.getWriter(type);

if (responseDataWriter == null) {
writeHead(response.setStatus(ClusterConstants.RESPONSE_STATUS_BAD), out);
RecordLog.warn("[NettyResponseEncoder] Cannot find matching writer for type <{0}>", response.getType());
return;
}
writeHead(response, out);
responseDataWriter.writeTo(response.getData(), out);
}

private void writeHead(Response response, ByteBuf out) {
out.writeInt(response.getId());
out.writeByte(response.getType());
out.writeByte(response.getStatus());
}
}

+ 66
- 0
sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/codec/ServerEntityCodecProvider.java Целия файл

@@ -0,0 +1,66 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.csp.sentinel.cluster.server.codec;

import com.alibaba.csp.sentinel.cluster.codec.request.RequestEntityDecoder;
import com.alibaba.csp.sentinel.cluster.codec.response.ResponseEntityWriter;
import com.alibaba.csp.sentinel.log.RecordLog;
import com.alibaba.csp.sentinel.util.SpiLoader;

/**
* @author Eric Zhao
* @since 1.4.0
*/
public final class ServerEntityCodecProvider {

private static RequestEntityDecoder requestEntityDecoder = null;
private static ResponseEntityWriter responseEntityWriter = null;

static {
resolveInstance();
}

private static void resolveInstance() {
ResponseEntityWriter writer = SpiLoader.loadFirstInstance(ResponseEntityWriter.class);
if (writer == null) {
RecordLog.warn("[ServerEntityCodecProvider] No existing response entity writer, resolve failed");
} else {
responseEntityWriter = writer;
RecordLog.info(
"[ServerEntityCodecProvider] Response entity writer resolved: " + responseEntityWriter.getClass()
.getCanonicalName());
}
RequestEntityDecoder decoder = SpiLoader.loadFirstInstance(RequestEntityDecoder.class);
if (decoder == null) {
RecordLog.warn("[ServerEntityCodecProvider] No existing request entity decoder, resolve failed");
} else {
requestEntityDecoder = decoder;
RecordLog.info(
"[ServerEntityCodecProvider] Request entity decoder resolved: " + requestEntityDecoder.getClass()
.getCanonicalName());
}
}

public static RequestEntityDecoder getRequestEntityDecoder() {
return requestEntityDecoder;
}

public static ResponseEntityWriter getResponseEntityWriter() {
return responseEntityWriter;
}

private ServerEntityCodecProvider() {}
}

+ 50
- 0
sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/codec/data/FlowRequestDataDecoder.java Целия файл

@@ -0,0 +1,50 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.csp.sentinel.cluster.server.codec.data;

import com.alibaba.csp.sentinel.cluster.codec.EntityDecoder;
import com.alibaba.csp.sentinel.cluster.request.data.FlowRequestData;

import io.netty.buffer.ByteBuf;

/**
* <p>
* Decoder for {@link FlowRequestData} from {@code ByteBuf} stream. The layout:
* </p>
* <pre>
* | flow ID (4) | count (4) | priority flag (1) |
* </pre>
*
* @author Eric Zhao
* @since 1.4.0
*/
public class FlowRequestDataDecoder implements EntityDecoder<ByteBuf, FlowRequestData> {

@Override
public FlowRequestData decode(ByteBuf source) {
if (source.readableBytes() >= 12) {
FlowRequestData requestData = new FlowRequestData()
.setFlowId(source.readLong())
.setCount(source.readInt());
if (source.readableBytes() >= 1) {
requestData.setPriority(source.readBoolean());
}
return requestData;
}
// TODO: handle null here.
return null;
}
}

+ 34
- 0
sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/codec/data/FlowResponseDataWriter.java Целия файл

@@ -0,0 +1,34 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.csp.sentinel.cluster.server.codec.data;

import com.alibaba.csp.sentinel.cluster.codec.EntityWriter;
import com.alibaba.csp.sentinel.cluster.response.data.FlowTokenResponseData;

import io.netty.buffer.ByteBuf;

/**
* @author Eric Zhao
* @since 1.4.0
*/
public class FlowResponseDataWriter implements EntityWriter<FlowTokenResponseData, ByteBuf> {

@Override
public void writeTo(FlowTokenResponseData entity, ByteBuf out) {
out.writeInt(entity.getRemainingCount());
out.writeInt(entity.getWaitInMs());
}
}

+ 91
- 0
sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/codec/data/ParamFlowRequestDataDecoder.java Целия файл

@@ -0,0 +1,91 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.csp.sentinel.cluster.server.codec.data;

import java.util.ArrayList;
import java.util.List;

import com.alibaba.csp.sentinel.cluster.ClusterConstants;
import com.alibaba.csp.sentinel.cluster.codec.EntityDecoder;
import com.alibaba.csp.sentinel.cluster.request.data.ParamFlowRequestData;

import io.netty.buffer.ByteBuf;

/**
* @author Eric Zhao
*/
public class ParamFlowRequestDataDecoder implements EntityDecoder<ByteBuf, ParamFlowRequestData> {

@Override
public ParamFlowRequestData decode(ByteBuf source) {
if (source.readableBytes() >= 16) {
ParamFlowRequestData requestData = new ParamFlowRequestData()
.setFlowId(source.readLong())
.setCount(source.readInt());

int amount = source.readInt();
if (amount > 0) {
// TODO: should check rules exist here?
List<Object> params = new ArrayList<>(amount);
for (int i = 0; i < amount; i++) {
decodeParam(source, params);
}

requestData.setParams(params);
return requestData;
}
}
// TODO: handle null here.
return null;
}

private boolean decodeParam(ByteBuf source, List<Object> params) {
byte paramType = source.readByte();

switch (paramType) {
case ClusterConstants.PARAM_TYPE_INTEGER:
params.add(source.readInt());
return true;
case ClusterConstants.PARAM_TYPE_STRING:
int length = source.readInt();
byte[] bytes = new byte[length];
source.readBytes(bytes);
// TODO: take care of charset?
params.add(new String(bytes));
return true;
case ClusterConstants.PARAM_TYPE_BOOLEAN:
params.add(source.readBoolean());
return true;
case ClusterConstants.PARAM_TYPE_DOUBLE:
params.add(source.readDouble());
return true;
case ClusterConstants.PARAM_TYPE_LONG:
params.add(source.readLong());
return true;
case ClusterConstants.PARAM_TYPE_FLOAT:
params.add(source.readFloat());
return true;
case ClusterConstants.PARAM_TYPE_BYTE:
params.add(source.readByte());
return true;
case ClusterConstants.PARAM_TYPE_SHORT:
params.add(source.readShort());
return true;
default:
return false;
}
}
}

+ 51
- 0
sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/codec/netty/NettyRequestDecoder.java Целия файл

@@ -0,0 +1,51 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.csp.sentinel.cluster.server.codec.netty;

import java.util.List;

import com.alibaba.csp.sentinel.cluster.codec.request.RequestEntityDecoder;
import com.alibaba.csp.sentinel.cluster.request.Request;
import com.alibaba.csp.sentinel.cluster.server.codec.ServerEntityCodecProvider;
import com.alibaba.csp.sentinel.log.RecordLog;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;

/**
* @author Eric Zhao
* @since 1.4.0
*/
public class NettyRequestDecoder extends ByteToMessageDecoder {

@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
RequestEntityDecoder<ByteBuf, Request> requestDecoder = ServerEntityCodecProvider.getRequestEntityDecoder();
if (requestDecoder == null) {
// TODO: may need to throw exception?
RecordLog.warn("[NettyRequestDecoder] Cannot resolve the global request entity decoder, "
+ "dropping the request");
return;
}

// TODO: handle decode error here.
Request request = requestDecoder.decode(in);
if (request != null) {
out.add(request);
}
}
}

+ 55
- 0
sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/codec/netty/NettyResponseEncoder.java Целия файл

@@ -0,0 +1,55 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.csp.sentinel.cluster.server.codec.netty;

import com.alibaba.csp.sentinel.cluster.ClusterConstants;
import com.alibaba.csp.sentinel.cluster.codec.EntityWriter;
import com.alibaba.csp.sentinel.cluster.codec.response.ResponseEntityWriter;
import com.alibaba.csp.sentinel.cluster.response.ClusterResponse;
import com.alibaba.csp.sentinel.cluster.response.Response;
import com.alibaba.csp.sentinel.cluster.server.codec.ServerEntityCodecProvider;
import com.alibaba.csp.sentinel.cluster.server.codec.registry.ResponseDataWriterRegistry;
import com.alibaba.csp.sentinel.log.RecordLog;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;

/**
* @author Eric Zhao
* @since 1.4.0
*/
public class NettyResponseEncoder extends MessageToByteEncoder<ClusterResponse> {

@Override
protected void encode(ChannelHandlerContext ctx, ClusterResponse response, ByteBuf out) throws Exception {
ResponseEntityWriter<ClusterResponse, ByteBuf> responseEntityWriter = ServerEntityCodecProvider.getResponseEntityWriter();
if (responseEntityWriter == null) {
RecordLog.warn("[NettyResponseEncoder] Cannot resolve the global response entity writer, reply bad status");
writeBadStatusHead(response, out);
return;
}

responseEntityWriter.writeTo(response, out);
}


private void writeBadStatusHead(Response response, ByteBuf out) {
out.writeInt(response.getId());
out.writeByte(ClusterConstants.RESPONSE_STATUS_BAD);
out.writeByte(response.getStatus());
}
}

+ 48
- 0
sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/codec/registry/RequestDataDecodeRegistry.java Целия файл

@@ -0,0 +1,48 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.csp.sentinel.cluster.server.codec.registry;

import java.util.HashMap;
import java.util.Map;

import com.alibaba.csp.sentinel.cluster.codec.EntityDecoder;

import io.netty.buffer.ByteBuf;

/**
* @author Eric Zhao
* @since 1.4.0
*/
public final class RequestDataDecodeRegistry {

private static final Map<Integer, EntityDecoder<ByteBuf, ?>> DECODER_MAP = new HashMap<>();

public static boolean addDecoder(int type, EntityDecoder<ByteBuf, ?> decoder) {
if (DECODER_MAP.containsKey(type)) {
return false;
}
DECODER_MAP.put(type, decoder);
return true;
}

public static EntityDecoder<ByteBuf, Object> getDecoder(int type) {
return (EntityDecoder<ByteBuf, Object>)DECODER_MAP.get(type);
}

public static boolean removeDecoder(int type) {
return DECODER_MAP.remove(type) != null;
}
}

+ 48
- 0
sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/codec/registry/ResponseDataWriterRegistry.java Целия файл

@@ -0,0 +1,48 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.csp.sentinel.cluster.server.codec.registry;

import java.util.HashMap;
import java.util.Map;

import com.alibaba.csp.sentinel.cluster.codec.EntityWriter;

import io.netty.buffer.ByteBuf;

/**
* @author Eric Zhao
* @since 1.4.0
*/
public final class ResponseDataWriterRegistry {

private static final Map<Integer, EntityWriter<Object, ByteBuf>> WRITER_MAP = new HashMap<>();

public static <T> boolean addWriter(int type, EntityWriter<T, ByteBuf> writer) {
if (WRITER_MAP.containsKey(type)) {
return false;
}
WRITER_MAP.put(type, (EntityWriter<Object, ByteBuf>)writer);
return true;
}

public static EntityWriter<Object, ByteBuf> getWriter(int type) {
return WRITER_MAP.get(type);
}

public static boolean remove(int type) {
return WRITER_MAP.remove(type) != null;
}
}

+ 36
- 0
sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/config/ClusterServerConfigManager.java Целия файл

@@ -0,0 +1,36 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.csp.sentinel.cluster.server.config;

/**
* @author Eric Zhao
*/
public final class ClusterServerConfigManager {

private static final int DEFAULT_PORT = 8730;
private static final int DEFAULT_IDLE_SECONDS = 600;

public static volatile int port = DEFAULT_PORT;

public static volatile double exceedCount = 1.0d;
public static volatile boolean borrowRefEnabled = true;
public static volatile int idleSeconds = DEFAULT_IDLE_SECONDS;
public static volatile double maxOccupyRatio = 1.0d;

// TODO: implement here.

private ClusterServerConfigManager() {}
}

+ 37
- 0
sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/connection/Connection.java Целия файл

@@ -0,0 +1,37 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.csp.sentinel.cluster.server.connection;

import java.net.SocketAddress;

/**
* @author xuyue
* @author Eric Zhao
*/
public interface Connection extends AutoCloseable {

SocketAddress getLocalAddress();

int getRemotePort();

String getRemoteIP();

void refreshLastReadTime(long lastReadTime);

long getLastReadTime();

String getConnectionKey();
}

+ 85
- 0
sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/connection/ConnectionGroup.java Целия файл

@@ -0,0 +1,85 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.csp.sentinel.cluster.server.connection;

import java.util.Set;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.atomic.AtomicInteger;

import com.alibaba.csp.sentinel.cluster.server.ServerConstants;
import com.alibaba.csp.sentinel.util.AssertUtil;

/**
* @author Eric Zhao
* @since 1.4.0
*/
public class ConnectionGroup {

private String namespace;

private Set<String> addressSet = new ConcurrentSkipListSet<>();
private Set<String> hostSet = new ConcurrentSkipListSet<>();
private AtomicInteger connectedCount = new AtomicInteger();

public ConnectionGroup(String namespace) {
AssertUtil.notEmpty(namespace, "namespace cannot be empty");
this.namespace = namespace;
}

public ConnectionGroup() {
this(ServerConstants.DEFAULT_NAMESPACE);
}

public ConnectionGroup addConnection(String address) {
AssertUtil.notEmpty(address, "address cannot be empty");

addressSet.add(address);
String[] ip = address.split(":");
if (ip != null && ip.length >= 1) {
hostSet.add(ip[0]);
}
connectedCount.incrementAndGet();
return this;
}

public ConnectionGroup removeConnection(String address) {
AssertUtil.notEmpty(address, "address cannot be empty");

addressSet.remove(address);
String[] ip = address.split(":");
if (ip != null && ip.length >= 1) {
hostSet.remove(ip[0]);
}
connectedCount.decrementAndGet();
return this;
}

public String getNamespace() {
return namespace;
}

public Set<String> getAddressSet() {
return addressSet;
}

public Set<String> getHostSet() {
return hostSet;
}

public int getConnectedCount() {
return connectedCount.get();
}
}

+ 32
- 0
sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/connection/ConnectionManager.java Целия файл

@@ -0,0 +1,32 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.csp.sentinel.cluster.server.connection;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
* @author Eric Zhao
* @since 1.4.0
*/
public final class ConnectionManager {

private static final Map<String, ConnectionGroup> CONN_MAP = new ConcurrentHashMap<>();



private ConnectionManager() {}
}

+ 149
- 0
sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/connection/ConnectionPool.java Целия файл

@@ -0,0 +1,149 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.csp.sentinel.cluster.server.connection;

import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

import com.alibaba.csp.sentinel.log.RecordLog;

import io.netty.channel.Channel;

/**
* Universal connection pool for connection management.
*
* @author xuyue
* @author Eric Zhao
* @since 1.4.0
*/
public class ConnectionPool {

private static final ScheduledExecutorService TIMER = Executors.newScheduledThreadPool(2);

/**
* Format: ("ip:port", connection)
*/
private final Map<String, Connection> CONNECTION_MAP = new ConcurrentHashMap<String, Connection>();

/**
* Periodic scan task.
*/
private ScheduledFuture scanTaskFuture = null;

/**
* 创建一个connection,并放入连接池中
*
* @param channel
*/
public void createConnection(Channel channel) {
if (channel != null) {
Connection connection = new NettyConnection(channel, this);

String connKey = getConnectionKey(channel);
CONNECTION_MAP.put(connKey, connection);
}
}

/**
* Start the scan task for long-idle connections.
*/
private synchronized void startScan() {
if (scanTaskFuture == null
|| scanTaskFuture.isCancelled()
|| scanTaskFuture.isDone()) {
scanTaskFuture = TIMER.scheduleAtFixedRate(
new ScanIdleConnectionTask(this), 10, 30, TimeUnit.SECONDS);
}
}

/**
* Format to "ip:port".
*
* @param channel channel
* @return formatted key
*/
private String getConnectionKey(Channel channel) {
InetSocketAddress socketAddress = (InetSocketAddress) channel.remoteAddress();
String remoteIp = socketAddress.getAddress().getHostAddress();
int remotePort = socketAddress.getPort();
return remoteIp + ":" + remotePort;
}

private String getConnectionKey(String ip, int port) {
return ip + ":" + port;
}

/**
* 刷新一个连接上的最新read时间
*
* @param channel
*/
public void refreshLastReadTime(Channel channel) {
if (channel != null) {
String connKey = getConnectionKey(channel);
Connection connection = CONNECTION_MAP.get(connKey);
//不应该为null,需要处理这种情况吗?
if (connection != null) {
connection.refreshLastReadTime(System.currentTimeMillis());
}
}
}

public Connection getConnection(String remoteIp, int remotePort) {
String connKey = getConnectionKey(remoteIp, remotePort);
return CONNECTION_MAP.get(connKey);
}

public void remove(Channel channel) {
String connKey = getConnectionKey(channel);
CONNECTION_MAP.remove(connKey);
}

public List<Connection> listAllConnection() {
List<Connection> connections = new ArrayList<Connection>(CONNECTION_MAP.values());
return connections;
}

public int count(){
return CONNECTION_MAP.size();
}

public void clear() {
CONNECTION_MAP.clear();
}

public void shutdownAll() throws Exception {
for (Connection c : CONNECTION_MAP.values()) {
c.close();
}
}

public void refreshIdleTask() {
if (scanTaskFuture == null || scanTaskFuture.cancel(false)) {
startScan();
}else {
RecordLog.info("The result of canceling scanTask is error.");
}
}
}


+ 85
- 0
sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/connection/NettyConnection.java Целия файл

@@ -0,0 +1,85 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.csp.sentinel.cluster.server.connection;

import java.net.InetSocketAddress;
import java.net.SocketAddress;

import io.netty.channel.Channel;

/**
* @author xuyue
*/
public class NettyConnection implements Connection {

private String remoteIp;
private int remotePort;
private Channel channel;

private long lastReadTime;

private ConnectionPool pool;

public NettyConnection(Channel channel, ConnectionPool pool) {
this.channel = channel;
this.pool = pool;

InetSocketAddress socketAddress = (InetSocketAddress) channel.remoteAddress();
this.remoteIp = socketAddress.getAddress().getHostAddress();
this.remotePort = socketAddress.getPort();
this.lastReadTime = System.currentTimeMillis();
}

@Override
public SocketAddress getLocalAddress() {
return channel.localAddress();
}

@Override
public int getRemotePort() {
return remotePort;
}

@Override
public String getRemoteIP() {
return remoteIp;
}

@Override
public void refreshLastReadTime(long lastReadTime) {
this.lastReadTime = lastReadTime;
}

@Override
public long getLastReadTime() {
return lastReadTime;
}

@Override
public String getConnectionKey() {
return remoteIp + ":" + remotePort;
}

@Override
public void close() {
// Remove from connection pool.
pool.remove(channel);
// Close the connection.
if (channel != null && channel.isActive()){
channel.close();
}
}
}

+ 43
- 0
sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/connection/ScanIdleConnectionTask.java Целия файл

@@ -0,0 +1,43 @@
package com.alibaba.csp.sentinel.cluster.server.connection;

import java.util.List;

import com.alibaba.csp.sentinel.cluster.server.config.ClusterServerConfigManager;
import com.alibaba.csp.sentinel.log.RecordLog;

/**
* @author xuyue
* @author Eric Zhao
*/
public class ScanIdleConnectionTask implements Runnable {

private ConnectionPool connectionPool;

public ScanIdleConnectionTask(ConnectionPool connectionPool) {
this.connectionPool = connectionPool;
}

@Override
public void run() {
try {
int idleSeconds = ClusterServerConfigManager.idleSeconds;
long idleTime = idleSeconds * 1000;
if (idleTime < 0) {
idleTime = 600 * 1000;
}
long now = System.currentTimeMillis();
List<Connection> connections = connectionPool.listAllConnection();
for (Connection conn : connections) {
if ((now - conn.getLastReadTime()) > idleTime) {
RecordLog.info(
String.format("[ScanIdleConnectionTask] The connection <%s:%d> has been idle for <%d>s. "
+ "It will be closed now.", conn.getRemoteIP(), conn.getRemotePort(), idleSeconds)
);
conn.close();
}
}
} catch (Throwable t) {
// TODO: should log here.
}
}
}

+ 84
- 0
sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/handler/TokenServerHandler.java Целия файл

@@ -0,0 +1,84 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.csp.sentinel.cluster.server.handler;

import com.alibaba.csp.sentinel.cluster.ClusterConstants;
import com.alibaba.csp.sentinel.cluster.request.ClusterRequest;
import com.alibaba.csp.sentinel.cluster.response.ClusterResponse;
import com.alibaba.csp.sentinel.cluster.response.data.FlowTokenResponseData;
import com.alibaba.csp.sentinel.cluster.server.connection.ConnectionPool;
import com.alibaba.csp.sentinel.cluster.server.processor.RequestProcessor;
import com.alibaba.csp.sentinel.cluster.server.processor.RequestProcessorRegistry;
import com.alibaba.csp.sentinel.log.RecordLog;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

/**
* Netty server handler for Sentinel token server.
*
* @author Eric Zhao
* @since 1.4.0
*/
public class TokenServerHandler extends ChannelInboundHandlerAdapter {

private final ConnectionPool connectionPool;

public TokenServerHandler(ConnectionPool connectionPool) {
this.connectionPool = connectionPool;
}

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("[TokenServerHandler] Connection established");
super.channelActive(ctx);
}

@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
System.out.println("[TokenServerHandler] Connection inactive");
super.channelInactive(ctx);
}

@Override
@SuppressWarnings("unchecked")
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
connectionPool.refreshLastReadTime(ctx.channel());
System.out.println(String.format("[%s] Server message recv: %s", System.currentTimeMillis(), msg));
if (msg instanceof ClusterRequest) {
ClusterRequest request = (ClusterRequest)msg;

RequestProcessor<?, ?> processor = RequestProcessorRegistry.getProcessor(request.getType());
if (processor == null) {
System.out.println("[TokenServerHandler] No processor for request type: " + request.getType());
writeNoProcessorResponse(ctx, request);
} else {
ClusterResponse<?> response = processor.processRequest(request);
writeResponse(ctx, response);
}
}
}

private void writeNoProcessorResponse(ChannelHandlerContext ctx, ClusterRequest request) {
ClusterResponse<?> response = new ClusterResponse<>(request.getId(), request.getType(),
ClusterConstants.RESPONSE_STATUS_BAD, null);
writeResponse(ctx, response);
}

private void writeResponse(ChannelHandlerContext ctx, ClusterResponse response) {
ctx.writeAndFlush(response);
}
}

+ 51
- 0
sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/processor/FlowRequestProcessor.java Целия файл

@@ -0,0 +1,51 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.csp.sentinel.cluster.server.processor;

import com.alibaba.csp.sentinel.cluster.TokenResult;
import com.alibaba.csp.sentinel.cluster.TokenService;
import com.alibaba.csp.sentinel.cluster.request.ClusterRequest;
import com.alibaba.csp.sentinel.cluster.request.data.FlowRequestData;
import com.alibaba.csp.sentinel.cluster.response.ClusterResponse;
import com.alibaba.csp.sentinel.cluster.response.data.FlowTokenResponseData;
import com.alibaba.csp.sentinel.cluster.server.TokenServiceProvider;

/**
* @author Eric Zhao
* @since 1.4.0
*/
public class FlowRequestProcessor implements RequestProcessor<FlowRequestData, FlowTokenResponseData> {

@Override
public ClusterResponse<FlowTokenResponseData> processRequest(ClusterRequest<FlowRequestData> request) {
TokenService tokenService = TokenServiceProvider.getService();

long flowId = request.getData().getFlowId();
int count = request.getData().getCount();
boolean prioritized = request.getData().isPriority();

TokenResult result = tokenService.requestToken(flowId, count, prioritized);
return toResponse(result, request);
}

private ClusterResponse<FlowTokenResponseData> toResponse(TokenResult result, ClusterRequest request) {
return new ClusterResponse<>(request.getId(), request.getType(), result.getStatus(),
new FlowTokenResponseData()
.setRemainingCount(result.getRemaining())
.setWaitInMs(result.getWaitInMs())
);
}
}

+ 53
- 0
sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/processor/ParamFlowRequestProcessor.java Целия файл

@@ -0,0 +1,53 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.csp.sentinel.cluster.server.processor;

import java.util.Collection;

import com.alibaba.csp.sentinel.cluster.TokenResult;
import com.alibaba.csp.sentinel.cluster.TokenService;
import com.alibaba.csp.sentinel.cluster.request.ClusterRequest;
import com.alibaba.csp.sentinel.cluster.request.data.ParamFlowRequestData;
import com.alibaba.csp.sentinel.cluster.response.ClusterResponse;
import com.alibaba.csp.sentinel.cluster.response.data.FlowTokenResponseData;
import com.alibaba.csp.sentinel.cluster.server.TokenServiceProvider;

/**
* @author Eric Zhao
* @since 1.4.0
*/
public class ParamFlowRequestProcessor implements RequestProcessor<ParamFlowRequestData, FlowTokenResponseData> {

@Override
public ClusterResponse<FlowTokenResponseData> processRequest(ClusterRequest<ParamFlowRequestData> request) {
TokenService tokenService = TokenServiceProvider.getService();

long flowId = request.getData().getFlowId();
int count = request.getData().getCount();
Collection<Object> args = request.getData().getParams();

TokenResult result = tokenService.requestParamToken(flowId, count, args);
return toResponse(result, request);
}

private ClusterResponse<FlowTokenResponseData> toResponse(TokenResult result, ClusterRequest request) {
return new ClusterResponse<>(request.getId(), request.getType(), result.getStatus(),
new FlowTokenResponseData()
.setRemainingCount(result.getRemaining())
.setWaitInMs(0)
);
}
}

+ 38
- 0
sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/processor/RequestProcessor.java Целия файл

@@ -0,0 +1,38 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.csp.sentinel.cluster.server.processor;

import com.alibaba.csp.sentinel.cluster.request.ClusterRequest;
import com.alibaba.csp.sentinel.cluster.response.ClusterResponse;

/**
* Interface of cluster request processor.
*
* @param <T> type of request body
* @param <R> type of response body
* @author Eric Zhao
* @since 1.4.0
*/
public interface RequestProcessor<T, R> {

/**
* Process the cluster request.
*
* @param request Sentinel cluster request
* @return the response after processed
*/
ClusterResponse<R> processRequest(ClusterRequest<T> request);
}

+ 49
- 0
sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/processor/RequestProcessorRegistry.java Целия файл

@@ -0,0 +1,49 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.csp.sentinel.cluster.server.processor;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import com.alibaba.csp.sentinel.util.AssertUtil;

/**
* @author Eric Zhao
* @since 1.4.0
*/
public final class RequestProcessorRegistry {

private static final Map<Integer, RequestProcessor> PROCESSOR_MAP = new ConcurrentHashMap<>();

public static RequestProcessor getProcessor(int type) {
return PROCESSOR_MAP.get(type);
}

public static void addProcessorIfAbsent(int type, RequestProcessor processor) {
// TBD: use putIfAbsent in JDK 1.8.
if (PROCESSOR_MAP.containsKey(type)) {
return;
}
PROCESSOR_MAP.put(type, processor);
}

public static void addProcessor(int type, RequestProcessor processor) {
AssertUtil.notNull(processor, "processor cannot be null");
PROCESSOR_MAP.put(type, processor);
}

private RequestProcessorRegistry() {}
}

+ 28
- 0
sentinel-cluster/sentinel-cluster-server-default/src/main/java/com/alibaba/csp/sentinel/cluster/server/util/ClusterRuleUtil.java Целия файл

@@ -0,0 +1,28 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.csp.sentinel.cluster.server.util;

/**
* @author Eric Zhao
*/
public final class ClusterRuleUtil {

public static boolean validId(Long id) {
return id != null && id > 0;
}

private ClusterRuleUtil() {}
}

+ 1
- 0
sentinel-cluster/sentinel-cluster-server-default/src/main/resources/META-INF/services/com.alibaba.csp.sentinel.cluster.TokenService Целия файл

@@ -0,0 +1 @@
com.alibaba.csp.sentinel.cluster.flow.DefaultTokenService

+ 1
- 0
sentinel-cluster/sentinel-cluster-server-default/src/main/resources/META-INF/services/com.alibaba.csp.sentinel.cluster.codec.request.RequestEntityDecoder Целия файл

@@ -0,0 +1 @@
com.alibaba.csp.sentinel.cluster.server.codec.DefaultRequestEntityDecoder

+ 1
- 0
sentinel-cluster/sentinel-cluster-server-default/src/main/resources/META-INF/services/com.alibaba.csp.sentinel.cluster.codec.response.ResponseEntityWriter Целия файл

@@ -0,0 +1 @@
com.alibaba.csp.sentinel.cluster.server.codec.DefaultResponseEntityWriter

Loading…
Отказ
Запис