using Confluent.Kafka;
using GpsCardGatewayPosition.Common;
using GpsCardGatewayPosition.Model.Config;
using GpsCardGatewayPosition.Model.Enum;
using GpsCardGatewayPosition.Model.IoT;
using GpsCardGatewayPosition.Service.Cache;
using GpsCardGatewayPosition.Service.MqProducer;
using GpsCardGatewayPosition.Service.MqProducer.Model;
using GpsCardGatewayPosition.Service.Resolver.Factory;
using GpsCardGatewayPosition.Service.Resolver.Property;
using GpsCardGatewayPosition.Service.Resolver.Property.Dto;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Reflection.Metadata;
using System.Text;
using System.Threading.Tasks;
using System.Web;
using static Confluent.Kafka.ConfigPropertyNames;

namespace GpsCardGatewayPosition.Postion
{
    /// <summary>
    /// Hosted background worker that consumes IoT position messages from Kafka and hands each
    /// one to the resolver matching the first key found under <c>TopicInfo.items</c>
    /// (<c>GPS</c>, <c>WIFI</c>, <c>WIFIPLUS2_0</c> or <c>LBS</c>).
    /// </summary>
    /// <remarks>
    /// NOTE(review): several generic type arguments (on IOptions, List, Dictionary,
    /// ConsumeResult / IConsumer / ConsumerBuilder, DeserializeObject and ToObject) are not
    /// present in this file as reviewed. They are left exactly as found; confirm them against
    /// the build before merging.
    /// </remarks>
    public class Worker : BackgroundService
    {
        /// <summary>Kafka topic the worker subscribes to.</summary>
        private const string PositionTopic = "topics.storage.iot.postion";

        /// <summary>Kafka consumer group id.</summary>
        private const string ConsumerGroupId = "iot.position";

        /// <summary>Number of started handler tasks collected before the loop waits for all of them.</summary>
        private const int MaxInFlightMessages = 2000;

        private readonly ILogger _logger;

        // Linked to the host's start token in StartAsync and cancelled in StopAsync.
        private CancellationTokenSource? _tokenSource;

        private readonly MqProcessLogic _serviceMqProcess;
        private readonly ServiceConfig _configService;

        // Assigned from DI; not read by the live message handler at the moment.
        private readonly AppsettingsConfig _configAppsettings;
        private readonly ResolverFactory _factoryResolver;

        private readonly WifiPositionResolver _resolverWifiPosition;
        private readonly WifiPlus2PositionResolver _resolverWifiPlus2Position;
        private readonly GpsPositionResolver _resolverGpsPosition;
        private readonly LbsGsmPositionResolver _resolverLbsGsmPosition;

        /// <summary>
        /// Stores the injected services and initializes the four static Redis helpers
        /// (<c>RedisHelper</c>, <c>RedisHelperDb7</c>, <c>RedisHelperDb0</c>, <c>RedisHelperWayz</c>).
        /// </summary>
        public Worker(ILogger logger,
            IOptions _optConfigService,
            IOptions optConfigRedis,
            IOptions optConfigAppsettings,
            ResolverFactory resolverFactory,
            MqProcessLogic serviceMqProcess,
            WifiPositionResolver wifiPositionResolver,
            WifiPlus2PositionResolver wifiPlus2PositionResolver,
            GpsPositionResolver gpsPositionResolver,
            LbsGsmPositionResolver lbsGsmPositionResolver)
        {
            _configService = _optConfigService.Value;
            _configAppsettings = optConfigAppsettings.Value;
            _serviceMqProcess = serviceMqProcess;
            _logger = logger;
            _factoryResolver = resolverFactory;
            _resolverWifiPosition = wifiPositionResolver;
            _resolverWifiPlus2Position = wifiPlus2PositionResolver;
            _resolverGpsPosition = gpsPositionResolver;
            _resolverLbsGsmPosition = lbsGsmPositionResolver;

            // Global Redis services for the other business data.
            // Every client below is built from the ToString() rendering of the configuration at
            // that moment, so each block must finish its mutations before constructing its client.
            var csredis = new CSRedis.CSRedisClient(optConfigRedis.Value.ToString());
            RedisHelper.Initialization(csredis);

            RedisConfig csredisDb7Con = optConfigRedis.Value;
            csredisDb7Con.DefaultDatabase = 7;
            csredisDb7Con.Prefix = "TELPO";
            var csredisDb7 = new CSRedis.CSRedisClient(csredisDb7Con.ToString());
            RedisHelperDb7.Initialization(csredisDb7);

            RedisConfig csredisDb0Con = optConfigRedis.Value;
            csredisDb0Con.DefaultDatabase = 0;
            csredisDb0Con.Prefix = "";
            var csredisDb0 = new CSRedis.CSRedisClient(csredisDb0Con.ToString());
            RedisHelperDb0.Initialization(csredisDb0);

            // Shared Wayz positioning cache Redis server (172.19.42.56, 47.100.200.105).
            RedisConfig csredisWayzCon = optConfigRedis.Value;
            csredisWayzCon.DefaultDatabase = 10;
            // The prefix belongs to the Wayz configuration (it was previously set through the
            // Db7 variable by mistake).
            csredisWayzCon.Prefix = "_GW_";
#if DEBUG
            csredisWayzCon.Server = "192.168.12.127:8090";
#else
            csredisWayzCon.Server = "172.19.42.56:8090";
#endif
            var csredisWayz = new CSRedis.CSRedisClient(csredisWayzCon.ToString());
            RedisHelperWayz.Initialization(csredisWayz);
        }

        /// <summary>Logs the start, creates the linked token source and starts the base service.</summary>
        public override Task StartAsync(CancellationToken cancellationToken)
        {
            _logger.LogInformation("------StartAsync");
            _tokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
            return base.StartAsync(cancellationToken);
        }

        /// <summary>Logs the stop, cancels the linked token source (if any) and stops the base service.</summary>
        public override Task StopAsync(CancellationToken cancellationToken)
        {
            _logger.LogInformation("------StopAsync");
            // StopAsync may run without a preceding StartAsync, so the source can still be null.
            _tokenSource?.Cancel();
            return base.StopAsync(cancellationToken);
        }

        /// <summary>Releases the linked token source together with the base service.</summary>
        public override void Dispose()
        {
            _tokenSource?.Dispose();
            base.Dispose();
        }

        /// <summary>
        /// Consume loop: reads one message at a time, starts a handler task for it without
        /// awaiting, and waits for the whole batch once <see cref="MaxInFlightMessages"/>
        /// handlers have been started.
        /// </summary>
        /// <param name="stoppingToken">Signals that the host is shutting down.</param>
        protected override async Task ExecuteAsync(CancellationToken stoppingToken)
        {
            var tasks = new List();

            // Disposed when the method exits so the native client handle is always released.
            using var kafkaConsumer = CreateKafkaConsumer();
            kafkaConsumer.Subscribe(PositionTopic);

            // To seed the topic with synthetic messages during manual testing, call
            // ProductPostionAsync() here.

            try
            {
                while (!stoppingToken.IsCancellationRequested)
                {
                    // Short random tag that groups the log lines of one loop iteration.
                    var concurrencyId = Guid.NewGuid().ToString("N")[^4..];

                    using (_logger.BeginScope(new Dictionary { ["RequestId"] = concurrencyId }))
                    using (new CustomizeStopWatch(nameof(Worker), _logger))
                    {
                        // Consume blocks until a message arrives, so it is pushed to the thread pool.
                        var consumeResult = await Task.Run(() => kafkaConsumer.Consume(stoppingToken));
                        if (consumeResult != null)
                        {
                            tasks.Add(ProcessMessageAsync(consumeResult, kafkaConsumer, concurrencyId));
                        }

                        if (tasks.Count >= MaxInFlightMessages)
                        {
                            // Wait for the whole batch, then start collecting the next one.
                            await Task.WhenAll(tasks);
                            tasks.Clear();
                            Console.WriteLine("----------------------------------------------------------");
                        }
                    }
                }
            }
            catch (OperationCanceledException)
            {
                _logger.LogWarning("Worker exit");
            }
            catch (Exception ex)
            {
                // Pass the exception itself so the stack trace reaches the log sink.
                _logger.LogError(ex, "An error occurred: {Message}", ex.Message);
            }
            finally
            {
                try
                {
                    // Let handlers that are still running finish (and commit) before the
                    // consumer is closed.
                    await Task.WhenAll(tasks);
                    kafkaConsumer.Close();
                }
                catch (Exception ex)
                {
                    _logger.LogError(ex, "Worker shutdown failed: {Message}", ex.Message);
                }
            }
        }

        /// <summary>
        /// Manual load-test helper: publishes 1000 synthetic position messages through
        /// <c>MqProcessLogic.ProcessIotPositionAsync</c>, alternating between a WIFI sample
        /// (odd iterations) and a WIFIPLUS2_0 sample (even iterations).
        /// </summary>
        private async Task ProductPostionAsync()
        {
            object wifi = JObject.Parse(@"{
                'MessageId': '1775412145908567552',
                'BusinessId': '862622050169778',
                'ReceiveTime': '2024-04-03 14:36:55',
                'TopicType': 512,
                'MsgType': 517,
                'TopicInfo': {
                    'deviceType': 'CustomCategory',
                    'iotId': '7EBPsoxlNmILKsUpKl0L000000',
                    'requestId': '987',
                    'productKey': 'a18mXM6Cvx8',
                    'gmtCreate': 1712126215803,
                    'deviceName': '862622050169778',
                    'items': {
                        'WIFI': {
                            'value': {
                                'mmac': '04:6b:25:8e:a1:81,-68,',
                                'macs': 'f4:2a:7d:ca:c8:c9,-72,|04:6b:25:8e:0a:91,-75,|54:84:dc:37:e8:70,-78,|54:84:dc:37:e8:18,-82,',
                                'IDType': 0,
                                'IDNumber': ''
                            },
                            'time': 1712126215802
                        }
                    }
                },
                'DetailData': {
                    'time': 1712126215802,
                    'value': {
                        'mmac': '04:6b:25:8e:a1:81,-68,',
                        'macs': 'f4:2a:7d:ca:c8:c9,-72,|04:6b:25:8e:0a:91,-75,|54:84:dc:37:e8:70,-78,|54:84:dc:37:e8:18,-82,',
                        'IDType': 0,
                        'IDNumber': ''
                    }
                }
            }");

            object wifiplus2 = JObject.Parse(@"{
                'MessageId': '1775412188354926080',
                'BusinessId': '861281060094145',
                'ReceiveTime': '2024-04-03 14:37:05',
                'TopicType': 512,
                'MsgType': 514,
                'TopicInfo': {
                    'deviceType': 'CustomCategory',
                    'iotId': 'yQw7bqBl8gSltErYIRVBg1gs00',
                    'requestId': '105',
                    'productKey': 'a18mXM6Cvx8',
                    'gmtCreate': 1712126225922,
                    'deviceName': '861281060094145',
                    'items': {
                        'WIFIPLUS2_0': {
                            'time': 1712124640009,
                            'value': [
                                {
                                    'bts': '460,00,30977,247196817,-41',
                                    'smac': '',
                                    'nearbts': '|460,00,30977,130,-17|460,00,30977,436,-12|460,00,30977,414,-8|460,00,30977,500,-41|460,00,30977,294,-47|460,00,30977,2,-39',
                                    'imsi': '460077277322956',
                                    'steps': '0',
                                    'IDType': 0,
                                    'network': 'LTE',
                                    'mmac': '08:10:7b:df:3d:c8,-62,',
                                    'datetime': '1712124640001',
                                    'macs': 'ee:b9:70:83:8d:d2,-68,|3c:06:a7:16:a7:21,-68,|3c:06:a7:80:68:1a,-70,|3e:06:a7:a0:68:1a,-70,|08:10:78:91:72:4a,-77,|68:77:24:aa:40:c3,-81,|a8:e5:44:a5:c4:20,-84,',
                                    'imei': '861281060094145',
                                    'cdma': 0,
                                    'IDNumber': ''
                                }
                            ]
                        }
                    }
                },
                'DetailData': {
                    'time': 1712124640009,
                    'value': [
                        {
                            'Mmac': '08:10:7b:df:3d:c8,-62,',
                            'Macs': 'ee:b9:70:83:8d:d2,-68,|3c:06:a7:16:a7:21,-68,|3c:06:a7:80:68:1a,-70,|3e:06:a7:a0:68:1a,-70,|08:10:78:91:72:4a,-77,|68:77:24:aa:40:c3,-81,|a8:e5:44:a5:c4:20,-84,',
                            'IdType': 0,
                            'IdNumber': '',
                            'Imei': '861281060094145',
                            'Smac': '',
                            'Imsi': '460077277322956',
                            'NearBts': '|460,00,30977,130,-17|460,00,30977,436,-12|460,00,30977,414,-8|460,00,30977,500,-41|460,00,30977,294,-47|460,00,30977,2,-39',
                            'Bts': '460,00,30977,247196817,-41',
                            'Cdma': 0,
                            'Network': 'LTE',
                            'Steps': '0',
                            'Datetime': '1712124640001'
                        }
                    ]
                }
            }");

            for (int i = 0; i < 1000; i++)
            {
                var messageId = Guid.NewGuid().ToString();
                if (i % 2 == 1)
                {
                    await _serviceMqProcess.ProcessIotPositionAsync(messageId, "862622050169778", wifi);
                }
                else
                {
                    await _serviceMqProcess.ProcessIotPositionAsync(messageId, "861281060094145", wifiplus2);
                }
            }
        }

        /// <summary>
        /// Handles one consumed message: unpacks the envelope into a <c>PackageMsgModel</c>,
        /// picks the resolver by the first key under <c>TopicInfo.items</c>, runs it and commits
        /// the message offset when the resolver completes.
        /// </summary>
        /// <param name="consumeResult">The consumed Kafka record; its value is the JSON envelope.</param>
        /// <param name="kafkaConsumer">Consumer used to commit the record after successful handling.</param>
        /// <param name="concurrencyId">Tag of the consume-loop iteration; prefixed to the log scope id.</param>
        /// <remarks>
        /// Never throws: every failure, including a malformed or null envelope, is logged and
        /// the record is left uncommitted, so a single bad message cannot fault the batch that
        /// the consume loop awaits.
        /// NOTE(review): the resolvers are shared instances that receive per-message state via
        /// SetResolveInfo while many handlers are in flight; this is only safe if the resolver
        /// isolates that state per call. Confirm against the resolver implementation.
        /// NOTE(review): commits are issued by whichever handler finishes, so they are not
        /// guaranteed to happen in consumption order.
        /// </remarks>
        private async Task ProcessMessageAsync(ConsumeResult consumeResult, IConsumer kafkaConsumer, string concurrencyId)
        {
            // Captured for the error log; stays null when the envelope itself cannot be parsed.
            string? messageId = null;

            try
            {
                var msg = JsonConvert.DeserializeObject(consumeResult.Message.Value)
                          ?? throw new InvalidOperationException("Message payload deserialized to null.");
                messageId = $"{msg.MessageId}";

                string msgId = msg.Data["MessageId"]?.ToObject()!;
                string bizId = msg.Data["BusinessId"]?.ToObject()!;
                IotTopicType topic = msg.Data["TopicType"]!.ToObject();
                int type = msg.Data["MsgType"]!.ToObject();
                object topicInfo = msg.Data["TopicInfo"]!.ToObject()!;
                object detailData = msg.Data["DetailData"]!.ToObject()!;
                var packge = new PackageMsgModel(msgId, bizId, topic, type, topicInfo, detailData);

                using (_logger.BeginScope(new Dictionary { ["RequestId"] = $"{concurrencyId}-{msg.MessageId}" }))
                using (new CustomizeStopWatch(IotMessageTypeUtils.TryToDescription(type), _logger))
                {
                    var dict = JsonConvert.DeserializeObject>(msg.Data["TopicInfo"]?["items"]?.ToString()!);
                    var key = dict?.Keys?.FirstOrDefault() + "";

                    // In every case the item is deserialized only as a sanity check: the result
                    // is unused, but a malformed item throws and sends the message to the catch
                    // block below. The device-timestamp deviation check is currently disabled.
                    switch (key)
                    {
                        case "GPS": // GPS position report
                            _ = JsonConvert.DeserializeObject>(dict?["GPS"] + "");
                            _resolverGpsPosition.SetResolveInfo(packge);
                            await _resolverGpsPosition.ExecuteMessageAsync().ConfigureAwait(false);
                            kafkaConsumer.Commit(consumeResult);
                            break;

                        case "WIFI": // Wi-Fi position report
                            _ = JsonConvert.DeserializeObject>(dict?["WIFI"] + "");
                            _resolverWifiPosition.SetResolveInfo(packge);
                            await _resolverWifiPosition.ExecuteMessageAsync().ConfigureAwait(false);
                            kafkaConsumer.Commit(consumeResult);
                            break;

                        case "WIFIPLUS2_0": // Wi-Fi plus cell (WIFIPLUS2_0) position report
                            _ = JsonConvert.DeserializeObject>>(dict?["WIFIPLUS2_0"] + "");
                            _resolverWifiPlus2Position.SetResolveInfo(packge);
                            await _resolverWifiPlus2Position.ExecuteMessageAsync().ConfigureAwait(false);
                            kafkaConsumer.Commit(consumeResult);
                            break;

                        case "LBS": // cell-tower (LBS) position report
                            _ = JsonConvert.DeserializeObject>(dict?["LBS"] + "");
                            _resolverLbsGsmPosition.SetResolveInfo(packge);
                            await _resolverLbsGsmPosition.ExecuteMessageAsync().ConfigureAwait(false);
                            kafkaConsumer.Commit(consumeResult);
                            break;

                        default:
                            // Unknown item keys are skipped without a commit, as before; the
                            // skip is now visible at debug level.
                            _logger.LogDebug("Skipping message with unsupported position item key '{Key}'", key);
                            break;
                    }
                }
            }
            catch (Exception ex)
            {
                _logger.LogError(ex,
                    "解析定位消息发生异常 messageId: {MessageId}, Message: {Message}, body: {Body}",
                    messageId, ex.Message, consumeResult.Message.Value);
            }
        }

        /// <summary>
        /// Builds the Kafka consumer for group <see cref="ConsumerGroupId"/> against the
        /// configured bootstrap servers, with auto-commit disabled (offsets are committed by
        /// the message handler) and reading from the earliest offset when none is stored.
        /// </summary>
        private IConsumer CreateKafkaConsumer()
        {
            var consumerConfig = new ConsumerConfig
            {
                GroupId = ConsumerGroupId,
                BootstrapServers = _configService.MqServerAddress,
                AutoOffsetReset = AutoOffsetReset.Earliest,
                EnableAutoCommit = false,   // offsets are committed manually after handling
                CancellationDelayMaxMs = 1
            };

            return new ConsumerBuilder(consumerConfig)
                .SetErrorHandler((_, e) =>
                {
                    Console.WriteLine($"消费者创建出错,代码:{e.Code} |原因: {e.Reason}");
                    _logger.LogError("消费者创建出错,代码:{Code} |原因: {Reason}", e.Code, e.Reason);
                })
                // Partition assignment and revocation need no custom handling.
                .SetPartitionsAssignedHandler((c, partitions) => { })
                .SetPartitionsRevokedHandler((c, partitions) => { })
                .Build();
        }
    }
}