@@ -27,7 +27,7 @@ class BizService(): | |||
def __init__(self,app:FastAPI): | |||
if not hasattr(self, 'initialized'): | |||
#self.kafka_service =kafka_service # 获取 KafkaService 单例 | |||
self.kafka_service =app.state.kafka_service | |||
self.kafka_service:KafkaService =app.state.kafka_service | |||
self.wxchat:GeWeService=app.state.gewe_service | |||
self.redis_service:RedisService=app.state.redis_service | |||
self.initialized = True | |||
@@ -145,6 +145,8 @@ class BizService(): | |||
self.wxchat.forward_video_length = 0 | |||
self.wxchat.video_duration = 0 | |||
self.wxchat.forward_file_aeskey = '' | |||
for intersection_wxid in intersection_wxids: | |||
for wx_content in wx_content_list: | |||
if wx_content["type"] == "text": | |||
@@ -156,6 +158,8 @@ class BizService(): | |||
elif wx_content["type"] == "file": | |||
await self.send_file_message(token_id, app_id, agent_wxid, [intersection_wxid], wx_content.get("file_url", {}).get("url")) | |||
async def send_text_message_async(self, token_id, app_id, agent_wxid, intersection_wxids, text): | |||
for t in intersection_wxids: | |||
# 发送文本消息 | |||
@@ -251,6 +255,8 @@ class BizService(): | |||
if ext == '.mp4': | |||
await self.send_video_message(token_id, app_id, agent_wxid, intersection_wxids, file_url) | |||
# if ext == '.pdf': | |||
# await self.send_pdf_message_async(token_id, app_id, agent_wxid, intersection_wxids, file_url) | |||
else: | |||
await self.send_other_file_message(token_id, app_id, agent_wxid, intersection_wxids, file_url) | |||
#time.sleep(random.uniform(1.5, 3)) | |||
@@ -288,8 +294,27 @@ class BizService(): | |||
# 等待随机时间 | |||
await asyncio.sleep(random.uniform(1.5, 3)) | |||
async def send_pdf_message_async(self, token_id, app_id, agent_wxid, intersection_wxids, file_url): | |||
print('send_pdf_message_async') | |||
async def send_other_file_message(self, token_id, app_id, agent_wxid, intersection_wxids, file_url): | |||
print('send_otherfile_message') | |||
parsed_url = urlparse(file_url) | |||
filename = os.path.basename(parsed_url.path) | |||
for t in intersection_wxids: | |||
# 发送文件消息 | |||
ret,ret_msg,res = await self.wxchat.post_file_async(token_id, app_id, t, file_url,filename) | |||
if ret==200: | |||
logger.info(f'{agent_wxid} 向 {t} 发送文件【{file_url}】{ret_msg}') | |||
# 构造对话消息并发送到 Kafka | |||
input_wx_content_dialogue_message = [{"type": "file", "file_url": {"url": file_url}}] | |||
input_message = dialogue_message(agent_wxid, t, input_wx_content_dialogue_message) | |||
await self.kafka_service.send_message_async(input_message) | |||
logger.info("发送对话 %s", input_message) | |||
else: | |||
logger.warning((f'{agent_wxid} 向 {t} 发送文件【{file_url}】{ret_msg}')) | |||
# 等待随机时间 | |||
await asyncio.sleep(random.uniform(1.5, 3)) | |||
async def sns_sendtext_forward_handler_async(self,content_data): | |||
wxids=content_data.get('wxids',[]) | |||
@@ -364,6 +364,24 @@ class GeWeService: | |||
response_object = await response.json() | |||
return response_object.get('ret', None), response_object.get('msg', None), response_object.get('data', None) | |||
# 发送文件消息 | |||
async def post_file_async(self, token_id, app_id, to_wxid, file_url, file_name): | |||
api_url = f"{self.base_url}/v2/api/message/postFile" | |||
headers = { | |||
'X-GEWE-TOKEN': token_id, | |||
'Content-Type': 'application/json' | |||
} | |||
data = { | |||
"appId": app_id, | |||
"toWxid": to_wxid, | |||
"fileUrl": file_url, | |||
"fileName": file_name, | |||
} | |||
async with aiohttp.ClientSession() as session: | |||
async with session.post(api_url, headers=headers, json=data) as response: | |||
response_object = await response.json() | |||
return response_object.get('ret', None), response_object.get('msg', None), response_object.get('data', None) | |||
async def post_mini_app_aysnc(self, token_id, app_id,to_wxid, mini_app_id,display_name,page_path,cover_img_url,title,user_name): | |||
''' | |||
发送小程序消息 | |||
@@ -516,7 +534,7 @@ class GeWeService: | |||
data = { | |||
"appId": app_id, | |||
"toWxid": to_wxid, | |||
"xml": xml | |||
"xml": f"<?xml version=\"1.0\"?>\n<msg>\n\t<appmsg appid=\"\" sdkver=\"0\">\n\t\t<title>info.json</title>\n\t\t<des />\n\t\t<action />\n\t\t<type>6</type>\n\t\t<showtype>0</showtype>\n\t\t<soundtype>0</soundtype>\n\t\t<mediatagname />\n\t\t<messageext />\n\t\t<messageaction />\n\t\t<content />\n\t\t<contentattr>0</contentattr>\n\t\t<url />\n\t\t<lowurl />\n\t\t<dataurl />\n\t\t<lowdataurl />\n\t\t<appattach>\n\t\t\t<totallen>63</totallen>\n\t\t\t<attachid>@cdn_3057020100044b304902010002043904752002032f7d6d02046bb5bade02046593760c042433653765306131612d646138622d346662322d383239362d3964343665623766323061370204051400050201000405004c53d900_f46be643aa0dc009ae5fb63bbc73335d_1</attachid>\n\t\t\t<emoticonmd5 />\n\t\t\t<fileext>json</fileext>\n\t\t\t<cdnattachurl>3057020100044b304902010002043904752002032f7d6d02046bb5bade02046593760c042433653765306131612d646138622d346662322d383239362d3964343665623766323061370204051400050201000405004c53d900</cdnattachurl>\n\t\t\t<aeskey>f46be643aa0dc009ae5fb63bbc73335d</aeskey>\n\t\t\t<encryver>0</encryver>\n\t\t\t<overwrite_newmsgid>594239960546299206</overwrite_newmsgid>\n\t\t\t<fileuploadtoken>v1_0bgfyCkUmoZYYyvXys0cCiJdd2R/pKPdD2TNi9IY6FOt+Tvlhp3ijUoupZHzyB2Lp7xYgdVFaUGL4iu3Pm9/YACCt20egPGpT+DKe+VymOzD7tJfsS8YW7JObTbN8eVoFEetU5HSRWTgS/48VVsPZMoDF6Gz1XJDLN/dWRxvzrbOzVGGNvmY4lpXb0kRwXkSxwL+dO4=</fileuploadtoken>\n\t\t</appattach>\n\t\t<extinfo />\n\t\t<sourceusername />\n\t\t<sourcedisplayname />\n\t\t<thumburl />\n\t\t<md5>d16070253eee7173e467dd7237d76f60</md5>\n\t\t<statextstr />\n\t</appmsg>\n\t<fromusername>zhangchuan2288</fromusername>\n\t<scene>0</scene>\n\t<appinfo>\n\t\t<version>1</version>\n\t\t<appname></appname>\n\t</appinfo>\n\t<commenturl></commenturl>\n</msg>" | |||
} | |||
async with aiohttp.ClientSession() as session: | |||
async with session.post(api_url, headers=headers, json=data) as response: | |||