Signed-off-by: Eric Zhao <sczyh16@gmail.com>master
@@ -509,7 +509,7 @@ public class SentinelApiClient { | |||
// Cluster related | |||
public CompletableFuture<ClusterStateSimpleEntity> fetchClusterMode(String app, String ip, int port) { | |||
public CompletableFuture<ClusterStateSimpleEntity> fetchClusterMode(String ip, int port) { | |||
if (StringUtil.isBlank(ip) || port <= 0) { | |||
return AsyncUtils.newFailedFuture(new IllegalArgumentException("Invalid parameter")); | |||
} | |||
@@ -525,7 +525,7 @@ public class SentinelApiClient { | |||
} | |||
} | |||
public CompletableFuture<Void> modifyClusterMode(String app, String ip, int port, int mode) { | |||
public CompletableFuture<Void> modifyClusterMode(String ip, int port, int mode) { | |||
if (StringUtil.isBlank(ip) || port <= 0) { | |||
return AsyncUtils.newFailedFuture(new IllegalArgumentException("Invalid parameter")); | |||
} | |||
@@ -549,7 +549,7 @@ public class SentinelApiClient { | |||
} | |||
} | |||
public CompletableFuture<ClusterClientInfoVO> fetchClusterClientInfoAndConfig(String app, String ip, int port) { | |||
public CompletableFuture<ClusterClientInfoVO> fetchClusterClientInfoAndConfig(String ip, int port) { | |||
if (StringUtil.isBlank(ip) || port <= 0) { | |||
return AsyncUtils.newFailedFuture(new IllegalArgumentException("Invalid parameter")); | |||
} | |||
@@ -662,7 +662,7 @@ public class SentinelApiClient { | |||
} | |||
} | |||
public CompletableFuture<ClusterServerStateVO> fetchClusterServerBasicInfo(String app, String ip, int port) { | |||
public CompletableFuture<ClusterServerStateVO> fetchClusterServerBasicInfo(String ip, int port) { | |||
if (StringUtil.isBlank(ip) || port <= 0) { | |||
return AsyncUtils.newFailedFuture(new IllegalArgumentException("Invalid parameter")); | |||
} | |||
@@ -28,6 +28,8 @@ import com.alibaba.csp.sentinel.dashboard.domain.cluster.config.ServerTransportC | |||
*/ | |||
public class ClusterServerStateVO { | |||
private String appName; | |||
private ServerTransportConfig transport; | |||
private ServerFlowConfig flow; | |||
private Set<String> namespaceSet; | |||
@@ -39,6 +41,15 @@ public class ClusterServerStateVO { | |||
private Boolean embedded; | |||
public String getAppName() { | |||
return appName; | |||
} | |||
public ClusterServerStateVO setAppName(String appName) { | |||
this.appName = appName; | |||
return this; | |||
} | |||
public ServerTransportConfig getTransport() { | |||
return transport; | |||
} | |||
@@ -105,7 +116,8 @@ public class ClusterServerStateVO { | |||
@Override | |||
public String toString() { | |||
return "ClusterServerStateVO{" + | |||
"transport=" + transport + | |||
"appName='" + appName + '\'' + | |||
", transport=" + transport + | |||
", flow=" + flow + | |||
", namespaceSet=" + namespaceSet + | |||
", port=" + port + | |||
@@ -23,8 +23,10 @@ import java.util.Set; | |||
import java.util.concurrent.CompletableFuture; | |||
import java.util.concurrent.ExecutionException; | |||
import java.util.concurrent.TimeUnit; | |||
import java.util.stream.Collectors; | |||
import com.alibaba.csp.sentinel.cluster.ClusterStateManager; | |||
import com.alibaba.csp.sentinel.dashboard.domain.cluster.state.ClusterUniversalStatePairVO; | |||
import com.alibaba.csp.sentinel.util.AssertUtil; | |||
import com.alibaba.csp.sentinel.util.function.Tuple2; | |||
@@ -55,12 +57,54 @@ public class ClusterAssignServiceImpl implements ClusterAssignService { | |||
@Autowired | |||
private ClusterConfigService clusterConfigService; | |||
private boolean isMachineInApp(/*@NonEmpty*/ String machineId) { | |||
return machineId.contains(":"); | |||
} | |||
private ClusterAppAssignResultVO handleUnbindClusterServerNotInApp(String app, String machineId) { | |||
Set<String> failedSet = new HashSet<>(); | |||
try { | |||
List<ClusterUniversalStatePairVO> list = clusterConfigService.getClusterUniversalState(app) | |||
.get(10, TimeUnit.SECONDS); | |||
Set<String> toModifySet = list.stream() | |||
.filter(e -> e.getState().getStateInfo().getMode() == ClusterStateManager.CLUSTER_CLIENT) | |||
.filter(e -> machineId.equals(e.getState().getClient().getClientConfig().getServerHost() + ':' + | |||
e.getState().getClient().getClientConfig().getServerPort())) | |||
.map(e -> e.getIp() + '@' + e.getCommandPort()) | |||
.collect(Collectors.toSet()); | |||
// Modify mode to NOT-STARTED for all associated token clients. | |||
modifyToNonStarted(toModifySet, failedSet); | |||
} catch (Exception ex) { | |||
Throwable e = ex instanceof ExecutionException ? ex.getCause() : ex; | |||
LOGGER.error("Failed to unbind machine <{}>", machineId, e); | |||
failedSet.add(machineId); | |||
} | |||
return new ClusterAppAssignResultVO() | |||
.setFailedClientSet(failedSet) | |||
.setFailedServerSet(new HashSet<>()); | |||
} | |||
private void modifyToNonStarted(Set<String> toModifySet, Set<String> failedSet) { | |||
toModifySet.parallelStream() | |||
.map(MachineUtils::parseCommandIpAndPort) | |||
.filter(Optional::isPresent) | |||
.map(Optional::get) | |||
.map(e -> { | |||
CompletableFuture<Void> f = modifyMode(e.r1, e.r2, ClusterStateManager.CLUSTER_NOT_STARTED); | |||
return Tuple2.of(e.r1 + '@' + e.r2, f); | |||
}) | |||
.forEach(f -> handleFutureSync(f, failedSet)); | |||
} | |||
@Override | |||
public ClusterAppAssignResultVO unbindClusterServer(String app, String machineId) { | |||
AssertUtil.assertNotBlank(app, "app cannot be blank"); | |||
AssertUtil.assertNotBlank(machineId, "machineId cannot be blank"); | |||
Set<String> failedSet = new HashSet<>(); | |||
if (isMachineInApp(machineId)) { | |||
return handleUnbindClusterServerNotInApp(app, machineId); | |||
} | |||
Set<String> failedSet = new HashSet<>(); | |||
try { | |||
ClusterGroupEntity entity = clusterConfigService.getClusterUniversalStateForAppMachine(app, machineId) | |||
.get(10, TimeUnit.SECONDS); | |||
@@ -70,15 +114,7 @@ public class ClusterAssignServiceImpl implements ClusterAssignService { | |||
toModifySet.addAll(entity.getClientSet()); | |||
} | |||
// Modify mode to NOT-STARTED for all chosen token servers and associated token clients. | |||
toModifySet.parallelStream() | |||
.map(MachineUtils::parseCommandIpAndPort) | |||
.filter(Optional::isPresent) | |||
.map(Optional::get) | |||
.map(e -> { | |||
CompletableFuture<Void> f = modifyMode(app, e.r1, e.r2, ClusterStateManager.CLUSTER_NOT_STARTED); | |||
return Tuple2.of(e.r1 + '@' + e.r2, f); | |||
}) | |||
.forEach(f -> handleFutureSync(f, failedSet)); | |||
modifyToNonStarted(toModifySet, failedSet); | |||
} catch (Exception ex) { | |||
Throwable e = ex instanceof ExecutionException ? ex.getCause() : ex; | |||
LOGGER.error("Failed to unbind machine <{}>", machineId, e); | |||
@@ -119,7 +155,7 @@ public class ClusterAssignServiceImpl implements ClusterAssignService { | |||
.map(e -> { | |||
String ip = e.getIp(); | |||
int commandPort = parsePort(e); | |||
CompletableFuture<Void> f = modifyMode(app, ip, commandPort, ClusterStateManager.CLUSTER_SERVER) | |||
CompletableFuture<Void> f = modifyMode(ip, commandPort, ClusterStateManager.CLUSTER_SERVER) | |||
.thenCompose(v -> applyServerConfigChange(app, ip, commandPort, e)); | |||
return Tuple2.of(e.getMachineId(), f); | |||
}) | |||
@@ -150,7 +186,7 @@ public class ClusterAssignServiceImpl implements ClusterAssignService { | |||
.map(ipPort -> { | |||
String ip = ipPort.r1; | |||
int commandPort = ipPort.r2; | |||
CompletableFuture<Void> f = modifyMode(app, ip, commandPort, ClusterStateManager.CLUSTER_NOT_STARTED); | |||
CompletableFuture<Void> f = modifyMode(ip, commandPort, ClusterStateManager.CLUSTER_NOT_STARTED); | |||
return Tuple2.of(ip + '@' + commandPort, f); | |||
}) | |||
.forEach(t -> handleFutureSync(t, failedSet)); | |||
@@ -170,7 +206,7 @@ public class ClusterAssignServiceImpl implements ClusterAssignService { | |||
.map(Optional::get) | |||
.map(ipPort -> { | |||
CompletableFuture<Void> f = sentinelApiClient | |||
.modifyClusterMode(app, ipPort.r1, ipPort.r2, ClusterStateManager.CLUSTER_CLIENT) | |||
.modifyClusterMode(ipPort.r1, ipPort.r2, ClusterStateManager.CLUSTER_CLIENT) | |||
.thenCompose(v -> sentinelApiClient.modifyClusterClientConfig(app, ipPort.r1, ipPort.r2, | |||
new ClusterClientConfig().setRequestTimeout(20) | |||
.setServerHost(serverIp) | |||
@@ -223,8 +259,8 @@ public class ClusterAssignServiceImpl implements ClusterAssignService { | |||
return sentinelApiClient.modifyClusterServerNamespaceSet(app, ip, commandPort, namespaceSet); | |||
} | |||
private CompletableFuture<Void> modifyMode(String app, String ip, int port, int mode) { | |||
return sentinelApiClient.modifyClusterMode(app, ip, port, mode); | |||
private CompletableFuture<Void> modifyMode(String ip, int port, int mode) { | |||
return sentinelApiClient.modifyClusterMode(ip, port, mode); | |||
} | |||
private int parsePort(ClusterAppAssignMap assignMap) { | |||
@@ -62,7 +62,7 @@ public class ClusterConfigService { | |||
String ip = request.getIp(); | |||
int port = request.getPort(); | |||
return sentinelApiClient.modifyClusterClientConfig(app, ip, port, request.getClientConfig()) | |||
.thenCompose(v -> sentinelApiClient.modifyClusterMode(app, ip, port, ClusterStateManager.CLUSTER_CLIENT)); | |||
.thenCompose(v -> sentinelApiClient.modifyClusterMode(ip, port, ClusterStateManager.CLUSTER_CLIENT)); | |||
} | |||
private boolean notClientRequestValid(/*@NonNull */ ClusterClientModifyRequest request) { | |||
@@ -91,7 +91,7 @@ public class ClusterConfigService { | |||
return sentinelApiClient.modifyClusterServerNamespaceSet(app, ip, port, namespaceSet) | |||
.thenCompose(v -> sentinelApiClient.modifyClusterServerTransportConfig(app, ip, port, transportConfig)) | |||
.thenCompose(v -> sentinelApiClient.modifyClusterServerFlowConfig(app, ip, port, flowConfig)) | |||
.thenCompose(v -> sentinelApiClient.modifyClusterMode(app, ip, port, ClusterStateManager.CLUSTER_SERVER)); | |||
.thenCompose(v -> sentinelApiClient.modifyClusterMode(ip, port, ClusterStateManager.CLUSTER_SERVER)); | |||
} | |||
/** | |||
@@ -147,18 +147,18 @@ public class ClusterConfigService { | |||
} | |||
public CompletableFuture<ClusterUniversalStateVO> getClusterUniversalState(String app, String ip, int port) { | |||
return sentinelApiClient.fetchClusterMode(app, ip, port) | |||
return sentinelApiClient.fetchClusterMode(ip, port) | |||
.thenApply(e -> new ClusterUniversalStateVO().setStateInfo(e)) | |||
.thenCompose(vo -> { | |||
if (vo.getStateInfo().getClientAvailable()) { | |||
return sentinelApiClient.fetchClusterClientInfoAndConfig(app, ip, port) | |||
return sentinelApiClient.fetchClusterClientInfoAndConfig(ip, port) | |||
.thenApply(cc -> vo.setClient(new ClusterClientStateVO().setClientConfig(cc))); | |||
} else { | |||
return CompletableFuture.completedFuture(vo); | |||
} | |||
}).thenCompose(vo -> { | |||
if (vo.getStateInfo().getServerAvailable()) { | |||
return sentinelApiClient.fetchClusterServerBasicInfo(app, ip, port) | |||
return sentinelApiClient.fetchClusterServerBasicInfo(ip, port) | |||
.thenApply(vo::setServer); | |||
} else { | |||
return CompletableFuture.completedFuture(vo); | |||
@@ -17,8 +17,10 @@ package com.alibaba.csp.sentinel.dashboard.util; | |||
import java.util.ArrayList; | |||
import java.util.HashMap; | |||
import java.util.HashSet; | |||
import java.util.List; | |||
import java.util.Map; | |||
import java.util.Set; | |||
import com.alibaba.csp.sentinel.cluster.ClusterStateManager; | |||
import com.alibaba.csp.sentinel.util.StringUtil; | |||
@@ -43,6 +45,8 @@ public final class ClusterEntityUtils { | |||
return new ArrayList<>(); | |||
} | |||
Map<String, AppClusterServerStateWrapVO> map = new HashMap<>(); | |||
Set<String> tokenServerSet = new HashSet<>(); | |||
// Handle token servers that belong to current app. | |||
for (ClusterUniversalStatePairVO stateVO : list) { | |||
int mode = stateVO.getState().getStateInfo().getMode(); | |||
@@ -55,11 +59,37 @@ public final class ClusterEntityUtils { | |||
.setIp(ip) | |||
.setPort(serverStateVO.getPort()) | |||
.setState(serverStateVO) | |||
.setBelongToApp(true) | |||
.setConnectedCount(serverStateVO.getConnection().stream() | |||
.mapToInt(ConnectionGroupVO::getConnectedCount) | |||
.sum() | |||
) | |||
); | |||
tokenServerSet.add(ip + ":" + serverStateVO.getPort()); | |||
} | |||
} | |||
// Handle token servers from other app. | |||
for (ClusterUniversalStatePairVO stateVO : list) { | |||
int mode = stateVO.getState().getStateInfo().getMode(); | |||
if (mode == ClusterStateManager.CLUSTER_CLIENT) { | |||
ClusterClientStateVO clientState = stateVO.getState().getClient(); | |||
if (clientState == null) { | |||
continue; | |||
} | |||
String serverIp = clientState.getClientConfig().getServerHost(); | |||
int serverPort = clientState.getClientConfig().getServerPort(); | |||
if (tokenServerSet.contains(serverIp + ":" + serverPort)) { | |||
continue; | |||
} | |||
// We are not able to get the commandPort of foreign token server directly. | |||
String serverId = String.format("%s:%d", serverIp, serverPort); | |||
map.computeIfAbsent(serverId, v -> new AppClusterServerStateWrapVO() | |||
.setId(serverId) | |||
.setIp(serverIp) | |||
.setPort(serverPort) | |||
.setBelongToApp(false) | |||
); | |||
} | |||
} | |||
return new ArrayList<>(map.values()); | |||
@@ -92,6 +92,7 @@ app.controller('SentinelClusterAppServerListController', ['$scope', '$stateParam | |||
return; | |||
} | |||
let tmpMap = new Map(); | |||
let serverCommandPortMap = new Map(); | |||
$scope.clusterMap = []; | |||
$scope.remainingMachineList = []; | |||
let tmpServerList = []; | |||
@@ -117,9 +118,10 @@ app.controller('SentinelClusterAppServerListController', ['$scope', '$stateParam | |||
maxAllowedQps: e.state.server.flow.maxAllowedQps, | |||
belongToApp: true, | |||
}; | |||
if (!tmpMap.has(ip)) { | |||
tmpMap.set(ip, group); | |||
if (!tmpMap.has(machineId)) { | |||
tmpMap.set(machineId, group); | |||
} | |||
serverCommandPortMap.set(ip + ':' + e.state.server.port, e.commandPort); | |||
}); | |||
tmpClientList.forEach((e) => { | |||
let ip = e.ip; | |||
@@ -133,19 +135,45 @@ app.controller('SentinelClusterAppServerListController', ['$scope', '$stateParam | |||
return; | |||
} | |||
if (!tmpMap.has(targetServer)) { | |||
let serverHostPort = targetServer + ':' + targetPort; | |||
if (serverCommandPortMap.has(serverHostPort)) { | |||
let serverCommandPort = serverCommandPortMap.get(serverHostPort); | |||
let g; | |||
if (serverCommandPort < 0) { | |||
// Not belong to this app. | |||
g = tmpMap.get(serverHostPort); | |||
} else { | |||
// Belong to this app. | |||
g = tmpMap.get(targetServer + '@' + serverCommandPort); | |||
} | |||
g.clientSet.push(machineId); | |||
} else { | |||
let group = { | |||
ip: targetServer, | |||
machineId: targetServer, | |||
machineId: serverHostPort, | |||
port: targetPort, | |||
clientSet: [machineId], | |||
belongToApp: false, | |||
}; | |||
tmpMap.set(targetServer, group); | |||
} else { | |||
let g = tmpMap.get(targetServer); | |||
g.clientSet.push(machineId); | |||
tmpMap.set(serverHostPort, group); | |||
// Indicates that it's not belonging to current app. | |||
serverCommandPortMap.set(serverHostPort, -1); | |||
} | |||
// if (!tmpMap.has(serverHostPort)) { | |||
// let group = { | |||
// ip: targetServer, | |||
// machineId: targetServer, | |||
// port: targetPort, | |||
// clientSet: [machineId], | |||
// belongToApp: false, | |||
// }; | |||
// tmpMap.set(targetServer, group); | |||
// } else { | |||
// let g = tmpMap.get(targetServer); | |||
// g.clientSet.push(machineId); | |||
// } | |||
}); | |||
tmpMap.forEach((v) => { | |||
if (v !== undefined) { | |||
@@ -179,6 +207,9 @@ app.controller('SentinelClusterAppServerListController', ['$scope', '$stateParam | |||
}; | |||
function parseIpFromMachineId(machineId) { | |||
if (machineId.indexOf(':') !== -1) { | |||
return machineId.split(':')[0]; | |||
} | |||
if (machineId.indexOf('@') === -1) { | |||
return machineId; | |||
} | |||
@@ -228,7 +259,8 @@ app.controller('SentinelClusterAppServerListController', ['$scope', '$stateParam | |||
}); | |||
}; | |||
$scope.modifyServerAssignConfig = (id) => { | |||
$scope.modifyServerAssignConfig = (serverVO) => { | |||
let id = serverVO.id; | |||
ClusterStateService.fetchClusterUniversalStateOfApp($scope.app).success(function (data) { | |||
if (data.code === 0 && data.data) { | |||
$scope.loadError = undefined; | |||
@@ -252,7 +284,7 @@ app.controller('SentinelClusterAppServerListController', ['$scope', '$stateParam | |||
confirmBtnText: '保存', | |||
serverData: { | |||
currentServer: d.machineId, | |||
belongToApp: true, | |||
belongToApp: serverVO.belongToApp, | |||
serverPort: d.port, | |||
clientSet: d.clientSet, | |||
} | |||
@@ -470,11 +502,6 @@ app.controller('SentinelClusterAppServerListController', ['$scope', '$stateParam | |||
$scope.loadError = undefined; | |||
$scope.serverVOList = data.data; | |||
$scope.serverVOList.forEach(processServerListData); | |||
// if ($scope.serverVOList.length > 0) { | |||
// $scope.tmp.curChosenServer = $scope.serverVOList[0]; | |||
// $scope.onChosenServerChange(); | |||
// } | |||
} else { | |||
$scope.serverVOList = {}; | |||
if (data.code === UNSUPPORTED_CODE) { | |||
@@ -51,29 +51,33 @@ | |||
</thead> | |||
<tbody> | |||
<tr ng-repeat="serverVO in serverVOList | filter: {id: searchKey}"> | |||
<td style="word-wrap:break-word;word-break:break-all;">{{serverVO.id}}</td> | |||
<td style="word-wrap:break-word;word-break:break-all;"> | |||
<span ng-if="serverVO.belongToApp">{{serverVO.id}}</span> | |||
<span ng-if="!serverVO.belongToApp">{{serverVO.id}}(自主指定)</span> | |||
</td> | |||
<td>{{serverVO.port}}</td> | |||
<td style="word-wrap:break-word;word-break:break-all;"> | |||
{{serverVO.state.namespaceSetStr}} | |||
<span ng-if="serverVO.belongToApp">{{serverVO.state.namespaceSetStr}}</span> | |||
<span ng-if="!serverVO.belongToApp">未知</span> | |||
</td> | |||
<td style="word-wrap:break-word;word-break:break-all;"> | |||
<span ng-if="serverVO.state.embedded">嵌入模式</span> | |||
<span ng-if="!serverVO.state.embedded">独立模式</span> | |||
<span ng-if="!serverVO.belongToApp">未知</span> | |||
<span ng-if="serverVO.belongToApp && serverVO.state.embedded">嵌入模式</span> | |||
<span ng-if="serverVO.belongToApp && !serverVO.state.embedded">独立模式</span> | |||
</td> | |||
<td style="word-wrap:break-word;word-break:break-all;"> | |||
{{serverVO.connectedCount}} | |||
<span ng-if="serverVO.belongToApp">{{serverVO.connectedCount}}</span> | |||
<span ng-if="!serverVO.belongToApp">未知</span> | |||
</td> | |||
<td> | |||
{{serverVO.state.requestLimitDataStr}} | |||
<!--<p ng-repeat="crl in serverVO.state.requestLimitData">--> | |||
<!--<span ng-if="crl.namespace === app">{{crl.namespace}}:{{crl.currentQps}} / {{crl.maxAllowedQps}}</span>--> | |||
<!--</p>--> | |||
<span ng-if="serverVO.belongToApp">{{serverVO.state.requestLimitDataStr}}</span> | |||
<span ng-if="!serverVO.belongToApp">未知</span> | |||
</td> | |||
<td> | |||
<button class="btn btn-xs btn-outline-primary" type="button" | |||
<button class="btn btn-xs btn-outline-primary" type="button" ng-if="serverVO.belongToApp" | |||
ng-click="viewConnectionDetail(serverVO)" style="font-size: 12px; height:25px;">连接详情</button> | |||
<button class="btn btn-xs btn-outline-primary" type="button" | |||
ng-click="modifyServerAssignConfig(serverVO.id)" style="font-size: 12px; height:25px;">管理</button> | |||
ng-click="modifyServerAssignConfig(serverVO)" style="font-size: 12px; height:25px;">管理</button> | |||
<button class="btn btn-xs btn-outline-danger" type="button" | |||
ng-click="unbindServer(serverVO.id)" style="font-size: 12px; height:25px;">移除</button> | |||
</td> | |||