杨雷 vor 5 Monaten
Ursprung
Commit
e94a6a4b01
1 geänderte Dateien mit 57 neuen und 45 gelöschten Zeilen
  1. +57
    -45
      TelpoPush.Position.Worker/Service/Mq/KafkaService.cs

+ 57
- 45
TelpoPush.Position.Worker/Service/Mq/KafkaService.cs Datei anzeigen

@@ -21,28 +21,35 @@ namespace TelpoPush.Position.Worker.Service.Mq
{
BootstrapServers = _configService.KafkaBootstrapServers,
GroupId = _configService.KafkaGroupId,
EnableAutoCommit = false,
StatisticsIntervalMs = 5000,
SessionTimeoutMs = 6000,
AutoOffsetReset = AutoOffsetReset.Earliest,
EnablePartitionEof = true,
CancellationDelayMaxMs=1
EnableAutoCommit = false, // 禁止AutoCommit
Acks = Acks.Leader, // 假设只需要Leader响应即可
AutoOffsetReset = AutoOffsetReset.Earliest,// 从最早的开始消费起
CancellationDelayMaxMs = 1//set CancellationDelayMaxMs
};
}


//_consumerConfig = new ConsumerConfig
//{
// BootstrapServers = _configService.KafkaBootstrapServers,
// GroupId = _configService.KafkaGroupId,
// EnableAutoCommit = false,
// StatisticsIntervalMs = 5000,
// SessionTimeoutMs = 6000,
// AutoOffsetReset = AutoOffsetReset.Earliest,
// EnablePartitionEof = true,
// CancellationDelayMaxMs=1
//};
}
public async Task SubscribeAsync(Action<string, string, Headers> messageFunc, CancellationToken cancellationToken)
{
List<string> topics = _configService.KafkaTopics;
using (var consumer = new ConsumerBuilder<Ignore, string>(_consumerConfig)
.SetErrorHandler((_, e) =>
{
Console.WriteLine($"Error: {e.Reason}");
logger.LogError($"Error: {e.Reason}");
})
.SetStatisticsHandler((_, json) =>
{
Console.WriteLine($" - {DateTime.Now:yyyy-MM-dd HH:mm:ss} > 消息监听中..");
logger.LogInformation($" - {DateTime.Now:yyyy-MM-dd HH:mm:ss} > 消息监听中..");
})
.SetPartitionsAssignedHandler((c, partitions) =>
@@ -53,7 +60,6 @@ namespace TelpoPush.Position.Worker.Service.Mq
.SetPartitionsRevokedHandler((c, partitions) =>
{
string partitionsStr = string.Join(", ", partitions);
Console.WriteLine($" - 回收了 kafka 的分区: {partitionsStr}");
logger.LogInformation($" - 回收了 kafka 分区: {partitionsStr}");
})
.Build())
@@ -67,8 +73,14 @@ namespace TelpoPush.Position.Worker.Service.Mq
try
{
var consumeResult = consumer.Consume(cancellationToken);
string topic = consumeResult.Topic;
string messageResult = consumeResult.Message.Value;
Headers headers = consumeResult.Message.Headers;
bool isPartitionEOF = consumeResult.IsPartitionEOF;
var partition = consumeResult.Partition;

int DataType = -1, AlarmType = -1, OperType = -1;
foreach (var item in consumeResult?.Headers)
foreach (var item in headers)
{
if (item.Key == KafkaHeader.DataType)
DataType = BitConverter.ToInt32(item.GetValueBytes(), 0);
@@ -78,54 +90,54 @@ namespace TelpoPush.Position.Worker.Service.Mq
OperType = BitConverter.ToInt32(item.GetValueBytes(), 0);
}
var Headers = new { DataType, AlarmType, OperType };
logger.LogInformation($"Consumed topic '{consumeResult.Topic}', message '{consumeResult.Message?.Value}' , headers '{JsonConvert.SerializeObject(Headers)}', at: '{consumeResult?.TopicPartitionOffset}'.");
if (consumeResult.IsPartitionEOF)
logger.LogInformation($"Consumed topic '{topic}' , message '{messageResult}' , headers '{JsonConvert.SerializeObject(Headers)}', at '{consumeResult?.TopicPartitionOffset}'.");
if (isPartitionEOF)
{
logger.LogInformation($" - {DateTime.Now:yyyy-MM-dd HH:mm:ss} 已经到底了:{consumeResult.Topic}, partition {consumeResult.Partition}, offset {consumeResult.Offset}.");
Console.WriteLine($" - {DateTime.Now:yyyy-MM-dd HH:mm:ss} 已经到底了:{consumeResult.Topic}, partition {consumeResult.Partition}, offset {consumeResult.Offset}.");
logger.LogInformation($" - {DateTime.Now:yyyy-MM-dd HH:mm:ss} 已经到底了:{topic}, partition {partition}, offset {consumeResult?.Offset}.");
continue;
}

string messageResult = null;
Headers headers = null;
try
{
messageResult = consumeResult.Message.Value;
headers = consumeResult.Message.Headers;
}
catch (Exception ex)
{
var errorMessage = $" - {DateTime.Now:yyyy-MM-dd HH:mm:ss}【Exception 消息反序列化失败,Value:{consumeResult.Message.Value}】 :{ex.StackTrace?.ToString()}";
Console.WriteLine(errorMessage);
logger.LogError(errorMessage);
messageResult = null;
}
if (!string.IsNullOrEmpty(messageResult)/* && consumeResult.Offset % commitPeriod == 0*/)
{
string topic = consumeResult.Topic;
if (!string.IsNullOrEmpty(messageResult))
messageFunc(topic, messageResult, headers);
#region 注释
//string messageResult = null;
//Headers headers = null;
//try
//{
// messageResult = consumeResult.Message.Value;
// headers = consumeResult.Message.Headers;
//}
//catch (Exception ex)
//{
// var errorMessage = $" - {DateTime.Now:yyyy-MM-dd HH:mm:ss}【Exception 消息反序列化失败,Value:{consumeResult.Message.Value}】 :{ex.StackTrace?.ToString()}";
// Console.WriteLine(errorMessage);
// logger.LogError(errorMessage);
// messageResult = null;
//}
//if (!string.IsNullOrEmpty(messageResult)/* && consumeResult.Offset % commitPeriod == 0*/)
//{
// string topic = consumeResult.Topic;
// messageFunc(topic, messageResult, headers);

try
{
consumer.Commit(consumeResult);
}
catch (KafkaException e)
{
Console.WriteLine(e.Message);
}
}
// //try
// //{
// // consumer.Commit(consumeResult);
// //}
// //catch (KafkaException e)
// //{
// // Console.WriteLine(e.Message);
// //}
//}
#endregion
}
catch (ConsumeException e)
{
logger.LogError($"Consume error: {e.Error.Reason}");
Console.WriteLine($"Consume error: {e.Error.Reason}");
}
}
}
catch (OperationCanceledException)
{
logger.LogError("Closing consumer.");
Console.WriteLine("Closing consumer.");
consumer.Close();
}
}


Laden…
Abbrechen
Speichern