from fastapi import FastAPI,Request from pydantic import BaseModel from contextlib import asynccontextmanager from celery import Celery # from aiokafka import AIOKafkaConsumer import asyncio import json import time import uvicorn import logging from logging.handlers import TimedRotatingFileHandler,RotatingFileHandler from starlette.middleware.base import BaseHTTPMiddleware from starlette.middleware.gzip import GZipMiddleware from services.gewe_service import GeWeService # 导入 GeWeChatCom from common.log import logger from app.endpoints.config_endpoint import config_router from app.endpoints.contacts_endpoint import contacts_router from app.endpoints.groups_endpoint import groups_router from app.endpoints.sns_endpoint import sns_router from app.endpoints.agent_endpoint import agent_router from app.endpoints.pipeline_endpoint import messages_router from app.endpoints.label_endpoint import label_router from app.endpoints.add_contacts_endpoint import add_contacts_group_history_router from tasks import background_worker_task from services.redis_service import RedisService from services.kafka_service import KafkaService from services.biz_service import BizService from app.middleware import http_context from celery.result import AsyncResult from tasks import add_task,sync_contacts_task from config import load_config from config import conf from common.utils import * load_config() # Kafka 配置 #KAFKA_BOOTSTRAP_SERVERS = '192.168.2.121:9092' KAFKA_BOOTSTRAP_SERVERS = conf().get("kafka_bootstrap_servers") KAFKA_TOPIC = 'topic.ai.ops.wx' KAFKA_GROUP_ID = 'ai-ops-wx' # 用于存储后台任务的全局变量 background_tasks = set() async def kafka_consumer(): while True: # 这里模拟 Kafka 消费者的逻辑 # print("Kafka consumer is running...") await asyncio.sleep(1) async def background_worker(redis_service:RedisService,kafka_service:KafkaService,gewe_service:GeWeService): lock_name = "background_wxchat_thread_lock" lock_identifier = str(time.time()) # 使用时间戳作为唯一标识 while True: # 尝试获取分布式锁 if await redis_service.acquire_lock(lock_name, timeout=10): try: logger.info("分布式锁已成功获取") # 启动任务 print('启动任务') # 启动后台任务 await startup_sync_data_task_async(redis_service, kafka_service, gewe_service) # 确保传递了正确的参数 print('启动任务完成') # 保持锁的续期 while True: await asyncio.sleep(30) # 每30秒检查一次锁的状态 if not await redis_service.renew_lock(lock_name, lock_identifier, timeout=60): break # 如果无法续期锁,退出循环 finally: # 释放锁 await redis_service.release_lock(lock_name, lock_identifier) break # 任务完成后退出循环 else: # 如果获取锁失败,等待一段时间后重试 logger.info("获取分布式锁失败,等待10秒后重试...") await asyncio.sleep(10) async def startup_sync_data_task_async(redis_service: RedisService, kafka_service: KafkaService, gewe_service: GeWeService): try: login_keys = [] async for key in redis_service.client.scan_iter(match='__AI_OPS_WX__:LOGININFO:*'): # 使用 async for 遍历异步生成器 login_keys.append(key) for k in login_keys: r = await redis_service.get_hash(k) app_id = r.get("appId") token_id = r.get("tokenId") wxid = r.get("wxid") status = r.get('status') if status == '0': continue # 同步联系人列表 ret, msg, contacts_list = await gewe_service.fetch_contacts_list_async(token_id, app_id) if ret != 200: logger.warning(f"同步联系人列表失败: {ret}-{msg}") continue friend_wxids = [c for c in contacts_list['friends'] if c not in ['fmessage', 'medianote', 'weixin', 'weixingongzhong']] # 可以调整截取范围 data = await gewe_service.save_contacts_brief_to_cache_async(token_id, app_id, wxid, friend_wxids) chatrooms = contacts_list['chatrooms'] # 同步群列表 logger.info(f'{wxid} 的群数量 {len(chatrooms)}') logger.info(f'{wxid} 同步群列表') await gewe_service.save_groups_info_to_cache_async(token_id, app_id, wxid, chatrooms) logger.info(f'{wxid} 同步群成员') # 同步群成员 await gewe_service.save_groups_members_to_cache_async(token_id, app_id, wxid, chatrooms) logger.info(f'{wxid} 好友信息推送到kafka') # 联系人推送到kafka #k_message = wx_all_contacts_message(wxid, data) k_message = wx_all_contacts_key_message(wxid) await kafka_service.send_message_async(k_message) # 全量群信息推送到kafka #all_groups_info_menbers=await gewe_service.get_groups_info_members_from_cache_async(wxid) #k_message=wx_groups_info_members_message(wxid,all_groups_info_menbers) k_message=wx_groups_info_members_key_message(wxid) await kafka_service.send_message_async(k_message) except Exception as e: logger.error(f"任务执行过程中发生异常: {e}") @asynccontextmanager async def lifespan(app: FastAPI): #app.state.redis_helper = RedisHelper(host='192.168.2.121',password='telpo#1234', port=8090, db=3) # 初始化 RedisHelper redis_service = RedisService() redis_host=conf().get("redis_host") redis_port=conf().get("redis_port") redis_password=conf().get("redis_password") redis_db=conf().get("redis_db") wx_chat_api=conf().get("wx_chat_api") await redis_service.init(host=redis_host,port=redis_port, password=redis_password, db=redis_db) app.state.redis_service = redis_service # 初始化 KafkaService kafka_service= KafkaService(KAFKA_BOOTSTRAP_SERVERS, KAFKA_TOPIC, KAFKA_TOPIC,KAFKA_GROUP_ID) await kafka_service.start() app.state.kafka_service = kafka_service # redis_service_instance=app.state.redis_service # 初始化 GeWeChatCom #app.state.gewe_service = await GeWeService.get_instance(redis_service,"http://api.geweapi.com/gewe") app.state.gewe_service = await GeWeService.get_instance(redis_service,wx_chat_api) gewe_service=app.state.gewe_service # 初始化业务服务 biz_service = BizService(app) app.state.biz_service = biz_service biz_service.setup_handlers() # 初始化消息锁 app.state.message_lock = { (await redis_service.get_hash(key)).get("appId"): asyncio.Lock() async for key in redis_service.client.scan_iter(match='__AI_OPS_WX__:LOGININFO:*') } # 在应用程序启动时启动 Kafka 消费者任务 # try: # yield # 应用程序运行期间 # finally: # # 在应用程序关闭时取消所有后台任务 # await kafka_service.stop() #task = asyncio.create_task(kafka_consumer()) redis_config = { 'host': conf().get("redis_host"), 'port': conf().get("redis_port"), 'password': conf().get("redis_password"), 'db': conf().get("redis_db"), } kafka_config = { 'bootstrap_servers': KAFKA_BOOTSTRAP_SERVERS, 'topic': KAFKA_TOPIC, 'group_id': KAFKA_GROUP_ID, } gewe_config = { 'api_url': wx_chat_api, } # Use Celery task # worker_task = background_worker_task.delay(redis_config, kafka_config, gewe_config) # background_tasks.add(worker_task) environment = os.environ.get('environment', 'default') if environment != 'default': task=asyncio.create_task(background_worker(redis_service,kafka_service,gewe_service)) background_tasks.add(task) try: yield # 应用程序运行期间 finally: # # 在应用程序关闭时取消所有后台任务 # task.cancel() # try: # await task # except asyncio.CancelledError: # pass background_tasks.clear() # 关闭 KafkaService print('应用关闭') await kafka_service.stop() app = FastAPI(lifespan=lifespan,max_request_size=100 * 1024 * 1024) # 配置日志:输出到文件,文件最大 10MB,保留 5 个备份文件 # log_handler = RotatingFileHandler( # "app.log", # 日志文件名 # maxBytes=10 * 1024 * 1024, # 文件大小限制:10MB # backupCount=5, # 保留 5 个备份文件 # encoding="utf-8" # 日志文件的编码 # ) # # 设置日志格式 # log_handler.setFormatter( # logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') # ) # 获取根日志记录器,并设置日志级别为 INFO # logging.basicConfig( # level=logging.INFO, # 设置日志记录级别 # handlers=[log_handler] # 配置文件日志处理器 # ) app.add_middleware(BaseHTTPMiddleware, dispatch=http_context) app.add_middleware(GZipMiddleware, minimum_size=1000) app.include_router(config_router) app.include_router(contacts_router) app.include_router(groups_router) app.include_router(sns_router) app.include_router(agent_router) app.include_router(messages_router) app.include_router(label_router) app.include_router(add_contacts_group_history_router) @app.get("/") async def root(): logger.info("Root route is called") return {"message": "Kafka consumer is running in the background"} class AddRequest(BaseModel): x: int y: int @app.post("/add") async def add_numbers(request: AddRequest): task = add_task.delay(request.x, request.y) return {"task_id": task.id} @app.get("/task/{task_id}") async def get_task_status(task_id: str): task_result = AsyncResult(task_id) return { "task_id": task_id, "task_status": task_result.status, "task_result": task_result.result }