Procházet zdrojové kódy

Improvements for cluster token client retry and stop control logic

Signed-off-by: Eric Zhao <sczyh16@gmail.com>
master
Eric Zhao před 5 roky
rodič
revize
6a9d479216
2 změnil soubory, kde provedl 65 přidání a 34 odebrání
  1. +49
    -28
      sentinel-cluster/sentinel-cluster-client-default/src/main/java/com/alibaba/csp/sentinel/cluster/client/NettyTransportClient.java
  2. +16
    -6
      sentinel-cluster/sentinel-cluster-client-default/src/main/java/com/alibaba/csp/sentinel/cluster/client/handler/TokenClientHandler.java

+ 49
- 28
sentinel-cluster/sentinel-cluster-client-default/src/main/java/com/alibaba/csp/sentinel/cluster/client/NettyTransportClient.java Zobrazit soubor

@@ -16,14 +16,16 @@
package com.alibaba.csp.sentinel.cluster.client;

import java.util.AbstractMap.SimpleEntry;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import com.alibaba.csp.sentinel.cluster.ClusterErrorMessages;
import com.alibaba.csp.sentinel.cluster.ClusterTransportClient;
import com.alibaba.csp.sentinel.cluster.client.codec.netty.NettyRequestEncoder;
import com.alibaba.csp.sentinel.cluster.client.codec.netty.NettyResponseDecoder;
import com.alibaba.csp.sentinel.cluster.client.config.ClusterClientConfig;
import com.alibaba.csp.sentinel.cluster.client.config.ClusterClientConfigManager;
import com.alibaba.csp.sentinel.cluster.client.handler.TokenClientHandler;
import com.alibaba.csp.sentinel.cluster.client.handler.TokenClientPromiseHolder;
@@ -31,6 +33,7 @@ import com.alibaba.csp.sentinel.cluster.exception.SentinelClusterException;
import com.alibaba.csp.sentinel.cluster.request.ClusterRequest;
import com.alibaba.csp.sentinel.cluster.request.Request;
import com.alibaba.csp.sentinel.cluster.response.ClusterResponse;
import com.alibaba.csp.sentinel.concurrent.NamedThreadFactory;
import com.alibaba.csp.sentinel.log.RecordLog;
import com.alibaba.csp.sentinel.util.AssertUtil;

@@ -57,7 +60,10 @@ import io.netty.util.concurrent.GenericFutureListener;
*/
public class NettyTransportClient implements ClusterTransportClient {

public static final int RECONNECT_DELAY_MS = 1000;
private static final ScheduledExecutorService SCHEDULER = Executors.newScheduledThreadPool(1,
new NamedThreadFactory("sentinel-cluster-transport-client-scheduler"));

public static final int RECONNECT_DELAY_MS = 2000;

private final String host;
private final int port;
@@ -70,11 +76,7 @@ public class NettyTransportClient implements ClusterTransportClient {
private final AtomicInteger currentState = new AtomicInteger(ClientConstants.CLIENT_STATUS_OFF);
private final AtomicInteger failConnectedTime = new AtomicInteger(0);

public NettyTransportClient(ClusterClientConfig clientConfig) {
AssertUtil.notNull(clientConfig, "client config cannot be null");
this.host = clientConfig.getServerHost();
this.port = clientConfig.getServerPort();
}
private final AtomicBoolean shouldRetry = new AtomicBoolean(true);

public NettyTransportClient(String host, int port) {
AssertUtil.assertNotBlank(host, "remote host cannot be blank");
@@ -88,14 +90,14 @@ public class NettyTransportClient implements ClusterTransportClient {
eventLoopGroup = new NioEventLoopGroup();
b.group(eventLoopGroup)
.channel(NioSocketChannel.class)
.option(ChannelOption.SO_TIMEOUT, 20)
.option(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, ClusterClientConfigManager.getConnectTimeout())
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
clientHandler = new TokenClientHandler(currentState, disconnectCallback);

ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new LengthFieldBasedFrameDecoder(1024, 0, 2, 0, 2));
pipeline.addLast(new NettyResponseDecoder());
@@ -110,18 +112,21 @@ public class NettyTransportClient implements ClusterTransportClient {

private void connect(Bootstrap b) {
if (currentState.compareAndSet(ClientConstants.CLIENT_STATUS_OFF, ClientConstants.CLIENT_STATUS_PENDING)) {
b.connect(host, port).addListener(new GenericFutureListener<ChannelFuture>() {
b.connect(host, port)
.addListener(new GenericFutureListener<ChannelFuture>() {
@Override
public void operationComplete(ChannelFuture future) {
if (future.cause() != null) {
RecordLog.warn(String.format("[NettyTransportClient] Could not connect to <%s:%d> after %d times",
host, port, failConnectedTime.get()), future.cause());
RecordLog.warn(
String.format("[NettyTransportClient] Could not connect to <%s:%d> after %d times",
host, port, failConnectedTime.get()), future.cause());
failConnectedTime.incrementAndGet();
channel = null;
} else {
failConnectedTime.set(0);
channel = future.channel();
RecordLog.info("[NettyTransportClient] Successfully connect to server <" + host + ":" + port + ">");
RecordLog.info(
"[NettyTransportClient] Successfully connect to server <" + host + ":" + port + ">");
}
}
});
@@ -131,44 +136,60 @@ public class NettyTransportClient implements ClusterTransportClient {
private Runnable disconnectCallback = new Runnable() {
@Override
public void run() {
if (channel != null) {
channel.eventLoop().schedule(new Runnable() {
@Override
public void run() {
if (!shouldRetry.get()) {
return;
}
SCHEDULER.schedule(new Runnable() {
@Override
public void run() {
if (shouldRetry.get()) {
RecordLog.info("[NettyTransportClient] Reconnecting to server <" + host + ":" + port + ">");
try {
start();
startInternal();
} catch (Exception e) {
RecordLog.warn("[NettyTransportClient] Failed to reconnect to server", e);
}
}
}, RECONNECT_DELAY_MS * (failConnectedTime.get() + 1), TimeUnit.MILLISECONDS);
}
}
}, RECONNECT_DELAY_MS * (failConnectedTime.get() + 1), TimeUnit.MILLISECONDS);
cleanUp();
}
};

@Override
public void start() throws Exception {
shouldRetry.set(true);
startInternal();
}

private void startInternal() {
connect(initClientBootstrap());
}

private void cleanUp() {
if (channel != null) {
channel.close();
channel = null;
}
if (eventLoopGroup != null) {
eventLoopGroup.shutdownGracefully();
}
}

@Override
public void stop() throws Exception {
// Stop retrying for connection.
shouldRetry.set(false);

while (currentState.get() == ClientConstants.CLIENT_STATUS_PENDING) {
try {
Thread.sleep(500);
Thread.sleep(200);
} catch (Exception ex) {
// Ignore.
}
}

if (channel != null) {
channel.close();
channel = null;
}
if (eventLoopGroup != null) {
eventLoopGroup.shutdownGracefully();
}
cleanUp();
failConnectedTime.set(0);

RecordLog.info("[NettyTransportClient] Cluster transport client stopped");


+ 16
- 6
sentinel-cluster/sentinel-cluster-client-default/src/main/java/com/alibaba/csp/sentinel/cluster/client/handler/TokenClientHandler.java Zobrazit soubor

@@ -15,6 +15,7 @@
*/
package com.alibaba.csp.sentinel.cluster.client.handler;

import java.net.InetSocketAddress;
import java.util.concurrent.atomic.AtomicInteger;

import com.alibaba.csp.sentinel.cluster.ClusterConstants;
@@ -47,7 +48,7 @@ public class TokenClientHandler extends ChannelInboundHandlerAdapter {
public void channelActive(ChannelHandlerContext ctx) throws Exception {
currentState.set(ClientConstants.CLIENT_STATUS_STARTED);
fireClientPing(ctx);
RecordLog.info("[TokenClientHandler] Client handler active, remote address: " + ctx.channel().remoteAddress());
RecordLog.info("[TokenClientHandler] Client handler active, remote address: " + getRemoteAddress(ctx));
}

@Override
@@ -76,10 +77,10 @@ public class TokenClientHandler extends ChannelInboundHandlerAdapter {
if (response.getStatus() == ClusterConstants.RESPONSE_STATUS_OK) {
int count = (int) response.getData();
RecordLog.info("[TokenClientHandler] Client ping OK (target server: {0}, connected count: {1})",
ctx.channel().remoteAddress(), count);
return;
getRemoteAddress(ctx), count);
} else {
RecordLog.warn("[TokenClientHandler] Client ping failed (target server: {0})", getRemoteAddress(ctx));
}
RecordLog.warn("[TokenClientHandler] Client ping failed (target server: {0})", ctx.channel().remoteAddress());
}

@Override
@@ -89,16 +90,25 @@ public class TokenClientHandler extends ChannelInboundHandlerAdapter {

@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
RecordLog.info("[TokenClientHandler] Client handler inactive, remote address: " + ctx.channel().remoteAddress());
RecordLog.info("[TokenClientHandler] Client handler inactive, remote address: " + getRemoteAddress(ctx));
}

@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
RecordLog.info("[TokenClientHandler] Client channel unregistered, remote address: " + ctx.channel().remoteAddress());
RecordLog.info("[TokenClientHandler] Client channel unregistered, remote address: " + getRemoteAddress(ctx));
currentState.set(ClientConstants.CLIENT_STATUS_OFF);

disconnectCallback.run();
}

private String getRemoteAddress(ChannelHandlerContext ctx) {
if (ctx.channel().remoteAddress() == null) {
return null;
}
InetSocketAddress inetAddress = (InetSocketAddress) ctx.channel().remoteAddress();
return inetAddress.getAddress().getHostAddress() + ":" + inetAddress.getPort();
}

public int getCurrentState() {
return currentState.get();
}


Načítá se…
Zrušit
Uložit