|
|
@@ -0,0 +1,134 @@ |
|
|
|
/* |
|
|
|
* Copyright 1999-2018 Alibaba Group Holding Ltd. |
|
|
|
* |
|
|
|
* Licensed under the Apache License, Version 2.0 (the "License"); |
|
|
|
* you may not use this file except in compliance with the License. |
|
|
|
* You may obtain a copy of the License at |
|
|
|
* |
|
|
|
* http://www.apache.org/licenses/LICENSE-2.0 |
|
|
|
* |
|
|
|
* Unless required by applicable law or agreed to in writing, software |
|
|
|
* distributed under the License is distributed on an "AS IS" BASIS, |
|
|
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|
|
|
* See the License for the specific language governing permissions and |
|
|
|
* limitations under the License. |
|
|
|
*/ |
|
|
|
package com.alibaba.csp.sentinel.datasource.nacos; |
|
|
|
|
|
|
|
import java.util.concurrent.ArrayBlockingQueue; |
|
|
|
import java.util.concurrent.Executor; |
|
|
|
import java.util.concurrent.ExecutorService; |
|
|
|
import java.util.concurrent.ThreadPoolExecutor; |
|
|
|
import java.util.concurrent.TimeUnit; |
|
|
|
|
|
|
|
import com.alibaba.csp.sentinel.concurrent.NamedThreadFactory; |
|
|
|
import com.alibaba.csp.sentinel.datasource.AbstractDataSource; |
|
|
|
import com.alibaba.csp.sentinel.datasource.ConfigParser; |
|
|
|
import com.alibaba.csp.sentinel.datasource.DataSource; |
|
|
|
import com.alibaba.csp.sentinel.log.RecordLog; |
|
|
|
import com.alibaba.csp.sentinel.util.StringUtil; |
|
|
|
import com.alibaba.nacos.api.NacosFactory; |
|
|
|
import com.alibaba.nacos.api.config.ConfigService; |
|
|
|
import com.alibaba.nacos.api.config.listener.Listener; |
|
|
|
|
|
|
|
/** |
|
|
|
* A {@link DataSource} with Nacos backend. When the data in Nacos backend has been modified, |
|
|
|
* Nacos will automatically push the new value so that the dynamic configuration can be real-time. |
|
|
|
* |
|
|
|
* @author Eric Zhao |
|
|
|
*/ |
|
|
|
public class NacosDataSource<T> extends AbstractDataSource<String, T> { |
|
|
|
|
|
|
|
private static final int DEFAULT_TIMEOUT = 3000; |
|
|
|
|
|
|
|
/** |
|
|
|
* Single-thread pool. Once the thread pool is blocked, we throw up the old task. |
|
|
|
*/ |
|
|
|
private final ExecutorService pool = new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS, |
|
|
|
new ArrayBlockingQueue<Runnable>(1), new NamedThreadFactory("sentinel-nacos-ds-update"), |
|
|
|
new ThreadPoolExecutor.DiscardOldestPolicy()); |
|
|
|
|
|
|
|
private final Listener configListener; |
|
|
|
private final String groupId; |
|
|
|
private final String dataId; |
|
|
|
|
|
|
|
/** |
|
|
|
* Note: The Nacos config might be null if its initialization failed. |
|
|
|
*/ |
|
|
|
private ConfigService configService = null; |
|
|
|
|
|
|
|
/** |
|
|
|
* Constructs an DataSource with Nacos backend. |
|
|
|
* |
|
|
|
* @param serverAddr server address of Nacos, cannot be empty |
|
|
|
* @param groupId group ID, cannot be empty |
|
|
|
* @param dataId data ID, cannot be empty |
|
|
|
* @param parser customized data parser, cannot be empty |
|
|
|
*/ |
|
|
|
public NacosDataSource(final String serverAddr, final String groupId, final String dataId, |
|
|
|
ConfigParser<String, T> parser) { |
|
|
|
super(parser); |
|
|
|
if (StringUtil.isBlank(serverAddr) || StringUtil.isBlank(groupId) || StringUtil.isBlank(dataId)) { |
|
|
|
throw new IllegalArgumentException(String.format("Bad argument: serverAddr=[%s], groupId=[%s], dataId=[%s]", |
|
|
|
serverAddr, groupId, dataId)); |
|
|
|
} |
|
|
|
this.groupId = groupId; |
|
|
|
this.dataId = dataId; |
|
|
|
this.configListener = new Listener() { |
|
|
|
@Override |
|
|
|
public Executor getExecutor() { |
|
|
|
return pool; |
|
|
|
} |
|
|
|
|
|
|
|
@Override |
|
|
|
public void receiveConfigInfo(final String configInfo) { |
|
|
|
RecordLog.info(String.format("[NacosDataSource] New property value received for (%s, %s, %s): %s", |
|
|
|
serverAddr, dataId, groupId, configInfo)); |
|
|
|
T newValue = NacosDataSource.this.parser.parse(configInfo); |
|
|
|
// Update the new value to the property. |
|
|
|
getProperty().updateValue(newValue); |
|
|
|
} |
|
|
|
}; |
|
|
|
initNacosListener(serverAddr); |
|
|
|
loadInitialConfig(); |
|
|
|
} |
|
|
|
|
|
|
|
private void loadInitialConfig() { |
|
|
|
try { |
|
|
|
T newValue = loadConfig(); |
|
|
|
if (newValue == null) { |
|
|
|
RecordLog.info("[NacosDataSource] WARN: initial config is null, you may have to check your data source"); |
|
|
|
} |
|
|
|
getProperty().updateValue(newValue); |
|
|
|
} catch (Exception ex) { |
|
|
|
RecordLog.info("[NacosDataSource] Error when loading initial config", ex); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
private void initNacosListener(String serverAddr) { |
|
|
|
try { |
|
|
|
this.configService = NacosFactory.createConfigService(serverAddr); |
|
|
|
// Add config listener. |
|
|
|
configService.addListener(dataId, groupId, configListener); |
|
|
|
} catch (Exception e) { |
|
|
|
RecordLog.info("[NacosDataSource] Error occurred when initializing Nacos data source", e); |
|
|
|
e.printStackTrace(); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
@Override |
|
|
|
public String readSource() throws Exception { |
|
|
|
if (configService == null) { |
|
|
|
throw new IllegalStateException("Nacos config service has not been initialized or error occurred"); |
|
|
|
} |
|
|
|
return configService.getConfig(dataId, groupId, DEFAULT_TIMEOUT); |
|
|
|
} |
|
|
|
|
|
|
|
@Override |
|
|
|
public void close() { |
|
|
|
if (configService != null) { |
|
|
|
configService.removeListener(dataId, groupId, configListener); |
|
|
|
} |
|
|
|
pool.shutdownNow(); |
|
|
|
} |
|
|
|
} |