diff --git a/TelpoPush.Position.Worker/Common/HttpHelperAsync.cs b/TelpoPush.Position.Worker/Common/HttpHelperAsync.cs new file mode 100644 index 0000000..c691226 --- /dev/null +++ b/TelpoPush.Position.Worker/Common/HttpHelperAsync.cs @@ -0,0 +1,460 @@ +using Microsoft.Extensions.Logging; +using Newtonsoft.Json; +using System; +using System.Collections.Generic; +using System.Net.Http; +using System.Net.Http.Headers; +using System.Text; +using System.Threading.Tasks; + +namespace TelpoPush.Position.Worker.Common +{ + /// + /// HTTP帮助类 + /// + public class HttpHelperAsync + { + private IHttpClientFactory _httpClientFactory; + private readonly ILogger _logger; + public HttpHelperAsync(IHttpClientFactory httpClientFactory, ILogger logger) + { + _httpClientFactory = httpClientFactory; + _logger = logger; + } + + + #region 异步 + /// + /// 发起POST异步请求表单 + /// + /// 请求地址 + /// POST提交的内容 + /// POST内容的媒体类型,如:application/xml、application/json + /// HTTP响应上的content-type内容头的值,如:application/xml、application/json、application/text、application/x-www-form-urlencoded等 + /// 请求头信息 + /// 请求超时时间,单位秒 + /// 返回string + public async Task PostFormAsync(string url, MultipartFormDataContent content, + Dictionary headers = null, + int timeOut = 5) + { + try + { + var hostName = GetHostName(url); + using (HttpClient client = _httpClientFactory.CreateClient(hostName)) + { + client.Timeout = TimeSpan.FromSeconds(timeOut); + if (headers?.Count > 0) + { + foreach (string key in headers.Keys) + { + client.DefaultRequestHeaders.Add(key, headers[key]); + } + } + content.Headers.Add("ContentType", "multipart/form-data");//声明头部 + using (HttpResponseMessage response = await client.PostAsync(url, content)) + { + return JsonConvert.SerializeObject(new { response.IsSuccessStatusCode, response.StatusCode }); + //if (response.IsSuccessStatusCode) + //{ + // string responseString = await response.Content.ReadAsStringAsync(); + // return responseString; + //} + //else + //{ + // return string.Empty; + //} + } + } + } + catch (Exception ex) + { + return $"推送完成:请求响应超过{timeOut}秒,异常 {ex.Message}"; + } + } + + + + /// + /// 发起GET异步请求 + /// + /// 返回类型 + /// 请求地址 + /// 请求头信息 + /// 请求超时时间,单位秒 + /// 返回string + public async Task GetAsync(string url, Dictionary headers = null, int timeOut = 30) + { + var hostName = GetHostName(url); + using (HttpClient client = _httpClientFactory.CreateClient(hostName)) + { + client.Timeout = TimeSpan.FromSeconds(timeOut); + if (headers?.Count > 0) + { + foreach (string key in headers.Keys) + { + client.DefaultRequestHeaders.Add(key, headers[key]); + } + } + using (HttpResponseMessage response = await client.GetAsync(url)) + { + if (response.IsSuccessStatusCode) + { + string responseString = await response.Content.ReadAsStringAsync(); + return responseString; + } + else + { + return string.Empty; + } + } + } + } + + + /// + /// 发起POST异步请求 + /// + /// 请求地址 + /// POST提交的内容 + /// POST内容的媒体类型,如:application/xml、application/json + /// HTTP响应上的content-type内容头的值,如:application/xml、application/json、application/text、application/x-www-form-urlencoded等 + /// 请求头信息 + /// 请求超时时间,单位秒 + /// 返回string + public async Task PostAsync(string url, string body, + Dictionary headers = null, + int timeOut = 30, + string bodyMediaType = "application/json", + string responseContentType = "application/json;charset=utf-8") + { + + //content.Headers.ContentType = new MediaTypeHeaderValue("application/json"); + + //var clientHandler = new HttpClientHandler + //{ + // ServerCertificateCustomValidationCallback = (message, certificate2, arg3, arg4) => true + //}; + //using (var client = new HttpClient(clientHandler)) + //{ + // client.DefaultRequestHeaders.Accept.Add(new MediaTypeWithQualityHeaderValue("application/json")); + + try + { + var hostName = GetHostName(url); + using (HttpClient client = _httpClientFactory.CreateClient(hostName)) + { + client.Timeout = TimeSpan.FromSeconds(timeOut); + if (headers?.Count > 0) + { + foreach (string key in headers.Keys) + { + client.DefaultRequestHeaders.Add(key, headers[key]); + } + } + StringContent content = new StringContent(body, System.Text.Encoding.UTF8, mediaType: bodyMediaType); + if (!string.IsNullOrWhiteSpace(responseContentType)) + { + content.Headers.ContentType = System.Net.Http.Headers.MediaTypeHeaderValue.Parse(responseContentType); + } + using (HttpResponseMessage response = await client.PostAsync(url, content)) + { + if (response.IsSuccessStatusCode) + { + string responseString = await response.Content.ReadAsStringAsync(); + return responseString; + } + else + return $"请求异常:{response.IsSuccessStatusCode},{response.StatusCode}"; + } + } + } + catch(Exception ex) + { + return $"请求异常:{ex.Message}"; + } + } + + /// + /// 发起POST异步请求 + /// + /// 请求地址 + /// POST提交的内容 + /// POST内容的媒体类型,如:application/xml、application/json + /// HTTP响应上的content-type内容头的值,如:application/xml、application/json、application/text、application/x-www-form-urlencoded等 + /// 请求头信息 + /// 请求超时时间,单位秒 + /// 返回string + public async Task PutAsync(string url, string body, + string bodyMediaType = null, + string responseContentType = null, + Dictionary headers = null, + int timeOut = 30) + { + var hostName = GetHostName(url); + using (HttpClient client = _httpClientFactory.CreateClient(hostName)) + { + client.Timeout = TimeSpan.FromSeconds(timeOut); + if (headers?.Count > 0) + { + foreach (string key in headers.Keys) + { + client.DefaultRequestHeaders.Add(key, headers[key]); + } + } + StringContent content = new StringContent(body, System.Text.Encoding.UTF8, mediaType: bodyMediaType); + if (!string.IsNullOrWhiteSpace(responseContentType)) + { + content.Headers.ContentType = System.Net.Http.Headers.MediaTypeHeaderValue.Parse(responseContentType); + } + using (HttpResponseMessage response = await client.PutAsync(url, content)) + { + if (response.IsSuccessStatusCode) + { + string responseString = await response.Content.ReadAsStringAsync(); + return responseString; + } + else + { + return string.Empty; + } + } + } + } + + /// + /// 发起GET异步请求 + /// + /// 返回类型 + /// 请求地址 + /// 请求头信息 + /// 请求超时时间,单位秒 + /// 返回string + public async Task DeleteAsync(string url, Dictionary headers = null, int timeOut = 30) + { + var hostName = GetHostName(url); + using (HttpClient client = _httpClientFactory.CreateClient(hostName)) + { + client.Timeout = TimeSpan.FromSeconds(timeOut); + if (headers?.Count > 0) + { + foreach (string key in headers.Keys) + { + client.DefaultRequestHeaders.Add(key, headers[key]); + } + } + using (HttpResponseMessage response = await client.DeleteAsync(url)) + { + if (response.IsSuccessStatusCode) + { + string responseString = await response.Content.ReadAsStringAsync(); + return responseString; + } + else + { + return string.Empty; + } + } + } + } + + /// + /// 发起GET异步请求 + /// + /// 返回类型 + /// 请求地址 + /// 请求头信息 + /// 请求超时时间,单位秒 + /// 返回T + public async Task GetAsync(string url, Dictionary headers = null, int timeOut = 30) where T : new() + { + string responseString = await GetAsync(url, headers, timeOut); + if (!string.IsNullOrWhiteSpace(responseString)) + { + return JsonConvert.DeserializeObject(responseString); + } + else + { + return default(T); + } + } + + + /// + /// 发起POST异步请求 + /// + /// 返回类型 + /// 请求地址 + /// POST提交的内容 + /// POST内容的媒体类型,如:application/xml、application/json + /// HTTP响应上的content-type内容头的值,如:application/xml、application/json、application/text、application/x-www-form-urlencoded等 + /// 请求头信息 + /// 请求超时时间,单位秒 + /// 返回T + public async Task PostAsync(string url, string body, + Dictionary headers = null, + int timeOut = 30, + string bodyMediaType = "application/json", + string responseContentType = "application/json;charset=utf-8" + ) where T : new() + { + string responseString = await PostAsync(url, body, headers, timeOut, bodyMediaType, responseContentType); + if (!string.IsNullOrWhiteSpace(responseString)) + { + return JsonConvert.DeserializeObject(responseString); + } + else + { + return default(T); + } + } + + /// + /// 发起PUT异步请求 + /// + /// 返回类型 + /// 请求地址 + /// POST提交的内容 + /// POST内容的媒体类型,如:application/xml、application/json + /// HTTP响应上的content-type内容头的值,如:application/xml、application/json、application/text、application/x-www-form-urlencoded等 + /// 请求头信息 + /// 请求超时时间,单位秒 + /// 返回T + public async Task PutAsync(string url, string body, + string bodyMediaType = null, + string responseContentType = null, + Dictionary headers = null, + int timeOut = 30) where T : new() + { + string responseString = await PutAsync(url, body, bodyMediaType, responseContentType, headers, timeOut); + if (!string.IsNullOrWhiteSpace(responseString)) + { + return JsonConvert.DeserializeObject(responseString); + } + else + { + return default(T); + } + } + + /// + /// 发起DELETE异步请求 + /// + /// 返回类型 + /// 请求地址 + /// 请求头信息 + /// 请求超时时间,单位秒 + /// 返回T + public async Task DeleteAsync(string url, Dictionary headers = null, int timeOut = 30) where T : new() + { + string responseString = await DeleteAsync(url, headers, timeOut); + if (!string.IsNullOrWhiteSpace(responseString)) + { + return JsonConvert.DeserializeObject(responseString); + } + else + { + return default(T); + } + } + #region 私有函数 + + /// + /// 获取请求的主机名 + /// + /// + /// + private static string GetHostName(string url) + { + if (!string.IsNullOrWhiteSpace(url)) + { + return url.Replace("https://", "").Replace("http://", "").Split('/')[0]; + } + else + { + return "AnyHost"; + } + } + + #endregion + + + #endregion + + #region 同步 + + /// + /// 发起GET同步请求 + /// + /// + /// + /// + /// + /// + + + public string HttpGet(string url, Dictionary headers = null, string contentType = "application/json;charset=utf-8") + { + if (string.IsNullOrEmpty(url)) return ""; + try + { + using (HttpClient client = new HttpClient()) + { + if (contentType != null) + client.DefaultRequestHeaders.Add("ContentType", contentType); + if (headers != null) + { + foreach (var header in headers) + client.DefaultRequestHeaders.Add(header.Key, header.Value); + } + HttpResponseMessage response = client.GetAsync(url).Result; + return response.Content.ReadAsStringAsync().Result; + } + } + catch (Exception ex) + { + _logger.LogDebug($"HttpGet/URL:{url},headers:{JsonConvert.SerializeObject(headers)},异常:{ex.Message}|{ex.Source}|{ex.StackTrace}"); + } + return ""; + } + /// + /// 发起POST同步请求 + /// + /// + /// + /// application/xml、application/json、application/text、application/x-www-form-urlencoded + /// 填充消息头 + /// + public string HttpPost(string url, string postData = null, Dictionary headers = null, string contentType = "application/json") + { + if (string.IsNullOrEmpty(url)) return ""; + try + { + postData = postData ?? ""; + using (HttpClient client = new HttpClient()) + { + if (headers != null) + { + foreach (var header in headers) + client.DefaultRequestHeaders.Add(header.Key, header.Value); + } + using (HttpContent httpContent = new StringContent(postData, Encoding.UTF8)) + { + if (contentType != null) + httpContent.Headers.ContentType = new MediaTypeHeaderValue(contentType); + + HttpResponseMessage response = client.PostAsync(url, httpContent).Result; + return response.Content.ReadAsStringAsync().Result; + } + } + } + catch (Exception ex){ + _logger.LogDebug($"HttpPost/URL:{url},postStr:{postData},headers:{JsonConvert.SerializeObject(headers)},异常:{ex.Message}|{ex.Source}|{ex.StackTrace}"); + } + return ""; + } + + + #endregion + } +} \ No newline at end of file diff --git a/TelpoPush.Position.Worker/Common/LimitedConcurrencyLevelTaskScheduler.cs b/TelpoPush.Position.Worker/Common/LimitedConcurrencyLevelTaskScheduler.cs new file mode 100644 index 0000000..89b65dc --- /dev/null +++ b/TelpoPush.Position.Worker/Common/LimitedConcurrencyLevelTaskScheduler.cs @@ -0,0 +1,137 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; + +namespace TelpoPush.Position.Worker.Common +{ + /// + /// Provides a task scheduler that ensures a maximum concurrency level while + /// running on top of the ThreadPool. + /// + public class LimitedConcurrencyLevelTaskScheduler : TaskScheduler + { + /// Whether the current thread is processing work items. + [ThreadStatic] + private static bool _currentThreadIsProcessingItems; + /// The list of tasks to be executed. + private readonly LinkedList _tasks = new LinkedList(); // protected by lock(_tasks) + /// The maximum concurrency level allowed by this scheduler. + private readonly int _maxDegreeOfParallelism; + /// Whether the scheduler is currently processing work items. + private int _delegatesQueuedOrRunning = 0; // protected by lock(_tasks) + + /// + /// Initializes an instance of the LimitedConcurrencyLevelTaskScheduler class with the + /// specified degree of parallelism. + /// + /// The maximum degree of parallelism provided by this scheduler. + public LimitedConcurrencyLevelTaskScheduler(int maxDegreeOfParallelism) + { + if (maxDegreeOfParallelism < 1) throw new ArgumentOutOfRangeException("maxDegreeOfParallelism"); + _maxDegreeOfParallelism = maxDegreeOfParallelism; + } + + /// Queues a task to the scheduler. + /// The task to be queued. + protected sealed override void QueueTask(Task task) + { + // Add the task to the list of tasks to be processed. If there aren't enough + // delegates currently queued or running to process tasks, schedule another. + lock (_tasks) + { + _tasks.AddLast(task); + if (_delegatesQueuedOrRunning < _maxDegreeOfParallelism) + { + ++_delegatesQueuedOrRunning; + NotifyThreadPoolOfPendingWork(); + } + } + } + + /// + /// Informs the ThreadPool that there's work to be executed for this scheduler. + /// + private void NotifyThreadPoolOfPendingWork() + { + ThreadPool.UnsafeQueueUserWorkItem(_ => + { + // Note that the current thread is now processing work items. + // This is necessary to enable inlining of tasks into this thread. + _currentThreadIsProcessingItems = true; + try + { + // Process all available items in the queue. + while (true) + { + Task item; + lock (_tasks) + { + // When there are no more items to be processed, + // note that we're done processing, and get out. + if (_tasks.Count == 0) + { + --_delegatesQueuedOrRunning; + break; + } + + // Get the next item from the queue + item = _tasks.First.Value; + _tasks.RemoveFirst(); + } + + // Execute the task we pulled out of the queue + base.TryExecuteTask(item); + } + } + // We're done processing items on the current thread + finally { _currentThreadIsProcessingItems = false; } + }, null); + } + + /// Attempts to execute the specified task on the current thread. + /// The task to be executed. + /// + /// Whether the task could be executed on the current thread. + protected sealed override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued) + { + // If this thread isn't already processing a task, we don't support inlining + if (!_currentThreadIsProcessingItems) return false; + + // If the task was previously queued, remove it from the queue + if (taskWasPreviouslyQueued) TryDequeue(task); + + // Try to run the task. + return base.TryExecuteTask(task); + } + + /// Attempts to remove a previously scheduled task from the scheduler. + /// The task to be removed. + /// Whether the task could be found and removed. + protected sealed override bool TryDequeue(Task task) + { + lock (_tasks) return _tasks.Remove(task); + } + + /// Gets the maximum concurrency level supported by this scheduler. + public sealed override int MaximumConcurrencyLevel { get { return _maxDegreeOfParallelism; } } + + /// Gets an enumerable of the tasks currently scheduled on this scheduler. + /// An enumerable of the tasks currently scheduled. + protected sealed override IEnumerable GetScheduledTasks() + { + bool lockTaken = false; + try + { + Monitor.TryEnter(_tasks, ref lockTaken); + if (lockTaken) return _tasks.ToArray(); + else throw new NotSupportedException(); + } + finally + { + if (lockTaken) Monitor.Exit(_tasks); + } + } + } +} diff --git a/TelpoPush.Position.Worker/Common/TimeHelper.cs b/TelpoPush.Position.Worker/Common/TimeHelper.cs new file mode 100644 index 0000000..da65dab --- /dev/null +++ b/TelpoPush.Position.Worker/Common/TimeHelper.cs @@ -0,0 +1,33 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace TelpoPush.Position.Worker.Common +{ + public class TimeHelper + { + /// + /// 时间戳转成时间类型 + /// + /// + /// + public static DateTime ConvertToLocalDateTime(string timeStamp) + { + DateTime dtStart = TimeZoneInfo.ConvertTime(new DateTime(1970, 1, 1), TimeZoneInfo.Utc, TimeZoneInfo.Local); + if (timeStamp.Length == 13) + { + long lTime = long.Parse(timeStamp + "0000"); + TimeSpan toNow = new TimeSpan(lTime); + return dtStart.Add(toNow); + } + return dtStart.AddSeconds(long.Parse(timeStamp)); + } + + public static string ToDateTimeStr(DateTime dt) + { + return dt.ToString("yyyy-MM-dd HH:mm:ss"); + } + } +} diff --git a/TelpoPush.Position.Worker/Common/Utils.cs b/TelpoPush.Position.Worker/Common/Utils.cs new file mode 100644 index 0000000..77ac374 --- /dev/null +++ b/TelpoPush.Position.Worker/Common/Utils.cs @@ -0,0 +1,40 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; +using System.Security.Cryptography; + +namespace TelpoPush.Position.Worker.Common +{ + public class Utils + { + public static MultipartFormDataContent GetMultipartFormDataContent(Dictionary dic, string appId, ref Dictionary outDic) + { + MultipartFormDataContent mfdc = new MultipartFormDataContent(); + StringBuilder sb = new StringBuilder(); + if (dic != null && dic.Count > 0) + { + var dicOrderBy = dic.OrderBy(z => z.Key); + foreach (KeyValuePair kv in dicOrderBy) + { + sb.Append($"{kv.Key}={kv.Value.ToString()}&"); + mfdc.Add(new StringContent(kv.Value.ToString()), kv.Key);//参数, 内容在前,参数名称在后 + } + } + string signStr = $"{sb.ToString().Trim('&')}{appId}"; + byte[] bytes = Encoding.UTF8.GetBytes(signStr); + byte[] hash = SHA256.Create().ComputeHash(bytes); + StringBuilder builder = new StringBuilder(); + for (int i = 0; i < hash.Length; i++) + { + builder.Append(hash[i].ToString("X2")); + } + string sign = builder.ToString().ToLower(); + dic.Add("sign", sign); + mfdc.Add(new StringContent(sign), "sign");//参数, 内容在前,参数名称在后 + outDic = dic; + return mfdc; + } + } +} diff --git a/TelpoPush.Position.Worker/Handlers/KafkaSubscribe.cs b/TelpoPush.Position.Worker/Handlers/KafkaSubscribe.cs new file mode 100644 index 0000000..3689dbf --- /dev/null +++ b/TelpoPush.Position.Worker/Handlers/KafkaSubscribe.cs @@ -0,0 +1,62 @@ +using Confluent.Kafka; +using TelpoPush.Position.Worker.Common; +using TelpoPush.Position.Worker.Service.Mq; + +namespace TelpoPush.Position.Worker.Handlers +{ + public class KafkaSubscribe + { + private readonly ILogger _logger; + private readonly IHostEnvironment _env; + private readonly IKafkaService _kafkaService; + private readonly PositionProcess _positionProcess; + + + public KafkaSubscribe( + ILogger logger, IHostEnvironment env, + IKafkaService kafkaService, + PositionProcess positionProcess) + { + _logger = logger; + _env = env; + _kafkaService = kafkaService; + _positionProcess = positionProcess; + } + public async Task SubscribeAsync() + { +#if DEBUG + + _logger.LogInformation("11312"); + + var temp = new Headers(); + string topic = "topic.push.position"; + //temp.Add(new Header("DataType", new byte[] { 0, 0, 0, 0 })); + //temp.Add(new Header("AlarmType", new byte[] { 2, 0, 0, 0 })); + //string psych = "{\"messageId\":\"1790941606816612864\",\"topic\":\"topic.push.third\",\"time\":\"2024-05-16 11:05:27\",\"data\":{\"imei\":\"861281060093147\",\"atteryLowId\":\"861281060093147664577f9\",\"info\":\"设备电量低于15%\"}}"; + //await _positionProcess.SendPosition(psych, topic, temp); + + //// await _kafkaService.SubscribeAsync(DoReceive, CancellationToken.None); + +#else + + LimitedConcurrencyLevelTaskScheduler lcts = new LimitedConcurrencyLevelTaskScheduler(5); + TaskFactory factory = new TaskFactory(lcts); + try + { + await factory.StartNew(async () => + { + await _kafkaService.SubscribeAsync(DoReceive, CancellationToken.None); + }); + } + catch (Exception ex) + { + _logger.LogError($"Subscribe 处理Kafka数据发生异常 {ex.Message}|{ex.Source}|{ex.StackTrace}"); + } +#endif + } + async void DoReceive(string topic, string message, Headers headers) + { + await _positionProcess.SendPosition(message, topic, headers); + } + } +} diff --git a/TelpoPush.Position.Worker/Handlers/PositionProcess.cs b/TelpoPush.Position.Worker/Handlers/PositionProcess.cs new file mode 100644 index 0000000..5404dc6 --- /dev/null +++ b/TelpoPush.Position.Worker/Handlers/PositionProcess.cs @@ -0,0 +1,346 @@ +using Confluent.Kafka; +using Microsoft.Extensions.Options; +using Newtonsoft.Json; +using TelpoPush.Position.Worker.Common; +using TelpoPush.Position.Worker.Models.Config; +using TelpoPush.Position.Worker.Models.Enum; +using TelpoPush.Position.Worker.Models.MqTemplates; +using TelpoPush.Position.Worker.Models.PushTemplates; +using TelpoPush.Position.Worker.Service.Cache; +using TelpoPush.Position.Worker.Service.Mq; + +namespace TelpoPush.Position.Worker.Handlers +{ + public class PositionProcess + { + private readonly static object _syncLocker = new object(); + private readonly IHostEnvironment _env; + private readonly ILogger _logger; + private readonly HttpHelperAsync _httpHelper; + private readonly RedisUtil _redis; + private readonly MqProcessMessage _serviceMqProcess; + private readonly PositionConfig _positionConfig; + + public PositionProcess( + IHostEnvironment env, + ILogger logger, + HttpHelperAsync httpHelper, + RedisUtil redis, + MqProcessMessage serviceMqProcess, + IOptions positionConfig + ) + { + _env = env; + _logger = logger; + _httpHelper = httpHelper; + _redis = redis; + _positionConfig = positionConfig.Value; + _serviceMqProcess = serviceMqProcess; + + } + + public async Task SendPosition(string? message, string topic, Headers headers) + { + #region 数据初始验证 + bool isHandle = true; + BaseModel model = null; + string imei = ""; + if (!string.IsNullOrEmpty(message)) + { + model = JsonConvert.DeserializeObject(message); + if (model != null) + { + var Jo = JsonConvert.DeserializeObject>(model.data.ToString()); + if (Jo.ContainsKey("imei")) + imei = Jo["imei"].ToString(); + if (Jo.ContainsKey("Imei")) + imei = Jo["Imei"].ToString(); + else if (Jo.ContainsKey("nickname")) + imei = Jo["nickname"].ToString(); + if (string.IsNullOrEmpty(imei)) + { + _logger.LogInformation($"[数据信息不完整] imei信息不存在:{message}"); + isHandle = false; + } + else + await _redis.GetGpsDevice(imei); + } + else + { + _logger.LogInformation($"[数据信息不完整] 数据解析异常:{message}"); + isHandle = false; + } + } + else + { + _logger.LogInformation($"[数据信息不完整] message数据异常:{message}"); + isHandle = false; + } + #endregion + if (isHandle) + { + lock (_syncLocker) + { + //Headers 解析 + HeadersDto headersDto = new HeadersDto(); + try + { + foreach (var item in headers) + { + if (item.Key == KafkaHeader.DataType) + headersDto.DataType = BitConverter.ToInt32(item.GetValueBytes(), 0); + else if (item.Key == KafkaHeader.AlarmType) + headersDto.AlarmType = BitConverter.ToInt32(item.GetValueBytes(), 0); + else if (item.Key == KafkaHeader.OperType) + headersDto.OperType = BitConverter.ToInt32(item.GetValueBytes(), 0); + } + } + catch (Exception ex) + { + _logger.LogError($"当前工作线程Headers异常,{ex.Message}|{ex.Source}|{ex.StackTrace}"); + } + try + { + #region 注释 + //string dataType = headersDto.DataType != null ? "_" + headersDto.DataType : ""; + //string alarmType = headersDto.AlarmType != null ? "_" + headersDto.AlarmType : ""; + //string operType = headersDto.OperType != null ? "_" + headersDto.OperType : ""; + //string key = dataType + alarmType + operType; + //var dataStatus = _redis.IsDateStatus(model, imei, key).Result; + //过滤 + //if (headersDto.DataType == (int)MqDataType.TemperatureInfo + // || headersDto.DataType == (int)MqDataType.Temperature1Info + // || headersDto.DataType == (int)MqDataType.BindDevice + // || headersDto.DataType == (int)MqDataType.PositionInfo + // || headersDto.DataType == (int)MqDataType.HeartRateInfo + // || headersDto.DataType == (int)MqDataType.HeartRate1Info + // || headersDto.DataType == (int)MqDataType.Spo2Info + // || headersDto.DataType == (int)MqDataType.Spo21Info + // || headersDto.DataType == (int)MqDataType.BloodPressInfo + // || headersDto.DataType == (int)MqDataType.BloodPress1Info + // || headersDto.DataType == (int)MqDataType.SportResult + // || headersDto.DataType == (int)MqDataType.BloodSugar + // ) + // dataStatus.isPush = true; + //dataStatus.isPush = true; + //if (dataStatus.isPush) + //{ + //switch (topic) + //{ + // case "topic.push.third": + // switch (headersDto.DataType) + // { + // case (int)MqDataType.AlarmInfo: //报警消息 + // break; + // case (int)MqDataType.TemperatureInfo: //体温消息 + // break; + // case (int)MqDataType.PositionInfo: //定位消息 + // // DataServicePusPosition(model, imei); + // break; + // case (int)MqDataType.StepInfo: //步数消息 + + // break; + // case (int)MqDataType.BatteryLevelInfo: //电量消息 + // break; + // case (int)MqDataType.DeviceCallLog: //设备通话记录 + + // break; + // case (int)MqDataType.DeviceSmsLog: //设备短信记录 + + // break; + // case (int)MqDataType.DeviceConfigInfo: //设备配置信息 + + // break; + // case (int)MqDataType.Status: //设备状态(offline,online) + + // break; + // case (int)MqDataType.Active: //设备激活状态 + // break; + // case (int)MqDataType.reply: //指令回调 + + // break; + // case (int)MqDataType.Weather: //天气查询 + + // break; + // case (int)MqDataType.ReadMsg: //短消息阅读 + + // break; + // case (int)MqDataType.StudyAINotifyStatusUpload: //学习能力状态 + + // break; + // case (int)MqDataType.HeartRateInfo: //心率 + + // break; + // case (int)MqDataType.HeartRate1Info: //周期性心率 + + // break; + // case (int)MqDataType.Spo2Info: //血氧 + + // break; + // case (int)MqDataType.Spo21Info: //周期性血氧 + + // break; + // case (int)MqDataType.Temperature1Info: //周期性报体温数据 + + // break; + // case (int)MqDataType.DrownReportInfo: //防溺水告警 + + // break; + // case (int)MqDataType.WearStatusInfo: //手表未佩戴告警 + + // break; + // case (int)MqDataType.BloodPressInfo: //血压 + + // break; + // case (int)MqDataType.BloodPress1Info: //周期性血压 + + // break; + // case (int)MqDataType.PsychInfo: //心理监测 + + // break; + // case (int)MqDataType.AiCallResult: //AI呼叫结果回调 + + // case (int)MqDataType.CrossBorder: //越界上报(围栏进出告警) + + // break; + // case (int)MqDataType.SportResult: //运动数据上报 + + // break; + // case (int)MqDataType.BindDevice: //绑定业务 + + // break; + // case (int)MqDataType.BloodSugar: //血糖业务 + + // break; + // default: + // break; + // } + // break; + // default: + // break; + //} + //} + //else + // _logger.LogInformation($"数据未处理(历史数据):{JsonConvert.SerializeObject(dataStatus)}"); + + #endregion + + switch (topic) + { + case "topic.push.position": + switch (headersDto.DataType) + { + case (int)MqDataType.PositionInfo: //定位消息 + DataServicePusPosition(model, imei); + break; + default: + break; + } + break; + default: + break; + } + + } + catch (Exception ex) + { + _logger.LogError($"当前工作线程异常: {ex.Message}|{ex.Source}|{ex.StackTrace}"); + } + } + } + } + + //位置 + public async Task DataServicePusPosition(BaseModel model, string imei) + { + var device = await _redis.GetGpsDevice(imei); + if (device != null) + { + await _redis.SetPersonInfoHash(imei); //更行设备用户详情缓存 + var position = JsonConvert.DeserializeObject(model.data.ToString()); + if ((int)position.locationType != 2) // 限制条件:locationType=2 LBS定位不推送 + { + Dictionary dicHeader = new Dictionary(); + dicHeader.Add(MqHeader.DataType, (int)MqDataType.PositionInfo); + + #region 定位-围栏推送服务 + var fenceObj = new + { + messageId = model.messageId, + topic = "topic.push.position", + time = model.time, + data = new + { + DeviceId = device.deviceId, + imei = position.imei, + wifiInfo = position.wifiMacs, + address = position.address, + baiduLatitude = position.baiduLatitude, + baiduLongitude = position.baiduLongitude, + gaodeLatitude = position.gaodeLatitude, + gaodeLongitude = position.gaodeLongitude, + originalLatitude = position.originalLatitude, + originalLongitude = position.originalLongitude, + locationType = position.locationType, + LastUpdate = model.time, + UtcDate = DateTime.Parse(model.time).AddHours(-8).ToString("yyyy-MM-dd HH:mm:ss"), + Radius = position.radius, + } + }; + await _serviceMqProcess.ProcessFenceServer(imei, fenceObj, dicHeader, "定位-围栏"); + #endregion + + #region 定位-JAVA数据推送服务 + var settingInfos = await _redis.GetManufactorPushSettingHash(imei, _positionConfig.RzlManufactorId, (int)MqDataType.PositionInfo); + settingInfos = null; + if (settingInfos != null) + { + Dictionary dic = new Dictionary(); + dic.Add("imei", imei); + dic.Add("locationType", (int)position.locationType); + dic.Add("altitude", position.radius); + dic.Add("gaodeLongitude", position.gaodeLongitude); + dic.Add("gaodeLatitude", position.gaodeLatitude); + dic.Add("originalLongitude", position.originalLongitude); + dic.Add("originalLatitude", position.originalLatitude); + dic.Add("baiduLongitude", position.baiduLongitude); + dic.Add("baiduLatitude", position.baiduLatitude); + dic.Add("address", position.address); + if (!string.IsNullOrEmpty(position.wifiMacs)) + { + position.wifiMacs = position.wifiMacs.Replace(",|", "|").Trim(','); + dic.Add("wifiMacs", position.wifiMacs); + } + dic.Add("dataTime", model.time); + MultipartFormDataContent mfdc = Utils.GetMultipartFormDataContent(dic, _positionConfig.RzlManufactorId, ref dic); + var result = await _httpHelper.PostFormAsync(settingInfos.pushUrl, mfdc); + _logger.LogInformation($"[定位-RZL数据-替换JAVA推送服务<{imei}>] url:{settingInfos.pushUrl},参数:{JsonConvert.SerializeObject(dic)},结果:{result}"); + } + else + { + if (!string.IsNullOrEmpty(position.wifiMacs)) + position.wifiMacs = position.wifiMacs.Replace(",|", "|").Trim(','); + PushPositionTemplate positionInfo = new PushPositionTemplate //上报存储位置信息 + { + MessageId = model.messageId, + Imei = imei, + Altitude = position.radius, + BaiduLatitude = position.baiduLatitude, + BaiduLongitude = position.baiduLongitude, + GaodeLatitude = position.gaodeLatitude, + GaodeLongitude = position.gaodeLongitude, + LocationType = (int)position.locationType, + OriginalLatitude = position.originalLatitude, + OriginalLongitude = position.originalLongitude, + Address = position.address, + wifiMacs = position.wifiMacs, + Time = model.time + }; + await _serviceMqProcess.ProcessDataPushServer(imei, positionInfo, dicHeader, "定位"); + } + #endregion + } + } + } + } +} diff --git a/TelpoPush.Position.Worker/Models/CacheTemplates/DeviceInfoModel.cs b/TelpoPush.Position.Worker/Models/CacheTemplates/DeviceInfoModel.cs new file mode 100644 index 0000000..e0f0dd3 --- /dev/null +++ b/TelpoPush.Position.Worker/Models/CacheTemplates/DeviceInfoModel.cs @@ -0,0 +1,19 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace TelpoPush.Position.Worker.Models.CacheTemplates +{ + public class DeviceInfoModel + { + public string deviceId { get; set; } + public string imei { get; set; } + public string deviceName { get; set; } + public string orgId { get; set; } + public string apiUid { get; set; } + public string activeStatus { get; set; } + public string activeTime { get; set; } + } +} diff --git a/TelpoPush.Position.Worker/Models/CacheTemplates/ManufactorPushSettingInfoModel.cs b/TelpoPush.Position.Worker/Models/CacheTemplates/ManufactorPushSettingInfoModel.cs new file mode 100644 index 0000000..f8d46a5 --- /dev/null +++ b/TelpoPush.Position.Worker/Models/CacheTemplates/ManufactorPushSettingInfoModel.cs @@ -0,0 +1,35 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace TelpoPush.Position.Worker.Models.CacheTemplates +{ + + public class ManufactorPushSettingInfoModel + { + public string id { get; set; } + public string manufactorId { get; set; } + public string manufactorName { get; set; } + public string imei { get; set; } + public List pushSettings { get; set; } + public int settingType { get; set; } + } + public class PushSettingsItem + { + public string dataName { get; set; } + public int dataType { get; set; } + public List settingInfos { get; set; } + } + public class SettingInfosItem + { + public bool pushFlag { get; set; } + public string pushStartTime { get; set; } + public string pushEndTime { get; set; } + public int pushType { get; set; } + public string pushUrl { get; set; } + public string remark { get; set; } + } + +} diff --git a/TelpoPush.Position.Worker/Models/CacheTemplates/PersonInfoModel.cs b/TelpoPush.Position.Worker/Models/CacheTemplates/PersonInfoModel.cs new file mode 100644 index 0000000..7ad6a92 --- /dev/null +++ b/TelpoPush.Position.Worker/Models/CacheTemplates/PersonInfoModel.cs @@ -0,0 +1,42 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace TelpoPush.Position.Worker.Models.CacheTemplates +{ + + public class PersonInfoModel + { + public PersonModel person { get; set; } + public string time { get; set; } + } + public class PersonModel + { + public string personId { get; set; } + public string serialno { get; set; } + public string personName { get; set; } + public string deviceId { get; set; } + public string nickName { get; set; } + public bool gender { get; set; } + public int height { get; set; } + public int weight { get; set; } + public string bornDate { get; set; } + public string school { get; set; } + public string grade { get; set; } + public string className { get; set; } + public string imagePath { get; set; } + public string imagePathSmall { get; set; } + public int age { get; set; } + public string createTime { get; set; } + public string remarks { get; set; } + public int ishypertension { get; set; } + public string emotion { get; set; } + public int profession { get; set; } + public int regularity { get; set; } + public int chronicDisease { get; set; } + public string apiUid { get; set; } + public string apiRemark { get; set; } + } +} diff --git a/TelpoPush.Position.Worker/Models/Config/PositionConfig.cs b/TelpoPush.Position.Worker/Models/Config/PositionConfig.cs new file mode 100644 index 0000000..1050538 --- /dev/null +++ b/TelpoPush.Position.Worker/Models/Config/PositionConfig.cs @@ -0,0 +1,19 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace TelpoPush.Position.Worker.Models.Config +{ + public class PositionConfig + { + public string CoreServiceApiUrl { get; set; } + public string GpsWebApiUrl { get; set; } + + public string RzlManufactorId { get; set; } + public string RzlPushTemperatureUrl { get; set; } + public string RzlPushTranspondUrl { get; set; } + public string RzlVoiceCallback { get; set; } + } +} diff --git a/TelpoPush.Position.Worker/Models/Config/RedisConfig.cs b/TelpoPush.Position.Worker/Models/Config/RedisConfig.cs new file mode 100644 index 0000000..bfcb7c6 --- /dev/null +++ b/TelpoPush.Position.Worker/Models/Config/RedisConfig.cs @@ -0,0 +1,94 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace TelpoPush.Position.Worker.Models.Config +{ + /// + /// Redis配置模板类 + /// + public class RedisConfig + { + public string Server { get; set; } + /// + /// Redis server password + /// + public string Password { get; set; } + /// + /// Redis server database, default 0 + /// + public int? DefaultDatabase { get; set; } + /// + /// The asynchronous method automatically uses pipeline, and the 10W concurrent time is 450ms(welcome to feedback) + /// + public bool? AsyncPipeline { get; set; } + /// + /// Connection pool size, default 50 + /// + public int? Poolsize { get; set; } + /// + /// Idle time of elements in the connection pool(MS), suitable for connecting to remote redis server, default 20000 + /// + public int? IdleTimeout { get; set; } + /// + /// Connection timeout(MS), default 5000 + /// + public int? ConnectTimeout { get; set; } + /// + /// Send / receive timeout(MS), default 10000 + /// + public int? SyncTimeout { get; set; } + /// + /// Preheat connections, receive values such as preheat = 5 preheat 5 connections, default 5 + /// + public int? Preheat { get; set; } + /// + /// Follow system exit event to release automatically, default true + /// + public bool? AutoDispose { get; set; } + /// + /// Enable encrypted transmission, default false + /// + public bool? Ssl { get; set; } + /// + /// 是否尝试集群模式,阿里云、腾讯云集群需要设置此选项为 false, default true + /// + public bool? Testcluster { get; set; } + /// + /// Execution error, retry attempts, default 0 + /// + public int? Tryit { get; set; } + /// + /// Connection name, use client list command to view + /// + public string Name { get; set; } + /// + /// key前辍,所有方法都会附带此前辍,csredis.Set(prefix + "key", 111) + /// + public string Prefix { get; set; } + + public override string ToString() + { + if (string.IsNullOrWhiteSpace(Server)) throw new ArgumentNullException(nameof(Server)); + var sb = new StringBuilder(Server); + if (!string.IsNullOrWhiteSpace(Password)) sb.Append($",password={Password}"); + if (DefaultDatabase.HasValue) sb.Append($",defaultDatabase={DefaultDatabase}"); + if (AsyncPipeline.HasValue) sb.Append($",asyncPipeline={AsyncPipeline}"); + if (Poolsize.HasValue) sb.Append($",poolsize={Poolsize}"); + if (IdleTimeout.HasValue) sb.Append($",idleTimeout={IdleTimeout}"); + if (ConnectTimeout.HasValue) sb.Append($",connectTimeout={ConnectTimeout}"); + if (SyncTimeout.HasValue) sb.Append($",syncTimeout={0}"); + if (Preheat.HasValue) sb.Append($",preheat={Preheat}"); + if (AutoDispose.HasValue) sb.Append($",autoDispose={AutoDispose}"); + if (Ssl.HasValue) sb.Append($",ssl={Ssl}"); + if (Testcluster.HasValue) sb.Append($",testcluster={Testcluster}"); + if (Tryit.HasValue) sb.Append($",tryit={Tryit}"); + if (!string.IsNullOrWhiteSpace(Name)) sb.Append($",name={Name}"); + if (!string.IsNullOrWhiteSpace(Prefix)) sb.Append($",prefix={Prefix}"); + + return sb.ToString(); + } + } +} diff --git a/TelpoPush.Position.Worker/Models/Config/ServiceConfig.cs b/TelpoPush.Position.Worker/Models/Config/ServiceConfig.cs new file mode 100644 index 0000000..3142782 --- /dev/null +++ b/TelpoPush.Position.Worker/Models/Config/ServiceConfig.cs @@ -0,0 +1,32 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace TelpoPush.Position.Worker.Models.Config +{ + public class ServiceConfig + { + /// + /// 数据服务Host Url + /// + public string TelpoDataUrl { get; set; } + + /// + /// Kafka服务地址 + /// + public string KafkaBootstrapServers { get; set; } + public List KafkaTopics { get; set; } + public string KafkaGroupId { get; set; } + //public string KafkaSaslUsername { get; set; } + //public string KafkaSaslPassword { get; set; } + //public string KafkaSslCaLocation { get; set; } + + /// + /// 默认缓存时间 + /// + public int CacheDurationSeconds { get; set; } + public int CacheDurationSeconds10 { get; set; } + } +} diff --git a/TelpoPush.Position.Worker/Models/Enum/HeadersDto.cs b/TelpoPush.Position.Worker/Models/Enum/HeadersDto.cs new file mode 100644 index 0000000..b8d043b --- /dev/null +++ b/TelpoPush.Position.Worker/Models/Enum/HeadersDto.cs @@ -0,0 +1,16 @@ +using Newtonsoft.Json; +namespace TelpoPush.Position.Worker.Models.Enum +{ + /// + /// 消息数据头 + /// + public class HeadersDto + { + [JsonProperty(PropertyName = "DataType")] + public int? DataType { get; set; } + [JsonProperty(PropertyName = "AlarmType")] + public int? AlarmType { get; set; } + [JsonProperty(PropertyName = "OperType")] + public int? OperType { get; set; } + } +} diff --git a/TelpoPush.Position.Worker/Models/Enum/MqDataTopic.cs b/TelpoPush.Position.Worker/Models/Enum/MqDataTopic.cs new file mode 100644 index 0000000..e49651f --- /dev/null +++ b/TelpoPush.Position.Worker/Models/Enum/MqDataTopic.cs @@ -0,0 +1,19 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace TelpoPush.Position.Worker.Models.Enum +{ + + public enum MqDataTopic : int + { + /// + /// 中高实时心率 + /// + ZkRealHRMonitorTopic = 1 + + + } +} diff --git a/TelpoPush.Position.Worker/Models/Enum/MqDataType.cs b/TelpoPush.Position.Worker/Models/Enum/MqDataType.cs new file mode 100644 index 0000000..69dcc6d --- /dev/null +++ b/TelpoPush.Position.Worker/Models/Enum/MqDataType.cs @@ -0,0 +1,156 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace TelpoPush.Position.Worker.Models.Enum +{ + /// + /// 数据类型,标识发布到kafka的消息的数据类型 + /// + public enum MqDataType : int + { + /// + /// 报警消息 + /// + AlarmInfo = 0, + + /// + /// 温度数据信息 + /// + TemperatureInfo = 1, + + /// + /// 步数信息 + /// + StepInfo = 2, + + /// + /// 电量信息 + /// + BatteryLevelInfo = 3, + + /// + /// 设备配置信息 + /// + DeviceConfigInfo = 4, + + /// + /// 设备通话记录 + /// + DeviceCallLog = 5, + + /// + /// 设备短信记录 + /// + DeviceSmsLog = 6, + + /// + /// 位置信息 + /// + PositionInfo = 7, + + /// + /// 支付 + /// + PayInfo = 8, + + /// + /// 设备状态(offline,online) + /// + Status = 9, + + /// + /// 设备激活状态(激活1,未激活0) + /// + Active = 10, + + /// + /// 指令回调 + /// + reply = 11, + + /// + /// 天气查询 + /// + Weather = 12, + + /// + /// 短信阅读事件 + /// + ReadMsg = 13, + /// + /// 学习能力状态上报事件 + /// + StudyAINotifyStatusUpload = 14, + /// + /// 心率 + /// + HeartRateInfo = 15, + /// + /// 血氧 + /// + Spo2Info = 16, + /// + /// 周期性报体温数据。 + /// + Temperature1Info = 17, + /// + /// 周期心率。 + /// + HeartRate1Info = 18, + /// + /// 周期性血氧 + /// + Spo21Info = 19, + /// + /// 溺水状态 + /// + DrownReportInfo = 20, + /// + /// 手表佩戴状态 + /// + WearStatusInfo = 21, + /// + /// 血压 + /// + BloodPressInfo = 22, + /// + /// 周期性血压 + /// + BloodPress1Info = 23, + /// + /// 心理监测 + /// + PsychInfo = 24, + + /// + /// AI呼叫回调结果 + /// + AiCallResult = 25, + + /// + /// 越界上报(围栏进出告警) + /// + CrossBorder = 26, + + /// + /// 运动数据上报 + /// + SportResult = 27, + /// + /// 运动数据上报 + /// + BloodSugar = 28, + + /// + /// 中考实时心率数据上报 + /// + ZkRealHeartRate = 29, + /// + /// 绑定业务 + /// + BindDevice = 100 + } +} diff --git a/TelpoPush.Position.Worker/Models/Enum/MqHeader.cs b/TelpoPush.Position.Worker/Models/Enum/MqHeader.cs new file mode 100644 index 0000000..c847efb --- /dev/null +++ b/TelpoPush.Position.Worker/Models/Enum/MqHeader.cs @@ -0,0 +1,26 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace TelpoPush.Position.Worker.Models.Enum +{ + public static class MqHeader + { + /// + /// DataType + /// + public const string DataType = "DataType"; + + /// + /// OperType + /// + public const string OperType = "OperType"; + + /// + /// AlarmType + /// + public const string AlarmTypes = "AlarmType"; + } +} diff --git a/TelpoPush.Position.Worker/Models/MqTemplates/BaseModel.cs b/TelpoPush.Position.Worker/Models/MqTemplates/BaseModel.cs new file mode 100644 index 0000000..c7415e4 --- /dev/null +++ b/TelpoPush.Position.Worker/Models/MqTemplates/BaseModel.cs @@ -0,0 +1,17 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace TelpoPush.Position.Worker.Models.MqTemplates +{ + public class BaseModel + { + public string messageId { get; set; } + public string topic { get; set; } + public string time { get; set; } + public object data { get; set; } + } + +} diff --git a/TelpoPush.Position.Worker/Models/MqTemplates/MqPositionTemplate.cs b/TelpoPush.Position.Worker/Models/MqTemplates/MqPositionTemplate.cs new file mode 100644 index 0000000..921704d --- /dev/null +++ b/TelpoPush.Position.Worker/Models/MqTemplates/MqPositionTemplate.cs @@ -0,0 +1,29 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace TelpoPush.Position.Worker.Models.MqTemplates +{ + public class MqPositionTemplate + { + public string imei { get; set; } + public int locationType { get; set; } + public string address { get; set; } + public int altitude { get; set; } + public decimal baiduLatitude { get; set; } + public decimal baiduLongitude { get; set; } + public decimal gaodeLatitude { get; set; } + public decimal gaodeLongitude { get; set; } + public decimal originalLatitude { get; set; } + public decimal originalLongitude { get; set; } + public string postcode { get; set; } + public string hashParam { get; set; } + public int radius { get; set; } + public string province { get; set; } + public string city { get; set; } + public string district { get; set; } + public string wifiMacs { get; set; } + } +} diff --git a/TelpoPush.Position.Worker/Models/PushTemplates/PushFencePositionTemplate.cs b/TelpoPush.Position.Worker/Models/PushTemplates/PushFencePositionTemplate.cs new file mode 100644 index 0000000..2818491 --- /dev/null +++ b/TelpoPush.Position.Worker/Models/PushTemplates/PushFencePositionTemplate.cs @@ -0,0 +1,35 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace TelpoPush.Position.Worker.Models.PushTemplates +{ + public class PushFencePositionTemplate + { + + public string messageId { get; set; } + public string topic { get; set; } + public string time { get; set; } + public data data { get; set; } + } + + public class data + { + public string DeviceId { get; set; } + public int Radius { get; set; } + public string imei { get; set; } + public string wifiInfo { get; set; } + public int locationType { get; set; } + public string address { get; set; } + public decimal baiduLatitude { get; set; } + public decimal baiduLongitude { get; set; } + public decimal gaodeLatitude { get; set; } + public decimal gaodeLongitude { get; set; } + public decimal originalLatitude { get; set; } + public decimal originalLongitude { get; set; } + public string LastUpdate { get; set; } + public string UtcDate { get; set; } + } +} diff --git a/TelpoPush.Position.Worker/Models/PushTemplates/PushPositionTemplate.cs b/TelpoPush.Position.Worker/Models/PushTemplates/PushPositionTemplate.cs new file mode 100644 index 0000000..3af8181 --- /dev/null +++ b/TelpoPush.Position.Worker/Models/PushTemplates/PushPositionTemplate.cs @@ -0,0 +1,73 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace TelpoPush.Position.Worker.Models.PushTemplates +{ + public class PushPositionTemplate + { + + /// + /// 服务跟踪Id + /// + public string MessageId { get; set; } + + /// + /// 海拔 + /// + public double Altitude { get; set; } + + /// + /// 定位类型 + /// + public int LocationType { get; set; } + /// + /// 设备IMEI + /// + public string Imei { get; set; } + + /// + /// 时间 + /// yyyy-MM-dd HH:mm:ss + /// + public string Time { get; set; } + + /// + /// 原始纬度 + /// + public decimal OriginalLatitude { get; set; } + + /// + /// 原始经度 + /// + public decimal OriginalLongitude { get; set; } + + /// + /// 百度地图纬度 + /// + public decimal BaiduLatitude { get; set; } + + /// + /// 百度地图经度 + /// + public decimal BaiduLongitude { get; set; } + + /// + /// 高德地图纬度 + /// + public decimal GaodeLatitude { get; set; } + + /// + /// 高德地图经度 + /// + public decimal GaodeLongitude { get; set; } + + /// + /// 地址 + /// + public string Address { get; set; } + public string wifiMacs { get; set; } + } +} diff --git a/TelpoPush.Position.Worker/Program.cs b/TelpoPush.Position.Worker/Program.cs index a33bec4..f1e32cf 100644 --- a/TelpoPush.Position.Worker/Program.cs +++ b/TelpoPush.Position.Worker/Program.cs @@ -1,9 +1,13 @@ -using Dapper; + using Newtonsoft.Json; using Newtonsoft.Json.Serialization; using Serilog; using TelpoDataService.Util.Clients; using TelpoPush.Position.Worker; +using TelpoPush.Position.Worker.Common; +using TelpoPush.Position.Worker.Handlers; +using TelpoPush.Position.Worker.Service.Cache; +using TelpoPush.Position.Worker.Service.Mq; #region ־ @@ -76,14 +80,14 @@ try builder.Services.AddSerilog(); builder.Services.AddHttpClient(); - //builder.Services.AddTransient(); - //builder.Services.AddSingleton(); - //builder.Services.AddSingleton(); - //builder.Services.AddSingleton(); - //builder.Services.AddSingleton(); - //builder.Services.AddSingleton(); - //builder.Services.AddSingleton(); - //builder.Services.AddSingleton(); + builder.Services.AddTransient(); + builder.Services.AddSingleton(); + builder.Services.AddSingleton(); + builder.Services.AddSingleton(); + builder.Services.AddSingleton(); + builder.Services.AddSingleton(); + builder.Services.AddSingleton(); + builder.Services.AddSingleton(); builder.Services.AddHostedService(); var host = builder.Build(); host.Run(); diff --git a/TelpoPush.Position.Worker/Service/Cache/MemoryCacheUtil.cs b/TelpoPush.Position.Worker/Service/Cache/MemoryCacheUtil.cs new file mode 100644 index 0000000..d10b2ad --- /dev/null +++ b/TelpoPush.Position.Worker/Service/Cache/MemoryCacheUtil.cs @@ -0,0 +1,82 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; +using Microsoft.Extensions.Caching.Memory; + +namespace TelpoPush.Position.Worker.Service.Cache +{ + internal class MemoryCacheUtil + { + static MemoryCache cache = new MemoryCache(new MemoryCacheOptions()); + /// + /// 创建缓存项的文件 + /// + /// 缓存Key + /// object对象 + public static void Set(string key, object value) + { + if (key != null) + { + cache.Set(key, value); + } + } + /// + /// 创建缓存项过期 + /// + /// 缓存Key + /// object对象 + /// 过期时间(秒) + public static void Set(string key, object value, int expires) + { + if (key != null) + { + cache.Set(key, value, new MemoryCacheEntryOptions() + //设置缓存时间,如果被访问重置缓存时间。设置相对过期时间x秒 + .SetSlidingExpiration(TimeSpan.FromSeconds(expires))); + } + } + + /// + /// 获取缓存对象 + /// + /// 缓存Key + /// object对象 + public static object Get(string key) + { + object val = null; + if (key != null && cache.TryGetValue(key, out val)) + { + + return val; + } + else + { + return default(object); + } + } + + /// + /// 获取缓存对象 + /// + /// T对象 + /// 缓存Key + /// + public static T Get(string key) + { + object obj = Get(key); + return obj == null ? default(T) : (T)obj; + } + + + /// + /// 移除缓存项的文件 + /// + /// 缓存Key + public static void Remove(string key) + { + cache.Remove(key); + } + } + } diff --git a/TelpoPush.Position.Worker/Service/Cache/RedisUtil.cs b/TelpoPush.Position.Worker/Service/Cache/RedisUtil.cs new file mode 100644 index 0000000..9af7fea --- /dev/null +++ b/TelpoPush.Position.Worker/Service/Cache/RedisUtil.cs @@ -0,0 +1,208 @@ +using Microsoft.Extensions.Options; +using TelpoPush.Position.Worker.Models.CacheTemplates; +using TelpoPush.Position.Worker.Models.Config; + +namespace TelpoPush.Position.Worker.Service.Cache +{ + public class RedisUtil + { + private const string CACHE_HASH_KEY_TELPO_GPSDEVICE = "TELPO#GPSDEVICE_HASH"; + private const string CACHE_HASH_KEY_TELPO_GPSDEVICE_PERSON = "TELPO#GPSDEVICE_PERSON_HASH"; + private const string CACHE_HASH_KEY_TELPO_MANUFACTOR_CONFIG = "TELPO#MANUFACTOR_CONFG_HASH"; + private const string CACHE_HASH_KEY_TELPO_GPSDEVICE_PUSHSITTTIGS = "TELPO#GPSDEVICE_PUSH_SITTINGS_HASH"; + + private readonly ILogger _logger; + private readonly ServiceConfig _configService; + private readonly SqlMapper _sqlMapper; + public RedisUtil(ILogger logger,IOptions optConfigRedis, IOptions configService, SqlMapper sqlMapper) + { + _configService = configService.Value; + _logger = logger; + optConfigRedis.Value.Prefix = ""; + optConfigRedis.Value.DefaultDatabase = 7; + var csredis = new CSRedis.CSRedisClient(optConfigRedis.Value.ToString()); + RedisHelper.Initialization(csredis); + _sqlMapper = sqlMapper; + } + + public async Task GetGpsDevice(string imei) + { + if (string.IsNullOrWhiteSpace(imei)) return null; + string keyCache = $"{imei}_GpsDevice"; + try + { + var objCache = MemoryCacheUtil.Get(keyCache); + if (objCache == null) + { + var obj = await RedisHelper.HGetAsync(CACHE_HASH_KEY_TELPO_GPSDEVICE, imei); + if (obj == null) + { + var deviceInfo = _sqlMapper.DeviceInfo(imei); + if (deviceInfo != null) + { + RedisHelper.HSetAsync(CACHE_HASH_KEY_TELPO_GPSDEVICE, imei, deviceInfo); + MemoryCacheUtil.Set(keyCache, deviceInfo, _configService.CacheDurationSeconds10); + return deviceInfo; + } + } + else + { + MemoryCacheUtil.Set(keyCache, obj, _configService.CacheDurationSeconds10); + return obj; + } + } + return objCache; + } + catch (Exception ex) + { + _logger.LogError($"GetGpsDevice,key={imei},缓存异常,重新获取数据,{ex.Message}|{ex.Source}|{ex.StackTrace}"); + var deviceInfo = _sqlMapper.DeviceInfo(imei); + if (deviceInfo != null) + { + RedisHelper.HSetAsync(CACHE_HASH_KEY_TELPO_GPSDEVICE, imei, deviceInfo); + MemoryCacheUtil.Set(keyCache, deviceInfo, _configService.CacheDurationSeconds10); + return deviceInfo; + } + } + return null; + } + + public async Task SetPersonInfoHash(string imei) + { + if (string.IsNullOrWhiteSpace(imei)) return null; + string keyCache = $"{imei}_PersonInfoHash"; + var personInfo = _sqlMapper.PersonInfo(imei); + PersonInfoModel model = new PersonInfoModel() + { + person = personInfo, + time = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss") + }; + if (personInfo != null) + { + await RedisHelper.HSetAsync(CACHE_HASH_KEY_TELPO_GPSDEVICE_PERSON, imei, model); + MemoryCacheUtil.Set(keyCache, model, 60);//1分钟 + } + else + { + await RedisHelper.HDelAsync(CACHE_HASH_KEY_TELPO_GPSDEVICE_PERSON, imei); + MemoryCacheUtil.Remove(keyCache); + } + return model; + } + + public async Task GetHealthyDeviceKey(string imei) + { + if (string.IsNullOrWhiteSpace(imei)) return null; + string HealthyDeviceKey = $"Telpol:HealthyDeviceKey:{imei}"; + string keyCache = $"{imei}_HealthyDeviceKey"; + try + { + var objCache = MemoryCacheUtil.Get(keyCache); + if (objCache == null) + { + string deviceKey = await RedisHelper.HGetAsync(HealthyDeviceKey, "data"); + if (!string.IsNullOrEmpty(deviceKey)) + { + MemoryCacheUtil.Set(keyCache, deviceKey, _configService.CacheDurationSeconds10); + return deviceKey; + } + } + return objCache; + } + catch (Exception ex) + { + _logger.LogError($"GetHealthyDeviceKey,key={imei},缓存异常,重新获取数据,{ex.Message}|{ex.Source}|{ex.StackTrace}"); + string deviceKey = await RedisHelper.HGetAsync(HealthyDeviceKey, "data"); + if (!string.IsNullOrEmpty(deviceKey)) + { + MemoryCacheUtil.Set(keyCache, deviceKey, _configService.CacheDurationSeconds10); + return deviceKey; + } + else + return ""; + } + } + + public async Task GetManufactorPushSettingHash(string imei, string manufactorId, int dataType) + { + if (string.IsNullOrWhiteSpace(manufactorId)) return null; + string keyCache = $"{manufactorId}_{dataType}_ManufactorPushSettingHash"; + SettingInfosItem settingInfos = null; + try + { + var objCache = MemoryCacheUtil.Get(keyCache); + if (objCache == null) + { + var obj = await RedisHelper.HGetAsync(CACHE_HASH_KEY_TELPO_GPSDEVICE_PUSHSITTTIGS, manufactorId); + if (obj != null) + { + if (obj.pushSettings.Any()) + { + DateTime dt = DateTime.Now; + var settingsItem = obj.pushSettings.FirstOrDefault(x => x.dataType == dataType); + if (settingsItem != null) + { + if (settingsItem.settingInfos.Any()) + { + foreach (var item in settingsItem.settingInfos) + { + DateTime startTime = DateTime.Parse($"{dt.Year}-{dt.Month}-{dt.Day} {item.pushStartTime}"); + DateTime endTime = DateTime.Parse($"{dt.Year}-{dt.Month}-{dt.Day} {item.pushEndTime}"); + + if (item.pushFlag && (dt > startTime && dt < endTime)) + { + settingInfos = item; + break; + } + } + } + } + } + if (settingInfos != null) + MemoryCacheUtil.Set(keyCache, settingInfos, _configService.CacheDurationSeconds10); + else + MemoryCacheUtil.Remove(keyCache); + } + } + else + settingInfos = objCache; + } + catch (Exception ex) + { + _logger.LogError($"GetManufactorPushSettingHash(imei={imei},dataType={dataType}),key={manufactorId},缓存异常,重新获取数据,{ex.Message}|{ex.Source}|{ex.StackTrace}"); + var obj = await RedisHelper.HGetAsync(CACHE_HASH_KEY_TELPO_GPSDEVICE_PUSHSITTTIGS, manufactorId); + if (obj != null) + { + if (obj.pushSettings.Any()) + { + DateTime dt = DateTime.Now; + var settingsItem = obj.pushSettings.FirstOrDefault(x => x.dataType == dataType); + if (settingsItem != null) + { + if (settingsItem.settingInfos.Any()) + { + foreach (var item in settingsItem.settingInfos) + { + DateTime startTime = DateTime.Parse($"{dt.Year}-{dt.Month}-{dt.Day} {item.pushStartTime}"); + DateTime endTime = DateTime.Parse($"{dt.Year}-{dt.Month}-{dt.Day} {item.pushEndTime}"); + + if (item.pushFlag && (dt > startTime && dt < endTime)) + { + settingInfos = item; + break; + } + } + } + } + } + if (settingInfos != null) + MemoryCacheUtil.Set(keyCache, settingInfos, _configService.CacheDurationSeconds10); + else + MemoryCacheUtil.Remove(keyCache); + } + } + return settingInfos; + } + + } +} diff --git a/TelpoPush.Position.Worker/Service/Cache/SqlMapper.cs b/TelpoPush.Position.Worker/Service/Cache/SqlMapper.cs new file mode 100644 index 0000000..7805893 --- /dev/null +++ b/TelpoPush.Position.Worker/Service/Cache/SqlMapper.cs @@ -0,0 +1,39 @@ +using Dapper; +using MySql.Data.MySqlClient; +using System.Data; +using TelpoPush.Position.Worker.Models.CacheTemplates; + +namespace TelpoPush.Position.Worker.Service.Cache +{ + public class SqlMapper + { + private readonly IConfiguration _config; + private static string gps_conn = ""; + private static string telcommon_conn = ""; + private static string healthy_conn = ""; + public SqlMapper(IConfiguration config) + { + _config = config; + gps_conn = _config["ConnectionStrings:DB_Connection_String"].ToString(); + telcommon_conn = _config["ConnectionStrings:Telpo_common_ConnString"].ToString(); + healthy_conn = _config["ConnectionStrings:Telpo_Healthy_ConnString"].ToString(); + } + + public DeviceInfoModel DeviceInfo(string imei) + { + using (IDbConnection connection = new MySqlConnection(gps_conn)) + { + var sql = @"SELECT d.device_id deviceId, p.nick_name deviceName,p.api_uid apiUid,d.serialno imei, d.org_uid orgId, d.active_status activeStatus,active_time activeTime FROM gps_device as d LEFT JOIN `gps_person` as p on p.device_id=d.device_id WHERE d.serialno=@imei"; + return connection.QueryFirstOrDefault(sql, new { imei }); + } + } + public PersonModel PersonInfo(string imei) + { + using (IDbConnection connection = new MySqlConnection(gps_conn)) + { + var sql = @"SELECT person_id personId, serialno, person_name personName, device_id deviceId, nick_name nickName, gender, height, weight, born_date bornDate, school, grade, class_name className, image_path imagePath,image_path_small imagePathSmall, age, create_time createTime, remarks, ishypertension, emotion,profession,regularity,chronic_disease chronicDisease,api_uid apiUid,api_remark apiRemark FROM gps_person WHERE serialno=@imei"; + return connection.QueryFirstOrDefault(sql, new { imei }); + } + } + } +} diff --git a/TelpoPush.Position.Worker/Service/Mq/IKafkaService.cs b/TelpoPush.Position.Worker/Service/Mq/IKafkaService.cs new file mode 100644 index 0000000..4481052 --- /dev/null +++ b/TelpoPush.Position.Worker/Service/Mq/IKafkaService.cs @@ -0,0 +1,9 @@ +using Confluent.Kafka; + +namespace TelpoPush.Position.Worker.Service.Mq +{ + public interface IKafkaService + { + Task SubscribeAsync(Action messageFunc, CancellationToken cancellationToken); + } +} diff --git a/TelpoPush.Position.Worker/Service/Mq/KafkaHeader.cs b/TelpoPush.Position.Worker/Service/Mq/KafkaHeader.cs new file mode 100644 index 0000000..b236389 --- /dev/null +++ b/TelpoPush.Position.Worker/Service/Mq/KafkaHeader.cs @@ -0,0 +1,15 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace TelpoPush.Position.Worker.Service.Mq +{ + public class KafkaHeader + { + public static readonly string DataType = "DataType"; + public static readonly string AlarmType = "AlarmType"; + public static readonly string OperType = "OperType"; + } +} diff --git a/TelpoPush.Position.Worker/Service/Mq/KafkaService.cs b/TelpoPush.Position.Worker/Service/Mq/KafkaService.cs new file mode 100644 index 0000000..caa80f3 --- /dev/null +++ b/TelpoPush.Position.Worker/Service/Mq/KafkaService.cs @@ -0,0 +1,135 @@ +using Confluent.Kafka; +using Microsoft.Extensions.Options; +using Newtonsoft.Json; +using TelpoPush.Position.Worker.Models.Config; + +namespace TelpoPush.Position.Worker.Service.Mq +{ + public class KafkaService : IKafkaService + { + private readonly ConsumerConfig _consumerConfig; + private readonly IHostEnvironment env; + private readonly ILogger logger; + private readonly ServiceConfig _configService; + public KafkaService(ILogger _logger, IHostEnvironment _env, IOptions optConfigService) + { + _configService = optConfigService.Value; + env = _env; + logger = _logger; + + _consumerConfig = new ConsumerConfig + { + BootstrapServers = _configService.KafkaBootstrapServers, + GroupId = _configService.KafkaGroupId, + EnableAutoCommit = false, + StatisticsIntervalMs = 5000, + SessionTimeoutMs = 6000, + AutoOffsetReset = AutoOffsetReset.Earliest, + EnablePartitionEof = true, + CancellationDelayMaxMs=1 + + }; + } + + public async Task SubscribeAsync(Action messageFunc, CancellationToken cancellationToken) + { + List topics = _configService.KafkaTopics; + using (var consumer = new ConsumerBuilder(_consumerConfig) + .SetErrorHandler((_, e) => + { + Console.WriteLine($"Error: {e.Reason}"); + logger.LogError($"Error: {e.Reason}"); + }) + .SetStatisticsHandler((_, json) => + { + Console.WriteLine($" - {DateTime.Now:yyyy-MM-dd HH:mm:ss} > 消息监听中.."); + logger.LogInformation($" - {DateTime.Now:yyyy-MM-dd HH:mm:ss} > 消息监听中.."); + }) + .SetPartitionsAssignedHandler((c, partitions) => + { + string partitionsStr = string.Join(", ", partitions); + logger.LogInformation($" - 分配的 kafka 分区: {partitionsStr}"); + }) + .SetPartitionsRevokedHandler((c, partitions) => + { + string partitionsStr = string.Join(", ", partitions); + Console.WriteLine($" - 回收了 kafka 的分区: {partitionsStr}"); + logger.LogInformation($" - 回收了 kafka 分区: {partitionsStr}"); + }) + .Build()) + { + + consumer.Subscribe(topics); + try + { + while (true) + { + try + { + var consumeResult = consumer.Consume(cancellationToken); + int DataType = -1, AlarmType = -1, OperType = -1; + foreach (var item in consumeResult?.Headers) + { + if (item.Key == KafkaHeader.DataType) + DataType = BitConverter.ToInt32(item.GetValueBytes(), 0); + else if (item.Key == KafkaHeader.AlarmType) + AlarmType = BitConverter.ToInt32(item.GetValueBytes(), 0); + else if (item.Key == KafkaHeader.OperType) + OperType = BitConverter.ToInt32(item.GetValueBytes(), 0); + } + var Headers = new { DataType, AlarmType, OperType }; + logger.LogInformation($"Consumed topic '{consumeResult.Topic}', message '{consumeResult.Message?.Value}' , headers '{JsonConvert.SerializeObject(Headers)}', at: '{consumeResult?.TopicPartitionOffset}'."); + if (consumeResult.IsPartitionEOF) + { + logger.LogInformation($" - {DateTime.Now:yyyy-MM-dd HH:mm:ss} 已经到底了:{consumeResult.Topic}, partition {consumeResult.Partition}, offset {consumeResult.Offset}."); + Console.WriteLine($" - {DateTime.Now:yyyy-MM-dd HH:mm:ss} 已经到底了:{consumeResult.Topic}, partition {consumeResult.Partition}, offset {consumeResult.Offset}."); + continue; + } + + string messageResult = null; + Headers headers = null; + try + { + messageResult = consumeResult.Message.Value; + headers = consumeResult.Message.Headers; + } + catch (Exception ex) + { + var errorMessage = $" - {DateTime.Now:yyyy-MM-dd HH:mm:ss}【Exception 消息反序列化失败,Value:{consumeResult.Message.Value}】 :{ex.StackTrace?.ToString()}"; + Console.WriteLine(errorMessage); + logger.LogError(errorMessage); + messageResult = null; + } + if (!string.IsNullOrEmpty(messageResult)/* && consumeResult.Offset % commitPeriod == 0*/) + { + string topic = consumeResult.Topic; + messageFunc(topic, messageResult, headers); + + try + { + consumer.Commit(consumeResult); + } + catch (KafkaException e) + { + Console.WriteLine(e.Message); + } + } + } + catch (ConsumeException e) + { + logger.LogError($"Consume error: {e.Error.Reason}"); + Console.WriteLine($"Consume error: {e.Error.Reason}"); + } + } + } + catch (OperationCanceledException) + { + logger.LogError("Closing consumer."); + Console.WriteLine("Closing consumer."); + consumer.Close(); + } + } + await Task.CompletedTask; + } + } +} diff --git a/TelpoPush.Position.Worker/Service/Mq/MessageProducer.cs b/TelpoPush.Position.Worker/Service/Mq/MessageProducer.cs new file mode 100644 index 0000000..32d33ec --- /dev/null +++ b/TelpoPush.Position.Worker/Service/Mq/MessageProducer.cs @@ -0,0 +1,77 @@ +using Confluent.Kafka; +using Microsoft.Extensions.Options; +using Newtonsoft.Json; +using TelpoPush.Position.Worker.Models.Config; + +namespace TelpoPush.Position.Worker.Service.Mq +{ + /// + /// 消息生产者 + /// + public class MessageProducer + { + private readonly ILogger _logger; + private readonly ServiceConfig _configService; + private readonly IProducer _producer; + + public MessageProducer(ILogger logger, IOptions optConfigService) + { + _logger = logger; + _configService = optConfigService.Value; + + var config = new ProducerConfig + { + BootstrapServers = _configService.KafkaBootstrapServers, + EnableIdempotence = true, + Acks = Acks.All, + //LingerMs=5000, + //BatchNumMessages =1000, + //BatchSize=32768, + //CompressionType= CompressionType.Lz4, + MessageSendMaxRetries = 3 + }; + _producer = new ProducerBuilder(config).Build(); + } + + public Headers CreateHeader(Dictionary pair = null) + { + if (pair == null) + { + return null; + } + else + { + Headers headers = new Headers(); + + foreach (var item in pair) + { + headers.Add(item.Key, BitConverter.GetBytes(item.Value)); + } + return headers; + } + } + + public async Task ProduceAsync(List topic, object message) + { + try + { + foreach (var item in topic) + { + // producer = new ProducerBuilder(config).Build(); + await _producer.ProduceAsync(item.Topic, new Message + { + Headers = item.Headers, + Value = JsonConvert.SerializeObject(message) + }); + } + } + catch (ProduceException e) + { + _logger.LogError($"推送到kafka失败,topic: {topic},\n message:{JsonConvert.SerializeObject(message)}: \n{e.Error.Reason}"); + } + } + + + + } +} diff --git a/TelpoPush.Position.Worker/Service/Mq/MqProcessMessage.cs b/TelpoPush.Position.Worker/Service/Mq/MqProcessMessage.cs new file mode 100644 index 0000000..b0ec173 --- /dev/null +++ b/TelpoPush.Position.Worker/Service/Mq/MqProcessMessage.cs @@ -0,0 +1,70 @@ +using Newtonsoft.Json; +using TelpoPush.Position.Worker.Models.Enum; +using TelpoPush.Position.Worker.Models.MqTemplates; + +namespace TelpoPush.Position.Worker.Service.Mq +{ + public class MqProcessMessage + { + private readonly ILogger _logger; + private readonly MessageProducer _messageProducer; + + public MqProcessMessage(ILogger logger, MessageProducer producer) + { + _logger = logger; + _messageProducer = producer; + } + + public async Task ProcessProperty(string imei, BaseModel model, HeadersDto headers) + { + List ls = new List(); + + ls.Add(new TopicModel() + { + Topic = "topic.push.property", + Headers = _messageProducer.CreateHeader(new Dictionary + { + {MqHeader.DataType,headers.DataType.Value } + }) + }); + await _messageProducer.ProduceAsync(ls, model); + _logger.LogInformation($"【成功】Third推送(topic.property):IMEI<{imei}>,pushData:{JsonConvert.SerializeObject(model)}"); + } + + public async Task ProcessDataPushServer(string imei, object model, Dictionary headers, string tag) + { + List ls = new List(); + ls.Add(new TopicModel() + { + Topic = "topic.push", + Headers = _messageProducer.CreateHeader(headers) + }); + await _messageProducer.ProduceAsync(ls, model); + _logger.LogInformation($"【{tag}-成功】Third推送(topic.push):IMEI<{imei}>,Header<{JsonConvert.SerializeObject(headers)}>,pushData:{JsonConvert.SerializeObject(model)}"); + } + + public async Task ProcessFenceServer(string imei, object model, Dictionary headers, string tag) + { + List ls = new List(); + ls.Add(new TopicModel() + { + Topic = "topic.push.fence", + Headers = _messageProducer.CreateHeader(headers) + }); + await _messageProducer.ProduceAsync(ls, model); + _logger.LogInformation($"【{tag}-成功】Third推送(topic.push.fence):IMEI<{imei}>,Header<{JsonConvert.SerializeObject(headers)}>,pushData:{JsonConvert.SerializeObject(model)}"); + } + + public async Task ProcessThirdhServer(string imei, object model, Dictionary headers, string tag) + { + List ls = new List(); + ls.Add(new TopicModel() + { + Topic = "topic.push.third", + Headers = _messageProducer.CreateHeader(headers) + }); + await _messageProducer.ProduceAsync(ls, model); + _logger.LogInformation($"【{tag}-成功】Third推送(topic.push.third):IMEI<{imei}>,Header<{JsonConvert.SerializeObject(headers)}>,pushData:{JsonConvert.SerializeObject(model)}"); + } + } +} diff --git a/TelpoPush.Position.Worker/Service/Mq/TopicModel.cs b/TelpoPush.Position.Worker/Service/Mq/TopicModel.cs new file mode 100644 index 0000000..9aa1910 --- /dev/null +++ b/TelpoPush.Position.Worker/Service/Mq/TopicModel.cs @@ -0,0 +1,15 @@ +using Confluent.Kafka; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace TelpoPush.Position.Worker.Service.Mq +{ + public class TopicModel + { + public string Topic { get; set; } + public Headers Headers { get; set; } + } +} diff --git a/TelpoPush.Position.Worker/TelpoPush.Position.Worker.csproj b/TelpoPush.Position.Worker/TelpoPush.Position.Worker.csproj index 1440c91..7f9b487 100644 --- a/TelpoPush.Position.Worker/TelpoPush.Position.Worker.csproj +++ b/TelpoPush.Position.Worker/TelpoPush.Position.Worker.csproj @@ -23,4 +23,8 @@ + + + + diff --git a/TelpoPush.Position.Worker/Worker.cs b/TelpoPush.Position.Worker/Worker.cs index 42d0d80..b4ec66d 100644 --- a/TelpoPush.Position.Worker/Worker.cs +++ b/TelpoPush.Position.Worker/Worker.cs @@ -1,22 +1,23 @@ +using TelpoPush.Position.Worker.Handlers; + namespace TelpoPush.Position.Worker { public class Worker : BackgroundService { private readonly ILogger _logger; + KafkaSubscribe _kafkaSubscribe; - public Worker(ILogger logger) + public Worker(ILogger logger, KafkaSubscribe kafkaSubscribe) { _logger = logger; + _kafkaSubscribe = kafkaSubscribe; } protected override async Task ExecuteAsync(CancellationToken stoppingToken) { while (!stoppingToken.IsCancellationRequested) { - if (_logger.IsEnabled(LogLevel.Information)) - { - _logger.LogInformation("Worker running at: {time}", DateTimeOffset.Now); - } + await _kafkaSubscribe.SubscribeAsync(); await Task.Delay(1000, stoppingToken); } }