|
- using Confluent.Kafka;
- using Microsoft.Extensions.Options;
- using System;
- using System.Collections.Generic;
- using System.Linq;
- using System.Net;
- using System.Reflection.PortableExecutable;
- using System.Text;
- using System.Threading.Tasks;
- using TelpoPush.Common;
- using TelpoPush.Models.Config;
- using TelpoPush.Service.Mq.Kafka;
-
- namespace TelpoPush.Worker.ThirdSsl.Handlers
- {
/// <summary>
/// Starts the Kafka subscription for this worker and hands every received
/// message to <see cref="ThirdSslProcess"/>.
/// </summary>
public class KafkaSubscribe
{
    // Value passed to LimitedConcurrencyLevelTaskScheduler; presumably the maximum
    // number of tasks that scheduler runs at once (confirm against its implementation).
    private const int SubscribeSchedulerConcurrency = 5;

    private readonly ILogger<KafkaSubscribe> _logger;
    // Stored from the constructor; not read anywhere in this class.
    private readonly IHostEnvironment _env;
    private readonly IKafkaService _kafkaService;
    private readonly ThirdSslProcess _thirdSslProcess;

    /// <summary>
    /// Creates the subscriber with its collaborators.
    /// </summary>
    /// <param name="logger">Logger used to report subscription and message-handling failures.</param>
    /// <param name="env">Host environment; kept for the lifetime of the instance.</param>
    /// <param name="kafkaService">Service whose <c>SubscribeAsync</c> delivers Kafka messages to a callback.</param>
    /// <param name="thirdSslProcess">Processor that receives each message via <c>SendSslThird</c>.</param>
    public KafkaSubscribe(
        ILogger<KafkaSubscribe> logger, IHostEnvironment env,
        IKafkaService kafkaService,
        ThirdSslProcess thirdSslProcess)
    {
        _logger = logger;
        _env = env;
        _kafkaService = kafkaService;
        _thirdSslProcess = thirdSslProcess;
    }

    /// <summary>
    /// Kicks off the Kafka subscription on a dedicated limited-concurrency scheduler.
    /// </summary>
    /// <remarks>
    /// The returned task completes once the call to <c>IKafkaService.SubscribeAsync</c>
    /// has been issued; it does not wait for the subscription itself to finish.
    /// Failures are logged rather than rethrown, both when the call throws
    /// synchronously and when the subscription task faults later.
    /// </remarks>
    /// <returns>A task that completes when the subscription has been started.</returns>
    public async Task SubscribeAsync()
    {
        var scheduler = new LimitedConcurrencyLevelTaskScheduler(SubscribeSchedulerConcurrency);
        var factory = new TaskFactory(scheduler);
        try
        {
            await factory.StartNew(() =>
            {
                // Assumes IKafkaService.SubscribeAsync returns a Task (it was awaited
                // directly in an earlier revision of this method).
                Task subscription = _kafkaService.SubscribeAsync(DoReceive, CancellationToken.None);

                // The subscription task is intentionally not awaited, so callers are not
                // blocked for its whole lifetime. Without this continuation an asynchronous
                // failure would never reach the catch block below and would go unnoticed.
                _ = subscription.ContinueWith(
                    faulted =>
                    {
                        if (faulted.Exception is AggregateException aggregate)
                        {
                            LogSubscribeFailure(aggregate.GetBaseException());
                        }
                    },
                    CancellationToken.None,
                    TaskContinuationOptions.OnlyOnFaulted,
                    TaskScheduler.Default);
            });
        }
        catch (Exception ex)
        {
            LogSubscribeFailure(ex);
        }
    }

    /// <summary>
    /// Callback invoked for each received Kafka message; forwards it to
    /// <c>ThirdSslProcess.SendSslThird</c>.
    /// </summary>
    /// <param name="topic">Topic the message was received on.</param>
    /// <param name="message">Raw message payload.</param>
    /// <param name="headers">Kafka headers that accompanied the message.</param>
    private async void DoReceive(string topic, string message, Headers headers)
    {
        // This method has to stay 'async void' because it is passed as a void-returning
        // callback. Nothing can await it, so every exception must be handled here;
        // otherwise it would escape unobserved and could terminate the process.
        try
        {
            await _thirdSslProcess.SendSslThird(message, topic, headers);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "DoReceive failed to process Kafka message from topic {Topic}", topic);
        }
    }

    /// <summary>
    /// Logs a subscription failure using the same message layout for both the
    /// synchronous and the asynchronous failure paths.
    /// </summary>
    /// <param name="ex">The exception that caused the subscription to fail.</param>
    private void LogSubscribeFailure(Exception ex)
    {
        _logger.LogError(
            ex,
            "Subscribe 处理Kafka数据发生异常 {Message}|{Source}|{StackTrace}",
            ex.Message,
            ex.Source,
            ex.StackTrace);
    }
}
- }
|