From 5b84e064cc8d03898f343fbcebbef492762d60fc Mon Sep 17 00:00:00 2001 From: H Vs Date: Tue, 8 Apr 2025 14:42:24 +0800 Subject: [PATCH] =?UTF-8?q?=E8=B0=83=E6=95=B4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- celery_app.py | 4 ++-- services/gewe_service.py | 2 +- services/kafka_service.py | 3 +++ services/redis_service.py | 11 ++--------- tasks.py | 3 ++- 5 files changed, 10 insertions(+), 13 deletions(-) diff --git a/celery_app.py b/celery_app.py index 224d966..694a466 100644 --- a/celery_app.py +++ b/celery_app.py @@ -30,13 +30,13 @@ environment = os.environ.get('environment', 'default') if environment == 'production': scheduled_task_sync_wx_info_interval = 60*11 - scheduled_task_add_contacts_from_chatrooms_interval = 3600*2 + scheduled_task_add_contacts_from_chatrooms_interval = 60*11 elif environment == 'test': scheduled_task_sync_wx_info_interval = 60*11 scheduled_task_add_contacts_from_chatrooms_interval = 60*11 else: scheduled_task_sync_wx_info_interval = 6000 - scheduled_task_add_contacts_from_chatrooms_interval=6000 + scheduled_task_add_contacts_from_chatrooms_interval=60 # 定义定时任务列表 (任务 ID, 任务名称, 执行间隔秒, 任务参数) diff --git a/services/gewe_service.py b/services/gewe_service.py index 89d228a..fb6ad8e 100644 --- a/services/gewe_service.py +++ b/services/gewe_service.py @@ -1388,7 +1388,7 @@ class GeWeService: async def save_task_run_time_async(self,task_name,log:list,expire_time=None): hash_key = f"__AI_OPS_WX__:TASK_RUN_TIME:{task_name}" - await self.redis_service.set_hash(hash_key, "data,", json.dumps(log, ensure_ascii=False), expire_time) + await self.redis_service.set_hash(hash_key,{"data": json.dumps(log, ensure_ascii=False)}, expire_time) async def get_task_run_time_async(self,task_name)->list: hash_key = f"__AI_OPS_WX__:TASK_RUN_TIME:{task_name}" diff --git a/services/kafka_service.py b/services/kafka_service.py index 36035b8..ebafc81 100644 --- a/services/kafka_service.py +++ b/services/kafka_service.py @@ -168,6 +168,9 @@ class KafkaService: """Start both producer and consumer""" await self.connect_producer() + async def stop_producer(self): + if self.producer: + await self.producer.stop() async def stop(self): """Graceful shutdown""" diff --git a/services/redis_service.py b/services/redis_service.py index 1d5c459..865f758 100644 --- a/services/redis_service.py +++ b/services/redis_service.py @@ -25,19 +25,12 @@ class RedisService: async def set_hash(self, hash_key, data, timeout=None): """添加或更新哈希,并设置有效期""" - await self.client.hmset_dict(hash_key, data) + # 使用 hmset 方法设置哈希表数据 + await self.client.hmset(hash_key, data) if timeout: # 设置有效期(单位:秒) await self.client.expire(hash_key, timeout) - # async def set_hash(self, hash_key, data, timeout=None): - # """添加或更新哈希,并设置有效期""" - # # 使用 hmset 方法设置哈希表数据 - # await self.client.hmset(hash_key, data) - # if timeout: - # # 设置有效期(单位:秒) - # await self.client.expire(hash_key, timeout) - # async def set_hash(self, hash_key, data, timeout=None): # """添加或更新哈希,并设置有效期""" diff --git a/tasks.py b/tasks.py index 124bbcd..d753677 100644 --- a/tasks.py +++ b/tasks.py @@ -732,13 +732,14 @@ def scheduled_task_add_contacts_from_chatrooms(self, redis_config, kafka_config, k_message = wx_add_contacts_from_chatroom_message(history.wxid,history.chatroomId,history.contactWixd,history.addTime) await kafka_service.send_message_async(k_message) - await asyncio.sleep(random.uniform(1.5, 3)) await asyncio.sleep(random.uniform(1.5, 3)) except Exception as e: logger.error(f"任务执行过程中发生异常: {e}") + finally: + await kafka_service.stop_producer() loop = asyncio.get_event_loop() if loop.is_closed():