Browse Source

Enhance reliability and performance of InMemoryMetricsRepository (#1319)

* Fix InMemoryMetricsRepository can't keep the last five minutes metrics data problem and Improve read-write performance
* Use TimeUtil.currentTimeMillis() replace System.currentTimeMillis() for better performance
master
Lin.Liang GitHub 5 years ago
parent
commit
80d5c8b484
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 153 additions and 95 deletions
  1. +78
    -56
      sentinel-dashboard/src/main/java/com/alibaba/csp/sentinel/dashboard/repository/metric/InMemoryMetricsRepository.java
  2. +75
    -39
      sentinel-dashboard/src/test/java/com/alibaba/csp/sentinel/dashboard/repository/metric/InMemoryMetricsRepositoryTest.java

+ 78
- 56
sentinel-dashboard/src/main/java/com/alibaba/csp/sentinel/dashboard/repository/metric/InMemoryMetricsRepository.java View File

@@ -15,6 +15,11 @@
*/
package com.alibaba.csp.sentinel.dashboard.repository.metric;

import com.alibaba.csp.sentinel.dashboard.datasource.entity.MetricEntity;
import com.alibaba.csp.sentinel.util.StringUtil;
import com.alibaba.csp.sentinel.util.TimeUtil;
import org.springframework.stereotype.Component;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
@@ -22,14 +27,9 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;

import com.alibaba.csp.sentinel.dashboard.datasource.entity.MetricEntity;
import com.alibaba.csp.sentinel.util.StringUtil;

import com.googlecode.concurrentlinkedhashmap.ConcurrentLinkedHashMap;
import org.springframework.stereotype.Component;

/**
* Caches metrics data in a period of time in memory.
*
@@ -44,54 +44,71 @@ public class InMemoryMetricsRepository implements MetricsRepository<MetricEntity
/**
* {@code app -> resource -> timestamp -> metric}
*/
private Map<String, Map<String, ConcurrentLinkedHashMap<Long, MetricEntity>>> allMetrics = new ConcurrentHashMap<>();
private Map<String, Map<String, LinkedHashMap<Long, MetricEntity>>> allMetrics = new ConcurrentHashMap<>();

private final ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();


@Override
public synchronized void save(MetricEntity entity) {
public void save(MetricEntity entity) {
if (entity == null || StringUtil.isBlank(entity.getApp())) {
return;
}
allMetrics.computeIfAbsent(entity.getApp(), e -> new ConcurrentHashMap<>(16))
.computeIfAbsent(entity.getResource(), e -> new ConcurrentLinkedHashMap.Builder<Long, MetricEntity>()
.maximumWeightedCapacity(MAX_METRIC_LIVE_TIME_MS).weigher((key, value) -> {
// Metric older than {@link #MAX_METRIC_LIVE_TIME_MS} will be removed.
int weight = (int)(System.currentTimeMillis() - key);
// weight must be a number greater than or equal to one
return Math.max(weight, 1);
}).build()).put(entity.getTimestamp().getTime(), entity);
readWriteLock.writeLock().lock();
try {
allMetrics.computeIfAbsent(entity.getApp(), e -> new HashMap<>(16))
.computeIfAbsent(entity.getResource(), e -> new LinkedHashMap<Long, MetricEntity>() {
@Override
protected boolean removeEldestEntry(Entry<Long, MetricEntity> eldest) {
// Metric older than {@link #MAX_METRIC_LIVE_TIME_MS} will be removed.
return eldest.getKey() < TimeUtil.currentTimeMillis() - MAX_METRIC_LIVE_TIME_MS;
}
}).put(entity.getTimestamp().getTime(), entity);
} finally {
readWriteLock.writeLock().unlock();
}

}

@Override
public synchronized void saveAll(Iterable<MetricEntity> metrics) {
public void saveAll(Iterable<MetricEntity> metrics) {
if (metrics == null) {
return;
}
metrics.forEach(this::save);
readWriteLock.writeLock().lock();
try {
metrics.forEach(this::save);
} finally {
readWriteLock.writeLock().unlock();
}
}

@Override
public synchronized List<MetricEntity> queryByAppAndResourceBetween(String app, String resource,
long startTime, long endTime) {
public List<MetricEntity> queryByAppAndResourceBetween(String app, String resource,
long startTime, long endTime) {
List<MetricEntity> results = new ArrayList<>();
if (StringUtil.isBlank(app)) {
return results;
}
Map<String, ConcurrentLinkedHashMap<Long, MetricEntity>> resourceMap = allMetrics.get(app);
Map<String, LinkedHashMap<Long, MetricEntity>> resourceMap = allMetrics.get(app);
if (resourceMap == null) {
return results;
}
ConcurrentLinkedHashMap<Long, MetricEntity> metricsMap = resourceMap.get(resource);
LinkedHashMap<Long, MetricEntity> metricsMap = resourceMap.get(resource);
if (metricsMap == null) {
return results;
}
for (Entry<Long, MetricEntity> entry : metricsMap.entrySet()) {
if (entry.getKey() >= startTime && entry.getKey() <= endTime) {
results.add(entry.getValue());
readWriteLock.readLock().lock();
try {
for (Entry<Long, MetricEntity> entry : metricsMap.entrySet()) {
if (entry.getKey() >= startTime && entry.getKey() <= endTime) {
results.add(entry.getValue());
}
}
return results;
} finally {
readWriteLock.readLock().unlock();
}
return results;
}

@Override
@@ -101,44 +118,49 @@ public class InMemoryMetricsRepository implements MetricsRepository<MetricEntity
return results;
}
// resource -> timestamp -> metric
Map<String, ConcurrentLinkedHashMap<Long, MetricEntity>> resourceMap = allMetrics.get(app);
Map<String, LinkedHashMap<Long, MetricEntity>> resourceMap = allMetrics.get(app);
if (resourceMap == null) {
return results;
}
final long minTimeMs = System.currentTimeMillis() - 1000 * 60;
Map<String, MetricEntity> resourceCount = new ConcurrentHashMap<>(32);

for (Entry<String, ConcurrentLinkedHashMap<Long, MetricEntity>> resourceMetrics : resourceMap.entrySet()) {
for (Entry<Long, MetricEntity> metrics : resourceMetrics.getValue().entrySet()) {
if (metrics.getKey() < minTimeMs) {
continue;
}
MetricEntity newEntity = metrics.getValue();
if (resourceCount.containsKey(resourceMetrics.getKey())) {
MetricEntity oldEntity = resourceCount.get(resourceMetrics.getKey());
oldEntity.addPassQps(newEntity.getPassQps());
oldEntity.addRtAndSuccessQps(newEntity.getRt(), newEntity.getSuccessQps());
oldEntity.addBlockQps(newEntity.getBlockQps());
oldEntity.addExceptionQps(newEntity.getExceptionQps());
oldEntity.addCount(1);
} else {
resourceCount.put(resourceMetrics.getKey(), MetricEntity.copyOf(newEntity));
readWriteLock.readLock().lock();
try {
for (Entry<String, LinkedHashMap<Long, MetricEntity>> resourceMetrics : resourceMap.entrySet()) {
for (Entry<Long, MetricEntity> metrics : resourceMetrics.getValue().entrySet()) {
if (metrics.getKey() < minTimeMs) {
continue;
}
MetricEntity newEntity = metrics.getValue();
if (resourceCount.containsKey(resourceMetrics.getKey())) {
MetricEntity oldEntity = resourceCount.get(resourceMetrics.getKey());
oldEntity.addPassQps(newEntity.getPassQps());
oldEntity.addRtAndSuccessQps(newEntity.getRt(), newEntity.getSuccessQps());
oldEntity.addBlockQps(newEntity.getBlockQps());
oldEntity.addExceptionQps(newEntity.getExceptionQps());
oldEntity.addCount(1);
} else {
resourceCount.put(resourceMetrics.getKey(), MetricEntity.copyOf(newEntity));
}
}
}
// Order by last minute b_qps DESC.
return resourceCount.entrySet()
.stream()
.sorted((o1, o2) -> {
MetricEntity e1 = o1.getValue();
MetricEntity e2 = o2.getValue();
int t = e2.getBlockQps().compareTo(e1.getBlockQps());
if (t != 0) {
return t;
}
return e2.getPassQps().compareTo(e1.getPassQps());
})
.map(Entry::getKey)
.collect(Collectors.toList());
} finally {
readWriteLock.readLock().unlock();
}
// Order by last minute b_qps DESC.
return resourceCount.entrySet()
.stream()
.sorted((o1, o2) -> {
MetricEntity e1 = o1.getValue();
MetricEntity e2 = o2.getValue();
int t = e2.getBlockQps().compareTo(e1.getBlockQps());
if (t != 0) {
return t;
}
return e2.getPassQps().compareTo(e1.getPassQps());
})
.map(Entry::getKey)
.collect(Collectors.toList());
}
}

+ 75
- 39
sentinel-dashboard/src/test/java/com/alibaba/csp/sentinel/dashboard/repository/metric/InMemoryMetricsRepositoryTest.java View File

@@ -16,17 +16,24 @@
package com.alibaba.csp.sentinel.dashboard.repository.metric;

import com.alibaba.csp.sentinel.dashboard.datasource.entity.MetricEntity;

import org.assertj.core.util.Lists;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.springframework.util.CollectionUtils;

import java.util.ArrayList;
import java.util.ConcurrentModificationException;
import java.util.Date;
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import static org.junit.Assert.*;

@@ -37,13 +44,11 @@ import static org.junit.Assert.*;
*/
public class InMemoryMetricsRepositoryTest {

private static final String DEFAULT_APP = "default";
private static final String DEFAULT_EXPIRE_APP = "default_expire_app";
private static final String DEFAULT_RESOURCE = "test";
private final static String DEFAULT_APP = "defaultApp";
private final static String DEFAULT_RESOURCE = "defaultResource";
private static final long EXPIRE_TIME = 1000 * 60 * 5L;

private InMemoryMetricsRepository inMemoryMetricsRepository;

private ExecutorService executorService;

@Before
@@ -57,27 +62,50 @@ public class InMemoryMetricsRepositoryTest {
executorService.shutdownNow();
}

private void testSave() {
for (int i = 0; i < 1000000; i++) {

@Test
public void testSave() {
MetricEntity entry = new MetricEntity();
entry.setApp("testSave");
entry.setResource("testResource");
entry.setTimestamp(new Date(System.currentTimeMillis()));
entry.setPassQps(1L);
entry.setExceptionQps(1L);
entry.setBlockQps(0L);
entry.setSuccessQps(1L);
inMemoryMetricsRepository.save(entry);
List<String> resources = inMemoryMetricsRepository.listResourcesOfApp("testSave");
Assert.assertTrue(resources.size() == 1 && "testResource".equals(resources.get(0)));
}


@Test
public void testSaveAll() {
List<MetricEntity> entities = new ArrayList<>(10000);
for (int i = 0; i < 10000; i++) {
MetricEntity entry = new MetricEntity();
entry.setApp(DEFAULT_APP);
entry.setResource(DEFAULT_RESOURCE);
entry.setApp("testSaveAll");
entry.setResource("testResource" + i);
entry.setTimestamp(new Date(System.currentTimeMillis()));
entry.setPassQps(1L);
entry.setExceptionQps(1L);
entry.setBlockQps(0L);
entry.setSuccessQps(1L);
inMemoryMetricsRepository.save(entry);
entities.add(entry);
}
inMemoryMetricsRepository.saveAll(entities);
List<String> result = inMemoryMetricsRepository.listResourcesOfApp("testSaveAll");
Assert.assertTrue(result.size() == entities.size());
}


@Test
public void testExpireMetric() {
long now = System.currentTimeMillis();
MetricEntity expireEntry = new MetricEntity();
expireEntry.setApp(DEFAULT_EXPIRE_APP);
expireEntry.setApp(DEFAULT_APP);
expireEntry.setResource(DEFAULT_RESOURCE);
expireEntry.setTimestamp(new Date(now - EXPIRE_TIME - 10L));
expireEntry.setTimestamp(new Date(now - EXPIRE_TIME - 1L));
expireEntry.setPassQps(1L);
expireEntry.setExceptionQps(1L);
expireEntry.setBlockQps(0L);
@@ -85,7 +113,7 @@ public class InMemoryMetricsRepositoryTest {
inMemoryMetricsRepository.save(expireEntry);

MetricEntity entry = new MetricEntity();
entry.setApp(DEFAULT_EXPIRE_APP);
entry.setApp(DEFAULT_APP);
entry.setResource(DEFAULT_RESOURCE);
entry.setTimestamp(new Date(now));
entry.setPassQps(1L);
@@ -95,47 +123,40 @@ public class InMemoryMetricsRepositoryTest {
inMemoryMetricsRepository.save(entry);

List<MetricEntity> list = inMemoryMetricsRepository.queryByAppAndResourceBetween(
DEFAULT_EXPIRE_APP, DEFAULT_RESOURCE, now - 2 * EXPIRE_TIME, now + EXPIRE_TIME);
DEFAULT_APP, DEFAULT_RESOURCE, now - EXPIRE_TIME, now);

assertFalse(CollectionUtils.isEmpty(list));
assertEquals(1, list.size());
assertTrue(list.get(0).getTimestamp().getTime() >= now - EXPIRE_TIME && list.get(0).getTimestamp().getTime() <= now);

}

@Test
public void testListResourcesOfApp() {
// prepare basic test data
testSave();
System.out.println( "[" + System.currentTimeMillis() + "] Basic test data ready in testListResourcesOfApp");

List<CompletableFuture> futures = Lists.newArrayList();
@Test
public void testConcurrentPutAndGet() {

// concurrent query resources of app
List<CompletableFuture> futures = new ArrayList<>(10000);
final CyclicBarrier cyclicBarrier = new CyclicBarrier(8);

for (int j = 0; j < 10000; j++) {
futures.add(
CompletableFuture.runAsync(() -> {
final int finalJ = j;
futures.add(CompletableFuture.runAsync(() -> {
try {
cyclicBarrier.await();
inMemoryMetricsRepository.listResourcesOfApp(DEFAULT_APP);
if (finalJ % 2 == 0) {
batchSave();
} else {
inMemoryMetricsRepository.listResourcesOfApp(DEFAULT_APP);
}

} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
}, executorService)

}, executorService)
);
}

// batch add metric entity
for (int i = 0; i < 10000; i++) {
MetricEntity entry = new MetricEntity();
entry.setApp(DEFAULT_APP);
entry.setResource(DEFAULT_RESOURCE);
entry.setTimestamp(new Date(System.currentTimeMillis() - EXPIRE_TIME - 1000L));
entry.setPassQps(1L);
entry.setExceptionQps(1L);
entry.setBlockQps(0L);
entry.setSuccessQps(1L);
inMemoryMetricsRepository.save(entry);
}

CompletableFuture all = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));

@@ -155,4 +176,19 @@ public class InMemoryMetricsRepositoryTest {
}
}

private void batchSave() {
for (int i = 0; i < 100; i++) {
MetricEntity entry = new MetricEntity();
entry.setApp(DEFAULT_APP);
entry.setResource(DEFAULT_RESOURCE);
entry.setTimestamp(new Date(System.currentTimeMillis()));
entry.setPassQps(1L);
entry.setExceptionQps(1L);
entry.setBlockQps(0L);
entry.setSuccessQps(1L);
inMemoryMetricsRepository.save(entry);
}
}


}

Loading…
Cancel
Save