|
12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033 |
- using Confluent.Kafka;
- using GpsCardGatewayPosition.Common;
- using GpsCardGatewayPosition.Model.Config;
- using GpsCardGatewayPosition.Model.Enum;
- using GpsCardGatewayPosition.Model.IoT;
- using GpsCardGatewayPosition.Service.Cache;
- using GpsCardGatewayPosition.Service.MqProducer;
- using GpsCardGatewayPosition.Service.MqProducer.Model;
- using GpsCardGatewayPosition.Service.Resolver.Factory;
- using GpsCardGatewayPosition.Service.Resolver.Property;
- using GpsCardGatewayPosition.Service.Resolver.Property.Dto;
- using Microsoft.Extensions.Hosting;
- using Microsoft.Extensions.Logging;
- using Microsoft.Extensions.Options;
- using Newtonsoft.Json;
- using Newtonsoft.Json.Linq;
- using System;
- using System.Collections.Generic;
- using System.Linq;
- using System.Net;
- using System.Reflection.Metadata;
- using System.Text;
- using System.Threading.Tasks;
- using System.Web;
- using static Confluent.Kafka.ConfigPropertyNames;
-
- namespace GpsCardGatewayPosition.Postion
- {
- public class Worker : BackgroundService
- {
- private readonly ILogger<Worker> _logger;
- private CancellationTokenSource _tokenSource = default!;
- private readonly MqProcessLogic _serviceMqProcess;
- private readonly ServiceConfig _configService;
- private readonly AppsettingsConfig _configAppsettings;
- private readonly ResolverFactory _factoryResolver;
-
-
-
- private readonly WifiPositionResolver _resolverWifiPosition;
- private readonly WifiPlus2PositionResolver _resolverWifiPlus2Position;
- private readonly GpsPositionResolver _resolverGpsPosition;
- private readonly LbsGsmPositionResolver _resolverLbsGsmPosition;
-
/// <summary>
/// Stores the injected services and initializes the process-wide Redis helpers
/// (<c>RedisHelper</c>, <c>RedisHelperDb7</c>, <c>RedisHelperDb0</c>, <c>RedisHelperWayz</c>).
/// </summary>
/// <remarks>
/// Every Redis client is built from the string form of the same <see cref="RedisConfig"/>
/// instance with a few properties overridden. The overrides are applied to the shared
/// options object, so the original values are captured first and restored afterwards;
/// otherwise every other consumer of <c>IOptions&lt;RedisConfig&gt;</c> would observe the
/// last override (Wayz server, database 10, prefix "_GW_").
/// </remarks>
public Worker(ILogger<Worker> logger,IOptions<ServiceConfig> _optConfigService, IOptions<RedisConfig> optConfigRedis, IOptions<AppsettingsConfig> optConfigAppsettings, ResolverFactory resolverFactory, MqProcessLogic serviceMqProcess,
    WifiPositionResolver wifiPositionResolver, WifiPlus2PositionResolver wifiPlus2PositionResolver, GpsPositionResolver gpsPositionResolver,LbsGsmPositionResolver lbsGsmPositionResolver)
{
    _configService = _optConfigService.Value;
    _configAppsettings = optConfigAppsettings.Value;
    _serviceMqProcess = serviceMqProcess;
    _logger = logger;
    _factoryResolver = resolverFactory;

    _resolverWifiPosition = wifiPositionResolver;
    _resolverWifiPlus2Position = wifiPlus2PositionResolver;
    _resolverGpsPosition = gpsPositionResolver;
    _resolverLbsGsmPosition = lbsGsmPositionResolver;

    // Configure the global Redis services.
    var redisConfig = optConfigRedis.Value;

    // Snapshot the values that are overridden below so the shared options can be restored.
    var originalDatabase = redisConfig.DefaultDatabase;
    var originalPrefix = redisConfig.Prefix;
    var originalServer = redisConfig.Server;

    try
    {
        // Default business Redis: configuration used as-is.
        RedisHelper.Initialization(new CSRedis.CSRedisClient(redisConfig.ToString()));

        // Database 7, keys prefixed with "TELPO".
        redisConfig.DefaultDatabase = 7;
        redisConfig.Prefix = "TELPO";
        RedisHelperDb7.Initialization(new CSRedis.CSRedisClient(redisConfig.ToString()));

        // Database 0, no key prefix.
        redisConfig.DefaultDatabase = 0;
        redisConfig.Prefix = "";
        RedisHelperDb0.Initialization(new CSRedis.CSRedisClient(redisConfig.ToString()));

        // Shared Wayz positioning cache: database 10, prefix "_GW_", dedicated server.
        // (The original assigned this prefix through the Db7 variable; it only reached the
        // Wayz client because all variables referenced the same object.)
        redisConfig.DefaultDatabase = 10;
        redisConfig.Prefix = "_GW_";
#if DEBUG
        redisConfig.Server = "192.168.12.127:8090";
#else
        redisConfig.Server = "172.19.42.56:8090";
#endif
        RedisHelperWayz.Initialization(new CSRedis.CSRedisClient(redisConfig.ToString()));
    }
    finally
    {
        // Undo the temporary overrides on the shared options instance.
        redisConfig.DefaultDatabase = originalDatabase;
        redisConfig.Prefix = originalPrefix;
        redisConfig.Server = originalServer;
    }
}
/// <summary>
/// Logs the start-up, creates a cancellation source linked to the host's start token,
/// and then delegates to <see cref="BackgroundService.StartAsync"/>.
/// </summary>
/// <param name="cancellationToken">Token supplied by the host for the start operation.</param>
/// <returns>The task returned by the base implementation.</returns>
public override Task StartAsync(CancellationToken cancellationToken)
{
    _logger.LogInformation("------StartAsync");

    var linkedSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
    _tokenSource = linkedSource;

    Task baseStart = base.StartAsync(cancellationToken);
    return baseStart;
}
-
/// <summary>
/// Logs the shutdown, cancels the worker's own token source and delegates to
/// <see cref="BackgroundService.StopAsync"/>.
/// </summary>
/// <param name="cancellationToken">Token supplied by the host for the stop operation.</param>
/// <returns>The task returned by the base implementation.</returns>
public override Task StopAsync(CancellationToken cancellationToken)
{
    _logger.LogInformation("------StopAsync");

    // _tokenSource is only assigned in StartAsync; guard against StopAsync being invoked
    // on a worker that was never started (e.g. host start-up aborted by another service).
    _tokenSource?.Cancel();

    return base.StopAsync(cancellationToken);
}
/// <summary>
/// Main consume loop: reads messages from the "topics.storage.iot.postion" topic and
/// starts one <c>ProcessMessageAsync</c> task per message. Once 2000 tasks have been
/// collected they are awaited together before consumption continues.
/// </summary>
/// <param name="stoppingToken">Signals that the host is shutting down.</param>
/// <remarks>
/// On exit (cancellation or failure) the still-pending tasks are awaited and the consumer
/// is closed and disposed, so pending commits are not issued against a dead consumer and
/// the native client handle is released.
/// NOTE(review): tasks complete and commit independently, so offsets may be committed out
/// of order; confirm this is acceptable for the topic.
/// </remarks>
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
    using var kafkaConsumer = CreateKafkaConsumer();
    kafkaConsumer.Subscribe("topics.storage.iot.postion");
    var tasks = new List<Task>();

    try
    {
        while (!stoppingToken.IsCancellationRequested)
        {
            // Short random id used to correlate log entries of one loop iteration.
            var concurrencyId = Guid.NewGuid().ToString("N")[^4..];
            using (_logger.BeginScope(new Dictionary<string, object> { ["RequestId"] = concurrencyId }))
            using (new CustomizeStopWatch(nameof(Worker), _logger))
            {
                // Consume blocks, so it is pushed to the thread pool to keep this method asynchronous.
                var consumeResult = await Task.Run(() => kafkaConsumer.Consume(stoppingToken));
                if (consumeResult != null)
                {
                    tasks.Add(ProcessMessageAsync(consumeResult, kafkaConsumer, concurrencyId));
                }

                if (tasks.Count >= 2000)
                {
                    // Wait for the whole batch, then start collecting a fresh one.
                    await Task.WhenAll(tasks);
                    tasks.Clear();
                    Console.WriteLine("----------------------------------------------------------");
                }
            }
        }
    }
    catch (OperationCanceledException)
    {
        _logger.LogWarning("Worker exit");
    }
    catch (Exception ex)
    {
        // Pass the exception itself so the stack trace is recorded, not just the message.
        _logger.LogError(ex, $"An error occurred: {ex.Message}");
    }
    finally
    {
        // Let in-flight messages finish (and commit) before the consumer goes away.
        try
        {
            await Task.WhenAll(tasks);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, $"An error occurred while draining pending tasks: {ex.Message}");
        }

        // Close leaves the consumer group cleanly; disposal follows via the using declaration.
        try
        {
            kafkaConsumer.Close();
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, $"An error occurred while closing the consumer: {ex.Message}");
        }
    }
}
-
/// <summary>
/// Test producer: publishes 1000 sample position messages through
/// <c>_serviceMqProcess.ProcessIotPositionAsync</c>, alternating between a WIFIPLUS2_0
/// sample (even iterations) and a WIFI sample (odd iterations). Each message gets a new
/// GUID as its message id.
/// </summary>
private async Task ProductPostionAsync()
{
    const string wifiSerialNo = "862622050169778";
    const string wifiPlus2SerialNo = "861281060094145";

    object wifiSample = JObject.Parse(@"{
        'MessageId': '1775412145908567552',
        'BusinessId': '862622050169778',
        'ReceiveTime': '2024-04-03 14:36:55',
        'TopicType': 512,
        'MsgType': 517,
        'TopicInfo': {
            'deviceType': 'CustomCategory',
            'iotId': '7EBPsoxlNmILKsUpKl0L000000',
            'requestId': '987',
            'productKey': 'a18mXM6Cvx8',
            'gmtCreate': 1712126215803,
            'deviceName': '862622050169778',
            'items': {
                'WIFI': {
                    'value': {
                        'mmac': '04:6b:25:8e:a1:81,-68,',
                        'macs': 'f4:2a:7d:ca:c8:c9,-72,|04:6b:25:8e:0a:91,-75,|54:84:dc:37:e8:70,-78,|54:84:dc:37:e8:18,-82,',
                        'IDType': 0,
                        'IDNumber': ''
                    },
                    'time': 1712126215802
                }
            }
        },
        'DetailData': {
            'time': 1712126215802,
            'value': {
                'mmac': '04:6b:25:8e:a1:81,-68,',
                'macs': 'f4:2a:7d:ca:c8:c9,-72,|04:6b:25:8e:0a:91,-75,|54:84:dc:37:e8:70,-78,|54:84:dc:37:e8:18,-82,',
                'IDType': 0,
                'IDNumber': ''
            }
        }
    }");

    object wifiPlus2Sample = JObject.Parse(@"{
        'MessageId': '1775412188354926080',
        'BusinessId': '861281060094145',
        'ReceiveTime': '2024-04-03 14:37:05',
        'TopicType': 512,
        'MsgType': 514,
        'TopicInfo': {
            'deviceType': 'CustomCategory',
            'iotId': 'yQw7bqBl8gSltErYIRVBg1gs00',
            'requestId': '105',
            'productKey': 'a18mXM6Cvx8',
            'gmtCreate': 1712126225922,
            'deviceName': '861281060094145',
            'items': {
                'WIFIPLUS2_0': {
                    'time': 1712124640009,
                    'value': [
                        {
                            'bts': '460,00,30977,247196817,-41',
                            'smac': '',
                            'nearbts': '|460,00,30977,130,-17|460,00,30977,436,-12|460,00,30977,414,-8|460,00,30977,500,-41|460,00,30977,294,-47|460,00,30977,2,-39',
                            'imsi': '460077277322956',
                            'steps': '0',
                            'IDType': 0,
                            'network': 'LTE',
                            'mmac': '08:10:7b:df:3d:c8,-62,',
                            'datetime': '1712124640001',
                            'macs': 'ee:b9:70:83:8d:d2,-68,|3c:06:a7:16:a7:21,-68,|3c:06:a7:80:68:1a,-70,|3e:06:a7:a0:68:1a,-70,|08:10:78:91:72:4a,-77,|68:77:24:aa:40:c3,-81,|a8:e5:44:a5:c4:20,-84,',
                            'imei': '861281060094145',
                            'cdma': 0,
                            'IDNumber': ''
                        }
                    ]
                }
            }
        },
        'DetailData': {
            'time': 1712124640009,
            'value': [
                {
                    'Mmac': '08:10:7b:df:3d:c8,-62,',
                    'Macs': 'ee:b9:70:83:8d:d2,-68,|3c:06:a7:16:a7:21,-68,|3c:06:a7:80:68:1a,-70,|3e:06:a7:a0:68:1a,-70,|08:10:78:91:72:4a,-77,|68:77:24:aa:40:c3,-81,|a8:e5:44:a5:c4:20,-84,',
                    'IdType': 0,
                    'IdNumber': '',
                    'Imei': '861281060094145',
                    'Smac': '',
                    'Imsi': '460077277322956',
                    'NearBts': '|460,00,30977,130,-17|460,00,30977,436,-12|460,00,30977,414,-8|460,00,30977,500,-41|460,00,30977,294,-47|460,00,30977,2,-39',
                    'Bts': '460,00,30977,247196817,-41',
                    'Cdma': 0,
                    'Network': 'LTE',
                    'Steps': '0',
                    'Datetime': '1712124640001'
                }
            ]
        }
    }");

    var round = 0;
    while (round < 1000)
    {
        // Odd rounds send the WIFI sample, even rounds the WIFIPLUS2_0 sample.
        var sendWifi = round % 2 == 1;
        var serialno = sendWifi ? wifiSerialNo : wifiPlus2SerialNo;
        object payload = sendWifi ? wifiSample : wifiPlus2Sample;

        var messageId = Guid.NewGuid().ToString();
        await _serviceMqProcess.ProcessIotPositionAsync(messageId, serialno, payload);

        round++;
    }
}
/// <summary>
/// Handles one consumed position message: unpacks it into a <c>PackageMsgModel</c>,
/// picks the resolver matching the first key under <c>TopicInfo.items</c>
/// (GPS, WIFI, WIFIPLUS2_0 or LBS), runs it and commits the message offset.
/// </summary>
/// <param name="consumeResult">The Kafka record to process.</param>
/// <param name="kafkaConsumer">Consumer used to commit the record after successful handling.</param>
/// <param name="concurrencyId">Correlation id of the consume-loop iteration; prefixed to the log scope id.</param>
/// <remarks>
/// Never throws: every failure, including a malformed payload, is logged and the record is
/// left uncommitted. Parsing used to happen outside the try block, so a single bad message
/// faulted the task and terminated the whole consume loop.
/// NOTE(review): the resolvers are shared instances configured via SetResolveInfo and this
/// method runs concurrently for many messages — confirm the resolvers tolerate that.
/// </remarks>
private async Task ProcessMessageAsync(ConsumeResult<Ignore, string> consumeResult, IConsumer<Ignore, string> kafkaConsumer,string concurrencyId)
{
    MessageModel? msg = null;
    try
    {
        msg = JsonConvert.DeserializeObject<MessageModel>(consumeResult.Message.Value)
            ?? throw new InvalidOperationException("Kafka message body deserialized to null");

        string msgId = msg.Data["MessageId"]?.ToObject<string>()!;
        string bizId = msg.Data["BusinessId"]?.ToObject<string>()!;
        IotTopicType topic = msg.Data["TopicType"]!.ToObject<IotTopicType>();
        int type = msg.Data["MsgType"]!.ToObject<int>();
        object topicInfo = msg.Data["TopicInfo"]!.ToObject<object>()!;
        object detailData = msg.Data["DetailData"]!.ToObject<object>()!;
        var packge = new PackageMsgModel(msgId, bizId, topic, type, topicInfo, detailData);

        using (_logger.BeginScope(new Dictionary<string, object> { ["RequestId"] = $"{concurrencyId}-{msg.MessageId}" }))
        using (new CustomizeStopWatch(IotMessageTypeUtils.TryToDescription(type), _logger))
        {
            var dict = JsonConvert.DeserializeObject<Dictionary<string, object>>(msg.Data["TopicInfo"]?["items"]?.ToString()!);
            var key = dict?.Keys?.FirstOrDefault() + "";
            switch (key)
            {
                case "GPS": // GPS position report
                    // The typed deserialization is kept only as a shape check: a payload that
                    // does not fit the model throws here and the message is skipped.
                    _ = JsonConvert.DeserializeObject<PropertyItemModel<GpsInfoModel>>(dict?["GPS"] + "");
                    _resolverGpsPosition.SetResolveInfo(packge);
                    await _resolverGpsPosition.ExecuteMessageAsync().ConfigureAwait(false);
                    kafkaConsumer.Commit(consumeResult);
                    break;

                case "WIFI": // Wi-Fi position report
                    _ = JsonConvert.DeserializeObject<PropertyItemModel<WifiInfoModel>>(dict?["WIFI"] + "");
                    _resolverWifiPosition.SetResolveInfo(packge);
                    await _resolverWifiPosition.ExecuteMessageAsync().ConfigureAwait(false);
                    kafkaConsumer.Commit(consumeResult);
                    break;

                case "WIFIPLUS2_0": // Wi-Fi plus cell (v2) position report
                    _ = JsonConvert.DeserializeObject<PropertyItemModel<List<EachWifiPlus2Model>>>(dict?["WIFIPLUS2_0"] + "");
                    _resolverWifiPlus2Position.SetResolveInfo(packge);
                    await _resolverWifiPlus2Position.ExecuteMessageAsync().ConfigureAwait(false);
                    kafkaConsumer.Commit(consumeResult);
                    break;

                case "LBS": // Cell-tower (LBS) position report
                    _ = JsonConvert.DeserializeObject<PropertyItemModel<LbsInfoModel>>(dict?["LBS"] + "");
                    _resolverLbsGsmPosition.SetResolveInfo(packge);
                    await _resolverLbsGsmPosition.ExecuteMessageAsync().ConfigureAwait(false);
                    kafkaConsumer.Commit(consumeResult);
                    break;

                default:
                    // Previously dropped silently; the offset is still not committed here.
                    _logger.LogWarning($"Unsupported position item key '{key}', message skipped");
                    break;
            }
        }
    }
    catch (Exception ex)
    {
        // msg is null when the payload itself could not be deserialized; log the raw text then.
        var body = msg is null ? consumeResult?.Message?.Value : JsonConvert.SerializeObject(msg);
        _logger.LogError(ex, $"解析定位消息发生异常 messageId: {msg?.MessageId}, Message: {ex.Message}, body: {body}");
    }
}
/// <summary>
/// Variant of the message handler that runs the matching position resolver
/// (GPS, WIFI, WIFIPLUS2_0 or LBS) but does not commit the Kafka offset.
/// </summary>
/// <param name="consumeResult">The Kafka record to process.</param>
/// <param name="kafkaConsumer">Not used by this variant; kept for signature compatibility.</param>
/// <remarks>
/// Never throws: parsing now happens inside the try block, so a malformed payload is
/// logged instead of faulting the returned task.
/// </remarks>
private async Task ProcessMessageAsync6(ConsumeResult<Ignore, string> consumeResult, IConsumer<Ignore, string> kafkaConsumer)
{
    MessageModel? msg = null;
    try
    {
        msg = JsonConvert.DeserializeObject<MessageModel>(consumeResult.Message.Value)
            ?? throw new InvalidOperationException("Kafka message body deserialized to null");

        string msgId = msg.Data["MessageId"]?.ToObject<string>()!;
        string bizId = msg.Data["BusinessId"]?.ToObject<string>()!;
        IotTopicType topic = msg.Data["TopicType"]!.ToObject<IotTopicType>();
        int type = msg.Data["MsgType"]!.ToObject<int>();
        object topicInfo = msg.Data["TopicInfo"]!.ToObject<object>()!;
        object detailData = msg.Data["DetailData"]!.ToObject<object>()!;
        var packge = new PackageMsgModel(msgId, bizId, topic, type, topicInfo, detailData);

        using (_logger.BeginScope(new Dictionary<string, object> { ["RequestId"] = msg.MessageId }))
        using (new CustomizeStopWatch(IotMessageTypeUtils.TryToDescription(type), _logger))
        {
            var dict = JsonConvert.DeserializeObject<Dictionary<string, object>>(msg.Data["TopicInfo"]?["items"]?.ToString()!);
            var key = dict?.Keys?.FirstOrDefault() + "";
            switch (key)
            {
                case "GPS": // GPS position report
                    // Typed deserialization kept as a shape check; a mismatch throws and is logged below.
                    _ = JsonConvert.DeserializeObject<PropertyItemModel<GpsInfoModel>>(dict?["GPS"] + "");
                    _resolverGpsPosition.SetResolveInfo(packge);
                    await _resolverGpsPosition.ExecuteMessageAsync().ConfigureAwait(false);
                    break;

                case "WIFI": // Wi-Fi position report
                    _ = JsonConvert.DeserializeObject<PropertyItemModel<WifiInfoModel>>(dict?["WIFI"] + "");
                    _resolverWifiPosition.SetResolveInfo(packge);
                    await _resolverWifiPosition.ExecuteMessageAsync().ConfigureAwait(false);
                    break;

                case "WIFIPLUS2_0": // Wi-Fi plus cell (v2) position report
                    _ = JsonConvert.DeserializeObject<PropertyItemModel<List<EachWifiPlus2Model>>>(dict?["WIFIPLUS2_0"] + "");
                    _resolverWifiPlus2Position.SetResolveInfo(packge);
                    await _resolverWifiPlus2Position.ExecuteMessageAsync().ConfigureAwait(false);
                    break;

                case "LBS": // Cell-tower (LBS) position report
                    _ = JsonConvert.DeserializeObject<PropertyItemModel<LbsInfoModel>>(dict?["LBS"] + "");
                    _resolverLbsGsmPosition.SetResolveInfo(packge);
                    await _resolverLbsGsmPosition.ExecuteMessageAsync().ConfigureAwait(false);
                    break;
            }
        }
    }
    catch (Exception ex)
    {
        // msg is null when the payload itself could not be deserialized; log the raw text then.
        var body = msg is null ? consumeResult?.Message?.Value : JsonConvert.SerializeObject(msg);
        _logger.LogError(ex, $"解析定位消息发生异常 messageId: {msg?.MessageId}, Message: {ex.Message}, body: {body}");
    }
}
-
/// <summary>
/// Variant of the message handler that deserializes the record and hands it to
/// <c>_factoryResolver.Resolver</c> inside a log scope keyed by the message id.
/// </summary>
/// <param name="consumeResult">The Kafka record to process.</param>
/// <param name="kafkaConsumer">Not used by this variant; kept for signature compatibility.</param>
/// <remarks>
/// Never throws: deserialization now happens inside the try block, so a malformed payload
/// is logged rather than faulting the task, and the catch block no longer dereferences a
/// possibly-null message.
/// </remarks>
private async Task ProcessMessageAsync5(ConsumeResult<Ignore, string> consumeResult, IConsumer<Ignore, string> kafkaConsumer)
{
    MessageModel? msg = null;
    try
    {
        msg = JsonConvert.DeserializeObject<MessageModel>(consumeResult.Message.Value)
            ?? throw new InvalidOperationException("Kafka message body deserialized to null");

        using (_logger.BeginScope(new Dictionary<string, object> { ["RequestId"] = msg.MessageId }))
        {
            _factoryResolver.Resolver(msg);
        }
    }
    catch (Exception ex)
    {
        // msg is null when the payload itself could not be deserialized; log the raw text then.
        var body = msg is null ? consumeResult?.Message?.Value : JsonConvert.SerializeObject(msg);
        _logger.LogError(ex, $"解析Property主题消息发生异常 messageId: {msg?.MessageId}, Message: {ex.Message}, body: {body}");
    }

    // No real asynchronous work here; the await only keeps the async signature warning-free.
    await Task.CompletedTask;
}
/// <summary>
/// Diagnostic variant of the message handler: unpacks the record, validates the device
/// timestamp of the first item under <c>TopicInfo.items</c> against
/// <c>_configAppsettings.DeviceTimeDeviationMillieconds</c> and prints the item kind to
/// the console. No resolver is run and nothing is committed.
/// </summary>
/// <param name="consumeResult">The Kafka record to inspect.</param>
/// <param name="kafkaConsumer">Not used by this variant; kept for signature compatibility.</param>
/// <remarks>
/// Never throws: deserialization now happens inside the try block, and failures (including
/// a timestamp outside the allowed deviation) are logged together with the exception.
/// </remarks>
private async Task ProcessMessageAsync4(ConsumeResult<Ignore, string> consumeResult, IConsumer<Ignore, string> kafkaConsumer)
{
    MessageModel? msg = null;
    try
    {
        msg = JsonConvert.DeserializeObject<MessageModel>(consumeResult.Message.Value)
            ?? throw new InvalidOperationException("Kafka message body deserialized to null");

        using (_logger.BeginScope(new Dictionary<string, object> { ["RequestId"] = msg.MessageId }))
        {
            string msgId = msg.Data["MessageId"]?.ToObject<string>()!;
            string bizId = msg.Data["BusinessId"]?.ToObject<string>()!;
            IotTopicType topic = msg.Data["TopicType"]!.ToObject<IotTopicType>();
            int type = msg.Data["MsgType"]!.ToObject<int>();
            object topicInfo = msg.Data["TopicInfo"]!.ToObject<object>()!;
            object detailData = msg.Data["DetailData"]!.ToObject<object>()!;
            // The package is not used further; it is still built so construction failures surface.
            _ = new PackageMsgModel(msgId, bizId, topic, type, topicInfo, detailData);

            var dict = JsonConvert.DeserializeObject<Dictionary<string, object>>(msg.Data["TopicInfo"]?["items"]?.ToString()!);
            var key = dict?.Keys?.FirstOrDefault() + "";
            switch (key)
            {
                case "GPS": // GPS position report
                {
                    Console.WriteLine("GPS");
                    var data = JsonConvert.DeserializeObject<PropertyItemModel<GpsInfoModel>>(dict?["GPS"] + "");
                    if (Utils.GetTimeDeviationMilliseconds(data!.Time) > _configAppsettings.DeviceTimeDeviationMillieconds) throw new ArgumentException($"设备消息时间戳异常[{data.Time}]");
                    break;
                }

                case "WIFI": // Wi-Fi position report
                {
                    var data = JsonConvert.DeserializeObject<PropertyItemModel<WifiInfoModel>>(dict?["WIFI"] + "");
                    if (Utils.GetTimeDeviationMilliseconds(data!.Time) > _configAppsettings.DeviceTimeDeviationMillieconds) throw new ArgumentException($"设备消息时间戳异常[{data.Time}]");
                    Console.WriteLine("WIFI");
                    break;
                }

                case "WIFIPLUS2_0": // Wi-Fi plus cell (v2) position report
                {
                    var data = JsonConvert.DeserializeObject<PropertyItemModel<List<EachWifiPlus2Model>>>(dict?["WIFIPLUS2_0"] + "");
                    if (Utils.GetTimeDeviationMilliseconds(data!.Time) > _configAppsettings.DeviceTimeDeviationMillieconds) throw new ArgumentException($"设备消息时间戳异常[{data.Time}]");
                    Console.WriteLine("WIFIPLUS2_0");
                    break;
                }

                case "LBS": // Cell-tower (LBS) position report
                {
                    var data = JsonConvert.DeserializeObject<PropertyItemModel<LbsInfoModel>>(dict?["LBS"] + "");
                    if (Utils.GetTimeDeviationMilliseconds(data!.Time) > _configAppsettings.DeviceTimeDeviationMillieconds) throw new ArgumentException($"设备消息时间戳异常[{data.Time}]");
                    // Previously printed "WIFIPLUS2_0" (copy-paste from the branch above).
                    Console.WriteLine("LBS");
                    break;
                }
            }
        }
    }
    catch (Exception ex)
    {
        // msg is null when the payload itself could not be deserialized; log the raw text then.
        var body = msg is null ? consumeResult?.Message?.Value : JsonConvert.SerializeObject(msg);
        _logger.LogError(ex, $"解析Property主题消息发生异常 messageId: {msg?.MessageId}, Message: {ex.Message}, body: {body}");
    }

    // No real asynchronous work here; the await only keeps the async signature warning-free.
    await Task.CompletedTask;
}
-
// Endpoint used by the three load-test handlers below.
// NOTE(review): the access key is embedded in source code; it should be moved to
// configuration / a secret store and rotated.
private const string WayzTrackPointsUrl = "https://api.newayz.com/location/hub/v1/track_points?access_key=OK73jLa48dU9V3EvNN1RLpdjcVOdIEcn&field_masks=location.position";

// Fixed sample request body (eight Wi-Fi access points) posted by the load-test handlers.
private const string WayzSampleTrackPointJson = @"{
    'timestamp': 1670231477000,
    'id': '679e9043-14fe-44df-aba6-b2fd62592fea',
    'asset': {
        'id': '679e9043-14fe-44df-aba6-b2fd62592fea',
        'manufacturer': 'xiaomi',
        'imeiMd5': '1be14c6210b3115f',
        'uniqueId': '8f95204d-a60e-40bb-b8be-45a1ab3bef2a'
    },
    'location': {
        'Wifis': [
            {
                'timestamp': 1515743846504,
                'signalStrength': 40,
                'macAddress': '6a:77:24:27:9c:04',
                'ssid': '',
                'frequency': 2412,
                'channel': 0,
                'connected': true
            },
            {
                'timestamp': 1515743846504,
                'signalStrength': 60,
                'macAddress': 'ec:f0:fe:1e:e1:a8',
                'ssid': 'Z VENTURES 9F',
                'frequency': 2457,
                'channel': 0
            },
            {
                'timestamp': 1515743846504,
                'signalStrength': 60,
                'macAddress': '54:a7:03:b5:a5:5e',
                'ssid': 'Z VENTURES 9F',
                'frequency': 2457,
                'channel': 0
            },
            {
                'timestamp': 1515743846504,
                'signalStrength': 60,
                'macAddress': 'ec:f8:eb:b6:89:81',
                'ssid': 'Z VENTURES 9F',
                'frequency': 2457,
                'channel': 0
            },
            {
                'timestamp': 1515743846504,
                'signalStrength': 60,
                'macAddress': 'a0:69:d9:c1:6c:8b',
                'ssid': 'Z VENTURES 9F',
                'frequency': 2457,
                'channel': 0
            },
            {
                'timestamp': 1515743846504,
                'signalStrength': 60,
                'macAddress': '64:64:4a:d5:f5:ca',
                'ssid': 'Z VENTURES 9F',
                'frequency': 2457,
                'channel': 0
            },
            {
                'timestamp': 1515743846504,
                'signalStrength': 60,
                'macAddress': 'c8:5b:a0:f8:f4:6c',
                'ssid': 'Z VENTURES 9F',
                'frequency': 2457,
                'channel': 0
            },
            {
                'timestamp': 1515743846504,
                'signalStrength': 60,
                'macAddress': '84:74:60:bf:fd:20',
                'ssid': 'Z VENTURES 9F',
                'frequency': 2457,
                'channel': 0
            }
        ]
    }
}";

// One client for all probe requests: creating an HttpClient per call (as before) opens a
// new connection pool each time and can exhaust sockets under load.
private static readonly HttpClient s_wayzProbeHttpClient = new HttpClient();

/// <summary>
/// Posts the fixed sample track point to the Wayz endpoint and returns the response body.
/// </summary>
/// <returns>The response body as a string.</returns>
/// <exception cref="HttpRequestException">The request failed or returned a non-success status code.</exception>
private static async Task<string> PostWayzSampleTrackPointAsync()
{
    // Round-trip through JObject so the single-quoted literal is sent as standard JSON.
    object data = JObject.Parse(WayzSampleTrackPointJson);
    var jsonData = JsonConvert.SerializeObject(data);

    using var content = new StringContent(jsonData, Encoding.UTF8, "application/json");
    using var request = new HttpRequestMessage
    {
        RequestUri = new Uri(WayzTrackPointsUrl),
        Method = HttpMethod.Post,
        Content = content
    };
    using var response = await s_wayzProbeHttpClient.SendAsync(request);
    response.EnsureSuccessStatusCode();

    return await response.Content.ReadAsStringAsync();
}

/// <summary>
/// Load-test handler: ignores the record's content except for its message id (used as the
/// log scope id) and posts the fixed sample track point. The response body is discarded.
/// </summary>
/// <param name="consumeResult">The Kafka record; only deserialized to obtain the message id.</param>
/// <param name="kafkaConsumer">Not used; kept for signature compatibility.</param>
private async Task ProcessMessageAsync3(ConsumeResult<Ignore, string> consumeResult, IConsumer<Ignore, string> kafkaConsumer)
{
    var msg = JsonConvert.DeserializeObject<MessageModel>(consumeResult.Message.Value);
    using (_logger.BeginScope(new Dictionary<string, object> { ["RequestId"] = msg!.MessageId }))
    {
        _ = await PostWayzSampleTrackPointAsync();
    }
}

/// <summary>
/// Load-test handler: posts the fixed sample track point and prints a fresh GUID followed
/// by the tail of the response body.
/// </summary>
/// <param name="consumeResult">Not used; kept for signature compatibility.</param>
/// <param name="kafkaConsumer">Not used; kept for signature compatibility.</param>
private async Task ProcessMessageAsync2(ConsumeResult<Ignore, string> consumeResult, IConsumer<Ignore, string> kafkaConsumer)
{
    var token = await PostWayzSampleTrackPointAsync();

    // Show at most the last 10 characters; slicing unconditionally threw on shorter bodies.
    var tail = token.Length > 10 ? token[^10..] : token;
    Console.WriteLine($"{Guid.NewGuid().ToString()}-- {tail}");
}

/// <summary>
/// Load-test handler: posts the fixed sample track point inside a log scope and stopwatch
/// and prints the complete response body.
/// </summary>
/// <param name="consumeResult">Not used; kept for signature compatibility.</param>
/// <param name="kafkaConsumer">Not used; kept for signature compatibility.</param>
/// <param name="i">Sequence number used as the log scope's RequestId.</param>
private async Task ProcessMessageAsync(ConsumeResult<Ignore, string> consumeResult, IConsumer<Ignore, string> kafkaConsumer,int i)
{
    using (_logger.BeginScope(new Dictionary<string, object> { ["RequestId"] = i }))
    using (new CustomizeStopWatch(nameof(ProcessMessageAsync), _logger))
    {
        var token = await PostWayzSampleTrackPointAsync();
        Console.WriteLine($"Successfully authenticated. token {token}");
    }
}
/// <summary>
/// Builds the Kafka consumer for consumer group "iot.position" against the broker list in
/// <c>_configService.MqServerAddress</c>. Auto-commit is disabled (offsets are committed
/// explicitly by the message handler) and consumption starts from the earliest offset
/// when the group has no committed position.
/// </summary>
/// <returns>A built, not yet subscribed consumer. The caller is responsible for closing and disposing it.</returns>
private IConsumer<Ignore, string> CreateKafkaConsumer()
{
    var consumerConfig = new ConsumerConfig
    {
        GroupId = "iot.position",
        BootstrapServers = _configService.MqServerAddress,
        AutoOffsetReset = AutoOffsetReset.Earliest,
        EnableAutoCommit = false, // offsets are committed manually
        CancellationDelayMaxMs = 1 // react to cancellation of Consume almost immediately
    };

    return new ConsumerBuilder<Ignore, string>(consumerConfig)
        .SetErrorHandler((_, e) =>
        {
            Console.WriteLine($"消费者创建出错,代码:{e.Code} |原因: {e.Reason}");
            if (e.IsFatal)
            {
                // Fatal client errors must not be hidden at Information level.
                _logger.LogError($"消费者创建出错,代码:{e.Code} |原因: {e.Reason}");
            }
            else
            {
                _logger.LogInformation($"消费者创建出错,代码:{e.Code} |原因: {e.Reason}");
            }
        })
        .SetPartitionsAssignedHandler((c, partitions) =>
        {
            // Intentionally empty: the assignment proposed by the group coordinator is accepted as-is.
        })
        .SetPartitionsRevokedHandler((c, partitions) =>
        {
            // Intentionally empty.
        })
        .Build();
}
-
- }
- }
|