- Spilt DataSource into two types: ReadableDataSource and WritableDataSource - The AbstractDataSource now is read-only - Refactor the file data source for writable implementation - Rename: ConfigParser -> Converter (represents both encoder `T -> S` and decoder `S -> T`) - Some other refinement Signed-off-by: Eric Zhao <sczyh16@gmail.com>master
@@ -1,9 +1,9 @@ | |||||
package com.alibaba.csp.sentinel.datasource.apollo; | package com.alibaba.csp.sentinel.datasource.apollo; | ||||
import com.alibaba.csp.sentinel.datasource.AbstractDataSource; | import com.alibaba.csp.sentinel.datasource.AbstractDataSource; | ||||
import com.alibaba.csp.sentinel.datasource.ConfigParser; | |||||
import com.alibaba.csp.sentinel.datasource.DataSource; | |||||
import com.alibaba.csp.sentinel.datasource.Converter; | |||||
import com.alibaba.csp.sentinel.log.RecordLog; | import com.alibaba.csp.sentinel.log.RecordLog; | ||||
import com.ctrip.framework.apollo.Config; | import com.ctrip.framework.apollo.Config; | ||||
import com.ctrip.framework.apollo.ConfigChangeListener; | import com.ctrip.framework.apollo.ConfigChangeListener; | ||||
import com.ctrip.framework.apollo.ConfigService; | import com.ctrip.framework.apollo.ConfigService; | ||||
@@ -14,7 +14,8 @@ import com.google.common.base.Strings; | |||||
import com.google.common.collect.Sets; | import com.google.common.collect.Sets; | ||||
/** | /** | ||||
* A {@link DataSource} with <a href="http://github.com/ctripcorp/apollo">Apollo</a> as its configuration source. | |||||
* A read-only {@code DataSource} with <a href="http://github.com/ctripcorp/apollo">Apollo</a> as its configuration | |||||
* source. | |||||
* <br /> | * <br /> | ||||
* When the rule is changed in Apollo, it will take effect in real time. | * When the rule is changed in Apollo, it will take effect in real time. | ||||
* | * | ||||
@@ -22,73 +23,75 @@ import com.google.common.collect.Sets; | |||||
*/ | */ | ||||
public class ApolloDataSource<T> extends AbstractDataSource<String, T> { | public class ApolloDataSource<T> extends AbstractDataSource<String, T> { | ||||
private final Config config; | |||||
private final String flowRulesKey; | |||||
private final String defaultFlowRuleValue; | |||||
/** | |||||
* Constructs the Apollo data source | |||||
* | |||||
* @param namespaceName the namespace name in Apollo, should not be null or empty | |||||
* @param flowRulesKey the flow rules key in the namespace, should not be null or empty | |||||
* @param defaultFlowRuleValue the default flow rules value when the flow rules key is not found or any error occurred | |||||
* @param parser the parser to transform string configuration to actual flow rules | |||||
*/ | |||||
public ApolloDataSource(String namespaceName, String flowRulesKey, String defaultFlowRuleValue, | |||||
ConfigParser<String, T> parser) { | |||||
super(parser); | |||||
private final Config config; | |||||
private final String flowRulesKey; | |||||
private final String defaultFlowRuleValue; | |||||
Preconditions.checkArgument(!Strings.isNullOrEmpty(namespaceName), "Namespace name could not be null or empty"); | |||||
Preconditions.checkArgument(!Strings.isNullOrEmpty(flowRulesKey), "FlowRuleKey could not be null or empty!"); | |||||
/** | |||||
* Constructs the Apollo data source | |||||
* | |||||
* @param namespaceName the namespace name in Apollo, should not be null or empty | |||||
* @param flowRulesKey the flow rules key in the namespace, should not be null or empty | |||||
* @param defaultFlowRuleValue the default flow rules value when the flow rules key is not found or any error | |||||
* occurred | |||||
* @param parser the parser to transform string configuration to actual flow rules | |||||
*/ | |||||
public ApolloDataSource(String namespaceName, String flowRulesKey, String defaultFlowRuleValue, | |||||
Converter<String, T> parser) { | |||||
super(parser); | |||||
this.flowRulesKey = flowRulesKey; | |||||
this.defaultFlowRuleValue = defaultFlowRuleValue; | |||||
Preconditions.checkArgument(!Strings.isNullOrEmpty(namespaceName), "Namespace name could not be null or empty"); | |||||
Preconditions.checkArgument(!Strings.isNullOrEmpty(flowRulesKey), "FlowRuleKey could not be null or empty!"); | |||||
this.config = ConfigService.getConfig(namespaceName); | |||||
this.flowRulesKey = flowRulesKey; | |||||
this.defaultFlowRuleValue = defaultFlowRuleValue; | |||||
initialize(); | |||||
this.config = ConfigService.getConfig(namespaceName); | |||||
RecordLog.info(String.format("Initialized rule for namespace: %s, flow rules key: %s", namespaceName, flowRulesKey)); | |||||
} | |||||
initialize(); | |||||
private void initialize() { | |||||
initializeConfigChangeListener(); | |||||
loadAndUpdateRules(); | |||||
} | |||||
RecordLog.info(String.format("Initialized rule for namespace: %s, flow rules key: %s", | |||||
namespaceName, flowRulesKey)); | |||||
} | |||||
private void loadAndUpdateRules() { | |||||
try { | |||||
T newValue = loadConfig(); | |||||
if (newValue == null) { | |||||
RecordLog.warn("[ApolloDataSource] WARN: rule config is null, you may have to check your data source"); | |||||
} | |||||
getProperty().updateValue(newValue); | |||||
} catch (Throwable ex) { | |||||
RecordLog.warn("[ApolloDataSource] Error when loading rule config", ex); | |||||
private void initialize() { | |||||
initializeConfigChangeListener(); | |||||
loadAndUpdateRules(); | |||||
} | } | ||||
} | |||||
private void initializeConfigChangeListener() { | |||||
config.addChangeListener(new ConfigChangeListener() { | |||||
@Override | |||||
public void onChange(ConfigChangeEvent changeEvent) { | |||||
ConfigChange change = changeEvent.getChange(flowRulesKey); | |||||
//change is never null because the listener will only notify for this key | |||||
if (change != null) { | |||||
RecordLog.info("[ApolloDataSource] Received config changes: " + change.toString()); | |||||
private void loadAndUpdateRules() { | |||||
try { | |||||
T newValue = loadConfig(); | |||||
if (newValue == null) { | |||||
RecordLog.warn("[ApolloDataSource] WARN: rule config is null, you may have to check your data source"); | |||||
} | |||||
getProperty().updateValue(newValue); | |||||
} catch (Throwable ex) { | |||||
RecordLog.warn("[ApolloDataSource] Error when loading rule config", ex); | |||||
} | } | ||||
loadAndUpdateRules(); | |||||
} | |||||
}, Sets.newHashSet(flowRulesKey)); | |||||
} | |||||
} | |||||
@Override | |||||
public String readSource() throws Exception { | |||||
return config.getProperty(flowRulesKey, defaultFlowRuleValue); | |||||
} | |||||
private void initializeConfigChangeListener() { | |||||
config.addChangeListener(new ConfigChangeListener() { | |||||
@Override | |||||
public void onChange(ConfigChangeEvent changeEvent) { | |||||
ConfigChange change = changeEvent.getChange(flowRulesKey); | |||||
//change is never null because the listener will only notify for this key | |||||
if (change != null) { | |||||
RecordLog.info("[ApolloDataSource] Received config changes: " + change.toString()); | |||||
} | |||||
loadAndUpdateRules(); | |||||
} | |||||
}, Sets.newHashSet(flowRulesKey)); | |||||
} | |||||
@Override | |||||
public void close() throws Exception { | |||||
// nothing to destroy | |||||
} | |||||
@Override | |||||
public String readSource() throws Exception { | |||||
return config.getProperty(flowRulesKey, defaultFlowRuleValue); | |||||
} | |||||
@Override | |||||
public void close() throws Exception { | |||||
// nothing to destroy | |||||
} | |||||
} | } |
@@ -18,12 +18,20 @@ package com.alibaba.csp.sentinel.datasource; | |||||
import com.alibaba.csp.sentinel.property.DynamicSentinelProperty; | import com.alibaba.csp.sentinel.property.DynamicSentinelProperty; | ||||
import com.alibaba.csp.sentinel.property.SentinelProperty; | import com.alibaba.csp.sentinel.property.SentinelProperty; | ||||
public abstract class AbstractDataSource<S, T> implements DataSource<S, T> { | |||||
/** | |||||
* The abstract readable data source provides basic functionality for loading and parsing config. | |||||
* | |||||
* @param <S> source data type | |||||
* @param <T> target data type | |||||
* @author Carpenter Lee | |||||
* @author Eric Zhao | |||||
*/ | |||||
public abstract class AbstractDataSource<S, T> implements ReadableDataSource<S, T> { | |||||
protected final ConfigParser<S, T> parser; | |||||
protected final Converter<S, T> parser; | |||||
protected final SentinelProperty<T> property; | protected final SentinelProperty<T> property; | ||||
public AbstractDataSource(ConfigParser<S, T> parser) { | |||||
public AbstractDataSource(Converter<S, T> parser) { | |||||
if (parser == null) { | if (parser == null) { | ||||
throw new IllegalArgumentException("parser can't be null"); | throw new IllegalArgumentException("parser can't be null"); | ||||
} | } | ||||
@@ -33,13 +41,11 @@ public abstract class AbstractDataSource<S, T> implements DataSource<S, T> { | |||||
@Override | @Override | ||||
public T loadConfig() throws Exception { | public T loadConfig() throws Exception { | ||||
S readValue = readSource(); | |||||
T value = parser.parse(readValue); | |||||
return value; | |||||
return loadConfig(readSource()); | |||||
} | } | ||||
public T loadConfig(S conf) throws Exception { | public T loadConfig(S conf) throws Exception { | ||||
T value = parser.parse(conf); | |||||
T value = parser.convert(conf); | |||||
return value; | return value; | ||||
} | } | ||||
@@ -47,10 +53,4 @@ public abstract class AbstractDataSource<S, T> implements DataSource<S, T> { | |||||
public SentinelProperty<T> getProperty() { | public SentinelProperty<T> getProperty() { | ||||
return property; | return property; | ||||
} | } | ||||
@Override | |||||
public void writeDataSource(T values) throws Exception { | |||||
throw new UnsupportedOperationException(); | |||||
} | |||||
} | } |
@@ -23,7 +23,7 @@ import com.alibaba.csp.sentinel.concurrent.NamedThreadFactory; | |||||
import com.alibaba.csp.sentinel.log.RecordLog; | import com.alibaba.csp.sentinel.log.RecordLog; | ||||
/** | /** | ||||
* A {@link DataSource} automatically fetches the backend data. | |||||
* A {@link ReadableDataSource} automatically fetches the backend data. | |||||
* | * | ||||
* @param <S> source data type | * @param <S> source data type | ||||
* @param <T> target data type | * @param <T> target data type | ||||
@@ -34,12 +34,12 @@ public abstract class AutoRefreshDataSource<S, T> extends AbstractDataSource<S, | |||||
private ScheduledExecutorService service; | private ScheduledExecutorService service; | ||||
protected long recommendRefreshMs = 3000; | protected long recommendRefreshMs = 3000; | ||||
public AutoRefreshDataSource(ConfigParser<S, T> configParser) { | |||||
public AutoRefreshDataSource(Converter<S, T> configParser) { | |||||
super(configParser); | super(configParser); | ||||
startTimerService(); | startTimerService(); | ||||
} | } | ||||
public AutoRefreshDataSource(ConfigParser<S, T> configParser, final long recommendRefreshMs) { | |||||
public AutoRefreshDataSource(Converter<S, T> configParser, final long recommendRefreshMs) { | |||||
super(configParser); | super(configParser); | ||||
if (recommendRefreshMs <= 0) { | if (recommendRefreshMs <= 0) { | ||||
throw new IllegalArgumentException("recommendRefreshMs must > 0, but " + recommendRefreshMs + " get"); | throw new IllegalArgumentException("recommendRefreshMs must > 0, but " + recommendRefreshMs + " get"); | ||||
@@ -72,5 +72,4 @@ public abstract class AutoRefreshDataSource<S, T> extends AbstractDataSource<S, | |||||
service = null; | service = null; | ||||
} | } | ||||
} | } | ||||
} | } |
@@ -16,16 +16,18 @@ | |||||
package com.alibaba.csp.sentinel.datasource; | package com.alibaba.csp.sentinel.datasource; | ||||
/** | /** | ||||
* Parse config from source data type S to target data type T. | |||||
* Convert an object from source type {@code S} to target type {@code T}. | |||||
* | * | ||||
* @author leyou | * @author leyou | ||||
* @author Eric Zhao | |||||
*/ | */ | ||||
public interface ConfigParser<S, T> { | |||||
public interface Converter<S, T> { | |||||
/** | /** | ||||
* Parse {@code source} to the target format. | |||||
* Convert {@code source} to the target type. | |||||
* | * | ||||
* @param source the source. | |||||
* @return the target. | |||||
* @param source the source object | |||||
* @return the target object | |||||
*/ | */ | ||||
T parse(S source); | |||||
T convert(S source); | |||||
} | } |
@@ -19,18 +19,18 @@ import com.alibaba.csp.sentinel.property.NoOpSentinelProperty; | |||||
import com.alibaba.csp.sentinel.property.SentinelProperty; | import com.alibaba.csp.sentinel.property.SentinelProperty; | ||||
/** | /** | ||||
* A {@link DataSource} based on nothing. {@link EmptyDataSource#getProperty()} will always return the same cached | |||||
* A {@link ReadableDataSource} based on nothing. {@link EmptyDataSource#getProperty()} will always return the same cached | |||||
* {@link SentinelProperty} that doing nothing. | * {@link SentinelProperty} that doing nothing. | ||||
* <br/> | * <br/> | ||||
* This class is used when we want to use default settings instead of configs from the {@link DataSource} | |||||
* This class is used when we want to use default settings instead of configs from the {@link ReadableDataSource}. | |||||
* | * | ||||
* @author leyou | * @author leyou | ||||
*/ | */ | ||||
public class EmptyDataSource implements DataSource<Object, Object> { | |||||
public final class EmptyDataSource implements ReadableDataSource<Object, Object> { | |||||
public static final DataSource<Object, Object> EMPTY_DATASOURCE = new EmptyDataSource(); | |||||
public static final ReadableDataSource<Object, Object> EMPTY_DATASOURCE = new EmptyDataSource(); | |||||
private static final SentinelProperty<Object> property = new NoOpSentinelProperty(); | |||||
private static final SentinelProperty<Object> PROPERTY = new NoOpSentinelProperty(); | |||||
private EmptyDataSource() { } | private EmptyDataSource() { } | ||||
@@ -46,15 +46,9 @@ public class EmptyDataSource implements DataSource<Object, Object> { | |||||
@Override | @Override | ||||
public SentinelProperty<Object> getProperty() { | public SentinelProperty<Object> getProperty() { | ||||
return property; | |||||
return PROPERTY; | |||||
} | } | ||||
@Override | @Override | ||||
public void close() throws Exception { } | public void close() throws Exception { } | ||||
@Override | |||||
public void writeDataSource(Object config) throws Exception { | |||||
throw new UnsupportedOperationException(); | |||||
} | |||||
} | } |
@@ -25,16 +25,17 @@ import com.alibaba.csp.sentinel.log.RecordLog; | |||||
/** | /** | ||||
* <p> | * <p> | ||||
* A {@link DataSource} based on file. This class will automatically fetches the backend file every 3 seconds. | |||||
* A {@link WritableDataSource} based on file. This class will automatically fetches the backend file every refresh period. | |||||
* </p> | * </p> | ||||
* <p> | * <p> | ||||
* Limitations: default read buffer size is 1MB, if file size is greater than buffer size, exceeding bytes will | |||||
* be ignored. Default charset is UTF8. | |||||
* Limitations: Default read buffer size is 1 MB. If file size is greater than buffer size, exceeding bytes will | |||||
* be ignored. Default charset is UTF-8. | |||||
* </p> | * </p> | ||||
* | * | ||||
* @author Carpenter Lee | * @author Carpenter Lee | ||||
* @author Eric Zhao | |||||
*/ | */ | ||||
public class FileRefreshableDataSource<T> extends AutoRefreshDataSource<String, T> { | |||||
public class FileRefreshableDataSource<T> extends AutoRefreshDataSource<String, T> implements WritableDataSource<T> { | |||||
private static final int MAX_SIZE = 1024 * 1024 * 4; | private static final int MAX_SIZE = 1024 * 1024 * 4; | ||||
private static final long DEFAULT_REFRESH_MS = 3000; | private static final long DEFAULT_REFRESH_MS = 3000; | ||||
@@ -42,37 +43,38 @@ public class FileRefreshableDataSource<T> extends AutoRefreshDataSource<String, | |||||
private static final Charset DEFAULT_CHAR_SET = Charset.forName("utf-8"); | private static final Charset DEFAULT_CHAR_SET = Charset.forName("utf-8"); | ||||
private byte[] buf; | private byte[] buf; | ||||
private Charset charset; | |||||
private File file; | |||||
private final Charset charset; | |||||
private final File file; | |||||
private final Converter<T, String> configEncoder; | |||||
/** | /** | ||||
* Create a file based {@link DataSource} whose read buffer size is 1MB, charset is UTF8, | |||||
* Create a file based {@link ReadableDataSource} whose read buffer size is 1MB, charset is UTF8, | |||||
* and read interval is 3 seconds. | * and read interval is 3 seconds. | ||||
* | * | ||||
* @param file the file to read. | |||||
* @param configParser the config parser. | |||||
* @param file the file to read | |||||
* @param configParser the config decoder (parser) | |||||
*/ | */ | ||||
public FileRefreshableDataSource(File file, ConfigParser<String, T> configParser) throws FileNotFoundException { | |||||
this(file, configParser, DEFAULT_REFRESH_MS, DEFAULT_BUF_SIZE, DEFAULT_CHAR_SET); | |||||
public FileRefreshableDataSource(File file, Converter<String, T> configParser, Converter<T, String> configEncoder) throws FileNotFoundException { | |||||
this(file, configParser, configEncoder, DEFAULT_REFRESH_MS, DEFAULT_BUF_SIZE, DEFAULT_CHAR_SET); | |||||
} | } | ||||
public FileRefreshableDataSource(String fileName, ConfigParser<String, T> configParser) | |||||
public FileRefreshableDataSource(String fileName, Converter<String, T> configParser, Converter<T, String> configEncoder) | |||||
throws FileNotFoundException { | throws FileNotFoundException { | ||||
this(new File(fileName), configParser, DEFAULT_REFRESH_MS, DEFAULT_BUF_SIZE, DEFAULT_CHAR_SET); | |||||
//System.out.println(file.getAbsoluteFile()); | |||||
this(new File(fileName), configParser, configEncoder, DEFAULT_REFRESH_MS, DEFAULT_BUF_SIZE, DEFAULT_CHAR_SET); | |||||
} | } | ||||
public FileRefreshableDataSource(File file, ConfigParser<String, T> configParser, int bufSize) | |||||
public FileRefreshableDataSource(File file, Converter<String, T> configParser, Converter<T, String> configEncoder, int bufSize) | |||||
throws FileNotFoundException { | throws FileNotFoundException { | ||||
this(file, configParser, DEFAULT_REFRESH_MS, bufSize, DEFAULT_CHAR_SET); | |||||
this(file, configParser, configEncoder, DEFAULT_REFRESH_MS, bufSize, DEFAULT_CHAR_SET); | |||||
} | } | ||||
public FileRefreshableDataSource(File file, ConfigParser<String, T> configParser, Charset charset) | |||||
public FileRefreshableDataSource(File file, Converter<String, T> configParser, Converter<T, String> configEncoder, Charset charset) | |||||
throws FileNotFoundException { | throws FileNotFoundException { | ||||
this(file, configParser, DEFAULT_REFRESH_MS, DEFAULT_BUF_SIZE, charset); | |||||
this(file, configParser, configEncoder, DEFAULT_REFRESH_MS, DEFAULT_BUF_SIZE, charset); | |||||
} | } | ||||
public FileRefreshableDataSource(File file, ConfigParser<String, T> configParser, long recommendRefreshMs, | |||||
public FileRefreshableDataSource(File file, Converter<String, T> configParser, Converter<T, String> configEncoder, long recommendRefreshMs, | |||||
int bufSize, Charset charset) throws FileNotFoundException { | int bufSize, Charset charset) throws FileNotFoundException { | ||||
super(configParser, recommendRefreshMs); | super(configParser, recommendRefreshMs); | ||||
if (bufSize <= 0 || bufSize > MAX_SIZE) { | if (bufSize <= 0 || bufSize > MAX_SIZE) { | ||||
@@ -84,9 +86,13 @@ public class FileRefreshableDataSource<T> extends AutoRefreshDataSource<String, | |||||
if (charset == null) { | if (charset == null) { | ||||
throw new IllegalArgumentException("charset can't be null"); | throw new IllegalArgumentException("charset can't be null"); | ||||
} | } | ||||
if (configEncoder == null) { | |||||
throw new IllegalArgumentException("Config encoder cannot be null"); | |||||
} | |||||
this.buf = new byte[bufSize]; | this.buf = new byte[bufSize]; | ||||
this.file = file; | this.file = file; | ||||
this.charset = charset; | this.charset = charset; | ||||
this.configEncoder = configEncoder; | |||||
firstLoad(); | firstLoad(); | ||||
} | } | ||||
@@ -106,7 +112,7 @@ public class FileRefreshableDataSource<T> extends AutoRefreshDataSource<String, | |||||
inputStream = new FileInputStream(file); | inputStream = new FileInputStream(file); | ||||
FileChannel channel = inputStream.getChannel(); | FileChannel channel = inputStream.getChannel(); | ||||
if (channel.size() > buf.length) { | if (channel.size() > buf.length) { | ||||
throw new RuntimeException(file.getAbsolutePath() + " file size=" + channel.size() | |||||
throw new IllegalStateException(file.getAbsolutePath() + " file size=" + channel.size() | |||||
+ ", is bigger than bufSize=" + buf.length + ". Can't read"); | + ", is bigger than bufSize=" + buf.length + ". Can't read"); | ||||
} | } | ||||
int len = inputStream.read(buf); | int len = inputStream.read(buf); | ||||
@@ -128,7 +134,7 @@ public class FileRefreshableDataSource<T> extends AutoRefreshDataSource<String, | |||||
} | } | ||||
@Override | @Override | ||||
public void writeDataSource(T values) throws Exception { | |||||
throw new UnsupportedOperationException(); | |||||
public void write(T value) throws Exception { | |||||
throw new UnsupportedOperationException("Not implemented"); | |||||
} | } | ||||
} | } |
@@ -18,19 +18,20 @@ package com.alibaba.csp.sentinel.datasource; | |||||
import com.alibaba.csp.sentinel.property.SentinelProperty; | import com.alibaba.csp.sentinel.property.SentinelProperty; | ||||
/** | /** | ||||
* This class is responsible for getting configs. | |||||
* The readable data source is responsible for retrieving configs (read-only). | |||||
* | * | ||||
* @param <S> source data type | * @param <S> source data type | ||||
* @param <T> target data type | * @param <T> target data type | ||||
* @author leyou | * @author leyou | ||||
* @author Eric Zhao | |||||
*/ | */ | ||||
public interface DataSource<S, T> { | |||||
public interface ReadableDataSource<S, T> { | |||||
/** | /** | ||||
* Load data data source as the target type. | * Load data data source as the target type. | ||||
* | * | ||||
* @return the target data. | * @return the target data. | ||||
* @throws Exception | |||||
* @throws Exception IO or other error occurs | |||||
*/ | */ | ||||
T loadConfig() throws Exception; | T loadConfig() throws Exception; | ||||
@@ -38,7 +39,7 @@ public interface DataSource<S, T> { | |||||
* Read original data from the data source. | * Read original data from the data source. | ||||
* | * | ||||
* @return the original data. | * @return the original data. | ||||
* @throws Exception | |||||
* @throws Exception IO or other error occurs | |||||
*/ | */ | ||||
S readSource() throws Exception; | S readSource() throws Exception; | ||||
@@ -49,18 +50,10 @@ public interface DataSource<S, T> { | |||||
*/ | */ | ||||
SentinelProperty<T> getProperty(); | SentinelProperty<T> getProperty(); | ||||
/** | |||||
* Write the {@code values} to the data source. | |||||
* | |||||
* @param values | |||||
* @throws Exception | |||||
*/ | |||||
void writeDataSource(T values) throws Exception; | |||||
/** | /** | ||||
* Close the data source. | * Close the data source. | ||||
* | * | ||||
* @throws Exception | |||||
* @throws Exception IO or other error occurs | |||||
*/ | */ | ||||
void close() throws Exception; | void close() throws Exception; | ||||
} | } |
@@ -0,0 +1,33 @@ | |||||
/* | |||||
* Copyright 1999-2018 Alibaba Group Holding Ltd. | |||||
* | |||||
* Licensed under the Apache License, Version 2.0 (the "License"); | |||||
* you may not use this file except in compliance with the License. | |||||
* You may obtain a copy of the License at | |||||
* | |||||
* http://www.apache.org/licenses/LICENSE-2.0 | |||||
* | |||||
* Unless required by applicable law or agreed to in writing, software | |||||
* distributed under the License is distributed on an "AS IS" BASIS, | |||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |||||
* See the License for the specific language governing permissions and | |||||
* limitations under the License. | |||||
*/ | |||||
package com.alibaba.csp.sentinel.datasource; | |||||
/** | |||||
* Interface of writable data source support. | |||||
* | |||||
* @author Eric Zhao | |||||
* @since 0.2.0 | |||||
*/ | |||||
public interface WritableDataSource<T> { | |||||
/** | |||||
* Write the {@code value} to the data source. | |||||
* | |||||
* @param value value to write | |||||
* @throws Exception IO or other error occurs | |||||
*/ | |||||
void write(T value) throws Exception; | |||||
} |
@@ -19,7 +19,7 @@ For instance: | |||||
```java | ```java | ||||
// remoteAddress is the address of Nacos | // remoteAddress is the address of Nacos | ||||
// groupId and dataId are concepts of Nacos | // groupId and dataId are concepts of Nacos | ||||
DataSource<String, List<FlowRule>> flowRuleDataSource = new NacosDataSource<>(remoteAddress, groupId, dataId, | |||||
ReadableDataSource<String, List<FlowRule>> flowRuleDataSource = new NacosDataSource<>(remoteAddress, groupId, dataId, | |||||
source -> JSON.parseObject(source, new TypeReference<List<FlowRule>>() {})); | source -> JSON.parseObject(source, new TypeReference<List<FlowRule>>() {})); | ||||
FlowRuleManager.register2Property(flowRuleDataSource.getProperty()); | FlowRuleManager.register2Property(flowRuleDataSource.getProperty()); | ||||
``` | ``` | ||||
@@ -23,8 +23,7 @@ import java.util.concurrent.TimeUnit; | |||||
import com.alibaba.csp.sentinel.concurrent.NamedThreadFactory; | import com.alibaba.csp.sentinel.concurrent.NamedThreadFactory; | ||||
import com.alibaba.csp.sentinel.datasource.AbstractDataSource; | import com.alibaba.csp.sentinel.datasource.AbstractDataSource; | ||||
import com.alibaba.csp.sentinel.datasource.ConfigParser; | |||||
import com.alibaba.csp.sentinel.datasource.DataSource; | |||||
import com.alibaba.csp.sentinel.datasource.Converter; | |||||
import com.alibaba.csp.sentinel.log.RecordLog; | import com.alibaba.csp.sentinel.log.RecordLog; | ||||
import com.alibaba.csp.sentinel.util.StringUtil; | import com.alibaba.csp.sentinel.util.StringUtil; | ||||
import com.alibaba.nacos.api.NacosFactory; | import com.alibaba.nacos.api.NacosFactory; | ||||
@@ -32,7 +31,7 @@ import com.alibaba.nacos.api.config.ConfigService; | |||||
import com.alibaba.nacos.api.config.listener.Listener; | import com.alibaba.nacos.api.config.listener.Listener; | ||||
/** | /** | ||||
* A {@link DataSource} with Nacos backend. When the data in Nacos backend has been modified, | |||||
* A read-only {@code DataSource} with Nacos backend. When the data in Nacos backend has been modified, | |||||
* Nacos will automatically push the new value so that the dynamic configuration can be real-time. | * Nacos will automatically push the new value so that the dynamic configuration can be real-time. | ||||
* | * | ||||
* @author Eric Zhao | * @author Eric Zhao | ||||
@@ -58,7 +57,7 @@ public class NacosDataSource<T> extends AbstractDataSource<String, T> { | |||||
private ConfigService configService = null; | private ConfigService configService = null; | ||||
/** | /** | ||||
* Constructs an DataSource with Nacos backend. | |||||
* Constructs an read-only DataSource with Nacos backend. | |||||
* | * | ||||
* @param serverAddr server address of Nacos, cannot be empty | * @param serverAddr server address of Nacos, cannot be empty | ||||
* @param groupId group ID, cannot be empty | * @param groupId group ID, cannot be empty | ||||
@@ -66,7 +65,7 @@ public class NacosDataSource<T> extends AbstractDataSource<String, T> { | |||||
* @param parser customized data parser, cannot be empty | * @param parser customized data parser, cannot be empty | ||||
*/ | */ | ||||
public NacosDataSource(final String serverAddr, final String groupId, final String dataId, | public NacosDataSource(final String serverAddr, final String groupId, final String dataId, | ||||
ConfigParser<String, T> parser) { | |||||
Converter<String, T> parser) { | |||||
super(parser); | super(parser); | ||||
if (StringUtil.isBlank(serverAddr) || StringUtil.isBlank(groupId) || StringUtil.isBlank(dataId)) { | if (StringUtil.isBlank(serverAddr) || StringUtil.isBlank(groupId) || StringUtil.isBlank(dataId)) { | ||||
throw new IllegalArgumentException(String.format("Bad argument: serverAddr=[%s], groupId=[%s], dataId=[%s]", | throw new IllegalArgumentException(String.format("Bad argument: serverAddr=[%s], groupId=[%s], dataId=[%s]", | ||||
@@ -84,7 +83,7 @@ public class NacosDataSource<T> extends AbstractDataSource<String, T> { | |||||
public void receiveConfigInfo(final String configInfo) { | public void receiveConfigInfo(final String configInfo) { | ||||
RecordLog.info(String.format("[NacosDataSource] New property value received for (%s, %s, %s): %s", | RecordLog.info(String.format("[NacosDataSource] New property value received for (%s, %s, %s): %s", | ||||
serverAddr, dataId, groupId, configInfo)); | serverAddr, dataId, groupId, configInfo)); | ||||
T newValue = NacosDataSource.this.parser.parse(configInfo); | |||||
T newValue = NacosDataSource.this.parser.convert(configInfo); | |||||
// Update the new value to the property. | // Update the new value to the property. | ||||
getProperty().updateValue(newValue); | getProperty().updateValue(newValue); | ||||
} | } | ||||
@@ -97,11 +96,11 @@ public class NacosDataSource<T> extends AbstractDataSource<String, T> { | |||||
try { | try { | ||||
T newValue = loadConfig(); | T newValue = loadConfig(); | ||||
if (newValue == null) { | if (newValue == null) { | ||||
RecordLog.info("[NacosDataSource] WARN: initial config is null, you may have to check your data source"); | |||||
RecordLog.warn("[NacosDataSource] WARN: initial config is null, you may have to check your data source"); | |||||
} | } | ||||
getProperty().updateValue(newValue); | getProperty().updateValue(newValue); | ||||
} catch (Exception ex) { | } catch (Exception ex) { | ||||
RecordLog.info("[NacosDataSource] Error when loading initial config", ex); | |||||
RecordLog.warn("[NacosDataSource] Error when loading initial config", ex); | |||||
} | } | ||||
} | } | ||||
@@ -111,7 +110,7 @@ public class NacosDataSource<T> extends AbstractDataSource<String, T> { | |||||
// Add config listener. | // Add config listener. | ||||
configService.addListener(dataId, groupId, configListener); | configService.addListener(dataId, groupId, configListener); | ||||
} catch (Exception e) { | } catch (Exception e) { | ||||
RecordLog.info("[NacosDataSource] Error occurred when initializing Nacos data source", e); | |||||
RecordLog.warn("[NacosDataSource] Error occurred when initializing Nacos data source", e); | |||||
e.printStackTrace(); | e.printStackTrace(); | ||||
} | } | ||||
} | } | ||||
@@ -18,7 +18,7 @@ For instance: | |||||
```java | ```java | ||||
// `path` is the data path in ZooKeeper | // `path` is the data path in ZooKeeper | ||||
DataSource<String, List<FlowRule>> flowRuleDataSource = new ZookeeperDataSource<>(remoteAddress, path, source -> JSON.parseObject(source, new TypeReference<List<FlowRule>>() {})); | |||||
ReadableDataSource<String, List<FlowRule>> flowRuleDataSource = new ZookeeperDataSource<>(remoteAddress, path, source -> JSON.parseObject(source, new TypeReference<List<FlowRule>>() {})); | |||||
FlowRuleManager.register2Property(flowRuleDataSource.getProperty()); | FlowRuleManager.register2Property(flowRuleDataSource.getProperty()); | ||||
``` | ``` | ||||
@@ -7,7 +7,7 @@ import java.util.concurrent.TimeUnit; | |||||
import com.alibaba.csp.sentinel.concurrent.NamedThreadFactory; | import com.alibaba.csp.sentinel.concurrent.NamedThreadFactory; | ||||
import com.alibaba.csp.sentinel.datasource.AbstractDataSource; | import com.alibaba.csp.sentinel.datasource.AbstractDataSource; | ||||
import com.alibaba.csp.sentinel.datasource.ConfigParser; | |||||
import com.alibaba.csp.sentinel.datasource.Converter; | |||||
import com.alibaba.csp.sentinel.log.RecordLog; | import com.alibaba.csp.sentinel.log.RecordLog; | ||||
import com.alibaba.csp.sentinel.util.StringUtil; | import com.alibaba.csp.sentinel.util.StringUtil; | ||||
@@ -21,7 +21,7 @@ import org.apache.zookeeper.CreateMode; | |||||
import org.apache.zookeeper.data.Stat; | import org.apache.zookeeper.data.Stat; | ||||
/** | /** | ||||
* Zookeeper DataSource | |||||
* A read-only {@code DataSource} with ZooKeeper backend. | |||||
* | * | ||||
* @author guonanjun | * @author guonanjun | ||||
*/ | */ | ||||
@@ -40,7 +40,7 @@ public class ZookeeperDataSource<T> extends AbstractDataSource<String, T> { | |||||
private CuratorFramework zkClient = null; | private CuratorFramework zkClient = null; | ||||
private NodeCache nodeCache = null; | private NodeCache nodeCache = null; | ||||
public ZookeeperDataSource(final String serverAddr, final String path, ConfigParser<String, T> parser) { | |||||
public ZookeeperDataSource(final String serverAddr, final String path, Converter<String, T> parser) { | |||||
super(parser); | super(parser); | ||||
if (StringUtil.isBlank(serverAddr) || StringUtil.isBlank(path)) { | if (StringUtil.isBlank(serverAddr) || StringUtil.isBlank(path)) { | ||||
throw new IllegalArgumentException(String.format("Bad argument: serverAddr=[%s], path=[%s]", serverAddr, path)); | throw new IllegalArgumentException(String.format("Bad argument: serverAddr=[%s], path=[%s]", serverAddr, path)); | ||||
@@ -54,7 +54,7 @@ public class ZookeeperDataSource<T> extends AbstractDataSource<String, T> { | |||||
* This constructor is Nacos-style. | * This constructor is Nacos-style. | ||||
*/ | */ | ||||
public ZookeeperDataSource(final String serverAddr, final String groupId, final String dataId, | public ZookeeperDataSource(final String serverAddr, final String groupId, final String dataId, | ||||
ConfigParser<String, T> parser) { | |||||
Converter<String, T> parser) { | |||||
super(parser); | super(parser); | ||||
if (StringUtil.isBlank(serverAddr) || StringUtil.isBlank(groupId) || StringUtil.isBlank(dataId)) { | if (StringUtil.isBlank(serverAddr) || StringUtil.isBlank(groupId) || StringUtil.isBlank(dataId)) { | ||||
throw new IllegalArgumentException(String.format("Bad argument: serverAddr=[%s], groupId=[%s], dataId=[%s]", serverAddr, groupId, dataId)); | throw new IllegalArgumentException(String.format("Bad argument: serverAddr=[%s], groupId=[%s], dataId=[%s]", serverAddr, groupId, dataId)); | ||||
@@ -73,11 +73,11 @@ public class ZookeeperDataSource<T> extends AbstractDataSource<String, T> { | |||||
try { | try { | ||||
T newValue = loadConfig(); | T newValue = loadConfig(); | ||||
if (newValue == null) { | if (newValue == null) { | ||||
RecordLog.info("[ZookeeperDataSource] WARN: initial config is null, you may have to check your data source"); | |||||
RecordLog.warn("[ZookeeperDataSource] WARN: initial config is null, you may have to check your data source"); | |||||
} | } | ||||
getProperty().updateValue(newValue); | getProperty().updateValue(newValue); | ||||
} catch (Exception ex) { | } catch (Exception ex) { | ||||
RecordLog.info("[ZookeeperDataSource] Error when loading initial config", ex); | |||||
RecordLog.warn("[ZookeeperDataSource] Error when loading initial config", ex); | |||||
} | } | ||||
} | } | ||||
@@ -95,7 +95,7 @@ public class ZookeeperDataSource<T> extends AbstractDataSource<String, T> { | |||||
} | } | ||||
RecordLog.info(String.format("[ZookeeperDataSource] New property value received for (%s, %s): %s", | RecordLog.info(String.format("[ZookeeperDataSource] New property value received for (%s, %s): %s", | ||||
serverAddr, path, configInfo)); | serverAddr, path, configInfo)); | ||||
T newValue = ZookeeperDataSource.this.parser.parse(configInfo); | |||||
T newValue = ZookeeperDataSource.this.parser.convert(configInfo); | |||||
// Update the new value to the property. | // Update the new value to the property. | ||||
getProperty().updateValue(newValue); | getProperty().updateValue(newValue); | ||||
} | } | ||||
@@ -112,7 +112,7 @@ public class ZookeeperDataSource<T> extends AbstractDataSource<String, T> { | |||||
this.nodeCache.getListenable().addListener(this.listener, this.pool); | this.nodeCache.getListenable().addListener(this.listener, this.pool); | ||||
this.nodeCache.start(); | this.nodeCache.start(); | ||||
} catch (Exception e) { | } catch (Exception e) { | ||||
RecordLog.info("[ZookeeperDataSource] Error occurred when initializing Zookeeper data source", e); | |||||
RecordLog.warn("[ZookeeperDataSource] Error occurred when initializing Zookeeper data source", e); | |||||
e.printStackTrace(); | e.printStackTrace(); | ||||
} | } | ||||
} | } | ||||
@@ -3,8 +3,8 @@ package com.alibaba.csp.sentinel.datasource.zookeeper; | |||||
import java.util.Collections; | import java.util.Collections; | ||||
import java.util.List; | import java.util.List; | ||||
import com.alibaba.csp.sentinel.datasource.ConfigParser; | |||||
import com.alibaba.csp.sentinel.datasource.DataSource; | |||||
import com.alibaba.csp.sentinel.datasource.Converter; | |||||
import com.alibaba.csp.sentinel.datasource.ReadableDataSource; | |||||
import com.alibaba.csp.sentinel.slots.block.RuleConstant; | import com.alibaba.csp.sentinel.slots.block.RuleConstant; | ||||
import com.alibaba.csp.sentinel.slots.block.flow.FlowRule; | import com.alibaba.csp.sentinel.slots.block.flow.FlowRule; | ||||
import com.alibaba.csp.sentinel.slots.block.flow.FlowRuleManager; | import com.alibaba.csp.sentinel.slots.block.flow.FlowRuleManager; | ||||
@@ -34,10 +34,10 @@ public class ZookeeperDataSourceTest { | |||||
final String remoteAddress = server.getConnectString(); | final String remoteAddress = server.getConnectString(); | ||||
final String path = "/sentinel-zk-ds-demo/flow-HK"; | final String path = "/sentinel-zk-ds-demo/flow-HK"; | ||||
DataSource<String, List<FlowRule>> flowRuleDataSource = new ZookeeperDataSource<List<FlowRule>>(remoteAddress, path, | |||||
new ConfigParser<String, List<FlowRule>>() { | |||||
ReadableDataSource<String, List<FlowRule>> flowRuleDataSource = new ZookeeperDataSource<List<FlowRule>>(remoteAddress, path, | |||||
new Converter<String, List<FlowRule>>() { | |||||
@Override | @Override | ||||
public List<FlowRule> parse(String source) { | |||||
public List<FlowRule> convert(String source) { | |||||
return JSON.parseObject(source, new TypeReference<List<FlowRule>>() {}); | return JSON.parseObject(source, new TypeReference<List<FlowRule>>() {}); | ||||
} | } | ||||
}); | }); | ||||