@@ -148,7 +148,7 @@ async def lifespan(app: FastAPI):
    # redis_service_instance=app.state.redis_service
    # Initialize GeWeChatCom
    app.state.gewe_service = await GeWeService.get_instance(app,"http://api.geweapi.com/gewe")
    app.state.gewe_service = await GeWeService.get_instance(redis_service,"http://api.geweapi.com/gewe")
    gewe_service=app.state.gewe_service
    # # Initialize GeWeChatCom
    #app.state.gwechat_service = GeWeService(app)
@@ -182,8 +182,9 @@ async def lifespan(app: FastAPI):
    }
    # Use Celery task
    worker_task = background_worker_task.delay(redis_config, kafka_config, gewe_config)
    background_tasks.add(worker_task)
    # worker_task = background_worker_task.delay(redis_config, kafka_config, gewe_config)
    # background_tasks.add(worker_task)
    environment = os.environ.get('environment', 'default')
    if environment != 'default':
        task=asyncio.create_task(background_worker(redis_service,kafka_service,gewe_service))
@@ -192,11 +193,11 @@ async def lifespan(app: FastAPI):
        yield  # while the application is running
    finally:
        # # Cancel all background tasks when the application shuts down
        task.cancel()
        try:
            await task
        except asyncio.CancelledError:
            pass
        # task.cancel()
        # try:
        #     await task
        # except asyncio.CancelledError:
        #     pass
        background_tasks.clear()
        # Shut down KafkaService
        print('应用关闭')
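        # A minimal sketch, assuming the asyncio task created above is also added to background_tasks
        # (an assumption, not shown here): shutdown could cancel everything in that set before clearing
        # it, instead of the single-task cancel that is commented out:
        #     for t in background_tasks:
        #         t.cancel()
        #     await asyncio.gather(*background_tasks, return_exceptions=True)
        #     background_tasks.clear()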
@@ -1,66 +1,34 @@
# from celery import Celery
# # Create the Celery app
# celery_app = Celery(
#     'ai_ops_wechat_app',
#     broker='redis://:telpo%231234@192.168.2.121:8090/3',
#     backend='redis://:telpo%231234@192.168.2.121:8090/3',
# )
# # Configure Celery
# celery_app.conf.update(
#     task_serializer='json',
#     accept_content=['json'],
#     result_serializer='json',
#     timezone='Asia/Shanghai',
#     enable_utc=True,
# )
# #celery_app.autodiscover_tasks(['app.tasks'])
# from celery import Celery
# def make_celery(app):
#     celery = Celery(
#         app.import_name,
#         backend=app.config['CELERY_RESULT_BACKEND'],
#         broker=app.config['CELERY_BROKER_URL']
#     )
#     celery.conf.update(app.config)
#     # Auto-discover tasks
#     celery.autodiscover_tasks(['app.tasks'])
#     return celery
# # Initialize Flask
# app = Flask(__name__)
# app.config.update(
#     CELERY_BROKER_URL='redis://:telpo%231234@192.168.2.121:8090/3',
#     CELERY_RESULT_BACKEND='redis://:telpo%231234@192.168.2.121:8090/3'
# )
# celery = make_celery(app)
from celery import Celery
import celery.schedules
from redbeat import RedBeatSchedulerEntry
from datetime import timedelta
from services.redis_service import RedisService
from services.kafka_service import KafkaService
from services.biz_service import BizService
from config import load_config,conf
from urllib.parse import quote
import asyncio,os
load_config()
KAFKA_BOOTSTRAP_SERVERS = conf().get("kafka_bootstrap_servers")
KAFKA_TOPIC = 'topic.ai.ops.wx'
KAFKA_GROUP_ID = 'ai-ops-wx'
redis_host=conf().get("redis_host")
redis_port=conf().get("redis_port")
redis_password=conf().get("redis_password")
redis_db=conf().get("redis_db")
encoded_password = quote(redis_password)
# Configure Celery
celery_app = Celery(
    "worker",
    broker=f"redis://:telpo%231234@{redis_host}:{redis_port}/{redis_db}",
    backend=f"redis://:telpo%231234@{redis_host}:{redis_port}/{redis_db}",
    broker=f"redis://:{encoded_password}@{redis_host}:{redis_port}/{redis_db}",
    backend=f"redis://:{encoded_password}@{redis_host}:{redis_port}/{redis_db}",
    include=['tasks']
)
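# A quick sanity check (a sketch assuming the configured password is the "telpo#1234" value seen in
# the hard-coded URLs above): urllib.parse.quote percent-encodes '#' as '%23', so the encoded value
# matches the previous literal. If the password could ever contain '/', quote(redis_password, safe='')
# would be needed, because '/' is left untouched by the default safe characters.
#     >>> from urllib.parse import quote
#     >>> quote("telpo#1234")
#     'telpo%231234'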
@@ -68,28 +36,86 @@ celery_app = Celery(
celery_app.conf.update(
    timezone="Asia/Shanghai",  # set the timezone
    beat_scheduler="redbeat.RedBeatScheduler",  # use RedBeat as the scheduler
    redbeat_redis_url=f"redis://:telpo%231234@{redis_host}:{redis_port}/{redis_db}"  # Redis where RedBeat stores its schedule state
    redbeat_redis_url=f"redis://:{encoded_password}@{redis_host}:{redis_port}/{redis_db}"  # Redis where RedBeat stores its schedule state
    ,
    redbeat_lock_timeout=60,  # avoid conflicts between multiple Beat instances
    beat_max_loop_interval=5  # have Celery Beat check the schedule every 5 seconds
)
task_name = "tasks.scheduled_task" | |||
# 任务执行间隔(每 10 秒执行一次) | |||
schedule = celery.schedules.schedule(timedelta(seconds=3)) | |||
# RedBeat 任务唯一 ID | |||
redbeat_entry = RedBeatSchedulerEntry( | |||
name="redbeat:scheduled_task", # 任务 ID | |||
task=task_name, # 任务名称 | |||
schedule=schedule, # 任务调度时间 | |||
args=[], | |||
app=celery_app | |||
# task_name = "tasks.scheduled_task" | |||
# # 任务执行间隔(每 10 秒执行一次) | |||
# schedule = celery.schedules.schedule(timedelta(seconds=3)) | |||
# # RedBeat 任务唯一 ID | |||
# redbeat_entry = RedBeatSchedulerEntry( | |||
# name="redbeat:scheduled_task", # 任务 ID | |||
# task=task_name, # 任务名称 | |||
# schedule=schedule, # 任务调度时间 | |||
# args=[], | |||
# app=celery_app | |||
) | |||
# ) | |||
# # 保存任务到 Redis | |||
# redbeat_entry.save() | |||
# 保存任务到 Redis | |||
redbeat_entry.save() | |||
# Auto-discover tasks
#celery.autodiscover_tasks(['tasks'])
# tasks_schedule = [
#     ("redbeat:scheduled_task", "tasks.scheduled_task", 3),
#     ("redbeat:scheduled_task_sync_wx", "tasks.scheduled_task_sync_wx", 15),
# ]
# # Create and save the RedBeat entries
# for task_id, task_name, interval in tasks_schedule:
#     redbeat_entry = RedBeatSchedulerEntry(
#         name=task_id,
#         task=task_name,
#         schedule=celery.schedules.schedule(timedelta(seconds=interval)),
#         args=[],
#         app=celery_app
#     )
#     redbeat_entry.save()
# Build redis_config, kafka_config and gewe_config from the configuration file
redis_config = {
    'host': redis_host,
    'port': redis_port,
    'password': redis_password,
    'db': redis_db,
}
kafka_config = {
    'bootstrap_servers': KAFKA_BOOTSTRAP_SERVERS,
    'topic': KAFKA_TOPIC,
    'group_id': KAFKA_GROUP_ID,
}
gewe_config = {
    'api_url': "http://api.geweapi.com/gewe",
}
scheduled_task_sync_wx_info_interval = 10
environment = os.environ.get('environment', 'default')
if environment != 'default':
    scheduled_task_sync_wx_info_interval = 60*11
# Scheduled task list (task ID, task name, interval in seconds, task args)
tasks_schedule = [
    #("redbeat:scheduled_task", "tasks.scheduled_task", 3, []),
    #("redbeat:scheduled_task_sync_wx", "tasks.scheduled_task_sync_wx", 15, []),
    ("redbeat:scheduled_task_sync_wx_info", "tasks.scheduled_task_sync_wx_info", scheduled_task_sync_wx_info_interval, [redis_config, kafka_config, gewe_config]),
]
# Register the RedBeat entries
for task_id, task_name, interval, task_args in tasks_schedule:
    redbeat_entry = RedBeatSchedulerEntry(
        name=task_id,
        task=task_name,
        schedule=celery.schedules.schedule(timedelta(seconds=interval)),
        args=task_args,
        app=celery_app
    )
    redbeat_entry.save()
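# A minimal sketch, not part of the original change: entries are keyed by name, so re-running this
# module updates the stored schedule in place rather than duplicating it. If a retired schedule such
# as "redbeat:scheduled_task_sync_wx" were still lingering in Redis, it could be removed like this,
# assuming RedBeatSchedulerEntry only needs the same name to rebuild the key it was saved under:
#     stale = RedBeatSchedulerEntry(name="redbeat:scheduled_task_sync_wx", app=celery_app)
#     stale.delete()  # drops the definition hash and removes it from RedBeat's schedule set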
@@ -27,10 +27,10 @@ def start_celery_beat():
if __name__ == "__main__":
    # Start FastAPI, the Celery worker and Celery Beat
    fastapi_process = start_fastapi()
    #celery_worker_process = start_celery_worker()
    #celery_beat_process = start_celery_beat()
    celery_worker_process = start_celery_worker()
    celery_beat_process = start_celery_beat()
    # Wait for the child processes to finish
    fastapi_process.wait()
    #celery_worker_process.wait()
    #celery_beat_process.wait()
    celery_worker_process.wait()
    celery_beat_process.wait()
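# For context, a minimal sketch of what the helpers used above might look like; their real
# definitions sit above this hunk, so the "celery_app" module name and the CLI flags here are
# assumptions rather than the project's confirmed commands:
#     import subprocess
#     def start_celery_worker():
#         return subprocess.Popen(["celery", "-A", "celery_app", "worker", "--loglevel=info"])
#     def start_celery_beat():
#         return subprocess.Popen(["celery", "-A", "celery_app", "beat", "--loglevel=info"])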
@@ -12,17 +12,18 @@ from fastapi import FastAPI, Depends
from common.singleton import singleton
from common.log import logger
from model.models import AddGroupContactsHistory
from services.redis_service import RedisService
#@singleton
class GeWeService:
    _instance = None
    _lock = asyncio.Lock()  # async lock to keep singleton initialization thread-safe
    def __init__(self,app:FastAPI, base_url: str):
    def __init__(self,redis_service:RedisService, base_url: str):
        if GeWeService._instance is not None:
            raise RuntimeError("请使用 get_instance() 获取单例!")
        self.base_url = base_url
        self.redis_service=app.state.redis_service
        self.redis_service=redis_service
    @classmethod
    async def get_instance(cls, app:FastAPI,base_url: str = "http://api.geweapi.com/gewe"):
@@ -583,6 +584,22 @@ class GeWeService:
                response_object = await response.json()
                return response_object.get('ret', None), response_object.get('msg', None), response_object.get('data', None)
    ############################### Personal profile module ###############################
    async def get_profile_async(self, token_id, app_id):
        api_url = f"{self.base_url}/v2/api/personal/getProfile"
        headers = {
            'X-GEWE-TOKEN': token_id,
            'Content-Type': 'application/json'
        }
        data = {
            "appId": app_id,
        }
        async with aiohttp.ClientSession() as session:
            async with session.post(api_url, headers=headers, json=data) as response:
                response_object = await response.json()
                return response_object.get('ret', None), response_object.get('msg', None), response_object.get('data', None)
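    # Usage sketch (token_id and app_id are placeholders), mirroring how the scheduled sync task
    # later in this diff consumes the (ret, msg, data) tuple:
    #     ret, msg, data = await gewe_service.get_profile_async(token_id, app_id)
    #     if ret == 200:
    #         print(data.get("nickName"), data.get("smallHeadImgUrl"))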
    ############################### Moments module ###################################
    # For 1-3 days after logging in on a new device, posting, liking, and commenting on Moments are
    # unavailable; attempts during this period trigger a reminder from the WeChat team. Please follow
    # the relevant rules.
@@ -82,7 +82,113 @@ def background_worker_task(self, redis_config, kafka_config, gewe_config):
@celery_app.task(name='tasks.scheduled_task', bind=True, acks_late=True)
def scheduled_task(self):
    print("🚀 定时任务执行成功!")
    return "Hello from Celery Beat + RedBeat!"
# @celery_app.task(name='tasks.scheduled_task', bind=True, acks_late=True)
# def scheduled_task(self):
#     print("🚀 定时任务执行成功!~~~~~~~~~~~~~~~~~")
#     return "Hello from Celery Beat + RedBeat!"
# @celery_app.task(name='tasks.scheduled_task_sync_wx', bind=True, acks_late=True)
# def scheduled_task_sync_wx(self,redis_service,kafka_service,gewe_service):
#     print("🚀 scheduled_task_sync_wx 定时任务执行成功!")
#     return "Hello from Celery Beat + RedBeat!"
# @celery_app.task(name='tasks.scheduled_task_sync_wx_info_1', bind=True, acks_late=True)
# def scheduled_task_sync_wx_info_1(self,redis_config, kafka_config, gewe_config):
#     '''
#     Periodically fetch WeChat account profile info
#     '''
#     loop = asyncio.new_event_loop()
#     asyncio.set_event_loop(loop)
#     async def task():
#         try:
#             redis_service = RedisService()
#             await redis_service.init(**redis_config)
#             # gewe_service = await GeWeService.get_instance(None, gewe_config['api_url'])
#             login_keys = []
#             async for key in redis_service.client.scan_iter(match='__AI_OPS_WX__:LOGININFO:*'):
#                 login_keys.append(key)
#             print(login_keys)
#             # for k in login_keys:
#             #     r = await redis_service.get_hash(k)
#             #     app_id = r.get("appId")
#             #     token_id = r.get("tokenId")
#             #     wxid = r.get("wxid")
#             #     status = r.get('status')
#             #     if status == '0':
#             #         continue
#             #     ret, msg, profile = await gewe_service.get_profile_async(token_id, app_id)
#             #     if ret != 200:
#             #         logger.warning(f"同步微信号 {wxid} 资料失败: {ret}-{msg}")
#             #         continue
#             #     nickname=profile.get("nickName")
#             #     head_img_url=profile.get("smallHeadImgUrl")
#             #     r.update({"nickName":nickname,"headImgUrl":head_img_url,"modify_at":int(time.time())})
#             #     cleaned_login_info = {k: (v if v is not None else '') for k, v in r.items()}
#             #     await redis_service.set_hash(k, cleaned_login_info)
#             #     logger.info(f"同步微信号 {wxid} 资料 成功")
#             #     redis_service.update_hash_field(k,"nickName",nickname)
#             #     redis_service.update_hash_field(k,"headImgUrl",head_img_url)
#             #     redis_service.update_hash_field(k,"modify_at",int(time.time()))
#         except Exception as e:
#             logger.error(f"任务执行过程中发生异常: {e}")
#         print("scheduled_task_sync_wx_info 定时任务执行成功!")
#         return "Hello from Celery Beat + RedBeat!"
#     loop.run_until_complete(task())
#     loop.close()
@celery_app.task(name='tasks.scheduled_task_sync_wx_info', bind=True, acks_late=True)
def scheduled_task_sync_wx_info(self, redis_config, kafka_config, gewe_config):
    '''
    Periodically fetch WeChat account profile info
    '''
    async def task():
        try:
            redis_service = RedisService()
            await redis_service.init(**redis_config)
            gewe_service = await GeWeService.get_instance(redis_service,gewe_config['api_url'])
            login_keys = []
            async for key in redis_service.client.scan_iter(match='__AI_OPS_WX__:LOGININFO:*'):
                login_keys.append(key)
            print(login_keys)
            for k in login_keys:
                r = await redis_service.get_hash(k)
                app_id = r.get("appId")
                token_id = r.get("tokenId")
                wxid = r.get("wxid")
                status = r.get('status')
                if status == '0':
                    continue
                ret, msg, profile = await gewe_service.get_profile_async(token_id, app_id)
                if ret != 200:
                    logger.warning(f"同步微信号 {wxid} 资料失败: {ret}-{msg}")
                    continue
                nickname=profile.get("nickName")
                head_img_url=profile.get("smallHeadImgUrl")
                # print(nickname)
                r.update({"nickName":nickname,"headImgUrl":head_img_url,"modify_at":int(time.time())})
                # replace None values so the hash can be written back to Redis
                cleaned_login_info = {field: (value if value is not None else '') for field, value in r.items()}
                await redis_service.set_hash(k, cleaned_login_info)
                logger.info(f"定时同步微信号{wxid}-昵称{nickname} 资料成功")
        except Exception as e:
            logger.error(f"任务执行过程中发生异常: {e}")
        print("scheduled_task_sync_wx_info 定时任务执行成功!")
        return "Hello from Celery Beat + RedBeat!"
    loop = asyncio.get_event_loop()
    if loop.is_closed():
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
    loop.run_until_complete(task())  # run the task on the existing event loop
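# A minimal alternative sketch, assuming the worker runs the default prefork pool (so no event loop
# is already running in the process): on recent Python versions asyncio.get_event_loop() is
# deprecated when no loop is running, and the block above could be reduced to
#     asyncio.run(task())
# which creates a fresh event loop and closes it once the coroutine finishes.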