Browse Source

Add default Sentinel cluster transport implementation with Netty

- Add Netty transport client implementation and default cluster token client
- Add config manager for cluster client
- Add codec SPI mechanism for client entity decoder / writer

Signed-off-by: Eric Zhao <sczyh16@gmail.com>
master
Eric Zhao 6 years ago
parent
commit
6caa19ea95
20 changed files with 1156 additions and 0 deletions
  1. +1
    -0
      sentinel-cluster/sentinel-cluster-client-default/README.md
  2. +32
    -0
      sentinel-cluster/sentinel-cluster-client-default/src/main/java/com/alibaba/csp/sentinel/cluster/client/ClientConstants.java
  3. +105
    -0
      sentinel-cluster/sentinel-cluster-client-default/src/main/java/com/alibaba/csp/sentinel/cluster/client/DefaultClusterTokenClient.java
  4. +208
    -0
      sentinel-cluster/sentinel-cluster-client-default/src/main/java/com/alibaba/csp/sentinel/cluster/client/NettyTransportClient.java
  5. +62
    -0
      sentinel-cluster/sentinel-cluster-client-default/src/main/java/com/alibaba/csp/sentinel/cluster/client/codec/ClientEntityCodecProvider.java
  6. +54
    -0
      sentinel-cluster/sentinel-cluster-client-default/src/main/java/com/alibaba/csp/sentinel/cluster/client/codec/DefaultRequestEntityWriter.java
  7. +66
    -0
      sentinel-cluster/sentinel-cluster-client-default/src/main/java/com/alibaba/csp/sentinel/cluster/client/codec/DefaultResponseEntityDecoder.java
  8. +39
    -0
      sentinel-cluster/sentinel-cluster-client-default/src/main/java/com/alibaba/csp/sentinel/cluster/client/codec/data/FlowRequestDataWriter.java
  9. +39
    -0
      sentinel-cluster/sentinel-cluster-client-default/src/main/java/com/alibaba/csp/sentinel/cluster/client/codec/data/FlowResponseDataDecoder.java
  10. +128
    -0
      sentinel-cluster/sentinel-cluster-client-default/src/main/java/com/alibaba/csp/sentinel/cluster/client/codec/data/ParamFlowRequestDataWriter.java
  11. +44
    -0
      sentinel-cluster/sentinel-cluster-client-default/src/main/java/com/alibaba/csp/sentinel/cluster/client/codec/netty/NettyRequestEncoder.java
  12. +54
    -0
      sentinel-cluster/sentinel-cluster-client-default/src/main/java/com/alibaba/csp/sentinel/cluster/client/codec/netty/NettyResponseDecoder.java
  13. +47
    -0
      sentinel-cluster/sentinel-cluster-client-default/src/main/java/com/alibaba/csp/sentinel/cluster/client/codec/registry/RequestDataWriterRegistry.java
  14. +48
    -0
      sentinel-cluster/sentinel-cluster-client-default/src/main/java/com/alibaba/csp/sentinel/cluster/client/codec/registry/ResponseDataDecodeRegistry.java
  15. +65
    -0
      sentinel-cluster/sentinel-cluster-client-default/src/main/java/com/alibaba/csp/sentinel/cluster/client/config/ClusterClientConfig.java
  16. +30
    -0
      sentinel-cluster/sentinel-cluster-client-default/src/main/java/com/alibaba/csp/sentinel/cluster/client/config/ClusterClientConfigManager.java
  17. +72
    -0
      sentinel-cluster/sentinel-cluster-client-default/src/main/java/com/alibaba/csp/sentinel/cluster/client/handler/TokenClientHandler.java
  18. +60
    -0
      sentinel-cluster/sentinel-cluster-client-default/src/main/java/com/alibaba/csp/sentinel/cluster/client/handler/TokenClientPromiseHolder.java
  19. +1
    -0
      sentinel-cluster/sentinel-cluster-client-default/src/main/resources/META-INF/services/com.alibaba.csp.sentinel.cluster.codec.request.RequestEntityWriter
  20. +1
    -0
      sentinel-cluster/sentinel-cluster-client-default/src/main/resources/META-INF/services/com.alibaba.csp.sentinel.cluster.codec.response.ResponseEntityDecoder

+ 1
- 0
sentinel-cluster/sentinel-cluster-client-default/README.md View File

@@ -0,0 +1 @@
# Sentinel Cluster Client (Default)

+ 32
- 0
sentinel-cluster/sentinel-cluster-client-default/src/main/java/com/alibaba/csp/sentinel/cluster/client/ClientConstants.java View File

@@ -0,0 +1,32 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.csp.sentinel.cluster.client;

/**
* @author Eric Zhao
*/
public final class ClientConstants {

public static final int TYPE_PING = 0;
public static final int TYPE_FLOW = 1;
public static final int TYPE_PARAM_FLOW = 2;

public static final int CLIENT_STATUS_OFF = 0;
public static final int CLIENT_STATUS_PENDING = 1;
public static final int CLIENT_STATUS_STARTED = 2;

private ClientConstants() {}
}

+ 105
- 0
sentinel-cluster/sentinel-cluster-client-default/src/main/java/com/alibaba/csp/sentinel/cluster/client/DefaultClusterTokenClient.java View File

@@ -0,0 +1,105 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.csp.sentinel.cluster.client;

import java.util.Collection;

import com.alibaba.csp.sentinel.cluster.ClusterConstants;
import com.alibaba.csp.sentinel.cluster.ClusterTokenClient;
import com.alibaba.csp.sentinel.cluster.ClusterTransportClient;
import com.alibaba.csp.sentinel.cluster.TokenResult;
import com.alibaba.csp.sentinel.cluster.TokenResultStatus;
import com.alibaba.csp.sentinel.cluster.TokenServerDescriptor;
import com.alibaba.csp.sentinel.cluster.client.config.ClusterClientConfig;
import com.alibaba.csp.sentinel.cluster.log.ClusterStatLogUtil;
import com.alibaba.csp.sentinel.cluster.request.ClusterRequest;
import com.alibaba.csp.sentinel.cluster.request.data.FlowRequestData;
import com.alibaba.csp.sentinel.cluster.request.data.ParamFlowRequestData;
import com.alibaba.csp.sentinel.cluster.response.ClusterResponse;
import com.alibaba.csp.sentinel.cluster.response.data.FlowTokenResponseData;

/**
* @author Eric Zhao
* @since 1.4.0
*/
public class DefaultClusterTokenClient implements ClusterTokenClient {

private ClusterTransportClient transportClient;

public DefaultClusterTokenClient() {
// TODO: load and create transport client here.
}

public DefaultClusterTokenClient(ClusterTransportClient transportClient) {
this.transportClient = transportClient;
}

@Override
public TokenServerDescriptor currentServer() {
return new TokenServerDescriptor();
}

@Override
public TokenResult requestToken(Long flowId, int acquireCount, boolean prioritized) {
if (notValidRequest(flowId, acquireCount)) {
return badRequest();
}
FlowRequestData data = new FlowRequestData().setCount(acquireCount)
.setFlowId(flowId).setPriority(prioritized);
ClusterRequest<FlowRequestData> request = new ClusterRequest<>(ClusterConstants.MSG_TYPE_FLOW, data);
try {
return sendTokenRequest(request);
} catch (Exception ex) {
ClusterStatLogUtil.log(ex.getMessage());
return new TokenResult(TokenResultStatus.FAIL);
}
}

@Override
public TokenResult requestParamToken(Long flowId, int acquireCount, Collection<Object> params) {
if (notValidRequest(flowId, acquireCount) || params == null || params.isEmpty()) {
return badRequest();
}
ParamFlowRequestData data = new ParamFlowRequestData().setCount(acquireCount)
.setFlowId(flowId).setParams(params);
ClusterRequest<ParamFlowRequestData> request = new ClusterRequest<>(ClusterConstants.MSG_TYPE_PARAM_FLOW, data);
try {
return sendTokenRequest(request);
} catch (Exception ex) {
ClusterStatLogUtil.log(ex.getMessage());
return new TokenResult(TokenResultStatus.FAIL);
}
}

private boolean notValidRequest(Long id, int count) {
return id == null || id <= 0 || count <= 0;
}

private TokenResult badRequest() {
return new TokenResult(TokenResultStatus.BAD_REQUEST);
}

private TokenResult sendTokenRequest(ClusterRequest request) throws Exception {
ClusterResponse response = transportClient.sendRequest(request);
TokenResult result = new TokenResult(response.getStatus());
if (response.getData() != null) {
FlowTokenResponseData responseData = (FlowTokenResponseData)response.getData();
result.setRemaining(responseData.getRemainingCount())
.setWaitInMs(responseData.getWaitInMs());
}
return result;
}
}

+ 208
- 0
sentinel-cluster/sentinel-cluster-client-default/src/main/java/com/alibaba/csp/sentinel/cluster/client/NettyTransportClient.java View File

@@ -0,0 +1,208 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.csp.sentinel.cluster.client;

import java.util.AbstractMap.SimpleEntry;
import java.util.concurrent.atomic.AtomicInteger;

import com.alibaba.csp.sentinel.cluster.ClusterErrorMessages;
import com.alibaba.csp.sentinel.cluster.ClusterTransportClient;
import com.alibaba.csp.sentinel.cluster.client.codec.netty.NettyRequestEncoder;
import com.alibaba.csp.sentinel.cluster.client.codec.netty.NettyResponseDecoder;
import com.alibaba.csp.sentinel.cluster.client.config.ClusterClientConfig;
import com.alibaba.csp.sentinel.cluster.client.handler.TokenClientHandler;
import com.alibaba.csp.sentinel.cluster.client.handler.TokenClientPromiseHolder;
import com.alibaba.csp.sentinel.cluster.exception.SentinelClusterException;
import com.alibaba.csp.sentinel.cluster.request.ClusterRequest;
import com.alibaba.csp.sentinel.cluster.request.Request;
import com.alibaba.csp.sentinel.cluster.response.ClusterResponse;
import com.alibaba.csp.sentinel.log.RecordLog;
import com.alibaba.csp.sentinel.util.AssertUtil;

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPromise;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.util.concurrent.GenericFutureListener;

/**
* Netty transport client implementation for Sentinel cluster transport.
*
* @author Eric Zhao
* @since 1.4.0
*/
public class NettyTransportClient implements ClusterTransportClient {

private ClusterClientConfig clientConfig;
private String host;
private int port;

private Channel channel;
private NioEventLoopGroup eventLoopGroup;
private TokenClientHandler clientHandler;

private AtomicInteger idGenerator = new AtomicInteger(0);

private AtomicInteger failConnectedTime = new AtomicInteger(0);

public NettyTransportClient(ClusterClientConfig clientConfig, String host, int port) {
this.clientConfig = clientConfig;
this.host = host;
this.port = port;
}

private Bootstrap initClientBootstrap() {
Bootstrap b = new Bootstrap();
eventLoopGroup = new NioEventLoopGroup();
b.group(eventLoopGroup)
.channel(NioSocketChannel.class)
.option(ChannelOption.SO_TIMEOUT, 20)
.option(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
clientHandler = new TokenClientHandler();
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new LengthFieldBasedFrameDecoder(1024, 0, 2, 0, 2));
pipeline.addLast(new NettyResponseDecoder());
pipeline.addLast(new LengthFieldPrepender(2));
pipeline.addLast(new NettyRequestEncoder());
pipeline.addLast(clientHandler);
}
});

return b;
}

private void connect(Bootstrap b) {
b.connect(host, port).addListener(new GenericFutureListener<ChannelFuture>() {
@Override
public void operationComplete(ChannelFuture future) {
if (future.cause() != null) {
RecordLog.warn(
"[NettyTransportClient] Could not connect after " + failConnectedTime.get() + " times",
future.cause());
failConnectedTime.incrementAndGet();
channel = null;
} else {
failConnectedTime.set(0);
channel = future.channel();
RecordLog.info("[NettyTransportClient] Successfully connect to server " + host + ":" + port);
}
}
});
}

public void start() {
connect(initClientBootstrap());
}

public void stop() {
if (channel != null) {
channel.close();
channel = null;
}
if (eventLoopGroup != null) {
eventLoopGroup.shutdownGracefully();
}
failConnectedTime.set(0);

RecordLog.info("[NettyTransportClient] Token client stopped");
}

private boolean validRequest(Request request) {
return request != null && request.getType() >= 0;
}

public boolean isReady() {
return channel != null && clientHandler != null && clientHandler.hasStarted();
}

@Override
public ClusterResponse sendRequest(ClusterRequest request) throws Exception {
if (!isReady()) {
throw new SentinelClusterException(ClusterErrorMessages.CLIENT_NOT_READY);
}
if (!validRequest(request)) {
throw new SentinelClusterException(ClusterErrorMessages.BAD_REQUEST);
}
int xid = getCurrentId();
try {
request.setId(xid);

channel.writeAndFlush(request);

ChannelPromise promise = channel.newPromise();
TokenClientPromiseHolder.putPromise(xid, promise);

// TODO: timeout
if (!promise.await(20)) {
throw new SentinelClusterException(ClusterErrorMessages.REQUEST_TIME_OUT);
}

SimpleEntry<ChannelPromise, ClusterResponse> entry = TokenClientPromiseHolder.getEntry(xid);
if (entry == null || entry.getValue() == null) {
// Should not go through here.
throw new SentinelClusterException(ClusterErrorMessages.UNEXPECTED_STATUS);
}
return entry.getValue();
} finally {
TokenClientPromiseHolder.remove(xid);
}
}

private int getCurrentId() {
if (idGenerator.get() > MAX_ID) {
idGenerator.set(0);
}
return idGenerator.incrementAndGet();
}

/*public CompletableFuture<ClusterResponse> sendRequestAsync(ClusterRequest request) {
// Uncomment this when min target JDK is 1.8.
if (!validRequest(request)) {
return CompletableFuture.failedFuture(new IllegalArgumentException("Bad request"));
}
int xid = getCurrentId();
request.setId(xid);

CompletableFuture<ClusterResponse> future = new CompletableFuture<>();
channel.writeAndFlush(request)
.addListener(f -> {
if (f.isSuccess()) {
future.complete(someResult);
} else if (f.cause() != null) {
future.completeExceptionally(f.cause());
} else {
future.cancel(false);
}
});
return future;
}*/

private static final int MAX_ID = 999_999_999;
}

+ 62
- 0
sentinel-cluster/sentinel-cluster-client-default/src/main/java/com/alibaba/csp/sentinel/cluster/client/codec/ClientEntityCodecProvider.java View File

@@ -0,0 +1,62 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.csp.sentinel.cluster.client.codec;

import com.alibaba.csp.sentinel.util.SpiLoader;
import com.alibaba.csp.sentinel.cluster.codec.request.RequestEntityWriter;
import com.alibaba.csp.sentinel.cluster.codec.response.ResponseEntityDecoder;
import com.alibaba.csp.sentinel.log.RecordLog;

/**
* @author Eric Zhao
* @since 1.4.0
*/
public final class ClientEntityCodecProvider {

private static RequestEntityWriter requestEntityWriter = null;
private static ResponseEntityDecoder responseEntityDecoder = null;

static {
resolveInstance();
}

private static void resolveInstance() {
RequestEntityWriter writer = SpiLoader.loadFirstInstance(RequestEntityWriter.class);
if (writer == null) {
RecordLog.warn("[ClientEntityCodecProvider] No existing request entity writer, resolve failed");
} else {
requestEntityWriter = writer;
RecordLog.info("[ClientEntityCodecProvider] Request entity writer resolved: " + requestEntityWriter.getClass().getCanonicalName());
}
ResponseEntityDecoder decoder = SpiLoader.loadFirstInstance(ResponseEntityDecoder.class);
if (decoder == null) {
RecordLog.warn("[ClientEntityCodecProvider] No existing response entity decoder, resolve failed");
} else {
responseEntityDecoder = decoder;
RecordLog.info("[ClientEntityCodecProvider] Response entity decoder resolved: " + responseEntityDecoder.getClass().getCanonicalName());
}
}

public static RequestEntityWriter getRequestEntityWriter() {
return requestEntityWriter;
}

public static ResponseEntityDecoder getResponseEntityDecoder() {
return responseEntityDecoder;
}

private ClientEntityCodecProvider() {}
}

+ 54
- 0
sentinel-cluster/sentinel-cluster-client-default/src/main/java/com/alibaba/csp/sentinel/cluster/client/codec/DefaultRequestEntityWriter.java View File

@@ -0,0 +1,54 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.csp.sentinel.cluster.client.codec;

import com.alibaba.csp.sentinel.cluster.client.codec.registry.RequestDataWriterRegistry;
import com.alibaba.csp.sentinel.cluster.codec.EntityWriter;
import com.alibaba.csp.sentinel.cluster.codec.request.RequestEntityWriter;
import com.alibaba.csp.sentinel.cluster.request.ClusterRequest;
import com.alibaba.csp.sentinel.cluster.request.Request;
import com.alibaba.csp.sentinel.log.RecordLog;

import io.netty.buffer.ByteBuf;

/**
* @author Eric Zhao
* @since 1.4.0
*/
public class DefaultRequestEntityWriter implements RequestEntityWriter<ClusterRequest, ByteBuf> {

@Override
public void writeTo(ClusterRequest request, ByteBuf target) {
int type = request.getType();
EntityWriter<Object, ByteBuf> requestDataWriter = RequestDataWriterRegistry.getWriter(type);

if (requestDataWriter == null) {
// TODO: may need to throw exception?
RecordLog.warn(
"[NettyRequestEncoder] Cannot find matching request writer for type <{0}>, dropping the request", type);
return;
}
// Write head part of request.
writeHead(request, target);
// Write data part.
requestDataWriter.writeTo(request.getData(), target);
}

private void writeHead(Request request, ByteBuf out) {
out.writeInt(request.getId());
out.writeByte(request.getType());
}
}

+ 66
- 0
sentinel-cluster/sentinel-cluster-client-default/src/main/java/com/alibaba/csp/sentinel/cluster/client/codec/DefaultResponseEntityDecoder.java View File

@@ -0,0 +1,66 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.csp.sentinel.cluster.client.codec;

import com.alibaba.csp.sentinel.cluster.client.codec.registry.ResponseDataDecodeRegistry;
import com.alibaba.csp.sentinel.cluster.codec.EntityDecoder;
import com.alibaba.csp.sentinel.cluster.codec.response.ResponseEntityDecoder;
import com.alibaba.csp.sentinel.cluster.response.ClusterResponse;
import com.alibaba.csp.sentinel.log.RecordLog;

import io.netty.buffer.ByteBuf;

/**
* <p>Default entity decoder for any {@link ClusterResponse} entity.</p>
*
* <p>Decode format:</p>
* <pre>
* +--------+---------+-----------+---------+
* | xid(4) | type(1) | status(1) | data... |
* +--------+---------+-----------+---------+
* </pre>
*
* @author Eric Zhao
* @since 1.4.0
*/
public class DefaultResponseEntityDecoder implements ResponseEntityDecoder<ByteBuf, ClusterResponse> {

@Override
public ClusterResponse decode(ByteBuf source) {
if (source.readableBytes() >= 6) {
int xid = source.readInt();
int type = source.readByte();
int status = source.readByte();

EntityDecoder<ByteBuf, ?> decoder = ResponseDataDecodeRegistry.getDecoder(type);
if (decoder == null) {
RecordLog.warn("Unknown type of response data decoder: {0}", type);
return null;
}

Object data;
if (source.readableBytes() == 0) {
data = null;
} else {
// TODO: handle decode error here.
data = decoder.decode(source);
}

return new ClusterResponse<>(xid, type, status, data);
}
return null;
}
}

+ 39
- 0
sentinel-cluster/sentinel-cluster-client-default/src/main/java/com/alibaba/csp/sentinel/cluster/client/codec/data/FlowRequestDataWriter.java View File

@@ -0,0 +1,39 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.csp.sentinel.cluster.client.codec.data;

import com.alibaba.csp.sentinel.cluster.codec.EntityWriter;
import com.alibaba.csp.sentinel.cluster.request.data.FlowRequestData;

import io.netty.buffer.ByteBuf;

/**
* +-------------------+--------------+----------------+---------------+------------------+
* | RequestID(4 byte) | Type(1 byte) | FlowID(4 byte) | Count(4 byte) | PriorityFlag (1) |
* +-------------------+--------------+----------------+---------------+------------------+
*
* @author Eric Zhao
* @since 1.4.0
*/
public class FlowRequestDataWriter implements EntityWriter<FlowRequestData, ByteBuf> {

@Override
public void writeTo(FlowRequestData entity, ByteBuf target) {
target.writeLong(entity.getFlowId());
target.writeInt(entity.getCount());
target.writeBoolean(entity.isPriority());
}
}

+ 39
- 0
sentinel-cluster/sentinel-cluster-client-default/src/main/java/com/alibaba/csp/sentinel/cluster/client/codec/data/FlowResponseDataDecoder.java View File

@@ -0,0 +1,39 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.csp.sentinel.cluster.client.codec.data;

import com.alibaba.csp.sentinel.cluster.codec.EntityDecoder;
import com.alibaba.csp.sentinel.cluster.response.data.FlowTokenResponseData;

import io.netty.buffer.ByteBuf;

/**
* @author Eric Zhao
* @since 1.4.0
*/
public class FlowResponseDataDecoder implements EntityDecoder<ByteBuf, FlowTokenResponseData> {

@Override
public FlowTokenResponseData decode(ByteBuf source) {
FlowTokenResponseData data = new FlowTokenResponseData();

if (source.readableBytes() == 8) {
data.setRemainingCount(source.readInt());
data.setWaitInMs(source.readInt());
}
return data;
}
}

+ 128
- 0
sentinel-cluster/sentinel-cluster-client-default/src/main/java/com/alibaba/csp/sentinel/cluster/client/codec/data/ParamFlowRequestDataWriter.java View File

@@ -0,0 +1,128 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.csp.sentinel.cluster.client.codec.data;

import java.util.Collection;

import com.alibaba.csp.sentinel.cluster.ClusterConstants;
import com.alibaba.csp.sentinel.cluster.codec.EntityWriter;
import com.alibaba.csp.sentinel.cluster.request.data.ParamFlowRequestData;

import io.netty.buffer.ByteBuf;

/**
* @author Eric Zhao
* @since 1.4.0
*/
public class ParamFlowRequestDataWriter implements EntityWriter<ParamFlowRequestData, ByteBuf> {

@Override
public void writeTo(ParamFlowRequestData entity, ByteBuf target) {
target.writeLong(entity.getFlowId());
target.writeInt(entity.getCount());

Collection<Object> params = entity.getParams();

// Write parameter amount.
int amount = calculateParamAmount(params);
target.writeInt(amount);

// Serialize parameters with type flag.
for (Object param : entity.getParams()) {
encodeValue(param, target);
}
}

private void encodeValue(Object param, ByteBuf target) {
// Handle primitive type.
if (param instanceof Integer || int.class.isInstance(param)) {
target.writeByte(ClusterConstants.PARAM_TYPE_INTEGER);
target.writeInt((Integer)param);
} else if (param instanceof String) {
encodeString((String)param, target);
} else if (boolean.class.isInstance(param) || param instanceof Boolean) {
target.writeByte(ClusterConstants.PARAM_TYPE_BOOLEAN);
target.writeBoolean((Boolean)param);
} else if (long.class.isInstance(param) || param instanceof Long) {
target.writeByte(ClusterConstants.PARAM_TYPE_LONG);
target.writeLong((Long)param);
} else if (double.class.isInstance(param) || param instanceof Double) {
target.writeByte(ClusterConstants.PARAM_TYPE_DOUBLE);
target.writeDouble((Double)param);
} else if (float.class.isInstance(param) || param instanceof Float) {
target.writeByte(ClusterConstants.PARAM_TYPE_FLOAT);
target.writeFloat((Float)param);
} else if (byte.class.isInstance(param) || param instanceof Byte) {
target.writeByte(ClusterConstants.PARAM_TYPE_BYTE);
target.writeByte((Byte)param);
} else if (short.class.isInstance(param) || param instanceof Short) {
target.writeByte(ClusterConstants.PARAM_TYPE_SHORT);
target.writeShort((Short)param);
} else {
// Unexpected type, drop.
}
}

private void encodeString(String param, ByteBuf target) {
target.writeByte(ClusterConstants.PARAM_TYPE_STRING);
byte[] tmpChars = param.getBytes();
target.writeInt(tmpChars.length);
target.writeBytes(tmpChars);
}

private int calculateParamAmount(/*@NonEmpty*/ Collection<Object> params) {
int size = 0;
int length = 0;
for (Object param : params) {
int s = calculateParamTransportSize(param);
if (size + s > PARAM_MAX_SIZE) {
break;
}
length++;
}
return length;
}

private int calculateParamTransportSize(Object value) {
// Layout for primitives: |type flag(1)|value|
// size = original size + type flag (1)
if (value instanceof Integer || int.class.isInstance(value)) {
return 5;
} else if (value instanceof String) {
// Layout for string: |type flag(1)|length(4)|string content|
String tmpValue = (String)value;
byte[] tmpChars = tmpValue.getBytes();
return 1 + 4 + tmpChars.length;
} else if (boolean.class.isInstance(value) || value instanceof Boolean) {
return 2;
} else if (long.class.isInstance(value) || value instanceof Long) {
return 9;
} else if (double.class.isInstance(value) || value instanceof Double) {
return 9;
} else if (float.class.isInstance(value) || value instanceof Float) {
return 5;
} else if (byte.class.isInstance(value) || value instanceof Byte) {
return 2;
} else if (short.class.isInstance(value) || value instanceof Short) {
return 3;
} else {
// Ignore unexpected type.
return 0;
}
}

private static final int PARAM_MAX_SIZE = 1000;
}

+ 44
- 0
sentinel-cluster/sentinel-cluster-client-default/src/main/java/com/alibaba/csp/sentinel/cluster/client/codec/netty/NettyRequestEncoder.java View File

@@ -0,0 +1,44 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.csp.sentinel.cluster.client.codec.netty;

import com.alibaba.csp.sentinel.cluster.client.codec.ClientEntityCodecProvider;
import com.alibaba.csp.sentinel.cluster.codec.request.RequestEntityWriter;
import com.alibaba.csp.sentinel.cluster.request.ClusterRequest;
import com.alibaba.csp.sentinel.cluster.request.Request;
import com.alibaba.csp.sentinel.log.RecordLog;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;

/**
* @author Eric Zhao
*/
public class NettyRequestEncoder extends MessageToByteEncoder<ClusterRequest> {

@Override
protected void encode(ChannelHandlerContext ctx, ClusterRequest request, ByteBuf out) throws Exception {
RequestEntityWriter<Request, ByteBuf> requestEntityWriter = ClientEntityCodecProvider.getRequestEntityWriter();
if (requestEntityWriter == null) {
// TODO: may need to throw exception?
RecordLog.warn("[NettyRequestEncoder] Cannot resolve the global request entity writer, dropping the request");
return;
}

requestEntityWriter.writeTo(request, out);
}
}

+ 54
- 0
sentinel-cluster/sentinel-cluster-client-default/src/main/java/com/alibaba/csp/sentinel/cluster/client/codec/netty/NettyResponseDecoder.java View File

@@ -0,0 +1,54 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.csp.sentinel.cluster.client.codec.netty;

import java.util.List;

import com.alibaba.csp.sentinel.cluster.client.codec.ClientEntityCodecProvider;
import com.alibaba.csp.sentinel.cluster.client.codec.registry.ResponseDataDecodeRegistry;
import com.alibaba.csp.sentinel.cluster.codec.EntityDecoder;
import com.alibaba.csp.sentinel.cluster.codec.response.ResponseEntityDecoder;
import com.alibaba.csp.sentinel.cluster.response.ClusterResponse;
import com.alibaba.csp.sentinel.cluster.response.Response;
import com.alibaba.csp.sentinel.log.RecordLog;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;

/**
* @author Eric Zhao
* @since 1.4.0
*/
public class NettyResponseDecoder extends ByteToMessageDecoder {

@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
ResponseEntityDecoder<ByteBuf, Response> responseDecoder = ClientEntityCodecProvider.getResponseEntityDecoder();
if (responseDecoder == null) {
// TODO: may need to throw exception?
RecordLog.warn("[NettyResponseDecoder] Cannot resolve the global response entity decoder, "
+ "dropping the response");
return;
}

// TODO: handle decode error here.
Response response = responseDecoder.decode(in);
if (response != null) {
out.add(response);
}
}
}

+ 47
- 0
sentinel-cluster/sentinel-cluster-client-default/src/main/java/com/alibaba/csp/sentinel/cluster/client/codec/registry/RequestDataWriterRegistry.java View File

@@ -0,0 +1,47 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.csp.sentinel.cluster.client.codec.registry;

import java.util.HashMap;
import java.util.Map;

import com.alibaba.csp.sentinel.cluster.codec.EntityWriter;

import io.netty.buffer.ByteBuf;

/**
* @author Eric Zhao
*/
public final class RequestDataWriterRegistry {

private static final Map<Integer, EntityWriter<Object, ByteBuf>> WRITER_MAP = new HashMap<>();

public static <T> boolean addWriter(int type, EntityWriter<T, ByteBuf> writer) {
if (WRITER_MAP.containsKey(type)) {
return false;
}
WRITER_MAP.put(type, (EntityWriter<Object, ByteBuf>)writer);
return true;
}

public static EntityWriter<Object, ByteBuf> getWriter(int type) {
return WRITER_MAP.get(type);
}

public static boolean remove(int type) {
return WRITER_MAP.remove(type) != null;
}
}

+ 48
- 0
sentinel-cluster/sentinel-cluster-client-default/src/main/java/com/alibaba/csp/sentinel/cluster/client/codec/registry/ResponseDataDecodeRegistry.java View File

@@ -0,0 +1,48 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.csp.sentinel.cluster.client.codec.registry;

import java.util.HashMap;
import java.util.Map;

import com.alibaba.csp.sentinel.cluster.codec.EntityDecoder;

import io.netty.buffer.ByteBuf;

/**
* @author Eric Zhao
* @since 1.4.0
*/
public final class ResponseDataDecodeRegistry {

private static final Map<Integer, EntityDecoder<ByteBuf, ?>> DECODER_MAP = new HashMap<>();

public static boolean addDecoder(int type, EntityDecoder<ByteBuf, ?> decoder) {
if (DECODER_MAP.containsKey(type)) {
return false;
}
DECODER_MAP.put(type, decoder);
return true;
}

public static EntityDecoder<ByteBuf, Object> getDecoder(int type) {
return (EntityDecoder<ByteBuf, Object>)DECODER_MAP.get(type);
}

public static boolean removeDecoder(int type) {
return DECODER_MAP.remove(type) != null;
}
}

+ 65
- 0
sentinel-cluster/sentinel-cluster-client-default/src/main/java/com/alibaba/csp/sentinel/cluster/client/config/ClusterClientConfig.java View File

@@ -0,0 +1,65 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.csp.sentinel.cluster.client.config;

/**
* @author Eric Zhao
* @since 1.4.0
*/
public class ClusterClientConfig {

private String serverHost;
private int serverPort;

private int requestTimeout;
private int connectTimeout;

public String getServerHost() {
return serverHost;
}

public ClusterClientConfig setServerHost(String serverHost) {
this.serverHost = serverHost;
return this;
}

public int getServerPort() {
return serverPort;
}

public ClusterClientConfig setServerPort(int serverPort) {
this.serverPort = serverPort;
return this;
}

public int getRequestTimeout() {
return requestTimeout;
}

public ClusterClientConfig setRequestTimeout(int requestTimeout) {
this.requestTimeout = requestTimeout;
return this;
}

public int getConnectTimeout() {
return connectTimeout;
}

public ClusterClientConfig setConnectTimeout(int connectTimeout) {
this.connectTimeout = connectTimeout;
return this;
}
}

+ 30
- 0
sentinel-cluster/sentinel-cluster-client-default/src/main/java/com/alibaba/csp/sentinel/cluster/client/config/ClusterClientConfigManager.java View File

@@ -0,0 +1,30 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.csp.sentinel.cluster.client.config;

import com.alibaba.csp.sentinel.cluster.ClusterConstants;

/**
* @author Eric Zhao
*/
public final class ClusterClientConfigManager {

private static volatile String serverIp;
private static volatile int serverPort = ClusterConstants.DEFAULT_CLUSTER_SERVER_PORT;
private static volatile int requestTimeout = ClusterConstants.DEFAULT_REQUEST_TIMEOUT;

private ClusterClientConfigManager() {}
}

+ 72
- 0
sentinel-cluster/sentinel-cluster-client-default/src/main/java/com/alibaba/csp/sentinel/cluster/client/handler/TokenClientHandler.java View File

@@ -0,0 +1,72 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.csp.sentinel.cluster.client.handler;

import java.util.concurrent.atomic.AtomicInteger;

import com.alibaba.csp.sentinel.cluster.client.ClientConstants;
import com.alibaba.csp.sentinel.cluster.response.ClusterResponse;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

/**
* @author Eric Zhao
* @since 1.4.0
*/
public class TokenClientHandler extends ChannelInboundHandlerAdapter {

private final AtomicInteger currentState = new AtomicInteger(ClientConstants.CLIENT_STATUS_OFF);

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
currentState.set(ClientConstants.CLIENT_STATUS_STARTED);
}

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println(String.format("[%s] Client message recv: %s", System.currentTimeMillis(), msg));
if (msg instanceof ClusterResponse) {
ClusterResponse<?> response = (ClusterResponse) msg;

TokenClientPromiseHolder.completePromise(response.getId(), response);
}
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
super.exceptionCaught(ctx, cause);
cause.printStackTrace();
}

@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
super.channelInactive(ctx);
}

@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
currentState.set(ClientConstants.CLIENT_STATUS_OFF);
}

public int getCurrentState() {
return currentState.get();
}

public boolean hasStarted() {
return getCurrentState() == ClientConstants.CLIENT_STATUS_STARTED;
}
}

+ 60
- 0
sentinel-cluster/sentinel-cluster-client-default/src/main/java/com/alibaba/csp/sentinel/cluster/client/handler/TokenClientPromiseHolder.java View File

@@ -0,0 +1,60 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.csp.sentinel.cluster.client.handler;

import java.util.AbstractMap.SimpleEntry;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import com.alibaba.csp.sentinel.cluster.response.ClusterResponse;

import io.netty.channel.ChannelPromise;

/**
* @author Eric Zhao
* @since 1.4.0
*/
public final class TokenClientPromiseHolder {

private static final Map<Integer, SimpleEntry<ChannelPromise, ClusterResponse>> PROMISE_MAP = new ConcurrentHashMap<>();

public static void putPromise(int xid, ChannelPromise promise) {
PROMISE_MAP.put(xid, new SimpleEntry<ChannelPromise, ClusterResponse>(promise, null));
}

public static SimpleEntry<ChannelPromise, ClusterResponse> getEntry(int xid) {
return PROMISE_MAP.get(xid);
}

public static void remove(int xid) {
PROMISE_MAP.remove(xid);
}

public static <T> boolean completePromise(int xid, ClusterResponse<T> response) {
if (!PROMISE_MAP.containsKey(xid)) {
return false;
}
ChannelPromise promise = PROMISE_MAP.get(xid).getKey();
if (promise.isDone() || promise.isCancelled()) {
return false;
}
PROMISE_MAP.get(xid).setValue(response);
promise.setSuccess();
return true;
}

private TokenClientPromiseHolder() {}
}

+ 1
- 0
sentinel-cluster/sentinel-cluster-client-default/src/main/resources/META-INF/services/com.alibaba.csp.sentinel.cluster.codec.request.RequestEntityWriter View File

@@ -0,0 +1 @@
com.alibaba.csp.sentinel.cluster.client.codec.DefaultRequestEntityWriter

+ 1
- 0
sentinel-cluster/sentinel-cluster-client-default/src/main/resources/META-INF/services/com.alibaba.csp.sentinel.cluster.codec.response.ResponseEntityDecoder View File

@@ -0,0 +1 @@
com.alibaba.csp.sentinel.cluster.client.codec.DefaultResponseEntityDecoder

Loading…
Cancel
Save