From 7273f6e8251aade6cb8481d21488c2ad5cd0b977 Mon Sep 17 00:00:00 2001 From: H Vs Date: Thu, 3 Apr 2025 16:55:32 +0800 Subject: [PATCH] =?UTF-8?q?=E6=A0=87=E7=AD=BE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app/endpoints/label_endpoint.py | 67 ++++++++ app/main.py | 2 + tasks copy.py | 283 -------------------------------- 3 files changed, 69 insertions(+), 283 deletions(-) create mode 100644 app/endpoints/label_endpoint.py delete mode 100644 tasks copy.py diff --git a/app/endpoints/label_endpoint.py b/app/endpoints/label_endpoint.py new file mode 100644 index 0000000..c7124b7 --- /dev/null +++ b/app/endpoints/label_endpoint.py @@ -0,0 +1,67 @@ +from fastapi import APIRouter,Request,HTTPException +from pydantic import BaseModel +from fastapi import APIRouter, Depends +from pydantic import BaseModel, ValidationError +from model.models import AgentConfig,validate_wxid,auth_required_time +import threading +import asyncio +import time,random +from typing import Dict, Tuple, Any + + +label_router = APIRouter(prefix="/api/label") + +class LabelAddRequest(BaseModel): + wxid: str + labelName: str + +class LabelDelRequest(BaseModel): + wxid: str + labelIds: list + +class LabelListRequest(BaseModel): + wxid: str + +class LabelModifyRequest(BaseModel): + wxid: str + labelIds: list + wxIds: list + + +@label_router.post("/add", response_model=None) +async def add_label(request: Request, body: LabelAddRequest): + wxid = body.wxid + label_name = body.labelName + + return { "labelName":label_name,"labelId":random.randint(1,100)} + + +@label_router.post("/delete", response_model=None) +async def delete_label(request: Request, body: LabelDelRequest, ): + wxid = body.wxid + label_ids= body.labelIds + + + return {"message":"操作成功"} + +@label_router.post("/list", response_model=None) +async def list_label(request: Request, body: LabelListRequest, ): + wxid = body.wxid + + return { + "labelList":[{ + "labelName": "朋友", + "labelId": 1 + },{ + "labelName": "邻居", + "labelId": 2 + }] + } + +@label_router.post("/modifymembers", response_model=None) +async def modifymembers_label(request: Request, body: LabelModifyRequest, ): + wxid = body.wxid + label_ids= body.labelIds + wxids= body.wxIds + return {"message":"操作成功"} + diff --git a/app/main.py b/app/main.py index a817918..6102429 100644 --- a/app/main.py +++ b/app/main.py @@ -21,6 +21,7 @@ from app.endpoints.groups_endpoint import groups_router from app.endpoints.sns_endpoint import sns_router from app.endpoints.agent_endpoint import agent_router from app.endpoints.pipeline_endpoint import messages_router +from app.endpoints.label_endpoint import label_router from tasks import background_worker_task @@ -239,6 +240,7 @@ app.include_router(groups_router) app.include_router(sns_router) app.include_router(agent_router) app.include_router(messages_router) +app.include_router(label_router) @app.get("/") async def root(): diff --git a/tasks copy.py b/tasks copy.py deleted file mode 100644 index 3e0e959..0000000 --- a/tasks copy.py +++ /dev/null @@ -1,283 +0,0 @@ -from celery_app import celery_app -from fastapi import Request,FastAPI -import time,datetime -from celery import Celery -import celery.schedules -from redbeat import RedBeatSchedulerEntry -from datetime import timedelta - -from services.redis_service import RedisService -from services.kafka_service import KafkaService -from services.gewe_service import GeWeService -from common.log import logger -import asyncio,random - - -@celery_app.task(name='tasks.add_task', bind=True, acks_late=True) -def add_task(self, x, y): - time.sleep(5) # 模拟长时间计算 - logger.info('add') - return x + y - - -@celery_app.task(name='tasks.mul_task', bind=True, acks_late=True) -def mul_task(self, x, y): - time.sleep(5) # 模拟长时间计算 - return x * y - - -# @celery.task(name='app.tasks.sync_contacts', bind=True, acks_late=True) -# async def sync_contacts_task(self,app): -# login_keys = list(await app.state.redis_service.client.scan_iter(match='__AI_OPS_WX__:LOGININFO:*')) -# return login_keys -# # for k in login_keys: -# # print(k) - -@celery_app.task(name='tasks.sync_contacts', bind=True, acks_late=True) -async def sync_contacts_task(self, redis_service): - # Use the redis_service passed as an argument - login_keys = list(await redis_service.client.scan_iter(match='__AI_OPS_WX__:LOGININFO:*')) - return login_keys - - -@celery_app.task(name='tasks.background_worker_task', bind=True, acks_late=True) -def background_worker_task(self, redis_config, kafka_config, gewe_config): - async def task(): - redis_service = RedisService() - await redis_service.init(**redis_config) - login_keys = [] - async for key in redis_service.client.scan_iter(match='__AI_OPS_WX__:LOGININFO:*'): - login_keys.append(key) - print(login_keys) - - asyncio.run(task()) - -# @celery.task(name='tasks.background_worker_task', bind=True, acks_late=True) -# async def background_worker_task(self, redis_config, kafka_config, gewe_config): -# # Initialize services inside the task -# redis_service = RedisService() -# await redis_service.init(**redis_config) - -# login_keys = [] -# async for key in redis_service.client.scan_iter(match='__AI_OPS_WX__:LOGININFO:*'): # 使用 async for 遍历异步生成器 -# login_keys.append(key) - -# print(login_keys) - - # kafka_service = KafkaService(**kafka_config) - # await kafka_service.start() - - # gewe_service = await GeWeService.get_instance(None, gewe_config['api_url']) - - # # Task logic - # lock_name = "background_wxchat_thread_lock" - # lock_identifier = str(time.time()) - # while True: - # if await redis_service.acquire_lock(lock_name, timeout=10): - # try: - # logger.info("分布式锁已成功获取") - # # Perform task logic - # finally: - # await redis_service.release_lock(lock_name, lock_identifier) - # break - # else: - # logger.info("获取分布式锁失败,等待10秒后重试...") - # await asyncio.sleep(10) - - - -# @celery_app.task(name='tasks.scheduled_task', bind=True, acks_late=True) -# def scheduled_task(self): -# print("定时任务执行成功!~~~~~~~~~~~~~~~~~") -# return "Hello from Celery Beat + RedBeat!" - - -# @celery_app.task(name='tasks.scheduled_task_sync_wx', bind=True, acks_late=True) -# def scheduled_task_sync_wx(self,redis_service,kafka_service,gewe_service): -# print("scheduled_task_sync_wx 定时任务执行成功!") -# return "Hello from Celery Beat + RedBeat!" - -# @celery_app.task(name='tasks.scheduled_task_sync_wx_info_1', bind=True, acks_late=True) -# def scheduled_task_sync_wx_info_1(self,redis_config, kafka_config, gewe_config): -# ''' -# 定时获取微信号资料 -# ''' -# loop = asyncio.new_event_loop() -# asyncio.set_event_loop(loop) -# async def task(): -# try: -# redis_service = RedisService() -# await redis_service.init(**redis_config) -# # gewe_service = await GeWeService.get_instance(None, gewe_config['api_url']) -# login_keys = [] -# async for key in redis_service.client.scan_iter(match='__AI_OPS_WX__:LOGININFO:*'): -# login_keys.append(key) - -# print(login_keys) -# # for k in login_keys: -# # r = await redis_service.get_hash(k) -# # app_id = r.get("appId") -# # token_id = r.get("tokenId") -# # wxid = r.get("wxid") -# # status = r.get('status') -# # if status == '0': -# # continue -# # ret, msg, profile = await gewe_service.get_profile_async(token_id, app_id) -# # if ret != 200: -# # logger.warning(f"同步微信号 {wxid} 资料失败: {ret}-{msg}") -# # continue -# # nickname=profile.get("nickName") -# # head_img_url=profile.get("smallHeadImgUrl") -# # r.update({"nickName":nickname,"headImgUrl":head_img_url,"modify_at":int(time.time())}) -# # cleaned_login_info = {k: (v if v is not None else '') for k, v in r.items()} -# # await redis_service.set_hash(k, cleaned_login_info) -# # logger.info(f"同步微信号 {wxid} 资料 成功") -# # redis_service.update_hash_field(k,"nickName",nickname) -# # redis_service.update_hash_field(k,"headImgUrl",head_img_url) -# # redis_service.update_hash_field(k,"modify_at",int(time.time())) -# except Exception as e: -# logger.error(f"任务执行过程中发生异常: {e}") - -# print("scheduled_task_sync_wx_info 定时任务执行成功!") -# return "Hello from Celery Beat + RedBeat!" - -# loop.run_until_complete(task()) -# loop.close() - - - -@celery_app.task(name='tasks.scheduled_task_sync_wx_info', bind=True, acks_late=True) -def scheduled_task_sync_wx_info(self, redis_config, kafka_config, gewe_config): - ''' - 定时获取微信号资料 - ''' - async def task(): - try: - redis_service = RedisService() - await redis_service.init(**redis_config) - gewe_service = await GeWeService.get_instance(redis_service,gewe_config['api_url']) - login_keys = [] - async for key in redis_service.client.scan_iter(match='__AI_OPS_WX__:LOGININFO:*'): - login_keys.append(key) - - print(login_keys) - for k in login_keys: - r = await redis_service.get_hash(k) - app_id = r.get("appId") - token_id = r.get("tokenId") - wxid = r.get("wxid") - status = r.get('status') - if status == '0': - logger.warning(f"微信号 {wxid} 已经离线: {ret}-{msg}") - continue - ret, msg, profile = await gewe_service.get_profile_async(token_id, app_id) - if ret != 200: - logger.warning(f"同步微信号 {wxid} 资料失败: {ret}-{msg}") - continue - nickname=profile.get("nickName") - head_img_url=profile.get("smallHeadImgUrl") - # print(nickname) - - nickname=profile.get("nickName") - head_img_url=profile.get("smallHeadImgUrl") - r.update({"nickName":nickname,"headImgUrl":head_img_url,"modify_at":int(time.time())}) - cleaned_login_info = {k: (v if v is not None else '') for k, v in r.items()} - await redis_service.set_hash(k, cleaned_login_info) - logger.info(f"定时同步微信号{wxid}-昵称{nickname} 资料成功") - - except Exception as e: - logger.error(f"任务执行过程中发生异常: {e}") - - loop = asyncio.get_event_loop() - if loop.is_closed(): - loop = asyncio.new_event_loop() - asyncio.set_event_loop(loop) - - loop.run_until_complete(task()) # 在现有事件循环中运行任务 - - - -REDIS_KEY_PATTERN = "friend_add_limit:{date}" -REDIS_LAST_RUN_KEY = "last_run_time:add_friends_task" - -@celery_app.task(name='tasks.add_friends_task', bind=True, acks_late=True) -def add_friends_task(self,redis_config): - """ - 限制每天最多 15 个,每 2 小时最多 8 个 - """ - async def task(): - redis_service = RedisService() - await redis_service.init(**redis_config) - today_str = datetime.datetime.now().strftime("%Y%m%d") - redis_key = REDIS_KEY_PATTERN.format(date=today_str) - - # 获取当前总添加数量 - total_added = await redis_service.get_hash_field(redis_key, "total") or 0 - last_2h_added =await redis_service.get_hash_field(redis_key, "last_2h") or 0 - - total_added = int(total_added) - last_2h_added = int(last_2h_added) - - logger.info(f"当前添加好友总数: {total_added}, 过去2小时添加: {last_2h_added}") - - # 判断是否超过限制 - if total_added >= 15: - logger.warning("今日好友添加已达上限!") - return - - if last_2h_added >= 8: - logger.warning("过去2小时添加已达上限!") - return - - # 计算本次要添加的好友数量 (控制每天 5-15 个) - max_add = min(15 - total_added, 8 - last_2h_added) - if max_add <= 0: - return - - num_to_add = min(max_add, 1) # 每次最多加 1 个 - logger.info(f"本次添加 {num_to_add} 位好友") - - # TODO: 调用好友添加逻辑 (接口 or 业务逻辑) - # success = add_friends(num_to_add) - - success = num_to_add # 假设成功添加 num_to_add 个 - - # 更新 Redis 计数 - if success > 0: - await redis_service.increment_hash_field(redis_key, "total", success) - await redis_service.increment_hash_field(redis_key, "last_2h", success) - - # 设置 Redis 过期时间 (每日记录存 1 天, 2 小时记录存 2 小时) - await redis_service.expire(redis_key, 86400) # 24小时 - await redis_service.expire_field(redis_key, "last_2h", 7200) # 2小时 - - logger.info(f"成功添加 {success} 位好友, 今日总数 {total_added + success}") - - # 生成一个新的随机时间(5-15 分钟之间) - # next_interval = random.randint(10, 20) - - # # 计算新的执行时间 - # next_run_time = datetime.datetime.now() + timedelta(seconds=next_interval) - - # # 重新注册 RedBeat 任务,确保下次执行时间不同 - # redbeat_entry = RedBeatSchedulerEntry( - # name="redbeat:add_friends_task", - # task="tasks.add_friends_task", - # schedule=celery.schedules.schedule(timedelta(seconds=next_interval)), - # args=[redis_config], - # app=celery_app - # ) - - # # 设置任务的下次执行时间 - # redbeat_entry.last_run_at = next_run_time - # redbeat_entry.save() - - # logger.info(f"下次任务将在 {next_run_time} 执行(间隔 {next_interval} 秒)") - - loop = asyncio.get_event_loop() - - if loop.is_closed(): - loop = asyncio.new_event_loop() - asyncio.set_event_loop(loop) - - loop.run_until_complete(task()) # 在现有事件循环中运行任务 \ No newline at end of file