您最多选择25个主题 主题必须以字母或数字开头,可以包含连字符 (-),并且长度不得超过35个字符

159 行
6.5KB

  1. package com.telpo.iotgateway.server;
  2. import com.telpo.iotgateway.listener.IotJmsConnectionListener;
  3. import com.telpo.iotgateway.listener.IotMessageListener;
  4. import lombok.extern.slf4j.Slf4j;
  5. import org.apache.qpid.jms.JmsConnection;
  6. import org.apache.qpid.jms.JmsConnectionListener;
  7. import org.springframework.core.env.ConfigurableEnvironment;
  8. import org.springframework.stereotype.Component;
  9. import org.apache.commons.codec.binary.Base64;
  10. import java.util.concurrent.ExecutorService;
  11. import java.util.concurrent.LinkedBlockingQueue;
  12. import java.util.concurrent.ThreadPoolExecutor;
  13. import java.util.concurrent.TimeUnit;
  14. import javax.jms.*;
  15. import javax.crypto.Mac;
  16. import javax.crypto.spec.SecretKeySpec;
  17. import javax.naming.Context;
  18. import javax.naming.InitialContext;
  19. import javax.naming.NamingException;
  20. import java.util.Hashtable;
  21. /**
  22. * @program: DipperPositionServer
  23. * @description: 北斗定位
  24. * @author: king
  25. * @create: 2021-01-13 14:01
  26. */
  27. @Slf4j
  28. @Component
  29. public class IotSubscribe {
  30. //参数说明,请参见AMQP客户端接入说明文档。
  31. private String accessKey;
  32. private String accessSecret;
  33. private String consumerGroupId;
  34. //iotInstanceId:企业版实例请填写实例ID,公共实例请填空字符串""。
  35. private String iotInstanceId;
  36. private long timeStamp;
  37. //签名方法:支持hmacmd5、hmacsha1和hmacsha256。
  38. private String signMethod;
  39. private String clientId;
  40. private String iotHost;
  41. private String iotPort;
  42. // 执行单元线程池
  43. private final static ExecutorService executorService = new ThreadPoolExecutor(
  44. Runtime.getRuntime().availableProcessors(),
  45. Runtime.getRuntime().availableProcessors() * 2, 60, TimeUnit.SECONDS,
  46. new LinkedBlockingQueue<>(50000));
  47. // Jms连接侦听
  48. private static JmsConnectionListener myJmsConnectionListener = new IotJmsConnectionListener();
  49. // 消息侦听
  50. private static MessageListener messageListener = new IotMessageListener();
  51. public IotSubscribe(ConfigurableEnvironment environment) {
  52. //参数说明,请参见AMQP客户端接入说明文档。
  53. this.accessKey = environment.getProperty("iot.accessKey");
  54. this.accessSecret = environment.getProperty("iot.accessSecret");
  55. this.consumerGroupId = environment.getProperty("iot.consumerGroupId");
  56. //iotInstanceId:企业版实例请填写实例ID,公共实例请填空字符串""。
  57. this.iotInstanceId = environment.getProperty("iot.iotInstanceId");
  58. //签名方法:支持hmacmd5、hmacsha1和hmacsha256。
  59. this.signMethod = environment.getProperty("iot.signMethod");
  60. this.clientId = environment.getProperty("iot.clientId");
  61. this.iotHost = environment.getProperty("iot.iotHost");
  62. this.iotPort = environment.getProperty("iot.iotPort");
  63. timeStamp = System.currentTimeMillis();
  64. }
  65. /*
  66. * 同步进程线程
  67. */
  68. public void start() {
  69. try {
  70. //计算签名,password组装方法,请参见AMQP客户端接入说明文档。
  71. String userInfo = getUserInfo();
  72. String signContent = "authId=" + accessKey + "&timestamp=" + timeStamp;;
  73. String password = doSign(signContent,accessSecret, signMethod);
  74. //接入域名,请参见AMQP客户端接入说明文档。
  75. String connectionUrl = "failover:(amqps://" + iotHost + ":" + iotPort
  76. + "?amqp.idleTimeout=80000)"
  77. + "?failover.reconnectDelay=30";
  78. Hashtable<String, String> hashtable = new Hashtable<>();
  79. hashtable.put("connectionfactory.SBCF",connectionUrl);
  80. hashtable.put("queue.QUEUE", "default");
  81. hashtable.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.qpid.jms.jndi.JmsInitialContextFactory");
  82. Context context = new InitialContext(hashtable);
  83. ConnectionFactory cf = (ConnectionFactory)context.lookup("SBCF");
  84. Destination queue = (Destination)context.lookup("QUEUE");
  85. // 创建连接。
  86. log.warn("Befor cf.createConnection");
  87. Connection connection = cf.createConnection(userInfo, password);
  88. log.warn("Befor connection.addConnectionListener");
  89. ((JmsConnection) connection).addConnectionListener(myJmsConnectionListener);
  90. // 创建会话。
  91. // Session.CLIENT_ACKNOWLEDGE: 收到消息后,需要手动调用message.acknowledge()。
  92. // Session.AUTO_ACKNOWLEDGE: SDK自动ACK(推荐)。
  93. log.warn("Befor connection.createSession");
  94. Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
  95. log.warn("Befor connection.start");
  96. connection.start();
  97. // 创建Receiver连接。消费消息
  98. log.warn("Befor session.createConsumer");
  99. MessageConsumer consumer = session.createConsumer(queue);
  100. // 设置消息侦听
  101. ((IotMessageListener)messageListener).setExecutorService(this.executorService);
  102. consumer.setMessageListener(messageListener);
  103. } catch (IllegalArgumentException e) {
  104. log.error("IllegalArgumentException:{}", e.getMessage());
  105. } catch (NamingException e) {
  106. log.error("NamingException:{}", e.getMessage());
  107. } catch (JMSException e) {
  108. log.error("JMSException:{}", e.getMessage());
  109. } catch (Exception e) {
  110. log.error("Exception:{}", e.getMessage());
  111. }
  112. }
  113. /*
  114. * userInfo组装
  115. */
  116. private String getUserInfo() {
  117. //userInfo组装
  118. String userInfo = clientId + "|authMode=aksign"
  119. + ",signMethod=" + signMethod
  120. + ",timestamp=" + timeStamp
  121. + ",authId=" + accessKey
  122. + ",iotInstanceId=" + iotInstanceId
  123. + ",consumerGroupId=" + consumerGroupId
  124. + "|";
  125. //计算签名,password组装方法,请参见AMQP客户端接入说明文档。
  126. return userInfo;
  127. }
  128. /**
  129. * 计算签名,password组装方法,请参见AMQP客户端接入说明文档。
  130. */
  131. private static String doSign(String toSignString, String secret, String signMethod) throws Exception {
  132. SecretKeySpec signingKey = new SecretKeySpec(secret.getBytes(), signMethod);
  133. Mac mac = Mac.getInstance(signMethod);
  134. mac.init(signingKey);
  135. byte[] rawHmac = mac.doFinal(toSignString.getBytes());
  136. return Base64.encodeBase64String(rawHmac);
  137. }
  138. }