Browse Source

计数线程处理调整

master
林万龙 3 years ago
parent
commit
5fd53dcb8c
2 changed files with 10 additions and 15 deletions
  1. +5
    -10
      src/main/java/com/telpo/iotgateway/listener/IotMessageListener.java
  2. +5
    -5
      src/main/java/com/telpo/iotgateway/server/IotSubscribe.java

+ 5
- 10
src/main/java/com/telpo/iotgateway/listener/IotMessageListener.java View File

@@ -1,15 +1,11 @@
package com.telpo.iotgateway.listener; package com.telpo.iotgateway.listener;


import com.google.common.base.Stopwatch;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.concurrent.BasicThreadFactory; import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;


import javax.jms.Message; import javax.jms.Message;
import javax.jms.MessageListener; import javax.jms.MessageListener;
import javax.validation.constraints.Positive;
import java.util.concurrent.*; import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;


@@ -37,7 +33,6 @@ public class IotMessageListener implements MessageListener {
// task to run goes here // task to run goes here
log.warn("约1分钟处理 {} 个请求",count); log.warn("约1分钟处理 {} 个请求",count);
count.getAndSet(0); count.getAndSet(0);
//System.out.println("Hello, stranger");
} }
}; };


@@ -48,7 +43,7 @@ public class IotMessageListener implements MessageListener {
// 第二个参数为首次执行的延时时间,第三个参数为定时执行的间隔时间 // 第二个参数为首次执行的延时时间,第三个参数为定时执行的间隔时间
// 10:秒 5:秒 // 10:秒 5:秒
// 第一次执行的时间为10秒,然后每隔五秒执行一次 // 第一次执行的时间为10秒,然后每隔五秒执行一次
service.scheduleAtFixedRate(runnable, 60, 60, TimeUnit.SECONDS);
service.scheduleAtFixedRate(runnable, 120, 60, TimeUnit.SECONDS);
} }


private ExecutorService executorService; private ExecutorService executorService;
@@ -80,10 +75,10 @@ public class IotMessageListener implements MessageListener {
String content = new String(body); String content = new String(body);
String topic = message.getStringProperty("topic"); String topic = message.getStringProperty("topic");
String messageId = message.getStringProperty("messageId"); String messageId = message.getStringProperty("messageId");
log.info("receive message"
+ ", topic = " + topic
+ ", messageId = " + messageId
+ ", content = " + content);
// log.info("receive message"
// + ", topic = " + topic
// + ", messageId = " + messageId
// + ", content = " + content);
count.getAndIncrement(); count.getAndIncrement();


} catch (Exception e) { } catch (Exception e) {


+ 5
- 5
src/main/java/com/telpo/iotgateway/server/IotSubscribe.java View File

@@ -104,20 +104,20 @@ public class IotSubscribe {
Destination queue = (Destination)context.lookup("QUEUE"); Destination queue = (Destination)context.lookup("QUEUE");


// 创建连接。 // 创建连接。
log.warn("Befor cf.createConnection");
//log.warn("Befor cf.createConnection");
Connection connection = cf.createConnection(userInfo, password); Connection connection = cf.createConnection(userInfo, password);
log.warn("Befor connection.addConnectionListener");
//log.warn("Befor connection.addConnectionListener");
((JmsConnection) connection).addConnectionListener(myJmsConnectionListener); ((JmsConnection) connection).addConnectionListener(myJmsConnectionListener);
// 创建会话。 // 创建会话。
// Session.CLIENT_ACKNOWLEDGE: 收到消息后,需要手动调用message.acknowledge()。 // Session.CLIENT_ACKNOWLEDGE: 收到消息后,需要手动调用message.acknowledge()。
// Session.AUTO_ACKNOWLEDGE: SDK自动ACK(推荐)。 // Session.AUTO_ACKNOWLEDGE: SDK自动ACK(推荐)。
log.warn("Befor connection.createSession");
// log.warn("Befor connection.createSession");
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
log.debug("Befor connection.start");
// log.debug("Befor connection.start");
connection.start(); connection.start();


// 创建Receiver连接。消费消息 // 创建Receiver连接。消费消息
log.warn("Befor session.createConsumer");
// log.warn("Befor session.createConsumer");
MessageConsumer consumer = session.createConsumer(queue); MessageConsumer consumer = session.createConsumer(queue);


// 设置消息侦听 // 设置消息侦听


Loading…
Cancel
Save