Signed-off-by: Eric Zhao <sczyh16@gmail.com>master
@@ -17,6 +17,11 @@ | |||
<groupId>com.alibaba.csp</groupId> | |||
<artifactId>sentinel-core</artifactId> | |||
</dependency> | |||
<dependency> | |||
<groupId>com.alibaba.csp</groupId> | |||
<artifactId>sentinel-transport-common</artifactId> | |||
<scope>provided</scope> | |||
</dependency> | |||
<dependency> | |||
<groupId>com.alibaba.csp</groupId> | |||
<artifactId>sentinel-cluster-common-default</artifactId> | |||
@@ -26,5 +31,16 @@ | |||
<groupId>io.netty</groupId> | |||
<artifactId>netty-all</artifactId> | |||
</dependency> | |||
<dependency> | |||
<groupId>junit</groupId> | |||
<artifactId>junit</artifactId> | |||
<scope>test</scope> | |||
</dependency> | |||
<dependency> | |||
<groupId>org.mockito</groupId> | |||
<artifactId>mockito-core</artifactId> | |||
<scope>test</scope> | |||
</dependency> | |||
</dependencies> | |||
</project> |
@@ -24,32 +24,92 @@ import com.alibaba.csp.sentinel.cluster.TokenResult; | |||
import com.alibaba.csp.sentinel.cluster.TokenResultStatus; | |||
import com.alibaba.csp.sentinel.cluster.TokenServerDescriptor; | |||
import com.alibaba.csp.sentinel.cluster.client.config.ClusterClientConfig; | |||
import com.alibaba.csp.sentinel.cluster.client.config.ClusterClientConfigManager; | |||
import com.alibaba.csp.sentinel.cluster.client.config.ServerChangeObserver; | |||
import com.alibaba.csp.sentinel.cluster.log.ClusterStatLogUtil; | |||
import com.alibaba.csp.sentinel.cluster.request.ClusterRequest; | |||
import com.alibaba.csp.sentinel.cluster.request.data.FlowRequestData; | |||
import com.alibaba.csp.sentinel.cluster.request.data.ParamFlowRequestData; | |||
import com.alibaba.csp.sentinel.cluster.response.ClusterResponse; | |||
import com.alibaba.csp.sentinel.cluster.response.data.FlowTokenResponseData; | |||
import com.alibaba.csp.sentinel.log.RecordLog; | |||
import com.alibaba.csp.sentinel.util.StringUtil; | |||
/** | |||
* Default implementation of {@link ClusterTokenClient}. | |||
* | |||
* @author Eric Zhao | |||
* @since 1.4.0 | |||
*/ | |||
public class DefaultClusterTokenClient implements ClusterTokenClient { | |||
private ClusterTransportClient transportClient; | |||
private TokenServerDescriptor serverDescriptor; | |||
public DefaultClusterTokenClient() { | |||
// TODO: load and create transport client here. | |||
ClusterClientConfigManager.addServerChangeObserver(new ServerChangeObserver() { | |||
@Override | |||
public void onRemoteServerChange(ClusterClientConfig clusterClientConfig) { | |||
changeServer(clusterClientConfig); | |||
} | |||
}); | |||
initNewConnection(); | |||
} | |||
public DefaultClusterTokenClient(ClusterTransportClient transportClient) { | |||
// TODO: only for test, remove this constructor. | |||
this.transportClient = transportClient; | |||
} | |||
private boolean serverEqual(TokenServerDescriptor descriptor, ClusterClientConfig config) { | |||
if (descriptor == null || config == null) { | |||
return false; | |||
} | |||
return descriptor.getHost().equals(config.getServerHost()) && descriptor.getPort() == config.getServerPort(); | |||
} | |||
private void initNewConnection() { | |||
if (transportClient != null) { | |||
return; | |||
} | |||
String host = ClusterClientConfigManager.getServerHost(); | |||
int port = ClusterClientConfigManager.getServerPort(); | |||
if (StringUtil.isBlank(host) || port <= 0) { | |||
return; | |||
} | |||
try { | |||
this.transportClient = new NettyTransportClient(host, port); | |||
this.serverDescriptor = new TokenServerDescriptor(host, port); | |||
transportClient.start(); | |||
} catch (Exception ex) { | |||
ex.printStackTrace(); | |||
} | |||
} | |||
private void changeServer(/*@Valid*/ ClusterClientConfig config) { | |||
if (serverEqual(serverDescriptor, config)) { | |||
return; | |||
} | |||
try { | |||
// TODO: what if the client is pending init? | |||
if (transportClient != null && transportClient.isReady()) { | |||
transportClient.stop(); | |||
} | |||
// Replace with new, even if the new client is not ready. | |||
this.transportClient = new NettyTransportClient(config); | |||
this.serverDescriptor = new TokenServerDescriptor(config.getServerHost(), config.getServerPort()); | |||
transportClient.start(); | |||
RecordLog.info("[DefaultClusterTokenClient] New client created: " + serverDescriptor); | |||
} catch (Exception ex) { | |||
RecordLog.warn("[DefaultClusterTokenClient] Failed to change remote token server", ex); | |||
ex.printStackTrace(); | |||
} | |||
} | |||
@Override | |||
public TokenServerDescriptor currentServer() { | |||
return new TokenServerDescriptor(); | |||
return serverDescriptor; | |||
} | |||
@Override | |||
@@ -84,15 +144,11 @@ public class DefaultClusterTokenClient implements ClusterTokenClient { | |||
} | |||
} | |||
private boolean notValidRequest(Long id, int count) { | |||
return id == null || id <= 0 || count <= 0; | |||
} | |||
private TokenResult badRequest() { | |||
return new TokenResult(TokenResultStatus.BAD_REQUEST); | |||
} | |||
private TokenResult sendTokenRequest(ClusterRequest request) throws Exception { | |||
if (transportClient == null) { | |||
RecordLog.warn("[DefaultClusterTokenClient] Client not created, please check your config for cluster client"); | |||
return clientFail(); | |||
} | |||
ClusterResponse response = transportClient.sendRequest(request); | |||
TokenResult result = new TokenResult(response.getStatus()); | |||
if (response.getData() != null) { | |||
@@ -102,4 +158,16 @@ public class DefaultClusterTokenClient implements ClusterTokenClient { | |||
} | |||
return result; | |||
} | |||
private boolean notValidRequest(Long id, int count) { | |||
return id == null || id <= 0 || count <= 0; | |||
} | |||
private TokenResult badRequest() { | |||
return new TokenResult(TokenResultStatus.BAD_REQUEST); | |||
} | |||
private TokenResult clientFail() { | |||
return new TokenResult(TokenResultStatus.FAIL); | |||
} | |||
} |
@@ -23,6 +23,7 @@ import com.alibaba.csp.sentinel.cluster.ClusterTransportClient; | |||
import com.alibaba.csp.sentinel.cluster.client.codec.netty.NettyRequestEncoder; | |||
import com.alibaba.csp.sentinel.cluster.client.codec.netty.NettyResponseDecoder; | |||
import com.alibaba.csp.sentinel.cluster.client.config.ClusterClientConfig; | |||
import com.alibaba.csp.sentinel.cluster.client.config.ClusterClientConfigManager; | |||
import com.alibaba.csp.sentinel.cluster.client.handler.TokenClientHandler; | |||
import com.alibaba.csp.sentinel.cluster.client.handler.TokenClientPromiseHolder; | |||
import com.alibaba.csp.sentinel.cluster.exception.SentinelClusterException; | |||
@@ -55,20 +56,25 @@ import io.netty.util.concurrent.GenericFutureListener; | |||
*/ | |||
public class NettyTransportClient implements ClusterTransportClient { | |||
private ClusterClientConfig clientConfig; | |||
private String host; | |||
private int port; | |||
private final String host; | |||
private final int port; | |||
private Channel channel; | |||
private NioEventLoopGroup eventLoopGroup; | |||
private TokenClientHandler clientHandler; | |||
private AtomicInteger idGenerator = new AtomicInteger(0); | |||
private AtomicInteger failConnectedTime = new AtomicInteger(0); | |||
public NettyTransportClient(ClusterClientConfig clientConfig, String host, int port) { | |||
this.clientConfig = clientConfig; | |||
public NettyTransportClient(ClusterClientConfig clientConfig) { | |||
AssertUtil.notNull(clientConfig, "client config cannot be null"); | |||
this.host = clientConfig.getServerHost(); | |||
this.port = clientConfig.getServerPort(); | |||
} | |||
public NettyTransportClient(String host, int port) { | |||
AssertUtil.assertNotBlank(host, "remote host cannot be blank"); | |||
AssertUtil.isTrue(port > 0, "port should be positive"); | |||
this.host = host; | |||
this.port = port; | |||
} | |||
@@ -117,11 +123,13 @@ public class NettyTransportClient implements ClusterTransportClient { | |||
}); | |||
} | |||
public void start() { | |||
@Override | |||
public void start() throws Exception { | |||
connect(initClientBootstrap()); | |||
} | |||
public void stop() { | |||
@Override | |||
public void stop() throws Exception { | |||
if (channel != null) { | |||
channel.close(); | |||
channel = null; | |||
@@ -138,6 +146,7 @@ public class NettyTransportClient implements ClusterTransportClient { | |||
return request != null && request.getType() >= 0; | |||
} | |||
@Override | |||
public boolean isReady() { | |||
return channel != null && clientHandler != null && clientHandler.hasStarted(); | |||
} | |||
@@ -159,8 +168,7 @@ public class NettyTransportClient implements ClusterTransportClient { | |||
ChannelPromise promise = channel.newPromise(); | |||
TokenClientPromiseHolder.putPromise(xid, promise); | |||
// TODO: timeout | |||
if (!promise.await(20)) { | |||
if (!promise.await(ClusterClientConfigManager.getRequestTimeout())) { | |||
throw new SentinelClusterException(ClusterErrorMessages.REQUEST_TIME_OUT); | |||
} | |||
@@ -37,8 +37,8 @@ public class DefaultRequestEntityWriter implements RequestEntityWriter<ClusterRe | |||
if (requestDataWriter == null) { | |||
// TODO: may need to throw exception? | |||
RecordLog.warn( | |||
"[NettyRequestEncoder] Cannot find matching request writer for type <{0}>, dropping the request", type); | |||
RecordLog.warn("[DefaultRequestEntityWriter] Cannot find matching request writer for type <{0}>," | |||
+ " dropping the request", type); | |||
return; | |||
} | |||
// Write head part of request. | |||
@@ -27,6 +27,7 @@ import io.netty.handler.codec.MessageToByteEncoder; | |||
/** | |||
* @author Eric Zhao | |||
* @since 1.4.0 | |||
*/ | |||
public class NettyRequestEncoder extends MessageToByteEncoder<ClusterRequest> { | |||
@@ -62,4 +62,14 @@ public class ClusterClientConfig { | |||
this.connectTimeout = connectTimeout; | |||
return this; | |||
} | |||
@Override | |||
public String toString() { | |||
return "ClusterClientConfig{" + | |||
"serverHost='" + serverHost + '\'' + | |||
", serverPort=" + serverPort + | |||
", requestTimeout=" + requestTimeout + | |||
", connectTimeout=" + connectTimeout + | |||
'}'; | |||
} | |||
} |
@@ -15,16 +15,125 @@ | |||
*/ | |||
package com.alibaba.csp.sentinel.cluster.client.config; | |||
import java.util.ArrayList; | |||
import java.util.List; | |||
import com.alibaba.csp.sentinel.cluster.ClusterConstants; | |||
import com.alibaba.csp.sentinel.log.RecordLog; | |||
import com.alibaba.csp.sentinel.property.DynamicSentinelProperty; | |||
import com.alibaba.csp.sentinel.property.PropertyListener; | |||
import com.alibaba.csp.sentinel.property.SentinelProperty; | |||
import com.alibaba.csp.sentinel.util.AssertUtil; | |||
import com.alibaba.csp.sentinel.util.StringUtil; | |||
/** | |||
* @author Eric Zhao | |||
* @since 1.4.0 | |||
*/ | |||
public final class ClusterClientConfigManager { | |||
private static volatile String serverIp; | |||
/** | |||
* Client config properties. | |||
*/ | |||
private static volatile String serverHost = null; | |||
private static volatile int serverPort = ClusterConstants.DEFAULT_CLUSTER_SERVER_PORT; | |||
private static volatile int requestTimeout = ClusterConstants.DEFAULT_REQUEST_TIMEOUT; | |||
private static final PropertyListener<ClusterClientConfig> PROPERTY_LISTENER = new ClientConfigPropertyListener(); | |||
private static SentinelProperty<ClusterClientConfig> currentProperty = new DynamicSentinelProperty<>(); | |||
private static final List<ServerChangeObserver> SERVER_CHANGE_OBSERVERS = new ArrayList<>(); | |||
static { | |||
currentProperty.addListener(PROPERTY_LISTENER); | |||
} | |||
public static void register2Property(SentinelProperty<ClusterClientConfig> property) { | |||
synchronized (PROPERTY_LISTENER) { | |||
RecordLog.info("[ClusterClientConfigManager] Registering new property to cluster client config manager"); | |||
currentProperty.removeListener(PROPERTY_LISTENER); | |||
property.addListener(PROPERTY_LISTENER); | |||
currentProperty = property; | |||
} | |||
} | |||
public static void addServerChangeObserver(ServerChangeObserver observer) { | |||
AssertUtil.notNull(observer, "observer cannot be null"); | |||
SERVER_CHANGE_OBSERVERS.add(observer); | |||
} | |||
/** | |||
* Apply new {@link ClusterClientConfig}, while the former config will be replaced. | |||
* | |||
* @param config new config to apply | |||
*/ | |||
public static void applyNewConfig(ClusterClientConfig config) { | |||
currentProperty.updateValue(config); | |||
} | |||
private static class ClientConfigPropertyListener implements PropertyListener<ClusterClientConfig> { | |||
@Override | |||
public void configUpdate(ClusterClientConfig config) { | |||
applyConfig(config); | |||
} | |||
@Override | |||
public void configLoad(ClusterClientConfig config) { | |||
if (config == null) { | |||
RecordLog.warn("[ClusterClientConfigManager] Empty initial config"); | |||
return; | |||
} | |||
applyConfig(config); | |||
} | |||
private synchronized void applyConfig(ClusterClientConfig config) { | |||
if (!isValidConfig(config)) { | |||
RecordLog.warn( | |||
"[ClusterClientConfigManager] Invalid cluster client config, ignoring: " + config); | |||
return; | |||
} | |||
RecordLog.info("[ClusterClientConfigManager] Updating new config: " + config); | |||
if (config.getRequestTimeout() != requestTimeout) { | |||
requestTimeout = config.getRequestTimeout(); | |||
} | |||
updateServer(config); | |||
} | |||
} | |||
public static boolean isValidConfig(ClusterClientConfig config) { | |||
return config != null && StringUtil.isNotBlank(config.getServerHost()) | |||
&& config.getServerPort() > 0 | |||
&& config.getRequestTimeout() > 0; | |||
} | |||
public static void updateServer(ClusterClientConfig config) { | |||
String host = config.getServerHost(); | |||
int port = config.getServerPort(); | |||
AssertUtil.assertNotBlank(host, "token server host cannot be empty"); | |||
AssertUtil.isTrue(port > 0, "token server port should be valid (positive)"); | |||
if (serverPort == port && host.equals(serverHost)) { | |||
return; | |||
} | |||
for (ServerChangeObserver observer : SERVER_CHANGE_OBSERVERS) { | |||
observer.onRemoteServerChange(config); | |||
} | |||
serverHost = host; | |||
serverPort = port; | |||
} | |||
public static String getServerHost() { | |||
return serverHost; | |||
} | |||
public static int getServerPort() { | |||
return serverPort; | |||
} | |||
public static int getRequestTimeout() { | |||
return requestTimeout; | |||
} | |||
private ClusterClientConfigManager() {} | |||
} |
@@ -0,0 +1,30 @@ | |||
/* | |||
* Copyright 1999-2018 Alibaba Group Holding Ltd. | |||
* | |||
* Licensed under the Apache License, Version 2.0 (the "License"); | |||
* you may not use this file except in compliance with the License. | |||
* You may obtain a copy of the License at | |||
* | |||
* http://www.apache.org/licenses/LICENSE-2.0 | |||
* | |||
* Unless required by applicable law or agreed to in writing, software | |||
* distributed under the License is distributed on an "AS IS" BASIS, | |||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |||
* See the License for the specific language governing permissions and | |||
* limitations under the License. | |||
*/ | |||
package com.alibaba.csp.sentinel.cluster.client.config; | |||
/** | |||
* @author Eric Zhao | |||
* @since 1.4.0 | |||
*/ | |||
public interface ServerChangeObserver { | |||
/** | |||
* Callback on remote server address change. | |||
* | |||
* @param clusterClientConfig new cluster client config | |||
*/ | |||
void onRemoteServerChange(ClusterClientConfig clusterClientConfig); | |||
} |
@@ -19,6 +19,7 @@ import java.util.concurrent.atomic.AtomicInteger; | |||
import com.alibaba.csp.sentinel.cluster.client.ClientConstants; | |||
import com.alibaba.csp.sentinel.cluster.response.ClusterResponse; | |||
import com.alibaba.csp.sentinel.log.RecordLog; | |||
import io.netty.channel.ChannelHandlerContext; | |||
import io.netty.channel.ChannelInboundHandlerAdapter; | |||
@@ -34,6 +35,7 @@ public class TokenClientHandler extends ChannelInboundHandlerAdapter { | |||
@Override | |||
public void channelActive(ChannelHandlerContext ctx) throws Exception { | |||
currentState.set(ClientConstants.CLIENT_STATUS_STARTED); | |||
RecordLog.info("[TokenClientHandler] Client handler active, remote address: " + ctx.channel().remoteAddress()); | |||
} | |||
@Override | |||
@@ -48,13 +50,13 @@ public class TokenClientHandler extends ChannelInboundHandlerAdapter { | |||
@Override | |||
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { | |||
super.exceptionCaught(ctx, cause); | |||
cause.printStackTrace(); | |||
// TODO: should close the connection when an exception is raised. | |||
RecordLog.warn("[TokenClientHandler] Client exception caught", cause); | |||
} | |||
@Override | |||
public void channelInactive(ChannelHandlerContext ctx) throws Exception { | |||
super.channelInactive(ctx); | |||
RecordLog.info("[TokenClientHandler] Client handler inactive, remote address: " + ctx.channel().remoteAddress()); | |||
} | |||
@Override | |||
@@ -0,0 +1,42 @@ | |||
/* | |||
* Copyright 1999-2018 Alibaba Group Holding Ltd. | |||
* | |||
* Licensed under the Apache License, Version 2.0 (the "License"); | |||
* you may not use this file except in compliance with the License. | |||
* You may obtain a copy of the License at | |||
* | |||
* http://www.apache.org/licenses/LICENSE-2.0 | |||
* | |||
* Unless required by applicable law or agreed to in writing, software | |||
* distributed under the License is distributed on an "AS IS" BASIS, | |||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |||
* See the License for the specific language governing permissions and | |||
* limitations under the License. | |||
*/ | |||
package com.alibaba.csp.sentinel.command.handler; | |||
import com.alibaba.csp.sentinel.cluster.client.config.ClusterClientConfig; | |||
import com.alibaba.csp.sentinel.cluster.client.config.ClusterClientConfigManager; | |||
import com.alibaba.csp.sentinel.command.CommandHandler; | |||
import com.alibaba.csp.sentinel.command.CommandRequest; | |||
import com.alibaba.csp.sentinel.command.CommandResponse; | |||
import com.alibaba.csp.sentinel.command.annotation.CommandMapping; | |||
/** | |||
* @author Eric Zhao | |||
* @since 1.4.0 | |||
*/ | |||
@CommandMapping(name = "modifyClusterConfig") | |||
public class ModifyClusterClientConfigHandler implements CommandHandler<String> { | |||
@Override | |||
public CommandResponse<String> handle(CommandRequest request) { | |||
// TODO: parse the new config; | |||
ClusterClientConfig clusterClientConfig = null; | |||
ClusterClientConfigManager.applyNewConfig(clusterClientConfig); | |||
return CommandResponse.ofSuccess("ok"); | |||
} | |||
} | |||
@@ -0,0 +1 @@ | |||
com.alibaba.csp.sentinel.cluster.client.DefaultClusterTokenClient |
@@ -0,0 +1 @@ | |||
com.alibaba.csp.sentinel.command.handler.ModifyClusterClientConfigHandler |
@@ -26,6 +26,20 @@ import com.alibaba.csp.sentinel.cluster.response.ClusterResponse; | |||
*/ | |||
public interface ClusterTransportClient { | |||
/** | |||
* Start the client. | |||
* | |||
* @throws Exception some error occurred (e.g. initialization failed) | |||
*/ | |||
void start() throws Exception; | |||
/** | |||
* Stop the client. | |||
* | |||
* @throws Exception some error occurred (e.g. shutdown failed) | |||
*/ | |||
void stop() throws Exception; | |||
/** | |||
* Send request to remote server and get response. | |||
* | |||
@@ -34,4 +48,11 @@ public interface ClusterTransportClient { | |||
* @throws Exception some error occurs | |||
*/ | |||
ClusterResponse sendRequest(ClusterRequest request) throws Exception; | |||
/** | |||
* Check whether the client has been started and ready for sending requests. | |||
* | |||
* @return true if the client is ready to send requests, otherwise false | |||
*/ | |||
boolean isReady(); | |||
} |
@@ -26,7 +26,7 @@ public interface ClusterTokenClient extends TokenService { | |||
/** | |||
* Get descriptor of current token server. | |||
* | |||
* @return current token server | |||
* @return current token server if connected, otherwise null | |||
*/ | |||
TokenServerDescriptor currentServer(); | |||
} |
@@ -23,11 +23,10 @@ package com.alibaba.csp.sentinel.cluster; | |||
*/ | |||
public class TokenServerDescriptor { | |||
private String host; | |||
private int port; | |||
private String type; | |||
private final String host; | |||
private final int port; | |||
public TokenServerDescriptor() {} | |||
private String type = "default"; | |||
public TokenServerDescriptor(String host, int port) { | |||
this.host = host; | |||
@@ -38,20 +37,10 @@ public class TokenServerDescriptor { | |||
return host; | |||
} | |||
public TokenServerDescriptor setHost(String host) { | |||
this.host = host; | |||
return this; | |||
} | |||
public int getPort() { | |||
return port; | |||
} | |||
public TokenServerDescriptor setPort(int port) { | |||
this.port = port; | |||
return this; | |||
} | |||
public String getType() { | |||
return type; | |||
} | |||
@@ -28,6 +28,12 @@ public class AssertUtil { | |||
} | |||
} | |||
public static void assertNotBlank(String string, String message) { | |||
if (StringUtil.isBlank(string)) { | |||
throw new IllegalArgumentException(message); | |||
} | |||
} | |||
public static void notNull(Object object, String message) { | |||
if (object == null) { | |||
throw new IllegalArgumentException(message); | |||