From b309dbe835024b6ba99446d17fb9a6ec7c41f421 Mon Sep 17 00:00:00 2001 From: "nick.tan" Date: Fri, 22 Feb 2019 17:20:10 +0800 Subject: [PATCH] Fix concurrent error in InMemoryMetricsRepository of the dashboard (#488) - fix ConcurrentModificationException of listResourcesOfApp method --- .../metric/InMemoryMetricsRepository.java | 29 ++-- .../metric/InMemoryMetricsRepositoryTest.java | 142 ++++++++++++++++++ 2 files changed, 158 insertions(+), 13 deletions(-) create mode 100644 sentinel-dashboard/src/test/java/com/alibaba/csp/sentinel/dashboard/repository/metric/InMemoryMetricsRepositoryTest.java diff --git a/sentinel-dashboard/src/main/java/com/alibaba/csp/sentinel/dashboard/repository/metric/InMemoryMetricsRepository.java b/sentinel-dashboard/src/main/java/com/alibaba/csp/sentinel/dashboard/repository/metric/InMemoryMetricsRepository.java index d10ecf2c..bed4ea1c 100644 --- a/sentinel-dashboard/src/main/java/com/alibaba/csp/sentinel/dashboard/repository/metric/InMemoryMetricsRepository.java +++ b/sentinel-dashboard/src/main/java/com/alibaba/csp/sentinel/dashboard/repository/metric/InMemoryMetricsRepository.java @@ -27,6 +27,7 @@ import java.util.stream.Collectors; import com.alibaba.csp.sentinel.dashboard.datasource.entity.MetricEntity; import com.alibaba.csp.sentinel.util.StringUtil; +import com.googlecode.concurrentlinkedhashmap.ConcurrentLinkedHashMap; import org.springframework.stereotype.Component; /** @@ -43,21 +44,23 @@ public class InMemoryMetricsRepository implements MetricsRepository resource -> timestamp -> metric} */ - private Map>> allMetrics = new ConcurrentHashMap<>(); + private Map>> allMetrics = new ConcurrentHashMap<>(); + + @Override public synchronized void save(MetricEntity entity) { if (entity == null || StringUtil.isBlank(entity.getApp())) { return; } - allMetrics.computeIfAbsent(entity.getApp(), e -> new HashMap<>(16)) - .computeIfAbsent(entity.getResource(), e -> new LinkedHashMap() { - @Override - protected boolean removeEldestEntry(Entry eldest) { + allMetrics.computeIfAbsent(entity.getApp(), e -> new ConcurrentHashMap<>(16)) + .computeIfAbsent(entity.getResource(), e -> new ConcurrentLinkedHashMap.Builder() + .maximumWeightedCapacity(MAX_METRIC_LIVE_TIME_MS).weigher((key, value) -> { // Metric older than {@link #MAX_METRIC_LIVE_TIME_MS} will be removed. - return eldest.getKey() < System.currentTimeMillis() - MAX_METRIC_LIVE_TIME_MS; - } - }).put(entity.getTimestamp().getTime(), entity); + int weight = (int)(System.currentTimeMillis() - key); + // weight must be a number greater than or equal to one + return Math.max(weight, 1); + }).build()).put(entity.getTimestamp().getTime(), entity); } @Override @@ -75,11 +78,11 @@ public class InMemoryMetricsRepository implements MetricsRepository> resourceMap = allMetrics.get(app); + Map> resourceMap = allMetrics.get(app); if (resourceMap == null) { return results; } - LinkedHashMap metricsMap = resourceMap.get(resource); + ConcurrentLinkedHashMap metricsMap = resourceMap.get(resource); if (metricsMap == null) { return results; } @@ -98,14 +101,14 @@ public class InMemoryMetricsRepository implements MetricsRepository timestamp -> metric - Map> resourceMap = allMetrics.get(app); + Map> resourceMap = allMetrics.get(app); if (resourceMap == null) { return results; } final long minTimeMs = System.currentTimeMillis() - 1000 * 60; - Map resourceCount = new HashMap<>(32); + Map resourceCount = new ConcurrentHashMap<>(32); - for (Entry> resourceMetrics : resourceMap.entrySet()) { + for (Entry> resourceMetrics : resourceMap.entrySet()) { for (Entry metrics : resourceMetrics.getValue().entrySet()) { if (metrics.getKey() < minTimeMs) { continue; diff --git a/sentinel-dashboard/src/test/java/com/alibaba/csp/sentinel/dashboard/repository/metric/InMemoryMetricsRepositoryTest.java b/sentinel-dashboard/src/test/java/com/alibaba/csp/sentinel/dashboard/repository/metric/InMemoryMetricsRepositoryTest.java new file mode 100644 index 00000000..62040671 --- /dev/null +++ b/sentinel-dashboard/src/test/java/com/alibaba/csp/sentinel/dashboard/repository/metric/InMemoryMetricsRepositoryTest.java @@ -0,0 +1,142 @@ +package com.alibaba.csp.sentinel.dashboard.repository.metric; + +import com.alibaba.csp.sentinel.dashboard.datasource.entity.MetricEntity; +import org.assertj.core.util.Lists; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.springframework.util.CollectionUtils; + +import java.util.ConcurrentModificationException; +import java.util.Date; +import java.util.List; +import java.util.concurrent.*; + +import static org.junit.Assert.*; + +/** + * InMemoryMetricsRepository Test + * + * @author Nick Tan + */ +public class InMemoryMetricsRepositoryTest { + + private static final String DEFAULT_APP = "default"; + private static final String DEFAULT_EXPIRE_APP = "default_expire_app"; + private static final String DEFAULT_RESOURCE = "test"; + private static final long EXPIRE_TIME = 1000 * 60 * 5L; + private InMemoryMetricsRepository inMemoryMetricsRepository; + + private static final int AVAILABLE_CPU_PROCESSORS = Runtime.getRuntime().availableProcessors(); + + private ExecutorService executorService = Executors.newFixedThreadPool(AVAILABLE_CPU_PROCESSORS); + + @Before + public void setUp() throws Exception { + + inMemoryMetricsRepository = new InMemoryMetricsRepository(); + } + + @Test + public void save() throws InterruptedException { + + for (int i = 0; i < 1000000; i++) { + + MetricEntity entry = new MetricEntity(); + entry.setApp(DEFAULT_APP); + entry.setResource(DEFAULT_RESOURCE); + entry.setTimestamp(new Date(System.currentTimeMillis())); + entry.setPassQps(1L); + entry.setExceptionQps(1L); + entry.setBlockQps(0L); + entry.setSuccessQps(1L); + inMemoryMetricsRepository.save(entry); + + } + + } + + @Test + public void testExpireMetric() throws InterruptedException { + + long now = System.currentTimeMillis(); + MetricEntity expireEntry = new MetricEntity(); + expireEntry.setApp(DEFAULT_EXPIRE_APP); + expireEntry.setResource(DEFAULT_RESOURCE); + expireEntry.setTimestamp(new Date(now - EXPIRE_TIME - 10L)); + expireEntry.setPassQps(1L); + expireEntry.setExceptionQps(1L); + expireEntry.setBlockQps(0L); + expireEntry.setSuccessQps(1L); + inMemoryMetricsRepository.save(expireEntry); + + MetricEntity entry = new MetricEntity(); + entry.setApp(DEFAULT_EXPIRE_APP); + entry.setResource(DEFAULT_RESOURCE); + entry.setTimestamp(new Date(now)); + entry.setPassQps(1L); + entry.setExceptionQps(1L); + entry.setBlockQps(0L); + entry.setSuccessQps(1L); + inMemoryMetricsRepository.save(entry); + + List list = inMemoryMetricsRepository.queryByAppAndResourceBetween( + DEFAULT_EXPIRE_APP, DEFAULT_RESOURCE, now - 2 * EXPIRE_TIME, now + EXPIRE_TIME); + + Assert.assertEquals(false, CollectionUtils.isEmpty(list)); + + assertTrue(list.size() == 1); + + } + + @Test + public void listResourcesOfApp() throws InterruptedException { + // prepare basic test data + save(); + System.out.println(System.currentTimeMillis() + "[basic test data ready]"); + + List futures = Lists.newArrayList(); + + // concurrent query resources of app + final CyclicBarrier cyclicBarrier = new CyclicBarrier(AVAILABLE_CPU_PROCESSORS); + for (int j = 0; j < 10000; j++) { + + futures.add( + CompletableFuture.runAsync(() -> { + try { + cyclicBarrier.await(); + inMemoryMetricsRepository.listResourcesOfApp(DEFAULT_APP); + } catch (InterruptedException e) { + e.printStackTrace(); + } catch (BrokenBarrierException e) { + e.printStackTrace(); + } + }, executorService + )); + } + + // batch add metric entity + for (int i = 0; i < 10000; i++) { + + MetricEntity entry = new MetricEntity(); + entry.setApp(DEFAULT_APP); + entry.setResource(DEFAULT_RESOURCE); + entry.setTimestamp(new Date(System.currentTimeMillis() - EXPIRE_TIME - 1000L)); + entry.setPassQps(1L); + entry.setExceptionQps(1L); + entry.setBlockQps(0L); + entry.setSuccessQps(1L); + inMemoryMetricsRepository.save(entry); + + } + + CompletableFuture all = CompletableFuture.allOf(futures.toArray((new CompletableFuture[futures.size()]))); + try { + all.join(); + } catch (ConcurrentModificationException e) { + e.printStackTrace(); + assertFalse("concurrent error", e instanceof ConcurrentModificationException); + } + } + +} \ No newline at end of file