Browse Source

Add Consul DataSource integration module (#979)

master
tao.zhang Eric Zhao 5 years ago
parent
commit
ec30e897dc
4 changed files with 323 additions and 0 deletions
  1. +1
    -0
      sentinel-extension/pom.xml
  2. +49
    -0
      sentinel-extension/sentinel-datasource-consul/pom.xml
  3. +175
    -0
      sentinel-extension/sentinel-datasource-consul/src/main/java/com/alibaba/csp/sentinel/datasource/consul/ConsulDataSource.java
  4. +98
    -0
      sentinel-extension/sentinel-datasource-consul/src/test/java/com/alibaba/csp/sentinel/datasource/consul/ConsulDataSourceTest.java

+ 1
- 0
sentinel-extension/pom.xml View File

@@ -19,6 +19,7 @@
<module>sentinel-datasource-redis</module>
<module>sentinel-annotation-aspectj</module>
<module>sentinel-parameter-flow-control</module>
<module>sentinel-datasource-consul</module>
</modules>

</project>

+ 49
- 0
sentinel-extension/sentinel-datasource-consul/pom.xml View File

@@ -0,0 +1,49 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>sentinel-extension</artifactId>
<groupId>com.alibaba.csp</groupId>
<version>1.7.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>sentinel-datasource-consul</artifactId>
<packaging>jar</packaging>

<properties>
<java.source.version>1.8</java.source.version>
<java.target.version>1.8</java.target.version>
<consul.version>1.4.2</consul.version>
<consul.process.version>2.0.0</consul.process.version>
</properties>

<dependencies>
<dependency>
<groupId>com.alibaba.csp</groupId>
<artifactId>sentinel-datasource-extension</artifactId>
</dependency>
<dependency>
<groupId>com.ecwid.consul</groupId>
<artifactId>consul-api</artifactId>
<version>${consul.version}</version>
</dependency>
<dependency>
<groupId>com.pszymczyk.consul</groupId>
<artifactId>embedded-consul</artifactId>
<version>${consul.process.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>

+ 175
- 0
sentinel-extension/sentinel-datasource-consul/src/main/java/com/alibaba/csp/sentinel/datasource/consul/ConsulDataSource.java View File

@@ -0,0 +1,175 @@
package com.alibaba.csp.sentinel.datasource.consul;

import com.alibaba.csp.sentinel.concurrent.NamedThreadFactory;
import com.alibaba.csp.sentinel.datasource.AbstractDataSource;
import com.alibaba.csp.sentinel.datasource.Converter;
import com.alibaba.csp.sentinel.log.RecordLog;
import com.alibaba.csp.sentinel.util.AssertUtil;
import com.ecwid.consul.v1.ConsulClient;
import com.ecwid.consul.v1.QueryParams;
import com.ecwid.consul.v1.Response;
import com.ecwid.consul.v1.kv.model.GetValue;

import java.util.concurrent.*;

import static java.util.concurrent.Executors.newSingleThreadExecutor;

/**
* <p>
* A read-only {@code DataSource} with Consul backend.
* <p>
* <p>
* The data source first initial rules from a Consul during initialization.
* Then it start a watcher to observe the updates of rule date and update to memory.
*
* Consul do not provide http api to watch the update of KV,so it use a long polling and
* <a href="https://www.consul.io/api/features/blocking.html">blocking queries</a> of the Consul's feature
* to watch and update value easily.When Querying data by index will blocking until change or timeout. If
* the index of the current query is larger than before, it means that the data has changed.
* </p>
*
* @author wavesZh
*/
public class ConsulDataSource<T> extends AbstractDataSource<String, T> {

private static final int DEFAULT_PORT = 8500;

private final ConsulClient client;

private final String address;

private String ruleKey;
/**
* Record the data's index in Consul to watch the change.
* If lastIndex is smaller than the index of next query, it means that rule data has updated.
*/
private volatile long lastIndex;
/**
* Request of query will hang until timeout(ms) or get updated value.
*/
private int watchTimeout;

private ConsulKVWatcher watcher = new ConsulKVWatcher();

private ExecutorService watcherService = newSingleThreadExecutor(
new NamedThreadFactory("sentinel-consul-ds-update", true));

public ConsulDataSource(Converter<String, T> parser, String host, String ruleKey, int watchTimeout) {
this(host, DEFAULT_PORT, ruleKey, watchTimeout, parser);
}

/**
* Constructor of {@code ConsulDataSource}.
* @param parser customized data parser, cannot be empty
* @param host consul agent host
* @param port consul agent port
* @param ruleKey data key in Consul
* @param watchTimeout request for querying data will be blocked until new data or timeout. The unit is ms
*/
public ConsulDataSource(String host, int port, String ruleKey, int watchTimeout, Converter<String, T> parser) {
super(parser);
AssertUtil.notNull(host, "Consul host can not be null");
AssertUtil.notEmpty(ruleKey, "Consul ruleKey can not be empty");
this.client = new ConsulClient(host, port);
this.address = host + ":" + port;
this.ruleKey = ruleKey;
this.watchTimeout = watchTimeout;
loadInitialConfig();
startKVWatcher();
}

private void startKVWatcher() {
watcherService.submit(watcher);
}

private void loadInitialConfig() {
try {
T newValue = loadConfig();
if (newValue == null) {
RecordLog.warn("[ConsulDataSource] WARN: initial config is null, you may have to check your data source");
}
getProperty().updateValue(newValue);
} catch (Exception ex) {
RecordLog.warn("[ConsulDataSource] Error when loading initial config", ex);
}
}

@Override
public String readSource() throws Exception {
if (this.client == null) {
throw new IllegalStateException("Consul has not been initialized or error occurred");
}
Response<GetValue> response = getValueImmediately(ruleKey);
if (response != null) {
GetValue value = response.getValue();
lastIndex = response.getConsulIndex();
return value != null ? value.getDecodedValue() : null;
}
return null;
}

@Override
public void close() throws Exception {
watcher.stop();
watcherService.shutdown();
}

private class ConsulKVWatcher implements Runnable {
private boolean running = true;
@Override
public void run() {
while (running) {
// It will be blocked until watchTimeout(ms) if rule data has no update.
Response<GetValue> response = getValue(ruleKey, lastIndex, watchTimeout / 1000);
if (response == null) {
try {
TimeUnit.MILLISECONDS.sleep(watchTimeout);
} catch (InterruptedException e) {
}
continue;
}
GetValue getValue = response.getValue();
Long currentIndex = response.getConsulIndex();
if (currentIndex == null || currentIndex <= lastIndex) {
continue;
}
lastIndex = currentIndex;
if (getValue != null) {
String newValue = getValue.getDecodedValue();
getProperty().updateValue(parser.convert(newValue));
RecordLog.info(String.format("[ConsulDataSource] New property value received for (%s, %s): %s",
address, ruleKey, newValue));
}
}
}

private void stop() {
running = false;
}
}

/**
* get data from Consul immediately.
*
* @param key data key in Consul
*/
public Response<GetValue> getValueImmediately(String key) {
return getValue(key, -1, -1);
}
/**
* get data from Consul.
*
* @param key data key in Consul
* @param index the index of data in Consul.
* @param waitTime time(second) for waiting get updated value.
*/
private Response<GetValue> getValue(String key, long index, long waitTime) {
try {
return client.getKVValue(key, new QueryParams(waitTime, index));
} catch (Throwable t) {
RecordLog.warn("fail to get value for key: " + key);
}
return null;
}

}

+ 98
- 0
sentinel-extension/sentinel-datasource-consul/src/test/java/com/alibaba/csp/sentinel/datasource/consul/ConsulDataSourceTest.java View File

@@ -0,0 +1,98 @@
package com.alibaba.csp.sentinel.datasource.consul;


import com.alibaba.csp.sentinel.datasource.Converter;
import com.alibaba.csp.sentinel.datasource.ReadableDataSource;
import com.alibaba.csp.sentinel.slots.block.flow.FlowRule;
import com.alibaba.csp.sentinel.slots.block.flow.FlowRuleManager;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.TypeReference;
import com.ecwid.consul.v1.ConsulClient;
import com.ecwid.consul.v1.Response;
import com.pszymczyk.consul.ConsulProcess;
import com.pszymczyk.consul.ConsulStarterBuilder;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

import java.util.List;
import java.util.Random;
import java.util.concurrent.TimeUnit;


public class ConsulDataSourceTest {
private ConsulProcess consul;

private ConsulClient client;

private ReadableDataSource<String, List<FlowRule>> consulDataSource;

private String host = "127.0.0.1";

private int port;

private String ruleKey = "sentinel.rules.flow.ruleKey";

private int waitTimeout = 60;

private List<FlowRule> rules;

@Before
public void init() {
this.consul = ConsulStarterBuilder.consulStarter()
.build()
.start();
this.port = consul.getHttpPort();
client = new ConsulClient(host, port);
Converter<String, List<FlowRule>> flowConfigParser = buildFlowConfigParser();
String flowRulesJson =
"[{\"resource\":\"test\", \"limitApp\":\"default\", \"grade\":1, \"count\":\"0.0\", \"strategy\":0, "
+ "\"refResource\":null, "
+
"\"controlBehavior\":0, \"warmUpPeriodSec\":10, \"maxQueueingTimeMs\":500, \"controller\":null}]";
initConsulRuleData(flowRulesJson);
rules = flowConfigParser.convert(flowRulesJson);
consulDataSource = new ConsulDataSource<>(host, port, ruleKey, waitTimeout, flowConfigParser);
FlowRuleManager.register2Property(consulDataSource.getProperty());
}

@After
public void clean() throws Exception {
if (consulDataSource != null) {
consulDataSource.close();
}
if (consul != null) {
consul.close();
}
}


@Test
public void testConsulDataSourceWhenInit() {
List<FlowRule> rules = FlowRuleManager.getRules();
Assert.assertEquals(this.rules, rules);
}


@Test
public void testConsulDataSourceWhenUpdate() throws InterruptedException {
rules.get(0).setMaxQueueingTimeMs(new Random().nextInt());
client.setKVValue(ruleKey, JSON.toJSONString(rules));
TimeUnit.MILLISECONDS.sleep(waitTimeout);
List<FlowRule> rules = FlowRuleManager.getRules();
Assert.assertEquals(this.rules, rules);
}


private Converter<String, List<FlowRule>> buildFlowConfigParser() {
return source -> JSON.parseObject(source, new TypeReference<List<FlowRule>>() {});
}

private void initConsulRuleData(String flowRulesJson) {
Response<Boolean> response = client.setKVValue(ruleKey, flowRulesJson);
Assert.assertEquals(Boolean.TRUE, response.getValue());
}


}

Loading…
Cancel
Save