From 138c265a34e411a88c42750ec6845246a07a4e1a Mon Sep 17 00:00:00 2001 From: Eric Zhao Date: Sun, 9 Dec 2018 21:53:42 +0800 Subject: [PATCH] Polish cluster client module - Initial work Signed-off-by: Eric Zhao --- .../cluster/client/ClientConstants.java | 1 + .../client/DefaultClusterTokenClient.java | 55 +++++++++++---- .../cluster/client/NettyTransportClient.java | 70 ++++++++++++++----- .../codec/DefaultRequestEntityWriter.java | 1 - .../codec/DefaultResponseEntityDecoder.java | 1 - .../codec/data/FlowRequestDataWriter.java | 2 +- .../data/ParamFlowRequestDataWriter.java | 1 + .../codec/data/PingRequestDataWriter.java | 38 ++++++++++ .../codec/data/PingResponseDataDecoder.java | 35 ++++++++++ .../codec/netty/NettyRequestEncoder.java | 1 - .../codec/netty/NettyResponseDecoder.java | 1 - .../registry/RequestDataWriterRegistry.java | 1 + .../client/handler/TokenClientHandler.java | 46 ++++++++++-- .../handler/TokenClientPromiseHolder.java | 16 +++-- .../init/DefaultClusterClientInitFunc.java | 53 ++++++++++++++ ...entinel.cluster.client.ClusterTokenClient} | 0 .../com.alibaba.csp.sentinel.init.InitFunc | 1 + 17 files changed, 274 insertions(+), 49 deletions(-) create mode 100644 sentinel-cluster/sentinel-cluster-client-default/src/main/java/com/alibaba/csp/sentinel/cluster/client/codec/data/PingRequestDataWriter.java create mode 100644 sentinel-cluster/sentinel-cluster-client-default/src/main/java/com/alibaba/csp/sentinel/cluster/client/codec/data/PingResponseDataDecoder.java create mode 100644 sentinel-cluster/sentinel-cluster-client-default/src/main/java/com/alibaba/csp/sentinel/cluster/client/init/DefaultClusterClientInitFunc.java rename sentinel-cluster/sentinel-cluster-client-default/src/main/resources/META-INF/services/{com.alibaba.csp.sentinel.cluster.ClusterTokenClient => com.alibaba.csp.sentinel.cluster.client.ClusterTokenClient} (100%) create mode 100755 sentinel-cluster/sentinel-cluster-client-default/src/main/resources/META-INF/services/com.alibaba.csp.sentinel.init.InitFunc diff --git a/sentinel-cluster/sentinel-cluster-client-default/src/main/java/com/alibaba/csp/sentinel/cluster/client/ClientConstants.java b/sentinel-cluster/sentinel-cluster-client-default/src/main/java/com/alibaba/csp/sentinel/cluster/client/ClientConstants.java index 9645feb5..41b1dd01 100644 --- a/sentinel-cluster/sentinel-cluster-client-default/src/main/java/com/alibaba/csp/sentinel/cluster/client/ClientConstants.java +++ b/sentinel-cluster/sentinel-cluster-client-default/src/main/java/com/alibaba/csp/sentinel/cluster/client/ClientConstants.java @@ -17,6 +17,7 @@ package com.alibaba.csp.sentinel.cluster.client; /** * @author Eric Zhao + * @since 1.4.0 */ public final class ClientConstants { diff --git a/sentinel-cluster/sentinel-cluster-client-default/src/main/java/com/alibaba/csp/sentinel/cluster/client/DefaultClusterTokenClient.java b/sentinel-cluster/sentinel-cluster-client-default/src/main/java/com/alibaba/csp/sentinel/cluster/client/DefaultClusterTokenClient.java index 1d7bc646..d2bc0045 100644 --- a/sentinel-cluster/sentinel-cluster-client-default/src/main/java/com/alibaba/csp/sentinel/cluster/client/DefaultClusterTokenClient.java +++ b/sentinel-cluster/sentinel-cluster-client-default/src/main/java/com/alibaba/csp/sentinel/cluster/client/DefaultClusterTokenClient.java @@ -16,9 +16,9 @@ package com.alibaba.csp.sentinel.cluster.client; import java.util.Collection; +import java.util.concurrent.atomic.AtomicBoolean; import com.alibaba.csp.sentinel.cluster.ClusterConstants; -import com.alibaba.csp.sentinel.cluster.ClusterTokenClient; import com.alibaba.csp.sentinel.cluster.ClusterTransportClient; import com.alibaba.csp.sentinel.cluster.TokenResult; import com.alibaba.csp.sentinel.cluster.TokenResultStatus; @@ -26,7 +26,7 @@ import com.alibaba.csp.sentinel.cluster.TokenServerDescriptor; import com.alibaba.csp.sentinel.cluster.client.config.ClusterClientConfig; import com.alibaba.csp.sentinel.cluster.client.config.ClusterClientConfigManager; import com.alibaba.csp.sentinel.cluster.client.config.ServerChangeObserver; -import com.alibaba.csp.sentinel.cluster.log.ClusterStatLogUtil; +import com.alibaba.csp.sentinel.cluster.log.ClusterClientStatLogUtil; import com.alibaba.csp.sentinel.cluster.request.ClusterRequest; import com.alibaba.csp.sentinel.cluster.request.data.FlowRequestData; import com.alibaba.csp.sentinel.cluster.request.data.ParamFlowRequestData; @@ -46,6 +46,8 @@ public class DefaultClusterTokenClient implements ClusterTokenClient { private ClusterTransportClient transportClient; private TokenServerDescriptor serverDescriptor; + private final AtomicBoolean shouldStart = new AtomicBoolean(false); + public DefaultClusterTokenClient() { ClusterClientConfigManager.addServerChangeObserver(new ServerChangeObserver() { @Override @@ -53,14 +55,10 @@ public class DefaultClusterTokenClient implements ClusterTokenClient { changeServer(clusterClientConfig); } }); + // TODO: check here, who should start the client? initNewConnection(); } - public DefaultClusterTokenClient(ClusterTransportClient transportClient) { - // TODO: only for test, remove this constructor. - this.transportClient = transportClient; - } - private boolean serverEqual(TokenServerDescriptor descriptor, ClusterClientConfig config) { if (descriptor == null || config == null) { return false; @@ -81,9 +79,9 @@ public class DefaultClusterTokenClient implements ClusterTokenClient { try { this.transportClient = new NettyTransportClient(host, port); this.serverDescriptor = new TokenServerDescriptor(host, port); - transportClient.start(); + RecordLog.info("[DefaultClusterTokenClient] New client created: " + serverDescriptor); } catch (Exception ex) { - ex.printStackTrace(); + RecordLog.warn("[DefaultClusterTokenClient] Failed to initialize new token client", ex); } } @@ -93,17 +91,46 @@ public class DefaultClusterTokenClient implements ClusterTokenClient { } try { // TODO: what if the client is pending init? - if (transportClient != null && transportClient.isReady()) { + if (transportClient != null) { transportClient.stop(); } // Replace with new, even if the new client is not ready. this.transportClient = new NettyTransportClient(config); this.serverDescriptor = new TokenServerDescriptor(config.getServerHost(), config.getServerPort()); - transportClient.start(); + startClientIfScheduled(); RecordLog.info("[DefaultClusterTokenClient] New client created: " + serverDescriptor); } catch (Exception ex) { RecordLog.warn("[DefaultClusterTokenClient] Failed to change remote token server", ex); - ex.printStackTrace(); + } + } + + private void startClientIfScheduled() throws Exception { + if (shouldStart.get()) { + if (transportClient != null) { + transportClient.start(); + } + } + } + + private void stopClientIfStarted() throws Exception { + if (shouldStart.get()) { + if (transportClient != null) { + transportClient.stop(); + } + } + } + + @Override + public void start() throws Exception { + if (shouldStart.compareAndSet(false, true)) { + startClientIfScheduled(); + } + } + + @Override + public void stop() throws Exception { + if (shouldStart.compareAndSet(true, false)) { + stopClientIfStarted(); } } @@ -123,7 +150,7 @@ public class DefaultClusterTokenClient implements ClusterTokenClient { try { return sendTokenRequest(request); } catch (Exception ex) { - ClusterStatLogUtil.log(ex.getMessage()); + ClusterClientStatLogUtil.log(ex.getMessage()); return new TokenResult(TokenResultStatus.FAIL); } } @@ -139,7 +166,7 @@ public class DefaultClusterTokenClient implements ClusterTokenClient { try { return sendTokenRequest(request); } catch (Exception ex) { - ClusterStatLogUtil.log(ex.getMessage()); + ClusterClientStatLogUtil.log(ex.getMessage()); return new TokenResult(TokenResultStatus.FAIL); } } diff --git a/sentinel-cluster/sentinel-cluster-client-default/src/main/java/com/alibaba/csp/sentinel/cluster/client/NettyTransportClient.java b/sentinel-cluster/sentinel-cluster-client-default/src/main/java/com/alibaba/csp/sentinel/cluster/client/NettyTransportClient.java index 6fd21379..e70df0e0 100644 --- a/sentinel-cluster/sentinel-cluster-client-default/src/main/java/com/alibaba/csp/sentinel/cluster/client/NettyTransportClient.java +++ b/sentinel-cluster/sentinel-cluster-client-default/src/main/java/com/alibaba/csp/sentinel/cluster/client/NettyTransportClient.java @@ -16,6 +16,7 @@ package com.alibaba.csp.sentinel.cluster.client; import java.util.AbstractMap.SimpleEntry; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import com.alibaba.csp.sentinel.cluster.ClusterErrorMessages; @@ -56,6 +57,8 @@ import io.netty.util.concurrent.GenericFutureListener; */ public class NettyTransportClient implements ClusterTransportClient { + public static final int RECONNECT_DELAY_MS = 1000; + private final String host; private final int port; @@ -63,8 +66,9 @@ public class NettyTransportClient implements ClusterTransportClient { private NioEventLoopGroup eventLoopGroup; private TokenClientHandler clientHandler; - private AtomicInteger idGenerator = new AtomicInteger(0); - private AtomicInteger failConnectedTime = new AtomicInteger(0); + private final AtomicInteger idGenerator = new AtomicInteger(0); + private final AtomicInteger currentState = new AtomicInteger(ClientConstants.CLIENT_STATUS_OFF); + private final AtomicInteger failConnectedTime = new AtomicInteger(0); public NettyTransportClient(ClusterClientConfig clientConfig) { AssertUtil.notNull(clientConfig, "client config cannot be null"); @@ -91,7 +95,7 @@ public class NettyTransportClient implements ClusterTransportClient { .handler(new ChannelInitializer() { @Override public void initChannel(SocketChannel ch) throws Exception { - clientHandler = new TokenClientHandler(); + clientHandler = new TokenClientHandler(currentState, disconnectCallback); ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(new LengthFieldBasedFrameDecoder(1024, 0, 2, 0, 2)); pipeline.addLast(new NettyResponseDecoder()); @@ -105,24 +109,44 @@ public class NettyTransportClient implements ClusterTransportClient { } private void connect(Bootstrap b) { - b.connect(host, port).addListener(new GenericFutureListener() { - @Override - public void operationComplete(ChannelFuture future) { - if (future.cause() != null) { - RecordLog.warn( - "[NettyTransportClient] Could not connect after " + failConnectedTime.get() + " times", - future.cause()); - failConnectedTime.incrementAndGet(); - channel = null; - } else { - failConnectedTime.set(0); - channel = future.channel(); - RecordLog.info("[NettyTransportClient] Successfully connect to server " + host + ":" + port); + if (currentState.compareAndSet(ClientConstants.CLIENT_STATUS_OFF, ClientConstants.CLIENT_STATUS_PENDING)) { + b.connect(host, port).addListener(new GenericFutureListener() { + @Override + public void operationComplete(ChannelFuture future) { + if (future.cause() != null) { + RecordLog.warn(String.format("[NettyTransportClient] Could not connect to <%s:%d> after %d times", + host, port, failConnectedTime.get()), future.cause()); + failConnectedTime.incrementAndGet(); + channel = null; + } else { + failConnectedTime.set(0); + channel = future.channel(); + RecordLog.info("[NettyTransportClient] Successfully connect to server <" + host + ":" + port + ">"); + } } - } - }); + }); + } } + private Runnable disconnectCallback = new Runnable() { + @Override + public void run() { + if (channel != null) { + channel.eventLoop().schedule(new Runnable() { + @Override + public void run() { + RecordLog.info("[NettyTransportClient] Reconnecting to server <" + host + ":" + port + ">"); + try { + start(); + } catch (Exception e) { + RecordLog.warn("[NettyTransportClient] Failed to reconnect to server", e); + } + } + }, RECONNECT_DELAY_MS * (failConnectedTime.get() + 1), TimeUnit.MILLISECONDS); + } + } + }; + @Override public void start() throws Exception { connect(initClientBootstrap()); @@ -130,6 +154,14 @@ public class NettyTransportClient implements ClusterTransportClient { @Override public void stop() throws Exception { + while (currentState.get() == ClientConstants.CLIENT_STATUS_PENDING) { + try { + Thread.sleep(500); + } catch (Exception ex) { + // Ignore. + } + } + if (channel != null) { channel.close(); channel = null; @@ -139,7 +171,7 @@ public class NettyTransportClient implements ClusterTransportClient { } failConnectedTime.set(0); - RecordLog.info("[NettyTransportClient] Token client stopped"); + RecordLog.info("[NettyTransportClient] Cluster transport client stopped"); } private boolean validRequest(Request request) { diff --git a/sentinel-cluster/sentinel-cluster-client-default/src/main/java/com/alibaba/csp/sentinel/cluster/client/codec/DefaultRequestEntityWriter.java b/sentinel-cluster/sentinel-cluster-client-default/src/main/java/com/alibaba/csp/sentinel/cluster/client/codec/DefaultRequestEntityWriter.java index f3dad331..0ba9680f 100644 --- a/sentinel-cluster/sentinel-cluster-client-default/src/main/java/com/alibaba/csp/sentinel/cluster/client/codec/DefaultRequestEntityWriter.java +++ b/sentinel-cluster/sentinel-cluster-client-default/src/main/java/com/alibaba/csp/sentinel/cluster/client/codec/DefaultRequestEntityWriter.java @@ -36,7 +36,6 @@ public class DefaultRequestEntityWriter implements RequestEntityWriter requestDataWriter = RequestDataWriterRegistry.getWriter(type); if (requestDataWriter == null) { - // TODO: may need to throw exception? RecordLog.warn("[DefaultRequestEntityWriter] Cannot find matching request writer for type <{0}>," + " dropping the request", type); return; diff --git a/sentinel-cluster/sentinel-cluster-client-default/src/main/java/com/alibaba/csp/sentinel/cluster/client/codec/DefaultResponseEntityDecoder.java b/sentinel-cluster/sentinel-cluster-client-default/src/main/java/com/alibaba/csp/sentinel/cluster/client/codec/DefaultResponseEntityDecoder.java index 4c1ad34e..6cb3333c 100644 --- a/sentinel-cluster/sentinel-cluster-client-default/src/main/java/com/alibaba/csp/sentinel/cluster/client/codec/DefaultResponseEntityDecoder.java +++ b/sentinel-cluster/sentinel-cluster-client-default/src/main/java/com/alibaba/csp/sentinel/cluster/client/codec/DefaultResponseEntityDecoder.java @@ -55,7 +55,6 @@ public class DefaultResponseEntityDecoder implements ResponseEntityDecoder { + + @Override + public void writeTo(String entity, ByteBuf target) { + if (StringUtil.isBlank(entity) || target == null) { + return; + } + byte[] bytes = entity.getBytes(); + target.writeInt(bytes.length); + target.writeBytes(bytes); + } +} diff --git a/sentinel-cluster/sentinel-cluster-client-default/src/main/java/com/alibaba/csp/sentinel/cluster/client/codec/data/PingResponseDataDecoder.java b/sentinel-cluster/sentinel-cluster-client-default/src/main/java/com/alibaba/csp/sentinel/cluster/client/codec/data/PingResponseDataDecoder.java new file mode 100644 index 00000000..e12395f0 --- /dev/null +++ b/sentinel-cluster/sentinel-cluster-client-default/src/main/java/com/alibaba/csp/sentinel/cluster/client/codec/data/PingResponseDataDecoder.java @@ -0,0 +1,35 @@ +/* + * Copyright 1999-2018 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.csp.sentinel.cluster.client.codec.data; + +import com.alibaba.csp.sentinel.cluster.codec.EntityDecoder; + +import io.netty.buffer.ByteBuf; + +/** + * @author Eric Zhao + * @since 1.4.0 + */ +public class PingResponseDataDecoder implements EntityDecoder { + + @Override + public Integer decode(ByteBuf source) { + if (source.readableBytes() >= 1) { + return (int) source.readByte(); + } + return -1; + } +} diff --git a/sentinel-cluster/sentinel-cluster-client-default/src/main/java/com/alibaba/csp/sentinel/cluster/client/codec/netty/NettyRequestEncoder.java b/sentinel-cluster/sentinel-cluster-client-default/src/main/java/com/alibaba/csp/sentinel/cluster/client/codec/netty/NettyRequestEncoder.java index 50a0476e..f6141a3d 100644 --- a/sentinel-cluster/sentinel-cluster-client-default/src/main/java/com/alibaba/csp/sentinel/cluster/client/codec/netty/NettyRequestEncoder.java +++ b/sentinel-cluster/sentinel-cluster-client-default/src/main/java/com/alibaba/csp/sentinel/cluster/client/codec/netty/NettyRequestEncoder.java @@ -35,7 +35,6 @@ public class NettyRequestEncoder extends MessageToByteEncoder { protected void encode(ChannelHandlerContext ctx, ClusterRequest request, ByteBuf out) throws Exception { RequestEntityWriter requestEntityWriter = ClientEntityCodecProvider.getRequestEntityWriter(); if (requestEntityWriter == null) { - // TODO: may need to throw exception? RecordLog.warn("[NettyRequestEncoder] Cannot resolve the global request entity writer, dropping the request"); return; } diff --git a/sentinel-cluster/sentinel-cluster-client-default/src/main/java/com/alibaba/csp/sentinel/cluster/client/codec/netty/NettyResponseDecoder.java b/sentinel-cluster/sentinel-cluster-client-default/src/main/java/com/alibaba/csp/sentinel/cluster/client/codec/netty/NettyResponseDecoder.java index 75c15a72..5f314619 100644 --- a/sentinel-cluster/sentinel-cluster-client-default/src/main/java/com/alibaba/csp/sentinel/cluster/client/codec/netty/NettyResponseDecoder.java +++ b/sentinel-cluster/sentinel-cluster-client-default/src/main/java/com/alibaba/csp/sentinel/cluster/client/codec/netty/NettyResponseDecoder.java @@ -39,7 +39,6 @@ public class NettyResponseDecoder extends ByteToMessageDecoder { protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) throws Exception { ResponseEntityDecoder responseDecoder = ClientEntityCodecProvider.getResponseEntityDecoder(); if (responseDecoder == null) { - // TODO: may need to throw exception? RecordLog.warn("[NettyResponseDecoder] Cannot resolve the global response entity decoder, " + "dropping the response"); return; diff --git a/sentinel-cluster/sentinel-cluster-client-default/src/main/java/com/alibaba/csp/sentinel/cluster/client/codec/registry/RequestDataWriterRegistry.java b/sentinel-cluster/sentinel-cluster-client-default/src/main/java/com/alibaba/csp/sentinel/cluster/client/codec/registry/RequestDataWriterRegistry.java index 4ad9305d..9c0a61b4 100644 --- a/sentinel-cluster/sentinel-cluster-client-default/src/main/java/com/alibaba/csp/sentinel/cluster/client/codec/registry/RequestDataWriterRegistry.java +++ b/sentinel-cluster/sentinel-cluster-client-default/src/main/java/com/alibaba/csp/sentinel/cluster/client/codec/registry/RequestDataWriterRegistry.java @@ -24,6 +24,7 @@ import io.netty.buffer.ByteBuf; /** * @author Eric Zhao + * @since 1.4.0 */ public final class RequestDataWriterRegistry { diff --git a/sentinel-cluster/sentinel-cluster-client-default/src/main/java/com/alibaba/csp/sentinel/cluster/client/handler/TokenClientHandler.java b/sentinel-cluster/sentinel-cluster-client-default/src/main/java/com/alibaba/csp/sentinel/cluster/client/handler/TokenClientHandler.java index 564b9bb5..3d36b4d8 100644 --- a/sentinel-cluster/sentinel-cluster-client-default/src/main/java/com/alibaba/csp/sentinel/cluster/client/handler/TokenClientHandler.java +++ b/sentinel-cluster/sentinel-cluster-client-default/src/main/java/com/alibaba/csp/sentinel/cluster/client/handler/TokenClientHandler.java @@ -17,7 +17,10 @@ package com.alibaba.csp.sentinel.cluster.client.handler; import java.util.concurrent.atomic.AtomicInteger; +import com.alibaba.csp.sentinel.cluster.ClusterConstants; import com.alibaba.csp.sentinel.cluster.client.ClientConstants; +import com.alibaba.csp.sentinel.cluster.registry.ConfigSupplierRegistry; +import com.alibaba.csp.sentinel.cluster.request.ClusterRequest; import com.alibaba.csp.sentinel.cluster.response.ClusterResponse; import com.alibaba.csp.sentinel.log.RecordLog; @@ -25,32 +28,63 @@ import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; /** + * Netty client handler for Sentinel token client. + * * @author Eric Zhao * @since 1.4.0 */ public class TokenClientHandler extends ChannelInboundHandlerAdapter { - private final AtomicInteger currentState = new AtomicInteger(ClientConstants.CLIENT_STATUS_OFF); + private final AtomicInteger currentState; + private final Runnable disconnectCallback; + + public TokenClientHandler(AtomicInteger currentState, Runnable disconnectCallback) { + this.currentState = currentState; + this.disconnectCallback = disconnectCallback; + } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { - currentState.set(ClientConstants.CLIENT_STATUS_STARTED); + currentState.compareAndSet(ClientConstants.CLIENT_STATUS_PENDING, ClientConstants.CLIENT_STATUS_STARTED); + fireClientPing(ctx); RecordLog.info("[TokenClientHandler] Client handler active, remote address: " + ctx.channel().remoteAddress()); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { - System.out.println(String.format("[%s] Client message recv: %s", System.currentTimeMillis(), msg)); + System.out.println(String.format("[%s] Client message recv: %s", System.currentTimeMillis(), msg)); // TODO: remove here if (msg instanceof ClusterResponse) { ClusterResponse response = (ClusterResponse) msg; + if (response.getType() == ClusterConstants.MSG_TYPE_PING) { + handlePingResponse(ctx, response); + return; + } + TokenClientPromiseHolder.completePromise(response.getId(), response); } } + private void fireClientPing(ChannelHandlerContext ctx) { + // Data body: namespace of the client. + ClusterRequest ping = new ClusterRequest().setId(0) + .setType(ClusterConstants.MSG_TYPE_PING) + .setData(ConfigSupplierRegistry.getNamespaceSupplier().get()); + ctx.writeAndFlush(ping); + } + + private void handlePingResponse(ChannelHandlerContext ctx, ClusterResponse response) { + if (response.getStatus() == ClusterConstants.RESPONSE_STATUS_OK) { + int count = (int) response.getData(); + RecordLog.info("[TokenClientHandler] Client ping OK (target server: {0}, connected count: {1})", + ctx.channel().remoteAddress(), count); + return; + } + RecordLog.warn("[TokenClientHandler] Client ping failed (target server: {0})", ctx.channel().remoteAddress()); + } + @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { - // TODO: should close the connection when an exception is raised. RecordLog.warn("[TokenClientHandler] Client exception caught", cause); } @@ -61,7 +95,9 @@ public class TokenClientHandler extends ChannelInboundHandlerAdapter { @Override public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { - currentState.set(ClientConstants.CLIENT_STATUS_OFF); + RecordLog.info("[TokenClientHandler] Client channel unregistered, remote address: " + ctx.channel().remoteAddress()); + currentState.compareAndSet(ClientConstants.CLIENT_STATUS_STARTED, ClientConstants.CLIENT_STATUS_OFF); + disconnectCallback.run(); } public int getCurrentState() { diff --git a/sentinel-cluster/sentinel-cluster-client-default/src/main/java/com/alibaba/csp/sentinel/cluster/client/handler/TokenClientPromiseHolder.java b/sentinel-cluster/sentinel-cluster-client-default/src/main/java/com/alibaba/csp/sentinel/cluster/client/handler/TokenClientPromiseHolder.java index 033d5900..b0cf0e6c 100644 --- a/sentinel-cluster/sentinel-cluster-client-default/src/main/java/com/alibaba/csp/sentinel/cluster/client/handler/TokenClientPromiseHolder.java +++ b/sentinel-cluster/sentinel-cluster-client-default/src/main/java/com/alibaba/csp/sentinel/cluster/client/handler/TokenClientPromiseHolder.java @@ -47,13 +47,17 @@ public final class TokenClientPromiseHolder { if (!PROMISE_MAP.containsKey(xid)) { return false; } - ChannelPromise promise = PROMISE_MAP.get(xid).getKey(); - if (promise.isDone() || promise.isCancelled()) { - return false; + SimpleEntry entry = PROMISE_MAP.get(xid); + if (entry != null) { + ChannelPromise promise = entry.getKey(); + if (promise.isDone() || promise.isCancelled()) { + return false; + } + entry.setValue(response); + promise.setSuccess(); + return true; } - PROMISE_MAP.get(xid).setValue(response); - promise.setSuccess(); - return true; + return false; } private TokenClientPromiseHolder() {} diff --git a/sentinel-cluster/sentinel-cluster-client-default/src/main/java/com/alibaba/csp/sentinel/cluster/client/init/DefaultClusterClientInitFunc.java b/sentinel-cluster/sentinel-cluster-client-default/src/main/java/com/alibaba/csp/sentinel/cluster/client/init/DefaultClusterClientInitFunc.java new file mode 100644 index 00000000..d5851bc0 --- /dev/null +++ b/sentinel-cluster/sentinel-cluster-client-default/src/main/java/com/alibaba/csp/sentinel/cluster/client/init/DefaultClusterClientInitFunc.java @@ -0,0 +1,53 @@ +/* + * Copyright 1999-2018 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.csp.sentinel.cluster.client.init; + +import com.alibaba.csp.sentinel.cluster.client.ClientConstants; +import com.alibaba.csp.sentinel.cluster.client.codec.data.FlowRequestDataWriter; +import com.alibaba.csp.sentinel.cluster.client.codec.data.FlowResponseDataDecoder; +import com.alibaba.csp.sentinel.cluster.client.codec.data.ParamFlowRequestDataWriter; +import com.alibaba.csp.sentinel.cluster.client.codec.data.PingRequestDataWriter; +import com.alibaba.csp.sentinel.cluster.client.codec.data.PingResponseDataDecoder; +import com.alibaba.csp.sentinel.cluster.client.codec.registry.RequestDataWriterRegistry; +import com.alibaba.csp.sentinel.cluster.client.codec.registry.ResponseDataDecodeRegistry; +import com.alibaba.csp.sentinel.init.InitFunc; +import com.alibaba.csp.sentinel.init.InitOrder; + +/** + * @author Eric Zhao + * @since 1.4.0 + */ +@InitOrder(0) +public class DefaultClusterClientInitFunc implements InitFunc { + + @Override + public void init() throws Exception { + initDefaultEntityWriters(); + initDefaultEntityDecoders(); + } + + private void initDefaultEntityWriters() { + RequestDataWriterRegistry.addWriter(ClientConstants.TYPE_PING, new PingRequestDataWriter()); + RequestDataWriterRegistry.addWriter(ClientConstants.TYPE_FLOW, new FlowRequestDataWriter()); + RequestDataWriterRegistry.addWriter(ClientConstants.TYPE_PARAM_FLOW, new ParamFlowRequestDataWriter()); + } + + private void initDefaultEntityDecoders() { + ResponseDataDecodeRegistry.addDecoder(ClientConstants.TYPE_PING, new PingResponseDataDecoder()); + ResponseDataDecodeRegistry.addDecoder(ClientConstants.TYPE_FLOW, new FlowResponseDataDecoder()); + ResponseDataDecodeRegistry.addDecoder(ClientConstants.TYPE_PARAM_FLOW, new FlowResponseDataDecoder()); + } +} diff --git a/sentinel-cluster/sentinel-cluster-client-default/src/main/resources/META-INF/services/com.alibaba.csp.sentinel.cluster.ClusterTokenClient b/sentinel-cluster/sentinel-cluster-client-default/src/main/resources/META-INF/services/com.alibaba.csp.sentinel.cluster.client.ClusterTokenClient similarity index 100% rename from sentinel-cluster/sentinel-cluster-client-default/src/main/resources/META-INF/services/com.alibaba.csp.sentinel.cluster.ClusterTokenClient rename to sentinel-cluster/sentinel-cluster-client-default/src/main/resources/META-INF/services/com.alibaba.csp.sentinel.cluster.client.ClusterTokenClient diff --git a/sentinel-cluster/sentinel-cluster-client-default/src/main/resources/META-INF/services/com.alibaba.csp.sentinel.init.InitFunc b/sentinel-cluster/sentinel-cluster-client-default/src/main/resources/META-INF/services/com.alibaba.csp.sentinel.init.InitFunc new file mode 100755 index 00000000..b9b709cd --- /dev/null +++ b/sentinel-cluster/sentinel-cluster-client-default/src/main/resources/META-INF/services/com.alibaba.csp.sentinel.init.InitFunc @@ -0,0 +1 @@ +com.alibaba.csp.sentinel.cluster.client.init.DefaultClusterClientInitFunc \ No newline at end of file