Nie możesz wybrać więcej, niż 25 tematów Tematy muszą się zaczynać od litery lub cyfry, mogą zawierać myślniki ('-') i mogą mieć do 35 znaków.

10 miesięcy temu
10 miesięcy temu
10 miesięcy temu
10 miesięcy temu
10 miesięcy temu
10 miesięcy temu
10 miesięcy temu
10 miesięcy temu
10 miesięcy temu
10 miesięcy temu
10 miesięcy temu
10 miesięcy temu
10 miesięcy temu
1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033
  1. using Confluent.Kafka;
  2. using GpsCardGatewayPosition.Common;
  3. using GpsCardGatewayPosition.Model.Config;
  4. using GpsCardGatewayPosition.Model.Enum;
  5. using GpsCardGatewayPosition.Model.IoT;
  6. using GpsCardGatewayPosition.Service.Cache;
  7. using GpsCardGatewayPosition.Service.MqProducer;
  8. using GpsCardGatewayPosition.Service.MqProducer.Model;
  9. using GpsCardGatewayPosition.Service.Resolver.Factory;
  10. using GpsCardGatewayPosition.Service.Resolver.Property;
  11. using GpsCardGatewayPosition.Service.Resolver.Property.Dto;
  12. using Microsoft.Extensions.Hosting;
  13. using Microsoft.Extensions.Logging;
  14. using Microsoft.Extensions.Options;
  15. using Newtonsoft.Json;
  16. using Newtonsoft.Json.Linq;
  17. using System;
  18. using System.Collections.Generic;
  19. using System.Linq;
  20. using System.Net;
  21. using System.Reflection.Metadata;
  22. using System.Text;
  23. using System.Threading.Tasks;
  24. using System.Web;
  25. using static Confluent.Kafka.ConfigPropertyNames;
  26. namespace GpsCardGatewayPosition.Postion
  27. {
  28. public class Worker : BackgroundService
  29. {
  30. private readonly ILogger<Worker> _logger;
  31. private CancellationTokenSource _tokenSource = default!;
  32. private readonly MqProcessLogic _serviceMqProcess;
  33. private readonly ServiceConfig _configService;
  34. private readonly AppsettingsConfig _configAppsettings;
  35. private readonly ResolverFactory _factoryResolver;
  36. private readonly WifiPositionResolver _resolverWifiPosition;
  37. private readonly WifiPlus2PositionResolver _resolverWifiPlus2Position;
  38. private readonly GpsPositionResolver _resolverGpsPosition;
  39. private readonly LbsGsmPositionResolver _resolverLbsGsmPosition;
  40. public Worker(ILogger<Worker> logger,IOptions<ServiceConfig> _optConfigService, IOptions<RedisConfig> optConfigRedis, IOptions<AppsettingsConfig> optConfigAppsettings, ResolverFactory resolverFactory, MqProcessLogic serviceMqProcess,
  41. WifiPositionResolver wifiPositionResolver, WifiPlus2PositionResolver wifiPlus2PositionResolver, GpsPositionResolver gpsPositionResolver,LbsGsmPositionResolver lbsGsmPositionResolver)
  42. {
  43. _configService = _optConfigService.Value;
  44. _configAppsettings = optConfigAppsettings.Value;
  45. _serviceMqProcess = serviceMqProcess;
  46. _logger = logger;
  47. _factoryResolver = resolverFactory;
  48. _resolverWifiPosition = wifiPositionResolver;
  49. _resolverWifiPlus2Position = wifiPlus2PositionResolver;
  50. _resolverGpsPosition = gpsPositionResolver;
  51. _resolverLbsGsmPosition= lbsGsmPositionResolver;
  52. //配置全局服务
  53. #region 其他业务redis服务器
  54. var csredis = new CSRedis.CSRedisClient(optConfigRedis.Value.ToString());
  55. RedisHelper.Initialization(csredis);
  56. RedisConfig csredisDb7Con = optConfigRedis.Value;
  57. csredisDb7Con.DefaultDatabase = 7;
  58. csredisDb7Con.Prefix = "TELPO";
  59. var csredisDb7 = new CSRedis.CSRedisClient(csredisDb7Con.ToString());
  60. RedisHelperDb7.Initialization(csredisDb7);
  61. //
  62. RedisConfig csredisDb0Con = optConfigRedis.Value;
  63. csredisDb0Con.DefaultDatabase = 0;
  64. csredisDb0Con.Prefix = "";
  65. var csredisDb0 = new CSRedis.CSRedisClient(csredisDb0Con.ToString());
  66. RedisHelperDb0.Initialization(csredisDb0);
  67. #endregion
  68. #region 共享维智定位缓存
  69. // 维智定位缓存Redis Server
  70. // 172.19.42.56,47.100.200.105
  71. RedisConfig csredisWayzCon = optConfigRedis.Value;
  72. csredisWayzCon.DefaultDatabase = 10;
  73. csredisDb7Con.Prefix = "_GW_";
  74. #if DEBUG
  75. csredisWayzCon.Server = "192.168.12.127:8090";
  76. #else
  77. csredisWayzCon.Server = "172.19.42.56:8090";
  78. #endif
  79. var csredisWayz = new CSRedis.CSRedisClient(csredisWayzCon.ToString());
  80. RedisHelperWayz.Initialization(csredisWayz);
  81. #endregion
  82. }
  83. public override Task StartAsync(CancellationToken cancellationToken)
  84. {
  85. _logger.LogInformation("------StartAsync");
  86. _tokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
  87. // 创建消费者
  88. return base.StartAsync(cancellationToken);
  89. }
  90. public override Task StopAsync(CancellationToken cancellationToken)
  91. {
  92. _logger.LogInformation("------StopAsync");
  93. _tokenSource.Cancel();
  94. return base.StopAsync(cancellationToken);
  95. }
  96. protected override async Task ExecuteAsync(CancellationToken stoppingToken)
  97. {
  98. var kafkaConsumer = CreateKafkaConsumer();
  99. kafkaConsumer.Subscribe("topics.storage.iot.postion");
  100. var tasks = new List<Task>();
  101. //int i = 0;
  102. try
  103. {
  104. while (!stoppingToken.IsCancellationRequested)
  105. {
  106. #region 生产
  107. // await productPostionAsync();
  108. #endregion
  109. #region 消费
  110. // 并发处理 1000 个消息
  111. var concurrencyId = Guid.NewGuid().ToString("N")[^4..];
  112. using (_logger.BeginScope(new Dictionary<string, object> { ["RequestId"] = concurrencyId }))
  113. using (new CustomizeStopWatch(nameof(Worker), _logger))
  114. {
  115. //for (int i = 0; i < 2000; i++)
  116. //{
  117. // //var consumeResult = kafkaConsumer.Consume(stoppingToken);
  118. // var consumeResult = await Task.Run(() => kafkaConsumer.Consume(stoppingToken));
  119. // if (consumeResult != null)
  120. // {
  121. // tasks.Add(ProcessMessageAsync(consumeResult, kafkaConsumer));
  122. // }
  123. //}
  124. ////var consumeResult = kafkaConsumer.Consume(stoppingToken);
  125. ////if (consumeResult != null)
  126. ////{
  127. //// tasks.Add(ProcessMessageAsync(consumeResult, kafkaConsumer));
  128. ////}
  129. //// 并行等待所有任务完成
  130. //await Task.WhenAll(tasks);
  131. //tasks.Clear(); // 清空任务列表,以便下一个循环重新填充
  132. //Console.WriteLine("----------------------------------------------------------");
  133. var consumeResult = await Task.Run(() => kafkaConsumer.Consume(stoppingToken));
  134. if (consumeResult != null)
  135. {
  136. tasks.Add(ProcessMessageAsync(consumeResult, kafkaConsumer, concurrencyId));
  137. }
  138. if (tasks.Count>=2000)
  139. {
  140. // 并行等待所有任务完成
  141. await Task.WhenAll(tasks);
  142. tasks.Clear(); // 清空任务列表,以便下一个循环重新填充
  143. Console.WriteLine("----------------------------------------------------------");
  144. }
  145. }
  146. //var consumeResult = kafkaConsumer.Consume(stoppingToken);
  147. //if (consumeResult != null)
  148. //{
  149. // i++;
  150. // tasks.Add(ProcessMessageAsync(consumeResult, kafkaConsumer, i));
  151. //}
  152. //await Task.WhenAll(tasks);
  153. #endregion
  154. }
  155. }
  156. catch (OperationCanceledException)
  157. {
  158. _logger.LogWarning("Worker exit");
  159. }
  160. catch (Exception ex)
  161. {
  162. _logger.LogError($"An error occurred: {ex.Message}");
  163. }
  164. // await Task.WhenAll(tasks);
  165. }
  166. private async Task ProductPostionAsync()
  167. {
  168. Object wifi = JObject.Parse(@"{
  169. 'MessageId': '1775412145908567552',
  170. 'BusinessId': '862622050169778',
  171. 'ReceiveTime': '2024-04-03 14:36:55',
  172. 'TopicType': 512,
  173. 'MsgType': 517,
  174. 'TopicInfo': {
  175. 'deviceType': 'CustomCategory',
  176. 'iotId': '7EBPsoxlNmILKsUpKl0L000000',
  177. 'requestId': '987',
  178. 'productKey': 'a18mXM6Cvx8',
  179. 'gmtCreate': 1712126215803,
  180. 'deviceName': '862622050169778',
  181. 'items': {
  182. 'WIFI': {
  183. 'value': {
  184. 'mmac': '04:6b:25:8e:a1:81,-68,',
  185. 'macs': 'f4:2a:7d:ca:c8:c9,-72,|04:6b:25:8e:0a:91,-75,|54:84:dc:37:e8:70,-78,|54:84:dc:37:e8:18,-82,',
  186. 'IDType': 0,
  187. 'IDNumber': ''
  188. },
  189. 'time': 1712126215802
  190. }
  191. }
  192. },
  193. 'DetailData': {
  194. 'time': 1712126215802,
  195. 'value': {
  196. 'mmac': '04:6b:25:8e:a1:81,-68,',
  197. 'macs': 'f4:2a:7d:ca:c8:c9,-72,|04:6b:25:8e:0a:91,-75,|54:84:dc:37:e8:70,-78,|54:84:dc:37:e8:18,-82,',
  198. 'IDType': 0,
  199. 'IDNumber': ''
  200. }
  201. }
  202. }");
  203. Object wifiplus2 = JObject.Parse(@"{
  204. 'MessageId': '1775412188354926080',
  205. 'BusinessId': '861281060094145',
  206. 'ReceiveTime': '2024-04-03 14:37:05',
  207. 'TopicType': 512,
  208. 'MsgType': 514,
  209. 'TopicInfo': {
  210. 'deviceType': 'CustomCategory',
  211. 'iotId': 'yQw7bqBl8gSltErYIRVBg1gs00',
  212. 'requestId': '105',
  213. 'productKey': 'a18mXM6Cvx8',
  214. 'gmtCreate': 1712126225922,
  215. 'deviceName': '861281060094145',
  216. 'items': {
  217. 'WIFIPLUS2_0': {
  218. 'time': 1712124640009,
  219. 'value': [
  220. {
  221. 'bts': '460,00,30977,247196817,-41',
  222. 'smac': '',
  223. 'nearbts': '|460,00,30977,130,-17|460,00,30977,436,-12|460,00,30977,414,-8|460,00,30977,500,-41|460,00,30977,294,-47|460,00,30977,2,-39',
  224. 'imsi': '460077277322956',
  225. 'steps': '0',
  226. 'IDType': 0,
  227. 'network': 'LTE',
  228. 'mmac': '08:10:7b:df:3d:c8,-62,',
  229. 'datetime': '1712124640001',
  230. 'macs': 'ee:b9:70:83:8d:d2,-68,|3c:06:a7:16:a7:21,-68,|3c:06:a7:80:68:1a,-70,|3e:06:a7:a0:68:1a,-70,|08:10:78:91:72:4a,-77,|68:77:24:aa:40:c3,-81,|a8:e5:44:a5:c4:20,-84,',
  231. 'imei': '861281060094145',
  232. 'cdma': 0,
  233. 'IDNumber': ''
  234. }
  235. ]
  236. }
  237. }
  238. },
  239. 'DetailData': {
  240. 'time': 1712124640009,
  241. 'value': [
  242. {
  243. 'Mmac': '08:10:7b:df:3d:c8,-62,',
  244. 'Macs': 'ee:b9:70:83:8d:d2,-68,|3c:06:a7:16:a7:21,-68,|3c:06:a7:80:68:1a,-70,|3e:06:a7:a0:68:1a,-70,|08:10:78:91:72:4a,-77,|68:77:24:aa:40:c3,-81,|a8:e5:44:a5:c4:20,-84,',
  245. 'IdType': 0,
  246. 'IdNumber': '',
  247. 'Imei': '861281060094145',
  248. 'Smac': '',
  249. 'Imsi': '460077277322956',
  250. 'NearBts': '|460,00,30977,130,-17|460,00,30977,436,-12|460,00,30977,414,-8|460,00,30977,500,-41|460,00,30977,294,-47|460,00,30977,2,-39',
  251. 'Bts': '460,00,30977,247196817,-41',
  252. 'Cdma': 0,
  253. 'Network': 'LTE',
  254. 'Steps': '0',
  255. 'Datetime': '1712124640001'
  256. }
  257. ]
  258. }
  259. }");
  260. //Object lbs = JObject.Parse(@');
  261. //Object gps = JObject.Parse(@');
  262. for (int i = 0; i < 1000; i++)
  263. {
  264. if (i % 2 == 1)
  265. {
  266. var serialno = "862622050169778";
  267. var messageId = Guid.NewGuid().ToString();
  268. await _serviceMqProcess.ProcessIotPositionAsync(messageId, serialno, wifi);
  269. }
  270. else
  271. {
  272. var serialno = "861281060094145";
  273. var messageId = Guid.NewGuid().ToString();
  274. await _serviceMqProcess.ProcessIotPositionAsync(messageId, serialno, wifiplus2);
  275. }
  276. }
  277. }
  278. private async Task ProcessMessageAsync(ConsumeResult<Ignore, string> consumeResult, IConsumer<Ignore, string> kafkaConsumer,string concurrencyId)
  279. {
  280. var msg = JsonConvert.DeserializeObject<MessageModel>(consumeResult.Message.Value);
  281. string msgId = msg!.Data["MessageId"]?.ToObject<string>()!;
  282. string bizId = msg!.Data["BusinessId"]?.ToObject<string>()!;
  283. IotTopicType topic = msg!.Data["TopicType"]!.ToObject<IotTopicType>();
  284. int type = msg!.Data["MsgType"]!.ToObject<int>();
  285. object topicInfo = msg!.Data["TopicInfo"]!.ToObject<object>()!;
  286. object detailData = msg!.Data["DetailData"]!.ToObject<object>()!;
  287. var packge = new PackageMsgModel(msgId, bizId, topic, type, topicInfo, detailData);
  288. try
  289. {
  290. using (_logger.BeginScope(new Dictionary<string, object> { ["RequestId"] =$"{concurrencyId}-{msg!.MessageId}" }))
  291. using (new CustomizeStopWatch(IotMessageTypeUtils.TryToDescription(type), _logger))
  292. {
  293. var dict = JsonConvert.DeserializeObject<Dictionary<string, object>>(msg!.Data["TopicInfo"]?["items"]?.ToString()!);
  294. var key = dict?.Keys?.FirstOrDefault() + "";
  295. switch (key)
  296. {
  297. case "GPS": //处理gps位置信息上报
  298. {
  299. //Console.WriteLine("GPS");
  300. var data = JsonConvert.DeserializeObject<PropertyItemModel<GpsInfoModel>>(dict?["GPS"] + "");
  301. //if (Utils.GetTimeDeviationMilliseconds(data!.Time) > _configAppsettings.DeviceTimeDeviationMillieconds) throw new ArgumentException($"设备消息时间戳异常[{data.Time}]");
  302. _resolverGpsPosition.SetResolveInfo(packge);
  303. await _resolverGpsPosition.ExecuteMessageAsync().ConfigureAwait(false);
  304. kafkaConsumer.Commit(consumeResult);
  305. break;
  306. //return new PackageMsgModel(msg.MessageId, topicInfo.DeviceName, IotTopicType.Property, (int)IotMessagePropertyType.GpsPosition, topicInfo, data);
  307. }
  308. case "WIFI": //处理gps位置信息上报
  309. {
  310. var data = JsonConvert.DeserializeObject<PropertyItemModel<WifiInfoModel>>(dict?["WIFI"] + "");
  311. //if (Utils.GetTimeDeviationMilliseconds(data!.Time) > _configAppsettings.DeviceTimeDeviationMillieconds) throw new ArgumentException($"设备消息时间戳异常[{data.Time}]");
  312. _resolverWifiPosition.SetResolveInfo(packge);
  313. await _resolverWifiPosition.ExecuteMessageAsync().ConfigureAwait(false);
  314. kafkaConsumer.Commit(consumeResult);
  315. break;
  316. }
  317. case "WIFIPLUS2_0": //处理gps位置信息上报
  318. {
  319. var data = JsonConvert.DeserializeObject<PropertyItemModel<List<EachWifiPlus2Model>>>(dict?["WIFIPLUS2_0"] + "");
  320. //if (Utils.GetTimeDeviationMilliseconds(data!.Time) > _configAppsettings.DeviceTimeDeviationMillieconds) throw new ArgumentException($"设备消息时间戳异常[{data.Time}]");
  321. _resolverWifiPlus2Position.SetResolveInfo(packge);
  322. await _resolverWifiPlus2Position.ExecuteMessageAsync().ConfigureAwait(false);
  323. kafkaConsumer.Commit(consumeResult);
  324. break;
  325. }
  326. case "LBS": //处理gps位置信息上报
  327. {
  328. var data = JsonConvert.DeserializeObject<PropertyItemModel<LbsInfoModel>>(dict?["LBS"] + "");
  329. //if (Utils.GetTimeDeviationMilliseconds(data!.Time) > _configAppsettings.DeviceTimeDeviationMillieconds) throw new ArgumentException($"设备消息时间戳异常[{data.Time}]");
  330. //if (data.Value.Cdma == 0)
  331. //{
  332. // return new PackageMsgModel(msg.MessageId, topicInfo.DeviceName, IotTopicType.Property, (int)IotMessagePropertyType.LbsGsmPosition, topicInfo, data);
  333. //}
  334. //else if (data.Value.Cdma == 1)
  335. //{
  336. // return new PackageMsgModel(msg.MessageId, topicInfo.DeviceName, IotTopicType.Property, (int)IotMessagePropertyType.LbsCdmaPosition, topicInfo, data);
  337. //}
  338. _resolverLbsGsmPosition.SetResolveInfo(packge);
  339. await _resolverLbsGsmPosition.ExecuteMessageAsync().ConfigureAwait(false);
  340. kafkaConsumer.Commit(consumeResult);
  341. break;
  342. }
  343. }
  344. }
  345. }
  346. catch (Exception ex)
  347. {
  348. _logger.LogError($"解析定位消息发生异常 messageId: {msg!.MessageId}, Message: {ex.Message}, body: {JsonConvert.SerializeObject(msg)}");
  349. }
  350. await Task.CompletedTask;
  351. }
  352. private async Task ProcessMessageAsync6(ConsumeResult<Ignore, string> consumeResult, IConsumer<Ignore, string> kafkaConsumer)
  353. {
  354. var msg = JsonConvert.DeserializeObject<MessageModel>(consumeResult.Message.Value);
  355. string msgId = msg!.Data["MessageId"]?.ToObject<string>()!;
  356. string bizId = msg!.Data["BusinessId"]?.ToObject<string>()!;
  357. IotTopicType topic = msg!.Data["TopicType"]!.ToObject<IotTopicType>();
  358. int type = msg!.Data["MsgType"]!.ToObject<int>();
  359. object topicInfo = msg!.Data["TopicInfo"]!.ToObject<object>()!;
  360. object detailData = msg!.Data["DetailData"]!.ToObject<object>()!;
  361. var packge = new PackageMsgModel(msgId, bizId, topic, type, topicInfo, detailData);
  362. try
  363. {
  364. using (_logger.BeginScope(new Dictionary<string, object> { ["RequestId"] = msg!.MessageId }))
  365. using (new CustomizeStopWatch(IotMessageTypeUtils.TryToDescription(type), _logger))
  366. {
  367. var dict = JsonConvert.DeserializeObject<Dictionary<string, object>>(msg!.Data["TopicInfo"]?["items"]?.ToString()!);
  368. var key = dict?.Keys?.FirstOrDefault() + "";
  369. switch (key)
  370. {
  371. case "GPS": //处理gps位置信息上报
  372. {
  373. //Console.WriteLine("GPS");
  374. var data = JsonConvert.DeserializeObject<PropertyItemModel<GpsInfoModel>>(dict?["GPS"] + "");
  375. //if (Utils.GetTimeDeviationMilliseconds(data!.Time) > _configAppsettings.DeviceTimeDeviationMillieconds) throw new ArgumentException($"设备消息时间戳异常[{data.Time}]");
  376. _resolverGpsPosition.SetResolveInfo(packge);
  377. await _resolverGpsPosition.ExecuteMessageAsync().ConfigureAwait(false);
  378. break;
  379. //return new PackageMsgModel(msg.MessageId, topicInfo.DeviceName, IotTopicType.Property, (int)IotMessagePropertyType.GpsPosition, topicInfo, data);
  380. }
  381. case "WIFI": //处理gps位置信息上报
  382. {
  383. var data = JsonConvert.DeserializeObject<PropertyItemModel<WifiInfoModel>>(dict?["WIFI"] + "");
  384. //if (Utils.GetTimeDeviationMilliseconds(data!.Time) > _configAppsettings.DeviceTimeDeviationMillieconds) throw new ArgumentException($"设备消息时间戳异常[{data.Time}]");
  385. _resolverWifiPosition.SetResolveInfo(packge);
  386. await _resolverWifiPosition.ExecuteMessageAsync().ConfigureAwait(false);
  387. break;
  388. }
  389. case "WIFIPLUS2_0": //处理gps位置信息上报
  390. {
  391. var data = JsonConvert.DeserializeObject<PropertyItemModel<List<EachWifiPlus2Model>>>(dict?["WIFIPLUS2_0"] + "");
  392. //if (Utils.GetTimeDeviationMilliseconds(data!.Time) > _configAppsettings.DeviceTimeDeviationMillieconds) throw new ArgumentException($"设备消息时间戳异常[{data.Time}]");
  393. _resolverWifiPlus2Position.SetResolveInfo(packge);
  394. await _resolverWifiPlus2Position.ExecuteMessageAsync().ConfigureAwait(false);
  395. break;
  396. }
  397. case "LBS": //处理gps位置信息上报
  398. {
  399. var data = JsonConvert.DeserializeObject<PropertyItemModel<LbsInfoModel>>(dict?["LBS"] + "");
  400. //if (Utils.GetTimeDeviationMilliseconds(data!.Time) > _configAppsettings.DeviceTimeDeviationMillieconds) throw new ArgumentException($"设备消息时间戳异常[{data.Time}]");
  401. //if (data.Value.Cdma == 0)
  402. //{
  403. // return new PackageMsgModel(msg.MessageId, topicInfo.DeviceName, IotTopicType.Property, (int)IotMessagePropertyType.LbsGsmPosition, topicInfo, data);
  404. //}
  405. //else if (data.Value.Cdma == 1)
  406. //{
  407. // return new PackageMsgModel(msg.MessageId, topicInfo.DeviceName, IotTopicType.Property, (int)IotMessagePropertyType.LbsCdmaPosition, topicInfo, data);
  408. //}
  409. _resolverLbsGsmPosition.SetResolveInfo(packge);
  410. await _resolverLbsGsmPosition.ExecuteMessageAsync().ConfigureAwait(false);
  411. break;
  412. }
  413. }
  414. }
  415. }
  416. catch (Exception ex)
  417. {
  418. _logger.LogError($"解析定位消息发生异常 messageId: {msg!.MessageId}, Message: {ex.Message}, body: {JsonConvert.SerializeObject(msg)}");
  419. }
  420. await Task.CompletedTask;
  421. }
  422. private async Task ProcessMessageAsync5(ConsumeResult<Ignore, string> consumeResult, IConsumer<Ignore, string> kafkaConsumer)
  423. {
  424. var msg = JsonConvert.DeserializeObject<MessageModel>(consumeResult.Message.Value);
  425. try
  426. {
  427. using (_logger.BeginScope(new Dictionary<string, object> { ["RequestId"] = msg!.MessageId }))
  428. //using (new CustomizeStopWatch(nameof(ProcessMessageAsync), _logger))
  429. {
  430. _factoryResolver.Resolver(msg);
  431. }
  432. }
  433. catch (Exception ex)
  434. {
  435. _logger.LogError($"解析Property主题消息发生异常 messageId: {msg!.MessageId}, Message: {ex.Message}, body: {JsonConvert.SerializeObject(msg)}");
  436. }
  437. await Task.CompletedTask;
  438. }
  439. private async Task ProcessMessageAsync4(ConsumeResult<Ignore, string> consumeResult, IConsumer<Ignore, string> kafkaConsumer)
  440. {
  441. var msg = JsonConvert.DeserializeObject<MessageModel>(consumeResult.Message.Value);
  442. try
  443. {
  444. using (_logger.BeginScope(new Dictionary<string, object> { ["RequestId"] = msg!.MessageId }))
  445. //using (new CustomizeStopWatch(nameof(ProcessMessageAsync), _logger))
  446. {
  447. //var positionType = msg!.Data;
  448. string msgId = msg!.Data["MessageId"]?.ToObject<string>()!;
  449. string bizId = msg!.Data["BusinessId"]?.ToObject<string>()!;
  450. IotTopicType topic = msg!.Data["TopicType"]!.ToObject<IotTopicType>();
  451. int type = msg!.Data["MsgType"]!.ToObject<int>();
  452. object topicInfo = msg!.Data["TopicInfo"]!.ToObject<object>()!;
  453. object detailData = msg!.Data["DetailData"]!.ToObject<object>()!;
  454. var packge = new PackageMsgModel(msgId, bizId, topic, type, topicInfo, detailData);
  455. //return new PackageMsgModel(msgId, bizId, topic, type, topicInfo, detailData);
  456. var dict = JsonConvert.DeserializeObject<Dictionary<string, object>>(msg!.Data["TopicInfo"]?["items"]?.ToString()!);
  457. var key = dict?.Keys?.FirstOrDefault() + "";
  458. switch (key)
  459. {
  460. case "GPS": //处理gps位置信息上报
  461. {
  462. Console.WriteLine("GPS");
  463. var data = JsonConvert.DeserializeObject<PropertyItemModel<GpsInfoModel>>(dict?["GPS"] + "");
  464. if (Utils.GetTimeDeviationMilliseconds(data!.Time) > _configAppsettings.DeviceTimeDeviationMillieconds) throw new ArgumentException($"设备消息时间戳异常[{data.Time}]");
  465. break;
  466. //return new PackageMsgModel(msg.MessageId, topicInfo.DeviceName, IotTopicType.Property, (int)IotMessagePropertyType.GpsPosition, topicInfo, data);
  467. }
  468. case "WIFI": //处理gps位置信息上报
  469. {
  470. var data = JsonConvert.DeserializeObject<PropertyItemModel<WifiInfoModel>>(dict?["WIFI"] + "");
  471. if (Utils.GetTimeDeviationMilliseconds(data!.Time) > _configAppsettings.DeviceTimeDeviationMillieconds) throw new ArgumentException($"设备消息时间戳异常[{data.Time}]");
  472. Console.WriteLine("WIFI");
  473. break;
  474. }
  475. case "WIFIPLUS2_0": //处理gps位置信息上报
  476. {
  477. var data = JsonConvert.DeserializeObject<PropertyItemModel<List<EachWifiPlus2Model>>>(dict?["WIFIPLUS2_0"] + "");
  478. if (Utils.GetTimeDeviationMilliseconds(data!.Time) > _configAppsettings.DeviceTimeDeviationMillieconds) throw new ArgumentException($"设备消息时间戳异常[{data.Time}]");
  479. Console.WriteLine("WIFIPLUS2_0");
  480. break;
  481. }
  482. case "LBS": //处理gps位置信息上报
  483. {
  484. var data = JsonConvert.DeserializeObject<PropertyItemModel<LbsInfoModel>>(dict?["LBS"] + "");
  485. if (Utils.GetTimeDeviationMilliseconds(data!.Time) > _configAppsettings.DeviceTimeDeviationMillieconds) throw new ArgumentException($"设备消息时间戳异常[{data.Time}]");
  486. //if (data.Value.Cdma == 0)
  487. //{
  488. // return new PackageMsgModel(msg.MessageId, topicInfo.DeviceName, IotTopicType.Property, (int)IotMessagePropertyType.LbsGsmPosition, topicInfo, data);
  489. //}
  490. //else if (data.Value.Cdma == 1)
  491. //{
  492. // return new PackageMsgModel(msg.MessageId, topicInfo.DeviceName, IotTopicType.Property, (int)IotMessagePropertyType.LbsCdmaPosition, topicInfo, data);
  493. //}
  494. Console.WriteLine("WIFIPLUS2_0");
  495. break;
  496. }
  497. }
  498. }
  499. }
  500. catch (Exception ex)
  501. {
  502. _logger.LogError($"解析Property主题消息发生异常 messageId: {msg!.MessageId}, Message: {ex.Message}, body: {JsonConvert.SerializeObject(msg)}");
  503. }
  504. await Task.CompletedTask;
  505. }
  506. private async Task ProcessMessageAsync3(ConsumeResult<Ignore, string> consumeResult, IConsumer<Ignore, string> kafkaConsumer)
  507. {
  508. #region
  509. Object data = JObject.Parse(@"{
  510. 'timestamp': 1670231477000,
  511. 'id': '679e9043-14fe-44df-aba6-b2fd62592fea',
  512. 'asset': {
  513. 'id': '679e9043-14fe-44df-aba6-b2fd62592fea',
  514. 'manufacturer': 'xiaomi',
  515. 'imeiMd5': '1be14c6210b3115f',
  516. 'uniqueId': '8f95204d-a60e-40bb-b8be-45a1ab3bef2a'
  517. },
  518. 'location': {
  519. 'Wifis': [
  520. {
  521. 'timestamp': 1515743846504,
  522. 'signalStrength': 40,
  523. 'macAddress': '6a:77:24:27:9c:04',
  524. 'ssid': '',
  525. 'frequency': 2412,
  526. 'channel': 0,
  527. 'connected': true
  528. },
  529. {
  530. 'timestamp': 1515743846504,
  531. 'signalStrength': 60,
  532. 'macAddress': 'ec:f0:fe:1e:e1:a8',
  533. 'ssid': 'Z VENTURES 9F',
  534. 'frequency': 2457,
  535. 'channel': 0
  536. },
  537. {
  538. 'timestamp': 1515743846504,
  539. 'signalStrength': 60,
  540. 'macAddress': '54:a7:03:b5:a5:5e',
  541. 'ssid': 'Z VENTURES 9F',
  542. 'frequency': 2457,
  543. 'channel': 0
  544. },
  545. {
  546. 'timestamp': 1515743846504,
  547. 'signalStrength': 60,
  548. 'macAddress': 'ec:f8:eb:b6:89:81',
  549. 'ssid': 'Z VENTURES 9F',
  550. 'frequency': 2457,
  551. 'channel': 0
  552. },
  553. {
  554. 'timestamp': 1515743846504,
  555. 'signalStrength': 60,
  556. 'macAddress': 'a0:69:d9:c1:6c:8b',
  557. 'ssid': 'Z VENTURES 9F',
  558. 'frequency': 2457,
  559. 'channel': 0
  560. },
  561. {
  562. 'timestamp': 1515743846504,
  563. 'signalStrength': 60,
  564. 'macAddress': '64:64:4a:d5:f5:ca',
  565. 'ssid': 'Z VENTURES 9F',
  566. 'frequency': 2457,
  567. 'channel': 0
  568. },
  569. {
  570. 'timestamp': 1515743846504,
  571. 'signalStrength': 60,
  572. 'macAddress': 'c8:5b:a0:f8:f4:6c',
  573. 'ssid': 'Z VENTURES 9F',
  574. 'frequency': 2457,
  575. 'channel': 0
  576. },
  577. {
  578. 'timestamp': 1515743846504,
  579. 'signalStrength': 60,
  580. 'macAddress': '84:74:60:bf:fd:20',
  581. 'ssid': 'Z VENTURES 9F',
  582. 'frequency': 2457,
  583. 'channel': 0
  584. }
  585. ]
  586. }
  587. }");
  588. // 将对象转换为 JSON 字符串
  589. var jsonData = JsonConvert.SerializeObject(data);
  590. // 创建 StringContent 对象,将 JSON 字符串传递给它
  591. var content = new StringContent(jsonData, Encoding.UTF8, "application/json");
  592. #endregion
  593. var msg = JsonConvert.DeserializeObject<MessageModel>(consumeResult.Message.Value);
  594. using (_logger.BeginScope(new Dictionary<string, object> { ["RequestId"] = msg!.MessageId }))
  595. //using (new CustomizeStopWatch(nameof(ProcessMessageAsync), _logger))
  596. {
  597. using var httpClient = new HttpClient();
  598. using var request = new HttpRequestMessage
  599. {
  600. RequestUri = new Uri($"https://api.newayz.com/location/hub/v1/track_points?access_key=OK73jLa48dU9V3EvNN1RLpdjcVOdIEcn&field_masks=location.position"),
  601. Method = HttpMethod.Post,
  602. // Version = new Version(2, 0)
  603. Content = content
  604. };
  605. using var tokenResponse = await httpClient.SendAsync(request);
  606. tokenResponse.EnsureSuccessStatusCode();
  607. var token = await tokenResponse.Content.ReadAsStringAsync();
  608. //await Task.Delay(0);
  609. //Thread.Sleep(0);
  610. //Console.WriteLine($"{Guid.NewGuid().ToString()}-- {token[^10..]}");
  611. }
  612. }
  613. private async Task ProcessMessageAsync2(ConsumeResult<Ignore, string> consumeResult, IConsumer<Ignore, string> kafkaConsumer)
  614. {
  615. // using (_logger.BeginScope(new Dictionary<string, object> { ["RequestId"] = i }))
  616. /**
  617. using (new CustomizeStopWatch(nameof(ProcessMessageAsync), _logger))
  618. {
  619. //if (i==800)
  620. //{
  621. // Console.WriteLine($"第{i}消息:{consumeResult.Message.Value.Substring(0, 10)}");
  622. //}
  623. //await Task.CompletedTask;
  624. Object data = JObject.Parse(@"{
  625. 'timestamp': 1670231477000,
  626. 'id': '679e9043-14fe-44df-aba6-b2fd62592fea',
  627. 'asset': {
  628. 'id': '679e9043-14fe-44df-aba6-b2fd62592fea',
  629. 'manufacturer': 'xiaomi',
  630. 'imeiMd5': '1be14c6210b3115f',
  631. 'uniqueId': '8f95204d-a60e-40bb-b8be-45a1ab3bef2a'
  632. },
  633. 'location': {
  634. 'Wifis': [
  635. {
  636. 'timestamp': 1515743846504,
  637. 'signalStrength': 40,
  638. 'macAddress': '6a:77:24:27:9c:04',
  639. 'ssid': '',
  640. 'frequency': 2412,
  641. 'channel': 0,
  642. 'connected': true
  643. },
  644. {
  645. 'timestamp': 1515743846504,
  646. 'signalStrength': 60,
  647. 'macAddress': 'ec:f0:fe:1e:e1:a8',
  648. 'ssid': 'Z VENTURES 9F',
  649. 'frequency': 2457,
  650. 'channel': 0
  651. },
  652. {
  653. 'timestamp': 1515743846504,
  654. 'signalStrength': 60,
  655. 'macAddress': '54:a7:03:b5:a5:5e',
  656. 'ssid': 'Z VENTURES 9F',
  657. 'frequency': 2457,
  658. 'channel': 0
  659. },
  660. {
  661. 'timestamp': 1515743846504,
  662. 'signalStrength': 60,
  663. 'macAddress': 'ec:f8:eb:b6:89:81',
  664. 'ssid': 'Z VENTURES 9F',
  665. 'frequency': 2457,
  666. 'channel': 0
  667. },
  668. {
  669. 'timestamp': 1515743846504,
  670. 'signalStrength': 60,
  671. 'macAddress': 'a0:69:d9:c1:6c:8b',
  672. 'ssid': 'Z VENTURES 9F',
  673. 'frequency': 2457,
  674. 'channel': 0
  675. },
  676. {
  677. 'timestamp': 1515743846504,
  678. 'signalStrength': 60,
  679. 'macAddress': '64:64:4a:d5:f5:ca',
  680. 'ssid': 'Z VENTURES 9F',
  681. 'frequency': 2457,
  682. 'channel': 0
  683. },
  684. {
  685. 'timestamp': 1515743846504,
  686. 'signalStrength': 60,
  687. 'macAddress': 'c8:5b:a0:f8:f4:6c',
  688. 'ssid': 'Z VENTURES 9F',
  689. 'frequency': 2457,
  690. 'channel': 0
  691. },
  692. {
  693. 'timestamp': 1515743846504,
  694. 'signalStrength': 60,
  695. 'macAddress': '84:74:60:bf:fd:20',
  696. 'ssid': 'Z VENTURES 9F',
  697. 'frequency': 2457,
  698. 'channel': 0
  699. }
  700. ]
  701. }
  702. }");
  703. // 将对象转换为 JSON 字符串
  704. var jsonData = JsonConvert.SerializeObject(data);
  705. // 创建 StringContent 对象,将 JSON 字符串传递给它
  706. var content = new StringContent(jsonData, Encoding.UTF8, "application/json");
  707. using var httpClient = new HttpClient();
  708. using var request = new HttpRequestMessage
  709. {
  710. RequestUri = new Uri($"https://api.newayz.com/location/hub/v1/track_points?access_key=OK73jLa48dU9V3EvNN1RLpdjcVOdIEcn&field_masks=location.position"),
  711. Method = HttpMethod.Post,
  712. // Version = new Version(2, 0)
  713. Content = content
  714. };
  715. using var tokenResponse = await httpClient.SendAsync(request);
  716. tokenResponse.EnsureSuccessStatusCode();
  717. var token = await tokenResponse.Content.ReadAsStringAsync();
  718. Console.WriteLine($"Successfully authenticated. token {token}");
  719. }
  720. */
  721. Object data = JObject.Parse(@"{
  722. 'timestamp': 1670231477000,
  723. 'id': '679e9043-14fe-44df-aba6-b2fd62592fea',
  724. 'asset': {
  725. 'id': '679e9043-14fe-44df-aba6-b2fd62592fea',
  726. 'manufacturer': 'xiaomi',
  727. 'imeiMd5': '1be14c6210b3115f',
  728. 'uniqueId': '8f95204d-a60e-40bb-b8be-45a1ab3bef2a'
  729. },
  730. 'location': {
  731. 'Wifis': [
  732. {
  733. 'timestamp': 1515743846504,
  734. 'signalStrength': 40,
  735. 'macAddress': '6a:77:24:27:9c:04',
  736. 'ssid': '',
  737. 'frequency': 2412,
  738. 'channel': 0,
  739. 'connected': true
  740. },
  741. {
  742. 'timestamp': 1515743846504,
  743. 'signalStrength': 60,
  744. 'macAddress': 'ec:f0:fe:1e:e1:a8',
  745. 'ssid': 'Z VENTURES 9F',
  746. 'frequency': 2457,
  747. 'channel': 0
  748. },
  749. {
  750. 'timestamp': 1515743846504,
  751. 'signalStrength': 60,
  752. 'macAddress': '54:a7:03:b5:a5:5e',
  753. 'ssid': 'Z VENTURES 9F',
  754. 'frequency': 2457,
  755. 'channel': 0
  756. },
  757. {
  758. 'timestamp': 1515743846504,
  759. 'signalStrength': 60,
  760. 'macAddress': 'ec:f8:eb:b6:89:81',
  761. 'ssid': 'Z VENTURES 9F',
  762. 'frequency': 2457,
  763. 'channel': 0
  764. },
  765. {
  766. 'timestamp': 1515743846504,
  767. 'signalStrength': 60,
  768. 'macAddress': 'a0:69:d9:c1:6c:8b',
  769. 'ssid': 'Z VENTURES 9F',
  770. 'frequency': 2457,
  771. 'channel': 0
  772. },
  773. {
  774. 'timestamp': 1515743846504,
  775. 'signalStrength': 60,
  776. 'macAddress': '64:64:4a:d5:f5:ca',
  777. 'ssid': 'Z VENTURES 9F',
  778. 'frequency': 2457,
  779. 'channel': 0
  780. },
  781. {
  782. 'timestamp': 1515743846504,
  783. 'signalStrength': 60,
  784. 'macAddress': 'c8:5b:a0:f8:f4:6c',
  785. 'ssid': 'Z VENTURES 9F',
  786. 'frequency': 2457,
  787. 'channel': 0
  788. },
  789. {
  790. 'timestamp': 1515743846504,
  791. 'signalStrength': 60,
  792. 'macAddress': '84:74:60:bf:fd:20',
  793. 'ssid': 'Z VENTURES 9F',
  794. 'frequency': 2457,
  795. 'channel': 0
  796. }
  797. ]
  798. }
  799. }");
  800. // 将对象转换为 JSON 字符串
  801. var jsonData = JsonConvert.SerializeObject(data);
  802. // 创建 StringContent 对象,将 JSON 字符串传递给它
  803. var content = new StringContent(jsonData, Encoding.UTF8, "application/json");
  804. using var httpClient = new HttpClient();
  805. using var request = new HttpRequestMessage
  806. {
  807. RequestUri = new Uri($"https://api.newayz.com/location/hub/v1/track_points?access_key=OK73jLa48dU9V3EvNN1RLpdjcVOdIEcn&field_masks=location.position"),
  808. Method = HttpMethod.Post,
  809. // Version = new Version(2, 0)
  810. Content = content
  811. };
  812. using var tokenResponse = await httpClient.SendAsync(request);
  813. tokenResponse.EnsureSuccessStatusCode();
  814. var token = await tokenResponse.Content.ReadAsStringAsync();
  815. Console.WriteLine($"{Guid.NewGuid().ToString()}-- {token[^10..]}");
  816. }
  817. private async Task ProcessMessageAsync(ConsumeResult<Ignore, string> consumeResult, IConsumer<Ignore, string> kafkaConsumer,int i)
  818. {
  819. using (_logger.BeginScope(new Dictionary<string, object> { ["RequestId"] = i }))
  820. using (new CustomizeStopWatch(nameof(ProcessMessageAsync), _logger))
  821. {
  822. //if (i==800)
  823. //{
  824. // Console.WriteLine($"第{i}消息:{consumeResult.Message.Value.Substring(0, 10)}");
  825. //}
  826. //await Task.CompletedTask;
  827. Object data = JObject.Parse(@"{
  828. 'timestamp': 1670231477000,
  829. 'id': '679e9043-14fe-44df-aba6-b2fd62592fea',
  830. 'asset': {
  831. 'id': '679e9043-14fe-44df-aba6-b2fd62592fea',
  832. 'manufacturer': 'xiaomi',
  833. 'imeiMd5': '1be14c6210b3115f',
  834. 'uniqueId': '8f95204d-a60e-40bb-b8be-45a1ab3bef2a'
  835. },
  836. 'location': {
  837. 'Wifis': [
  838. {
  839. 'timestamp': 1515743846504,
  840. 'signalStrength': 40,
  841. 'macAddress': '6a:77:24:27:9c:04',
  842. 'ssid': '',
  843. 'frequency': 2412,
  844. 'channel': 0,
  845. 'connected': true
  846. },
  847. {
  848. 'timestamp': 1515743846504,
  849. 'signalStrength': 60,
  850. 'macAddress': 'ec:f0:fe:1e:e1:a8',
  851. 'ssid': 'Z VENTURES 9F',
  852. 'frequency': 2457,
  853. 'channel': 0
  854. },
  855. {
  856. 'timestamp': 1515743846504,
  857. 'signalStrength': 60,
  858. 'macAddress': '54:a7:03:b5:a5:5e',
  859. 'ssid': 'Z VENTURES 9F',
  860. 'frequency': 2457,
  861. 'channel': 0
  862. },
  863. {
  864. 'timestamp': 1515743846504,
  865. 'signalStrength': 60,
  866. 'macAddress': 'ec:f8:eb:b6:89:81',
  867. 'ssid': 'Z VENTURES 9F',
  868. 'frequency': 2457,
  869. 'channel': 0
  870. },
  871. {
  872. 'timestamp': 1515743846504,
  873. 'signalStrength': 60,
  874. 'macAddress': 'a0:69:d9:c1:6c:8b',
  875. 'ssid': 'Z VENTURES 9F',
  876. 'frequency': 2457,
  877. 'channel': 0
  878. },
  879. {
  880. 'timestamp': 1515743846504,
  881. 'signalStrength': 60,
  882. 'macAddress': '64:64:4a:d5:f5:ca',
  883. 'ssid': 'Z VENTURES 9F',
  884. 'frequency': 2457,
  885. 'channel': 0
  886. },
  887. {
  888. 'timestamp': 1515743846504,
  889. 'signalStrength': 60,
  890. 'macAddress': 'c8:5b:a0:f8:f4:6c',
  891. 'ssid': 'Z VENTURES 9F',
  892. 'frequency': 2457,
  893. 'channel': 0
  894. },
  895. {
  896. 'timestamp': 1515743846504,
  897. 'signalStrength': 60,
  898. 'macAddress': '84:74:60:bf:fd:20',
  899. 'ssid': 'Z VENTURES 9F',
  900. 'frequency': 2457,
  901. 'channel': 0
  902. }
  903. ]
  904. }
  905. }");
  906. // 将对象转换为 JSON 字符串
  907. var jsonData = JsonConvert.SerializeObject(data);
  908. // 创建 StringContent 对象,将 JSON 字符串传递给它
  909. var content = new StringContent(jsonData, Encoding.UTF8, "application/json");
  910. using var httpClient = new HttpClient();
  911. using var request = new HttpRequestMessage
  912. {
  913. RequestUri = new Uri($"https://api.newayz.com/location/hub/v1/track_points?access_key=OK73jLa48dU9V3EvNN1RLpdjcVOdIEcn&field_masks=location.position"),
  914. Method = HttpMethod.Post,
  915. // Version = new Version(2, 0)
  916. Content= content
  917. };
  918. using var tokenResponse = await httpClient.SendAsync(request);
  919. tokenResponse.EnsureSuccessStatusCode();
  920. var token = await tokenResponse.Content.ReadAsStringAsync();
  921. Console.WriteLine($"Successfully authenticated. token {token}");
  922. }
  923. }
  924. private IConsumer<Ignore, string> CreateKafkaConsumer()
  925. {
  926. //var partitionIndex = 0;
  927. var collection = new List<int>();
  928. // 获取注册的kafka消费者
  929. var consumerConfig = new ConsumerConfig
  930. {
  931. GroupId = "iot.position",
  932. BootstrapServers = _configService.MqServerAddress,
  933. AutoOffsetReset = AutoOffsetReset.Earliest,
  934. EnableAutoCommit = false, // 关闭自动提交偏移量
  935. CancellationDelayMaxMs = 1//set CancellationDelayMaxMs
  936. };
  937. return new ConsumerBuilder<Ignore, string>(consumerConfig)
  938. .SetErrorHandler((_, e) =>
  939. {
  940. Console.WriteLine($"消费者创建出错,代码:{e.Code} |原因: {e.Reason}");
  941. _logger.LogInformation($"消费者创建出错,代码:{e.Code} |原因: {e.Reason}");
  942. })
  943. .SetPartitionsAssignedHandler((c, partitions) =>
  944. {
  945. //// 在这里手动指定要消费的分区
  946. //var partitionsToConsume = new List<TopicPartitionOffset>
  947. //{
  948. // new TopicPartitionOffset("topics.storage.test_env_db", partitionIndex, Offset.Unset)
  949. //};
  950. ////c.Assign(partitionsToConsume);
  951. //Console.WriteLine($"Assigned partitions: {string.Join(", ", partitionsToConsume)}");
  952. //return partitionsToConsume;
  953. })
  954. .SetPartitionsRevokedHandler((c, partitions) =>
  955. {
  956. })
  957. .Build();
  958. }
  959. }
  960. }