- Add `close` method in WritableDataSource (to extend AutoCloseable in JDK 1.7 later) - Separate the writable file data source from original class - Add a sample to show how to register data sources via Sentinel init mechanism - Separate a writable data source registry from original handler to make it clear Signed-off-by: Eric Zhao <sczyh16@gmail.com>master
@@ -19,6 +19,10 @@ | |||||
<groupId>com.alibaba.csp</groupId> | <groupId>com.alibaba.csp</groupId> | ||||
<artifactId>sentinel-datasource-extension</artifactId> | <artifactId>sentinel-datasource-extension</artifactId> | ||||
</dependency> | </dependency> | ||||
<dependency> | |||||
<groupId>com.alibaba.csp</groupId> | |||||
<artifactId>sentinel-transport-simple-http</artifactId> | |||||
</dependency> | |||||
<dependency> | <dependency> | ||||
<groupId>com.alibaba</groupId> | <groupId>com.alibaba</groupId> | ||||
@@ -81,19 +81,19 @@ public class FileDataSourceDemo { | |||||
// Data source for FlowRule | // Data source for FlowRule | ||||
FileRefreshableDataSource<List<FlowRule>> flowRuleDataSource = new FileRefreshableDataSource<>( | FileRefreshableDataSource<List<FlowRule>> flowRuleDataSource = new FileRefreshableDataSource<>( | ||||
flowRulePath, flowRuleListParser, this::encodeJson); | |||||
flowRulePath, flowRuleListParser); | |||||
FlowRuleManager.register2Property(flowRuleDataSource.getProperty()); | FlowRuleManager.register2Property(flowRuleDataSource.getProperty()); | ||||
// Data source for DegradeRule | // Data source for DegradeRule | ||||
FileRefreshableDataSource<List<DegradeRule>> degradeRuleDataSource | FileRefreshableDataSource<List<DegradeRule>> degradeRuleDataSource | ||||
= new FileRefreshableDataSource<>( | = new FileRefreshableDataSource<>( | ||||
degradeRulePath, degradeRuleListParser, this::encodeJson); | |||||
degradeRulePath, degradeRuleListParser); | |||||
DegradeRuleManager.register2Property(degradeRuleDataSource.getProperty()); | DegradeRuleManager.register2Property(degradeRuleDataSource.getProperty()); | ||||
// Data source for SystemRule | // Data source for SystemRule | ||||
FileRefreshableDataSource<List<SystemRule>> systemRuleDataSource | FileRefreshableDataSource<List<SystemRule>> systemRuleDataSource | ||||
= new FileRefreshableDataSource<>( | = new FileRefreshableDataSource<>( | ||||
systemRulePath, systemRuleListParser, this::encodeJson); | |||||
systemRulePath, systemRuleListParser); | |||||
SystemRuleManager.register2Property(systemRuleDataSource.getProperty()); | SystemRuleManager.register2Property(systemRuleDataSource.getProperty()); | ||||
} | } | ||||
@@ -103,8 +103,4 @@ public class FileDataSourceDemo { | |||||
new TypeReference<List<DegradeRule>>() {}); | new TypeReference<List<DegradeRule>>() {}); | ||||
private Converter<String, List<SystemRule>> systemRuleListParser = source -> JSON.parseObject(source, | private Converter<String, List<SystemRule>> systemRuleListParser = source -> JSON.parseObject(source, | ||||
new TypeReference<List<SystemRule>>() {}); | new TypeReference<List<SystemRule>>() {}); | ||||
private <T> String encodeJson(T t) { | |||||
return JSON.toJSONString(t); | |||||
} | |||||
} | } |
@@ -0,0 +1,67 @@ | |||||
/* | |||||
* Copyright 1999-2018 Alibaba Group Holding Ltd. | |||||
* | |||||
* Licensed under the Apache License, Version 2.0 (the "License"); | |||||
* you may not use this file except in compliance with the License. | |||||
* You may obtain a copy of the License at | |||||
* | |||||
* http://www.apache.org/licenses/LICENSE-2.0 | |||||
* | |||||
* Unless required by applicable law or agreed to in writing, software | |||||
* distributed under the License is distributed on an "AS IS" BASIS, | |||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |||||
* See the License for the specific language governing permissions and | |||||
* limitations under the License. | |||||
*/ | |||||
package com.alibaba.csp.sentinel.demo.file.rule; | |||||
import java.util.List; | |||||
import com.alibaba.csp.sentinel.datasource.FileRefreshableDataSource; | |||||
import com.alibaba.csp.sentinel.datasource.FileWritableDataSource; | |||||
import com.alibaba.csp.sentinel.datasource.ReadableDataSource; | |||||
import com.alibaba.csp.sentinel.datasource.WritableDataSource; | |||||
import com.alibaba.csp.sentinel.init.InitFunc; | |||||
import com.alibaba.csp.sentinel.slots.block.flow.FlowRule; | |||||
import com.alibaba.csp.sentinel.slots.block.flow.FlowRuleManager; | |||||
import com.alibaba.csp.sentinel.transport.util.WritableDataSourceRegistry; | |||||
import com.alibaba.fastjson.JSON; | |||||
import com.alibaba.fastjson.TypeReference; | |||||
/** | |||||
* <p> | |||||
* A sample showing how to register readable and writable data source via Sentinel init SPI mechanism. | |||||
* </p> | |||||
* <p> | |||||
* To activate this, you can add the class name to `com.alibaba.csp.sentinel.init.InitFunc` file | |||||
* in `META-INF/services/` directory of the resource directory. Then the data source will be automatically | |||||
* registered during the initialization of Sentinel. | |||||
* </p> | |||||
* | |||||
* @author Eric Zhao | |||||
*/ | |||||
public class FileDataSourceInit implements InitFunc { | |||||
@Override | |||||
public void init() throws Exception { | |||||
// A fake path. | |||||
String flowRuleDir = System.getProperty("user.home") + "/sentinel/rules"; | |||||
String flowRuleFile = "flowRule.json"; | |||||
String flowRulePath = flowRuleDir + "/" + flowRuleFile; | |||||
ReadableDataSource<String, List<FlowRule>> ds = new FileRefreshableDataSource<>( | |||||
flowRulePath, source -> JSON.parseObject(source, new TypeReference<List<FlowRule>>() {}) | |||||
); | |||||
// Register to flow rule manager. | |||||
FlowRuleManager.register2Property(ds.getProperty()); | |||||
WritableDataSource<List<FlowRule>> wds = new FileWritableDataSource<>(flowRulePath, this::encodeJson); | |||||
// Register to writable data source registry so that rules can be updated to file | |||||
// when there are rules pushed from the Sentinel Dashboard. | |||||
WritableDataSourceRegistry.registerFlowDataSource(wds); | |||||
} | |||||
private <T> String encodeJson(T t) { | |||||
return JSON.toJSONString(t); | |||||
} | |||||
} |
@@ -25,7 +25,7 @@ import com.alibaba.csp.sentinel.log.RecordLog; | |||||
/** | /** | ||||
* <p> | * <p> | ||||
* A {@link WritableDataSource} based on file. This class will automatically fetches the backend file every refresh period. | |||||
* A {@link ReadableDataSource} based on file. This class will automatically fetches the backend file every refresh period. | |||||
* </p> | * </p> | ||||
* <p> | * <p> | ||||
* Limitations: Default read buffer size is 1 MB. If file size is greater than buffer size, exceeding bytes will | * Limitations: Default read buffer size is 1 MB. If file size is greater than buffer size, exceeding bytes will | ||||
@@ -35,7 +35,7 @@ import com.alibaba.csp.sentinel.log.RecordLog; | |||||
* @author Carpenter Lee | * @author Carpenter Lee | ||||
* @author Eric Zhao | * @author Eric Zhao | ||||
*/ | */ | ||||
public class FileRefreshableDataSource<T> extends AutoRefreshDataSource<String, T> implements WritableDataSource<T> { | |||||
public class FileRefreshableDataSource<T> extends AutoRefreshDataSource<String, T> { | |||||
private static final int MAX_SIZE = 1024 * 1024 * 4; | private static final int MAX_SIZE = 1024 * 1024 * 4; | ||||
private static final long DEFAULT_REFRESH_MS = 3000; | private static final long DEFAULT_REFRESH_MS = 3000; | ||||
@@ -46,8 +46,6 @@ public class FileRefreshableDataSource<T> extends AutoRefreshDataSource<String, | |||||
private final Charset charset; | private final Charset charset; | ||||
private final File file; | private final File file; | ||||
private final Converter<T, String> configEncoder; | |||||
/** | /** | ||||
* Create a file based {@link ReadableDataSource} whose read buffer size is 1MB, charset is UTF8, | * Create a file based {@link ReadableDataSource} whose read buffer size is 1MB, charset is UTF8, | ||||
* and read interval is 3 seconds. | * and read interval is 3 seconds. | ||||
@@ -55,26 +53,26 @@ public class FileRefreshableDataSource<T> extends AutoRefreshDataSource<String, | |||||
* @param file the file to read | * @param file the file to read | ||||
* @param configParser the config decoder (parser) | * @param configParser the config decoder (parser) | ||||
*/ | */ | ||||
public FileRefreshableDataSource(File file, Converter<String, T> configParser, Converter<T, String> configEncoder) throws FileNotFoundException { | |||||
this(file, configParser, configEncoder, DEFAULT_REFRESH_MS, DEFAULT_BUF_SIZE, DEFAULT_CHAR_SET); | |||||
public FileRefreshableDataSource(File file, Converter<String, T> configParser) throws FileNotFoundException { | |||||
this(file, configParser, DEFAULT_REFRESH_MS, DEFAULT_BUF_SIZE, DEFAULT_CHAR_SET); | |||||
} | } | ||||
public FileRefreshableDataSource(String fileName, Converter<String, T> configParser, Converter<T, String> configEncoder) | |||||
public FileRefreshableDataSource(String fileName, Converter<String, T> configParser) | |||||
throws FileNotFoundException { | throws FileNotFoundException { | ||||
this(new File(fileName), configParser, configEncoder, DEFAULT_REFRESH_MS, DEFAULT_BUF_SIZE, DEFAULT_CHAR_SET); | |||||
this(new File(fileName), configParser, DEFAULT_REFRESH_MS, DEFAULT_BUF_SIZE, DEFAULT_CHAR_SET); | |||||
} | } | ||||
public FileRefreshableDataSource(File file, Converter<String, T> configParser, Converter<T, String> configEncoder, int bufSize) | |||||
public FileRefreshableDataSource(File file, Converter<String, T> configParser, int bufSize) | |||||
throws FileNotFoundException { | throws FileNotFoundException { | ||||
this(file, configParser, configEncoder, DEFAULT_REFRESH_MS, bufSize, DEFAULT_CHAR_SET); | |||||
this(file, configParser, DEFAULT_REFRESH_MS, bufSize, DEFAULT_CHAR_SET); | |||||
} | } | ||||
public FileRefreshableDataSource(File file, Converter<String, T> configParser, Converter<T, String> configEncoder, Charset charset) | |||||
public FileRefreshableDataSource(File file, Converter<String, T> configParser, Charset charset) | |||||
throws FileNotFoundException { | throws FileNotFoundException { | ||||
this(file, configParser, configEncoder, DEFAULT_REFRESH_MS, DEFAULT_BUF_SIZE, charset); | |||||
this(file, configParser, DEFAULT_REFRESH_MS, DEFAULT_BUF_SIZE, charset); | |||||
} | } | ||||
public FileRefreshableDataSource(File file, Converter<String, T> configParser, Converter<T, String> configEncoder, long recommendRefreshMs, | |||||
public FileRefreshableDataSource(File file, Converter<String, T> configParser, long recommendRefreshMs, | |||||
int bufSize, Charset charset) throws FileNotFoundException { | int bufSize, Charset charset) throws FileNotFoundException { | ||||
super(configParser, recommendRefreshMs); | super(configParser, recommendRefreshMs); | ||||
if (bufSize <= 0 || bufSize > MAX_SIZE) { | if (bufSize <= 0 || bufSize > MAX_SIZE) { | ||||
@@ -86,13 +84,9 @@ public class FileRefreshableDataSource<T> extends AutoRefreshDataSource<String, | |||||
if (charset == null) { | if (charset == null) { | ||||
throw new IllegalArgumentException("charset can't be null"); | throw new IllegalArgumentException("charset can't be null"); | ||||
} | } | ||||
if (configEncoder == null) { | |||||
throw new IllegalArgumentException("Config encoder cannot be null"); | |||||
} | |||||
this.buf = new byte[bufSize]; | this.buf = new byte[bufSize]; | ||||
this.file = file; | this.file = file; | ||||
this.charset = charset; | this.charset = charset; | ||||
this.configEncoder = configEncoder; | |||||
firstLoad(); | firstLoad(); | ||||
} | } | ||||
@@ -132,9 +126,4 @@ public class FileRefreshableDataSource<T> extends AutoRefreshDataSource<String, | |||||
super.close(); | super.close(); | ||||
buf = null; | buf = null; | ||||
} | } | ||||
@Override | |||||
public void write(T value) throws Exception { | |||||
throw new UnsupportedOperationException("Not implemented"); | |||||
} | |||||
} | } |
@@ -0,0 +1,56 @@ | |||||
/* | |||||
* Copyright 1999-2018 Alibaba Group Holding Ltd. | |||||
* | |||||
* Licensed under the Apache License, Version 2.0 (the "License"); | |||||
* you may not use this file except in compliance with the License. | |||||
* You may obtain a copy of the License at | |||||
* | |||||
* http://www.apache.org/licenses/LICENSE-2.0 | |||||
* | |||||
* Unless required by applicable law or agreed to in writing, software | |||||
* distributed under the License is distributed on an "AS IS" BASIS, | |||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |||||
* See the License for the specific language governing permissions and | |||||
* limitations under the License. | |||||
*/ | |||||
package com.alibaba.csp.sentinel.datasource; | |||||
import java.io.File; | |||||
/** | |||||
* A {@link WritableDataSource} based on file. | |||||
* | |||||
* @param <T> data type | |||||
* @author Eric Zhao | |||||
* @since 0.2.0 | |||||
*/ | |||||
public class FileWritableDataSource<T> implements WritableDataSource<T> { | |||||
private final Converter<T, String> configEncoder; | |||||
private File file; | |||||
public FileWritableDataSource(String filePath, Converter<T, String> configEncoder) { | |||||
this(new File(filePath), configEncoder); | |||||
} | |||||
public FileWritableDataSource(File file, Converter<T, String> configEncoder) { | |||||
if (file == null || file.isDirectory()) { | |||||
throw new IllegalArgumentException("Bad file"); | |||||
} | |||||
if (configEncoder == null) { | |||||
throw new IllegalArgumentException("Config encoder cannot be null"); | |||||
} | |||||
this.configEncoder = configEncoder; | |||||
this.file = file; | |||||
} | |||||
@Override | |||||
public void write(T value) throws Exception { | |||||
throw new UnsupportedOperationException("Not implemented"); | |||||
} | |||||
@Override | |||||
public void close() throws Exception { | |||||
// Nothing | |||||
} | |||||
} |
@@ -30,4 +30,11 @@ public interface WritableDataSource<T> { | |||||
* @throws Exception IO or other error occurs | * @throws Exception IO or other error occurs | ||||
*/ | */ | ||||
void write(T value) throws Exception; | void write(T value) throws Exception; | ||||
/** | |||||
* Close the data source. | |||||
* | |||||
* @throws Exception IO or other error occurs | |||||
*/ | |||||
void close() throws Exception; | |||||
} | } |
@@ -35,6 +35,8 @@ import com.alibaba.csp.sentinel.slots.system.SystemRuleManager; | |||||
import com.alibaba.csp.sentinel.slots.system.SystemRule; | import com.alibaba.csp.sentinel.slots.system.SystemRule; | ||||
import com.alibaba.fastjson.JSONArray; | import com.alibaba.fastjson.JSONArray; | ||||
import static com.alibaba.csp.sentinel.transport.util.WritableDataSourceRegistry.*; | |||||
/** | /** | ||||
* @author jialiang.linjl | * @author jialiang.linjl | ||||
* @author Eric Zhao | * @author Eric Zhao | ||||
@@ -42,27 +44,6 @@ import com.alibaba.fastjson.JSONArray; | |||||
@CommandMapping(name = "setRules") | @CommandMapping(name = "setRules") | ||||
public class ModifyRulesCommandHandler implements CommandHandler<String> { | public class ModifyRulesCommandHandler implements CommandHandler<String> { | ||||
private static WritableDataSource<List<FlowRule>> flowDataSource = null; | |||||
private static WritableDataSource<List<AuthorityRule>> authorityDataSource = null; | |||||
private static WritableDataSource<List<DegradeRule>> degradeDataSource = null; | |||||
private static WritableDataSource<List<SystemRule>> systemSource = null; | |||||
public static synchronized void registerFlowDataSource(WritableDataSource<List<FlowRule>> datasource) { | |||||
flowDataSource = datasource; | |||||
} | |||||
public static synchronized void registerAuthorityDataSource(WritableDataSource<List<AuthorityRule>> dataSource) { | |||||
authorityDataSource = dataSource; | |||||
} | |||||
public static synchronized void registerDegradeDataSource(WritableDataSource<List<DegradeRule>> dataSource) { | |||||
degradeDataSource = dataSource; | |||||
} | |||||
public static synchronized void registerSystemDataSource(WritableDataSource<List<SystemRule>> dataSource) { | |||||
systemSource = dataSource; | |||||
} | |||||
@Override | @Override | ||||
public CommandResponse<String> handle(CommandRequest request) { | public CommandResponse<String> handle(CommandRequest request) { | ||||
String type = request.getParam("type"); | String type = request.getParam("type"); | ||||
@@ -84,28 +65,28 @@ public class ModifyRulesCommandHandler implements CommandHandler<String> { | |||||
if (FLOW_RULE_TYPE.equalsIgnoreCase(type)) { | if (FLOW_RULE_TYPE.equalsIgnoreCase(type)) { | ||||
List<FlowRule> flowRules = JSONArray.parseArray(data, FlowRule.class); | List<FlowRule> flowRules = JSONArray.parseArray(data, FlowRule.class); | ||||
FlowRuleManager.loadRules(flowRules); | FlowRuleManager.loadRules(flowRules); | ||||
if (!writeToDataSource(flowDataSource, flowRules)) { | |||||
if (!writeToDataSource(getFlowDataSource(), flowRules)) { | |||||
result = WRITE_DS_FAILURE_MSG; | result = WRITE_DS_FAILURE_MSG; | ||||
} | } | ||||
return CommandResponse.ofSuccess(result); | return CommandResponse.ofSuccess(result); | ||||
} else if (AUTHORITY_RULE_TYPE.equalsIgnoreCase(type)) { | } else if (AUTHORITY_RULE_TYPE.equalsIgnoreCase(type)) { | ||||
List<AuthorityRule> rules = JSONArray.parseArray(data, AuthorityRule.class); | List<AuthorityRule> rules = JSONArray.parseArray(data, AuthorityRule.class); | ||||
AuthorityRuleManager.loadRules(rules); | AuthorityRuleManager.loadRules(rules); | ||||
if (!writeToDataSource(authorityDataSource, rules)) { | |||||
if (!writeToDataSource(getAuthorityDataSource(), rules)) { | |||||
result = WRITE_DS_FAILURE_MSG; | result = WRITE_DS_FAILURE_MSG; | ||||
} | } | ||||
return CommandResponse.ofSuccess(result); | return CommandResponse.ofSuccess(result); | ||||
} else if (DEGRADE_RULE_TYPE.equalsIgnoreCase(type)) { | } else if (DEGRADE_RULE_TYPE.equalsIgnoreCase(type)) { | ||||
List<DegradeRule> rules = JSONArray.parseArray(data, DegradeRule.class); | List<DegradeRule> rules = JSONArray.parseArray(data, DegradeRule.class); | ||||
DegradeRuleManager.loadRules(rules); | DegradeRuleManager.loadRules(rules); | ||||
if (!writeToDataSource(degradeDataSource, rules)) { | |||||
if (!writeToDataSource(getDegradeDataSource(), rules)) { | |||||
result = WRITE_DS_FAILURE_MSG; | result = WRITE_DS_FAILURE_MSG; | ||||
} | } | ||||
return CommandResponse.ofSuccess(result); | return CommandResponse.ofSuccess(result); | ||||
} else if (SYSTEM_RULE_TYPE.equalsIgnoreCase(type)) { | } else if (SYSTEM_RULE_TYPE.equalsIgnoreCase(type)) { | ||||
List<SystemRule> rules = JSONArray.parseArray(data, SystemRule.class); | List<SystemRule> rules = JSONArray.parseArray(data, SystemRule.class); | ||||
SystemRuleManager.loadRules(rules); | SystemRuleManager.loadRules(rules); | ||||
if (!writeToDataSource(systemSource, rules)) { | |||||
if (!writeToDataSource(getSystemSource(), rules)) { | |||||
result = WRITE_DS_FAILURE_MSG; | result = WRITE_DS_FAILURE_MSG; | ||||
} | } | ||||
return CommandResponse.ofSuccess(result); | return CommandResponse.ofSuccess(result); | ||||
@@ -0,0 +1,56 @@ | |||||
package com.alibaba.csp.sentinel.transport.util; | |||||
import java.util.List; | |||||
import com.alibaba.csp.sentinel.datasource.WritableDataSource; | |||||
import com.alibaba.csp.sentinel.slots.block.authority.AuthorityRule; | |||||
import com.alibaba.csp.sentinel.slots.block.degrade.DegradeRule; | |||||
import com.alibaba.csp.sentinel.slots.block.flow.FlowRule; | |||||
import com.alibaba.csp.sentinel.slots.system.SystemRule; | |||||
/** | |||||
* Writable data source registry for modifying rules via HTTP API. | |||||
* | |||||
* @author Eric Zhao | |||||
*/ | |||||
public final class WritableDataSourceRegistry { | |||||
private static WritableDataSource<List<FlowRule>> flowDataSource = null; | |||||
private static WritableDataSource<List<AuthorityRule>> authorityDataSource = null; | |||||
private static WritableDataSource<List<DegradeRule>> degradeDataSource = null; | |||||
private static WritableDataSource<List<SystemRule>> systemSource = null; | |||||
public static synchronized void registerFlowDataSource(WritableDataSource<List<FlowRule>> datasource) { | |||||
flowDataSource = datasource; | |||||
} | |||||
public static synchronized void registerAuthorityDataSource(WritableDataSource<List<AuthorityRule>> dataSource) { | |||||
authorityDataSource = dataSource; | |||||
} | |||||
public static synchronized void registerDegradeDataSource(WritableDataSource<List<DegradeRule>> dataSource) { | |||||
degradeDataSource = dataSource; | |||||
} | |||||
public static synchronized void registerSystemDataSource(WritableDataSource<List<SystemRule>> dataSource) { | |||||
systemSource = dataSource; | |||||
} | |||||
public static WritableDataSource<List<FlowRule>> getFlowDataSource() { | |||||
return flowDataSource; | |||||
} | |||||
public static WritableDataSource<List<AuthorityRule>> getAuthorityDataSource() { | |||||
return authorityDataSource; | |||||
} | |||||
public static WritableDataSource<List<DegradeRule>> getDegradeDataSource() { | |||||
return degradeDataSource; | |||||
} | |||||
public static WritableDataSource<List<SystemRule>> getSystemSource() { | |||||
return systemSource; | |||||
} | |||||
private WritableDataSourceRegistry() {} | |||||
} |