You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

150 lines
5.7KB

  1. using Confluent.Kafka.Admin;
  2. using Microsoft.AspNetCore.Mvc;
  3. using TelpoKafkaConsole.Model;
  4. using TelpoKafkaConsole.Service;
  5. using TelpoKafkaConsole.WebApi.Controllers.Api;
  6. using TelpoKafkaConsole.WebApi.Model.Request;
  7. using static Confluent.Kafka.ConfigPropertyNames;
  8. namespace TelpoKafkaConsole.WebApi.Controllers
  9. {
  /// <summary>
  /// API controller that provisions and removes Kafka SCRAM users together with
  /// their ACL rules by delegating to <see cref="KafkaAdminService"/>.
  /// </summary>
  [Route("api/[controller]")]
  [ApiController]
  public class ScramAclsController : ControllerBase
  {
      // Time span handed to KafkaAdminService.CreateTopic for topics created on demand.
      // NOTE(review): presumably the topic retention period - confirm against CreateTopic.
      private static readonly TimeSpan NewTopicTimeSpan = TimeSpan.FromDays(3);

      private readonly KafkaAdminService _servicekafkaAdmin;

      /// <summary>
      /// Creates the controller.
      /// </summary>
      /// <param name="kafkaAdminService">Admin service used for every Kafka operation in this controller.</param>
      public ScramAclsController(KafkaAdminService kafkaAdminService)
      {
          _servicekafkaAdmin = kafkaAdminService;
      }

      /// <summary>
      /// POST api/ScramAcls/Consumer.
      /// Submits the SCRAM credentials of the consumer user, creates the topic when the
      /// describe call returns no entry for it, and grants the user Read on both the
      /// consumer group and the topic.
      /// </summary>
      /// <param name="consumer">Request body carrying user name, password, topic, group and partition count.</param>
      /// <returns>A success response whose message names the consumer user.</returns>
      [HttpPost("Consumer")]
      public async Task<ApiResponse<string>> Consumer([FromBody] ScramAclsConsumerReq consumer)
      {
          // Create the user.
          ScramCredentialsUser scramUser = new()
          {
              Name = consumer.Name,
              Password = consumer.Password,
          };
          await _servicekafkaAdmin.AlterUserScramCredentialsAsync(scramUser);

          // Create the topic only when describing it yields nothing.
          var topics = await _servicekafkaAdmin.DescribeTopicsAsync(new List<string> { consumer.Topic });
          if (topics.Count == 0)
          {
              await _servicekafkaAdmin.CreateTopic(consumer.Topic, NewTopicTimeSpan, consumer.NumPartitions);
          }

          // Create the ACLs: Read on the group and Read on the topic.
          List<AclBinding> aclBindings = new()
          {
              CreateAllowBinding(ResourceType.Group, consumer.Group, consumer.Name, AclOperation.Read),
              CreateAllowBinding(ResourceType.Topic, consumer.Topic, consumer.Name, AclOperation.Read),
          };
          await _servicekafkaAdmin.CreateAclsAsync(aclBindings);

          return ApiResponse<string>.Success($"创建 消费者用户 {consumer.Name} Acls 规则成功");
      }

      /// <summary>
      /// POST api/ScramAcls/Producer.
      /// Submits the SCRAM credentials of the producer user, creates the topic when the
      /// describe call returns no entry for it, and grants the user Write on the topic.
      /// </summary>
      /// <param name="producer">Request body carrying user name, password, topic and partition count.</param>
      /// <returns>A success response whose message names the producer user.</returns>
      [HttpPost("Producer")]
      public async Task<ApiResponse<string>> Producer([FromBody] ScramAclsProducerReq producer)
      {
          // Create the user.
          ScramCredentialsUser scramUser = new()
          {
              Name = producer.Name,
              Password = producer.Password,
          };
          await _servicekafkaAdmin.AlterUserScramCredentialsAsync(scramUser);

          // Create the topic only when describing it yields nothing.
          var topics = await _servicekafkaAdmin.DescribeTopicsAsync(new List<string> { producer.Topic });
          if (topics.Count == 0)
          {
              await _servicekafkaAdmin.CreateTopic(producer.Topic, NewTopicTimeSpan, producer.NumPartitions);
          }

          // Create the ACLs: Write on the topic.
          List<AclBinding> aclBindings = new()
          {
              CreateAllowBinding(ResourceType.Topic, producer.Topic, producer.Name, AclOperation.Write),
          };
          await _servicekafkaAdmin.CreateAclsAsync(aclBindings);

          return ApiResponse<string>.Success($"创建 生产者用户 {producer.Name} Acls 规则成功");
      }

      /// <summary>
      /// DELETE api/ScramAcls/{username}.
      /// Removes the SCRAM credentials of the user when exactly one matching entry is
      /// described, then deletes every ACL binding whose principal is exactly
      /// <c>User:{username}</c>.
      /// </summary>
      /// <param name="username">Name of the user, taken from the route.</param>
      /// <returns>A success response whose message names the deleted user.</returns>
      [HttpDelete("{username}")]
      public async Task<ApiResponse<string>> Delete(string username)
      {
          // Delete the user.
          var scramUsers = await _servicekafkaAdmin.DescribeUserScramCredentialsAsync(new List<string>
          {
              username
          });
          if (scramUsers.Count == 1)
          {
              ScramCredentialsUser scramUser = new()
              {
                  Name = username
              };
              await _servicekafkaAdmin.AlterUserScramCredentialsAsync(scramUser, "DELETE");
          }

          // Delete the ACLs. The principal must match exactly: a suffix test such as
          // EndsWith(username) would also remove the rules of any other principal whose
          // name merely ends with this one (deleting "bob" would hit "User:jimbob").
          string principal = ToUserPrincipal(username);
          var acls = await _servicekafkaAdmin.DescribeAclsAsync();
          var userAclsBinding = acls
              .Where(i => string.Equals(i.Entry.Principal, principal, StringComparison.Ordinal))
              .ToList();
          if (userAclsBinding.Count > 0)
          {
              await _servicekafkaAdmin.DeleteAclsAsync(userAclsBinding);
          }

          return ApiResponse<string>.Success($"删除用户 {username} 和 Acls 规则成功");
      }

      // Formats the ACL principal string used for a user by every action in this controller.
      private static string ToUserPrincipal(string userName) => $"User:{userName}";

      // Builds an Allow binding for one operation on a literal-named resource, valid from any host.
      private static AclBinding CreateAllowBinding(
          ResourceType resourceType,
          string resourceName,
          string userName,
          AclOperation operation)
      {
          return new AclBinding()
          {
              Pattern = new ResourcePattern
              {
                  Type = resourceType,
                  Name = resourceName,
                  ResourcePatternType = ResourcePatternType.Literal
              },
              Entry = new AccessControlEntry
              {
                  Principal = ToUserPrincipal(userName),
                  Host = "*",
                  Operation = operation,
                  PermissionType = AclPermissionType.Allow
              }
          };
      }
  }
  139. }