diff --git a/pom.xml b/pom.xml index 382d7f3..20c8038 100644 --- a/pom.xml +++ b/pom.xml @@ -87,6 +87,51 @@ 1.2.28 + + io.netty + netty-all + 4.1.13.Final + + + + com.telpo + common + 1.1.19 + + + + + org.springframework.boot + spring-boot-starter-data-redis + + + + + org.apache.commons + commons-pool2 + + + + + com.squareup.okhttp3 + okhttp + 4.8.0 + + + + + de.codecentric + spring-boot-admin-starter-client + 2.2.4 + + + + + net.sourceforge.javacsv + javacsv + 2.0 + + diff --git a/src/main/java/com/telpo/dipperposition/DipperPositionApplication.java b/src/main/java/com/telpo/dipperposition/DipperPositionApplication.java index 5bafebb..865170a 100644 --- a/src/main/java/com/telpo/dipperposition/DipperPositionApplication.java +++ b/src/main/java/com/telpo/dipperposition/DipperPositionApplication.java @@ -1,11 +1,15 @@ package com.telpo.dipperposition; +import com.telpo.dipperposition.server.DipperPositionServer; import lombok.extern.slf4j.Slf4j; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.boot.context.properties.ConfigurationPropertiesScan; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.cloud.client.discovery.EnableDiscoveryClient; +import java.net.InetSocketAddress; + /** * @program: gateway * @description: 网关启动类 @@ -16,11 +20,17 @@ import org.springframework.cloud.client.discovery.EnableDiscoveryClient; @EnableConfigurationProperties @EnableDiscoveryClient @Slf4j +@ConfigurationPropertiesScan public class DipperPositionApplication { public static void main(String[] args) { log.info("北斗定位服务开始!"); SpringApplication.run(DipperPositionApplication.class, args); log.info("北斗定位服务启动!"); + //启动服务端 + DipperPositionServer nettyServer = new DipperPositionServer(); + nettyServer.startTimeAsnc(); + nettyServer.startPosAsnc(); + nettyServer.startStarsAsnc(); } } diff --git a/src/main/java/com/telpo/dipperposition/common/CSVUtil.java b/src/main/java/com/telpo/dipperposition/common/CSVUtil.java new file mode 100644 index 0000000..e6d1ab0 --- /dev/null +++ b/src/main/java/com/telpo/dipperposition/common/CSVUtil.java @@ -0,0 +1,87 @@ +package com.telpo.dipperposition.common; + +import com.csvreader.CsvReader; +import com.csvreader.CsvWriter; + +import java.io.FileNotFoundException; +import java.io.FileWriter; +import java.io.IOException; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.List; +/** + * @program: dipperposition + * @description: CSV文件读取单元 + * @author: linwl + * @create: 2021-01-14 11:19 + **/ +public class CSVUtil { + /** + * 读取每行的数据 + * + * @param readPath + * @return + */ + public static List readCSV(String readPath) { + String filePath = readPath; + List listData = new ArrayList<>(); + try { + filePath = readPath; + CsvReader csvReader = new CsvReader(filePath); + // 读表头 + boolean re = csvReader.readHeaders(); + while (csvReader.readRecord()) { + String rawRecord = csvReader.getRawRecord(); + listData.add(rawRecord); + } + return listData; + } catch (FileNotFoundException e) { + throw new RuntimeException("文件未找到"); + } catch (IOException e) { + throw new RuntimeException(e.getMessage()); + } + + } + + /** + * 写入文件头 + * @param writePath + * @param header + */ + public static void writeCSV(String writePath, String[] header) { + String filePath = writePath; + try { + CsvWriter csvWriter = new CsvWriter(writePath, ',', Charset.forName("UTF-8")); + //String [] header = {"SkuId","SsuId","图片地址","大小(bit)","高度","宽度"}; + csvWriter.writeRecord(header); + csvWriter.close(); + } catch (Exception e) { + e.printStackTrace(); + } + + } + + /** + * 利用输入输出流持续写 + * @param fileName + * @param content + */ + public static void writeContent(String fileName, String content) { + FileWriter writer = null; + try { + // 打开一个写文件器,构造函数中的第二个参数true表示以追加形式写文件 + writer = new FileWriter(fileName, true); + writer.write(content + "\r\n"); + } catch (IOException e) { + e.printStackTrace(); + } finally { + try { + if (writer != null) { + writer.close(); + } + } catch (IOException e) { + e.printStackTrace(); + } + } + } +} diff --git a/src/main/java/com/telpo/dipperposition/common/HexConvert.java b/src/main/java/com/telpo/dipperposition/common/HexConvert.java new file mode 100644 index 0000000..8f79122 --- /dev/null +++ b/src/main/java/com/telpo/dipperposition/common/HexConvert.java @@ -0,0 +1,140 @@ +package com.telpo.dipperposition.common; + +/** + * @program: dipperposition + * @description: 16进制处理 + * @author: linwl + * @create: 2021-01-14 22:05 + **/ +public class HexConvert { + public static String convertStringToHex(String str){ + + char[] chars = str.toCharArray(); + + StringBuffer hex = new StringBuffer(); + for(int i = 0; i < chars.length; i++){ + hex.append(Integer.toHexString((int)chars[i])); + } + + return hex.toString(); + } + + public static String convertHexToString(String hex){ + + StringBuilder sb = new StringBuilder(); + StringBuilder sb2 = new StringBuilder(); + + for( int i=0; i> 4)); + hex += String.valueOf(hexStr.charAt(b & 0x0F)); + result += hex + " "; + } + return result; + } + + //將10進制轉換為16進制 + public static String encodeHEX(long numb){ + + String hex= Long.toHexString(numb); + return hex; + + } + + + //將16進制字符串轉換為10進制數字 + public static long decodeHEX(String hexs){ + long longValue= Long.parseLong("123ABC", 16); + return longValue; + } + + + /** + * 生成校验码的int值 + * */ + public static String makeChecksum(String data) { + if (data == null || data.equals("")) { + return ""; + } + int total = 0; + int len = data.length(); + int num = 0; + while (num < len) { + String s = data.substring(num, num + 2); + //System.out.println(s); + total += Integer.parseInt(s, 16); + num = num + 2; + } + /** + * 用256求余最大是255,即16进制的FF + */ + int mod = total % 256; + String hex = Integer.toHexString(mod); + len = hex.length(); + // 如果不够校验位的长度,补0,这里用的是两位校验 + if (len < 2) { + hex = "0" + hex; + } + return hex; + } +// +// public static void main(String[] args) { +// +// +// System.out.println("======ASCII码转换为16进制======"); +// String str = "*00007VERSION\\n1$"; +// System.out.println("字符串: " + str); +// String hex = HexConvert.convertStringToHex(str); +// System.out.println("====转换为16进制=====" + hex); +// +// System.out.println("======16进制转换为ASCII======"); +// System.out.println("Hex : " + hex); +// System.out.println("ASCII : " + HexConvert.convertHexToString(hex)); +// +// byte[] bytes = HexConvert.hexStringToBytes( hex ); +// +// System.out.println(HexConvert.BinaryToHexString( bytes )); +// } + +} diff --git a/src/main/java/com/telpo/dipperposition/common/OkHttpUtil.java b/src/main/java/com/telpo/dipperposition/common/OkHttpUtil.java new file mode 100644 index 0000000..a31eb99 --- /dev/null +++ b/src/main/java/com/telpo/dipperposition/common/OkHttpUtil.java @@ -0,0 +1,155 @@ +package com.telpo.dipperposition.common; + +import com.alibaba.fastjson.JSONObject; +import lombok.extern.slf4j.Slf4j; +import okhttp3.*; +import org.apache.commons.lang3.ObjectUtils; +import org.apache.commons.lang3.exception.ExceptionUtils; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import java.text.MessageFormat; +import java.util.Iterator; +import java.util.Map; +import java.util.Objects; + +/** + * @program: DataPushServer + * @description: okhttp工具类 + * @author: linwl + * @create: 2020-07-17 15:43 + */ +@Slf4j +@Component +public class OkHttpUtil { + + @Autowired + private OkHttpClient okHttpClient; + + /** + * 根据map获取get请求参数 + * + * @param queries + * @return + */ + public StringBuffer getQueryString(String url, Map queries) { + StringBuffer sb = new StringBuffer(url); + if (queries != null && queries.keySet().size() > 0) { + boolean firstFlag = true; + Iterator iterator = queries.entrySet().iterator(); + while (iterator.hasNext()) { + Map.Entry entry = (Map.Entry) iterator.next(); + if (firstFlag) { + sb.append("?" + entry.getKey() + "=" + entry.getValue()); + firstFlag = false; + } else { + sb.append("&" + entry.getKey() + "=" + entry.getValue()); + } + } + } + return sb; + } + + /** + * get + * + * @param url 请求的url + * @param queries 请求的参数,在浏览器?后面的数据,没有可以传null + * @return + */ + public String get(String url, Map queries) { + StringBuffer sb = getQueryString(url, queries); + Request request = new Request.Builder().url(sb.toString()).build(); + log.debug(MessageFormat.format("发送Get to url<{0}>,参数为:{1}", url, queries)); + return execNewCall(request); + } + + /** + * post + * + * @param url 请求的url + * @param params post form 提交的参数 + * @return + */ + public String postFormParams(String url, Map params) { + FormBody.Builder builder = new FormBody.Builder(); + // 添加参数 + if (params != null && params.keySet().size() > 0) { + for (String key : params.keySet()) { + builder.add(key, params.get(key)); + } + } + log.debug(MessageFormat.format("发送post from to url<{0}>,参数为:{1}", url, params)); + Request request = new Request.Builder().url(url).post(builder.build()).build(); + return execNewCall(request); + } + + /** Post请求发送JSON数据....{"name":"zhangsan","pwd":"123456"} 参数一:请求Url 参数二:请求的JSON 参数三:请求回调 */ + public String postJsonParams(String url, String jsonParams) { + RequestBody requestBody = RequestBody.create(jsonParams, MediaType.parse("application/json; charset=utf-8")); + Request request = new Request.Builder().url(url).post(requestBody).build(); + log.debug(MessageFormat.format("发送post json to url<{0}>,参数为:{1}", url, jsonParams)); + return execNewCall(request); + } + + /** Post请求发送xml数据.... 参数一:请求Url 参数二:请求的xmlString 参数三:请求回调 */ + public String postXmlParams(String url, String xml) { + RequestBody requestBody = + RequestBody.create(xml, MediaType.parse("application/xml; charset=utf-8")); + Request request = new Request.Builder().url(url).post(requestBody).build(); + log.debug(MessageFormat.format("发送post xml to url<{0}>,参数为:{1}", url, xml)); + return execNewCall(request); + } + + /** + * 调用okhttp的newCall方法 + * + * @param request + * @return + */ + private String execNewCall(Request request) { + try (Response response = okHttpClient.newCall(request).execute()) { + if (response.isSuccessful()) { + return Objects.requireNonNull(response.body()).string(); + } + } catch (Exception e) { + log.error("okhttp3 put error >> ex = {}", ExceptionUtils.getStackTrace(e)); + } + return "FAIL"; + } + + /** + * Post请求发送JSON数据....{"name":"zhangsan","pwd":"123456"} 参数一:请求Url 参数二:请求的JSON 参数三:请求回调 + */ + public String postJsonParamsWithToken(String url, String token, String jsonParams) { + RequestBody requestBody = + RequestBody.create(jsonParams, MediaType.parse("application/json; charset=utf-8")); + Request request = new Request.Builder().url(url). + addHeader("Authorization", token).post(requestBody).build(); + log.debug(MessageFormat.format("发送post json to url<{0}>,参数为:{1}", url, jsonParams)); + return execNewCall(request); + } + + public JSONObject postRequestWithJson(String url, String accessToken, JSONObject postData) { + String postResult; + if (ObjectUtils.isNotEmpty(accessToken)) { + postResult = postJsonParamsWithToken(url, accessToken, JSONObject.toJSONString(postData)); + } else { + postResult = postJsonParams(url, JSONObject.toJSONString(postData)); + } + + if (postResult == null) { + log.error("访问错误"); + return null; + } else { + log.debug(postResult); + if(("FAIL").equals(postResult.toString())) { + JSONObject object = new JSONObject(); + object.put("result", "FAIL"); + return object; + } else { + return JSONObject.parseObject(postResult); + } + } + } +} diff --git a/src/main/java/com/telpo/dipperposition/common/RedisUtil.java b/src/main/java/com/telpo/dipperposition/common/RedisUtil.java new file mode 100644 index 0000000..97933f0 --- /dev/null +++ b/src/main/java/com/telpo/dipperposition/common/RedisUtil.java @@ -0,0 +1,665 @@ +package com.telpo.dipperposition.common; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.stereotype.Component; +import org.springframework.util.CollectionUtils; + +import javax.annotation.Resource; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +/** + * @program: DataPushServer + * @description: redis工具类 + * @author: linwl + * @create: 2020-07-11 10:26 + */ +@Component +@Slf4j +public class RedisUtil { + + @Resource private RedisTemplate redisTemplate; + + // =============================common============================ + /** + * 指定缓存失效时间 + * + * @param key 键 + * @param time 时间(秒) + * @return + */ + public boolean expire(String key, long time) { + try { + if (time > 0) { + redisTemplate.expire(key, time, TimeUnit.SECONDS); + } + return true; + } catch (Exception e) { + log.error(key, e); + return false; + } + } + + /** + * 根据key 获取过期时间 + * + * @param key 键 不能为null + * @return 时间(秒) 返回0代表为永久有效 + */ + public long getExpire(String key) { + return redisTemplate.getExpire(key, TimeUnit.SECONDS); + } + + /** + * 判断key是否存在 + * + * @param key 键 + * @return true 存在 false不存在 + */ + public boolean hasKey(String key) { + try { + return redisTemplate.hasKey(key); + } catch (Exception e) { + log.error(key, e); + return false; + } + } + + /** + * 删除缓存 + * + * @param key 可以传一个值 或多个 + */ + @SuppressWarnings("unchecked") + public void del(String... key) { + if (key != null && key.length > 0) { + if (key.length == 1) { + redisTemplate.delete(key[0]); + } else { + redisTemplate.delete(CollectionUtils.arrayToList(key)); + } + } + } + + // ============================String============================= + /** + * 普通缓存获取 + * + * @param key 键 + * @return 值 + */ + public Object get(String key) { + return key == null ? null : redisTemplate.opsForValue().get(key); + } + + /** + * 普通缓存放入 + * + * @param key 键 + * @param value 值 + * @return true成功 false失败 + */ + public boolean set(String key, Object value) { + try { + redisTemplate.opsForValue().set(key, value); + return true; + } catch (Exception e) { + log.error(key, e); + return false; + } + } + + /** + * 普通缓存放入并设置时间 + * + * @param key 键 + * @param value 值 + * @param time 时间(秒) time要大于0 如果time小于等于0 将设置无限期 + * @return true成功 false 失败 + */ + public boolean set(String key, Object value, long time) { + try { + if (time > 0) { + redisTemplate.opsForValue().set(key, value, time, TimeUnit.SECONDS); + } else { + set(key, value); + } + return true; + } catch (Exception e) { + log.error(key, e); + return false; + } + } + + /** + * 递增 适用场景: https://blog.csdn.net/y_y_y_k_k_k_k/article/details/79218254 高并发生成订单号,秒杀类的业务逻辑等。。 + * + * @param key 键 + * @param delta 要增加几(大于0) + * @return + */ + public long incr(String key, long delta) { + if (delta < 0) { + throw new RuntimeException("递增因子必须大于0"); + } + return redisTemplate.opsForValue().increment(key, delta); + } + + /** + * 递减 + * + * @param key 键 + * @param delta 要减少几(小于0) + * @return + */ + public long decr(String key, long delta) { + if (delta < 0) { + throw new RuntimeException("递减因子必须大于0"); + } + return redisTemplate.opsForValue().increment(key, -delta); + } + + // ================================Map================================= + /** + * HashGet + * + * @param key 键 不能为null + * @param item 项 不能为null + * @return 值 + */ + public Object hget(String key, String item) { + return redisTemplate.opsForHash().get(key, item); + } + + /** + * 获取hashKey对应的所有键值 + * + * @param key 键 + * @return 对应的多个键值 + */ + public Map hmget(String key) { + return redisTemplate.opsForHash().entries(key); + } + + /** + * HashSet + * + * @param key 键 + * @param map 对应多个键值 + * @return true 成功 false 失败 + */ + public boolean hmset(String key, Map map) { + try { + redisTemplate.opsForHash().putAll(key, map); + return true; + } catch (Exception e) { + log.error(key, e); + return false; + } + } + + /** + * HashSet 并设置时间 + * + * @param key 键 + * @param map 对应多个键值 + * @param time 时间(秒) + * @return true成功 false失败 + */ + public boolean hmset(String key, Map map, long time) { + try { + redisTemplate.opsForHash().putAll(key, map); + if (time > 0) { + expire(key, time); + } + return true; + } catch (Exception e) { + log.error(key, e); + return false; + } + } + + /** + * 向一张hash表中放入数据,如果不存在将创建 + * + * @param key 键 + * @param item 项 + * @param value 值 + * @return true 成功 false失败 + */ + public boolean hset(String key, String item, Object value) { + try { + redisTemplate.opsForHash().put(key, item, value); + return true; + } catch (Exception e) { + log.error(key, e); + return false; + } + } + + /** + * 向一张hash表中放入数据,如果不存在将创建 + * + * @param key 键 + * @param item 项 + * @param value 值 + * @param time 时间(秒) 注意:如果已存在的hash表有时间,这里将会替换原有的时间 + * @return true 成功 false失败 + */ + public boolean hset(String key, String item, Object value, long time) { + try { + redisTemplate.opsForHash().put(key, item, value); + if (time > 0) { + expire(key, time); + } + return true; + } catch (Exception e) { + log.error(key, e); + return false; + } + } + + /** + * 删除hash表中的值 + * + * @param key 键 不能为null + * @param item 项 可以使多个 不能为null + */ + public void hdel(String key, Object... item) { + redisTemplate.opsForHash().delete(key, item); + } + + /** + * 判断hash表中是否有该项的值 + * + * @param key 键 不能为null + * @param item 项 不能为null + * @return true 存在 false不存在 + */ + public boolean hHasKey(String key, String item) { + return redisTemplate.opsForHash().hasKey(key, item); + } + + /** + * hash递增 如果不存在,就会创建一个 并把新增后的值返回 + * + * @param key 键 + * @param item 项 + * @param by 要增加几(大于0) + * @return + */ + public double hincr(String key, String item, double by) { + return redisTemplate.opsForHash().increment(key, item, by); + } + + /** + * hash递减 + * + * @param key 键 + * @param item 项 + * @param by 要减少记(小于0) + * @return + */ + public double hdecr(String key, String item, double by) { + return redisTemplate.opsForHash().increment(key, item, -by); + } + + // ============================set============================= + /** + * 根据key获取Set中的所有值 + * + * @param key 键 + * @return + */ + public Set sGet(String key) { + try { + return redisTemplate.opsForSet().members(key); + } catch (Exception e) { + log.error(key, e); + return null; + } + } + + /** + * 根据value从一个set中查询,是否存在 + * + * @param key 键 + * @param value 值 + * @return true 存在 false不存在 + */ + public boolean sHasKey(String key, Object value) { + try { + return redisTemplate.opsForSet().isMember(key, value); + } catch (Exception e) { + log.error(key, e); + return false; + } + } + + /** + * 将数据放入set缓存 + * + * @param key 键 + * @param values 值 可以是多个 + * @return 成功个数 + */ + public long sSet(String key, Object... values) { + try { + return redisTemplate.opsForSet().add(key, values); + } catch (Exception e) { + log.error(key, e); + return 0; + } + } + + /** + * 将set数据放入缓存 + * + * @param key 键 + * @param time 时间(秒) + * @param values 值 可以是多个 + * @return 成功个数 + */ + public long sSetAndTime(String key, long time, Object... values) { + try { + Long count = redisTemplate.opsForSet().add(key, values); + if (time > 0) { + expire(key, time); + } + return count; + } catch (Exception e) { + log.error(key, e); + return 0; + } + } + + /** + * 获取set缓存的长度 + * + * @param key 键 + * @return + */ + public long sGetSetSize(String key) { + try { + return redisTemplate.opsForSet().size(key); + } catch (Exception e) { + log.error(key, e); + return 0; + } + } + + /** + * 移除值为value的 + * + * @param key 键 + * @param values 值 可以是多个 + * @return 移除的个数 + */ + public long setRemove(String key, Object... values) { + try { + Long count = redisTemplate.opsForSet().remove(key, values); + return count; + } catch (Exception e) { + log.error(key, e); + return 0; + } + } + + // ============================zset============================= + /** + * 根据key获取Set中的所有值 + * + * @param key 键 + * @return + */ + public Set zSGet(String key) { + try { + return redisTemplate.opsForSet().members(key); + } catch (Exception e) { + log.error(key, e); + return null; + } + } + + /** + * 根据value从一个set中查询,是否存在 + * + * @param key 键 + * @param value 值 + * @return true 存在 false不存在 + */ + public boolean zSHasKey(String key, Object value) { + try { + return redisTemplate.opsForSet().isMember(key, value); + } catch (Exception e) { + log.error(key, e); + return false; + } + } + + public Boolean zSSet(String key, Object value, double score) { + try { + return redisTemplate.opsForZSet().add(key, value, 2); + } catch (Exception e) { + log.error(key, e); + return false; + } + } + + /** + * 将set数据放入缓存 + * + * @param key 键 + * @param time 时间(秒) + * @param values 值 可以是多个 + * @return 成功个数 + */ + public long zSSetAndTime(String key, long time, Object... values) { + try { + Long count = redisTemplate.opsForSet().add(key, values); + if (time > 0) { + expire(key, time); + } + return count; + } catch (Exception e) { + log.error(key, e); + return 0; + } + } + + /** + * 获取set缓存的长度 + * + * @param key 键 + * @return + */ + public long zSGetSetSize(String key) { + try { + return redisTemplate.opsForSet().size(key); + } catch (Exception e) { + log.error(key, e); + return 0; + } + } + + /** + * 移除值为value的 + * + * @param key 键 + * @param values 值 可以是多个 + * @return 移除的个数 + */ + public long zSetRemove(String key, Object... values) { + try { + Long count = redisTemplate.opsForSet().remove(key, values); + return count; + } catch (Exception e) { + log.error(key, e); + return 0; + } + } + // ===============================list================================= + + /** + * 获取list缓存的内容 取出来的元素 总数 end-start+1 + * + * @param key 键 + * @param start 开始 0 是第一个元素 + * @param end 结束 -1代表所有值 + * @return + */ + public List lGet(String key, long start, long end) { + try { + return redisTemplate.opsForList().range(key, start, end); + } catch (Exception e) { + log.error(key, e); + return null; + } + } + + /** + * 获取list缓存的长度 + * + * @param key 键 + * @return + */ + public long lGetListSize(String key) { + try { + return redisTemplate.opsForList().size(key); + } catch (Exception e) { + log.error(key, e); + return 0; + } + } + + /** + * 通过索引 获取list中的值 + * + * @param key 键 + * @param index 索引 index>=0时, 0 表头,1 第二个元素,依次类推;index<0时,-1,表尾,-2倒数第二个元素,依次类推 + * @return + */ + public Object lGetIndex(String key, long index) { + try { + return redisTemplate.opsForList().index(key, index); + } catch (Exception e) { + log.error(key, e); + return null; + } + } + + /** + * 将list放入缓存 + * + * @param key 键 + * @param value 值 + * @return + */ + public boolean lSet(String key, Object value) { + try { + redisTemplate.opsForList().rightPush(key, value); + return true; + } catch (Exception e) { + log.error(key, e); + return false; + } + } + + /** + * 将list放入缓存 + * + * @param key 键 + * @param value 值 + * @param time 时间(秒) + * @return + */ + public boolean lSet(String key, Object value, long time) { + try { + redisTemplate.opsForList().rightPush(key, value); + if (time > 0) { + expire(key, time); + } + return true; + } catch (Exception e) { + log.error(key, e); + return false; + } + } + + /** + * 将list放入缓存 + * + * @param key 键 + * @param value 值 + * @return + */ + public boolean lSet(String key, List value) { + try { + redisTemplate.opsForList().rightPushAll(key, value); + return true; + } catch (Exception e) { + log.error(key, e); + return false; + } + } + + /** + * 将list放入缓存 + * + * @param key 键 + * @param value 值 + * @param time 时间(秒) + * @return + */ + public boolean lSet(String key, List value, long time) { + try { + redisTemplate.opsForList().rightPushAll(key, value); + if (time > 0) { + expire(key, time); + } + return true; + } catch (Exception e) { + log.error(key, e); + return false; + } + } + + /** + * 根据索引修改list中的某条数据 + * + * @param key 键 + * @param index 索引 + * @param value 值 + * @return + */ + public boolean lUpdateIndex(String key, long index, Object value) { + try { + redisTemplate.opsForList().set(key, index, value); + return true; + } catch (Exception e) { + log.error(key, e); + return false; + } + } + + /** + * 移除N个值为value + * + * @param key 键 + * @param count 移除多少个 + * @param value 值 + * @return 移除的个数 + */ + public long lRemove(String key, long count, Object value) { + try { + Long remove = redisTemplate.opsForList().remove(key, count, value); + return remove; + } catch (Exception e) { + log.error(key, e); + return 0; + } + } +} diff --git a/src/main/java/com/telpo/dipperposition/common/SocketClient.java b/src/main/java/com/telpo/dipperposition/common/SocketClient.java new file mode 100644 index 0000000..79e8e94 --- /dev/null +++ b/src/main/java/com/telpo/dipperposition/common/SocketClient.java @@ -0,0 +1,85 @@ +package com.telpo.dipperposition.common; + +import lombok.extern.slf4j.Slf4j; + +import java.io.*; +import java.net.Socket; +/** + * @program: dipperposition + * @description: socket连接单元 + * @author: king + * @create: 2021-01-14 13:52 + **/ +@Slf4j +public class SocketClient { + //定义一个Socket对象 + Socket socket = null; + + public SocketClient(String host, int port, int timeout) { + try { + //需要服务器的IP地址和端口号,才能获得正确的Socket对象 + socket = new Socket(host, port); + socket.setSoTimeout(timeout); + } catch (IOException e) { + log.error("Socket Connect Error:" + e.getMessage()); + } + } + + public String getOutput() { + try { + OutputStream os = socket.getOutputStream(); + return os.toString(); + } catch (IOException e) { + log.error("Socket getOutputStream Error:" + e.getMessage()); + return null; + } + } + + + public String sendCmd(String astCmd, String ackAckCheckRef) { + try { + OutputStream os=socket.getOutputStream(); + PrintWriter pw=new PrintWriter(os); + // TODO 发生命令 + String info="用户名:Tom,用户密码:123456"; + pw.write(astCmd); + pw.flush(); + socket.shutdownOutput(); + + //接收服务器的相应 + String reply=null; + //输入流 + InputStream is=socket.getInputStream(); + BufferedReader br=new BufferedReader(new InputStreamReader(is)); + + String ackResult=""; + String ackHexOut = HexConvert.convertStringToHex(ackAckCheckRef); + while(!((reply=br.readLine())==null)){ + log.debug("接收服务器的信息:"+reply); + if (ackHexOut.equals(reply)) { + reply=br.readLine(); + ackResult = HexConvert.convertHexToString(reply); + break; + } + } + //4.关闭资源 + br.close(); + is.close(); + pw.close(); + os.close(); + return ackResult; + } catch (IOException e) { + log.error("Socket sendCmd Error:" + e.getMessage()); + return null; + } + } + + public void closeConnection() { + try { + socket.close(); + //socket.shutdownOutput(); + } catch (IOException e) { + log.error("Socket getOutputStream Error:" + e.getMessage()); + } + } +} diff --git a/src/main/java/com/telpo/dipperposition/common/TimeTools.java b/src/main/java/com/telpo/dipperposition/common/TimeTools.java new file mode 100644 index 0000000..f23879c --- /dev/null +++ b/src/main/java/com/telpo/dipperposition/common/TimeTools.java @@ -0,0 +1,39 @@ +package com.telpo.dipperposition.common; + +import tools.CommonTools; + +import java.time.LocalTime; +import java.time.format.DateTimeFormatter; + +/** + * @program: DataPushServer + * @description: 时间工具类 + * @author: linwl + * @create: 2020-08-05 11:16 + */ +public class TimeTools { + + private TimeTools() {} + + /** + * 校验时间段 + * + * @param startTime 起始时间 HH:mm + * @param endTime 结束时间 HH:mm + * @return + */ + public static boolean checkTime(String startTime, String endTime) { + boolean result = false; + DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern("HH:mm"); + LocalTime pushStartTime = LocalTime.parse(startTime, dateTimeFormatter); + LocalTime pushEndTime = LocalTime.parse(endTime, dateTimeFormatter); + LocalTime zeroTime = LocalTime.parse("00:00"); + if (zeroTime.equals(pushEndTime)) { + pushEndTime = pushEndTime.plusNanos(-10); + } + if (CommonTools.checkInTime(pushStartTime, pushEndTime)) { + result = true; + } + return result; + } +} diff --git a/src/main/java/com/telpo/dipperposition/config/PositionConfig.java b/src/main/java/com/telpo/dipperposition/config/PositionConfig.java index 34b9059..d9faef3 100644 --- a/src/main/java/com/telpo/dipperposition/config/PositionConfig.java +++ b/src/main/java/com/telpo/dipperposition/config/PositionConfig.java @@ -1,22 +1,11 @@ package com.telpo.dipperposition.config; -import com.alibaba.nacos.api.NacosFactory; -import com.alibaba.nacos.api.PropertyKeyConst; -import com.alibaba.nacos.api.config.ConfigService; -import com.telpo.dipperposition.co.RzlAccount; import lombok.Getter; import lombok.Setter; import lombok.ToString; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.beans.factory.config.ConfigurableBeanFactory; -import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.context.ConfigurableApplicationContext; import org.springframework.context.annotation.Configuration; -import org.springframework.context.annotation.Lazy; -import org.springframework.context.annotation.Scope; - -import java.util.Properties; /** * @program: gateway diff --git a/src/main/java/com/telpo/dipperposition/config/db/MongoDbContext.java b/src/main/java/com/telpo/dipperposition/config/db/MongoDbContext.java new file mode 100644 index 0000000..f27c529 --- /dev/null +++ b/src/main/java/com/telpo/dipperposition/config/db/MongoDbContext.java @@ -0,0 +1,66 @@ +package com.telpo.dipperposition.config.db; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Bean; +import org.springframework.data.mongodb.MongoDbFactory; +import org.springframework.data.mongodb.core.SimpleMongoClientDbFactory; +import org.springframework.stereotype.Component; +import org.springframework.util.CollectionUtils; + +import javax.annotation.PostConstruct; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; + +/** + * @program: DataPushServer + * @description: mongdb数据库连接上下文 + * @author: linwl + * @create: 2020-07-11 14:31 + */ +@Component +public class MongoDbContext { + + private static final Map MONGO_CLIENT_DB_FACTORY_MAP = new HashMap<>(); + private static final ThreadLocal MONGO_DB_FACTORY_THREAD_LOCAL = + new ThreadLocal<>(); + @Autowired + MongoListProperties mongoListProperties; + + public static MongoDbFactory getMongoDbFactory() { + return MONGO_DB_FACTORY_THREAD_LOCAL.get(); + } + + public static void setMongoDbFactory(String name) { + MONGO_DB_FACTORY_THREAD_LOCAL.set(MONGO_CLIENT_DB_FACTORY_MAP.get(name)); + } + + public static void removeMongoDbFactory() { + MONGO_DB_FACTORY_THREAD_LOCAL.remove(); + } + + @PostConstruct + public void afterPropertiesSet() { + if (!CollectionUtils.isEmpty(mongoListProperties.getDblist())) { + mongoListProperties + .getDblist() + .forEach( + info -> { + MONGO_CLIENT_DB_FACTORY_MAP.put( + info.getDatabase(), new SimpleMongoClientDbFactory(info.getUri())); + }); + } + } + + @Bean(name = "mongoTemplate") + public MultiMongoTemplate dynamicMongoTemplate() { + Iterator iterator = MONGO_CLIENT_DB_FACTORY_MAP.values().iterator(); + return new MultiMongoTemplate(iterator.next()); + } + + @Bean(name = "mongoDbFactory") + public MongoDbFactory mongoDbFactory() { + Iterator iterator = MONGO_CLIENT_DB_FACTORY_MAP.values().iterator(); + return iterator.next(); + } +} diff --git a/src/main/java/com/telpo/dipperposition/config/db/MongoListProperties.java b/src/main/java/com/telpo/dipperposition/config/db/MongoListProperties.java new file mode 100644 index 0000000..804eff3 --- /dev/null +++ b/src/main/java/com/telpo/dipperposition/config/db/MongoListProperties.java @@ -0,0 +1,30 @@ +package com.telpo.dipperposition.config.db; + +import lombok.Data; +import lombok.Getter; +import lombok.Setter; +import lombok.ToString; +import org.springframework.boot.context.properties.ConfigurationProperties; + +import java.util.List; + +/** + * @program: DataPushServer + * @description: mongo连接配置类 + * @author: linwl + * @create: 2020-07-11 14:41 + */ +@Getter +@Setter +@ToString +@ConfigurationProperties(prefix = "mongo.datasource") +public class MongoListProperties { + + private List dblist; + + @Data + public static class MongoList { + private String uri; + private String database; + } +} diff --git a/src/main/java/com/telpo/dipperposition/config/db/MultiMongoTemplate.java b/src/main/java/com/telpo/dipperposition/config/db/MultiMongoTemplate.java new file mode 100644 index 0000000..df79df4 --- /dev/null +++ b/src/main/java/com/telpo/dipperposition/config/db/MultiMongoTemplate.java @@ -0,0 +1,26 @@ +package com.telpo.dipperposition.config.db; + +import com.mongodb.client.MongoDatabase; +import lombok.extern.slf4j.Slf4j; +import org.springframework.data.mongodb.MongoDbFactory; +import org.springframework.data.mongodb.core.MongoTemplate; + +/** + * @program: DataPushServer + * @description: 多mongo数据源配置 + * @author: linwl + * @create: 2020-07-11 14:21 + */ +@Slf4j +public class MultiMongoTemplate extends MongoTemplate { + + public MultiMongoTemplate(MongoDbFactory mongoDbFactory) { + super(mongoDbFactory); + } + + @Override + protected MongoDatabase doGetDatabase() { + MongoDbFactory mongoDbFactory = MongoDbContext.getMongoDbFactory(); + return mongoDbFactory == null ? super.doGetDatabase() : mongoDbFactory.getDb(); + } +} diff --git a/src/main/java/com/telpo/dipperposition/controller/DipperPositionController.java b/src/main/java/com/telpo/dipperposition/controller/DipperPositionController.java index 4783081..df2e40f 100644 --- a/src/main/java/com/telpo/dipperposition/controller/DipperPositionController.java +++ b/src/main/java/com/telpo/dipperposition/controller/DipperPositionController.java @@ -1,23 +1,10 @@ package com.telpo.dipperposition.controller; -import com.alibaba.cloud.nacos.parser.NacosDataParserHandler; -import com.alibaba.nacos.api.PropertyKeyConst; -import com.alibaba.nacos.api.exception.NacosException; -import com.telpo.dipperposition.co.RzlAccount; -import com.telpo.dipperposition.config.PositionConfig; import lombok.extern.slf4j.Slf4j; -import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; -import java.io.IOException; -import java.util.Map; -import java.util.Properties; -import java.util.concurrent.Executor; -import com.alibaba.nacos.api.NacosFactory; -import com.alibaba.nacos.api.config.ConfigService; -import com.alibaba.nacos.api.config.listener.Listener; /** * @program: DipperPositionController @@ -33,22 +20,7 @@ public class DipperPositionController { //RzlAccount rzlAccount; @RequestMapping("/getPos") - public String getPos() throws NacosException, InterruptedException, IOException { -// String group = "DEFAULT_GROUP"; -// String dataId = "dipperposition-service"; -// String positionId = "position.hello"; -// Properties properties = new Properties(); -// String serverAddr = "172.16.192.26"; -// properties.put(PropertyKeyConst.SERVER_ADDR, serverAddr); -// ConfigService configService = NacosFactory.createConfigService(properties); -// String content = configService.getConfig(dataId, group, 10000); -// System.out.println(content); -// //log.info(positionId + ":" + content.positionId) -// Map dataMap = NacosDataParserHandler.getInstance().parseNacosData(content,"yaml"); -// return dataMap == null ? "" : (String)dataMap.get(positionId); - String returnStr= "return position = " + hello; - - return returnStr; - //return "Helle world!"; //ContextLoader.getCurrentWebApplicationContext().toString(); + public String getPos() { + return "return position = " + hello; } } diff --git a/src/main/java/com/telpo/dipperposition/entity/mongo/IPProvinceEntity.java b/src/main/java/com/telpo/dipperposition/entity/mongo/IPProvinceEntity.java new file mode 100644 index 0000000..d9f27df --- /dev/null +++ b/src/main/java/com/telpo/dipperposition/entity/mongo/IPProvinceEntity.java @@ -0,0 +1,24 @@ +package com.telpo.dipperposition.entity.mongo; + +import com.telpo.dipperposition.vo.IPProvinceVo; +import lombok.Getter; +import lombok.Setter; +import lombok.ToString; +import models.BaseMongoDbEntity; + +/** + * @program: IPProvinceEntity + * @description: 位置实体类 + * @author: linwl + * @create: 2020-07-11 15:33 + */ +@ToString +@Getter +@Setter +public class IPProvinceEntity extends BaseMongoDbEntity { + + /** Ip */ + private String ip; + /** Ip所在省份 */ + private String province; +} diff --git a/src/main/java/com/telpo/dipperposition/enums/DipperReturnValue.java b/src/main/java/com/telpo/dipperposition/enums/DipperReturnValue.java new file mode 100644 index 0000000..68fa68f --- /dev/null +++ b/src/main/java/com/telpo/dipperposition/enums/DipperReturnValue.java @@ -0,0 +1,53 @@ +package com.telpo.dipperposition.enums; + +/** + * @program: DipperReturnValue + * @description: 推送类型枚举 + * @author: king + * @create: 2021-01-24 11:44 + */ +public enum DipperReturnValue { + ACK(0, "SDBP-PUB–ACK"), + NACK(1, "SDBP-PUB–NACK"); + + /** 状态值 */ + private int value; + /** 描述 */ + private String describe; + + private DipperReturnValue(int value, String describe) { + this.value = value; + this.describe = describe; + } + + /** + * 获取推送类型 + * + * @param value + * @return + */ + public static DipperReturnValue getByValue(int value) { + for (DipperReturnValue enums : DipperReturnValue.values()) { + if (enums.getValue() == value) { + return enums; + } + } + return null; + } + + public int getValue() { + return value; + } + + public void setValue(int value) { + this.value = value; + } + + public String getDescribe() { + return describe; + } + + public void setDescribe(String describe) { + this.describe = describe; + } +} diff --git a/src/main/java/com/telpo/dipperposition/handler/NettyServerHandler.java b/src/main/java/com/telpo/dipperposition/handler/NettyServerHandler.java new file mode 100644 index 0000000..2549967 --- /dev/null +++ b/src/main/java/com/telpo/dipperposition/handler/NettyServerHandler.java @@ -0,0 +1,145 @@ +package com.telpo.dipperposition.handler; + +import com.telpo.dipperposition.enums.DipperReturnValue; +import com.telpo.dipperposition.service.IDipperAstPosAsyncTaskService; +import com.telpo.dipperposition.service.IDipperAstTimeAsyncTaskService; +import com.telpo.dipperposition.service.IDipperDataAsyncTaskService; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; + +import java.time.LocalDateTime; + +/** + * @program: dipperposition + * @description: Netty服务器处理句柄 + * @author: linwl + * @create: 2021-01-13 13:56 + **/ +@Slf4j +public class NettyServerHandler extends ChannelInboundHandlerAdapter { + + + @Autowired + private IDipperAstTimeAsyncTaskService dipperTimeAsyncTaskService; + @Autowired + private IDipperAstPosAsyncTaskService dipperAstPosAsyncTaskService; + @Autowired + private IDipperDataAsyncTaskService dipperDataAsyncTaskService; + + @Value(value = "${position.server.timeAsycPort}") + private String timeAsycServerPort; + + @Value(value = "${position.server.posAsycPort}") + private String posAsycServerPort; + + @Value(value = "${position.server.starsAsycPort}") + private String starsAsycServerPort; + /** + * 客户端连接会触发 + */ + @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception { + log.info("Channel active......"); + + SocketChannel channel = (SocketChannel) ctx.channel(); + log.info("链接报告开始"); + log.info("链接报告信息:有一客户端链接到本服务端"); + log.info("链接报告IP:" + channel.localAddress().getHostString()); + log.info("链接报告Port:" + channel.localAddress().getPort()); + log.info("链接报告完毕"); + //通知客户端链接建立成功 + // 默认返回取得时间成功 + String ackAckCheckRef = "233E0101020004020A1D"; + if (Integer.parseInt(posAsycServerPort) == channel.localAddress().getPort()) { + ackAckCheckRef = "233E010102000401091C"; + } + if (Integer.parseInt(starsAsycServerPort) == channel.localAddress().getPort()) { + ackAckCheckRef = "233E010102000421293C"; + } + //String str = "通知客户端链接建立成功" + " " + LocalDateTime.now() + " " + channel.localAddress().getHostString() + + // "\r\n"; + ByteBuf buf = Unpooled.buffer(ackAckCheckRef.getBytes().length); + buf.writeBytes(ackAckCheckRef.getBytes("GBK")); + ctx.writeAndFlush(buf); + } + + + /** + * 当客户端主动断开服务端的链接后,这个通道就是不活跃的。也就是说客户端与服务端的关闭了通信通道并且不可以传输数据 + */ + @Override + public void channelInactive(ChannelHandlerContext ctx) throws Exception { + log.info("客户端断开链接,IP:{}", ctx.channel().localAddress().toString()); + } + + /** + * 客户端发消息会触发 + */ + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + //接收msg消息{与上一章节相比,此处已经不需要自己进行解码} + SocketChannel channel = (SocketChannel) ctx.channel(); + String ipAddress = channel.remoteAddress().toString(); + String message = " 接收到消息:{0}, 客户端IP:{1}"; + log.info(message ,msg, ipAddress); + + String channelAns = ""; + // 返回时间指令 + if (Integer.parseInt(timeAsycServerPort) == channel.localAddress().getPort()) { + // 初始时间辅助输入; + channelAns = dipperTimeAsyncTaskService.pushAstTime(); + } + + // 发送SDBP-AST-POS获取辅助位置信息 + if (Integer.parseInt(posAsycServerPort) == channel.localAddress().getPort()) { + channelAns = dipperAstPosAsyncTaskService.pushAstPos(ipAddress); + } + + // 从缓存获取SDBP-AST-EPH星历数 + if (Integer.parseInt(starsAsycServerPort) == channel.localAddress().getPort()) { + channelAns = dipperDataAsyncTaskService.getAstEPH(); + } + + + // 最后把SDBP-AST-TIME、SDBP-AST-POS、SDBP-AST-EPH并包一起发给设备。 + // 设备采用16进制获取数据,则代理服务器也是采用16进制返回数据。 + // 通知客户端链消息发送成功 + // String str = "服务端收到:" + LocalDateTime.now() + " " + msg + "\r\n"; + ByteBuf buf = Unpooled.buffer(channelAns.getBytes().length); + buf.writeBytes(channelAns.getBytes("GBK")); + ctx.writeAndFlush(buf); + //ctx.write("你也好哦"); + //ctx.flush(); + } + +// @Override +// public void channelRead0(ChannelHandlerContext ctx, HttpObject msg) +// throws Exception { +// if (msg instanceof HttpRequest) { +// HttpRequest mReq = (HttpRequest) msg; +// String clientIP = mReq.headers().get("X-Forwarded-For"); +// if (clientIP == null) { +// InetSocketAddress insocket = (InetSocketAddress) ctx.channel() +// .remoteAddress(); +// clientIP = insocket.getAddress().getHostAddress(); +// } +// } +// } + + /** + * 发生异常触发 + */ + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + cause.printStackTrace(); + ctx.close(); + } + + +} diff --git a/src/main/java/com/telpo/dipperposition/handler/ServerChannelInitializer.java b/src/main/java/com/telpo/dipperposition/handler/ServerChannelInitializer.java new file mode 100644 index 0000000..95d98dd --- /dev/null +++ b/src/main/java/com/telpo/dipperposition/handler/ServerChannelInitializer.java @@ -0,0 +1,24 @@ +package com.telpo.dipperposition.handler; + +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.socket.SocketChannel; +import io.netty.handler.codec.string.StringDecoder; +import io.netty.handler.codec.string.StringEncoder; +import io.netty.util.CharsetUtil; +/** + * @program: dipperposition + * @description: 服务器通道初始化 + * @author: king + * @create: 2021-01-13 13:54 + **/ +public class ServerChannelInitializer extends ChannelInitializer { + @Override + protected void initChannel(SocketChannel socketChannel) throws Exception { + //添加编解码 + socketChannel.pipeline().addLast("decoder", new StringDecoder(CharsetUtil.UTF_8)); + socketChannel.pipeline().addLast("encoder", new StringEncoder(CharsetUtil.UTF_8)); + socketChannel.pipeline().addLast(new NettyServerHandler()); + } + +} diff --git a/src/main/java/com/telpo/dipperposition/mapper/IPProvinceMapper.java b/src/main/java/com/telpo/dipperposition/mapper/IPProvinceMapper.java new file mode 100644 index 0000000..b42acc0 --- /dev/null +++ b/src/main/java/com/telpo/dipperposition/mapper/IPProvinceMapper.java @@ -0,0 +1,52 @@ +package com.telpo.dipperposition.mapper; + +import com.telpo.dipperposition.entity.mongo.IPProvinceEntity; +import db.BaseMongoDbDao; +import org.springframework.stereotype.Repository; + +import java.util.List; + +/** + * @program: DataPushServer + * @description: 推送记录mapper + * @author: linwl + * @create: 2020-07-20 11:12 + */ +@Repository +public class IPProvinceMapper extends BaseMongoDbDao { + + @Override + protected Class getEntityClass() { + return IPProvinceEntity.class; + } + + @Override + public void save(IPProvinceEntity entity, String collectionName) { + super.save(entity, collectionName); + } + + @Override + public void updateFirst(IPProvinceEntity srcObj, IPProvinceEntity targetObj) { + super.updateFirst(srcObj, targetObj); + } + + @Override + public List getPage(IPProvinceEntity object, int start, int size) { + return super.getPage(object, start, size); + } + + @Override + public List queryList(IPProvinceEntity object) { + return super.queryList(object); + } + + @Override + public List queryList(IPProvinceEntity object, String collectionName) { + return super.queryList(object, collectionName); + } + + @Override + public void deleteById(String id) { + super.deleteById(id); + } +} diff --git a/src/main/java/com/telpo/dipperposition/server/DipperPositionServer.java b/src/main/java/com/telpo/dipperposition/server/DipperPositionServer.java new file mode 100644 index 0000000..6afdc44 --- /dev/null +++ b/src/main/java/com/telpo/dipperposition/server/DipperPositionServer.java @@ -0,0 +1,153 @@ +package com.telpo.dipperposition.server; + +import com.telpo.dipperposition.handler.ServerChannelInitializer; +import com.telpo.dipperposition.service.IDipperDataAsyncTaskService; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelOption; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; + +import java.net.InetSocketAddress; + + +/** + * @program: DipperPositionServer + * @description: 北斗定位 + * @author: king + * @create: 2021-01-13 14:01 + */ +@Slf4j +public class DipperPositionServer { + + @Value(value = "${position.serverAddr}") + private String serverAddr; + + @Value(value = "${position.server.timeAsycPort}") + private String timeAsycServerPort; + + @Value(value = "${position.server.posAsycPort}") + private String posAsycServerPort; + + @Value(value = "${position.server.starsAsycPort}") + private String starsAsycServerPort; + + /* + * 时间同步进程线程 + */ + public void startTimeAsnc() { + + //new 一个主线程组 + EventLoopGroup mainThreadGroup = new NioEventLoopGroup(1); + //new 一个工作线程组 + EventLoopGroup workThreadGroup = new NioEventLoopGroup(200); + InetSocketAddress socketAddress = new InetSocketAddress(serverAddr, Integer.parseInt(timeAsycServerPort)); + ServerBootstrap bootstrap = new ServerBootstrap() + .group(mainThreadGroup, workThreadGroup) + .channel(NioServerSocketChannel.class) + .childHandler(new ServerChannelInitializer()) + .localAddress(socketAddress) + //设置队列大小 + .option(ChannelOption.SO_BACKLOG, 1024) + // 两小时内没有数据的通信时,TCP会自动发送一个活动探测数据报文 + .childOption(ChannelOption.SO_KEEPALIVE, true); + + + //绑定端口,开始接收进来的连接 + try { + ChannelFuture future = bootstrap.bind(socketAddress).sync(); + log.info("服务器启动开始监听端口: {}", socketAddress.getPort()); + + + future.channel().closeFuture().sync(); + } catch (InterruptedException e) { + e.printStackTrace(); + } finally { + //关闭主线程组 + mainThreadGroup.shutdownGracefully(); + //关闭工作线程组 + workThreadGroup.shutdownGracefully(); + } + } + + /* + * 时间同步进程线程 + */ + public void startPosAsnc() { + + //new 一个主线程组 + EventLoopGroup mainThreadGroup = new NioEventLoopGroup(1); + //new 一个工作线程组 + EventLoopGroup workThreadGroup = new NioEventLoopGroup(200); + InetSocketAddress socketAddress = new InetSocketAddress(serverAddr, Integer.parseInt(posAsycServerPort)); + ServerBootstrap bootstrap = new ServerBootstrap() + .group(mainThreadGroup, workThreadGroup) + .channel(NioServerSocketChannel.class) + .childHandler(new ServerChannelInitializer()) + .localAddress(socketAddress) + //设置队列大小 + .option(ChannelOption.SO_BACKLOG, 1024) + // 两小时内没有数据的通信时,TCP会自动发送一个活动探测数据报文 + .childOption(ChannelOption.SO_KEEPALIVE, true); + + + //绑定端口,开始接收进来的连接 + try { + ChannelFuture future = bootstrap.bind(socketAddress).sync(); + log.info("服务器启动开始监听端口: {}", socketAddress.getPort()); + + + future.channel().closeFuture().sync(); + } catch (InterruptedException e) { + e.printStackTrace(); + } finally { + //关闭主线程组 + mainThreadGroup.shutdownGracefully(); + //关闭工作线程组 + workThreadGroup.shutdownGracefully(); + } + } + + + /* + * 星历同步进程线程 + */ + public void startStarsAsnc() { + + //new 一个主线程组 + EventLoopGroup mainThreadGroup = new NioEventLoopGroup(1); + //new 一个工作线程组 + EventLoopGroup workThreadGroup = new NioEventLoopGroup(200); + InetSocketAddress socketAddress = new InetSocketAddress(serverAddr, Integer.parseInt(starsAsycServerPort)); + ServerBootstrap bootstrap = new ServerBootstrap() + .group(mainThreadGroup, workThreadGroup) + .channel(NioServerSocketChannel.class) + .childHandler(new ServerChannelInitializer()) + .localAddress(socketAddress) + //设置队列大小 + .option(ChannelOption.SO_BACKLOG, 1024) + // 两小时内没有数据的通信时,TCP会自动发送一个活动探测数据报文 + .childOption(ChannelOption.SO_KEEPALIVE, true); + + + //绑定端口,开始接收进来的连接 + try { + ChannelFuture future = bootstrap.bind(socketAddress).sync(); + log.info("服务器启动开始监听端口: {}", socketAddress.getPort()); + + + future.channel().closeFuture().sync(); + } catch (InterruptedException e) { + e.printStackTrace(); + } finally { + //关闭主线程组 + mainThreadGroup.shutdownGracefully(); + //关闭工作线程组 + workThreadGroup.shutdownGracefully(); + } + } +} diff --git a/src/main/java/com/telpo/dipperposition/service/IDipperAstPosAsyncTaskService.java b/src/main/java/com/telpo/dipperposition/service/IDipperAstPosAsyncTaskService.java new file mode 100644 index 0000000..c5e2006 --- /dev/null +++ b/src/main/java/com/telpo/dipperposition/service/IDipperAstPosAsyncTaskService.java @@ -0,0 +1,23 @@ +package com.telpo.dipperposition.service; + +import java.io.UnsupportedEncodingException; + +/** + * @program: IDipperAstPosAsyncTaskService + * @description: 系统预先基于省份的省会城市的经纬度作为辅助信息, + * 根据设备请求的IP地址,从高德IP定位服务获取相关的省份,再匹配相应的位置信息作为辅助位置信息。 + * 如果匹配不到省份,则以武汉中心作为辅助为位置信息。 + * 关于IP与省份的关系保存到缓存中,用于下次使用时,先在缓存中获取匹配信息,匹配不到,再请求高德IP定位服务。 + * 高德IP定位服务:https://lbs.amap.com/api/webservice/guide/api/ipconfig。 + * @author: king + * @create: 2021-01-17 16:24 + */ +public interface IDipperAstPosAsyncTaskService { + + /** + * 同步任务 + * + */ + String pushAstPos(String ipAddress) throws UnsupportedEncodingException; + +} diff --git a/src/main/java/com/telpo/dipperposition/service/IDipperAstTimeAsyncTaskService.java b/src/main/java/com/telpo/dipperposition/service/IDipperAstTimeAsyncTaskService.java new file mode 100644 index 0000000..5dfeb0e --- /dev/null +++ b/src/main/java/com/telpo/dipperposition/service/IDipperAstTimeAsyncTaskService.java @@ -0,0 +1,17 @@ +package com.telpo.dipperposition.service; + +/** + * @program: IDipperDataAsyncTaskService + * @description: 发送SDBP-AST-TIME获取时间信息 + * @author: king + * @create: 2021-01-17 16:24 + */ +public interface IDipperAstTimeAsyncTaskService { + + /** + * 同步任务 + * + */ + String pushAstTime(); + +} diff --git a/src/main/java/com/telpo/dipperposition/service/IDipperDataAsyncTaskService.java b/src/main/java/com/telpo/dipperposition/service/IDipperDataAsyncTaskService.java new file mode 100644 index 0000000..f050b9c --- /dev/null +++ b/src/main/java/com/telpo/dipperposition/service/IDipperDataAsyncTaskService.java @@ -0,0 +1,26 @@ +package com.telpo.dipperposition.service; + +/** + * @program: IDipperDataAsyncTaskService + * @description: 发送bds获取星历数据。 + * * 每30分钟获取1次,30秒超时, + * * 如果失败,则可以等待10秒再获取1次 + * @author: king + * @create: 2021-01-17 16:24 + */ +public interface IDipperDataAsyncTaskService { + + /** + * 同步任务 + * + */ + void pullAstEPH(); + + + /** + * 根据IP获取EPH + * + */ + String getAstEPH(); + +} diff --git a/src/main/java/com/telpo/dipperposition/service/IPProvinceService.java b/src/main/java/com/telpo/dipperposition/service/IPProvinceService.java new file mode 100644 index 0000000..f8e3c66 --- /dev/null +++ b/src/main/java/com/telpo/dipperposition/service/IPProvinceService.java @@ -0,0 +1,45 @@ +package com.telpo.dipperposition.service; + +import com.telpo.dipperposition.entity.mongo.IPProvinceEntity; +import com.telpo.dipperposition.vo.IPProvinceVo; + +/** + * @program: IPProvinceService + * @description: IP省份服务接口 + * @author: king + * @create: 2020-07-20 11:09 + */ +public interface IPProvinceService { + + /** + * 保存IP省份 + * + * @param entity + * @return + */ + boolean saveIPProvince(IPProvinceEntity entity); + + /** + * 更新IP省份 + * + * @param query + * @param update + * @return + */ + boolean updateIPProvince( + IPProvinceEntity query, IPProvinceEntity update); + + /** + * 根据ID移除IP省份记录 + * + * @param id + * @return + */ + boolean romveById(String id); + + /* + * @param ipAddress + * 获取IP省份 + */ + IPProvinceEntity getIPProvince(String ipAddress); +} diff --git a/src/main/java/com/telpo/dipperposition/service/impl/DipperAstPosAsyncTaskServiceImpl.java b/src/main/java/com/telpo/dipperposition/service/impl/DipperAstPosAsyncTaskServiceImpl.java new file mode 100644 index 0000000..fc1831e --- /dev/null +++ b/src/main/java/com/telpo/dipperposition/service/impl/DipperAstPosAsyncTaskServiceImpl.java @@ -0,0 +1,251 @@ +package com.telpo.dipperposition.service.impl; + +import com.alibaba.fastjson.JSONObject; +import com.google.common.base.Joiner; +import com.telpo.dipperposition.common.*; +import com.telpo.dipperposition.entity.mongo.IPProvinceEntity; +import com.telpo.dipperposition.mapper.IPProvinceMapper; +import com.telpo.dipperposition.service.IDipperAstPosAsyncTaskService; +import com.telpo.dipperposition.service.IPProvinceService; +import com.telpo.dipperposition.vo.IPProvinceVo; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.ObjectUtils; +import org.apache.commons.lang3.StringUtils; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.scheduling.annotation.Async; +import org.springframework.stereotype.Service; + +import java.io.UnsupportedEncodingException; +import java.time.LocalDateTime; +import java.time.format.DateTimeFormatter; +import java.util.List; + +/** + * @program: DipperAstPosAsyncTaskServiceImpl + * @description: 系统预先基于省份的省会城市的经纬度作为辅助信息, + * * 根据设备请求的IP地址,从高德IP定位服务获取相关的省份,再匹配相应的位置信息作为辅助位置信息。 + * * 如果匹配不到省份,则以武汉中心作为辅助为位置信息。 + * * 关于IP与省份的关系保存到缓存中,用于下次使用时,先在缓存中获取匹配信息,匹配不到,再请求高德IP定位服务。 + * * 高德IP定位服务:https://lbs.amap.com/api/webservice/guide/api/ipconfig。 + * @author: king + * @create: 2021-01-10 14:01 + */ +@Service +@Slf4j +public class DipperAstPosAsyncTaskServiceImpl implements IDipperAstPosAsyncTaskService { + + @Autowired + private RedisUtil redisUtil; + @Autowired + private OkHttpUtil okHttpUtil; + @Autowired + private IPProvinceService iPProvinceService; + + @Value("${pos.centerProvinceFilePath}") + String centerProvinceFilePath; + + @Value("${pos.ipPositionRequestPath}") + String ipPositionRequestPath; + @Value("${pos.ipPositionRequestKey}") + String ipPositionRequestKey; + @Value("${pos.centerProvince}") + String centerProvince; + + + @Value("${pos.ast.server}") + String astServer; + @Value("${pos.ast.posAstPort}") + int posAstPort; + @Value("${pos.ast.timeout}") + int astTimeout; + +// private String getAstPos(String ipAddress) throws UnsupportedEncodingException { +// +// String centerAddress = getIpPositionProvince(ipAddress); +// if (ObjectUtils.isEmpty(centerAddress) || centerAddress.equals("0")) { +// log.warn("IP地址非法,无法获取辅助位置信息!"); +// // 返回武汉的定位数据 +// centerAddress = centerProvince; +// } else { +// // 保存到mongoDB +// createIPProvince(ipAddress, centerAddress); +// } +// +// String lonAndAlt; +// if (redisUtil.hasKey(centerAddress)) { +// // 获取省会城市定位信息 +// lonAndAlt= (String) redisUtil.get(centerAddress); +// } else { +// // 请求高德IP定位服务 +// this.getPosFromFile(centerAddress); +// lonAndAlt = (String) redisUtil.get(centerAddress); +// } +// +// return lonAndAlt; +// } + + // 从CSV文件读取省会城市中心点位置信息 + private void getPosFromFile(String centerAddress) { + // 不存在说明token是已过期了 + String centerProvinceName = ""; + String centerProvinceLonAndAlt = ""; + List centerAddressSets = CSVUtil.readCSV(centerProvinceFilePath); + for (String centerAddressSet:centerAddressSets) { + String[] centerAddressSetArray = centerAddressSet.split(","); + if (centerAddressSetArray.length < 3) { + log.warn("CSV数据格式错误"); + } else { + centerProvinceName = centerAddressSetArray[3]; + centerProvinceLonAndAlt = centerAddressSetArray[1]+","+centerAddressSetArray[2]; + redisUtil.set(centerProvinceName, centerProvinceLonAndAlt, 0); + } + } + } + + // 根据IP获取省会信息 + private String getIpPositionProvince(String ipAddress) { + + // 关于IP与省份的关系保存到缓存中 + // 使用时,先在缓存中获取匹配信息 + // 用mongodb实现 + IPProvinceEntity ipProvinceEntity = iPProvinceService.getIPProvince(ipAddress); + if (ipProvinceEntity == null) { + // 匹配不到,再请求高德IP定位服务。 + JSONObject userObj = new JSONObject(); + userObj.put("ip", ipAddress); + userObj.put("key", ipPositionRequestKey); + + JSONObject json = okHttpUtil.postRequestWithJson(ipPositionRequestPath, null, userObj); + if (ObjectUtils.isNotEmpty(json)) { + String province = (String) json.get("province"); + if (ObjectUtils.isEmpty(province)) { + log.debug("json is :" + json.toString()); + return null; + } + return province; + } else { + // 意外错误 + log.debug("ip address is null"); + return null; + } + } else { + return ipProvinceEntity.getProvince(); + } + } + + // 将IP对应的省会保存到mongoDB + private void createIPProvince(String ipAddress, String province) { + log.debug("异步创建推送失败任务记录!"); + try { + IPProvinceEntity ipProvinceEntity = iPProvinceService.getIPProvince(ipAddress); + if (ipProvinceEntity == null) { + //DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); + ipProvinceEntity.setIp(ipAddress); + ipProvinceEntity.setProvince(province); + iPProvinceService.saveIPProvince(ipProvinceEntity); + } else { + + ipProvinceEntity.setProvince(province); + iPProvinceService.updateIPProvince(ipProvinceEntity, ipProvinceEntity); + } + } catch (Exception e) { + log.error("创建推送失败记录异常:", e); + } + } + + /* + * 获取定位辅助信息 + * @param ipAddress + */ + @Override + public String pushAstPos(String ipAddress) throws UnsupportedEncodingException { + // (1) 获取省会城市信息 + String centerAddress = getIpPositionProvince(ipAddress); + if (ObjectUtils.isEmpty(centerAddress) || centerAddress.equals("0")) { + log.warn("IP地址非法,无法获取辅助位置信息!"); + // 返回武汉的定位数据 + centerAddress = centerProvince; + } else { + // 保存到mongoDB + createIPProvince(ipAddress, centerAddress); + } + + String lonAndAlt; + if (redisUtil.hasKey(centerAddress)) { + // 获取省会城市定位信息 + lonAndAlt= (String) redisUtil.get(centerAddress); + } else { + // 请求高德IP定位服务 + this.getPosFromFile(centerAddress); + lonAndAlt = (String) redisUtil.get(centerAddress); + } + + // (2) 处理返回结果 + if (lonAndAlt == null) { + // null处理 + log.error("系统错误,请联系系统管理员。"); + return null; + //return; + } + + // push to GNNS Server + String pushResult = getCmdOfPos(lonAndAlt); + return pushResult; + } + + // 组装命令发送给设备 + private String getCmdOfPos(String astPos) { + + // 创建Socket客户端实例; + // SocketClient client = new SocketClient(astServer, posAstPort, astTimeout); + + // 时间和位置不是从服务器获取,而是本地生成 + String[] astPosArray = astPos.split(","); + String lan = astPosArray[0].trim(); + String alt = astPosArray[1].trim(); + double lanValue = Double.parseDouble(lan) * 10000000; + long lanLongValue = Double.doubleToLongBits(lanValue); + if (lanLongValue < 0) { + lanLongValue = lanLongValue + 4294967295L + 1; + } + double altValue = Double.parseDouble(alt) * 10000000; + long altLongValue = Double.doubleToLongBits(altValue); + if (altLongValue < 0) { + altLongValue = altLongValue + 4294967295L + 1; + } + // 数值换算举例(以经度举例。纬度、高度、位置精度换算方法一致): + // (1)经度数值为 113.431,则换算方法如下: + // 113.431/比例因子 = 1134310000(十进制) + //  439C3270(十六进制) + //  经度数据填入 70 32 9C 43(小端模式) + // (2)经度数值为-113.431,则换算方法如下: + // 113.431/比例因子 = 1134310000(十进制) + //  439C3270(十六进制) + //  FFFFFFFF - 439C3270 + 1= BC63CD90(补码) + // 经度数据填入 90 CD 63 BC(小端模式) + // 指令(十六进制) + // 举例: 23 3E 04 01 10 00 70 32 9C 43 D0 B2 CE 0D 70 17 00 00 40 0D 03 00 CA 95 + // 其中 + // 23 3E 为同步头 + // 04 01 为识别码 + // 10 00 表示长度为 16 + // 70 32 9C 43 表示注入的辅助经度为 113.431 度 + // D0 B2 CE 0D 表示注入的辅助纬度为 23.165 度 + // 00 2F 为校验和 + + // astTimeCmd 组装 + String astTimeCmd = "233E0401"; + astTimeCmd += "1000"; + astTimeCmd += HexConvert.encodeHEX(lanLongValue); + astTimeCmd += HexConvert.encodeHEX(altLongValue); + + String hexIn = HexConvert.convertStringToHex(astTimeCmd) + HexConvert.makeChecksum(astTimeCmd); + + //String sendResult = client.sendCmd(hexIn, ackAckCheckRef); + //client.closeConnection(); + + //return sendResult; + return hexIn; + } +} diff --git a/src/main/java/com/telpo/dipperposition/service/impl/DipperAstTimeAsyncTaskServiceImpl.java b/src/main/java/com/telpo/dipperposition/service/impl/DipperAstTimeAsyncTaskServiceImpl.java new file mode 100644 index 0000000..488936d --- /dev/null +++ b/src/main/java/com/telpo/dipperposition/service/impl/DipperAstTimeAsyncTaskServiceImpl.java @@ -0,0 +1,104 @@ +package com.telpo.dipperposition.service.impl; + +import com.telpo.dipperposition.common.HexConvert; +import com.telpo.dipperposition.common.SocketClient; +import com.telpo.dipperposition.service.IDipperAstTimeAsyncTaskService; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Service; + +import java.time.LocalDateTime; + +/** + * @program: DipperDataAsyncTaskServiceImpl + * @description: 发送SDBP-AST-TIME获取时间信息 + * @author: king + * @create: 2021-01-10 14:01 + */ +@Service +@Slf4j +public class DipperAstTimeAsyncTaskServiceImpl implements IDipperAstTimeAsyncTaskService { + + @Override + public String pushAstTime() { + + // (1) 发送SDBP-AST-TIME + // String sendResult = pushTimeToDipper(); + //if (sendResult == null) { + // log.error("取不到时间。"); + // return null; + //} + + // (2) 获取时间信息 + return pushTimeToDipper(); + //return sendResult; + } + + private String pushTimeToDipper() { + + // 创建Socket客户端实例; + // SocketClient client = new SocketClient(astServer, timeAstPort, astTimeout); + + // 时间和位置不是从服务器获取,而是本地生成(其中时间误差不超过3s) + // 23 3E 04 02 10 00 20 E1 07 09 14 0C 22 38 00 00 00 00 00 28 6B EE 22 98 + // 23 3E 为同步头 + // 04 02 为识别码 + // 10 00 表示长度为 16 + // --日期-- + // 20 E1 表示闰秒改正数为 + // E1 07 表示年为 2017 年(十六进制 07E1 转为十进制) + // 09 表示 9 月 + // 14 表示 20 日 + // 0C 22 38 00 00 00 00 00 表示 UTC时间,为12时34分56.0秒(小数秒建议固定为 0) + // 00 28 6B EE 表示 4 秒的时间精度(十六进制 EE6B2800 转为十进制为 4000000000,乘以比 例因子 10-9就是 4 秒) + // 00 2F 为校验和 + // TODO astTimeCmd 组装 + String astTimeCmd = "233E0402"; + astTimeCmd += "1000"; + astTimeCmd += "20E1"; + LocalDateTime now = LocalDateTime.now(); + int year = now.getYear(); + int month = now.getMonthValue(); + int day = now.getDayOfMonth(); + String hexYearString = Integer.toHexString(year); + hexYearString = "0" + hexYearString; + astTimeCmd += hexYearString.substring(2,2) + hexYearString.substring(0,2); + String hexMonthString = Integer.toHexString(month); + hexMonthString = "0" + hexMonthString; + astTimeCmd += hexMonthString; + String hexDayString = Integer.toHexString(day); + if (day < 16) { + hexDayString = "0" + hexDayString; + } + astTimeCmd += hexDayString; + + int hour = now.getHour(); + int minitor = now.getMinute(); + int second = now.getSecond(); + String hexHourString = Integer.toHexString(hour); + if (hour < 16) { + hexHourString = "0" + hexHourString; + } + astTimeCmd += hexHourString; + String hexMinitorString = Integer.toHexString(minitor); + if (minitor < 16) { + hexMinitorString = "0" + hexMinitorString; + } + astTimeCmd += hexMinitorString; + String hexSecondString = Integer.toHexString(second); + if (second < 16) { + hexSecondString = "0" + hexSecondString; + } + astTimeCmd += hexSecondString; + astTimeCmd += "0000000000"; + astTimeCmd += "00286BEE"; + + String hexIn = HexConvert.convertStringToHex(astTimeCmd) + HexConvert.makeChecksum(astTimeCmd); + + //String ackAckCheckRef = "233E0101020004020A1D"; + //String sendResult = client.sendCmd(hexIn, ackAckCheckRef); + //client.closeConnection(); + + return hexIn; + } +} diff --git a/src/main/java/com/telpo/dipperposition/service/impl/DipperDataAsyncTaskServiceImpl.java b/src/main/java/com/telpo/dipperposition/service/impl/DipperDataAsyncTaskServiceImpl.java new file mode 100644 index 0000000..92870c5 --- /dev/null +++ b/src/main/java/com/telpo/dipperposition/service/impl/DipperDataAsyncTaskServiceImpl.java @@ -0,0 +1,73 @@ +package com.telpo.dipperposition.service.impl; + +import com.telpo.dipperposition.common.HexConvert; +import com.telpo.dipperposition.common.RedisUtil; +import com.telpo.dipperposition.common.SocketClient; +import com.telpo.dipperposition.service.IDipperDataAsyncTaskService; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Service; + +/** + * @program: DipperDataAsyncTaskServiceImpl + * @description: 获取星历数据。 + * @author: king + * @create: 2021-01-10 14:01 + */ +@Service +@Slf4j +public class DipperDataAsyncTaskServiceImpl implements IDipperDataAsyncTaskService { + + @Autowired + private RedisUtil redisUtil; + + @Value("${pos.ast.server}") + String astServer; + @Value("${pos.ast.ephAstPort}") + int ephAstPort; + @Value("${pos.ast.ephAstHexPort}") + int ephAstHexPort; + @Value("${pos.ast.timeout}") + int astTimeout; + + + private static String DIPPER_DATA_KEY = "TaidouDipperData"; + + @Override + public void pullAstEPH() { + + // (1) 发送bds获取星历数据 + String dipperData = pullEPHFromDipper(); + + // (2) 获取星历数据 + if (dipperData == null) { + log.error("获取星历数据错误,取不到星历数据。"); + } else { + // 保存到DB或者缓存 + redisUtil.set(DIPPER_DATA_KEY,dipperData); + } + } + + + private String pullEPHFromDipper() { + + // 创建Socket客户端实例; + SocketClient client = new SocketClient(astServer, ephAstPort, astTimeout); + + // astTimeCmd 组装 + String astTimeCmd = "all"; + String hexIn = HexConvert.convertStringToHex(astTimeCmd) + HexConvert.makeChecksum(astTimeCmd); + + String ackAckCheckRef = "233E010102000421293C"; + String sendResult = client.sendCmd(hexIn, ackAckCheckRef); + client.closeConnection(); + + return sendResult; + } + + @Override + public String getAstEPH(){ + return (String)redisUtil.get(DIPPER_DATA_KEY); + } +} diff --git a/src/main/java/com/telpo/dipperposition/service/impl/IPProvinceServiceImpl.java b/src/main/java/com/telpo/dipperposition/service/impl/IPProvinceServiceImpl.java new file mode 100644 index 0000000..ca2de93 --- /dev/null +++ b/src/main/java/com/telpo/dipperposition/service/impl/IPProvinceServiceImpl.java @@ -0,0 +1,62 @@ +package com.telpo.dipperposition.service.impl; + +import com.telpo.dipperposition.entity.mongo.IPProvinceEntity; +import com.telpo.dipperposition.mapper.IPProvinceMapper; +import com.telpo.dipperposition.service.IPProvinceService; +import com.telpo.dipperposition.vo.IPProvinceVo; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.ObjectUtils; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +import java.util.List; + +/** + * @program: DataPushServer + * @description: 推送记录服务接口实现类 + * @author: linwl + * @create: 2020-07-20 11:09 + */ +@Slf4j +@Service +public class IPProvinceServiceImpl implements IPProvinceService { + + @Autowired + private IPProvinceMapper iPProvinceMapper; + + @Override + public boolean saveIPProvince(IPProvinceEntity entity) { + iPProvinceMapper.save(entity); + return true; + } + + @Override + public boolean updateIPProvince( + IPProvinceEntity query, IPProvinceEntity update) { + iPProvinceMapper.updateFirst(query, update); + return true; + } + + @Override + public boolean romveById(String id) { + iPProvinceMapper.deleteById(id); + return false; + } + + @Override + public IPProvinceEntity getIPProvince(String ipAddress) { + try { + IPProvinceEntity query = new IPProvinceEntity(); + query.setIp(ipAddress); + List pushRecords = iPProvinceMapper.queryList(query); + if (ObjectUtils.isNotEmpty(pushRecords)) { + return pushRecords.get(0); + } else { + return null; + } + } catch (Exception e) { + log.error("获取IP省份异常:", e); + return null; + } + } +} diff --git a/src/main/java/com/telpo/dipperposition/task/ScheduleService.java b/src/main/java/com/telpo/dipperposition/task/ScheduleService.java new file mode 100644 index 0000000..70d31f1 --- /dev/null +++ b/src/main/java/com/telpo/dipperposition/task/ScheduleService.java @@ -0,0 +1,39 @@ +package com.telpo.dipperposition.task; + +import com.telpo.dipperposition.service.IDipperDataAsyncTaskService; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Component; + + +/** + * @program: DataPushServer + * @description: 定时执行任务服务 + * @author: king + * @create: 2021-01-17 16:24 + */ +@Component +@Slf4j +public class ScheduleService { + + @Autowired + private IDipperDataAsyncTaskService dipperDataAsyncTaskService; + + /* + * 调用9012端口的接口获取星历数据。 + * 通过TCP连接服务器agnss.techtotop.com:9012,发送bds获取星历数据。 + * 每30分钟获取1次,30秒超时, + * 如果失败,则可以等待10秒再获取1次。 * + */ + @Scheduled(cron = "${scheduler.task.cron}") + public void pullData() { + log.info("开始星历数据同步!"); + // 获取推送失败的记录 + try { + dipperDataAsyncTaskService.pullAstEPH(); + } catch (Exception e) { + log.error("执行定时获取星历数据发生异常:", e); + } + } +} diff --git a/src/main/java/com/telpo/dipperposition/vo/IPProvinceVo.java b/src/main/java/com/telpo/dipperposition/vo/IPProvinceVo.java new file mode 100644 index 0000000..df9abeb --- /dev/null +++ b/src/main/java/com/telpo/dipperposition/vo/IPProvinceVo.java @@ -0,0 +1,20 @@ +package com.telpo.dipperposition.vo; + +import lombok.Getter; +import lombok.Setter; + +/** + * @program: DataPushServer + * @description: 基础类 + * @author: linwl + * @create: 2020-07-17 16:41 + */ +@Setter +@Getter +public class IPProvinceVo { + + /** Ip */ + private String ip; + /** Ip所在省份 */ + private String province; +} diff --git a/src/main/resources/bootstrap.yaml b/src/main/resources/bootstrap.yaml index 6f36d67..707693d 100644 --- a/src/main/resources/bootstrap.yaml +++ b/src/main/resources/bootstrap.yaml @@ -7,4 +7,28 @@ spring: nacos: config: server-addr: 172.16.192.26:8848 - file-extension: yml \ No newline at end of file + file-extension: yaml +scheduler: + task: + pool: + size: 2 + #等待任务完成退出最大秒数 + await-seconds: 600 + cron: "0 */30 * * * *" + +pos: + centerProvinceFilePath: /csv/provinceLonAlt.csv + ipPositionRequestPath: https://restapi.amap.com/v3/ip + ipPositionRequestKey: 65e794b0a1a4b87eeec86f93fea05411 + centerProvince: 湖北省 + ast: + server: agnss.techtotop.com + ephAstPort: 8012 + ephAstHexPort: 9012 + timeout: 30000 + +position: + server: + timeAsycPort: + posAsycPort: + starsAsycPort: \ No newline at end of file