You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

ScramAclsController.cs 7.0KB

9 months ago
9 months ago
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181
  1. using Confluent.Kafka.Admin;
  2. using Microsoft.AspNetCore.Mvc;
  3. using TelpoKafkaConsole.Model;
  4. using TelpoKafkaConsole.Service;
  5. using TelpoKafkaConsole.WebApi.Controllers.Api;
  6. using TelpoKafkaConsole.WebApi.Model.Request;
  7. using static Confluent.Kafka.ConfigPropertyNames;
  8. namespace TelpoKafkaConsole.WebApi.Controllers
  9. {
  /// <summary>
  /// Web API endpoints that provision and remove Kafka SCRAM users together with
  /// the ACL rules attached to them.
  /// </summary>
  [Route("api/[controller]")]
  [ApiController]
  public class ScramAclsController : ControllerBase
  {
      // Prefix used for every principal this controller writes into an ACL entry.
      private const string UserPrincipalPrefix = "User:";

      // Host value written into every ACL entry created here.
      private const string AnyHost = "*";

      // Resource name used for the broker-level deny rule.
      private const string ClusterResourceName = "kafka-cluster";

      private readonly KafkaAdminService _servicekafkaAdmin;

      /// <summary>
      /// Creates the controller.
      /// </summary>
      /// <param name="kafkaAdminService">Admin service used for every Kafka operation below.</param>
      public ScramAclsController(KafkaAdminService kafkaAdminService)
      {
          _servicekafkaAdmin = kafkaAdminService;
      }

      /// <summary>
      /// POST api/&lt;ScramAclsController&gt;/Consumer.
      /// Creates (or alters) the SCRAM user, creates the topic when the describe call
      /// returns no entries, and then creates the consumer ACL rules.
      /// </summary>
      /// <param name="consumer">Request body carrying the user name, password, topic, group and partition count.</param>
      /// <returns>A success response whose message names the user.</returns>
      [HttpPost("Consumer")]
      public async Task<ApiResponse<string>> Consumer([FromBody] ScramAclsConsumerReq consumer)
      {
          // Create the user.
          ScramCredentialsUser scramUser = new()
          {
              Name = consumer.Name,
              Password = consumer.Password,
          };
          await _servicekafkaAdmin.AlterUserScramCredentialsAsync(scramUser);

          // Create the topic if describing it returned nothing.
          // NOTE(review): the 3-day TimeSpan is presumably the topic retention — confirm against KafkaAdminService.CreateTopic.
          var topics = await _servicekafkaAdmin.DescribeTopicsAsync(new List<string> { consumer.Topic });
          if (topics.Count == 0)
          {
              await _servicekafkaAdmin.CreateTopic(consumer.Topic, TimeSpan.FromDays(3), consumer.NumPartitions);
          }

          // Create the ACLs: deny everything on the broker resource, allow Read on the group and on the topic.
          string principal = $"{UserPrincipalPrefix}{consumer.Name}";
          List<AclBinding> aclBindings = new()
          {
              BuildLiteralAcl(ResourceType.Broker, ClusterResourceName, principal, AclOperation.All, AclPermissionType.Deny),
              BuildLiteralAcl(ResourceType.Group, consumer.Group, principal, AclOperation.Read, AclPermissionType.Allow),
              BuildLiteralAcl(ResourceType.Topic, consumer.Topic, principal, AclOperation.Read, AclPermissionType.Allow),
          };
          await _servicekafkaAdmin.CreateAclsAsync(aclBindings);

          return ApiResponse<string>.Success($"创建 消费者用户 {consumer.Name} Acls 规则成功");
      }

      /// <summary>
      /// POST api/&lt;ScramAclsController&gt;/Producer.
      /// Creates (or alters) the SCRAM user, creates the topic when the describe call
      /// returns no entries, and then creates the producer ACL rules.
      /// </summary>
      /// <param name="producer">Request body carrying the user name, password, topic and partition count.</param>
      /// <returns>A success response whose message names the user.</returns>
      [HttpPost("Producer")]
      public async Task<ApiResponse<string>> Producer([FromBody] ScramAclsProducerReq producer)
      {
          // Create the user.
          ScramCredentialsUser scramUser = new()
          {
              Name = producer.Name,
              Password = producer.Password,
          };
          await _servicekafkaAdmin.AlterUserScramCredentialsAsync(scramUser);

          // Create the topic if describing it returned nothing.
          // NOTE(review): the 3-day TimeSpan is presumably the topic retention — confirm against KafkaAdminService.CreateTopic.
          var topics = await _servicekafkaAdmin.DescribeTopicsAsync(new List<string> { producer.Topic });
          if (topics.Count == 0)
          {
              await _servicekafkaAdmin.CreateTopic(producer.Topic, TimeSpan.FromDays(3), producer.NumPartitions);
          }

          // Create the ACLs: deny everything on the broker resource, allow Write on the topic.
          string principal = $"{UserPrincipalPrefix}{producer.Name}";
          List<AclBinding> aclBindings = new()
          {
              BuildLiteralAcl(ResourceType.Broker, ClusterResourceName, principal, AclOperation.All, AclPermissionType.Deny),
              BuildLiteralAcl(ResourceType.Topic, producer.Topic, principal, AclOperation.Write, AclPermissionType.Allow),
          };
          await _servicekafkaAdmin.CreateAclsAsync(aclBindings);

          return ApiResponse<string>.Success($"创建 生产者用户 {producer.Name} Acls 规则成功");
      }

      /// <summary>
      /// DELETE api/&lt;ScramAclsController&gt;/{username}.
      /// Removes the SCRAM credentials of the user (when exactly one entry is described)
      /// and every ACL binding whose principal is exactly <c>User:{username}</c>.
      /// </summary>
      /// <param name="username">Name of the user taken from the route.</param>
      /// <returns>A success response whose message names the user.</returns>
      [HttpDelete("{username}")]
      public async Task<ApiResponse<string>> Delete(string username)
      {
          // Delete the user, but only when the describe call reports exactly one match.
          var scramUsers = await _servicekafkaAdmin.DescribeUserScramCredentialsAsync(new List<string>
          {
              username
          });
          if (scramUsers.Count == 1)
          {
              ScramCredentialsUser scramUser = new()
              {
                  Name = username
              };
              await _servicekafkaAdmin.AlterUserScramCredentialsAsync(scramUser, "DELETE");
          }

          // Delete the ACLs. The principal must match exactly: a suffix match would also
          // remove the rules of any other user whose name merely ends with this one
          // (deleting "bob" must not touch "User:jimbob").
          string principal = $"{UserPrincipalPrefix}{username}";
          var acls = await _servicekafkaAdmin.DescribeAclsAsync();
          var userAclsBinding = acls
              .Where(binding => string.Equals(binding.Entry.Principal, principal, StringComparison.Ordinal))
              .ToList();
          if (userAclsBinding.Count > 0)
          {
              await _servicekafkaAdmin.DeleteAclsAsync(userAclsBinding);
          }

          return ApiResponse<string>.Success($"删除用户 {username} 和 Acls 规则成功");
      }

      /// <summary>
      /// Builds one ACL binding with a literal resource pattern and the wildcard host.
      /// </summary>
      /// <param name="resourceType">Kind of resource the rule applies to.</param>
      /// <param name="resourceName">Name of the resource.</param>
      /// <param name="principal">Principal the rule applies to, already prefixed with <c>User:</c>.</param>
      /// <param name="operation">Operation being allowed or denied.</param>
      /// <param name="permission">Whether the operation is allowed or denied.</param>
      /// <returns>The populated binding.</returns>
      private static AclBinding BuildLiteralAcl(
          ResourceType resourceType,
          string resourceName,
          string principal,
          AclOperation operation,
          AclPermissionType permission)
      {
          return new AclBinding
          {
              Pattern = new ResourcePattern
              {
                  Type = resourceType,
                  Name = resourceName,
                  ResourcePatternType = ResourcePatternType.Literal
              },
              Entry = new AccessControlEntry
              {
                  Principal = principal,
                  Host = AnyHost,
                  Operation = operation,
                  PermissionType = permission
              }
          };
      }
  }
  171. }