林万龙 пре 3 година
родитељ
комит
052409605a
4 измењених фајлова са 0 додато и 323 уклоњено
  1. +0
    -2
      src/main/java/com/telpo/iotgateway/IotGatewayApplication.java
  2. +0
    -66
      src/main/java/com/telpo/iotgateway/listener/IotJmsConnectionListener.java
  3. +0
    -90
      src/main/java/com/telpo/iotgateway/listener/IotMessageListener.java
  4. +0
    -165
      src/main/java/com/telpo/iotgateway/server/IotSubscribe.java

+ 0
- 2
src/main/java/com/telpo/iotgateway/IotGatewayApplication.java Прегледај датотеку

@@ -1,9 +1,7 @@
package com.telpo.iotgateway;

import com.telpo.iotgateway.server.IotCommunication;
import com.telpo.iotgateway.server.IotSubscribe;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.context.properties.ConfigurationPropertiesScan;


+ 0
- 66
src/main/java/com/telpo/iotgateway/listener/IotJmsConnectionListener.java Прегледај датотеку

@@ -1,66 +0,0 @@
package com.telpo.iotgateway.listener;

import lombok.extern.slf4j.Slf4j;
import org.apache.qpid.jms.JmsConnectionListener;
import org.apache.qpid.jms.message.JmsInboundMessageDispatch;
import org.springframework.stereotype.Component;

import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import java.net.URI;
import java.time.LocalDateTime;

/**
* @program: iotgateway
* @description: IotJms连接侦听
* @author: linwl
* @create: 2021-02-24 11:22
**/
@Slf4j
@Component
public class IotJmsConnectionListener implements JmsConnectionListener {
/**
* 连接成功建立。
*/
@Override
public void onConnectionEstablished(URI remoteURI) {
log.info("onConnectionEstablished, remoteUri:{0}, time:{1}", remoteURI, LocalDateTime.now());
}

/**
* 尝试过最大重试次数之后,最终连接失败。
*/
@Override
public void onConnectionFailure(Throwable error) {
log.error("onConnectionFailure, {}", error.getMessage());
}

/**
* 连接中断。
*/
@Override
public void onConnectionInterrupted(URI remoteURI) {
log.info("onConnectionInterrupted, remoteUri:{0}, time:{1}", remoteURI, LocalDateTime.now());
}

/**
* 连接中断后又自动重连上。
*/
@Override
public void onConnectionRestored(URI remoteURI) {
log.info("onConnectionRestored, remoteUri:{0}, time:{1}", remoteURI, LocalDateTime.now());
}

@Override
public void onInboundMessage(JmsInboundMessageDispatch envelope) {}

@Override
public void onSessionClosed(Session session, Throwable cause) {}

@Override
public void onConsumerClosed(MessageConsumer consumer, Throwable cause) {}

@Override
public void onProducerClosed(MessageProducer producer, Throwable cause) {}
}

+ 0
- 90
src/main/java/com/telpo/iotgateway/listener/IotMessageListener.java Прегледај датотеку

@@ -1,90 +0,0 @@
package com.telpo.iotgateway.listener;

import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.springframework.stereotype.Component;

import javax.jms.Message;
import javax.jms.MessageListener;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

/**
* @program: iotgateway
* @description: Iot消息侦听
* @author: linwl
* @create: 2021-02-24 11:39
**/
@Slf4j
@Component
public class IotMessageListener implements MessageListener {

private volatile AtomicInteger count = new AtomicInteger(0);
/**
* Runnable:实现了Runnable接口,jdk就知道这个类是一个线程
*/
private Runnable runnable;

public IotMessageListener() {

}

private ExecutorService executorService;
public void setExecutorService(ExecutorService executorService) {

this.executorService = executorService;
this.runnable = new Runnable() {
//创建 run 方法
@Override
public void run() {
// task to run goes here
log.warn("约1分钟处理 {} 个请求",count);
count.getAndSet(0);
}
};

// ScheduledExecutorService:是从Java SE5的java.util.concurrent里,
// 做为并发工具类被引进的,这是最理想的定时任务实现方式。
ScheduledExecutorService service = new ScheduledThreadPoolExecutor(1,
new BasicThreadFactory.Builder().namingPattern("count-schedule-pool-%d").daemon(true).build());
// 第二个参数为首次执行的延时时间,第三个参数为定时执行的间隔时间
// 10:秒 5:秒
// 第一次执行的时间为10秒,然后每隔五秒执行一次
service.scheduleAtFixedRate(runnable, 90, 60, TimeUnit.SECONDS);
}

@Override
public void onMessage(Message message) {
try {
//1.收到消息之后一定要ACK。
// 推荐做法:创建Session选择Session.AUTO_ACKNOWLEDGE,这里会自动ACK。
// 其他做法:创建Session选择Session.CLIENT_ACKNOWLEDGE,这里一定要调message.acknowledge()来ACK。
// message.acknowledge();
//2.异步触发回调,处理收到的消息,确保onMessage函数里没有耗时逻辑。
// 如果业务处理耗时过程过长阻塞住线程,可能会影响SDK收到消息后的正常回调。
executorService.submit(() -> processMessage(message));
} catch (Exception e) {
log.error("submit task occurs exception ", e);
}
}

/**
* 在这里处理您收到消息后的具体业务逻辑。
*/
private void processMessage(Message message) {
try {
byte[] body = message.getBody(byte[].class);
String content = new String(body);
String topic = message.getStringProperty("topic");
String messageId = message.getStringProperty("messageId");
log.info("receive message"
+ ", topic = " + topic
+ ", messageId = " + messageId
+ ", content = " + content);
count.getAndIncrement();

} catch (Exception e) {
log.error("processMessage occurs error ", e);
}
}
}

+ 0
- 165
src/main/java/com/telpo/iotgateway/server/IotSubscribe.java Прегледај датотеку

@@ -1,165 +0,0 @@
package com.telpo.iotgateway.server;

import com.telpo.iotgateway.listener.IotJmsConnectionListener;
import com.telpo.iotgateway.listener.IotMessageListener;
import lombok.extern.slf4j.Slf4j;
import org.apache.qpid.jms.JmsConnection;
import org.apache.qpid.jms.JmsConnectionListener;
import org.springframework.core.env.ConfigurableEnvironment;
import org.springframework.stereotype.Component;
import org.apache.commons.codec.binary.Base64;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import javax.jms.*;

import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import java.util.Hashtable;

/**
* @program: DipperPositionServer
* @description: 北斗定位
* @author: king
* @create: 2021-01-13 14:01
*/
@Slf4j
@Component
public class IotSubscribe {

//参数说明,请参见AMQP客户端接入说明文档。
private String accessKey;
private String accessSecret;
private String consumerGroupId;
//iotInstanceId:企业版实例请填写实例ID,公共实例请填空字符串""。
private String iotInstanceId;
private long timeStamp;
//签名方法:支持hmacmd5、hmacsha1和hmacsha256。
private String signMethod;
private String uId;
private String regionId;
private String productKey;
private String clientId;
private String iotHost;
private String iotPort;

// 执行单元线程池
private final static ExecutorService executorService = new ThreadPoolExecutor(
Runtime.getRuntime().availableProcessors() - 2,
(Runtime.getRuntime().availableProcessors() -2)* 2, 60, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(50000));
// Jms连接侦听
private static JmsConnectionListener myJmsConnectionListener = new IotJmsConnectionListener();
// 消息侦听
private static MessageListener messageListener = new IotMessageListener();

public IotSubscribe(ConfigurableEnvironment environment) {
//参数说明,请参见AMQP客户端接入说明文档。
this.accessKey = environment.getProperty("iot.accessKey");
this.accessSecret = environment.getProperty("iot.accessSecret");
this.consumerGroupId = environment.getProperty("iot.consumerGroupId");
//iotInstanceId:企业版实例请填写实例ID,公共实例请填空字符串""。
this.iotInstanceId = environment.getProperty("iot.iotInstanceId");
//签名方法:支持hmacmd5、hmacsha1和hmacsha256。
this.signMethod = environment.getProperty("iot.signMethod");
this.uId = environment.getProperty("iot.uId");
this.regionId = environment.getProperty("iot.regionId");
this.productKey = environment.getProperty("iot.productKey");
this.clientId = environment.getProperty("iot.clientId");
this.iotHost = environment.getProperty("iot.iotHost");
this.iotPort = environment.getProperty("iot.iotPort");

timeStamp = System.currentTimeMillis();
}

/*
* 同步进程线程
*/
public void start() {

try {
//计算签名,password组装方法,请参见AMQP客户端接入说明文档。
String userInfo = getUserInfo();
String signContent = "authId=" + accessKey + "&timestamp=" + timeStamp;;
String password = doSign(signContent,accessSecret, signMethod);

//接入域名,请参见AMQP客户端接入说明文档。
String host = uId + ".iot-amqp." + regionId + ".aliyuncs.com";
String connectionUrl = "failover:(amqps://" + host + ":" + iotPort
+ "?amqp.idleTimeout=80000)"
+ "?failover.reconnectDelay=30";

Hashtable<String, String> hashtable = new Hashtable<>();
hashtable.put("connectionfactory.SBCF",connectionUrl);
hashtable.put("queue.QUEUE", "default");
hashtable.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.qpid.jms.jndi.JmsInitialContextFactory");
Context context = new InitialContext(hashtable);
ConnectionFactory cf = (ConnectionFactory)context.lookup("SBCF");
Destination queue = (Destination)context.lookup("QUEUE");

// 创建连接。
//log.warn("Befor cf.createConnection");
Connection connection = cf.createConnection(userInfo, password);
//log.warn("Befor connection.addConnectionListener");
((JmsConnection) connection).addConnectionListener(myJmsConnectionListener);
// 创建会话。
// Session.CLIENT_ACKNOWLEDGE: 收到消息后,需要手动调用message.acknowledge()。
// Session.AUTO_ACKNOWLEDGE: SDK自动ACK(推荐)。
// log.warn("Befor connection.createSession");
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// log.debug("Befor connection.start");
connection.start();

// 创建Receiver连接。消费消息
// log.warn("Befor session.createConsumer");
MessageConsumer consumer = session.createConsumer(queue);

// 设置消息侦听
((IotMessageListener)messageListener).setExecutorService(this.executorService);
consumer.setMessageListener(messageListener);

} catch (IllegalArgumentException e) {
log.error("IllegalArgumentException:{}", e.getMessage());
} catch (NamingException e) {
log.error("NamingException:{}", e.getMessage());
} catch (JMSException e) {
log.error("JMSException:{}", e.getMessage());
} catch (Exception e) {
log.error("Exception:{}", e.getMessage());
}
}

/*
* userInfo组装
*/
private String getUserInfo() {
//userInfo组装
String userInfo = clientId + "|authMode=aksign"
+ ",signMethod=" + signMethod
+ ",timestamp=" + timeStamp
+ ",authId=" + accessKey
+ ",iotInstanceId=" + iotInstanceId
+ ",consumerGroupId=" + consumerGroupId
+ "|";
//计算签名,password组装方法,请参见AMQP客户端接入说明文档。
return userInfo;
}

/**
* 计算签名,password组装方法,请参见AMQP客户端接入说明文档。
*/
private static String doSign(String toSignString, String secret, String signMethod) throws Exception {
SecretKeySpec signingKey = new SecretKeySpec(secret.getBytes(), signMethod);
Mac mac = Mac.getInstance(signMethod);
mac.init(signingKey);
byte[] rawHmac = mac.doFinal(toSignString.getBytes());
return Base64.encodeBase64String(rawHmac);
}

}

Loading…
Откажи
Сачувај