You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

Worker.cs 44KB

7 months ago
7 months ago
7 months ago
7 months ago
7 months ago
7 months ago
1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033
  1. using Confluent.Kafka;
  2. using GpsCardGatewayPosition.Common;
  3. using GpsCardGatewayPosition.Model.Config;
  4. using GpsCardGatewayPosition.Model.Enum;
  5. using GpsCardGatewayPosition.Model.IoT;
  6. using GpsCardGatewayPosition.Service.Cache;
  7. using GpsCardGatewayPosition.Service.MqProducer;
  8. using GpsCardGatewayPosition.Service.MqProducer.Model;
  9. using GpsCardGatewayPosition.Service.Resolver.Factory;
  10. using GpsCardGatewayPosition.Service.Resolver.Property;
  11. using GpsCardGatewayPosition.Service.Resolver.Property.Dto;
  12. using Microsoft.Extensions.Hosting;
  13. using Microsoft.Extensions.Logging;
  14. using Microsoft.Extensions.Options;
  15. using Newtonsoft.Json;
  16. using Newtonsoft.Json.Linq;
  17. using System;
  18. using System.Collections.Generic;
  19. using System.Linq;
  20. using System.Net;
  21. using System.Reflection.Metadata;
  22. using System.Text;
  23. using System.Threading.Tasks;
  24. using System.Web;
  25. using static Confluent.Kafka.ConfigPropertyNames;
  26. namespace GpsCardGatewayPosition.Postion
  27. {
  28. public class Worker : BackgroundService
  29. {
  30. private readonly ILogger<Worker> _logger;
  31. private CancellationTokenSource _tokenSource = default!;
  32. private readonly MqProcessLogic _serviceMqProcess;
  33. private readonly ServiceConfig _configService;
  34. private readonly AppsettingsConfig _configAppsettings;
  35. private readonly ResolverFactory _factoryResolver;
  36. private readonly WifiPositionResolver _resolverWifiPosition;
  37. private readonly WifiPlus2PositionResolver _resolverWifiPlus2Position;
  38. private readonly GpsPositionResolver _resolverGpsPosition;
  39. private readonly LbsGsmPositionResolver _resolverLbsGsmPosition;
  40. public Worker(ILogger<Worker> logger,IOptions<ServiceConfig> _optConfigService, IOptions<RedisConfig> optConfigRedis, IOptions<AppsettingsConfig> optConfigAppsettings, ResolverFactory resolverFactory, MqProcessLogic serviceMqProcess,
  41. WifiPositionResolver wifiPositionResolver, WifiPlus2PositionResolver wifiPlus2PositionResolver, GpsPositionResolver gpsPositionResolver,LbsGsmPositionResolver lbsGsmPositionResolver)
  42. {
  43. _configService = _optConfigService.Value;
  44. _configAppsettings = optConfigAppsettings.Value;
  45. _serviceMqProcess = serviceMqProcess;
  46. _logger = logger;
  47. _factoryResolver = resolverFactory;
  48. _resolverWifiPosition = wifiPositionResolver;
  49. _resolverWifiPlus2Position = wifiPlus2PositionResolver;
  50. _resolverGpsPosition = gpsPositionResolver;
  51. _resolverLbsGsmPosition= lbsGsmPositionResolver;
  52. //配置全局服务
  53. #region 其他业务redis服务器
  54. var csredis = new CSRedis.CSRedisClient(optConfigRedis.Value.ToString());
  55. RedisHelper.Initialization(csredis);
  56. RedisConfig csredisDb7Con = optConfigRedis.Value;
  57. csredisDb7Con.DefaultDatabase = 7;
  58. csredisDb7Con.Prefix = "TELPO";
  59. var csredisDb7 = new CSRedis.CSRedisClient(csredisDb7Con.ToString());
  60. RedisHelperDb7.Initialization(csredisDb7);
  61. //
  62. RedisConfig csredisDb0Con = optConfigRedis.Value;
  63. csredisDb0Con.DefaultDatabase = 0;
  64. csredisDb0Con.Prefix = "";
  65. var csredisDb0 = new CSRedis.CSRedisClient(csredisDb0Con.ToString());
  66. RedisHelperDb0.Initialization(csredisDb0);
  67. #endregion
  68. #region 共享维智定位缓存
  69. // 维智定位缓存Redis Server
  70. // 172.19.42.56,47.100.200.105
  71. RedisConfig csredisWayzCon = optConfigRedis.Value;
  72. csredisWayzCon.DefaultDatabase = 10;
  73. csredisDb7Con.Prefix = "_GW_";
  74. #if DEBUG
  75. csredisWayzCon.Server = "192.168.12.127:8090";
  76. #else
  77. csredisWayzCon.Server = "172.19.42.56:8090";
  78. #endif
  79. var csredisWayz = new CSRedis.CSRedisClient(csredisWayzCon.ToString());
  80. RedisHelperWayz.Initialization(csredisWayz);
  81. #endregion
  82. }
  83. public override Task StartAsync(CancellationToken cancellationToken)
  84. {
  85. _logger.LogInformation("------StartAsync");
  86. _tokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
  87. // 创建消费者
  88. return base.StartAsync(cancellationToken);
  89. }
  90. public override Task StopAsync(CancellationToken cancellationToken)
  91. {
  92. _logger.LogInformation("------StopAsync");
  93. _tokenSource.Cancel();
  94. return base.StopAsync(cancellationToken);
  95. }
  96. protected override async Task ExecuteAsync(CancellationToken stoppingToken)
  97. {
  98. var kafkaConsumer = CreateKafkaConsumer();
  99. kafkaConsumer.Subscribe("topics.storage.iot.postion");
  100. var tasks = new List<Task>();
  101. //int i = 0;
  102. try
  103. {
  104. while (!stoppingToken.IsCancellationRequested)
  105. {
  106. #region 生产
  107. // await productPostionAsync();
  108. #endregion
  109. #region 消费
  110. // 并发处理 1000 个消息
  111. var concurrencyId = Guid.NewGuid().ToString("N")[^4..];
  112. using (_logger.BeginScope(new Dictionary<string, object> { ["RequestId"] = concurrencyId }))
  113. using (new CustomizeStopWatch(nameof(Worker), _logger))
  114. {
  115. //for (int i = 0; i < 2000; i++)
  116. //{
  117. // //var consumeResult = kafkaConsumer.Consume(stoppingToken);
  118. // var consumeResult = await Task.Run(() => kafkaConsumer.Consume(stoppingToken));
  119. // if (consumeResult != null)
  120. // {
  121. // tasks.Add(ProcessMessageAsync(consumeResult, kafkaConsumer));
  122. // }
  123. //}
  124. ////var consumeResult = kafkaConsumer.Consume(stoppingToken);
  125. ////if (consumeResult != null)
  126. ////{
  127. //// tasks.Add(ProcessMessageAsync(consumeResult, kafkaConsumer));
  128. ////}
  129. //// 并行等待所有任务完成
  130. //await Task.WhenAll(tasks);
  131. //tasks.Clear(); // 清空任务列表,以便下一个循环重新填充
  132. //Console.WriteLine("----------------------------------------------------------");
  133. var consumeResult = await Task.Run(() => kafkaConsumer.Consume(stoppingToken));
  134. if (consumeResult != null)
  135. {
  136. tasks.Add(ProcessMessageAsync(consumeResult, kafkaConsumer, concurrencyId));
  137. }
  138. if (tasks.Count>=2000)
  139. {
  140. // 并行等待所有任务完成
  141. await Task.WhenAll(tasks);
  142. tasks.Clear(); // 清空任务列表,以便下一个循环重新填充
  143. Console.WriteLine("----------------------------------------------------------");
  144. }
  145. }
  146. //var consumeResult = kafkaConsumer.Consume(stoppingToken);
  147. //if (consumeResult != null)
  148. //{
  149. // i++;
  150. // tasks.Add(ProcessMessageAsync(consumeResult, kafkaConsumer, i));
  151. //}
  152. //await Task.WhenAll(tasks);
  153. #endregion
  154. }
  155. }
  156. catch (OperationCanceledException)
  157. {
  158. _logger.LogWarning("Worker exit");
  159. }
  160. catch (Exception ex)
  161. {
  162. _logger.LogError($"An error occurred: {ex.Message}");
  163. }
  164. // await Task.WhenAll(tasks);
  165. }
  166. private async Task ProductPostionAsync()
  167. {
  168. Object wifi = JObject.Parse(@"{
  169. 'MessageId': '1775412145908567552',
  170. 'BusinessId': '862622050169778',
  171. 'ReceiveTime': '2024-04-03 14:36:55',
  172. 'TopicType': 512,
  173. 'MsgType': 517,
  174. 'TopicInfo': {
  175. 'deviceType': 'CustomCategory',
  176. 'iotId': '7EBPsoxlNmILKsUpKl0L000000',
  177. 'requestId': '987',
  178. 'productKey': 'a18mXM6Cvx8',
  179. 'gmtCreate': 1712126215803,
  180. 'deviceName': '862622050169778',
  181. 'items': {
  182. 'WIFI': {
  183. 'value': {
  184. 'mmac': '04:6b:25:8e:a1:81,-68,',
  185. 'macs': 'f4:2a:7d:ca:c8:c9,-72,|04:6b:25:8e:0a:91,-75,|54:84:dc:37:e8:70,-78,|54:84:dc:37:e8:18,-82,',
  186. 'IDType': 0,
  187. 'IDNumber': ''
  188. },
  189. 'time': 1712126215802
  190. }
  191. }
  192. },
  193. 'DetailData': {
  194. 'time': 1712126215802,
  195. 'value': {
  196. 'mmac': '04:6b:25:8e:a1:81,-68,',
  197. 'macs': 'f4:2a:7d:ca:c8:c9,-72,|04:6b:25:8e:0a:91,-75,|54:84:dc:37:e8:70,-78,|54:84:dc:37:e8:18,-82,',
  198. 'IDType': 0,
  199. 'IDNumber': ''
  200. }
  201. }
  202. }");
  203. Object wifiplus2 = JObject.Parse(@"{
  204. 'MessageId': '1775412188354926080',
  205. 'BusinessId': '861281060094145',
  206. 'ReceiveTime': '2024-04-03 14:37:05',
  207. 'TopicType': 512,
  208. 'MsgType': 514,
  209. 'TopicInfo': {
  210. 'deviceType': 'CustomCategory',
  211. 'iotId': 'yQw7bqBl8gSltErYIRVBg1gs00',
  212. 'requestId': '105',
  213. 'productKey': 'a18mXM6Cvx8',
  214. 'gmtCreate': 1712126225922,
  215. 'deviceName': '861281060094145',
  216. 'items': {
  217. 'WIFIPLUS2_0': {
  218. 'time': 1712124640009,
  219. 'value': [
  220. {
  221. 'bts': '460,00,30977,247196817,-41',
  222. 'smac': '',
  223. 'nearbts': '|460,00,30977,130,-17|460,00,30977,436,-12|460,00,30977,414,-8|460,00,30977,500,-41|460,00,30977,294,-47|460,00,30977,2,-39',
  224. 'imsi': '460077277322956',
  225. 'steps': '0',
  226. 'IDType': 0,
  227. 'network': 'LTE',
  228. 'mmac': '08:10:7b:df:3d:c8,-62,',
  229. 'datetime': '1712124640001',
  230. 'macs': 'ee:b9:70:83:8d:d2,-68,|3c:06:a7:16:a7:21,-68,|3c:06:a7:80:68:1a,-70,|3e:06:a7:a0:68:1a,-70,|08:10:78:91:72:4a,-77,|68:77:24:aa:40:c3,-81,|a8:e5:44:a5:c4:20,-84,',
  231. 'imei': '861281060094145',
  232. 'cdma': 0,
  233. 'IDNumber': ''
  234. }
  235. ]
  236. }
  237. }
  238. },
  239. 'DetailData': {
  240. 'time': 1712124640009,
  241. 'value': [
  242. {
  243. 'Mmac': '08:10:7b:df:3d:c8,-62,',
  244. 'Macs': 'ee:b9:70:83:8d:d2,-68,|3c:06:a7:16:a7:21,-68,|3c:06:a7:80:68:1a,-70,|3e:06:a7:a0:68:1a,-70,|08:10:78:91:72:4a,-77,|68:77:24:aa:40:c3,-81,|a8:e5:44:a5:c4:20,-84,',
  245. 'IdType': 0,
  246. 'IdNumber': '',
  247. 'Imei': '861281060094145',
  248. 'Smac': '',
  249. 'Imsi': '460077277322956',
  250. 'NearBts': '|460,00,30977,130,-17|460,00,30977,436,-12|460,00,30977,414,-8|460,00,30977,500,-41|460,00,30977,294,-47|460,00,30977,2,-39',
  251. 'Bts': '460,00,30977,247196817,-41',
  252. 'Cdma': 0,
  253. 'Network': 'LTE',
  254. 'Steps': '0',
  255. 'Datetime': '1712124640001'
  256. }
  257. ]
  258. }
  259. }");
  260. //Object lbs = JObject.Parse(@');
  261. //Object gps = JObject.Parse(@');
  262. for (int i = 0; i < 1000; i++)
  263. {
  264. if (i % 2 == 1)
  265. {
  266. var serialno = "862622050169778";
  267. var messageId = Guid.NewGuid().ToString();
  268. await _serviceMqProcess.ProcessIotPositionAsync(messageId, serialno, wifi);
  269. }
  270. else
  271. {
  272. var serialno = "861281060094145";
  273. var messageId = Guid.NewGuid().ToString();
  274. await _serviceMqProcess.ProcessIotPositionAsync(messageId, serialno, wifiplus2);
  275. }
  276. }
  277. }
  278. private async Task ProcessMessageAsync(ConsumeResult<Ignore, string> consumeResult, IConsumer<Ignore, string> kafkaConsumer,string concurrencyId)
  279. {
  280. var msg = JsonConvert.DeserializeObject<MessageModel>(consumeResult.Message.Value);
  281. string msgId = msg!.Data["MessageId"]?.ToObject<string>()!;
  282. string bizId = msg!.Data["BusinessId"]?.ToObject<string>()!;
  283. IotTopicType topic = msg!.Data["TopicType"]!.ToObject<IotTopicType>();
  284. int type = msg!.Data["MsgType"]!.ToObject<int>();
  285. object topicInfo = msg!.Data["TopicInfo"]!.ToObject<object>()!;
  286. object detailData = msg!.Data["DetailData"]!.ToObject<object>()!;
  287. var packge = new PackageMsgModel(msgId, bizId, topic, type, topicInfo, detailData);
  288. try
  289. {
  290. using (_logger.BeginScope(new Dictionary<string, object> { ["RequestId"] =$"{concurrencyId}-{msg!.MessageId}" }))
  291. using (new CustomizeStopWatch(IotMessageTypeUtils.TryToDescription(type), _logger))
  292. {
  293. var dict = JsonConvert.DeserializeObject<Dictionary<string, object>>(msg!.Data["TopicInfo"]?["items"]?.ToString()!);
  294. var key = dict?.Keys?.FirstOrDefault() + "";
  295. switch (key)
  296. {
  297. case "GPS": //处理gps位置信息上报
  298. {
  299. //Console.WriteLine("GPS");
  300. var data = JsonConvert.DeserializeObject<PropertyItemModel<GpsInfoModel>>(dict?["GPS"] + "");
  301. //if (Utils.GetTimeDeviationMilliseconds(data!.Time) > _configAppsettings.DeviceTimeDeviationMillieconds) throw new ArgumentException($"设备消息时间戳异常[{data.Time}]");
  302. _resolverGpsPosition.SetResolveInfo(packge);
  303. await _resolverGpsPosition.ExecuteMessageAsync().ConfigureAwait(false);
  304. kafkaConsumer.Commit(consumeResult);
  305. break;
  306. //return new PackageMsgModel(msg.MessageId, topicInfo.DeviceName, IotTopicType.Property, (int)IotMessagePropertyType.GpsPosition, topicInfo, data);
  307. }
  308. case "WIFI": //处理gps位置信息上报
  309. {
  310. var data = JsonConvert.DeserializeObject<PropertyItemModel<WifiInfoModel>>(dict?["WIFI"] + "");
  311. //if (Utils.GetTimeDeviationMilliseconds(data!.Time) > _configAppsettings.DeviceTimeDeviationMillieconds) throw new ArgumentException($"设备消息时间戳异常[{data.Time}]");
  312. _resolverWifiPosition.SetResolveInfo(packge);
  313. await _resolverWifiPosition.ExecuteMessageAsync().ConfigureAwait(false);
  314. kafkaConsumer.Commit(consumeResult);
  315. break;
  316. }
  317. case "WIFIPLUS2_0": //处理gps位置信息上报
  318. {
  319. var data = JsonConvert.DeserializeObject<PropertyItemModel<List<EachWifiPlus2Model>>>(dict?["WIFIPLUS2_0"] + "");
  320. //if (Utils.GetTimeDeviationMilliseconds(data!.Time) > _configAppsettings.DeviceTimeDeviationMillieconds) throw new ArgumentException($"设备消息时间戳异常[{data.Time}]");
  321. _resolverWifiPlus2Position.SetResolveInfo(packge);
  322. await _resolverWifiPlus2Position.ExecuteMessageAsync().ConfigureAwait(false);
  323. kafkaConsumer.Commit(consumeResult);
  324. break;
  325. }
  326. case "LBS": //处理gps位置信息上报
  327. {
  328. var data = JsonConvert.DeserializeObject<PropertyItemModel<LbsInfoModel>>(dict?["LBS"] + "");
  329. //if (Utils.GetTimeDeviationMilliseconds(data!.Time) > _configAppsettings.DeviceTimeDeviationMillieconds) throw new ArgumentException($"设备消息时间戳异常[{data.Time}]");
  330. //if (data.Value.Cdma == 0)
  331. //{
  332. // return new PackageMsgModel(msg.MessageId, topicInfo.DeviceName, IotTopicType.Property, (int)IotMessagePropertyType.LbsGsmPosition, topicInfo, data);
  333. //}
  334. //else if (data.Value.Cdma == 1)
  335. //{
  336. // return new PackageMsgModel(msg.MessageId, topicInfo.DeviceName, IotTopicType.Property, (int)IotMessagePropertyType.LbsCdmaPosition, topicInfo, data);
  337. //}
  338. _resolverLbsGsmPosition.SetResolveInfo(packge);
  339. await _resolverLbsGsmPosition.ExecuteMessageAsync().ConfigureAwait(false);
  340. kafkaConsumer.Commit(consumeResult);
  341. break;
  342. }
  343. }
  344. }
  345. }
  346. catch (Exception ex)
  347. {
  348. _logger.LogError($"解析定位消息发生异常 messageId: {msg!.MessageId}, Message: {ex.Message}, body: {JsonConvert.SerializeObject(msg)}");
  349. }
  350. await Task.CompletedTask;
  351. }
  352. private async Task ProcessMessageAsync6(ConsumeResult<Ignore, string> consumeResult, IConsumer<Ignore, string> kafkaConsumer)
  353. {
  354. var msg = JsonConvert.DeserializeObject<MessageModel>(consumeResult.Message.Value);
  355. string msgId = msg!.Data["MessageId"]?.ToObject<string>()!;
  356. string bizId = msg!.Data["BusinessId"]?.ToObject<string>()!;
  357. IotTopicType topic = msg!.Data["TopicType"]!.ToObject<IotTopicType>();
  358. int type = msg!.Data["MsgType"]!.ToObject<int>();
  359. object topicInfo = msg!.Data["TopicInfo"]!.ToObject<object>()!;
  360. object detailData = msg!.Data["DetailData"]!.ToObject<object>()!;
  361. var packge = new PackageMsgModel(msgId, bizId, topic, type, topicInfo, detailData);
  362. try
  363. {
  364. using (_logger.BeginScope(new Dictionary<string, object> { ["RequestId"] = msg!.MessageId }))
  365. using (new CustomizeStopWatch(IotMessageTypeUtils.TryToDescription(type), _logger))
  366. {
  367. var dict = JsonConvert.DeserializeObject<Dictionary<string, object>>(msg!.Data["TopicInfo"]?["items"]?.ToString()!);
  368. var key = dict?.Keys?.FirstOrDefault() + "";
  369. switch (key)
  370. {
  371. case "GPS": //处理gps位置信息上报
  372. {
  373. //Console.WriteLine("GPS");
  374. var data = JsonConvert.DeserializeObject<PropertyItemModel<GpsInfoModel>>(dict?["GPS"] + "");
  375. //if (Utils.GetTimeDeviationMilliseconds(data!.Time) > _configAppsettings.DeviceTimeDeviationMillieconds) throw new ArgumentException($"设备消息时间戳异常[{data.Time}]");
  376. _resolverGpsPosition.SetResolveInfo(packge);
  377. await _resolverGpsPosition.ExecuteMessageAsync().ConfigureAwait(false);
  378. break;
  379. //return new PackageMsgModel(msg.MessageId, topicInfo.DeviceName, IotTopicType.Property, (int)IotMessagePropertyType.GpsPosition, topicInfo, data);
  380. }
  381. case "WIFI": //处理gps位置信息上报
  382. {
  383. var data = JsonConvert.DeserializeObject<PropertyItemModel<WifiInfoModel>>(dict?["WIFI"] + "");
  384. //if (Utils.GetTimeDeviationMilliseconds(data!.Time) > _configAppsettings.DeviceTimeDeviationMillieconds) throw new ArgumentException($"设备消息时间戳异常[{data.Time}]");
  385. _resolverWifiPosition.SetResolveInfo(packge);
  386. await _resolverWifiPosition.ExecuteMessageAsync().ConfigureAwait(false);
  387. break;
  388. }
  389. case "WIFIPLUS2_0": //处理gps位置信息上报
  390. {
  391. var data = JsonConvert.DeserializeObject<PropertyItemModel<List<EachWifiPlus2Model>>>(dict?["WIFIPLUS2_0"] + "");
  392. //if (Utils.GetTimeDeviationMilliseconds(data!.Time) > _configAppsettings.DeviceTimeDeviationMillieconds) throw new ArgumentException($"设备消息时间戳异常[{data.Time}]");
  393. _resolverWifiPlus2Position.SetResolveInfo(packge);
  394. await _resolverWifiPlus2Position.ExecuteMessageAsync().ConfigureAwait(false);
  395. break;
  396. }
  397. case "LBS": //处理gps位置信息上报
  398. {
  399. var data = JsonConvert.DeserializeObject<PropertyItemModel<LbsInfoModel>>(dict?["LBS"] + "");
  400. //if (Utils.GetTimeDeviationMilliseconds(data!.Time) > _configAppsettings.DeviceTimeDeviationMillieconds) throw new ArgumentException($"设备消息时间戳异常[{data.Time}]");
  401. //if (data.Value.Cdma == 0)
  402. //{
  403. // return new PackageMsgModel(msg.MessageId, topicInfo.DeviceName, IotTopicType.Property, (int)IotMessagePropertyType.LbsGsmPosition, topicInfo, data);
  404. //}
  405. //else if (data.Value.Cdma == 1)
  406. //{
  407. // return new PackageMsgModel(msg.MessageId, topicInfo.DeviceName, IotTopicType.Property, (int)IotMessagePropertyType.LbsCdmaPosition, topicInfo, data);
  408. //}
  409. _resolverLbsGsmPosition.SetResolveInfo(packge);
  410. await _resolverLbsGsmPosition.ExecuteMessageAsync().ConfigureAwait(false);
  411. break;
  412. }
  413. }
  414. }
  415. }
  416. catch (Exception ex)
  417. {
  418. _logger.LogError($"解析定位消息发生异常 messageId: {msg!.MessageId}, Message: {ex.Message}, body: {JsonConvert.SerializeObject(msg)}");
  419. }
  420. await Task.CompletedTask;
  421. }
  422. private async Task ProcessMessageAsync5(ConsumeResult<Ignore, string> consumeResult, IConsumer<Ignore, string> kafkaConsumer)
  423. {
  424. var msg = JsonConvert.DeserializeObject<MessageModel>(consumeResult.Message.Value);
  425. try
  426. {
  427. using (_logger.BeginScope(new Dictionary<string, object> { ["RequestId"] = msg!.MessageId }))
  428. //using (new CustomizeStopWatch(nameof(ProcessMessageAsync), _logger))
  429. {
  430. _factoryResolver.Resolver(msg);
  431. }
  432. }
  433. catch (Exception ex)
  434. {
  435. _logger.LogError($"解析Property主题消息发生异常 messageId: {msg!.MessageId}, Message: {ex.Message}, body: {JsonConvert.SerializeObject(msg)}");
  436. }
  437. await Task.CompletedTask;
  438. }
  439. private async Task ProcessMessageAsync4(ConsumeResult<Ignore, string> consumeResult, IConsumer<Ignore, string> kafkaConsumer)
  440. {
  441. var msg = JsonConvert.DeserializeObject<MessageModel>(consumeResult.Message.Value);
  442. try
  443. {
  444. using (_logger.BeginScope(new Dictionary<string, object> { ["RequestId"] = msg!.MessageId }))
  445. //using (new CustomizeStopWatch(nameof(ProcessMessageAsync), _logger))
  446. {
  447. //var positionType = msg!.Data;
  448. string msgId = msg!.Data["MessageId"]?.ToObject<string>()!;
  449. string bizId = msg!.Data["BusinessId"]?.ToObject<string>()!;
  450. IotTopicType topic = msg!.Data["TopicType"]!.ToObject<IotTopicType>();
  451. int type = msg!.Data["MsgType"]!.ToObject<int>();
  452. object topicInfo = msg!.Data["TopicInfo"]!.ToObject<object>()!;
  453. object detailData = msg!.Data["DetailData"]!.ToObject<object>()!;
  454. var packge = new PackageMsgModel(msgId, bizId, topic, type, topicInfo, detailData);
  455. //return new PackageMsgModel(msgId, bizId, topic, type, topicInfo, detailData);
  456. var dict = JsonConvert.DeserializeObject<Dictionary<string, object>>(msg!.Data["TopicInfo"]?["items"]?.ToString()!);
  457. var key = dict?.Keys?.FirstOrDefault() + "";
  458. switch (key)
  459. {
  460. case "GPS": //处理gps位置信息上报
  461. {
  462. Console.WriteLine("GPS");
  463. var data = JsonConvert.DeserializeObject<PropertyItemModel<GpsInfoModel>>(dict?["GPS"] + "");
  464. if (Utils.GetTimeDeviationMilliseconds(data!.Time) > _configAppsettings.DeviceTimeDeviationMillieconds) throw new ArgumentException($"设备消息时间戳异常[{data.Time}]");
  465. break;
  466. //return new PackageMsgModel(msg.MessageId, topicInfo.DeviceName, IotTopicType.Property, (int)IotMessagePropertyType.GpsPosition, topicInfo, data);
  467. }
  468. case "WIFI": //处理gps位置信息上报
  469. {
  470. var data = JsonConvert.DeserializeObject<PropertyItemModel<WifiInfoModel>>(dict?["WIFI"] + "");
  471. if (Utils.GetTimeDeviationMilliseconds(data!.Time) > _configAppsettings.DeviceTimeDeviationMillieconds) throw new ArgumentException($"设备消息时间戳异常[{data.Time}]");
  472. Console.WriteLine("WIFI");
  473. break;
  474. }
  475. case "WIFIPLUS2_0": //处理gps位置信息上报
  476. {
  477. var data = JsonConvert.DeserializeObject<PropertyItemModel<List<EachWifiPlus2Model>>>(dict?["WIFIPLUS2_0"] + "");
  478. if (Utils.GetTimeDeviationMilliseconds(data!.Time) > _configAppsettings.DeviceTimeDeviationMillieconds) throw new ArgumentException($"设备消息时间戳异常[{data.Time}]");
  479. Console.WriteLine("WIFIPLUS2_0");
  480. break;
  481. }
  482. case "LBS": //处理gps位置信息上报
  483. {
  484. var data = JsonConvert.DeserializeObject<PropertyItemModel<LbsInfoModel>>(dict?["LBS"] + "");
  485. if (Utils.GetTimeDeviationMilliseconds(data!.Time) > _configAppsettings.DeviceTimeDeviationMillieconds) throw new ArgumentException($"设备消息时间戳异常[{data.Time}]");
  486. //if (data.Value.Cdma == 0)
  487. //{
  488. // return new PackageMsgModel(msg.MessageId, topicInfo.DeviceName, IotTopicType.Property, (int)IotMessagePropertyType.LbsGsmPosition, topicInfo, data);
  489. //}
  490. //else if (data.Value.Cdma == 1)
  491. //{
  492. // return new PackageMsgModel(msg.MessageId, topicInfo.DeviceName, IotTopicType.Property, (int)IotMessagePropertyType.LbsCdmaPosition, topicInfo, data);
  493. //}
  494. Console.WriteLine("WIFIPLUS2_0");
  495. break;
  496. }
  497. }
  498. }
  499. }
  500. catch (Exception ex)
  501. {
  502. _logger.LogError($"解析Property主题消息发生异常 messageId: {msg!.MessageId}, Message: {ex.Message}, body: {JsonConvert.SerializeObject(msg)}");
  503. }
  504. await Task.CompletedTask;
  505. }
  506. private async Task ProcessMessageAsync3(ConsumeResult<Ignore, string> consumeResult, IConsumer<Ignore, string> kafkaConsumer)
  507. {
  508. #region
  509. Object data = JObject.Parse(@"{
  510. 'timestamp': 1670231477000,
  511. 'id': '679e9043-14fe-44df-aba6-b2fd62592fea',
  512. 'asset': {
  513. 'id': '679e9043-14fe-44df-aba6-b2fd62592fea',
  514. 'manufacturer': 'xiaomi',
  515. 'imeiMd5': '1be14c6210b3115f',
  516. 'uniqueId': '8f95204d-a60e-40bb-b8be-45a1ab3bef2a'
  517. },
  518. 'location': {
  519. 'Wifis': [
  520. {
  521. 'timestamp': 1515743846504,
  522. 'signalStrength': 40,
  523. 'macAddress': '6a:77:24:27:9c:04',
  524. 'ssid': '',
  525. 'frequency': 2412,
  526. 'channel': 0,
  527. 'connected': true
  528. },
  529. {
  530. 'timestamp': 1515743846504,
  531. 'signalStrength': 60,
  532. 'macAddress': 'ec:f0:fe:1e:e1:a8',
  533. 'ssid': 'Z VENTURES 9F',
  534. 'frequency': 2457,
  535. 'channel': 0
  536. },
  537. {
  538. 'timestamp': 1515743846504,
  539. 'signalStrength': 60,
  540. 'macAddress': '54:a7:03:b5:a5:5e',
  541. 'ssid': 'Z VENTURES 9F',
  542. 'frequency': 2457,
  543. 'channel': 0
  544. },
  545. {
  546. 'timestamp': 1515743846504,
  547. 'signalStrength': 60,
  548. 'macAddress': 'ec:f8:eb:b6:89:81',
  549. 'ssid': 'Z VENTURES 9F',
  550. 'frequency': 2457,
  551. 'channel': 0
  552. },
  553. {
  554. 'timestamp': 1515743846504,
  555. 'signalStrength': 60,
  556. 'macAddress': 'a0:69:d9:c1:6c:8b',
  557. 'ssid': 'Z VENTURES 9F',
  558. 'frequency': 2457,
  559. 'channel': 0
  560. },
  561. {
  562. 'timestamp': 1515743846504,
  563. 'signalStrength': 60,
  564. 'macAddress': '64:64:4a:d5:f5:ca',
  565. 'ssid': 'Z VENTURES 9F',
  566. 'frequency': 2457,
  567. 'channel': 0
  568. },
  569. {
  570. 'timestamp': 1515743846504,
  571. 'signalStrength': 60,
  572. 'macAddress': 'c8:5b:a0:f8:f4:6c',
  573. 'ssid': 'Z VENTURES 9F',
  574. 'frequency': 2457,
  575. 'channel': 0
  576. },
  577. {
  578. 'timestamp': 1515743846504,
  579. 'signalStrength': 60,
  580. 'macAddress': '84:74:60:bf:fd:20',
  581. 'ssid': 'Z VENTURES 9F',
  582. 'frequency': 2457,
  583. 'channel': 0
  584. }
  585. ]
  586. }
  587. }");
  588. // 将对象转换为 JSON 字符串
  589. var jsonData = JsonConvert.SerializeObject(data);
  590. // 创建 StringContent 对象,将 JSON 字符串传递给它
  591. var content = new StringContent(jsonData, Encoding.UTF8, "application/json");
  592. #endregion
  593. var msg = JsonConvert.DeserializeObject<MessageModel>(consumeResult.Message.Value);
  594. using (_logger.BeginScope(new Dictionary<string, object> { ["RequestId"] = msg!.MessageId }))
  595. //using (new CustomizeStopWatch(nameof(ProcessMessageAsync), _logger))
  596. {
  597. using var httpClient = new HttpClient();
  598. using var request = new HttpRequestMessage
  599. {
  600. RequestUri = new Uri($"https://api.newayz.com/location/hub/v1/track_points?access_key=OK73jLa48dU9V3EvNN1RLpdjcVOdIEcn&field_masks=location.position"),
  601. Method = HttpMethod.Post,
  602. // Version = new Version(2, 0)
  603. Content = content
  604. };
  605. using var tokenResponse = await httpClient.SendAsync(request);
  606. tokenResponse.EnsureSuccessStatusCode();
  607. var token = await tokenResponse.Content.ReadAsStringAsync();
  608. //await Task.Delay(0);
  609. //Thread.Sleep(0);
  610. //Console.WriteLine($"{Guid.NewGuid().ToString()}-- {token[^10..]}");
  611. }
  612. }
  613. private async Task ProcessMessageAsync2(ConsumeResult<Ignore, string> consumeResult, IConsumer<Ignore, string> kafkaConsumer)
  614. {
  615. // using (_logger.BeginScope(new Dictionary<string, object> { ["RequestId"] = i }))
  616. /**
  617. using (new CustomizeStopWatch(nameof(ProcessMessageAsync), _logger))
  618. {
  619. //if (i==800)
  620. //{
  621. // Console.WriteLine($"第{i}消息:{consumeResult.Message.Value.Substring(0, 10)}");
  622. //}
  623. //await Task.CompletedTask;
  624. Object data = JObject.Parse(@"{
  625. 'timestamp': 1670231477000,
  626. 'id': '679e9043-14fe-44df-aba6-b2fd62592fea',
  627. 'asset': {
  628. 'id': '679e9043-14fe-44df-aba6-b2fd62592fea',
  629. 'manufacturer': 'xiaomi',
  630. 'imeiMd5': '1be14c6210b3115f',
  631. 'uniqueId': '8f95204d-a60e-40bb-b8be-45a1ab3bef2a'
  632. },
  633. 'location': {
  634. 'Wifis': [
  635. {
  636. 'timestamp': 1515743846504,
  637. 'signalStrength': 40,
  638. 'macAddress': '6a:77:24:27:9c:04',
  639. 'ssid': '',
  640. 'frequency': 2412,
  641. 'channel': 0,
  642. 'connected': true
  643. },
  644. {
  645. 'timestamp': 1515743846504,
  646. 'signalStrength': 60,
  647. 'macAddress': 'ec:f0:fe:1e:e1:a8',
  648. 'ssid': 'Z VENTURES 9F',
  649. 'frequency': 2457,
  650. 'channel': 0
  651. },
  652. {
  653. 'timestamp': 1515743846504,
  654. 'signalStrength': 60,
  655. 'macAddress': '54:a7:03:b5:a5:5e',
  656. 'ssid': 'Z VENTURES 9F',
  657. 'frequency': 2457,
  658. 'channel': 0
  659. },
  660. {
  661. 'timestamp': 1515743846504,
  662. 'signalStrength': 60,
  663. 'macAddress': 'ec:f8:eb:b6:89:81',
  664. 'ssid': 'Z VENTURES 9F',
  665. 'frequency': 2457,
  666. 'channel': 0
  667. },
  668. {
  669. 'timestamp': 1515743846504,
  670. 'signalStrength': 60,
  671. 'macAddress': 'a0:69:d9:c1:6c:8b',
  672. 'ssid': 'Z VENTURES 9F',
  673. 'frequency': 2457,
  674. 'channel': 0
  675. },
  676. {
  677. 'timestamp': 1515743846504,
  678. 'signalStrength': 60,
  679. 'macAddress': '64:64:4a:d5:f5:ca',
  680. 'ssid': 'Z VENTURES 9F',
  681. 'frequency': 2457,
  682. 'channel': 0
  683. },
  684. {
  685. 'timestamp': 1515743846504,
  686. 'signalStrength': 60,
  687. 'macAddress': 'c8:5b:a0:f8:f4:6c',
  688. 'ssid': 'Z VENTURES 9F',
  689. 'frequency': 2457,
  690. 'channel': 0
  691. },
  692. {
  693. 'timestamp': 1515743846504,
  694. 'signalStrength': 60,
  695. 'macAddress': '84:74:60:bf:fd:20',
  696. 'ssid': 'Z VENTURES 9F',
  697. 'frequency': 2457,
  698. 'channel': 0
  699. }
  700. ]
  701. }
  702. }");
  703. // 将对象转换为 JSON 字符串
  704. var jsonData = JsonConvert.SerializeObject(data);
  705. // 创建 StringContent 对象,将 JSON 字符串传递给它
  706. var content = new StringContent(jsonData, Encoding.UTF8, "application/json");
  707. using var httpClient = new HttpClient();
  708. using var request = new HttpRequestMessage
  709. {
  710. RequestUri = new Uri($"https://api.newayz.com/location/hub/v1/track_points?access_key=OK73jLa48dU9V3EvNN1RLpdjcVOdIEcn&field_masks=location.position"),
  711. Method = HttpMethod.Post,
  712. // Version = new Version(2, 0)
  713. Content = content
  714. };
  715. using var tokenResponse = await httpClient.SendAsync(request);
  716. tokenResponse.EnsureSuccessStatusCode();
  717. var token = await tokenResponse.Content.ReadAsStringAsync();
  718. Console.WriteLine($"Successfully authenticated. token {token}");
  719. }
  720. */
  721. Object data = JObject.Parse(@"{
  722. 'timestamp': 1670231477000,
  723. 'id': '679e9043-14fe-44df-aba6-b2fd62592fea',
  724. 'asset': {
  725. 'id': '679e9043-14fe-44df-aba6-b2fd62592fea',
  726. 'manufacturer': 'xiaomi',
  727. 'imeiMd5': '1be14c6210b3115f',
  728. 'uniqueId': '8f95204d-a60e-40bb-b8be-45a1ab3bef2a'
  729. },
  730. 'location': {
  731. 'Wifis': [
  732. {
  733. 'timestamp': 1515743846504,
  734. 'signalStrength': 40,
  735. 'macAddress': '6a:77:24:27:9c:04',
  736. 'ssid': '',
  737. 'frequency': 2412,
  738. 'channel': 0,
  739. 'connected': true
  740. },
  741. {
  742. 'timestamp': 1515743846504,
  743. 'signalStrength': 60,
  744. 'macAddress': 'ec:f0:fe:1e:e1:a8',
  745. 'ssid': 'Z VENTURES 9F',
  746. 'frequency': 2457,
  747. 'channel': 0
  748. },
  749. {
  750. 'timestamp': 1515743846504,
  751. 'signalStrength': 60,
  752. 'macAddress': '54:a7:03:b5:a5:5e',
  753. 'ssid': 'Z VENTURES 9F',
  754. 'frequency': 2457,
  755. 'channel': 0
  756. },
  757. {
  758. 'timestamp': 1515743846504,
  759. 'signalStrength': 60,
  760. 'macAddress': 'ec:f8:eb:b6:89:81',
  761. 'ssid': 'Z VENTURES 9F',
  762. 'frequency': 2457,
  763. 'channel': 0
  764. },
  765. {
  766. 'timestamp': 1515743846504,
  767. 'signalStrength': 60,
  768. 'macAddress': 'a0:69:d9:c1:6c:8b',
  769. 'ssid': 'Z VENTURES 9F',
  770. 'frequency': 2457,
  771. 'channel': 0
  772. },
  773. {
  774. 'timestamp': 1515743846504,
  775. 'signalStrength': 60,
  776. 'macAddress': '64:64:4a:d5:f5:ca',
  777. 'ssid': 'Z VENTURES 9F',
  778. 'frequency': 2457,
  779. 'channel': 0
  780. },
  781. {
  782. 'timestamp': 1515743846504,
  783. 'signalStrength': 60,
  784. 'macAddress': 'c8:5b:a0:f8:f4:6c',
  785. 'ssid': 'Z VENTURES 9F',
  786. 'frequency': 2457,
  787. 'channel': 0
  788. },
  789. {
  790. 'timestamp': 1515743846504,
  791. 'signalStrength': 60,
  792. 'macAddress': '84:74:60:bf:fd:20',
  793. 'ssid': 'Z VENTURES 9F',
  794. 'frequency': 2457,
  795. 'channel': 0
  796. }
  797. ]
  798. }
  799. }");
  800. // 将对象转换为 JSON 字符串
  801. var jsonData = JsonConvert.SerializeObject(data);
  802. // 创建 StringContent 对象,将 JSON 字符串传递给它
  803. var content = new StringContent(jsonData, Encoding.UTF8, "application/json");
  804. using var httpClient = new HttpClient();
  805. using var request = new HttpRequestMessage
  806. {
  807. RequestUri = new Uri($"https://api.newayz.com/location/hub/v1/track_points?access_key=OK73jLa48dU9V3EvNN1RLpdjcVOdIEcn&field_masks=location.position"),
  808. Method = HttpMethod.Post,
  809. // Version = new Version(2, 0)
  810. Content = content
  811. };
  812. using var tokenResponse = await httpClient.SendAsync(request);
  813. tokenResponse.EnsureSuccessStatusCode();
  814. var token = await tokenResponse.Content.ReadAsStringAsync();
  815. Console.WriteLine($"{Guid.NewGuid().ToString()}-- {token[^10..]}");
  816. }
  817. private async Task ProcessMessageAsync(ConsumeResult<Ignore, string> consumeResult, IConsumer<Ignore, string> kafkaConsumer,int i)
  818. {
  819. using (_logger.BeginScope(new Dictionary<string, object> { ["RequestId"] = i }))
  820. using (new CustomizeStopWatch(nameof(ProcessMessageAsync), _logger))
  821. {
  822. //if (i==800)
  823. //{
  824. // Console.WriteLine($"第{i}消息:{consumeResult.Message.Value.Substring(0, 10)}");
  825. //}
  826. //await Task.CompletedTask;
  827. Object data = JObject.Parse(@"{
  828. 'timestamp': 1670231477000,
  829. 'id': '679e9043-14fe-44df-aba6-b2fd62592fea',
  830. 'asset': {
  831. 'id': '679e9043-14fe-44df-aba6-b2fd62592fea',
  832. 'manufacturer': 'xiaomi',
  833. 'imeiMd5': '1be14c6210b3115f',
  834. 'uniqueId': '8f95204d-a60e-40bb-b8be-45a1ab3bef2a'
  835. },
  836. 'location': {
  837. 'Wifis': [
  838. {
  839. 'timestamp': 1515743846504,
  840. 'signalStrength': 40,
  841. 'macAddress': '6a:77:24:27:9c:04',
  842. 'ssid': '',
  843. 'frequency': 2412,
  844. 'channel': 0,
  845. 'connected': true
  846. },
  847. {
  848. 'timestamp': 1515743846504,
  849. 'signalStrength': 60,
  850. 'macAddress': 'ec:f0:fe:1e:e1:a8',
  851. 'ssid': 'Z VENTURES 9F',
  852. 'frequency': 2457,
  853. 'channel': 0
  854. },
  855. {
  856. 'timestamp': 1515743846504,
  857. 'signalStrength': 60,
  858. 'macAddress': '54:a7:03:b5:a5:5e',
  859. 'ssid': 'Z VENTURES 9F',
  860. 'frequency': 2457,
  861. 'channel': 0
  862. },
  863. {
  864. 'timestamp': 1515743846504,
  865. 'signalStrength': 60,
  866. 'macAddress': 'ec:f8:eb:b6:89:81',
  867. 'ssid': 'Z VENTURES 9F',
  868. 'frequency': 2457,
  869. 'channel': 0
  870. },
  871. {
  872. 'timestamp': 1515743846504,
  873. 'signalStrength': 60,
  874. 'macAddress': 'a0:69:d9:c1:6c:8b',
  875. 'ssid': 'Z VENTURES 9F',
  876. 'frequency': 2457,
  877. 'channel': 0
  878. },
  879. {
  880. 'timestamp': 1515743846504,
  881. 'signalStrength': 60,
  882. 'macAddress': '64:64:4a:d5:f5:ca',
  883. 'ssid': 'Z VENTURES 9F',
  884. 'frequency': 2457,
  885. 'channel': 0
  886. },
  887. {
  888. 'timestamp': 1515743846504,
  889. 'signalStrength': 60,
  890. 'macAddress': 'c8:5b:a0:f8:f4:6c',
  891. 'ssid': 'Z VENTURES 9F',
  892. 'frequency': 2457,
  893. 'channel': 0
  894. },
  895. {
  896. 'timestamp': 1515743846504,
  897. 'signalStrength': 60,
  898. 'macAddress': '84:74:60:bf:fd:20',
  899. 'ssid': 'Z VENTURES 9F',
  900. 'frequency': 2457,
  901. 'channel': 0
  902. }
  903. ]
  904. }
  905. }");
  906. // 将对象转换为 JSON 字符串
  907. var jsonData = JsonConvert.SerializeObject(data);
  908. // 创建 StringContent 对象,将 JSON 字符串传递给它
  909. var content = new StringContent(jsonData, Encoding.UTF8, "application/json");
  910. using var httpClient = new HttpClient();
  911. using var request = new HttpRequestMessage
  912. {
  913. RequestUri = new Uri($"https://api.newayz.com/location/hub/v1/track_points?access_key=OK73jLa48dU9V3EvNN1RLpdjcVOdIEcn&field_masks=location.position"),
  914. Method = HttpMethod.Post,
  915. // Version = new Version(2, 0)
  916. Content= content
  917. };
  918. using var tokenResponse = await httpClient.SendAsync(request);
  919. tokenResponse.EnsureSuccessStatusCode();
  920. var token = await tokenResponse.Content.ReadAsStringAsync();
  921. Console.WriteLine($"Successfully authenticated. token {token}");
  922. }
  923. }
  924. private IConsumer<Ignore, string> CreateKafkaConsumer()
  925. {
  926. //var partitionIndex = 0;
  927. var collection = new List<int>();
  928. // 获取注册的kafka消费者
  929. var consumerConfig = new ConsumerConfig
  930. {
  931. GroupId = "iot.position",
  932. BootstrapServers = _configService.MqServerAddress,
  933. AutoOffsetReset = AutoOffsetReset.Earliest,
  934. EnableAutoCommit = false, // 关闭自动提交偏移量
  935. CancellationDelayMaxMs = 1//set CancellationDelayMaxMs
  936. };
  937. return new ConsumerBuilder<Ignore, string>(consumerConfig)
  938. .SetErrorHandler((_, e) =>
  939. {
  940. Console.WriteLine($"消费者创建出错,代码:{e.Code} |原因: {e.Reason}");
  941. _logger.LogInformation($"消费者创建出错,代码:{e.Code} |原因: {e.Reason}");
  942. })
  943. .SetPartitionsAssignedHandler((c, partitions) =>
  944. {
  945. //// 在这里手动指定要消费的分区
  946. //var partitionsToConsume = new List<TopicPartitionOffset>
  947. //{
  948. // new TopicPartitionOffset("topics.storage.test_env_db", partitionIndex, Offset.Unset)
  949. //};
  950. ////c.Assign(partitionsToConsume);
  951. //Console.WriteLine($"Assigned partitions: {string.Join(", ", partitionsToConsume)}");
  952. //return partitionsToConsume;
  953. })
  954. .SetPartitionsRevokedHandler((c, partitions) =>
  955. {
  956. })
  957. .Build();
  958. }
  959. }
  960. }