Procházet zdrojové kódy

添加项目文件。

master
H Vs před 6 měsíci
rodič
revize
9f2642d415
34 změnil soubory, kde provedl 1850 přidání a 0 odebrání
  1. +30
    -0
      .dockerignore
  2. +37
    -0
      TelpoKafkaConsole.Common/CustomizeStopWatch.cs
  3. +13
    -0
      TelpoKafkaConsole.Common/TelpoKafkaConsole.Common.csproj
  4. +17
    -0
      TelpoKafkaConsole.Model/ConsumerAcls.cs
  5. +16
    -0
      TelpoKafkaConsole.Model/ProducerAcls.cs
  6. +20
    -0
      TelpoKafkaConsole.Model/ScramCredentialsUser.cs
  7. +20
    -0
      TelpoKafkaConsole.Model/ServiceConfig.cs
  8. +11
    -0
      TelpoKafkaConsole.Model/TelpoKafkaConsole.Model.csproj
  9. +22
    -0
      TelpoKafkaConsole.Model/UserAcls.cs
  10. +387
    -0
      TelpoKafkaConsole.Service/KafkaAdminService.cs
  11. +19
    -0
      TelpoKafkaConsole.Service/TelpoKafkaConsole.Service.csproj
  12. +26
    -0
      TelpoKafkaConsole.WebApi/Configs/ThreadInfoEnricher.cs
  13. +179
    -0
      TelpoKafkaConsole.WebApi/Controllers/AclsController.cs
  14. +74
    -0
      TelpoKafkaConsole.WebApi/Controllers/Api/ApiResponse.cs
  15. +149
    -0
      TelpoKafkaConsole.WebApi/Controllers/ScramAclsController.cs
  16. +89
    -0
      TelpoKafkaConsole.WebApi/Controllers/ScramCredentialsUserController.cs
  17. +40
    -0
      TelpoKafkaConsole.WebApi/Controllers/TopicController.cs
  18. +74
    -0
      TelpoKafkaConsole.WebApi/Controllers/WeatherForecastController.cs
  19. +36
    -0
      TelpoKafkaConsole.WebApi/Dockerfile
  20. +172
    -0
      TelpoKafkaConsole.WebApi/Middleware/LoggingMiddleware.cs
  21. +11
    -0
      TelpoKafkaConsole.WebApi/Model/Request/AclsReq.cs
  22. +15
    -0
      TelpoKafkaConsole.WebApi/Model/Request/ScramAclsConsumerReq.cs
  23. +14
    -0
      TelpoKafkaConsole.WebApi/Model/Request/ScramAclsProducerReq.cs
  24. +11
    -0
      TelpoKafkaConsole.WebApi/Model/Request/TopicReq.cs
  25. +9
    -0
      TelpoKafkaConsole.WebApi/Model/Request/UserReq.cs
  26. +65
    -0
      TelpoKafkaConsole.WebApi/Program.cs
  27. +40
    -0
      TelpoKafkaConsole.WebApi/Properties/launchSettings.json
  28. +25
    -0
      TelpoKafkaConsole.WebApi/TelpoKafkaConsole.WebApi.csproj
  29. +13
    -0
      TelpoKafkaConsole.WebApi/WeatherForecast.cs
  30. +14
    -0
      TelpoKafkaConsole.WebApi/appsettings.Development.json
  31. +118
    -0
      TelpoKafkaConsole.WebApi/appsettings.json
  32. +12
    -0
      TelpoKafkaConsole.WebApi/appsettings.test.json
  33. +43
    -0
      TelpoKafkaConsole.sln
  34. +29
    -0
      pem/ca-root-test.pem

+ 30
- 0
.dockerignore Zobrazit soubor

@@ -0,0 +1,30 @@
**/.classpath
**/.dockerignore
**/.env
**/.git
**/.gitignore
**/.project
**/.settings
**/.toolstarget
**/.vs
**/.vscode
**/*.*proj.user
**/*.dbmdl
**/*.jfm
**/azds.yaml
**/bin
**/charts
**/docker-compose*
**/Dockerfile*
**/node_modules
**/npm-debug.log
**/obj
**/secrets.dev.yaml
**/values.dev.yaml
LICENSE
README.md
!**/.gitignore
!.git/HEAD
!.git/config
!.git/packed-refs
!.git/refs/heads/**

+ 37
- 0
TelpoKafkaConsole.Common/CustomizeStopWatch.cs Zobrazit soubor

@@ -0,0 +1,37 @@
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace TelpoKafkaConsole.Common
{
public class CustomizeStopWatch : IDisposable
{
private readonly Stopwatch _sw;
private readonly string _domain;
private readonly ILogger _logger;

public string Content { get; set; } = default!;

public CustomizeStopWatch(string domain, ILogger logger)
{
_domain = domain;
_logger = logger;

_sw = new Stopwatch();
_sw.Start();
}

public void Dispose()
{
if (_sw != null)
{
_logger.LogInformation($"统计时间[{_domain}],耗时 {_sw.Elapsed.TotalMilliseconds} 毫秒 {Content}");
_sw.Stop();
}
}
}
}

+ 13
- 0
TelpoKafkaConsole.Common/TelpoKafkaConsole.Common.csproj Zobrazit soubor

@@ -0,0 +1,13 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFramework>net6.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="3.1.2" />
</ItemGroup>

</Project>

+ 17
- 0
TelpoKafkaConsole.Model/ConsumerAcls.cs Zobrazit soubor

@@ -0,0 +1,17 @@
using Confluent.Kafka.Admin;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace TelpoKafkaConsole.Model
{
public class ConsumerAcls:UserAcls
{
public string Group { get; set; } = default!;
public override AclOperation Operation { get; set; } = AclOperation.Read;

public override AclPermissionType Permission { get; set; } = AclPermissionType.Allow;
}
}

+ 16
- 0
TelpoKafkaConsole.Model/ProducerAcls.cs Zobrazit soubor

@@ -0,0 +1,16 @@
using Confluent.Kafka.Admin;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace TelpoKafkaConsole.Model
{
public class ProducerAcls:UserAcls
{
public override AclOperation Operation { get; set; } = AclOperation.Write;

public override AclPermissionType Permission { get; set; } = AclPermissionType.Allow;
}
}

+ 20
- 0
TelpoKafkaConsole.Model/ScramCredentialsUser.cs Zobrazit soubor

@@ -0,0 +1,20 @@
using Confluent.Kafka.Admin;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace TelpoKafkaConsole.Model
{
public class ScramCredentialsUser
{
public string Name { get; set; } = default!;

public string Password { get; set; } = string.Empty;

public ScramMechanism Mechanism { get; set; } = ScramMechanism.ScramSha256;

public int Iterations { get; set; } = 8192;
}
}

+ 20
- 0
TelpoKafkaConsole.Model/ServiceConfig.cs Zobrazit soubor

@@ -0,0 +1,20 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace TelpoKafkaConsole.Model
{
public class ServiceConfig
{
/// <summary>
/// Kafka服务地址
/// </summary>
public string KafkaServerAddress { get; set; } = default!;

public string KafkaServerPEMLocation { get; set;} = default!;

//public int TopicPartitionsNum { get; set; }
}
}

+ 11
- 0
TelpoKafkaConsole.Model/TelpoKafkaConsole.Model.csproj Zobrazit soubor

@@ -0,0 +1,11 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFramework>net6.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Confluent.Kafka" Version="2.4.0" />
</ItemGroup>
</Project>

+ 22
- 0
TelpoKafkaConsole.Model/UserAcls.cs Zobrazit soubor

@@ -0,0 +1,22 @@
using Confluent.Kafka.Admin;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace TelpoKafkaConsole.Model
{
public class UserAcls
{
public string UserName { get; set; } = default!;

public string Topic { get; set; } = default!;

//public string Group { get; set; } = default!;

public virtual AclOperation Operation { get; set; } = AclOperation.Any;

public virtual AclPermissionType Permission { get; set; } = AclPermissionType.Unknown;
}
}

+ 387
- 0
TelpoKafkaConsole.Service/KafkaAdminService.cs Zobrazit soubor

@@ -0,0 +1,387 @@
using Confluent.Kafka;
using Confluent.Kafka.Admin;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using TelpoKafkaConsole.Model;
using static System.Net.Mime.MediaTypeNames;

namespace TelpoKafkaConsole.Service
{
public class KafkaAdminService
{
private readonly ILogger<KafkaAdminService> _logger;
private readonly ServiceConfig _configService;
public IAdminClient _adminClient;
public KafkaAdminService(ILogger<KafkaAdminService> logger, IOptions<ServiceConfig> _optConfigService)
{
_logger = logger;
_configService = _optConfigService.Value;
_adminClient = new AdminClientBuilder(new AdminClientConfig
{
BootstrapServers = _configService.KafkaServerAddress,
SecurityProtocol = SecurityProtocol.SaslSsl,
SaslMechanism = SaslMechanism.ScramSha256,
SaslUsername = "superuser",
SaslPassword = "password",
SslCaLocation = _configService.KafkaServerPEMLocation
// Add any other configuration options as needed
}).Build();
}

public List<GroupInfo> ListGroups()
{

try
{
var groups = _adminClient.ListGroups(TimeSpan.FromSeconds(10));
return groups;
}
catch (Exception ex)
{
throw new Exception(ex.Message);
}
}

#region UserScramCredentials

public async Task<List<UserScramCredentialsDescription>> DescribeUserScramCredentialsAsync(IEnumerable<string> users)
{
try
{

var timeout = TimeSpan.FromSeconds(10);
var descResult = await _adminClient.DescribeUserScramCredentialsAsync(users, new DescribeUserScramCredentialsOptions() { RequestTimeout = timeout });
return descResult.UserScramCredentialsDescriptions;
//foreach (var description in descResult.UserScramCredentialsDescriptions)
//{
// Console.WriteLine($" User: {description.User}");
// foreach (var scramCredentialInfo in description.ScramCredentialInfos)
// {
// Console.WriteLine($" Mechanism: {scramCredentialInfo.Mechanism}");
// Console.WriteLine($" Iterations: {scramCredentialInfo.Iterations}");
// }
//}
}
catch (DescribeUserScramCredentialsException e)
{
if (e.Error.Code.ToString().Equals("Local_Partial")
&& e.Results.UserScramCredentialsDescriptions.Count == 1
&& e.Results.UserScramCredentialsDescriptions.First().ScramCredentialInfos.Count == 0
)
{
return new List<UserScramCredentialsDescription>();
}
else
{
var errMsg = $"An error occurred describing user SCRAM credentials for some users:\n";

foreach (var description in e.Results.UserScramCredentialsDescriptions)
{
errMsg += ($"User: {description.User} -- Error: {description.Error}\n");
if (!description.Error.IsError)
{
foreach (var scramCredentialInfo in description.ScramCredentialInfos)
{
errMsg += ($"Mechanism: {scramCredentialInfo.Mechanism} -- Iterations: {scramCredentialInfo.Iterations}\n");
}
}
}
throw new Exception(errMsg);
}

}
catch (KafkaException e)
{
// _logger.LogError($"An error occurred describing user SCRAM credentials: {e}");
throw new KafkaException(e.Error);

}
catch (Exception ex)
{
throw new Exception(ex.Message);
}
}

public async Task AlterUserScramCredentialsAsync(ScramCredentialsUser scramUser, string Action = "UPSERT")
{
var alterations = new List<UserScramCredentialAlteration>();
string user = scramUser.Name;
var mechanism = scramUser.Mechanism;
var iterations = scramUser.Iterations;
var password = Encoding.UTF8.GetBytes(scramUser.Password);
if (Action.Equals("DELETE"))
{
alterations.Add(new UserScramCredentialDeletion
{
User = user,
Mechanism = mechanism,
}
);
}
else
{
alterations.Add(new UserScramCredentialUpsertion
{
User = user,
ScramCredentialInfo = new ScramCredentialInfo
{
Mechanism = mechanism,
Iterations = iterations,
},
Password = password,
// Salt = salt,
}
);
}
var timeout = TimeSpan.FromSeconds(30);
try
{
await _adminClient.AlterUserScramCredentialsAsync(alterations,new AlterUserScramCredentialsOptions() { RequestTimeout = timeout });
//_logger.LogError("All AlterUserScramCredentials operations completed successfully");
}
catch (AlterUserScramCredentialsException e)
{
var errMsg = ($"An error occurred altering user SCRAM credentials for some users:");
foreach (var result in e.Results)
{
errMsg += ($"User: {result.User} -- Error: {result.Error}\n");
}
throw new Exception( errMsg );
}
catch (KafkaException e)
{
//_logger.LogError($"An error occurred altering user SCRAM credentials: {e}");
throw new KafkaException( e.Error );
}
catch (Exception ex)
{

//_logger.LogError(ex.Message);
throw new Exception(ex.Message);
}
}

#endregion

#region ACLs
public async Task<List<AclBinding>> DescribeAclsAsync()
{
//var name = "testtopic";//"";
//var principal = "demo-consumer";
var host = "*";
List<AclBinding> ParseAclBindings = new()
{
new() {
Pattern = new ResourcePattern
{
Type = Confluent.Kafka.Admin.ResourceType.Any,//resourceType,
//Name = "demo-orders",
ResourcePatternType = ResourcePatternType.Any//resourcePatternType
},
Entry = new AccessControlEntry
{
//Principal ="User:demo-consumer",
Host = host,
Operation = AclOperation.Any,//operation,
PermissionType = AclPermissionType.Any//permissionType
}
}
};
List<AclBindingFilter> aclBindingFilters = ParseAclBindings.Select(aclBinding => aclBinding.ToFilter()).ToList();
try
{
var result = await _adminClient.DescribeAclsAsync(aclBindingFilters[0]);
return result.AclBindings;
}
catch (DescribeAclsException e)
{
//_logger.LogError($"An error occurred in describe ACLs operation: Code: {e.Result.Error.Code}" +
// $", Reason: {e.Result.Error.Reason}");
throw new Exception($"An error occurred in describe ACLs operation: Code: {e.Result.Error.Code}, Reason: {e.Result.Error.Reason}");
}
catch (KafkaException e)
{
throw new KafkaException(e.Error);
}
catch (Exception ex)
{

//_logger.LogError(ex.Message);
throw new Exception(ex.Message);
}

}

public async Task CreateAclsAsync(List<AclBinding> aclBindings)
{
try
{
await _adminClient.CreateAclsAsync(aclBindings);
_logger.LogInformation("All create ACL operations completed successfully");
}
catch (CreateAclsException e)
{
var errMsg = ("One or more create ACL operations failed.\n");
for (int i = 0; i < e.Results.Count; ++i)
{
var result = e.Results[i];
if (!result.Error.IsError)
{
errMsg += ($"Create ACLs operation {i} completed successfully\n");
}
else
{
errMsg += ($"An error occurred in create ACL operation {i}: Code: {result.Error.Code}" +
$", Reason: {result.Error.Reason}\n");
}
}
throw new Exception(errMsg);
}
catch (KafkaException e)
{
//_logger.LogError($"An error occurred calling the CreateAcls operation: {e.Message}");
throw new Exception($"An error occurred calling the CreateAcls operation: {e.Message}");
}
catch (Exception ex)
{

//_logger.LogError(ex.Message);
throw new Exception(ex.Message);
}
}

public async Task DeleteAclsAsync(List<AclBinding> aclBindings)
{

List<AclBindingFilter> aclBindingFilters = aclBindings.Select(aclBinding => aclBinding.ToFilter()).ToList();
try
{
var results = await _adminClient.DeleteAclsAsync(aclBindingFilters);
int i = 0;
foreach (var result in results)
{
_logger.LogInformation($"Deleted ACLs in operation {i}");
// PrintAclBindings(result.AclBindings);
++i;
}
}
catch (DeleteAclsException e)
{
var errMsg = ("One or more create ACL operations failed.\n");
for (int i = 0; i < e.Results.Count; ++i)
{
var result = e.Results[i];
if (!result.Error.IsError)
{
errMsg += ($"Deleted ACLs in operation {i}\n");
// PrintAclBindings(result.AclBindings);
}
else
{
errMsg += ($"An error occurred in delete ACL operation {i}: Code: {result.Error.Code}" +
$", Reason: {result.Error.Reason}\n");
}
}

throw new Exception (errMsg);
}
catch (KafkaException e)
{
throw new KafkaException(e.Error);
}
catch (Exception ex)
{

//_logger.LogError(ex.Message);
throw new Exception(ex.Message);
}


}
#endregion

#region Topics
public async Task CreateTopic(string topicName, TimeSpan retentionTime,int numPartitions=1)
{
try
{
var configEntries = new Dictionary<string, string>
{
{ "retention.ms", ((int)retentionTime.TotalMilliseconds).ToString() }
};

await _adminClient.CreateTopicsAsync(
new TopicSpecification[] {
new() {
Name = topicName,
ReplicationFactor = 1,
NumPartitions = numPartitions,
Configs = configEntries
}
});
}
catch (CreateTopicsException e)
{
throw new Exception($"An error occurred creating topic {e.Results[0].Topic}: {e.Results[0].Error.Reason}");
}
}
public async Task DeleteTopics(IEnumerable<string> topicNames)
{
try
{
await _adminClient.DeleteTopicsAsync(topicNames);
}
catch (DeleteTopicsException e)
{
throw new Exception($"An error occurred deleting topic {e.Results[0].Topic}: {e.Results[0].Error.Reason}");
}
}
public async Task<List<TopicDescription>> DescribeTopicsAsync(IEnumerable<string> topicNames)
{
try
{
// var topicCollection = TopicCollection.OfTopicNames(new[] { topicName });
var topicCollection = TopicCollection.OfTopicNames(topicNames);
var topicDescriptions = await _adminClient.DescribeTopicsAsync(topicCollection);
return topicDescriptions.TopicDescriptions;
//foreach (var topicDescription in topicDescriptions.TopicDescriptions)
//{
// Console.WriteLine($"Topic: {topicDescription.Name}");
// foreach (var partition in topicDescription.Partitions)
// {
// Console.WriteLine($"Partition: {partition.Partition}, Leader: {partition.Leader}, Replicas: {string.Join(",", partition.Replicas)}, Isr: {string.Join(",", partition.ISR)}");
// }
//}
}
catch (DescribeTopicsException e)
{
if (e.Error.Code.ToString().Equals("Local_Partial")
&& e.Results.TopicDescriptions.First().Error.Code.ToString().Equals("UnknownTopicOrPart")
&& e.Results.TopicDescriptions.First().Name.Equals(topicNames.First())
)
{
return new List<TopicDescription>();
}
else
{
throw new Exception($"An error occurred describing topic {e.Message}");

}
// throw new Exception($"An error occurred describing topic {e.Message}");
}
}

#endregion


}
}

+ 19
- 0
TelpoKafkaConsole.Service/TelpoKafkaConsole.Service.csproj Zobrazit soubor

@@ -0,0 +1,19 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFramework>net6.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="3.1.2" />
<PackageReference Include="Microsoft.Extensions.Options" Version="3.1.2" />
<PackageReference Include="Confluent.Kafka" Version="2.4.0" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\TelpoKafkaConsole.Model\TelpoKafkaConsole.Model.csproj" />
</ItemGroup>

</Project>

+ 26
- 0
TelpoKafkaConsole.WebApi/Configs/ThreadInfoEnricher.cs Zobrazit soubor

@@ -0,0 +1,26 @@
using Serilog.Configuration;
using Serilog.Core;
using Serilog.Events;
using Serilog;

namespace TelpoKafkaConsole.WebApi.Configs
{
public class ThreadInfoEnricher : ILogEventEnricher
{
public void Enrich(LogEvent logEvent, ILogEventPropertyFactory propertyFactory)
{

logEvent.AddPropertyIfAbsent(propertyFactory.CreateProperty("ThreadId", Thread.CurrentThread.ManagedThreadId));
}
}

public static class EnricherExtensions
{
public static LoggerConfiguration WithThreadInfo(this LoggerEnrichmentConfiguration enrich)
{
if (enrich == null)
throw new ArgumentNullException(nameof(enrich));
return enrich.With<ThreadInfoEnricher>();
}
}
}

+ 179
- 0
TelpoKafkaConsole.WebApi/Controllers/AclsController.cs Zobrazit soubor

@@ -0,0 +1,179 @@
using Confluent.Kafka.Admin;
using Microsoft.AspNetCore.Mvc;
using TelpoKafkaConsole.Service;
using TelpoKafkaConsole.WebApi.Controllers.Api;
using TelpoKafkaConsole.WebApi.Model.Request;

// For more information on enabling Web API for empty projects, visit https://go.microsoft.com/fwlink/?LinkID=397860

namespace TelpoKafkaConsole.WebApi.Controllers
{
[Route("api/[controller]/[action]")]
[ApiController]
public class AclsController : ControllerBase
{
private readonly KafkaAdminService _servicekafkaAdmin;
public AclsController(KafkaAdminService kafkaAdminService)
{
_servicekafkaAdmin = kafkaAdminService;
}
// GET: api/<AclsController>
[HttpGet]
public async Task<ApiResponse<List<AclBinding>>> Get()
{
var acls = await _servicekafkaAdmin.DescribeAclsAsync();
return ApiResponse<List<AclBinding>>.Success(acls);
}

// GET api/<AclsController>/5
[HttpGet("{username}")]
public async Task<ApiResponse<IEnumerable<AclBinding>>> Get(string username)
{

var acls = await _servicekafkaAdmin.DescribeAclsAsync();
return ApiResponse<IEnumerable<AclBinding>>.Success(acls.Where(i => i.Entry.Principal.EndsWith(username)));
}

// POST api/<AclsController>
[HttpPost]
public async Task<ApiResponse<string>> Post([FromBody] AclsReq aclsReq)
{
List<AclBinding> aclBindings = new();
// 生产者
if (string.IsNullOrEmpty(aclsReq.Group))
{
aclBindings.Add(new AclBinding()
{
Pattern = new ResourcePattern
{
Type = ResourceType.Topic,
Name = aclsReq.Topic,
ResourcePatternType = ResourcePatternType.Literal
},
Entry = new AccessControlEntry
{
Principal = $"User:{aclsReq.UserName}",
Host = "*",
Operation = AclOperation.Write,
PermissionType = AclPermissionType.Allow
}
});
}
// 消费者
else
{
aclBindings.Add(new AclBinding()
{
Pattern = new ResourcePattern
{
Type = ResourceType.Group,
Name = aclsReq.Group,
ResourcePatternType = ResourcePatternType.Literal
},
Entry = new AccessControlEntry
{
Principal = $"User:{aclsReq.UserName}",
Host = "*",
Operation = AclOperation.Read,
PermissionType = AclPermissionType.Allow
}
});
aclBindings.Add(new AclBinding()
{
Pattern = new ResourcePattern
{
Type = ResourceType.Topic,
Name = aclsReq.Topic,
ResourcePatternType = ResourcePatternType.Literal
},
Entry = new AccessControlEntry
{
Principal = $"User:{aclsReq.UserName}",
Host = "*",
Operation = AclOperation.Read,
PermissionType = AclPermissionType.Allow
}
});
}

// - Group: {aclsReq.Group}
await _servicekafkaAdmin.CreateAclsAsync(aclBindings);
var operation = string.IsNullOrEmpty(aclsReq.Group) ? "写" : "读";
var group = string.IsNullOrEmpty(aclsReq.Group) ? "" : $"Group:{aclsReq.Group} - ";

return ApiResponse<string>.Success($"创建 ACLs 规则 用户:{aclsReq.UserName} - Topic:{aclsReq.Topic} - {group}{operation}权限成功");
}

// DELETE api/<AclsController>/5
[HttpDelete]
public async Task<ApiResponse<string>> DeleteAsync([FromBody] AclsReq aclsReq)
{
List<AclBinding> aclBindings = new();
// 生产者
if (string.IsNullOrEmpty(aclsReq.Group))
{
aclBindings.Add(new AclBinding()
{
Pattern = new ResourcePattern
{
Type = ResourceType.Topic,
Name = aclsReq.Topic,
ResourcePatternType = ResourcePatternType.Literal
},
Entry = new AccessControlEntry
{
Principal = $"User:{aclsReq.UserName}",
Host = "*",
Operation = AclOperation.Any,
PermissionType = AclPermissionType.Any
}
});
}
// 消费者
else
{
aclBindings.Add(new AclBinding()
{
Pattern = new ResourcePattern
{
Type = ResourceType.Group,
Name = aclsReq.Group,
ResourcePatternType = ResourcePatternType.Literal
},
Entry = new AccessControlEntry
{
Principal = $"User:{aclsReq.UserName}",
Host = "*",
Operation = AclOperation.Any,
PermissionType = AclPermissionType.Any
}
});
aclBindings.Add(new AclBinding()
{
Pattern = new ResourcePattern
{
Type = ResourceType.Topic,
Name = aclsReq.Topic,
ResourcePatternType = ResourcePatternType.Literal
},
Entry = new AccessControlEntry
{
Principal = $"User:{aclsReq.UserName}",
Host = "*",
Operation = AclOperation.Read,
PermissionType = AclPermissionType.Allow
}
});
}


await _servicekafkaAdmin.DeleteAclsAsync(aclBindings);

// var operation = string.IsNullOrEmpty(aclsReq.Group) ? "写" : "读";
var group = string.IsNullOrEmpty(aclsReq.Group) ? "" : $"Group:{aclsReq.Group} - ";

return ApiResponse<string>.Success($"删除 ACLs 规则 用户:{aclsReq.UserName} - Topic:{aclsReq.Topic} - {group}所有权限成功");

}
}
}

+ 74
- 0
TelpoKafkaConsole.WebApi/Controllers/Api/ApiResponse.cs Zobrazit soubor

@@ -0,0 +1,74 @@
using Newtonsoft.Json.Serialization;
using Newtonsoft.Json;

namespace TelpoKafkaConsole.WebApi.Controllers.Api
{
public class ApiResponse<T>
{
public string Timestamp { get; set; } = default!;

public T Data { get; set; } = default!;

public Result Result { get; set; } = new Result();

//public bool Succeeded { get; set; }
// public string Message { get; set; } = String.Empty;


public static ApiResponse<T> Fail(int code, string errorMessage) => new()
{
//MsgType = msgType,
//Signature = signature,
Timestamp = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff"),
Result = new()
{
Status = "failed",
Code = code,
Message = errorMessage,
},

};

//public static ApiResponse<T> Success(string msgType,string signature,T data) => new()
//{
// MsgType= msgType,
// Signature = signature,
// Timestamp = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff"),
// Data = data,
// Result = new()
// {
// Status = "succeed",
// Code = 200,
// Message = "请求成功!",
// },
//};
public static ApiResponse<T> Success(T data) => new()
{
Timestamp = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff"),
Data = data,
Result = new()
{
Status = "succeed",
Code = 200,
Message = "请求成功!",
},
};
public string ToJsonString()
{
var settings = new JsonSerializerSettings
{
DateFormatString = "yyyy-MM-dd HH:mm:ss.fff", // 设置日期格式
ContractResolver = new DefaultContractResolver { NamingStrategy = new CamelCaseNamingStrategy() }
};
return JsonConvert.SerializeObject(this, settings);
}
}


public class Result
{
public string Status { get; set; } = default!;
public int Code { get; set; }
public string Message { get; set; } = default!;
}
}

+ 149
- 0
TelpoKafkaConsole.WebApi/Controllers/ScramAclsController.cs Zobrazit soubor

@@ -0,0 +1,149 @@
using Confluent.Kafka.Admin;
using Microsoft.AspNetCore.Mvc;
using TelpoKafkaConsole.Model;
using TelpoKafkaConsole.Service;
using TelpoKafkaConsole.WebApi.Controllers.Api;
using TelpoKafkaConsole.WebApi.Model.Request;
using static Confluent.Kafka.ConfigPropertyNames;

namespace TelpoKafkaConsole.WebApi.Controllers
{
[Route("api/[controller]")]
[ApiController]
public class ScramAclsController : ControllerBase
{
private readonly KafkaAdminService _servicekafkaAdmin;
public ScramAclsController(KafkaAdminService kafkaAdminService) { _servicekafkaAdmin = kafkaAdminService; }

// POST api/<ScramAclsController>/Consumer
[HttpPost("Consumer")] // 添加了路由
public async Task<ApiResponse<string>> Consumer([FromBody] ScramAclsConsumerReq consumer)
{
// 创建用户
ScramCredentialsUser scramUser = new()
{
Name = consumer.Name,
Password = consumer.Password,
};
await _servicekafkaAdmin.AlterUserScramCredentialsAsync(scramUser);
// 创建 topic
var topics = await _servicekafkaAdmin.DescribeTopicsAsync(new List<string> { consumer.Topic });
if (topics.Count.Equals(0))
{
await _servicekafkaAdmin.CreateTopic(consumer.Topic, TimeSpan.FromDays(3), consumer.NumPartitions);
}
// 创建 alcs
List<AclBinding> aclBindings = new()
{
new AclBinding()
{
Pattern = new ResourcePattern
{
Type = ResourceType.Group,
Name = consumer.Group,
ResourcePatternType = ResourcePatternType.Literal
},
Entry = new AccessControlEntry
{
Principal = $"User:{consumer.Name}",
Host = "*",
Operation = AclOperation.Read,
PermissionType = AclPermissionType.Allow
}
},
new AclBinding()
{
Pattern = new ResourcePattern
{
Type = ResourceType.Topic,
Name = consumer.Topic,
ResourcePatternType = ResourcePatternType.Literal
},
Entry = new AccessControlEntry
{
Principal = $"User:{consumer.Name}",
Host = "*",
Operation = AclOperation.Read,
PermissionType = AclPermissionType.Allow
}
}
};
await _servicekafkaAdmin.CreateAclsAsync(aclBindings);


return ApiResponse<string>.Success($"创建 消费者用户 {consumer.Name} Acls 规则成功");
}

// POST api/<ScramAclsController>/Producer
[HttpPost("Producer")] // 添加了路由
public async Task<ApiResponse<string>> Producer([FromBody] ScramAclsProducerReq producer)
{
// 创建用户
ScramCredentialsUser scramUser = new()
{
Name = producer.Name,
Password = producer.Password,
};
await _servicekafkaAdmin.AlterUserScramCredentialsAsync(scramUser);
// 创建 topic
var topics = await _servicekafkaAdmin.DescribeTopicsAsync(new List<string> { producer.Topic });
if (topics.Count.Equals(0))
{
await _servicekafkaAdmin.CreateTopic(producer.Topic, TimeSpan.FromDays(3), producer.NumPartitions);
}
// 创建 alcs
List<AclBinding> aclBindings = new()
{
new AclBinding()
{
Pattern = new ResourcePattern
{
Type = ResourceType.Topic,
Name = producer.Topic,
ResourcePatternType = ResourcePatternType.Literal
},
Entry = new AccessControlEntry
{
Principal = $"User:{producer.Name}",
Host = "*",
Operation = AclOperation.Write,
PermissionType = AclPermissionType.Allow
}
}
};
await _servicekafkaAdmin.CreateAclsAsync(aclBindings);
return ApiResponse<string>.Success($"创建 生产者用户 {producer.Name} Acls 规则成功");
}

// DELETE api/<ScramAclsController>/{username}
[HttpDelete("{username}")]
public async Task<ApiResponse<string>> Delete(string username)
{
// 删除用户
var scramUsers = await _servicekafkaAdmin.DescribeUserScramCredentialsAsync(new List<string>
{
username
});
if (scramUsers.Count==1)
{
ScramCredentialsUser scramUser = new()
{
Name = username
};
await _servicekafkaAdmin.AlterUserScramCredentialsAsync(scramUser, "DELETE");
}
// 删除alcs
var acls = await _servicekafkaAdmin.DescribeAclsAsync();
var userAclsBinding = acls.Where(i => i.Entry.Principal.EndsWith(username)).ToList();
if (userAclsBinding.Count>0)
{
await _servicekafkaAdmin.DeleteAclsAsync(userAclsBinding);
}
return ApiResponse<string>.Success($"删除用户 {username} 和 Acls 规则成功");

}
}
}

+ 89
- 0
TelpoKafkaConsole.WebApi/Controllers/ScramCredentialsUserController.cs Zobrazit soubor

@@ -0,0 +1,89 @@
using Confluent.Kafka;
using Confluent.Kafka.Admin;
using Microsoft.AspNetCore.Mvc;
using Microsoft.OpenApi.Extensions;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using TelpoKafkaConsole.Model;
using TelpoKafkaConsole.Service;
using TelpoKafkaConsole.WebApi.Controllers.Api;
using TelpoKafkaConsole.WebApi.Model.Request;

// For more information on enabling Web API for empty projects, visit https://go.microsoft.com/fwlink/?LinkID=397860

namespace TelpoKafkaConsole.WebApi.Controllers
{
[Route("api/[controller]")]
[ApiController]
public class ScramCredentialsUserController : ControllerBase
{
private readonly KafkaAdminService _servicekafkaAdmin;
public ScramCredentialsUserController(KafkaAdminService kafkaAdminService)
{
_servicekafkaAdmin = kafkaAdminService;
}
// GET: api/<ScramCredentialsUserController>
[HttpGet]
public async Task<ApiResponse<object>> Get()
{
var usersScram = await _servicekafkaAdmin.DescribeUserScramCredentialsAsync(new List<string>());
var users = usersScram
.Select(i => new {i.User, ScramCredentialInfos=i.ScramCredentialInfos
.Select(s=>new { Mechanism=s.Mechanism.GetDisplayName(),s.Iterations }) })
.Where(i=>!i.User.Equals("superuser"));
return ApiResponse<object>.Success(users);
}

// GET api/<ScramCredentialsUserController>/5
[HttpGet("{username}")]
public async Task<ApiResponse<object>> GetAsync(string username)
{
var users = new List<string>
{
username
};
var usersScram = await _servicekafkaAdmin.DescribeUserScramCredentialsAsync(users);



var firstUserScram = usersScram.FirstOrDefault();

var user = new
{
firstUserScram?.User,
ScramCredentialInfos = firstUserScram?.ScramCredentialInfos
.Select(i => new
{
Mechanism = i.Mechanism.GetDisplayName(),
i.Iterations
})
};
return ApiResponse<object>.Success(user);
}

// POST api/<ScramCredentialsUserController>
[HttpPost]
public async Task<ApiResponse<string>> PostAsync([FromBody] UserReq user)
{
ScramCredentialsUser scramUser = new()
{
Name = user.Name,
Password = user.Password,
};
await _servicekafkaAdmin.AlterUserScramCredentialsAsync(scramUser);
return ApiResponse<string>.Success($"创建 Scram 用户{user.Name}成功");
}

[HttpDelete("{username}")]
public async Task<ApiResponse<string>> Delete(string username)
{
ScramCredentialsUser scramUser = new()
{
Name = username
};
await _servicekafkaAdmin.AlterUserScramCredentialsAsync(scramUser,"DELETE");
return ApiResponse<string>.Success($"删除 Scram 用户{username} 成功");
}
}
}

+ 40
- 0
TelpoKafkaConsole.WebApi/Controllers/TopicController.cs Zobrazit soubor

@@ -0,0 +1,40 @@
using Confluent.Kafka.Admin;
using Microsoft.AspNetCore.Mvc;
using TelpoKafkaConsole.Service;
using TelpoKafkaConsole.WebApi.Controllers.Api;
using TelpoKafkaConsole.WebApi.Model.Request;

// For more information on enabling Web API for empty projects, visit https://go.microsoft.com/fwlink/?LinkID=397860

namespace TelpoKafkaConsole.WebApi.Controllers
{
[Route("api/[controller]")]
[ApiController]
public class TopicController : ControllerBase
{
private readonly KafkaAdminService _servicekafkaAdmin;

public TopicController(KafkaAdminService kafkaAdminService) { _servicekafkaAdmin = kafkaAdminService; }
[HttpGet("{topic}")]
public async Task<ApiResponse<List<TopicDescription>>> Get(string topic)
{
var topicInfo = await _servicekafkaAdmin.DescribeTopicsAsync(new List<string>() { topic });
return ApiResponse<List<TopicDescription>>.Success(topicInfo);
}

[HttpPost]
public async Task<ApiResponse<string>> Post([FromBody] TopicReq topic)
{
await _servicekafkaAdmin.CreateTopic(topic.TopicName,TimeSpan.FromDays(3));
return ApiResponse<string>.Success($"创建 Topic {topic} 成功");
}

[HttpDelete("{topic}")]
public async Task<ApiResponse<string>> Delete(string topic)
{
await _servicekafkaAdmin.DeleteTopics(new List<string>() {topic});
return ApiResponse<string>.Success($"删除 Topic {topic} 成功");
}
}
}

+ 74
- 0
TelpoKafkaConsole.WebApi/Controllers/WeatherForecastController.cs Zobrazit soubor

@@ -0,0 +1,74 @@
using Confluent.Kafka;
using Confluent.Kafka.Admin;
using Microsoft.AspNetCore.Mvc;
using TelpoKafkaConsole.Service;
using TelpoKafkaConsole.WebApi.Controllers.Api;

namespace TelpoKafkaConsole.WebApi.Controllers
{
[ApiController]
[Route("[controller]")]
public class WeatherForecastController : ControllerBase
{
private readonly KafkaAdminService _servicekafkaAdmin;

private static readonly string[] Summaries = new[]
{
"Freezing", "Bracing", "Chilly", "Cool", "Mild", "Warm", "Balmy", "Hot", "Sweltering", "Scorching"
};

private readonly ILogger<WeatherForecastController> _logger;

public WeatherForecastController(ILogger<WeatherForecastController> logger, KafkaAdminService kafkaAdminService)
{
_logger = logger;
_servicekafkaAdmin = kafkaAdminService;
}

//[HttpGet(Name = "GetWeatherForecast")]
//public IEnumerable<WeatherForecast> Get()
//{
// return Enumerable.Range(1, 5).Select(index => new WeatherForecast
// {
// Date = DateTime.Now.AddDays(index),
// TemperatureC = Random.Shared.Next(-20, 55),
// Summary = Summaries[Random.Shared.Next(Summaries.Length)]
// })
// .ToArray();
//}
//[HttpGet(Name = "GetGroup")]
//public ApiResponse<List<GroupInfo>> GetGroup()
//{
// var group = _servicekafkaAdmin.ListGroups();
// return ApiResponse<List<GroupInfo>>.Success(group);
//}

[HttpGet(Name = "GetGroup")]
public async Task<ApiResponse<string>> GetGroupAsync()
{
// var group = _servicekafkaAdmin.ListGroups();
List<AclBinding> aclBindings = new()
{
new AclBinding()
{
Pattern = new ResourcePattern
{
Type = ResourceType.Broker,
Name = "kafka-cluster",
ResourcePatternType = ResourcePatternType.Literal
},
Entry = new AccessControlEntry
{
Principal = $"User:telpo-consumer",
Host = "*",
Operation = AclOperation.All,
PermissionType = AclPermissionType.Deny
}
}
};
await _servicekafkaAdmin.CreateAclsAsync(aclBindings);
return ApiResponse<string>.Success("ok");
}
}
}

+ 36
- 0
TelpoKafkaConsole.WebApi/Dockerfile Zobrazit soubor

@@ -0,0 +1,36 @@
#See https://aka.ms/customizecontainer to learn how to customize your debug container and how Visual Studio uses this Dockerfile to build your images for faster debugging.

FROM mcr.microsoft.com/dotnet/aspnet:6.0 AS base
WORKDIR /app
EXPOSE 80

FROM mcr.microsoft.com/dotnet/sdk:6.0 AS build
ARG BUILD_CONFIGURATION=Release
WORKDIR /src
COPY ["TelpoKafkaConsole.WebApi/TelpoKafkaConsole.WebApi.csproj", "TelpoKafkaConsole.WebApi/"]
COPY ["TelpoKafkaConsole.Common/TelpoKafkaConsole.Common.csproj", "TelpoKafkaConsole.Common/"]
COPY ["TelpoKafkaConsole.Service/TelpoKafkaConsole.Service.csproj", "TelpoKafkaConsole.Service/"]
COPY ["TelpoKafkaConsole.Model/TelpoKafkaConsole.Model.csproj", "TelpoKafkaConsole.Model/"]
RUN dotnet restore "./TelpoKafkaConsole.WebApi/./TelpoKafkaConsole.WebApi.csproj"
COPY . .
WORKDIR "/src/TelpoKafkaConsole.WebApi"
RUN dotnet build "./TelpoKafkaConsole.WebApi.csproj" -c $BUILD_CONFIGURATION -o /app/build

FROM build AS publish

ARG BUILD_CONFIGURATION=Release
RUN dotnet publish "./TelpoKafkaConsole.WebApi.csproj" -c $BUILD_CONFIGURATION -o /app/publish /p:UseAppHost=false

FROM base AS final
WORKDIR /app
COPY --from=publish /app/publish .
COPY pem /app/pem

ENV environment=Development
ENV TimeZone=Asia/Shanghai
ENV LANG C.UTF-8
RUN ln -snf /usr/share/zoneinfo/$TimeZone /etc/localtime && echo $TimeZone > /etc/timezone

#ENTRYPOINT ["dotnet", "TelpoKafkaConsole.WebApi.dll"]

ENTRYPOINT ["sh", "-c", "dotnet TelpoKafkaConsole.App.dll --environment=$environment"]

+ 172
- 0
TelpoKafkaConsole.WebApi/Middleware/LoggingMiddleware.cs Zobrazit soubor

@@ -0,0 +1,172 @@
using Confluent.Kafka;
using Microsoft.AspNetCore.Http;
using Newtonsoft.Json;
using System.Net;
using System.Text;
using TelpoKafkaConsole.Common;
using TelpoKafkaConsole.WebApi.Controllers.Api;

namespace TelpoKafkaConsole.WebApi.Middleware
{
public class LoggingMiddleware
{
private readonly RequestDelegate _next;
private readonly ILogger<LoggingMiddleware> _logger;

public LoggingMiddleware(RequestDelegate next, ILogger<LoggingMiddleware> logger)
{
_next = next;
_logger = logger;
}

//public async Task InvokeAsync(HttpContext context)
//{
// //// 在请求处理之前记录日志
// //using (_logger.BeginScope(new Dictionary<string, object> { ["RequestId"] = "" }))
// using (new CustomizeStopWatch(nameof(LoggingMiddleware), _logger))
// {
// var request = await FormatRequest(context.Request);

// _logger.LogInformation(request);

// var originalBodyStream = context.Response.Body;

// using var responseBody = new MemoryStream();
// context.Response.Body = responseBody;

// await _next(context);

// var response = await FormatResponse(context.Response);

// _logger.LogInformation(response);

// await responseBody.CopyToAsync(originalBodyStream);
// }
//}

public async Task InvokeAsync(HttpContext context)
{
//// 在请求处理之前记录日志
//using (_logger.BeginScope(new Dictionary<string, object> { ["RequestId"] = "" }))
using (new CustomizeStopWatch(nameof(LoggingMiddleware), _logger))
{
using var responseBody = new MemoryStream();
var originalBodyStream = context.Response.Body;
try
{

var request = await FormatRequest(context.Request);
_logger.LogInformation(request);
context.Response.Body = responseBody;
await _next(context);
var response = await FormatResponse(context.Response);
_logger.LogInformation(response);
// await responseBody.CopyToAsync(originalBodyStream);
}
catch (Exception ex)
{
await HandleExceptionAsync(context, ex); // 捕获异常了 在HandleExceptionAsync中处理
}

//var response = await FormatResponse(context.Response);
//_logger.LogInformation(response);
await responseBody.CopyToAsync(originalBodyStream);
}
}

private async Task<string> FormatRequest(HttpRequest request)
{
request.EnableBuffering();

var body = await new StreamReader(request.Body).ReadToEndAsync();
var formattedBody = FormatJson(body);

request.Body.Position = 0;

return $"请求: {request.Scheme} {request.Host}{request.Path} {request.QueryString} {formattedBody}";
}

private async Task<string> FormatResponse(HttpResponse response)
{
response.Body.Seek(0, SeekOrigin.Begin);

var body = await new StreamReader(response.Body).ReadToEndAsync();
var formattedBody = FormatJson(body);

response.Body.Seek(0, SeekOrigin.Begin);

return $"响应: {response.StatusCode}: {formattedBody}";
}

private static string FormatJson(string json)
{
if (string.IsNullOrEmpty(json))
{
return string.Empty;
}

try
{
var obj = JsonConvert.DeserializeObject(json);
// return JsonConvert.SerializeObject(obj, Formatting.Indented);
return JsonConvert.SerializeObject(obj);
}
catch
{
return json;
}
}

private async Task HandleExceptionAsync(HttpContext context, Exception exception)
{
context.Response.ContentType = "application/json"; // 返回json 类型
var response = context.Response;

var errorResponse = new ErrorResponse
{
Success = false
}; // 自定义的异常错误信息类型
switch (exception)
{
case ApplicationException ex:
if (ex.Message.Contains("Invalid token"))
{
response.StatusCode = (int)HttpStatusCode.Forbidden;
errorResponse.Message = ex.Message;
break;
}
response.StatusCode = (int)HttpStatusCode.BadRequest;
errorResponse.Message = ex.Message;
break;
case KeyNotFoundException ex:
response.StatusCode = (int)HttpStatusCode.NotFound;
errorResponse.Message = ex.Message;
break;
default:
response.StatusCode = (int)HttpStatusCode.InternalServerError;
errorResponse.Message = "Internal Server errors. Check Logs!";
break;
}

//var apiResponse = ApiResponse<object>.Fail(response.StatusCode, $"{exception.Message}\n{exception.InnerException}\n{exception.StackTrace}");
var apiResponse = ApiResponse<object>.Fail(response.StatusCode, $"{exception.Message}");

var resultJson = JsonConvert.SerializeObject(apiResponse);

var resultBytes = Encoding.UTF8.GetBytes(resultJson);

await response.Body.WriteAsync(resultBytes, 0, resultBytes.Length);

var responseStr = await FormatResponse(context.Response);

_logger.LogError(responseStr);
}

internal class ErrorResponse
{
public bool Success { get; set; }

public string Message { get; set; } = String.Empty;
}
}
}

+ 11
- 0
TelpoKafkaConsole.WebApi/Model/Request/AclsReq.cs Zobrazit soubor

@@ -0,0 +1,11 @@
namespace TelpoKafkaConsole.WebApi.Model.Request
{
public class AclsReq
{
public string UserName { get; set; } = default!;

public string Topic { get; set; } = default!;

public string Group { get; set; } = default!;
}
}

+ 15
- 0
TelpoKafkaConsole.WebApi/Model/Request/ScramAclsConsumerReq.cs Zobrazit soubor

@@ -0,0 +1,15 @@
namespace TelpoKafkaConsole.WebApi.Model.Request
{
public class ScramAclsConsumerReq
{
public string Name { get; set; } = default!;

public string Password { get; set; } = default!;

public string Topic { get; set; } = default!;

public int NumPartitions { get; set; } = 1;

public string Group { get; set; } = default!;
}
}

+ 14
- 0
TelpoKafkaConsole.WebApi/Model/Request/ScramAclsProducerReq.cs Zobrazit soubor

@@ -0,0 +1,14 @@
namespace TelpoKafkaConsole.WebApi.Model.Request
{
public class ScramAclsProducerReq
{
public string Name { get; set; } = default!;

public string Password { get; set; } = default!;

public string Topic { get; set; } = default!;

public int NumPartitions { get; set; } = 1;

}
}

+ 11
- 0
TelpoKafkaConsole.WebApi/Model/Request/TopicReq.cs Zobrazit soubor

@@ -0,0 +1,11 @@
namespace TelpoKafkaConsole.WebApi.Model.Request
{
public class TopicReq
{
public string TopicName { get; set; } = default!;

public int NumPartitions { get; set; } = 1;

//public int RetentionTime { get; set; } = 1000 * 24 * 3600;
}
}

+ 9
- 0
TelpoKafkaConsole.WebApi/Model/Request/UserReq.cs Zobrazit soubor

@@ -0,0 +1,9 @@
namespace TelpoKafkaConsole.WebApi.Model.Request
{
public class UserReq
{
public string Name { get; set; } = default!;

public string Password { get; set; } = default!;
}
}

+ 65
- 0
TelpoKafkaConsole.WebApi/Program.cs Zobrazit soubor

@@ -0,0 +1,65 @@
using Microsoft.Extensions.Configuration;
using Serilog;
using TelpoKafkaConsole.Model;
using TelpoKafkaConsole.Service;
using TelpoKafkaConsole.WebApi.Configs;
using TelpoKafkaConsole.WebApi.Middleware;

namespace TelpoKafkaConsole.WebApi
{
public class Program
{
public static void Main(string[] args)
{
//Ñ¡ÔñÅäÖÃÎļþappsetting.json
var config = new ConfigurationBuilder()
.SetBasePath(Directory.GetCurrentDirectory())
.AddJsonFile("appsettings.json", optional: true, reloadOnChange: true)
.Build();
Log.Logger = new LoggerConfiguration()
.ReadFrom.Configuration(config).Enrich.WithThreadInfo()
.CreateLogger();




var builder = WebApplication.CreateBuilder(args);

// Add services to the container.

builder.Services.AddControllers();
// Learn more about configuring Swagger/OpenAPI at https://aka.ms/aspnetcore/swashbuckle
builder.Services.AddEndpointsApiExplorer();
builder.Services.AddSwaggerGen();

#region ÅäÖÃÐÅÏ¢
builder.Services
.Configure<ServiceConfig>(builder.Configuration.GetSection("ServiceConfig"));
#endregion


builder.Services.AddSingleton<KafkaAdminService>();



builder.Host.UseSerilog();

var app = builder.Build();

// Configure the HTTP request pipeline.
if (app.Environment.IsDevelopment())
{
app.UseSwagger();
app.UseSwaggerUI();
}

app.UseAuthorization();

app.UseMiddleware<LoggingMiddleware>();

app.MapControllers();

app.Run();
}
}
}

+ 40
- 0
TelpoKafkaConsole.WebApi/Properties/launchSettings.json Zobrazit soubor

@@ -0,0 +1,40 @@
{
"profiles": {
"TelpoKafkaConsole.WebApi": {
"commandName": "Project",
"launchBrowser": true,
"launchUrl": "swagger",
"environmentVariables": {
"ASPNETCORE_ENVIRONMENT": "Development"
},
"dotnetRunMessages": true,
"applicationUrl": "http://localhost:5046"
},
"IIS Express": {
"commandName": "IISExpress",
"launchBrowser": true,
"launchUrl": "swagger",
"environmentVariables": {
"ASPNETCORE_ENVIRONMENT": "Development"
}
},
"Docker": {
"commandName": "Docker",
"launchBrowser": true,
"launchUrl": "{Scheme}://{ServiceHost}:{ServicePort}/swagger",
"environmentVariables": {
"ASPNETCORE_URLS": "http://+:80"
},
"publishAllPorts": true
}
},
"$schema": "https://json.schemastore.org/launchsettings.json",
"iisSettings": {
"windowsAuthentication": false,
"anonymousAuthentication": true,
"iisExpress": {
"applicationUrl": "http://localhost:10584",
"sslPort": 0
}
}
}

+ 25
- 0
TelpoKafkaConsole.WebApi/TelpoKafkaConsole.WebApi.csproj Zobrazit soubor

@@ -0,0 +1,25 @@
<Project Sdk="Microsoft.NET.Sdk.Web">

<PropertyGroup>
<TargetFramework>net6.0</TargetFramework>
<Nullable>enable</Nullable>
<ImplicitUsings>enable</ImplicitUsings>
<DockerDefaultTargetOS>Linux</DockerDefaultTargetOS>
</PropertyGroup>

<ItemGroup>

<PackageReference Include="Microsoft.VisualStudio.Azure.Containers.Tools.Targets" Version="1.19.5" />

<PackageReference Include="Swashbuckle.AspNetCore" Version="6.5.0" />
<PackageReference Include="Serilog.AspNetCore" Version="3.4.0" />
<PackageReference Include="Serilog.Expressions" Version="3.4.0" />
<PackageReference Include="Serilog.Sinks.Async" Version="1.5.0" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\TelpoKafkaConsole.Common\TelpoKafkaConsole.Common.csproj" />
<ProjectReference Include="..\TelpoKafkaConsole.Service\TelpoKafkaConsole.Service.csproj" />
</ItemGroup>

</Project>

+ 13
- 0
TelpoKafkaConsole.WebApi/WeatherForecast.cs Zobrazit soubor

@@ -0,0 +1,13 @@
namespace TelpoKafkaConsole.WebApi
{
public class WeatherForecast
{
public DateTime Date { get; set; }

public int TemperatureC { get; set; }

public int TemperatureF => 32 + (int)(TemperatureC / 0.5556);

public string? Summary { get; set; }
}
}

+ 14
- 0
TelpoKafkaConsole.WebApi/appsettings.Development.json Zobrazit soubor

@@ -0,0 +1,14 @@
{
"Logging": {
"LogLevel": {
"Default": "Information",
"Microsoft.AspNetCore": "Warning"
}
},
"ServiceConfig": {
"KafkaServerAddress": "k0.id.gdssjl.com:9094",
"KafkaServerPEMLocation": "C:\\Users\\vsoni\\source\\repos\\TelpoKafkaConsole\\pem\\ca-root-test.pem",
//"TopicPartitionsNum": 1

}
}

+ 118
- 0
TelpoKafkaConsole.WebApi/appsettings.json Zobrazit soubor

@@ -0,0 +1,118 @@
{
"Logging": {
"LogLevel": {
"Default": "Information",
"Microsoft.AspNetCore": "Warning"
}
},
"Serilog": {
"Using": [ "Serilog.Sinks.File", "Serilog.Sinks.Async", "Serilog.Sinks.Console", "Serilog.Expressions" ],
"MinimumLevel": {
"Default": "Verbose",
"Override": {
"Microsoft": "Warning",
"Microsoft.Hosting.Lifetime": "Information",
"HttpClient": "Information"
}
},
"WriteTo": [
{
"Name": "Console",
"Args": {
"restrictedToMinimumLevel": "Verbose",
"outputTemplate": "{Timestamp:yyyy-MM-dd HH:mm:ss.fff }[{Level:u3}] [Thread-{ThreadId}] [{SourceContext:l}] [{RequestId}] {Message:lj}{NewLine}{Exception}",
"theme": "Serilog.Sinks.SystemConsole.Themes.AnsiConsoleTheme::Code, Serilog.Sinks.Console"
}
},
{
"Name": "Logger",
"Args": {
"ConfigureLogger": {
"WriteTo": [
{
"Name": "File",
"Args": {
"RestrictedToMinimumLevel": "Information",
"RollingInterval": "Day",
"RollOnFileSizeLimit": "true",
"OutputTemplate": "{Timestamp:yyyy-MM-dd HH:mm:ss.fff }[{Level:u3}] [Thread-{ThreadId}] [{SourceContext:l}] [{RequestId}] {Message:lj}{NewLine}{Exception}",
"Path": "/var/telpo_kafka_console/logs/infos/info.log",
"RetainedFileCountLimit": 10 // "--设置日志文件个数最大值,默认31,意思就是只保留最近的31个日志文件", "等于null时永远保留文件": null
//"FileSizeLimitBytes": 20971520, //设置单个文件大小为3M 默认1G
//"RollOnFileSizeLimit": true //超过文件大小后创建新的
}
}
],
"Filter": [
{
"Name": "ByIncludingOnly",
"Args": {
"Expression": "@l = 'Information'"
}
}
]
}
}
},
{
"Name": "Logger",
"Args": {
"ConfigureLogger": {
"WriteTo": [
{
"Name": "File",
"Args": {
"RestrictedToMinimumLevel": "Warning",
"RollingInterval": "Day",
"RollOnFileSizeLimit": "true",
"OutputTemplate": "{Timestamp:yyyy-MM-dd HH:mm:ss.fff }[{Level:u3}] [Thread-{ThreadId}] [{SourceContext:l}] [{RequestId}] {Message:lj}{NewLine}",
"Path": "/var/telpo_kafka_console/logs/warnings/warn.log",
"RetainedFileCountLimit": 10
}
}
],
"Filter": [
{
"Name": "ByIncludingOnly",
"Args": {
"Expression": "@l = 'Warning'"
}
}
]
}
}
},
{
"Name": "Logger",
"Args": {
"ConfigureLogger": {
"WriteTo": [
{
"Name": "File",
"Args": {
"RestrictedToMinimumLevel": "Error",
"RollingInterval": "Day",
"RollOnFileSizeLimit": "true",
"OutputTemplate": "{Timestamp:yyyy-MM-dd HH:mm:ss.fff }[{Level:u3}] [Thread-{ThreadId}][{SourceContext:l}] [{RequestId}] {Message:lj}{NewLine}{Exception}",
"Path": "/var/telpo_kafka_console/logs/errors/error.log",
"RetainedFileCountLimit": 15 // "--设置日志文件个数最大值,默认31,意思就是只保留最近的31个日志文件", "等于null时永远保留文件": null
//"FileSizeLimitBytes": 20971520, //设置单个文件大小为3M 默认1G
//"RollOnFileSizeLimit": true //超过文件大小后创建新的

}
}
],
"Filter": [
{
"Name": "ByIncludingOnly",
"Args": {
"Expression": "@l = 'Error'"
}
}
]
}
}
}
]
}
}

+ 12
- 0
TelpoKafkaConsole.WebApi/appsettings.test.json Zobrazit soubor

@@ -0,0 +1,12 @@
{
"Logging": {
"LogLevel": {
"Default": "Information",
"Microsoft.AspNetCore": "Warning"
}
},
"ServiceConfig": {
"KafkaServerAddress": "k0.id.gdssjl.com:9094",
"KafkaServerLocation": "pem/ca-root-test.pem"
}
}

+ 43
- 0
TelpoKafkaConsole.sln Zobrazit soubor

@@ -0,0 +1,43 @@

Microsoft Visual Studio Solution File, Format Version 12.00
# Visual Studio Version 17
VisualStudioVersion = 17.8.34330.188
MinimumVisualStudioVersion = 10.0.40219.1
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "TelpoKafkaConsole.Model", "TelpoKafkaConsole.Model\TelpoKafkaConsole.Model.csproj", "{5E43DF79-9F68-4108-B90D-C5175C680B5F}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "TelpoKafkaConsole.WebApi", "TelpoKafkaConsole.WebApi\TelpoKafkaConsole.WebApi.csproj", "{D90BBA33-02B6-4C25-98CC-7E5881910085}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "TelpoKafkaConsole.Common", "TelpoKafkaConsole.Common\TelpoKafkaConsole.Common.csproj", "{C6DCEB60-58AB-44A9-9C05-43B88623EE4A}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "TelpoKafkaConsole.Service", "TelpoKafkaConsole.Service\TelpoKafkaConsole.Service.csproj", "{4B403911-CD11-47C7-9C01-E89C150880D5}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Release|Any CPU = Release|Any CPU
EndGlobalSection
GlobalSection(ProjectConfigurationPlatforms) = postSolution
{5E43DF79-9F68-4108-B90D-C5175C680B5F}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{5E43DF79-9F68-4108-B90D-C5175C680B5F}.Debug|Any CPU.Build.0 = Debug|Any CPU
{5E43DF79-9F68-4108-B90D-C5175C680B5F}.Release|Any CPU.ActiveCfg = Release|Any CPU
{5E43DF79-9F68-4108-B90D-C5175C680B5F}.Release|Any CPU.Build.0 = Release|Any CPU
{D90BBA33-02B6-4C25-98CC-7E5881910085}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{D90BBA33-02B6-4C25-98CC-7E5881910085}.Debug|Any CPU.Build.0 = Debug|Any CPU
{D90BBA33-02B6-4C25-98CC-7E5881910085}.Release|Any CPU.ActiveCfg = Release|Any CPU
{D90BBA33-02B6-4C25-98CC-7E5881910085}.Release|Any CPU.Build.0 = Release|Any CPU
{C6DCEB60-58AB-44A9-9C05-43B88623EE4A}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{C6DCEB60-58AB-44A9-9C05-43B88623EE4A}.Debug|Any CPU.Build.0 = Debug|Any CPU
{C6DCEB60-58AB-44A9-9C05-43B88623EE4A}.Release|Any CPU.ActiveCfg = Release|Any CPU
{C6DCEB60-58AB-44A9-9C05-43B88623EE4A}.Release|Any CPU.Build.0 = Release|Any CPU
{4B403911-CD11-47C7-9C01-E89C150880D5}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{4B403911-CD11-47C7-9C01-E89C150880D5}.Debug|Any CPU.Build.0 = Debug|Any CPU
{4B403911-CD11-47C7-9C01-E89C150880D5}.Release|Any CPU.ActiveCfg = Release|Any CPU
{4B403911-CD11-47C7-9C01-E89C150880D5}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {431CD79F-C549-49D3-9969-1F3804DEAB64}
EndGlobalSection
EndGlobal

+ 29
- 0
pem/ca-root-test.pem Zobrazit soubor

@@ -0,0 +1,29 @@
-----BEGIN CERTIFICATE-----
MIIE/zCCAuegAwIBAgIJAIgzt1mx6ZClMA0GCSqGSIb3DQEBCwUAMBYxFDASBgNV
BAMMC2thZmthLWFkbWluMB4XDTI0MDUwNjA2NDE1NloXDTM0MDUwNDA2NDE1Nlow
FjEUMBIGA1UEAwwLa2Fma2EtYWRtaW4wggIiMA0GCSqGSIb3DQEBAQUAA4ICDwAw
ggIKAoICAQDUCuuolDbpE6O8IvAK0YDvK+p96hjQ4bqD7UWKzie60q2OWHOfrcr3
ra4FYiEtpZMSa52Y9K+VmXnP/Ol1kJHqcJPyIEkgqnC3nMDLNtO4BTqbXb7/eL7n
ln+Sn4v96Qc6+248yV87Of7NhW0Ou7Q7Qa7Y8Podvox6CANwQcVDAuAp1segHtsl
3w3Tmz4Ty9A919lGLfSRr1XQUGGt2DLrU4GajBEs8FJoWrCWnOBH251oSwQrisLw
+/PNRR8f8NE9uxE1/qmWwwtPg0a978WaUpqaLLUd7dU+TrbbptkAAcTdV0+JPrGJ
7JCleO9akmquRrSVQfKXKCq0ardQpndGEGhfAxoqPi5BuFyn0Sx2d+NsDIQKyW7U
e7lwLafrXFwnNmKPCfigE4IvPonz6HcJORRtJDzORbjSCB347MMlkmKY+ANB5fKc
UiqEaV+/NOw99/qNICEmBUa24aU3O5ASr5hPGNz+vAtGDGKokf0K9CLcPE/FKkvY
AYzM4O/kyqJhWgs8qoHITnv0BTb9psJEZp5VLJP6iPQhJXksnCQRX9+e0Rd+4d5B
d6JZ++KfmgxMy0YhTqz58IZe0yHolxTE6S0S5IULKkPk8GnIHPgw7/zfvsOM/eFs
676401tafc/Y7gC3H/GEPHo/T8/tuHLGz7GKwcytUFQdI2YgBl0kMwIDAQABo1Aw
TjAdBgNVHQ4EFgQULe3CAIvXjKYJOGF+zt1Lm87u6p0wHwYDVR0jBBgwFoAULe3C
AIvXjKYJOGF+zt1Lm87u6p0wDAYDVR0TBAUwAwEB/zANBgkqhkiG9w0BAQsFAAOC
AgEAfktPTwfBKTONVi5ILH6/70bvIxNRQljFdRP6Ktj76FlmkGXetYQMh0Fc+Wwd
meaU7z3JFV3BJOlQkh7UPKTbmXfIrk+Kvf14sSKYf4O47GatoSY/ifZrocs+owr2
FP/6Oh3l70+vcUbLmU/328VHHumAuZjQ/199CnKgvgarEPRsekHDg0erzCu7fwVk
9ZIpRW5Kiyav+VBbg2SUVxaOHeGzy1eHxWHYeUpBpryLh4VDU1Va9LPR6nhmKPQI
GzJUgDjx9OK7Lhr6IHDWGbbOjKmHhL5nPtVOi9JPqTdJ0DXHhj94RlQdw3OxCa47
ZhvCE5re8fJGtDpExQBbaPTXR0SGJ/q8td9bcJ+zHLcCALKXwHW7cf21YiPoiQ9J
I1DmYIwxEUqbngpCA6guDoL0HONVPOwG1SG/yLHy+jaKaMHUHTptbiRpPf2znxSm
UxUxEUEg+bhtoHd3Areky4RsuPQUFbDyYEDtduSjV+00vAicxcC/sLeZRWoKsPfb
Bp4xf7Jn5OO+jNcTfjoXnlVpyBI7zdwkbyMdF/xxcaj+POIgaljTQHoGxc31taJo
vwlNqD7oLHysuL06Qzos8nEbJMT/MJzDOSIRZ3FL3aTtdnP92j/G6hVoN5XnAAXS
O31Nvpo+6guwJmFg4SuBGwT8x2wlM35nHs84vR36ScNBvq8=
-----END CERTIFICATE-----

Načítá se…
Zrušit
Uložit