From 0a77abedf7e02e7ec1bfb4447d5f835286aa265e Mon Sep 17 00:00:00 2001 From: linwl <304115325@qq.com> Date: Wed, 24 Feb 2021 16:20:30 +0800 Subject: [PATCH] =?UTF-8?q?IOTGateway=E4=BB=A3=E7=A0=81=E4=BB=93=E5=BA=93?= =?UTF-8?q?=E6=8F=90=E4=BA=A4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- Dockerfile | 10 + pom.xml | 234 ++++++++++++++++++ push_service_run.sh | 39 +++ setup_dev.sh | 26 ++ setup_pro.sh | 20 ++ setup_test.sh | 26 ++ .../iotgateway/IotGatewayApplication.java | 37 +++ .../iotgateway/config/JacksonConfig.java | 61 +++++ .../config/SchedulingExecutorConfig.java | 40 +++ .../com/telpo/iotgateway/dto/BaseDto.java | 23 ++ .../exception/GobalExceptionHandler.java | 30 +++ .../listener/IotJmsConnectionListener.java | 66 +++++ .../listener/IotMessageListener.java | 75 ++++++ .../telpo/iotgateway/server/IotSubscribe.java | 154 ++++++++++++ src/main/resources/bootstrap-dev.yaml | 10 + src/main/resources/bootstrap-pro.yaml | 10 + src/main/resources/bootstrap-test.yaml | 48 ++++ src/main/resources/bootstrap.yaml | 10 + src/main/resources/log/logback-spring.xml | 197 +++++++++++++++ 19 files changed, 1116 insertions(+) create mode 100644 Dockerfile create mode 100644 pom.xml create mode 100644 push_service_run.sh create mode 100644 setup_dev.sh create mode 100644 setup_pro.sh create mode 100644 setup_test.sh create mode 100644 src/main/java/com/telpo/iotgateway/IotGatewayApplication.java create mode 100644 src/main/java/com/telpo/iotgateway/config/JacksonConfig.java create mode 100644 src/main/java/com/telpo/iotgateway/config/SchedulingExecutorConfig.java create mode 100644 src/main/java/com/telpo/iotgateway/dto/BaseDto.java create mode 100644 src/main/java/com/telpo/iotgateway/exception/GobalExceptionHandler.java create mode 100644 src/main/java/com/telpo/iotgateway/listener/IotJmsConnectionListener.java create mode 100644 src/main/java/com/telpo/iotgateway/listener/IotMessageListener.java create mode 100644 src/main/java/com/telpo/iotgateway/server/IotSubscribe.java create mode 100644 src/main/resources/bootstrap-dev.yaml create mode 100644 src/main/resources/bootstrap-pro.yaml create mode 100644 src/main/resources/bootstrap-test.yaml create mode 100644 src/main/resources/bootstrap.yaml create mode 100644 src/main/resources/log/logback-spring.xml diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..6cf8ada --- /dev/null +++ b/Dockerfile @@ -0,0 +1,10 @@ +FROM java:8 +MAINTAINER king <1609724385@qq.com> +VOLUME /tmp +COPY /target/iotgatewayservice.jar iotgatewayservice.jar +COPY --from=hengyunabc/arthas:latest /opt/arthas /opt/arthas +ENV TimeZone=Asia/Shanghai +ENV active=dev +ENV JAVA_OPTS="-Xmx512M -Xms512M" +RUN ln -snf /usr/share/zoneinfo/$TimeZone /etc/localtime && echo $TimeZone > /etc/timezone +ENTRYPOINT java $JAVA_OPTS -Djava.security.egd=file:/dev/./urandom -jar /iotgatewayservice.jar --spring.profiles.active=$active \ No newline at end of file diff --git a/pom.xml b/pom.xml new file mode 100644 index 0000000..43f7a2d --- /dev/null +++ b/pom.xml @@ -0,0 +1,234 @@ + + + 4.0.0 + com.telpo + iotgatewayservice + 1.0-SNAPSHOT + data-iot-gateway-server + 数据推送服务 + jar + + UTF-8 + UTF-8 + 1.8 + UTF-8 + 2.2.0.RELEASE + Hoxton.RELEASE + true + + + + org.springframework.boot + spring-boot-starter-parent + 2.2.5.RELEASE + + + + + + org.springframework.boot + spring-boot-devtools + runtime + true + + + + org.projectlombok + lombok + true + + + + + com.telpo + common + 1.1.19 + + + + + org.apache.qpid + qpid-jms-client + 0.47.0 + + + + commons-codec + commons-codec + 1.10 + + + + + com.alibaba.cloud + spring-cloud-starter-alibaba-nacos-discovery + + + + com.alibaba.cloud + spring-cloud-starter-alibaba-nacos-config + + + + + org.springframework.boot + spring-boot-starter-web + + + + + + mysql + mysql-connector-java + runtime + + + + + + com.baomidou + mybatis-plus + 3.3.1 + + + + + org.mybatis.spring.boot + mybatis-spring-boot-autoconfigure + 2.1.2 + + + + + org.mybatis.spring.boot + mybatis-spring-boot-starter + 2.1.2 + + + + + + com.alibaba + druid-spring-boot-starter + 1.1.22 + + + + + org.springframework.boot + spring-boot-starter-test + test + + + org.junit.vintage + junit-vintage-engine + + + + + + org.springframework.boot + spring-boot-configuration-processor + true + + + + + org.apache.commons + commons-lang3 + 3.10 + + + + + org.springframework.boot + spring-boot-starter-aop + + + + + com.fasterxml.jackson.datatype + jackson-datatype-jsr310 + + + + + org.springframework.boot + spring-boot-starter-data-redis + + + + + org.apache.commons + commons-pool2 + + + + + org.springframework.kafka + spring-kafka + + + + + com.squareup.okhttp3 + okhttp + 4.8.0 + + + + + de.codecentric + spring-boot-admin-starter-client + 2.2.4 + + + + + + + + + + org.springframework.cloud + spring-cloud-dependencies + ${spring-cloud.version} + pom + import + + + + com.alibaba.cloud + spring-cloud-alibaba-dependencies + ${spring-cloud-alibaba.version} + pom + import + + + + + + ${project.artifactId} + + + org.springframework.boot + spring-boot-maven-plugin + + + true + + + + + repackage + + + + + + + + + \ No newline at end of file diff --git a/push_service_run.sh b/push_service_run.sh new file mode 100644 index 0000000..61e3d63 --- /dev/null +++ b/push_service_run.sh @@ -0,0 +1,39 @@ +#!/bin/bash +environment=$1 +version=$2 +echo "环境变量为$environment,版本为$version!" +if [[ $environment == 'pro' ]]; then + echo "开始远程构建容器" + docker stop iot_gateway_service || true + docker rm iot_gateway_service || true + docker rmi -f $(docker images | grep registry.cn-shanghai.aliyuncs.com/telpo_platform/iot_gateway_service | awk '{print $3}') + docker login --username=rzl_wangjx@1111649216405698 --password=telpo.123 registry.cn-shanghai.aliyuncs.com + docker pull registry.cn-shanghai.aliyuncs.com/telpo_platform/iot_gateway_service:$version + docker run -v /home/data/iotgatewayserver/log:/var/log/iotgatewayserver -e active=pro --restart=always -d --network host --name iot_gateway_service registry.cn-shanghai.aliyuncs.com/telpo_platform/iot_gateway_service:$version + #删除产生的None镜像 + docker rmi -f $(docker images | grep none | awk '{print $3}') + docker ps -a +else + if [[ $environment == 'test' ]]; then + #echo "开始远程构建容器" + docker stop iot_gateway_service || true + docker rm iot_gateway_service || true + + docker rmi -f $(docker images | grep 139.224.254.18:5000/iot_gateway_service | awk '{print $3}') + docker pull 139.224.254.18:5000/iot_gateway_service:$version + docker run -v /home/data/iotgatewayserver/log:/var/log/iotgatewayserver -e active=test --restart=always -d --network host --name iot_gateway_service 139.224.254.18:5000/iot_gateway_service:$version + + #删除产生的None镜像 + docker rmi -f $(docker images | grep none | awk '{print $3}') + docker ps -a + else + docker stop iot_gateway_service || true + docker rm iot_gateway_service || true + docker rmi -f $(docker images | grep 139.224.254.18:5000/iot_gateway_service | awk '{print $3}') + docker pull 139.224.254.18:5000/iot_gateway_service:$version + docker run -v /home/data/iotgatewayserver/log:/var/log/iotgatewayserver -e active=dev --restart=always -d --network host --name iot_gateway_service telpo/iot_gateway_service:$version + #删除产生的None镜像 + docker rmi -f $(docker images | grep none | awk '{print $3}') + docker ps -a + fi +fi diff --git a/setup_dev.sh b/setup_dev.sh new file mode 100644 index 0000000..b084149 --- /dev/null +++ b/setup_dev.sh @@ -0,0 +1,26 @@ +#!/bin/bash +mvn clean +mvn package -Dmaven.test.skip=true +#image_version=$(date +%Y%m%d%H%M) +image_version=$version +docker stop iot_gateway_service || true +docker rm iot_gateway_service || true +# 删除镜像 +docker rmi -f $(docker images | grep telpo/iot_gateway_service | awk '{print $3}') + +docker build . -t telpo/push_service:$image_version + +#TODO:推送镜像到私有仓库 +echo '=================开始推送镜像=======================' +docker tag telpo/iot_gateway_service:$image_version 139.224.254.18:5000/iot_gateway_service:$image_version +docker push 139.224.254.18:5000/iot_gateway_service:$image_version +echo '=================推送镜像完成=======================' + +#删除产生的None镜像 +docker rmi -f $(docker images | grep none | awk '{print $3}') +# 查看镜像列表 +docker images +# 启动容器 +docker run -v /home/data/iotgatewayserver/log:/var/log/iotgatewayserver -d -e active=dev --network host --restart=always --name iot_gateway_service telpo/iot_gateway_service:$image_version +# 查看日志 +docker logs push_service diff --git a/setup_pro.sh b/setup_pro.sh new file mode 100644 index 0000000..1eea540 --- /dev/null +++ b/setup_pro.sh @@ -0,0 +1,20 @@ +#!/usr/bin/env bash +mvn clean +mvn package -Dmaven.test.skip=true +image_version=$version +# 删除镜像 +docker rmi -f $( + docker images | grep registry.cn-shanghai.aliyuncs.com/telpo_platform/iot_gateway_service | awk '{print $3}' +) +# 构建telpo/mrp:$image_version镜像 +docker build . -t telpo/iot_gateway_service:$image_version +#TODO:推送镜像到阿里仓库 +echo '=================开始推送镜像=======================' +docker login --username=rzl_wangjx@1111649216405698 --password=telpo.123 registry.cn-shanghai.aliyuncs.com +docker tag telpo/iot_gateway_service:$image_version registry.cn-shanghai.aliyuncs.com/telpo_platform/iot_gateway_service:$image_version +docker push registry.cn-shanghai.aliyuncs.com/telpo_platform/iot_gateway_service:$image_version +echo '=================推送镜像完成=======================' +#删除产生的None镜像 +docker rmi -f $(docker images | grep none | awk '{print $3}') +# 查看镜像列表 +docker images diff --git a/setup_test.sh b/setup_test.sh new file mode 100644 index 0000000..c63b808 --- /dev/null +++ b/setup_test.sh @@ -0,0 +1,26 @@ +#!/bin/bash +mvn clean +mvn package -Dmaven.test.skip=true +#image_version=$(date +%Y%m%d%H%M) +image_version=$version +docker stop iot_gateway_service || true +docker rm iot_gateway_service || true +# 删除镜像 +docker rmi -f $(docker images | grep telpo/iot_gateway_service | awk '{print $3}') + +docker build . -t telpo/iot_gateway_service:$image_version + +#TODO:推送镜像到私有仓库 +echo '=================开始推送镜像=======================' +docker tag telpo/iot_gateway_service:$image_version 139.224.254.18:5000/iot_gateway_service:$image_version +docker push 139.224.254.18:5000/iot_gateway_service:$image_version +echo '=================推送镜像完成=======================' + +#删除产生的None镜像 +docker rmi -f $(docker images | grep none | awk '{print $3}') +# 查看镜像列表 +docker images +# 启动容器 +docker run -v /home/data/iotgatewayserver/log:/var/log/iotgatewayserver -d -e active=test --network host --restart=always --name iot_gateway_service telpo/iot_gateway_service:$image_version +# 查看日志 +docker logs iot_gateway_service diff --git a/src/main/java/com/telpo/iotgateway/IotGatewayApplication.java b/src/main/java/com/telpo/iotgateway/IotGatewayApplication.java new file mode 100644 index 0000000..ef36f9b --- /dev/null +++ b/src/main/java/com/telpo/iotgateway/IotGatewayApplication.java @@ -0,0 +1,37 @@ +package com.telpo.iotgateway; + +import com.telpo.iotgateway.server.IotSubscribe; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.boot.context.properties.ConfigurationPropertiesScan; +import org.springframework.cloud.client.discovery.EnableDiscoveryClient; +import org.springframework.context.ConfigurableApplicationContext; +import org.springframework.core.env.ConfigurableEnvironment; +import org.springframework.scheduling.annotation.EnableAsync; +import org.springframework.scheduling.annotation.EnableScheduling; + +/** + * @program: DataPushServer + * @description: 推送服务启动程序 + * @author: linwl + * @create: 2020-07-10 18:04 + */ +@SpringBootApplication +@EnableDiscoveryClient +@ConfigurationPropertiesScan +@EnableAsync +@EnableScheduling +@Slf4j +public class IotGatewayApplication { + + public static void main(String[] args) { + ConfigurableApplicationContext applicationContext = SpringApplication.run(IotGatewayApplication.class, args); + + ConfigurableEnvironment environment = applicationContext.getEnvironment(); + IotSubscribe iotSubscribe = new IotSubscribe(environment); + iotSubscribe.start(); + log.info("推送服务启动!"); + } +} diff --git a/src/main/java/com/telpo/iotgateway/config/JacksonConfig.java b/src/main/java/com/telpo/iotgateway/config/JacksonConfig.java new file mode 100644 index 0000000..e690110 --- /dev/null +++ b/src/main/java/com/telpo/iotgateway/config/JacksonConfig.java @@ -0,0 +1,61 @@ +package com.telpo.iotgateway.config; + +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.SerializationFeature; +import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; +import com.fasterxml.jackson.datatype.jsr310.deser.LocalDateDeserializer; +import com.fasterxml.jackson.datatype.jsr310.deser.LocalDateTimeDeserializer; +import com.fasterxml.jackson.datatype.jsr310.deser.LocalTimeDeserializer; +import com.fasterxml.jackson.datatype.jsr310.ser.LocalDateSerializer; +import com.fasterxml.jackson.datatype.jsr310.ser.LocalDateTimeSerializer; +import com.fasterxml.jackson.datatype.jsr310.ser.LocalTimeSerializer; +import com.fasterxml.jackson.module.paramnames.ParameterNamesModule; +import models.Constants; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.LocalTime; +import java.time.format.DateTimeFormatter; + +/** + * @program: DataPushServer + * @description: 序列化设置 + * @author: linwl + * @create: 2020-07-11 09:15 + */ +@Configuration +public class JacksonConfig { + + @Bean + public ObjectMapper objectMapper() { + ObjectMapper objectMapper = new ObjectMapper(); + objectMapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS); + objectMapper.disable(DeserializationFeature.ADJUST_DATES_TO_CONTEXT_TIME_ZONE); + JavaTimeModule javaTimeModule = new JavaTimeModule(); + javaTimeModule.addSerializer( + LocalDateTime.class, + new LocalDateTimeSerializer( + DateTimeFormatter.ofPattern(Constants.DEFAULT_DATE_TIME_FORMAT))); + javaTimeModule.addSerializer( + LocalDate.class, + new LocalDateSerializer(DateTimeFormatter.ofPattern(Constants.DEFAULT_DATE_FORMAT))); + javaTimeModule.addSerializer( + LocalTime.class, + new LocalTimeSerializer(DateTimeFormatter.ofPattern(Constants.DEFAULT_TIME_FORMAT))); + javaTimeModule.addDeserializer( + LocalDateTime.class, + new LocalDateTimeDeserializer( + DateTimeFormatter.ofPattern(Constants.DEFAULT_DATE_TIME_FORMAT))); + javaTimeModule.addDeserializer( + LocalDate.class, + new LocalDateDeserializer(DateTimeFormatter.ofPattern(Constants.DEFAULT_DATE_FORMAT))); + javaTimeModule.addDeserializer( + LocalTime.class, + new LocalTimeDeserializer(DateTimeFormatter.ofPattern(Constants.DEFAULT_TIME_FORMAT))); + objectMapper.registerModule(javaTimeModule).registerModule(new ParameterNamesModule()); + return objectMapper; + } +} diff --git a/src/main/java/com/telpo/iotgateway/config/SchedulingExecutorConfig.java b/src/main/java/com/telpo/iotgateway/config/SchedulingExecutorConfig.java new file mode 100644 index 0000000..5a757ee --- /dev/null +++ b/src/main/java/com/telpo/iotgateway/config/SchedulingExecutorConfig.java @@ -0,0 +1,40 @@ +package com.telpo.iotgateway.config; + +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.scheduling.annotation.SchedulingConfigurer; +import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; +import org.springframework.scheduling.config.ScheduledTaskRegistrar; + +/** + * @program: DataPushServer + * @description: 定时任务线程配置 + * @author: linwl + * @create: 2020-07-24 10:53 + */ +@Configuration +public class SchedulingExecutorConfig implements SchedulingConfigurer { + + @Value("${scheduler.pool.size}") + private int pollSize; + + @Value("${scheduler.pool.await-seconds}") + private int awaitSeconds; + + @Override + public void configureTasks(ScheduledTaskRegistrar taskRegistrar) { + ThreadPoolTaskScheduler taskScheduler = taskScheduler(); + taskRegistrar.setTaskScheduler(taskScheduler); + } + + @Bean(destroyMethod = "shutdown", name = "taskScheduler") + public ThreadPoolTaskScheduler taskScheduler() { + ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler(); + scheduler.setPoolSize(pollSize); + scheduler.setThreadNamePrefix("task-"); + scheduler.setAwaitTerminationSeconds(awaitSeconds); + scheduler.setWaitForTasksToCompleteOnShutdown(true); + return scheduler; + } +} diff --git a/src/main/java/com/telpo/iotgateway/dto/BaseDto.java b/src/main/java/com/telpo/iotgateway/dto/BaseDto.java new file mode 100644 index 0000000..f8f3a23 --- /dev/null +++ b/src/main/java/com/telpo/iotgateway/dto/BaseDto.java @@ -0,0 +1,23 @@ +package com.telpo.iotgateway.dto; + +import lombok.Getter; +import lombok.Setter; +import lombok.ToString; + +/** + * @program: DataPushServer + * @description: + * @author: linwl + * @create: 2020-07-22 17:23 + */ +@Setter +@Getter +@ToString +public class BaseDto { + + /** 设备号 */ + private String imei; + + /** 数据时间 */ + private String time; +} diff --git a/src/main/java/com/telpo/iotgateway/exception/GobalExceptionHandler.java b/src/main/java/com/telpo/iotgateway/exception/GobalExceptionHandler.java new file mode 100644 index 0000000..e5bf117 --- /dev/null +++ b/src/main/java/com/telpo/iotgateway/exception/GobalExceptionHandler.java @@ -0,0 +1,30 @@ +package com.telpo.iotgateway.exception; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.web.bind.annotation.ExceptionHandler; +import org.springframework.web.bind.annotation.ResponseBody; +import org.springframework.web.bind.annotation.RestControllerAdvice; +import response.ERRORCODE; +import response.Result; + +import java.text.MessageFormat; + +/** + * @program: DataPushServer + * @description: 全局异常拦截 + * @author: linwl + * @create: 2020-07-11 10:29 + */ +@Slf4j +@RestControllerAdvice(basePackages = "com.telpo.datapushserver.controller") +public class GobalExceptionHandler { + @ExceptionHandler(value = Exception.class) + @ResponseBody + public Result exceptionErrorHandler(Exception e) throws Exception { + log.error(MessageFormat.format("推送服务发生异常:{0}!", e.getMessage()), e); + return new Result.Builder() + .setCode(ERRORCODE.SystemErr) + .setMessage(MessageFormat.format("系统发生异常:{0}!", e.getMessage())) + .build(); + } +} diff --git a/src/main/java/com/telpo/iotgateway/listener/IotJmsConnectionListener.java b/src/main/java/com/telpo/iotgateway/listener/IotJmsConnectionListener.java new file mode 100644 index 0000000..57b2f0c --- /dev/null +++ b/src/main/java/com/telpo/iotgateway/listener/IotJmsConnectionListener.java @@ -0,0 +1,66 @@ +package com.telpo.iotgateway.listener; + +import lombok.extern.slf4j.Slf4j; +import org.apache.qpid.jms.JmsConnectionListener; +import org.apache.qpid.jms.message.JmsInboundMessageDispatch; +import org.springframework.stereotype.Component; + +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import java.net.URI; +import java.time.LocalDateTime; + +/** + * @program: iotgateway + * @description: IotJms连接侦听 + * @author: linwl + * @create: 2021-02-24 11:22 + **/ +@Slf4j +@Component +public class IotJmsConnectionListener implements JmsConnectionListener { + /** + * 连接成功建立。 + */ + @Override + public void onConnectionEstablished(URI remoteURI) { + log.info("onConnectionEstablished, remoteUri:{0}, time:{1}", remoteURI, LocalDateTime.now()); + } + + /** + * 尝试过最大重试次数之后,最终连接失败。 + */ + @Override + public void onConnectionFailure(Throwable error) { + log.error("onConnectionFailure, {}", error.getMessage()); + } + + /** + * 连接中断。 + */ + @Override + public void onConnectionInterrupted(URI remoteURI) { + log.info("onConnectionInterrupted, remoteUri:{0}, time:{1}", remoteURI, LocalDateTime.now()); + } + + /** + * 连接中断后又自动重连上。 + */ + @Override + public void onConnectionRestored(URI remoteURI) { + log.info("onConnectionRestored, remoteUri:{0}, time:{1}", remoteURI, LocalDateTime.now()); + } + + @Override + public void onInboundMessage(JmsInboundMessageDispatch envelope) {} + + @Override + public void onSessionClosed(Session session, Throwable cause) {} + + @Override + public void onConsumerClosed(MessageConsumer consumer, Throwable cause) {} + + @Override + public void onProducerClosed(MessageProducer producer, Throwable cause) {} +} diff --git a/src/main/java/com/telpo/iotgateway/listener/IotMessageListener.java b/src/main/java/com/telpo/iotgateway/listener/IotMessageListener.java new file mode 100644 index 0000000..763269d --- /dev/null +++ b/src/main/java/com/telpo/iotgateway/listener/IotMessageListener.java @@ -0,0 +1,75 @@ +package com.telpo.iotgateway.listener; + +import com.google.common.base.Stopwatch; +import lombok.extern.slf4j.Slf4j; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Component; + +import javax.jms.Message; +import javax.jms.MessageListener; +import java.util.concurrent.ExecutorService; + +/** + * @program: iotgateway + * @description: Iot消息侦听 + * @author: linwl + * @create: 2021-02-24 11:39 + **/ +@Slf4j +@Component +public class IotMessageListener implements MessageListener { + + int count = 0; + + private ExecutorService executorService; + public void setExecutorService(ExecutorService executorService) { + this.executorService = executorService; + } + + @Override + public void onMessage(Message message) { + try { + //1.收到消息之后一定要ACK。 + // 推荐做法:创建Session选择Session.AUTO_ACKNOWLEDGE,这里会自动ACK。 + // 其他做法:创建Session选择Session.CLIENT_ACKNOWLEDGE,这里一定要调message.acknowledge()来ACK。 + // message.acknowledge(); + //2.建议异步处理收到的消息,确保onMessage函数里没有耗时逻辑。 + // 如果业务处理耗时过程过长阻塞住线程,可能会影响SDK收到消息后的正常回调。 + executorService.submit(() -> processMessage(message)); + count++; + } catch (Exception e) { + log.error("submit task occurs exception ", e); + } + } + + /** + * 在这里处理您收到消息后的具体业务逻辑。 + */ + private static void processMessage(Message message) { + try { + byte[] body = message.getBody(byte[].class); + String content = new String(body); + String topic = message.getStringProperty("topic"); + String messageId = message.getStringProperty("messageId"); + log.info("receive message" + + ", topic = " + topic + + ", messageId = " + messageId + + ", content = " + content); + } catch (Exception e) { + log.error("processMessage occurs error ", e); + } + } + + @Scheduled(cron = "${scheduler.task.cron}") + public void countMessages() { + //log.info("开始执行定时推送失败的推送记录!"); + // 获取推送失败的记录 + //try { + log.warn("约1分钟处理 {} 个请求",count); + count = 0; + //} catch (Exception e) { + // log.error("执行定时计数发生异常:", e); + //} + } + +} diff --git a/src/main/java/com/telpo/iotgateway/server/IotSubscribe.java b/src/main/java/com/telpo/iotgateway/server/IotSubscribe.java new file mode 100644 index 0000000..be2fb56 --- /dev/null +++ b/src/main/java/com/telpo/iotgateway/server/IotSubscribe.java @@ -0,0 +1,154 @@ +package com.telpo.iotgateway.server; + +import com.telpo.iotgateway.listener.IotJmsConnectionListener; +import com.telpo.iotgateway.listener.IotMessageListener; +import lombok.extern.slf4j.Slf4j; +import org.apache.qpid.jms.JmsConnection; +import org.apache.qpid.jms.JmsConnectionListener; +import org.springframework.core.env.ConfigurableEnvironment; +import org.springframework.stereotype.Component; +import org.apache.commons.codec.binary.Base64; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +import javax.jms.*; + +import javax.crypto.Mac; +import javax.crypto.spec.SecretKeySpec; +import javax.naming.Context; +import javax.naming.InitialContext; +import javax.naming.NamingException; +import java.util.Hashtable; + +/** + * @program: DipperPositionServer + * @description: 北斗定位 + * @author: king + * @create: 2021-01-13 14:01 + */ +@Slf4j +@Component +public class IotSubscribe { + + //参数说明,请参见AMQP客户端接入说明文档。 + private String accessKey; + private String accessSecret; + private String consumerGroupId; + //iotInstanceId:企业版实例请填写实例ID,公共实例请填空字符串""。 + private String iotInstanceId; + private long timeStamp; + //签名方法:支持hmacmd5、hmacsha1和hmacsha256。 + private String signMethod; + private String clientId; + private String iotHost; + private String iotPort; + + // 执行单元线程池 + private final static ExecutorService executorService = new ThreadPoolExecutor( + Runtime.getRuntime().availableProcessors(), + Runtime.getRuntime().availableProcessors() * 2, 60, TimeUnit.SECONDS, + new LinkedBlockingQueue<>(50000)); + // Jms连接侦听 + private static JmsConnectionListener myJmsConnectionListener = new IotJmsConnectionListener(); + // 消息侦听 + private static MessageListener messageListener = new IotMessageListener(); + + public IotSubscribe(ConfigurableEnvironment environment) { + //参数说明,请参见AMQP客户端接入说明文档。 + this.accessKey = environment.getProperty("iot.accessKey"); + this.accessSecret = environment.getProperty("iot.accessSecret"); + this.consumerGroupId = environment.getProperty("iot.consumerGroupId"); + //iotInstanceId:企业版实例请填写实例ID,公共实例请填空字符串""。 + this.iotInstanceId = environment.getProperty("iot.iotInstanceId"); + //签名方法:支持hmacmd5、hmacsha1和hmacsha256。 + this.signMethod = environment.getProperty("iot.signMethod"); + this.clientId = environment.getProperty("iot.clientId"); + this.iotHost = environment.getProperty("iot.iotHost"); + this.iotPort = environment.getProperty("iot.iotPort"); + + timeStamp = System.currentTimeMillis(); + } + + /* + * 同步进程线程 + */ + public void start() { + + try { + //计算签名,password组装方法,请参见AMQP客户端接入说明文档。 + String userInfo = getUserInfo(); + String signContent = "authId=" + accessKey + "×tamp=" + timeStamp;; + String password = doSign(signContent,accessSecret, signMethod); + + //接入域名,请参见AMQP客户端接入说明文档。 + // TODO Change ${YourHost} + String connectionUrl = "failover:(amqps://" + iotHost + ":" + iotPort + + "?amqp.idleTimeout=80000)" + + "?failover.reconnectDelay=30"; + + Hashtable hashtable = new Hashtable<>(); + hashtable.put("connectionfactory.SBCF",connectionUrl); + hashtable.put("queue.QUEUE", "default"); + hashtable.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.qpid.jms.jndi.JmsInitialContextFactory"); + Context context = new InitialContext(hashtable); + ConnectionFactory cf = (ConnectionFactory)context.lookup("SBCF"); + Destination queue = (Destination)context.lookup("QUEUE"); + + // 创建连接。 + Connection connection = cf.createConnection(userInfo, password); + ((JmsConnection) connection).addConnectionListener(myJmsConnectionListener); + // 创建会话。 + // Session.CLIENT_ACKNOWLEDGE: 收到消息后,需要手动调用message.acknowledge()。 + // Session.AUTO_ACKNOWLEDGE: SDK自动ACK(推荐)。 + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + connection.start(); + + // 创建Receiver连接。消费消息 + MessageConsumer consumer = session.createConsumer(queue); + + // 设置消息侦听 + ((IotMessageListener)messageListener).setExecutorService(this.executorService); + consumer.setMessageListener(messageListener); + + } catch (IllegalArgumentException e) { + log.error("IllegalArgumentException:{}", e.getMessage()); + } catch (NamingException e) { + log.error("NamingException:{}", e.getMessage()); + } catch (JMSException e) { + log.error("JMSException:{}", e.getMessage()); + } catch (Exception e) { + log.error("Exception:{}", e.getMessage()); + } + } + + /* + * userInfo组装 + */ + private String getUserInfo() { + //userInfo组装 + String userInfo = clientId + "|authMode=aksign" + + ",signMethod=" + signMethod + + ",timestamp=" + timeStamp + + ",authId=" + accessKey + + ",iotInstanceId=" + iotInstanceId + + ",consumerGroupId=" + consumerGroupId + + "|"; + //计算签名,password组装方法,请参见AMQP客户端接入说明文档。 + return userInfo; + } + + /** + * 计算签名,password组装方法,请参见AMQP客户端接入说明文档。 + */ + private static String doSign(String toSignString, String secret, String signMethod) throws Exception { + SecretKeySpec signingKey = new SecretKeySpec(secret.getBytes(), signMethod); + Mac mac = Mac.getInstance(signMethod); + mac.init(signingKey); + byte[] rawHmac = mac.doFinal(toSignString.getBytes()); + return Base64.encodeBase64String(rawHmac); + } + +} diff --git a/src/main/resources/bootstrap-dev.yaml b/src/main/resources/bootstrap-dev.yaml new file mode 100644 index 0000000..70a8c52 --- /dev/null +++ b/src/main/resources/bootstrap-dev.yaml @@ -0,0 +1,10 @@ +spring: + main: + allow-bean-definition-overriding: true + application: + name: push-service + cloud: + nacos: + config: + server-addr: 172.16.192.26:8848 + file-extension: yaml \ No newline at end of file diff --git a/src/main/resources/bootstrap-pro.yaml b/src/main/resources/bootstrap-pro.yaml new file mode 100644 index 0000000..23fb7a8 --- /dev/null +++ b/src/main/resources/bootstrap-pro.yaml @@ -0,0 +1,10 @@ +spring: + main: + allow-bean-definition-overriding: true + application: + name: push-service + cloud: + nacos: + config: + server-addr: 172.19.42.38:8848 + file-extension: yaml \ No newline at end of file diff --git a/src/main/resources/bootstrap-test.yaml b/src/main/resources/bootstrap-test.yaml new file mode 100644 index 0000000..568cfc8 --- /dev/null +++ b/src/main/resources/bootstrap-test.yaml @@ -0,0 +1,48 @@ +spring: + main: + allow-bean-definition-overriding: true + application: + name: push-service + cloud: + nacos: + config: + server-addr: 172.19.42.44:8848 + file-extension: yaml + jackson: + time-zone: GMT+8 + date-format: yyyy-MM-dd HH:mm:ss + aop: + auto: true + proxy-target-class: true + devtools: + restart: + enabled: true + freemarker: + cache: false + +async: + pool: + corePoolSize: 4 + maxPoolSize: 8 + queueCapacity: 5000 + +scheduler: + pool: + size: 1 + #等待任务完成退出最大秒数 + await-seconds: 6 + task: + cron: "0 */1 * * * *" + +iot: + accessKey: LTAI4FdXhwy1evoHXingMaiZ + accessSecret: CGmGpzta6ro8Bta4RLiQD18EF8m6Bm + consumerGroupId: eQVdFouKAYajil208F7F000100 + iotInstanceId: iot-cn-nif1vosz501 + RegionId: cn-shanghai + ProductKey: a18mXM6Cvx8 + //签名方法:支持hmacmd5、hmacsha1和hmacsha256。 + signMethod: hmacmd5 + clientId: gateway + iotHost: contoso.com + iotPort: 5671 \ No newline at end of file diff --git a/src/main/resources/bootstrap.yaml b/src/main/resources/bootstrap.yaml new file mode 100644 index 0000000..70a8c52 --- /dev/null +++ b/src/main/resources/bootstrap.yaml @@ -0,0 +1,10 @@ +spring: + main: + allow-bean-definition-overriding: true + application: + name: push-service + cloud: + nacos: + config: + server-addr: 172.16.192.26:8848 + file-extension: yaml \ No newline at end of file diff --git a/src/main/resources/log/logback-spring.xml b/src/main/resources/log/logback-spring.xml new file mode 100644 index 0000000..1de6973 --- /dev/null +++ b/src/main/resources/log/logback-spring.xml @@ -0,0 +1,197 @@ + + + + + + + + + + + + + + + + + + + + + + + + + debug + + + ${CONSOLE_LOG_PATTERN} + + UTF-8 + + + + + + + + + + ${log.path}/log_debug.log + + + %d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n + UTF-8 + + + + + ${log.path}/debug/log-debug-%d{yyyy-MM-dd}.%i.log + + 100MB + + + 7 + + + + debug + ACCEPT + DENY + + + + + + + ${log.path}/log_info.log + + + %d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n + UTF-8 + + + + + ${log.path}/info/log-info-%d{yyyy-MM-dd}.%i.log + + 100MB + + + 7 + + + + info + ACCEPT + DENY + + + + + + + ${log.path}/log_warn.log + + + %d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n + UTF-8 + + + + ${log.path}/warn/log-warn-%d{yyyy-MM-dd}.%i.log + + 100MB + + + 7 + + + + warn + ACCEPT + DENY + + + + + + + + ${log.path}/log_error.log + + + %d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n + UTF-8 + + + + ${log.path}/error/log-error-%d{yyyy-MM-dd}.%i.log + + 100MB + + + 15 + + + + ERROR + ACCEPT + DENY + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + \ No newline at end of file