import asyncio
import base64
import io
import json
import os
import random
import threading
import time
import uuid

import aiohttp
from fastapi import Depends, FastAPI, Request

from common.log import logger
from common.singleton import singleton
# NOTE: the wildcard import stays ahead of the service imports (its original
# relative position) so that name shadowing is unchanged. It presumably
# supplies clean_json_string, dialogue_message, wx_voice, urlparse and
# download_video_and_get_thumbnail, all of which are used below -- verify
# against common.utils.
from common.utils import *
from services.redis_service import RedisService
from services.kafka_service import KafkaService
from services.gewe_service import GeWeService


@singleton
class BizService():
    """Consume operations messages from Kafka and drive WeChat actions.

    The service dispatches each incoming message on its ``msg_type`` to one
    of the ``*_handler_async`` coroutines, which talk to the GeWe WeChat
    client (``self.wxchat``), Redis and Kafka taken from ``app.state``.
    """

    # Bounds (seconds) of the random pause inserted after each outgoing send.
    _SEND_DELAY_RANGE = (1.5, 3)

    def __init__(self, app: FastAPI):
        """Bind the shared services stored on ``app.state``.

        The ``initialized`` flag makes repeated construction of the
        singleton a no-op.
        """
        if not hasattr(self, 'initialized'):
            self.kafka_service: KafkaService = app.state.kafka_service
            self.wxchat: GeWeService = app.state.gewe_service
            self.redis_service: RedisService = app.state.redis_service
            self.initialized = True

    def setup_handlers(self):
        """Register the default message handler on the consumer topic."""
        self.kafka_service.add_handler(
            self.kafka_service.consumer_topic,
            self.ops_messages_process_handler,
        )

    async def ops_messages_process_handler(self, message: str):
        """Parse one Kafka message and dispatch it by ``data.msg_type``.

        Any exception raised while parsing or handling is logged (with
        traceback) and swallowed, so this coroutine never raises.
        """
        try:
            content = json.loads(clean_json_string(message))
            data = content.get("data", {})
            msg_type_data = data.get("msg_type", None)
            content_data = data.get("content", {})

            handlers = {
                'login': self.login_handler_async,
                'group-sending': self.group_sending_handler_async,
                'sns-sendtext-forward': self.sns_sendtext_forward_handler_async,
                'sns-sendtext': self.sns_sendtext_handler_async,
                'sns-sendimages-forward': self.sns_sendimages_forward_handler_async,
                'sns-sendimages': self.sns_sendimages_handler_async,
                'sns-sendvideo-forward': self.sns_sendvideo_forward_handler_async,
                'sns-sendvideo': self.sns_sendvideo_handler_async,
            }
            # Only strings can be valid message types; the isinstance guard
            # also keeps unhashable values away from the dict lookup.
            handler = handlers.get(msg_type_data) if isinstance(msg_type_data, str) else None
            if handler is None:
                logger.warning(f'kakfa 未处理息类型 {msg_type_data}')
                return
            await handler(content_data)
        except Exception as e:
            # Top-level boundary: log with traceback and keep consuming.
            logger.exception(f"处理消息时发生错误: {e}, 消息内容: {message}")

    async def login_handler_async(self, content_data: dict):
        """Look up cached login info and stop if the account is already logged in.

        When the cached status is not ``'1'`` the method currently does
        nothing further.
        """
        # NOTE(review): these fallback values look like real credentials
        # (phone number, token, API key) committed to source. They are kept
        # byte-for-byte to avoid changing behaviour; consider moving them to
        # configuration and rotating the key.
        tel = content_data.get('tel', '18733438393')
        token_id = content_data.get('token_id', 'c50b7d57-2efa-4a53-8c11-104a06d1e1fa')
        region_id = content_data.get('region_id', '440000')
        agent_token_id = content_data.get('agent_token_id', 'sk-fAOIdANeGXjWKW5mFybnsNZZGYU2lFLmqVY9rVFaFmjiOaWt3tcWMi')
        loginfo = await self.wxchat.get_login_info_from_cache_async(tel, token_id, region_id, agent_token_id)
        logger.debug(loginfo)
        status = loginfo.get('status', '0')
        if status == '1':
            logger.info(f'手机号{tel},wx_token{token_id} 已经微信登录,终止登录流程')
            return

    async def group_sending_handler_async(self, content_data: dict):
        """Send every ``wx_content`` item to each requested, known contact.

        Recipients are the entries of ``content_data["contact_list"]`` whose
        ``wxid`` appears in the agent's cached friend list or cached group
        list in Redis. Supported content types are ``text``, ``image_url``,
        ``tts`` and ``file``; other types are logged and skipped.
        """
        # NOTE(review): hard-coded fallback phone number, kept as-is.
        agent_tel = content_data.get('agent_tel', '18733438393')
        logininfo = await self.redis_service.get_hash(f"__AI_OPS_WX__:LOGININFO:{agent_tel}")
        if not logininfo:
            logger.warning(f"未找到 {agent_tel} 的登录信息")
            return

        token_id = logininfo.get('tokenId')
        app_id = logininfo.get('appId')
        agent_wxid = logininfo.get('wxid')

        # Cached friends of the agent.
        friends_raw = await self.redis_service.get_hash_field(
            f"__AI_OPS_WX__:CONTACTS_BRIEF:{agent_wxid}", "data")
        friends = json.loads(friends_raw) if friends_raw else []
        cache_friend_wxids = {f["userName"] for f in friends}

        # Cached groups of the agent; tolerate a missing/empty hash.
        cache_chatrooms = await self.redis_service.get_hash(
            f"__AI_OPS_WX__:GROUPS_INFO:{agent_wxid}")
        cache_chatroom_ids = set(cache_chatrooms or {})

        requested_wxids = {c['wxid'] for c in content_data.get("contact_list", [])}
        intersection_wxids = (list(cache_friend_wxids & requested_wxids)
                              + list(cache_chatroom_ids & requested_wxids))

        wx_content_list = content_data.get("wx_content", [])
        self._reset_forward_cache()

        for to_wxid in intersection_wxids:
            for wx_content in wx_content_list:
                content_type = wx_content.get("type")
                if content_type == "text":
                    await self.send_text_message_async(
                        token_id, app_id, agent_wxid, [to_wxid], wx_content["text"])
                elif content_type == "image_url":
                    await self.send_image_messagae_sync(
                        token_id, app_id, agent_wxid, [to_wxid],
                        wx_content.get("image_url", {}).get("url"))
                elif content_type == "tts":
                    await self.send_tts_message(
                        token_id, app_id, agent_wxid, [to_wxid], wx_content["text"])
                elif content_type == "file":
                    await self.send_file_message(
                        token_id, app_id, agent_wxid, [to_wxid],
                        wx_content.get("file_url", {}).get("url"))
                else:
                    logger.warning(f'group-sending 未处理的内容类型 {content_type}')

    def _reset_forward_cache(self):
        """Clear the forward metadata kept on the WeChat client before a batch."""
        wx = self.wxchat
        wx.forward_video_aeskey = ''
        wx.forward_video_cdnvideourl = ''
        wx.forward_video_length = 0
        wx.video_duration = 0
        wx.forward_image_aeskey = ''
        wx.forward_image_cdnthumburl = ''
        wx.forward_image_cdnthumblength = 0
        wx.forward_image_cdnthumbheight = 0
        wx.forward_image_cdnthumbwidth = 0
        wx.forward_image_length = 0
        wx.forward_image_md5 = ''
        wx.forward_file_aeskey = ''

    async def _publish_dialogue(self, agent_wxid, to_wxid, wx_content):
        """Build a dialogue record for a sent item and publish it to Kafka."""
        input_message = dialogue_message(agent_wxid, to_wxid, wx_content)
        await self.kafka_service.send_message_async(input_message)
        logger.info("发送对话 %s", input_message)

    async def _throttle(self):
        """Sleep for a random interval between consecutive sends."""
        await asyncio.sleep(random.uniform(*self._SEND_DELAY_RANGE))

    async def send_text_message_async(self, token_id, app_id, agent_wxid, intersection_wxids, text):
        """Send ``text`` to every wxid in ``intersection_wxids``.

        A dialogue record is published for each recipient regardless of the
        send result (the result is not inspected).
        """
        for t in intersection_wxids:
            await self.wxchat.post_text_async(token_id, app_id, t, text)
            logger.info(f'{agent_wxid} 向 {t} 发送文字【{text}】')
            await self._publish_dialogue(agent_wxid, t, [{"type": "text", "text": text}])
            await self._throttle()

    async def _post_image_and_cache(self, token_id, app_id, agent_wxid, to_wxid, image_url):
        """Upload/send an image and, on success, cache its forward metadata."""
        ret, ret_msg, res = await self.wxchat.post_image_async(token_id, app_id, to_wxid, image_url)
        if ret == 200:
            wx = self.wxchat
            wx.forward_image_aeskey = res["aesKey"]
            wx.forward_image_cdnthumburl = res["fileId"]
            wx.forward_image_cdnthumblength = res["cdnThumbLength"]
            wx.forward_image_cdnthumbheight = res["height"]
            wx.forward_image_cdnthumbwidth = res["width"]
            wx.forward_image_length = res["length"]
            wx.forward_image_md5 = res["md5"]
            logger.info(f'{agent_wxid} 向 {to_wxid} 发送图片【{image_url}】{ret_msg}')
        else:
            logger.warning(f'{agent_wxid} 向 {to_wxid} 发送图片【{image_url}】{ret_msg}')

    async def send_image_messagae_sync(self, token_id, app_id, agent_wxid, intersection_wxids, image_url):
        """Send an image to each recipient, forwarding after the first upload.

        The first recipient in ``intersection_wxids`` always gets a fresh
        upload; later recipients get a forward when cached metadata exists,
        otherwise another upload. A dialogue record is published for every
        recipient.
        """
        # NOTE(review): group_sending_handler_async passes one recipient per
        # call, so from that caller the forward branch is never taken.
        wx = self.wxchat
        for t in intersection_wxids:
            if t == intersection_wxids[0] or wx.forward_image_aeskey == "":
                await self._post_image_and_cache(token_id, app_id, agent_wxid, t, image_url)
            else:
                # Fixed: the original read the misspelled attribute
                # ``forward_image_cdnthumblengt`` here.
                ret, ret_msg, res = await wx.forward_image_async(
                    token_id, app_id, t,
                    wx.forward_image_aeskey,
                    wx.forward_image_cdnthumburl,
                    wx.forward_image_cdnthumblength,
                    wx.forward_image_cdnthumbheight,
                    wx.forward_image_cdnthumbwidth,
                    wx.forward_image_length,
                    wx.forward_image_md5,
                )
                logger.info(f'{agent_wxid} 向 {t} 转发图片【{image_url}】{ret_msg}')

            await self._publish_dialogue(
                agent_wxid, t, [{"type": "image_url", "image_url": {"url": image_url}}])
            await self._throttle()

    async def send_tts_message(self, token_id, app_id, agent_wxid, intersection_wxids, text):
        """Convert ``text`` to a voice clip once and send it to each recipient.

        A dialogue record (as text) is published only for successful sends.
        """
        voice_during, voice_url = wx_voice(text)
        for t in intersection_wxids:
            if voice_url:
                ret, ret_msg, res = await self.wxchat.post_voice_async(
                    token_id, app_id, t, voice_url, voice_during)
                if ret == 200:
                    logger.info(f'{agent_wxid} 向 {t} 发送语音文本【{text}】{ret_msg}')
                    await self._publish_dialogue(agent_wxid, t, [{"type": "text", "text": text}])
                else:
                    logger.warning((f'{agent_wxid} 向 {t} 发送语音文本【{text}】{ret_msg}'))
            else:
                logger.warning((f'{agent_wxid} 向 {t} 发送语音文本【{text}】出错'))
            await self._throttle()

    async def send_file_message(self, token_id, app_id, agent_wxid, intersection_wxids, file_url):
        """Route a file URL to the video sender (``.mp4``) or the generic file sender."""
        if not file_url:
            # A missing URL would otherwise fail inside URL parsing.
            logger.warning(f'{agent_wxid} 文件地址为空,不发送')
            return

        filename = os.path.basename(urlparse(file_url).path)
        _, ext = os.path.splitext(filename)

        if ext == '.mp4':
            await self.send_video_message(token_id, app_id, agent_wxid, intersection_wxids, file_url)
        else:
            # PDFs currently take the generic path; send_pdf_message_async is a stub.
            await self.send_other_file_message(token_id, app_id, agent_wxid, intersection_wxids, file_url)

    async def send_video_message(self, token_id, app_id, agent_wxid, intersection_wxids, file_url):
        """Send a video to each recipient, forwarding once an upload has succeeded.

        A dialogue record is published only for successful sends.
        """
        # NOTE(review): the forward cache is not keyed by URL, so a second,
        # different video in the same batch would be forwarded as the first.
        # Loop-invariant: the thumbnail path depends only on the URL.
        filename = os.path.basename(urlparse(file_url).path)
        tmp_file_path = os.path.join(os.getcwd(), 'tmp', filename)
        thumbnail_path = tmp_file_path.replace('.mp4', '.jpg')
        wx = self.wxchat

        for t in intersection_wxids:
            if wx.forward_video_aeskey == '':
                video_thumb_url, video_duration = download_video_and_get_thumbnail(file_url, thumbnail_path)
                logger.info(f'视频缩略图 {video_thumb_url} 时长 {video_duration}')
                ret, ret_msg, res = await wx.post_video_async(
                    token_id, app_id, t, file_url, video_thumb_url, video_duration)
                if ret == 200:
                    wx.forward_video_aeskey = res["aesKey"]
                    wx.forward_video_cdnvideourl = res["cdnThumbUrl"]
                    wx.forward_video_length = res["length"]
                    wx.video_duration = int(video_duration)
            else:
                ret, ret_msg, res = await wx.forward_video_async(
                    token_id, app_id, t,
                    wx.forward_video_aeskey,
                    wx.forward_video_cdnvideourl,
                    wx.forward_video_length,
                    wx.video_duration,
                )
                logger.info('转发视频')

            if ret == 200:
                logger.info(f'{agent_wxid} 向 {t} 发送视频【{file_url}】{ret_msg}')
                await self._publish_dialogue(
                    agent_wxid, t, [{"type": "file", "file_url": {"url": file_url}}])
            else:
                logger.warning((f'{agent_wxid} 向 {t} 发送视频【{file_url}】{ret_msg}'))
            await self._throttle()

    async def send_pdf_message_async(self, token_id, app_id, agent_wxid, intersection_wxids, file_url):
        """Placeholder for PDF sending; currently only logs its own name."""
        logger.info('send_pdf_message_async')

    async def send_other_file_message(self, token_id, app_id, agent_wxid, intersection_wxids, file_url):
        """Send a generic file to each recipient; publish a record on success."""
        filename = os.path.basename(urlparse(file_url).path)
        for t in intersection_wxids:
            ret, ret_msg, res = await self.wxchat.post_file_async(token_id, app_id, t, file_url, filename)
            if ret == 200:
                logger.info(f'{agent_wxid} 向 {t} 发送文件【{file_url}】{ret_msg}')
                await self._publish_dialogue(
                    agent_wxid, t, [{"type": "file", "file_url": {"url": file_url}}])
            else:
                logger.warning((f'{agent_wxid} 向 {t} 发送文件【{file_url}】{ret_msg}'))
            await self._throttle()

    async def _sns_credentials(self, wxid):
        """Return ``(token_id, app_id)`` for an eligible wxid, else ``None``."""
        loginfo = await self.wx_auth_required_time_async(wxid)
        if not loginfo:
            return None
        return loginfo.get('tokenId', ''), loginfo.get('appId', '')

    async def sns_sendtext_forward_handler_async(self, content_data):
        """Post a text moment concurrently from every eligible account in ``wxids``."""
        wxids = content_data.get('wxids', [])
        wx_sns_content_text = content_data.get("wx_sns_content", {}).get("text", "")
        if not wx_sns_content_text:
            logger.warning(f'转发文本消息为空不处理 {wx_sns_content_text}')
            return
        if not wxids:
            logger.warning(f'wxids 空列表不处理 {wxids}')
            return

        tasks = []
        for wxid in wxids:
            creds = await self._sns_credentials(wxid)
            if creds is None:
                continue
            token_id, app_id = creds
            tasks.append(self.wxchat.send_text_sns_async(token_id, app_id, wx_sns_content_text))
        await asyncio.gather(*tasks)

    async def sns_sendimages_forward_handler_async(self, content_data):
        """Post an image moment concurrently from every eligible account in ``wxids``."""
        wxids = content_data.get('wxids', [])
        sns_content = content_data.get("wx_sns_content", {})
        wx_sns_content_text = sns_content.get("text", "")
        wx_sns_content_imgs = sns_content.get("imageUrls", [])
        if not wx_sns_content_imgs:
            logger.warning(f'转发图片消息为空不处理 {wx_sns_content_imgs}')
            return

        tasks = []
        for wxid in wxids:
            creds = await self._sns_credentials(wxid)
            if creds is None:
                continue
            token_id, app_id = creds
            tasks.append(self.wxchat.upload_send_image_sns_async(
                token_id, app_id, wx_sns_content_text, wx_sns_content_imgs))
        await asyncio.gather(*tasks)

    async def sns_sendvideo_forward_handler_async(self, content_data):
        """Post a video moment concurrently from every eligible account in ``wxids``."""
        wxids = content_data.get('wxids', [])
        sns_content = content_data.get("wx_sns_content", {})
        wx_sns_content_text = sns_content.get("text", "")
        wx_sns_content_video_url = sns_content.get("videoUrl", "")
        wx_sns_content_thumb_url = sns_content.get("videoThumbUrl", "")
        if not wx_sns_content_thumb_url:
            logger.warning(f'转发视频缩略图消息为空不处理 {wx_sns_content_thumb_url}')
            return
        if not wx_sns_content_video_url:
            logger.warning(f'转发视频消息为空不处理 {wx_sns_content_video_url}')
            return

        tasks = []
        for wxid in wxids:
            creds = await self._sns_credentials(wxid)
            if creds is None:
                continue
            token_id, app_id = creds
            tasks.append(self.wxchat.upload_send_video_sns_async(
                token_id, app_id, wx_sns_content_text,
                wx_sns_content_video_url, wx_sns_content_thumb_url))
        await asyncio.gather(*tasks)

    async def sns_sendtext_handler_async(self, content_data):
        """Post a text moment from the single account ``wxid``."""
        wxid = content_data.get('wxid', '')
        wx_sns_content_text = content_data.get("wx_sns_content", {}).get("text", "")
        if not wx_sns_content_text:
            logger.warning(f'转发文本消息为空不处理 {wx_sns_content_text}')
            return
        if not wxid:
            logger.warning(f'wxid 空不处理 {wxid}')
            return

        creds = await self._sns_credentials(wxid)
        if creds is None:
            return
        token_id, app_id = creds
        await self.wxchat.send_text_sns_async(token_id, app_id, wx_sns_content_text)

    async def sns_sendimages_handler_async(self, content_data):
        """Post an image moment from the single account ``wxid``."""
        wxid = content_data.get('wxid', '')
        sns_content = content_data.get("wx_sns_content", {})
        wx_sns_content_text = sns_content.get("text", "")
        wx_sns_content_imgs = sns_content.get("imageUrls", [])
        if not wx_sns_content_imgs:
            logger.warning(f'转发图片消息为空不处理 {wx_sns_content_imgs}')
            return
        if not wxid:
            logger.warning(f'wxid 空不处理 {wxid}')
            return

        creds = await self._sns_credentials(wxid)
        if creds is None:
            return
        token_id, app_id = creds
        await self.wxchat.upload_send_image_sns_async(
            token_id, app_id, wx_sns_content_text, wx_sns_content_imgs)

    async def sns_sendvideo_handler_async(self, content_data):
        """Post a video moment from the single account ``wxid``."""
        wxid = content_data.get('wxid', '')
        sns_content = content_data.get("wx_sns_content", {})
        wx_sns_content_text = sns_content.get("text", "")
        wx_sns_content_video_url = sns_content.get("videoUrl", "")
        wx_sns_content_thumb_url = sns_content.get("videoThumbUrl", "")
        if not wx_sns_content_thumb_url:
            logger.warning(f'转发视频缩略图消息为空不处理 {wx_sns_content_thumb_url}')
            return
        if not wx_sns_content_video_url:
            logger.warning(f'转发视频消息为空不处理 {wx_sns_content_video_url}')
            return
        if not wxid:
            logger.warning(f'wxid 空不处理 {wxid}')
            return

        creds = await self._sns_credentials(wxid)
        if creds is None:
            return
        token_id, app_id = creds
        await self.wxchat.upload_send_video_sns_async(
            token_id, app_id, wx_sns_content_text,
            wx_sns_content_video_url, wx_sns_content_thumb_url)

    async def wx_auth_required_time_async(self, wxid: str) -> dict | None:
        """Return the login info for ``wxid`` if it may use moments features.

        Returns ``None`` (after logging a warning) when ``wxid`` is empty,
        no login info is found, the status is not ``'1'``, or fewer than
        three days have passed since ``create_at``. A missing ``create_at``
        defaults to "now", which fails the three-day check.
        """
        if not wxid:
            logger.warning(f'wxid 不能为空')
            return None

        k, loginfo = await self.wxchat.get_login_info_by_wxid_async(wxid)
        if not loginfo:
            logger.warning(f'{wxid} 微信信息不存在')
            return None

        if loginfo.get('status', '0') != '1':
            logger.warning(f'{wxid} 已经离线')
            return None

        creation_timestamp = int(loginfo.get('create_at', time.time()))
        three_days_seconds = 3 * 24 * 60 * 60
        if (time.time() - creation_timestamp) < three_days_seconds:
            logger.warning(f'{wxid} 用户创建不够三天,不能使用该功能')
            return None
        return loginfo