|
- using Microsoft.Extensions.Logging;
- using Newtonsoft.Json;
- using System;
- using System.Collections.Generic;
- using System.Linq;
- using System.Text;
- using System.Threading.Tasks;
- using TelpoPush.Models.Dto;
- using TelpoPush.Models.Enum;
- using TelpoPush.Models.MqTemplates;
- using TelpoPush.Models.PushTemplates;
-
namespace TelpoPush.Service.Mq.Kafka
{
    /// <summary>
    /// Publishes push messages to the message-queue topics through
    /// <see cref="MessageProducer"/> and writes an information-level log entry
    /// after each publish completes.
    /// </summary>
    public class MqProcessMessage
    {
        // Topic names are declared once so the name that is published to and the
        // name written to the log can never drift apart.
        private const string WxTopic = "topic.push.wx";
        private const string PropertyTopic = "topic.push.property";
        private const string PushTopic = "topic.push";
        private const string ThirdTopic = "topic.push.third";

        private readonly ILogger<MqProcessMessage> _logger;
        private readonly MessageProducer _messageProducer;

        /// <summary>
        /// Creates the processor.
        /// </summary>
        /// <param name="logger">Logger that receives the publish log entries.</param>
        /// <param name="producer">Producer used to build headers and publish messages.</param>
        public MqProcessMessage(ILogger<MqProcessMessage> logger, MessageProducer producer)
        {
            _logger = logger;
            _messageProducer = producer;
        }

        /// <summary>
        /// Wraps <paramref name="model"/> in an envelope (messageId, topic, time, data) and
        /// publishes it to <c>topic.push.wx</c> with the data-type header set to
        /// <see cref="MqDataType.AlarmInfo"/>.
        /// </summary>
        /// <param name="model">Alarm payload placed in the envelope's <c>data</c> field.</param>
        /// <param name="timeString">Value placed verbatim in the envelope's <c>time</c> field.</param>
        public async Task ProcessWxAlarm(WxModel model, string timeString)
        {
            var alarmHeaders = new Dictionary<string, int>
            {
                { MqHeader.DataType, (int)MqDataType.AlarmInfo },
            };
            List<TopicModel> topics = BuildTopics(WxTopic, alarmHeaders);

            await _messageProducer.ProduceAsync(topics, new
            {
                // Invariant culture keeps the id in Gregorian digits regardless of the
                // host's default culture/calendar.
                messageId = DateTime.Now.ToString(
                    "yyyyMMddHHmmssffff",
                    System.Globalization.CultureInfo.InvariantCulture),
                topic = string.Join(",", topics.Select(t => t.Topic)),
                time = timeString,
                data = model
            });

            // Only pay for JSON serialization when the entry will actually be written.
            if (_logger.IsEnabled(LogLevel.Information))
            {
                _logger.LogInformation($"【成功】Third推送({WxTopic}):IMEI<{model.imei}>,pushData:{JsonConvert.SerializeObject(model)}");
            }
        }

        /// <summary>
        /// Publishes <paramref name="model"/> to <c>topic.push.property</c>, using
        /// <c>headers.DataType.Value</c> as the data-type header.
        /// </summary>
        /// <param name="imei">Device identifier; used only in the log entry.</param>
        /// <param name="model">Message body handed to the producer unchanged.</param>
        /// <param name="headers">
        /// Source of the data-type header. NOTE(review): <c>DataType</c> is presumably a
        /// nullable int; the call fails before anything is published when it has no value.
        /// </param>
        public async Task ProcessProperty(string imei, BaseModel model, HeadersDto headers)
        {
            var propertyHeaders = new Dictionary<string, int>
            {
                { MqHeader.DataType, headers.DataType.Value }
            };
            List<TopicModel> topics = BuildTopics(PropertyTopic, propertyHeaders);

            await _messageProducer.ProduceAsync(topics, model);

            if (_logger.IsEnabled(LogLevel.Information))
            {
                _logger.LogInformation($"【成功】Third推送({PropertyTopic}):IMEI<{imei}>,pushData:{JsonConvert.SerializeObject(model)}");
            }
        }

        /// <summary>
        /// Publishes <paramref name="model"/> to <c>topic.push</c> with the supplied headers.
        /// </summary>
        /// <param name="imei">Device identifier; used only in the log entry.</param>
        /// <param name="model">Message body handed to the producer unchanged.</param>
        /// <param name="headers">Header values passed to the producer's header factory.</param>
        /// <param name="tag">Caller-supplied label prefixed to the log entry.</param>
        public async Task ProcessDataPushServer(string imei, object model, Dictionary<string, int> headers, string tag)
        {
            List<TopicModel> topics = BuildTopics(PushTopic, headers);

            await _messageProducer.ProduceAsync(topics, model);

            if (_logger.IsEnabled(LogLevel.Information))
            {
                _logger.LogInformation($"【{tag}-成功】Third推送({PushTopic}):IMEI<{imei}>,Header<{JsonConvert.SerializeObject(headers)}>,pushData:{JsonConvert.SerializeObject(model)}");
            }
        }

        /// <summary>
        /// Publishes <paramref name="model"/> to <c>topic.push.third</c> with the supplied headers.
        /// </summary>
        /// <param name="imei">Device identifier; used only in the log entry.</param>
        /// <param name="model">Message body handed to the producer unchanged.</param>
        /// <param name="headers">Header values passed to the producer's header factory.</param>
        /// <param name="tag">Caller-supplied label prefixed to the log entry.</param>
        public async Task ProcessThirdhServer(string imei, object model, Dictionary<string, int> headers, string tag)
        {
            List<TopicModel> topics = BuildTopics(ThirdTopic, headers);

            await _messageProducer.ProduceAsync(topics, model);

            if (_logger.IsEnabled(LogLevel.Information))
            {
                _logger.LogInformation($"【{tag}-成功】Third推送({ThirdTopic}):IMEI<{imei}>,Header<{JsonConvert.SerializeObject(headers)}>,pushData:{JsonConvert.SerializeObject(model)}");
            }
        }

        // Builds the single-destination topic list shared by every publish method.
        private List<TopicModel> BuildTopics(string topic, Dictionary<string, int> headers)
        {
            return new List<TopicModel>
            {
                new TopicModel
                {
                    Topic = topic,
                    Headers = _messageProducer.CreateHeader(headers)
                }
            };
        }
    }
}
|