commit 96808327c6351cfdba75aa9c79552ff18a332e53
Author: 杨雷 <284428564@QQ.com>
Date: Mon May 13 17:01:53 2024 +0800
第一次提交
diff --git a/.dockerignore b/.dockerignore
new file mode 100644
index 0000000..fe1152b
--- /dev/null
+++ b/.dockerignore
@@ -0,0 +1,30 @@
+**/.classpath
+**/.dockerignore
+**/.env
+**/.git
+**/.gitignore
+**/.project
+**/.settings
+**/.toolstarget
+**/.vs
+**/.vscode
+**/*.*proj.user
+**/*.dbmdl
+**/*.jfm
+**/azds.yaml
+**/bin
+**/charts
+**/docker-compose*
+**/Dockerfile*
+**/node_modules
+**/npm-debug.log
+**/obj
+**/secrets.dev.yaml
+**/values.dev.yaml
+LICENSE
+README.md
+!**/.gitignore
+!.git/HEAD
+!.git/config
+!.git/packed-refs
+!.git/refs/heads/**
\ No newline at end of file
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..4ce6fdd
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,340 @@
+## Ignore Visual Studio temporary files, build results, and
+## files generated by popular Visual Studio add-ons.
+##
+## Get latest from https://github.com/github/gitignore/blob/master/VisualStudio.gitignore
+
+# User-specific files
+*.rsuser
+*.suo
+*.user
+*.userosscache
+*.sln.docstates
+
+# User-specific files (MonoDevelop/Xamarin Studio)
+*.userprefs
+
+# Build results
+[Dd]ebug/
+[Dd]ebugPublic/
+[Rr]elease/
+[Rr]eleases/
+x64/
+x86/
+[Aa][Rr][Mm]/
+[Aa][Rr][Mm]64/
+bld/
+[Bb]in/
+[Oo]bj/
+[Ll]og/
+
+# Visual Studio 2015/2017 cache/options directory
+.vs/
+# Uncomment if you have tasks that create the project's static files in wwwroot
+#wwwroot/
+
+# Visual Studio 2017 auto generated files
+Generated\ Files/
+
+# MSTest test Results
+[Tt]est[Rr]esult*/
+[Bb]uild[Ll]og.*
+
+# NUNIT
+*.VisualState.xml
+TestResult.xml
+
+# Build Results of an ATL Project
+[Dd]ebugPS/
+[Rr]eleasePS/
+dlldata.c
+
+# Benchmark Results
+BenchmarkDotNet.Artifacts/
+
+# .NET Core
+project.lock.json
+project.fragment.lock.json
+artifacts/
+
+# StyleCop
+StyleCopReport.xml
+
+# Files built by Visual Studio
+*_i.c
+*_p.c
+*_h.h
+*.ilk
+*.meta
+*.obj
+*.iobj
+*.pch
+*.pdb
+*.ipdb
+*.pgc
+*.pgd
+*.rsp
+*.sbr
+*.tlb
+*.tli
+*.tlh
+*.tmp
+*.tmp_proj
+*_wpftmp.csproj
+*.log
+*.vspscc
+*.vssscc
+.builds
+*.pidb
+*.svclog
+*.scc
+
+# Chutzpah Test files
+_Chutzpah*
+
+# Visual C++ cache files
+ipch/
+*.aps
+*.ncb
+*.opendb
+*.opensdf
+*.sdf
+*.cachefile
+*.VC.db
+*.VC.VC.opendb
+
+# Visual Studio profiler
+*.psess
+*.vsp
+*.vspx
+*.sap
+
+# Visual Studio Trace Files
+*.e2e
+
+# TFS 2012 Local Workspace
+$tf/
+
+# Guidance Automation Toolkit
+*.gpState
+
+# ReSharper is a .NET coding add-in
+_ReSharper*/
+*.[Rr]e[Ss]harper
+*.DotSettings.user
+
+# JustCode is a .NET coding add-in
+.JustCode
+
+# TeamCity is a build add-in
+_TeamCity*
+
+# DotCover is a Code Coverage Tool
+*.dotCover
+
+# AxoCover is a Code Coverage Tool
+.axoCover/*
+!.axoCover/settings.json
+
+# Visual Studio code coverage results
+*.coverage
+*.coveragexml
+
+# NCrunch
+_NCrunch_*
+.*crunch*.local.xml
+nCrunchTemp_*
+
+# MightyMoose
+*.mm.*
+AutoTest.Net/
+
+# Web workbench (sass)
+.sass-cache/
+
+# Installshield output folder
+[Ee]xpress/
+
+# DocProject is a documentation generator add-in
+DocProject/buildhelp/
+DocProject/Help/*.HxT
+DocProject/Help/*.HxC
+DocProject/Help/*.hhc
+DocProject/Help/*.hhk
+DocProject/Help/*.hhp
+DocProject/Help/Html2
+DocProject/Help/html
+
+# Click-Once directory
+publish/
+
+# Publish Web Output
+*.[Pp]ublish.xml
+*.azurePubxml
+# Note: Comment the next line if you want to checkin your web deploy settings,
+# but database connection strings (with potential passwords) will be unencrypted
+*.pubxml
+*.publishproj
+
+# Microsoft Azure Web App publish settings. Comment the next line if you want to
+# checkin your Azure Web App publish settings, but sensitive information contained
+# in these scripts will be unencrypted
+PublishScripts/
+
+# NuGet Packages
+*.nupkg
+# The packages folder can be ignored because of Package Restore
+**/[Pp]ackages/*
+# except build/, which is used as an MSBuild target.
+!**/[Pp]ackages/build/
+# Uncomment if necessary however generally it will be regenerated when needed
+#!**/[Pp]ackages/repositories.config
+# NuGet v3's project.json files produces more ignorable files
+*.nuget.props
+*.nuget.targets
+
+# Microsoft Azure Build Output
+csx/
+*.build.csdef
+
+# Microsoft Azure Emulator
+ecf/
+rcf/
+
+# Windows Store app package directories and files
+AppPackages/
+BundleArtifacts/
+Package.StoreAssociation.xml
+_pkginfo.txt
+*.appx
+
+# Visual Studio cache files
+# files ending in .cache can be ignored
+*.[Cc]ache
+# but keep track of directories ending in .cache
+!?*.[Cc]ache/
+
+# Others
+ClientBin/
+~$*
+*~
+*.dbmdl
+*.dbproj.schemaview
+*.jfm
+*.pfx
+*.publishsettings
+orleans.codegen.cs
+
+# Including strong name files can present a security risk
+# (https://github.com/github/gitignore/pull/2483#issue-259490424)
+#*.snk
+
+# Since there are multiple workflows, uncomment next line to ignore bower_components
+# (https://github.com/github/gitignore/pull/1529#issuecomment-104372622)
+#bower_components/
+
+# RIA/Silverlight projects
+Generated_Code/
+
+# Backup & report files from converting an old project file
+# to a newer Visual Studio version. Backup files are not needed,
+# because we have git ;-)
+_UpgradeReport_Files/
+Backup*/
+UpgradeLog*.XML
+UpgradeLog*.htm
+ServiceFabricBackup/
+*.rptproj.bak
+
+# SQL Server files
+*.mdf
+*.ldf
+*.ndf
+
+# Business Intelligence projects
+*.rdl.data
+*.bim.layout
+*.bim_*.settings
+*.rptproj.rsuser
+*- Backup*.rdl
+
+# Microsoft Fakes
+FakesAssemblies/
+
+# GhostDoc plugin setting file
+*.GhostDoc.xml
+
+# Node.js Tools for Visual Studio
+.ntvs_analysis.dat
+node_modules/
+
+# Visual Studio 6 build log
+*.plg
+
+# Visual Studio 6 workspace options file
+*.opt
+
+# Visual Studio 6 auto-generated workspace file (contains which files were open etc.)
+*.vbw
+
+# Visual Studio LightSwitch build output
+**/*.HTMLClient/GeneratedArtifacts
+**/*.DesktopClient/GeneratedArtifacts
+**/*.DesktopClient/ModelManifest.xml
+**/*.Server/GeneratedArtifacts
+**/*.Server/ModelManifest.xml
+_Pvt_Extensions
+
+# Paket dependency manager
+.paket/paket.exe
+paket-files/
+
+# FAKE - F# Make
+.fake/
+
+# JetBrains Rider
+.idea/
+*.sln.iml
+
+# CodeRush personal settings
+.cr/personal
+
+# Python Tools for Visual Studio (PTVS)
+__pycache__/
+*.pyc
+
+# Cake - Uncomment if you are using it
+# tools/**
+# !tools/packages.config
+
+# Tabs Studio
+*.tss
+
+# Telerik's JustMock configuration file
+*.jmconfig
+
+# BizTalk build output
+*.btp.cs
+*.btm.cs
+*.odx.cs
+*.xsd.cs
+
+# OpenCover UI analysis results
+OpenCover/
+
+# Azure Stream Analytics local run output
+ASALocalRun/
+
+# MSBuild Binary and Structured Log
+*.binlog
+
+# NVidia Nsight GPU debugger configuration file
+*.nvuser
+
+# MFractors (Xamarin productivity tool) working folder
+.mfractor/
+
+# Local History for Visual Studio
+.localhistory/
+
+# BeatPulse healthcheck temp database
+healthchecksdb
\ No newline at end of file
diff --git a/TelpoPush.Common/HttpHelperAsync.cs b/TelpoPush.Common/HttpHelperAsync.cs
new file mode 100644
index 0000000..1d7eb10
--- /dev/null
+++ b/TelpoPush.Common/HttpHelperAsync.cs
@@ -0,0 +1,460 @@
+using Microsoft.Extensions.Logging;
+using Newtonsoft.Json;
+using System;
+using System.Collections.Generic;
+using System.Net.Http;
+using System.Net.Http.Headers;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace TelpoPush.Common
+{
+ ///
+ /// HTTP帮助类
+ ///
+ public class HttpHelperAsync
+ {
+ private IHttpClientFactory _httpClientFactory;
+ private readonly ILogger _logger;
+ public HttpHelperAsync(IHttpClientFactory httpClientFactory, ILogger logger)
+ {
+ _httpClientFactory = httpClientFactory;
+ _logger = logger;
+ }
+
+
+ #region 异步
+ ///
+ /// 发起POST异步请求表单
+ ///
+ /// 请求地址
+ /// POST提交的内容
+ /// POST内容的媒体类型,如:application/xml、application/json
+ /// HTTP响应上的content-type内容头的值,如:application/xml、application/json、application/text、application/x-www-form-urlencoded等
+ /// 请求头信息
+ /// 请求超时时间,单位秒
+ /// 返回string
+ public async Task PostFormAsync(string url, MultipartFormDataContent content,
+ Dictionary headers = null,
+ int timeOut = 5)
+ {
+ try
+ {
+ var hostName = GetHostName(url);
+ using (HttpClient client = _httpClientFactory.CreateClient(hostName))
+ {
+ client.Timeout = TimeSpan.FromSeconds(timeOut);
+ if (headers?.Count > 0)
+ {
+ foreach (string key in headers.Keys)
+ {
+ client.DefaultRequestHeaders.Add(key, headers[key]);
+ }
+ }
+ content.Headers.Add("ContentType", "multipart/form-data");//声明头部
+ using (HttpResponseMessage response = await client.PostAsync(url, content))
+ {
+ return JsonConvert.SerializeObject(new { response.IsSuccessStatusCode, response.StatusCode });
+ //if (response.IsSuccessStatusCode)
+ //{
+ // string responseString = await response.Content.ReadAsStringAsync();
+ // return responseString;
+ //}
+ //else
+ //{
+ // return string.Empty;
+ //}
+ }
+ }
+ }
+ catch (Exception ex)
+ {
+ return $"推送完成:请求响应超过{timeOut}秒,异常 {ex.Message}";
+ }
+ }
+
+
+
+ ///
+ /// 发起GET异步请求
+ ///
+ /// 返回类型
+ /// 请求地址
+ /// 请求头信息
+ /// 请求超时时间,单位秒
+ /// 返回string
+ public async Task GetAsync(string url, Dictionary headers = null, int timeOut = 30)
+ {
+ var hostName = GetHostName(url);
+ using (HttpClient client = _httpClientFactory.CreateClient(hostName))
+ {
+ client.Timeout = TimeSpan.FromSeconds(timeOut);
+ if (headers?.Count > 0)
+ {
+ foreach (string key in headers.Keys)
+ {
+ client.DefaultRequestHeaders.Add(key, headers[key]);
+ }
+ }
+ using (HttpResponseMessage response = await client.GetAsync(url))
+ {
+ if (response.IsSuccessStatusCode)
+ {
+ string responseString = await response.Content.ReadAsStringAsync();
+ return responseString;
+ }
+ else
+ {
+ return string.Empty;
+ }
+ }
+ }
+ }
+
+
+ ///
+ /// 发起POST异步请求
+ ///
+ /// 请求地址
+ /// POST提交的内容
+ /// POST内容的媒体类型,如:application/xml、application/json
+ /// HTTP响应上的content-type内容头的值,如:application/xml、application/json、application/text、application/x-www-form-urlencoded等
+ /// 请求头信息
+ /// 请求超时时间,单位秒
+ /// 返回string
+ public async Task PostAsync(string url, string body,
+ Dictionary headers = null,
+ int timeOut = 30,
+ string bodyMediaType = "application/json",
+ string responseContentType = "application/json;charset=utf-8")
+ {
+
+ //content.Headers.ContentType = new MediaTypeHeaderValue("application/json");
+
+ //var clientHandler = new HttpClientHandler
+ //{
+ // ServerCertificateCustomValidationCallback = (message, certificate2, arg3, arg4) => true
+ //};
+ //using (var client = new HttpClient(clientHandler))
+ //{
+ // client.DefaultRequestHeaders.Accept.Add(new MediaTypeWithQualityHeaderValue("application/json"));
+
+ try
+ {
+ var hostName = GetHostName(url);
+ using (HttpClient client = _httpClientFactory.CreateClient(hostName))
+ {
+ client.Timeout = TimeSpan.FromSeconds(timeOut);
+ if (headers?.Count > 0)
+ {
+ foreach (string key in headers.Keys)
+ {
+ client.DefaultRequestHeaders.Add(key, headers[key]);
+ }
+ }
+ StringContent content = new StringContent(body, System.Text.Encoding.UTF8, mediaType: bodyMediaType);
+ if (!string.IsNullOrWhiteSpace(responseContentType))
+ {
+ content.Headers.ContentType = System.Net.Http.Headers.MediaTypeHeaderValue.Parse(responseContentType);
+ }
+ using (HttpResponseMessage response = await client.PostAsync(url, content))
+ {
+ if (response.IsSuccessStatusCode)
+ {
+ string responseString = await response.Content.ReadAsStringAsync();
+ return responseString;
+ }
+ else
+ return $"请求异常:{response.IsSuccessStatusCode},{response.StatusCode}";
+ }
+ }
+ }
+ catch(Exception ex)
+ {
+ return $"请求异常:{ex.Message}";
+ }
+ }
+
+ ///
+ /// 发起POST异步请求
+ ///
+ /// 请求地址
+ /// POST提交的内容
+ /// POST内容的媒体类型,如:application/xml、application/json
+ /// HTTP响应上的content-type内容头的值,如:application/xml、application/json、application/text、application/x-www-form-urlencoded等
+ /// 请求头信息
+ /// 请求超时时间,单位秒
+ /// 返回string
+ public async Task PutAsync(string url, string body,
+ string bodyMediaType = null,
+ string responseContentType = null,
+ Dictionary headers = null,
+ int timeOut = 30)
+ {
+ var hostName = GetHostName(url);
+ using (HttpClient client = _httpClientFactory.CreateClient(hostName))
+ {
+ client.Timeout = TimeSpan.FromSeconds(timeOut);
+ if (headers?.Count > 0)
+ {
+ foreach (string key in headers.Keys)
+ {
+ client.DefaultRequestHeaders.Add(key, headers[key]);
+ }
+ }
+ StringContent content = new StringContent(body, System.Text.Encoding.UTF8, mediaType: bodyMediaType);
+ if (!string.IsNullOrWhiteSpace(responseContentType))
+ {
+ content.Headers.ContentType = System.Net.Http.Headers.MediaTypeHeaderValue.Parse(responseContentType);
+ }
+ using (HttpResponseMessage response = await client.PutAsync(url, content))
+ {
+ if (response.IsSuccessStatusCode)
+ {
+ string responseString = await response.Content.ReadAsStringAsync();
+ return responseString;
+ }
+ else
+ {
+ return string.Empty;
+ }
+ }
+ }
+ }
+
+ ///
+ /// 发起GET异步请求
+ ///
+ /// 返回类型
+ /// 请求地址
+ /// 请求头信息
+ /// 请求超时时间,单位秒
+ /// 返回string
+ public async Task DeleteAsync(string url, Dictionary headers = null, int timeOut = 30)
+ {
+ var hostName = GetHostName(url);
+ using (HttpClient client = _httpClientFactory.CreateClient(hostName))
+ {
+ client.Timeout = TimeSpan.FromSeconds(timeOut);
+ if (headers?.Count > 0)
+ {
+ foreach (string key in headers.Keys)
+ {
+ client.DefaultRequestHeaders.Add(key, headers[key]);
+ }
+ }
+ using (HttpResponseMessage response = await client.DeleteAsync(url))
+ {
+ if (response.IsSuccessStatusCode)
+ {
+ string responseString = await response.Content.ReadAsStringAsync();
+ return responseString;
+ }
+ else
+ {
+ return string.Empty;
+ }
+ }
+ }
+ }
+
+ ///
+ /// 发起GET异步请求
+ ///
+ /// 返回类型
+ /// 请求地址
+ /// 请求头信息
+ /// 请求超时时间,单位秒
+ /// 返回T
+ public async Task GetAsync(string url, Dictionary headers = null, int timeOut = 30) where T : new()
+ {
+ string responseString = await GetAsync(url, headers, timeOut);
+ if (!string.IsNullOrWhiteSpace(responseString))
+ {
+ return JsonConvert.DeserializeObject(responseString);
+ }
+ else
+ {
+ return default(T);
+ }
+ }
+
+
+ ///
+ /// 发起POST异步请求
+ ///
+ /// 返回类型
+ /// 请求地址
+ /// POST提交的内容
+ /// POST内容的媒体类型,如:application/xml、application/json
+ /// HTTP响应上的content-type内容头的值,如:application/xml、application/json、application/text、application/x-www-form-urlencoded等
+ /// 请求头信息
+ /// 请求超时时间,单位秒
+ /// 返回T
+ public async Task PostAsync(string url, string body,
+ Dictionary headers = null,
+ int timeOut = 30,
+ string bodyMediaType = "application/json",
+ string responseContentType = "application/json;charset=utf-8"
+ ) where T : new()
+ {
+ string responseString = await PostAsync(url, body, headers, timeOut, bodyMediaType, responseContentType);
+ if (!string.IsNullOrWhiteSpace(responseString))
+ {
+ return JsonConvert.DeserializeObject(responseString);
+ }
+ else
+ {
+ return default(T);
+ }
+ }
+
+ ///
+ /// 发起PUT异步请求
+ ///
+ /// 返回类型
+ /// 请求地址
+ /// POST提交的内容
+ /// POST内容的媒体类型,如:application/xml、application/json
+ /// HTTP响应上的content-type内容头的值,如:application/xml、application/json、application/text、application/x-www-form-urlencoded等
+ /// 请求头信息
+ /// 请求超时时间,单位秒
+ /// 返回T
+ public async Task PutAsync(string url, string body,
+ string bodyMediaType = null,
+ string responseContentType = null,
+ Dictionary headers = null,
+ int timeOut = 30) where T : new()
+ {
+ string responseString = await PutAsync(url, body, bodyMediaType, responseContentType, headers, timeOut);
+ if (!string.IsNullOrWhiteSpace(responseString))
+ {
+ return JsonConvert.DeserializeObject(responseString);
+ }
+ else
+ {
+ return default(T);
+ }
+ }
+
+ ///
+ /// 发起DELETE异步请求
+ ///
+ /// 返回类型
+ /// 请求地址
+ /// 请求头信息
+ /// 请求超时时间,单位秒
+ /// 返回T
+ public async Task DeleteAsync(string url, Dictionary headers = null, int timeOut = 30) where T : new()
+ {
+ string responseString = await DeleteAsync(url, headers, timeOut);
+ if (!string.IsNullOrWhiteSpace(responseString))
+ {
+ return JsonConvert.DeserializeObject(responseString);
+ }
+ else
+ {
+ return default(T);
+ }
+ }
+ #region 私有函数
+
+ ///
+ /// 获取请求的主机名
+ ///
+ ///
+ ///
+ private static string GetHostName(string url)
+ {
+ if (!string.IsNullOrWhiteSpace(url))
+ {
+ return url.Replace("https://", "").Replace("http://", "").Split('/')[0];
+ }
+ else
+ {
+ return "AnyHost";
+ }
+ }
+
+ #endregion
+
+
+ #endregion
+
+ #region 同步
+
+ ///
+ /// 发起GET同步请求
+ ///
+ ///
+ ///
+ ///
+ ///
+ ///
+
+
+ public string HttpGet(string url, Dictionary headers = null, string contentType = "application/json;charset=utf-8")
+ {
+ if (string.IsNullOrEmpty(url)) return "";
+ try
+ {
+ using (HttpClient client = new HttpClient())
+ {
+ if (contentType != null)
+ client.DefaultRequestHeaders.Add("ContentType", contentType);
+ if (headers != null)
+ {
+ foreach (var header in headers)
+ client.DefaultRequestHeaders.Add(header.Key, header.Value);
+ }
+ HttpResponseMessage response = client.GetAsync(url).Result;
+ return response.Content.ReadAsStringAsync().Result;
+ }
+ }
+ catch (Exception ex)
+ {
+ _logger.LogDebug($"HttpGet/URL:{url},headers:{JsonConvert.SerializeObject(headers)},异常:{ex.Message}|{ex.Source}|{ex.StackTrace}");
+ }
+ return "";
+ }
+ ///
+ /// 发起POST同步请求
+ ///
+ ///
+ ///
+ /// application/xml、application/json、application/text、application/x-www-form-urlencoded
+ /// 填充消息头
+ ///
+ public string HttpPost(string url, string postData = null, Dictionary headers = null, string contentType = "application/json")
+ {
+ if (string.IsNullOrEmpty(url)) return "";
+ try
+ {
+ postData = postData ?? "";
+ using (HttpClient client = new HttpClient())
+ {
+ if (headers != null)
+ {
+ foreach (var header in headers)
+ client.DefaultRequestHeaders.Add(header.Key, header.Value);
+ }
+ using (HttpContent httpContent = new StringContent(postData, Encoding.UTF8))
+ {
+ if (contentType != null)
+ httpContent.Headers.ContentType = new MediaTypeHeaderValue(contentType);
+
+ HttpResponseMessage response = client.PostAsync(url, httpContent).Result;
+ return response.Content.ReadAsStringAsync().Result;
+ }
+ }
+ }
+ catch (Exception ex){
+ _logger.LogDebug($"HttpPost/URL:{url},postStr:{postData},headers:{JsonConvert.SerializeObject(headers)},异常:{ex.Message}|{ex.Source}|{ex.StackTrace}");
+ }
+ return "";
+ }
+
+
+ #endregion
+ }
+}
\ No newline at end of file
diff --git a/TelpoPush.Common/LimitedConcurrencyLevelTaskScheduler.cs b/TelpoPush.Common/LimitedConcurrencyLevelTaskScheduler.cs
new file mode 100644
index 0000000..43388f3
--- /dev/null
+++ b/TelpoPush.Common/LimitedConcurrencyLevelTaskScheduler.cs
@@ -0,0 +1,137 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace TelpoPush.Common
+{
+ ///
+ /// Provides a task scheduler that ensures a maximum concurrency level while
+ /// running on top of the ThreadPool.
+ ///
+ public class LimitedConcurrencyLevelTaskScheduler : TaskScheduler
+ {
+ /// Whether the current thread is processing work items.
+ [ThreadStatic]
+ private static bool _currentThreadIsProcessingItems;
+ /// The list of tasks to be executed.
+ private readonly LinkedList _tasks = new LinkedList(); // protected by lock(_tasks)
+ /// The maximum concurrency level allowed by this scheduler.
+ private readonly int _maxDegreeOfParallelism;
+ /// Whether the scheduler is currently processing work items.
+ private int _delegatesQueuedOrRunning = 0; // protected by lock(_tasks)
+
+ ///
+ /// Initializes an instance of the LimitedConcurrencyLevelTaskScheduler class with the
+ /// specified degree of parallelism.
+ ///
+ /// The maximum degree of parallelism provided by this scheduler.
+ public LimitedConcurrencyLevelTaskScheduler(int maxDegreeOfParallelism)
+ {
+ if (maxDegreeOfParallelism < 1) throw new ArgumentOutOfRangeException("maxDegreeOfParallelism");
+ _maxDegreeOfParallelism = maxDegreeOfParallelism;
+ }
+
+ /// Queues a task to the scheduler.
+ /// The task to be queued.
+ protected sealed override void QueueTask(Task task)
+ {
+ // Add the task to the list of tasks to be processed. If there aren't enough
+ // delegates currently queued or running to process tasks, schedule another.
+ lock (_tasks)
+ {
+ _tasks.AddLast(task);
+ if (_delegatesQueuedOrRunning < _maxDegreeOfParallelism)
+ {
+ ++_delegatesQueuedOrRunning;
+ NotifyThreadPoolOfPendingWork();
+ }
+ }
+ }
+
+ ///
+ /// Informs the ThreadPool that there's work to be executed for this scheduler.
+ ///
+ private void NotifyThreadPoolOfPendingWork()
+ {
+ ThreadPool.UnsafeQueueUserWorkItem(_ =>
+ {
+ // Note that the current thread is now processing work items.
+ // This is necessary to enable inlining of tasks into this thread.
+ _currentThreadIsProcessingItems = true;
+ try
+ {
+ // Process all available items in the queue.
+ while (true)
+ {
+ Task item;
+ lock (_tasks)
+ {
+ // When there are no more items to be processed,
+ // note that we're done processing, and get out.
+ if (_tasks.Count == 0)
+ {
+ --_delegatesQueuedOrRunning;
+ break;
+ }
+
+ // Get the next item from the queue
+ item = _tasks.First.Value;
+ _tasks.RemoveFirst();
+ }
+
+ // Execute the task we pulled out of the queue
+ base.TryExecuteTask(item);
+ }
+ }
+ // We're done processing items on the current thread
+ finally { _currentThreadIsProcessingItems = false; }
+ }, null);
+ }
+
+ /// Attempts to execute the specified task on the current thread.
+ /// The task to be executed.
+ ///
+ /// Whether the task could be executed on the current thread.
+ protected sealed override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
+ {
+ // If this thread isn't already processing a task, we don't support inlining
+ if (!_currentThreadIsProcessingItems) return false;
+
+ // If the task was previously queued, remove it from the queue
+ if (taskWasPreviouslyQueued) TryDequeue(task);
+
+ // Try to run the task.
+ return base.TryExecuteTask(task);
+ }
+
+ /// Attempts to remove a previously scheduled task from the scheduler.
+ /// The task to be removed.
+ /// Whether the task could be found and removed.
+ protected sealed override bool TryDequeue(Task task)
+ {
+ lock (_tasks) return _tasks.Remove(task);
+ }
+
+ /// Gets the maximum concurrency level supported by this scheduler.
+ public sealed override int MaximumConcurrencyLevel { get { return _maxDegreeOfParallelism; } }
+
+ /// Gets an enumerable of the tasks currently scheduled on this scheduler.
+ /// An enumerable of the tasks currently scheduled.
+ protected sealed override IEnumerable GetScheduledTasks()
+ {
+ bool lockTaken = false;
+ try
+ {
+ Monitor.TryEnter(_tasks, ref lockTaken);
+ if (lockTaken) return _tasks.ToArray();
+ else throw new NotSupportedException();
+ }
+ finally
+ {
+ if (lockTaken) Monitor.Exit(_tasks);
+ }
+ }
+ }
+}
diff --git a/TelpoPush.Common/TelpoPush.Common.csproj b/TelpoPush.Common/TelpoPush.Common.csproj
new file mode 100644
index 0000000..21813ad
--- /dev/null
+++ b/TelpoPush.Common/TelpoPush.Common.csproj
@@ -0,0 +1,14 @@
+
+
+
+ net8.0
+ enable
+ enable
+
+
+
+
+
+
+
+
diff --git a/TelpoPush.Common/TimeHelper.cs b/TelpoPush.Common/TimeHelper.cs
new file mode 100644
index 0000000..19d7b0e
--- /dev/null
+++ b/TelpoPush.Common/TimeHelper.cs
@@ -0,0 +1,28 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace TelpoPush.Common
+{
+ public class TimeHelper
+ {
+ ///
+ /// 时间戳转成时间类型
+ ///
+ ///
+ ///
+ public static DateTime ConvertToLocalDateTime(string timeStamp)
+ {
+ DateTime dtStart = TimeZoneInfo.ConvertTime(new DateTime(1970, 1, 1), TimeZoneInfo.Utc, TimeZoneInfo.Local);
+ if (timeStamp.Length == 13)
+ {
+ long lTime = long.Parse(timeStamp + "0000");
+ TimeSpan toNow = new TimeSpan(lTime);
+ return dtStart.Add(toNow);
+ }
+ return dtStart.AddSeconds(long.Parse(timeStamp));
+ }
+ }
+}
diff --git a/TelpoPush.Models/CacheTemplates/ManufactorKafkaModel.cs b/TelpoPush.Models/CacheTemplates/ManufactorKafkaModel.cs
new file mode 100644
index 0000000..d2fa270
--- /dev/null
+++ b/TelpoPush.Models/CacheTemplates/ManufactorKafkaModel.cs
@@ -0,0 +1,19 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace TelpoPush.Models.CacheTemplates
+{
+ public class ManufactorKafkaModel
+ {
+ public string manufactorId { get; set; }
+ public string manufactorName { get; set; }
+ public string kafkaServers { get; set; }
+ public string kafkaTopic { get; set; }
+ public string kafkaGroup { get; set; }
+ public string kafkaUsername { get; set; }
+ public string kafkaPassword { get; set; }
+ }
+}
diff --git a/TelpoPush.Models/Config/RedisConfig.cs b/TelpoPush.Models/Config/RedisConfig.cs
new file mode 100644
index 0000000..d78dafc
--- /dev/null
+++ b/TelpoPush.Models/Config/RedisConfig.cs
@@ -0,0 +1,94 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace TelpoPush.Models.Config
+{
+ ///
+ /// Redis配置模板类
+ ///
+ public class RedisConfig
+ {
+ public string Server { get; set; }
+ ///
+ /// Redis server password
+ ///
+ public string Password { get; set; }
+ ///
+ /// Redis server database, default 0
+ ///
+ public int? DefaultDatabase { get; set; }
+ ///
+ /// The asynchronous method automatically uses pipeline, and the 10W concurrent time is 450ms(welcome to feedback)
+ ///
+ public bool? AsyncPipeline { get; set; }
+ ///
+ /// Connection pool size, default 50
+ ///
+ public int? Poolsize { get; set; }
+ ///
+ /// Idle time of elements in the connection pool(MS), suitable for connecting to remote redis server, default 20000
+ ///
+ public int? IdleTimeout { get; set; }
+ ///
+ /// Connection timeout(MS), default 5000
+ ///
+ public int? ConnectTimeout { get; set; }
+ ///
+ /// Send / receive timeout(MS), default 10000
+ ///
+ public int? SyncTimeout { get; set; }
+ ///
+ /// Preheat connections, receive values such as preheat = 5 preheat 5 connections, default 5
+ ///
+ public int? Preheat { get; set; }
+ ///
+ /// Follow system exit event to release automatically, default true
+ ///
+ public bool? AutoDispose { get; set; }
+ ///
+ /// Enable encrypted transmission, default false
+ ///
+ public bool? Ssl { get; set; }
+ ///
+ /// 是否尝试集群模式,阿里云、腾讯云集群需要设置此选项为 false, default true
+ ///
+ public bool? Testcluster { get; set; }
+ ///
+ /// Execution error, retry attempts, default 0
+ ///
+ public int? Tryit { get; set; }
+ ///
+ /// Connection name, use client list command to view
+ ///
+ public string Name { get; set; }
+ ///
+ /// key前辍,所有方法都会附带此前辍,csredis.Set(prefix + "key", 111)
+ ///
+ public string Prefix { get; set; }
+
+ public override string ToString()
+ {
+ if (string.IsNullOrWhiteSpace(Server)) throw new ArgumentNullException(nameof(Server));
+ var sb = new StringBuilder(Server);
+ if (!string.IsNullOrWhiteSpace(Password)) sb.Append($",password={Password}");
+ if (DefaultDatabase.HasValue) sb.Append($",defaultDatabase={DefaultDatabase}");
+ if (AsyncPipeline.HasValue) sb.Append($",asyncPipeline={AsyncPipeline}");
+ if (Poolsize.HasValue) sb.Append($",poolsize={Poolsize}");
+ if (IdleTimeout.HasValue) sb.Append($",idleTimeout={IdleTimeout}");
+ if (ConnectTimeout.HasValue) sb.Append($",connectTimeout={ConnectTimeout}");
+ if (SyncTimeout.HasValue) sb.Append($",syncTimeout={0}");
+ if (Preheat.HasValue) sb.Append($",preheat={Preheat}");
+ if (AutoDispose.HasValue) sb.Append($",autoDispose={AutoDispose}");
+ if (Ssl.HasValue) sb.Append($",ssl={Ssl}");
+ if (Testcluster.HasValue) sb.Append($",testcluster={Testcluster}");
+ if (Tryit.HasValue) sb.Append($",tryit={Tryit}");
+ if (!string.IsNullOrWhiteSpace(Name)) sb.Append($",name={Name}");
+ if (!string.IsNullOrWhiteSpace(Prefix)) sb.Append($",prefix={Prefix}");
+
+ return sb.ToString();
+ }
+ }
+}
diff --git a/TelpoPush.Models/Config/ServiceConfig.cs b/TelpoPush.Models/Config/ServiceConfig.cs
new file mode 100644
index 0000000..733d774
--- /dev/null
+++ b/TelpoPush.Models/Config/ServiceConfig.cs
@@ -0,0 +1,32 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace TelpoPush.Models.Config
+{
+ public class ServiceConfig
+ {
+ ///
+ /// 数据服务Host Url
+ ///
+ public string TelpoDataUrl { get; set; }
+
+ ///
+ /// Kafka服务地址
+ ///
+ public string KafkaBootstrapServers { get; set; }
+ public List KafkaTopics { get; set; }
+ public string KafkaGroupId { get; set; }
+ public string KafkaSaslUsername { get; set; }
+ public string KafkaSaslPassword { get; set; }
+ public string KafkaSslCaLocation { get; set; }
+
+ ///
+ /// 默认缓存时间
+ ///
+ public int CacheDurationSeconds { get; set; }
+ public int CacheDurationSeconds10 { get; set; }
+ }
+}
diff --git a/TelpoPush.Models/Dto/HeadersDto.cs b/TelpoPush.Models/Dto/HeadersDto.cs
new file mode 100644
index 0000000..3f358b2
--- /dev/null
+++ b/TelpoPush.Models/Dto/HeadersDto.cs
@@ -0,0 +1,16 @@
+using Newtonsoft.Json;
+namespace TelpoPush.Models.Dto
+{
+ ///
+ /// 消息数据头
+ ///
+ public class HeadersDto
+ {
+ [JsonProperty(PropertyName = "DataType")]
+ public int? DataType { get; set; }
+ [JsonProperty(PropertyName = "AlarmType")]
+ public int? AlarmType { get; set; }
+ [JsonProperty(PropertyName = "OperType")]
+ public int? OperType { get; set; }
+ }
+}
diff --git a/TelpoPush.Models/Dto/MqHeader.cs b/TelpoPush.Models/Dto/MqHeader.cs
new file mode 100644
index 0000000..637f7d7
--- /dev/null
+++ b/TelpoPush.Models/Dto/MqHeader.cs
@@ -0,0 +1,26 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace TelpoPush.Models.Dto
+{
+ public static class MqHeader
+ {
+ ///
+ /// DataType
+ ///
+ public const string DataType = "DataType";
+
+ ///
+ /// OperType
+ ///
+ public const string OperType = "OperType";
+
+ ///
+ /// AlarmType
+ ///
+ public const string AlarmTypes = "AlarmType";
+ }
+}
diff --git a/TelpoPush.Models/Enum/MqDataTopic.cs b/TelpoPush.Models/Enum/MqDataTopic.cs
new file mode 100644
index 0000000..9e5575f
--- /dev/null
+++ b/TelpoPush.Models/Enum/MqDataTopic.cs
@@ -0,0 +1,19 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace TelpoPush.Models.Enum
+{
+
+ public enum MqDataTopic : int
+ {
+ ///
+ /// 中高实时心率
+ ///
+ ZkRealHRMonitorTopic = 1
+
+
+ }
+}
diff --git a/TelpoPush.Models/Enum/MqDataType.cs b/TelpoPush.Models/Enum/MqDataType.cs
new file mode 100644
index 0000000..d054131
--- /dev/null
+++ b/TelpoPush.Models/Enum/MqDataType.cs
@@ -0,0 +1,152 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace TelpoPush.Models.Enum
+{
+ ///
+ /// 数据类型,标识发布到kafka的消息的数据类型
+ ///
+ public enum MqDataType : int
+ {
+ ///
+ /// 报警消息
+ ///
+ AlarmInfo = 0,
+
+ ///
+ /// 温度数据信息
+ ///
+ TemperatureInfo = 1,
+
+ ///
+ /// 步数信息
+ ///
+ StepInfo = 2,
+
+ ///
+ /// 电量信息
+ ///
+ BatteryLevelInfo = 3,
+
+ ///
+ /// 设备配置信息
+ ///
+ DeviceConfigInfo = 4,
+
+ ///
+ /// 设备通话记录
+ ///
+ DeviceCallLog = 5,
+
+ ///
+ /// 设备短信记录
+ ///
+ DeviceSmsLog = 6,
+
+ ///
+ /// 位置信息
+ ///
+ PositionInfo = 7,
+
+ ///
+ /// 支付
+ ///
+ PayInfo = 8,
+
+ ///
+ /// 设备状态(offline,online)
+ ///
+ Status = 9,
+
+ ///
+ /// 设备激活状态(激活1,未激活0)
+ ///
+ Active = 10,
+
+ ///
+ /// 指令回调
+ ///
+ reply = 11,
+
+ ///
+ /// 天气查询
+ ///
+ Weather = 12,
+
+ ///
+ /// 短信阅读事件
+ ///
+ ReadMsg = 13,
+ ///
+ /// 学习能力状态上报事件
+ ///
+ StudyAINotifyStatusUpload = 14,
+ ///
+ /// 心率
+ ///
+ HeartRateInfo = 15,
+ ///
+ /// 血氧
+ ///
+ Spo2Info = 16,
+ ///
+ /// 周期性报体温数据。
+ ///
+ Temperature1Info = 17,
+ ///
+ /// 周期心率。
+ ///
+ HeartRate1Info = 18,
+ ///
+ /// 周期性血氧
+ ///
+ Spo21Info = 19,
+ ///
+ /// 溺水状态
+ ///
+ DrownReportInfo = 20,
+ ///
+ /// 手表佩戴状态
+ ///
+ WearStatusInfo = 21,
+ ///
+ /// 血压
+ ///
+ BloodPressInfo = 22,
+ ///
+ /// 周期性血压
+ ///
+ BloodPress1Info = 23,
+ ///
+ /// 心理监测
+ ///
+ PsychInfo = 24,
+
+ ///
+ /// AI呼叫回调结果
+ ///
+ AiCallResult = 25,
+
+ ///
+ /// 越界上报(围栏进出告警)
+ ///
+ CrossBorder = 26,
+
+ ///
+ /// 运动数据上报
+ ///
+ SportResult = 27,
+ ///
+ /// 运动数据上报
+ ///
+ BloodSugar = 28,
+
+ ///
+ /// 绑定业务
+ ///
+ BindDevice = 100
+ }
+}
diff --git a/TelpoPush.Models/MqTemplates/BaseModel.cs b/TelpoPush.Models/MqTemplates/BaseModel.cs
new file mode 100644
index 0000000..6fe6b4e
--- /dev/null
+++ b/TelpoPush.Models/MqTemplates/BaseModel.cs
@@ -0,0 +1,18 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace TelpoPush.Models.MqTemplates
+{
+ public class BaseModel
+ {
+ public string MessageId { get; set; }
+ public string IMEI { get; set; }
+ public string TopicName { get; set; }
+ public string MessageTime { get; set; }
+ public object Content { get; set; }
+ }
+
+}
diff --git a/TelpoPush.Models/MqTemplates/ZkRealHeartRatesTemplate.cs b/TelpoPush.Models/MqTemplates/ZkRealHeartRatesTemplate.cs
new file mode 100644
index 0000000..ddb088d
--- /dev/null
+++ b/TelpoPush.Models/MqTemplates/ZkRealHeartRatesTemplate.cs
@@ -0,0 +1,41 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace TelpoPush.Models.MqTemplates
+{
+ public class ZkRealHeartRatesTemplate
+ {
+ public ZkRealHeartRateData heartRates { get; set; }
+ }
+ public class ZkRealHeartRateData
+ {
+ public string imei { get; set; }
+ public string agencyid { get; set; }
+ public List data { get; set; }
+ }
+ public class ZkRealHeartRateDataItem
+ {
+ public int time { get; set; }
+ public int value { get; set; }
+ public int isAnomaly { get; set; }
+ }
+
+ public class ZkRealHeartRateAnomalyCancelTemplate
+ {
+ public ZkRealHeartRateAnomalyCancelData anomalyCancel { get; set; }
+ }
+ public class ZkRealHeartRateAnomalyCancelData
+ {
+ public string imei { get; set; }
+ public string agencyid { get; set; }
+ public int time { get; set; }
+ }
+
+}
+
+
+
+
diff --git a/TelpoPush.Models/PushTemplates/WxModel.cs b/TelpoPush.Models/PushTemplates/WxModel.cs
new file mode 100644
index 0000000..0123941
--- /dev/null
+++ b/TelpoPush.Models/PushTemplates/WxModel.cs
@@ -0,0 +1,18 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace TelpoPush.Models.PushTemplates
+{
+ public class WxModel
+ {
+ public string deviceId { get; set; }
+ public string imei { get; set; }
+ public int alarmTypeId { get; set; }
+ public string alarmDeviceName { get; set; }
+ public string alarmRemarks { get; set; }
+ public string address { get; set; }
+ }
+}
diff --git a/TelpoPush.Models/TelpoPush.Models.csproj b/TelpoPush.Models/TelpoPush.Models.csproj
new file mode 100644
index 0000000..3b5b170
--- /dev/null
+++ b/TelpoPush.Models/TelpoPush.Models.csproj
@@ -0,0 +1,13 @@
+
+
+
+ net8.0
+ enable
+ enable
+
+
+
+
+
+
+
diff --git a/TelpoPush.Service/Biz/IZkRealHRMonitorService.cs b/TelpoPush.Service/Biz/IZkRealHRMonitorService.cs
new file mode 100644
index 0000000..2da9574
--- /dev/null
+++ b/TelpoPush.Service/Biz/IZkRealHRMonitorService.cs
@@ -0,0 +1,22 @@
+using Confluent.Kafka;
+using Microsoft.Extensions.Logging;
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+using TelpoDataService.Util.Clients;
+using TelpoDataService.Util.Entities.GpsLocationHistory;
+using TelpoDataService.Util.Models;
+using TelpoDataService.Util.QueryObjects;
+using TelpoPush.Common;
+using TelpoPush.Models.MqTemplates;
+
+namespace TelpoPush.Service.Biz
+{
+ public interface IZkRealHRMonitorService
+ {
+ Task Save(ZkRealHeartRateData data, string MessageId);
+ Task SaveAnomalyCancel(ZkRealHeartRateAnomalyCancelData data, string MessageId);
+ }
+}
diff --git a/TelpoPush.Service/Biz/Impl/ZkRealHRMonitorService.cs b/TelpoPush.Service/Biz/Impl/ZkRealHRMonitorService.cs
new file mode 100644
index 0000000..b381396
--- /dev/null
+++ b/TelpoPush.Service/Biz/Impl/ZkRealHRMonitorService.cs
@@ -0,0 +1,82 @@
+using Confluent.Kafka;
+using Microsoft.Extensions.Logging;
+using Newtonsoft.Json;
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+using TelpoDataService.Util.Clients;
+using TelpoDataService.Util.Entities.GpsLocationHistory;
+using TelpoDataService.Util.Models;
+using TelpoDataService.Util.QueryObjects;
+using TelpoPush.Common;
+using TelpoPush.Models.MqTemplates;
+using TelpoPush.Service.Cache;
+
+namespace TelpoPush.Service.Biz
+{
+ public class ZkRealHRMonitorService : IZkRealHRMonitorService
+ {
+
+ private readonly ILogger _logger;
+ private readonly RedisUtil _redis;
+ private readonly GpsLocationHistoryAccessorClient _messageRealHeartRateAiClient;
+ public ZkRealHRMonitorService(
+ ILogger logger, RedisUtil redis,
+ GpsLocationHistoryAccessorClient messageRealHeartRateAiClient)
+ {
+ _logger = logger;
+ _redis = redis;
+ _messageRealHeartRateAiClient = messageRealHeartRateAiClient;
+ }
+
+ public async Task Save(ZkRealHeartRateData data, string MessageId)
+ {
+ string deviceKey = await _redis.GetHealthyDeviceKey(data.imei);
+ foreach (var item in data.data)
+ {
+ if (item.isAnomaly == 1)
+ {
+ DateTime lastUpdate = TimeHelper.ConvertToLocalDateTime(item.time.ToString());
+ HisGpsRealHeartRate model = new HisGpsRealHeartRate()
+ {
+ HeartRateId = item.time.ToString(),
+ MessageId = $"{MessageId}_{item.time}",
+ Serialno = data.imei,
+ ManufactorId = data.agencyid,
+ HeartRate = item.value,
+ IsAnomaly = item.isAnomaly,
+ LastUpdate = lastUpdate,
+ Method = 1,
+ IsDisplay = 1,
+ DeviceKey = deviceKey,
+ CreateTime = DateTime.Now,
+ };
+ await _messageRealHeartRateAiClient.AddAsync(model);
+ }
+ }
+ }
+
+ public async Task SaveAnomalyCancel(ZkRealHeartRateAnomalyCancelData data, string MessageId)
+ {
+ string deviceKey = await _redis.GetHealthyDeviceKey(data.imei);
+ DateTime lastUpdate = TimeHelper.ConvertToLocalDateTime(data.time.ToString());
+ HisGpsRealHeartRate model = new HisGpsRealHeartRate()
+ {
+ HeartRateId = data.time.ToString(),
+ MessageId = $"{MessageId}_{data.time}",
+ Serialno = data.imei,
+ ManufactorId = data.agencyid,
+ HeartRate = 0,
+ IsAnomaly = -1,
+ LastUpdate = lastUpdate,
+ Method = 1,
+ IsDisplay = 1,
+ DeviceKey = deviceKey,
+ CreateTime = DateTime.Now,
+ };
+ await _messageRealHeartRateAiClient.AddAsync(model);
+ }
+ }
+}
diff --git a/TelpoPush.Service/Cache/MemoryCacheUtil.cs b/TelpoPush.Service/Cache/MemoryCacheUtil.cs
new file mode 100644
index 0000000..6dda381
--- /dev/null
+++ b/TelpoPush.Service/Cache/MemoryCacheUtil.cs
@@ -0,0 +1,82 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+using Microsoft.Extensions.Caching.Memory;
+
+namespace TelpoPush.Service.Cache
+{
+ internal class MemoryCacheUtil
+ {
+ static MemoryCache cache = new MemoryCache(new MemoryCacheOptions());
+ ///
+ /// 创建缓存项的文件
+ ///
+ /// 缓存Key
+ /// object对象
+ public static void Set(string key, object value)
+ {
+ if (key != null)
+ {
+ cache.Set(key, value);
+ }
+ }
+ ///
+ /// 创建缓存项过期
+ ///
+ /// 缓存Key
+ /// object对象
+ /// 过期时间(秒)
+ public static void Set(string key, object value, int expires)
+ {
+ if (key != null)
+ {
+ cache.Set(key, value, new MemoryCacheEntryOptions()
+ //设置缓存时间,如果被访问重置缓存时间。设置相对过期时间x秒
+ .SetSlidingExpiration(TimeSpan.FromSeconds(expires)));
+ }
+ }
+
+ ///
+ /// 获取缓存对象
+ ///
+ /// 缓存Key
+ /// object对象
+ public static object Get(string key)
+ {
+ object val = null;
+ if (key != null && cache.TryGetValue(key, out val))
+ {
+
+ return val;
+ }
+ else
+ {
+ return default(object);
+ }
+ }
+
+ ///
+ /// 获取缓存对象
+ ///
+ /// T对象
+ /// 缓存Key
+ ///
+ public static T Get(string key)
+ {
+ object obj = Get(key);
+ return obj == null ? default(T) : (T)obj;
+ }
+
+
+ ///
+ /// 移除缓存项的文件
+ ///
+ /// 缓存Key
+ public static void Remove(string key)
+ {
+ cache.Remove(key);
+ }
+ }
+ }
diff --git a/TelpoPush.Service/Cache/RedisUtil.cs b/TelpoPush.Service/Cache/RedisUtil.cs
new file mode 100644
index 0000000..044d2cc
--- /dev/null
+++ b/TelpoPush.Service/Cache/RedisUtil.cs
@@ -0,0 +1,134 @@
+using Microsoft.Extensions.Logging;
+using Microsoft.Extensions.Options;
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+using TelpoPush.Models.CacheTemplates;
+using TelpoPush.Models.Config;
+
+namespace TelpoPush.Service.Cache
+{
+ public class RedisUtil
+ {
+ private const string CACHE_HASH_KEY_TELPO_MANUFACTOR_CONFIG = "TELPO#MANUFACTOR_CONFG_HASH";
+
+ private readonly ILogger _logger;
+ private readonly ServiceConfig _configService;
+ private readonly SqlMapper _sqlMapper;
+ public RedisUtil(ILogger logger,IOptions optConfigRedis, IOptions configService, SqlMapper sqlMapper)
+ {
+ _configService = configService.Value;
+ _logger = logger;
+ optConfigRedis.Value.Prefix = "";
+ optConfigRedis.Value.DefaultDatabase = 7;
+ var csredis = new CSRedis.CSRedisClient(optConfigRedis.Value.ToString());
+ RedisHelper.Initialization(csredis);
+ _sqlMapper = sqlMapper;
+ }
+
+
+ public async Task GetHealthyDeviceKey(string imei)
+ {
+ if (string.IsNullOrWhiteSpace(imei)) return null;
+ string HealthyDeviceKey = $"Telpol:HealthyDeviceKey:{imei}";
+ string keyCache = $"{imei}_HealthyDeviceKey";
+ try
+ {
+ var objCache = MemoryCacheUtil.Get(keyCache);
+ if (objCache == null)
+ {
+ string deviceKey = await RedisHelper.HGetAsync(HealthyDeviceKey, "data");
+ if (!string.IsNullOrEmpty(deviceKey))
+ {
+ MemoryCacheUtil.Set(keyCache, deviceKey, _configService.CacheDurationSeconds10);
+ return deviceKey;
+ }
+ }
+ return objCache;
+ }
+ catch (Exception ex)
+ {
+ _logger.LogError($"GetHealthyDeviceKey,key={imei},缓存异常,重新获取数据,{ex.Message}|{ex.Source}|{ex.StackTrace}");
+ string deviceKey = await RedisHelper.HGetAsync(HealthyDeviceKey, "data");
+ if (!string.IsNullOrEmpty(deviceKey))
+ {
+ MemoryCacheUtil.Set(keyCache, deviceKey, _configService.CacheDurationSeconds10);
+ return deviceKey;
+ }
+ else
+ return "";
+ }
+ }
+
+ //public async Task ManufactorKafkaTopicQuery()
+ //{
+
+ // List topics = new List();
+ // string keyCache = $"ManufactorKafkaConfig";
+ // try
+ // {
+ // var objCache = MemoryCacheUtil.Get>(keyCache);
+ // if (objCache == null)
+ // {
+ // var obj = await RedisHelper.HGetAsync>(CACHE_HASH_KEY_TELPO_MANUFACTOR_CONFIG, imei);
+ // if (obj == null)
+ // {
+ // var manufactors = _sqlMapper.ManufactorKafkaTopicQuery();
+ // if (manufactors != null)
+ // {
+ // foreach (var item in manufactors) {
+ // if (!string.IsNullOrEmpty(item.kafkaTopic))
+ // topics.Add(item.kafkaTopic);
+ // await RedisHelper.HSetAsync(CACHE_HASH_KEY_TELPO_MANUFACTOR_CONFIG, item.manufactorId, item);
+ // }
+ // MemoryCacheUtil.Set(keyCache, topics, _configService.CacheDurationSeconds10);
+ // return topics;
+ // }
+ // }
+ // else
+ // {
+ // foreach (var item in obj)
+ // {
+ // if (!string.IsNullOrEmpty(item.kafkaTopic))
+ // topics.Add(item.kafkaTopic);
+ // await RedisHelper.HSetAsync(CACHE_HASH_KEY_TELPO_MANUFACTOR_CONFIG, item.manufactorId, item);
+ // }
+ // MemoryCacheUtil.Set(keyCache, obj, _configService.CacheDurationSeconds10);
+ // return topics;
+ // }
+ // }
+ // return objCache;
+ // }
+ // catch (Exception ex)
+ // {
+ // _logger.LogError($"ManufactorKafkaTopicQuery,缓存异常,重新获取数据,{ex.Message}|{ex.Source}|{ex.StackTrace}");
+
+ // var manufactors = _sqlMapper.ManufactorKafkaTopicQuery();
+ // if (manufactors != null)
+ // {
+ // foreach (var item in manufactors)
+ // {
+ // if (!string.IsNullOrEmpty(item.kafkaTopic))
+ // topics.Add(item.kafkaTopic);
+ // await RedisHelper.HSetAsync(CACHE_HASH_KEY_TELPO_MANUFACTOR_CONFIG, item.manufactorId, item);
+ // }
+ // MemoryCacheUtil.Set(keyCache, topics, _configService.CacheDurationSeconds10);
+ // return topics;
+ // }
+
+
+ // //var manufactors = _sqlMapper.ManufactorKafkaTopicQuery();
+ // //if (manufactors != null)
+ // //{
+ // // RedisHelper.HSetAsync(CACHE_HASH_KEY_TELPO_MANUFACTOR_CONFIG, imei, manufactor);
+ // // RedisHelper.Set(keyCache, manufactor, _configService.CacheDurationSeconds10);
+ // // return manufactor;
+ // //}
+ // }
+ // return null;
+ //}
+
+ }
+}
diff --git a/TelpoPush.Service/Cache/SqlMapper.cs b/TelpoPush.Service/Cache/SqlMapper.cs
new file mode 100644
index 0000000..658da9e
--- /dev/null
+++ b/TelpoPush.Service/Cache/SqlMapper.cs
@@ -0,0 +1,37 @@
+using Dapper;
+using Microsoft.Extensions.Configuration;
+using MySql.Data.MySqlClient;
+using System;
+using System.Collections.Generic;
+using System.Data;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+using TelpoPush.Models.CacheTemplates;
+
+namespace TelpoPush.Service.Cache
+{
+ public class SqlMapper
+ {
+ private readonly IConfiguration _config;
+ private static string gps_conn = "";
+ private static string telcommon_conn = "";
+ private static string healthy_conn = "";
+ public SqlMapper(IConfiguration config)
+ {
+ _config = config;
+ gps_conn = _config["ConnectionStrings:DB_Connection_String"].ToString();
+ telcommon_conn = _config["ConnectionStrings:Telpo_common_ConnString"].ToString();
+ healthy_conn = _config["ConnectionStrings:Telpo_Healthy_ConnString"].ToString();
+ }
+
+ public List ManufactorKafkaTopicQuery()
+ {
+ using (IDbConnection connection = new MySqlConnection(telcommon_conn))
+ {
+ var sql = @"SELECT manufactor_id manufactorId,manufactor_name manufactorName,kafka_servers kafkaServers,kafka_topic kafkaTopic,kafka_username kafkaUsername, kafka_password kafkaPassword FROM manufactor_real_monitor";
+ return connection.Query(sql).ToList();
+ }
+ }
+ }
+}
diff --git a/TelpoPush.Service/Mq/Kafka/IKafkaService.cs b/TelpoPush.Service/Mq/Kafka/IKafkaService.cs
new file mode 100644
index 0000000..22b1bd2
--- /dev/null
+++ b/TelpoPush.Service/Mq/Kafka/IKafkaService.cs
@@ -0,0 +1,9 @@
+using Confluent.Kafka;
+
+namespace TelpoPush.Service.Mq.Kafka
+{
+ public interface IKafkaService
+ {
+ Task SubscribeAsync(Action messageFunc, CancellationToken cancellationToken);
+ }
+}
diff --git a/TelpoPush.Service/Mq/Kafka/KafkaHeader.cs b/TelpoPush.Service/Mq/Kafka/KafkaHeader.cs
new file mode 100644
index 0000000..a334391
--- /dev/null
+++ b/TelpoPush.Service/Mq/Kafka/KafkaHeader.cs
@@ -0,0 +1,15 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace TelpoPush.Service.Mq.Kafka
+{
+ public class KafkaHeader
+ {
+ public static readonly string DataType = "DataType";
+ public static readonly string AlarmType = "AlarmType";
+ public static readonly string OperType = "OperType";
+ }
+}
diff --git a/TelpoPush.Service/Mq/Kafka/KafkaService.cs b/TelpoPush.Service/Mq/Kafka/KafkaService.cs
new file mode 100644
index 0000000..bfb0b00
--- /dev/null
+++ b/TelpoPush.Service/Mq/Kafka/KafkaService.cs
@@ -0,0 +1,138 @@
+using Confluent.Kafka;
+using Microsoft.Extensions.Hosting;
+using Microsoft.Extensions.Logging;
+using Microsoft.Extensions.Options;
+using Newtonsoft.Json;
+using TelpoPush.Models.Config;
+
+namespace TelpoPush.Service.Mq.Kafka
+{
+ public class KafkaService: IKafkaService
+ {
+ private readonly ConsumerConfig _consumerConfig;
+ private readonly IHostEnvironment env;
+ private readonly ILogger logger;
+ private readonly ServiceConfig _configService;
+ public KafkaService(ILogger _logger, IHostEnvironment _env, IOptions optConfigService)
+ {
+ //config = _configuration;
+ _configService = optConfigService.Value;
+ env = _env;
+ logger = _logger;
+
+ _consumerConfig = new ConsumerConfig
+ {
+ BootstrapServers = _configService.KafkaBootstrapServers,
+ SecurityProtocol = SecurityProtocol.SaslSsl,
+ SaslMechanism = SaslMechanism.ScramSha256,
+ GroupId = _configService.KafkaGroupId,
+ SaslUsername = _configService.KafkaSaslUsername,
+ SaslPassword = _configService.KafkaSaslPassword,
+ SslCaLocation = _configService.KafkaSslCaLocation
+ //SslCaLocation = @"D:\THOMAS\Project\SSJL\C#\Net8\TelpoPushThirdSsl\pem\ca-root.pem",
+ EnableAutoCommit = false, // 禁止AutoCommit
+ AutoOffsetReset = AutoOffsetReset.Earliest, // 从最早的开始消费起
+ };
+ }
+
+ public async Task SubscribeAsync(Action messageFunc, CancellationToken cancellationToken)
+ {
+
+ List topics = _configService.KafkaTopics;
+
+ using (var consumer = new ConsumerBuilder(_consumerConfig)
+ .SetErrorHandler((_, e) =>
+ {
+ Console.WriteLine($"Error: {e.Reason}");
+ })
+ .SetStatisticsHandler((_, json) =>
+ {
+ Console.WriteLine($" - {DateTime.Now:yyyy-MM-dd HH:mm:ss} > 消息监听中..");
+ })
+ .SetPartitionsAssignedHandler((c, partitions) =>
+ {
+ string partitionsStr = string.Join(", ", partitions);
+ Console.WriteLine($" - 分配的 kafka 分区: {partitionsStr}");
+ })
+ .SetPartitionsRevokedHandler((c, partitions) =>
+ {
+ string partitionsStr = string.Join(", ", partitions);
+ Console.WriteLine($" - 回收了 kafka 的分区: {partitionsStr}");
+ })
+ .Build())
+ {
+
+ consumer.Subscribe(topics);
+ try
+ {
+ while (true)
+ {
+ try
+ {
+
+ var consumeResult = consumer.Consume(cancellationToken);
+ int DataType = -1, AlarmType = -1, OperType = -1;
+ foreach (var item in consumeResult?.Headers)
+ {
+ if (item.Key == KafkaHeader.DataType)
+ DataType = BitConverter.ToInt32(item.GetValueBytes(), 0);
+ else if (item.Key == KafkaHeader.AlarmType)
+ AlarmType = BitConverter.ToInt32(item.GetValueBytes(), 0);
+ else if (item.Key == KafkaHeader.OperType)
+ OperType = BitConverter.ToInt32(item.GetValueBytes(), 0);
+ }
+ var Headers = new { DataType, AlarmType, OperType };
+ logger.LogInformation($"Consumed topic '{consumeResult.Topic}', message '{consumeResult.Message?.Value}' , headers '{JsonConvert.SerializeObject(Headers)}', at: '{consumeResult?.TopicPartitionOffset}'.");
+ if (consumeResult.IsPartitionEOF)
+ {
+ Console.WriteLine($" - {DateTime.Now:yyyy-MM-dd HH:mm:ss} 已经到底了:{consumeResult.Topic}, partition {consumeResult.Partition}, offset {consumeResult.Offset}.");
+ continue;
+ }
+ // 根据消息内容动态决定topic
+ //var dynamicTopic = GetDynamicTopic(consumeResult.Message?.Value);
+ //consumer.Subscribe(new List() { dynamicTopic });
+
+ string messageResult = null;
+ Headers headers = null;
+ try
+ {
+ messageResult = consumeResult.Message.Value;
+ headers = consumeResult.Message.Headers;
+ }
+ catch (Exception ex)
+ {
+ var errorMessage = $" - {DateTime.Now:yyyy-MM-dd HH:mm:ss}【Exception 消息反序列化失败,Value:{consumeResult.Message.Value}】 :{ex.StackTrace?.ToString()}";
+ Console.WriteLine(errorMessage);
+ messageResult = null;
+ }
+ if (!string.IsNullOrEmpty(messageResult)/* && consumeResult.Offset % commitPeriod == 0*/)
+ {
+ string topic = consumeResult.Topic;
+ messageFunc(topic, messageResult, headers);
+
+ try
+ {
+ consumer.Commit(consumeResult);
+ }
+ catch (KafkaException e)
+ {
+ Console.WriteLine(e.Message);
+ }
+ }
+ }
+ catch (ConsumeException e)
+ {
+ Console.WriteLine($"Consume error: {e.Error.Reason}");
+ }
+ }
+ }
+ catch (OperationCanceledException)
+ {
+ Console.WriteLine("Closing consumer.");
+ consumer.Close();
+ }
+ }
+ await Task.CompletedTask;
+ }
+ }
+}
diff --git a/TelpoPush.Service/Mq/Kafka/MessageProducer.cs b/TelpoPush.Service/Mq/Kafka/MessageProducer.cs
new file mode 100644
index 0000000..5466322
--- /dev/null
+++ b/TelpoPush.Service/Mq/Kafka/MessageProducer.cs
@@ -0,0 +1,78 @@
+using Confluent.Kafka;
+using Microsoft.Extensions.Logging;
+using Microsoft.Extensions.Options;
+using Newtonsoft.Json;
+using TelpoPush.Models.Config;
+
+namespace TelpoPush.Service.Mq.Kafka
+{
+ ///
+ /// 消息生产者
+ ///
+ public class MessageProducer
+ {
+ private readonly ILogger _logger;
+ private readonly ServiceConfig _configService;
+ private readonly IProducer _producer;
+
+ public MessageProducer(ILogger logger, IOptions optConfigService)
+ {
+ _logger = logger;
+ _configService = optConfigService.Value;
+
+ var config = new ProducerConfig
+ {
+ BootstrapServers = _configService.KafkaBootstrapServers,
+ EnableIdempotence = true,
+ Acks = Acks.All,
+ //LingerMs=5000,
+ //BatchNumMessages =1000,
+ //BatchSize=32768,
+ //CompressionType= CompressionType.Lz4,
+ MessageSendMaxRetries = 3
+ };
+ _producer = new ProducerBuilder(config).Build();
+ }
+
+ public Headers CreateHeader(Dictionary pair = null)
+ {
+ if (pair == null)
+ {
+ return null;
+ }
+ else
+ {
+ Headers headers = new Headers();
+
+ foreach (var item in pair)
+ {
+ headers.Add(item.Key, BitConverter.GetBytes(item.Value));
+ }
+ return headers;
+ }
+ }
+
+ public async Task ProduceAsync(List topic, object message)
+ {
+ try
+ {
+ foreach (var item in topic)
+ {
+ // producer = new ProducerBuilder(config).Build();
+ await _producer.ProduceAsync(item.Topic, new Message
+ {
+ Headers = item.Headers,
+ Value = JsonConvert.SerializeObject(message)
+ });
+ }
+ }
+ catch (ProduceException e)
+ {
+ _logger.LogError($"推送到kafka失败,topic: {topic},\n message:{JsonConvert.SerializeObject(message)}: \n{e.Error.Reason}");
+ }
+ }
+
+
+
+ }
+}
diff --git a/TelpoPush.Service/Mq/Kafka/MqProcessMessage.cs b/TelpoPush.Service/Mq/Kafka/MqProcessMessage.cs
new file mode 100644
index 0000000..6040dee
--- /dev/null
+++ b/TelpoPush.Service/Mq/Kafka/MqProcessMessage.cs
@@ -0,0 +1,87 @@
+using Microsoft.Extensions.Logging;
+using Newtonsoft.Json;
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+using TelpoPush.Models.Dto;
+using TelpoPush.Models.Enum;
+using TelpoPush.Models.MqTemplates;
+using TelpoPush.Models.PushTemplates;
+
+namespace TelpoPush.Service.Mq.Kafka
+{
+ public class MqProcessMessage
+ {
+ private readonly ILogger _logger;
+ private readonly MessageProducer _messageProducer;
+
+ public MqProcessMessage(ILogger logger, MessageProducer producer)
+ {
+ _logger = logger;
+ _messageProducer = producer;
+ }
+ public async Task ProcessWxAlarm(WxModel model, string timeString)
+ {
+ List ls = new List();
+
+ ls.Add(new TopicModel()
+ {
+ Topic = "topic.push.wx",
+ Headers = _messageProducer.CreateHeader(new Dictionary
+ {
+ {MqHeader.DataType,(int)MqDataType.AlarmInfo },
+ })
+ });
+ await _messageProducer.ProduceAsync(ls, new
+ {
+ messageId = string.Format("{0:yyyyMMddHHmmssffff}", DateTime.Now),
+ topic = string.Join(",", ls.Select(e => e.Topic)), //MqTopic.Wx,
+ time = timeString,
+ data = model
+ });
+ _logger.LogInformation($"【成功】Third推送(topic.wx):IMEI<{model.imei}>,pushData:{JsonConvert.SerializeObject(model)}");
+ }
+
+ public async Task ProcessProperty(string imei, BaseModel model, HeadersDto headers)
+ {
+ List ls = new List();
+
+ ls.Add(new TopicModel()
+ {
+ Topic = "topic.push.property",
+ Headers = _messageProducer.CreateHeader(new Dictionary
+ {
+ {MqHeader.DataType,headers.DataType.Value }
+ })
+ });
+ await _messageProducer.ProduceAsync(ls, model);
+ _logger.LogInformation($"【成功】Third推送(topic.property):IMEI<{imei}>,pushData:{JsonConvert.SerializeObject(model)}");
+ }
+
+ public async Task ProcessDataPushServer(string imei, object model, Dictionary headers, string tag)
+ {
+ List ls = new List();
+ ls.Add(new TopicModel()
+ {
+ Topic = "topic.push",
+ Headers = _messageProducer.CreateHeader(headers)
+ });
+ await _messageProducer.ProduceAsync(ls, model);
+ _logger.LogInformation($"【{tag}-成功】Third推送(topic.push):IMEI<{imei}>,Header<{JsonConvert.SerializeObject(headers)}>,pushData:{JsonConvert.SerializeObject(model)}");
+ }
+
+ public async Task ProcessThirdhServer(string imei, object model, Dictionary headers, string tag)
+ {
+ List ls = new List();
+ ls.Add(new TopicModel()
+ {
+ Topic = "topic.push.third",
+ Headers = _messageProducer.CreateHeader(headers)
+ });
+ await _messageProducer.ProduceAsync(ls, model);
+ _logger.LogInformation($"【{tag}-成功】Third推送(topic.push.third):IMEI<{imei}>,Header<{JsonConvert.SerializeObject(headers)}>,pushData:{JsonConvert.SerializeObject(model)}");
+ }
+ }
+}
diff --git a/TelpoPush.Service/Mq/Kafka/TopicModel.cs b/TelpoPush.Service/Mq/Kafka/TopicModel.cs
new file mode 100644
index 0000000..6f26059
--- /dev/null
+++ b/TelpoPush.Service/Mq/Kafka/TopicModel.cs
@@ -0,0 +1,15 @@
+using Confluent.Kafka;
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace TelpoPush.Service.Mq.Kafka
+{
+ public class TopicModel
+ {
+ public string Topic { get; set; }
+ public Headers Headers { get; set; }
+ }
+}
diff --git a/TelpoPush.Service/TelpoPush.Service.csproj b/TelpoPush.Service/TelpoPush.Service.csproj
new file mode 100644
index 0000000..46335e1
--- /dev/null
+++ b/TelpoPush.Service/TelpoPush.Service.csproj
@@ -0,0 +1,29 @@
+
+
+
+ net8.0
+ enable
+ enable
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/TelpoPush.Worker.ThirdSsl/Dockerfile b/TelpoPush.Worker.ThirdSsl/Dockerfile
new file mode 100644
index 0000000..f4fc8c5
--- /dev/null
+++ b/TelpoPush.Worker.ThirdSsl/Dockerfile
@@ -0,0 +1,40 @@
+#See https://aka.ms/customizecontainer to learn how to customize your debug container and how Visual Studio uses this Dockerfile to build your images for faster debugging.
+
+FROM mcr.microsoft.com/dotnet/sdk:8.0-alpine AS base
+USER app
+WORKDIR /app
+
+FROM mcr.microsoft.com/dotnet/sdk:8.0-alpine AS build
+ARG BUILD_CONFIGURATION=Release
+WORKDIR /src
+COPY ["TelpoPush.Worker.ThirdSsl/TelpoPush.Worker.ThirdSsl.csproj", "TelpoPush.Worker.ThirdSsl/"]
+COPY ["TelpoPush.Common/TelpoPush.Common.csproj", "TelpoPush.Common/"]
+COPY ["TelpoPush.Models/TelpoPush.Models.csproj", "TelpoPush.Models/"]
+COPY ["TelpoPush.Service/TelpoPush.Service.csproj", "TelpoPush.Service/"]
+
+COPY ["nuget.config","."]
+RUN dotnet nuget remove source nuget.org
+RUN dotnet nuget add source https://repo.huaweicloud.com/repository/nuget/v3/index.json -n huaweicloud_nuget
+
+RUN dotnet restore "./TelpoPush.Worker.ThirdSsl/TelpoPush.Worker.ThirdSsl.csproj"
+COPY . .
+WORKDIR "/src/TelpoPush.Worker.ThirdSsl"
+RUN dotnet build "./TelpoPush.Worker.ThirdSsl.csproj" -c $BUILD_CONFIGURATION -o /app/build
+
+FROM build AS publish
+ARG BUILD_CONFIGURATION=Release
+RUN dotnet publish "./TelpoPush.Worker.ThirdSsl.csproj" -c $BUILD_CONFIGURATION -o /app/publish /p:UseAppHost=false
+
+FROM base AS final
+WORKDIR /app
+COPY --from=publish /app/publish .
+COPY pem /app/pem
+
+
+ENV environment=Development
+ENV TimeZone=Asia/Shanghai
+ENV LANG C.UTF-8
+RUN ln -snf /usr/share/zoneinfo/$TimeZone /etc/localtime && echo $TimeZone > /etc/timezone
+
+#ENTRYPOINT ["dotnet", "TelpoPush.Worker.ThirdSsl.dll"]
+ENTRYPOINT ["sh", "-c", "dotnet TelpoPush.Worker.ThirdSsl.dll --environment=$environment"]
\ No newline at end of file
diff --git a/TelpoPush.Worker.ThirdSsl/Handlers/KafkaSubscribe.cs b/TelpoPush.Worker.ThirdSsl/Handlers/KafkaSubscribe.cs
new file mode 100644
index 0000000..9ba405e
--- /dev/null
+++ b/TelpoPush.Worker.ThirdSsl/Handlers/KafkaSubscribe.cs
@@ -0,0 +1,67 @@
+using Confluent.Kafka;
+using Microsoft.Extensions.Options;
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Net;
+using System.Reflection.PortableExecutable;
+using System.Text;
+using System.Threading.Tasks;
+using TelpoPush.Common;
+using TelpoPush.Models.Config;
+using TelpoPush.Service.Mq.Kafka;
+
+namespace TelpoPush.Worker.ThirdSsl.Handlers
+{
+ public class KafkaSubscribe
+ {
+ private readonly ILogger _logger;
+ private readonly IHostEnvironment _env;
+ private readonly IKafkaService _kafkaService;
+ private readonly ThirdSslProcess _thirdSslProcess;
+ private readonly ServiceConfig _configService;
+
+
+ public KafkaSubscribe(
+ ILogger logger, IHostEnvironment env,
+ IKafkaService kafkaService,
+ ThirdSslProcess thirdSslProcess)
+ {
+ _logger = logger;
+ _env = env;
+ _kafkaService = kafkaService;
+ _thirdSslProcess = thirdSslProcess;
+ }
+ public async Task SubscribeAsync()
+ {
+
+ //string msg = "{\"MessageTime\":\"2024-05-13 10:29:09.407\",\"TopicName\":\"topic.push.telpo.zkheartrate\",\"MessageId\":\"1715567350000\",\"IMEI\":\"868437060014409\",\"Content\":\"{\\\"heartRates\\\":{\\\"imei\\\":\\\"868437060014409\\\",\\\"agencyid\\\":\\\"77d562ac-dd78-40ea-af6d-224ada9d70dc\\\",\\\"data\\\":[{\\\"time\\\":1715567340,\\\"value\\\":172,\\\"isAnomaly\\\":1},{\\\"time\\\":1715567342,\\\"value\\\":174,\\\"isAnomaly\\\":1},{\\\"time\\\":1715567344,\\\"value\\\":174,\\\"isAnomaly\\\":1},{\\\"time\\\":1715567346,\\\"value\\\":174,\\\"isAnomaly\\\":1},{\\\"time\\\":1715567348,\\\"value\\\":174,\\\"isAnomaly\\\":1},{\\\"time\\\":1715567350,\\\"value\\\":174,\\\"isAnomaly\\\":1}]}}\"}";
+ //await _thirdSslProcess.SendSslThird(msg, "", null);
+
+ //string msg2 = "{\"MessageTime\":\"2024-05-13 14:10:43.814\",\"TopicName\":\"topic.push.telpo.zkheartrate\",\"MessageId\":\"1715580644000\",\"IMEI\":\"868437060014409\",\"Content\":\"{\\\"anomalyCancel\\\":{\\\"imei\\\":\\\"868437060014409\\\",\\\"agencyid\\\":\\\"77d562ac-dd78-40ea-af6d-224ada9d70dc\\\",\\\"time\\\":1715580644}}\"}";
+
+ //await _thirdSslProcess.SendSslThird(msg2, "", null);
+
+
+ //await _kafkaService.SubscribeAsync(DoReceive, CancellationToken.None);
+
+ LimitedConcurrencyLevelTaskScheduler lcts = new LimitedConcurrencyLevelTaskScheduler(5);
+ TaskFactory factory = new TaskFactory(lcts);
+ try
+ {
+ await factory.StartNew(() =>
+ {
+ _kafkaService.SubscribeAsync(DoReceive, CancellationToken.None);
+ });
+ }
+ catch (Exception ex)
+ {
+ _logger.LogError($"Subscribe 处理Kafka数据发生异常 {ex.Message}|{ex.Source}|{ex.StackTrace}");
+ }
+ }
+ async void DoReceive(string topic, string message, Headers headers)
+ {
+ await _thirdSslProcess.SendSslThird(message, topic, headers);
+ }
+ }
+}
diff --git a/TelpoPush.Worker.ThirdSsl/Handlers/ThirdSslProcess.cs b/TelpoPush.Worker.ThirdSsl/Handlers/ThirdSslProcess.cs
new file mode 100644
index 0000000..2cd6ca2
--- /dev/null
+++ b/TelpoPush.Worker.ThirdSsl/Handlers/ThirdSslProcess.cs
@@ -0,0 +1,71 @@
+using Confluent.Kafka;
+using Microsoft.Extensions.Options;
+using Newtonsoft.Json;
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Reflection.PortableExecutable;
+using System.Text;
+using System.Threading.Tasks;
+using TelpoPush.Models.MqTemplates;
+using TelpoPush.Service.Biz;
+
+namespace TelpoPush.Worker.ThirdSsl.Handlers
+{
+ public class ThirdSslProcess
+ {
+ private readonly static object _syncLocker = new object();
+ private readonly IHostEnvironment _env;
+ private readonly ILogger _logger;
+ private readonly IZkRealHRMonitorService _zkRealHRMonitorService;
+
+ public ThirdSslProcess(
+ IHostEnvironment env,
+ ILogger logger,
+ IZkRealHRMonitorService zkRealHRMonitorService)
+ {
+ _env = env;
+ _logger = logger;
+ _zkRealHRMonitorService= zkRealHRMonitorService;
+ }
+
+ public async Task SendSslThird(string? message, string topic, Headers headers)
+ {
+ //lock (_syncLocker)
+ //{
+ _logger.LogInformation($"获取kafka数据Message:<{message}>");
+
+ if (!string.IsNullOrEmpty(message))
+ {
+ BaseModel model = JsonConvert.DeserializeObject(message);
+ if (!string.IsNullOrEmpty(model.Content.ToString()))
+ {
+ switch (model.TopicName)
+ {
+ case "topic.push.telpo.zkheartrate":
+ await DataServiceZkRealHRMonitor(model);
+ break;
+ default:
+ break;
+ }
+ }
+ }
+ // }
+ }
+
+ public async Task DataServiceZkRealHRMonitor(BaseModel model)
+ {
+ string content = model.Content.ToString().ToLower();
+ if (content.Contains("heartrates"))
+ {
+ var data = JsonConvert.DeserializeObject(content);
+ await _zkRealHRMonitorService.Save(data.heartRates, model.MessageId);
+ }
+ else if (content.Contains("anomalycancel"))
+ {
+ var data = JsonConvert.DeserializeObject(content);
+ await _zkRealHRMonitorService.SaveAnomalyCancel(data.anomalyCancel, model.MessageId);
+ }
+ }
+ }
+}
diff --git a/TelpoPush.Worker.ThirdSsl/Program.cs b/TelpoPush.Worker.ThirdSsl/Program.cs
new file mode 100644
index 0000000..97643d6
--- /dev/null
+++ b/TelpoPush.Worker.ThirdSsl/Program.cs
@@ -0,0 +1,103 @@
+using Newtonsoft.Json;
+using Newtonsoft.Json.Serialization;
+using Serilog;
+using TelpoDataService.Util.Clients;
+using TelpoPush.Common;
+using TelpoPush.Models.Config;
+using TelpoPush.Service.Biz;
+using TelpoPush.Service.Cache;
+using TelpoPush.Service.Mq.Kafka;
+using TelpoPush.Worker.ThirdSsl;
+using TelpoPush.Worker.ThirdSsl.Handlers;
+
+
+#region ־
+//using Serilog.Events;
+//using Serilog;
+//using Microsoft.AspNetCore.Builder;
+
+//Log.Logger = new LoggerConfiguration()
+//#if DEBUG
+// .MinimumLevel.Debug()
+//#else
+// .MinimumLevel.Information()
+//#endif
+// .MinimumLevel.Override("Microsoft", LogEventLevel.Information)
+// .MinimumLevel.Override("Microsoft.EntityFrameworkCore", LogEventLevel.Warning)
+// .Enrich.FromLogContext()
+// //.Filter.ByExcluding(c => c.Properties.Any(p => p.Value.ToString().Contains("Microsoft")))//
+// .WriteTo.Async(c => c.File("/var/telpo_pushthird_ssl2/logs/infos/info.log",
+// restrictedToMinimumLevel: LogEventLevel.Information,
+// rollingInterval: RollingInterval.Day,//()
+// //fileSizeLimitBytes: 20971520, //õļСΪ3M Ĭ1G
+// rollOnFileSizeLimit: true, //ļСµ
+// retainedFileCountLimit: 7,//Ĭ31˼ֻ31־ļ"
+// outputTemplate: "{Timestamp:yyyy-MM-dd HH:mm:ss.fff}[{Level:u3}] [Thread-{ThreadId}] [{SourceContext:l}] {Message:lj}{NewLine}{Exception}"
+// )
+// )
+// .WriteTo.Async(c => c.File("/var/telpo_pushthird_ssl2/logs/errors/errors.log",
+// restrictedToMinimumLevel: LogEventLevel.Error,
+// rollingInterval: RollingInterval.Day,
+// rollOnFileSizeLimit: true,
+// retainedFileCountLimit: 7,
+// outputTemplate: "{Timestamp:yyyy-MM-dd HH:mm:ss.fff }[{Level:u3}] [Thread-{ThreadId}] [{SourceContext:l}] {Message:lj}{NewLine}{Exception}"
+// )
+// )
+// .WriteTo.Async(c => c.Console())
+// .CreateLogger();
+
+//ѡļappsetting.json
+var configuration = new ConfigurationBuilder()
+ .SetBasePath(Directory.GetCurrentDirectory())
+ .AddJsonFile("appsettings.json", optional: true, reloadOnChange: true)
+ .Build();
+
+Log.Logger = new LoggerConfiguration()
+ .ReadFrom.Configuration(configuration)
+ .Enrich.WithThreadId()
+ .CreateLogger();
+#endregion
+try
+{
+ Log.Information("Starting up");
+ var builder = Host.CreateApplicationBuilder(args);
+
+ var config = builder.Configuration;
+ builder.Services.Configure(config.GetSection("ServiceConfig"));
+ builder.Services.Configure(config.GetSection("Redis"));
+
+ JsonSerializerSettings setting = new JsonSerializerSettings();
+ JsonConvert.DefaultSettings = () =>
+ {
+ setting.DateFormatString = "yyyy-MM-dd HH:mm:ss";
+ setting.ContractResolver = new CamelCasePropertyNamesContractResolver();
+ return setting;
+ };
+ builder.Services.AddTelpoDataServices(opt =>
+ {
+ opt.TelpoDataUrl = config["ServiceConfig:TelpoDataUrl"];
+ });
+
+ builder.Services.AddSerilog();
+ builder.Services.AddHttpClient();
+ builder.Services.AddTransient();
+ builder.Services.AddSingleton();
+ builder.Services.AddSingleton();
+ builder.Services.AddSingleton();
+ builder.Services.AddSingleton();
+ builder.Services.AddSingleton();
+ builder.Services.AddSingleton();
+ builder.Services.AddSingleton();
+ builder.Services.AddSingleton();
+ builder.Services.AddHostedService();
+ var host = builder.Build();
+ host.Run();
+}
+catch (Exception ex)
+{
+ Log.Fatal(ex, "Application start-up failed");
+}
+finally
+{
+ Log.CloseAndFlush();
+}
\ No newline at end of file
diff --git a/TelpoPush.Worker.ThirdSsl/Properties/launchSettings.json b/TelpoPush.Worker.ThirdSsl/Properties/launchSettings.json
new file mode 100644
index 0000000..82122e0
--- /dev/null
+++ b/TelpoPush.Worker.ThirdSsl/Properties/launchSettings.json
@@ -0,0 +1,15 @@
+{
+ "profiles": {
+ "TelpoPush.Worker.ThirdSsl": {
+ "commandName": "Project",
+ "environmentVariables": {
+ "DOTNET_ENVIRONMENT": "test"
+ },
+ "dotnetRunMessages": true
+ },
+ "Container (Dockerfile)": {
+ "commandName": "Docker"
+ }
+ },
+ "$schema": "http://json.schemastore.org/launchsettings.json"
+}
\ No newline at end of file
diff --git a/TelpoPush.Worker.ThirdSsl/TelpoPush.Worker.ThirdSsl.csproj b/TelpoPush.Worker.ThirdSsl/TelpoPush.Worker.ThirdSsl.csproj
new file mode 100644
index 0000000..2881f9a
--- /dev/null
+++ b/TelpoPush.Worker.ThirdSsl/TelpoPush.Worker.ThirdSsl.csproj
@@ -0,0 +1,26 @@
+
+
+
+ net8.0
+ enable
+ enable
+ dotnet-TelpoPush.Worker.ThirdSsl-c73c0758-309e-49cb-b0cb-49ecc2eaa9fe
+ Linux
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/TelpoPush.Worker.ThirdSsl/Worker.cs b/TelpoPush.Worker.ThirdSsl/Worker.cs
new file mode 100644
index 0000000..13a151f
--- /dev/null
+++ b/TelpoPush.Worker.ThirdSsl/Worker.cs
@@ -0,0 +1,28 @@
+using Microsoft.Extensions.Logging;
+using TelpoPush.Worker.ThirdSsl.Handlers;
+
+namespace TelpoPush.Worker.ThirdSsl
+{
+ public class Worker : BackgroundService
+ {
+ private readonly IHostEnvironment _env;
+ private readonly ILogger _logger;
+ KafkaSubscribe _kafkaSubscribe;
+
+ public Worker(ILogger logger, IHostEnvironment env, KafkaSubscribe kafkaSubscribe)
+ {
+ _env = env;
+ _logger = logger;
+ _kafkaSubscribe = kafkaSubscribe;
+ }
+
+ protected override async Task ExecuteAsync(CancellationToken stoppingToken)
+ {
+ while (!stoppingToken.IsCancellationRequested)
+ {
+ await _kafkaSubscribe.SubscribeAsync();
+ await Task.Delay(30000, stoppingToken);
+ }
+ }
+ }
+}
diff --git a/TelpoPush.Worker.ThirdSsl/appsettings.Development.json b/TelpoPush.Worker.ThirdSsl/appsettings.Development.json
new file mode 100644
index 0000000..11c2110
--- /dev/null
+++ b/TelpoPush.Worker.ThirdSsl/appsettings.Development.json
@@ -0,0 +1,34 @@
+{
+ "ConnectionStrings": {
+ "DB_Connection_String": "server=139.224.254.18;port=3305;database=gps_card;uid=root;pwd=telpo#1234;CharSet=utf8;MinimumPoolSize=10;MaximumPoolSize=1000;SslMode=none",
+ "Telpo_common_ConnString": "server=139.224.254.18;port=3305;database=telpo_common;uid=root;pwd=telpo#1234;CharSet=utf8;MinimumPoolSize=10;MaximumPoolSize=1000;SslMode=none",
+ "Telpo_Healthy_ConnString": "server=139.224.254.18;port=3305;database=telpo_healthy;uid=root;pwd=telpo#1234;CharSet=utf8;MinimumPoolSize=10;MaximumPoolSize=1000;SslMode=none"
+
+ //"DB_Connection_String": "server=rm-uf6j529mu0v6g0btpco.mysql.rds.aliyuncs.com;port=3305;database=gps_card;uid=root;pwd=telpo#1234;CharSet=utf8;MinimumPoolSize=10;MaximumPoolSize=1000;SslMode=none",
+ //"Telpo_common_ConnString": "server=rm-uf6j529mu0v6g0btpco.mysql.rds.aliyuncs.com;port=3305;database=telpo_common;uid=linwl;pwd=linwl#1234;CharSet=utf8;MinimumPoolSize=10;MaximumPoolSize=1000;SslMode=none"
+ },
+ "ServiceConfig": {
+ "TelpoDataUrl": "http://id.ssjlai.com/data",
+ "KafkaBootstrapServers": "k0.id.gdssjl.com:9094", // "172.19.42.53:9092",
+ "KafkaTopics": [ "topic.push.telpo.zkheartrate" ],
+ "KafkaGroupId": "telpo-tcp-group",
+ "KafkaSaslUsername": "telpo-consumer",
+ "KafkaSaslPassword": "telpo-consumer-pwd",
+ "KafkaSslCaLocation": "pem/ca-root.pem",
+ "CacheDurationSeconds": 1200, //20分钟
+ "CacheDurationSeconds10": 600 //10分钟
+ },
+ "Redis": {
+ //"Server": "139.224.254.18:8090",
+ "Server": "47.116.142.20:8090",
+ "Password": "telpo#1234",
+ "DefaultDatabase": 7,
+ "Poolsize": 150,
+ "Preheat": 50,
+ "Prefix": "_Third_",
+ "ConnectTimeout": 5000,
+ "IdleTimeout": 20000
+ },
+
+ "Environment": "dev"
+}
\ No newline at end of file
diff --git a/TelpoPush.Worker.ThirdSsl/appsettings.json b/TelpoPush.Worker.ThirdSsl/appsettings.json
new file mode 100644
index 0000000..9d1e6d3
--- /dev/null
+++ b/TelpoPush.Worker.ThirdSsl/appsettings.json
@@ -0,0 +1,77 @@
+{
+ "Logging": {
+ "LogLevel": {
+ "Default": "Information",
+ "Microsoft": "Warning",
+ "Microsoft.Hosting.Lifetime": "Information",
+ "System.Net.Http.HttpClient": "Warning"
+ }
+ },
+ "Serilog": {
+ "Using": [ "Serilog.Sinks.File", "Serilog.Sinks.Async", "Serilog.Sinks.Console", "Serilog.Sinks.Exceptionless" ],
+ "MinimumLevel": {
+ "Default": "Verbose",
+ "Override": {
+ "Microsoft": "Warning",
+ "Microsoft.Hosting.Lifetime": "Information",
+ "System.Net.Http.HttpClient": "Warning"
+ }
+ },
+ "WriteTo:Information": {
+ "Name": "Async",
+ "Args": {
+ "Configure": [
+ {
+ "Name": "File",
+ "Args": {
+ "RestrictedToMinimumLevel": "Information",
+ "RollingInterval": "Day",
+ "RollOnFileSizeLimit": "true",
+ "OutputTemplate": "{Timestamp:yyyy-MM-dd HH:mm:ss }[{Level:u3}] [Thread-{ThreadId}] [{SourceContext:l}] {Message:lj}{NewLine}{Exception}",
+ "Path": "/var/telpo_pushthird_ssl/logs/infos/info.log",
+ "RetainedFileCountLimit": 7 // "--设置日志文件个数最大值,默认31,意思就是只保留最近的31个日志文件", "等于null时永远保留文件": null
+ // "FileSizeLimitBytes": 20971520, //设置单个文件大小为3M 默认1G
+ // "RollOnFileSizeLimit": true //超过文件大小后创建新的
+
+ }
+ }
+ ]
+ }
+ },
+ "WriteTo:Error": {
+ "Name": "Async",
+ "Args": {
+ "Configure": [
+ {
+ "Name": "File",
+ "Args": {
+ "RestrictedToMinimumLevel": "Error",
+ "RollingInterval": "Day",
+ "RollOnFileSizeLimit": "true",
+ "OutputTemplate": "{Timestamp:yyyy-MM-dd HH:mm:ss }[{Level:u3}] [Thread-{ThreadId}][{SourceContext:l}] {Message:lj}{NewLine}{Exception}",
+ "Path": "/var/telpo_pushthird_ssl/logs/errors/error.log",
+ "RetainedFileCountLimit": 7 // "--设置日志文件个数最大值,默认31,意思就是只保留最近的31个日志文件", "等于null时永远保留文件": null
+ // "FileSizeLimitBytes": 20971520, //设置单个文件大小为3M 默认1G
+ // "RollOnFileSizeLimit": true //超过文件大小后创建新的
+ }
+ }
+ ]
+ }
+ },
+ "WriteTo:Console": {
+ "Name": "Console",
+ "Args": {
+ "restrictedToMinimumLevel": "Verbose",
+ "outputTemplate": "{Timestamp:yyyy-MM-dd HH:mm:ss }[{Level:u3}] [Thread-{ThreadId}] [{SourceContext:l}] {Message:lj}{NewLine}{Exception}",
+ "theme": "Serilog.Sinks.SystemConsole.Themes.AnsiConsoleTheme::Code, Serilog.Sinks.Console"
+ }
+ },
+ "WriteTo:Exceptionless": {
+ "Name": "Exceptionless",
+ "Args": {
+ "serverUrl": "http://8.134.157.154:5000",
+ "apiKey": "f2I0sng4BBQS9IAsbdBK1W71oOYGPpsC66s7pzJu"
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/TelpoPush.Worker.ThirdSsl/appsettings.production.json b/TelpoPush.Worker.ThirdSsl/appsettings.production.json
new file mode 100644
index 0000000..91fbc96
--- /dev/null
+++ b/TelpoPush.Worker.ThirdSsl/appsettings.production.json
@@ -0,0 +1,31 @@
+{
+ "ConnectionStrings": {
+ "DB_Connection_String": "server=rm-uf6j529mu0v6g0btp.mysql.rds.aliyuncs.com;port=3305;database=gps_card;uid=root;pwd=telpo#1234;CharSet=utf8;MinimumPoolSize=10;MaximumPoolSize=1000;SslMode=none",
+ "Telpo_common_ConnString": "server=rm-uf6j529mu0v6g0btp.mysql.rds.aliyuncs.com;port=3305;database=telpo_common;uid=linwl;pwd=linwl#1234;CharSet=utf8;MinimumPoolSize=10;MaximumPoolSize=1000;SslMode=none",
+ "Telpo_Healthy_ConnString": "server=rm-uf6j529mu0v6g0btp.mysql.rds.aliyuncs.com;port=3305;database=telpo_healthy;uid=root;pwd=telpo#1234;CharSet=utf8;MinimumPoolSize=10;MaximumPoolSize=1000;SslMode=none"
+ },
+ "ServiceConfig": {
+ "TelpoDataUrl": "http://ai.ssjlai.com/data",
+ "KafkaBootstrapServers": "k0.id.gdssjl.com:9094", // "172.19.42.53:9092",
+ "KafkaTopics": [ "topic.push.telpo.zkheartrate" ],
+ "KafkaGroupId": "telpo-tcp-group",
+ "KafkaSaslUsername": "telpo-consumer",
+ "KafkaSaslPassword": "telpo-consumer-pwd",
+ "KafkaSslCaLocation": "pem/ca-root.pem",
+ "CacheDurationSeconds": 1200, //20分钟
+ "CacheDurationSeconds10": 600 //10分钟
+ },
+ "Redis": {
+ "Server": "139.224.254.18:8090",
+ //"Server": "47.116.142.20:8090",
+ "Password": "telpo#1234",
+ "DefaultDatabase": 7,
+ "Poolsize": 150,
+ "Preheat": 50,
+ "Prefix": "_Third_",
+ "ConnectTimeout": 5000,
+ "IdleTimeout": 20000
+ },
+
+ "Environment": "production"
+}
\ No newline at end of file
diff --git a/TelpoPush.Worker.ThirdSsl/appsettings.test.json b/TelpoPush.Worker.ThirdSsl/appsettings.test.json
new file mode 100644
index 0000000..f48daa6
--- /dev/null
+++ b/TelpoPush.Worker.ThirdSsl/appsettings.test.json
@@ -0,0 +1,34 @@
+{
+ "ConnectionStrings": {
+ "DB_Connection_String": "server=139.224.254.18;port=3305;database=gps_card;uid=root;pwd=telpo#1234;CharSet=utf8;MinimumPoolSize=10;MaximumPoolSize=1000;SslMode=none",
+ "Telpo_common_ConnString": "server=139.224.254.18;port=3305;database=telpo_common;uid=root;pwd=telpo#1234;CharSet=utf8;MinimumPoolSize=10;MaximumPoolSize=1000;SslMode=none",
+ "Telpo_Healthy_ConnString": "server=139.224.254.18;port=3305;database=telpo_healthy;uid=root;pwd=telpo#1234;CharSet=utf8;MinimumPoolSize=10;MaximumPoolSize=1000;SslMode=none"
+
+ //"DB_Connection_String": "server=rm-uf6j529mu0v6g0btpco.mysql.rds.aliyuncs.com;port=3305;database=gps_card;uid=root;pwd=telpo#1234;CharSet=utf8;MinimumPoolSize=10;MaximumPoolSize=1000;SslMode=none",
+ //"Telpo_common_ConnString": "server=rm-uf6j529mu0v6g0btpco.mysql.rds.aliyuncs.com;port=3305;database=telpo_common;uid=linwl;pwd=linwl#1234;CharSet=utf8;MinimumPoolSize=10;MaximumPoolSize=1000;SslMode=none"
+ },
+ "ServiceConfig": {
+ "TelpoDataUrl": "http://id.ssjlai.com/data",
+ "KafkaBootstrapServers": "k0.id.gdssjl.com:9094", // "172.19.42.53:9092",
+ "KafkaTopics": [ "topic.push.telpo.zkheartrate" ],
+ "KafkaGroupId": "telpo-tcp-group",
+ "KafkaSaslUsername": "telpo-consumer",
+ "KafkaSaslPassword": "telpo-consumer-pwd",
+ "KafkaSslCaLocation": "pem/ca-root.pem",
+ "CacheDurationSeconds": 1200, //20分钟
+ "CacheDurationSeconds10": 600 //10分钟
+ },
+ "Redis": {
+ //"Server": "139.224.254.18:8090",
+ "Server": "47.116.142.20:8090",
+ "Password": "telpo#1234",
+ "DefaultDatabase": 7,
+ "Poolsize": 150,
+ "Preheat": 50,
+ "Prefix": "_Third_",
+ "ConnectTimeout": 5000,
+ "IdleTimeout": 20000
+ },
+
+ "Environment": "test"
+}
\ No newline at end of file
diff --git a/TelpoPushThirdSsl.sln b/TelpoPushThirdSsl.sln
new file mode 100644
index 0000000..322f408
--- /dev/null
+++ b/TelpoPushThirdSsl.sln
@@ -0,0 +1,43 @@
+
+Microsoft Visual Studio Solution File, Format Version 12.00
+# Visual Studio Version 17
+VisualStudioVersion = 17.9.34723.18
+MinimumVisualStudioVersion = 10.0.40219.1
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "TelpoPush.Worker.ThirdSsl", "TelpoPush.Worker.ThirdSsl\TelpoPush.Worker.ThirdSsl.csproj", "{3E174613-0E05-49C4-9DAE-B3556D183697}"
+EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "TelpoPush.Common", "TelpoPush.Common\TelpoPush.Common.csproj", "{E53008EA-CF26-4539-A419-33E30475A279}"
+EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "TelpoPush.Models", "TelpoPush.Models\TelpoPush.Models.csproj", "{68D9CFA2-857A-46D0-8A3C-E9AE77538A93}"
+EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "TelpoPush.Service", "TelpoPush.Service\TelpoPush.Service.csproj", "{1D83D4E9-FDFD-4CD5-AE03-F0FBAA8C1193}"
+EndProject
+Global
+ GlobalSection(SolutionConfigurationPlatforms) = preSolution
+ Debug|Any CPU = Debug|Any CPU
+ Release|Any CPU = Release|Any CPU
+ EndGlobalSection
+ GlobalSection(ProjectConfigurationPlatforms) = postSolution
+ {3E174613-0E05-49C4-9DAE-B3556D183697}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {3E174613-0E05-49C4-9DAE-B3556D183697}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {3E174613-0E05-49C4-9DAE-B3556D183697}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {3E174613-0E05-49C4-9DAE-B3556D183697}.Release|Any CPU.Build.0 = Release|Any CPU
+ {E53008EA-CF26-4539-A419-33E30475A279}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {E53008EA-CF26-4539-A419-33E30475A279}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {E53008EA-CF26-4539-A419-33E30475A279}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {E53008EA-CF26-4539-A419-33E30475A279}.Release|Any CPU.Build.0 = Release|Any CPU
+ {68D9CFA2-857A-46D0-8A3C-E9AE77538A93}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {68D9CFA2-857A-46D0-8A3C-E9AE77538A93}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {68D9CFA2-857A-46D0-8A3C-E9AE77538A93}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {68D9CFA2-857A-46D0-8A3C-E9AE77538A93}.Release|Any CPU.Build.0 = Release|Any CPU
+ {1D83D4E9-FDFD-4CD5-AE03-F0FBAA8C1193}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {1D83D4E9-FDFD-4CD5-AE03-F0FBAA8C1193}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {1D83D4E9-FDFD-4CD5-AE03-F0FBAA8C1193}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {1D83D4E9-FDFD-4CD5-AE03-F0FBAA8C1193}.Release|Any CPU.Build.0 = Release|Any CPU
+ EndGlobalSection
+ GlobalSection(SolutionProperties) = preSolution
+ HideSolutionNode = FALSE
+ EndGlobalSection
+ GlobalSection(ExtensibilityGlobals) = postSolution
+ SolutionGuid = {E0EFD813-32E2-4D4C-B69A-493EFBBA7EE9}
+ EndGlobalSection
+EndGlobal
diff --git a/nuget.config b/nuget.config
new file mode 100644
index 0000000..365f5c7
--- /dev/null
+++ b/nuget.config
@@ -0,0 +1,7 @@
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/pem/ca-root.pem b/pem/ca-root.pem
new file mode 100644
index 0000000..f035779
--- /dev/null
+++ b/pem/ca-root.pem
@@ -0,0 +1,29 @@
+-----BEGIN CERTIFICATE-----
+MIIE/zCCAuegAwIBAgIJAIgzt1mx6ZClMA0GCSqGSIb3DQEBCwUAMBYxFDASBgNV
+BAMMC2thZmthLWFkbWluMB4XDTI0MDUwNjA2NDE1NloXDTM0MDUwNDA2NDE1Nlow
+FjEUMBIGA1UEAwwLa2Fma2EtYWRtaW4wggIiMA0GCSqGSIb3DQEBAQUAA4ICDwAw
+ggIKAoICAQDUCuuolDbpE6O8IvAK0YDvK+p96hjQ4bqD7UWKzie60q2OWHOfrcr3
+ra4FYiEtpZMSa52Y9K+VmXnP/Ol1kJHqcJPyIEkgqnC3nMDLNtO4BTqbXb7/eL7n
+ln+Sn4v96Qc6+248yV87Of7NhW0Ou7Q7Qa7Y8Podvox6CANwQcVDAuAp1segHtsl
+3w3Tmz4Ty9A919lGLfSRr1XQUGGt2DLrU4GajBEs8FJoWrCWnOBH251oSwQrisLw
++/PNRR8f8NE9uxE1/qmWwwtPg0a978WaUpqaLLUd7dU+TrbbptkAAcTdV0+JPrGJ
+7JCleO9akmquRrSVQfKXKCq0ardQpndGEGhfAxoqPi5BuFyn0Sx2d+NsDIQKyW7U
+e7lwLafrXFwnNmKPCfigE4IvPonz6HcJORRtJDzORbjSCB347MMlkmKY+ANB5fKc
+UiqEaV+/NOw99/qNICEmBUa24aU3O5ASr5hPGNz+vAtGDGKokf0K9CLcPE/FKkvY
+AYzM4O/kyqJhWgs8qoHITnv0BTb9psJEZp5VLJP6iPQhJXksnCQRX9+e0Rd+4d5B
+d6JZ++KfmgxMy0YhTqz58IZe0yHolxTE6S0S5IULKkPk8GnIHPgw7/zfvsOM/eFs
+676401tafc/Y7gC3H/GEPHo/T8/tuHLGz7GKwcytUFQdI2YgBl0kMwIDAQABo1Aw
+TjAdBgNVHQ4EFgQULe3CAIvXjKYJOGF+zt1Lm87u6p0wHwYDVR0jBBgwFoAULe3C
+AIvXjKYJOGF+zt1Lm87u6p0wDAYDVR0TBAUwAwEB/zANBgkqhkiG9w0BAQsFAAOC
+AgEAfktPTwfBKTONVi5ILH6/70bvIxNRQljFdRP6Ktj76FlmkGXetYQMh0Fc+Wwd
+meaU7z3JFV3BJOlQkh7UPKTbmXfIrk+Kvf14sSKYf4O47GatoSY/ifZrocs+owr2
+FP/6Oh3l70+vcUbLmU/328VHHumAuZjQ/199CnKgvgarEPRsekHDg0erzCu7fwVk
+9ZIpRW5Kiyav+VBbg2SUVxaOHeGzy1eHxWHYeUpBpryLh4VDU1Va9LPR6nhmKPQI
+GzJUgDjx9OK7Lhr6IHDWGbbOjKmHhL5nPtVOi9JPqTdJ0DXHhj94RlQdw3OxCa47
+ZhvCE5re8fJGtDpExQBbaPTXR0SGJ/q8td9bcJ+zHLcCALKXwHW7cf21YiPoiQ9J
+I1DmYIwxEUqbngpCA6guDoL0HONVPOwG1SG/yLHy+jaKaMHUHTptbiRpPf2znxSm
+UxUxEUEg+bhtoHd3Areky4RsuPQUFbDyYEDtduSjV+00vAicxcC/sLeZRWoKsPfb
+Bp4xf7Jn5OO+jNcTfjoXnlVpyBI7zdwkbyMdF/xxcaj+POIgaljTQHoGxc31taJo
+vwlNqD7oLHysuL06Qzos8nEbJMT/MJzDOSIRZ3FL3aTtdnP92j/G6hVoN5XnAAXS
+O31Nvpo+6guwJmFg4SuBGwT8x2wlM35nHs84vR36ScNBvq8=
+-----END CERTIFICATE-----
diff --git a/setup_production.sh b/setup_production.sh
new file mode 100644
index 0000000..3d84c2b
--- /dev/null
+++ b/setup_production.sh
@@ -0,0 +1,18 @@
+#!/usr/bin/env bash
+image_version=$version
+# 删除镜像
+docker rmi -f $(
+ docker images | grep registry.cn-shanghai.aliyuncs.com/gps_card/telpo_push_third_ssl | awk '{print $3}'
+)
+# 构建telpo/mrp:$image_version镜像
+docker build -f ./TelpoPush.Worker.ThirdSsl/Dockerfile . -t telpo/telpo_push_third_ssl:$image_version
+#TODO:推送镜像到阿里仓库
+echo '=================开始推送镜像======================='
+docker login --username=rzl_wangjx@1111649216405698 --password=telpo.123 registry.cn-shanghai.aliyuncs.com
+docker tag telpo/telpo_push_third_ssl:$image_version registry.cn-shanghai.aliyuncs.com/gps_card/telpo_push_third_ssl:$image_version
+docker push registry.cn-shanghai.aliyuncs.com/gps_card/telpo_push_third_ssl:$image_version
+echo '=================推送镜像完成======================='
+#删除产生的None镜像
+docker rmi -f $(docker images | grep none | awk '{print $3}')
+# 查看镜像列表
+docker images
diff --git a/setup_test.sh b/setup_test.sh
new file mode 100644
index 0000000..f863a51
--- /dev/null
+++ b/setup_test.sh
@@ -0,0 +1,17 @@
+#!/usr/bin/env bash
+image_version=$version
+# 删除镜像
+docker rmi -f $(
+ docker images | grep 139.224.254.18:5000/telpo_push_third_ssl | awk '{print $3}'
+)
+# 构建telpo/mrp:$image_version镜像
+docker build -f ./TelpoPush.Worker.ThirdSsl/Dockerfile . -t telpo/telpo_push_third_ssl:$image_version
+#TODO:推送镜像到私有仓库
+echo '=================开始推送镜像======================='
+docker tag telpo/telpo_push_third_ssl:$image_version 139.224.254.18:5000/telpo_push_third_ssl:$image_version
+docker push 139.224.254.18:5000/telpo_push_third_ssl:$image_version
+echo '=================推送镜像完成======================='
+#删除产生的None镜像
+docker rmi -f $(docker images | grep none | awk '{print $3}')
+# 查看镜像列表
+docker images
\ No newline at end of file
diff --git a/telpo_push_third_ssl_run.sh b/telpo_push_third_ssl_run.sh
new file mode 100644
index 0000000..0eb60d4
--- /dev/null
+++ b/telpo_push_third_ssl_run.sh
@@ -0,0 +1,27 @@
+#!/bin/bash
+environment=$1
+version=$2
+echo "环境变量为${environment},版本为$version!"
+if [[ ${environment} == 'production' ]]; then
+ echo "开始远程构建容器"
+ docker stop telpo_push_third_ssl || true
+ docker rm telpo_push_third_ssl || true
+ docker rmi -f $(docker images | grep registry.cn-shanghai.aliyuncs.com/gps_card/telpo_push_third_ssl | awk '{print $3}')
+ docker login --username=rzl_wangjx@1111649216405698 --password=telpo.123 registry.cn-shanghai.aliyuncs.com
+ docker pull registry.cn-shanghai.aliyuncs.com/gps_card/telpo_push_third_ssl:$version
+ docker run -d --network=host -e environment=production -v /home/data/telpo_push_third_ssl/log:/var/telpo_pushthird_ssl/logs --restart=always --name telpo_push_third_ssl registry.cn-shanghai.aliyuncs.com/gps_card/telpo_push_third_ssl:$version;
+ #删除产生的None镜像
+ docker rmi -f $(docker images | grep none | awk '{print $3}')
+ docker ps -a
+
+elif [[ ${environment} == 'test' ]]; then
+ echo "开始在测试环境远程构建容器"
+ docker stop telpo_push_third_ssl || true
+ docker rm telpo_push_third_ssl || true
+ docker rmi -f $(docker images | grep 139.224.254.18:5000/telpo_push_third_ssl | awk '{print $3}')
+ docker pull 139.224.254.18:5000/telpo_push_third_ssl:$version
+ docker run -d --network=host -e environment=${environment} -v /home/data/telpo_push_third_ssl/log:/var/telpo_pushthird_ssl/logs --restart=always --name telpo_push_third_ssl 139.224.254.18:5000/telpo_push_third_ssl:$version;
+ #删除产生的None镜像
+ docker rmi -f $(docker images | grep none | awk '{print $3}')
+ docker ps -a
+fi
\ No newline at end of file