Du kannst nicht mehr als 25 Themen auswählen Themen müssen entweder mit einem Buchstaben oder einer Ziffer beginnen. Sie können Bindestriche („-“) enthalten und bis zu 35 Zeichen lang sein.

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029
  1. using Confluent.Kafka;
  2. using GpsCardGatewayPosition.Common;
  3. using GpsCardGatewayPosition.Model.Config;
  4. using GpsCardGatewayPosition.Model.Enum;
  5. using GpsCardGatewayPosition.Model.IoT;
  6. using GpsCardGatewayPosition.Service.Cache;
  7. using GpsCardGatewayPosition.Service.MqProducer;
  8. using GpsCardGatewayPosition.Service.MqProducer.Model;
  9. using GpsCardGatewayPosition.Service.Resolver.Factory;
  10. using GpsCardGatewayPosition.Service.Resolver.Property;
  11. using GpsCardGatewayPosition.Service.Resolver.Property.Dto;
  12. using Microsoft.Extensions.Hosting;
  13. using Microsoft.Extensions.Logging;
  14. using Microsoft.Extensions.Options;
  15. using Newtonsoft.Json;
  16. using Newtonsoft.Json.Linq;
  17. using System;
  18. using System.Collections.Generic;
  19. using System.Linq;
  20. using System.Net;
  21. using System.Reflection.Metadata;
  22. using System.Text;
  23. using System.Threading.Tasks;
  24. using System.Web;
  25. using static Confluent.Kafka.ConfigPropertyNames;
  26. namespace GpsCardGatewayPosition.Postion
  27. {
  28. public class Worker : BackgroundService
  29. {
  30. private readonly ILogger<Worker> _logger;
  31. private CancellationTokenSource _tokenSource = default!;
  32. private readonly MqProcessLogic _serviceMqProcess;
  33. private readonly ServiceConfig _configService;
  34. private readonly AppsettingsConfig _configAppsettings;
  35. private readonly ResolverFactory _factoryResolver;
  36. private readonly WifiPositionResolver _resolverWifiPosition;
  37. private readonly WifiPlus2PositionResolver _resolverWifiPlus2Position;
  38. private readonly GpsPositionResolver _resolverGpsPosition;
  39. private readonly LbsGsmPositionResolver _resolverLbsGsmPosition;
  40. public Worker(ILogger<Worker> logger,IOptions<ServiceConfig> _optConfigService, IOptions<RedisConfig> optConfigRedis, IOptions<AppsettingsConfig> optConfigAppsettings, ResolverFactory resolverFactory, MqProcessLogic serviceMqProcess,
  41. WifiPositionResolver wifiPositionResolver, WifiPlus2PositionResolver wifiPlus2PositionResolver, GpsPositionResolver gpsPositionResolver,LbsGsmPositionResolver lbsGsmPositionResolver)
  42. {
  43. _configService = _optConfigService.Value;
  44. _configAppsettings = optConfigAppsettings.Value;
  45. _serviceMqProcess = serviceMqProcess;
  46. _logger = logger;
  47. _factoryResolver = resolverFactory;
  48. _resolverWifiPosition = wifiPositionResolver;
  49. _resolverWifiPlus2Position = wifiPlus2PositionResolver;
  50. _resolverGpsPosition = gpsPositionResolver;
  51. _resolverLbsGsmPosition= lbsGsmPositionResolver;
  52. //配置全局服务
  53. #region 其他业务redis服务器
  54. var csredis = new CSRedis.CSRedisClient(optConfigRedis.Value.ToString());
  55. RedisHelper.Initialization(csredis);
  56. RedisConfig csredisDb7Con = optConfigRedis.Value;
  57. csredisDb7Con.DefaultDatabase = 7;
  58. csredisDb7Con.Prefix = "TELPO";
  59. var csredisDb7 = new CSRedis.CSRedisClient(csredisDb7Con.ToString());
  60. RedisHelperDb7.Initialization(csredisDb7);
  61. //
  62. RedisConfig csredisDb0Con = optConfigRedis.Value;
  63. csredisDb0Con.DefaultDatabase = 0;
  64. csredisDb0Con.Prefix = "";
  65. var csredisDb0 = new CSRedis.CSRedisClient(csredisDb0Con.ToString());
  66. RedisHelperDb0.Initialization(csredisDb0);
  67. #endregion
  68. #region 共享维智定位缓存
  69. // 维智定位缓存Redis Server
  70. // 172.19.42.56,47.100.200.105
  71. RedisConfig csredisWayzCon = optConfigRedis.Value;
  72. csredisWayzCon.DefaultDatabase = 10;
  73. csredisDb7Con.Prefix = "_GW_";
  74. #if DEBUG
  75. csredisWayzCon.Server = "192.168.2.121:8090";
  76. #else
  77. csredisWayzCon.Server = "172.19.42.56:8090";
  78. #endif
  79. var csredisWayz = new CSRedis.CSRedisClient(csredisWayzCon.ToString());
  80. RedisHelperWayz.Initialization(csredisWayz);
  81. #endregion
  82. }
  83. public override Task StartAsync(CancellationToken cancellationToken)
  84. {
  85. _logger.LogInformation("------StartAsync");
  86. _tokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
  87. // 创建消费者
  88. return base.StartAsync(cancellationToken);
  89. }
  90. public override Task StopAsync(CancellationToken cancellationToken)
  91. {
  92. _logger.LogInformation("------StopAsync");
  93. _tokenSource.Cancel();
  94. return base.StopAsync(cancellationToken);
  95. }
  96. protected override async Task ExecuteAsync(CancellationToken stoppingToken)
  97. {
  98. var kafkaConsumer = CreateKafkaConsumer();
  99. kafkaConsumer.Subscribe("topics.storage.iot.postion");
  100. var tasks = new List<Task>();
  101. //int i = 0;
  102. try
  103. {
  104. while (!stoppingToken.IsCancellationRequested)
  105. {
  106. #region 生产
  107. // await productPostionAsync();
  108. #endregion
  109. #region 消费
  110. // 并发处理 1000 个消息
  111. var concurrencyId = Guid.NewGuid().ToString("N")[^4..];
  112. using (_logger.BeginScope(new Dictionary<string, object> { ["RequestId"] = concurrencyId }))
  113. using (new CustomizeStopWatch(nameof(Worker), _logger))
  114. {
  115. //for (int i = 0; i < 2000; i++)
  116. //{
  117. // //var consumeResult = kafkaConsumer.Consume(stoppingToken);
  118. // var consumeResult = await Task.Run(() => kafkaConsumer.Consume(stoppingToken));
  119. // if (consumeResult != null)
  120. // {
  121. // tasks.Add(ProcessMessageAsync(consumeResult, kafkaConsumer));
  122. // }
  123. //}
  124. ////var consumeResult = kafkaConsumer.Consume(stoppingToken);
  125. ////if (consumeResult != null)
  126. ////{
  127. //// tasks.Add(ProcessMessageAsync(consumeResult, kafkaConsumer));
  128. ////}
  129. //// 并行等待所有任务完成
  130. //await Task.WhenAll(tasks);
  131. //tasks.Clear(); // 清空任务列表,以便下一个循环重新填充
  132. //Console.WriteLine("----------------------------------------------------------");
  133. var consumeResult = await Task.Run(() => kafkaConsumer.Consume(stoppingToken));
  134. if (consumeResult != null)
  135. {
  136. tasks.Add(ProcessMessageAsync(consumeResult, kafkaConsumer, concurrencyId));
  137. }
  138. if (tasks.Count>=2000)
  139. {
  140. // 并行等待所有任务完成
  141. await Task.WhenAll(tasks);
  142. tasks.Clear(); // 清空任务列表,以便下一个循环重新填充
  143. Console.WriteLine("----------------------------------------------------------");
  144. }
  145. }
  146. //var consumeResult = kafkaConsumer.Consume(stoppingToken);
  147. //if (consumeResult != null)
  148. //{
  149. // i++;
  150. // tasks.Add(ProcessMessageAsync(consumeResult, kafkaConsumer, i));
  151. //}
  152. //await Task.WhenAll(tasks);
  153. #endregion
  154. }
  155. }
  156. catch (OperationCanceledException)
  157. {
  158. _logger.LogWarning("Worker exit");
  159. }
  160. catch (Exception ex)
  161. {
  162. _logger.LogError($"An error occurred: {ex.Message}");
  163. }
  164. // await Task.WhenAll(tasks);
  165. }
  166. private async Task ProductPostionAsync()
  167. {
  168. Object wifi = JObject.Parse(@"{
  169. 'MessageId': '1775412145908567552',
  170. 'BusinessId': '862622050169778',
  171. 'ReceiveTime': '2024-04-03 14:36:55',
  172. 'TopicType': 512,
  173. 'MsgType': 517,
  174. 'TopicInfo': {
  175. 'deviceType': 'CustomCategory',
  176. 'iotId': '7EBPsoxlNmILKsUpKl0L000000',
  177. 'requestId': '987',
  178. 'productKey': 'a18mXM6Cvx8',
  179. 'gmtCreate': 1712126215803,
  180. 'deviceName': '862622050169778',
  181. 'items': {
  182. 'WIFI': {
  183. 'value': {
  184. 'mmac': '04:6b:25:8e:a1:81,-68,',
  185. 'macs': 'f4:2a:7d:ca:c8:c9,-72,|04:6b:25:8e:0a:91,-75,|54:84:dc:37:e8:70,-78,|54:84:dc:37:e8:18,-82,',
  186. 'IDType': 0,
  187. 'IDNumber': ''
  188. },
  189. 'time': 1712126215802
  190. }
  191. }
  192. },
  193. 'DetailData': {
  194. 'time': 1712126215802,
  195. 'value': {
  196. 'mmac': '04:6b:25:8e:a1:81,-68,',
  197. 'macs': 'f4:2a:7d:ca:c8:c9,-72,|04:6b:25:8e:0a:91,-75,|54:84:dc:37:e8:70,-78,|54:84:dc:37:e8:18,-82,',
  198. 'IDType': 0,
  199. 'IDNumber': ''
  200. }
  201. }
  202. }");
  203. Object wifiplus2 = JObject.Parse(@"{
  204. 'MessageId': '1775412188354926080',
  205. 'BusinessId': '861281060094145',
  206. 'ReceiveTime': '2024-04-03 14:37:05',
  207. 'TopicType': 512,
  208. 'MsgType': 514,
  209. 'TopicInfo': {
  210. 'deviceType': 'CustomCategory',
  211. 'iotId': 'yQw7bqBl8gSltErYIRVBg1gs00',
  212. 'requestId': '105',
  213. 'productKey': 'a18mXM6Cvx8',
  214. 'gmtCreate': 1712126225922,
  215. 'deviceName': '861281060094145',
  216. 'items': {
  217. 'WIFIPLUS2_0': {
  218. 'time': 1712124640009,
  219. 'value': [
  220. {
  221. 'bts': '460,00,30977,247196817,-41',
  222. 'smac': '',
  223. 'nearbts': '|460,00,30977,130,-17|460,00,30977,436,-12|460,00,30977,414,-8|460,00,30977,500,-41|460,00,30977,294,-47|460,00,30977,2,-39',
  224. 'imsi': '460077277322956',
  225. 'steps': '0',
  226. 'IDType': 0,
  227. 'network': 'LTE',
  228. 'mmac': '08:10:7b:df:3d:c8,-62,',
  229. 'datetime': '1712124640001',
  230. 'macs': 'ee:b9:70:83:8d:d2,-68,|3c:06:a7:16:a7:21,-68,|3c:06:a7:80:68:1a,-70,|3e:06:a7:a0:68:1a,-70,|08:10:78:91:72:4a,-77,|68:77:24:aa:40:c3,-81,|a8:e5:44:a5:c4:20,-84,',
  231. 'imei': '861281060094145',
  232. 'cdma': 0,
  233. 'IDNumber': ''
  234. }
  235. ]
  236. }
  237. }
  238. },
  239. 'DetailData': {
  240. 'time': 1712124640009,
  241. 'value': [
  242. {
  243. 'Mmac': '08:10:7b:df:3d:c8,-62,',
  244. 'Macs': 'ee:b9:70:83:8d:d2,-68,|3c:06:a7:16:a7:21,-68,|3c:06:a7:80:68:1a,-70,|3e:06:a7:a0:68:1a,-70,|08:10:78:91:72:4a,-77,|68:77:24:aa:40:c3,-81,|a8:e5:44:a5:c4:20,-84,',
  245. 'IdType': 0,
  246. 'IdNumber': '',
  247. 'Imei': '861281060094145',
  248. 'Smac': '',
  249. 'Imsi': '460077277322956',
  250. 'NearBts': '|460,00,30977,130,-17|460,00,30977,436,-12|460,00,30977,414,-8|460,00,30977,500,-41|460,00,30977,294,-47|460,00,30977,2,-39',
  251. 'Bts': '460,00,30977,247196817,-41',
  252. 'Cdma': 0,
  253. 'Network': 'LTE',
  254. 'Steps': '0',
  255. 'Datetime': '1712124640001'
  256. }
  257. ]
  258. }
  259. }");
  260. //Object lbs = JObject.Parse(@');
  261. //Object gps = JObject.Parse(@');
  262. for (int i = 0; i < 1000; i++)
  263. {
  264. if (i % 2 == 1)
  265. {
  266. var serialno = "862622050169778";
  267. var messageId = Guid.NewGuid().ToString();
  268. await _serviceMqProcess.ProcessIotPositionAsync(messageId, serialno, wifi);
  269. }
  270. else
  271. {
  272. var serialno = "861281060094145";
  273. var messageId = Guid.NewGuid().ToString();
  274. await _serviceMqProcess.ProcessIotPositionAsync(messageId, serialno, wifiplus2);
  275. }
  276. }
  277. }
  278. private async Task ProcessMessageAsync(ConsumeResult<Ignore, string> consumeResult, IConsumer<Ignore, string> kafkaConsumer,string concurrencyId)
  279. {
  280. var msg = JsonConvert.DeserializeObject<MessageModel>(consumeResult.Message.Value);
  281. string msgId = msg!.Data["MessageId"]?.ToObject<string>()!;
  282. string bizId = msg!.Data["BusinessId"]?.ToObject<string>()!;
  283. IotTopicType topic = msg!.Data["TopicType"]!.ToObject<IotTopicType>();
  284. int type = msg!.Data["MsgType"]!.ToObject<int>();
  285. object topicInfo = msg!.Data["TopicInfo"]!.ToObject<object>()!;
  286. object detailData = msg!.Data["DetailData"]!.ToObject<object>()!;
  287. var packge = new PackageMsgModel(msgId, bizId, topic, type, topicInfo, detailData);
  288. try
  289. {
  290. using (_logger.BeginScope(new Dictionary<string, object> { ["RequestId"] =$"{concurrencyId}-{msg!.MessageId}" }))
  291. using (new CustomizeStopWatch(IotMessageTypeUtils.TryToDescription(type), _logger))
  292. {
  293. var dict = JsonConvert.DeserializeObject<Dictionary<string, object>>(msg!.Data["TopicInfo"]?["items"]?.ToString()!);
  294. var key = dict?.Keys?.FirstOrDefault() + "";
  295. switch (key)
  296. {
  297. case "GPS": //处理gps位置信息上报
  298. {
  299. //Console.WriteLine("GPS");
  300. var data = JsonConvert.DeserializeObject<PropertyItemModel<GpsInfoModel>>(dict?["GPS"] + "");
  301. //if (Utils.GetTimeDeviationMilliseconds(data!.Time) > _configAppsettings.DeviceTimeDeviationMillieconds) throw new ArgumentException($"设备消息时间戳异常[{data.Time}]");
  302. _resolverGpsPosition.SetResolveInfo(packge);
  303. await _resolverGpsPosition.ExecuteMessageAsync().ConfigureAwait(false);
  304. break;
  305. //return new PackageMsgModel(msg.MessageId, topicInfo.DeviceName, IotTopicType.Property, (int)IotMessagePropertyType.GpsPosition, topicInfo, data);
  306. }
  307. case "WIFI": //处理gps位置信息上报
  308. {
  309. var data = JsonConvert.DeserializeObject<PropertyItemModel<WifiInfoModel>>(dict?["WIFI"] + "");
  310. //if (Utils.GetTimeDeviationMilliseconds(data!.Time) > _configAppsettings.DeviceTimeDeviationMillieconds) throw new ArgumentException($"设备消息时间戳异常[{data.Time}]");
  311. _resolverWifiPosition.SetResolveInfo(packge);
  312. await _resolverWifiPosition.ExecuteMessageAsync().ConfigureAwait(false);
  313. break;
  314. }
  315. case "WIFIPLUS2_0": //处理gps位置信息上报
  316. {
  317. var data = JsonConvert.DeserializeObject<PropertyItemModel<List<EachWifiPlus2Model>>>(dict?["WIFIPLUS2_0"] + "");
  318. //if (Utils.GetTimeDeviationMilliseconds(data!.Time) > _configAppsettings.DeviceTimeDeviationMillieconds) throw new ArgumentException($"设备消息时间戳异常[{data.Time}]");
  319. _resolverWifiPlus2Position.SetResolveInfo(packge);
  320. await _resolverWifiPlus2Position.ExecuteMessageAsync().ConfigureAwait(false);
  321. break;
  322. }
  323. case "LBS": //处理gps位置信息上报
  324. {
  325. var data = JsonConvert.DeserializeObject<PropertyItemModel<LbsInfoModel>>(dict?["LBS"] + "");
  326. //if (Utils.GetTimeDeviationMilliseconds(data!.Time) > _configAppsettings.DeviceTimeDeviationMillieconds) throw new ArgumentException($"设备消息时间戳异常[{data.Time}]");
  327. //if (data.Value.Cdma == 0)
  328. //{
  329. // return new PackageMsgModel(msg.MessageId, topicInfo.DeviceName, IotTopicType.Property, (int)IotMessagePropertyType.LbsGsmPosition, topicInfo, data);
  330. //}
  331. //else if (data.Value.Cdma == 1)
  332. //{
  333. // return new PackageMsgModel(msg.MessageId, topicInfo.DeviceName, IotTopicType.Property, (int)IotMessagePropertyType.LbsCdmaPosition, topicInfo, data);
  334. //}
  335. _resolverLbsGsmPosition.SetResolveInfo(packge);
  336. await _resolverLbsGsmPosition.ExecuteMessageAsync().ConfigureAwait(false);
  337. break;
  338. }
  339. }
  340. }
  341. }
  342. catch (Exception ex)
  343. {
  344. _logger.LogError($"解析定位消息发生异常 messageId: {msg!.MessageId}, Message: {ex.Message}, body: {JsonConvert.SerializeObject(msg)}");
  345. }
  346. await Task.CompletedTask;
  347. }
  348. private async Task ProcessMessageAsync6(ConsumeResult<Ignore, string> consumeResult, IConsumer<Ignore, string> kafkaConsumer)
  349. {
  350. var msg = JsonConvert.DeserializeObject<MessageModel>(consumeResult.Message.Value);
  351. string msgId = msg!.Data["MessageId"]?.ToObject<string>()!;
  352. string bizId = msg!.Data["BusinessId"]?.ToObject<string>()!;
  353. IotTopicType topic = msg!.Data["TopicType"]!.ToObject<IotTopicType>();
  354. int type = msg!.Data["MsgType"]!.ToObject<int>();
  355. object topicInfo = msg!.Data["TopicInfo"]!.ToObject<object>()!;
  356. object detailData = msg!.Data["DetailData"]!.ToObject<object>()!;
  357. var packge = new PackageMsgModel(msgId, bizId, topic, type, topicInfo, detailData);
  358. try
  359. {
  360. using (_logger.BeginScope(new Dictionary<string, object> { ["RequestId"] = msg!.MessageId }))
  361. using (new CustomizeStopWatch(IotMessageTypeUtils.TryToDescription(type), _logger))
  362. {
  363. var dict = JsonConvert.DeserializeObject<Dictionary<string, object>>(msg!.Data["TopicInfo"]?["items"]?.ToString()!);
  364. var key = dict?.Keys?.FirstOrDefault() + "";
  365. switch (key)
  366. {
  367. case "GPS": //处理gps位置信息上报
  368. {
  369. //Console.WriteLine("GPS");
  370. var data = JsonConvert.DeserializeObject<PropertyItemModel<GpsInfoModel>>(dict?["GPS"] + "");
  371. //if (Utils.GetTimeDeviationMilliseconds(data!.Time) > _configAppsettings.DeviceTimeDeviationMillieconds) throw new ArgumentException($"设备消息时间戳异常[{data.Time}]");
  372. _resolverGpsPosition.SetResolveInfo(packge);
  373. await _resolverGpsPosition.ExecuteMessageAsync().ConfigureAwait(false);
  374. break;
  375. //return new PackageMsgModel(msg.MessageId, topicInfo.DeviceName, IotTopicType.Property, (int)IotMessagePropertyType.GpsPosition, topicInfo, data);
  376. }
  377. case "WIFI": //处理gps位置信息上报
  378. {
  379. var data = JsonConvert.DeserializeObject<PropertyItemModel<WifiInfoModel>>(dict?["WIFI"] + "");
  380. //if (Utils.GetTimeDeviationMilliseconds(data!.Time) > _configAppsettings.DeviceTimeDeviationMillieconds) throw new ArgumentException($"设备消息时间戳异常[{data.Time}]");
  381. _resolverWifiPosition.SetResolveInfo(packge);
  382. await _resolverWifiPosition.ExecuteMessageAsync().ConfigureAwait(false);
  383. break;
  384. }
  385. case "WIFIPLUS2_0": //处理gps位置信息上报
  386. {
  387. var data = JsonConvert.DeserializeObject<PropertyItemModel<List<EachWifiPlus2Model>>>(dict?["WIFIPLUS2_0"] + "");
  388. //if (Utils.GetTimeDeviationMilliseconds(data!.Time) > _configAppsettings.DeviceTimeDeviationMillieconds) throw new ArgumentException($"设备消息时间戳异常[{data.Time}]");
  389. _resolverWifiPlus2Position.SetResolveInfo(packge);
  390. await _resolverWifiPlus2Position.ExecuteMessageAsync().ConfigureAwait(false);
  391. break;
  392. }
  393. case "LBS": //处理gps位置信息上报
  394. {
  395. var data = JsonConvert.DeserializeObject<PropertyItemModel<LbsInfoModel>>(dict?["LBS"] + "");
  396. //if (Utils.GetTimeDeviationMilliseconds(data!.Time) > _configAppsettings.DeviceTimeDeviationMillieconds) throw new ArgumentException($"设备消息时间戳异常[{data.Time}]");
  397. //if (data.Value.Cdma == 0)
  398. //{
  399. // return new PackageMsgModel(msg.MessageId, topicInfo.DeviceName, IotTopicType.Property, (int)IotMessagePropertyType.LbsGsmPosition, topicInfo, data);
  400. //}
  401. //else if (data.Value.Cdma == 1)
  402. //{
  403. // return new PackageMsgModel(msg.MessageId, topicInfo.DeviceName, IotTopicType.Property, (int)IotMessagePropertyType.LbsCdmaPosition, topicInfo, data);
  404. //}
  405. _resolverLbsGsmPosition.SetResolveInfo(packge);
  406. await _resolverLbsGsmPosition.ExecuteMessageAsync().ConfigureAwait(false);
  407. break;
  408. }
  409. }
  410. }
  411. }
  412. catch (Exception ex)
  413. {
  414. _logger.LogError($"解析定位消息发生异常 messageId: {msg!.MessageId}, Message: {ex.Message}, body: {JsonConvert.SerializeObject(msg)}");
  415. }
  416. await Task.CompletedTask;
  417. }
  418. private async Task ProcessMessageAsync5(ConsumeResult<Ignore, string> consumeResult, IConsumer<Ignore, string> kafkaConsumer)
  419. {
  420. var msg = JsonConvert.DeserializeObject<MessageModel>(consumeResult.Message.Value);
  421. try
  422. {
  423. using (_logger.BeginScope(new Dictionary<string, object> { ["RequestId"] = msg!.MessageId }))
  424. //using (new CustomizeStopWatch(nameof(ProcessMessageAsync), _logger))
  425. {
  426. _factoryResolver.Resolver(msg);
  427. }
  428. }
  429. catch (Exception ex)
  430. {
  431. _logger.LogError($"解析Property主题消息发生异常 messageId: {msg!.MessageId}, Message: {ex.Message}, body: {JsonConvert.SerializeObject(msg)}");
  432. }
  433. await Task.CompletedTask;
  434. }
  435. private async Task ProcessMessageAsync4(ConsumeResult<Ignore, string> consumeResult, IConsumer<Ignore, string> kafkaConsumer)
  436. {
  437. var msg = JsonConvert.DeserializeObject<MessageModel>(consumeResult.Message.Value);
  438. try
  439. {
  440. using (_logger.BeginScope(new Dictionary<string, object> { ["RequestId"] = msg!.MessageId }))
  441. //using (new CustomizeStopWatch(nameof(ProcessMessageAsync), _logger))
  442. {
  443. //var positionType = msg!.Data;
  444. string msgId = msg!.Data["MessageId"]?.ToObject<string>()!;
  445. string bizId = msg!.Data["BusinessId"]?.ToObject<string>()!;
  446. IotTopicType topic = msg!.Data["TopicType"]!.ToObject<IotTopicType>();
  447. int type = msg!.Data["MsgType"]!.ToObject<int>();
  448. object topicInfo = msg!.Data["TopicInfo"]!.ToObject<object>()!;
  449. object detailData = msg!.Data["DetailData"]!.ToObject<object>()!;
  450. var packge = new PackageMsgModel(msgId, bizId, topic, type, topicInfo, detailData);
  451. //return new PackageMsgModel(msgId, bizId, topic, type, topicInfo, detailData);
  452. var dict = JsonConvert.DeserializeObject<Dictionary<string, object>>(msg!.Data["TopicInfo"]?["items"]?.ToString()!);
  453. var key = dict?.Keys?.FirstOrDefault() + "";
  454. switch (key)
  455. {
  456. case "GPS": //处理gps位置信息上报
  457. {
  458. Console.WriteLine("GPS");
  459. var data = JsonConvert.DeserializeObject<PropertyItemModel<GpsInfoModel>>(dict?["GPS"] + "");
  460. if (Utils.GetTimeDeviationMilliseconds(data!.Time) > _configAppsettings.DeviceTimeDeviationMillieconds) throw new ArgumentException($"设备消息时间戳异常[{data.Time}]");
  461. break;
  462. //return new PackageMsgModel(msg.MessageId, topicInfo.DeviceName, IotTopicType.Property, (int)IotMessagePropertyType.GpsPosition, topicInfo, data);
  463. }
  464. case "WIFI": //处理gps位置信息上报
  465. {
  466. var data = JsonConvert.DeserializeObject<PropertyItemModel<WifiInfoModel>>(dict?["WIFI"] + "");
  467. if (Utils.GetTimeDeviationMilliseconds(data!.Time) > _configAppsettings.DeviceTimeDeviationMillieconds) throw new ArgumentException($"设备消息时间戳异常[{data.Time}]");
  468. Console.WriteLine("WIFI");
  469. break;
  470. }
  471. case "WIFIPLUS2_0": //处理gps位置信息上报
  472. {
  473. var data = JsonConvert.DeserializeObject<PropertyItemModel<List<EachWifiPlus2Model>>>(dict?["WIFIPLUS2_0"] + "");
  474. if (Utils.GetTimeDeviationMilliseconds(data!.Time) > _configAppsettings.DeviceTimeDeviationMillieconds) throw new ArgumentException($"设备消息时间戳异常[{data.Time}]");
  475. Console.WriteLine("WIFIPLUS2_0");
  476. break;
  477. }
  478. case "LBS": //处理gps位置信息上报
  479. {
  480. var data = JsonConvert.DeserializeObject<PropertyItemModel<LbsInfoModel>>(dict?["LBS"] + "");
  481. if (Utils.GetTimeDeviationMilliseconds(data!.Time) > _configAppsettings.DeviceTimeDeviationMillieconds) throw new ArgumentException($"设备消息时间戳异常[{data.Time}]");
  482. //if (data.Value.Cdma == 0)
  483. //{
  484. // return new PackageMsgModel(msg.MessageId, topicInfo.DeviceName, IotTopicType.Property, (int)IotMessagePropertyType.LbsGsmPosition, topicInfo, data);
  485. //}
  486. //else if (data.Value.Cdma == 1)
  487. //{
  488. // return new PackageMsgModel(msg.MessageId, topicInfo.DeviceName, IotTopicType.Property, (int)IotMessagePropertyType.LbsCdmaPosition, topicInfo, data);
  489. //}
  490. Console.WriteLine("WIFIPLUS2_0");
  491. break;
  492. }
  493. }
  494. }
  495. }
  496. catch (Exception ex)
  497. {
  498. _logger.LogError($"解析Property主题消息发生异常 messageId: {msg!.MessageId}, Message: {ex.Message}, body: {JsonConvert.SerializeObject(msg)}");
  499. }
  500. await Task.CompletedTask;
  501. }
  502. private async Task ProcessMessageAsync3(ConsumeResult<Ignore, string> consumeResult, IConsumer<Ignore, string> kafkaConsumer)
  503. {
  504. #region
  505. Object data = JObject.Parse(@"{
  506. 'timestamp': 1670231477000,
  507. 'id': '679e9043-14fe-44df-aba6-b2fd62592fea',
  508. 'asset': {
  509. 'id': '679e9043-14fe-44df-aba6-b2fd62592fea',
  510. 'manufacturer': 'xiaomi',
  511. 'imeiMd5': '1be14c6210b3115f',
  512. 'uniqueId': '8f95204d-a60e-40bb-b8be-45a1ab3bef2a'
  513. },
  514. 'location': {
  515. 'Wifis': [
  516. {
  517. 'timestamp': 1515743846504,
  518. 'signalStrength': 40,
  519. 'macAddress': '6a:77:24:27:9c:04',
  520. 'ssid': '',
  521. 'frequency': 2412,
  522. 'channel': 0,
  523. 'connected': true
  524. },
  525. {
  526. 'timestamp': 1515743846504,
  527. 'signalStrength': 60,
  528. 'macAddress': 'ec:f0:fe:1e:e1:a8',
  529. 'ssid': 'Z VENTURES 9F',
  530. 'frequency': 2457,
  531. 'channel': 0
  532. },
  533. {
  534. 'timestamp': 1515743846504,
  535. 'signalStrength': 60,
  536. 'macAddress': '54:a7:03:b5:a5:5e',
  537. 'ssid': 'Z VENTURES 9F',
  538. 'frequency': 2457,
  539. 'channel': 0
  540. },
  541. {
  542. 'timestamp': 1515743846504,
  543. 'signalStrength': 60,
  544. 'macAddress': 'ec:f8:eb:b6:89:81',
  545. 'ssid': 'Z VENTURES 9F',
  546. 'frequency': 2457,
  547. 'channel': 0
  548. },
  549. {
  550. 'timestamp': 1515743846504,
  551. 'signalStrength': 60,
  552. 'macAddress': 'a0:69:d9:c1:6c:8b',
  553. 'ssid': 'Z VENTURES 9F',
  554. 'frequency': 2457,
  555. 'channel': 0
  556. },
  557. {
  558. 'timestamp': 1515743846504,
  559. 'signalStrength': 60,
  560. 'macAddress': '64:64:4a:d5:f5:ca',
  561. 'ssid': 'Z VENTURES 9F',
  562. 'frequency': 2457,
  563. 'channel': 0
  564. },
  565. {
  566. 'timestamp': 1515743846504,
  567. 'signalStrength': 60,
  568. 'macAddress': 'c8:5b:a0:f8:f4:6c',
  569. 'ssid': 'Z VENTURES 9F',
  570. 'frequency': 2457,
  571. 'channel': 0
  572. },
  573. {
  574. 'timestamp': 1515743846504,
  575. 'signalStrength': 60,
  576. 'macAddress': '84:74:60:bf:fd:20',
  577. 'ssid': 'Z VENTURES 9F',
  578. 'frequency': 2457,
  579. 'channel': 0
  580. }
  581. ]
  582. }
  583. }");
  584. // 将对象转换为 JSON 字符串
  585. var jsonData = JsonConvert.SerializeObject(data);
  586. // 创建 StringContent 对象,将 JSON 字符串传递给它
  587. var content = new StringContent(jsonData, Encoding.UTF8, "application/json");
  588. #endregion
  589. var msg = JsonConvert.DeserializeObject<MessageModel>(consumeResult.Message.Value);
  590. using (_logger.BeginScope(new Dictionary<string, object> { ["RequestId"] = msg!.MessageId }))
  591. //using (new CustomizeStopWatch(nameof(ProcessMessageAsync), _logger))
  592. {
  593. using var httpClient = new HttpClient();
  594. using var request = new HttpRequestMessage
  595. {
  596. RequestUri = new Uri($"https://api.newayz.com/location/hub/v1/track_points?access_key=OK73jLa48dU9V3EvNN1RLpdjcVOdIEcn&field_masks=location.position"),
  597. Method = HttpMethod.Post,
  598. // Version = new Version(2, 0)
  599. Content = content
  600. };
  601. using var tokenResponse = await httpClient.SendAsync(request);
  602. tokenResponse.EnsureSuccessStatusCode();
  603. var token = await tokenResponse.Content.ReadAsStringAsync();
  604. //await Task.Delay(0);
  605. //Thread.Sleep(0);
  606. //Console.WriteLine($"{Guid.NewGuid().ToString()}-- {token[^10..]}");
  607. }
  608. }
  609. private async Task ProcessMessageAsync2(ConsumeResult<Ignore, string> consumeResult, IConsumer<Ignore, string> kafkaConsumer)
  610. {
  611. // using (_logger.BeginScope(new Dictionary<string, object> { ["RequestId"] = i }))
  612. /**
  613. using (new CustomizeStopWatch(nameof(ProcessMessageAsync), _logger))
  614. {
  615. //if (i==800)
  616. //{
  617. // Console.WriteLine($"第{i}消息:{consumeResult.Message.Value.Substring(0, 10)}");
  618. //}
  619. //await Task.CompletedTask;
  620. Object data = JObject.Parse(@"{
  621. 'timestamp': 1670231477000,
  622. 'id': '679e9043-14fe-44df-aba6-b2fd62592fea',
  623. 'asset': {
  624. 'id': '679e9043-14fe-44df-aba6-b2fd62592fea',
  625. 'manufacturer': 'xiaomi',
  626. 'imeiMd5': '1be14c6210b3115f',
  627. 'uniqueId': '8f95204d-a60e-40bb-b8be-45a1ab3bef2a'
  628. },
  629. 'location': {
  630. 'Wifis': [
  631. {
  632. 'timestamp': 1515743846504,
  633. 'signalStrength': 40,
  634. 'macAddress': '6a:77:24:27:9c:04',
  635. 'ssid': '',
  636. 'frequency': 2412,
  637. 'channel': 0,
  638. 'connected': true
  639. },
  640. {
  641. 'timestamp': 1515743846504,
  642. 'signalStrength': 60,
  643. 'macAddress': 'ec:f0:fe:1e:e1:a8',
  644. 'ssid': 'Z VENTURES 9F',
  645. 'frequency': 2457,
  646. 'channel': 0
  647. },
  648. {
  649. 'timestamp': 1515743846504,
  650. 'signalStrength': 60,
  651. 'macAddress': '54:a7:03:b5:a5:5e',
  652. 'ssid': 'Z VENTURES 9F',
  653. 'frequency': 2457,
  654. 'channel': 0
  655. },
  656. {
  657. 'timestamp': 1515743846504,
  658. 'signalStrength': 60,
  659. 'macAddress': 'ec:f8:eb:b6:89:81',
  660. 'ssid': 'Z VENTURES 9F',
  661. 'frequency': 2457,
  662. 'channel': 0
  663. },
  664. {
  665. 'timestamp': 1515743846504,
  666. 'signalStrength': 60,
  667. 'macAddress': 'a0:69:d9:c1:6c:8b',
  668. 'ssid': 'Z VENTURES 9F',
  669. 'frequency': 2457,
  670. 'channel': 0
  671. },
  672. {
  673. 'timestamp': 1515743846504,
  674. 'signalStrength': 60,
  675. 'macAddress': '64:64:4a:d5:f5:ca',
  676. 'ssid': 'Z VENTURES 9F',
  677. 'frequency': 2457,
  678. 'channel': 0
  679. },
  680. {
  681. 'timestamp': 1515743846504,
  682. 'signalStrength': 60,
  683. 'macAddress': 'c8:5b:a0:f8:f4:6c',
  684. 'ssid': 'Z VENTURES 9F',
  685. 'frequency': 2457,
  686. 'channel': 0
  687. },
  688. {
  689. 'timestamp': 1515743846504,
  690. 'signalStrength': 60,
  691. 'macAddress': '84:74:60:bf:fd:20',
  692. 'ssid': 'Z VENTURES 9F',
  693. 'frequency': 2457,
  694. 'channel': 0
  695. }
  696. ]
  697. }
  698. }");
  699. // 将对象转换为 JSON 字符串
  700. var jsonData = JsonConvert.SerializeObject(data);
  701. // 创建 StringContent 对象,将 JSON 字符串传递给它
  702. var content = new StringContent(jsonData, Encoding.UTF8, "application/json");
  703. using var httpClient = new HttpClient();
  704. using var request = new HttpRequestMessage
  705. {
  706. RequestUri = new Uri($"https://api.newayz.com/location/hub/v1/track_points?access_key=OK73jLa48dU9V3EvNN1RLpdjcVOdIEcn&field_masks=location.position"),
  707. Method = HttpMethod.Post,
  708. // Version = new Version(2, 0)
  709. Content = content
  710. };
  711. using var tokenResponse = await httpClient.SendAsync(request);
  712. tokenResponse.EnsureSuccessStatusCode();
  713. var token = await tokenResponse.Content.ReadAsStringAsync();
  714. Console.WriteLine($"Successfully authenticated. token {token}");
  715. }
  716. */
  717. Object data = JObject.Parse(@"{
  718. 'timestamp': 1670231477000,
  719. 'id': '679e9043-14fe-44df-aba6-b2fd62592fea',
  720. 'asset': {
  721. 'id': '679e9043-14fe-44df-aba6-b2fd62592fea',
  722. 'manufacturer': 'xiaomi',
  723. 'imeiMd5': '1be14c6210b3115f',
  724. 'uniqueId': '8f95204d-a60e-40bb-b8be-45a1ab3bef2a'
  725. },
  726. 'location': {
  727. 'Wifis': [
  728. {
  729. 'timestamp': 1515743846504,
  730. 'signalStrength': 40,
  731. 'macAddress': '6a:77:24:27:9c:04',
  732. 'ssid': '',
  733. 'frequency': 2412,
  734. 'channel': 0,
  735. 'connected': true
  736. },
  737. {
  738. 'timestamp': 1515743846504,
  739. 'signalStrength': 60,
  740. 'macAddress': 'ec:f0:fe:1e:e1:a8',
  741. 'ssid': 'Z VENTURES 9F',
  742. 'frequency': 2457,
  743. 'channel': 0
  744. },
  745. {
  746. 'timestamp': 1515743846504,
  747. 'signalStrength': 60,
  748. 'macAddress': '54:a7:03:b5:a5:5e',
  749. 'ssid': 'Z VENTURES 9F',
  750. 'frequency': 2457,
  751. 'channel': 0
  752. },
  753. {
  754. 'timestamp': 1515743846504,
  755. 'signalStrength': 60,
  756. 'macAddress': 'ec:f8:eb:b6:89:81',
  757. 'ssid': 'Z VENTURES 9F',
  758. 'frequency': 2457,
  759. 'channel': 0
  760. },
  761. {
  762. 'timestamp': 1515743846504,
  763. 'signalStrength': 60,
  764. 'macAddress': 'a0:69:d9:c1:6c:8b',
  765. 'ssid': 'Z VENTURES 9F',
  766. 'frequency': 2457,
  767. 'channel': 0
  768. },
  769. {
  770. 'timestamp': 1515743846504,
  771. 'signalStrength': 60,
  772. 'macAddress': '64:64:4a:d5:f5:ca',
  773. 'ssid': 'Z VENTURES 9F',
  774. 'frequency': 2457,
  775. 'channel': 0
  776. },
  777. {
  778. 'timestamp': 1515743846504,
  779. 'signalStrength': 60,
  780. 'macAddress': 'c8:5b:a0:f8:f4:6c',
  781. 'ssid': 'Z VENTURES 9F',
  782. 'frequency': 2457,
  783. 'channel': 0
  784. },
  785. {
  786. 'timestamp': 1515743846504,
  787. 'signalStrength': 60,
  788. 'macAddress': '84:74:60:bf:fd:20',
  789. 'ssid': 'Z VENTURES 9F',
  790. 'frequency': 2457,
  791. 'channel': 0
  792. }
  793. ]
  794. }
  795. }");
  796. // 将对象转换为 JSON 字符串
  797. var jsonData = JsonConvert.SerializeObject(data);
  798. // 创建 StringContent 对象,将 JSON 字符串传递给它
  799. var content = new StringContent(jsonData, Encoding.UTF8, "application/json");
  800. using var httpClient = new HttpClient();
  801. using var request = new HttpRequestMessage
  802. {
  803. RequestUri = new Uri($"https://api.newayz.com/location/hub/v1/track_points?access_key=OK73jLa48dU9V3EvNN1RLpdjcVOdIEcn&field_masks=location.position"),
  804. Method = HttpMethod.Post,
  805. // Version = new Version(2, 0)
  806. Content = content
  807. };
  808. using var tokenResponse = await httpClient.SendAsync(request);
  809. tokenResponse.EnsureSuccessStatusCode();
  810. var token = await tokenResponse.Content.ReadAsStringAsync();
  811. Console.WriteLine($"{Guid.NewGuid().ToString()}-- {token[^10..]}");
  812. }
  813. private async Task ProcessMessageAsync(ConsumeResult<Ignore, string> consumeResult, IConsumer<Ignore, string> kafkaConsumer,int i)
  814. {
  815. using (_logger.BeginScope(new Dictionary<string, object> { ["RequestId"] = i }))
  816. using (new CustomizeStopWatch(nameof(ProcessMessageAsync), _logger))
  817. {
  818. //if (i==800)
  819. //{
  820. // Console.WriteLine($"第{i}消息:{consumeResult.Message.Value.Substring(0, 10)}");
  821. //}
  822. //await Task.CompletedTask;
  823. Object data = JObject.Parse(@"{
  824. 'timestamp': 1670231477000,
  825. 'id': '679e9043-14fe-44df-aba6-b2fd62592fea',
  826. 'asset': {
  827. 'id': '679e9043-14fe-44df-aba6-b2fd62592fea',
  828. 'manufacturer': 'xiaomi',
  829. 'imeiMd5': '1be14c6210b3115f',
  830. 'uniqueId': '8f95204d-a60e-40bb-b8be-45a1ab3bef2a'
  831. },
  832. 'location': {
  833. 'Wifis': [
  834. {
  835. 'timestamp': 1515743846504,
  836. 'signalStrength': 40,
  837. 'macAddress': '6a:77:24:27:9c:04',
  838. 'ssid': '',
  839. 'frequency': 2412,
  840. 'channel': 0,
  841. 'connected': true
  842. },
  843. {
  844. 'timestamp': 1515743846504,
  845. 'signalStrength': 60,
  846. 'macAddress': 'ec:f0:fe:1e:e1:a8',
  847. 'ssid': 'Z VENTURES 9F',
  848. 'frequency': 2457,
  849. 'channel': 0
  850. },
  851. {
  852. 'timestamp': 1515743846504,
  853. 'signalStrength': 60,
  854. 'macAddress': '54:a7:03:b5:a5:5e',
  855. 'ssid': 'Z VENTURES 9F',
  856. 'frequency': 2457,
  857. 'channel': 0
  858. },
  859. {
  860. 'timestamp': 1515743846504,
  861. 'signalStrength': 60,
  862. 'macAddress': 'ec:f8:eb:b6:89:81',
  863. 'ssid': 'Z VENTURES 9F',
  864. 'frequency': 2457,
  865. 'channel': 0
  866. },
  867. {
  868. 'timestamp': 1515743846504,
  869. 'signalStrength': 60,
  870. 'macAddress': 'a0:69:d9:c1:6c:8b',
  871. 'ssid': 'Z VENTURES 9F',
  872. 'frequency': 2457,
  873. 'channel': 0
  874. },
  875. {
  876. 'timestamp': 1515743846504,
  877. 'signalStrength': 60,
  878. 'macAddress': '64:64:4a:d5:f5:ca',
  879. 'ssid': 'Z VENTURES 9F',
  880. 'frequency': 2457,
  881. 'channel': 0
  882. },
  883. {
  884. 'timestamp': 1515743846504,
  885. 'signalStrength': 60,
  886. 'macAddress': 'c8:5b:a0:f8:f4:6c',
  887. 'ssid': 'Z VENTURES 9F',
  888. 'frequency': 2457,
  889. 'channel': 0
  890. },
  891. {
  892. 'timestamp': 1515743846504,
  893. 'signalStrength': 60,
  894. 'macAddress': '84:74:60:bf:fd:20',
  895. 'ssid': 'Z VENTURES 9F',
  896. 'frequency': 2457,
  897. 'channel': 0
  898. }
  899. ]
  900. }
  901. }");
  902. // 将对象转换为 JSON 字符串
  903. var jsonData = JsonConvert.SerializeObject(data);
  904. // 创建 StringContent 对象,将 JSON 字符串传递给它
  905. var content = new StringContent(jsonData, Encoding.UTF8, "application/json");
  906. using var httpClient = new HttpClient();
  907. using var request = new HttpRequestMessage
  908. {
  909. RequestUri = new Uri($"https://api.newayz.com/location/hub/v1/track_points?access_key=OK73jLa48dU9V3EvNN1RLpdjcVOdIEcn&field_masks=location.position"),
  910. Method = HttpMethod.Post,
  911. // Version = new Version(2, 0)
  912. Content= content
  913. };
  914. using var tokenResponse = await httpClient.SendAsync(request);
  915. tokenResponse.EnsureSuccessStatusCode();
  916. var token = await tokenResponse.Content.ReadAsStringAsync();
  917. Console.WriteLine($"Successfully authenticated. token {token}");
  918. }
  919. }
  920. private IConsumer<Ignore, string> CreateKafkaConsumer()
  921. {
  922. //var partitionIndex = 0;
  923. var collection = new List<int>();
  924. // 获取注册的kafka消费者
  925. var consumerConfig = new ConsumerConfig
  926. {
  927. GroupId = "iot.position",
  928. BootstrapServers = _configService.MqServerAddress,
  929. AutoOffsetReset = AutoOffsetReset.Earliest,
  930. EnableAutoCommit = false, // 关闭自动提交偏移量
  931. CancellationDelayMaxMs = 1//set CancellationDelayMaxMs
  932. };
  933. return new ConsumerBuilder<Ignore, string>(consumerConfig)
  934. .SetErrorHandler((_, e) =>
  935. {
  936. Console.WriteLine($"消费者创建出错,代码:{e.Code} |原因: {e.Reason}");
  937. _logger.LogInformation($"消费者创建出错,代码:{e.Code} |原因: {e.Reason}");
  938. })
  939. .SetPartitionsAssignedHandler((c, partitions) =>
  940. {
  941. //// 在这里手动指定要消费的分区
  942. //var partitionsToConsume = new List<TopicPartitionOffset>
  943. //{
  944. // new TopicPartitionOffset("topics.storage.test_env_db", partitionIndex, Offset.Unset)
  945. //};
  946. ////c.Assign(partitionsToConsume);
  947. //Console.WriteLine($"Assigned partitions: {string.Join(", ", partitionsToConsume)}");
  948. //return partitionsToConsume;
  949. })
  950. .SetPartitionsRevokedHandler((c, partitions) =>
  951. {
  952. })
  953. .Build();
  954. }
  955. }
  956. }