From 598f85703721293c82f2069746822d4a0bbbfc6c Mon Sep 17 00:00:00 2001 From: H Vs Date: Thu, 27 Mar 2025 09:58:03 +0800 Subject: [PATCH] =?UTF-8?q?=E5=AE=9A=E6=97=B6=E6=9B=B4=E6=96=B0=E5=BE=AE?= =?UTF-8?q?=E4=BF=A1=E8=B5=84=E6=96=99?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app/main.py | 17 ++--- celery_app.py | 150 +++++++++++++++++++++++---------------- run.py | 8 +-- services/gewe_service.py | 21 +++++- tasks.py | 114 +++++++++++++++++++++++++++-- 5 files changed, 230 insertions(+), 80 deletions(-) diff --git a/app/main.py b/app/main.py index d28fe9a..56334a2 100644 --- a/app/main.py +++ b/app/main.py @@ -148,7 +148,7 @@ async def lifespan(app: FastAPI): # redis_service_instance=app.state.redis_service # 初始化 GeWeChatCom - app.state.gewe_service = await GeWeService.get_instance(app,"http://api.geweapi.com/gewe") + app.state.gewe_service = await GeWeService.get_instance(redis_service,"http://api.geweapi.com/gewe") gewe_service=app.state.gewe_service # # 初始化 GeWeChatCom #app.state.gwechat_service = GeWeService(app) @@ -182,8 +182,9 @@ async def lifespan(app: FastAPI): } # Use Celery task - worker_task = background_worker_task.delay(redis_config, kafka_config, gewe_config) - background_tasks.add(worker_task) + # worker_task = background_worker_task.delay(redis_config, kafka_config, gewe_config) + # background_tasks.add(worker_task) + environment = os.environ.get('environment', 'default') if environment != 'default': task=asyncio.create_task(background_worker(redis_service,kafka_service,gewe_service)) @@ -192,11 +193,11 @@ async def lifespan(app: FastAPI): yield # 应用程序运行期间 finally: # # 在应用程序关闭时取消所有后台任务 - task.cancel() - try: - await task - except asyncio.CancelledError: - pass + # task.cancel() + # try: + # await task + # except asyncio.CancelledError: + # pass background_tasks.clear() # 关闭 KafkaService print('应用关闭') diff --git a/celery_app.py b/celery_app.py index 7cdc343..0589f84 100644 --- a/celery_app.py +++ b/celery_app.py @@ -1,66 +1,34 @@ -# from celery import Celery - -# # 创建 Celery 应用 -# celery_app = Celery( -# 'ai_ops_wechat_app', -# broker='redis://:telpo%231234@192.168.2.121:8090/3', -# backend='redis://:telpo%231234@192.168.2.121:8090/3', -# ) - -# # 配置 Celery -# celery_app.conf.update( -# task_serializer='json', -# accept_content=['json'], -# result_serializer='json', -# timezone='Asia/Shanghai', -# enable_utc=True, -# ) - -# #celery_app.autodiscover_tasks(['app.tasks']) - -# from celery import Celery - -# def make_celery(app): -# celery = Celery( -# app.import_name, -# backend=app.config['CELERY_RESULT_BACKEND'], -# broker=app.config['CELERY_BROKER_URL'] -# ) -# celery.conf.update(app.config) -# # 自动发现任务 -# celery.autodiscover_tasks(['app.tasks']) -# return celery - -# # 初始化 Flask -# app = Flask(__name__) -# app.config.update( -# CELERY_BROKER_URL='redis://:telpo%231234@192.168.2.121:8090/3', -# CELERY_RESULT_BACKEND='redis://:telpo%231234@192.168.2.121:8090/3' -# ) - -# celery = make_celery(app) - from celery import Celery import celery.schedules from redbeat import RedBeatSchedulerEntry from datetime import timedelta +from services.redis_service import RedisService +from services.kafka_service import KafkaService +from services.biz_service import BizService + from config import load_config,conf +from urllib.parse import quote +import asyncio,os load_config() +KAFKA_BOOTSTRAP_SERVERS = conf().get("kafka_bootstrap_servers") +KAFKA_TOPIC = 'topic.ai.ops.wx' +KAFKA_GROUP_ID = 'ai-ops-wx' + redis_host=conf().get("redis_host") redis_port=conf().get("redis_port") redis_password=conf().get("redis_password") redis_db=conf().get("redis_db") - +encoded_password = quote(redis_password) # 配置 Celery celery_app = Celery( "worker", - broker=f"redis://:telpo%231234@{redis_host}:{redis_port}/{redis_db}", - backend=f"redis://:telpo%231234@{redis_host}:{redis_port}/{redis_db}", + broker=f"redis://:{encoded_password}@{redis_host}:{redis_port}/{redis_db}", + backend=f"redis://:{encoded_password}@{redis_host}:{redis_port}/{redis_db}", include=['tasks'] ) @@ -68,28 +36,86 @@ celery_app = Celery( celery_app.conf.update( timezone="Asia/Shanghai", # 设定时区 beat_scheduler="redbeat.RedBeatScheduler", # 使用 RedBeat 作为调度器 - redbeat_redis_url=f"redis://:telpo%231234@{redis_host}:{redis_port}/{redis_db}" # redbeat 存储任务调度信息的 Redis + redbeat_redis_url=f"redis://:{encoded_password}@{redis_host}:{redis_port}/{redis_db}" # redbeat 存储任务调度信息的 Redis , redbeat_lock_timeout=60, # 避免多个 Beat 实例冲突 beat_max_loop_interval=5 # 让 Celery Beat 每 5 秒检查一次任务 ) -task_name = "tasks.scheduled_task" -# 任务执行间隔(每 10 秒执行一次) -schedule = celery.schedules.schedule(timedelta(seconds=3)) - -# RedBeat 任务唯一 ID -redbeat_entry = RedBeatSchedulerEntry( - name="redbeat:scheduled_task", # 任务 ID - task=task_name, # 任务名称 - schedule=schedule, # 任务调度时间 - args=[], - app=celery_app +# task_name = "tasks.scheduled_task" +# # 任务执行间隔(每 10 秒执行一次) +# schedule = celery.schedules.schedule(timedelta(seconds=3)) + +# # RedBeat 任务唯一 ID +# redbeat_entry = RedBeatSchedulerEntry( +# name="redbeat:scheduled_task", # 任务 ID +# task=task_name, # 任务名称 +# schedule=schedule, # 任务调度时间 +# args=[], +# app=celery_app -) +# ) + +# # 保存任务到 Redis +# redbeat_entry.save() -# 保存任务到 Redis -redbeat_entry.save() -# 自动发现任务 -#celery.autodiscover_tasks(['tasks']) +# tasks_schedule = [ +# ("redbeat:scheduled_task", "tasks.scheduled_task", 3), +# ("redbeat:scheduled_task_sync_wx", "tasks.scheduled_task_sync_wx", 15), +# ] + +# # 创建并保存 RedBeat 任务 +# for task_id, task_name, interval in tasks_schedule: +# redbeat_entry = RedBeatSchedulerEntry( +# name=task_id, +# task=task_name, +# schedule=celery.schedules.schedule(timedelta(seconds=interval)), +# args=[], +# app=celery_app +# ) +# redbeat_entry.save() + +# 获取配置文件中的 redis_config、kafka_config、gewe_config +redis_config = { + 'host': redis_host, + 'port': redis_port, + 'password': redis_password, + 'db': redis_db, +} + + +kafka_config = { + 'bootstrap_servers': KAFKA_BOOTSTRAP_SERVERS, + 'topic': KAFKA_TOPIC, + 'group_id': KAFKA_GROUP_ID, +} +gewe_config = { + 'api_url': "http://api.geweapi.com/gewe", +} + +scheduled_task_sync_wx_info_interval = 10 +environment = os.environ.get('environment', 'default') +if environment != 'default': + scheduled_task_sync_wx_info_interval = 60*11 + +# 定义定时任务列表 (任务 ID, 任务名称, 执行间隔秒, 任务参数) +tasks_schedule = [ + #("redbeat:scheduled_task", "tasks.scheduled_task", 3, []), + #("redbeat:scheduled_task_sync_wx", "tasks.scheduled_task_sync_wx", 15, []), + ("redbeat:scheduled_task_sync_wx_info", "tasks.scheduled_task_sync_wx_info", scheduled_task_sync_wx_info_interval, [redis_config, kafka_config, gewe_config]), + +] + + + +# 注册 RedBeat 任务 +for task_id, task_name, interval, task_args in tasks_schedule: + redbeat_entry = RedBeatSchedulerEntry( + name=task_id, + task=task_name, + schedule=celery.schedules.schedule(timedelta(seconds=interval)), + args=task_args, + app=celery_app + ) + redbeat_entry.save() diff --git a/run.py b/run.py index 0fdb27f..cefe7a1 100644 --- a/run.py +++ b/run.py @@ -27,10 +27,10 @@ def start_celery_beat(): if __name__ == "__main__": # 启动 FastAPI、Celery Worker 和 Celery Beat fastapi_process = start_fastapi() - #celery_worker_process = start_celery_worker() - #celery_beat_process = start_celery_beat() + celery_worker_process = start_celery_worker() + celery_beat_process = start_celery_beat() # 等待子进程完成 fastapi_process.wait() - #celery_worker_process.wait() - #celery_beat_process.wait() + celery_worker_process.wait() + celery_beat_process.wait() diff --git a/services/gewe_service.py b/services/gewe_service.py index 1fd698b..fac529f 100644 --- a/services/gewe_service.py +++ b/services/gewe_service.py @@ -12,17 +12,18 @@ from fastapi import FastAPI, Depends from common.singleton import singleton from common.log import logger from model.models import AddGroupContactsHistory +from services.redis_service import RedisService #@singleton class GeWeService: _instance = None _lock = asyncio.Lock() # 异步锁,确保单例初始化线程安全 - def __init__(self,app:FastAPI, base_url: str): + def __init__(self,redis_service:RedisService, base_url: str): if GeWeService._instance is not None: raise RuntimeError("请使用 get_instance() 获取单例!") self.base_url = base_url - self.redis_service=app.state.redis_service + self.redis_service=redis_service @classmethod async def get_instance(cls, app:FastAPI,base_url: str = "http://api.geweapi.com/gewe"): @@ -583,6 +584,22 @@ class GeWeService: response_object = await response.json() return response_object.get('ret', None), response_object.get('msg', None), response_object.get('data', None) + + ############################### 个人模块 ############################### + async def get_profile_async(self, token_id, app_id): + api_url = f"{self.base_url}/v2/api/personal/getProfile" + headers = { + 'X-GEWE-TOKEN': token_id, + 'Content-Type': 'application/json' + } + data = { + "appId": app_id, + } + async with aiohttp.ClientSession() as session: + async with session.post(api_url, headers=headers, json=data) as response: + response_object = await response.json() + return response_object.get('ret', None), response_object.get('msg', None), response_object.get('data', None) + ############################### 朋友圈模块 ################################### # 在新设备登录后的1-3天内,您将无法使用朋友圈发布、点赞、评论等功能。在此期间,如果尝试进行这些操作,您将收到来自微信团队的提醒。请注意遵守相关规定。 diff --git a/tasks.py b/tasks.py index 999ff41..16f8083 100644 --- a/tasks.py +++ b/tasks.py @@ -82,7 +82,113 @@ def background_worker_task(self, redis_config, kafka_config, gewe_config): -@celery_app.task(name='tasks.scheduled_task', bind=True, acks_late=True) -def scheduled_task(self): - print("🚀 定时任务执行成功!") - return "Hello from Celery Beat + RedBeat!" \ No newline at end of file +# @celery_app.task(name='tasks.scheduled_task', bind=True, acks_late=True) +# def scheduled_task(self): +# print("🚀 定时任务执行成功!~~~~~~~~~~~~~~~~~") +# return "Hello from Celery Beat + RedBeat!" + + +# @celery_app.task(name='tasks.scheduled_task_sync_wx', bind=True, acks_late=True) +# def scheduled_task_sync_wx(self,redis_service,kafka_service,gewe_service): +# print("🚀 scheduled_task_sync_wx 定时任务执行成功!") +# return "Hello from Celery Beat + RedBeat!" + +# @celery_app.task(name='tasks.scheduled_task_sync_wx_info_1', bind=True, acks_late=True) +# def scheduled_task_sync_wx_info_1(self,redis_config, kafka_config, gewe_config): +# ''' +# 定时获取微信号资料 +# ''' +# loop = asyncio.new_event_loop() +# asyncio.set_event_loop(loop) +# async def task(): +# try: +# redis_service = RedisService() +# await redis_service.init(**redis_config) +# # gewe_service = await GeWeService.get_instance(None, gewe_config['api_url']) +# login_keys = [] +# async for key in redis_service.client.scan_iter(match='__AI_OPS_WX__:LOGININFO:*'): +# login_keys.append(key) + +# print(login_keys) +# # for k in login_keys: +# # r = await redis_service.get_hash(k) +# # app_id = r.get("appId") +# # token_id = r.get("tokenId") +# # wxid = r.get("wxid") +# # status = r.get('status') +# # if status == '0': +# # continue +# # ret, msg, profile = await gewe_service.get_profile_async(token_id, app_id) +# # if ret != 200: +# # logger.warning(f"同步微信号 {wxid} 资料失败: {ret}-{msg}") +# # continue +# # nickname=profile.get("nickName") +# # head_img_url=profile.get("smallHeadImgUrl") +# # r.update({"nickName":nickname,"headImgUrl":head_img_url,"modify_at":int(time.time())}) +# # cleaned_login_info = {k: (v if v is not None else '') for k, v in r.items()} +# # await redis_service.set_hash(k, cleaned_login_info) +# # logger.info(f"同步微信号 {wxid} 资料 成功") +# # redis_service.update_hash_field(k,"nickName",nickname) +# # redis_service.update_hash_field(k,"headImgUrl",head_img_url) +# # redis_service.update_hash_field(k,"modify_at",int(time.time())) +# except Exception as e: +# logger.error(f"任务执行过程中发生异常: {e}") + +# print("scheduled_task_sync_wx_info 定时任务执行成功!") +# return "Hello from Celery Beat + RedBeat!" + +# loop.run_until_complete(task()) +# loop.close() + + + +@celery_app.task(name='tasks.scheduled_task_sync_wx_info', bind=True, acks_late=True) +def scheduled_task_sync_wx_info(self, redis_config, kafka_config, gewe_config): + ''' + 定时获取微信号资料 + ''' + async def task(): + try: + redis_service = RedisService() + await redis_service.init(**redis_config) + gewe_service = await GeWeService.get_instance(redis_service,gewe_config['api_url']) + login_keys = [] + async for key in redis_service.client.scan_iter(match='__AI_OPS_WX__:LOGININFO:*'): + login_keys.append(key) + + print(login_keys) + for k in login_keys: + r = await redis_service.get_hash(k) + app_id = r.get("appId") + token_id = r.get("tokenId") + wxid = r.get("wxid") + status = r.get('status') + if status == '0': + continue + ret, msg, profile = await gewe_service.get_profile_async(token_id, app_id) + if ret != 200: + logger.warning(f"同步微信号 {wxid} 资料失败: {ret}-{msg}") + continue + nickname=profile.get("nickName") + head_img_url=profile.get("smallHeadImgUrl") + # print(nickname) + + nickname=profile.get("nickName") + head_img_url=profile.get("smallHeadImgUrl") + r.update({"nickName":nickname,"headImgUrl":head_img_url,"modify_at":int(time.time())}) + cleaned_login_info = {k: (v if v is not None else '') for k, v in r.items()} + await redis_service.set_hash(k, cleaned_login_info) + logger.info(f"定时同步微信号{wxid}-昵称{nickname} 资料成功") + + except Exception as e: + logger.error(f"任务执行过程中发生异常: {e}") + + print("scheduled_task_sync_wx_info 定时任务执行成功!") + return "Hello from Celery Beat + RedBeat!" + + loop = asyncio.get_event_loop() + if loop.is_closed(): + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + + loop.run_until_complete(task()) # 在现有事件循环中运行任务