Browse Source

增加线程安全处理

master
林万龙 3 years ago
parent
commit
9f16e26bf4
1 changed files with 41 additions and 12 deletions
  1. +41
    -12
      src/main/java/com/telpo/iotgateway/listener/IotMessageListener.java

+ 41
- 12
src/main/java/com/telpo/iotgateway/listener/IotMessageListener.java View File

@@ -8,6 +8,9 @@ import org.springframework.stereotype.Component;
import javax.jms.Message; import javax.jms.Message;
import javax.jms.MessageListener; import javax.jms.MessageListener;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;


/** /**
@@ -21,6 +24,30 @@ import java.util.concurrent.atomic.AtomicInteger;
public class IotMessageListener implements MessageListener { public class IotMessageListener implements MessageListener {


private AtomicInteger count = new AtomicInteger(0); private AtomicInteger count = new AtomicInteger(0);
/**
* Runnable:实现了Runnable接口,jdk就知道这个类是一个线程
*/
private Runnable runnable;

public IotMessageListener() {
this.runnable = new Runnable() {
//创建 run 方法
@Override
public void run() {
// task to run goes here
count.getAndSet(0);
System.out.println("Hello, stranger");
}
};

// ScheduledExecutorService:是从Java SE5的java.util.concurrent里,
// 做为并发工具类被引进的,这是最理想的定时任务实现方式。
ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor();
// 第二个参数为首次执行的延时时间,第三个参数为定时执行的间隔时间
// 10:秒 5:秒
// 第一次执行的时间为10秒,然后每隔五秒执行一次
service.scheduleAtFixedRate(runnable, 60, 60, TimeUnit.SECONDS);
}


private ExecutorService executorService; private ExecutorService executorService;
public void setExecutorService(ExecutorService executorService) { public void setExecutorService(ExecutorService executorService) {
@@ -56,22 +83,24 @@ public class IotMessageListener implements MessageListener {
+ ", messageId = " + messageId + ", messageId = " + messageId
+ ", content = " + content); + ", content = " + content);
count.getAndIncrement(); count.getAndIncrement();
log.info("count is : " + count.toString());
} catch (Exception e) { } catch (Exception e) {
log.error("processMessage occurs error ", e); log.error("processMessage occurs error ", e);
} }
} }


@Scheduled(cron = "${scheduler.task.cron}")
public void countMessages() {
//log.info("开始执行定时推送失败的推送记录!");
// 获取推送失败的记录
//try {
log.warn("约1分钟处理 {} 个请求",count);
count.getAndSet(0);
//} catch (Exception e) {
// log.error("执行定时计数发生异常:", e);
//}
}


// @Scheduled(cron = "${scheduler.task.cron}")
// public void countMessages() {
// //log.info("开始执行定时推送失败的推送记录!");
// // 获取推送失败的记录
// //try {
// log.warn("约1分钟处理 {} 个请求",count);
// count.getAndSet(0);
// //} catch (Exception e) {
// // log.error("执行定时计数发生异常:", e);
// //}
// }


} }

Loading…
Cancel
Save