From 8096b22eb155f93a64ee7a6bc6bb1617d17a49b5 Mon Sep 17 00:00:00 2001 From: Eric Zhao Date: Tue, 18 Dec 2018 19:58:08 +0800 Subject: [PATCH] Add cluster embedded mode handling logic for parameter flow checker Signed-off-by: Eric Zhao --- .../block/flow/param/ParamFlowChecker.java | 26 ++++++++++++++----- 1 file changed, 20 insertions(+), 6 deletions(-) diff --git a/sentinel-extension/sentinel-parameter-flow-control/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/param/ParamFlowChecker.java b/sentinel-extension/sentinel-parameter-flow-control/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/param/ParamFlowChecker.java index dfacaf28..27df8ccb 100644 --- a/sentinel-extension/sentinel-parameter-flow-control/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/param/ParamFlowChecker.java +++ b/sentinel-extension/sentinel-parameter-flow-control/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/param/ParamFlowChecker.java @@ -22,10 +22,12 @@ import java.util.Collections; import java.util.List; import java.util.Set; -import com.alibaba.csp.sentinel.cluster.client.ClusterTokenClient; +import com.alibaba.csp.sentinel.cluster.ClusterStateManager; +import com.alibaba.csp.sentinel.cluster.TokenService; import com.alibaba.csp.sentinel.cluster.client.TokenClientProvider; import com.alibaba.csp.sentinel.cluster.TokenResult; import com.alibaba.csp.sentinel.cluster.TokenResultStatus; +import com.alibaba.csp.sentinel.cluster.server.EmbeddedClusterTokenServerProvider; import com.alibaba.csp.sentinel.log.RecordLog; import com.alibaba.csp.sentinel.slotchain.ResourceWrapper; import com.alibaba.csp.sentinel.slots.block.RuleConstant; @@ -134,13 +136,15 @@ final class ParamFlowChecker { private static boolean passClusterCheck(ResourceWrapper resourceWrapper, ParamFlowRule rule, int count, Object value) { try { - ClusterTokenClient client = TokenClientProvider.getClient(); - if (client == null) { - return true; - } Collection params = toCollection(value); - TokenResult result = client.requestParamToken(rule.getClusterConfig().getFlowId(), count, params); + TokenService clusterService = pickClusterService(); + if (clusterService == null) { + // No available cluster client or server, fallback to local or pass in need. + return fallbackToLocalOrPass(resourceWrapper, rule, count, params); + } + + TokenResult result = clusterService.requestParamToken(rule.getClusterConfig().getFlowId(), count, params); switch (result.getStatus()) { case TokenResultStatus.OK: return true; @@ -165,5 +169,15 @@ final class ParamFlowChecker { } } + private static TokenService pickClusterService() { + if (ClusterStateManager.isClient()) { + return TokenClientProvider.getClient(); + } + if (ClusterStateManager.isServer()) { + return EmbeddedClusterTokenServerProvider.getServer(); + } + return null; + } + private ParamFlowChecker() {} }