浏览代码

IOTGateway代码仓库提交

master
林万龙 3 年前
父节点
当前提交
0a77abedf7
共有 19 个文件被更改,包括 1116 次插入0 次删除
  1. +10
    -0
      Dockerfile
  2. +234
    -0
      pom.xml
  3. +39
    -0
      push_service_run.sh
  4. +26
    -0
      setup_dev.sh
  5. +20
    -0
      setup_pro.sh
  6. +26
    -0
      setup_test.sh
  7. +37
    -0
      src/main/java/com/telpo/iotgateway/IotGatewayApplication.java
  8. +61
    -0
      src/main/java/com/telpo/iotgateway/config/JacksonConfig.java
  9. +40
    -0
      src/main/java/com/telpo/iotgateway/config/SchedulingExecutorConfig.java
  10. +23
    -0
      src/main/java/com/telpo/iotgateway/dto/BaseDto.java
  11. +30
    -0
      src/main/java/com/telpo/iotgateway/exception/GobalExceptionHandler.java
  12. +66
    -0
      src/main/java/com/telpo/iotgateway/listener/IotJmsConnectionListener.java
  13. +75
    -0
      src/main/java/com/telpo/iotgateway/listener/IotMessageListener.java
  14. +154
    -0
      src/main/java/com/telpo/iotgateway/server/IotSubscribe.java
  15. +10
    -0
      src/main/resources/bootstrap-dev.yaml
  16. +10
    -0
      src/main/resources/bootstrap-pro.yaml
  17. +48
    -0
      src/main/resources/bootstrap-test.yaml
  18. +10
    -0
      src/main/resources/bootstrap.yaml
  19. +197
    -0
      src/main/resources/log/logback-spring.xml

+ 10
- 0
Dockerfile 查看文件

@@ -0,0 +1,10 @@
FROM java:8
MAINTAINER king <1609724385@qq.com>
VOLUME /tmp
COPY /target/iotgatewayservice.jar iotgatewayservice.jar
COPY --from=hengyunabc/arthas:latest /opt/arthas /opt/arthas
ENV TimeZone=Asia/Shanghai
ENV active=dev
ENV JAVA_OPTS="-Xmx512M -Xms512M"
RUN ln -snf /usr/share/zoneinfo/$TimeZone /etc/localtime && echo $TimeZone > /etc/timezone
ENTRYPOINT java $JAVA_OPTS -Djava.security.egd=file:/dev/./urandom -jar /iotgatewayservice.jar --spring.profiles.active=$active

+ 234
- 0
pom.xml 查看文件

@@ -0,0 +1,234 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns="http://maven.apache.org/POM/4.0.0"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.telpo</groupId>
<artifactId>iotgatewayservice</artifactId>
<version>1.0-SNAPSHOT</version>
<name>data-iot-gateway-server</name>
<description>数据推送服务</description>
<packaging>jar</packaging>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
<maven.compiler.encoding>UTF-8</maven.compiler.encoding>
<spring-cloud-alibaba.version>2.2.0.RELEASE</spring-cloud-alibaba.version>
<spring-cloud.version>Hoxton.RELEASE</spring-cloud.version>
<skipTests>true</skipTests>
</properties>

<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.2.5.RELEASE</version>
</parent>

<dependencies>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<scope>runtime</scope>
<optional>true</optional>
</dependency>

<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>

<!--天波通用包-->
<dependency>
<groupId>com.telpo</groupId>
<artifactId>common</artifactId>
<version>1.1.19</version>
</dependency>
<!-- amqp 1.0 qpid client -->
<dependency>
<groupId>org.apache.qpid</groupId>
<artifactId>qpid-jms-client</artifactId>
<version>0.47.0</version>
</dependency>
<!-- util for base64-->
<dependency>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
<version>1.10</version>
</dependency>

<!-- springcloud alibaba依赖包-->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>

<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
</dependency>

<!-- springcloud 依赖包-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>


<!--MySQL 连接驱动依赖-->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<scope>runtime</scope>
</dependency>

<!--mybatis-plus数据持久层-->
<!-- https://mvnrepository.com/artifact/com.baomidou/mybatis-plus -->
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus</artifactId>
<version>3.3.1</version>
</dependency>

<!-- https://mvnrepository.com/artifact/org.mybatis.spring.boot/mybatis-spring-boot-autoconfigure -->
<dependency>
<groupId>org.mybatis.spring.boot</groupId>
<artifactId>mybatis-spring-boot-autoconfigure</artifactId>
<version>2.1.2</version>
</dependency>

<!-- https://mvnrepository.com/artifact/org.mybatis.spring.boot/mybatis-spring-boot-starter -->
<dependency>
<groupId>org.mybatis.spring.boot</groupId>
<artifactId>mybatis-spring-boot-starter</artifactId>
<version>2.1.2</version>
</dependency>


<!--druid数据连接池-->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid-spring-boot-starter</artifactId>
<version>1.1.22</version>
</dependency>


<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
<optional>true</optional>
</dependency>

<!-- https://mvnrepository.com/artifact/org.apache.commons/commons-lang3 -->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.10</version>
</dependency>

<!-- AOP-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-aop</artifactId>
</dependency>

<!-- LocalDateTime 序列化-->
<dependency>
<groupId>com.fasterxml.jackson.datatype</groupId>
<artifactId>jackson-datatype-jsr310</artifactId>
</dependency>

<!-- 开启redis缓存 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

<!-- redis依赖commons-pool 这个依赖一定要添加 -->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
</dependency>

<!--kafka 依赖-->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>

<!-- https://mvnrepository.com/artifact/com.squareup.okhttp3/okhttp -->
<dependency>
<groupId>com.squareup.okhttp3</groupId>
<artifactId>okhttp</artifactId>
<version>4.8.0</version>
</dependency>

<!-- https://mvnrepository.com/artifact/de.codecentric/spring-boot-admin-starter-client -->
<dependency>
<groupId>de.codecentric</groupId>
<artifactId>spring-boot-admin-starter-client</artifactId>
<version>2.2.4</version>
</dependency>


</dependencies>


<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>${spring-cloud.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>

<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-alibaba-dependencies</artifactId>
<version>${spring-cloud-alibaba.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>

<build>
<finalName>${project.artifactId}</finalName>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<!--fork : 使用 devtools生效-->
<fork>true</fork>
</configuration>
<executions>
<execution>
<goals>
<goal>repackage</goal><!--可以把依赖的包都打包到生成的Jar包中-->
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>


</project>

+ 39
- 0
push_service_run.sh 查看文件

@@ -0,0 +1,39 @@
#!/bin/bash
environment=$1
version=$2
echo "环境变量为$environment,版本为$version!"
if [[ $environment == 'pro' ]]; then
echo "开始远程构建容器"
docker stop iot_gateway_service || true
docker rm iot_gateway_service || true
docker rmi -f $(docker images | grep registry.cn-shanghai.aliyuncs.com/telpo_platform/iot_gateway_service | awk '{print $3}')
docker login --username=rzl_wangjx@1111649216405698 --password=telpo.123 registry.cn-shanghai.aliyuncs.com
docker pull registry.cn-shanghai.aliyuncs.com/telpo_platform/iot_gateway_service:$version
docker run -v /home/data/iotgatewayserver/log:/var/log/iotgatewayserver -e active=pro --restart=always -d --network host --name iot_gateway_service registry.cn-shanghai.aliyuncs.com/telpo_platform/iot_gateway_service:$version
#删除产生的None镜像
docker rmi -f $(docker images | grep none | awk '{print $3}')
docker ps -a
else
if [[ $environment == 'test' ]]; then
#echo "开始远程构建容器"
docker stop iot_gateway_service || true
docker rm iot_gateway_service || true

docker rmi -f $(docker images | grep 139.224.254.18:5000/iot_gateway_service | awk '{print $3}')
docker pull 139.224.254.18:5000/iot_gateway_service:$version
docker run -v /home/data/iotgatewayserver/log:/var/log/iotgatewayserver -e active=test --restart=always -d --network host --name iot_gateway_service 139.224.254.18:5000/iot_gateway_service:$version

#删除产生的None镜像
docker rmi -f $(docker images | grep none | awk '{print $3}')
docker ps -a
else
docker stop iot_gateway_service || true
docker rm iot_gateway_service || true
docker rmi -f $(docker images | grep 139.224.254.18:5000/iot_gateway_service | awk '{print $3}')
docker pull 139.224.254.18:5000/iot_gateway_service:$version
docker run -v /home/data/iotgatewayserver/log:/var/log/iotgatewayserver -e active=dev --restart=always -d --network host --name iot_gateway_service telpo/iot_gateway_service:$version
#删除产生的None镜像
docker rmi -f $(docker images | grep none | awk '{print $3}')
docker ps -a
fi
fi

+ 26
- 0
setup_dev.sh 查看文件

@@ -0,0 +1,26 @@
#!/bin/bash
mvn clean
mvn package -Dmaven.test.skip=true
#image_version=$(date +%Y%m%d%H%M)
image_version=$version
docker stop iot_gateway_service || true
docker rm iot_gateway_service || true
# 删除镜像
docker rmi -f $(docker images | grep telpo/iot_gateway_service | awk '{print $3}')

docker build . -t telpo/push_service:$image_version

#TODO:推送镜像到私有仓库
echo '=================开始推送镜像======================='
docker tag telpo/iot_gateway_service:$image_version 139.224.254.18:5000/iot_gateway_service:$image_version
docker push 139.224.254.18:5000/iot_gateway_service:$image_version
echo '=================推送镜像完成======================='

#删除产生的None镜像
docker rmi -f $(docker images | grep none | awk '{print $3}')
# 查看镜像列表
docker images
# 启动容器
docker run -v /home/data/iotgatewayserver/log:/var/log/iotgatewayserver -d -e active=dev --network host --restart=always --name iot_gateway_service telpo/iot_gateway_service:$image_version
# 查看日志
docker logs push_service

+ 20
- 0
setup_pro.sh 查看文件

@@ -0,0 +1,20 @@
#!/usr/bin/env bash
mvn clean
mvn package -Dmaven.test.skip=true
image_version=$version
# 删除镜像
docker rmi -f $(
docker images | grep registry.cn-shanghai.aliyuncs.com/telpo_platform/iot_gateway_service | awk '{print $3}'
)
# 构建telpo/mrp:$image_version镜像
docker build . -t telpo/iot_gateway_service:$image_version
#TODO:推送镜像到阿里仓库
echo '=================开始推送镜像======================='
docker login --username=rzl_wangjx@1111649216405698 --password=telpo.123 registry.cn-shanghai.aliyuncs.com
docker tag telpo/iot_gateway_service:$image_version registry.cn-shanghai.aliyuncs.com/telpo_platform/iot_gateway_service:$image_version
docker push registry.cn-shanghai.aliyuncs.com/telpo_platform/iot_gateway_service:$image_version
echo '=================推送镜像完成======================='
#删除产生的None镜像
docker rmi -f $(docker images | grep none | awk '{print $3}')
# 查看镜像列表
docker images

+ 26
- 0
setup_test.sh 查看文件

@@ -0,0 +1,26 @@
#!/bin/bash
mvn clean
mvn package -Dmaven.test.skip=true
#image_version=$(date +%Y%m%d%H%M)
image_version=$version
docker stop iot_gateway_service || true
docker rm iot_gateway_service || true
# 删除镜像
docker rmi -f $(docker images | grep telpo/iot_gateway_service | awk '{print $3}')

docker build . -t telpo/iot_gateway_service:$image_version

#TODO:推送镜像到私有仓库
echo '=================开始推送镜像======================='
docker tag telpo/iot_gateway_service:$image_version 139.224.254.18:5000/iot_gateway_service:$image_version
docker push 139.224.254.18:5000/iot_gateway_service:$image_version
echo '=================推送镜像完成======================='

#删除产生的None镜像
docker rmi -f $(docker images | grep none | awk '{print $3}')
# 查看镜像列表
docker images
# 启动容器
docker run -v /home/data/iotgatewayserver/log:/var/log/iotgatewayserver -d -e active=test --network host --restart=always --name iot_gateway_service telpo/iot_gateway_service:$image_version
# 查看日志
docker logs iot_gateway_service

+ 37
- 0
src/main/java/com/telpo/iotgateway/IotGatewayApplication.java 查看文件

@@ -0,0 +1,37 @@
package com.telpo.iotgateway;

import com.telpo.iotgateway.server.IotSubscribe;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.context.properties.ConfigurationPropertiesScan;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.core.env.ConfigurableEnvironment;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.annotation.EnableScheduling;

/**
* @program: DataPushServer
* @description: 推送服务启动程序
* @author: linwl
* @create: 2020-07-10 18:04
*/
@SpringBootApplication
@EnableDiscoveryClient
@ConfigurationPropertiesScan
@EnableAsync
@EnableScheduling
@Slf4j
public class IotGatewayApplication {

public static void main(String[] args) {
ConfigurableApplicationContext applicationContext = SpringApplication.run(IotGatewayApplication.class, args);

ConfigurableEnvironment environment = applicationContext.getEnvironment();
IotSubscribe iotSubscribe = new IotSubscribe(environment);
iotSubscribe.start();
log.info("推送服务启动!");
}
}

+ 61
- 0
src/main/java/com/telpo/iotgateway/config/JacksonConfig.java 查看文件

@@ -0,0 +1,61 @@
package com.telpo.iotgateway.config;

import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import com.fasterxml.jackson.datatype.jsr310.deser.LocalDateDeserializer;
import com.fasterxml.jackson.datatype.jsr310.deser.LocalDateTimeDeserializer;
import com.fasterxml.jackson.datatype.jsr310.deser.LocalTimeDeserializer;
import com.fasterxml.jackson.datatype.jsr310.ser.LocalDateSerializer;
import com.fasterxml.jackson.datatype.jsr310.ser.LocalDateTimeSerializer;
import com.fasterxml.jackson.datatype.jsr310.ser.LocalTimeSerializer;
import com.fasterxml.jackson.module.paramnames.ParameterNamesModule;
import models.Constants;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.format.DateTimeFormatter;

/**
* @program: DataPushServer
* @description: 序列化设置
* @author: linwl
* @create: 2020-07-11 09:15
*/
@Configuration
public class JacksonConfig {

@Bean
public ObjectMapper objectMapper() {
ObjectMapper objectMapper = new ObjectMapper();
objectMapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
objectMapper.disable(DeserializationFeature.ADJUST_DATES_TO_CONTEXT_TIME_ZONE);
JavaTimeModule javaTimeModule = new JavaTimeModule();
javaTimeModule.addSerializer(
LocalDateTime.class,
new LocalDateTimeSerializer(
DateTimeFormatter.ofPattern(Constants.DEFAULT_DATE_TIME_FORMAT)));
javaTimeModule.addSerializer(
LocalDate.class,
new LocalDateSerializer(DateTimeFormatter.ofPattern(Constants.DEFAULT_DATE_FORMAT)));
javaTimeModule.addSerializer(
LocalTime.class,
new LocalTimeSerializer(DateTimeFormatter.ofPattern(Constants.DEFAULT_TIME_FORMAT)));
javaTimeModule.addDeserializer(
LocalDateTime.class,
new LocalDateTimeDeserializer(
DateTimeFormatter.ofPattern(Constants.DEFAULT_DATE_TIME_FORMAT)));
javaTimeModule.addDeserializer(
LocalDate.class,
new LocalDateDeserializer(DateTimeFormatter.ofPattern(Constants.DEFAULT_DATE_FORMAT)));
javaTimeModule.addDeserializer(
LocalTime.class,
new LocalTimeDeserializer(DateTimeFormatter.ofPattern(Constants.DEFAULT_TIME_FORMAT)));
objectMapper.registerModule(javaTimeModule).registerModule(new ParameterNamesModule());
return objectMapper;
}
}

+ 40
- 0
src/main/java/com/telpo/iotgateway/config/SchedulingExecutorConfig.java 查看文件

@@ -0,0 +1,40 @@
package com.telpo.iotgateway.config;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.SchedulingConfigurer;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.scheduling.config.ScheduledTaskRegistrar;

/**
* @program: DataPushServer
* @description: 定时任务线程配置
* @author: linwl
* @create: 2020-07-24 10:53
*/
@Configuration
public class SchedulingExecutorConfig implements SchedulingConfigurer {

@Value("${scheduler.pool.size}")
private int pollSize;

@Value("${scheduler.pool.await-seconds}")
private int awaitSeconds;

@Override
public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
ThreadPoolTaskScheduler taskScheduler = taskScheduler();
taskRegistrar.setTaskScheduler(taskScheduler);
}

@Bean(destroyMethod = "shutdown", name = "taskScheduler")
public ThreadPoolTaskScheduler taskScheduler() {
ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
scheduler.setPoolSize(pollSize);
scheduler.setThreadNamePrefix("task-");
scheduler.setAwaitTerminationSeconds(awaitSeconds);
scheduler.setWaitForTasksToCompleteOnShutdown(true);
return scheduler;
}
}

+ 23
- 0
src/main/java/com/telpo/iotgateway/dto/BaseDto.java 查看文件

@@ -0,0 +1,23 @@
package com.telpo.iotgateway.dto;

import lombok.Getter;
import lombok.Setter;
import lombok.ToString;

/**
* @program: DataPushServer
* @description:
* @author: linwl
* @create: 2020-07-22 17:23
*/
@Setter
@Getter
@ToString
public class BaseDto {

/** 设备号 */
private String imei;

/** 数据时间 */
private String time;
}

+ 30
- 0
src/main/java/com/telpo/iotgateway/exception/GobalExceptionHandler.java 查看文件

@@ -0,0 +1,30 @@
package com.telpo.iotgateway.exception;

import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.ExceptionHandler;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.RestControllerAdvice;
import response.ERRORCODE;
import response.Result;

import java.text.MessageFormat;

/**
* @program: DataPushServer
* @description: 全局异常拦截
* @author: linwl
* @create: 2020-07-11 10:29
*/
@Slf4j
@RestControllerAdvice(basePackages = "com.telpo.datapushserver.controller")
public class GobalExceptionHandler {
@ExceptionHandler(value = Exception.class)
@ResponseBody
public Result<Boolean> exceptionErrorHandler(Exception e) throws Exception {
log.error(MessageFormat.format("推送服务发生异常:{0}!", e.getMessage()), e);
return new Result.Builder<Boolean>()
.setCode(ERRORCODE.SystemErr)
.setMessage(MessageFormat.format("系统发生异常:{0}!", e.getMessage()))
.build();
}
}

+ 66
- 0
src/main/java/com/telpo/iotgateway/listener/IotJmsConnectionListener.java 查看文件

@@ -0,0 +1,66 @@
package com.telpo.iotgateway.listener;

import lombok.extern.slf4j.Slf4j;
import org.apache.qpid.jms.JmsConnectionListener;
import org.apache.qpid.jms.message.JmsInboundMessageDispatch;
import org.springframework.stereotype.Component;

import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import java.net.URI;
import java.time.LocalDateTime;

/**
* @program: iotgateway
* @description: IotJms连接侦听
* @author: linwl
* @create: 2021-02-24 11:22
**/
@Slf4j
@Component
public class IotJmsConnectionListener implements JmsConnectionListener {
/**
* 连接成功建立。
*/
@Override
public void onConnectionEstablished(URI remoteURI) {
log.info("onConnectionEstablished, remoteUri:{0}, time:{1}", remoteURI, LocalDateTime.now());
}

/**
* 尝试过最大重试次数之后,最终连接失败。
*/
@Override
public void onConnectionFailure(Throwable error) {
log.error("onConnectionFailure, {}", error.getMessage());
}

/**
* 连接中断。
*/
@Override
public void onConnectionInterrupted(URI remoteURI) {
log.info("onConnectionInterrupted, remoteUri:{0}, time:{1}", remoteURI, LocalDateTime.now());
}

/**
* 连接中断后又自动重连上。
*/
@Override
public void onConnectionRestored(URI remoteURI) {
log.info("onConnectionRestored, remoteUri:{0}, time:{1}", remoteURI, LocalDateTime.now());
}

@Override
public void onInboundMessage(JmsInboundMessageDispatch envelope) {}

@Override
public void onSessionClosed(Session session, Throwable cause) {}

@Override
public void onConsumerClosed(MessageConsumer consumer, Throwable cause) {}

@Override
public void onProducerClosed(MessageProducer producer, Throwable cause) {}
}

+ 75
- 0
src/main/java/com/telpo/iotgateway/listener/IotMessageListener.java 查看文件

@@ -0,0 +1,75 @@
package com.telpo.iotgateway.listener;

import com.google.common.base.Stopwatch;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import javax.jms.Message;
import javax.jms.MessageListener;
import java.util.concurrent.ExecutorService;

/**
* @program: iotgateway
* @description: Iot消息侦听
* @author: linwl
* @create: 2021-02-24 11:39
**/
@Slf4j
@Component
public class IotMessageListener implements MessageListener {

int count = 0;

private ExecutorService executorService;
public void setExecutorService(ExecutorService executorService) {
this.executorService = executorService;
}

@Override
public void onMessage(Message message) {
try {
//1.收到消息之后一定要ACK。
// 推荐做法:创建Session选择Session.AUTO_ACKNOWLEDGE,这里会自动ACK。
// 其他做法:创建Session选择Session.CLIENT_ACKNOWLEDGE,这里一定要调message.acknowledge()来ACK。
// message.acknowledge();
//2.建议异步处理收到的消息,确保onMessage函数里没有耗时逻辑。
// 如果业务处理耗时过程过长阻塞住线程,可能会影响SDK收到消息后的正常回调。
executorService.submit(() -> processMessage(message));
count++;
} catch (Exception e) {
log.error("submit task occurs exception ", e);
}
}

/**
* 在这里处理您收到消息后的具体业务逻辑。
*/
private static void processMessage(Message message) {
try {
byte[] body = message.getBody(byte[].class);
String content = new String(body);
String topic = message.getStringProperty("topic");
String messageId = message.getStringProperty("messageId");
log.info("receive message"
+ ", topic = " + topic
+ ", messageId = " + messageId
+ ", content = " + content);
} catch (Exception e) {
log.error("processMessage occurs error ", e);
}
}

@Scheduled(cron = "${scheduler.task.cron}")
public void countMessages() {
//log.info("开始执行定时推送失败的推送记录!");
// 获取推送失败的记录
//try {
log.warn("约1分钟处理 {} 个请求",count);
count = 0;
//} catch (Exception e) {
// log.error("执行定时计数发生异常:", e);
//}
}

}

+ 154
- 0
src/main/java/com/telpo/iotgateway/server/IotSubscribe.java 查看文件

@@ -0,0 +1,154 @@
package com.telpo.iotgateway.server;

import com.telpo.iotgateway.listener.IotJmsConnectionListener;
import com.telpo.iotgateway.listener.IotMessageListener;
import lombok.extern.slf4j.Slf4j;
import org.apache.qpid.jms.JmsConnection;
import org.apache.qpid.jms.JmsConnectionListener;
import org.springframework.core.env.ConfigurableEnvironment;
import org.springframework.stereotype.Component;
import org.apache.commons.codec.binary.Base64;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import javax.jms.*;

import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import java.util.Hashtable;

/**
* @program: DipperPositionServer
* @description: 北斗定位
* @author: king
* @create: 2021-01-13 14:01
*/
@Slf4j
@Component
public class IotSubscribe {

//参数说明,请参见AMQP客户端接入说明文档。
private String accessKey;
private String accessSecret;
private String consumerGroupId;
//iotInstanceId:企业版实例请填写实例ID,公共实例请填空字符串""。
private String iotInstanceId;
private long timeStamp;
//签名方法:支持hmacmd5、hmacsha1和hmacsha256。
private String signMethod;
private String clientId;
private String iotHost;
private String iotPort;

// 执行单元线程池
private final static ExecutorService executorService = new ThreadPoolExecutor(
Runtime.getRuntime().availableProcessors(),
Runtime.getRuntime().availableProcessors() * 2, 60, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(50000));
// Jms连接侦听
private static JmsConnectionListener myJmsConnectionListener = new IotJmsConnectionListener();
// 消息侦听
private static MessageListener messageListener = new IotMessageListener();

public IotSubscribe(ConfigurableEnvironment environment) {
//参数说明,请参见AMQP客户端接入说明文档。
this.accessKey = environment.getProperty("iot.accessKey");
this.accessSecret = environment.getProperty("iot.accessSecret");
this.consumerGroupId = environment.getProperty("iot.consumerGroupId");
//iotInstanceId:企业版实例请填写实例ID,公共实例请填空字符串""。
this.iotInstanceId = environment.getProperty("iot.iotInstanceId");
//签名方法:支持hmacmd5、hmacsha1和hmacsha256。
this.signMethod = environment.getProperty("iot.signMethod");
this.clientId = environment.getProperty("iot.clientId");
this.iotHost = environment.getProperty("iot.iotHost");
this.iotPort = environment.getProperty("iot.iotPort");

timeStamp = System.currentTimeMillis();
}

/*
* 同步进程线程
*/
public void start() {

try {
//计算签名,password组装方法,请参见AMQP客户端接入说明文档。
String userInfo = getUserInfo();
String signContent = "authId=" + accessKey + "&timestamp=" + timeStamp;;
String password = doSign(signContent,accessSecret, signMethod);

//接入域名,请参见AMQP客户端接入说明文档。
// TODO Change ${YourHost}
String connectionUrl = "failover:(amqps://" + iotHost + ":" + iotPort
+ "?amqp.idleTimeout=80000)"
+ "?failover.reconnectDelay=30";

Hashtable<String, String> hashtable = new Hashtable<>();
hashtable.put("connectionfactory.SBCF",connectionUrl);
hashtable.put("queue.QUEUE", "default");
hashtable.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.qpid.jms.jndi.JmsInitialContextFactory");
Context context = new InitialContext(hashtable);
ConnectionFactory cf = (ConnectionFactory)context.lookup("SBCF");
Destination queue = (Destination)context.lookup("QUEUE");

// 创建连接。
Connection connection = cf.createConnection(userInfo, password);
((JmsConnection) connection).addConnectionListener(myJmsConnectionListener);
// 创建会话。
// Session.CLIENT_ACKNOWLEDGE: 收到消息后,需要手动调用message.acknowledge()。
// Session.AUTO_ACKNOWLEDGE: SDK自动ACK(推荐)。
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
connection.start();

// 创建Receiver连接。消费消息
MessageConsumer consumer = session.createConsumer(queue);

// 设置消息侦听
((IotMessageListener)messageListener).setExecutorService(this.executorService);
consumer.setMessageListener(messageListener);

} catch (IllegalArgumentException e) {
log.error("IllegalArgumentException:{}", e.getMessage());
} catch (NamingException e) {
log.error("NamingException:{}", e.getMessage());
} catch (JMSException e) {
log.error("JMSException:{}", e.getMessage());
} catch (Exception e) {
log.error("Exception:{}", e.getMessage());
}
}

/*
* userInfo组装
*/
private String getUserInfo() {
//userInfo组装
String userInfo = clientId + "|authMode=aksign"
+ ",signMethod=" + signMethod
+ ",timestamp=" + timeStamp
+ ",authId=" + accessKey
+ ",iotInstanceId=" + iotInstanceId
+ ",consumerGroupId=" + consumerGroupId
+ "|";
//计算签名,password组装方法,请参见AMQP客户端接入说明文档。
return userInfo;
}

/**
* 计算签名,password组装方法,请参见AMQP客户端接入说明文档。
*/
private static String doSign(String toSignString, String secret, String signMethod) throws Exception {
SecretKeySpec signingKey = new SecretKeySpec(secret.getBytes(), signMethod);
Mac mac = Mac.getInstance(signMethod);
mac.init(signingKey);
byte[] rawHmac = mac.doFinal(toSignString.getBytes());
return Base64.encodeBase64String(rawHmac);
}

}

+ 10
- 0
src/main/resources/bootstrap-dev.yaml 查看文件

@@ -0,0 +1,10 @@
spring:
main:
allow-bean-definition-overriding: true
application:
name: push-service
cloud:
nacos:
config:
server-addr: 172.16.192.26:8848
file-extension: yaml

+ 10
- 0
src/main/resources/bootstrap-pro.yaml 查看文件

@@ -0,0 +1,10 @@
spring:
main:
allow-bean-definition-overriding: true
application:
name: push-service
cloud:
nacos:
config:
server-addr: 172.19.42.38:8848
file-extension: yaml

+ 48
- 0
src/main/resources/bootstrap-test.yaml 查看文件

@@ -0,0 +1,48 @@
spring:
main:
allow-bean-definition-overriding: true
application:
name: push-service
cloud:
nacos:
config:
server-addr: 172.19.42.44:8848
file-extension: yaml
jackson:
time-zone: GMT+8
date-format: yyyy-MM-dd HH:mm:ss
aop:
auto: true
proxy-target-class: true
devtools:
restart:
enabled: true
freemarker:
cache: false

async:
pool:
corePoolSize: 4
maxPoolSize: 8
queueCapacity: 5000

scheduler:
pool:
size: 1
#等待任务完成退出最大秒数
await-seconds: 6
task:
cron: "0 */1 * * * *"

iot:
accessKey: LTAI4FdXhwy1evoHXingMaiZ
accessSecret: CGmGpzta6ro8Bta4RLiQD18EF8m6Bm
consumerGroupId: eQVdFouKAYajil208F7F000100
iotInstanceId: iot-cn-nif1vosz501
RegionId: cn-shanghai
ProductKey: a18mXM6Cvx8
//签名方法:支持hmacmd5、hmacsha1和hmacsha256。
signMethod: hmacmd5
clientId: gateway
iotHost: contoso.com
iotPort: 5671

+ 10
- 0
src/main/resources/bootstrap.yaml 查看文件

@@ -0,0 +1,10 @@
spring:
main:
allow-bean-definition-overriding: true
application:
name: push-service
cloud:
nacos:
config:
server-addr: 172.16.192.26:8848
file-extension: yaml

+ 197
- 0
src/main/resources/log/logback-spring.xml 查看文件

@@ -0,0 +1,197 @@
<?xml version="1.0" encoding="UTF-8"?>
<!-- 日志级别从低到高分为TRACE < DEBUG < INFO < WARN < ERROR < FATAL,如果设置为WARN,则低于WARN的信息都不会输出 -->
<!-- scan:当此属性设置为true时,配置文件如果发生改变,将会被重新加载,默认值为true -->
<!-- scanPeriod:设置监测配置文件是否有修改的时间间隔,如果没有给出时间单位,默认单位是毫秒。当scan为true时,此属性生效。默认的时间间隔为1分钟。 -->
<!-- debug:当此属性设置为true时,将打印出logback内部日志信息,实时查看logback运行状态。默认值为false。 -->
<configuration scan="true" scanPeriod="10 seconds">

<!--<include resource="org/springframework/boot/logging/logback/base.xml" />-->
<!-- name的值是变量的名称,value的值时变量定义的值。通过定义的值会被插入到logger上下文中。定义变量后,可以使“${}”来使用变量。 -->
<property name="log.path" value="/var/log/iotgatewayserver"/>

<!-- 彩色日志 -->
<!-- 彩色日志依赖的渲染类 -->
<conversionRule conversionWord="clr" converterClass="org.springframework.boot.logging.logback.ColorConverter"/>
<conversionRule conversionWord="wex"
converterClass="org.springframework.boot.logging.logback.WhitespaceThrowableProxyConverter"/>
<conversionRule conversionWord="wEx"
converterClass="org.springframework.boot.logging.logback.ExtendedWhitespaceThrowableProxyConverter"/>
<!-- 彩色日志格式 -->
<property name="CONSOLE_LOG_PATTERN"
value="${CONSOLE_LOG_PATTERN:-%clr(%d{yyyy-MM-dd HH:mm:ss.SSS}){faint} %clr(${LOG_LEVEL_PATTERN:-%5p}) %clr(${PID:- }){magenta} %clr(---){faint} %clr([%15.15t]){faint} %clr(%-40.40logger{39}){cyan} %clr(:){faint} %m%n${LOG_EXCEPTION_CONVERSION_WORD:-%wEx}}"/>


<!--输出到控制台-->
<appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
<!--此日志appender是为开发使用,只配置最底级别,控制台输出的日志级别是大于或等于此级别的日志信息-->
<filter class="ch.qos.logback.classic.filter.ThresholdFilter">
<level>debug</level>
</filter>
<encoder>
<Pattern>${CONSOLE_LOG_PATTERN}</Pattern>
<!-- 设置字符集 -->
<charset>UTF-8</charset>
</encoder>
</appender>


<!--输出到文件-->

<!-- 时间滚动输出 level为 DEBUG 日志 -->
<appender name="DEBUG_FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
<!-- 正在记录的日志文件的路径及文件名 -->
<file>${log.path}/log_debug.log</file>
<!--日志文件输出格式-->
<encoder>
<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n</pattern>
<charset>UTF-8</charset> <!-- 设置字符集 -->
</encoder>
<!-- 日志记录器的滚动策略,按日期,按大小记录 -->
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<!-- 日志归档 -->
<fileNamePattern>${log.path}/debug/log-debug-%d{yyyy-MM-dd}.%i.log</fileNamePattern>
<timeBasedFileNamingAndTriggeringPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
<maxFileSize>100MB</maxFileSize>
</timeBasedFileNamingAndTriggeringPolicy>
<!--日志文件保留天数-->
<maxHistory>7</maxHistory>
</rollingPolicy>
<!-- 此日志文件只记录debug级别的 -->
<filter class="ch.qos.logback.classic.filter.LevelFilter">
<level>debug</level>
<onMatch>ACCEPT</onMatch>
<onMismatch>DENY</onMismatch>
</filter>
</appender>

<!-- 时间滚动输出 level为 INFO 日志 -->
<appender name="INFO_FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
<!-- 正在记录的日志文件的路径及文件名 -->
<file>${log.path}/log_info.log</file>
<!--日志文件输出格式-->
<encoder>
<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n</pattern>
<charset>UTF-8</charset>
</encoder>
<!-- 日志记录器的滚动策略,按日期,按大小记录 -->
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<!-- 每天日志归档路径以及格式 -->
<fileNamePattern>${log.path}/info/log-info-%d{yyyy-MM-dd}.%i.log</fileNamePattern>
<timeBasedFileNamingAndTriggeringPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
<maxFileSize>100MB</maxFileSize>
</timeBasedFileNamingAndTriggeringPolicy>
<!--日志文件保留天数-->
<maxHistory>7</maxHistory>
</rollingPolicy>
<!-- 此日志文件只记录info级别的 -->
<filter class="ch.qos.logback.classic.filter.LevelFilter">
<level>info</level>
<onMatch>ACCEPT</onMatch>
<onMismatch>DENY</onMismatch>
</filter>
</appender>

<!-- 时间滚动输出 level为 WARN 日志 -->
<appender name="WARN_FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
<!-- 正在记录的日志文件的路径及文件名 -->
<file>${log.path}/log_warn.log</file>
<!--日志文件输出格式-->
<encoder>
<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n</pattern>
<charset>UTF-8</charset> <!-- 此处设置字符集 -->
</encoder>
<!-- 日志记录器的滚动策略,按日期,按大小记录 -->
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<fileNamePattern>${log.path}/warn/log-warn-%d{yyyy-MM-dd}.%i.log</fileNamePattern>
<timeBasedFileNamingAndTriggeringPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
<maxFileSize>100MB</maxFileSize>
</timeBasedFileNamingAndTriggeringPolicy>
<!--日志文件保留天数-->
<maxHistory>7</maxHistory>
</rollingPolicy>
<!-- 此日志文件只记录warn级别的 -->
<filter class="ch.qos.logback.classic.filter.LevelFilter">
<level>warn</level>
<onMatch>ACCEPT</onMatch>
<onMismatch>DENY</onMismatch>
</filter>
</appender>


<!-- 时间滚动输出 level为 ERROR 日志 -->
<appender name="ERROR_FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
<!-- 正在记录的日志文件的路径及文件名 -->
<file>${log.path}/log_error.log</file>
<!--日志文件输出格式-->
<encoder>
<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n</pattern>
<charset>UTF-8</charset> <!-- 此处设置字符集 -->
</encoder>
<!-- 日志记录器的滚动策略,按日期,按大小记录 -->
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<fileNamePattern>${log.path}/error/log-error-%d{yyyy-MM-dd}.%i.log</fileNamePattern>
<timeBasedFileNamingAndTriggeringPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
<maxFileSize>100MB</maxFileSize>
</timeBasedFileNamingAndTriggeringPolicy>
<!--日志文件保留天数-->
<maxHistory>15</maxHistory>
</rollingPolicy>
<!-- 此日志文件只记录ERROR级别的 -->
<filter class="ch.qos.logback.classic.filter.LevelFilter">
<level>ERROR</level>
<onMatch>ACCEPT</onMatch>
<onMismatch>DENY</onMismatch>
</filter>
</appender>

<!--
<logger>用来设置某一个包或者具体的某一个类的日志打印级别、
以及指定<appender>。<logger>仅有一个name属性,
一个可选的level和一个可选的addtivity属性。
name:用来指定受此logger约束的某一个包或者具体的某一个类。
level:用来设置打印级别,大小写无关:TRACE, DEBUG, INFO, WARN, ERROR, ALL 和 OFF,
还有一个特俗值INHERITED或者同义词NULL,代表强制执行上级的级别。
如果未设置此属性,那么当前logger将会继承上级的级别。
addtivity:是否向上级logger传递打印信息。默认是true。
-->
<!--<logger name="org.springframework.web" level="info"/>-->
<!--<logger name="org.springframework.scheduling.annotation.ScheduledAnnotationBeanPostProcessor" level="INFO"/>-->
<!--
使用mybatis的时候,sql语句是debug下才会打印,而这里我们只配置了info,所以想要查看sql语句的话,有以下两种操作:
第一种把<root level="info">改成<root level="DEBUG">这样就会打印sql,不过这样日志那边会出现很多其他消息
第二种就是单独给dao下目录配置debug模式,代码如下,这样配置sql语句会打印,其他还是正常info级别:
-->


<!--
root节点是必选节点,用来指定最基础的日志输出级别,只有一个level属性
level:用来设置打印级别,大小写无关:TRACE, DEBUG, INFO, WARN, ERROR, ALL 和 OFF,
不能设置为INHERITED或者同义词NULL。默认是DEBUG
可以包含零个或多个元素,标识这个appender将会添加到这个logger。
-->

<!--开发环境:打印控制台-->
<springProfile name="dev">
<logger name="com.telpo.iotgateway" level="debug"/>
</springProfile>

<root level="info">
<appender-ref ref="CONSOLE"/>
<appender-ref ref="DEBUG_FILE"/>
<appender-ref ref="INFO_FILE"/>
<appender-ref ref="WARN_FILE"/>
<appender-ref ref="ERROR_FILE"/>
</root>

<!--生产环境:输出到文件-->
<!--<springProfile name="pro">-->
<!--<root level="info">-->
<!--<appender-ref ref="CONSOLE" />-->
<!--<appender-ref ref="DEBUG_FILE" />-->
<!--<appender-ref ref="INFO_FILE" />-->
<!--<appender-ref ref="ERROR_FILE" />-->
<!--<appender-ref ref="WARN_FILE" />-->
<!--</root>-->
<!--</springProfile>-->

</configuration>

正在加载...
取消
保存