Browse Source

群图片处理

1257
H Vs 2 weeks ago
parent
commit
76da435211
4 changed files with 343 additions and 3 deletions
  1. +22
    -0
      app/endpoints/pipeline_endpoint.py
  2. +17
    -3
      common/utils.py
  3. +21
    -0
      services/gewe_service.py
  4. +283
    -0
      tasks copy.py

+ 22
- 0
app/endpoints/pipeline_endpoint.py View File

@@ -710,6 +710,28 @@ async def handle_image_async(request: Request,token_id,app_id, wxid,msg_data,fro

async def handle_image_group_async(request: Request, token_id, app_id, wxid, msg_data, from_wxid, to_wxid):
    '''
    Handle an image message received in a group chat.

    Extracts the CDN attributes from the raw XML in the message body,
    rebuilds an <img> XML payload, downloads the image through the GeWe
    service, mirrors it to OSS, and forwards the resulting OSS URL as a
    dialogue message over Kafka.
    '''
    logger.info('群聊图片消息')
    msg_content = msg_data["Content"]["string"]
    callback_to_user = msg_data["FromUserName"]["string"]

    # Pull each CDN attribute out of the message XML. The previous code
    # called .group(1) on re.search() directly, which raised
    # AttributeError whenever an attribute was missing; bail out
    # gracefully instead. (Patterns are identical to the originals,
    # including the loose 'length' pattern.)
    attrs = {}
    for name in ('aeskey', 'cdnthumburl', 'md5', 'cdnthumblength',
                 'cdnthumbheight', 'cdnthumbwidth', 'length'):
        match = re.search(rf'{name}="([^"]+)"', msg_content)
        if match is None:
            logger.warning('群聊图片消息缺少 %s 属性,跳过处理', name)
            return
        attrs[name] = match.group(1)

    aeskey = attrs['aeskey']
    cdnthumburl = attrs['cdnthumburl']
    md5 = attrs['md5']
    cdnthumblength = attrs['cdnthumblength']
    cdnthumbheight = attrs['cdnthumbheight']
    cdnthumbwidth = attrs['cdnthumbwidth']
    length = attrs['length']

    img_xml = f'<?xml version="1.0"?>\n<msg>\n\t<img aeskey="{aeskey}" encryver="1" cdnthumbaeskey="{aeskey}" cdnthumburl="{cdnthumburl}" cdnthumblength="{cdnthumblength}" cdnthumbheight="{cdnthumbheight}" cdnthumbwidth="{cdnthumbwidth}" cdnmidheight="0" cdnmidwidth="0" cdnhdheight="0" cdnhdwidth="0" cdnmidimgurl="{cdnthumburl}" length="{length}" md5="{md5}" />\n\t<platform_signature></platform_signature>\n\t<imgdatahash></imgdatahash>\n</msg>'
    wx_img_url = await request.app.state.gewe_service.download_image_msg_async(token_id, app_id, img_xml)
    if not wx_img_url:
        # download_image_msg_async returns False on failure.
        logger.warning('下载群聊图片失败,跳过处理')
        return
    oss_url = wx_img_url_to_oss_url(wx_img_url)
    if not oss_url:
        # wx_img_url_to_oss_url returns None on failure.
        logger.warning('群聊图片上传 OSS 失败,跳过处理')
        return

    # Replace the raw XML tail of the message with the OSS URL.
    reply_content = re.sub(r'<\?xml.*', f'{oss_url}', msg_content, flags=re.DOTALL)
    input_wx_content_dialogue_message = [{"type": "text", "text": reply_content}]
    input_message = dialogue_message(callback_to_user, wxid, input_wx_content_dialogue_message, False)
    await request.app.state.kafka_service.send_message_async(input_message)
    logger.info("发送对话 %s", input_message)


async def handle_voice_async(request: Request,token_id,app_id, wxid,msg_data,from_wxid, to_wxid):
'''


+ 17
- 3
common/utils.py View File

@@ -206,6 +206,20 @@ def wx_voice(text: str):
print(f"发生错误:{e}")
return None, None # 发生错误时返回 None

def wx_img_url_to_oss_url(img_url: str) -> str:
    '''
    Mirror a WeChat CDN image URL to OSS and return the resulting file URL.

    Returns None when the upload fails (callers must handle that case).
    '''
    try:
        import os  # local import keeps the module's import surface unchanged

        # SECURITY: these credentials were previously hard-coded in source.
        # They can now be supplied via environment variables; the literal
        # defaults are kept only for backward compatibility and should be
        # rotated and removed from version control.
        oss_access_key_id = os.environ.get("OSS_ACCESS_KEY_ID", "LTAI5tRTG6pLhTpKACJYoPR5")
        oss_access_key_secret = os.environ.get("OSS_ACCESS_KEY_SECRET", "E7dMzeeMxq4VQvLg7Tq7uKf3XWpYfN")
        oss_endpoint = os.environ.get("OSS_ENDPOINT", "http://oss-cn-shanghai.aliyuncs.com")
        oss_bucket_name = os.environ.get("OSS_BUCKET_NAME", "cow-agent")
        oss_prefix = os.environ.get("OSS_PREFIX", "cow")
        file_url = upload_oss(oss_access_key_id, oss_access_key_secret, oss_endpoint, oss_bucket_name, img_url, oss_prefix)
        return file_url
    except Exception as e:
        print(f"发生错误:{e}")
        return None  # return None on failure

def upload_oss(
access_key_id,
access_key_secret,
@@ -242,10 +256,10 @@ def upload_oss(
expiration=oss2.models.LifecycleExpiration(days=expiration_days))

# 设置Bucket的生命周期
lifecycle = oss2.models.BucketLifecycle([rule])
bucket.put_bucket_lifecycle(lifecycle)
# lifecycle = oss2.models.BucketLifecycle([rule])
# bucket.put_bucket_lifecycle(lifecycle)

print(f"已设置生命周期规则:文件将在{expiration_days}天后自动删除")
# print(f"已设置生命周期规则:文件将在{expiration_days}天后自动删除")

### 2. 判断文件来源并上传到OSS ###
if file_source.startswith('http://') or file_source.startswith('https://'):


+ 21
- 0
services/gewe_service.py View File

@@ -467,12 +467,33 @@ class GeWeService:
async with session.post(f"{self.base_url}/v2/api/message/downloadImage", json=data, headers=headers) as response:
if response.ok:
data = await response.json()
response_object = await response.json()
if data['ret'] == 200:
return data['data']['fileUrl']
else:
return False
else:
return False
async def download_cdn_msg_async(self, token_id: str, aeskey: str, file_id: str, type: str, total_size: str, suffix: str):
    '''
    Download a file from the WeChat CDN through the GeWe API.

    Returns the (ret, msg, data) triple from the API response; each element
    is None when absent from the response body.
    '''
    endpoint = f"{self.base_url}/v2/api/message/downloadCdn"
    payload = {
        "aeskey": aeskey,
        "fileId": file_id,
        # file type to download: 1 hi-res image, 2 regular image,
        # 3 thumbnail, 4 video, 5 generic file
        "type": type,
        "totalSize": total_size,  # size of the file in bytes
        "suffix": suffix,  # only for type 5: the file extension (e.g. "doc")
    }
    request_headers = {
        'X-GEWE-TOKEN': token_id,
        'Content-Type': 'application/json'
    }

    async with aiohttp.ClientSession() as session:
        async with session.post(endpoint, headers=request_headers, json=payload) as response:
            body = await response.json()
    return body.get('ret', None), body.get('msg', None), body.get('data', None)
async def download_audio_file(self, fileUrl: str, file_name: str):
local_filename = f'./silk/{file_name}.silk'


+ 283
- 0
tasks copy.py View File

@@ -0,0 +1,283 @@
from celery_app import celery_app
from fastapi import Request,FastAPI
import time,datetime
from celery import Celery
import celery.schedules
from redbeat import RedBeatSchedulerEntry
from datetime import timedelta

from services.redis_service import RedisService
from services.kafka_service import KafkaService
from services.gewe_service import GeWeService
from common.log import logger
import asyncio,random


@celery_app.task(name='tasks.add_task', bind=True, acks_late=True)
def add_task(self, x, y):
    """Return the sum of x and y after simulating a slow computation."""
    time.sleep(5)  # pretend this is expensive work
    logger.info('add')
    result = x + y
    return result


@celery_app.task(name='tasks.mul_task', bind=True, acks_late=True)
def mul_task(self, x, y):
    """Return the product of x and y after simulating a slow computation."""
    time.sleep(5)  # pretend this is expensive work
    product = x * y
    return product


# @celery.task(name='app.tasks.sync_contacts', bind=True, acks_late=True)
# async def sync_contacts_task(self,app):
# login_keys = list(await app.state.redis_service.client.scan_iter(match='__AI_OPS_WX__:LOGININFO:*'))
# return login_keys
# # for k in login_keys:
# # print(k)

@celery_app.task(name='tasks.sync_contacts', bind=True, acks_late=True)
def sync_contacts_task(self, redis_service):
    '''
    Collect all login-info keys from Redis and return them.

    Bug fix: the previous version was declared ``async def`` — Celery does
    not await coroutine tasks, so it never actually ran; it also wrote
    ``list(await client.scan_iter(...))``, but ``scan_iter`` returns an
    async generator that cannot be awaited (TypeError). The coroutine is
    now driven explicitly and the generator consumed with ``async for``.
    '''
    async def _collect():
        keys = []
        async for key in redis_service.client.scan_iter(match='__AI_OPS_WX__:LOGININFO:*'):
            keys.append(key)
        return keys

    return asyncio.run(_collect())


@celery_app.task(name='tasks.background_worker_task', bind=True, acks_late=True)
def background_worker_task(self, redis_config, kafka_config, gewe_config):
    '''
    Background worker: enumerate login-info keys from Redis.

    ``kafka_config`` and ``gewe_config`` are accepted for interface
    stability but are not used yet (see the commented-out prototype in
    this module for the intended full logic).
    '''
    async def task():
        redis_service = RedisService()
        await redis_service.init(**redis_config)
        login_keys = []
        async for key in redis_service.client.scan_iter(match='__AI_OPS_WX__:LOGININFO:*'):
            login_keys.append(key)
        # Use the module logger instead of print so output reaches the
        # configured log sinks.
        logger.info("login keys: %s", login_keys)

    asyncio.run(task())

# @celery.task(name='tasks.background_worker_task', bind=True, acks_late=True)
# async def background_worker_task(self, redis_config, kafka_config, gewe_config):
# # Initialize services inside the task
# redis_service = RedisService()
# await redis_service.init(**redis_config)

# login_keys = []
# async for key in redis_service.client.scan_iter(match='__AI_OPS_WX__:LOGININFO:*'): # 使用 async for 遍历异步生成器
# login_keys.append(key)

# print(login_keys)

# kafka_service = KafkaService(**kafka_config)
# await kafka_service.start()

# gewe_service = await GeWeService.get_instance(None, gewe_config['api_url'])

# # Task logic
# lock_name = "background_wxchat_thread_lock"
# lock_identifier = str(time.time())
# while True:
# if await redis_service.acquire_lock(lock_name, timeout=10):
# try:
# logger.info("分布式锁已成功获取")
# # Perform task logic
# finally:
# await redis_service.release_lock(lock_name, lock_identifier)
# break
# else:
# logger.info("获取分布式锁失败,等待10秒后重试...")
# await asyncio.sleep(10)



# @celery_app.task(name='tasks.scheduled_task', bind=True, acks_late=True)
# def scheduled_task(self):
# print("定时任务执行成功!~~~~~~~~~~~~~~~~~")
# return "Hello from Celery Beat + RedBeat!"


# @celery_app.task(name='tasks.scheduled_task_sync_wx', bind=True, acks_late=True)
# def scheduled_task_sync_wx(self,redis_service,kafka_service,gewe_service):
# print("scheduled_task_sync_wx 定时任务执行成功!")
# return "Hello from Celery Beat + RedBeat!"

# @celery_app.task(name='tasks.scheduled_task_sync_wx_info_1', bind=True, acks_late=True)
# def scheduled_task_sync_wx_info_1(self,redis_config, kafka_config, gewe_config):
# '''
# 定时获取微信号资料
# '''
# loop = asyncio.new_event_loop()
# asyncio.set_event_loop(loop)
# async def task():
# try:
# redis_service = RedisService()
# await redis_service.init(**redis_config)
# # gewe_service = await GeWeService.get_instance(None, gewe_config['api_url'])
# login_keys = []
# async for key in redis_service.client.scan_iter(match='__AI_OPS_WX__:LOGININFO:*'):
# login_keys.append(key)

# print(login_keys)
# # for k in login_keys:
# # r = await redis_service.get_hash(k)
# # app_id = r.get("appId")
# # token_id = r.get("tokenId")
# # wxid = r.get("wxid")
# # status = r.get('status')
# # if status == '0':
# # continue
# # ret, msg, profile = await gewe_service.get_profile_async(token_id, app_id)
# # if ret != 200:
# # logger.warning(f"同步微信号 {wxid} 资料失败: {ret}-{msg}")
# # continue
# # nickname=profile.get("nickName")
# # head_img_url=profile.get("smallHeadImgUrl")
# # r.update({"nickName":nickname,"headImgUrl":head_img_url,"modify_at":int(time.time())})
# # cleaned_login_info = {k: (v if v is not None else '') for k, v in r.items()}
# # await redis_service.set_hash(k, cleaned_login_info)
# # logger.info(f"同步微信号 {wxid} 资料 成功")
# # redis_service.update_hash_field(k,"nickName",nickname)
# # redis_service.update_hash_field(k,"headImgUrl",head_img_url)
# # redis_service.update_hash_field(k,"modify_at",int(time.time()))
# except Exception as e:
# logger.error(f"任务执行过程中发生异常: {e}")

# print("scheduled_task_sync_wx_info 定时任务执行成功!")
# return "Hello from Celery Beat + RedBeat!"
# loop.run_until_complete(task())
# loop.close()



@celery_app.task(name='tasks.scheduled_task_sync_wx_info', bind=True, acks_late=True)
def scheduled_task_sync_wx_info(self, redis_config, kafka_config, gewe_config):
    '''
    定时获取微信号资料

    Periodically refresh the profile (nickname / avatar URL) of every
    logged-in WeChat account recorded in Redis.

    Bug fixes vs. the previous version:
    - the offline branch logged undefined ``ret``/``msg`` variables,
      raising NameError and aborting the whole sweep via the broad except;
    - nickname / head_img_url were fetched from the profile twice.
    '''
    async def task():
        try:
            redis_service = RedisService()
            await redis_service.init(**redis_config)
            gewe_service = await GeWeService.get_instance(redis_service, gewe_config['api_url'])
            login_keys = []
            async for key in redis_service.client.scan_iter(match='__AI_OPS_WX__:LOGININFO:*'):
                login_keys.append(key)

            logger.info("待同步的登录信息: %s", login_keys)
            for k in login_keys:
                r = await redis_service.get_hash(k)
                app_id = r.get("appId")
                token_id = r.get("tokenId")
                wxid = r.get("wxid")
                status = r.get('status')
                if status == '0':
                    # ret/msg do not exist yet at this point — log only wxid.
                    logger.warning(f"微信号 {wxid} 已经离线")
                    continue
                ret, msg, profile = await gewe_service.get_profile_async(token_id, app_id)
                if ret != 200:
                    logger.warning(f"同步微信号 {wxid} 资料失败: {ret}-{msg}")
                    continue
                nickname = profile.get("nickName")
                head_img_url = profile.get("smallHeadImgUrl")
                r.update({"nickName": nickname, "headImgUrl": head_img_url, "modify_at": int(time.time())})
                # Redis rejects None values; replace them with empty strings.
                # Comprehension variable renamed so it cannot be confused
                # with the outer redis key ``k``.
                cleaned_login_info = {field: (v if v is not None else '') for field, v in r.items()}
                await redis_service.set_hash(k, cleaned_login_info)
                logger.info(f"定时同步微信号{wxid}-昵称{nickname} 资料成功")

        except Exception as e:
            logger.error(f"任务执行过程中发生异常: {e}")

    # Reuse the worker's event loop when possible; create a fresh one only
    # if the current loop has been closed.
    loop = asyncio.get_event_loop()
    if loop.is_closed():
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)

    loop.run_until_complete(task())



# Per-day counter hash key for friend-add rate limiting,
# e.g. "friend_add_limit:20240101".
REDIS_KEY_PATTERN = "friend_add_limit:{date}"
# Key intended to record the task's last run time
# (currently unused in this module).
REDIS_LAST_RUN_KEY = "last_run_time:add_friends_task"

@celery_app.task(name='tasks.add_friends_task', bind=True, acks_late=True)
def add_friends_task(self,redis_config):
    """
    Rate-limited friend-adding task.

    限制每天最多 15 个,每 2 小时最多 8 个
    (at most 15 friend adds per day, at most 8 per rolling 2-hour window).
    Counters are kept in a per-day Redis hash (fields "total" and
    "last_2h"); the actual add logic is still a TODO stub.
    """
    async def task():
        redis_service = RedisService()
        await redis_service.init(**redis_config)
        # NOTE(review): naive local time — confirm the worker's timezone
        # matches the intended daily-reset boundary.
        today_str = datetime.datetime.now().strftime("%Y%m%d")
        redis_key = REDIS_KEY_PATTERN.format(date=today_str)

        # Current counters; missing fields fall back to 0.
        total_added = await redis_service.get_hash_field(redis_key, "total") or 0
        last_2h_added =await redis_service.get_hash_field(redis_key, "last_2h") or 0

        total_added = int(total_added)
        last_2h_added = int(last_2h_added)

        logger.info(f"当前添加好友总数: {total_added}, 过去2小时添加: {last_2h_added}")

        # Enforce the daily and 2-hour caps.
        if total_added >= 15:
            logger.warning("今日好友添加已达上限!")
            return

        if last_2h_added >= 8:
            logger.warning("过去2小时添加已达上限!")
            return

        # Remaining headroom under both limits.
        max_add = min(15 - total_added, 8 - last_2h_added)
        if max_add <= 0:
            return

        num_to_add = min(max_add, 1)  # add at most 1 friend per run
        logger.info(f"本次添加 {num_to_add} 位好友")

        # TODO: call the real friend-adding logic (API or business layer)
        # success = add_friends(num_to_add)

        success = num_to_add  # assume num_to_add adds succeeded

        # Update the Redis counters.
        if success > 0:
            await redis_service.increment_hash_field(redis_key, "total", success)
            await redis_service.increment_hash_field(redis_key, "last_2h", success)

            # Expiry: daily hash lives 24h; the 2-hour field lives 2h.
            await redis_service.expire(redis_key, 86400)  # 24 hours
            await redis_service.expire_field(redis_key, "last_2h", 7200)  # 2 hours

            logger.info(f"成功添加 {success} 位好友, 今日总数 {total_added + success}")

        # Generate a new random interval (originally 5-15 minutes)
        # next_interval = random.randint(10, 20)

        # # Compute the next execution time
        # next_run_time = datetime.datetime.now() + timedelta(seconds=next_interval)

        # # Re-register the RedBeat entry so the next run uses a fresh interval
        # redbeat_entry = RedBeatSchedulerEntry(
        #     name="redbeat:add_friends_task",
        #     task="tasks.add_friends_task",
        #     schedule=celery.schedules.schedule(timedelta(seconds=next_interval)),
        #     args=[redis_config],
        #     app=celery_app
        # )

        # # Set the task's next execution time
        # redbeat_entry.last_run_at = next_run_time
        # redbeat_entry.save()

        # logger.info(f"下次任务将在 {next_run_time} 执行(间隔 {next_interval} 秒)")

    # Reuse the worker's event loop; create a new one only if closed.
    loop = asyncio.get_event_loop()
    if loop.is_closed():
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)

    loop.run_until_complete(task())  # run the coroutine on the event loop

Loading…
Cancel
Save