diff --git a/TelpoPush.WanJiaAn.Worker/Handlers/KafkaSubscribe.cs b/TelpoPush.WanJiaAn.Worker/Handlers/KafkaSubscribe.cs index 6b12899..9ea96c0 100644 --- a/TelpoPush.WanJiaAn.Worker/Handlers/KafkaSubscribe.cs +++ b/TelpoPush.WanJiaAn.Worker/Handlers/KafkaSubscribe.cs @@ -26,29 +26,36 @@ namespace TelpoPush.WanJiaAn.Worker.Handlers { #if DEBUG //_logger.LogInformation("11312"); - //var temp = new Headers(); - //string topic = "topic.push.WanJiaAn"; - //temp.Add(new Header("DataType", new byte[] { 0, 0, 0, 0 })); - //temp.Add(new Header("AlarmType", new byte[] { 2, 0, 0, 0 })); - //string psych = "{\"messageId\":\"1790941606816612864\",\"topic\":\"topic.push.third\",\"time\":\"2024-05-16 11:05:27\",\"data\":{\"imei\":\"861281060093147\",\"atteryLowId\":\"861281060093147664577f9\",\"info\":\"设备电量低于15%\"}}"; - //await _WanJiaAnProcess.SendWanJiaAn(psych, topic, temp); - // await _kafkaService.SubscribeAsync(DoReceive, CancellationToken.None); + var temp = new Headers(); + string topic = "topic.wanjiaan.push.telpo"; + temp.Add(new Header("DataType", new byte[] { 0, 0, 0, 0 })); + temp.Add(new Header("AlarmType", new byte[] { 2, 0, 0, 0 })); + string json1 = "{\"msg_id\":\"33090000000023391723109173034\",\"device_id\":\"3309000000002339\",\"type\":\"DEVICE_REPORT\",\"device_type\":\"XZL-Q42\",\"event_time\":\"1723109173\",\"app_id\":6755,\"user_id\":\"tel1000\",\"data\":{\"alarm_event\":{\"event_id\":\"17231091733309000000002339\",\"event_start\":\"1723109173\",\"event_type\":2,\"report_type\":1}}}"; + await _WanJiaAnProcess.SendWanJiaAn(json1, topic, temp); + + string json2 = "{\"msg_id\":\"33090000000023391723109197082\",\"device_id\":\"3309000000002339\",\"type\":\"DEVICE_REPORT\",\"device_type\":\"XZL-Q42\",\"event_time\":\"1723109197\",\"app_id\":6755,\"user_id\":\"tel1000\",\"data\":{\"alarm_event\":{\"event_end\":\"1723109197\",\"event_id\":\"17231091733309000000002339\",\"event_start\":\"1723109173\",\"event_type\":2,\"report_type\":2}}}"; + await _WanJiaAnProcess.SendWanJiaAn(json2, topic, temp); + + string json3 = "{\"msg_id\":\"33090000000023391723109197036\",\"device_id\":\"3309000000002339\",\"type\":\"DEVICE_REPORT\",\"device_type\":\"XZL-Q42\",\"event_time\":\"1723109197\",\"app_id\":6755,\"user_id\":\"tel1000\",\"data\":{\"alarm_event\":{\"event_end\":\"1723109197\",\"event_id\":\"17231091733309000000002339\",\"event_start\":\"1723109173\",\"event_type\":2,\"image\":\"http://s-cn-xiaoz-1304525121.cos.ap-guangzhou.myqcloud.com/cloud_ts%2F7_3309000000002339%2F3309000000002339_1723109174_p37.jpeg?sign=q-sign-algorithm%3Dsha1%26q-ak%3DAKIDdtlQq8bpIWdZH4PBD1xe4Qgon9IXLLrB%26q-sign-time%3D1723109175%3B3446823150%26q-key-time%3D1723109175%3B3446823150%26q-header-list%3Dhost%26q-url-param-list%3D%26q-signature%3D708db58b15fdcaceb8bf6eeded5e2664b015cb6c\",\"report_type\":3,\"url\":\"http://s-cn-xiaoz-1304525121.cos.ap-guangzhou.myqcloud.com/cloud_ts%2F7_3309000000002339%2F1723046400_1723109171_1723109175.ts?sign=q-sign-algorithm%3Dsha1%26q-ak%3DAKIDdtlQq8bpIWdZH4PBD1xe4Qgon9IXLLrB%26q-sign-time%3D1723109176%3B3446823152%26q-key-time%3D1723109176%3B3446823152%26q-header-list%3Dhost%26q-url-param-list%3D%26q-signature%3D2fad04c97bfd9113491eada7d221cc5caab57e5b\",\"video_end\":\"1723109175\",\"video_start\":\"1723109171\"}}}"; + await _WanJiaAnProcess.SendWanJiaAn(json3, topic, temp); + + // await _kafkaService.SubscribeAsync(DoReceive, CancellationToken.None); #else - LimitedConcurrencyLevelTaskScheduler lcts = new LimitedConcurrencyLevelTaskScheduler(5); - TaskFactory factory = new TaskFactory(lcts); - try - { - await factory.StartNew(async () => - { - await _kafkaService.SubscribeAsync(DoReceive, CancellationToken.None); - }); - } - catch (Exception ex) - { - _logger.LogError($"Subscribe 处理Kafka数据发生异常 {ex.Message}|{ex.Source}|{ex.StackTrace}"); - } + //LimitedConcurrencyLevelTaskScheduler lcts = new LimitedConcurrencyLevelTaskScheduler(5); + //TaskFactory factory = new TaskFactory(lcts); + //try + //{ + // await factory.StartNew(async () => + // { + // await _kafkaService.SubscribeAsync(DoReceive, CancellationToken.None); + // }); + //} + //catch (Exception ex) + //{ + // _logger.LogError($"Subscribe 处理Kafka数据发生异常 {ex.Message}|{ex.Source}|{ex.StackTrace}"); + //} #endif } async void DoReceive(string topic, string message, Headers headers) diff --git a/TelpoPush.WanJiaAn.Worker/Handlers/WanJiaAnProcess.cs b/TelpoPush.WanJiaAn.Worker/Handlers/WanJiaAnProcess.cs index f452f97..bfc2ffb 100644 --- a/TelpoPush.WanJiaAn.Worker/Handlers/WanJiaAnProcess.cs +++ b/TelpoPush.WanJiaAn.Worker/Handlers/WanJiaAnProcess.cs @@ -1,6 +1,11 @@ using Confluent.Kafka; using Microsoft.Extensions.Options; using Newtonsoft.Json; +using System.ComponentModel.DataAnnotations; +using TelpoDataService.Util.Clients; +using TelpoDataService.Util.Entities.GpsLocationHistory; +using TelpoDataService.Util.Models; +using TelpoDataService.Util.QueryObjects; using TelpoPush.WanJiaAn.Worker.Common; using TelpoPush.WanJiaAn.Worker.Models.Config; using TelpoPush.WanJiaAn.Worker.Models.Enum; @@ -20,14 +25,16 @@ namespace TelpoPush.WanJiaAn.Worker.Handlers private readonly RedisUtil _redis; private readonly MqProcessMessage _serviceMqProcess; private readonly WanJiaAnConfig _WanJiaAnConfig; + private readonly GpsLocationHistoryAccessorClient _messageWanJiaAnClient; public WanJiaAnProcess( IHostEnvironment env, ILogger logger, HttpHelperAsync httpHelper, RedisUtil redis, - MqProcessMessage serviceMqProcess, - IOptions WanJiaAnConfig + MqProcessMessage serviceMqProcess, + IOptions WanJiaAnConfig, + GpsLocationHistoryAccessorClient messageWanJiaAnClient ) { _env = env; @@ -36,6 +43,7 @@ namespace TelpoPush.WanJiaAn.Worker.Handlers _redis = redis; _WanJiaAnConfig = WanJiaAnConfig.Value; _serviceMqProcess = serviceMqProcess; + _messageWanJiaAnClient = messageWanJiaAnClient; } @@ -53,7 +61,7 @@ namespace TelpoPush.WanJiaAn.Worker.Handlers var Jo = JsonConvert.DeserializeObject>(message); if (Jo.ContainsKey("device_id")) imei = Jo["device_id"].ToString(); - + if (string.IsNullOrEmpty(imei)) { _logger.LogInformation($"[数据信息不完整] device_id信息不存在:{message}"); @@ -245,42 +253,102 @@ namespace TelpoPush.WanJiaAn.Worker.Handlers //位置 public async Task DataServicePusWanJiaAn(BaseModel model) { - if (model.data != null) + try { - var alarmEvent = JsonConvert.DeserializeObject(model.data.ToString()); - if (alarmEvent?.alarm_event != null) + if (model.data != null) { - var data = alarmEvent?.alarm_event; - if (data?.event_type == 2 || data?.event_type == 52)//2:哭声检测;52:遮脸提醒 + _logger.LogInformation($"{JsonConvert.SerializeObject(model)}"); + + var alarmEvent = JsonConvert.DeserializeObject(model.data.ToString()); + if (alarmEvent?.alarm_event != null) { - string event_id = data.event_id; - string device_id = model.device_id; - string user_id = model.user_id; - int event_type = data.event_type; - int report_type = data.report_type; - DateTime event_time = TimeHelper.ConvertToLocalDateTime(model.event_time); - DateTime event_start = TimeHelper.ConvertToLocalDateTime(data.event_start); - if (!string.IsNullOrEmpty(data.event_end)) - { - DateTime event_end = TimeHelper.ConvertToLocalDateTime(data.event_end); - } - if (!string.IsNullOrEmpty(data.video_start)) - { - DateTime video_start = TimeHelper.ConvertToLocalDateTime(data.video_start); - } - if (!string.IsNullOrEmpty(data.video_end)) + var data = alarmEvent?.alarm_event; + if (data?.event_type == 2 || data?.event_type == 52)//2:哭声检测;52:遮脸提醒 { - DateTime video_end = TimeHelper.ConvertToLocalDateTime(data.video_end); + string tag = ""; + if (data.report_type == 1) + tag = "事件开始"; + else if (data.report_type == 2) + tag = "事件结束"; + else if (data.report_type == 3) + tag = "资源上报"; + + var wanjiaanAlarm = new WanjiaanAlarmEvent() + { + EventId = data.event_id, + DeviceId = model.device_id, + UserId = model.user_id, + EventTime = TimeHelper.ConvertToLocalDateTime(model.event_time), + EventType = data.event_type, + ReportType = data.report_type, + EventStart = TimeHelper.ConvertToLocalDateTime(data.event_start), + ImageUrl = data.image, + VideoUrl = data.url, + Remark = $"{tag}:{JsonConvert.SerializeObject(model)};", + CreateTime = DateTime.Now, + }; + if (!string.IsNullOrEmpty(data.event_end)) + wanjiaanAlarm.EventEnd = TimeHelper.ConvertToLocalDateTime(data.event_end); + if (!string.IsNullOrEmpty(data.video_start)) + wanjiaanAlarm.VideoStart = TimeHelper.ConvertToLocalDateTime(data.video_start); + if (!string.IsNullOrEmpty(data.video_end)) + wanjiaanAlarm.VideoEnd = TimeHelper.ConvertToLocalDateTime(data.video_end); + + var alarmEventDb = await _redis.GetWanjiaanAlarmEvent(wanjiaanAlarm.EventId); + if (alarmEventDb != null) + wanjiaanAlarm.Remark = alarmEventDb.Remark + wanjiaanAlarm.Remark; + await _redis.SetWanjiaanAlarmEvent(wanjiaanAlarm.EventId, wanjiaanAlarm); + if (data.report_type == 3) + { + await _messageWanJiaAnClient.AddAsync(wanjiaanAlarm); + await _redis.DelWanjiaanAlarmEvent(wanjiaanAlarm.EventId); + } + + //var param = new GeneralParam + //{ + // Filters = new List { + // new QueryFilterCondition { + // Key = nameof(WanjiaanAlarmEvent.DeviceId), + // Value = device_id, + // ValueType = QueryValueTypeEnum.String, + // Operator = QueryOperatorEnum.Equal + // }, new QueryFilterCondition { + // Key = nameof(WanjiaanAlarmEvent.EventId), + // Value = event_id, + // ValueType = QueryValueTypeEnum.String, + // Operator = QueryOperatorEnum.Equal + // } + //} + //}; + //var alarmEventDb =_messageWanJiaAnClient.GetFirst(param); + //if (alarmEventDb != null) + //{ + // alarmEventDb.UserId = user_id; + // alarmEventDb.EventTime = event_time; + // alarmEventDb.EventType = event_type; + // alarmEventDb.ReportType = report_type; + // alarmEventDb.EventStart = event_start; + // alarmEventDb.EventEnd = wanjiaanAlarm.EventEnd; + // alarmEventDb.ImageUrl = image_url; + // alarmEventDb.VideoUrl = video_url; + // alarmEventDb.VideoStart = wanjiaanAlarm.VideoStart; + // alarmEventDb.VideoEnd = wanjiaanAlarm.VideoEnd; + // alarmEventDb.Remark = alarmEventDb.Remark + remark; + // _messageWanJiaAnClient.Update(alarmEventDb); + // await _redis.SetWanjiaanAlarmEvent(event_id, alarmEventDb); + //} + //else + //{ + // _messageWanJiaAnClient.Add(wanjiaanAlarm); + // await _redis.SetWanjiaanAlarmEvent(event_id, wanjiaanAlarm); + //} } - string image_url = data.image; - string video_url = data.url; - string remark = JsonConvert.SerializeObject(model); - DateTime create_time = DateTime.Now; - - var obj = new { }; } } - + } + catch (Exception ex) + { + string s = ex.Message; } } } diff --git a/TelpoPush.WanJiaAn.Worker/Models/CacheTemplates/PersonInfoModel.cs b/TelpoPush.WanJiaAn.Worker/Models/CacheTemplates/PersonInfoModel.cs deleted file mode 100644 index b191244..0000000 --- a/TelpoPush.WanJiaAn.Worker/Models/CacheTemplates/PersonInfoModel.cs +++ /dev/null @@ -1,42 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Linq; -using System.Text; -using System.Threading.Tasks; - -namespace TelpoPush.WanJiaAn.Worker.Models.CacheTemplates -{ - - public class PersonInfoModel - { - public PersonModel person { get; set; } - public string time { get; set; } - } - public class PersonModel - { - public string personId { get; set; } - public string serialno { get; set; } - public string personName { get; set; } - public string deviceId { get; set; } - public string nickName { get; set; } - public bool gender { get; set; } - public int height { get; set; } - public int weight { get; set; } - public string bornDate { get; set; } - public string school { get; set; } - public string grade { get; set; } - public string className { get; set; } - public string imagePath { get; set; } - public string imagePathSmall { get; set; } - public int age { get; set; } - public string createTime { get; set; } - public string remarks { get; set; } - public int ishypertension { get; set; } - public string emotion { get; set; } - public int profession { get; set; } - public int regularity { get; set; } - public int chronicDisease { get; set; } - public string apiUid { get; set; } - public string apiRemark { get; set; } - } -} diff --git a/TelpoPush.WanJiaAn.Worker/Service/Cache/RedisUtil.cs b/TelpoPush.WanJiaAn.Worker/Service/Cache/RedisUtil.cs index dab2b0e..dc9a917 100644 --- a/TelpoPush.WanJiaAn.Worker/Service/Cache/RedisUtil.cs +++ b/TelpoPush.WanJiaAn.Worker/Service/Cache/RedisUtil.cs @@ -1,4 +1,5 @@ using Microsoft.Extensions.Options; +using TelpoDataService.Util.Entities.GpsLocationHistory; using TelpoPush.WanJiaAn.Worker.Models.CacheTemplates; using TelpoPush.WanJiaAn.Worker.Models.Config; @@ -10,7 +11,7 @@ namespace TelpoPush.WanJiaAn.Worker.Service.Cache private const string CACHE_HASH_KEY_TELPO_GPSDEVICE_PERSON = "TELPO#GPSDEVICE_PERSON_HASH"; private const string CACHE_HASH_KEY_TELPO_MANUFACTOR_CONFIG = "TELPO#MANUFACTOR_CONFG_HASH"; private const string CACHE_HASH_KEY_TELPO_GPSDEVICE_PUSHSITTTIGS = "TELPO#GPSDEVICE_PUSH_SITTINGS_HASH"; - + private const string CACHE_HASH_KEY_TELPO_WANJIAAN_ALARMEVENT = "TELPO#WANJIAAN_ALARMEVENT_HASH"; private readonly ILogger _logger; private readonly ServiceConfig _configService; private readonly SqlMapper _sqlMapper; @@ -40,7 +41,7 @@ namespace TelpoPush.WanJiaAn.Worker.Service.Cache var deviceInfo = _sqlMapper.DeviceInfo(imei); if (deviceInfo != null) { - RedisHelper.HSetAsync(CACHE_HASH_KEY_TELPO_GPSDEVICE, imei, deviceInfo); + await RedisHelper.HSetAsync(CACHE_HASH_KEY_TELPO_GPSDEVICE, imei, deviceInfo); MemoryCacheUtil.Set(keyCache, deviceInfo, _configService.CacheDurationSeconds10); return deviceInfo; } @@ -67,28 +68,6 @@ namespace TelpoPush.WanJiaAn.Worker.Service.Cache return null; } - public async Task SetPersonInfoHash(string imei) - { - if (string.IsNullOrWhiteSpace(imei)) return null; - string keyCache = $"{imei}_PersonInfoHash"; - var personInfo = _sqlMapper.PersonInfo(imei); - PersonInfoModel model = new PersonInfoModel() - { - person = personInfo, - time = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss") - }; - if (personInfo != null) - { - await RedisHelper.HSetAsync(CACHE_HASH_KEY_TELPO_GPSDEVICE_PERSON, imei, model); - MemoryCacheUtil.Set(keyCache, model, 60);//1分钟 - } - else - { - await RedisHelper.HDelAsync(CACHE_HASH_KEY_TELPO_GPSDEVICE_PERSON, imei); - MemoryCacheUtil.Remove(keyCache); - } - return model; - } public async Task GetHealthyDeviceKey(string imei) { @@ -204,5 +183,19 @@ namespace TelpoPush.WanJiaAn.Worker.Service.Cache return settingInfos; } + + public async Task SetWanjiaanAlarmEvent(string key, WanjiaanAlarmEvent data) + { + await RedisHelper.HSetAsync(CACHE_HASH_KEY_TELPO_WANJIAAN_ALARMEVENT, key, data); + } + + public async Task DelWanjiaanAlarmEvent(string key) + { + await RedisHelper.HDelAsync(CACHE_HASH_KEY_TELPO_WANJIAAN_ALARMEVENT, key); + } + public async Task GetWanjiaanAlarmEvent(string key) + { + return await RedisHelper.HGetAsync(CACHE_HASH_KEY_TELPO_WANJIAAN_ALARMEVENT, key); + } } } diff --git a/TelpoPush.WanJiaAn.Worker/Service/Cache/SqlMapper.cs b/TelpoPush.WanJiaAn.Worker/Service/Cache/SqlMapper.cs index 6be2cd2..08646ea 100644 --- a/TelpoPush.WanJiaAn.Worker/Service/Cache/SqlMapper.cs +++ b/TelpoPush.WanJiaAn.Worker/Service/Cache/SqlMapper.cs @@ -27,13 +27,5 @@ namespace TelpoPush.WanJiaAn.Worker.Service.Cache return connection.QueryFirstOrDefault(sql, new { imei }); } } - public PersonModel PersonInfo(string imei) - { - using (IDbConnection connection = new MySqlConnection(gps_conn)) - { - var sql = @"SELECT person_id personId, serialno, person_name personName, device_id deviceId, nick_name nickName, gender, height, weight, born_date bornDate, school, grade, class_name className, image_path imagePath,image_path_small imagePathSmall, age, create_time createTime, remarks, ishypertension, emotion,profession,regularity,chronic_disease chronicDisease,api_uid apiUid,api_remark apiRemark FROM gps_person WHERE serialno=@imei"; - return connection.QueryFirstOrDefault(sql, new { imei }); - } - } } } diff --git a/TelpoPush.WanJiaAn.Worker/TelpoPush.WanJiaAn.Worker.csproj b/TelpoPush.WanJiaAn.Worker/TelpoPush.WanJiaAn.Worker.csproj index 0b0cd87..3c490c4 100644 --- a/TelpoPush.WanJiaAn.Worker/TelpoPush.WanJiaAn.Worker.csproj +++ b/TelpoPush.WanJiaAn.Worker/TelpoPush.WanJiaAn.Worker.csproj @@ -20,7 +20,7 @@ - + diff --git a/TelpoPush.WanJiaAn.Worker/Worker.cs b/TelpoPush.WanJiaAn.Worker/Worker.cs index f37aefe..48b58c1 100644 --- a/TelpoPush.WanJiaAn.Worker/Worker.cs +++ b/TelpoPush.WanJiaAn.Worker/Worker.cs @@ -18,7 +18,7 @@ namespace TelpoPush.WanJiaAn.Worker while (!stoppingToken.IsCancellationRequested) { await _kafkaSubscribe.SubscribeAsync(); - await Task.Delay(1000, stoppingToken); + await Task.Delay(10000000, stoppingToken); } } }