From 76da43521187c370aaee77333bd05ecd6044e46e Mon Sep 17 00:00:00 2001 From: H Vs Date: Thu, 3 Apr 2025 15:12:12 +0800 Subject: [PATCH] =?UTF-8?q?=E7=BE=A4=E5=9B=BE=E7=89=87=E5=A4=84=E7=90=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app/endpoints/pipeline_endpoint.py | 22 +++ common/utils.py | 20 +- services/gewe_service.py | 21 +++ tasks copy.py | 283 +++++++++++++++++++++++++++++ 4 files changed, 343 insertions(+), 3 deletions(-) create mode 100644 tasks copy.py diff --git a/app/endpoints/pipeline_endpoint.py b/app/endpoints/pipeline_endpoint.py index 03ee9a5..fcf8c8f 100644 --- a/app/endpoints/pipeline_endpoint.py +++ b/app/endpoints/pipeline_endpoint.py @@ -710,6 +710,28 @@ async def handle_image_async(request: Request,token_id,app_id, wxid,msg_data,fro async def handle_image_group_async(request: Request,token_id,app_id, wxid,msg_data,from_wxid, to_wxid): logger.info('群聊图片消息') + msg_content=msg_data["Content"]["string"] + callback_to_user=msg_data["FromUserName"]["string"] + + aeskey = re.search(r'aeskey="([^"]+)"', msg_content).group(1) + cdnthumburl = re.search(r'cdnthumburl="([^"]+)"', msg_content).group(1) + md5 = re.search(r'md5="([^"]+)"', msg_content).group(1) + cdnthumblength = re.search(r'cdnthumblength="([^"]+)"', msg_content).group(1) + cdnthumbheight = re.search(r'cdnthumbheight="([^"]+)"', msg_content).group(1) + cdnthumbwidth = re.search(r'cdnthumbwidth="([^"]+)"', msg_content).group(1) + length = re.search(r'length="([^"]+)"', msg_content).group(1) + + img_xml=f'\n\n\t\n\t\n\t\n' + wx_img_url= await request.app.state.gewe_service.download_image_msg_async(token_id,app_id,img_xml) + oss_url=wx_img_url_to_oss_url(wx_img_url) + + reply_content = re.sub(r'<\?xml.*', f'{oss_url}', msg_content, flags=re.DOTALL) + #reply_content=f'{wxid}:\n{oss_url}' + input_wx_content_dialogue_message=[{"type": "text", "text": reply_content}] + input_message=dialogue_message(callback_to_user,wxid,input_wx_content_dialogue_message,False) + await request.app.state.kafka_service.send_message_async(input_message) + logger.info("发送对话 %s",input_message) + async def handle_voice_async(request: Request,token_id,app_id, wxid,msg_data,from_wxid, to_wxid): ''' diff --git a/common/utils.py b/common/utils.py index 6f52fb8..19fada7 100644 --- a/common/utils.py +++ b/common/utils.py @@ -206,6 +206,20 @@ def wx_voice(text: str): print(f"发生错误:{e}") return None, None # 发生错误时返回 None +def wx_img_url_to_oss_url(img_url: str)->str: + try: + # OSS 配置(建议将凭证存储在安全的地方) + oss_access_key_id="LTAI5tRTG6pLhTpKACJYoPR5" + oss_access_key_secret="E7dMzeeMxq4VQvLg7Tq7uKf3XWpYfN" + oss_endpoint="http://oss-cn-shanghai.aliyuncs.com" + oss_bucket_name="cow-agent" + oss_prefix="cow" + file_url = upload_oss(oss_access_key_id, oss_access_key_secret, oss_endpoint, oss_bucket_name, img_url, oss_prefix) + return file_url + except Exception as e: + print(f"发生错误:{e}") + return None # 发生错误时返回 None + def upload_oss( access_key_id, access_key_secret, @@ -242,10 +256,10 @@ def upload_oss( expiration=oss2.models.LifecycleExpiration(days=expiration_days)) # 设置Bucket的生命周期 - lifecycle = oss2.models.BucketLifecycle([rule]) - bucket.put_bucket_lifecycle(lifecycle) + # lifecycle = oss2.models.BucketLifecycle([rule]) + # bucket.put_bucket_lifecycle(lifecycle) - print(f"已设置生命周期规则:文件将在{expiration_days}天后自动删除") + # print(f"已设置生命周期规则:文件将在{expiration_days}天后自动删除") ### 2. 判断文件来源并上传到OSS ### if file_source.startswith('http://') or file_source.startswith('https://'): diff --git a/services/gewe_service.py b/services/gewe_service.py index 1818d85..f3af170 100644 --- a/services/gewe_service.py +++ b/services/gewe_service.py @@ -467,12 +467,33 @@ class GeWeService: async with session.post(f"{self.base_url}/v2/api/message/downloadImage", json=data, headers=headers) as response: if response.ok: data = await response.json() + response_object = await response.json() if data['ret'] == 200: return data['data']['fileUrl'] else: return False else: return False + + async def download_cdn_msg_async(self, token_id:str,aeskey: str, file_id: str, type: str,total_size:str,suffix:str): + + api_url = f"{self.base_url}/v2/api/message/downloadCdn" + data = { + "aeskey": aeskey, + "fileId":file_id, + "type": type, #下载的文件类型 1:高清图片 2:常规图片 3:缩略图 4:视频 5:文件 + "totalSize":total_size, #文件大小 + "suffix": suffix #下载类型为文件时,传文件的后缀(例:doc) + } + headers = { + 'X-GEWE-TOKEN': token_id, + 'Content-Type': 'application/json' + } + + async with aiohttp.ClientSession() as session: + async with session.post(api_url, headers=headers, json=data) as response: + response_object = await response.json() + return response_object.get('ret', None), response_object.get('msg', None), response_object.get('data', None) async def download_audio_file(self, fileUrl: str, file_name: str): local_filename = f'./silk/{file_name}.silk' diff --git a/tasks copy.py b/tasks copy.py new file mode 100644 index 0000000..3e0e959 --- /dev/null +++ b/tasks copy.py @@ -0,0 +1,283 @@ +from celery_app import celery_app +from fastapi import Request,FastAPI +import time,datetime +from celery import Celery +import celery.schedules +from redbeat import RedBeatSchedulerEntry +from datetime import timedelta + +from services.redis_service import RedisService +from services.kafka_service import KafkaService +from services.gewe_service import GeWeService +from common.log import logger +import asyncio,random + + +@celery_app.task(name='tasks.add_task', bind=True, acks_late=True) +def add_task(self, x, y): + time.sleep(5) # 模拟长时间计算 + logger.info('add') + return x + y + + +@celery_app.task(name='tasks.mul_task', bind=True, acks_late=True) +def mul_task(self, x, y): + time.sleep(5) # 模拟长时间计算 + return x * y + + +# @celery.task(name='app.tasks.sync_contacts', bind=True, acks_late=True) +# async def sync_contacts_task(self,app): +# login_keys = list(await app.state.redis_service.client.scan_iter(match='__AI_OPS_WX__:LOGININFO:*')) +# return login_keys +# # for k in login_keys: +# # print(k) + +@celery_app.task(name='tasks.sync_contacts', bind=True, acks_late=True) +async def sync_contacts_task(self, redis_service): + # Use the redis_service passed as an argument + login_keys = list(await redis_service.client.scan_iter(match='__AI_OPS_WX__:LOGININFO:*')) + return login_keys + + +@celery_app.task(name='tasks.background_worker_task', bind=True, acks_late=True) +def background_worker_task(self, redis_config, kafka_config, gewe_config): + async def task(): + redis_service = RedisService() + await redis_service.init(**redis_config) + login_keys = [] + async for key in redis_service.client.scan_iter(match='__AI_OPS_WX__:LOGININFO:*'): + login_keys.append(key) + print(login_keys) + + asyncio.run(task()) + +# @celery.task(name='tasks.background_worker_task', bind=True, acks_late=True) +# async def background_worker_task(self, redis_config, kafka_config, gewe_config): +# # Initialize services inside the task +# redis_service = RedisService() +# await redis_service.init(**redis_config) + +# login_keys = [] +# async for key in redis_service.client.scan_iter(match='__AI_OPS_WX__:LOGININFO:*'): # 使用 async for 遍历异步生成器 +# login_keys.append(key) + +# print(login_keys) + + # kafka_service = KafkaService(**kafka_config) + # await kafka_service.start() + + # gewe_service = await GeWeService.get_instance(None, gewe_config['api_url']) + + # # Task logic + # lock_name = "background_wxchat_thread_lock" + # lock_identifier = str(time.time()) + # while True: + # if await redis_service.acquire_lock(lock_name, timeout=10): + # try: + # logger.info("分布式锁已成功获取") + # # Perform task logic + # finally: + # await redis_service.release_lock(lock_name, lock_identifier) + # break + # else: + # logger.info("获取分布式锁失败,等待10秒后重试...") + # await asyncio.sleep(10) + + + +# @celery_app.task(name='tasks.scheduled_task', bind=True, acks_late=True) +# def scheduled_task(self): +# print("定时任务执行成功!~~~~~~~~~~~~~~~~~") +# return "Hello from Celery Beat + RedBeat!" + + +# @celery_app.task(name='tasks.scheduled_task_sync_wx', bind=True, acks_late=True) +# def scheduled_task_sync_wx(self,redis_service,kafka_service,gewe_service): +# print("scheduled_task_sync_wx 定时任务执行成功!") +# return "Hello from Celery Beat + RedBeat!" + +# @celery_app.task(name='tasks.scheduled_task_sync_wx_info_1', bind=True, acks_late=True) +# def scheduled_task_sync_wx_info_1(self,redis_config, kafka_config, gewe_config): +# ''' +# 定时获取微信号资料 +# ''' +# loop = asyncio.new_event_loop() +# asyncio.set_event_loop(loop) +# async def task(): +# try: +# redis_service = RedisService() +# await redis_service.init(**redis_config) +# # gewe_service = await GeWeService.get_instance(None, gewe_config['api_url']) +# login_keys = [] +# async for key in redis_service.client.scan_iter(match='__AI_OPS_WX__:LOGININFO:*'): +# login_keys.append(key) + +# print(login_keys) +# # for k in login_keys: +# # r = await redis_service.get_hash(k) +# # app_id = r.get("appId") +# # token_id = r.get("tokenId") +# # wxid = r.get("wxid") +# # status = r.get('status') +# # if status == '0': +# # continue +# # ret, msg, profile = await gewe_service.get_profile_async(token_id, app_id) +# # if ret != 200: +# # logger.warning(f"同步微信号 {wxid} 资料失败: {ret}-{msg}") +# # continue +# # nickname=profile.get("nickName") +# # head_img_url=profile.get("smallHeadImgUrl") +# # r.update({"nickName":nickname,"headImgUrl":head_img_url,"modify_at":int(time.time())}) +# # cleaned_login_info = {k: (v if v is not None else '') for k, v in r.items()} +# # await redis_service.set_hash(k, cleaned_login_info) +# # logger.info(f"同步微信号 {wxid} 资料 成功") +# # redis_service.update_hash_field(k,"nickName",nickname) +# # redis_service.update_hash_field(k,"headImgUrl",head_img_url) +# # redis_service.update_hash_field(k,"modify_at",int(time.time())) +# except Exception as e: +# logger.error(f"任务执行过程中发生异常: {e}") + +# print("scheduled_task_sync_wx_info 定时任务执行成功!") +# return "Hello from Celery Beat + RedBeat!" + +# loop.run_until_complete(task()) +# loop.close() + + + +@celery_app.task(name='tasks.scheduled_task_sync_wx_info', bind=True, acks_late=True) +def scheduled_task_sync_wx_info(self, redis_config, kafka_config, gewe_config): + ''' + 定时获取微信号资料 + ''' + async def task(): + try: + redis_service = RedisService() + await redis_service.init(**redis_config) + gewe_service = await GeWeService.get_instance(redis_service,gewe_config['api_url']) + login_keys = [] + async for key in redis_service.client.scan_iter(match='__AI_OPS_WX__:LOGININFO:*'): + login_keys.append(key) + + print(login_keys) + for k in login_keys: + r = await redis_service.get_hash(k) + app_id = r.get("appId") + token_id = r.get("tokenId") + wxid = r.get("wxid") + status = r.get('status') + if status == '0': + logger.warning(f"微信号 {wxid} 已经离线: {ret}-{msg}") + continue + ret, msg, profile = await gewe_service.get_profile_async(token_id, app_id) + if ret != 200: + logger.warning(f"同步微信号 {wxid} 资料失败: {ret}-{msg}") + continue + nickname=profile.get("nickName") + head_img_url=profile.get("smallHeadImgUrl") + # print(nickname) + + nickname=profile.get("nickName") + head_img_url=profile.get("smallHeadImgUrl") + r.update({"nickName":nickname,"headImgUrl":head_img_url,"modify_at":int(time.time())}) + cleaned_login_info = {k: (v if v is not None else '') for k, v in r.items()} + await redis_service.set_hash(k, cleaned_login_info) + logger.info(f"定时同步微信号{wxid}-昵称{nickname} 资料成功") + + except Exception as e: + logger.error(f"任务执行过程中发生异常: {e}") + + loop = asyncio.get_event_loop() + if loop.is_closed(): + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + + loop.run_until_complete(task()) # 在现有事件循环中运行任务 + + + +REDIS_KEY_PATTERN = "friend_add_limit:{date}" +REDIS_LAST_RUN_KEY = "last_run_time:add_friends_task" + +@celery_app.task(name='tasks.add_friends_task', bind=True, acks_late=True) +def add_friends_task(self,redis_config): + """ + 限制每天最多 15 个,每 2 小时最多 8 个 + """ + async def task(): + redis_service = RedisService() + await redis_service.init(**redis_config) + today_str = datetime.datetime.now().strftime("%Y%m%d") + redis_key = REDIS_KEY_PATTERN.format(date=today_str) + + # 获取当前总添加数量 + total_added = await redis_service.get_hash_field(redis_key, "total") or 0 + last_2h_added =await redis_service.get_hash_field(redis_key, "last_2h") or 0 + + total_added = int(total_added) + last_2h_added = int(last_2h_added) + + logger.info(f"当前添加好友总数: {total_added}, 过去2小时添加: {last_2h_added}") + + # 判断是否超过限制 + if total_added >= 15: + logger.warning("今日好友添加已达上限!") + return + + if last_2h_added >= 8: + logger.warning("过去2小时添加已达上限!") + return + + # 计算本次要添加的好友数量 (控制每天 5-15 个) + max_add = min(15 - total_added, 8 - last_2h_added) + if max_add <= 0: + return + + num_to_add = min(max_add, 1) # 每次最多加 1 个 + logger.info(f"本次添加 {num_to_add} 位好友") + + # TODO: 调用好友添加逻辑 (接口 or 业务逻辑) + # success = add_friends(num_to_add) + + success = num_to_add # 假设成功添加 num_to_add 个 + + # 更新 Redis 计数 + if success > 0: + await redis_service.increment_hash_field(redis_key, "total", success) + await redis_service.increment_hash_field(redis_key, "last_2h", success) + + # 设置 Redis 过期时间 (每日记录存 1 天, 2 小时记录存 2 小时) + await redis_service.expire(redis_key, 86400) # 24小时 + await redis_service.expire_field(redis_key, "last_2h", 7200) # 2小时 + + logger.info(f"成功添加 {success} 位好友, 今日总数 {total_added + success}") + + # 生成一个新的随机时间(5-15 分钟之间) + # next_interval = random.randint(10, 20) + + # # 计算新的执行时间 + # next_run_time = datetime.datetime.now() + timedelta(seconds=next_interval) + + # # 重新注册 RedBeat 任务,确保下次执行时间不同 + # redbeat_entry = RedBeatSchedulerEntry( + # name="redbeat:add_friends_task", + # task="tasks.add_friends_task", + # schedule=celery.schedules.schedule(timedelta(seconds=next_interval)), + # args=[redis_config], + # app=celery_app + # ) + + # # 设置任务的下次执行时间 + # redbeat_entry.last_run_at = next_run_time + # redbeat_entry.save() + + # logger.info(f"下次任务将在 {next_run_time} 执行(间隔 {next_interval} 秒)") + + loop = asyncio.get_event_loop() + + if loop.is_closed(): + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + + loop.run_until_complete(task()) # 在现有事件循环中运行任务 \ No newline at end of file