Signed-off-by: Eric Zhao <sczyh16@gmail.com>master
@@ -43,7 +43,7 @@ public class FetchClusterServerInfoCommandHandler implements CommandHandler<Stri | |||||
JSONArray connectionGroups = new JSONArray(); | JSONArray connectionGroups = new JSONArray(); | ||||
Set<String> namespaceSet = ClusterServerConfigManager.getNamespaceSet(); | Set<String> namespaceSet = ClusterServerConfigManager.getNamespaceSet(); | ||||
for (String namespace : namespaceSet) { | for (String namespace : namespaceSet) { | ||||
ConnectionGroup group = ConnectionManager.getConnectionGroup(namespace); | |||||
ConnectionGroup group = ConnectionManager.getOrCreateConnectionGroup(namespace); | |||||
if (group != null) { | if (group != null) { | ||||
connectionGroups.add(group); | connectionGroups.add(group); | ||||
} | } | ||||
@@ -56,8 +56,10 @@ public class ConnectionGroup { | |||||
} else { | } else { | ||||
host = address; | host = address; | ||||
} | } | ||||
connectionSet.add(new ConnectionDescriptor().setAddress(address).setHost(host)); | |||||
connectedCount.incrementAndGet(); | |||||
boolean newAdded = connectionSet.add(new ConnectionDescriptor().setAddress(address).setHost(host)); | |||||
if (newAdded) { | |||||
connectedCount.incrementAndGet(); | |||||
} | |||||
return this; | return this; | ||||
} | } | ||||
@@ -100,12 +100,23 @@ public final class ConnectionManager { | |||||
return group; | return group; | ||||
} | } | ||||
public static ConnectionGroup getConnectionGroup(String namespace) { | |||||
public static ConnectionGroup getOrCreateConnectionGroup(String namespace) { | |||||
AssertUtil.assertNotBlank(namespace, "namespace should not be empty"); | AssertUtil.assertNotBlank(namespace, "namespace should not be empty"); | ||||
ConnectionGroup group = getOrCreateGroup(namespace); | ConnectionGroup group = getOrCreateGroup(namespace); | ||||
return group; | return group; | ||||
} | } | ||||
public static ConnectionGroup getConnectionGroup(String namespace) { | |||||
AssertUtil.assertNotBlank(namespace, "namespace should not be empty"); | |||||
ConnectionGroup group = CONN_MAP.get(namespace); | |||||
return group; | |||||
} | |||||
static void clear() { | |||||
CONN_MAP.clear(); | |||||
NAMESPACE_MAP.clear(); | |||||
} | |||||
private static final Object CREATE_LOCK = new Object(); | private static final Object CREATE_LOCK = new Object(); | ||||
private ConnectionManager() {} | private ConnectionManager() {} | ||||
@@ -0,0 +1,34 @@ | |||||
package com.alibaba.csp.sentinel.cluster.server.connection; | |||||
import org.junit.Test; | |||||
import static org.junit.Assert.*; | |||||
/** | |||||
* @author Eric Zhao | |||||
*/ | |||||
public class ConnectionGroupTest { | |||||
@Test | |||||
public void testAddAndRemoveConnection() { | |||||
String namespace = "test-conn-group"; | |||||
ConnectionGroup group = new ConnectionGroup(namespace); | |||||
assertEquals(0, group.getConnectedCount()); | |||||
String address1 = "12.23.34.45:5566"; | |||||
String address2 = "192.168.0.22:32123"; | |||||
String address3 = "12.23.34.45:5566"; | |||||
group.addConnection(address1); | |||||
assertEquals(1, group.getConnectedCount()); | |||||
group.addConnection(address2); | |||||
assertEquals(2, group.getConnectedCount()); | |||||
group.addConnection(address3); | |||||
assertEquals(2, group.getConnectedCount()); | |||||
group.removeConnection(address1); | |||||
assertEquals(1, group.getConnectedCount()); | |||||
group.removeConnection(address3); | |||||
assertEquals(1, group.getConnectedCount()); | |||||
} | |||||
} |
@@ -0,0 +1,102 @@ | |||||
package com.alibaba.csp.sentinel.cluster.server.connection; | |||||
import java.util.List; | |||||
import java.util.concurrent.CopyOnWriteArrayList; | |||||
import java.util.concurrent.CountDownLatch; | |||||
import org.junit.After; | |||||
import org.junit.Before; | |||||
import org.junit.Test; | |||||
import static org.junit.Assert.*; | |||||
/** | |||||
* @author Eric Zhao | |||||
*/ | |||||
public class ConnectionManagerTest { | |||||
@Before | |||||
public void setUp() { | |||||
ConnectionManager.clear(); | |||||
} | |||||
@After | |||||
public void cleanUp() { | |||||
ConnectionManager.clear(); | |||||
} | |||||
@Test | |||||
public void testAndConnectionAndGetConnectedCount() { | |||||
String namespace = "test-namespace"; | |||||
assertEquals(0, ConnectionManager.getConnectedCount(namespace)); | |||||
// Put one connection. | |||||
ConnectionManager.addConnection(namespace, "12.23.34.45:1997"); | |||||
assertEquals(1, ConnectionManager.getConnectedCount(namespace)); | |||||
// Put duplicate connection. | |||||
ConnectionManager.addConnection(namespace, "12.23.34.45:1997"); | |||||
assertEquals(1, ConnectionManager.getConnectedCount(namespace)); | |||||
// Put another connection. | |||||
ConnectionManager.addConnection(namespace, "12.23.34.49:22123"); | |||||
assertEquals(2, ConnectionManager.getConnectedCount(namespace)); | |||||
} | |||||
@Test(expected = IllegalArgumentException.class) | |||||
public void testGetOrCreateGroupBadNamespace() { | |||||
ConnectionManager.getOrCreateGroup(""); | |||||
} | |||||
@Test | |||||
public void testGetOrCreateGroupMultipleThread() throws Exception { | |||||
final String namespace = "test-namespace"; | |||||
int threadCount = 32; | |||||
final List<ConnectionGroup> groups = new CopyOnWriteArrayList<>(); | |||||
final CountDownLatch latch = new CountDownLatch(threadCount); | |||||
for (int i = 0; i < threadCount; i++) { | |||||
new Thread(new Runnable() { | |||||
@Override | |||||
public void run() { | |||||
groups.add(ConnectionManager.getOrCreateGroup(namespace)); | |||||
latch.countDown(); | |||||
} | |||||
}).start(); | |||||
} | |||||
latch.await(); | |||||
for (int i = 1; i < groups.size(); i++) { | |||||
assertSame(groups.get(i - 1).getNamespace(), groups.get(i).getNamespace()); | |||||
} | |||||
} | |||||
@Test | |||||
public void testRemoveConnection() { | |||||
String namespace = "test-namespace-remove"; | |||||
String address1 = "12.23.34.45:1997"; | |||||
String address2 = "12.23.34.46:1998"; | |||||
String address3 = "12.23.34.47:1999"; | |||||
ConnectionManager.addConnection(namespace, address1); | |||||
ConnectionManager.addConnection(namespace, address2); | |||||
ConnectionManager.addConnection(namespace, address3); | |||||
assertEquals(3, ConnectionManager.getConnectedCount(namespace)); | |||||
ConnectionManager.removeConnection(namespace, address3); | |||||
assertEquals(2, ConnectionManager.getConnectedCount(namespace)); | |||||
assertFalse(ConnectionManager.getOrCreateConnectionGroup(namespace).getConnectionSet().contains( | |||||
new ConnectionDescriptor().setAddress(address3) | |||||
)); | |||||
} | |||||
@Test | |||||
public void testGetOrCreateConnectionGroup() { | |||||
String namespace = "test-namespace"; | |||||
assertNull(ConnectionManager.getConnectionGroup(namespace)); | |||||
ConnectionGroup group1 = ConnectionManager.getOrCreateConnectionGroup(namespace); | |||||
assertNotNull(group1); | |||||
// Put one connection. | |||||
ConnectionManager.addConnection(namespace, "12.23.34.45:1997"); | |||||
ConnectionGroup group2 = ConnectionManager.getOrCreateConnectionGroup(namespace); | |||||
assertSame(group1, group2); | |||||
} | |||||
} |