浏览代码

标签

1257
H Vs 3 周前
父节点
当前提交
7273f6e825
共有 3 个文件被更改,包括 69 次插入和 283 次删除
  1. +67
    -0
      app/endpoints/label_endpoint.py
  2. +2
    -0
      app/main.py
  3. +0
    -283
      tasks copy.py

+ 67
- 0
app/endpoints/label_endpoint.py 查看文件

@@ -0,0 +1,67 @@
from fastapi import APIRouter,Request,HTTPException
from pydantic import BaseModel
from fastapi import APIRouter, Depends
from pydantic import BaseModel, ValidationError
from model.models import AgentConfig,validate_wxid,auth_required_time
import threading
import asyncio
import time,random
from typing import Dict, Tuple, Any


label_router = APIRouter(prefix="/api/label")

class LabelAddRequest(BaseModel):
    """Request body for POST /api/label/add — create one contact label."""
    wxid: str  # presumably the WeChat account id owning the label — verify against caller
    labelName: str  # display name of the label to create

class LabelDelRequest(BaseModel):
    """Request body for POST /api/label/delete — remove labels by id."""
    wxid: str  # presumably the WeChat account id owning the labels — verify against caller
    labelIds: list  # NOTE(review): untyped list; likely label ids (ints) — confirm before tightening to list[int]

class LabelListRequest(BaseModel):
    """Request body for POST /api/label/list — list labels of an account."""
    wxid: str  # presumably the WeChat account id to list labels for — verify against caller

class LabelModifyRequest(BaseModel):
    """Request body for POST /api/label/modifymembers — re-assign labels to contacts."""
    wxid: str  # presumably the operating WeChat account id — verify against caller
    labelIds: list  # labels to apply; untyped list — confirm element type
    wxIds: list  # contacts to re-label; untyped list — confirm element type

@label_router.post("/add", response_model=None)
async def add_label(request: Request, body: LabelAddRequest):
    """Create a contact label (stub).

    Echoes the requested name back together with a random id; nothing is
    persisted yet.
    """
    # NOTE(review): stub — the id is random (1..100) and body.wxid is unused.
    return {
        "labelName": body.labelName,
        "labelId": random.randint(1, 100),
    }


@label_router.post("/delete", response_model=None)
async def delete_label(request: Request, body: LabelDelRequest, ):
    """Delete contact labels (stub).

    Accepts the ids but performs no deletion yet; always reports success.
    """
    _ = (body.wxid, body.labelIds)  # inputs acknowledged; no persistence layer yet
    return {"message": "操作成功"}

@label_router.post("/list", response_model=None)
async def list_label(request: Request, body: LabelListRequest, ):
    """List contact labels (stub).

    Ignores body.wxid and returns a fixed two-label sample payload.
    """
    sample_labels = [
        {"labelName": "朋友", "labelId": 1},
        {"labelName": "邻居", "labelId": 2},
    ]
    return {"labelList": sample_labels}

@label_router.post("/modifymembers", response_model=None)
async def modifymembers_label(request: Request, body: LabelModifyRequest, ):
    """Re-assign labels to contacts (stub).

    Accepts the label ids and contact ids but performs no update yet;
    always reports success.
    """
    _ = (body.wxid, body.labelIds, body.wxIds)  # inputs acknowledged; no persistence yet
    return {"message": "操作成功"}


+ 2
- 0
app/main.py 查看文件

@@ -21,6 +21,7 @@ from app.endpoints.groups_endpoint import groups_router
from app.endpoints.sns_endpoint import sns_router
from app.endpoints.agent_endpoint import agent_router
from app.endpoints.pipeline_endpoint import messages_router
from app.endpoints.label_endpoint import label_router
from tasks import background_worker_task


@@ -239,6 +240,7 @@ app.include_router(groups_router)
app.include_router(sns_router)
app.include_router(agent_router)
app.include_router(messages_router)
app.include_router(label_router)

@app.get("/")
async def root():


+ 0
- 283
tasks copy.py 查看文件

@@ -1,283 +0,0 @@
from celery_app import celery_app
from fastapi import Request,FastAPI
import time,datetime
from celery import Celery
import celery.schedules
from redbeat import RedBeatSchedulerEntry
from datetime import timedelta

from services.redis_service import RedisService
from services.kafka_service import KafkaService
from services.gewe_service import GeWeService
from common.log import logger
import asyncio,random


@celery_app.task(name='tasks.add_task', bind=True, acks_late=True)
def add_task(self, x, y):
    """Demo task: return ``x + y`` after a simulated 5-second workload."""
    time.sleep(5)  # simulate a long-running computation
    logger.info('add')
    result = x + y
    return result


@celery_app.task(name='tasks.mul_task', bind=True, acks_late=True)
def mul_task(self, x, y):
    """Demo task: return ``x * y`` after a simulated 5-second workload."""
    time.sleep(5)  # simulate a long-running computation
    product = x * y
    return product


# @celery.task(name='app.tasks.sync_contacts', bind=True, acks_late=True)
# async def sync_contacts_task(self,app):
# login_keys = list(await app.state.redis_service.client.scan_iter(match='__AI_OPS_WX__:LOGININFO:*'))
# return login_keys
# # for k in login_keys:
# # print(k)

@celery_app.task(name='tasks.sync_contacts', bind=True, acks_late=True)
def sync_contacts_task(self, redis_service):
    """Collect every login-info key from Redis and return them as a list.

    Bug fix: the original was an ``async def`` Celery task (Celery would
    return the unawaited coroutine, never running it) and it called
    ``list(await client.scan_iter(...))`` — ``scan_iter`` yields an async
    generator, which is neither awaitable nor acceptable to ``list()``, so
    the body would raise TypeError even if awaited.  The task is now a
    regular function that drives the async scan to completion itself.

    :param redis_service: service exposing ``client.scan_iter`` (async) —
        NOTE(review): passing a live service as a task argument is unusual
        for Celery (arguments are normally serialized); confirm the caller.
    :return: list of matching Redis keys.
    """
    async def _collect():
        keys = []
        # async generators must be consumed with ``async for``
        async for key in redis_service.client.scan_iter(match='__AI_OPS_WX__:LOGININFO:*'):
            keys.append(key)
        return keys

    return asyncio.run(_collect())


@celery_app.task(name='tasks.background_worker_task', bind=True, acks_late=True)
def background_worker_task(self, redis_config, kafka_config, gewe_config):
    """Scan Redis for login-info keys and print them (work-in-progress worker).

    Only ``redis_config`` is consumed so far; ``kafka_config`` and
    ``gewe_config`` are accepted for the eventual full implementation.
    """
    async def _scan():
        service = RedisService()
        await service.init(**redis_config)
        found = [key async for key in service.client.scan_iter(match='__AI_OPS_WX__:LOGININFO:*')]
        print(found)

    asyncio.run(_scan())

# @celery.task(name='tasks.background_worker_task', bind=True, acks_late=True)
# async def background_worker_task(self, redis_config, kafka_config, gewe_config):
# # Initialize services inside the task
# redis_service = RedisService()
# await redis_service.init(**redis_config)

# login_keys = []
# async for key in redis_service.client.scan_iter(match='__AI_OPS_WX__:LOGININFO:*'): # 使用 async for 遍历异步生成器
# login_keys.append(key)

# print(login_keys)

# kafka_service = KafkaService(**kafka_config)
# await kafka_service.start()

# gewe_service = await GeWeService.get_instance(None, gewe_config['api_url'])

# # Task logic
# lock_name = "background_wxchat_thread_lock"
# lock_identifier = str(time.time())
# while True:
# if await redis_service.acquire_lock(lock_name, timeout=10):
# try:
# logger.info("分布式锁已成功获取")
# # Perform task logic
# finally:
# await redis_service.release_lock(lock_name, lock_identifier)
# break
# else:
# logger.info("获取分布式锁失败,等待10秒后重试...")
# await asyncio.sleep(10)



# @celery_app.task(name='tasks.scheduled_task', bind=True, acks_late=True)
# def scheduled_task(self):
# print("定时任务执行成功!~~~~~~~~~~~~~~~~~")
# return "Hello from Celery Beat + RedBeat!"


# @celery_app.task(name='tasks.scheduled_task_sync_wx', bind=True, acks_late=True)
# def scheduled_task_sync_wx(self,redis_service,kafka_service,gewe_service):
# print("scheduled_task_sync_wx 定时任务执行成功!")
# return "Hello from Celery Beat + RedBeat!"

# @celery_app.task(name='tasks.scheduled_task_sync_wx_info_1', bind=True, acks_late=True)
# def scheduled_task_sync_wx_info_1(self,redis_config, kafka_config, gewe_config):
# '''
# 定时获取微信号资料
# '''
# loop = asyncio.new_event_loop()
# asyncio.set_event_loop(loop)
# async def task():
# try:
# redis_service = RedisService()
# await redis_service.init(**redis_config)
# # gewe_service = await GeWeService.get_instance(None, gewe_config['api_url'])
# login_keys = []
# async for key in redis_service.client.scan_iter(match='__AI_OPS_WX__:LOGININFO:*'):
# login_keys.append(key)

# print(login_keys)
# # for k in login_keys:
# # r = await redis_service.get_hash(k)
# # app_id = r.get("appId")
# # token_id = r.get("tokenId")
# # wxid = r.get("wxid")
# # status = r.get('status')
# # if status == '0':
# # continue
# # ret, msg, profile = await gewe_service.get_profile_async(token_id, app_id)
# # if ret != 200:
# # logger.warning(f"同步微信号 {wxid} 资料失败: {ret}-{msg}")
# # continue
# # nickname=profile.get("nickName")
# # head_img_url=profile.get("smallHeadImgUrl")
# # r.update({"nickName":nickname,"headImgUrl":head_img_url,"modify_at":int(time.time())})
# # cleaned_login_info = {k: (v if v is not None else '') for k, v in r.items()}
# # await redis_service.set_hash(k, cleaned_login_info)
# # logger.info(f"同步微信号 {wxid} 资料 成功")
# # redis_service.update_hash_field(k,"nickName",nickname)
# # redis_service.update_hash_field(k,"headImgUrl",head_img_url)
# # redis_service.update_hash_field(k,"modify_at",int(time.time()))
# except Exception as e:
# logger.error(f"任务执行过程中发生异常: {e}")

# print("scheduled_task_sync_wx_info 定时任务执行成功!")
# return "Hello from Celery Beat + RedBeat!"
# loop.run_until_complete(task())
# loop.close()



@celery_app.task(name='tasks.scheduled_task_sync_wx_info', bind=True, acks_late=True)
def scheduled_task_sync_wx_info(self, redis_config, kafka_config, gewe_config):
    '''
    Periodic task: refresh profile data (nickname, avatar) for every
    logged-in WeChat account recorded in Redis.

    Bug fixes vs. the original:
    - the offline branch logged ``f"...{ret}-{msg}"`` before ``ret``/``msg``
      were ever assigned, raising NameError for any account with status '0';
    - ``nickname``/``head_img_url`` were assigned twice in a row;
    - the cleanup dict comprehension reused ``k`` (the Redis key loop
      variable) as its own variable, which was confusing even though Python 3
      comprehension scope kept it from breaking ``set_hash(k, ...)``.
    '''
    async def task():
        try:
            redis_service = RedisService()
            await redis_service.init(**redis_config)
            gewe_service = await GeWeService.get_instance(redis_service, gewe_config['api_url'])

            login_keys = []
            async for key in redis_service.client.scan_iter(match='__AI_OPS_WX__:LOGININFO:*'):
                login_keys.append(key)

            print(login_keys)
            for login_key in login_keys:
                info = await redis_service.get_hash(login_key)
                app_id = info.get("appId")
                token_id = info.get("tokenId")
                wxid = info.get("wxid")
                if info.get('status') == '0':
                    # BUG FIX: original referenced undefined ret/msg here (NameError)
                    logger.warning(f"微信号 {wxid} 已经离线")
                    continue

                ret, msg, profile = await gewe_service.get_profile_async(token_id, app_id)
                if ret != 200:
                    logger.warning(f"同步微信号 {wxid} 资料失败: {ret}-{msg}")
                    continue

                nickname = profile.get("nickName")
                head_img_url = profile.get("smallHeadImgUrl")
                info.update({"nickName": nickname, "headImgUrl": head_img_url, "modify_at": int(time.time())})
                # Redis hashes reject None values — replace them with ''
                cleaned_login_info = {field: (value if value is not None else '') for field, value in info.items()}
                await redis_service.set_hash(login_key, cleaned_login_info)
                logger.info(f"定时同步微信号{wxid}-昵称{nickname} 资料成功")

        except Exception as e:
            logger.error(f"任务执行过程中发生异常: {e}")

    # Reuse the worker's event loop when possible; create one if it was closed.
    loop = asyncio.get_event_loop()
    if loop.is_closed():
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)

    loop.run_until_complete(task())



# Per-day counter hash key, e.g. "friend_add_limit:20250101"
REDIS_KEY_PATTERN = "friend_add_limit:{date}"
# Reserved for RedBeat rescheduling (not read by the task yet)
REDIS_LAST_RUN_KEY = "last_run_time:add_friends_task"


@celery_app.task(name='tasks.add_friends_task', bind=True, acks_late=True)
def add_friends_task(self, redis_config):
    """
    Rate-limited friend adding: at most 15 per day and 8 per rolling 2 hours.

    Counters live in a Redis hash keyed by today's date; the daily field
    expires after 24 h, the 2-hour field after 2 h.  The actual add call is
    still a TODO — the task currently only simulates one successful add.
    """
    async def task():
        svc = RedisService()
        await svc.init(**redis_config)

        today = datetime.datetime.now().strftime("%Y%m%d")
        counter_key = REDIS_KEY_PATTERN.format(date=today)

        # Current counters (missing fields count as zero)
        day_count = int(await svc.get_hash_field(counter_key, "total") or 0)
        window_count = int(await svc.get_hash_field(counter_key, "last_2h") or 0)

        logger.info(f"当前添加好友总数: {day_count}, 过去2小时添加: {window_count}")

        # Guard clauses: stop as soon as either limit is reached.
        if day_count >= 15:
            logger.warning("今日好友添加已达上限!")
            return
        if window_count >= 8:
            logger.warning("过去2小时添加已达上限!")
            return

        headroom = min(15 - day_count, 8 - window_count)
        if headroom <= 0:
            return

        batch = min(headroom, 1)  # add at most one friend per run
        logger.info(f"本次添加 {batch} 位好友")

        # TODO: invoke the real friend-adding logic (API call / business rule)
        added = batch  # pretend `batch` friends were added successfully

        if added > 0:
            await svc.increment_hash_field(counter_key, "total", added)
            await svc.increment_hash_field(counter_key, "last_2h", added)

            # Daily record kept 24 h; the 2-hour window field kept 2 h.
            await svc.expire(counter_key, 86400)
            await svc.expire_field(counter_key, "last_2h", 7200)

            logger.info(f"成功添加 {added} 位好友, 今日总数 {day_count + added}")

    # Reuse the worker's event loop when possible; create one if it was closed.
    loop = asyncio.get_event_loop()
    if loop.is_closed():
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)

    loop.run_until_complete(task())

正在加载...
取消
保存