From 052409605a73d594c0f86b31ab7a717ea0db3480 Mon Sep 17 00:00:00 2001 From: linwl <304115325@qq.com> Date: Fri, 16 Apr 2021 14:25:21 +0800 Subject: [PATCH] =?UTF-8?q?IOT=E8=B0=83=E7=94=A8=E6=B5=8B=E8=AF=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../iotgateway/IotGatewayApplication.java | 2 - .../listener/IotJmsConnectionListener.java | 66 ------- .../listener/IotMessageListener.java | 90 ---------- .../telpo/iotgateway/server/IotSubscribe.java | 165 ------------------ 4 files changed, 323 deletions(-) delete mode 100644 src/main/java/com/telpo/iotgateway/listener/IotJmsConnectionListener.java delete mode 100644 src/main/java/com/telpo/iotgateway/listener/IotMessageListener.java delete mode 100644 src/main/java/com/telpo/iotgateway/server/IotSubscribe.java diff --git a/src/main/java/com/telpo/iotgateway/IotGatewayApplication.java b/src/main/java/com/telpo/iotgateway/IotGatewayApplication.java index e29901f..248d0d3 100644 --- a/src/main/java/com/telpo/iotgateway/IotGatewayApplication.java +++ b/src/main/java/com/telpo/iotgateway/IotGatewayApplication.java @@ -1,9 +1,7 @@ package com.telpo.iotgateway; import com.telpo.iotgateway.server.IotCommunication; -import com.telpo.iotgateway.server.IotSubscribe; import lombok.extern.slf4j.Slf4j; -import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.context.properties.ConfigurationPropertiesScan; diff --git a/src/main/java/com/telpo/iotgateway/listener/IotJmsConnectionListener.java b/src/main/java/com/telpo/iotgateway/listener/IotJmsConnectionListener.java deleted file mode 100644 index 57b2f0c..0000000 --- a/src/main/java/com/telpo/iotgateway/listener/IotJmsConnectionListener.java +++ /dev/null @@ -1,66 +0,0 @@ -package com.telpo.iotgateway.listener; - -import lombok.extern.slf4j.Slf4j; -import org.apache.qpid.jms.JmsConnectionListener; -import org.apache.qpid.jms.message.JmsInboundMessageDispatch; -import org.springframework.stereotype.Component; - -import javax.jms.MessageConsumer; -import javax.jms.MessageProducer; -import javax.jms.Session; -import java.net.URI; -import java.time.LocalDateTime; - -/** - * @program: iotgateway - * @description: IotJms连接侦听 - * @author: linwl - * @create: 2021-02-24 11:22 - **/ -@Slf4j -@Component -public class IotJmsConnectionListener implements JmsConnectionListener { - /** - * 连接成功建立。 - */ - @Override - public void onConnectionEstablished(URI remoteURI) { - log.info("onConnectionEstablished, remoteUri:{0}, time:{1}", remoteURI, LocalDateTime.now()); - } - - /** - * 尝试过最大重试次数之后,最终连接失败。 - */ - @Override - public void onConnectionFailure(Throwable error) { - log.error("onConnectionFailure, {}", error.getMessage()); - } - - /** - * 连接中断。 - */ - @Override - public void onConnectionInterrupted(URI remoteURI) { - log.info("onConnectionInterrupted, remoteUri:{0}, time:{1}", remoteURI, LocalDateTime.now()); - } - - /** - * 连接中断后又自动重连上。 - */ - @Override - public void onConnectionRestored(URI remoteURI) { - log.info("onConnectionRestored, remoteUri:{0}, time:{1}", remoteURI, LocalDateTime.now()); - } - - @Override - public void onInboundMessage(JmsInboundMessageDispatch envelope) {} - - @Override - public void onSessionClosed(Session session, Throwable cause) {} - - @Override - public void onConsumerClosed(MessageConsumer consumer, Throwable cause) {} - - @Override - public void onProducerClosed(MessageProducer producer, Throwable cause) {} -} diff --git a/src/main/java/com/telpo/iotgateway/listener/IotMessageListener.java b/src/main/java/com/telpo/iotgateway/listener/IotMessageListener.java deleted file mode 100644 index 9eabc5d..0000000 --- a/src/main/java/com/telpo/iotgateway/listener/IotMessageListener.java +++ /dev/null @@ -1,90 +0,0 @@ -package com.telpo.iotgateway.listener; - -import lombok.extern.slf4j.Slf4j; -import org.apache.commons.lang3.concurrent.BasicThreadFactory; -import org.springframework.stereotype.Component; - -import javax.jms.Message; -import javax.jms.MessageListener; -import java.util.concurrent.*; -import java.util.concurrent.atomic.AtomicInteger; - -/** - * @program: iotgateway - * @description: Iot消息侦听 - * @author: linwl - * @create: 2021-02-24 11:39 - **/ -@Slf4j -@Component -public class IotMessageListener implements MessageListener { - - private volatile AtomicInteger count = new AtomicInteger(0); - /** - * Runnable:实现了Runnable接口,jdk就知道这个类是一个线程 - */ - private Runnable runnable; - - public IotMessageListener() { - - } - - private ExecutorService executorService; - public void setExecutorService(ExecutorService executorService) { - - this.executorService = executorService; - this.runnable = new Runnable() { - //创建 run 方法 - @Override - public void run() { - // task to run goes here - log.warn("约1分钟处理 {} 个请求",count); - count.getAndSet(0); - } - }; - - // ScheduledExecutorService:是从Java SE5的java.util.concurrent里, - // 做为并发工具类被引进的,这是最理想的定时任务实现方式。 - ScheduledExecutorService service = new ScheduledThreadPoolExecutor(1, - new BasicThreadFactory.Builder().namingPattern("count-schedule-pool-%d").daemon(true).build()); - // 第二个参数为首次执行的延时时间,第三个参数为定时执行的间隔时间 - // 10:秒 5:秒 - // 第一次执行的时间为10秒,然后每隔五秒执行一次 - service.scheduleAtFixedRate(runnable, 90, 60, TimeUnit.SECONDS); - } - - @Override - public void onMessage(Message message) { - try { - //1.收到消息之后一定要ACK。 - // 推荐做法:创建Session选择Session.AUTO_ACKNOWLEDGE,这里会自动ACK。 - // 其他做法:创建Session选择Session.CLIENT_ACKNOWLEDGE,这里一定要调message.acknowledge()来ACK。 - // message.acknowledge(); - //2.异步触发回调,处理收到的消息,确保onMessage函数里没有耗时逻辑。 - // 如果业务处理耗时过程过长阻塞住线程,可能会影响SDK收到消息后的正常回调。 - executorService.submit(() -> processMessage(message)); - } catch (Exception e) { - log.error("submit task occurs exception ", e); - } - } - - /** - * 在这里处理您收到消息后的具体业务逻辑。 - */ - private void processMessage(Message message) { - try { - byte[] body = message.getBody(byte[].class); - String content = new String(body); - String topic = message.getStringProperty("topic"); - String messageId = message.getStringProperty("messageId"); - log.info("receive message" - + ", topic = " + topic - + ", messageId = " + messageId - + ", content = " + content); - count.getAndIncrement(); - - } catch (Exception e) { - log.error("processMessage occurs error ", e); - } - } -} diff --git a/src/main/java/com/telpo/iotgateway/server/IotSubscribe.java b/src/main/java/com/telpo/iotgateway/server/IotSubscribe.java deleted file mode 100644 index 691fd6e..0000000 --- a/src/main/java/com/telpo/iotgateway/server/IotSubscribe.java +++ /dev/null @@ -1,165 +0,0 @@ -package com.telpo.iotgateway.server; - -import com.telpo.iotgateway.listener.IotJmsConnectionListener; -import com.telpo.iotgateway.listener.IotMessageListener; -import lombok.extern.slf4j.Slf4j; -import org.apache.qpid.jms.JmsConnection; -import org.apache.qpid.jms.JmsConnectionListener; -import org.springframework.core.env.ConfigurableEnvironment; -import org.springframework.stereotype.Component; -import org.apache.commons.codec.binary.Base64; - -import java.util.concurrent.ExecutorService; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; - -import javax.jms.*; - -import javax.crypto.Mac; -import javax.crypto.spec.SecretKeySpec; -import javax.naming.Context; -import javax.naming.InitialContext; -import javax.naming.NamingException; -import java.util.Hashtable; - -/** - * @program: DipperPositionServer - * @description: 北斗定位 - * @author: king - * @create: 2021-01-13 14:01 - */ -@Slf4j -@Component -public class IotSubscribe { - - //参数说明,请参见AMQP客户端接入说明文档。 - private String accessKey; - private String accessSecret; - private String consumerGroupId; - //iotInstanceId:企业版实例请填写实例ID,公共实例请填空字符串""。 - private String iotInstanceId; - private long timeStamp; - //签名方法:支持hmacmd5、hmacsha1和hmacsha256。 - private String signMethod; - private String uId; - private String regionId; - private String productKey; - private String clientId; - private String iotHost; - private String iotPort; - - // 执行单元线程池 - private final static ExecutorService executorService = new ThreadPoolExecutor( - Runtime.getRuntime().availableProcessors() - 2, - (Runtime.getRuntime().availableProcessors() -2)* 2, 60, TimeUnit.SECONDS, - new LinkedBlockingQueue<>(50000)); - // Jms连接侦听 - private static JmsConnectionListener myJmsConnectionListener = new IotJmsConnectionListener(); - // 消息侦听 - private static MessageListener messageListener = new IotMessageListener(); - - public IotSubscribe(ConfigurableEnvironment environment) { - //参数说明,请参见AMQP客户端接入说明文档。 - this.accessKey = environment.getProperty("iot.accessKey"); - this.accessSecret = environment.getProperty("iot.accessSecret"); - this.consumerGroupId = environment.getProperty("iot.consumerGroupId"); - //iotInstanceId:企业版实例请填写实例ID,公共实例请填空字符串""。 - this.iotInstanceId = environment.getProperty("iot.iotInstanceId"); - //签名方法:支持hmacmd5、hmacsha1和hmacsha256。 - this.signMethod = environment.getProperty("iot.signMethod"); - this.uId = environment.getProperty("iot.uId"); - this.regionId = environment.getProperty("iot.regionId"); - this.productKey = environment.getProperty("iot.productKey"); - this.clientId = environment.getProperty("iot.clientId"); - this.iotHost = environment.getProperty("iot.iotHost"); - this.iotPort = environment.getProperty("iot.iotPort"); - - timeStamp = System.currentTimeMillis(); - } - - /* - * 同步进程线程 - */ - public void start() { - - try { - //计算签名,password组装方法,请参见AMQP客户端接入说明文档。 - String userInfo = getUserInfo(); - String signContent = "authId=" + accessKey + "×tamp=" + timeStamp;; - String password = doSign(signContent,accessSecret, signMethod); - - //接入域名,请参见AMQP客户端接入说明文档。 - String host = uId + ".iot-amqp." + regionId + ".aliyuncs.com"; - String connectionUrl = "failover:(amqps://" + host + ":" + iotPort - + "?amqp.idleTimeout=80000)" - + "?failover.reconnectDelay=30"; - - Hashtable hashtable = new Hashtable<>(); - hashtable.put("connectionfactory.SBCF",connectionUrl); - hashtable.put("queue.QUEUE", "default"); - hashtable.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.qpid.jms.jndi.JmsInitialContextFactory"); - Context context = new InitialContext(hashtable); - ConnectionFactory cf = (ConnectionFactory)context.lookup("SBCF"); - Destination queue = (Destination)context.lookup("QUEUE"); - - // 创建连接。 - //log.warn("Befor cf.createConnection"); - Connection connection = cf.createConnection(userInfo, password); - //log.warn("Befor connection.addConnectionListener"); - ((JmsConnection) connection).addConnectionListener(myJmsConnectionListener); - // 创建会话。 - // Session.CLIENT_ACKNOWLEDGE: 收到消息后,需要手动调用message.acknowledge()。 - // Session.AUTO_ACKNOWLEDGE: SDK自动ACK(推荐)。 - // log.warn("Befor connection.createSession"); - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - // log.debug("Befor connection.start"); - connection.start(); - - // 创建Receiver连接。消费消息 - // log.warn("Befor session.createConsumer"); - MessageConsumer consumer = session.createConsumer(queue); - - // 设置消息侦听 - ((IotMessageListener)messageListener).setExecutorService(this.executorService); - consumer.setMessageListener(messageListener); - - } catch (IllegalArgumentException e) { - log.error("IllegalArgumentException:{}", e.getMessage()); - } catch (NamingException e) { - log.error("NamingException:{}", e.getMessage()); - } catch (JMSException e) { - log.error("JMSException:{}", e.getMessage()); - } catch (Exception e) { - log.error("Exception:{}", e.getMessage()); - } - } - - /* - * userInfo组装 - */ - private String getUserInfo() { - //userInfo组装 - String userInfo = clientId + "|authMode=aksign" - + ",signMethod=" + signMethod - + ",timestamp=" + timeStamp - + ",authId=" + accessKey - + ",iotInstanceId=" + iotInstanceId - + ",consumerGroupId=" + consumerGroupId - + "|"; - //计算签名,password组装方法,请参见AMQP客户端接入说明文档。 - return userInfo; - } - - /** - * 计算签名,password组装方法,请参见AMQP客户端接入说明文档。 - */ - private static String doSign(String toSignString, String secret, String signMethod) throws Exception { - SecretKeySpec signingKey = new SecretKeySpec(secret.getBytes(), signMethod); - Mac mac = Mac.getInstance(signMethod); - mac.init(signingKey); - byte[] rawHmac = mac.doFinal(toSignString.getBytes()); - return Base64.encodeBase64String(rawHmac); - } - -}