diff --git a/sentinel-cluster/sentinel-cluster-client-default/src/main/java/com/alibaba/csp/sentinel/cluster/client/NettyTransportClient.java b/sentinel-cluster/sentinel-cluster-client-default/src/main/java/com/alibaba/csp/sentinel/cluster/client/NettyTransportClient.java index e70df0e0..2492bfea 100644 --- a/sentinel-cluster/sentinel-cluster-client-default/src/main/java/com/alibaba/csp/sentinel/cluster/client/NettyTransportClient.java +++ b/sentinel-cluster/sentinel-cluster-client-default/src/main/java/com/alibaba/csp/sentinel/cluster/client/NettyTransportClient.java @@ -16,14 +16,16 @@ package com.alibaba.csp.sentinel.cluster.client; import java.util.AbstractMap.SimpleEntry; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import com.alibaba.csp.sentinel.cluster.ClusterErrorMessages; import com.alibaba.csp.sentinel.cluster.ClusterTransportClient; import com.alibaba.csp.sentinel.cluster.client.codec.netty.NettyRequestEncoder; import com.alibaba.csp.sentinel.cluster.client.codec.netty.NettyResponseDecoder; -import com.alibaba.csp.sentinel.cluster.client.config.ClusterClientConfig; import com.alibaba.csp.sentinel.cluster.client.config.ClusterClientConfigManager; import com.alibaba.csp.sentinel.cluster.client.handler.TokenClientHandler; import com.alibaba.csp.sentinel.cluster.client.handler.TokenClientPromiseHolder; @@ -31,6 +33,7 @@ import com.alibaba.csp.sentinel.cluster.exception.SentinelClusterException; import com.alibaba.csp.sentinel.cluster.request.ClusterRequest; import com.alibaba.csp.sentinel.cluster.request.Request; import com.alibaba.csp.sentinel.cluster.response.ClusterResponse; +import com.alibaba.csp.sentinel.concurrent.NamedThreadFactory; import com.alibaba.csp.sentinel.log.RecordLog; import com.alibaba.csp.sentinel.util.AssertUtil; @@ -57,7 +60,10 @@ import io.netty.util.concurrent.GenericFutureListener; */ public class NettyTransportClient implements ClusterTransportClient { - public static final int RECONNECT_DELAY_MS = 1000; + private static final ScheduledExecutorService SCHEDULER = Executors.newScheduledThreadPool(1, + new NamedThreadFactory("sentinel-cluster-transport-client-scheduler")); + + public static final int RECONNECT_DELAY_MS = 2000; private final String host; private final int port; @@ -70,11 +76,7 @@ public class NettyTransportClient implements ClusterTransportClient { private final AtomicInteger currentState = new AtomicInteger(ClientConstants.CLIENT_STATUS_OFF); private final AtomicInteger failConnectedTime = new AtomicInteger(0); - public NettyTransportClient(ClusterClientConfig clientConfig) { - AssertUtil.notNull(clientConfig, "client config cannot be null"); - this.host = clientConfig.getServerHost(); - this.port = clientConfig.getServerPort(); - } + private final AtomicBoolean shouldRetry = new AtomicBoolean(true); public NettyTransportClient(String host, int port) { AssertUtil.assertNotBlank(host, "remote host cannot be blank"); @@ -88,14 +90,14 @@ public class NettyTransportClient implements ClusterTransportClient { eventLoopGroup = new NioEventLoopGroup(); b.group(eventLoopGroup) .channel(NioSocketChannel.class) - .option(ChannelOption.SO_TIMEOUT, 20) .option(ChannelOption.TCP_NODELAY, true) .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) - .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000) + .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, ClusterClientConfigManager.getConnectTimeout()) .handler(new ChannelInitializer() { @Override public void initChannel(SocketChannel ch) throws Exception { clientHandler = new TokenClientHandler(currentState, disconnectCallback); + ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(new LengthFieldBasedFrameDecoder(1024, 0, 2, 0, 2)); pipeline.addLast(new NettyResponseDecoder()); @@ -110,18 +112,21 @@ public class NettyTransportClient implements ClusterTransportClient { private void connect(Bootstrap b) { if (currentState.compareAndSet(ClientConstants.CLIENT_STATUS_OFF, ClientConstants.CLIENT_STATUS_PENDING)) { - b.connect(host, port).addListener(new GenericFutureListener() { + b.connect(host, port) + .addListener(new GenericFutureListener() { @Override public void operationComplete(ChannelFuture future) { if (future.cause() != null) { - RecordLog.warn(String.format("[NettyTransportClient] Could not connect to <%s:%d> after %d times", - host, port, failConnectedTime.get()), future.cause()); + RecordLog.warn( + String.format("[NettyTransportClient] Could not connect to <%s:%d> after %d times", + host, port, failConnectedTime.get()), future.cause()); failConnectedTime.incrementAndGet(); channel = null; } else { failConnectedTime.set(0); channel = future.channel(); - RecordLog.info("[NettyTransportClient] Successfully connect to server <" + host + ":" + port + ">"); + RecordLog.info( + "[NettyTransportClient] Successfully connect to server <" + host + ":" + port + ">"); } } }); @@ -131,44 +136,60 @@ public class NettyTransportClient implements ClusterTransportClient { private Runnable disconnectCallback = new Runnable() { @Override public void run() { - if (channel != null) { - channel.eventLoop().schedule(new Runnable() { - @Override - public void run() { + if (!shouldRetry.get()) { + return; + } + SCHEDULER.schedule(new Runnable() { + @Override + public void run() { + if (shouldRetry.get()) { RecordLog.info("[NettyTransportClient] Reconnecting to server <" + host + ":" + port + ">"); try { - start(); + startInternal(); } catch (Exception e) { RecordLog.warn("[NettyTransportClient] Failed to reconnect to server", e); } } - }, RECONNECT_DELAY_MS * (failConnectedTime.get() + 1), TimeUnit.MILLISECONDS); - } + } + }, RECONNECT_DELAY_MS * (failConnectedTime.get() + 1), TimeUnit.MILLISECONDS); + cleanUp(); } }; @Override public void start() throws Exception { + shouldRetry.set(true); + startInternal(); + } + + private void startInternal() { connect(initClientBootstrap()); } + private void cleanUp() { + if (channel != null) { + channel.close(); + channel = null; + } + if (eventLoopGroup != null) { + eventLoopGroup.shutdownGracefully(); + } + } + @Override public void stop() throws Exception { + // Stop retrying for connection. + shouldRetry.set(false); + while (currentState.get() == ClientConstants.CLIENT_STATUS_PENDING) { try { - Thread.sleep(500); + Thread.sleep(200); } catch (Exception ex) { // Ignore. } } - if (channel != null) { - channel.close(); - channel = null; - } - if (eventLoopGroup != null) { - eventLoopGroup.shutdownGracefully(); - } + cleanUp(); failConnectedTime.set(0); RecordLog.info("[NettyTransportClient] Cluster transport client stopped"); diff --git a/sentinel-cluster/sentinel-cluster-client-default/src/main/java/com/alibaba/csp/sentinel/cluster/client/handler/TokenClientHandler.java b/sentinel-cluster/sentinel-cluster-client-default/src/main/java/com/alibaba/csp/sentinel/cluster/client/handler/TokenClientHandler.java index 61b2e37c..aef1e377 100644 --- a/sentinel-cluster/sentinel-cluster-client-default/src/main/java/com/alibaba/csp/sentinel/cluster/client/handler/TokenClientHandler.java +++ b/sentinel-cluster/sentinel-cluster-client-default/src/main/java/com/alibaba/csp/sentinel/cluster/client/handler/TokenClientHandler.java @@ -15,6 +15,7 @@ */ package com.alibaba.csp.sentinel.cluster.client.handler; +import java.net.InetSocketAddress; import java.util.concurrent.atomic.AtomicInteger; import com.alibaba.csp.sentinel.cluster.ClusterConstants; @@ -47,7 +48,7 @@ public class TokenClientHandler extends ChannelInboundHandlerAdapter { public void channelActive(ChannelHandlerContext ctx) throws Exception { currentState.set(ClientConstants.CLIENT_STATUS_STARTED); fireClientPing(ctx); - RecordLog.info("[TokenClientHandler] Client handler active, remote address: " + ctx.channel().remoteAddress()); + RecordLog.info("[TokenClientHandler] Client handler active, remote address: " + getRemoteAddress(ctx)); } @Override @@ -76,10 +77,10 @@ public class TokenClientHandler extends ChannelInboundHandlerAdapter { if (response.getStatus() == ClusterConstants.RESPONSE_STATUS_OK) { int count = (int) response.getData(); RecordLog.info("[TokenClientHandler] Client ping OK (target server: {0}, connected count: {1})", - ctx.channel().remoteAddress(), count); - return; + getRemoteAddress(ctx), count); + } else { + RecordLog.warn("[TokenClientHandler] Client ping failed (target server: {0})", getRemoteAddress(ctx)); } - RecordLog.warn("[TokenClientHandler] Client ping failed (target server: {0})", ctx.channel().remoteAddress()); } @Override @@ -89,16 +90,25 @@ public class TokenClientHandler extends ChannelInboundHandlerAdapter { @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { - RecordLog.info("[TokenClientHandler] Client handler inactive, remote address: " + ctx.channel().remoteAddress()); + RecordLog.info("[TokenClientHandler] Client handler inactive, remote address: " + getRemoteAddress(ctx)); } @Override public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { - RecordLog.info("[TokenClientHandler] Client channel unregistered, remote address: " + ctx.channel().remoteAddress()); + RecordLog.info("[TokenClientHandler] Client channel unregistered, remote address: " + getRemoteAddress(ctx)); currentState.set(ClientConstants.CLIENT_STATUS_OFF); + disconnectCallback.run(); } + private String getRemoteAddress(ChannelHandlerContext ctx) { + if (ctx.channel().remoteAddress() == null) { + return null; + } + InetSocketAddress inetAddress = (InetSocketAddress) ctx.channel().remoteAddress(); + return inetAddress.getAddress().getHostAddress() + ":" + inetAddress.getPort(); + } + public int getCurrentState() { return currentState.get(); }