@@ -30,13 +30,13 @@ environment = os.environ.get('environment', 'default') | |||||
if environment == 'production': | if environment == 'production': | ||||
scheduled_task_sync_wx_info_interval = 60*11 | scheduled_task_sync_wx_info_interval = 60*11 | ||||
scheduled_task_add_contacts_from_chatrooms_interval = 3600*2 | |||||
scheduled_task_add_contacts_from_chatrooms_interval = 60*11 | |||||
elif environment == 'test': | elif environment == 'test': | ||||
scheduled_task_sync_wx_info_interval = 60*11 | scheduled_task_sync_wx_info_interval = 60*11 | ||||
scheduled_task_add_contacts_from_chatrooms_interval = 60*11 | scheduled_task_add_contacts_from_chatrooms_interval = 60*11 | ||||
else: | else: | ||||
scheduled_task_sync_wx_info_interval = 6000 | scheduled_task_sync_wx_info_interval = 6000 | ||||
scheduled_task_add_contacts_from_chatrooms_interval=6000 | |||||
scheduled_task_add_contacts_from_chatrooms_interval=60 | |||||
# 定义定时任务列表 (任务 ID, 任务名称, 执行间隔秒, 任务参数) | # 定义定时任务列表 (任务 ID, 任务名称, 执行间隔秒, 任务参数) | ||||
@@ -1388,7 +1388,7 @@ class GeWeService: | |||||
async def save_task_run_time_async(self,task_name,log:list,expire_time=None): | async def save_task_run_time_async(self,task_name,log:list,expire_time=None): | ||||
hash_key = f"__AI_OPS_WX__:TASK_RUN_TIME:{task_name}" | hash_key = f"__AI_OPS_WX__:TASK_RUN_TIME:{task_name}" | ||||
await self.redis_service.set_hash(hash_key, "data,", json.dumps(log, ensure_ascii=False), expire_time) | |||||
await self.redis_service.set_hash(hash_key,{"data": json.dumps(log, ensure_ascii=False)}, expire_time) | |||||
async def get_task_run_time_async(self,task_name)->list: | async def get_task_run_time_async(self,task_name)->list: | ||||
hash_key = f"__AI_OPS_WX__:TASK_RUN_TIME:{task_name}" | hash_key = f"__AI_OPS_WX__:TASK_RUN_TIME:{task_name}" | ||||
@@ -168,6 +168,9 @@ class KafkaService: | |||||
"""Start both producer and consumer""" | """Start both producer and consumer""" | ||||
await self.connect_producer() | await self.connect_producer() | ||||
async def stop_producer(self): | |||||
if self.producer: | |||||
await self.producer.stop() | |||||
async def stop(self): | async def stop(self): | ||||
"""Graceful shutdown""" | """Graceful shutdown""" | ||||
@@ -25,19 +25,12 @@ class RedisService: | |||||
async def set_hash(self, hash_key, data, timeout=None): | async def set_hash(self, hash_key, data, timeout=None): | ||||
"""添加或更新哈希,并设置有效期""" | """添加或更新哈希,并设置有效期""" | ||||
await self.client.hmset_dict(hash_key, data) | |||||
# 使用 hmset 方法设置哈希表数据 | |||||
await self.client.hmset(hash_key, data) | |||||
if timeout: | if timeout: | ||||
# 设置有效期(单位:秒) | # 设置有效期(单位:秒) | ||||
await self.client.expire(hash_key, timeout) | await self.client.expire(hash_key, timeout) | ||||
# async def set_hash(self, hash_key, data, timeout=None): | |||||
# """添加或更新哈希,并设置有效期""" | |||||
# # 使用 hmset 方法设置哈希表数据 | |||||
# await self.client.hmset(hash_key, data) | |||||
# if timeout: | |||||
# # 设置有效期(单位:秒) | |||||
# await self.client.expire(hash_key, timeout) | |||||
# async def set_hash(self, hash_key, data, timeout=None): | # async def set_hash(self, hash_key, data, timeout=None): | ||||
# """添加或更新哈希,并设置有效期""" | # """添加或更新哈希,并设置有效期""" | ||||
@@ -732,13 +732,14 @@ def scheduled_task_add_contacts_from_chatrooms(self, redis_config, kafka_config, | |||||
k_message = wx_add_contacts_from_chatroom_message(history.wxid,history.chatroomId,history.contactWixd,history.addTime) | k_message = wx_add_contacts_from_chatroom_message(history.wxid,history.chatroomId,history.contactWixd,history.addTime) | ||||
await kafka_service.send_message_async(k_message) | await kafka_service.send_message_async(k_message) | ||||
await asyncio.sleep(random.uniform(1.5, 3)) | await asyncio.sleep(random.uniform(1.5, 3)) | ||||
await asyncio.sleep(random.uniform(1.5, 3)) | await asyncio.sleep(random.uniform(1.5, 3)) | ||||
except Exception as e: | except Exception as e: | ||||
logger.error(f"任务执行过程中发生异常: {e}") | logger.error(f"任务执行过程中发生异常: {e}") | ||||
finally: | |||||
await kafka_service.stop_producer() | |||||
loop = asyncio.get_event_loop() | loop = asyncio.get_event_loop() | ||||
if loop.is_closed(): | if loop.is_closed(): | ||||