Add basic interface and adaptations for Sentinel cluster flow control in `sentinel-core` (#257)master
@@ -0,0 +1,32 @@ | |||||
/* | |||||
* Copyright 1999-2018 Alibaba Group Holding Ltd. | |||||
* | |||||
* Licensed under the Apache License, Version 2.0 (the "License"); | |||||
* you may not use this file except in compliance with the License. | |||||
* You may obtain a copy of the License at | |||||
* | |||||
* http://www.apache.org/licenses/LICENSE-2.0 | |||||
* | |||||
* Unless required by applicable law or agreed to in writing, software | |||||
* distributed under the License is distributed on an "AS IS" BASIS, | |||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |||||
* See the License for the specific language governing permissions and | |||||
* limitations under the License. | |||||
*/ | |||||
package com.alibaba.csp.sentinel.cluster; | |||||
/** | |||||
* Token client interface for distributed flow control. | |||||
* | |||||
* @author Eric Zhao | |||||
* @since 1.4.0 | |||||
*/ | |||||
public interface ClusterTokenClient extends TokenService { | |||||
/** | |||||
* Get descriptor of current token server. | |||||
* | |||||
* @return current token server | |||||
*/ | |||||
TokenServerDescriptor currentServer(); | |||||
} |
@@ -0,0 +1,61 @@ | |||||
/* | |||||
* Copyright 1999-2018 Alibaba Group Holding Ltd. | |||||
* | |||||
* Licensed under the Apache License, Version 2.0 (the "License"); | |||||
* you may not use this file except in compliance with the License. | |||||
* You may obtain a copy of the License at | |||||
* | |||||
* http://www.apache.org/licenses/LICENSE-2.0 | |||||
* | |||||
* Unless required by applicable law or agreed to in writing, software | |||||
* distributed under the License is distributed on an "AS IS" BASIS, | |||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |||||
* See the License for the specific language governing permissions and | |||||
* limitations under the License. | |||||
*/ | |||||
package com.alibaba.csp.sentinel.cluster; | |||||
import java.util.ArrayList; | |||||
import java.util.List; | |||||
import java.util.ServiceLoader; | |||||
import com.alibaba.csp.sentinel.log.RecordLog; | |||||
/** | |||||
* Provider for a universal {@link ClusterTokenClient} instance. | |||||
* | |||||
* @author Eric Zhao | |||||
* @since 1.4.0 | |||||
*/ | |||||
public final class TokenClientProvider { | |||||
private static ClusterTokenClient client = null; | |||||
private static final ServiceLoader<ClusterTokenClient> LOADER = ServiceLoader.load(ClusterTokenClient.class); | |||||
static { | |||||
// Not strictly thread-safe, but it's OK since it will be resolved only once. | |||||
resolveTokenClientInstance(); | |||||
} | |||||
public static ClusterTokenClient getClient() { | |||||
return client; | |||||
} | |||||
private static void resolveTokenClientInstance() { | |||||
List<ClusterTokenClient> clients = new ArrayList<ClusterTokenClient>(); | |||||
for (ClusterTokenClient client : LOADER) { | |||||
clients.add(client); | |||||
} | |||||
if (!clients.isEmpty()) { | |||||
// Get first. | |||||
client = clients.get(0); | |||||
RecordLog.info("[TokenClientProvider] Token client resolved: " + client.getClass().getCanonicalName()); | |||||
} else { | |||||
RecordLog.warn("[TokenClientProvider] No existing token client, resolve failed"); | |||||
} | |||||
} | |||||
private TokenClientProvider() {} | |||||
} |
@@ -0,0 +1,86 @@ | |||||
/* | |||||
* Copyright 1999-2018 Alibaba Group Holding Ltd. | |||||
* | |||||
* Licensed under the Apache License, Version 2.0 (the "License"); | |||||
* you may not use this file except in compliance with the License. | |||||
* You may obtain a copy of the License at | |||||
* | |||||
* http://www.apache.org/licenses/LICENSE-2.0 | |||||
* | |||||
* Unless required by applicable law or agreed to in writing, software | |||||
* distributed under the License is distributed on an "AS IS" BASIS, | |||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |||||
* See the License for the specific language governing permissions and | |||||
* limitations under the License. | |||||
*/ | |||||
package com.alibaba.csp.sentinel.cluster; | |||||
import java.util.Map; | |||||
/** | |||||
* Result entity of acquiring cluster flow token. | |||||
* | |||||
* @author Eric Zhao | |||||
* @since 1.4.0 | |||||
*/ | |||||
public class TokenResult { | |||||
private Integer status; | |||||
private int remaining; | |||||
private int waitInMs; | |||||
private Map<String, String> attachments; | |||||
public TokenResult() {} | |||||
public TokenResult(Integer status) { | |||||
this.status = status; | |||||
} | |||||
public Integer getStatus() { | |||||
return status; | |||||
} | |||||
public TokenResult setStatus(Integer status) { | |||||
this.status = status; | |||||
return this; | |||||
} | |||||
public int getRemaining() { | |||||
return remaining; | |||||
} | |||||
public TokenResult setRemaining(int remaining) { | |||||
this.remaining = remaining; | |||||
return this; | |||||
} | |||||
public int getWaitInMs() { | |||||
return waitInMs; | |||||
} | |||||
public TokenResult setWaitInMs(int waitInMs) { | |||||
this.waitInMs = waitInMs; | |||||
return this; | |||||
} | |||||
public Map<String, String> getAttachments() { | |||||
return attachments; | |||||
} | |||||
public TokenResult setAttachments(Map<String, String> attachments) { | |||||
this.attachments = attachments; | |||||
return this; | |||||
} | |||||
@Override | |||||
public String toString() { | |||||
return "TokenResult{" + | |||||
"status=" + status + | |||||
", remaining=" + remaining + | |||||
", waitInMs=" + waitInMs + | |||||
", attachments=" + attachments + | |||||
'}'; | |||||
} | |||||
} |
@@ -0,0 +1,60 @@ | |||||
/* | |||||
* Copyright 1999-2018 Alibaba Group Holding Ltd. | |||||
* | |||||
* Licensed under the Apache License, Version 2.0 (the "License"); | |||||
* you may not use this file except in compliance with the License. | |||||
* You may obtain a copy of the License at | |||||
* | |||||
* http://www.apache.org/licenses/LICENSE-2.0 | |||||
* | |||||
* Unless required by applicable law or agreed to in writing, software | |||||
* distributed under the License is distributed on an "AS IS" BASIS, | |||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |||||
* See the License for the specific language governing permissions and | |||||
* limitations under the License. | |||||
*/ | |||||
package com.alibaba.csp.sentinel.cluster; | |||||
/** | |||||
* @author Eric Zhao | |||||
* @since 1.4.0 | |||||
*/ | |||||
public final class TokenResultStatus { | |||||
/** | |||||
* Bad client request. | |||||
*/ | |||||
public static final int BAD_REQUEST = -4; | |||||
/** | |||||
* Server or client unexpected failure (due to transport or serialization failure). | |||||
*/ | |||||
public static final int FAIL = -1; | |||||
/** | |||||
* Token acquired. | |||||
*/ | |||||
public static final int OK = 0; | |||||
/** | |||||
* Token acquire failed (blocked). | |||||
*/ | |||||
public static final int BLOCKED = 1; | |||||
/** | |||||
* Should wait for next buckets. | |||||
*/ | |||||
public static final int SHOULD_WAIT = 2; | |||||
/** | |||||
* Token acquire failed (no rule exists). | |||||
*/ | |||||
public static final int NO_RULE_EXISTS = 3; | |||||
/** | |||||
* Token acquire failed (reference resource is not available). | |||||
*/ | |||||
public static final int NO_REF_RULE_EXISTS = 4; | |||||
/** | |||||
* Token acquire failed (strategy not available). | |||||
*/ | |||||
public static final int NOT_AVAILABLE = 5; | |||||
private TokenResultStatus() {} | |||||
} |
@@ -0,0 +1,72 @@ | |||||
/* | |||||
* Copyright 1999-2018 Alibaba Group Holding Ltd. | |||||
* | |||||
* Licensed under the Apache License, Version 2.0 (the "License"); | |||||
* you may not use this file except in compliance with the License. | |||||
* You may obtain a copy of the License at | |||||
* | |||||
* http://www.apache.org/licenses/LICENSE-2.0 | |||||
* | |||||
* Unless required by applicable law or agreed to in writing, software | |||||
* distributed under the License is distributed on an "AS IS" BASIS, | |||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |||||
* See the License for the specific language governing permissions and | |||||
* limitations under the License. | |||||
*/ | |||||
package com.alibaba.csp.sentinel.cluster; | |||||
/** | |||||
* A simple descriptor for Sentinel token server. | |||||
* | |||||
* @author Eric Zhao | |||||
* @since 1.4.0 | |||||
*/ | |||||
public class TokenServerDescriptor { | |||||
private String host; | |||||
private int port; | |||||
private String type; | |||||
public TokenServerDescriptor() {} | |||||
public TokenServerDescriptor(String host, int port) { | |||||
this.host = host; | |||||
this.port = port; | |||||
} | |||||
public String getHost() { | |||||
return host; | |||||
} | |||||
public TokenServerDescriptor setHost(String host) { | |||||
this.host = host; | |||||
return this; | |||||
} | |||||
public int getPort() { | |||||
return port; | |||||
} | |||||
public TokenServerDescriptor setPort(int port) { | |||||
this.port = port; | |||||
return this; | |||||
} | |||||
public String getType() { | |||||
return type; | |||||
} | |||||
public TokenServerDescriptor setType(String type) { | |||||
this.type = type; | |||||
return this; | |||||
} | |||||
@Override | |||||
public String toString() { | |||||
return "TokenServerDescriptor{" + | |||||
"host='" + host + '\'' + | |||||
", port=" + port + | |||||
", type='" + type + '\'' + | |||||
'}'; | |||||
} | |||||
} |
@@ -0,0 +1,47 @@ | |||||
/* | |||||
* Copyright 1999-2018 Alibaba Group Holding Ltd. | |||||
* | |||||
* Licensed under the Apache License, Version 2.0 (the "License"); | |||||
* you may not use this file except in compliance with the License. | |||||
* You may obtain a copy of the License at | |||||
* | |||||
* http://www.apache.org/licenses/LICENSE-2.0 | |||||
* | |||||
* Unless required by applicable law or agreed to in writing, software | |||||
* distributed under the License is distributed on an "AS IS" BASIS, | |||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |||||
* See the License for the specific language governing permissions and | |||||
* limitations under the License. | |||||
*/ | |||||
package com.alibaba.csp.sentinel.cluster; | |||||
import java.util.Collection; | |||||
/** | |||||
* Service interface of flow control. | |||||
* | |||||
* @author Eric Zhao | |||||
* @since 1.4.0 | |||||
*/ | |||||
public interface TokenService { | |||||
/** | |||||
* Request tokens from remote token server. | |||||
* | |||||
* @param ruleId the unique rule ID | |||||
* @param acquireCount token count to acquire | |||||
* @param prioritized whether the request is prioritized | |||||
* @return result of the token request | |||||
*/ | |||||
TokenResult requestToken(Long ruleId, int acquireCount, boolean prioritized); | |||||
/** | |||||
* Request tokens for a specific parameter from remote token server. | |||||
* | |||||
* @param ruleId the unique rule ID | |||||
* @param acquireCount token count to acquire | |||||
* @param params parameter list | |||||
* @return result of the token request | |||||
*/ | |||||
TokenResult requestParamToken(Long ruleId, int acquireCount, Collection<Object> params); | |||||
} |
@@ -0,0 +1,59 @@ | |||||
/* | |||||
* Copyright 1999-2018 Alibaba Group Holding Ltd. | |||||
* | |||||
* Licensed under the Apache License, Version 2.0 (the "License"); | |||||
* you may not use this file except in compliance with the License. | |||||
* You may obtain a copy of the License at | |||||
* | |||||
* http://www.apache.org/licenses/LICENSE-2.0 | |||||
* | |||||
* Unless required by applicable law or agreed to in writing, software | |||||
* distributed under the License is distributed on an "AS IS" BASIS, | |||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |||||
* See the License for the specific language governing permissions and | |||||
* limitations under the License. | |||||
*/ | |||||
package com.alibaba.csp.sentinel.cluster.log; | |||||
import java.io.File; | |||||
import com.alibaba.csp.sentinel.eagleeye.EagleEye; | |||||
import com.alibaba.csp.sentinel.eagleeye.StatLogger; | |||||
import com.alibaba.csp.sentinel.log.LogBase; | |||||
/** | |||||
* @author jialiang.linjl | |||||
* @author Eric Zhao | |||||
* @since 1.4.0 | |||||
*/ | |||||
public final class ClusterStatLogUtil { | |||||
private static final String FILE_NAME = "sentinel-cluster.log"; | |||||
private static StatLogger statLogger; | |||||
static { | |||||
String path = LogBase.getLogBaseDir() + FILE_NAME; | |||||
statLogger = EagleEye.statLoggerBuilder("sentinel-cluster-record") | |||||
.intervalSeconds(1) | |||||
.entryDelimiter('|') | |||||
.keyDelimiter(',') | |||||
.valueDelimiter(',') | |||||
.maxEntryCount(5000) | |||||
.configLogFilePath(path) | |||||
.maxFileSizeMB(300) | |||||
.maxBackupIndex(3) | |||||
.buildSingleton(); | |||||
} | |||||
public static void log(String msg) { | |||||
statLogger.stat(msg).count(); | |||||
} | |||||
public static void log(String msg, int count) { | |||||
statLogger.stat(msg).count(count); | |||||
} | |||||
private ClusterStatLogUtil() {} | |||||
} |
@@ -0,0 +1,31 @@ | |||||
/* | |||||
* Copyright 1999-2018 Alibaba Group Holding Ltd. | |||||
* | |||||
* Licensed under the Apache License, Version 2.0 (the "License"); | |||||
* you may not use this file except in compliance with the License. | |||||
* You may obtain a copy of the License at | |||||
* | |||||
* http://www.apache.org/licenses/LICENSE-2.0 | |||||
* | |||||
* Unless required by applicable law or agreed to in writing, software | |||||
* distributed under the License is distributed on an "AS IS" BASIS, | |||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |||||
* See the License for the specific language governing permissions and | |||||
* limitations under the License. | |||||
*/ | |||||
package com.alibaba.csp.sentinel.slots.block; | |||||
/** | |||||
* @author Eric Zhao | |||||
* @since 1.4.0 | |||||
*/ | |||||
public final class ClusterRuleConstant { | |||||
public static final int FLOW_CLUSTER_STRATEGY_NORMAL = 0; | |||||
public static final int FLOW_CLUSTER_STRATEGY_REF = 1; | |||||
public static final int FLOW_THRESHOLD_AVG_LOCAL = 0; | |||||
public static final int FLOW_THRESHOLD_GLOBAL = 1; | |||||
private ClusterRuleConstant() {} | |||||
} |
@@ -0,0 +1,154 @@ | |||||
/* | |||||
* Copyright 1999-2018 Alibaba Group Holding Ltd. | |||||
* | |||||
* Licensed under the Apache License, Version 2.0 (the "License"); | |||||
* you may not use this file except in compliance with the License. | |||||
* You may obtain a copy of the License at | |||||
* | |||||
* http://www.apache.org/licenses/LICENSE-2.0 | |||||
* | |||||
* Unless required by applicable law or agreed to in writing, software | |||||
* distributed under the License is distributed on an "AS IS" BASIS, | |||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |||||
* See the License for the specific language governing permissions and | |||||
* limitations under the License. | |||||
*/ | |||||
package com.alibaba.csp.sentinel.slots.block.flow; | |||||
import com.alibaba.csp.sentinel.slots.block.ClusterRuleConstant; | |||||
/** | |||||
* Flow rule config in cluster mode. | |||||
* | |||||
* @author Eric Zhao | |||||
* @since 1.4.0 | |||||
*/ | |||||
public class ClusterFlowConfig { | |||||
/** | |||||
* Global unique ID. | |||||
*/ | |||||
private Long flowId; | |||||
/** | |||||
* Threshold type (average by local value or global value). | |||||
*/ | |||||
private int thresholdType = ClusterRuleConstant.FLOW_THRESHOLD_AVG_LOCAL; | |||||
private boolean fallbackToLocalWhenFail; | |||||
/** | |||||
* 0: normal; 1: using reference (borrow from reference). | |||||
*/ | |||||
private int strategy = ClusterRuleConstant.FLOW_CLUSTER_STRATEGY_NORMAL; | |||||
private Long refFlowId; | |||||
private int refSampleCount = 10; | |||||
private double refRatio = 1d; | |||||
public Long getFlowId() { | |||||
return flowId; | |||||
} | |||||
public ClusterFlowConfig setFlowId(Long flowId) { | |||||
this.flowId = flowId; | |||||
return this; | |||||
} | |||||
public int getThresholdType() { | |||||
return thresholdType; | |||||
} | |||||
public ClusterFlowConfig setThresholdType(int thresholdType) { | |||||
this.thresholdType = thresholdType; | |||||
return this; | |||||
} | |||||
public int getStrategy() { | |||||
return strategy; | |||||
} | |||||
public ClusterFlowConfig setStrategy(int strategy) { | |||||
this.strategy = strategy; | |||||
return this; | |||||
} | |||||
public Long getRefFlowId() { | |||||
return refFlowId; | |||||
} | |||||
public ClusterFlowConfig setRefFlowId(Long refFlowId) { | |||||
this.refFlowId = refFlowId; | |||||
return this; | |||||
} | |||||
public int getRefSampleCount() { | |||||
return refSampleCount; | |||||
} | |||||
public ClusterFlowConfig setRefSampleCount(int refSampleCount) { | |||||
this.refSampleCount = refSampleCount; | |||||
return this; | |||||
} | |||||
public double getRefRatio() { | |||||
return refRatio; | |||||
} | |||||
public ClusterFlowConfig setRefRatio(double refRatio) { | |||||
this.refRatio = refRatio; | |||||
return this; | |||||
} | |||||
public boolean isFallbackToLocalWhenFail() { | |||||
return fallbackToLocalWhenFail; | |||||
} | |||||
public ClusterFlowConfig setFallbackToLocalWhenFail(boolean fallbackToLocalWhenFail) { | |||||
this.fallbackToLocalWhenFail = fallbackToLocalWhenFail; | |||||
return this; | |||||
} | |||||
@Override | |||||
public boolean equals(Object o) { | |||||
if (this == o) { return true; } | |||||
if (o == null || getClass() != o.getClass()) { return false; } | |||||
ClusterFlowConfig that = (ClusterFlowConfig)o; | |||||
if (thresholdType != that.thresholdType) { return false; } | |||||
if (fallbackToLocalWhenFail != that.fallbackToLocalWhenFail) { return false; } | |||||
if (strategy != that.strategy) { return false; } | |||||
if (refSampleCount != that.refSampleCount) { return false; } | |||||
if (Double.compare(that.refRatio, refRatio) != 0) { return false; } | |||||
if (flowId != null ? !flowId.equals(that.flowId) : that.flowId != null) { return false; } | |||||
return refFlowId != null ? refFlowId.equals(that.refFlowId) : that.refFlowId == null; | |||||
} | |||||
@Override | |||||
public int hashCode() { | |||||
int result; | |||||
long temp; | |||||
result = flowId != null ? flowId.hashCode() : 0; | |||||
result = 31 * result + thresholdType; | |||||
result = 31 * result + (fallbackToLocalWhenFail ? 1 : 0); | |||||
result = 31 * result + strategy; | |||||
result = 31 * result + (refFlowId != null ? refFlowId.hashCode() : 0); | |||||
result = 31 * result + refSampleCount; | |||||
temp = Double.doubleToLongBits(refRatio); | |||||
result = 31 * result + (int)(temp ^ (temp >>> 32)); | |||||
return result; | |||||
} | |||||
@Override | |||||
public String toString() { | |||||
return "ClusterFlowConfig{" + | |||||
"flowId=" + flowId + | |||||
", thresholdType=" + thresholdType + | |||||
", fallbackToLocalWhenFail=" + fallbackToLocalWhenFail + | |||||
", strategy=" + strategy + | |||||
", refFlowId=" + refFlowId + | |||||
", refSampleCount=" + refSampleCount + | |||||
", refRatio=" + refRatio + | |||||
'}'; | |||||
} | |||||
} |
@@ -85,6 +85,12 @@ public class FlowRule extends AbstractRule { | |||||
*/ | */ | ||||
private int maxQueueingTimeMs = 500; | private int maxQueueingTimeMs = 500; | ||||
private boolean clusterMode; | |||||
/** | |||||
* Flow rule config for cluster mode. | |||||
*/ | |||||
private ClusterFlowConfig clusterConfig; | |||||
/** | /** | ||||
* The traffic shaping (throttling) controller. | * The traffic shaping (throttling) controller. | ||||
*/ | */ | ||||
@@ -162,6 +168,24 @@ public class FlowRule extends AbstractRule { | |||||
return this; | return this; | ||||
} | } | ||||
public boolean isClusterMode() { | |||||
return clusterMode; | |||||
} | |||||
public FlowRule setClusterMode(boolean clusterMode) { | |||||
this.clusterMode = clusterMode; | |||||
return this; | |||||
} | |||||
public ClusterFlowConfig getClusterConfig() { | |||||
return clusterConfig; | |||||
} | |||||
public FlowRule setClusterConfig(ClusterFlowConfig clusterConfig) { | |||||
this.clusterConfig = clusterConfig; | |||||
return this; | |||||
} | |||||
@Override | @Override | ||||
public boolean passCheck(Context context, DefaultNode node, int acquireCount, Object... args) { | public boolean passCheck(Context context, DefaultNode node, int acquireCount, Object... args) { | ||||
return true; | return true; | ||||
@@ -169,43 +193,21 @@ public class FlowRule extends AbstractRule { | |||||
@Override | @Override | ||||
public boolean equals(Object o) { | public boolean equals(Object o) { | ||||
if (this == o) { | |||||
return true; | |||||
} | |||||
if (!(o instanceof FlowRule)) { | |||||
return false; | |||||
} | |||||
if (!super.equals(o)) { | |||||
return false; | |||||
} | |||||
FlowRule flowRule = (FlowRule)o; | |||||
if (grade != flowRule.grade) { | |||||
return false; | |||||
} | |||||
if (Double.compare(flowRule.count, count) != 0) { | |||||
return false; | |||||
} | |||||
if (strategy != flowRule.strategy) { | |||||
return false; | |||||
} | |||||
if (refResource != null ? !refResource.equals(flowRule.refResource) : flowRule.refResource != null) { | |||||
return false; | |||||
} | |||||
if (this.controlBehavior != flowRule.controlBehavior) { | |||||
return false; | |||||
} | |||||
if (warmUpPeriodSec != flowRule.warmUpPeriodSec) { | |||||
return false; | |||||
} | |||||
if (maxQueueingTimeMs != flowRule.maxQueueingTimeMs) { | |||||
return false; | |||||
} | |||||
if (this == o) { return true; } | |||||
if (o == null || getClass() != o.getClass()) { return false; } | |||||
if (!super.equals(o)) { return false; } | |||||
return true; | |||||
FlowRule rule = (FlowRule)o; | |||||
if (grade != rule.grade) { return false; } | |||||
if (Double.compare(rule.count, count) != 0) { return false; } | |||||
if (strategy != rule.strategy) { return false; } | |||||
if (controlBehavior != rule.controlBehavior) { return false; } | |||||
if (warmUpPeriodSec != rule.warmUpPeriodSec) { return false; } | |||||
if (maxQueueingTimeMs != rule.maxQueueingTimeMs) { return false; } | |||||
if (clusterMode != rule.clusterMode) { return false; } | |||||
if (refResource != null ? !refResource.equals(rule.refResource) : rule.refResource != null) { return false; } | |||||
return clusterConfig != null ? clusterConfig.equals(rule.clusterConfig) : rule.clusterConfig == null; | |||||
} | } | ||||
@Override | @Override | ||||
@@ -217,10 +219,11 @@ public class FlowRule extends AbstractRule { | |||||
result = 31 * result + (int)(temp ^ (temp >>> 32)); | result = 31 * result + (int)(temp ^ (temp >>> 32)); | ||||
result = 31 * result + strategy; | result = 31 * result + strategy; | ||||
result = 31 * result + (refResource != null ? refResource.hashCode() : 0); | result = 31 * result + (refResource != null ? refResource.hashCode() : 0); | ||||
result = 31 * result + (int)(temp ^ (temp >>> 32)); | |||||
result = 31 * result + warmUpPeriodSec; | |||||
result = 31 * result + controlBehavior; | result = 31 * result + controlBehavior; | ||||
result = 31 * result + warmUpPeriodSec; | |||||
result = 31 * result + maxQueueingTimeMs; | result = 31 * result + maxQueueingTimeMs; | ||||
result = 31 * result + (clusterMode ? 1 : 0); | |||||
result = 31 * result + (clusterConfig != null ? clusterConfig.hashCode() : 0); | |||||
return result; | return result; | ||||
} | } | ||||
@@ -236,7 +239,9 @@ public class FlowRule extends AbstractRule { | |||||
", controlBehavior=" + controlBehavior + | ", controlBehavior=" + controlBehavior + | ||||
", warmUpPeriodSec=" + warmUpPeriodSec + | ", warmUpPeriodSec=" + warmUpPeriodSec + | ||||
", maxQueueingTimeMs=" + maxQueueingTimeMs + | ", maxQueueingTimeMs=" + maxQueueingTimeMs + | ||||
", clusterMode=" + clusterMode + | |||||
", clusterConfig=" + clusterConfig + | |||||
", controller=" + controller + | ", controller=" + controller + | ||||
"}"; | |||||
'}'; | |||||
} | } | ||||
} | } |
@@ -15,7 +15,12 @@ | |||||
*/ | */ | ||||
package com.alibaba.csp.sentinel.slots.block.flow; | package com.alibaba.csp.sentinel.slots.block.flow; | ||||
import com.alibaba.csp.sentinel.cluster.ClusterTokenClient; | |||||
import com.alibaba.csp.sentinel.cluster.TokenClientProvider; | |||||
import com.alibaba.csp.sentinel.cluster.TokenResultStatus; | |||||
import com.alibaba.csp.sentinel.cluster.TokenResult; | |||||
import com.alibaba.csp.sentinel.context.Context; | import com.alibaba.csp.sentinel.context.Context; | ||||
import com.alibaba.csp.sentinel.log.RecordLog; | |||||
import com.alibaba.csp.sentinel.node.DefaultNode; | import com.alibaba.csp.sentinel.node.DefaultNode; | ||||
import com.alibaba.csp.sentinel.node.Node; | import com.alibaba.csp.sentinel.node.Node; | ||||
import com.alibaba.csp.sentinel.slots.block.RuleConstant; | import com.alibaba.csp.sentinel.slots.block.RuleConstant; | ||||
@@ -30,11 +35,25 @@ import com.alibaba.csp.sentinel.util.StringUtil; | |||||
final class FlowRuleChecker { | final class FlowRuleChecker { | ||||
static boolean passCheck(/*@NonNull*/ FlowRule rule, Context context, DefaultNode node, int acquireCount) { | static boolean passCheck(/*@NonNull*/ FlowRule rule, Context context, DefaultNode node, int acquireCount) { | ||||
return passCheck(rule, context, node, acquireCount, false); | |||||
} | |||||
static boolean passCheck(/*@NonNull*/ FlowRule rule, Context context, DefaultNode node, int acquireCount, | |||||
boolean prioritized) { | |||||
String limitApp = rule.getLimitApp(); | String limitApp = rule.getLimitApp(); | ||||
if (limitApp == null) { | if (limitApp == null) { | ||||
return true; | return true; | ||||
} | } | ||||
if (rule.isClusterMode()) { | |||||
return passClusterCheck(rule, context, node, acquireCount, prioritized); | |||||
} | |||||
return passLocalCheck(rule, context, node, acquireCount, prioritized); | |||||
} | |||||
private static boolean passLocalCheck(FlowRule rule, Context context, DefaultNode node, int acquireCount, | |||||
boolean prioritized) { | |||||
Node selectedNode = selectNodeByRequesterAndStrategy(rule, context, node); | Node selectedNode = selectNodeByRequesterAndStrategy(rule, context, node); | ||||
if (selectedNode == null) { | if (selectedNode == null) { | ||||
return true; | return true; | ||||
@@ -43,6 +62,38 @@ final class FlowRuleChecker { | |||||
return rule.getRater().canPass(selectedNode, acquireCount); | return rule.getRater().canPass(selectedNode, acquireCount); | ||||
} | } | ||||
static boolean passClusterCheck(FlowRule rule, Context context, DefaultNode node, int acquireCount, | |||||
boolean prioritized) { | |||||
try { | |||||
ClusterTokenClient client = TokenClientProvider.getClient(); | |||||
if (client != null) { | |||||
TokenResult result = client.requestToken(rule.getClusterConfig().getFlowId(), acquireCount, | |||||
prioritized); | |||||
switch (result.getStatus()) { | |||||
case TokenResultStatus.OK: | |||||
return true; | |||||
case TokenResultStatus.SHOULD_WAIT: | |||||
// Wait for next tick. | |||||
Thread.sleep(result.getWaitInMs()); | |||||
return true; | |||||
case TokenResultStatus.NO_RULE_EXISTS: | |||||
case TokenResultStatus.BAD_REQUEST: | |||||
case TokenResultStatus.FAIL: | |||||
return passLocalCheck(rule, context, node, acquireCount, prioritized); | |||||
case TokenResultStatus.BLOCKED: | |||||
default: | |||||
return false; | |||||
} | |||||
} | |||||
// If client is absent, then fallback to local mode. | |||||
} catch (Throwable ex) { | |||||
RecordLog.warn("[FlowRuleChecker] Request cluster token unexpected failed", ex); | |||||
} | |||||
// TODO: choose whether fallback to local or inactivate the rule. | |||||
// Downgrade to local flow control when token client or server for this rule is not available. | |||||
return passLocalCheck(rule, context, node, acquireCount, prioritized); | |||||
} | |||||
static Node selectReferenceNode(FlowRule rule, Context context, DefaultNode node) { | static Node selectReferenceNode(FlowRule rule, Context context, DefaultNode node) { | ||||
String refResource = rule.getRefResource(); | String refResource = rule.getRefResource(); | ||||
int strategy = rule.getStrategy(); | int strategy = rule.getStrategy(); | ||||
@@ -103,4 +154,4 @@ final class FlowRuleChecker { | |||||
} | } | ||||
private FlowRuleChecker() {} | private FlowRuleChecker() {} | ||||
} | |||||
} |
@@ -19,10 +19,23 @@ import java.util.Comparator; | |||||
import com.alibaba.csp.sentinel.slots.block.RuleConstant; | import com.alibaba.csp.sentinel.slots.block.RuleConstant; | ||||
/** | |||||
* Comparator for flow rules. | |||||
* | |||||
* @author jialiang.linjl | |||||
*/ | |||||
public class FlowRuleComparator implements Comparator<FlowRule> { | public class FlowRuleComparator implements Comparator<FlowRule> { | ||||
@Override | @Override | ||||
public int compare(FlowRule o1, FlowRule o2) { | public int compare(FlowRule o1, FlowRule o2) { | ||||
// Clustered mode will be on the top. | |||||
if (o1.isClusterMode() && !o2.isClusterMode()) { | |||||
return 1; | |||||
} | |||||
if (!o1.isClusterMode() && o2.isClusterMode()) { | |||||
return -1; | |||||
} | |||||
if (o1.getLimitApp() == null) { | if (o1.getLimitApp() == null) { | ||||
return 0; | return 0; | ||||
@@ -16,8 +16,6 @@ | |||||
package com.alibaba.csp.sentinel.slots.block.flow; | package com.alibaba.csp.sentinel.slots.block.flow; | ||||
import java.util.ArrayList; | import java.util.ArrayList; | ||||
import java.util.Collections; | |||||
import java.util.Comparator; | |||||
import java.util.List; | import java.util.List; | ||||
import java.util.Map; | import java.util.Map; | ||||
import java.util.concurrent.ConcurrentHashMap; | import java.util.concurrent.ConcurrentHashMap; | ||||
@@ -32,16 +30,10 @@ import com.alibaba.csp.sentinel.node.metric.MetricTimerListener; | |||||
import com.alibaba.csp.sentinel.property.DynamicSentinelProperty; | import com.alibaba.csp.sentinel.property.DynamicSentinelProperty; | ||||
import com.alibaba.csp.sentinel.property.PropertyListener; | import com.alibaba.csp.sentinel.property.PropertyListener; | ||||
import com.alibaba.csp.sentinel.property.SentinelProperty; | import com.alibaba.csp.sentinel.property.SentinelProperty; | ||||
import com.alibaba.csp.sentinel.slots.block.RuleConstant; | |||||
import com.alibaba.csp.sentinel.slots.block.flow.controller.DefaultController; | |||||
import com.alibaba.csp.sentinel.slots.block.flow.controller.RateLimiterController; | |||||
import com.alibaba.csp.sentinel.slots.block.flow.controller.WarmUpController; | |||||
import com.alibaba.csp.sentinel.slots.block.flow.controller.WarmUpRateLimiterController; | |||||
/** | /** | ||||
* <p> | * <p> | ||||
* One resources can have multiple rules. And these rules take effects in the | |||||
* following order: | |||||
* One resources can have multiple rules. And these rules take effects in the following order: | |||||
* <ol> | * <ol> | ||||
* <li>requests from specified caller</li> | * <li>requests from specified caller</li> | ||||
* <li>no specified caller</li> | * <li>no specified caller</li> | ||||
@@ -49,18 +41,21 @@ import com.alibaba.csp.sentinel.slots.block.flow.controller.WarmUpRateLimiterCon | |||||
* </p> | * </p> | ||||
* | * | ||||
* @author jialiang.linjl | * @author jialiang.linjl | ||||
* @author Eric Zhao | |||||
*/ | */ | ||||
public class FlowRuleManager { | public class FlowRuleManager { | ||||
private static final Map<String, List<FlowRule>> flowRules = new ConcurrentHashMap<String, List<FlowRule>>(); | private static final Map<String, List<FlowRule>> flowRules = new ConcurrentHashMap<String, List<FlowRule>>(); | ||||
private final static ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1, | |||||
new NamedThreadFactory("sentinel-metrics-record-task", true)); | |||||
private final static FlowPropertyListener listener = new FlowPropertyListener(); | |||||
private static final FlowPropertyListener LISTENER = new FlowPropertyListener(); | |||||
private static SentinelProperty<List<FlowRule>> currentProperty = new DynamicSentinelProperty<List<FlowRule>>(); | private static SentinelProperty<List<FlowRule>> currentProperty = new DynamicSentinelProperty<List<FlowRule>>(); | ||||
private static final ScheduledExecutorService SCHEDULER = Executors.newScheduledThreadPool(1, | |||||
new NamedThreadFactory("sentinel-metrics-record-task", true)); | |||||
static { | static { | ||||
currentProperty.addListener(listener); | |||||
scheduler.scheduleAtFixedRate(new MetricTimerListener(), 0, 1, TimeUnit.SECONDS); | |||||
currentProperty.addListener(LISTENER); | |||||
SCHEDULER.scheduleAtFixedRate(new MetricTimerListener(), 0, 1, TimeUnit.SECONDS); | |||||
} | } | ||||
/** | /** | ||||
@@ -70,10 +65,10 @@ public class FlowRuleManager { | |||||
* @param property the property to listen. | * @param property the property to listen. | ||||
*/ | */ | ||||
public static void register2Property(SentinelProperty<List<FlowRule>> property) { | public static void register2Property(SentinelProperty<List<FlowRule>> property) { | ||||
synchronized (listener) { | |||||
synchronized (LISTENER) { | |||||
RecordLog.info("[FlowRuleManager] Registering new property to flow rule manager"); | RecordLog.info("[FlowRuleManager] Registering new property to flow rule manager"); | ||||
currentProperty.removeListener(listener); | |||||
property.addListener(listener); | |||||
currentProperty.removeListener(LISTENER); | |||||
property.addListener(LISTENER); | |||||
currentProperty = property; | currentProperty = property; | ||||
} | } | ||||
} | } | ||||
@@ -100,65 +95,7 @@ public class FlowRuleManager { | |||||
currentProperty.updateValue(rules); | currentProperty.updateValue(rules); | ||||
} | } | ||||
private static Map<String, List<FlowRule>> loadFlowConf(List<FlowRule> list) { | |||||
Map<String, List<FlowRule>> newRuleMap = new ConcurrentHashMap<String, List<FlowRule>>(); | |||||
if (list == null || list.isEmpty()) { | |||||
return newRuleMap; | |||||
} | |||||
for (FlowRule rule : list) { | |||||
if (!isValidRule(rule)) { | |||||
RecordLog.warn("[FlowRuleManager] Ignoring invalid flow rule when loading new flow rules: " + rule); | |||||
continue; | |||||
} | |||||
if (StringUtil.isBlank(rule.getLimitApp())) { | |||||
rule.setLimitApp(RuleConstant.LIMIT_APP_DEFAULT); | |||||
} | |||||
TrafficShapingController rater = new DefaultController(rule.getCount(), rule.getGrade()); | |||||
if (rule.getGrade() == RuleConstant.FLOW_GRADE_QPS | |||||
&& rule.getControlBehavior() == RuleConstant.CONTROL_BEHAVIOR_WARM_UP | |||||
&& rule.getWarmUpPeriodSec() > 0) { | |||||
rater = new WarmUpController(rule.getCount(), rule.getWarmUpPeriodSec(), ColdFactorProperty.coldFactor); | |||||
} else if (rule.getGrade() == RuleConstant.FLOW_GRADE_QPS | |||||
&& rule.getControlBehavior() == RuleConstant.CONTROL_BEHAVIOR_RATE_LIMITER | |||||
&& rule.getMaxQueueingTimeMs() > 0) { | |||||
rater = new RateLimiterController(rule.getMaxQueueingTimeMs(), rule.getCount()); | |||||
} else if (rule.getGrade() == RuleConstant.FLOW_GRADE_QPS | |||||
&& rule.getControlBehavior() == RuleConstant.CONTROL_BEHAVIOR_WARM_UP_RATE_LIMITER | |||||
&& rule.getMaxQueueingTimeMs() > 0 && rule.getWarmUpPeriodSec() > 0) { | |||||
rater = new WarmUpRateLimiterController(rule.getCount(), rule.getWarmUpPeriodSec(), | |||||
rule.getMaxQueueingTimeMs(), ColdFactorProperty.coldFactor); | |||||
} | |||||
rule.setRater(rater); | |||||
String identity = rule.getResource(); | |||||
List<FlowRule> ruleM = newRuleMap.get(identity); | |||||
if (ruleM == null) { | |||||
ruleM = new ArrayList<FlowRule>(); | |||||
newRuleMap.put(identity, ruleM); | |||||
} | |||||
ruleM.add(rule); | |||||
} | |||||
if (!newRuleMap.isEmpty()) { | |||||
Comparator<FlowRule> comparator = new FlowRuleComparator(); | |||||
// Sort the rules. | |||||
for (List<FlowRule> rules : newRuleMap.values()) { | |||||
Collections.sort(rules, comparator); | |||||
} | |||||
} | |||||
return newRuleMap; | |||||
} | |||||
static Map<String, List<FlowRule>> getFlowRules() { | |||||
static Map<String, List<FlowRule>> getFlowRuleMap() { | |||||
return flowRules; | return flowRules; | ||||
} | } | ||||
@@ -188,7 +125,7 @@ public class FlowRuleManager { | |||||
@Override | @Override | ||||
public void configUpdate(List<FlowRule> value) { | public void configUpdate(List<FlowRule> value) { | ||||
Map<String, List<FlowRule>> rules = loadFlowConf(value); | |||||
Map<String, List<FlowRule>> rules = FlowRuleUtil.buildFlowRuleMap(value); | |||||
if (rules != null) { | if (rules != null) { | ||||
flowRules.clear(); | flowRules.clear(); | ||||
flowRules.putAll(rules); | flowRules.putAll(rules); | ||||
@@ -198,43 +135,13 @@ public class FlowRuleManager { | |||||
@Override | @Override | ||||
public void configLoad(List<FlowRule> conf) { | public void configLoad(List<FlowRule> conf) { | ||||
Map<String, List<FlowRule>> rules = loadFlowConf(conf); | |||||
Map<String, List<FlowRule>> rules = FlowRuleUtil.buildFlowRuleMap(conf); | |||||
if (rules != null) { | if (rules != null) { | ||||
flowRules.clear(); | flowRules.clear(); | ||||
flowRules.putAll(rules); | flowRules.putAll(rules); | ||||
} | } | ||||
RecordLog.info("[FlowRuleManager] Flow rules loaded: " + flowRules); | RecordLog.info("[FlowRuleManager] Flow rules loaded: " + flowRules); | ||||
} | } | ||||
} | } | ||||
public static boolean isValidRule(FlowRule rule) { | |||||
boolean baseValid = rule != null && !StringUtil.isBlank(rule.getResource()) && rule.getCount() >= 0 | |||||
&& rule.getGrade() >= 0 && rule.getStrategy() >= 0 && rule.getControlBehavior() >= 0; | |||||
if (!baseValid) { | |||||
return false; | |||||
} | |||||
// Check strategy and control (shaping) behavior. | |||||
return checkStrategyField(rule) && checkControlBehaviorField(rule); | |||||
} | |||||
private static boolean checkStrategyField(/*@NonNull*/ FlowRule rule) { | |||||
if (rule.getStrategy() == RuleConstant.STRATEGY_RELATE || rule.getStrategy() == RuleConstant.STRATEGY_CHAIN) { | |||||
return StringUtil.isNotBlank(rule.getRefResource()); | |||||
} | |||||
return true; | |||||
} | |||||
private static boolean checkControlBehaviorField(/*@NonNull*/ FlowRule rule) { | |||||
switch (rule.getControlBehavior()) { | |||||
case RuleConstant.CONTROL_BEHAVIOR_WARM_UP: | |||||
return rule.getWarmUpPeriodSec() > 0; | |||||
case RuleConstant.CONTROL_BEHAVIOR_RATE_LIMITER: | |||||
return rule.getMaxQueueingTimeMs() > 0; | |||||
case RuleConstant.CONTROL_BEHAVIOR_WARM_UP_RATE_LIMITER: | |||||
return rule.getWarmUpPeriodSec() > 0 && rule.getMaxQueueingTimeMs() > 0; | |||||
default: | |||||
return true; | |||||
} | |||||
} | |||||
} | } |
@@ -0,0 +1,227 @@ | |||||
/* | |||||
* Copyright 1999-2018 Alibaba Group Holding Ltd. | |||||
* | |||||
* Licensed under the Apache License, Version 2.0 (the "License"); | |||||
* you may not use this file except in compliance with the License. | |||||
* You may obtain a copy of the License at | |||||
* | |||||
* http://www.apache.org/licenses/LICENSE-2.0 | |||||
* | |||||
* Unless required by applicable law or agreed to in writing, software | |||||
* distributed under the License is distributed on an "AS IS" BASIS, | |||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |||||
* See the License for the specific language governing permissions and | |||||
* limitations under the License. | |||||
*/ | |||||
package com.alibaba.csp.sentinel.slots.block.flow; | |||||
import java.util.ArrayList; | |||||
import java.util.Collections; | |||||
import java.util.Comparator; | |||||
import java.util.List; | |||||
import java.util.Map; | |||||
import java.util.concurrent.ConcurrentHashMap; | |||||
import com.alibaba.csp.sentinel.log.RecordLog; | |||||
import com.alibaba.csp.sentinel.slots.block.ClusterRuleConstant; | |||||
import com.alibaba.csp.sentinel.slots.block.RuleConstant; | |||||
import com.alibaba.csp.sentinel.slots.block.flow.controller.DefaultController; | |||||
import com.alibaba.csp.sentinel.slots.block.flow.controller.RateLimiterController; | |||||
import com.alibaba.csp.sentinel.slots.block.flow.controller.WarmUpController; | |||||
import com.alibaba.csp.sentinel.slots.block.flow.controller.WarmUpRateLimiterController; | |||||
import com.alibaba.csp.sentinel.util.StringUtil; | |||||
import com.alibaba.csp.sentinel.util.function.Function; | |||||
import com.alibaba.csp.sentinel.util.function.Predicate; | |||||
/** | |||||
* @author Eric Zhao | |||||
* @since 1.4.0 | |||||
*/ | |||||
public final class FlowRuleUtil { | |||||
/** | |||||
* Build the flow rule map from raw list of flow rules, grouping by resource name. | |||||
* | |||||
* @param list raw list of flow rules | |||||
* @return constructed new flow rule map; empty map if list is null or empty, or no valid rules | |||||
*/ | |||||
public static Map<String, List<FlowRule>> buildFlowRuleMap(List<FlowRule> list) { | |||||
return buildFlowRuleMap(list, null); | |||||
} | |||||
/** | |||||
* Build the flow rule map from raw list of flow rules, grouping by resource name. | |||||
* | |||||
* @param list raw list of flow rules | |||||
* @param filter rule filter | |||||
* @return constructed new flow rule map; empty map if list is null or empty, or no wanted rules | |||||
*/ | |||||
public static Map<String, List<FlowRule>> buildFlowRuleMap(List<FlowRule> list, Predicate<FlowRule> filter) { | |||||
return buildFlowRuleMap(list, filter, true); | |||||
} | |||||
/** | |||||
* Build the flow rule map from raw list of flow rules, grouping by resource name. | |||||
* | |||||
* @param list raw list of flow rules | |||||
* @param filter rule filter | |||||
* @param shouldSort whether the rules should be sorted | |||||
* @return constructed new flow rule map; empty map if list is null or empty, or no wanted rules | |||||
*/ | |||||
public static Map<String, List<FlowRule>> buildFlowRuleMap(List<FlowRule> list, Predicate<FlowRule> filter, | |||||
boolean shouldSort) { | |||||
return buildFlowRuleMap(list, extractResource, filter, shouldSort); | |||||
} | |||||
/** | |||||
* Build the flow rule map from raw list of flow rules, grouping by provided group function. | |||||
* | |||||
* @param list raw list of flow rules | |||||
* @param groupFunction grouping function of the map (by key) | |||||
* @param filter rule filter | |||||
* @param shouldSort whether the rules should be sorted | |||||
* @param <K> type of key | |||||
* @return constructed new flow rule map; empty map if list is null or empty, or no wanted rules | |||||
*/ | |||||
public static <K> Map<K, List<FlowRule>> buildFlowRuleMap(List<FlowRule> list, Function<FlowRule, K> groupFunction, | |||||
Predicate<FlowRule> filter, boolean shouldSort) { | |||||
Map<K, List<FlowRule>> newRuleMap = new ConcurrentHashMap<K, List<FlowRule>>(); | |||||
if (list == null || list.isEmpty()) { | |||||
return newRuleMap; | |||||
} | |||||
for (FlowRule rule : list) { | |||||
if (!isValidRule(rule)) { | |||||
RecordLog.warn("[FlowRuleManager] Ignoring invalid flow rule when loading new flow rules: " + rule); | |||||
continue; | |||||
} | |||||
if (filter != null && !filter.test(rule)) { | |||||
continue; | |||||
} | |||||
if (StringUtil.isBlank(rule.getLimitApp())) { | |||||
rule.setLimitApp(RuleConstant.LIMIT_APP_DEFAULT); | |||||
} | |||||
TrafficShapingController rater = generateRater(rule); | |||||
rule.setRater(rater); | |||||
K key = groupFunction.apply(rule); | |||||
if (key == null) { | |||||
continue; | |||||
} | |||||
List<FlowRule> flowRules = newRuleMap.get(key); | |||||
if (flowRules == null) { | |||||
flowRules = new ArrayList<FlowRule>(); | |||||
newRuleMap.put(key, flowRules); | |||||
} | |||||
flowRules.add(rule); | |||||
} | |||||
if (shouldSort && !newRuleMap.isEmpty()) { | |||||
Comparator<FlowRule> comparator = new FlowRuleComparator(); | |||||
// Sort the rules. | |||||
for (List<FlowRule> rules : newRuleMap.values()) { | |||||
Collections.sort(rules, comparator); | |||||
} | |||||
} | |||||
return newRuleMap; | |||||
} | |||||
private static TrafficShapingController generateRater(/*@Valid*/ FlowRule rule) { | |||||
if (rule.getGrade() == RuleConstant.FLOW_GRADE_QPS) { | |||||
switch (rule.getControlBehavior()) { | |||||
case RuleConstant.CONTROL_BEHAVIOR_WARM_UP: | |||||
return new WarmUpController(rule.getCount(), rule.getWarmUpPeriodSec(), | |||||
ColdFactorProperty.coldFactor); | |||||
case RuleConstant.CONTROL_BEHAVIOR_RATE_LIMITER: | |||||
return new RateLimiterController(rule.getMaxQueueingTimeMs(), rule.getCount()); | |||||
case RuleConstant.CONTROL_BEHAVIOR_WARM_UP_RATE_LIMITER: | |||||
return new WarmUpRateLimiterController(rule.getCount(), rule.getWarmUpPeriodSec(), | |||||
rule.getMaxQueueingTimeMs(), ColdFactorProperty.coldFactor); | |||||
case RuleConstant.CONTROL_BEHAVIOR_DEFAULT: | |||||
default: | |||||
// Default mode or unknown mode: default traffic shaping controller (fast-reject). | |||||
} | |||||
} | |||||
return new DefaultController(rule.getCount(), rule.getGrade()); | |||||
} | |||||
/** | |||||
* Check whether provided ID can be a valid cluster flow ID. | |||||
* | |||||
* @param id flow ID to check | |||||
* @return true if valid, otherwise false | |||||
*/ | |||||
public static boolean validClusterRuleId(Long id) { | |||||
return id != null && id > 0; | |||||
} | |||||
/** | |||||
* Check whether provided flow rule is valid. | |||||
* | |||||
* @param rule flow rule to check | |||||
* @return true if valid, otherwise false | |||||
*/ | |||||
public static boolean isValidRule(FlowRule rule) { | |||||
boolean baseValid = rule != null && !StringUtil.isBlank(rule.getResource()) && rule.getCount() >= 0 | |||||
&& rule.getGrade() >= 0 && rule.getStrategy() >= 0 && rule.getControlBehavior() >= 0; | |||||
if (!baseValid) { | |||||
return false; | |||||
} | |||||
// Check strategy and control (shaping) behavior. | |||||
return checkClusterField(rule) && checkStrategyField(rule) && checkControlBehaviorField(rule); | |||||
} | |||||
private static boolean checkClusterField(/*@NonNull*/ FlowRule rule) { | |||||
if (!rule.isClusterMode()) { | |||||
return true; | |||||
} | |||||
ClusterFlowConfig clusterConfig = rule.getClusterConfig(); | |||||
if (clusterConfig == null) { | |||||
return false; | |||||
} | |||||
if (!validClusterRuleId(clusterConfig.getFlowId())) { | |||||
return false; | |||||
} | |||||
switch (rule.getStrategy()) { | |||||
case ClusterRuleConstant.FLOW_CLUSTER_STRATEGY_NORMAL: | |||||
return true; | |||||
case ClusterRuleConstant.FLOW_CLUSTER_STRATEGY_REF: | |||||
return validClusterRuleId(clusterConfig.getRefFlowId()); | |||||
default: | |||||
return true; | |||||
} | |||||
} | |||||
private static boolean checkStrategyField(/*@NonNull*/ FlowRule rule) { | |||||
if (rule.getStrategy() == RuleConstant.STRATEGY_RELATE || rule.getStrategy() == RuleConstant.STRATEGY_CHAIN) { | |||||
return StringUtil.isNotBlank(rule.getRefResource()); | |||||
} | |||||
return true; | |||||
} | |||||
private static boolean checkControlBehaviorField(/*@NonNull*/ FlowRule rule) { | |||||
switch (rule.getControlBehavior()) { | |||||
case RuleConstant.CONTROL_BEHAVIOR_WARM_UP: | |||||
return rule.getWarmUpPeriodSec() > 0; | |||||
case RuleConstant.CONTROL_BEHAVIOR_RATE_LIMITER: | |||||
return rule.getMaxQueueingTimeMs() > 0; | |||||
case RuleConstant.CONTROL_BEHAVIOR_WARM_UP_RATE_LIMITER: | |||||
return rule.getWarmUpPeriodSec() > 0 && rule.getMaxQueueingTimeMs() > 0; | |||||
default: | |||||
return true; | |||||
} | |||||
} | |||||
private static final Function<FlowRule, String> extractResource = new Function<FlowRule, String>() { | |||||
@Override | |||||
public String apply(FlowRule rule) { | |||||
return rule.getResource(); | |||||
} | |||||
}; | |||||
private FlowRuleUtil() {} | |||||
} |
@@ -138,27 +138,27 @@ public class FlowSlot extends AbstractLinkedProcessorSlot<DefaultNode> { | |||||
@Override | @Override | ||||
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, | public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, | ||||
boolean prioritized, Object... args) throws Throwable { | boolean prioritized, Object... args) throws Throwable { | ||||
checkFlow(resourceWrapper, context, node, count); | |||||
checkFlow(resourceWrapper, context, node, count, prioritized); | |||||
fireEntry(context, resourceWrapper, node, count, prioritized, args); | fireEntry(context, resourceWrapper, node, count, prioritized, args); | ||||
} | } | ||||
void checkFlow(ResourceWrapper resource, Context context, DefaultNode node, int count) throws BlockException { | |||||
void checkFlow(ResourceWrapper resource, Context context, DefaultNode node, int count, boolean prioritized) throws BlockException { | |||||
// Flow rule map cannot be null. | // Flow rule map cannot be null. | ||||
Map<String, List<FlowRule>> flowRules = FlowRuleManager.getFlowRules(); | |||||
Map<String, List<FlowRule>> flowRules = FlowRuleManager.getFlowRuleMap(); | |||||
List<FlowRule> rules = flowRules.get(resource.getName()); | List<FlowRule> rules = flowRules.get(resource.getName()); | ||||
if (rules != null) { | if (rules != null) { | ||||
for (FlowRule rule : rules) { | for (FlowRule rule : rules) { | ||||
if (!canPassCheck(rule, context, node, count)) { | |||||
if (!canPassCheck(rule, context, node, count, prioritized)) { | |||||
throw new FlowException(rule.getLimitApp()); | throw new FlowException(rule.getLimitApp()); | ||||
} | } | ||||
} | } | ||||
} | } | ||||
} | } | ||||
boolean canPassCheck(FlowRule rule, Context context, DefaultNode node, int count) { | |||||
return FlowRuleChecker.passCheck(rule, context, node, count); | |||||
boolean canPassCheck(FlowRule rule, Context context, DefaultNode node, int count, boolean prioritized) { | |||||
return FlowRuleChecker.passCheck(rule, context, node, count, prioritized); | |||||
} | } | ||||
@Override | @Override | ||||
@@ -54,7 +54,7 @@ public class FlowSlotTest { | |||||
Context context = mock(Context.class); | Context context = mock(Context.class); | ||||
DefaultNode node = mock(DefaultNode.class); | DefaultNode node = mock(DefaultNode.class); | ||||
doCallRealMethod().when(flowSlot).checkFlow(any(ResourceWrapper.class), any(Context.class), | doCallRealMethod().when(flowSlot).checkFlow(any(ResourceWrapper.class), any(Context.class), | ||||
any(DefaultNode.class), anyInt()); | |||||
any(DefaultNode.class), anyInt(), anyBoolean()); | |||||
String resA = "resAK"; | String resA = "resAK"; | ||||
String resB = "resBK"; | String resB = "resBK"; | ||||
@@ -63,13 +63,13 @@ public class FlowSlotTest { | |||||
// Here we only load rules for resA. | // Here we only load rules for resA. | ||||
FlowRuleManager.loadRules(Collections.singletonList(rule1)); | FlowRuleManager.loadRules(Collections.singletonList(rule1)); | ||||
when(flowSlot.canPassCheck(eq(rule1), any(Context.class), any(DefaultNode.class), anyInt())) | |||||
when(flowSlot.canPassCheck(eq(rule1), any(Context.class), any(DefaultNode.class), anyInt(), anyBoolean())) | |||||
.thenReturn(true); | .thenReturn(true); | ||||
when(flowSlot.canPassCheck(eq(rule2), any(Context.class), any(DefaultNode.class), anyInt())) | |||||
when(flowSlot.canPassCheck(eq(rule2), any(Context.class), any(DefaultNode.class), anyInt(), anyBoolean())) | |||||
.thenReturn(false); | .thenReturn(false); | ||||
flowSlot.checkFlow(new StringResourceWrapper(resA, EntryType.IN), context, node, 1); | |||||
flowSlot.checkFlow(new StringResourceWrapper(resB, EntryType.IN), context, node, 1); | |||||
flowSlot.checkFlow(new StringResourceWrapper(resA, EntryType.IN), context, node, 1, false); | |||||
flowSlot.checkFlow(new StringResourceWrapper(resB, EntryType.IN), context, node, 1, false); | |||||
} | } | ||||
@Test(expected = FlowException.class) | @Test(expected = FlowException.class) | ||||
@@ -78,15 +78,15 @@ public class FlowSlotTest { | |||||
Context context = mock(Context.class); | Context context = mock(Context.class); | ||||
DefaultNode node = mock(DefaultNode.class); | DefaultNode node = mock(DefaultNode.class); | ||||
doCallRealMethod().when(flowSlot).checkFlow(any(ResourceWrapper.class), any(Context.class), | doCallRealMethod().when(flowSlot).checkFlow(any(ResourceWrapper.class), any(Context.class), | ||||
any(DefaultNode.class), anyInt()); | |||||
any(DefaultNode.class), anyInt(), anyBoolean()); | |||||
String resA = "resAK"; | String resA = "resAK"; | ||||
FlowRule rule = new FlowRule(resA).setCount(10); | FlowRule rule = new FlowRule(resA).setCount(10); | ||||
FlowRuleManager.loadRules(Collections.singletonList(rule)); | FlowRuleManager.loadRules(Collections.singletonList(rule)); | ||||
when(flowSlot.canPassCheck(any(FlowRule.class), any(Context.class), any(DefaultNode.class), anyInt())) | |||||
when(flowSlot.canPassCheck(any(FlowRule.class), any(Context.class), any(DefaultNode.class), anyInt(), anyBoolean())) | |||||
.thenReturn(false); | .thenReturn(false); | ||||
flowSlot.checkFlow(new StringResourceWrapper(resA, EntryType.IN), context, node, 1); | |||||
flowSlot.checkFlow(new StringResourceWrapper(resA, EntryType.IN), context, node, 1, false); | |||||
} | } | ||||
} | } |
@@ -16,14 +16,23 @@ | |||||
package com.alibaba.csp.sentinel.slots.block.flow.param; | package com.alibaba.csp.sentinel.slots.block.flow.param; | ||||
import java.lang.reflect.Array; | import java.lang.reflect.Array; | ||||
import java.util.ArrayList; | |||||
import java.util.Collection; | import java.util.Collection; | ||||
import java.util.Collections; | |||||
import java.util.List; | |||||
import java.util.Set; | import java.util.Set; | ||||
import com.alibaba.csp.sentinel.cluster.ClusterTokenClient; | |||||
import com.alibaba.csp.sentinel.cluster.TokenClientProvider; | |||||
import com.alibaba.csp.sentinel.cluster.TokenResult; | |||||
import com.alibaba.csp.sentinel.cluster.TokenResultStatus; | |||||
import com.alibaba.csp.sentinel.log.RecordLog; | import com.alibaba.csp.sentinel.log.RecordLog; | ||||
import com.alibaba.csp.sentinel.slotchain.ResourceWrapper; | import com.alibaba.csp.sentinel.slotchain.ResourceWrapper; | ||||
import com.alibaba.csp.sentinel.slots.block.RuleConstant; | import com.alibaba.csp.sentinel.slots.block.RuleConstant; | ||||
/** | /** | ||||
* Rule checker for parameter flow control. | |||||
* | |||||
* @author Eric Zhao | * @author Eric Zhao | ||||
* @since 0.2.0 | * @since 0.2.0 | ||||
*/ | */ | ||||
@@ -40,17 +49,72 @@ final class ParamFlowChecker { | |||||
return true; | return true; | ||||
} | } | ||||
// Get parameter value. If value is null, then pass. | |||||
Object value = args[paramIdx]; | Object value = args[paramIdx]; | ||||
if (value == null) { | |||||
return true; | |||||
} | |||||
if (rule.isClusterMode()) { | |||||
return passClusterCheck(resourceWrapper, rule, count, value); | |||||
} | |||||
return passLocalCheck(resourceWrapper, rule, count, value); | return passLocalCheck(resourceWrapper, rule, count, value); | ||||
} | } | ||||
private static ParameterMetric getHotParameters(ResourceWrapper resourceWrapper) { | |||||
// Should not be null. | |||||
return ParamFlowSlot.getParamMetric(resourceWrapper); | |||||
@SuppressWarnings("unchecked") | |||||
private static Collection<Object> toCollection(Object value) { | |||||
if (value instanceof Collection) { | |||||
return (Collection<Object>)value; | |||||
} else if (value.getClass().isArray()) { | |||||
List<Object> params = new ArrayList<Object>(); | |||||
int length = Array.getLength(value); | |||||
for (int i = 0; i < length; i++) { | |||||
Object param = Array.get(value, i); | |||||
params.add(param); | |||||
} | |||||
return params; | |||||
} else { | |||||
return Collections.singletonList(value); | |||||
} | |||||
} | } | ||||
private static boolean passLocalCheck(ResourceWrapper resourceWrapper, ParamFlowRule rule, int count, Object value) { | |||||
private static boolean passClusterCheck(ResourceWrapper resourceWrapper, ParamFlowRule rule, int count, | |||||
Object value) { | |||||
try { | |||||
ClusterTokenClient client = TokenClientProvider.getClient(); | |||||
if (client == null) { | |||||
return true; | |||||
} | |||||
Collection<Object> params = toCollection(value); | |||||
TokenResult result = client.requestParamToken(rule.getClusterConfig().getFlowId(), count, params); | |||||
switch (result.getStatus()) { | |||||
case TokenResultStatus.OK: | |||||
return true; | |||||
case TokenResultStatus.BLOCKED: | |||||
return false; | |||||
default: | |||||
return fallbackToLocalOrPass(resourceWrapper, rule, count, params); | |||||
} | |||||
} catch (Throwable ex) { | |||||
RecordLog.warn("[ParamFlowChecker] Request cluster token for parameter unexpected failed", ex); | |||||
return fallbackToLocalOrPass(resourceWrapper, rule, count, value); | |||||
} | |||||
} | |||||
private static boolean fallbackToLocalOrPass(ResourceWrapper resourceWrapper, ParamFlowRule rule, int count, | |||||
Object value) { | |||||
if (rule.getClusterConfig().isFallbackToLocalWhenFail()) { | |||||
return passLocalCheck(resourceWrapper, rule, count, value); | |||||
} else { | |||||
// The rule won't be activated, just pass. | |||||
return true; | |||||
} | |||||
} | |||||
private static boolean passLocalCheck(ResourceWrapper resourceWrapper, ParamFlowRule rule, int count, | |||||
Object value) { | |||||
try { | try { | ||||
if (Collection.class.isAssignableFrom(value.getClass())) { | if (Collection.class.isAssignableFrom(value.getClass())) { | ||||
for (Object param : ((Collection)value)) { | for (Object param : ((Collection)value)) { | ||||
@@ -70,7 +134,7 @@ final class ParamFlowChecker { | |||||
return passSingleValueCheck(resourceWrapper, rule, count, value); | return passSingleValueCheck(resourceWrapper, rule, count, value); | ||||
} | } | ||||
} catch (Throwable e) { | } catch (Throwable e) { | ||||
RecordLog.info("[ParamFlowChecker] Unexpected error", e); | |||||
RecordLog.warn("[ParamFlowChecker] Unexpected error", e); | |||||
} | } | ||||
return true; | return true; | ||||
@@ -96,5 +160,10 @@ final class ParamFlowChecker { | |||||
return true; | return true; | ||||
} | } | ||||
private static ParameterMetric getHotParameters(ResourceWrapper resourceWrapper) { | |||||
// Should not be null. | |||||
return ParamFlowSlot.getParamMetric(resourceWrapper); | |||||
} | |||||
private ParamFlowChecker() {} | private ParamFlowChecker() {} | ||||
} | } |
@@ -0,0 +1,94 @@ | |||||
/* | |||||
* Copyright 1999-2018 Alibaba Group Holding Ltd. | |||||
* | |||||
* Licensed under the Apache License, Version 2.0 (the "License"); | |||||
* you may not use this file except in compliance with the License. | |||||
* You may obtain a copy of the License at | |||||
* | |||||
* http://www.apache.org/licenses/LICENSE-2.0 | |||||
* | |||||
* Unless required by applicable law or agreed to in writing, software | |||||
* distributed under the License is distributed on an "AS IS" BASIS, | |||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |||||
* See the License for the specific language governing permissions and | |||||
* limitations under the License. | |||||
*/ | |||||
package com.alibaba.csp.sentinel.slots.block.flow.param; | |||||
import com.alibaba.csp.sentinel.slots.block.ClusterRuleConstant; | |||||
/** | |||||
* Parameter flow rule config in cluster mode. | |||||
* | |||||
* @author Eric Zhao | |||||
* @since 1.4.0 | |||||
*/ | |||||
public class ParamFlowClusterConfig { | |||||
/** | |||||
* Global unique ID. | |||||
*/ | |||||
private Long flowId; | |||||
/** | |||||
* Threshold type (average by local value or global value). | |||||
*/ | |||||
private int thresholdType = ClusterRuleConstant.FLOW_THRESHOLD_AVG_LOCAL; | |||||
private boolean fallbackToLocalWhenFail = false; | |||||
public Long getFlowId() { | |||||
return flowId; | |||||
} | |||||
public ParamFlowClusterConfig setFlowId(Long flowId) { | |||||
this.flowId = flowId; | |||||
return this; | |||||
} | |||||
public int getThresholdType() { | |||||
return thresholdType; | |||||
} | |||||
public ParamFlowClusterConfig setThresholdType(int thresholdType) { | |||||
this.thresholdType = thresholdType; | |||||
return this; | |||||
} | |||||
public boolean isFallbackToLocalWhenFail() { | |||||
return fallbackToLocalWhenFail; | |||||
} | |||||
public ParamFlowClusterConfig setFallbackToLocalWhenFail(boolean fallbackToLocalWhenFail) { | |||||
this.fallbackToLocalWhenFail = fallbackToLocalWhenFail; | |||||
return this; | |||||
} | |||||
@Override | |||||
public boolean equals(Object o) { | |||||
if (this == o) { return true; } | |||||
if (o == null || getClass() != o.getClass()) { return false; } | |||||
ParamFlowClusterConfig that = (ParamFlowClusterConfig)o; | |||||
if (thresholdType != that.thresholdType) { return false; } | |||||
if (fallbackToLocalWhenFail != that.fallbackToLocalWhenFail) { return false; } | |||||
return flowId != null ? flowId.equals(that.flowId) : that.flowId == null; | |||||
} | |||||
@Override | |||||
public int hashCode() { | |||||
int result = flowId != null ? flowId.hashCode() : 0; | |||||
result = 31 * result + thresholdType; | |||||
result = 31 * result + (fallbackToLocalWhenFail ? 1 : 0); | |||||
return result; | |||||
} | |||||
@Override | |||||
public String toString() { | |||||
return "ParamFlowClusterConfig{" + | |||||
"flowId=" + flowId + | |||||
", thresholdType=" + thresholdType + | |||||
", fallbackToLocalWhenFail=" + fallbackToLocalWhenFail + | |||||
'}'; | |||||
} | |||||
} |
@@ -65,6 +65,9 @@ public class ParamFlowRule extends AbstractRule { | |||||
*/ | */ | ||||
private Map<Object, Integer> hotItems = new HashMap<Object, Integer>(); | private Map<Object, Integer> hotItems = new HashMap<Object, Integer>(); | ||||
private boolean clusterMode = false; | |||||
private ParamFlowClusterConfig clusterConfig; | |||||
public int getGrade() { | public int getGrade() { | ||||
return grade; | return grade; | ||||
} | } | ||||
@@ -110,6 +113,25 @@ public class ParamFlowRule extends AbstractRule { | |||||
return this; | return this; | ||||
} | } | ||||
public boolean isClusterMode() { | |||||
return clusterMode; | |||||
} | |||||
public ParamFlowRule setClusterMode(boolean clusterMode) { | |||||
this.clusterMode = clusterMode; | |||||
return this; | |||||
} | |||||
public ParamFlowClusterConfig getClusterConfig() { | |||||
return clusterConfig; | |||||
} | |||||
public ParamFlowRule setClusterConfig( | |||||
ParamFlowClusterConfig clusterConfig) { | |||||
this.clusterConfig = clusterConfig; | |||||
return this; | |||||
} | |||||
@Override | @Override | ||||
@Deprecated | @Deprecated | ||||
public boolean passCheck(Context context, DefaultNode node, int count, Object... args) { | public boolean passCheck(Context context, DefaultNode node, int count, Object... args) { | ||||
@@ -126,8 +148,11 @@ public class ParamFlowRule extends AbstractRule { | |||||
if (grade != rule.grade) { return false; } | if (grade != rule.grade) { return false; } | ||||
if (Double.compare(rule.count, count) != 0) { return false; } | if (Double.compare(rule.count, count) != 0) { return false; } | ||||
if (clusterMode != rule.clusterMode) { return false; } | |||||
if (paramIdx != null ? !paramIdx.equals(rule.paramIdx) : rule.paramIdx != null) { return false; } | if (paramIdx != null ? !paramIdx.equals(rule.paramIdx) : rule.paramIdx != null) { return false; } | ||||
return paramFlowItemList != null ? paramFlowItemList.equals(rule.paramFlowItemList) : rule.paramFlowItemList == null; | |||||
if (paramFlowItemList != null ? !paramFlowItemList.equals(rule.paramFlowItemList) | |||||
: rule.paramFlowItemList != null) { return false; } | |||||
return clusterConfig != null ? clusterConfig.equals(rule.clusterConfig) : rule.clusterConfig == null; | |||||
} | } | ||||
@Override | @Override | ||||
@@ -139,18 +164,20 @@ public class ParamFlowRule extends AbstractRule { | |||||
temp = Double.doubleToLongBits(count); | temp = Double.doubleToLongBits(count); | ||||
result = 31 * result + (int)(temp ^ (temp >>> 32)); | result = 31 * result + (int)(temp ^ (temp >>> 32)); | ||||
result = 31 * result + (paramFlowItemList != null ? paramFlowItemList.hashCode() : 0); | result = 31 * result + (paramFlowItemList != null ? paramFlowItemList.hashCode() : 0); | ||||
result = 31 * result + (clusterMode ? 1 : 0); | |||||
result = 31 * result + (clusterConfig != null ? clusterConfig.hashCode() : 0); | |||||
return result; | return result; | ||||
} | } | ||||
@Override | @Override | ||||
public String toString() { | public String toString() { | ||||
return "ParamFlowRule{" + | return "ParamFlowRule{" + | ||||
"resource=" + getResource() + | |||||
", limitApp=" + getLimitApp() + | |||||
", grade=" + grade + | |||||
"grade=" + grade + | |||||
", paramIdx=" + paramIdx + | ", paramIdx=" + paramIdx + | ||||
", count=" + count + | ", count=" + count + | ||||
", paramFlowItemList=" + paramFlowItemList + | ", paramFlowItemList=" + paramFlowItemList + | ||||
", clusterMode=" + clusterMode + | |||||
", clusterConfig=" + clusterConfig + | |||||
'}'; | '}'; | ||||
} | } | ||||
} | } |
@@ -99,37 +99,6 @@ public final class ParamFlowRuleManager { | |||||
return rules; | return rules; | ||||
} | } | ||||
private static Object parseValue(String value, String classType) { | |||||
if (value == null) { | |||||
throw new IllegalArgumentException("Null value"); | |||||
} | |||||
if (StringUtil.isBlank(classType)) { | |||||
// If the class type is not provided, then treat it as string. | |||||
return value; | |||||
} | |||||
// Handle primitive type. | |||||
if (int.class.toString().equals(classType) || Integer.class.getName().equals(classType)) { | |||||
return Integer.parseInt(value); | |||||
} else if (boolean.class.toString().equals(classType) || Boolean.class.getName().equals(classType)) { | |||||
return Boolean.parseBoolean(value); | |||||
} else if (long.class.toString().equals(classType) || Long.class.getName().equals(classType)) { | |||||
return Long.parseLong(value); | |||||
} else if (double.class.toString().equals(classType) || Double.class.getName().equals(classType)) { | |||||
return Double.parseDouble(value); | |||||
} else if (float.class.toString().equals(classType) || Float.class.getName().equals(classType)) { | |||||
return Float.parseFloat(value); | |||||
} else if (byte.class.toString().equals(classType) || Byte.class.getName().equals(classType)) { | |||||
return Byte.parseByte(value); | |||||
} else if (short.class.toString().equals(classType) || Short.class.getName().equals(classType)) { | |||||
return Short.parseShort(value); | |||||
} else if (char.class.toString().equals(classType)) { | |||||
char[] array = value.toCharArray(); | |||||
return array.length > 0 ? array[0] : null; | |||||
} | |||||
return value; | |||||
} | |||||
static class RulePropertyListener implements PropertyListener<List<ParamFlowRule>> { | static class RulePropertyListener implements PropertyListener<List<ParamFlowRule>> { | ||||
@Override | @Override | ||||
@@ -163,7 +132,7 @@ public final class ParamFlowRuleManager { | |||||
} | } | ||||
for (ParamFlowRule rule : list) { | for (ParamFlowRule rule : list) { | ||||
if (!isValidRule(rule)) { | |||||
if (!ParamFlowRuleUtil.isValidRule(rule)) { | |||||
RecordLog.warn("[ParamFlowRuleManager] Ignoring invalid rule when loading new rules: " + rule); | RecordLog.warn("[ParamFlowRuleManager] Ignoring invalid rule when loading new rules: " + rule); | ||||
continue; | continue; | ||||
} | } | ||||
@@ -172,12 +141,7 @@ public final class ParamFlowRuleManager { | |||||
rule.setLimitApp(RuleConstant.LIMIT_APP_DEFAULT); | rule.setLimitApp(RuleConstant.LIMIT_APP_DEFAULT); | ||||
} | } | ||||
if (rule.getParamFlowItemList() == null) { | |||||
rule.setParamFlowItemList(new ArrayList<ParamFlowItem>()); | |||||
} | |||||
Map<Object, Integer> itemMap = parseHotItems(rule.getParamFlowItemList()); | |||||
rule.setParsedHotItems(itemMap); | |||||
ParamFlowRuleUtil.fillExceptionFlowItems(rule); | |||||
String resourceName = rule.getResource(); | String resourceName = rule.getResource(); | ||||
List<ParamFlowRule> ruleList = newRuleMap.get(resourceName); | List<ParamFlowRule> ruleList = newRuleMap.get(resourceName); | ||||
@@ -200,34 +164,6 @@ public final class ParamFlowRuleManager { | |||||
} | } | ||||
} | } | ||||
static Map<Object, Integer> parseHotItems(List<ParamFlowItem> items) { | |||||
Map<Object, Integer> itemMap = new HashMap<Object, Integer>(); | |||||
if (items == null || items.isEmpty()) { | |||||
return itemMap; | |||||
} | |||||
for (ParamFlowItem item : items) { | |||||
// Value should not be null. | |||||
Object value; | |||||
try { | |||||
value = parseValue(item.getObject(), item.getClassType()); | |||||
} catch (Exception ex) { | |||||
RecordLog.warn("[ParamFlowRuleManager] Failed to parse value for item: " + item, ex); | |||||
continue; | |||||
} | |||||
if (item.getCount() == null || item.getCount() < 0 || value == null) { | |||||
RecordLog.warn("[ParamFlowRuleManager] Ignoring invalid exclusion parameter item: " + item); | |||||
continue; | |||||
} | |||||
itemMap.put(value, item.getCount()); | |||||
} | |||||
return itemMap; | |||||
} | |||||
static boolean isValidRule(ParamFlowRule rule) { | |||||
return rule != null && !StringUtil.isBlank(rule.getResource()) && rule.getCount() >= 0 | |||||
&& rule.getParamIdx() != null && rule.getParamIdx() >= 0; | |||||
} | |||||
private ParamFlowRuleManager() {} | private ParamFlowRuleManager() {} | ||||
} | } | ||||
@@ -0,0 +1,114 @@ | |||||
/* | |||||
* Copyright 1999-2018 Alibaba Group Holding Ltd. | |||||
* | |||||
* Licensed under the Apache License, Version 2.0 (the "License"); | |||||
* you may not use this file except in compliance with the License. | |||||
* You may obtain a copy of the License at | |||||
* | |||||
* http://www.apache.org/licenses/LICENSE-2.0 | |||||
* | |||||
* Unless required by applicable law or agreed to in writing, software | |||||
* distributed under the License is distributed on an "AS IS" BASIS, | |||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |||||
* See the License for the specific language governing permissions and | |||||
* limitations under the License. | |||||
*/ | |||||
package com.alibaba.csp.sentinel.slots.block.flow.param; | |||||
import java.util.ArrayList; | |||||
import java.util.HashMap; | |||||
import java.util.List; | |||||
import java.util.Map; | |||||
import com.alibaba.csp.sentinel.log.RecordLog; | |||||
import com.alibaba.csp.sentinel.util.StringUtil; | |||||
/** | |||||
* @author Eric Zhao | |||||
*/ | |||||
public final class ParamFlowRuleUtil { | |||||
public static boolean isValidRule(ParamFlowRule rule) { | |||||
return rule != null && !StringUtil.isBlank(rule.getResource()) && rule.getCount() >= 0 | |||||
&& rule.getParamIdx() != null && rule.getParamIdx() >= 0 && checkCluster(rule); | |||||
} | |||||
private static boolean checkCluster(/*@PreChecked*/ ParamFlowRule rule) { | |||||
if (!rule.isClusterMode()) { | |||||
return true; | |||||
} | |||||
ParamFlowClusterConfig clusterConfig = rule.getClusterConfig(); | |||||
return clusterConfig != null && validClusterRuleId(clusterConfig.getFlowId()); | |||||
} | |||||
public static boolean validClusterRuleId(Long id) { | |||||
return id != null && id > 0; | |||||
} | |||||
public static void fillExceptionFlowItems(ParamFlowRule rule) { | |||||
if (rule != null) { | |||||
if (rule.getParamFlowItemList() == null) { | |||||
rule.setParamFlowItemList(new ArrayList<ParamFlowItem>()); | |||||
} | |||||
Map<Object, Integer> itemMap = parseHotItems(rule.getParamFlowItemList()); | |||||
rule.setParsedHotItems(itemMap); | |||||
} | |||||
} | |||||
static Map<Object, Integer> parseHotItems(List<ParamFlowItem> items) { | |||||
Map<Object, Integer> itemMap = new HashMap<Object, Integer>(); | |||||
if (items == null || items.isEmpty()) { | |||||
return itemMap; | |||||
} | |||||
for (ParamFlowItem item : items) { | |||||
// Value should not be null. | |||||
Object value; | |||||
try { | |||||
value = parseItemValue(item.getObject(), item.getClassType()); | |||||
} catch (Exception ex) { | |||||
RecordLog.warn("[ParamFlowRuleUtil] Failed to parse value for item: " + item, ex); | |||||
continue; | |||||
} | |||||
if (item.getCount() == null || item.getCount() < 0 || value == null) { | |||||
RecordLog.warn("[ParamFlowRuleUtil] Ignoring invalid exclusion parameter item: " + item); | |||||
continue; | |||||
} | |||||
itemMap.put(value, item.getCount()); | |||||
} | |||||
return itemMap; | |||||
} | |||||
static Object parseItemValue(String value, String classType) { | |||||
if (value == null) { | |||||
throw new IllegalArgumentException("Null value"); | |||||
} | |||||
if (StringUtil.isBlank(classType)) { | |||||
// If the class type is not provided, then treat it as string. | |||||
return value; | |||||
} | |||||
// Handle primitive type. | |||||
if (int.class.toString().equals(classType) || Integer.class.getName().equals(classType)) { | |||||
return Integer.parseInt(value); | |||||
} else if (boolean.class.toString().equals(classType) || Boolean.class.getName().equals(classType)) { | |||||
return Boolean.parseBoolean(value); | |||||
} else if (long.class.toString().equals(classType) || Long.class.getName().equals(classType)) { | |||||
return Long.parseLong(value); | |||||
} else if (double.class.toString().equals(classType) || Double.class.getName().equals(classType)) { | |||||
return Double.parseDouble(value); | |||||
} else if (float.class.toString().equals(classType) || Float.class.getName().equals(classType)) { | |||||
return Float.parseFloat(value); | |||||
} else if (byte.class.toString().equals(classType) || Byte.class.getName().equals(classType)) { | |||||
return Byte.parseByte(value); | |||||
} else if (short.class.toString().equals(classType) || Short.class.getName().equals(classType)) { | |||||
return Short.parseShort(value); | |||||
} else if (char.class.toString().equals(classType)) { | |||||
char[] array = value.toCharArray(); | |||||
return array.length > 0 ? array[0] : null; | |||||
} | |||||
return value; | |||||
} | |||||
private ParamFlowRuleUtil() {} | |||||
} |
@@ -15,11 +15,9 @@ | |||||
*/ | */ | ||||
package com.alibaba.csp.sentinel.slots.block.flow.param; | package com.alibaba.csp.sentinel.slots.block.flow.param; | ||||
import java.util.ArrayList; | |||||
import java.util.Arrays; | import java.util.Arrays; | ||||
import java.util.Collections; | import java.util.Collections; | ||||
import java.util.List; | import java.util.List; | ||||
import java.util.Map; | |||||
import com.alibaba.csp.sentinel.EntryType; | import com.alibaba.csp.sentinel.EntryType; | ||||
import com.alibaba.csp.sentinel.slotchain.StringResourceWrapper; | import com.alibaba.csp.sentinel.slotchain.StringResourceWrapper; | ||||
@@ -109,84 +107,4 @@ public class ParamFlowRuleManagerTest { | |||||
assertTrue(allRules.contains(ruleC)); | assertTrue(allRules.contains(ruleC)); | ||||
assertTrue(allRules.contains(ruleD)); | assertTrue(allRules.contains(ruleD)); | ||||
} | } | ||||
@Test | |||||
public void testParseHotParamExceptionItemsFailure() { | |||||
String valueB = "Sentinel"; | |||||
Integer valueC = 6; | |||||
char valueD = 6; | |||||
float valueE = 11.11f; | |||||
// Null object will not be parsed. | |||||
ParamFlowItem itemA = new ParamFlowItem(null, 1, double.class.getName()); | |||||
// Hot item with empty class type will be treated as string. | |||||
ParamFlowItem itemB = new ParamFlowItem(valueB, 3, null); | |||||
ParamFlowItem itemE = new ParamFlowItem(String.valueOf(valueE), 3, ""); | |||||
// Bad count will not be parsed. | |||||
ParamFlowItem itemC = ParamFlowItem.newItem(valueC, -5); | |||||
ParamFlowItem itemD = new ParamFlowItem(String.valueOf(valueD), null, char.class.getName()); | |||||
List<ParamFlowItem> badItems = Arrays.asList(itemA, itemB, itemC, itemD, itemE); | |||||
Map<Object, Integer> parsedItems = ParamFlowRuleManager.parseHotItems(badItems); | |||||
// Value B and E will be parsed, but ignoring the type. | |||||
assertEquals(2, parsedItems.size()); | |||||
assertEquals(itemB.getCount(), parsedItems.get(valueB)); | |||||
assertFalse(parsedItems.containsKey(valueE)); | |||||
assertEquals(itemE.getCount(), parsedItems.get(String.valueOf(valueE))); | |||||
} | |||||
@Test | |||||
public void testParseHotParamExceptionItemsSuccess() { | |||||
// Test for empty list. | |||||
assertEquals(0, ParamFlowRuleManager.parseHotItems(null).size()); | |||||
assertEquals(0, ParamFlowRuleManager.parseHotItems(new ArrayList<ParamFlowItem>()).size()); | |||||
// Test for boxing objects and primitive types. | |||||
Double valueA = 1.1d; | |||||
String valueB = "Sentinel"; | |||||
Integer valueC = 6; | |||||
char valueD = 'c'; | |||||
ParamFlowItem itemA = ParamFlowItem.newItem(valueA, 1); | |||||
ParamFlowItem itemB = ParamFlowItem.newItem(valueB, 3); | |||||
ParamFlowItem itemC = ParamFlowItem.newItem(valueC, 5); | |||||
ParamFlowItem itemD = new ParamFlowItem().setObject(String.valueOf(valueD)) | |||||
.setClassType(char.class.getName()) | |||||
.setCount(7); | |||||
List<ParamFlowItem> items = Arrays.asList(itemA, itemB, itemC, itemD); | |||||
Map<Object, Integer> parsedItems = ParamFlowRuleManager.parseHotItems(items); | |||||
assertEquals(itemA.getCount(), parsedItems.get(valueA)); | |||||
assertEquals(itemB.getCount(), parsedItems.get(valueB)); | |||||
assertEquals(itemC.getCount(), parsedItems.get(valueC)); | |||||
assertEquals(itemD.getCount(), parsedItems.get(valueD)); | |||||
} | |||||
@Test | |||||
public void testCheckValidHotParamRule() { | |||||
// Null or empty resource; | |||||
ParamFlowRule rule1 = new ParamFlowRule(); | |||||
ParamFlowRule rule2 = new ParamFlowRule(""); | |||||
assertFalse(ParamFlowRuleManager.isValidRule(null)); | |||||
assertFalse(ParamFlowRuleManager.isValidRule(rule1)); | |||||
assertFalse(ParamFlowRuleManager.isValidRule(rule2)); | |||||
// Invalid threshold count. | |||||
ParamFlowRule rule3 = new ParamFlowRule("abc") | |||||
.setCount(-1) | |||||
.setParamIdx(1); | |||||
assertFalse(ParamFlowRuleManager.isValidRule(rule3)); | |||||
// Parameter index not set or invalid. | |||||
ParamFlowRule rule4 = new ParamFlowRule("abc") | |||||
.setCount(1); | |||||
ParamFlowRule rule5 = new ParamFlowRule("abc") | |||||
.setCount(1) | |||||
.setParamIdx(-1); | |||||
assertFalse(ParamFlowRuleManager.isValidRule(rule4)); | |||||
assertFalse(ParamFlowRuleManager.isValidRule(rule5)); | |||||
ParamFlowRule goodRule = new ParamFlowRule("abc") | |||||
.setCount(10) | |||||
.setParamIdx(1); | |||||
assertTrue(ParamFlowRuleManager.isValidRule(goodRule)); | |||||
} | |||||
} | } |
@@ -0,0 +1,95 @@ | |||||
package com.alibaba.csp.sentinel.slots.block.flow.param; | |||||
import java.util.ArrayList; | |||||
import java.util.Arrays; | |||||
import java.util.List; | |||||
import java.util.Map; | |||||
import org.junit.Test; | |||||
import static org.junit.Assert.*; | |||||
/** | |||||
* @author Eric Zhao | |||||
*/ | |||||
public class ParamFlowRuleUtilTest { | |||||
@Test | |||||
public void testCheckValidHotParamRule() { | |||||
// Null or empty resource; | |||||
ParamFlowRule rule1 = new ParamFlowRule(); | |||||
ParamFlowRule rule2 = new ParamFlowRule(""); | |||||
assertFalse(ParamFlowRuleUtil.isValidRule(null)); | |||||
assertFalse(ParamFlowRuleUtil.isValidRule(rule1)); | |||||
assertFalse(ParamFlowRuleUtil.isValidRule(rule2)); | |||||
// Invalid threshold count. | |||||
ParamFlowRule rule3 = new ParamFlowRule("abc") | |||||
.setCount(-1) | |||||
.setParamIdx(1); | |||||
assertFalse(ParamFlowRuleUtil.isValidRule(rule3)); | |||||
// Parameter index not set or invalid. | |||||
ParamFlowRule rule4 = new ParamFlowRule("abc") | |||||
.setCount(1); | |||||
ParamFlowRule rule5 = new ParamFlowRule("abc") | |||||
.setCount(1) | |||||
.setParamIdx(-1); | |||||
assertFalse(ParamFlowRuleUtil.isValidRule(rule4)); | |||||
assertFalse(ParamFlowRuleUtil.isValidRule(rule5)); | |||||
ParamFlowRule goodRule = new ParamFlowRule("abc") | |||||
.setCount(10) | |||||
.setParamIdx(1); | |||||
assertTrue(ParamFlowRuleUtil.isValidRule(goodRule)); | |||||
} | |||||
@Test | |||||
public void testParseHotParamExceptionItemsFailure() { | |||||
String valueB = "Sentinel"; | |||||
Integer valueC = 6; | |||||
char valueD = 6; | |||||
float valueE = 11.11f; | |||||
// Null object will not be parsed. | |||||
ParamFlowItem itemA = new ParamFlowItem(null, 1, double.class.getName()); | |||||
// Hot item with empty class type will be treated as string. | |||||
ParamFlowItem itemB = new ParamFlowItem(valueB, 3, null); | |||||
ParamFlowItem itemE = new ParamFlowItem(String.valueOf(valueE), 3, ""); | |||||
// Bad count will not be parsed. | |||||
ParamFlowItem itemC = ParamFlowItem.newItem(valueC, -5); | |||||
ParamFlowItem itemD = new ParamFlowItem(String.valueOf(valueD), null, char.class.getName()); | |||||
List<ParamFlowItem> badItems = Arrays.asList(itemA, itemB, itemC, itemD, itemE); | |||||
Map<Object, Integer> parsedItems = ParamFlowRuleUtil.parseHotItems(badItems); | |||||
// Value B and E will be parsed, but ignoring the type. | |||||
assertEquals(2, parsedItems.size()); | |||||
assertEquals(itemB.getCount(), parsedItems.get(valueB)); | |||||
assertFalse(parsedItems.containsKey(valueE)); | |||||
assertEquals(itemE.getCount(), parsedItems.get(String.valueOf(valueE))); | |||||
} | |||||
@Test | |||||
public void testParseHotParamExceptionItemsSuccess() { | |||||
// Test for empty list. | |||||
assertEquals(0, ParamFlowRuleUtil.parseHotItems(null).size()); | |||||
assertEquals(0, ParamFlowRuleUtil.parseHotItems(new ArrayList<ParamFlowItem>()).size()); | |||||
// Test for boxing objects and primitive types. | |||||
Double valueA = 1.1d; | |||||
String valueB = "Sentinel"; | |||||
Integer valueC = 6; | |||||
char valueD = 'c'; | |||||
ParamFlowItem itemA = ParamFlowItem.newItem(valueA, 1); | |||||
ParamFlowItem itemB = ParamFlowItem.newItem(valueB, 3); | |||||
ParamFlowItem itemC = ParamFlowItem.newItem(valueC, 5); | |||||
ParamFlowItem itemD = new ParamFlowItem().setObject(String.valueOf(valueD)) | |||||
.setClassType(char.class.getName()) | |||||
.setCount(7); | |||||
List<ParamFlowItem> items = Arrays.asList(itemA, itemB, itemC, itemD); | |||||
Map<Object, Integer> parsedItems = ParamFlowRuleUtil.parseHotItems(items); | |||||
assertEquals(itemA.getCount(), parsedItems.get(valueA)); | |||||
assertEquals(itemB.getCount(), parsedItems.get(valueB)); | |||||
assertEquals(itemC.getCount(), parsedItems.get(valueC)); | |||||
assertEquals(itemD.getCount(), parsedItems.get(valueD)); | |||||
} | |||||
} |