|
@@ -103,17 +103,16 @@ public class ZookeeperDataSource<T> extends AbstractDataSource<String, T> { |
|
|
this.listener = new NodeCacheListener() { |
|
|
this.listener = new NodeCacheListener() { |
|
|
@Override |
|
|
@Override |
|
|
public void nodeChanged() { |
|
|
public void nodeChanged() { |
|
|
String configInfo = null; |
|
|
|
|
|
ChildData childData = nodeCache.getCurrentData(); |
|
|
|
|
|
if (null != childData && childData.getData() != null) { |
|
|
|
|
|
|
|
|
|
|
|
configInfo = new String(childData.getData()); |
|
|
|
|
|
|
|
|
try { |
|
|
|
|
|
T newValue = loadConfig(); |
|
|
|
|
|
RecordLog.info(String.format("[ZookeeperDataSource] New property value received for (%s, %s): %s", |
|
|
|
|
|
serverAddr, path, newValue)); |
|
|
|
|
|
// Update the new value to the property. |
|
|
|
|
|
getProperty().updateValue(newValue); |
|
|
|
|
|
} catch (Exception ex) { |
|
|
|
|
|
RecordLog.warn("[ZookeeperDataSource] loadConfig exception", ex); |
|
|
} |
|
|
} |
|
|
RecordLog.info(String.format("[ZookeeperDataSource] New property value received for (%s, %s): %s", |
|
|
|
|
|
serverAddr, path, configInfo)); |
|
|
|
|
|
T newValue = ZookeeperDataSource.this.parser.convert(configInfo); |
|
|
|
|
|
// Update the new value to the property. |
|
|
|
|
|
getProperty().updateValue(newValue); |
|
|
|
|
|
} |
|
|
} |
|
|
}; |
|
|
}; |
|
|
|
|
|
|
|
@@ -127,10 +126,6 @@ public class ZookeeperDataSource<T> extends AbstractDataSource<String, T> { |
|
|
build(); |
|
|
build(); |
|
|
} |
|
|
} |
|
|
this.zkClient.start(); |
|
|
this.zkClient.start(); |
|
|
Stat stat = this.zkClient.checkExists().forPath(this.path); |
|
|
|
|
|
if (stat == null) { |
|
|
|
|
|
this.zkClient.create().creatingParentContainersIfNeeded().withMode(CreateMode.PERSISTENT).forPath(this.path, null); |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
this.nodeCache = new NodeCache(this.zkClient, this.path); |
|
|
this.nodeCache = new NodeCache(this.zkClient, this.path); |
|
|
this.nodeCache.getListenable().addListener(this.listener, this.pool); |
|
|
this.nodeCache.getListenable().addListener(this.listener, this.pool); |
|
@@ -146,11 +141,13 @@ public class ZookeeperDataSource<T> extends AbstractDataSource<String, T> { |
|
|
if (this.zkClient == null) { |
|
|
if (this.zkClient == null) { |
|
|
throw new IllegalStateException("Zookeeper has not been initialized or error occurred"); |
|
|
throw new IllegalStateException("Zookeeper has not been initialized or error occurred"); |
|
|
} |
|
|
} |
|
|
byte[] data = this.zkClient.getData().forPath(this.path); |
|
|
|
|
|
if (data != null) { |
|
|
|
|
|
return new String(data); |
|
|
|
|
|
|
|
|
String configInfo = null; |
|
|
|
|
|
ChildData childData = nodeCache.getCurrentData(); |
|
|
|
|
|
if (null != childData && childData.getData() != null) { |
|
|
|
|
|
|
|
|
|
|
|
configInfo = new String(childData.getData()); |
|
|
} |
|
|
} |
|
|
return null; |
|
|
|
|
|
|
|
|
return configInfo; |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
@Override |
|
|
@Override |
|
|