Browse Source

数据保存

master
杨雷 3 months ago
parent
commit
6b60ff7044
7 changed files with 146 additions and 128 deletions
  1. +27
    -20
      TelpoPush.WanJiaAn.Worker/Handlers/KafkaSubscribe.cs
  2. +100
    -32
      TelpoPush.WanJiaAn.Worker/Handlers/WanJiaAnProcess.cs
  3. +0
    -42
      TelpoPush.WanJiaAn.Worker/Models/CacheTemplates/PersonInfoModel.cs
  4. +17
    -24
      TelpoPush.WanJiaAn.Worker/Service/Cache/RedisUtil.cs
  5. +0
    -8
      TelpoPush.WanJiaAn.Worker/Service/Cache/SqlMapper.cs
  6. +1
    -1
      TelpoPush.WanJiaAn.Worker/TelpoPush.WanJiaAn.Worker.csproj
  7. +1
    -1
      TelpoPush.WanJiaAn.Worker/Worker.cs

+ 27
- 20
TelpoPush.WanJiaAn.Worker/Handlers/KafkaSubscribe.cs View File

@@ -26,29 +26,36 @@ namespace TelpoPush.WanJiaAn.Worker.Handlers
{
#if DEBUG
//_logger.LogInformation("11312");
//var temp = new Headers();
//string topic = "topic.push.WanJiaAn";
//temp.Add(new Header("DataType", new byte[] { 0, 0, 0, 0 }));
//temp.Add(new Header("AlarmType", new byte[] { 2, 0, 0, 0 }));
//string psych = "{\"messageId\":\"1790941606816612864\",\"topic\":\"topic.push.third\",\"time\":\"2024-05-16 11:05:27\",\"data\":{\"imei\":\"861281060093147\",\"atteryLowId\":\"861281060093147664577f9\",\"info\":\"设备电量低于15%\"}}";
//await _WanJiaAnProcess.SendWanJiaAn(psych, topic, temp);
// await _kafkaService.SubscribeAsync(DoReceive, CancellationToken.None);
var temp = new Headers();
string topic = "topic.wanjiaan.push.telpo";
temp.Add(new Header("DataType", new byte[] { 0, 0, 0, 0 }));
temp.Add(new Header("AlarmType", new byte[] { 2, 0, 0, 0 }));
string json1 = "{\"msg_id\":\"33090000000023391723109173034\",\"device_id\":\"3309000000002339\",\"type\":\"DEVICE_REPORT\",\"device_type\":\"XZL-Q42\",\"event_time\":\"1723109173\",\"app_id\":6755,\"user_id\":\"tel1000\",\"data\":{\"alarm_event\":{\"event_id\":\"17231091733309000000002339\",\"event_start\":\"1723109173\",\"event_type\":2,\"report_type\":1}}}";
await _WanJiaAnProcess.SendWanJiaAn(json1, topic, temp);

string json2 = "{\"msg_id\":\"33090000000023391723109197082\",\"device_id\":\"3309000000002339\",\"type\":\"DEVICE_REPORT\",\"device_type\":\"XZL-Q42\",\"event_time\":\"1723109197\",\"app_id\":6755,\"user_id\":\"tel1000\",\"data\":{\"alarm_event\":{\"event_end\":\"1723109197\",\"event_id\":\"17231091733309000000002339\",\"event_start\":\"1723109173\",\"event_type\":2,\"report_type\":2}}}";
await _WanJiaAnProcess.SendWanJiaAn(json2, topic, temp);

string json3 = "{\"msg_id\":\"33090000000023391723109197036\",\"device_id\":\"3309000000002339\",\"type\":\"DEVICE_REPORT\",\"device_type\":\"XZL-Q42\",\"event_time\":\"1723109197\",\"app_id\":6755,\"user_id\":\"tel1000\",\"data\":{\"alarm_event\":{\"event_end\":\"1723109197\",\"event_id\":\"17231091733309000000002339\",\"event_start\":\"1723109173\",\"event_type\":2,\"image\":\"http://s-cn-xiaoz-1304525121.cos.ap-guangzhou.myqcloud.com/cloud_ts%2F7_3309000000002339%2F3309000000002339_1723109174_p37.jpeg?sign=q-sign-algorithm%3Dsha1%26q-ak%3DAKIDdtlQq8bpIWdZH4PBD1xe4Qgon9IXLLrB%26q-sign-time%3D1723109175%3B3446823150%26q-key-time%3D1723109175%3B3446823150%26q-header-list%3Dhost%26q-url-param-list%3D%26q-signature%3D708db58b15fdcaceb8bf6eeded5e2664b015cb6c\",\"report_type\":3,\"url\":\"http://s-cn-xiaoz-1304525121.cos.ap-guangzhou.myqcloud.com/cloud_ts%2F7_3309000000002339%2F1723046400_1723109171_1723109175.ts?sign=q-sign-algorithm%3Dsha1%26q-ak%3DAKIDdtlQq8bpIWdZH4PBD1xe4Qgon9IXLLrB%26q-sign-time%3D1723109176%3B3446823152%26q-key-time%3D1723109176%3B3446823152%26q-header-list%3Dhost%26q-url-param-list%3D%26q-signature%3D2fad04c97bfd9113491eada7d221cc5caab57e5b\",\"video_end\":\"1723109175\",\"video_start\":\"1723109171\"}}}";
await _WanJiaAnProcess.SendWanJiaAn(json3, topic, temp);

// await _kafkaService.SubscribeAsync(DoReceive, CancellationToken.None);

#else
LimitedConcurrencyLevelTaskScheduler lcts = new LimitedConcurrencyLevelTaskScheduler(5);
TaskFactory factory = new TaskFactory(lcts);
try
{
await factory.StartNew(async () =>
{
await _kafkaService.SubscribeAsync(DoReceive, CancellationToken.None);
});
}
catch (Exception ex)
{
_logger.LogError($"Subscribe 处理Kafka数据发生异常 {ex.Message}|{ex.Source}|{ex.StackTrace}");
}
//LimitedConcurrencyLevelTaskScheduler lcts = new LimitedConcurrencyLevelTaskScheduler(5);
//TaskFactory factory = new TaskFactory(lcts);
//try
//{
// await factory.StartNew(async () =>
// {
// await _kafkaService.SubscribeAsync(DoReceive, CancellationToken.None);
// });
//}
//catch (Exception ex)
//{
// _logger.LogError($"Subscribe 处理Kafka数据发生异常 {ex.Message}|{ex.Source}|{ex.StackTrace}");
//}
#endif
}
async void DoReceive(string topic, string message, Headers headers)


+ 100
- 32
TelpoPush.WanJiaAn.Worker/Handlers/WanJiaAnProcess.cs View File

@@ -1,6 +1,11 @@
using Confluent.Kafka;
using Microsoft.Extensions.Options;
using Newtonsoft.Json;
using System.ComponentModel.DataAnnotations;
using TelpoDataService.Util.Clients;
using TelpoDataService.Util.Entities.GpsLocationHistory;
using TelpoDataService.Util.Models;
using TelpoDataService.Util.QueryObjects;
using TelpoPush.WanJiaAn.Worker.Common;
using TelpoPush.WanJiaAn.Worker.Models.Config;
using TelpoPush.WanJiaAn.Worker.Models.Enum;
@@ -20,14 +25,16 @@ namespace TelpoPush.WanJiaAn.Worker.Handlers
private readonly RedisUtil _redis;
private readonly MqProcessMessage _serviceMqProcess;
private readonly WanJiaAnConfig _WanJiaAnConfig;
private readonly GpsLocationHistoryAccessorClient<WanjiaanAlarmEvent> _messageWanJiaAnClient;

public WanJiaAnProcess(
IHostEnvironment env,
ILogger<WanJiaAnProcess> logger,
HttpHelperAsync httpHelper,
RedisUtil redis,
MqProcessMessage serviceMqProcess,
IOptions<WanJiaAnConfig> WanJiaAnConfig
MqProcessMessage serviceMqProcess,
IOptions<WanJiaAnConfig> WanJiaAnConfig,
GpsLocationHistoryAccessorClient<WanjiaanAlarmEvent> messageWanJiaAnClient
)
{
_env = env;
@@ -36,6 +43,7 @@ namespace TelpoPush.WanJiaAn.Worker.Handlers
_redis = redis;
_WanJiaAnConfig = WanJiaAnConfig.Value;
_serviceMqProcess = serviceMqProcess;
_messageWanJiaAnClient = messageWanJiaAnClient;

}

@@ -53,7 +61,7 @@ namespace TelpoPush.WanJiaAn.Worker.Handlers
var Jo = JsonConvert.DeserializeObject<Dictionary<string, object>>(message);
if (Jo.ContainsKey("device_id"))
imei = Jo["device_id"].ToString();
if (string.IsNullOrEmpty(imei))
{
_logger.LogInformation($"[数据信息不完整] device_id信息不存在:{message}");
@@ -245,42 +253,102 @@ namespace TelpoPush.WanJiaAn.Worker.Handlers
//位置
public async Task DataServicePusWanJiaAn(BaseModel model)
{
if (model.data != null)
try
{
var alarmEvent = JsonConvert.DeserializeObject<MqAlarmEventTemplate>(model.data.ToString());
if (alarmEvent?.alarm_event != null)
if (model.data != null)
{
var data = alarmEvent?.alarm_event;
if (data?.event_type == 2 || data?.event_type == 52)//2:哭声检测;52:遮脸提醒
_logger.LogInformation($"{JsonConvert.SerializeObject(model)}");

var alarmEvent = JsonConvert.DeserializeObject<MqAlarmEventTemplate>(model.data.ToString());
if (alarmEvent?.alarm_event != null)
{
string event_id = data.event_id;
string device_id = model.device_id;
string user_id = model.user_id;
int event_type = data.event_type;
int report_type = data.report_type;
DateTime event_time = TimeHelper.ConvertToLocalDateTime(model.event_time);
DateTime event_start = TimeHelper.ConvertToLocalDateTime(data.event_start);
if (!string.IsNullOrEmpty(data.event_end))
{
DateTime event_end = TimeHelper.ConvertToLocalDateTime(data.event_end);
}
if (!string.IsNullOrEmpty(data.video_start))
{
DateTime video_start = TimeHelper.ConvertToLocalDateTime(data.video_start);
}
if (!string.IsNullOrEmpty(data.video_end))
var data = alarmEvent?.alarm_event;
if (data?.event_type == 2 || data?.event_type == 52)//2:哭声检测;52:遮脸提醒
{
DateTime video_end = TimeHelper.ConvertToLocalDateTime(data.video_end);
string tag = "";
if (data.report_type == 1)
tag = "事件开始";
else if (data.report_type == 2)
tag = "事件结束";
else if (data.report_type == 3)
tag = "资源上报";

var wanjiaanAlarm = new WanjiaanAlarmEvent()
{
EventId = data.event_id,
DeviceId = model.device_id,
UserId = model.user_id,
EventTime = TimeHelper.ConvertToLocalDateTime(model.event_time),
EventType = data.event_type,
ReportType = data.report_type,
EventStart = TimeHelper.ConvertToLocalDateTime(data.event_start),
ImageUrl = data.image,
VideoUrl = data.url,
Remark = $"{tag}:{JsonConvert.SerializeObject(model)};",
CreateTime = DateTime.Now,
};
if (!string.IsNullOrEmpty(data.event_end))
wanjiaanAlarm.EventEnd = TimeHelper.ConvertToLocalDateTime(data.event_end);
if (!string.IsNullOrEmpty(data.video_start))
wanjiaanAlarm.VideoStart = TimeHelper.ConvertToLocalDateTime(data.video_start);
if (!string.IsNullOrEmpty(data.video_end))
wanjiaanAlarm.VideoEnd = TimeHelper.ConvertToLocalDateTime(data.video_end);

var alarmEventDb = await _redis.GetWanjiaanAlarmEvent(wanjiaanAlarm.EventId);
if (alarmEventDb != null)
wanjiaanAlarm.Remark = alarmEventDb.Remark + wanjiaanAlarm.Remark;
await _redis.SetWanjiaanAlarmEvent(wanjiaanAlarm.EventId, wanjiaanAlarm);
if (data.report_type == 3)
{
await _messageWanJiaAnClient.AddAsync(wanjiaanAlarm);
await _redis.DelWanjiaanAlarmEvent(wanjiaanAlarm.EventId);
}

//var param = new GeneralParam
//{
// Filters = new List<QueryFilterCondition> {
// new QueryFilterCondition {
// Key = nameof(WanjiaanAlarmEvent.DeviceId),
// Value = device_id,
// ValueType = QueryValueTypeEnum.String,
// Operator = QueryOperatorEnum.Equal
// }, new QueryFilterCondition {
// Key = nameof(WanjiaanAlarmEvent.EventId),
// Value = event_id,
// ValueType = QueryValueTypeEnum.String,
// Operator = QueryOperatorEnum.Equal
// }
//}
//};
//var alarmEventDb =_messageWanJiaAnClient.GetFirst(param);
//if (alarmEventDb != null)
//{
// alarmEventDb.UserId = user_id;
// alarmEventDb.EventTime = event_time;
// alarmEventDb.EventType = event_type;
// alarmEventDb.ReportType = report_type;
// alarmEventDb.EventStart = event_start;
// alarmEventDb.EventEnd = wanjiaanAlarm.EventEnd;
// alarmEventDb.ImageUrl = image_url;
// alarmEventDb.VideoUrl = video_url;
// alarmEventDb.VideoStart = wanjiaanAlarm.VideoStart;
// alarmEventDb.VideoEnd = wanjiaanAlarm.VideoEnd;
// alarmEventDb.Remark = alarmEventDb.Remark + remark;
// _messageWanJiaAnClient.Update(alarmEventDb);
// await _redis.SetWanjiaanAlarmEvent(event_id, alarmEventDb);
//}
//else
//{
// _messageWanJiaAnClient.Add(wanjiaanAlarm);
// await _redis.SetWanjiaanAlarmEvent(event_id, wanjiaanAlarm);
//}
}
string image_url = data.image;
string video_url = data.url;
string remark = JsonConvert.SerializeObject(model);
DateTime create_time = DateTime.Now;

var obj = new { };
}
}

}
catch (Exception ex)
{
string s = ex.Message;
}
}
}


+ 0
- 42
TelpoPush.WanJiaAn.Worker/Models/CacheTemplates/PersonInfoModel.cs View File

@@ -1,42 +0,0 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace TelpoPush.WanJiaAn.Worker.Models.CacheTemplates
{

public class PersonInfoModel
{
public PersonModel person { get; set; }
public string time { get; set; }
}
public class PersonModel
{
public string personId { get; set; }
public string serialno { get; set; }
public string personName { get; set; }
public string deviceId { get; set; }
public string nickName { get; set; }
public bool gender { get; set; }
public int height { get; set; }
public int weight { get; set; }
public string bornDate { get; set; }
public string school { get; set; }
public string grade { get; set; }
public string className { get; set; }
public string imagePath { get; set; }
public string imagePathSmall { get; set; }
public int age { get; set; }
public string createTime { get; set; }
public string remarks { get; set; }
public int ishypertension { get; set; }
public string emotion { get; set; }
public int profession { get; set; }
public int regularity { get; set; }
public int chronicDisease { get; set; }
public string apiUid { get; set; }
public string apiRemark { get; set; }
}
}

+ 17
- 24
TelpoPush.WanJiaAn.Worker/Service/Cache/RedisUtil.cs View File

@@ -1,4 +1,5 @@
using Microsoft.Extensions.Options;
using TelpoDataService.Util.Entities.GpsLocationHistory;
using TelpoPush.WanJiaAn.Worker.Models.CacheTemplates;
using TelpoPush.WanJiaAn.Worker.Models.Config;

@@ -10,7 +11,7 @@ namespace TelpoPush.WanJiaAn.Worker.Service.Cache
private const string CACHE_HASH_KEY_TELPO_GPSDEVICE_PERSON = "TELPO#GPSDEVICE_PERSON_HASH";
private const string CACHE_HASH_KEY_TELPO_MANUFACTOR_CONFIG = "TELPO#MANUFACTOR_CONFG_HASH";
private const string CACHE_HASH_KEY_TELPO_GPSDEVICE_PUSHSITTTIGS = "TELPO#GPSDEVICE_PUSH_SITTINGS_HASH";
private const string CACHE_HASH_KEY_TELPO_WANJIAAN_ALARMEVENT = "TELPO#WANJIAAN_ALARMEVENT_HASH";
private readonly ILogger<RedisUtil> _logger;
private readonly ServiceConfig _configService;
private readonly SqlMapper _sqlMapper;
@@ -40,7 +41,7 @@ namespace TelpoPush.WanJiaAn.Worker.Service.Cache
var deviceInfo = _sqlMapper.DeviceInfo(imei);
if (deviceInfo != null)
{
RedisHelper.HSetAsync(CACHE_HASH_KEY_TELPO_GPSDEVICE, imei, deviceInfo);
await RedisHelper.HSetAsync(CACHE_HASH_KEY_TELPO_GPSDEVICE, imei, deviceInfo);
MemoryCacheUtil.Set(keyCache, deviceInfo, _configService.CacheDurationSeconds10);
return deviceInfo;
}
@@ -67,28 +68,6 @@ namespace TelpoPush.WanJiaAn.Worker.Service.Cache
return null;
}

public async Task<PersonInfoModel> SetPersonInfoHash(string imei)
{
if (string.IsNullOrWhiteSpace(imei)) return null;
string keyCache = $"{imei}_PersonInfoHash";
var personInfo = _sqlMapper.PersonInfo(imei);
PersonInfoModel model = new PersonInfoModel()
{
person = personInfo,
time = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")
};
if (personInfo != null)
{
await RedisHelper.HSetAsync(CACHE_HASH_KEY_TELPO_GPSDEVICE_PERSON, imei, model);
MemoryCacheUtil.Set(keyCache, model, 60);//1分钟
}
else
{
await RedisHelper.HDelAsync(CACHE_HASH_KEY_TELPO_GPSDEVICE_PERSON, imei);
MemoryCacheUtil.Remove(keyCache);
}
return model;
}

public async Task<string> GetHealthyDeviceKey(string imei)
{
@@ -204,5 +183,19 @@ namespace TelpoPush.WanJiaAn.Worker.Service.Cache
return settingInfos;
}


public async Task SetWanjiaanAlarmEvent(string key, WanjiaanAlarmEvent data)
{
await RedisHelper.HSetAsync(CACHE_HASH_KEY_TELPO_WANJIAAN_ALARMEVENT, key, data);
}

public async Task DelWanjiaanAlarmEvent(string key)
{
await RedisHelper.HDelAsync(CACHE_HASH_KEY_TELPO_WANJIAAN_ALARMEVENT, key);
}
public async Task<WanjiaanAlarmEvent> GetWanjiaanAlarmEvent(string key)
{
return await RedisHelper.HGetAsync<WanjiaanAlarmEvent>(CACHE_HASH_KEY_TELPO_WANJIAAN_ALARMEVENT, key);
}
}
}

+ 0
- 8
TelpoPush.WanJiaAn.Worker/Service/Cache/SqlMapper.cs View File

@@ -27,13 +27,5 @@ namespace TelpoPush.WanJiaAn.Worker.Service.Cache
return connection.QueryFirstOrDefault<DeviceInfoModel>(sql, new { imei });
}
}
public PersonModel PersonInfo(string imei)
{
using (IDbConnection connection = new MySqlConnection(gps_conn))
{
var sql = @"SELECT person_id personId, serialno, person_name personName, device_id deviceId, nick_name nickName, gender, height, weight, born_date bornDate, school, grade, class_name className, image_path imagePath,image_path_small imagePathSmall, age, create_time createTime, remarks, ishypertension, emotion,profession,regularity,chronic_disease chronicDisease,api_uid apiUid,api_remark apiRemark FROM gps_person WHERE serialno=@imei";
return connection.QueryFirstOrDefault<PersonModel>(sql, new { imei });
}
}
}
}

+ 1
- 1
TelpoPush.WanJiaAn.Worker/TelpoPush.WanJiaAn.Worker.csproj View File

@@ -20,7 +20,7 @@
<PackageReference Include="Microsoft.Extensions.Caching.Memory" Version="8.0.0" />
<PackageReference Include="Microsoft.Extensions.Logging" Version="8.0.0" />
<PackageReference Include="MySql.Data" Version="8.4.0" />
<PackageReference Include="TelpoDataService.Util" Version="1.6.9.28-beta1" />
<PackageReference Include="TelpoDataService.Util" Version="1.6.9.32-beta1" />
</ItemGroup>

<ItemGroup>


+ 1
- 1
TelpoPush.WanJiaAn.Worker/Worker.cs View File

@@ -18,7 +18,7 @@ namespace TelpoPush.WanJiaAn.Worker
while (!stoppingToken.IsCancellationRequested)
{
await _kafkaSubscribe.SubscribeAsync();
await Task.Delay(1000, stoppingToken);
await Task.Delay(10000000, stoppingToken);
}
}
}


Loading…
Cancel
Save