You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

KafkaAdminService.cs 15KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387
  1. using Confluent.Kafka;
  2. using Confluent.Kafka.Admin;
  3. using Microsoft.Extensions.Logging;
  4. using Microsoft.Extensions.Options;
  5. using System;
  6. using System.Collections.Generic;
  7. using System.Linq;
  8. using System.Text;
  9. using System.Threading.Tasks;
  10. using TelpoKafkaConsole.Model;
  11. using static System.Net.Mime.MediaTypeNames;
  namespace TelpoKafkaConsole.Service
  {
      /// <summary>
      /// Thin wrapper around a Confluent <see cref="IAdminClient"/> exposing the
      /// administrative operations used by the console: consumer-group listing,
      /// SCRAM credential management, ACL management and topic management.
      /// </summary>
      /// <remarks>
      /// Unexpected exceptions (including <see cref="KafkaException"/>) are allowed to
      /// propagate unchanged so that their type and stack trace are preserved. Batch
      /// failures reported by the client are converted into an <see cref="Exception"/>
      /// whose message lists the per-item results and whose
      /// <see cref="Exception.InnerException"/> is the original client exception.
      /// </remarks>
      public class KafkaAdminService : IDisposable
      {
          private readonly ILogger<KafkaAdminService> _logger;
          private readonly ServiceConfig _configService;

          /// <summary>The underlying admin client. Owned (and disposed) by this service.</summary>
          public IAdminClient _adminClient;

          // Guards against disposing the admin client more than once.
          private bool _disposed;

          /// <summary>
          /// Builds the admin client from <paramref name="_optConfigService"/>
          /// (bootstrap servers and CA certificate location) using SASL_SSL with SCRAM-SHA-256.
          /// </summary>
          /// <param name="logger">Logger used for informational messages.</param>
          /// <param name="_optConfigService">Service configuration options.</param>
          public KafkaAdminService(ILogger<KafkaAdminService> logger, IOptions<ServiceConfig> _optConfigService)
          {
              _logger = logger;
              _configService = _optConfigService.Value;

              // NOTE(review): the SASL user name and password are hard-coded in source.
              // They should come from configuration / a secret store instead; left
              // unchanged here because ServiceConfig's available settings are not visible.
              var clientConfig = new AdminClientConfig
              {
                  BootstrapServers = _configService.KafkaServerAddress,
                  SecurityProtocol = SecurityProtocol.SaslSsl,
                  SaslMechanism = SaslMechanism.ScramSha256,
                  SaslUsername = "superuser",
                  SaslPassword = "password",
                  SslCaLocation = _configService.KafkaServerPEMLocation
              };

              _adminClient = new AdminClientBuilder(clientConfig).Build();
          }

          /// <summary>
          /// Lists the groups known to the cluster, waiting at most 10 seconds.
          /// </summary>
          /// <returns>The groups returned by the admin client.</returns>
          public List<GroupInfo> ListGroups()
          {
              // No try/catch: re-wrapping the exception would only discard its type and stack trace.
              return _adminClient.ListGroups(TimeSpan.FromSeconds(10));
          }

          #region UserScramCredentials

          /// <summary>
          /// Describes the SCRAM credentials of the given users (10 second request timeout).
          /// </summary>
          /// <param name="users">Names of the users to describe.</param>
          /// <returns>
          /// The credential descriptions. An empty list is returned when the client reports
          /// a partial failure for exactly one user that has no SCRAM credentials.
          /// </returns>
          /// <exception cref="Exception">
          /// The client reported a per-user failure other than the single-user case above;
          /// the message lists every user result.
          /// </exception>
          public async Task<List<UserScramCredentialsDescription>> DescribeUserScramCredentialsAsync(IEnumerable<string> users)
          {
              try
              {
                  var options = new DescribeUserScramCredentialsOptions { RequestTimeout = TimeSpan.FromSeconds(10) };
                  var report = await _adminClient.DescribeUserScramCredentialsAsync(users, options);
                  return report.UserScramCredentialsDescriptions;
              }
              catch (DescribeUserScramCredentialsException e)
              {
                  var descriptions = e.Results.UserScramCredentialsDescriptions;

                  // A partial failure for a single user without any credential info is
                  // treated as "no credentials" rather than as an error.
                  if (e.Error.Code == ErrorCode.Local_Partial
                      && descriptions.Count == 1
                      && descriptions[0].ScramCredentialInfos.Count == 0)
                  {
                      return new List<UserScramCredentialsDescription>();
                  }

                  var errMsg = new StringBuilder("An error occurred describing user SCRAM credentials for some users:\n");
                  foreach (var description in descriptions)
                  {
                      errMsg.Append($"User: {description.User} -- Error: {description.Error}\n");
                      if (description.Error.IsError)
                      {
                          continue;
                      }

                      foreach (var scramCredentialInfo in description.ScramCredentialInfos)
                      {
                          errMsg.Append($"Mechanism: {scramCredentialInfo.Mechanism} -- Iterations: {scramCredentialInfo.Iterations}\n");
                      }
                  }

                  throw new Exception(errMsg.ToString(), e);
              }
          }

          /// <summary>
          /// Creates/updates or deletes a user's SCRAM credential (30 second request timeout).
          /// </summary>
          /// <param name="scramUser">User name, mechanism, iterations and (for upserts) password.</param>
          /// <param name="Action">
          /// <c>"DELETE"</c> (case-sensitive) deletes the credential; any other value upserts it.
          /// </param>
          /// <exception cref="ArgumentNullException"><paramref name="scramUser"/> is null.</exception>
          /// <exception cref="Exception">
          /// The client reported per-user failures; the message lists every user result.
          /// </exception>
          public async Task AlterUserScramCredentialsAsync(ScramCredentialsUser scramUser, string Action = "UPSERT")
          {
              if (scramUser is null)
              {
                  throw new ArgumentNullException(nameof(scramUser));
              }

              var alterations = new List<UserScramCredentialAlteration>();

              if (string.Equals(Action, "DELETE", StringComparison.Ordinal))
              {
                  // A deletion does not need the password, so it is not touched here
                  // (encoding a null password would throw).
                  alterations.Add(new UserScramCredentialDeletion
                  {
                      User = scramUser.Name,
                      Mechanism = scramUser.Mechanism,
                  });
              }
              else
              {
                  alterations.Add(new UserScramCredentialUpsertion
                  {
                      User = scramUser.Name,
                      ScramCredentialInfo = new ScramCredentialInfo
                      {
                          Mechanism = scramUser.Mechanism,
                          Iterations = scramUser.Iterations,
                      },
                      Password = Encoding.UTF8.GetBytes(scramUser.Password),
                  });
              }

              try
              {
                  var options = new AlterUserScramCredentialsOptions { RequestTimeout = TimeSpan.FromSeconds(30) };
                  await _adminClient.AlterUserScramCredentialsAsync(alterations, options);
              }
              catch (AlterUserScramCredentialsException e)
              {
                  var errMsg = new StringBuilder("An error occurred altering user SCRAM credentials for some users:\n");
                  foreach (var result in e.Results)
                  {
                      errMsg.Append($"User: {result.User} -- Error: {result.Error}\n");
                  }

                  throw new Exception(errMsg.ToString(), e);
              }
          }

          #endregion

          #region ACLs

          /// <summary>
          /// Describes ACLs using a filter with resource type, pattern type, operation and
          /// permission type all set to <c>Any</c>, and host set to <c>"*"</c>.
          /// </summary>
          /// <returns>The ACL bindings returned by the admin client.</returns>
          /// <exception cref="Exception">The describe operation reported an error.</exception>
          public async Task<List<AclBinding>> DescribeAclsAsync()
          {
              var filter = new AclBinding
              {
                  Pattern = new ResourcePattern
                  {
                      Type = Confluent.Kafka.Admin.ResourceType.Any,
                      ResourcePatternType = ResourcePatternType.Any
                  },
                  Entry = new AccessControlEntry
                  {
                      Host = "*",
                      Operation = AclOperation.Any,
                      PermissionType = AclPermissionType.Any
                  }
              }.ToFilter();

              try
              {
                  var result = await _adminClient.DescribeAclsAsync(filter);
                  return result.AclBindings;
              }
              catch (DescribeAclsException e)
              {
                  throw new Exception(
                      $"An error occurred in describe ACLs operation: Code: {e.Result.Error.Code}, Reason: {e.Result.Error.Reason}",
                      e);
              }
          }

          /// <summary>
          /// Creates the given ACL bindings.
          /// </summary>
          /// <param name="aclBindings">Bindings to create.</param>
          /// <exception cref="Exception">
          /// One or more create operations failed (message lists each operation's outcome),
          /// or the client raised another <see cref="KafkaException"/>.
          /// </exception>
          public async Task CreateAclsAsync(List<AclBinding> aclBindings)
          {
              try
              {
                  await _adminClient.CreateAclsAsync(aclBindings);
                  _logger.LogInformation("All create ACL operations completed successfully");
              }
              catch (CreateAclsException e)
              {
                  var errMsg = new StringBuilder("One or more create ACL operations failed.\n");
                  for (int i = 0; i < e.Results.Count; ++i)
                  {
                      var result = e.Results[i];
                      if (result.Error.IsError)
                      {
                          errMsg.Append($"An error occurred in create ACL operation {i}: Code: {result.Error.Code}" +
                                        $", Reason: {result.Error.Reason}\n");
                      }
                      else
                      {
                          errMsg.Append($"Create ACLs operation {i} completed successfully\n");
                      }
                  }

                  throw new Exception(errMsg.ToString(), e);
              }
              catch (KafkaException e)
              {
                  throw new Exception($"An error occurred calling the CreateAcls operation: {e.Message}", e);
              }
          }

          /// <summary>
          /// Deletes the ACLs matching the filters derived from the given bindings.
          /// </summary>
          /// <param name="aclBindings">Bindings converted to filters via <c>ToFilter()</c>.</param>
          /// <exception cref="Exception">
          /// One or more delete operations failed; the message lists each operation's outcome.
          /// </exception>
          public async Task DeleteAclsAsync(List<AclBinding> aclBindings)
          {
              List<AclBindingFilter> aclBindingFilters = aclBindings.Select(aclBinding => aclBinding.ToFilter()).ToList();
              try
              {
                  var results = await _adminClient.DeleteAclsAsync(aclBindingFilters);
                  int i = 0;
                  foreach (var _ in results)
                  {
                      _logger.LogInformation("Deleted ACLs in operation {Operation}", i);
                      ++i;
                  }
              }
              catch (DeleteAclsException e)
              {
                  // Message previously said "create" here (copy-paste slip).
                  var errMsg = new StringBuilder("One or more delete ACL operations failed.\n");
                  for (int i = 0; i < e.Results.Count; ++i)
                  {
                      var result = e.Results[i];
                      if (result.Error.IsError)
                      {
                          errMsg.Append($"An error occurred in delete ACL operation {i}: Code: {result.Error.Code}" +
                                        $", Reason: {result.Error.Reason}\n");
                      }
                      else
                      {
                          errMsg.Append($"Deleted ACLs in operation {i}\n");
                      }
                  }

                  throw new Exception(errMsg.ToString(), e);
              }
          }

          #endregion

          #region Topics

          /// <summary>
          /// Creates a topic with replication factor 1 and the given retention time.
          /// </summary>
          /// <param name="topicName">Name of the topic to create.</param>
          /// <param name="retentionTime">Value written (in milliseconds) to <c>retention.ms</c>.</param>
          /// <param name="numPartitions">Requested partition count; values above 3 are capped at 3.</param>
          /// <exception cref="Exception">Topic creation failed; the message names the failing topic(s).</exception>
          public async Task CreateTopic(string topicName, TimeSpan retentionTime, int numPartitions = 1)
          {
              // Use a 64-bit value: an int overflows for retention times above ~24.8 days.
              long retentionMs = (long)retentionTime.TotalMilliseconds;

              var configEntries = new Dictionary<string, string>
              {
                  { "retention.ms", retentionMs.ToString(System.Globalization.CultureInfo.InvariantCulture) }
              };

              try
              {
                  await _adminClient.CreateTopicsAsync(
                      new TopicSpecification[]
                      {
                          new()
                          {
                              Name = topicName,
                              ReplicationFactor = 1,
                              NumPartitions = Math.Min(numPartitions, 3),
                              Configs = configEntries
                          }
                      });
              }
              catch (CreateTopicsException e)
              {
                  var failures = e.Results
                      .Where(r => r.Error.IsError)
                      .Select(r => $"An error occurred creating topic {r.Topic}: {r.Error.Reason}")
                      .ToList();

                  throw new Exception(failures.Count > 0 ? string.Join("\n", failures) : e.Message, e);
              }
          }

          /// <summary>
          /// Deletes the given topics.
          /// </summary>
          /// <param name="topicNames">Names of the topics to delete.</param>
          /// <exception cref="Exception">
          /// Deletion failed for one or more topics; the message contains one line per failed topic.
          /// </exception>
          public async Task DeleteTopics(IEnumerable<string> topicNames)
          {
              try
              {
                  await _adminClient.DeleteTopicsAsync(topicNames);
              }
              catch (DeleteTopicsException e)
              {
                  // Report the entries that actually failed; index 0 may be a successful one.
                  var failures = e.Results
                      .Where(r => r.Error.IsError)
                      .Select(r => $"An error occurred deleting topic {r.Topic}: {r.Error.Reason}")
                      .ToList();

                  throw new Exception(failures.Count > 0 ? string.Join("\n", failures) : e.Message, e);
              }
          }

          /// <summary>
          /// Describes the given topics.
          /// </summary>
          /// <param name="topicNames">Names of the topics to describe.</param>
          /// <returns>
          /// The topic descriptions. An empty list is returned when the client reports a
          /// partial failure whose first result is "unknown topic or partition" for the
          /// first requested topic name.
          /// </returns>
          /// <exception cref="Exception">Any other describe failure reported by the client.</exception>
          public async Task<List<TopicDescription>> DescribeTopicsAsync(IEnumerable<string> topicNames)
          {
              // Materialize once: the names are needed for the request and again in the error path.
              var names = topicNames as IReadOnlyList<string> ?? topicNames.ToList();

              try
              {
                  var topicCollection = TopicCollection.OfTopicNames(names);
                  var result = await _adminClient.DescribeTopicsAsync(topicCollection);
                  return result.TopicDescriptions;
              }
              catch (DescribeTopicsException e)
              {
                  var firstDescription = e.Results.TopicDescriptions.FirstOrDefault();
                  var firstRequested = names.FirstOrDefault();

                  bool firstTopicMissing =
                      e.Error.Code == ErrorCode.Local_Partial
                      && firstDescription is not null
                      && firstRequested is not null
                      && firstDescription.Error.Code == ErrorCode.UnknownTopicOrPart
                      && string.Equals(firstDescription.Name, firstRequested, StringComparison.Ordinal);

                  if (firstTopicMissing)
                  {
                      return new List<TopicDescription>();
                  }

                  throw new Exception($"An error occurred describing topic {e.Message}", e);
              }
          }

          #endregion

          /// <summary>
          /// Releases the underlying admin client. Safe to call more than once.
          /// </summary>
          public void Dispose()
          {
              if (_disposed)
              {
                  return;
              }

              _disposed = true;
              _adminClient?.Dispose();
              GC.SuppressFinalize(this);
          }
      }
  }