您最多选择25个主题 主题必须以字母或数字开头,可以包含连字符 (-),并且长度不得超过35个字符

79 行
2.7KB

  1. using HealthMonitor.Common;
  2. using HealthMonitor.Core.Common.Extensions;
  3. using HealthMonitor.Service.Sub;
  4. using TDengineDriver;
  5. using TDengineTMQ;
  6. namespace HealthMonitor.WebApi
  7. {
  8. public class Worker : BackgroundService
  9. {
  10. private readonly ILogger<Worker> _logger;
  11. private readonly IServiceProvider _services;
  12. private readonly TDengineDataSubcribe _tdEngineDataSubcribe;
  13. private readonly PackageProcess _processor;
  14. private CancellationTokenSource _tokenSource=default!;
  15. public Worker(ILogger<Worker> logger, IServiceProvider services, PackageProcess processor,TDengineDataSubcribe tdEngineDataSubcribe)
  16. {
  17. _logger = logger;
  18. _tdEngineDataSubcribe = tdEngineDataSubcribe;
  19. _services = services;
  20. _processor = processor;
  21. }
  22. public override Task StartAsync(CancellationToken cancellationToken)
  23. {
  24. _logger.LogInformation("------StartAsync");
  25. _tokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
  26. // 创建消费者
  27. // _tdEngineDataSubcribe.CreateConsumer();
  28. return base.StartAsync(cancellationToken);
  29. }
  30. public override Task StopAsync(CancellationToken cancellationToken)
  31. {
  32. _logger.LogInformation("------StopAsync");
  33. _tokenSource.Cancel(); //停止工作线程
  34. // 关闭消费者
  35. // _tdEngineDataSubcribe.CloseConsumer();
  36. return base.StopAsync(cancellationToken);
  37. }
  38. protected override Task ExecuteAsync(CancellationToken stoppingToken)
  39. {
  40. // var processor = _services.GetService<PackageProcess>();
  41. TaskFactory factory = new(_tokenSource.Token);
  42. factory.StartNew(async () =>
  43. {
  44. if (_tokenSource.IsCancellationRequested)
  45. _logger.LogWarning("Worker exit");
  46. _logger.LogInformation("------ResolveAsync");
  47. while (!_tokenSource.IsCancellationRequested)
  48. {
  49. //
  50. await _processor.ResolveAsync().ConfigureAwait(false);
  51. // await _tdEngineDataSubcribe.ProcessMsg();
  52. }
  53. }, TaskCreationOptions.LongRunning);
  54. factory.StartNew(() =>
  55. {
  56. _logger.LogInformation("------_tdEngineDataSubcribe");
  57. while (!_tokenSource.IsCancellationRequested)
  58. {
  59. _tdEngineDataSubcribe.BeginListen(_tokenSource.Token);
  60. }
  61. }, TaskCreationOptions.LongRunning);
  62. return Task.Delay(1000, _tokenSource.Token);
  63. }
  64. }
  65. }