- Refactor token client common config and assign config - Log enhancement when transport to token server failed - Add `getState` method to `ClusterTokenClient` interface Signed-off-by: Eric Zhao <sczyh16@gmail.com>master
@@ -19,11 +19,12 @@ import java.util.Collection; | |||||
import java.util.concurrent.atomic.AtomicBoolean; | import java.util.concurrent.atomic.AtomicBoolean; | ||||
import com.alibaba.csp.sentinel.cluster.ClusterConstants; | import com.alibaba.csp.sentinel.cluster.ClusterConstants; | ||||
import com.alibaba.csp.sentinel.cluster.ClusterErrorMessages; | |||||
import com.alibaba.csp.sentinel.cluster.ClusterTransportClient; | import com.alibaba.csp.sentinel.cluster.ClusterTransportClient; | ||||
import com.alibaba.csp.sentinel.cluster.TokenResult; | import com.alibaba.csp.sentinel.cluster.TokenResult; | ||||
import com.alibaba.csp.sentinel.cluster.TokenResultStatus; | import com.alibaba.csp.sentinel.cluster.TokenResultStatus; | ||||
import com.alibaba.csp.sentinel.cluster.TokenServerDescriptor; | import com.alibaba.csp.sentinel.cluster.TokenServerDescriptor; | ||||
import com.alibaba.csp.sentinel.cluster.client.config.ClusterClientConfig; | |||||
import com.alibaba.csp.sentinel.cluster.client.config.ClusterClientAssignConfig; | |||||
import com.alibaba.csp.sentinel.cluster.client.config.ClusterClientConfigManager; | import com.alibaba.csp.sentinel.cluster.client.config.ClusterClientConfigManager; | ||||
import com.alibaba.csp.sentinel.cluster.client.config.ServerChangeObserver; | import com.alibaba.csp.sentinel.cluster.client.config.ServerChangeObserver; | ||||
import com.alibaba.csp.sentinel.cluster.log.ClusterClientStatLogUtil; | import com.alibaba.csp.sentinel.cluster.log.ClusterClientStatLogUtil; | ||||
@@ -51,14 +52,14 @@ public class DefaultClusterTokenClient implements ClusterTokenClient { | |||||
public DefaultClusterTokenClient() { | public DefaultClusterTokenClient() { | ||||
ClusterClientConfigManager.addServerChangeObserver(new ServerChangeObserver() { | ClusterClientConfigManager.addServerChangeObserver(new ServerChangeObserver() { | ||||
@Override | @Override | ||||
public void onRemoteServerChange(ClusterClientConfig clusterClientConfig) { | |||||
changeServer(clusterClientConfig); | |||||
public void onRemoteServerChange(ClusterClientAssignConfig assignConfig) { | |||||
changeServer(assignConfig); | |||||
} | } | ||||
}); | }); | ||||
initNewConnection(); | initNewConnection(); | ||||
} | } | ||||
private boolean serverEqual(TokenServerDescriptor descriptor, ClusterClientConfig config) { | |||||
private boolean serverEqual(TokenServerDescriptor descriptor, ClusterClientAssignConfig config) { | |||||
if (descriptor == null || config == null) { | if (descriptor == null || config == null) { | ||||
return false; | return false; | ||||
} | } | ||||
@@ -84,7 +85,7 @@ public class DefaultClusterTokenClient implements ClusterTokenClient { | |||||
} | } | ||||
} | } | ||||
private void changeServer(/*@Valid*/ ClusterClientConfig config) { | |||||
private void changeServer(/*@Valid*/ ClusterClientAssignConfig config) { | |||||
if (serverEqual(serverDescriptor, config)) { | if (serverEqual(serverDescriptor, config)) { | ||||
return; | return; | ||||
} | } | ||||
@@ -93,7 +94,7 @@ public class DefaultClusterTokenClient implements ClusterTokenClient { | |||||
transportClient.stop(); | transportClient.stop(); | ||||
} | } | ||||
// Replace with new, even if the new client is not ready. | // Replace with new, even if the new client is not ready. | ||||
this.transportClient = new NettyTransportClient(config); | |||||
this.transportClient = new NettyTransportClient(config.getServerHost(), config.getServerPort()); | |||||
this.serverDescriptor = new TokenServerDescriptor(config.getServerHost(), config.getServerPort()); | this.serverDescriptor = new TokenServerDescriptor(config.getServerHost(), config.getServerPort()); | ||||
startClientIfScheduled(); | startClientIfScheduled(); | ||||
RecordLog.info("[DefaultClusterTokenClient] New client created: " + serverDescriptor); | RecordLog.info("[DefaultClusterTokenClient] New client created: " + serverDescriptor); | ||||
@@ -132,6 +133,14 @@ public class DefaultClusterTokenClient implements ClusterTokenClient { | |||||
stopClientIfStarted(); | stopClientIfStarted(); | ||||
} | } | ||||
@Override | |||||
public int getState() { | |||||
if (transportClient == null) { | |||||
return ClientConstants.CLIENT_STATUS_OFF; | |||||
} | |||||
return transportClient.isReady() ? ClientConstants.CLIENT_STATUS_STARTED : ClientConstants.CLIENT_STATUS_OFF; | |||||
} | |||||
@Override | @Override | ||||
public TokenServerDescriptor currentServer() { | public TokenServerDescriptor currentServer() { | ||||
return serverDescriptor; | return serverDescriptor; | ||||
@@ -146,7 +155,9 @@ public class DefaultClusterTokenClient implements ClusterTokenClient { | |||||
.setFlowId(flowId).setPriority(prioritized); | .setFlowId(flowId).setPriority(prioritized); | ||||
ClusterRequest<FlowRequestData> request = new ClusterRequest<>(ClusterConstants.MSG_TYPE_FLOW, data); | ClusterRequest<FlowRequestData> request = new ClusterRequest<>(ClusterConstants.MSG_TYPE_FLOW, data); | ||||
try { | try { | ||||
return sendTokenRequest(request); | |||||
TokenResult result = sendTokenRequest(request); | |||||
logForResult(result); | |||||
return result; | |||||
} catch (Exception ex) { | } catch (Exception ex) { | ||||
ClusterClientStatLogUtil.log(ex.getMessage()); | ClusterClientStatLogUtil.log(ex.getMessage()); | ||||
return new TokenResult(TokenResultStatus.FAIL); | return new TokenResult(TokenResultStatus.FAIL); | ||||
@@ -162,16 +173,31 @@ public class DefaultClusterTokenClient implements ClusterTokenClient { | |||||
.setFlowId(flowId).setParams(params); | .setFlowId(flowId).setParams(params); | ||||
ClusterRequest<ParamFlowRequestData> request = new ClusterRequest<>(ClusterConstants.MSG_TYPE_PARAM_FLOW, data); | ClusterRequest<ParamFlowRequestData> request = new ClusterRequest<>(ClusterConstants.MSG_TYPE_PARAM_FLOW, data); | ||||
try { | try { | ||||
return sendTokenRequest(request); | |||||
TokenResult result = sendTokenRequest(request); | |||||
logForResult(result); | |||||
return result; | |||||
} catch (Exception ex) { | } catch (Exception ex) { | ||||
ClusterClientStatLogUtil.log(ex.getMessage()); | ClusterClientStatLogUtil.log(ex.getMessage()); | ||||
return new TokenResult(TokenResultStatus.FAIL); | return new TokenResult(TokenResultStatus.FAIL); | ||||
} | } | ||||
} | } | ||||
private void logForResult(TokenResult result) { | |||||
switch (result.getStatus()) { | |||||
case TokenResultStatus.NO_RULE_EXISTS: | |||||
ClusterClientStatLogUtil.log(ClusterErrorMessages.NO_RULES_IN_SERVER); | |||||
break; | |||||
case TokenResultStatus.TOO_MANY_REQUEST: | |||||
ClusterClientStatLogUtil.log(ClusterErrorMessages.TOO_MANY_REQUESTS); | |||||
break; | |||||
default: | |||||
} | |||||
} | |||||
private TokenResult sendTokenRequest(ClusterRequest request) throws Exception { | private TokenResult sendTokenRequest(ClusterRequest request) throws Exception { | ||||
if (transportClient == null) { | if (transportClient == null) { | ||||
RecordLog.warn("[DefaultClusterTokenClient] Client not created, please check your config for cluster client"); | |||||
RecordLog.warn( | |||||
"[DefaultClusterTokenClient] Client not created, please check your config for cluster client"); | |||||
return clientFail(); | return clientFail(); | ||||
} | } | ||||
ClusterResponse response = transportClient.sendRequest(request); | ClusterResponse response = transportClient.sendRequest(request); | ||||
@@ -0,0 +1,59 @@ | |||||
/* | |||||
* Copyright 1999-2018 Alibaba Group Holding Ltd. | |||||
* | |||||
* Licensed under the Apache License, Version 2.0 (the "License"); | |||||
* you may not use this file except in compliance with the License. | |||||
* You may obtain a copy of the License at | |||||
* | |||||
* http://www.apache.org/licenses/LICENSE-2.0 | |||||
* | |||||
* Unless required by applicable law or agreed to in writing, software | |||||
* distributed under the License is distributed on an "AS IS" BASIS, | |||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |||||
* See the License for the specific language governing permissions and | |||||
* limitations under the License. | |||||
*/ | |||||
package com.alibaba.csp.sentinel.cluster.client.config; | |||||
/** | |||||
* @author Eric Zhao | |||||
* @since 1.4.1 | |||||
*/ | |||||
public class ClusterClientAssignConfig { | |||||
private String serverHost; | |||||
private Integer serverPort; | |||||
public ClusterClientAssignConfig() {} | |||||
public ClusterClientAssignConfig(String serverHost, Integer serverPort) { | |||||
this.serverHost = serverHost; | |||||
this.serverPort = serverPort; | |||||
} | |||||
public String getServerHost() { | |||||
return serverHost; | |||||
} | |||||
public ClusterClientAssignConfig setServerHost(String serverHost) { | |||||
this.serverHost = serverHost; | |||||
return this; | |||||
} | |||||
public Integer getServerPort() { | |||||
return serverPort; | |||||
} | |||||
public ClusterClientAssignConfig setServerPort(Integer serverPort) { | |||||
this.serverPort = serverPort; | |||||
return this; | |||||
} | |||||
@Override | |||||
public String toString() { | |||||
return "ClusterClientAssignConfig{" + | |||||
"serverHost='" + serverHost + '\'' + | |||||
", serverPort=" + serverPort + | |||||
'}'; | |||||
} | |||||
} |
@@ -21,55 +21,21 @@ package com.alibaba.csp.sentinel.cluster.client.config; | |||||
*/ | */ | ||||
public class ClusterClientConfig { | public class ClusterClientConfig { | ||||
private String serverHost; | |||||
private int serverPort; | |||||
private Integer requestTimeout; | |||||
private int requestTimeout; | |||||
private int connectTimeout; | |||||
public String getServerHost() { | |||||
return serverHost; | |||||
} | |||||
public ClusterClientConfig setServerHost(String serverHost) { | |||||
this.serverHost = serverHost; | |||||
return this; | |||||
} | |||||
public int getServerPort() { | |||||
return serverPort; | |||||
} | |||||
public ClusterClientConfig setServerPort(int serverPort) { | |||||
this.serverPort = serverPort; | |||||
return this; | |||||
} | |||||
public int getRequestTimeout() { | |||||
public Integer getRequestTimeout() { | |||||
return requestTimeout; | return requestTimeout; | ||||
} | } | ||||
public ClusterClientConfig setRequestTimeout(int requestTimeout) { | |||||
public ClusterClientConfig setRequestTimeout(Integer requestTimeout) { | |||||
this.requestTimeout = requestTimeout; | this.requestTimeout = requestTimeout; | ||||
return this; | return this; | ||||
} | } | ||||
public int getConnectTimeout() { | |||||
return connectTimeout; | |||||
} | |||||
public ClusterClientConfig setConnectTimeout(int connectTimeout) { | |||||
this.connectTimeout = connectTimeout; | |||||
return this; | |||||
} | |||||
@Override | @Override | ||||
public String toString() { | public String toString() { | ||||
return "ClusterClientConfig{" + | return "ClusterClientConfig{" + | ||||
"serverHost='" + serverHost + '\'' + | |||||
", serverPort=" + serverPort + | |||||
", requestTimeout=" + requestTimeout + | |||||
", connectTimeout=" + connectTimeout + | |||||
"requestTimeout=" + requestTimeout + | |||||
'}'; | '}'; | ||||
} | } | ||||
} | } |
@@ -37,23 +37,54 @@ public final class ClusterClientConfigManager { | |||||
*/ | */ | ||||
private static volatile String serverHost = null; | private static volatile String serverHost = null; | ||||
private static volatile int serverPort = ClusterConstants.DEFAULT_CLUSTER_SERVER_PORT; | private static volatile int serverPort = ClusterConstants.DEFAULT_CLUSTER_SERVER_PORT; | ||||
private static volatile int requestTimeout = ClusterConstants.DEFAULT_REQUEST_TIMEOUT; | private static volatile int requestTimeout = ClusterConstants.DEFAULT_REQUEST_TIMEOUT; | ||||
private static volatile int connectTimeout = ClusterConstants.DEFAULT_CONNECT_TIMEOUT_MILLIS; | |||||
private static final PropertyListener<ClusterClientConfig> CONFIG_PROPERTY_LISTENER | |||||
= new ClientConfigPropertyListener(); | |||||
private static final PropertyListener<ClusterClientAssignConfig> ASSIGN_PROPERTY_LISTENER | |||||
= new ClientAssignPropertyListener(); | |||||
private static final PropertyListener<ClusterClientConfig> PROPERTY_LISTENER = new ClientConfigPropertyListener(); | |||||
private static SentinelProperty<ClusterClientConfig> currentProperty = new DynamicSentinelProperty<>(); | |||||
private static SentinelProperty<ClusterClientConfig> clientConfigProperty = new DynamicSentinelProperty<>(); | |||||
private static SentinelProperty<ClusterClientAssignConfig> clientAssignProperty = new DynamicSentinelProperty<>(); | |||||
private static final List<ServerChangeObserver> SERVER_CHANGE_OBSERVERS = new ArrayList<>(); | private static final List<ServerChangeObserver> SERVER_CHANGE_OBSERVERS = new ArrayList<>(); | ||||
static { | static { | ||||
currentProperty.addListener(PROPERTY_LISTENER); | |||||
bindPropertyListener(); | |||||
} | |||||
private static void bindPropertyListener() { | |||||
removePropertyListener(); | |||||
clientAssignProperty.addListener(ASSIGN_PROPERTY_LISTENER); | |||||
clientConfigProperty.addListener(CONFIG_PROPERTY_LISTENER); | |||||
} | } | ||||
public static void register2Property(SentinelProperty<ClusterClientConfig> property) { | |||||
synchronized (PROPERTY_LISTENER) { | |||||
RecordLog.info("[ClusterClientConfigManager] Registering new property to cluster client config manager"); | |||||
currentProperty.removeListener(PROPERTY_LISTENER); | |||||
property.addListener(PROPERTY_LISTENER); | |||||
currentProperty = property; | |||||
private static void removePropertyListener() { | |||||
clientAssignProperty.removeListener(ASSIGN_PROPERTY_LISTENER); | |||||
clientConfigProperty.removeListener(CONFIG_PROPERTY_LISTENER); | |||||
} | |||||
public static void registerServerAssignProperty(SentinelProperty<ClusterClientAssignConfig> property) { | |||||
AssertUtil.notNull(property, "property cannot be null"); | |||||
synchronized (ASSIGN_PROPERTY_LISTENER) { | |||||
RecordLog.info("[ClusterClientConfigManager] Registering new server assignment property to cluster " | |||||
+ "client config manager"); | |||||
clientAssignProperty.removeListener(ASSIGN_PROPERTY_LISTENER); | |||||
property.addListener(ASSIGN_PROPERTY_LISTENER); | |||||
clientAssignProperty = property; | |||||
} | |||||
} | |||||
public static void registerClientConfigProperty(SentinelProperty<ClusterClientConfig> property) { | |||||
AssertUtil.notNull(property, "property cannot be null"); | |||||
synchronized (CONFIG_PROPERTY_LISTENER) { | |||||
RecordLog.info("[ClusterClientConfigManager] Registering new global client config property to " | |||||
+ "cluster client config manager"); | |||||
clientConfigProperty.removeListener(CONFIG_PROPERTY_LISTENER); | |||||
property.addListener(CONFIG_PROPERTY_LISTENER); | |||||
clientConfigProperty = property; | |||||
} | } | ||||
} | } | ||||
@@ -68,54 +99,83 @@ public final class ClusterClientConfigManager { | |||||
* @param config new config to apply | * @param config new config to apply | ||||
*/ | */ | ||||
public static void applyNewConfig(ClusterClientConfig config) { | public static void applyNewConfig(ClusterClientConfig config) { | ||||
currentProperty.updateValue(config); | |||||
clientConfigProperty.updateValue(config); | |||||
} | } | ||||
private static class ClientConfigPropertyListener implements PropertyListener<ClusterClientConfig> { | |||||
public static void applyNewAssignConfig(ClusterClientAssignConfig clusterClientAssignConfig) { | |||||
clientAssignProperty.updateValue(clusterClientAssignConfig); | |||||
} | |||||
private static class ClientAssignPropertyListener implements PropertyListener<ClusterClientAssignConfig> { | |||||
@Override | @Override | ||||
public void configUpdate(ClusterClientConfig config) { | |||||
public void configLoad(ClusterClientAssignConfig config) { | |||||
if (config == null) { | |||||
RecordLog.warn("[ClusterClientConfigManager] Empty initial client assignment config"); | |||||
return; | |||||
} | |||||
applyConfig(config); | applyConfig(config); | ||||
} | } | ||||
@Override | |||||
public void configUpdate(ClusterClientAssignConfig config) { | |||||
applyConfig(config); | |||||
} | |||||
private synchronized void applyConfig(ClusterClientAssignConfig config) { | |||||
if (!isValidAssignConfig(config)) { | |||||
RecordLog.warn( | |||||
"[ClusterClientConfigManager] Invalid cluster client assign config, ignoring: " + config); | |||||
return; | |||||
} | |||||
if (serverPort == config.getServerPort() && config.getServerHost().equals(serverHost)) { | |||||
return; | |||||
} | |||||
RecordLog.info("[ClusterClientConfigManager] Assign to new target token server: " + config); | |||||
updateServerAssignment(config); | |||||
} | |||||
} | |||||
private static class ClientConfigPropertyListener implements PropertyListener<ClusterClientConfig> { | |||||
@Override | @Override | ||||
public void configLoad(ClusterClientConfig config) { | public void configLoad(ClusterClientConfig config) { | ||||
if (config == null) { | if (config == null) { | ||||
RecordLog.warn("[ClusterClientConfigManager] Empty initial config"); | |||||
RecordLog.warn("[ClusterClientConfigManager] Empty initial client config"); | |||||
return; | return; | ||||
} | } | ||||
applyConfig(config); | applyConfig(config); | ||||
} | } | ||||
@Override | |||||
public void configUpdate(ClusterClientConfig config) { | |||||
applyConfig(config); | |||||
} | |||||
private synchronized void applyConfig(ClusterClientConfig config) { | private synchronized void applyConfig(ClusterClientConfig config) { | ||||
if (!isValidConfig(config)) { | |||||
if (!isValidClientConfig(config)) { | |||||
RecordLog.warn( | RecordLog.warn( | ||||
"[ClusterClientConfigManager] Invalid cluster client config, ignoring: " + config); | "[ClusterClientConfigManager] Invalid cluster client config, ignoring: " + config); | ||||
return; | return; | ||||
} | } | ||||
RecordLog.info("[ClusterClientConfigManager] Updating new config: " + config); | |||||
if (config.getRequestTimeout() != requestTimeout) { | |||||
requestTimeout = config.getRequestTimeout(); | |||||
} | |||||
updateServer(config); | |||||
RecordLog.info("[ClusterClientConfigManager] Updating to new client config: " + config); | |||||
updateClientConfigChange(config); | |||||
} | } | ||||
} | } | ||||
public static boolean isValidConfig(ClusterClientConfig config) { | |||||
return config != null && StringUtil.isNotBlank(config.getServerHost()) | |||||
&& config.getServerPort() > 0 | |||||
&& config.getServerPort() <= 65535 | |||||
&& config.getRequestTimeout() > 0; | |||||
private static void updateClientConfigChange(ClusterClientConfig config) { | |||||
if (config.getRequestTimeout() != requestTimeout) { | |||||
requestTimeout = config.getRequestTimeout(); | |||||
} | |||||
} | } | ||||
public static void updateServer(ClusterClientConfig config) { | |||||
private static void updateServerAssignment(/*@Valid*/ ClusterClientAssignConfig config) { | |||||
String host = config.getServerHost(); | String host = config.getServerHost(); | ||||
int port = config.getServerPort(); | int port = config.getServerPort(); | ||||
AssertUtil.assertNotBlank(host, "token server host cannot be empty"); | |||||
AssertUtil.isTrue(port > 0, "token server port should be valid (positive)"); | |||||
if (serverPort == port && host.equals(serverHost)) { | |||||
return; | |||||
} | |||||
for (ServerChangeObserver observer : SERVER_CHANGE_OBSERVERS) { | for (ServerChangeObserver observer : SERVER_CHANGE_OBSERVERS) { | ||||
observer.onRemoteServerChange(config); | observer.onRemoteServerChange(config); | ||||
} | } | ||||
@@ -124,6 +184,16 @@ public final class ClusterClientConfigManager { | |||||
serverPort = port; | serverPort = port; | ||||
} | } | ||||
public static boolean isValidAssignConfig(ClusterClientAssignConfig config) { | |||||
return config != null && StringUtil.isNotBlank(config.getServerHost()) | |||||
&& config.getServerPort() > 0 | |||||
&& config.getServerPort() <= 65535; | |||||
} | |||||
public static boolean isValidClientConfig(ClusterClientConfig config) { | |||||
return config != null && config.getRequestTimeout() > 0; | |||||
} | |||||
public static String getServerHost() { | public static String getServerHost() { | ||||
return serverHost; | return serverHost; | ||||
} | } | ||||
@@ -136,5 +206,9 @@ public final class ClusterClientConfigManager { | |||||
return requestTimeout; | return requestTimeout; | ||||
} | } | ||||
public static int getConnectTimeout() { | |||||
return connectTimeout; | |||||
} | |||||
private ClusterClientConfigManager() {} | private ClusterClientConfigManager() {} | ||||
} | } |
@@ -24,7 +24,7 @@ public interface ServerChangeObserver { | |||||
/** | /** | ||||
* Callback on remote server address change. | * Callback on remote server address change. | ||||
* | * | ||||
* @param clusterClientConfig new cluster client config | |||||
* @param assignConfig new cluster assignment config | |||||
*/ | */ | ||||
void onRemoteServerChange(ClusterClientConfig clusterClientConfig); | |||||
void onRemoteServerChange(ClusterClientAssignConfig assignConfig); | |||||
} | } |
@@ -0,0 +1,89 @@ | |||||
/* | |||||
* Copyright 1999-2018 Alibaba Group Holding Ltd. | |||||
* | |||||
* Licensed under the Apache License, Version 2.0 (the "License"); | |||||
* you may not use this file except in compliance with the License. | |||||
* You may obtain a copy of the License at | |||||
* | |||||
* http://www.apache.org/licenses/LICENSE-2.0 | |||||
* | |||||
* Unless required by applicable law or agreed to in writing, software | |||||
* distributed under the License is distributed on an "AS IS" BASIS, | |||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |||||
* See the License for the specific language governing permissions and | |||||
* limitations under the License. | |||||
*/ | |||||
package com.alibaba.csp.sentinel.command.entity; | |||||
import com.alibaba.csp.sentinel.cluster.client.config.ClusterClientAssignConfig; | |||||
import com.alibaba.csp.sentinel.cluster.client.config.ClusterClientConfig; | |||||
/** | |||||
* @author Eric Zhao | |||||
* @since 1.4.1 | |||||
*/ | |||||
public class ClusterClientStateEntity { | |||||
private String serverHost; | |||||
private Integer serverPort; | |||||
private Integer clientState; | |||||
private Integer requestTimeout; | |||||
public String getServerHost() { | |||||
return serverHost; | |||||
} | |||||
public ClusterClientStateEntity setServerHost(String serverHost) { | |||||
this.serverHost = serverHost; | |||||
return this; | |||||
} | |||||
public Integer getServerPort() { | |||||
return serverPort; | |||||
} | |||||
public ClusterClientStateEntity setServerPort(Integer serverPort) { | |||||
this.serverPort = serverPort; | |||||
return this; | |||||
} | |||||
public Integer getRequestTimeout() { | |||||
return requestTimeout; | |||||
} | |||||
public ClusterClientStateEntity setRequestTimeout(Integer requestTimeout) { | |||||
this.requestTimeout = requestTimeout; | |||||
return this; | |||||
} | |||||
public Integer getClientState() { | |||||
return clientState; | |||||
} | |||||
public ClusterClientStateEntity setClientState(Integer clientState) { | |||||
this.clientState = clientState; | |||||
return this; | |||||
} | |||||
public ClusterClientConfig toClientConfig() { | |||||
return new ClusterClientConfig().setRequestTimeout(requestTimeout); | |||||
} | |||||
public ClusterClientAssignConfig toAssignConfig() { | |||||
return new ClusterClientAssignConfig() | |||||
.setServerHost(serverHost) | |||||
.setServerPort(serverPort); | |||||
} | |||||
@Override | |||||
public String toString() { | |||||
return "ClusterClientStateEntity{" + | |||||
"serverHost='" + serverHost + '\'' + | |||||
", serverPort=" + serverPort + | |||||
", clientState=" + clientState + | |||||
", requestTimeout=" + requestTimeout + | |||||
'}'; | |||||
} | |||||
} |
@@ -15,12 +15,14 @@ | |||||
*/ | */ | ||||
package com.alibaba.csp.sentinel.command.handler; | package com.alibaba.csp.sentinel.command.handler; | ||||
import com.alibaba.csp.sentinel.cluster.client.config.ClusterClientConfig; | |||||
import com.alibaba.csp.sentinel.cluster.client.ClientConstants; | |||||
import com.alibaba.csp.sentinel.cluster.client.TokenClientProvider; | |||||
import com.alibaba.csp.sentinel.cluster.client.config.ClusterClientConfigManager; | import com.alibaba.csp.sentinel.cluster.client.config.ClusterClientConfigManager; | ||||
import com.alibaba.csp.sentinel.command.CommandHandler; | import com.alibaba.csp.sentinel.command.CommandHandler; | ||||
import com.alibaba.csp.sentinel.command.CommandRequest; | import com.alibaba.csp.sentinel.command.CommandRequest; | ||||
import com.alibaba.csp.sentinel.command.CommandResponse; | import com.alibaba.csp.sentinel.command.CommandResponse; | ||||
import com.alibaba.csp.sentinel.command.annotation.CommandMapping; | import com.alibaba.csp.sentinel.command.annotation.CommandMapping; | ||||
import com.alibaba.csp.sentinel.command.entity.ClusterClientStateEntity; | |||||
import com.alibaba.fastjson.JSON; | import com.alibaba.fastjson.JSON; | ||||
/** | /** | ||||
@@ -32,11 +34,16 @@ public class FetchClusterClientConfigHandler implements CommandHandler<String> { | |||||
@Override | @Override | ||||
public CommandResponse<String> handle(CommandRequest request) { | public CommandResponse<String> handle(CommandRequest request) { | ||||
ClusterClientConfig config = new ClusterClientConfig() | |||||
ClusterClientStateEntity stateVO = new ClusterClientStateEntity() | |||||
.setServerHost(ClusterClientConfigManager.getServerHost()) | .setServerHost(ClusterClientConfigManager.getServerHost()) | ||||
.setServerPort(ClusterClientConfigManager.getServerPort()) | .setServerPort(ClusterClientConfigManager.getServerPort()) | ||||
.setRequestTimeout(ClusterClientConfigManager.getRequestTimeout()); | .setRequestTimeout(ClusterClientConfigManager.getRequestTimeout()); | ||||
return CommandResponse.ofSuccess(JSON.toJSONString(config)); | |||||
if (TokenClientProvider.isClientSpiAvailable()) { | |||||
stateVO.setClientState(TokenClientProvider.getClient().getState()); | |||||
} else { | |||||
stateVO.setClientState(ClientConstants.CLIENT_STATUS_OFF); | |||||
} | |||||
return CommandResponse.ofSuccess(JSON.toJSONString(stateVO)); | |||||
} | } | ||||
} | } | ||||
@@ -19,10 +19,12 @@ import java.net.URLDecoder; | |||||
import com.alibaba.csp.sentinel.cluster.client.config.ClusterClientConfig; | import com.alibaba.csp.sentinel.cluster.client.config.ClusterClientConfig; | ||||
import com.alibaba.csp.sentinel.cluster.client.config.ClusterClientConfigManager; | import com.alibaba.csp.sentinel.cluster.client.config.ClusterClientConfigManager; | ||||
import com.alibaba.csp.sentinel.command.CommandConstants; | |||||
import com.alibaba.csp.sentinel.command.CommandHandler; | import com.alibaba.csp.sentinel.command.CommandHandler; | ||||
import com.alibaba.csp.sentinel.command.CommandRequest; | import com.alibaba.csp.sentinel.command.CommandRequest; | ||||
import com.alibaba.csp.sentinel.command.CommandResponse; | import com.alibaba.csp.sentinel.command.CommandResponse; | ||||
import com.alibaba.csp.sentinel.command.annotation.CommandMapping; | import com.alibaba.csp.sentinel.command.annotation.CommandMapping; | ||||
import com.alibaba.csp.sentinel.command.entity.ClusterClientStateEntity; | |||||
import com.alibaba.csp.sentinel.log.RecordLog; | import com.alibaba.csp.sentinel.log.RecordLog; | ||||
import com.alibaba.csp.sentinel.util.StringUtil; | import com.alibaba.csp.sentinel.util.StringUtil; | ||||
import com.alibaba.fastjson.JSON; | import com.alibaba.fastjson.JSON; | ||||
@@ -43,10 +45,12 @@ public class ModifyClusterClientConfigHandler implements CommandHandler<String> | |||||
try { | try { | ||||
data = URLDecoder.decode(data, "utf-8"); | data = URLDecoder.decode(data, "utf-8"); | ||||
RecordLog.info("[ModifyClusterClientConfigHandler] Receiving cluster client config: " + data); | RecordLog.info("[ModifyClusterClientConfigHandler] Receiving cluster client config: " + data); | ||||
ClusterClientConfig clusterClientConfig = JSON.parseObject(data, ClusterClientConfig.class); | |||||
ClusterClientConfigManager.applyNewConfig(clusterClientConfig); | |||||
ClusterClientStateEntity entity = JSON.parseObject(data, ClusterClientStateEntity.class); | |||||
return CommandResponse.ofSuccess("success"); | |||||
ClusterClientConfigManager.applyNewConfig(entity.toClientConfig()); | |||||
ClusterClientConfigManager.applyNewAssignConfig(entity.toAssignConfig()); | |||||
return CommandResponse.ofSuccess(CommandConstants.MSG_SUCCESS); | |||||
} catch (Exception e) { | } catch (Exception e) { | ||||
RecordLog.warn("[ModifyClusterClientConfigHandler] Decode client cluster config error", e); | RecordLog.warn("[ModifyClusterClientConfigHandler] Decode client cluster config error", e); | ||||
return CommandResponse.ofFailure(e, "decode client cluster config error"); | return CommandResponse.ofFailure(e, "decode client cluster config error"); | ||||
@@ -37,8 +37,9 @@ public final class ClusterConstants { | |||||
public static final int PARAM_TYPE_BOOLEAN = 6; | public static final int PARAM_TYPE_BOOLEAN = 6; | ||||
public static final int PARAM_TYPE_STRING = 7; | public static final int PARAM_TYPE_STRING = 7; | ||||
public static final int DEFAULT_CLUSTER_SERVER_PORT = 8730; | |||||
public static final int DEFAULT_CLUSTER_SERVER_PORT = 18730; | |||||
public static final int DEFAULT_REQUEST_TIMEOUT = 20; | public static final int DEFAULT_REQUEST_TIMEOUT = 20; | ||||
public static final int DEFAULT_CONNECT_TIMEOUT_MILLIS = 10 * 1000; | |||||
private ClusterConstants() {} | private ClusterConstants() {} | ||||
} | } |
@@ -46,4 +46,11 @@ public interface ClusterTokenClient extends TokenService { | |||||
* @throws Exception some error occurs | * @throws Exception some error occurs | ||||
*/ | */ | ||||
void stop() throws Exception; | void stop() throws Exception; | ||||
/** | |||||
* Get state of the cluster token client. | |||||
* | |||||
* @return state of the cluster token client | |||||
*/ | |||||
int getState(); | |||||
} | } |