diff --git a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/config/SentinelConfig.java b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/config/SentinelConfig.java index 2c371253..2874c1e1 100755 --- a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/config/SentinelConfig.java +++ b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/config/SentinelConfig.java @@ -24,6 +24,7 @@ import java.util.concurrent.ConcurrentHashMap; import com.alibaba.csp.sentinel.log.LogBase; import com.alibaba.csp.sentinel.log.RecordLog; import com.alibaba.csp.sentinel.util.AppNameUtil; +import com.alibaba.csp.sentinel.util.AssertUtil; import com.alibaba.csp.sentinel.util.StringUtil; /** @@ -102,14 +103,24 @@ public class SentinelConfig { * @return the config value. */ public static String getConfig(String key) { + AssertUtil.notNull(key, "key cannot be null"); return props.get(key); } public static void setConfig(String key, String value) { + AssertUtil.notNull(key, "key cannot be null"); + AssertUtil.notNull(value, "value cannot be null"); props.put(key, value); } + public static String removeConfig(String key) { + AssertUtil.notNull(key, "key cannot be null"); + return props.remove(key); + } + public static void setConfigIfAbsent(String key, String value) { + AssertUtil.notNull(key, "key cannot be null"); + AssertUtil.notNull(value, "value cannot be null"); String v = props.get(key); if (v == null) { props.put(key, value); diff --git a/sentinel-transport/sentinel-transport-common/pom.xml b/sentinel-transport/sentinel-transport-common/pom.xml index 497bdf06..0b484dbd 100755 --- a/sentinel-transport/sentinel-transport-common/pom.xml +++ b/sentinel-transport/sentinel-transport-common/pom.xml @@ -20,10 +20,16 @@ com.alibaba fastjson + junit junit test + + org.mockito + mockito-core + test + \ No newline at end of file diff --git a/sentinel-transport/sentinel-transport-common/src/main/java/com/alibaba/csp/sentinel/transport/HeartbeatSender.java b/sentinel-transport/sentinel-transport-common/src/main/java/com/alibaba/csp/sentinel/transport/HeartbeatSender.java index b7eec31f..4dd52fd8 100755 --- a/sentinel-transport/sentinel-transport-common/src/main/java/com/alibaba/csp/sentinel/transport/HeartbeatSender.java +++ b/sentinel-transport/sentinel-transport-common/src/main/java/com/alibaba/csp/sentinel/transport/HeartbeatSender.java @@ -16,8 +16,8 @@ package com.alibaba.csp.sentinel.transport; /** - * Heartbeat interface. Sentinel core is responsible for invoking {@link #sendHeartbeat()} - * at every {@link #intervalMs()} interval. + * The heartbeat sender which is responsible for sending heartbeat to remote dashboard + * periodically per {@code interval}. * * @author leyou * @author Eric Zhao @@ -30,14 +30,15 @@ public interface HeartbeatSender { * at every {@link #intervalMs()} interval. * * @return whether heartbeat is successfully send. - * @throws Exception + * @throws Exception if error occurs */ boolean sendHeartbeat() throws Exception; /** - * Millisecond interval of every {@link #sendHeartbeat()} + * Default interval in milliseconds of the sender. It would take effect only when + * the heartbeat interval is not configured in Sentinel config property. * - * @return millisecond interval. + * @return default interval of the sender in milliseconds */ long intervalMs(); } diff --git a/sentinel-transport/sentinel-transport-common/src/main/java/com/alibaba/csp/sentinel/transport/config/TransportConfig.java b/sentinel-transport/sentinel-transport-common/src/main/java/com/alibaba/csp/sentinel/transport/config/TransportConfig.java index ab150175..84b53cbd 100755 --- a/sentinel-transport/sentinel-transport-common/src/main/java/com/alibaba/csp/sentinel/transport/config/TransportConfig.java +++ b/sentinel-transport/sentinel-transport-common/src/main/java/com/alibaba/csp/sentinel/transport/config/TransportConfig.java @@ -16,6 +16,7 @@ package com.alibaba.csp.sentinel.transport.config; import com.alibaba.csp.sentinel.config.SentinelConfig; +import com.alibaba.csp.sentinel.log.RecordLog; import com.alibaba.csp.sentinel.util.HostNameUtil; import com.alibaba.csp.sentinel.util.StringUtil; @@ -31,9 +32,19 @@ public class TransportConfig { private static int runtimePort = -1; + /** + * Get heartbeat interval in milliseconds. + * + * @return heartbeat interval in milliseconds if exists, or null if not configured or invalid config + */ public static Long getHeartbeatIntervalMs() { String interval = SentinelConfig.getConfig(HEARTBEAT_INTERVAL_MS); - return interval == null ? null : Long.parseLong(interval); + try { + return interval == null ? null : Long.parseLong(interval); + } catch (Exception ex) { + RecordLog.warn("[TransportConfig] Failed to parse heartbeat interval: " + interval); + return null; + } } /** diff --git a/sentinel-transport/sentinel-transport-common/src/main/java/com/alibaba/csp/sentinel/transport/init/HeartbeatSenderInitFunc.java b/sentinel-transport/sentinel-transport-common/src/main/java/com/alibaba/csp/sentinel/transport/init/HeartbeatSenderInitFunc.java index c3206edc..f0cf75ce 100755 --- a/sentinel-transport/sentinel-transport-common/src/main/java/com/alibaba/csp/sentinel/transport/init/HeartbeatSenderInitFunc.java +++ b/sentinel-transport/sentinel-transport-common/src/main/java/com/alibaba/csp/sentinel/transport/init/HeartbeatSenderInitFunc.java @@ -22,6 +22,7 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import com.alibaba.csp.sentinel.concurrent.NamedThreadFactory; +import com.alibaba.csp.sentinel.config.SentinelConfig; import com.alibaba.csp.sentinel.init.InitFunc; import com.alibaba.csp.sentinel.log.RecordLog; import com.alibaba.csp.sentinel.transport.HeartbeatSender; @@ -37,15 +38,12 @@ public class HeartbeatSenderInitFunc implements InitFunc { private static ScheduledExecutorService pool = Executors.newScheduledThreadPool(2, new NamedThreadFactory("sentinel-heartbeat-send-task", true)); + private boolean validHeartbeatInterval(Long interval) { + return interval != null && interval > 0; + } + @Override - public void init() throws Exception { - long heartBeatInterval = -1; - try { - heartBeatInterval = TransportConfig.getHeartbeatIntervalMs(); - RecordLog.info("system property heartbeat interval set: " + heartBeatInterval); - } catch (Exception ex) { - RecordLog.info("Parse heartbeat interval failed, use that in code, " + ex.getMessage()); - } + public void init() { ServiceLoader loader = ServiceLoader.load(HeartbeatSender.class); Iterator iterator = loader.iterator(); if (iterator.hasNext()) { @@ -53,24 +51,42 @@ public class HeartbeatSenderInitFunc implements InitFunc { if (iterator.hasNext()) { throw new IllegalStateException("Only single heartbeat sender can be scheduled"); } else { - long interval = sender.intervalMs(); - if (heartBeatInterval != -1) { - interval = heartBeatInterval; - } - pool.scheduleAtFixedRate(new Runnable() { - @Override - public void run() { - try { - sender.sendHeartbeat(); - } catch (Throwable e) { - e.printStackTrace(); - RecordLog.info("[HeartbeatSender] Send heartbeat error", e); - } - } - }, 10000, interval, TimeUnit.MILLISECONDS); - RecordLog.info("[HeartbeatSenderInit] HeartbeatSender started: " - + sender.getClass().getCanonicalName()); + long interval = retrieveInterval(sender); + setIntervalIfNotExists(interval); + scheduleHeartbeatTask(sender, interval); } } } + + private void setIntervalIfNotExists(long interval) { + SentinelConfig.setConfig(TransportConfig.HEARTBEAT_INTERVAL_MS, String.valueOf(interval)); + } + + long retrieveInterval(/*@NonNull*/ HeartbeatSender sender) { + Long intervalInConfig = TransportConfig.getHeartbeatIntervalMs(); + if (validHeartbeatInterval(intervalInConfig)) { + RecordLog.info("[HeartbeatSenderInit] Using heartbeat interval in Sentinel config property: " + intervalInConfig); + return intervalInConfig; + } else { + long senderInterval = sender.intervalMs(); + RecordLog.info("[HeartbeatSenderInit] Heartbeat interval not configured in config property or invalid, " + + "using sender default: " + senderInterval); + return senderInterval; + } + } + + private void scheduleHeartbeatTask(/*@NonNull*/ final HeartbeatSender sender, /*@Valid*/ long interval) { + pool.scheduleAtFixedRate(new Runnable() { + @Override + public void run() { + try { + sender.sendHeartbeat(); + } catch (Throwable e) { + RecordLog.warn("[HeartbeatSender] Send heartbeat error", e); + } + } + }, 5000, interval, TimeUnit.MILLISECONDS); + RecordLog.info("[HeartbeatSenderInit] HeartbeatSender started: " + + sender.getClass().getCanonicalName()); + } } diff --git a/sentinel-transport/sentinel-transport-common/src/test/java/com/alibaba/csp/sentinel/transport/config/TransportConfigTest.java b/sentinel-transport/sentinel-transport-common/src/test/java/com/alibaba/csp/sentinel/transport/config/TransportConfigTest.java index d35c77fa..d77a88b7 100644 --- a/sentinel-transport/sentinel-transport-common/src/test/java/com/alibaba/csp/sentinel/transport/config/TransportConfigTest.java +++ b/sentinel-transport/sentinel-transport-common/src/test/java/com/alibaba/csp/sentinel/transport/config/TransportConfigTest.java @@ -1,25 +1,67 @@ +/* + * Copyright 1999-2018 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package com.alibaba.csp.sentinel.transport.config; import com.alibaba.csp.sentinel.config.SentinelConfig; +import com.alibaba.csp.sentinel.util.StringUtil; + +import org.junit.After; +import org.junit.Before; import org.junit.Test; import static org.junit.Assert.*; public class TransportConfigTest { + @Before + public void setUp() throws Exception { + SentinelConfig.removeConfig(TransportConfig.HEARTBEAT_INTERVAL_MS); + SentinelConfig.removeConfig(TransportConfig.HEARTBEAT_CLIENT_IP); + } + + @After + public void tearDown() throws Exception { + SentinelConfig.removeConfig(TransportConfig.HEARTBEAT_INTERVAL_MS); + SentinelConfig.removeConfig(TransportConfig.HEARTBEAT_CLIENT_IP); + } + @Test - public void getClientIp() { - //config heartbeat client ip - System.setProperty(TransportConfig.HEARTBEAT_CLIENT_IP, "10.10.10.10"); + public void testGetHeartbeatInterval() { + long interval = 20000; + assertNull(TransportConfig.getHeartbeatIntervalMs()); + // Set valid interval. + SentinelConfig.setConfig(TransportConfig.HEARTBEAT_INTERVAL_MS, String.valueOf(interval)); + assertEquals(new Long(interval), TransportConfig.getHeartbeatIntervalMs()); + // Set invalid interval. + SentinelConfig.setConfig(TransportConfig.HEARTBEAT_INTERVAL_MS, "Sentinel"); + assertNull(TransportConfig.getHeartbeatIntervalMs()); + } + + @Test + public void testGetHeartbeatClientIp() { + String clientIp = "10.10.10.10"; + SentinelConfig.setConfig(TransportConfig.HEARTBEAT_CLIENT_IP, clientIp); + // Set heartbeat client ip to system property. String ip = TransportConfig.getHeartbeatClientIp(); assertNotNull(ip); - assertEquals(ip, "10.10.10.10"); + assertEquals(clientIp, ip); - //no heartbeat client ip + // Set no heartbeat client ip. SentinelConfig.setConfig(TransportConfig.HEARTBEAT_CLIENT_IP, ""); - ip = TransportConfig.getHeartbeatClientIp(); - assertNotNull(ip); - + assertTrue(StringUtil.isNotEmpty(TransportConfig.getHeartbeatClientIp())); } } \ No newline at end of file diff --git a/sentinel-transport/sentinel-transport-common/src/test/java/com/alibaba/csp/sentinel/transport/init/HeartbeatSenderInitFuncTest.java b/sentinel-transport/sentinel-transport-common/src/test/java/com/alibaba/csp/sentinel/transport/init/HeartbeatSenderInitFuncTest.java new file mode 100644 index 00000000..b403f172 --- /dev/null +++ b/sentinel-transport/sentinel-transport-common/src/test/java/com/alibaba/csp/sentinel/transport/init/HeartbeatSenderInitFuncTest.java @@ -0,0 +1,63 @@ +/* + * Copyright 1999-2018 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.csp.sentinel.transport.init; + +import com.alibaba.csp.sentinel.config.SentinelConfig; +import com.alibaba.csp.sentinel.transport.HeartbeatSender; +import com.alibaba.csp.sentinel.transport.config.TransportConfig; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import static org.junit.Assert.*; +import static org.mockito.Mockito.*; + +/** + * @author Eric Zhao + */ +public class HeartbeatSenderInitFuncTest { + + @Before + public void setUp() throws Exception { + SentinelConfig.removeConfig(TransportConfig.HEARTBEAT_INTERVAL_MS); + } + + @After + public void tearDown() throws Exception { + SentinelConfig.removeConfig(TransportConfig.HEARTBEAT_INTERVAL_MS); + } + + @Test + public void testRetrieveInterval() { + HeartbeatSender sender = mock(HeartbeatSender.class); + + long senderInterval = 5666; + long configInterval = 6777; + + when(sender.intervalMs()).thenReturn(senderInterval); + + HeartbeatSenderInitFunc func = new HeartbeatSenderInitFunc(); + assertEquals(senderInterval, func.retrieveInterval(sender)); + + // Invalid interval. + SentinelConfig.setConfig(TransportConfig.HEARTBEAT_INTERVAL_MS, "-1"); + assertEquals(senderInterval, func.retrieveInterval(sender)); + + SentinelConfig.setConfig(TransportConfig.HEARTBEAT_INTERVAL_MS, String.valueOf(configInterval)); + assertEquals(configInterval, func.retrieveInterval(sender)); + } +} \ No newline at end of file diff --git a/sentinel-transport/sentinel-transport-netty-http/src/main/java/com/alibaba/csp/sentinel/transport/heartbeat/HttpHeartbeatSender.java b/sentinel-transport/sentinel-transport-netty-http/src/main/java/com/alibaba/csp/sentinel/transport/heartbeat/HttpHeartbeatSender.java index 090cd09b..96cf4327 100755 --- a/sentinel-transport/sentinel-transport-netty-http/src/main/java/com/alibaba/csp/sentinel/transport/heartbeat/HttpHeartbeatSender.java +++ b/sentinel-transport/sentinel-transport-netty-http/src/main/java/com/alibaba/csp/sentinel/transport/heartbeat/HttpHeartbeatSender.java @@ -15,6 +15,9 @@ */ package com.alibaba.csp.sentinel.transport.heartbeat; +import java.util.ArrayList; +import java.util.List; + import com.alibaba.csp.sentinel.Constants; import com.alibaba.csp.sentinel.transport.config.TransportConfig; import com.alibaba.csp.sentinel.log.RecordLog; @@ -23,6 +26,7 @@ import com.alibaba.csp.sentinel.util.HostNameUtil; import com.alibaba.csp.sentinel.transport.HeartbeatSender; import com.alibaba.csp.sentinel.util.PidUtil; import com.alibaba.csp.sentinel.util.StringUtil; +import com.alibaba.csp.sentinel.util.function.Tuple2; import org.apache.http.client.config.RequestConfig; import org.apache.http.client.methods.CloseableHttpResponse; @@ -51,24 +55,48 @@ public class HttpHeartbeatSender implements HeartbeatSender { public HttpHeartbeatSender() { this.client = HttpClients.createDefault(); - String consoleServer = TransportConfig.getConsoleServer(); - if (StringUtil.isEmpty(consoleServer)) { - RecordLog.info("[Heartbeat] Console server address is not configured!"); + List> dashboardList = parseDashboardList(); + if (dashboardList == null || dashboardList.isEmpty()) { + RecordLog.info("[NettyHttpHeartbeatSender] No dashboard available"); } else { - String consoleHost = consoleServer; - int consolePort = 80; - if (consoleServer.contains(",")) { - consoleHost = consoleServer.split(",")[0]; + consoleHost = dashboardList.get(0).r1; + consolePort = dashboardList.get(0).r2; + RecordLog.info("[NettyHttpHeartbeatSender] Dashboard address parsed: <" + consoleHost + ':' + consolePort + ">"); + } + } + + private List> parseDashboardList() { + List> list = new ArrayList>(); + try { + String ipsStr = TransportConfig.getConsoleServer(); + if (StringUtil.isBlank(ipsStr)) { + RecordLog.warn("[NettyHttpHeartbeatSender] Dashboard server address is not configured"); + return list; } - if (consoleHost.contains(":")) { - String[] strs = consoleServer.split(":"); - consoleHost = strs[0]; - consolePort = Integer.parseInt(strs[1]); + + for (String ipPortStr : ipsStr.split(",")) { + if (ipPortStr.trim().length() == 0) { + continue; + } + ipPortStr = ipPortStr.trim(); + if (ipPortStr.startsWith("http://")) { + ipPortStr = ipPortStr.substring(7); + } + if (ipPortStr.startsWith(":")) { + continue; + } + String[] ipPort = ipPortStr.trim().split(":"); + int port = 8080; + if (ipPort.length > 1) { + port = Integer.parseInt(ipPort[1].trim()); + } + list.add(Tuple2.of(ipPort[0].trim(), port)); } - this.consoleHost = consoleHost; - this.consolePort = consolePort; + } catch (Exception ex) { + RecordLog.warn("[NettyHttpHeartbeatSender] Parse dashboard list failed, current address list: " + list, ex); + ex.printStackTrace(); } - + return list; } @Override @@ -76,8 +104,6 @@ public class HttpHeartbeatSender implements HeartbeatSender { if (StringUtil.isEmpty(consoleHost)) { return false; } - RecordLog.info(String.format("[Heartbeat] Sending heartbeat to %s:%d", consoleHost, consolePort)); - URIBuilder uriBuilder = new URIBuilder(); uriBuilder.setScheme("http").setHost(consoleHost).setPort(consolePort) .setPath("/registry/machine") diff --git a/sentinel-transport/sentinel-transport-simple-http/src/main/java/com/alibaba/csp/sentinel/transport/heartbeat/SimpleHttpHeartbeatSender.java b/sentinel-transport/sentinel-transport-simple-http/src/main/java/com/alibaba/csp/sentinel/transport/heartbeat/SimpleHttpHeartbeatSender.java index 2ac4ffe1..a230c125 100755 --- a/sentinel-transport/sentinel-transport-simple-http/src/main/java/com/alibaba/csp/sentinel/transport/heartbeat/SimpleHttpHeartbeatSender.java +++ b/sentinel-transport/sentinel-transport-simple-http/src/main/java/com/alibaba/csp/sentinel/transport/heartbeat/SimpleHttpHeartbeatSender.java @@ -19,7 +19,6 @@ import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.List; -import com.alibaba.csp.sentinel.config.SentinelConfig; import com.alibaba.csp.sentinel.log.RecordLog; import com.alibaba.csp.sentinel.transport.HeartbeatSender; import com.alibaba.csp.sentinel.transport.config.TransportConfig; @@ -54,9 +53,6 @@ public class SimpleHttpHeartbeatSender implements HeartbeatSender { List newAddrs = getDefaultConsoleIps(); RecordLog.info("[SimpleHttpHeartbeatSender] Default console address list retrieved: " + newAddrs); this.addressList = newAddrs; - // Set interval config. - String interval = System.getProperty(TransportConfig.HEARTBEAT_INTERVAL_MS, String.valueOf(DEFAULT_INTERVAL)); - SentinelConfig.setConfig(TransportConfig.HEARTBEAT_INTERVAL_MS, interval); } @Override @@ -78,7 +74,7 @@ public class SimpleHttpHeartbeatSender implements HeartbeatSender { return true; } } catch (Exception e) { - RecordLog.info("Failed to send heart beat to " + addr + " : ", e); + RecordLog.warn("[SimpleHttpHeartbeatSender] Failed to send heartbeat to " + addr + " : ", e); } return false; } @@ -104,6 +100,7 @@ public class SimpleHttpHeartbeatSender implements HeartbeatSender { try { String ipsStr = TransportConfig.getConsoleServer(); if (StringUtil.isEmpty(ipsStr)) { + RecordLog.warn("[NettyHttpHeartbeatSender] Dashboard server address not configured"); return newAddrs; } @@ -122,7 +119,7 @@ public class SimpleHttpHeartbeatSender implements HeartbeatSender { newAddrs.add(new InetSocketAddress(ipPort[0].trim(), port)); } } catch (Exception ex) { - RecordLog.info("[SimpleHeartbeatSender] Parse console list failed, current address list: " + newAddrs, ex); + RecordLog.warn("[SimpleHeartbeatSender] Parse dashboard list failed, current address list: " + newAddrs, ex); ex.printStackTrace(); } return newAddrs;