|
-
-
- import aiohttp
- import asyncio
- import json
- import base64
- import io
- import json
- import os
- import threading
- import time
- import uuid,random
- from fastapi import FastAPI, Depends
- from common.log import logger
- from common.singleton import singleton
- from services.kafka_service import KafkaService
- from fastapi import Request
- from common.utils import *
-
- from services.redis_service import RedisService
- from services.kafka_service import KafkaService
- from services.gewe_service import GeWeService
-
@singleton
class BizService():
    """Dispatch messages consumed from Kafka to WeChat send operations.

    The service is a singleton; ``__init__`` only wires the collaborators
    the first time it runs.  Collaborators are read from ``app.state``:
    ``kafka_service``, ``gewe_service`` (stored as ``self.wxchat``) and
    ``redis_service``.

    NOTE(review): the "forward" cache used by the image/video senders lives
    as attributes on the shared ``self.wxchat`` object, so two group-sending
    jobs running concurrently would overwrite each other's cache -- confirm
    that handlers are processed one at a time.
    """

    # Bounds (seconds) of the random pause inserted after each send.
    _SEND_DELAY_RANGE = (1.5, 3)
    # Minimum account age required by wx_auth_required_time_async (3 days).
    _MIN_ACCOUNT_AGE_SECONDS = 3 * 24 * 60 * 60

    def __init__(self, app: FastAPI):
        """Bind the Kafka, WeChat and Redis services found on ``app.state``."""
        if not hasattr(self, 'initialized'):
            self.kafka_service: KafkaService = app.state.kafka_service
            self.wxchat: GeWeService = app.state.gewe_service
            self.redis_service: RedisService = app.state.redis_service
            # URL of the video whose upload data is held in the forward cache.
            self._forward_video_source_url = ''
            self.initialized = True

    def setup_handlers(self):
        """Register the default message handler on the Kafka consumer topic."""
        self.kafka_service.add_handler(
            self.kafka_service.consumer_topic,
            self.ops_messages_process_handler
        )

    def _handlers_by_type(self) -> dict:
        """Return the mapping from ``msg_type`` value to handler coroutine."""
        return {
            'login': self.login_handler_async,
            'group-sending': self.group_sending_handler_async,
            'sns-sendtext-forward': self.sns_sendtext_forward_handler_async,
            'sns-sendtext': self.sns_sendtext_handler_async,
            'sns-sendimages-forward': self.sns_sendimages_forward_handler_async,
            'sns-sendimages': self.sns_sendimages_handler_async,
            'sns-sendvideo-forward': self.sns_sendvideo_forward_handler_async,
            'sns-sendvideo': self.sns_sendvideo_handler_async,
        }

    async def ops_messages_process_handler(self, message: str):
        """Parse one raw message and route it by ``data.msg_type``.

        The message is cleaned with ``clean_json_string`` and decoded as
        JSON; ``data.content`` is handed to the matching handler.  Unknown
        types are logged.  Any exception is logged (with traceback) and
        swallowed so that one bad message does not propagate to the caller.
        """
        try:
            content = json.loads(clean_json_string(message))
            data = content.get("data", {})
            msg_type_data = data.get("msg_type", None)
            content_data = data.get("content", {})

            # Only strings can be valid types; this also keeps unhashable
            # values away from the dict lookup.
            handler = None
            if isinstance(msg_type_data, str):
                handler = self._handlers_by_type().get(msg_type_data)

            if handler is None:
                logger.warning(f'kakfa 未处理息类型 {msg_type_data}')
                return
            await handler(content_data)
        except Exception as e:
            logger.exception("处理消息时发生错误: %s, 消息内容: %s", e, message)

    async def login_handler_async(self, content_data: dict):
        """Look up the cached login info and stop if already logged in.

        NOTE(review): the fallback values below are hard-coded and look like
        real credentials; they should come from configuration and be rotated.
        They are kept unchanged here to preserve existing behavior.
        """
        tel = content_data.get('tel', '18733438393')
        token_id = content_data.get('token_id', 'c50b7d57-2efa-4a53-8c11-104a06d1e1fa')
        region_id = content_data.get('region_id', '440000')
        agent_token_id = content_data.get('agent_token_id', 'sk-fAOIdANeGXjWKW5mFybnsNZZGYU2lFLmqVY9rVFaFmjiOaWt3tcWMi')
        loginfo = await self.wxchat.get_login_info_from_cache_async(tel, token_id, region_id, agent_token_id)
        logger.debug("%s", loginfo)

        if not loginfo:
            logger.warning(f"未找到 {tel} 的登录信息")
            return

        if loginfo.get('status', '0') == '1':
            logger.info(f'手机号{tel},wx_token{token_id} 已经微信登录,终止登录流程')
            return

    # ------------------------------------------------------------------
    # Group sending
    # ------------------------------------------------------------------

    def _reset_image_forward_cache(self):
        """Clear the cached upload data used to forward an image."""
        self.wxchat.forward_image_aeskey = ''
        self.wxchat.forward_image_cdnthumburl = ''
        self.wxchat.forward_image_cdnthumblength = 0
        self.wxchat.forward_image_cdnthumbheight = 0
        self.wxchat.forward_image_cdnthumbwidth = 0
        self.wxchat.forward_image_length = 0
        self.wxchat.forward_image_md5 = ''

    def _reset_forward_cache(self):
        """Clear every cached upload (video, image, file) on ``self.wxchat``."""
        self.wxchat.forward_video_aeskey = ''
        self.wxchat.forward_video_cdnvideourl = ''
        self.wxchat.forward_video_length = 0
        self.wxchat.video_duration = 0
        self._forward_video_source_url = ''

        self._reset_image_forward_cache()

        self.wxchat.forward_file_aeskey = ''

    async def _publish_dialogue(self, agent_wxid, target_wxid, wx_content):
        """Build a dialogue message for ``wx_content`` and send it to Kafka."""
        input_message = dialogue_message(agent_wxid, target_wxid, wx_content)
        await self.kafka_service.send_message_async(input_message)
        logger.info("发送对话 %s", input_message)

    async def _pause_between_sends(self):
        """Sleep for a random interval within ``_SEND_DELAY_RANGE``."""
        await asyncio.sleep(random.uniform(*self._SEND_DELAY_RANGE))

    async def group_sending_handler_async(self, content_data: dict):
        """Send every ``wx_content`` item to the requested known contacts.

        Targets are the entries of ``content_data["contact_list"]`` whose
        ``wxid`` is present either in the cached friend list or in the cached
        chatroom hash of the agent account.  Friends are served first, then
        chatrooms, each in ``contact_list`` order.
        """
        # NOTE(review): hard-coded fallback phone number; should be config.
        agent_tel = content_data.get('agent_tel', '18733438393')
        logininfo = await self.redis_service.get_hash(f"__AI_OPS_WX__:LOGININFO:{agent_tel}")

        if not logininfo:
            logger.warning(f"未找到 {agent_tel} 的登录信息")
            return

        token_id = logininfo.get('tokenId')
        app_id = logininfo.get('appId')
        agent_wxid = logininfo.get('wxid')

        # Cached friends: a JSON list of objects carrying "userName".
        friends_raw = await self.redis_service.get_hash_field(
            f"__AI_OPS_WX__:CONTACTS_BRIEF:{agent_wxid}", "data"
        )
        friend_wxids = {f["userName"] for f in json.loads(friends_raw)} if friends_raw else set()

        # Cached chatrooms: hash keyed by chatroom id.  The lookup may yield
        # nothing, which must not crash the job.
        chatrooms = await self.redis_service.get_hash(f"__AI_OPS_WX__:GROUPS_INFO:{agent_wxid}")
        chatroom_ids = set(chatrooms) if chatrooms else set()

        # De-duplicate while keeping the caller's order; skip entries
        # without a wxid instead of raising KeyError.
        requested_wxids = list(dict.fromkeys(
            wxid
            for wxid in (c.get('wxid') for c in content_data.get("contact_list", []))
            if wxid
        ))
        target_wxids = (
            [w for w in requested_wxids if w in friend_wxids]
            + [w for w in requested_wxids if w in chatroom_ids]
        )

        wx_content_list = content_data.get("wx_content", [])

        self._reset_forward_cache()

        for target_wxid in target_wxids:
            for wx_content in wx_content_list:
                await self._send_content(token_id, app_id, agent_wxid, target_wxid, wx_content)

    async def _send_content(self, token_id, app_id, agent_wxid, target_wxid, wx_content):
        """Send a single ``wx_content`` item to one target, by its ``type``."""
        content_type = wx_content.get("type")
        if content_type == "text":
            await self.send_text_message_async(token_id, app_id, agent_wxid, [target_wxid], wx_content["text"])
        elif content_type == "image_url":
            await self.send_image_messagae_sync(token_id, app_id, agent_wxid, [target_wxid], wx_content.get("image_url", {}).get("url"))
        elif content_type == "tts":
            await self.send_tts_message(token_id, app_id, agent_wxid, [target_wxid], wx_content["text"])
        elif content_type == "file":
            await self.send_file_message(token_id, app_id, agent_wxid, [target_wxid], wx_content.get("file_url", {}).get("url"))

    async def send_text_message_async(self, token_id, app_id, agent_wxid, intersection_wxids, text):
        """Send ``text`` to every wxid, recording each send as a dialogue."""
        for t in intersection_wxids:
            # The send result is not inspected here (unchanged behavior).
            await self.wxchat.post_text_async(token_id, app_id, t, text)
            logger.info(f'{agent_wxid} 向 {t} 发送文字【{text}】')

            await self._publish_dialogue(agent_wxid, t, [{"type": "text", "text": text}])
            await self._pause_between_sends()

    async def _upload_image(self, token_id, app_id, agent_wxid, target_wxid, image_url):
        """Upload and send an image, caching the result for later forwards.

        On failure the image forward cache is cleared so that later
        recipients retry the upload instead of forwarding stale data.
        """
        ret, ret_msg, res = await self.wxchat.post_image_async(token_id, app_id, target_wxid, image_url)
        if ret == 200:
            self.wxchat.forward_image_aeskey = res["aesKey"]
            self.wxchat.forward_image_cdnthumburl = res["fileId"]
            self.wxchat.forward_image_cdnthumblength = res["cdnThumbLength"]
            self.wxchat.forward_image_cdnthumbheight = res["height"]
            self.wxchat.forward_image_cdnthumbwidth = res["width"]
            self.wxchat.forward_image_length = res["length"]
            self.wxchat.forward_image_md5 = res["md5"]
            logger.info(f'{agent_wxid} 向 {target_wxid} 发送图片【{image_url}】{ret_msg}')
        else:
            self._reset_image_forward_cache()
            logger.warning(f'{agent_wxid} 向 {target_wxid} 发送图片【{image_url}】{ret_msg}')

    async def send_image_messagae_sync(self, token_id, app_id, agent_wxid, intersection_wxids, image_url):
        """Send an image to every wxid: upload once, then forward.

        The first recipient always triggers an upload; later recipients are
        served by forwarding the cached upload when one is available and by
        a fresh upload otherwise.  (The misspelled method name is part of
        the public interface and is kept.)
        """
        for index, t in enumerate(intersection_wxids):
            if index > 0 and self.wxchat.forward_image_aeskey != "":
                ret, ret_msg, res = await self.wxchat.forward_image_async(
                    token_id,
                    app_id,
                    t,
                    self.wxchat.forward_image_aeskey,
                    self.wxchat.forward_image_cdnthumburl,
                    self.wxchat.forward_image_cdnthumblength,
                    self.wxchat.forward_image_cdnthumbheight,
                    self.wxchat.forward_image_cdnthumbwidth,
                    self.wxchat.forward_image_length,
                    self.wxchat.forward_image_md5,
                )
                logger.info(f'{agent_wxid} 向 {t} 转发图片【{image_url}】{ret_msg}')
            else:
                await self._upload_image(token_id, app_id, agent_wxid, t, image_url)

            await self._publish_dialogue(agent_wxid, t, [{"type": "image_url", "image_url": {"url": image_url}}])
            await self._pause_between_sends()

    async def send_tts_message(self, token_id, app_id, agent_wxid, intersection_wxids, text):
        """Convert ``text`` to a voice clip and send it to every wxid.

        A dialogue (as text) is recorded only for successful sends.
        """
        # NOTE(review): wx_voice is called synchronously; if it performs
        # network or CPU-heavy work it blocks the event loop -- confirm.
        voice_during, voice_url = wx_voice(text)
        for t in intersection_wxids:
            if voice_url:
                ret, ret_msg, res = await self.wxchat.post_voice_async(token_id, app_id, t, voice_url, voice_during)
                if ret == 200:
                    logger.info(f'{agent_wxid} 向 {t} 发送语音文本【{text}】{ret_msg}')
                    await self._publish_dialogue(agent_wxid, t, [{"type": "text", "text": text}])
                else:
                    logger.warning((f'{agent_wxid} 向 {t} 发送语音文本【{text}】{ret_msg}'))
            else:
                logger.warning((f'{agent_wxid} 向 {t} 发送语音文本【{text}】出错'))

            await self._pause_between_sends()

    async def send_file_message(self, token_id, app_id, agent_wxid, intersection_wxids, file_url):
        """Send a file by URL; ``.mp4`` files go out as video messages."""
        if not file_url:
            # urlparse(None) yields bytes components and would crash below.
            logger.warning(f'{agent_wxid} 发送文件失败: file_url 为空')
            return

        filename = urlparse(file_url).path.split('/')[-1]
        _, ext = os.path.splitext(filename)

        if ext.lower() == '.mp4':
            await self.send_video_message(token_id, app_id, agent_wxid, intersection_wxids, file_url)
        else:
            await self.send_other_file_message(token_id, app_id, agent_wxid, intersection_wxids, file_url)

    async def send_video_message(self, token_id, app_id, agent_wxid, intersection_wxids, file_url):
        """Send a video to every wxid: upload once, then forward.

        The cached upload is reused only when it was produced for this same
        ``file_url``; otherwise a second, different video would be forwarded
        with the first video's data.
        """
        filename = os.path.basename(urlparse(file_url).path)
        tmp_file_path = os.path.join(os.getcwd(), 'tmp', filename)
        # Swap only the extension (not every ".mp4" occurrence in the path).
        thumbnail_path = os.path.splitext(tmp_file_path)[0] + '.jpg'

        for t in intersection_wxids:
            cache_matches = (
                self.wxchat.forward_video_aeskey != ''
                and getattr(self, '_forward_video_source_url', None) == file_url
            )
            if cache_matches:
                ret, ret_msg, res = await self.wxchat.forward_video_async(
                    token_id,
                    app_id,
                    t,
                    self.wxchat.forward_video_aeskey,
                    self.wxchat.forward_video_cdnvideourl,
                    self.wxchat.forward_video_length,
                    self.wxchat.video_duration,
                )
                logger.info('转发视频')
            else:
                # NOTE(review): synchronous download inside a coroutine;
                # presumably blocks the event loop for its duration.
                video_thumb_url, video_duration = download_video_and_get_thumbnail(file_url, thumbnail_path)
                logger.info(f'视频缩略图 {video_thumb_url} 时长 {video_duration}')

                ret, ret_msg, res = await self.wxchat.post_video_async(token_id, app_id, t, file_url, video_thumb_url, video_duration)
                if ret == 200:
                    self.wxchat.forward_video_aeskey = res["aesKey"]
                    self.wxchat.forward_video_cdnvideourl = res["cdnThumbUrl"]
                    self.wxchat.forward_video_length = res["length"]
                    self.wxchat.video_duration = int(video_duration)
                    self._forward_video_source_url = file_url

            if ret == 200:
                logger.info(f'{agent_wxid} 向 {t} 发送视频【{file_url}】{ret_msg}')
                await self._publish_dialogue(agent_wxid, t, [{"type": "file", "file_url": {"url": file_url}}])
            else:
                logger.warning((f'{agent_wxid} 向 {t} 发送视频【{file_url}】{ret_msg}'))

            await self._pause_between_sends()

    async def send_pdf_message_async(self, token_id, app_id, agent_wxid, intersection_wxids, file_url):
        """Placeholder for PDF sending; currently only logs its own name."""
        logger.debug('send_pdf_message_async')

    async def send_other_file_message(self, token_id, app_id, agent_wxid, intersection_wxids, file_url):
        """Send a generic file (named after the URL's basename) to every wxid."""
        filename = os.path.basename(urlparse(file_url).path)
        for t in intersection_wxids:
            ret, ret_msg, res = await self.wxchat.post_file_async(token_id, app_id, t, file_url, filename)
            if ret == 200:
                logger.info(f'{agent_wxid} 向 {t} 发送文件【{file_url}】{ret_msg}')
                await self._publish_dialogue(agent_wxid, t, [{"type": "file", "file_url": {"url": file_url}}])
            else:
                logger.warning((f'{agent_wxid} 向 {t} 发送文件【{file_url}】{ret_msg}'))

            await self._pause_between_sends()

    # ------------------------------------------------------------------
    # SNS posting
    # ------------------------------------------------------------------

    async def _sns_credentials(self, wxid):
        """Return ``(token_id, app_id)`` for an eligible wxid, else ``None``."""
        loginfo = await self.wx_auth_required_time_async(wxid)
        if not loginfo:
            return None
        return loginfo.get('tokenId', ''), loginfo.get('appId', '')

    async def _sns_fan_out(self, wxids, make_send):
        """Run ``make_send(token_id, app_id)`` for every eligible wxid.

        Eligibility is checked one account at a time; the resulting send
        coroutines are then awaited together with ``asyncio.gather``.
        """
        tasks = []
        for wxid in wxids:
            credentials = await self._sns_credentials(wxid)
            if credentials is None:
                continue
            token_id, app_id = credentials
            tasks.append(make_send(token_id, app_id))

        await asyncio.gather(*tasks)

    def _sns_video_payload(self, content_data):
        """Extract ``(text, video_url, thumb_url)`` or ``None`` if incomplete."""
        sns_content = content_data.get("wx_sns_content", {})
        wx_sns_content_text = sns_content.get("text", "")
        wx_sns_content_video_url = sns_content.get("videoUrl", "")
        wx_sns_content_thumb_url = sns_content.get("videoThumbUrl", "")

        if not wx_sns_content_thumb_url:
            logger.warning(f'转发视频缩略图消息为空不处理 {wx_sns_content_thumb_url}')
            return None

        if not wx_sns_content_video_url:
            logger.warning(f'转发视频消息为空不处理 {wx_sns_content_video_url}')
            return None

        return wx_sns_content_text, wx_sns_content_video_url, wx_sns_content_thumb_url

    async def sns_sendtext_forward_handler_async(self, content_data):
        """Post the same SNS text from every eligible account in ``wxids``."""
        wxids = content_data.get('wxids', [])
        wx_sns_content_text = content_data.get("wx_sns_content", {}).get("text", "")
        if not wx_sns_content_text:
            logger.warning(f'转发文本消息为空不处理 {wx_sns_content_text}')
            return

        if not wxids:
            logger.warning(f'wxids 空列表不处理 {wxids}')
            return

        await self._sns_fan_out(
            wxids,
            lambda token_id, app_id: self.wxchat.send_text_sns_async(token_id, app_id, wx_sns_content_text),
        )

    async def sns_sendimages_forward_handler_async(self, content_data):
        """Post the same SNS images from every eligible account in ``wxids``."""
        wxids = content_data.get('wxids', [])
        sns_content = content_data.get("wx_sns_content", {})
        wx_sns_content_text = sns_content.get("text", "")
        wx_sns_content_imgs = sns_content.get("imageUrls", [])
        if not wx_sns_content_imgs:
            logger.warning(f'转发图片消息为空不处理 {wx_sns_content_imgs}')
            return

        await self._sns_fan_out(
            wxids,
            lambda token_id, app_id: self.wxchat.upload_send_image_sns_async(
                token_id, app_id, wx_sns_content_text, wx_sns_content_imgs
            ),
        )

    async def sns_sendvideo_forward_handler_async(self, content_data):
        """Post the same SNS video from every eligible account in ``wxids``."""
        wxids = content_data.get('wxids', [])
        payload = self._sns_video_payload(content_data)
        if payload is None:
            return
        text, video_url, thumb_url = payload

        await self._sns_fan_out(
            wxids,
            lambda token_id, app_id: self.wxchat.upload_send_video_sns_async(
                token_id, app_id, text, video_url, thumb_url
            ),
        )

    async def sns_sendtext_handler_async(self, content_data):
        """Post an SNS text from the single account ``content_data["wxid"]``."""
        wxid = content_data.get('wxid', '')
        wx_sns_content_text = content_data.get("wx_sns_content", {}).get("text", "")
        if not wx_sns_content_text:
            logger.warning(f'转发文本消息为空不处理 {wx_sns_content_text}')
            return

        if not wxid:
            logger.warning(f'wxid 空不处理 {wxid}')
            return

        credentials = await self._sns_credentials(wxid)
        if credentials is None:
            return

        token_id, app_id = credentials
        await self.wxchat.send_text_sns_async(token_id, app_id, wx_sns_content_text)

    async def sns_sendimages_handler_async(self, content_data):
        """Post SNS images from the single account ``content_data["wxid"]``."""
        wxid = content_data.get('wxid', '')
        sns_content = content_data.get("wx_sns_content", {})
        wx_sns_content_text = sns_content.get("text", "")
        wx_sns_content_imgs = sns_content.get("imageUrls", [])
        if not wx_sns_content_imgs:
            logger.warning(f'转发图片消息为空不处理 {wx_sns_content_imgs}')
            return

        if not wxid:
            logger.warning(f'wxid 空不处理 {wxid}')
            return

        credentials = await self._sns_credentials(wxid)
        if credentials is None:
            return

        token_id, app_id = credentials
        await self.wxchat.upload_send_image_sns_async(token_id, app_id, wx_sns_content_text, wx_sns_content_imgs)

    async def sns_sendvideo_handler_async(self, content_data):
        """Post an SNS video from the single account ``content_data["wxid"]``."""
        wxid = content_data.get('wxid', '')
        payload = self._sns_video_payload(content_data)
        if payload is None:
            return
        text, video_url, thumb_url = payload

        if not wxid:
            logger.warning(f'wxid 空不处理 {wxid}')
            return

        credentials = await self._sns_credentials(wxid)
        if credentials is None:
            return

        token_id, app_id = credentials
        await self.wxchat.upload_send_video_sns_async(token_id, app_id, text, video_url, thumb_url)

    async def wx_auth_required_time_async(self, wxid: str) -> dict | None:
        """Return the login info of ``wxid`` if it may use SNS features.

        The account must exist, have ``status == '1'`` and have a
        ``create_at`` timestamp at least three days in the past.  A missing
        ``create_at`` is treated as "created now" and therefore rejected.

        Returns:
            The login-info dict, or ``None`` (after logging a warning) when
            any check fails.
        """
        if not wxid:
            logger.warning(f'wxid 不能为空')
            return None

        _, loginfo = await self.wxchat.get_login_info_by_wxid_async(wxid)
        if not loginfo:
            logger.warning(f'{wxid} 微信信息不存在')
            return None

        if loginfo.get('status', '0') != '1':
            logger.warning(f'{wxid} 已经离线')
            return None

        # Go through float() so a value such as "1700000000.5" is accepted.
        creation_timestamp = int(float(loginfo.get('create_at', time.time())))
        account_age = time.time() - creation_timestamp
        if account_age < self._MIN_ACCOUNT_AGE_SECONDS:
            logger.warning(f'{wxid} 用户创建不够三天,不能使用该功能')
            return None
        return loginfo
-
|