|
- from fastapi import FastAPI,Request
- from pydantic import BaseModel
- from contextlib import asynccontextmanager
-
- from celery import Celery
- # from aiokafka import AIOKafkaConsumer
- import asyncio
- import json
- import time
- import uvicorn
-
- import logging
- from logging.handlers import TimedRotatingFileHandler,RotatingFileHandler
- from starlette.middleware.base import BaseHTTPMiddleware
- from starlette.middleware.gzip import GZipMiddleware
- from services.gewe_service import GeWeService # 导入 GeWeChatCom
- from common.log import logger
- from app.endpoints.config_endpoint import config_router
- from app.endpoints.contacts_endpoint import contacts_router
- from app.endpoints.groups_endpoint import groups_router
- from app.endpoints.sns_endpoint import sns_router
- from app.endpoints.agent_endpoint import agent_router
- from app.endpoints.pipeline_endpoint import messages_router
- from tasks import background_worker_task
-
-
-
- from services.redis_service import RedisService
- from services.kafka_service import KafkaService
- from services.biz_service import BizService
-
- from app.middleware import http_context
-
-
- from celery.result import AsyncResult
- from tasks import add_task,sync_contacts_task
- from config import load_config
- from config import conf
- from common.utils import *
-
-
-
# Load the application configuration before any setting below is read.
load_config()

# Kafka settings.
# The broker list used to be hard-coded, e.g.:
# KAFKA_BOOTSTRAP_SERVERS = '192.168.2.121:9092'
KAFKA_BOOTSTRAP_SERVERS = conf().get("kafka_bootstrap_servers")
KAFKA_TOPIC = 'topic.ai.ops.wx'
KAFKA_GROUP_ID = 'ai-ops-wx'

# Module-level registry of the background tasks started by the application.
background_tasks = set()
-
async def kafka_consumer():
    """Placeholder for the Kafka consumer loop.

    There is no consuming logic yet: the coroutine only sleeps for one
    second per iteration and never returns on its own.
    """
    idle_seconds = 1
    while True:
        # Stand-in for the real consumer work.
        await asyncio.sleep(idle_seconds)
-
async def background_worker(redis_service: RedisService, kafka_service: KafkaService, gewe_service: GeWeService):
    """Run the startup data sync while holding a Redis distributed lock.

    The coroutine loops until it acquires the lock
    ``background_wxchat_thread_lock``, retrying every 10 seconds. Once the
    lock is held it awaits ``startup_sync_data_task_async`` and afterwards
    tries to renew the lock every 30 seconds until a renewal fails. The lock
    is released on the way out (also when an exception or a cancellation
    interrupts the work) and the coroutine then returns.

    Args:
        redis_service: Provides ``acquire_lock``, ``renew_lock`` and
            ``release_lock``; also forwarded to the sync task.
        kafka_service: Forwarded to the sync task.
        gewe_service: Forwarded to the sync task.
    """
    lock_name = "background_wxchat_thread_lock"
    # The current timestamp serves as this worker's lock identifier.
    # NOTE(review): the identifier is passed to renew_lock()/release_lock()
    # but not to acquire_lock() -- confirm RedisService ties them together.
    lock_identifier = str(time.time())

    while True:
        if not await redis_service.acquire_lock(lock_name, timeout=10):
            # Lock not available: wait and try again.
            logger.info("获取分布式锁失败,等待10秒后重试...")
            await asyncio.sleep(10)
            continue

        try:
            logger.info("分布式锁已成功获取")
            # Progress goes through the logger (not print) so it follows the
            # application's log configuration like every other message here.
            logger.info('启动任务')
            await startup_sync_data_task_async(redis_service, kafka_service, gewe_service)
            logger.info('启动任务完成')

            # Keep the lock alive until a renewal is refused.
            # NOTE(review): the lock is acquired with timeout=10 but the
            # first renewal only happens after the sync plus 30 seconds --
            # confirm the lock cannot expire in between.
            while True:
                await asyncio.sleep(30)
                if not await redis_service.renew_lock(lock_name, lock_identifier, timeout=60):
                    break
        finally:
            # Always give the lock back, including on error or cancellation.
            await redis_service.release_lock(lock_name, lock_identifier)

        # Work finished: leave the acquire loop. Exiting here, after the
        # try/finally rather than inside it, lets exceptions propagate.
        return
-
# Account ids that are filtered out of the friend list before it is cached.
_EXCLUDED_FRIEND_WXIDS = ('fmessage', 'medianote', 'weixin', 'weixingongzhong')


async def _sync_login_account(login_key, redis_service: RedisService, kafka_service: KafkaService, gewe_service: GeWeService):
    """Sync contacts and groups for one cached login entry and push them to Kafka."""
    login_info = await redis_service.get_hash(login_key)
    if not login_info:
        # Nothing stored under this key (any more); there is nothing to sync.
        return

    app_id = login_info.get("appId")
    token_id = login_info.get("tokenId")
    wxid = login_info.get("wxid")
    # Entries whose status is '0' are skipped.
    if login_info.get('status') == '0':
        return

    # Fetch the contact list; a non-200 return code skips this account.
    ret, msg, contacts_list = await gewe_service.fetch_contacts_list_async(token_id, app_id)
    if ret != 200:
        logger.warning(f"同步联系人列表失败: {ret}-{msg}")
        return

    friend_wxids = [c for c in contacts_list['friends'] if c not in _EXCLUDED_FRIEND_WXIDS]
    data = await gewe_service.save_contacts_brief_to_cache_async(token_id, app_id, wxid, friend_wxids)

    # Cache group info, then group members.
    chatrooms = contacts_list['chatrooms']
    logger.info(f'{wxid} 的群数量 {len(chatrooms)}')
    logger.info(f'{wxid} 同步群列表')
    await gewe_service.save_groups_info_to_cache_async(token_id, app_id, wxid, chatrooms)
    logger.info(f'{wxid} 同步群成员')
    await gewe_service.save_groups_members_to_cache_async(token_id, app_id, wxid, chatrooms)

    # Publish the contacts to Kafka.
    logger.info(f'{wxid} 好友信息推送到kafka')
    k_message = wx_all_contacts_message(wxid, data)
    await kafka_service.send_message_async(k_message)

    # Publish the full group info (with members) read back from the cache.
    all_groups_info_members = await gewe_service.get_groups_info_members_from_cache_async(wxid)
    k_message = wx_groups_info_members_message(wxid, all_groups_info_members)
    await kafka_service.send_message_async(k_message)


async def startup_sync_data_task_async(redis_service: RedisService, kafka_service: KafkaService, gewe_service: GeWeService):
    """Sync contacts and groups for every login entry found in Redis.

    Scans Redis for keys matching ``__AI_OPS_WX__:LOGININFO:*`` and processes
    each entry in turn. Failures are logged with a traceback and never
    raised; a failure on one entry does not stop the remaining entries from
    being processed.

    Args:
        redis_service: Used to scan the login keys and read each hash.
        kafka_service: Used to publish the contact and group messages.
        gewe_service: Used to fetch contacts and to fill/read the caches.
    """
    try:
        # Collect the keys first so the scan is finished before syncing.
        login_keys = [
            key
            async for key in redis_service.client.scan_iter(match='__AI_OPS_WX__:LOGININFO:*')
        ]
    except Exception as e:
        logger.exception(f"任务执行过程中发生异常: {e}")
        return

    for login_key in login_keys:
        try:
            await _sync_login_account(login_key, redis_service, kafka_service, gewe_service)
        except Exception as e:
            # Isolate the failure to this entry and carry on with the rest.
            logger.exception(f"任务执行过程中发生异常: {e} (key={login_key})")
-
@asynccontextmanager
async def lifespan(app: FastAPI):
    """FastAPI lifespan handler: create shared services, then tear them down.

    Startup creates the Redis, Kafka, GeWe and business services and stores
    them on ``app.state`` (``redis_service``, ``kafka_service``,
    ``gewe_service``, ``biz_service``). When the ``environment`` environment
    variable is set to anything other than ``'default'``, ``background_worker``
    is started as an asyncio task and tracked in ``background_tasks``.

    Shutdown cancels the tracked background tasks and waits for them to
    finish, so that their cleanup code runs, and then stops the Kafka service.

    Args:
        app: The FastAPI application whose ``state`` receives the services.
    """
    # Imported locally because this module has no explicit ``import os``.
    import os

    gewe_api_url = "http://api.geweapi.com/gewe"
    settings = conf()

    # Redis service, connected with the configured host/port/password/db.
    redis_service = RedisService()
    await redis_service.init(
        host=settings.get("redis_host"),
        port=settings.get("redis_port"),
        password=settings.get("redis_password"),
        db=settings.get("redis_db"),
    )
    app.state.redis_service = redis_service

    # Kafka service; the same topic is passed for both topic arguments.
    kafka_service = KafkaService(KAFKA_BOOTSTRAP_SERVERS, KAFKA_TOPIC, KAFKA_TOPIC, KAFKA_GROUP_ID)
    await kafka_service.start()
    app.state.kafka_service = kafka_service

    # GeWe service instance bound to the Redis service.
    gewe_service = await GeWeService.get_instance(redis_service, gewe_api_url)
    app.state.gewe_service = gewe_service

    # Business service and its handlers.
    biz_service = BizService(app)
    app.state.biz_service = biz_service
    biz_service.setup_handlers()

    # The background worker only runs outside the 'default' environment.
    environment = os.environ.get('environment', 'default')
    if environment != 'default':
        task = asyncio.create_task(background_worker(redis_service, kafka_service, gewe_service))
        background_tasks.add(task)
        # Drop the reference automatically once the task has finished.
        task.add_done_callback(background_tasks.discard)

    try:
        yield  # The application runs while suspended here.
    finally:
        try:
            # Cancel the still-tracked asyncio tasks and wait for them, so
            # their ``finally`` blocks run before the services go away.
            pending = [t for t in background_tasks if isinstance(t, asyncio.Future)]
            background_tasks.clear()
            for pending_task in pending:
                pending_task.cancel()
            if pending:
                await asyncio.gather(*pending, return_exceptions=True)
        finally:
            # Stop Kafka even if waiting for the tasks was interrupted.
            logger.info('应用关闭')
            await kafka_service.stop()
-
# NOTE(review): ``max_request_size`` does not appear to be a FastAPI
# constructor option; unknown keyword arguments are only stored on the app,
# so this probably does not limit request bodies -- confirm.
app = FastAPI(lifespan=lifespan, max_request_size=100 * 1024 * 1024)

# Disabled file-logging setup, kept for reference: a rotating log file of at
# most 10 MB with 5 backups, attached to the root logger at INFO level.
# log_handler = RotatingFileHandler(
#     "app.log",                  # log file name
#     maxBytes=10 * 1024 * 1024,  # size limit: 10 MB
#     backupCount=5,              # keep 5 backup files
#     encoding="utf-8"            # log file encoding
# )
# log_handler.setFormatter(
#     logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
# )
# logging.basicConfig(
#     level=logging.INFO,         # log level
#     handlers=[log_handler]      # file log handler
# )

# Middleware registration; the order of these two calls is preserved.
app.add_middleware(BaseHTTPMiddleware, dispatch=http_context)
app.add_middleware(GZipMiddleware, minimum_size=1000)

# Register the endpoint routers in their original order.
for _router in (
    config_router,
    contacts_router,
    groups_router,
    sns_router,
    agent_router,
    messages_router,
):
    app.include_router(_router)
del _router
-
@app.get("/")
async def root():
    """Log the call and return a fixed status message."""
    logger.info("Root route is called")
    payload = {"message": "Kafka consumer is running in the background"}
    return payload
-
class AddRequest(BaseModel):
    # Request body of POST /add: the two integer operands.
    x: int
    y: int
-
@app.post("/add")
async def add_numbers(request: AddRequest):
    """Queue ``add_task`` with both operands and return the id of the task."""
    queued = add_task.delay(request.x, request.y)
    return {"task_id": queued.id}
-
@app.get("/task/{task_id}")
async def get_task_status(task_id: str):
    """Return the status and result of a Celery task.

    Args:
        task_id: Id of the task, as returned by ``POST /add``.

    Returns:
        A dict with ``task_id``, ``task_status`` and ``task_result``. When the
        task failed, ``task_result`` is a ``"<ExceptionType>: <message>"``
        string instead of the raw exception object.
    """
    task_result = AsyncResult(task_id)
    result = task_result.result
    # For a failed task Celery exposes the raised exception instance here;
    # turn it into text so the response stays meaningful as JSON.
    if isinstance(result, BaseException):
        result = f"{type(result).__name__}: {result}"
    return {
        "task_id": task_id,
        "task_status": task_result.status,
        "task_result": result,
    }
|