No puede seleccionar más de 25 temas. Los temas deben comenzar con una letra o número, pueden incluir guiones ('-') y pueden tener hasta 35 caracteres de largo.

445 líneas
18KB

  1. using Confluent.Kafka.Admin;
  2. using Microsoft.AspNetCore.Mvc;
  3. using TelpoKafkaConsole.Model;
  4. using TelpoKafkaConsole.Service;
  5. using TelpoKafkaConsole.WebApi.Controllers.Api;
  6. using TelpoKafkaConsole.WebApi.Model.Request;
  7. using static Confluent.Kafka.ConfigPropertyNames;
  8. namespace TelpoKafkaConsole.WebApi.Controllers
  9. {
  10. [Route("api/[controller]")]
  11. [ApiController]
  12. public class ScramAclsController : ControllerBase
  13. {
  // Kafka admin service that the Consumer, Producer and Delete actions delegate to.
  private readonly KafkaAdminService _servicekafkaAdmin;

  /// <summary>
  /// Creates the controller and stores the supplied Kafka admin service.
  /// </summary>
  /// <param name="kafkaAdminService">Service used for all SCRAM, topic and ACL calls made by this controller.</param>
  public ScramAclsController(KafkaAdminService kafkaAdminService)
      => _servicekafkaAdmin = kafkaAdminService;
  // POST api/<ScramAclsController>/Consumer
  /// <summary>
  /// Provisions a consumer account: submits the SCRAM credentials, creates the topic when
  /// the describe call returns no entry for it, and grants Read on the topic and on the
  /// consumer group to the principal <c>User:{Name}</c>.
  /// </summary>
  /// <param name="consumer">Request body carrying the user name, password, topic, group and partition count.</param>
  /// <returns>A success response whose message names the provisioned user.</returns>
  [HttpPost("Consumer")] // explicit route added
  public async Task<ApiResponse<string>> Consumer([FromBody] ScramAclsConsumerReq consumer)
  {
      // Create the SASL/SCRAM user.
      ScramCredentialsUser scramUser = new ScramCredentialsUser
      {
          Name = consumer.Name,
          Password = consumer.Password
      };
      await _servicekafkaAdmin.AlterUserScramCredentialsAsync(scramUser);

      // Create the topic only when the describe call reports nothing for it.
      var topics = await _servicekafkaAdmin.DescribeTopicsAsync(new List<string> { consumer.Topic });
      if (topics.Count == 0)
      {
          // Math.Max forces the third argument to be at least 3.
          // NOTE(review): the TimeSpan is presumably the retention period and the last
          // argument the partition count - confirm against KafkaAdminService.CreateTopic.
          await _servicekafkaAdmin.CreateTopic(consumer.Topic, TimeSpan.FromDays(3), Math.Max(3, consumer.NumPartitions));
      }

      string principal = $"User:{consumer.Name}";

      // Only ALLOW bindings are created. The previous version also added a
      // "Deny All" binding on the consumer group; Kafka evaluates DENY rules before
      // ALLOW rules and Operation.All matches Read, so that rule cancelled the group
      // Read grant below and the consumer could never use its group.
      // NOTE(review): operations that are not granted stay forbidden only while the
      // broker authorizer denies by default - confirm the cluster configuration.
      List<AclBinding> aclBindings = new List<AclBinding>
      {
          // Allow the user to read the specific topic.
          CreateConsumerReadAcl(ResourceType.Topic, consumer.Topic, principal),
          // Allow the user to read (join/commit with) the specific consumer group.
          CreateConsumerReadAcl(ResourceType.Group, consumer.Group, principal)
      };

      await _servicekafkaAdmin.CreateAclsAsync(aclBindings);
      return ApiResponse<string>.Success($"创建消费者用户 {consumer.Name} 和 ACLs 规则成功");
  }

  // Builds a literal-pattern binding that allows `principal` to Read the named resource from any host.
  private static AclBinding CreateConsumerReadAcl(ResourceType resourceType, string resourceName, string principal)
  {
      return new AclBinding
      {
          Pattern = new ResourcePattern
          {
              Type = resourceType,
              Name = resourceName,
              ResourcePatternType = ResourcePatternType.Literal
          },
          Entry = new AccessControlEntry
          {
              Principal = principal,
              Host = "*",
              Operation = AclOperation.Read,
              PermissionType = AclPermissionType.Allow
          }
      };
  }
  /* public async Task<ApiResponse<string>> Consumer([FromBody] ScramAclsConsumerReq consumer)
  93. {
  94. // 创建用户
  95. ScramCredentialsUser scramUser = new()
  96. {
  97. Name = consumer.Name,
  98. Password = consumer.Password,
  99. };
  100. await _servicekafkaAdmin.AlterUserScramCredentialsAsync(scramUser);
  101. // 创建 topic
  102. var topics = await _servicekafkaAdmin.DescribeTopicsAsync(new List<string> { consumer.Topic });
  103. if (topics.Count.Equals(0))
  104. {
  105. await _servicekafkaAdmin.CreateTopic(consumer.Topic, TimeSpan.FromDays(3), consumer.NumPartitions>3 ? 3: consumer.NumPartitions);
  106. }
  107. // 创建 alcs
  108. List<AclBinding> aclBindings = new()
  109. {
  110. //new AclBinding()
  111. //{
  112. // Pattern = new ResourcePattern
  113. // {
  114. // Type = ResourceType.Broker,
  115. // Name = "kafka-cluster",
  116. // ResourcePatternType = ResourcePatternType.Literal
  117. // },
  118. // Entry = new AccessControlEntry
  119. // {
  120. // Principal = $"User:{consumer.Name}",
  121. // Host = "*",
  122. // Operation = AclOperation.All,
  123. // PermissionType = AclPermissionType.Deny
  124. // }
  125. //},
  126. //new AclBinding()
  127. // {
  128. // Pattern = new ResourcePattern
  129. // {
  130. // Type = ResourceType.Broker,
  131. // Name = "kafka-cluster",
  132. // ResourcePatternType = ResourcePatternType.Literal
  133. // },
  134. // Entry = new AccessControlEntry
  135. // {
  136. // Principal = $"User:superuser",
  137. // Host = "*",
  138. // Operation = AclOperation.All,
  139. // PermissionType = AclPermissionType.Allow
  140. // }
  141. //},
  142. //// 禁止查看
  143. // new AclBinding()
  144. // {
  145. // Pattern = new ResourcePattern
  146. // {
  147. // Type = ResourceType.Broker,
  148. // Name = "kafka-cluster",
  149. // ResourcePatternType = ResourcePatternType.Literal
  150. // },
  151. // Entry = new AccessControlEntry
  152. // {
  153. // Principal = $"User:{consumer.Name}",
  154. // Host = "*",
  155. // Operation = AclOperation.Describe,
  156. // PermissionType = AclPermissionType.Deny
  157. // }
  158. // },
  159. // // 禁止修改
  160. // new AclBinding()
  161. // {
  162. // Pattern = new ResourcePattern
  163. // {
  164. // Type = ResourceType.Broker,
  165. // Name = "kafka-cluster",
  166. // ResourcePatternType = ResourcePatternType.Literal
  167. // },
  168. // Entry = new AccessControlEntry
  169. // {
  170. // Principal = $"User:{consumer.Name}",
  171. // Host = "*",
  172. // Operation = AclOperation.Alter,
  173. // PermissionType = AclPermissionType.Deny
  174. // }
  175. // },
  176. //// 禁止写入
  177. //new AclBinding()
  178. // {
  179. // Pattern = new ResourcePattern
  180. // {
  181. // Type = ResourceType.Broker,
  182. // Name = "kafka-cluster",
  183. // ResourcePatternType = ResourcePatternType.Literal
  184. // },
  185. // Entry = new AccessControlEntry
  186. // {
  187. // Principal = $"User:{consumer.Name}",
  188. // Host = "*",
  189. // Operation = AclOperation.Write,
  190. // PermissionType = AclPermissionType.Deny
  191. // }
  192. // },
  193. //// 禁止创建
  194. //new AclBinding()
  195. // {
  196. // Pattern = new ResourcePattern
  197. // {
  198. // Type = ResourceType.Broker,
  199. // Name = "kafka-cluster",
  200. // ResourcePatternType = ResourcePatternType.Literal
  201. // },
  202. // Entry = new AccessControlEntry
  203. // {
  204. // Principal = $"User:{consumer.Name}",
  205. // Host = "*",
  206. // Operation = AclOperation.Create,
  207. // PermissionType = AclPermissionType.Deny
  208. // }
  209. // },
  210. //new AclBinding()
  211. // {
  212. // Pattern = new ResourcePattern
  213. // {
  214. // Type = ResourceType.Broker,
  215. // Name = "kafka-cluster",
  216. // ResourcePatternType = ResourcePatternType.Literal
  217. // },
  218. // Entry = new AccessControlEntry
  219. // {
  220. // Principal = $"User:{consumer.Name}",
  221. // Host = "*",
  222. // Operation = AclOperation.Unknown,
  223. // PermissionType = AclPermissionType.Deny
  224. // }
  225. // },
  226. new AclBinding()
  227. {
  228. Pattern = new ResourcePattern
  229. {
  230. Type = ResourceType.Group,
  231. Name = consumer.Group,
  232. ResourcePatternType = ResourcePatternType.Literal
  233. },
  234. Entry = new AccessControlEntry
  235. {
  236. Principal = $"User:{consumer.Name}",
  237. Host = "*",
  238. Operation = AclOperation.Read,
  239. PermissionType = AclPermissionType.Allow
  240. }
  241. },
  242. new AclBinding()
  243. {
  244. Pattern = new ResourcePattern
  245. {
  246. Type = ResourceType.Topic,
  247. Name = consumer.Topic,
  248. ResourcePatternType = ResourcePatternType.Literal
  249. },
  250. Entry = new AccessControlEntry
  251. {
  252. Principal = $"User:{consumer.Name}",
  253. Host = "*",
  254. Operation = AclOperation.Read,
  255. PermissionType = AclPermissionType.Allow
  256. }
  257. },
  258. new AclBinding()
  259. {
  260. Pattern = new ResourcePattern
  261. {
  262. Type = ResourceType.Any,
  263. Name = consumer.Group,
  264. ResourcePatternType = ResourcePatternType.Literal
  265. },
  266. Entry = new AccessControlEntry
  267. {
  268. Principal = $"User:{consumer.Name}",
  269. Host = "*",
  270. Operation = AclOperation.Write,
  271. PermissionType = AclPermissionType.Deny
  272. }
  273. },
  274. new AclBinding()
  275. {
  276. Pattern = new ResourcePattern
  277. {
  278. Type = ResourceType.Topic,
  279. Name = consumer.Group,
  280. ResourcePatternType = ResourcePatternType.Literal
  281. },
  282. Entry = new AccessControlEntry
  283. {
  284. Principal = $"User:{consumer.Name}",
  285. Host = "*",
  286. Operation = AclOperation.Write,
  287. PermissionType = AclPermissionType.Deny
  288. }
  289. },
  290. new AclBinding()
  291. {
  292. Pattern = new ResourcePattern
  293. {
  294. Type = ResourceType.Topic,
  295. Name = consumer.Group,
  296. ResourcePatternType = ResourcePatternType.Literal
  297. },
  298. Entry = new AccessControlEntry
  299. {
  300. Principal = $"User:{consumer.Name}",
  301. Host = "*",
  302. Operation = AclOperation.Alter,
  303. PermissionType = AclPermissionType.Deny
  304. }
  305. },
  306. new AclBinding()
  307. {
  308. Pattern = new ResourcePattern
  309. {
  310. Type = ResourceType.Topic,
  311. Name = consumer.Group,
  312. ResourcePatternType = ResourcePatternType.Literal
  313. },
  314. Entry = new AccessControlEntry
  315. {
  316. Principal = $"User:{consumer.Name}",
  317. Host = "*",
  318. Operation = AclOperation.Describe,
  319. PermissionType = AclPermissionType.Deny
  320. }
  321. },
  322. new AclBinding()
  323. {
  324. Pattern = new ResourcePattern
  325. {
  326. Type = ResourceType.Topic,
  327. Name = consumer.Group,
  328. ResourcePatternType = ResourcePatternType.Literal
  329. },
  330. Entry = new AccessControlEntry
  331. {
  332. Principal = $"User:{consumer.Name}",
  333. Host = "*",
  334. Operation = AclOperation.Delete,
  335. PermissionType = AclPermissionType.Deny
  336. }
  337. }
  338. };
  339. await _servicekafkaAdmin.CreateAclsAsync(aclBindings);
  340. return ApiResponse<string>.Success($"创建 消费者用户 {consumer.Name} Acls 规则成功");
  341. }
  342. */
  // POST api/<ScramAclsController>/Producer
  /// <summary>
  /// Provisions a producer account: submits the SCRAM credentials, creates the topic when
  /// the describe call returns no entry for it, then registers two ACL bindings for
  /// <c>User:{Name}</c> - Deny All on the "kafka-cluster" broker resource and Allow Write
  /// on the topic.
  /// </summary>
  /// <param name="producer">Request body carrying the user name, password, topic and partition count.</param>
  /// <returns>A success response whose message names the provisioned user.</returns>
  [HttpPost("Producer")] // explicit route added
  public async Task<ApiResponse<string>> Producer([FromBody] ScramAclsProducerReq producer)
  {
      // Create the user.
      var credentials = new ScramCredentialsUser
      {
          Name = producer.Name,
          Password = producer.Password,
      };
      await _servicekafkaAdmin.AlterUserScramCredentialsAsync(credentials);

      // Create the topic when it is not reported by the describe call.
      var requestedTopics = new List<string> { producer.Topic };
      var existingTopics = await _servicekafkaAdmin.DescribeTopicsAsync(requestedTopics);
      if (existingTopics.Count == 0)
      {
          await _servicekafkaAdmin.CreateTopic(producer.Topic, TimeSpan.FromDays(3), producer.NumPartitions);
      }

      // Build the ACL bindings; the list order matches the original (cluster deny first).
      var denyClusterAll = new AclBinding
      {
          Pattern = new ResourcePattern
          {
              Type = ResourceType.Broker,
              Name = "kafka-cluster",
              ResourcePatternType = ResourcePatternType.Literal
          },
          Entry = new AccessControlEntry
          {
              Principal = $"User:{producer.Name}",
              Host = "*",
              Operation = AclOperation.All,
              PermissionType = AclPermissionType.Deny
          }
      };
      var allowTopicWrite = new AclBinding
      {
          Pattern = new ResourcePattern
          {
              Type = ResourceType.Topic,
              Name = producer.Topic,
              ResourcePatternType = ResourcePatternType.Literal
          },
          Entry = new AccessControlEntry
          {
              Principal = $"User:{producer.Name}",
              Host = "*",
              Operation = AclOperation.Write,
              PermissionType = AclPermissionType.Allow
          }
      };
      var bindings = new List<AclBinding> { denyClusterAll, allowTopicWrite };

      await _servicekafkaAdmin.CreateAclsAsync(bindings);
      return ApiResponse<string>.Success($"创建 生产者用户 {producer.Name} Acls 规则成功");
  }
  // DELETE api/<ScramAclsController>/{username}
  /// <summary>
  /// Removes a user: issues a "DELETE" SCRAM alteration when the describe call returns
  /// exactly one entry for the name, then deletes every ACL binding whose principal is
  /// exactly <c>User:{username}</c>.
  /// </summary>
  /// <param name="username">User name taken from the route.</param>
  /// <returns>A success response whose message names the removed user.</returns>
  [HttpDelete("{username}")]
  public async Task<ApiResponse<string>> Delete(string username)
  {
      // Delete the SCRAM user, but only when it is reported as existing.
      var scramUsers = await _servicekafkaAdmin.DescribeUserScramCredentialsAsync(new List<string>
      {
          username
      });
      if (scramUsers.Count == 1)
      {
          ScramCredentialsUser scramUser = new()
          {
              Name = username
          };
          await _servicekafkaAdmin.AlterUserScramCredentialsAsync(scramUser, "DELETE");
      }

      // Delete the user's ACLs. The principal is matched exactly (ordinal) against the
      // "User:{name}" form this controller writes. The previous suffix match
      // (EndsWith) also selected other users whose name merely ends with `username`,
      // e.g. deleting "bob" wiped the ACLs of "User:jimbob".
      string principal = $"User:{username}";
      var acls = await _servicekafkaAdmin.DescribeAclsAsync();
      var userAclsBinding = acls
          .Where(i => string.Equals(i.Entry.Principal, principal, StringComparison.Ordinal))
          .ToList();
      if (userAclsBinding.Count > 0)
      {
          await _servicekafkaAdmin.DeleteAclsAsync(userAclsBinding);
      }

      return ApiResponse<string>.Success($"删除用户 {username} 和 Acls 规则成功");
  }
  425. }
  426. }