You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

268 lines
9.6KB

  1. from fastapi import FastAPI,Request
  2. from pydantic import BaseModel
  3. from contextlib import asynccontextmanager
  4. from celery import Celery
  5. # from aiokafka import AIOKafkaConsumer
  6. import asyncio
  7. import json
  8. import time
  9. import uvicorn
  10. import logging
  11. from logging.handlers import TimedRotatingFileHandler,RotatingFileHandler
  12. from starlette.middleware.base import BaseHTTPMiddleware
  13. from starlette.middleware.gzip import GZipMiddleware
  14. from services.gewe_service import GeWeService # 导入 GeWeChatCom
  15. from common.log import logger
  16. from app.endpoints.config_endpoint import config_router
  17. from app.endpoints.contacts_endpoint import contacts_router
  18. from app.endpoints.groups_endpoint import groups_router
  19. from app.endpoints.sns_endpoint import sns_router
  20. from app.endpoints.agent_endpoint import agent_router
  21. from app.endpoints.pipeline_endpoint import messages_router
  22. from app.endpoints.label_endpoint import label_router
  23. from tasks import background_worker_task
  24. from services.redis_service import RedisService
  25. from services.kafka_service import KafkaService
  26. from services.biz_service import BizService
  27. from app.middleware import http_context
  28. from celery.result import AsyncResult
  29. from tasks import add_task,sync_contacts_task
  30. from config import load_config
  31. from config import conf
  32. from common.utils import *
load_config()  # Populate the global config singleton before any conf() call below.

# Kafka configuration: bootstrap servers come from config; topic and consumer
# group id are fixed for this service.
#KAFKA_BOOTSTRAP_SERVERS = '192.168.2.121:9092'
KAFKA_BOOTSTRAP_SERVERS = conf().get("kafka_bootstrap_servers")
KAFKA_TOPIC = 'topic.ai.ops.wx'
KAFKA_GROUP_ID = 'ai-ops-wx'

# Global registry of background asyncio tasks; holding strong references here
# prevents them from being garbage-collected while still running.
background_tasks = set()
  41. async def kafka_consumer():
  42. while True:
  43. # 这里模拟 Kafka 消费者的逻辑
  44. # print("Kafka consumer is running...")
  45. await asyncio.sleep(1)
async def background_worker(redis_service: RedisService, kafka_service: KafkaService, gewe_service: GeWeService):
    """Run the startup data-sync task exactly once across the deployment,
    guarded by a Redis distributed lock.

    Loops until the lock is acquired, runs startup_sync_data_task_async,
    keeps renewing the lock while holding it, then releases it and exits.
    """
    lock_name = "background_wxchat_thread_lock"
    lock_identifier = str(time.time())  # Timestamp used as this worker's lock-owner id.
    # NOTE(review): lock_identifier is never passed to acquire_lock, so the
    # owner value stored in Redis may not match it — renew_lock/release_lock
    # below could then silently fail. Verify RedisService's lock API contract.
    while True:
        # Try to take the distributed lock with a 10s TTL.
        # NOTE(review): the renewal loop below first runs after 30s, which is
        # longer than the 10s TTL — the lock may expire before the first
        # renewal. Confirm intended timeouts.
        if await redis_service.acquire_lock(lock_name, timeout=10):
            try:
                logger.info("分布式锁已成功获取")
                # Launch the one-off sync task.
                print('启动任务')
                await startup_sync_data_task_async(redis_service, kafka_service, gewe_service)
                print('启动任务完成')
                # Keep the lock alive while (nominally) the task's effects settle.
                while True:
                    await asyncio.sleep(30)  # Re-check lock state every 30 seconds.
                    if not await redis_service.renew_lock(lock_name, lock_identifier, timeout=60):
                        break  # Renewal failed; give up the lock.
            finally:
                # Always release the lock, even if the sync task raised.
                await redis_service.release_lock(lock_name, lock_identifier)
            break  # Task finished (or lock lost); leave the acquire loop.
        else:
            # Lock is held elsewhere; back off before retrying.
            logger.info("获取分布式锁失败,等待10秒后重试...")
            await asyncio.sleep(10)
  72. async def startup_sync_data_task_async(redis_service: RedisService, kafka_service: KafkaService, gewe_service: GeWeService):
  73. try:
  74. login_keys = []
  75. async for key in redis_service.client.scan_iter(match='__AI_OPS_WX__:LOGININFO:*'): # 使用 async for 遍历异步生成器
  76. login_keys.append(key)
  77. for k in login_keys:
  78. r = await redis_service.get_hash(k)
  79. app_id = r.get("appId")
  80. token_id = r.get("tokenId")
  81. wxid = r.get("wxid")
  82. status = r.get('status')
  83. if status == '0':
  84. continue
  85. # 同步联系人列表
  86. ret, msg, contacts_list = await gewe_service.fetch_contacts_list_async(token_id, app_id)
  87. if ret != 200:
  88. logger.warning(f"同步联系人列表失败: {ret}-{msg}")
  89. continue
  90. friend_wxids = [c for c in contacts_list['friends'] if c not in ['fmessage', 'medianote', 'weixin', 'weixingongzhong']] # 可以调整截取范围
  91. data = await gewe_service.save_contacts_brief_to_cache_async(token_id, app_id, wxid, friend_wxids)
  92. chatrooms = contacts_list['chatrooms']
  93. # 同步群列表
  94. logger.info(f'{wxid} 的群数量 {len(chatrooms)}')
  95. logger.info(f'{wxid} 同步群列表')
  96. await gewe_service.save_groups_info_to_cache_async(token_id, app_id, wxid, chatrooms)
  97. logger.info(f'{wxid} 同步群成员')
  98. # 同步群成员
  99. await gewe_service.save_groups_members_to_cache_async(token_id, app_id, wxid, chatrooms)
  100. logger.info(f'{wxid} 好友信息推送到kafka')
  101. # 联系人推送到kafka
  102. #k_message = wx_all_contacts_message(wxid, data)
  103. k_message = wx_all_contacts_key_message(wxid)
  104. await kafka_service.send_message_async(k_message)
  105. # 全量群信息推送到kafka
  106. #all_groups_info_menbers=await gewe_service.get_groups_info_members_from_cache_async(wxid)
  107. #k_message=wx_groups_info_members_message(wxid,all_groups_info_menbers)
  108. k_message=wx_groups_info_members_key_message(wxid)
  109. await kafka_service.send_message_async(k_message)
  110. except Exception as e:
  111. logger.error(f"任务执行过程中发生异常: {e}")
  112. @asynccontextmanager
  113. async def lifespan(app: FastAPI):
  114. #app.state.redis_helper = RedisHelper(host='192.168.2.121',password='telpo#1234', port=8090, db=3)
  115. # 初始化 RedisHelper
  116. redis_service = RedisService()
  117. redis_host=conf().get("redis_host")
  118. redis_port=conf().get("redis_port")
  119. redis_password=conf().get("redis_password")
  120. redis_db=conf().get("redis_db")
  121. await redis_service.init(host=redis_host,port=redis_port, password=redis_password, db=redis_db)
  122. app.state.redis_service = redis_service
  123. # 初始化 KafkaService
  124. kafka_service= KafkaService(KAFKA_BOOTSTRAP_SERVERS, KAFKA_TOPIC, KAFKA_TOPIC,KAFKA_GROUP_ID)
  125. await kafka_service.start()
  126. app.state.kafka_service = kafka_service
  127. # redis_service_instance=app.state.redis_service
  128. # 初始化 GeWeChatCom
  129. app.state.gewe_service = await GeWeService.get_instance(redis_service,"http://api.geweapi.com/gewe")
  130. gewe_service=app.state.gewe_service
  131. # # 初始化 GeWeChatCom
  132. #app.state.gwechat_service = GeWeService(app)
  133. # 初始化业务服务
  134. biz_service = BizService(app)
  135. app.state.biz_service = biz_service
  136. biz_service.setup_handlers()
  137. # 在应用程序启动时启动 Kafka 消费者任务
  138. # try:
  139. # yield # 应用程序运行期间
  140. # finally:
  141. # # 在应用程序关闭时取消所有后台任务
  142. # await kafka_service.stop()
  143. #task = asyncio.create_task(kafka_consumer())
  144. redis_config = {
  145. 'host': conf().get("redis_host"),
  146. 'port': conf().get("redis_port"),
  147. 'password': conf().get("redis_password"),
  148. 'db': conf().get("redis_db"),
  149. }
  150. kafka_config = {
  151. 'bootstrap_servers': KAFKA_BOOTSTRAP_SERVERS,
  152. 'topic': KAFKA_TOPIC,
  153. 'group_id': KAFKA_GROUP_ID,
  154. }
  155. gewe_config = {
  156. 'api_url': "http://api.geweapi.com/gewe",
  157. }
  158. # Use Celery task
  159. # worker_task = background_worker_task.delay(redis_config, kafka_config, gewe_config)
  160. # background_tasks.add(worker_task)
  161. environment = os.environ.get('environment', 'default')
  162. if environment != 'default':
  163. task=asyncio.create_task(background_worker(redis_service,kafka_service,gewe_service))
  164. background_tasks.add(task)
  165. try:
  166. yield # 应用程序运行期间
  167. finally:
  168. # # 在应用程序关闭时取消所有后台任务
  169. # task.cancel()
  170. # try:
  171. # await task
  172. # except asyncio.CancelledError:
  173. # pass
  174. background_tasks.clear()
  175. # 关闭 KafkaService
  176. print('应用关闭')
  177. await kafka_service.stop()
# Application instance wired to the lifespan above.
# NOTE(review): FastAPI/Starlette define no `max_request_size` parameter —
# extra kwargs are stored on `app.extra` and have no runtime effect. Confirm
# intent (request-size limiting is usually done in middleware or the proxy).
app = FastAPI(lifespan=lifespan,max_request_size=100 * 1024 * 1024)
# Logging setup (disabled): file output, 10MB per file, 5 backups.
# log_handler = RotatingFileHandler(
# "app.log", # log file name
# maxBytes=10 * 1024 * 1024, # size limit: 10MB
# backupCount=5, # keep 5 backup files
# encoding="utf-8" # log file encoding
# )
# # Set the log format
# log_handler.setFormatter(
# logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
# )
# Get the root logger and set its level to INFO
# logging.basicConfig(
# level=logging.INFO, # logging level
# handlers=[log_handler] # file log handler
# )

# Middleware: per-request context injection, then gzip for responses >= 1000 bytes.
app.add_middleware(BaseHTTPMiddleware, dispatch=http_context)
app.add_middleware(GZipMiddleware, minimum_size=1000)

# Feature routers.
app.include_router(config_router)
app.include_router(contacts_router)
app.include_router(groups_router)
app.include_router(sns_router)
app.include_router(agent_router)
app.include_router(messages_router)
app.include_router(label_router)
  204. @app.get("/")
  205. async def root():
  206. logger.info("Root route is called")
  207. return {"message": "Kafka consumer is running in the background"}
class AddRequest(BaseModel):
    """Request body for POST /add: the two integers summed by the Celery add_task."""
    # First addend.
    x: int
    # Second addend.
    y: int
  211. @app.post("/add")
  212. async def add_numbers(request: AddRequest):
  213. task = add_task.delay(request.x, request.y)
  214. return {"task_id": task.id}
  215. @app.get("/task/{task_id}")
  216. async def get_task_status(task_id: str):
  217. task_result = AsyncResult(task_id)
  218. return {
  219. "task_id": task_id,
  220. "task_status": task_result.status,
  221. "task_result": task_result.result
  222. }