@@ -91,6 +91,11 @@ | |||||
<artifactId>sentinel-datasource-nacos</artifactId> | <artifactId>sentinel-datasource-nacos</artifactId> | ||||
<version>${project.version}</version> | <version>${project.version}</version> | ||||
</dependency> | </dependency> | ||||
<dependency> | |||||
<groupId>com.alibaba.csp</groupId> | |||||
<artifactId>sentinel-datasource-zookeeper</artifactId> | |||||
<version>${project.version}</version> | |||||
</dependency> | |||||
<dependency> | <dependency> | ||||
<groupId>com.alibaba.csp</groupId> | <groupId>com.alibaba.csp</groupId> | ||||
<artifactId>sentinel-adapter</artifactId> | <artifactId>sentinel-adapter</artifactId> | ||||
@@ -18,6 +18,7 @@ | |||||
<module>sentinel-demo-rocketmq</module> | <module>sentinel-demo-rocketmq</module> | ||||
<module>sentinel-demo-dubbo</module> | <module>sentinel-demo-dubbo</module> | ||||
<module>sentinel-demo-nacos-datasource</module> | <module>sentinel-demo-nacos-datasource</module> | ||||
<module>sentinel-demo-zookeeper-datasource</module> | |||||
</modules> | </modules> | ||||
<dependencies> | <dependencies> | ||||
@@ -0,0 +1,73 @@ | |||||
<?xml version="1.0" encoding="UTF-8"?> | |||||
<project xmlns="http://maven.apache.org/POM/4.0.0" | |||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" | |||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> | |||||
<parent> | |||||
<artifactId>sentinel-demo</artifactId> | |||||
<groupId>com.alibaba.csp</groupId> | |||||
<version>0.1.1-SNAPSHOT</version> | |||||
</parent> | |||||
<modelVersion>4.0.0</modelVersion> | |||||
<artifactId>sentinel-demo-zookeeper-datasource</artifactId> | |||||
<properties> | |||||
<zookeeper.version>3.4.13</zookeeper.version> | |||||
<curator.version>4.0.1</curator.version> | |||||
<curator-test.version>2.12.0</curator-test.version> | |||||
</properties> | |||||
<dependencies> | |||||
<dependency> | |||||
<groupId>com.alibaba.csp</groupId> | |||||
<artifactId>sentinel-core</artifactId> | |||||
</dependency> | |||||
<dependency> | |||||
<groupId>com.alibaba.csp</groupId> | |||||
<artifactId>sentinel-datasource-extension</artifactId> | |||||
</dependency> | |||||
<dependency> | |||||
<groupId>com.alibaba.csp</groupId> | |||||
<artifactId>sentinel-datasource-zookeeper</artifactId> | |||||
</dependency> | |||||
<dependency> | |||||
<groupId>com.alibaba</groupId> | |||||
<artifactId>fastjson</artifactId> | |||||
</dependency> | |||||
<dependency> | |||||
<groupId>org.apache.zookeeper</groupId> | |||||
<artifactId>zookeeper</artifactId> | |||||
<version>${zookeeper.version}</version> | |||||
</dependency> | |||||
<dependency> | |||||
<groupId>org.apache.curator</groupId> | |||||
<artifactId>curator-test</artifactId> | |||||
<version>${curator-test.version}</version> | |||||
<exclusions> | |||||
<exclusion> | |||||
<groupId>org.apache.zookeeper</groupId> | |||||
<artifactId>zookeeper</artifactId> | |||||
</exclusion> | |||||
</exclusions> | |||||
</dependency> | |||||
</dependencies> | |||||
<build> | |||||
<plugins> | |||||
<plugin> | |||||
<groupId>org.apache.maven.plugins</groupId> | |||||
<artifactId>maven-compiler-plugin</artifactId> | |||||
<version>${maven.compiler.version}</version> | |||||
<configuration> | |||||
<source>1.8</source> | |||||
<target>1.8</target> | |||||
<encoding>${java.encoding}</encoding> | |||||
</configuration> | |||||
</plugin> | |||||
</plugins> | |||||
</build> | |||||
</project> |
@@ -0,0 +1,75 @@ | |||||
package com.alibaba.csp.sentinel.demo.datasource.zookeeper; | |||||
import org.apache.curator.framework.CuratorFramework; | |||||
import org.apache.curator.framework.CuratorFrameworkFactory; | |||||
import org.apache.curator.retry.ExponentialBackoffRetry; | |||||
import org.apache.curator.test.TestingServer; | |||||
import org.apache.zookeeper.CreateMode; | |||||
import org.apache.zookeeper.data.Stat; | |||||
/** | |||||
* Zookeeper config sender for demo | |||||
* | |||||
* @author guonanjun | |||||
*/ | |||||
public class ZookeeperConfigSender { | |||||
private static final int RETRY_TIMES = 3; | |||||
private static final int SLEEP_TIME = 1000; | |||||
public static void main(String[] args) throws Exception { | |||||
// 启动Zookeeper服务 | |||||
TestingServer server = new TestingServer(2181); | |||||
final String remoteAddress = server.getConnectString(); | |||||
final String groupId = "Sentinel-Demo"; | |||||
final String dataId = "SYSTEM-CODE-DEMO-FLOW"; | |||||
final String rule = "[\n" | |||||
+ " {\n" | |||||
+ " \"resource\": \"TestResource\",\n" | |||||
+ " \"controlBehavior\": 0,\n" | |||||
+ " \"count\": 10.0,\n" | |||||
+ " \"grade\": 1,\n" | |||||
+ " \"limitApp\": \"default\",\n" | |||||
+ " \"strategy\": 0\n" | |||||
+ " }\n" | |||||
+ "]"; | |||||
CuratorFramework zkClient = CuratorFrameworkFactory.newClient(remoteAddress, new ExponentialBackoffRetry(SLEEP_TIME, RETRY_TIMES)); | |||||
zkClient.start(); | |||||
String path = getPath(groupId, dataId); | |||||
Stat stat = zkClient.checkExists().forPath(path); | |||||
if (stat == null) { | |||||
zkClient.create().creatingParentContainersIfNeeded().withMode(CreateMode.PERSISTENT).forPath(path, null); | |||||
} | |||||
zkClient.setData().forPath(path, rule.getBytes()); | |||||
// zkClient.delete().forPath(path); | |||||
try { | |||||
Thread.sleep(30000L); | |||||
} catch (InterruptedException e) { | |||||
e.printStackTrace(); | |||||
} | |||||
zkClient.close(); | |||||
//停止zookeeper服务 | |||||
server.stop(); | |||||
} | |||||
private static String getPath(String groupId, String dataId) { | |||||
String path = ""; | |||||
if (groupId.startsWith("/")) { | |||||
path += groupId; | |||||
} else { | |||||
path += "/" + groupId; | |||||
} | |||||
if (dataId.startsWith("/")) { | |||||
path += dataId; | |||||
} else { | |||||
path += "/" + dataId; | |||||
} | |||||
return path; | |||||
} | |||||
} |
@@ -0,0 +1,65 @@ | |||||
package com.alibaba.csp.sentinel.demo.datasource.zookeeper; | |||||
import java.util.List; | |||||
import com.alibaba.csp.sentinel.datasource.DataSource; | |||||
import com.alibaba.csp.sentinel.datasource.zookeeper.ZookeeperDataSource; | |||||
import com.alibaba.csp.sentinel.slots.block.flow.FlowRule; | |||||
import com.alibaba.csp.sentinel.slots.block.flow.FlowRuleManager; | |||||
import com.alibaba.fastjson.JSON; | |||||
import com.alibaba.fastjson.TypeReference; | |||||
/** | |||||
* Zookeeper DataSource Demo | |||||
* | |||||
* @author guonanjun | |||||
*/ | |||||
public class ZookeeperDataSourceDemo { | |||||
public static void main(String[] args) { | |||||
// 使用zookeeper的场景 | |||||
loadRules(); | |||||
// 方便扩展的场景 | |||||
//loadRules2(); | |||||
} | |||||
private static void loadRules() { | |||||
final String remoteAddress = "127.0.0.1:2181"; | |||||
final String path = "/Sentinel-Demo/SYSTEM-CODE-DEMO-FLOW"; | |||||
DataSource<String, List<FlowRule>> flowRuleDataSource = new ZookeeperDataSource<>(remoteAddress, path, | |||||
source -> JSON.parseObject(source, new TypeReference<List<FlowRule>>() {})); | |||||
FlowRuleManager.register2Property(flowRuleDataSource.getProperty()); | |||||
} | |||||
private static void loadRules2() { | |||||
final String remoteAddress = "127.0.0.1:2181"; | |||||
// 引入groupId和dataId的概念,是为了方便和Nacos进行切换 | |||||
final String groupId = "Sentinel-Demo"; | |||||
final String flowDataId = "SYSTEM-CODE-DEMO-FLOW"; | |||||
// final String degradeDataId = "SYSTEM-CODE-DEMO-DEGRADE"; | |||||
// final String systemDataId = "SYSTEM-CODE-DEMO-SYSTEM"; | |||||
// 规则会持久化到zk的/groupId/flowDataId节点 | |||||
// groupId和和flowDataId可以用/开头也可以不用 | |||||
// 建议不用以/开头,目的是为了如果从Zookeeper切换到Nacos的话,只需要改数据源类名就可以 | |||||
DataSource<String, List<FlowRule>> flowRuleDataSource = new ZookeeperDataSource<>(remoteAddress, groupId, flowDataId, | |||||
source -> JSON.parseObject(source, new TypeReference<List<FlowRule>>() {})); | |||||
FlowRuleManager.register2Property(flowRuleDataSource.getProperty()); | |||||
// DataSource<String, List<DegradeRule>> degradeRuleDataSource = new ZookeeperDataSource<>(remoteAddress, groupId, degradeDataId, | |||||
// source -> JSON.parseObject(source, new TypeReference<List<DegradeRule>>() {})); | |||||
// DegradeRuleManager.register2Property(degradeRuleDataSource.getProperty()); | |||||
// | |||||
// DataSource<String, List<SystemRule>> systemRuleDataSource = new ZookeeperDataSource<>(remoteAddress, groupId, systemDataId, | |||||
// source -> JSON.parseObject(source, new TypeReference<List<SystemRule>>() {})); | |||||
// SystemRuleManager.register2Property(systemRuleDataSource.getProperty()); | |||||
} | |||||
} |
@@ -14,6 +14,7 @@ | |||||
<modules> | <modules> | ||||
<module>sentinel-datasource-extension</module> | <module>sentinel-datasource-extension</module> | ||||
<module>sentinel-datasource-nacos</module> | <module>sentinel-datasource-nacos</module> | ||||
<module>sentinel-datasource-zookeeper</module> | |||||
</modules> | </modules> | ||||
</project> | </project> |
@@ -0,0 +1,43 @@ | |||||
<?xml version="1.0" encoding="UTF-8"?> | |||||
<project xmlns="http://maven.apache.org/POM/4.0.0" | |||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" | |||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> | |||||
<parent> | |||||
<artifactId>sentinel-extension</artifactId> | |||||
<groupId>com.alibaba.csp</groupId> | |||||
<version>0.1.1-SNAPSHOT</version> | |||||
</parent> | |||||
<modelVersion>4.0.0</modelVersion> | |||||
<artifactId>sentinel-datasource-zookeeper</artifactId> | |||||
<packaging>jar</packaging> | |||||
<properties> | |||||
<zookeeper.version>3.4.13</zookeeper.version> | |||||
<curator.version>4.0.1</curator.version> | |||||
</properties> | |||||
<dependencies> | |||||
<dependency> | |||||
<groupId>com.alibaba.csp</groupId> | |||||
<artifactId>sentinel-datasource-extension</artifactId> | |||||
</dependency> | |||||
<dependency> | |||||
<groupId>org.apache.zookeeper</groupId> | |||||
<artifactId>zookeeper</artifactId> | |||||
<version>${zookeeper.version}</version> | |||||
</dependency> | |||||
<dependency> | |||||
<groupId>org.apache.curator</groupId> | |||||
<artifactId>curator-recipes</artifactId> | |||||
<version>${curator.version}</version> | |||||
<exclusions> | |||||
<exclusion> | |||||
<groupId>org.apache.zookeeper</groupId> | |||||
<artifactId>zookeeper</artifactId> | |||||
</exclusion> | |||||
</exclusions> | |||||
</dependency> | |||||
</dependencies> | |||||
</project> |
@@ -0,0 +1,162 @@ | |||||
package com.alibaba.csp.sentinel.datasource.zookeeper; | |||||
import java.util.concurrent.ArrayBlockingQueue; | |||||
import java.util.concurrent.ExecutorService; | |||||
import java.util.concurrent.ThreadPoolExecutor; | |||||
import java.util.concurrent.TimeUnit; | |||||
import com.alibaba.csp.sentinel.concurrent.NamedThreadFactory; | |||||
import com.alibaba.csp.sentinel.datasource.AbstractDataSource; | |||||
import com.alibaba.csp.sentinel.datasource.ConfigParser; | |||||
import com.alibaba.csp.sentinel.log.RecordLog; | |||||
import com.alibaba.csp.sentinel.util.StringUtil; | |||||
import org.apache.curator.framework.CuratorFramework; | |||||
import org.apache.curator.framework.CuratorFrameworkFactory; | |||||
import org.apache.curator.framework.recipes.cache.ChildData; | |||||
import org.apache.curator.framework.recipes.cache.NodeCache; | |||||
import org.apache.curator.framework.recipes.cache.NodeCacheListener; | |||||
import org.apache.curator.retry.ExponentialBackoffRetry; | |||||
import org.apache.zookeeper.CreateMode; | |||||
import org.apache.zookeeper.data.Stat; | |||||
/** | |||||
* Zookeeper DataSource | |||||
* | |||||
* @author guonanjun | |||||
*/ | |||||
public class ZookeeperDataSource<T> extends AbstractDataSource<String, T> { | |||||
private static final int RETRY_TIMES = 3; | |||||
private static final int SLEEP_TIME = 1000; | |||||
private final ExecutorService pool = new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS, | |||||
new ArrayBlockingQueue<Runnable>(1), new NamedThreadFactory("sentinel-zookeeper-ds-update"), | |||||
new ThreadPoolExecutor.DiscardOldestPolicy()); | |||||
private NodeCacheListener listener; | |||||
private final String groupId; | |||||
private final String dataId; | |||||
private final String path; | |||||
private CuratorFramework zkClient = null; | |||||
private NodeCache nodeCache = null; | |||||
public ZookeeperDataSource(final String serverAddr, final String groupId, final String dataId, | |||||
ConfigParser<String, T> parser) { | |||||
super(parser); | |||||
if (StringUtil.isBlank(serverAddr) || StringUtil.isBlank(groupId) || StringUtil.isBlank(dataId)) { | |||||
throw new IllegalArgumentException(String.format("Bad argument: serverAddr=[%s], groupId=[%s], dataId=[%s]", | |||||
serverAddr, groupId, dataId)); | |||||
} | |||||
this.groupId = groupId; | |||||
this.dataId = dataId; | |||||
this.path = getPath(groupId, dataId); | |||||
init(serverAddr); | |||||
} | |||||
public ZookeeperDataSource(final String serverAddr, final String path, ConfigParser<String, T> parser) { | |||||
super(parser); | |||||
if (StringUtil.isBlank(serverAddr) || StringUtil.isBlank(path)) { | |||||
throw new IllegalArgumentException(String.format("Bad argument: serverAddr=[%s], path=[%s]", | |||||
serverAddr, path)); | |||||
} | |||||
this.path = path; | |||||
this.groupId = null; | |||||
this.dataId = null; | |||||
init(serverAddr); | |||||
} | |||||
private void init(final String serverAddr) { | |||||
initZookeeperListener(serverAddr); | |||||
loadInitialConfig(); | |||||
} | |||||
private void loadInitialConfig() { | |||||
try { | |||||
T newValue = loadConfig(); | |||||
if (newValue == null) { | |||||
RecordLog.info("[ZookeeperDataSource] WARN: initial config is null, you may have to check your data source"); | |||||
} | |||||
getProperty().updateValue(newValue); | |||||
} catch (Exception ex) { | |||||
RecordLog.info("[ZookeeperDataSource] Error when loading initial config", ex); | |||||
} | |||||
} | |||||
private void initZookeeperListener(final String serverAddr) { | |||||
try { | |||||
this.listener = new NodeCacheListener() { | |||||
@Override | |||||
public void nodeChanged() throws Exception { | |||||
String configInfo = null; | |||||
ChildData childData = nodeCache.getCurrentData(); | |||||
if (null != childData && childData.getData() != null) { | |||||
configInfo = new String(childData.getData()); | |||||
} | |||||
RecordLog.info(String.format("[ZookeeperDataSource] New property value received for (%s, %s): %s", | |||||
serverAddr, path, configInfo)); | |||||
T newValue = ZookeeperDataSource.this.parser.parse(configInfo); | |||||
// Update the new value to the property. | |||||
getProperty().updateValue(newValue); | |||||
} | |||||
}; | |||||
this.zkClient = CuratorFrameworkFactory.newClient(serverAddr, new ExponentialBackoffRetry(SLEEP_TIME, RETRY_TIMES)); | |||||
this.zkClient.start(); | |||||
Stat stat = this.zkClient.checkExists().forPath(this.path); | |||||
if (stat == null) { | |||||
this.zkClient.create().creatingParentContainersIfNeeded().withMode(CreateMode.PERSISTENT).forPath(this.path, null); | |||||
} | |||||
this.nodeCache = new NodeCache(this.zkClient, this.path); | |||||
this.nodeCache.getListenable().addListener(this.listener, this.pool); | |||||
this.nodeCache.start(); | |||||
} catch (Exception e) { | |||||
RecordLog.info("[ZookeeperDataSource] Error occurred when initializing Zookeeper data source", e); | |||||
e.printStackTrace(); | |||||
} | |||||
} | |||||
@Override | |||||
public String readSource() throws Exception { | |||||
if (this.zkClient == null) { | |||||
throw new IllegalStateException("Zookeeper has not been initialized or error occurred"); | |||||
} | |||||
byte[] data = this.zkClient.getData().forPath(this.path); | |||||
if (data != null) { | |||||
return new String(data); | |||||
} | |||||
return null; | |||||
} | |||||
@Override | |||||
public void close() throws Exception { | |||||
if (this.nodeCache != null) { | |||||
this.nodeCache.getListenable().removeListener(listener); | |||||
this.nodeCache.close(); | |||||
} | |||||
if (this.zkClient != null) { | |||||
this.zkClient.close(); | |||||
} | |||||
pool.shutdown(); | |||||
} | |||||
private String getPath(String groupId, String dataId) { | |||||
String path = ""; | |||||
if (groupId.startsWith("/")) { | |||||
path += groupId; | |||||
} else { | |||||
path += "/" + groupId; | |||||
} | |||||
if (dataId.startsWith("/")) { | |||||
path += dataId; | |||||
} else { | |||||
path += "/" + dataId; | |||||
} | |||||
return path; | |||||
} | |||||
} |