From 4239c06a3b685674493b3227f7fe082932826037 Mon Sep 17 00:00:00 2001 From: H Vs Date: Tue, 15 Apr 2025 14:45:03 +0800 Subject: [PATCH] =?UTF-8?q?=E7=82=B9=E5=AF=B9=E5=AF=B9=E6=96=87=E4=BB=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- services/biz_service.py | 29 +++++++++++++++++++++++++++-- services/gewe_service.py | 20 +++++++++++++++++++- 2 files changed, 46 insertions(+), 3 deletions(-) diff --git a/services/biz_service.py b/services/biz_service.py index aca8d03..61c096f 100644 --- a/services/biz_service.py +++ b/services/biz_service.py @@ -27,7 +27,7 @@ class BizService(): def __init__(self,app:FastAPI): if not hasattr(self, 'initialized'): #self.kafka_service =kafka_service # 获取 KafkaService 单例 - self.kafka_service =app.state.kafka_service + self.kafka_service:KafkaService =app.state.kafka_service self.wxchat:GeWeService=app.state.gewe_service self.redis_service:RedisService=app.state.redis_service self.initialized = True @@ -145,6 +145,8 @@ class BizService(): self.wxchat.forward_video_length = 0 self.wxchat.video_duration = 0 + self.wxchat.forward_file_aeskey = '' + for intersection_wxid in intersection_wxids: for wx_content in wx_content_list: if wx_content["type"] == "text": @@ -156,6 +158,8 @@ class BizService(): elif wx_content["type"] == "file": await self.send_file_message(token_id, app_id, agent_wxid, [intersection_wxid], wx_content.get("file_url", {}).get("url")) + + async def send_text_message_async(self, token_id, app_id, agent_wxid, intersection_wxids, text): for t in intersection_wxids: # 发送文本消息 @@ -251,6 +255,8 @@ class BizService(): if ext == '.mp4': await self.send_video_message(token_id, app_id, agent_wxid, intersection_wxids, file_url) + # if ext == '.pdf': + # await self.send_pdf_message_async(token_id, app_id, agent_wxid, intersection_wxids, file_url) else: await self.send_other_file_message(token_id, app_id, agent_wxid, intersection_wxids, file_url) #time.sleep(random.uniform(1.5, 3)) @@ -288,8 +294,27 @@ class BizService(): # 等待随机时间 await asyncio.sleep(random.uniform(1.5, 3)) + async def send_pdf_message_async(self, token_id, app_id, agent_wxid, intersection_wxids, file_url): + print('send_pdf_message_async') + async def send_other_file_message(self, token_id, app_id, agent_wxid, intersection_wxids, file_url): - print('send_otherfile_message') + parsed_url = urlparse(file_url) + filename = os.path.basename(parsed_url.path) + for t in intersection_wxids: + # 发送文件消息 + ret,ret_msg,res = await self.wxchat.post_file_async(token_id, app_id, t, file_url,filename) + if ret==200: + logger.info(f'{agent_wxid} 向 {t} 发送文件【{file_url}】{ret_msg}') + # 构造对话消息并发送到 Kafka + input_wx_content_dialogue_message = [{"type": "file", "file_url": {"url": file_url}}] + input_message = dialogue_message(agent_wxid, t, input_wx_content_dialogue_message) + await self.kafka_service.send_message_async(input_message) + logger.info("发送对话 %s", input_message) + else: + logger.warning((f'{agent_wxid} 向 {t} 发送文件【{file_url}】{ret_msg}')) + + # 等待随机时间 + await asyncio.sleep(random.uniform(1.5, 3)) async def sns_sendtext_forward_handler_async(self,content_data): wxids=content_data.get('wxids',[]) diff --git a/services/gewe_service.py b/services/gewe_service.py index 12a3b99..a8e5f95 100644 --- a/services/gewe_service.py +++ b/services/gewe_service.py @@ -364,6 +364,24 @@ class GeWeService: response_object = await response.json() return response_object.get('ret', None), response_object.get('msg', None), response_object.get('data', None) + # 发送文件消息 + async def post_file_async(self, token_id, app_id, to_wxid, file_url, file_name): + api_url = f"{self.base_url}/v2/api/message/postFile" + headers = { + 'X-GEWE-TOKEN': token_id, + 'Content-Type': 'application/json' + } + data = { + "appId": app_id, + "toWxid": to_wxid, + "fileUrl": file_url, + "fileName": file_name, + } + async with aiohttp.ClientSession() as session: + async with session.post(api_url, headers=headers, json=data) as response: + response_object = await response.json() + return response_object.get('ret', None), response_object.get('msg', None), response_object.get('data', None) + async def post_mini_app_aysnc(self, token_id, app_id,to_wxid, mini_app_id,display_name,page_path,cover_img_url,title,user_name): ''' 发送小程序消息 @@ -516,7 +534,7 @@ class GeWeService: data = { "appId": app_id, "toWxid": to_wxid, - "xml": xml + "xml": f"\n\n\t\n\t\tinfo.json\n\t\t\n\t\t\n\t\t6\n\t\t0\n\t\t0\n\t\t\n\t\t\n\t\t\n\t\t\n\t\t0\n\t\t\n\t\t\n\t\t\n\t\t\n\t\t\n\t\t\t63\n\t\t\t@cdn_3057020100044b304902010002043904752002032f7d6d02046bb5bade02046593760c042433653765306131612d646138622d346662322d383239362d3964343665623766323061370204051400050201000405004c53d900_f46be643aa0dc009ae5fb63bbc73335d_1\n\t\t\t\n\t\t\tjson\n\t\t\t3057020100044b304902010002043904752002032f7d6d02046bb5bade02046593760c042433653765306131612d646138622d346662322d383239362d3964343665623766323061370204051400050201000405004c53d900\n\t\t\tf46be643aa0dc009ae5fb63bbc73335d\n\t\t\t0\n\t\t\t594239960546299206\n\t\t\tv1_0bgfyCkUmoZYYyvXys0cCiJdd2R/pKPdD2TNi9IY6FOt+Tvlhp3ijUoupZHzyB2Lp7xYgdVFaUGL4iu3Pm9/YACCt20egPGpT+DKe+VymOzD7tJfsS8YW7JObTbN8eVoFEetU5HSRWTgS/48VVsPZMoDF6Gz1XJDLN/dWRxvzrbOzVGGNvmY4lpXb0kRwXkSxwL+dO4=\n\t\t\n\t\t\n\t\t\n\t\t\n\t\t\n\t\td16070253eee7173e467dd7237d76f60\n\t\t\n\t\n\tzhangchuan2288\n\t0\n\t\n\t\t1\n\t\t\n\t\n\t\n" } async with aiohttp.ClientSession() as session: async with session.post(api_url, headers=headers, json=data) as response: