You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179
  1. using Confluent.Kafka.Admin;
  2. using Microsoft.AspNetCore.Mvc;
  3. using TelpoKafkaConsole.Service;
  4. using TelpoKafkaConsole.WebApi.Controllers.Api;
  5. using TelpoKafkaConsole.WebApi.Model.Request;
  6. // For more information on enabling Web API for empty projects, visit https://go.microsoft.com/fwlink/?LinkID=397860
  7. namespace TelpoKafkaConsole.WebApi.Controllers
  8. {
  /// <summary>
  /// Web API controller that lists, creates, and deletes Kafka ACL bindings
  /// by delegating to <see cref="KafkaAdminService"/>.
  /// </summary>
  [Route("api/[controller]/[action]")]
  [ApiController]
  public class AclsController : ControllerBase
  {
      /// <summary>Prefix used when building the principal string for a user name.</summary>
      private const string UserPrincipalPrefix = "User:";

      private readonly KafkaAdminService _kafkaAdmin;

      /// <summary>Creates the controller with the admin service used for all ACL operations.</summary>
      /// <param name="kafkaAdminService">Service that performs the describe/create/delete ACL calls.</param>
      public AclsController(KafkaAdminService kafkaAdminService)
      {
          _kafkaAdmin = kafkaAdminService;
      }

      /// <summary>Returns every ACL binding reported by the admin service.</summary>
      /// <returns>A success response wrapping the full list of bindings.</returns>
      [HttpGet]
      public async Task<ApiResponse<List<AclBinding>>> Get()
      {
          var acls = await _kafkaAdmin.DescribeAclsAsync();
          return ApiResponse<List<AclBinding>>.Success(acls);
      }

      /// <summary>Returns the ACL bindings whose principal belongs to the given user.</summary>
      /// <param name="username">
      /// User name, either bare (<c>alice</c>) or already prefixed (<c>User:alice</c>).
      /// </param>
      /// <returns>A success response wrapping the matching bindings (possibly empty).</returns>
      [HttpGet("{username}")]
      public async Task<ApiResponse<IEnumerable<AclBinding>>> Get(string username)
      {
          var acls = await _kafkaAdmin.DescribeAclsAsync();

          // Match the whole principal with an ordinal comparison. A suffix match would
          // also return other users' rules (e.g. "bob" matching "User:jimbob").
          var principal = username.StartsWith(UserPrincipalPrefix, StringComparison.Ordinal)
              ? username
              : PrincipalFor(username);
          var matches = acls.Where(acl => string.Equals(acl.Entry.Principal, principal, StringComparison.Ordinal));

          return ApiResponse<IEnumerable<AclBinding>>.Success(matches);
      }

      /// <summary>
      /// Creates ACL rules for a user. Without a group the user is treated as a producer
      /// and gets Write on the topic; with a group the user is treated as a consumer and
      /// gets Read on both the group and the topic.
      /// </summary>
      /// <param name="aclsReq">Request carrying the user name, topic, and optional group.</param>
      /// <returns>A success response with a human-readable summary of what was created.</returns>
      [HttpPost]
      public async Task<ApiResponse<string>> Post([FromBody] AclsReq aclsReq)
      {
          var principal = PrincipalFor(aclsReq.UserName);
          List<AclBinding> aclBindings = new();

          if (string.IsNullOrEmpty(aclsReq.Group))
          {
              // Producer: write access to the topic.
              aclBindings.Add(CreateBinding(ResourceType.Topic, aclsReq.Topic, principal, AclOperation.Write, AclPermissionType.Allow));
          }
          else
          {
              // Consumer: read access to the consumer group and to the topic.
              aclBindings.Add(CreateBinding(ResourceType.Group, aclsReq.Group, principal, AclOperation.Read, AclPermissionType.Allow));
              aclBindings.Add(CreateBinding(ResourceType.Topic, aclsReq.Topic, principal, AclOperation.Read, AclPermissionType.Allow));
          }

          await _kafkaAdmin.CreateAclsAsync(aclBindings);

          var operation = string.IsNullOrEmpty(aclsReq.Group) ? "写" : "读";
          var group = string.IsNullOrEmpty(aclsReq.Group) ? "" : $"Group:{aclsReq.Group} - ";
          return ApiResponse<string>.Success($"创建 ACLs 规则 用户:{aclsReq.UserName} - Topic:{aclsReq.Topic} - {group}{operation}权限成功");
      }

      /// <summary>
      /// Deletes ACL rules for a user. Without a group the topic rules are matched with
      /// Any operation / Any permission; with a group the group rules are matched with
      /// Any / Any and the topic rule with Read / Allow.
      /// </summary>
      /// <param name="aclsReq">Request carrying the user name, topic, and optional group.</param>
      /// <returns>A success response with a human-readable summary of what was deleted.</returns>
      [HttpDelete]
      public async Task<ApiResponse<string>> DeleteAsync([FromBody] AclsReq aclsReq)
      {
          var principal = PrincipalFor(aclsReq.UserName);
          List<AclBinding> aclBindings = new();

          if (string.IsNullOrEmpty(aclsReq.Group))
          {
              // Producer: match every rule the user has on the topic.
              aclBindings.Add(CreateBinding(ResourceType.Topic, aclsReq.Topic, principal, AclOperation.Any, AclPermissionType.Any));
          }
          else
          {
              // Consumer: match every rule the user has on the group...
              aclBindings.Add(CreateBinding(ResourceType.Group, aclsReq.Group, principal, AclOperation.Any, AclPermissionType.Any));
              // ...but only the Read/Allow rule on the topic.
              // NOTE(review): the response text says all permissions were deleted, yet this
              // filter is narrower than the producer branch — confirm whether Any/Any is intended.
              aclBindings.Add(CreateBinding(ResourceType.Topic, aclsReq.Topic, principal, AclOperation.Read, AclPermissionType.Allow));
          }

          await _kafkaAdmin.DeleteAclsAsync(aclBindings);

          var group = string.IsNullOrEmpty(aclsReq.Group) ? "" : $"Group:{aclsReq.Group} - ";
          return ApiResponse<string>.Success($"删除 ACLs 规则 用户:{aclsReq.UserName} - Topic:{aclsReq.Topic} - {group}所有权限成功");
      }

      /// <summary>Builds the principal string (<c>User:&lt;name&gt;</c>) for a user name.</summary>
      private static string PrincipalFor(string userName) => $"{UserPrincipalPrefix}{userName}";

      /// <summary>
      /// Builds a literal-pattern ACL binding for the given resource and principal,
      /// applying to all hosts (<c>*</c>).
      /// </summary>
      private static AclBinding CreateBinding(
          ResourceType resourceType,
          string resourceName,
          string principal,
          AclOperation operation,
          AclPermissionType permissionType)
      {
          return new AclBinding
          {
              Pattern = new ResourcePattern
              {
                  Type = resourceType,
                  Name = resourceName,
                  ResourcePatternType = ResourcePatternType.Literal
              },
              Entry = new AccessControlEntry
              {
                  Principal = principal,
                  Host = "*",
                  Operation = operation,
                  PermissionType = permissionType
              }
          };
      }
  }
  166. }