From ec30e897dc3990385c73cc8e51c84b8e693dd56f Mon Sep 17 00:00:00 2001 From: "tao.zhang" Date: Wed, 14 Aug 2019 10:38:57 +0800 Subject: [PATCH] Add Consul DataSource integration module (#979) --- sentinel-extension/pom.xml | 1 + .../sentinel-datasource-consul/pom.xml | 49 +++++ .../datasource/consul/ConsulDataSource.java | 175 ++++++++++++++++++ .../consul/ConsulDataSourceTest.java | 98 ++++++++++ 4 files changed, 323 insertions(+) create mode 100644 sentinel-extension/sentinel-datasource-consul/pom.xml create mode 100644 sentinel-extension/sentinel-datasource-consul/src/main/java/com/alibaba/csp/sentinel/datasource/consul/ConsulDataSource.java create mode 100644 sentinel-extension/sentinel-datasource-consul/src/test/java/com/alibaba/csp/sentinel/datasource/consul/ConsulDataSourceTest.java diff --git a/sentinel-extension/pom.xml b/sentinel-extension/pom.xml index 075323a6..f7ae6b6a 100755 --- a/sentinel-extension/pom.xml +++ b/sentinel-extension/pom.xml @@ -19,6 +19,7 @@ sentinel-datasource-redis sentinel-annotation-aspectj sentinel-parameter-flow-control + sentinel-datasource-consul diff --git a/sentinel-extension/sentinel-datasource-consul/pom.xml b/sentinel-extension/sentinel-datasource-consul/pom.xml new file mode 100644 index 00000000..02730215 --- /dev/null +++ b/sentinel-extension/sentinel-datasource-consul/pom.xml @@ -0,0 +1,49 @@ + + + + sentinel-extension + com.alibaba.csp + 1.7.0-SNAPSHOT + + 4.0.0 + + sentinel-datasource-consul + jar + + + 1.8 + 1.8 + 1.4.2 + 2.0.0 + + + + + com.alibaba.csp + sentinel-datasource-extension + + + com.ecwid.consul + consul-api + ${consul.version} + + + com.pszymczyk.consul + embedded-consul + ${consul.process.version} + test + + + junit + junit + test + + + com.alibaba + fastjson + test + + + \ No newline at end of file diff --git a/sentinel-extension/sentinel-datasource-consul/src/main/java/com/alibaba/csp/sentinel/datasource/consul/ConsulDataSource.java b/sentinel-extension/sentinel-datasource-consul/src/main/java/com/alibaba/csp/sentinel/datasource/consul/ConsulDataSource.java new file mode 100644 index 00000000..132c9538 --- /dev/null +++ b/sentinel-extension/sentinel-datasource-consul/src/main/java/com/alibaba/csp/sentinel/datasource/consul/ConsulDataSource.java @@ -0,0 +1,175 @@ +package com.alibaba.csp.sentinel.datasource.consul; + +import com.alibaba.csp.sentinel.concurrent.NamedThreadFactory; +import com.alibaba.csp.sentinel.datasource.AbstractDataSource; +import com.alibaba.csp.sentinel.datasource.Converter; +import com.alibaba.csp.sentinel.log.RecordLog; +import com.alibaba.csp.sentinel.util.AssertUtil; +import com.ecwid.consul.v1.ConsulClient; +import com.ecwid.consul.v1.QueryParams; +import com.ecwid.consul.v1.Response; +import com.ecwid.consul.v1.kv.model.GetValue; + +import java.util.concurrent.*; + +import static java.util.concurrent.Executors.newSingleThreadExecutor; + +/** + *

+ * A read-only {@code DataSource} with Consul backend. + *

+ *

+ * The data source first initial rules from a Consul during initialization. + * Then it start a watcher to observe the updates of rule date and update to memory. + * + * Consul do not provide http api to watch the update of KV,so it use a long polling and + * blocking queries of the Consul's feature + * to watch and update value easily.When Querying data by index will blocking until change or timeout. If + * the index of the current query is larger than before, it means that the data has changed. + *

+ * + * @author wavesZh + */ +public class ConsulDataSource extends AbstractDataSource { + + private static final int DEFAULT_PORT = 8500; + + private final ConsulClient client; + + private final String address; + + private String ruleKey; + /** + * Record the data's index in Consul to watch the change. + * If lastIndex is smaller than the index of next query, it means that rule data has updated. + */ + private volatile long lastIndex; + /** + * Request of query will hang until timeout(ms) or get updated value. + */ + private int watchTimeout; + + private ConsulKVWatcher watcher = new ConsulKVWatcher(); + + private ExecutorService watcherService = newSingleThreadExecutor( + new NamedThreadFactory("sentinel-consul-ds-update", true)); + + public ConsulDataSource(Converter parser, String host, String ruleKey, int watchTimeout) { + this(host, DEFAULT_PORT, ruleKey, watchTimeout, parser); + } + + /** + * Constructor of {@code ConsulDataSource}. + * @param parser customized data parser, cannot be empty + * @param host consul agent host + * @param port consul agent port + * @param ruleKey data key in Consul + * @param watchTimeout request for querying data will be blocked until new data or timeout. The unit is ms + */ + public ConsulDataSource(String host, int port, String ruleKey, int watchTimeout, Converter parser) { + super(parser); + AssertUtil.notNull(host, "Consul host can not be null"); + AssertUtil.notEmpty(ruleKey, "Consul ruleKey can not be empty"); + this.client = new ConsulClient(host, port); + this.address = host + ":" + port; + this.ruleKey = ruleKey; + this.watchTimeout = watchTimeout; + loadInitialConfig(); + startKVWatcher(); + } + + private void startKVWatcher() { + watcherService.submit(watcher); + } + + private void loadInitialConfig() { + try { + T newValue = loadConfig(); + if (newValue == null) { + RecordLog.warn("[ConsulDataSource] WARN: initial config is null, you may have to check your data source"); + } + getProperty().updateValue(newValue); + } catch (Exception ex) { + RecordLog.warn("[ConsulDataSource] Error when loading initial config", ex); + } + } + + @Override + public String readSource() throws Exception { + if (this.client == null) { + throw new IllegalStateException("Consul has not been initialized or error occurred"); + } + Response response = getValueImmediately(ruleKey); + if (response != null) { + GetValue value = response.getValue(); + lastIndex = response.getConsulIndex(); + return value != null ? value.getDecodedValue() : null; + } + return null; + } + + @Override + public void close() throws Exception { + watcher.stop(); + watcherService.shutdown(); + } + + private class ConsulKVWatcher implements Runnable { + private boolean running = true; + @Override + public void run() { + while (running) { + // It will be blocked until watchTimeout(ms) if rule data has no update. + Response response = getValue(ruleKey, lastIndex, watchTimeout / 1000); + if (response == null) { + try { + TimeUnit.MILLISECONDS.sleep(watchTimeout); + } catch (InterruptedException e) { + } + continue; + } + GetValue getValue = response.getValue(); + Long currentIndex = response.getConsulIndex(); + if (currentIndex == null || currentIndex <= lastIndex) { + continue; + } + lastIndex = currentIndex; + if (getValue != null) { + String newValue = getValue.getDecodedValue(); + getProperty().updateValue(parser.convert(newValue)); + RecordLog.info(String.format("[ConsulDataSource] New property value received for (%s, %s): %s", + address, ruleKey, newValue)); + } + } + } + + private void stop() { + running = false; + } + } + + /** + * get data from Consul immediately. + * + * @param key data key in Consul + */ + public Response getValueImmediately(String key) { + return getValue(key, -1, -1); + } + /** + * get data from Consul. + * + * @param key data key in Consul + * @param index the index of data in Consul. + * @param waitTime time(second) for waiting get updated value. + */ + private Response getValue(String key, long index, long waitTime) { + try { + return client.getKVValue(key, new QueryParams(waitTime, index)); + } catch (Throwable t) { + RecordLog.warn("fail to get value for key: " + key); + } + return null; + } + +} diff --git a/sentinel-extension/sentinel-datasource-consul/src/test/java/com/alibaba/csp/sentinel/datasource/consul/ConsulDataSourceTest.java b/sentinel-extension/sentinel-datasource-consul/src/test/java/com/alibaba/csp/sentinel/datasource/consul/ConsulDataSourceTest.java new file mode 100644 index 00000000..8d2c96ff --- /dev/null +++ b/sentinel-extension/sentinel-datasource-consul/src/test/java/com/alibaba/csp/sentinel/datasource/consul/ConsulDataSourceTest.java @@ -0,0 +1,98 @@ +package com.alibaba.csp.sentinel.datasource.consul; + + +import com.alibaba.csp.sentinel.datasource.Converter; +import com.alibaba.csp.sentinel.datasource.ReadableDataSource; +import com.alibaba.csp.sentinel.slots.block.flow.FlowRule; +import com.alibaba.csp.sentinel.slots.block.flow.FlowRuleManager; +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.TypeReference; +import com.ecwid.consul.v1.ConsulClient; +import com.ecwid.consul.v1.Response; +import com.pszymczyk.consul.ConsulProcess; +import com.pszymczyk.consul.ConsulStarterBuilder; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.util.List; +import java.util.Random; +import java.util.concurrent.TimeUnit; + + +public class ConsulDataSourceTest { + private ConsulProcess consul; + + private ConsulClient client; + + private ReadableDataSource> consulDataSource; + + private String host = "127.0.0.1"; + + private int port; + + private String ruleKey = "sentinel.rules.flow.ruleKey"; + + private int waitTimeout = 60; + + private List rules; + + @Before + public void init() { + this.consul = ConsulStarterBuilder.consulStarter() + .build() + .start(); + this.port = consul.getHttpPort(); + client = new ConsulClient(host, port); + Converter> flowConfigParser = buildFlowConfigParser(); + String flowRulesJson = + "[{\"resource\":\"test\", \"limitApp\":\"default\", \"grade\":1, \"count\":\"0.0\", \"strategy\":0, " + + "\"refResource\":null, " + + + "\"controlBehavior\":0, \"warmUpPeriodSec\":10, \"maxQueueingTimeMs\":500, \"controller\":null}]"; + initConsulRuleData(flowRulesJson); + rules = flowConfigParser.convert(flowRulesJson); + consulDataSource = new ConsulDataSource<>(host, port, ruleKey, waitTimeout, flowConfigParser); + FlowRuleManager.register2Property(consulDataSource.getProperty()); + } + + @After + public void clean() throws Exception { + if (consulDataSource != null) { + consulDataSource.close(); + } + if (consul != null) { + consul.close(); + } + } + + + @Test + public void testConsulDataSourceWhenInit() { + List rules = FlowRuleManager.getRules(); + Assert.assertEquals(this.rules, rules); + } + + + @Test + public void testConsulDataSourceWhenUpdate() throws InterruptedException { + rules.get(0).setMaxQueueingTimeMs(new Random().nextInt()); + client.setKVValue(ruleKey, JSON.toJSONString(rules)); + TimeUnit.MILLISECONDS.sleep(waitTimeout); + List rules = FlowRuleManager.getRules(); + Assert.assertEquals(this.rules, rules); + } + + + private Converter> buildFlowConfigParser() { + return source -> JSON.parseObject(source, new TypeReference>() {}); + } + + private void initConsulRuleData(String flowRulesJson) { + Response response = client.setKVValue(ruleKey, flowRulesJson); + Assert.assertEquals(Boolean.TRUE, response.getValue()); + } + + +} \ No newline at end of file