- Abstract a universal `MetricsRepository` interface so that users can implement their own metrics persistence. - Reuse original in-memory implementation (`InMemoryMetricsRepository`) as the default repository. Signed-off-by: Eric Zhao <sczyh16@gmail.com>master
@@ -40,7 +40,7 @@ import com.alibaba.csp.sentinel.util.StringUtil; | |||||
import com.taobao.csp.sentinel.dashboard.datasource.entity.MetricEntity; | import com.taobao.csp.sentinel.dashboard.datasource.entity.MetricEntity; | ||||
import com.taobao.csp.sentinel.dashboard.discovery.AppManagement; | import com.taobao.csp.sentinel.dashboard.discovery.AppManagement; | ||||
import com.taobao.csp.sentinel.dashboard.discovery.MachineInfo; | import com.taobao.csp.sentinel.dashboard.discovery.MachineInfo; | ||||
import com.taobao.csp.sentinel.dashboard.inmem.InMemMetricStore; | |||||
import com.taobao.csp.sentinel.dashboard.repository.metric.MetricsRepository; | |||||
import org.apache.http.HttpResponse; | import org.apache.http.HttpResponse; | ||||
import org.apache.http.client.methods.HttpGet; | import org.apache.http.client.methods.HttpGet; | ||||
import org.apache.http.concurrent.FutureCallback; | import org.apache.http.concurrent.FutureCallback; | ||||
@@ -78,7 +78,7 @@ public class MetricFetcher { | |||||
private Map<String, AtomicLong> appLastFetchTime = new ConcurrentHashMap<>(); | private Map<String, AtomicLong> appLastFetchTime = new ConcurrentHashMap<>(); | ||||
@Autowired | @Autowired | ||||
private InMemMetricStore metricStore; | |||||
private MetricsRepository<MetricEntity> metricStore; | |||||
@Autowired | @Autowired | ||||
private AppManagement appManagement; | private AppManagement appManagement; | ||||
@@ -13,66 +13,77 @@ | |||||
* See the License for the specific language governing permissions and | * See the License for the specific language governing permissions and | ||||
* limitations under the License. | * limitations under the License. | ||||
*/ | */ | ||||
package com.taobao.csp.sentinel.dashboard.inmem; | |||||
package com.taobao.csp.sentinel.dashboard.repository.metric; | |||||
import java.util.ArrayList; | import java.util.ArrayList; | ||||
import java.util.HashMap; | import java.util.HashMap; | ||||
import java.util.LinkedHashMap; | import java.util.LinkedHashMap; | ||||
import java.util.List; | import java.util.List; | ||||
import java.util.Map; | import java.util.Map; | ||||
import java.util.Map.Entry; | |||||
import java.util.concurrent.ConcurrentHashMap; | import java.util.concurrent.ConcurrentHashMap; | ||||
import java.util.stream.Collectors; | import java.util.stream.Collectors; | ||||
import com.alibaba.csp.sentinel.util.StringUtil; | |||||
import com.taobao.csp.sentinel.dashboard.datasource.entity.MetricEntity; | import com.taobao.csp.sentinel.dashboard.datasource.entity.MetricEntity; | ||||
import org.springframework.stereotype.Component; | import org.springframework.stereotype.Component; | ||||
/** | /** | ||||
* Store metrics in memory. | |||||
* Caches metrics data in a period of time in memory. | |||||
* | * | ||||
* @author leyou | |||||
* @author Carpenter Lee | |||||
* @author Eric Zhao | |||||
*/ | */ | ||||
@Component | @Component | ||||
public class InMemMetricStore { | |||||
public static final long MAX_METRIC_LIVE_TIME_MS = 1000 * 60 * 5; | |||||
public class InMemoryMetricsRepository implements MetricsRepository<MetricEntity> { | |||||
private static final long MAX_METRIC_LIVE_TIME_MS = 1000 * 60 * 5; | |||||
/** | /** | ||||
* {@code app -> resource -> timestamp -> metric} | * {@code app -> resource -> timestamp -> metric} | ||||
*/ | */ | ||||
private Map<String, Map<String, LinkedHashMap<Long, MetricEntity>>> allMetrics = new ConcurrentHashMap<>(); | private Map<String, Map<String, LinkedHashMap<Long, MetricEntity>>> allMetrics = new ConcurrentHashMap<>(); | ||||
/** | |||||
* Save all metrics in memory. Metric older than {@link #MAX_METRIC_LIVE_TIME_MS} will be removed. | |||||
* | |||||
* @param metrics metrics to be saved. | |||||
*/ | |||||
@Override | |||||
public synchronized void save(MetricEntity entity) { | |||||
if (entity == null || StringUtil.isBlank(entity.getApp())) { | |||||
return; | |||||
} | |||||
allMetrics.computeIfAbsent(entity.getApp(), e -> new HashMap<>(16)) | |||||
.computeIfAbsent(entity.getResource(), e -> new LinkedHashMap<Long, MetricEntity>() { | |||||
@Override | |||||
protected boolean removeEldestEntry(Entry<Long, MetricEntity> eldest) { | |||||
// Metric older than {@link #MAX_METRIC_LIVE_TIME_MS} will be removed. | |||||
return eldest.getKey() < System.currentTimeMillis() - MAX_METRIC_LIVE_TIME_MS; | |||||
} | |||||
}).put(entity.getTimestamp().getTime(), entity); | |||||
} | |||||
@Override | |||||
public synchronized void saveAll(Iterable<MetricEntity> metrics) { | public synchronized void saveAll(Iterable<MetricEntity> metrics) { | ||||
if (metrics == null) { | if (metrics == null) { | ||||
return; | return; | ||||
} | } | ||||
for (MetricEntity entity : metrics) { | |||||
allMetrics.computeIfAbsent(entity.getApp(), e -> new HashMap<>(16)) | |||||
.computeIfAbsent(entity.getResource(), e -> new LinkedHashMap<Long, MetricEntity>() { | |||||
@Override | |||||
protected boolean removeEldestEntry(Map.Entry<Long, MetricEntity> eldest) { | |||||
return eldest.getKey() < System.currentTimeMillis() - MAX_METRIC_LIVE_TIME_MS; | |||||
} | |||||
}).put(entity.getTimestamp().getTime(), entity); | |||||
} | |||||
metrics.forEach(this::save); | |||||
} | } | ||||
public synchronized List<MetricEntity> queryByAppAndResouce(String app, | |||||
String resource, | |||||
long startTime, | |||||
long endTime) { | |||||
@Override | |||||
public synchronized List<MetricEntity> queryByAppAndResourceBetween(String app, String resource, | |||||
long startTime, long endTime) { | |||||
List<MetricEntity> results = new ArrayList<>(); | List<MetricEntity> results = new ArrayList<>(); | ||||
Map<String, LinkedHashMap<Long, MetricEntity>> resouceMap = allMetrics.get(app); | |||||
if (resouceMap == null) { | |||||
if (StringUtil.isBlank(app)) { | |||||
return results; | return results; | ||||
} | } | ||||
LinkedHashMap<Long, MetricEntity> metricsMap = resouceMap.get(resource); | |||||
Map<String, LinkedHashMap<Long, MetricEntity>> resourceMap = allMetrics.get(app); | |||||
if (resourceMap == null) { | |||||
return results; | |||||
} | |||||
LinkedHashMap<Long, MetricEntity> metricsMap = resourceMap.get(resource); | |||||
if (metricsMap == null) { | if (metricsMap == null) { | ||||
return results; | return results; | ||||
} | } | ||||
for (Map.Entry<Long, MetricEntity> entry : metricsMap.entrySet()) { | |||||
for (Entry<Long, MetricEntity> entry : metricsMap.entrySet()) { | |||||
if (entry.getKey() >= startTime && entry.getKey() <= endTime) { | if (entry.getKey() >= startTime && entry.getKey() <= endTime) { | ||||
results.add(entry.getValue()); | results.add(entry.getValue()); | ||||
} | } | ||||
@@ -80,14 +91,12 @@ public class InMemMetricStore { | |||||
return results; | return results; | ||||
} | } | ||||
/** | |||||
* Find resources of App order by last minute b_qps desc | |||||
* | |||||
* @param app app name | |||||
* @return Resources list, order by last minute b_qps desc. | |||||
*/ | |||||
public synchronized List<String> findResourcesOfApp(String app) { | |||||
@Override | |||||
public List<String> listResourcesOfApp(String app) { | |||||
List<String> results = new ArrayList<>(); | List<String> results = new ArrayList<>(); | ||||
if (StringUtil.isBlank(app)) { | |||||
return results; | |||||
} | |||||
// resource -> timestamp -> metric | // resource -> timestamp -> metric | ||||
Map<String, LinkedHashMap<Long, MetricEntity>> resourceMap = allMetrics.get(app); | Map<String, LinkedHashMap<Long, MetricEntity>> resourceMap = allMetrics.get(app); | ||||
if (resourceMap == null) { | if (resourceMap == null) { | ||||
@@ -96,8 +105,8 @@ public class InMemMetricStore { | |||||
final long minTimeMs = System.currentTimeMillis() - 1000 * 60; | final long minTimeMs = System.currentTimeMillis() - 1000 * 60; | ||||
Map<String, MetricEntity> resourceCount = new HashMap<>(32); | Map<String, MetricEntity> resourceCount = new HashMap<>(32); | ||||
for (Map.Entry<String, LinkedHashMap<Long, MetricEntity>> resourceMetrics : resourceMap.entrySet()) { | |||||
for (Map.Entry<Long, MetricEntity> metrics : resourceMetrics.getValue().entrySet()) { | |||||
for (Entry<String, LinkedHashMap<Long, MetricEntity>> resourceMetrics : resourceMap.entrySet()) { | |||||
for (Entry<Long, MetricEntity> metrics : resourceMetrics.getValue().entrySet()) { | |||||
if (metrics.getKey() < minTimeMs) { | if (metrics.getKey() < minTimeMs) { | ||||
continue; | continue; | ||||
} | } | ||||
@@ -114,16 +123,19 @@ public class InMemMetricStore { | |||||
} | } | ||||
} | } | ||||
} | } | ||||
return resourceCount.entrySet().stream().sorted((o1, o2) -> { | |||||
MetricEntity e1 = o1.getValue(); | |||||
MetricEntity e2 = o2.getValue(); | |||||
int t = e2.getBlockedQps().compareTo(e1.getBlockedQps()); | |||||
if (t != 0) { | |||||
return t; | |||||
} | |||||
return e2.getPassedQps().compareTo(e1.getPassedQps()); | |||||
}).map(e -> e.getKey()) | |||||
// Order by last minute b_qps DESC. | |||||
return resourceCount.entrySet() | |||||
.stream() | |||||
.sorted((o1, o2) -> { | |||||
MetricEntity e1 = o1.getValue(); | |||||
MetricEntity e2 = o2.getValue(); | |||||
int t = e2.getBlockedQps().compareTo(e1.getBlockedQps()); | |||||
if (t != 0) { | |||||
return t; | |||||
} | |||||
return e2.getPassedQps().compareTo(e1.getPassedQps()); | |||||
}) | |||||
.map(Entry::getKey) | |||||
.collect(Collectors.toList()); | .collect(Collectors.toList()); | ||||
} | } | ||||
} | } |
@@ -0,0 +1,60 @@ | |||||
/* | |||||
* Copyright 1999-2018 Alibaba Group Holding Ltd. | |||||
* | |||||
* Licensed under the Apache License, Version 2.0 (the "License"); | |||||
* you may not use this file except in compliance with the License. | |||||
* You may obtain a copy of the License at | |||||
* | |||||
* http://www.apache.org/licenses/LICENSE-2.0 | |||||
* | |||||
* Unless required by applicable law or agreed to in writing, software | |||||
* distributed under the License is distributed on an "AS IS" BASIS, | |||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |||||
* See the License for the specific language governing permissions and | |||||
* limitations under the License. | |||||
*/ | |||||
package com.taobao.csp.sentinel.dashboard.repository.metric; | |||||
import java.util.List; | |||||
/** | |||||
* Repository interface for aggregated metrics data. | |||||
* | |||||
* @param <T> type of metrics | |||||
* @author Eric Zhao | |||||
*/ | |||||
public interface MetricsRepository<T> { | |||||
/** | |||||
* Save the metric to the storage repository. | |||||
* | |||||
* @param metric metric data to save | |||||
*/ | |||||
void save(T metric); | |||||
/** | |||||
* Save all metrics to the storage repository. | |||||
* | |||||
* @param metrics metrics to save | |||||
*/ | |||||
void saveAll(Iterable<T> metrics); | |||||
/** | |||||
* Get all metrics by {@code appName} and {@code resourceName} between a period of time. | |||||
* | |||||
* @param app application name for Sentinel | |||||
* @param resource resource name | |||||
* @param startTime start timestamp | |||||
* @param endTime end timestamp | |||||
* @return all metrics in query conditions | |||||
*/ | |||||
List<T> queryByAppAndResourceBetween(String app, String resource, long startTime, long endTime); | |||||
/** | |||||
* List resource name of provided application name. | |||||
* | |||||
* @param app application name | |||||
* @return list of resources | |||||
*/ | |||||
List<String> listResourcesOfApp(String app); | |||||
} |
@@ -24,6 +24,7 @@ import java.util.Map; | |||||
import java.util.TreeMap; | import java.util.TreeMap; | ||||
import java.util.concurrent.ConcurrentHashMap; | import java.util.concurrent.ConcurrentHashMap; | ||||
import com.taobao.csp.sentinel.dashboard.repository.metric.MetricsRepository; | |||||
import org.slf4j.Logger; | import org.slf4j.Logger; | ||||
import org.slf4j.LoggerFactory; | import org.slf4j.LoggerFactory; | ||||
import org.springframework.beans.factory.annotation.Autowired; | import org.springframework.beans.factory.annotation.Autowired; | ||||
@@ -35,7 +36,6 @@ import org.springframework.web.bind.annotation.ResponseBody; | |||||
import com.alibaba.csp.sentinel.util.StringUtil; | import com.alibaba.csp.sentinel.util.StringUtil; | ||||
import com.taobao.csp.sentinel.dashboard.datasource.entity.MetricEntity; | import com.taobao.csp.sentinel.dashboard.datasource.entity.MetricEntity; | ||||
import com.taobao.csp.sentinel.dashboard.inmem.InMemMetricStore; | |||||
import com.taobao.csp.sentinel.dashboard.view.vo.MetricVo; | import com.taobao.csp.sentinel.dashboard.view.vo.MetricVo; | ||||
/** | /** | ||||
@@ -50,7 +50,7 @@ public class MetricController { | |||||
private static final long maxQueryIntervalMs = 1000 * 60 * 60; | private static final long maxQueryIntervalMs = 1000 * 60 * 60; | ||||
@Autowired | @Autowired | ||||
private InMemMetricStore metricStore; | |||||
private MetricsRepository<MetricEntity> metricStore; | |||||
@ResponseBody | @ResponseBody | ||||
@RequestMapping("/queryTopResourceMetric.json") | @RequestMapping("/queryTopResourceMetric.json") | ||||
@@ -83,7 +83,7 @@ public class MetricController { | |||||
if (endTime - startTime > maxQueryIntervalMs) { | if (endTime - startTime > maxQueryIntervalMs) { | ||||
return Result.ofFail(-1, "time intervalMs is too big, must <= 1h"); | return Result.ofFail(-1, "time intervalMs is too big, must <= 1h"); | ||||
} | } | ||||
List<String> resources = metricStore.findResourcesOfApp(app); | |||||
List<String> resources = metricStore.listResourcesOfApp(app); | |||||
logger.info("queryTopResourceMetric(), resources.size()={}", resources.size()); | logger.info("queryTopResourceMetric(), resources.size()={}", resources.size()); | ||||
if (resources == null || resources.isEmpty()) { | if (resources == null || resources.isEmpty()) { | ||||
return Result.ofSuccess(null); | return Result.ofSuccess(null); | ||||
@@ -110,7 +110,7 @@ public class MetricController { | |||||
logger.info("topResource={}", topResource); | logger.info("topResource={}", topResource); | ||||
long time = System.currentTimeMillis(); | long time = System.currentTimeMillis(); | ||||
for (final String resource : topResource) { | for (final String resource : topResource) { | ||||
List<MetricEntity> entities = metricStore.queryByAppAndResouce( | |||||
List<MetricEntity> entities = metricStore.queryByAppAndResourceBetween( | |||||
app, resource, startTime, endTime); | app, resource, startTime, endTime); | ||||
logger.info("resource={}, entities.size()={}", resource, entities == null ? "null" : entities.size()); | logger.info("resource={}, entities.size()={}", resource, entities == null ? "null" : entities.size()); | ||||
List<MetricVo> vos = MetricVo.fromMetricEntities(entities, resource); | List<MetricVo> vos = MetricVo.fromMetricEntities(entities, resource); | ||||
@@ -151,7 +151,7 @@ public class MetricController { | |||||
if (endTime - startTime > maxQueryIntervalMs) { | if (endTime - startTime > maxQueryIntervalMs) { | ||||
return Result.ofFail(-1, "time intervalMs is too big, must <= 1h"); | return Result.ofFail(-1, "time intervalMs is too big, must <= 1h"); | ||||
} | } | ||||
List<MetricEntity> entities = metricStore.queryByAppAndResouce( | |||||
List<MetricEntity> entities = metricStore.queryByAppAndResourceBetween( | |||||
app, identity, startTime, endTime); | app, identity, startTime, endTime); | ||||
List<MetricVo> vos = MetricVo.fromMetricEntities(entities, identity); | List<MetricVo> vos = MetricVo.fromMetricEntities(entities, identity); | ||||
return Result.ofSuccess(sortMetricVoAndDistinct(vos)); | return Result.ofSuccess(sortMetricVoAndDistinct(vos)); | ||||