diff --git a/sentinel-demo/sentinel-demo-zookeeper-datasource/pom.xml b/sentinel-demo/sentinel-demo-zookeeper-datasource/pom.xml
index 2915c1d1..76e821de 100644
--- a/sentinel-demo/sentinel-demo-zookeeper-datasource/pom.xml
+++ b/sentinel-demo/sentinel-demo-zookeeper-datasource/pom.xml
@@ -14,7 +14,6 @@
3.4.13
4.0.1
- 2.12.0
@@ -44,8 +43,8 @@
org.apache.curator
- curator-test
- ${curator-test.version}
+ curator-recipes
+ ${curator.version}
org.apache.zookeeper
diff --git a/sentinel-demo/sentinel-demo-zookeeper-datasource/src/main/java/com/alibaba/csp/sentinel/demo/datasource/zookeeper/ZookeeperConfigSender.java b/sentinel-demo/sentinel-demo-zookeeper-datasource/src/main/java/com/alibaba/csp/sentinel/demo/datasource/zookeeper/ZookeeperConfigSender.java
index 1b635e61..4b1e4223 100644
--- a/sentinel-demo/sentinel-demo-zookeeper-datasource/src/main/java/com/alibaba/csp/sentinel/demo/datasource/zookeeper/ZookeeperConfigSender.java
+++ b/sentinel-demo/sentinel-demo-zookeeper-datasource/src/main/java/com/alibaba/csp/sentinel/demo/datasource/zookeeper/ZookeeperConfigSender.java
@@ -3,7 +3,6 @@ package com.alibaba.csp.sentinel.demo.datasource.zookeeper;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
-import org.apache.curator.test.TestingServer;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;
@@ -18,11 +17,7 @@ public class ZookeeperConfigSender {
private static final int SLEEP_TIME = 1000;
public static void main(String[] args) throws Exception {
-
- // 启动Zookeeper服务
- TestingServer server = new TestingServer(2181);
-
- final String remoteAddress = server.getConnectString();
+ final String remoteAddress = "localhost:2181";
final String groupId = "Sentinel-Demo";
final String dataId = "SYSTEM-CODE-DEMO-FLOW";
final String rule = "[\n"
@@ -44,18 +39,14 @@ public class ZookeeperConfigSender {
zkClient.create().creatingParentContainersIfNeeded().withMode(CreateMode.PERSISTENT).forPath(path, null);
}
zkClient.setData().forPath(path, rule.getBytes());
- // zkClient.delete().forPath(path);
try {
- Thread.sleep(30000L);
+ Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
zkClient.close();
-
- //停止zookeeper服务
- server.stop();
}
private static String getPath(String groupId, String dataId) {
diff --git a/sentinel-extension/sentinel-datasource-zookeeper/pom.xml b/sentinel-extension/sentinel-datasource-zookeeper/pom.xml
index 6c7b2e21..f39ee711 100644
--- a/sentinel-extension/sentinel-datasource-zookeeper/pom.xml
+++ b/sentinel-extension/sentinel-datasource-zookeeper/pom.xml
@@ -15,6 +15,7 @@
3.4.13
4.0.1
+ 2.12.0
@@ -39,5 +40,28 @@
+
+
+ junit
+ junit
+ test
+
+
+ org.apache.curator
+ curator-test
+ ${curator.test.version}
+ test
+
+
+ org.apache.zookeeper
+ zookeeper
+
+
+
+
+ com.alibaba
+ fastjson
+ test
+
\ No newline at end of file
diff --git a/sentinel-extension/sentinel-datasource-zookeeper/src/main/java/com/alibaba/csp/sentinel/datasource/zookeeper/ZookeeperDataSource.java b/sentinel-extension/sentinel-datasource-zookeeper/src/main/java/com/alibaba/csp/sentinel/datasource/zookeeper/ZookeeperDataSource.java
index b8c2663d..09845f56 100644
--- a/sentinel-extension/sentinel-datasource-zookeeper/src/main/java/com/alibaba/csp/sentinel/datasource/zookeeper/ZookeeperDataSource.java
+++ b/sentinel-extension/sentinel-datasource-zookeeper/src/main/java/com/alibaba/csp/sentinel/datasource/zookeeper/ZookeeperDataSource.java
@@ -10,6 +10,7 @@ import com.alibaba.csp.sentinel.datasource.AbstractDataSource;
import com.alibaba.csp.sentinel.datasource.ConfigParser;
import com.alibaba.csp.sentinel.log.RecordLog;
import com.alibaba.csp.sentinel.util.StringUtil;
+
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.ChildData;
@@ -30,40 +31,35 @@ public class ZookeeperDataSource extends AbstractDataSource {
private static final int SLEEP_TIME = 1000;
private final ExecutorService pool = new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS,
- new ArrayBlockingQueue(1), new NamedThreadFactory("sentinel-zookeeper-ds-update"),
- new ThreadPoolExecutor.DiscardOldestPolicy());
+ new ArrayBlockingQueue(1), new NamedThreadFactory("sentinel-zookeeper-ds-update"),
+ new ThreadPoolExecutor.DiscardOldestPolicy());
private NodeCacheListener listener;
- private final String groupId;
- private final String dataId;
private final String path;
private CuratorFramework zkClient = null;
private NodeCache nodeCache = null;
- public ZookeeperDataSource(final String serverAddr, final String groupId, final String dataId,
- ConfigParser parser) {
+ public ZookeeperDataSource(final String serverAddr, final String path, ConfigParser parser) {
super(parser);
- if (StringUtil.isBlank(serverAddr) || StringUtil.isBlank(groupId) || StringUtil.isBlank(dataId)) {
- throw new IllegalArgumentException(String.format("Bad argument: serverAddr=[%s], groupId=[%s], dataId=[%s]",
- serverAddr, groupId, dataId));
+ if (StringUtil.isBlank(serverAddr) || StringUtil.isBlank(path)) {
+ throw new IllegalArgumentException(String.format("Bad argument: serverAddr=[%s], path=[%s]", serverAddr, path));
}
- this.groupId = groupId;
- this.dataId = dataId;
- this.path = getPath(groupId, dataId);
+ this.path = path;
init(serverAddr);
}
- public ZookeeperDataSource(final String serverAddr, final String path, ConfigParser parser) {
+ /**
+ * This constructor is Nacos-style.
+ */
+ public ZookeeperDataSource(final String serverAddr, final String groupId, final String dataId,
+ ConfigParser parser) {
super(parser);
- if (StringUtil.isBlank(serverAddr) || StringUtil.isBlank(path)) {
- throw new IllegalArgumentException(String.format("Bad argument: serverAddr=[%s], path=[%s]",
- serverAddr, path));
+ if (StringUtil.isBlank(serverAddr) || StringUtil.isBlank(groupId) || StringUtil.isBlank(dataId)) {
+ throw new IllegalArgumentException(String.format("Bad argument: serverAddr=[%s], groupId=[%s], dataId=[%s]", serverAddr, groupId, dataId));
}
- this.path = path;
- this.groupId = null;
- this.dataId = null;
+ this.path = getPath(groupId, dataId);
init(serverAddr);
}
@@ -90,7 +86,7 @@ public class ZookeeperDataSource extends AbstractDataSource {
this.listener = new NodeCacheListener() {
@Override
- public void nodeChanged() throws Exception {
+ public void nodeChanged() {
String configInfo = null;
ChildData childData = nodeCache.getCurrentData();
if (null != childData && childData.getData() != null) {
@@ -98,7 +94,7 @@ public class ZookeeperDataSource extends AbstractDataSource {
configInfo = new String(childData.getData());
}
RecordLog.info(String.format("[ZookeeperDataSource] New property value received for (%s, %s): %s",
- serverAddr, path, configInfo));
+ serverAddr, path, configInfo));
T newValue = ZookeeperDataSource.this.parser.parse(configInfo);
// Update the new value to the property.
getProperty().updateValue(newValue);
@@ -146,17 +142,6 @@ public class ZookeeperDataSource extends AbstractDataSource {
}
private String getPath(String groupId, String dataId) {
- String path = "";
- if (groupId.startsWith("/")) {
- path += groupId;
- } else {
- path += "/" + groupId;
- }
- if (dataId.startsWith("/")) {
- path += dataId;
- } else {
- path += "/" + dataId;
- }
- return path;
+ return String.format("/%s/%s", groupId, dataId);
}
}
diff --git a/sentinel-extension/sentinel-datasource-zookeeper/src/test/java/com/alibaba/csp/sentinel/datasource/zookeeper/ZookeeperDataSourceTest.java b/sentinel-extension/sentinel-datasource-zookeeper/src/test/java/com/alibaba/csp/sentinel/datasource/zookeeper/ZookeeperDataSourceTest.java
new file mode 100644
index 00000000..11c51504
--- /dev/null
+++ b/sentinel-extension/sentinel-datasource-zookeeper/src/test/java/com/alibaba/csp/sentinel/datasource/zookeeper/ZookeeperDataSourceTest.java
@@ -0,0 +1,84 @@
+package com.alibaba.csp.sentinel.datasource.zookeeper;
+
+import java.util.Collections;
+import java.util.List;
+
+import com.alibaba.csp.sentinel.datasource.ConfigParser;
+import com.alibaba.csp.sentinel.datasource.DataSource;
+import com.alibaba.csp.sentinel.slots.block.RuleConstant;
+import com.alibaba.csp.sentinel.slots.block.flow.FlowRule;
+import com.alibaba.csp.sentinel.slots.block.flow.FlowRuleManager;
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.TypeReference;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.curator.test.TestingServer;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.data.Stat;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+/**
+ * @author Eric Zhao
+ */
+public class ZookeeperDataSourceTest {
+
+ @Test
+ public void testZooKeeperDataSource() throws Exception {
+ TestingServer server = new TestingServer(21812);
+ server.start();
+
+ final String remoteAddress = server.getConnectString();
+ final String path = "/sentinel-zk-ds-demo/flow-HK";
+
+ DataSource> flowRuleDataSource = new ZookeeperDataSource>(remoteAddress, path,
+ new ConfigParser>() {
+ @Override
+ public List parse(String source) {
+ return JSON.parseObject(source, new TypeReference>() {});
+ }
+ });
+ FlowRuleManager.register2Property(flowRuleDataSource.getProperty());
+
+ CuratorFramework zkClient = CuratorFrameworkFactory.newClient(remoteAddress,
+ new ExponentialBackoffRetry(3, 1000));
+ zkClient.start();
+ Stat stat = zkClient.checkExists().forPath(path);
+ if (stat == null) {
+ zkClient.create().creatingParentContainersIfNeeded().withMode(CreateMode.PERSISTENT).forPath(path, null);
+ }
+
+ final String resourceName = "HK";
+ publishThenTestFor(zkClient, path, resourceName, 10);
+ publishThenTestFor(zkClient, path, resourceName, 15);
+
+ zkClient.close();
+ server.stop();
+ }
+
+ private void publishThenTestFor(CuratorFramework zkClient, String path, String resourceName, long count) throws Exception {
+ FlowRule rule = new FlowRule().setResource(resourceName)
+ .setLimitApp("default")
+ .as(FlowRule.class)
+ .setCount(count)
+ .setGrade(RuleConstant.FLOW_GRADE_QPS);
+ String ruleString = JSON.toJSONString(Collections.singletonList(rule));
+ zkClient.setData().forPath(path, ruleString.getBytes());
+
+ Thread.sleep(5000);
+
+ List rules = FlowRuleManager.getRules();
+ assertTrue(rules != null && !rules.isEmpty());
+ boolean exists = false;
+ for (FlowRule r : rules) {
+ if (resourceName.equals(r.getResource())) {
+ exists = true;
+ assertEquals(count, new Double(r.getCount()).longValue());
+ }
+ }
+ assertTrue(exists);
+ }
+}
\ No newline at end of file