From 12a3358538576ab2c4651f2b07eb0161bd4cfd19 Mon Sep 17 00:00:00 2001 From: H Vs Date: Mon, 31 Mar 2025 15:02:48 +0800 Subject: [PATCH] =?UTF-8?q?=E8=B0=83=E6=95=B4=E8=A7=86=E9=A2=91=E8=BD=AC?= =?UTF-8?q?=E5=8F=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .gitignore | 2 +- celery_app.py | 245 ++++++++++++++++++++++++++++---------- celery_config.py | 55 +++++++++ run.py | 48 +++++++- services/biz_service.py | 4 +- services/gewe_service.py | 4 +- services/redis_service.py | 32 +++++ tasks.py | 113 +++++++++++++++++- 8 files changed, 433 insertions(+), 70 deletions(-) create mode 100644 celery_config.py diff --git a/.gitignore b/.gitignore index 2732b7a..9f8ccd6 100644 --- a/.gitignore +++ b/.gitignore @@ -36,4 +36,4 @@ client_config.json !plugins.json tmp/ logs/ -cmd.txt \ No newline at end of file +*.txt \ No newline at end of file diff --git a/celery_app.py b/celery_app.py index 0589f84..7788fe4 100644 --- a/celery_app.py +++ b/celery_app.py @@ -1,46 +1,50 @@ -from celery import Celery +#from celery import Celery import celery.schedules from redbeat import RedBeatSchedulerEntry from datetime import timedelta +from celery.schedules import crontab +from celery_config import * from services.redis_service import RedisService from services.kafka_service import KafkaService from services.biz_service import BizService -from config import load_config,conf +#from config import load_config,conf from urllib.parse import quote -import asyncio,os - - -load_config() - -KAFKA_BOOTSTRAP_SERVERS = conf().get("kafka_bootstrap_servers") -KAFKA_TOPIC = 'topic.ai.ops.wx' -KAFKA_GROUP_ID = 'ai-ops-wx' - -redis_host=conf().get("redis_host") -redis_port=conf().get("redis_port") -redis_password=conf().get("redis_password") -redis_db=conf().get("redis_db") -encoded_password = quote(redis_password) -# 配置 Celery -celery_app = Celery( - "worker", - broker=f"redis://:{encoded_password}@{redis_host}:{redis_port}/{redis_db}", - backend=f"redis://:{encoded_password}@{redis_host}:{redis_port}/{redis_db}", - include=['tasks'] -) - -# 配置 redbeat 作为 Celery Beat 调度器 -celery_app.conf.update( - timezone="Asia/Shanghai", # 设定时区 - beat_scheduler="redbeat.RedBeatScheduler", # 使用 RedBeat 作为调度器 - redbeat_redis_url=f"redis://:{encoded_password}@{redis_host}:{redis_port}/{redis_db}" # redbeat 存储任务调度信息的 Redis - , - redbeat_lock_timeout=60, # 避免多个 Beat 实例冲突 - beat_max_loop_interval=5 # 让 Celery Beat 每 5 秒检查一次任务 -) +import asyncio,os,random,sys +#from tasks import add_friends_task +from common.log import logger + + +# load_config() + +# KAFKA_BOOTSTRAP_SERVERS = conf().get("kafka_bootstrap_servers") +# KAFKA_TOPIC = 'topic.ai.ops.wx' +# KAFKA_GROUP_ID = 'ai-ops-wx' + +# redis_host=conf().get("redis_host") +# redis_port=conf().get("redis_port") +# redis_password=conf().get("redis_password") +# redis_db=conf().get("redis_db") +# encoded_password = quote(redis_password) +# # 配置 Celery +# celery_app = Celery( +# "worker", +# broker=f"redis://:{encoded_password}@{redis_host}:{redis_port}/{redis_db}", +# backend=f"redis://:{encoded_password}@{redis_host}:{redis_port}/{redis_db}", +# include=['tasks'] +# ) + +# # 配置 redbeat 作为 Celery Beat 调度器 +# celery_app.conf.update( +# timezone="Asia/Shanghai", # 设定时区 +# beat_scheduler="redbeat.RedBeatScheduler", # 使用 RedBeat 作为调度器 +# redbeat_redis_url=f"redis://:{encoded_password}@{redis_host}:{redis_port}/{redis_db}" # redbeat 存储任务调度信息的 Redis +# , +# redbeat_lock_timeout=60, # 避免多个 Beat 实例冲突 +# beat_max_loop_interval=5 # 让 Celery Beat 每 5 秒检查一次任务 +# ) # task_name = "tasks.scheduled_task" # # 任务执行间隔(每 10 秒执行一次) @@ -76,46 +80,165 @@ celery_app.conf.update( # ) # redbeat_entry.save() -# 获取配置文件中的 redis_config、kafka_config、gewe_config -redis_config = { - 'host': redis_host, - 'port': redis_port, - 'password': redis_password, - 'db': redis_db, -} - - -kafka_config = { - 'bootstrap_servers': KAFKA_BOOTSTRAP_SERVERS, - 'topic': KAFKA_TOPIC, - 'group_id': KAFKA_GROUP_ID, -} -gewe_config = { - 'api_url': "http://api.geweapi.com/gewe", -} - -scheduled_task_sync_wx_info_interval = 10 +# # 获取配置文件中的 redis_config、kafka_config、gewe_config +# redis_config = { +# 'host': redis_host, +# 'port': redis_port, +# 'password': redis_password, +# 'db': redis_db, +# } + + +# kafka_config = { +# 'bootstrap_servers': KAFKA_BOOTSTRAP_SERVERS, +# 'topic': KAFKA_TOPIC, +# 'group_id': KAFKA_GROUP_ID, +# } +# gewe_config = { +# 'api_url': "http://api.geweapi.com/gewe", +# } + +scheduled_task_sync_wx_info_interval = 6000 environment = os.environ.get('environment', 'default') if environment != 'default': scheduled_task_sync_wx_info_interval = 60*11 + # 定义定时任务列表 (任务 ID, 任务名称, 执行间隔秒, 任务参数) -tasks_schedule = [ - #("redbeat:scheduled_task", "tasks.scheduled_task", 3, []), - #("redbeat:scheduled_task_sync_wx", "tasks.scheduled_task_sync_wx", 15, []), - ("redbeat:scheduled_task_sync_wx_info", "tasks.scheduled_task_sync_wx_info", scheduled_task_sync_wx_info_interval, [redis_config, kafka_config, gewe_config]), +# tasks_schedule = [ +# #("redbeat:scheduled_task", "tasks.scheduled_task", 3, []), +# #("redbeat:scheduled_task_sync_wx", "tasks.scheduled_task_sync_wx", 15, []), +# ("redbeat:scheduled_task_sync_wx_info", "tasks.scheduled_task_sync_wx_info", scheduled_task_sync_wx_info_interval, [redis_config, kafka_config, gewe_config]), +# #("redbeat:add_friends_task", "tasks.add_friends_task", random.randint(1, 10), [redis_config]), # 10分钟执行一次 +# ] + + +# # 注册 RedBeat 任务 +# for task_id, task_name, interval, task_args in tasks_schedule: +# redbeat_entry = RedBeatSchedulerEntry( +# name=task_id, +# task=task_name, +# schedule=celery.schedules.schedule(timedelta(seconds=interval)), +# args=task_args, +# app=celery_app +# ) +# redbeat_entry.save() -] +# # 为 add_friends_task 生成随机的分钟和小时 +random_minute = random.randint(0, 59) +random_hour = random.randint(0, 23) # 可以根据需要调整小时范围 -# 注册 RedBeat 任务 -for task_id, task_name, interval, task_args in tasks_schedule: +# 定义定时任务列表 (任务 ID, 任务名称, 执行间隔或调度, 任务参数) +tasks_schedule = [ + # 其他任务保持不变 + ("redbeat:scheduled_task_sync_wx_info", "tasks.scheduled_task_sync_wx_info", + celery.schedules.schedule(timedelta(seconds=scheduled_task_sync_wx_info_interval)), + [redis_config, kafka_config, gewe_config]), + + # # add_friends_task 使用 crontab 随机时间 + # ("redbeat:add_friends_task", "tasks.add_friends_task", + # crontab(minute=random_minute, hour='*'), # 每小时在随机分钟执行 + # # 如果想每天在随机时间执行,可以用: crontab(minute=random_minute, hour=random_hour) + # [redis_config]), +] + +# # 注册 RedBeat 任务 +for task_id, task_name, schedule_obj, task_args in tasks_schedule: redbeat_entry = RedBeatSchedulerEntry( name=task_id, task=task_name, - schedule=celery.schedules.schedule(timedelta(seconds=interval)), - args=task_args, + schedule=schedule_obj, # 现在使用 schedule_obj 而不是创建新的 schedule + args=task_args, app=celery_app ) redbeat_entry.save() + # 如果是 add_friends_task,打印其随机调度信息 + if task_name == "tasks.add_friends_task": + if isinstance(schedule_obj, crontab): + print(f"已注册 `{task_name}` 任务,将在每小时的第 {random_minute} 分钟执行") + logger.info(f"已注册 `{task_name}` 任务,将在每小时的第 {random_minute} 分钟执行") + else: + print('scheduled_task_sync_wx_info 定时任务执行成功!') + logger.info(f"scheduled_task_sync_wx_info 定时任务执行成功!") + + +# def setup_schedule(): +# # 初始化第一次执行(例如:15秒后执行) +# initial_run_in = 5 +# entry = RedBeatSchedulerEntry( +# name='random-task', +# task='tasks.random_scheduled_task', +# schedule=timedelta(seconds=initial_run_in), +# app=celery_app +# ) +# entry.save() +# print(f"Initial task scheduled in {initial_run_in} seconds") + +# setup_schedule() + +# for task_id, task_name, schedule_obj, task_args in tasks_schedule: +# entry_key = f'redbeat:{task_id}' # RedBeat 任务的键 +# existing_entry = None + + +# try: +# existing_entry = RedBeatSchedulerEntry.from_key(entry_key, app=celery_app) +# except KeyError: +# # 任务不存在 +# pass + +# print(existing_entry) + +# if existing_entry: +# print(f"任务 `{task_name}` 已存在,跳过注册") +# logger.info(f"任务 `{task_name}` 已存在,跳过注册") +# else: +# redbeat_entry = RedBeatSchedulerEntry( +# name=task_id, +# task=task_name, +# schedule=schedule_obj, # 现在使用 schedule_obj 而不是创建新的 schedule +# args=task_args, +# app=celery_app +# ) +# redbeat_entry.save() + +# if task_name == "tasks.add_friends_task": +# if isinstance(schedule_obj, crontab): +# print(f"已注册 `{task_name}` 任务,将在每小时的第 {schedule_obj._orig_minute} 分钟执行") +# logger.info(f"已注册 `{task_name}` 任务,将在每小时的第 {schedule_obj._orig_minute} 分钟执行") +# else: +# print("scheduled_task_sync_wx_info 定时任务执行成功!") +# logger.info("scheduled_task_sync_wx_info 定时任务执行成功!") + + + + + + +# # **仅在首次启动时,手动注册 RedBeat 任务** +# def register_initial_tasks(): +# try: +# # 生成一个随机的首次任务执行间隔(5-15 分钟) +# initial_interval = random.randint(1, 3) +# #initial_interval= + +# # RedBeat 任务注册 +# redbeat_entry = RedBeatSchedulerEntry( +# name="redbeat:add_friends_task", +# task="tasks.add_friends_task", +# schedule=celery.schedules.schedule(timedelta(seconds=initial_interval)), +# args=[redis_config], +# app=celery_app +# ) +# redbeat_entry.save() +# print(f"已注册 `tasks.add_friends_task` 任务,首次将在 {initial_interval} 秒后执行") +# except Exception as e: +# print(f"任务注册失败: {e}") + +# register_initial_tasks() + +#add_friends_task.apply_async(args=[redis_config]) + +#trigger_initial_task() \ No newline at end of file diff --git a/celery_config.py b/celery_config.py new file mode 100644 index 0000000..e984dcd --- /dev/null +++ b/celery_config.py @@ -0,0 +1,55 @@ +from celery import Celery + + +from config import load_config,conf +from urllib.parse import quote + + +load_config() + +KAFKA_BOOTSTRAP_SERVERS = conf().get("kafka_bootstrap_servers") +KAFKA_TOPIC = 'topic.ai.ops.wx' +KAFKA_GROUP_ID = 'ai-ops-wx' + +redis_host=conf().get("redis_host") +redis_port=conf().get("redis_port") +redis_password=conf().get("redis_password") +redis_db=conf().get("redis_db") +encoded_password = quote(redis_password) + +# 配置 Celery +celery_app = Celery( + "worker", + broker=f"redis://:{encoded_password}@{redis_host}:{redis_port}/{redis_db}", + backend=f"redis://:{encoded_password}@{redis_host}:{redis_port}/{redis_db}", + include=['tasks'] +) + +# 配置 redbeat 作为 Celery Beat 调度器 +celery_app.conf.update( + timezone="Asia/Shanghai", # 设定时区 + beat_scheduler="redbeat.RedBeatScheduler", # 使用 RedBeat 作为调度器 + redbeat_redis_url=f"redis://:{encoded_password}@{redis_host}:{redis_port}/{redis_db}" # redbeat 存储任务调度信息的 Redis + , + redbeat_lock_timeout=60, # 避免多个 Beat 实例冲突 + beat_max_loop_interval=5 # 让 Celery Beat 每 5 秒检查一次任务 +) + + +# 获取配置文件中的 redis_config、kafka_config、gewe_config +redis_config = { + 'host': redis_host, + 'port': redis_port, + 'password': redis_password, + 'db': redis_db, +} + + +kafka_config = { + 'bootstrap_servers': KAFKA_BOOTSTRAP_SERVERS, + 'topic': KAFKA_TOPIC, + 'group_id': KAFKA_GROUP_ID, +} +gewe_config = { + 'api_url': "http://api.geweapi.com/gewe", +} \ No newline at end of file diff --git a/run.py b/run.py index cefe7a1..de0e476 100644 --- a/run.py +++ b/run.py @@ -1,6 +1,11 @@ import subprocess import sys import os +import time +import signal + +processes = [] + def start_fastapi(): """ 启动 FastAPI 服务 """ @@ -14,14 +19,17 @@ def start_fastapi(): def start_celery_worker(): """ 启动 Celery Worker """ if sys.platform == "win32": - process = subprocess.Popen(["celery", "-A", "celery_app", "worker", "--loglevel=info", "-P", "solo"]) + process = subprocess.Popen(["celery", "-A", "celery_app", "worker", "--loglevel=info", "-P", "solo"], + stdout=None, stderr=None) else: - process = subprocess.Popen(["celery", "-A", "celery_app", "worker", "--loglevel=info"]) + process = subprocess.Popen(["celery", "-A", "celery_app", "worker", "--loglevel=info"], + stdout=None, stderr=None) return process def start_celery_beat(): """ 启动 Celery Beat,使用 RedBeat 作为调度器 """ - process = subprocess.Popen(["celery", "-A", "celery_app", "beat", "--scheduler", "redbeat.RedBeatScheduler", "--loglevel=info"]) + process = subprocess.Popen(["celery", "-A", "celery_app", "beat", "--scheduler", "redbeat.RedBeatScheduler", "--loglevel=info"], + stdout=None, stderr=None) return process if __name__ == "__main__": @@ -34,3 +42,37 @@ if __name__ == "__main__": fastapi_process.wait() celery_worker_process.wait() celery_beat_process.wait() + + +# def signal_handler(sig, frame): +# """处理退出信号,确保子进程也被终止""" +# print('正在关闭所有服务...') +# for process in processes: +# process.terminate() # 尝试优雅终止 + +# # 给进程一点时间来优雅终止 +# time.sleep(2) + +# # 检查是否有进程仍在运行,如果有则强制终止 +# for process in processes: +# if process.poll() is None: # 进程仍在运行 +# process.kill() # 强制终止 + +# sys.exit(0) + +# if __name__ == "__main__": +# # 注册信号处理程序 +# signal.signal(signal.SIGINT, signal_handler) # Ctrl+C +# signal.signal(signal.SIGTERM, signal_handler) # 终止信号 + +# # 启动 FastAPI、Celery Worker 和 Celery Beat +# #fastapi_process = start_fastapi() +# celery_worker_process = start_celery_worker() +# celery_beat_process = start_celery_beat() + +# # 让主进程保持运行,但不阻塞在特定子进程上 +# try: +# while True: +# time.sleep(1) # 简单地循环等待,让信号处理程序有机会工作 +# except KeyboardInterrupt: +# pass # 如果收到 KeyboardInterrupt,信号处理程序会处理 diff --git a/services/biz_service.py b/services/biz_service.py index 7c32c5e..62ab11d 100644 --- a/services/biz_service.py +++ b/services/biz_service.py @@ -113,6 +113,7 @@ class BizService(): self.wxchat.forward_video_aeskey = '' self.wxchat.forward_video_cdnvideourl = '' self.wxchat.forward_video_length = 0 + self.wxchat.video_duration = 0 for intersection_wxid in intersection_wxids: for wx_content in wx_content_list: @@ -243,8 +244,9 @@ class BizService(): self.wxchat.forward_video_aeskey = res["aesKey"] self.wxchat.forward_video_cdnvideourl = res["cdnThumbUrl"] self.wxchat.forward_video_length = res["length"] + self.wxchat.video_duration=video_duration else: - ret,ret_msg,res = await self.wxchat.forward_video_async(token_id, app_id, t, self.wxchat.forward_video_aeskey, self.wxchat.forward_video_cdnvideourl, self.wxchat.forward_video_length) + ret,ret_msg,res = await self.wxchat.forward_video_async(token_id, app_id, t, self.wxchat.forward_video_aeskey, self.wxchat.forward_video_cdnvideourl, self.wxchat.forward_video_length,self.wxchat.video_duration) print('转发视频') if ret==200: logger.info(f'{agent_wxid} 向 {t} 发送视频【{file_url}】{ret_msg}') diff --git a/services/gewe_service.py b/services/gewe_service.py index 3346034..4210ba7 100644 --- a/services/gewe_service.py +++ b/services/gewe_service.py @@ -377,7 +377,7 @@ class GeWeService: response_object = await response.json() return response_object.get('data', None), response_object.get('ret', None), response_object.get('msg', None) - async def forward_video_async(self, token_id, app_id, to_wxid, aeskey, cdnvideourl, length): + async def forward_video_async(self, token_id, app_id, to_wxid, aeskey, cdnvideourl, length,video_duration): api_url = f"{self.base_url}/v2/api/message/forwardVideo" headers = { 'X-GEWE-TOKEN': token_id, @@ -386,7 +386,7 @@ class GeWeService: data = { "appId": app_id, "toWxid": to_wxid, - "xml": f"\n\n\t\n" + "xml": f"\n\n\t\n" } async with aiohttp.ClientSession() as session: async with session.post(api_url, headers=headers, json=data) as response: diff --git a/services/redis_service.py b/services/redis_service.py index 8d7a9fd..8940a89 100644 --- a/services/redis_service.py +++ b/services/redis_service.py @@ -193,6 +193,38 @@ class RedisService: await self.client.delete(queue_name) print(f"Cleared queue {queue_name}") + async def increment_hash_field(self, hash_key, field, amount=1): + """ + 对哈希表中的指定字段进行递增操作(原子性)。 + + :param hash_key: 哈希表的 key + :param field: 要递增的字段 + :param amount: 递增的数值(默认为 1) + :return: 递增后的值 + """ + return await self.client.hincrby(hash_key, field, amount) + + async def expire(self, key, timeout): + """ + 设置 Redis 键的过期时间(单位:秒) + + :param key: Redis 键 + :param timeout: 过期时间(秒) + """ + await self.client.expire(key, timeout) + + async def expire_field(self, hash_key, field, timeout): + """ + 通过辅助键方式设置哈希表中某个字段的过期时间 + + :param hash_key: 哈希表的 key + :param field: 要设置过期的字段 + :param timeout: 过期时间(秒) + """ + expire_key = f"{hash_key}:{field}:expire" + await self.client.set(expire_key, "1") + await self.client.expire(expire_key, timeout) + # Dependency injection helper function async def get_redis_service(request: Request) -> RedisService: return request.app.state.redis_serive \ No newline at end of file diff --git a/tasks.py b/tasks.py index 3be0b7e..b562bc3 100644 --- a/tasks.py +++ b/tasks.py @@ -1,12 +1,16 @@ from celery_app import celery_app from fastapi import Request,FastAPI -import time +import time,datetime +from celery import Celery +import celery.schedules +from redbeat import RedBeatSchedulerEntry +from datetime import timedelta from services.redis_service import RedisService from services.kafka_service import KafkaService from services.gewe_service import GeWeService from common.log import logger -import asyncio +import asyncio,random @celery_app.task(name='tasks.add_task', bind=True, acks_late=True) @@ -190,3 +194,108 @@ def scheduled_task_sync_wx_info(self, redis_config, kafka_config, gewe_config): asyncio.set_event_loop(loop) loop.run_until_complete(task()) # 在现有事件循环中运行任务 + + + +REDIS_KEY_PATTERN = "friend_add_limit:{date}" +REDIS_LAST_RUN_KEY = "last_run_time:add_friends_task" + +@celery_app.task(name='tasks.add_friends_task', bind=True, acks_late=True) +def add_friends_task(self,redis_config): + """ + 限制每天最多 15 个,每 2 小时最多 8 个 + """ + async def task(): + redis_service = RedisService() + await redis_service.init(**redis_config) + today_str = datetime.datetime.now().strftime("%Y%m%d") + redis_key = REDIS_KEY_PATTERN.format(date=today_str) + + # 获取当前总添加数量 + total_added = await redis_service.get_hash_field(redis_key, "total") or 0 + last_2h_added =await redis_service.get_hash_field(redis_key, "last_2h") or 0 + + total_added = int(total_added) + last_2h_added = int(last_2h_added) + + logger.info(f"当前添加好友总数: {total_added}, 过去2小时添加: {last_2h_added}") + + # 判断是否超过限制 + if total_added >= 15: + logger.warning("今日好友添加已达上限!") + return + + if last_2h_added >= 8: + logger.warning("过去2小时添加已达上限!") + return + + # 计算本次要添加的好友数量 (控制每天 5-15 个) + max_add = min(15 - total_added, 8 - last_2h_added) + if max_add <= 0: + return + + num_to_add = min(max_add, 1) # 每次最多加 1 个 + logger.info(f"本次添加 {num_to_add} 位好友") + + # TODO: 调用好友添加逻辑 (接口 or 业务逻辑) + # success = add_friends(num_to_add) + + success = num_to_add # 假设成功添加 num_to_add 个 + + # 更新 Redis 计数 + if success > 0: + await redis_service.increment_hash_field(redis_key, "total", success) + await redis_service.increment_hash_field(redis_key, "last_2h", success) + + # 设置 Redis 过期时间 (每日记录存 1 天, 2 小时记录存 2 小时) + await redis_service.expire(redis_key, 86400) # 24小时 + await redis_service.expire_field(redis_key, "last_2h", 7200) # 2小时 + + logger.info(f"成功添加 {success} 位好友, 今日总数 {total_added + success}") + + # 生成一个新的随机时间(5-15 分钟之间) + # next_interval = random.randint(10, 20) + + # # 计算新的执行时间 + # next_run_time = datetime.datetime.now() + timedelta(seconds=next_interval) + + # # 重新注册 RedBeat 任务,确保下次执行时间不同 + # redbeat_entry = RedBeatSchedulerEntry( + # name="redbeat:add_friends_task", + # task="tasks.add_friends_task", + # schedule=celery.schedules.schedule(timedelta(seconds=next_interval)), + # args=[redis_config], + # app=celery_app + # ) + + # # 设置任务的下次执行时间 + # redbeat_entry.last_run_at = next_run_time + # redbeat_entry.save() + + # logger.info(f"下次任务将在 {next_run_time} 执行(间隔 {next_interval} 秒)") + + loop = asyncio.get_event_loop() + + if loop.is_closed(): + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + + loop.run_until_complete(task()) # 在现有事件循环中运行任务 + + +@celery_app.task(name='tasks.random_scheduled_task', bind=True, acks_late=True) +def random_scheduled_task(self,): + print(f"Task executed at {datetime.datetime.now()}") + # 随机生成下次执行时间(例如:10-60秒内的随机时间) + next_run_in = random.randint(10, 60) + print(f"Next execution will be in {next_run_in} seconds") + + # 设置下次执行时间 + entry = RedBeatSchedulerEntry( + name='random-task', + task='tasks.random_scheduled_task', + schedule=timedelta(seconds=next_run_in), + app=celery_app + ) + entry.save() + return f"Scheduled next run in {next_run_in} seconds" \ No newline at end of file