@@ -15,16 +15,17 @@ | |||||
*/ | */ | ||||
package com.alibaba.csp.sentinel.cluster.client.codec.data; | package com.alibaba.csp.sentinel.cluster.client.codec.data; | ||||
import java.util.Collection; | |||||
import com.alibaba.csp.sentinel.cluster.ClusterConstants; | import com.alibaba.csp.sentinel.cluster.ClusterConstants; | ||||
import com.alibaba.csp.sentinel.cluster.codec.EntityWriter; | import com.alibaba.csp.sentinel.cluster.codec.EntityWriter; | ||||
import com.alibaba.csp.sentinel.cluster.request.data.ParamFlowRequestData; | import com.alibaba.csp.sentinel.cluster.request.data.ParamFlowRequestData; | ||||
import com.alibaba.csp.sentinel.log.RecordLog; | import com.alibaba.csp.sentinel.log.RecordLog; | ||||
import com.alibaba.csp.sentinel.util.AssertUtil; | import com.alibaba.csp.sentinel.util.AssertUtil; | ||||
import io.netty.buffer.ByteBuf; | import io.netty.buffer.ByteBuf; | ||||
import java.util.ArrayList; | |||||
import java.util.Collection; | |||||
import java.util.List; | |||||
/** | /** | ||||
* @author jialiang.linjl | * @author jialiang.linjl | ||||
* @author Eric Zhao | * @author Eric Zhao | ||||
@@ -50,41 +51,67 @@ public class ParamFlowRequestDataWriter implements EntityWriter<ParamFlowRequest | |||||
Collection<Object> params = entity.getParams(); | Collection<Object> params = entity.getParams(); | ||||
// Write parameter amount. | |||||
int amount = calculateParamAmount(params); | |||||
target.writeInt(amount); | |||||
params = resolveValidParams(params); | |||||
target.writeInt(params.size()); | |||||
// Serialize parameters with type flag. | // Serialize parameters with type flag. | ||||
for (Object param : entity.getParams()) { | |||||
for (Object param : params) { | |||||
encodeValue(param, target); | encodeValue(param, target); | ||||
} | } | ||||
} | } | ||||
/** | |||||
* Get valid parameters in provided parameter list | |||||
* | |||||
* @param params | |||||
* @return | |||||
*/ | |||||
public List<Object> resolveValidParams(Collection<Object> params) { | |||||
List<Object> validParams = new ArrayList<>(); | |||||
int size = 0; | |||||
for (Object param : params) { | |||||
int s = calculateParamTransportSize(param); | |||||
if (s <= 0) { | |||||
RecordLog.warn("[ParamFlowRequestDataWriter] WARN: Non-primitive type detected in params of " | |||||
+ "cluster parameter flow control, which is not supported: " + param); | |||||
continue; | |||||
} | |||||
if (size + s > maxParamByteSize) { | |||||
RecordLog.warn("[ParamFlowRequestDataWriter] WARN: params size is too big." + | |||||
" the configure value is : " + maxParamByteSize + ", the params size is: " + params.size()); | |||||
break; | |||||
} | |||||
size += s; | |||||
validParams.add(param); | |||||
} | |||||
return validParams; | |||||
} | |||||
private void encodeValue(Object param, ByteBuf target) { | private void encodeValue(Object param, ByteBuf target) { | ||||
// Handle primitive type. | // Handle primitive type. | ||||
if (param instanceof Integer || int.class.isInstance(param)) { | if (param instanceof Integer || int.class.isInstance(param)) { | ||||
target.writeByte(ClusterConstants.PARAM_TYPE_INTEGER); | target.writeByte(ClusterConstants.PARAM_TYPE_INTEGER); | ||||
target.writeInt((Integer)param); | |||||
target.writeInt((Integer) param); | |||||
} else if (param instanceof String) { | } else if (param instanceof String) { | ||||
encodeString((String)param, target); | |||||
encodeString((String) param, target); | |||||
} else if (boolean.class.isInstance(param) || param instanceof Boolean) { | } else if (boolean.class.isInstance(param) || param instanceof Boolean) { | ||||
target.writeByte(ClusterConstants.PARAM_TYPE_BOOLEAN); | target.writeByte(ClusterConstants.PARAM_TYPE_BOOLEAN); | ||||
target.writeBoolean((Boolean)param); | |||||
target.writeBoolean((Boolean) param); | |||||
} else if (long.class.isInstance(param) || param instanceof Long) { | } else if (long.class.isInstance(param) || param instanceof Long) { | ||||
target.writeByte(ClusterConstants.PARAM_TYPE_LONG); | target.writeByte(ClusterConstants.PARAM_TYPE_LONG); | ||||
target.writeLong((Long)param); | |||||
target.writeLong((Long) param); | |||||
} else if (double.class.isInstance(param) || param instanceof Double) { | } else if (double.class.isInstance(param) || param instanceof Double) { | ||||
target.writeByte(ClusterConstants.PARAM_TYPE_DOUBLE); | target.writeByte(ClusterConstants.PARAM_TYPE_DOUBLE); | ||||
target.writeDouble((Double)param); | |||||
target.writeDouble((Double) param); | |||||
} else if (float.class.isInstance(param) || param instanceof Float) { | } else if (float.class.isInstance(param) || param instanceof Float) { | ||||
target.writeByte(ClusterConstants.PARAM_TYPE_FLOAT); | target.writeByte(ClusterConstants.PARAM_TYPE_FLOAT); | ||||
target.writeFloat((Float)param); | |||||
target.writeFloat((Float) param); | |||||
} else if (byte.class.isInstance(param) || param instanceof Byte) { | } else if (byte.class.isInstance(param) || param instanceof Byte) { | ||||
target.writeByte(ClusterConstants.PARAM_TYPE_BYTE); | target.writeByte(ClusterConstants.PARAM_TYPE_BYTE); | ||||
target.writeByte((Byte)param); | |||||
target.writeByte((Byte) param); | |||||
} else if (short.class.isInstance(param) || param instanceof Short) { | } else if (short.class.isInstance(param) || param instanceof Short) { | ||||
target.writeByte(ClusterConstants.PARAM_TYPE_SHORT); | target.writeByte(ClusterConstants.PARAM_TYPE_SHORT); | ||||
target.writeShort((Short)param); | |||||
target.writeShort((Short) param); | |||||
} else { | } else { | ||||
// Unexpected type, drop. | // Unexpected type, drop. | ||||
} | } | ||||
@@ -97,30 +124,6 @@ public class ParamFlowRequestDataWriter implements EntityWriter<ParamFlowRequest | |||||
target.writeBytes(tmpChars); | target.writeBytes(tmpChars); | ||||
} | } | ||||
/** | |||||
* Calculate amount of valid parameters in provided parameter list. | |||||
* | |||||
* @param params non-empty parameter list | |||||
* @return amount of valid parameters | |||||
*/ | |||||
int calculateParamAmount(/*@NonEmpty*/ Collection<Object> params) { | |||||
int size = 0; | |||||
int length = 0; | |||||
for (Object param : params) { | |||||
int s = calculateParamTransportSize(param); | |||||
if (s <= 0) { | |||||
RecordLog.warn("[ParamFlowRequestDataWriter] WARN: Non-primitive type detected in params of " | |||||
+ "cluster parameter flow control, which is not supported: " + param); | |||||
continue; | |||||
} | |||||
if (size + s > maxParamByteSize) { | |||||
break; | |||||
} | |||||
size += s; | |||||
length++; | |||||
} | |||||
return length; | |||||
} | |||||
int calculateParamTransportSize(Object value) { | int calculateParamTransportSize(Object value) { | ||||
if (value == null) { | if (value == null) { | ||||
@@ -132,7 +135,7 @@ public class ParamFlowRequestDataWriter implements EntityWriter<ParamFlowRequest | |||||
return 5; | return 5; | ||||
} else if (value instanceof String) { | } else if (value instanceof String) { | ||||
// Layout for string: |type flag(1)|length(4)|string content| | // Layout for string: |type flag(1)|length(4)|string content| | ||||
String tmpValue = (String)value; | |||||
String tmpValue = (String) value; | |||||
byte[] tmpChars = tmpValue.getBytes(); | byte[] tmpChars = tmpValue.getBytes(); | ||||
return 1 + 4 + tmpChars.length; | return 1 + 4 + tmpChars.length; | ||||
} else if (boolean.class.isInstance(value) || value instanceof Boolean) { | } else if (boolean.class.isInstance(value) || value instanceof Boolean) { | ||||
@@ -0,0 +1,49 @@ | |||||
/* | |||||
* Copyright 1999-2018 Alibaba Group Holding Ltd. | |||||
* | |||||
* Licensed under the Apache License, Version 2.0 (the "License"); | |||||
* you may not use this file except in compliance with the License. | |||||
* You may obtain a copy of the License at | |||||
* | |||||
* http://www.apache.org/licenses/LICENSE-2.0 | |||||
* | |||||
* Unless required by applicable law or agreed to in writing, software | |||||
* distributed under the License is distributed on an "AS IS" BASIS, | |||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |||||
* See the License for the specific language governing permissions and | |||||
* limitations under the License. | |||||
*/ | |||||
package com.alibaba.csp.sentinel.cluster.client.config; | |||||
import com.alibaba.csp.sentinel.config.SentinelConfig; | |||||
import com.alibaba.csp.sentinel.log.RecordLog; | |||||
/** | |||||
* <p> | |||||
* this class dedicated to reading startup configurations of cluster client | |||||
* </p> | |||||
* | |||||
* @author lianglin | |||||
* @since 1.7.0 | |||||
*/ | |||||
public class ClusterClientStartUpConfig { | |||||
private static final String MAX_PARAM_BYTE_SIZE = "csp.sentinel.cluster.max.param.byte.size"; | |||||
/** | |||||
* Get the max bytes params can be serialized | |||||
* | |||||
* @return the max bytes, may be null | |||||
*/ | |||||
public static Integer getMaxParamByteSize() { | |||||
String maxParamByteSize = SentinelConfig.getConfig(MAX_PARAM_BYTE_SIZE); | |||||
try { | |||||
return maxParamByteSize == null ? null : Integer.valueOf(maxParamByteSize); | |||||
} catch (Exception ex) { | |||||
RecordLog.warn("[ClusterClientStartUpConfig] Failed to parse maxParamByteSize: " + maxParamByteSize); | |||||
return null; | |||||
} | |||||
} | |||||
} |
@@ -23,6 +23,7 @@ import com.alibaba.csp.sentinel.cluster.client.codec.data.PingRequestDataWriter; | |||||
import com.alibaba.csp.sentinel.cluster.client.codec.data.PingResponseDataDecoder; | import com.alibaba.csp.sentinel.cluster.client.codec.data.PingResponseDataDecoder; | ||||
import com.alibaba.csp.sentinel.cluster.client.codec.registry.RequestDataWriterRegistry; | import com.alibaba.csp.sentinel.cluster.client.codec.registry.RequestDataWriterRegistry; | ||||
import com.alibaba.csp.sentinel.cluster.client.codec.registry.ResponseDataDecodeRegistry; | import com.alibaba.csp.sentinel.cluster.client.codec.registry.ResponseDataDecodeRegistry; | ||||
import com.alibaba.csp.sentinel.cluster.client.config.ClusterClientStartUpConfig; | |||||
import com.alibaba.csp.sentinel.init.InitFunc; | import com.alibaba.csp.sentinel.init.InitFunc; | ||||
import com.alibaba.csp.sentinel.init.InitOrder; | import com.alibaba.csp.sentinel.init.InitOrder; | ||||
@@ -42,7 +43,12 @@ public class DefaultClusterClientInitFunc implements InitFunc { | |||||
private void initDefaultEntityWriters() { | private void initDefaultEntityWriters() { | ||||
RequestDataWriterRegistry.addWriter(ClientConstants.TYPE_PING, new PingRequestDataWriter()); | RequestDataWriterRegistry.addWriter(ClientConstants.TYPE_PING, new PingRequestDataWriter()); | ||||
RequestDataWriterRegistry.addWriter(ClientConstants.TYPE_FLOW, new FlowRequestDataWriter()); | RequestDataWriterRegistry.addWriter(ClientConstants.TYPE_FLOW, new FlowRequestDataWriter()); | ||||
RequestDataWriterRegistry.addWriter(ClientConstants.TYPE_PARAM_FLOW, new ParamFlowRequestDataWriter()); | |||||
Integer maxParamByteSize = ClusterClientStartUpConfig.getMaxParamByteSize(); | |||||
if (maxParamByteSize == null) { | |||||
RequestDataWriterRegistry.addWriter(ClientConstants.TYPE_PARAM_FLOW, new ParamFlowRequestDataWriter()); | |||||
} else { | |||||
RequestDataWriterRegistry.addWriter(ClientConstants.TYPE_PARAM_FLOW, new ParamFlowRequestDataWriter(maxParamByteSize)); | |||||
} | |||||
} | } | ||||
private void initDefaultEntityDecoders() { | private void initDefaultEntityDecoders() { | ||||
@@ -1,9 +1,10 @@ | |||||
package com.alibaba.csp.sentinel.cluster.client.codec.data; | package com.alibaba.csp.sentinel.cluster.client.codec.data; | ||||
import java.util.ArrayList; | |||||
import org.junit.Test; | import org.junit.Test; | ||||
import java.util.ArrayList; | |||||
import java.util.List; | |||||
import static org.junit.Assert.*; | import static org.junit.Assert.*; | ||||
/** | /** | ||||
@@ -27,35 +28,33 @@ public class ParamFlowRequestDataWriterTest { | |||||
} | } | ||||
@Test | @Test | ||||
public void testCalculateParamAmountExceedsMaxSize() { | |||||
final int maxSize = 10; | |||||
public void testResolveValidParams() { | |||||
final int maxSize = 15; | |||||
ParamFlowRequestDataWriter writer = new ParamFlowRequestDataWriter(maxSize); | ParamFlowRequestDataWriter writer = new ParamFlowRequestDataWriter(maxSize); | ||||
assertEquals(1, writer.calculateParamAmount(new ArrayList<Object>() {{ | |||||
ArrayList<Object> params = new ArrayList<Object>() {{ | |||||
add(1); | add(1); | ||||
}})); | |||||
assertEquals(2, writer.calculateParamAmount(new ArrayList<Object>() {{ | |||||
add(1); add(64); | |||||
}})); | |||||
assertEquals(2, writer.calculateParamAmount(new ArrayList<Object>() {{ | |||||
add(1); add(64); add(3); | |||||
}})); | |||||
} | |||||
add(64); | |||||
add(3); | |||||
}}; | |||||
@Test | |||||
public void testCalculateParamAmount() { | |||||
ParamFlowRequestDataWriter writer = new ParamFlowRequestDataWriter(); | |||||
assertEquals(6, writer.calculateParamAmount(new ArrayList<Object>() {{ | |||||
add(1); add(1d); add(1f); add((byte) 1); add("123"); add(true); | |||||
}})); | |||||
// POJO (non-primitive type) should not be regarded as a valid parameter. | |||||
assertEquals(0, writer.calculateParamAmount(new ArrayList<Object>() {{ | |||||
List<Object> validParams = writer.resolveValidParams(params); | |||||
assertTrue(validParams.contains(1) && validParams.contains(64) && validParams.contains(3)); | |||||
//when over maxSize, the exceed number should not be contained | |||||
params.add(5); | |||||
assertFalse(writer.resolveValidParams(params).contains(5)); | |||||
//POJO (non-primitive type) should not be regarded as a valid parameter | |||||
assertTrue(writer.resolveValidParams(new ArrayList<Object>() {{ | |||||
add(new SomePojo()); | add(new SomePojo()); | ||||
}})); | |||||
assertEquals(1, writer.calculateParamAmount(new ArrayList<Object>() {{ | |||||
add(new Object()); add(1); | |||||
}})); | |||||
}}).size() == 0); | |||||
} | } | ||||
private static class SomePojo { | private static class SomePojo { | ||||
private String param1; | private String param1; | ||||
@@ -15,6 +15,11 @@ | |||||
*/ | */ | ||||
package com.alibaba.csp.sentinel.config; | package com.alibaba.csp.sentinel.config; | ||||
import com.alibaba.csp.sentinel.log.LogBase; | |||||
import com.alibaba.csp.sentinel.log.RecordLog; | |||||
import com.alibaba.csp.sentinel.util.AppNameUtil; | |||||
import com.alibaba.csp.sentinel.util.AssertUtil; | |||||
import java.io.File; | import java.io.File; | ||||
import java.io.FileInputStream; | import java.io.FileInputStream; | ||||
import java.util.Map; | import java.util.Map; | ||||
@@ -22,11 +27,6 @@ import java.util.Properties; | |||||
import java.util.concurrent.ConcurrentHashMap; | import java.util.concurrent.ConcurrentHashMap; | ||||
import java.util.concurrent.CopyOnWriteArraySet; | import java.util.concurrent.CopyOnWriteArraySet; | ||||
import com.alibaba.csp.sentinel.log.LogBase; | |||||
import com.alibaba.csp.sentinel.log.RecordLog; | |||||
import com.alibaba.csp.sentinel.util.AppNameUtil; | |||||
import com.alibaba.csp.sentinel.util.AssertUtil; | |||||
/** | /** | ||||
* The universal local config center of Sentinel. The config is retrieved from command line arguments | * The universal local config center of Sentinel. The config is retrieved from command line arguments | ||||
* and {@code ${user.home}/logs/csp/${appName}.properties} file by default. | * and {@code ${user.home}/logs/csp/${appName}.properties} file by default. | ||||
@@ -53,6 +53,7 @@ public class SentinelConfig { | |||||
public static final String COLD_FACTOR = "csp.sentinel.flow.cold.factor"; | public static final String COLD_FACTOR = "csp.sentinel.flow.cold.factor"; | ||||
public static final String STATISTIC_MAX_RT = "csp.sentinel.statistic.max.rt"; | public static final String STATISTIC_MAX_RT = "csp.sentinel.statistic.max.rt"; | ||||
static final String DEFAULT_CHARSET = "UTF-8"; | static final String DEFAULT_CHARSET = "UTF-8"; | ||||
static final long DEFAULT_SINGLE_METRIC_FILE_SIZE = 1024 * 1024 * 50; | static final long DEFAULT_SINGLE_METRIC_FILE_SIZE = 1024 * 1024 * 50; | ||||
static final int DEFAULT_TOTAL_METRIC_FILE_COUNT = 6; | static final int DEFAULT_TOTAL_METRIC_FILE_COUNT = 6; | ||||