選択できるのは25トピックまでです。 トピックは、先頭が英数字で、英数字とダッシュ('-')を使用した35文字以内のものにしてください。

main.py 9.2KB

1ヶ月前
1ヶ月前
1ヶ月前
1ヶ月前
1ヶ月前
1ヶ月前
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260
  1. from fastapi import FastAPI,Request
  2. from pydantic import BaseModel
  3. from contextlib import asynccontextmanager
  4. from celery import Celery
  5. # from aiokafka import AIOKafkaConsumer
  6. import asyncio
  7. import json
  8. import time
  9. import uvicorn
  10. import logging
  11. from logging.handlers import TimedRotatingFileHandler,RotatingFileHandler
  12. from starlette.middleware.base import BaseHTTPMiddleware
  13. from services.gewe_service import GeWeService # 导入 GeWeChatCom
  14. from common.log import logger
  15. from app.endpoints.config_endpoint import config_router
  16. from app.endpoints.contacts_endpoint import contacts_router
  17. from app.endpoints.groups_endpoint import groups_router
  18. from app.endpoints.sns_endpoint import sns_router
  19. from app.endpoints.agent_endpoint import agent_router
  20. from app.endpoints.pipeline_endpoint import messages_router
  21. from tasks import background_worker_task
  22. from services.redis_service import RedisService
  23. from services.kafka_service import KafkaService
  24. from services.biz_service import BizService
  25. from app.middleware import http_context
  26. from celery.result import AsyncResult
  27. from tasks import add_task,sync_contacts_task
  28. from config import load_config
  29. from config import conf
  30. from common.utils import *
# Load configuration from disk/env FIRST — every conf() lookup below depends on it.
load_config()

# Kafka configuration: broker list comes from the loaded config; topic and
# consumer-group id are fixed for this service.
KAFKA_BOOTSTRAP_SERVERS = conf().get("kafka_bootstrap_servers")
KAFKA_TOPIC = 'topic.ai.ops.wx'
KAFKA_GROUP_ID = 'ai-ops-wx'

# Strong references to background tasks so in-flight asyncio tasks / Celery
# results are not garbage-collected while the app runs.
background_tasks = set()
  39. async def kafka_consumer():
  40. while True:
  41. # 这里模拟 Kafka 消费者的逻辑
  42. # print("Kafka consumer is running...")
  43. await asyncio.sleep(1)
async def background_worker(redis_service: RedisService, kafka_service: KafkaService, gewe_service: GeWeService):
    """Run the one-shot data-sync task under a Redis distributed lock.

    Loops until the lock is acquired (retrying every 10s), runs
    startup_sync_data_task_async exactly once, then keeps renewing the lock
    every 30s until renewal fails; the lock is always released on the way out.
    """
    lock_name = "background_wxchat_thread_lock"
    lock_identifier = str(time.time())  # timestamp used as this worker's unique owner id
    while True:
        # Try to take the distributed lock.
        # NOTE(review): lock_identifier is NOT passed to acquire_lock here, yet it
        # is passed to renew_lock/release_lock below. If RedisService stores its
        # own token on acquire, renew/release with this identifier may silently
        # fail — confirm against RedisService's lock API.
        if await redis_service.acquire_lock(lock_name, timeout=10):
            try:
                logger.info("分布式锁已成功获取")
                # Run the one-shot sync task (contacts/groups → cache → Kafka).
                print('启动任务')
                await startup_sync_data_task_async(redis_service, kafka_service, gewe_service)
                print('启动任务完成')
                # Keep the lock alive while we hold it; bail out when renewal fails.
                while True:
                    await asyncio.sleep(30)  # re-check/renew every 30 seconds
                    if not await redis_service.renew_lock(lock_name, lock_identifier, timeout=60):
                        break  # lost the lock — stop renewing
            finally:
                # Always release the lock, even if the sync task raised.
                await redis_service.release_lock(lock_name, lock_identifier)
            break  # work finished (or lock lost) — exit the worker loop
        else:
            # Lock held by another instance — wait and retry.
            logger.info("获取分布式锁失败,等待10秒后重试...")
            await asyncio.sleep(10)
async def startup_sync_data_task_async(redis_service: RedisService, kafka_service: KafkaService, gewe_service: GeWeService):
    """For every logged-in WeChat account cached in Redis, sync contacts and
    group data into the cache and push snapshots to Kafka.

    Best-effort: any exception is caught and logged, never re-raised.
    """
    try:
        login_keys = []
        # scan_iter returns an async generator, hence `async for`.
        async for key in redis_service.client.scan_iter(match='__AI_OPS_WX__:LOGININFO:*'):
            login_keys.append(key)
        for k in login_keys:
            r = await redis_service.get_hash(k)
            app_id = r.get("appId")
            token_id = r.get("tokenId")
            wxid = r.get("wxid")
            status = r.get('status')
            # status '0' appears to mean "not logged in / inactive" — skip.
            # TODO(review): confirm with whoever writes this hash.
            if status == '0':
                continue
            # Sync the contact list; ret is an HTTP-like status code.
            ret, msg, contacts_list = await gewe_service.fetch_contacts_list_async(token_id, app_id)
            if ret != 200:
                logger.warning(f"同步联系人列表失败: {ret}-{msg}")
                continue
            # Drop WeChat built-in service accounts before caching.
            friend_wxids = [c for c in contacts_list['friends'] if c not in ['fmessage', 'medianote', 'weixin', 'weixingongzhong']]
            data = await gewe_service.save_contacts_brief_to_cache_async(token_id, app_id, wxid, friend_wxids)
            chatrooms = contacts_list['chatrooms']
            # Sync group list, then group members, into the cache.
            logger.info(f'{wxid} 的群数量 {len(chatrooms)}')
            logger.info(f'{wxid} 同步群列表')
            await gewe_service.save_groups_info_to_cache_async(token_id, app_id, wxid, chatrooms)
            logger.info(f'{wxid} 同步群成员')
            await gewe_service.save_groups_members_to_cache_async(token_id, app_id, wxid, chatrooms)
            logger.info(f'{wxid} 好友信息推送到kafka')
            # Push the contact snapshot to Kafka.
            k_message = wx_all_contacts_message(wxid, data)
            await kafka_service.send_message_async(k_message)
            # Push the full group-info + members snapshot to Kafka.
            all_groups_info_menbers = await gewe_service.get_groups_info_members_from_cache_async(wxid)
            k_message = wx_groups_info_members_message(wxid, all_groups_info_menbers)
            await kafka_service.send_message_async(k_message)
    except Exception as e:
        logger.error(f"任务执行过程中发生异常: {e}")
  108. @asynccontextmanager
  109. async def lifespan(app: FastAPI):
  110. #app.state.redis_helper = RedisHelper(host='192.168.2.121',password='telpo#1234', port=8090, db=3)
  111. # 初始化 RedisHelper
  112. redis_service = RedisService()
  113. redis_host=conf().get("redis_host")
  114. redis_port=conf().get("redis_port")
  115. redis_password=conf().get("redis_password")
  116. redis_db=conf().get("redis_db")
  117. await redis_service.init(host=redis_host,port=redis_port, password=redis_password, db=redis_db)
  118. app.state.redis_service = redis_service
  119. # 初始化 KafkaService
  120. kafka_service= KafkaService(KAFKA_BOOTSTRAP_SERVERS, KAFKA_TOPIC, KAFKA_TOPIC,KAFKA_GROUP_ID)
  121. await kafka_service.start()
  122. app.state.kafka_service = kafka_service
  123. # redis_service_instance=app.state.redis_service
  124. # 初始化 GeWeChatCom
  125. app.state.gewe_service = await GeWeService.get_instance(app,"http://api.geweapi.com/gewe")
  126. gewe_service=app.state.gewe_service
  127. # # 初始化 GeWeChatCom
  128. #app.state.gwechat_service = GeWeService(app)
  129. # 初始化业务服务
  130. biz_service = BizService(app)
  131. app.state.biz_service = biz_service
  132. biz_service.setup_handlers()
  133. # 在应用程序启动时启动 Kafka 消费者任务
  134. # try:
  135. # yield # 应用程序运行期间
  136. # finally:
  137. # # 在应用程序关闭时取消所有后台任务
  138. # await kafka_service.stop()
  139. #task = asyncio.create_task(kafka_consumer())
  140. redis_config = {
  141. 'host': conf().get("redis_host"),
  142. 'port': conf().get("redis_port"),
  143. 'password': conf().get("redis_password"),
  144. 'db': conf().get("redis_db"),
  145. }
  146. kafka_config = {
  147. 'bootstrap_servers': KAFKA_BOOTSTRAP_SERVERS,
  148. 'topic': KAFKA_TOPIC,
  149. 'group_id': KAFKA_GROUP_ID,
  150. }
  151. gewe_config = {
  152. 'api_url': "http://api.geweapi.com/gewe",
  153. }
  154. # Use Celery task
  155. worker_task = background_worker_task.delay(redis_config, kafka_config, gewe_config)
  156. background_tasks.add(worker_task)
  157. environment = os.environ.get('environment', 'default')
  158. if environment != 'default':
  159. task=asyncio.create_task(background_worker(redis_service,kafka_service,gewe_service))
  160. background_tasks.add(task)
  161. try:
  162. yield # 应用程序运行期间
  163. finally:
  164. # # 在应用程序关闭时取消所有后台任务
  165. task.cancel()
  166. try:
  167. await task
  168. except asyncio.CancelledError:
  169. pass
  170. background_tasks.clear()
  171. # 关闭 KafkaService
  172. print('应用关闭')
  173. await kafka_service.stop()
# Application instance; startup/shutdown handled by the lifespan context manager.
app = FastAPI(lifespan=lifespan)

# A RotatingFileHandler-based logging setup was prototyped here and is
# currently disabled; logging goes through common.log.logger instead.

# Per-request context middleware (see app.middleware.http_context).
app.add_middleware(BaseHTTPMiddleware, dispatch=http_context)

# Feature routers: config, contacts, groups, moments (SNS), agent, message pipeline.
app.include_router(config_router)
app.include_router(contacts_router)
app.include_router(groups_router)
app.include_router(sns_router)
app.include_router(agent_router)
app.include_router(messages_router)
  198. @app.get("/")
  199. async def root():
  200. logger.info("Root route is called")
  201. return {"message": "Kafka consumer is running in the background"}
class AddRequest(BaseModel):
    """Request body for POST /add: the two integer operands to sum."""
    x: int
    y: int
  205. @app.post("/add")
  206. async def add_numbers(request: AddRequest):
  207. task = add_task.delay(request.x, request.y)
  208. return {"task_id": task.id}
  209. @app.get("/task/{task_id}")
  210. async def get_task_status(task_id: str):
  211. task_result = AsyncResult(task_id)
  212. return {
  213. "task_id": task_id,
  214. "task_status": task_result.status,
  215. "task_result": task_result.result
  216. }