Nelze vybrat více než 25 témat Téma musí začínat písmenem nebo číslem, může obsahovat pomlčky („-“) a může být dlouhé až 35 znaků.

83 lines
2.6KB

  1. using Confluent.Kafka;
  2. using GpsCardGatewayPosition.Model.Config;
  3. using GpsCardGatewayPosition.Service.MqProducer.Model;
  4. using Microsoft.Extensions.Logging;
  5. using Microsoft.Extensions.Options;
  6. using Newtonsoft.Json;
  7. using System;
  8. using System.Collections.Generic;
  9. using System.Linq;
  10. using System.Reflection.PortableExecutable;
  11. using System.Text;
  12. using System.Threading.Tasks;
  13. namespace GpsCardGatewayPosition.Service.MqProducer
  14. {
  15. /// <summary>
  16. /// 消息生产者
  17. /// </summary>
  18. public class MessageProducer
  19. {
  20. private IProducer<Null, string> _producer;
  21. private readonly ServiceConfig _configService;
  22. private readonly ILogger<MessageProducer> _logger;
  23. public MessageProducer(IOptions<ServiceConfig> _optConfigService, ILogger<MessageProducer> logger)
  24. {
  25. _configService = _optConfigService.Value;
  26. _logger = logger;
  27. var config = new ProducerConfig
  28. {
  29. BootstrapServers = _configService.MqServerAddress,
  30. EnableIdempotence = true,
  31. Acks = Acks.All,
  32. //LingerMs=5000,
  33. //BatchNumMessages =1000,
  34. MessageSendMaxRetries = 3
  35. };
  36. _producer = new ProducerBuilder<Null, string>(config).Build();
  37. }
  38. public Headers CreateHeader(Dictionary<string, int> pair = null)
  39. {
  40. if (pair == null)
  41. {
  42. return null;
  43. }
  44. else
  45. {
  46. Headers headers = new Headers();
  47. foreach (var item in pair)
  48. {
  49. headers.Add(item.Key, BitConverter.GetBytes(item.Value));
  50. }
  51. return headers;
  52. }
  53. }
  54. // public async Task ProduceAsync(string topic, object message, Headers header = null)
  55. public async Task ProduceAsync(List<TopicModel> topic, object message)
  56. {
  57. try
  58. {
  59. foreach (var item in topic)
  60. {
  61. // producer = new ProducerBuilder<Null, string>(config).Build();
  62. await _producer.ProduceAsync(item.Topic, new Message<Null, string>
  63. {
  64. Headers = item.Headers,
  65. Value = JsonConvert.SerializeObject(message)
  66. }).ConfigureAwait(false);
  67. }
  68. }
  69. catch (ProduceException<Null, string> ex)
  70. {
  71. _logger.LogError($"推送到kafka失败,topic: {topic},\n message:{JsonConvert.SerializeObject(message)}: \n{ex.Error.Reason}");
  72. }
  73. }
  74. }
  75. }