Explorar el Código

Replace AtomicInteger with LongAdder for curThreadNum of StatisticNode (#747)

master
Lin.Liang Eric Zhao hace 5 años
padre
commit
3a9e2629b7
Se han modificado 2 ficheros con 137 adiciones y 10 borrados
  1. +10
    -10
      sentinel-core/src/main/java/com/alibaba/csp/sentinel/node/StatisticNode.java
  2. +127
    -0
      sentinel-core/src/test/java/com/alibaba/csp/sentinel/node/StatisticNodeTest.java

+ 10
- 10
sentinel-core/src/main/java/com/alibaba/csp/sentinel/node/StatisticNode.java Ver fichero

@@ -15,15 +15,15 @@
*/
package com.alibaba.csp.sentinel.node;

import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

import com.alibaba.csp.sentinel.util.TimeUtil;
import com.alibaba.csp.sentinel.node.metric.MetricNode;
import com.alibaba.csp.sentinel.slots.statistic.base.LongAdder;
import com.alibaba.csp.sentinel.slots.statistic.metric.ArrayMetric;
import com.alibaba.csp.sentinel.slots.statistic.metric.Metric;
import com.alibaba.csp.sentinel.util.TimeUtil;

import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
* <p>The statistic node keep three kinds of real-time statistics metrics:</p>
@@ -104,7 +104,7 @@ public class StatisticNode implements Node {
/**
* The counter for thread count.
*/
private AtomicInteger curThreadNum = new AtomicInteger(0);
private LongAdder curThreadNum = new LongAdder();

/**
* The last timestamp when metrics were fetched.
@@ -233,7 +233,7 @@ public class StatisticNode implements Node {

@Override
public int curThreadNum() {
return curThreadNum.get();
return (int)curThreadNum.sum();
}

@Override
@@ -265,12 +265,12 @@ public class StatisticNode implements Node {

@Override
public void increaseThreadNum() {
curThreadNum.incrementAndGet();
curThreadNum.increment();
}

@Override
public void decreaseThreadNum() {
curThreadNum.decrementAndGet();
curThreadNum.decrement();
}

@Override


+ 127
- 0
sentinel-core/src/test/java/com/alibaba/csp/sentinel/node/StatisticNodeTest.java Ver fichero

@@ -17,17 +17,23 @@ package com.alibaba.csp.sentinel.node;

import com.alibaba.csp.sentinel.Constants;
import com.alibaba.csp.sentinel.util.TimeUtil;
import org.junit.Assert;
import org.junit.Test;

import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@@ -198,4 +204,125 @@ public class StatisticNodeTest {
private static void log(Object obj) {
System.out.println(LOG_PREFIX + obj);
}


/**
* com.alibaba.csp.sentinel.node.StatisticNode#curThreadNum using LongAdder replace the AtomicInteger.
* now test the LongAdder is fast than AtomicInteger
* and get the right statistic or not
*/
@Test
public void testStatisticLongAdder() throws InterruptedException {
AtomicInteger atomicInteger = new AtomicInteger(0);
StatisticNode statisticNode = new StatisticNode();
ExecutorService bizEs1 = new ThreadPoolExecutor(THREAD_COUNT, THREAD_COUNT,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
ExecutorService bizEs2 = new ThreadPoolExecutor(THREAD_COUNT, THREAD_COUNT,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
int taskCount = 100;
for (int i = 0; i < taskCount; i++) {
int op = i % 2;
bizEs2.submit(new StatisticAtomicIntegerTask(atomicInteger, op, i));
bizEs1.submit(new StatisticLongAdderTask(statisticNode, op, i));
}
Thread.sleep(5000);

log("LongAdder totalCost : " + StatisticLongAdderTask.totalCost() + "ms");
log("AtomicInteger totalCost : " + StatisticAtomicIntegerTask.totalCost() + "ms");
Assert.assertEquals(statisticNode.curThreadNum(), atomicInteger.get());


}

private static class StatisticLongAdderTask implements Runnable {


private StatisticNode statisticNode;
/**
* 0 addition
* 1 subtraction
*/
private int op;

private int taskId;

private static Map<Integer, Long> taskCostMap = new ConcurrentHashMap<>(16);


public StatisticLongAdderTask(StatisticNode statisticNode, int op, int taskId) {
this.statisticNode = statisticNode;
this.op = op;
this.taskId = taskId;
}

@Override
public void run() {
long startTime = System.currentTimeMillis();
int calCount = 100000;
for (int i = 0; i < calCount; i++) {
if (op == 0) {
statisticNode.increaseThreadNum();
} else if (op == 1) {
statisticNode.decreaseThreadNum();
}
}
long cost = System.currentTimeMillis() - startTime;
taskCostMap.put(taskId, cost);
}

public static long totalCost() {
long totalCost = 0;
for (long cost : taskCostMap.values()) {
totalCost += cost;
}
return totalCost;
}
}

private static class StatisticAtomicIntegerTask implements Runnable {

AtomicInteger atomicInteger;
/**
* 0 addition
* 1 subtraction
*/
private int op;

private int taskId;

private static Map<Integer, Long> taskCostMap = new ConcurrentHashMap<>(16);

public StatisticAtomicIntegerTask(AtomicInteger atomicInteger, int op, int taskId) {
this.atomicInteger = atomicInteger;
this.op = op;
this.taskId = taskId;
}

@Override
public void run() {
long startTime = System.currentTimeMillis();
int calCount = 100000;
for (int i = 0; i < calCount; i++) {
if (op == 0) {
atomicInteger.incrementAndGet();
} else if (op == 1) {
atomicInteger.decrementAndGet();
}
}
long cost = System.currentTimeMillis() - startTime;
taskCostMap.put(taskId, cost);
}

public static long totalCost() {
long totalCost = 0;
for (long cost : taskCostMap.values()) {
totalCost += cost;
}
return totalCost;
}
}


}

Cargando…
Cancelar
Guardar