from voice.ali.ali_voice import AliVoice from common.log import logger import xml.etree.ElementTree as ET import os,json,asyncio,aiohttp,random from aiohttp import ClientError from voice import audio_convert from fastapi import APIRouter,Request from pydantic import BaseModel from fastapi import APIRouter, Depends from typing import Dict, Any from model.models import AgentConfig,OperationType from common.utils import * from common.memory import * import traceback import sys timeout_duration = 8.0 messages_router = APIRouter() #WX_BACKLIST=['fmessage', 'medianote','weixin','weixingongzhong','tmessage'] WX_BACKLIST=['medianote','weixin','weixingongzhong','tmessage'] @messages_router.post("/messages",response_model=None) async def get_messages(request: Request, body: Dict[str, Any]): logger.info(f"收到微信回调消息: {json.dumps(body, separators=(',', ':'),ensure_ascii=False)}") try: msg = body type_name =msg.get("TypeName") app_id = msg.get("Appid") k, loginfo = await request.app.state.gewe_service.get_login_info_by_app_id_async(app_id) if not k: logger.warning('找不到登录信息,不处理') return {"message": f"收到微信回调消息: {type_name}"} token_id=loginfo.get('tokenId','') wxid = msg.get("Wxid",'') if type_name == 'AddMsg': await handle_self_cmd_async(request,wxid,msg) msg_data = msg.get("Data") from_wxid = msg_data["FromUserName"]["string"] config=await request.app.state.redis_service.get_hash(f"__AI_OPS_WX__:WXCHAT_CONFIG") wxids = list(config.keys()) meged_backlist_wxids=wxids+WX_BACKLIST # 公众号ID已gh_开头 if (from_wxid in meged_backlist_wxids and from_wxid != wxid) or 'gh_' in from_wxid: logger.warning(f'来自微信ID {from_wxid} 在黑名单,发送给 {wxid} ,不处理') return {"message": "收到微信回调消息"} await handle_messages_async(request,token_id,msg) return {"message": "收到微信回调消息,处理完成"} except Exception as e: # 获取当前的堆栈跟踪 tb = sys.exc_info()[2] # 为异常附加堆栈跟踪 e = e.with_traceback(tb) # 输出详细的错误信息 logger.error(f"处理微信回调消息出错: {json.dumps(body, separators=(',', ':'),ensure_ascii=False)}") logger.error(f"异常类型: {type(e).__name__}") logger.error(f"异常信息: {str(e)}") logger.error(f"堆栈跟踪: {traceback.format_exc()}") return {"message": "处理微信回调消息错误"} async def handle_self_cmd_async(request: Request,wxid,msg): ''' 个人微信命令处理 如果是个人微信的指令,wxid == from_wxid commands = { '启用托管': True, '关闭托管': False } ''' msg_data=msg.get("Data") from_wxid=msg_data["FromUserName"]["string"] msg_content=msg_data["Content"]["string"] if wxid == from_wxid: commands = { '启用托管': True, '关闭托管': False } if msg_content in commands: c_dict = await request.app.state.gewe_service.get_wxchat_config_from_cache_async(wxid) if c_dict: agent_config=AgentConfig.model_validate(c_dict) agent_config.agentEnabled=commands[msg_content] await request.app.state.gewe_service.wxchat.save_wxchat_config_async(wxid, agent_config.model_dump()) logger.info(f'{wxid} {"启动" if commands[msg_content] else "关闭"}托管') async def handle_messages_async(request: Request,token_id,msg): #msg_data=msg.get("Data") type_name =msg.get("TypeName") # app_id = msg.get("Appid") # from_wxid=msg_data["FromUserName"]["string"] # msg_content=msg_data["Content"]["string"] wxid = msg.get("Wxid",'') match type_name: case 'AddMsg': await handle_add_messages_async(request,token_id,msg,wxid) case 'ModContacts': await handle_mod_contacts_async(request,token_id,msg,wxid) case 'DelContacts': await handle_del_contacts_async(request,token_id,msg,wxid) case 'Offline': await handle_offline_async(request,token_id,msg,wxid) case _: logger.warning(f'未知消息类型:{type_name}') # async def gpt_client_async(request,messages: list, wixd: str, friend_wxid: str): # c = await request.app.state.gewe_service.get_wxchat_config_from_cache_async(wixd) # api_key = c.get('agentTokenId', "sk-jr69ONIehfGKe9JFphuNk4DU5Y5wooHKHhQv7oSnFzVbwCnW65fXO9kvH") # print(f'流程key:{api_key}\n') # #api_key="sk-jr69ONIehfGKe9JFphuNk4DU5Y5wooHKHhQv7oSnFzVbwCnW65fXO9kvH" #测试 # #api_key="sk-uJDBdKmJVb2cmfldGOvlIY6Qx0AzqWMPD3lS1IzgQYzHNOXv9SKNI" #开发2 # api_url = "http://106.15.182.218:3000/api/v1/chat/completions" # headers = { # "Content-Type": "application/json", # "Authorization": f"Bearer {api_key}" # } # session_id = f'{wixd}-{friend_wxid}' # data = { # "model": "", # "messages": messages, # "chatId": session_id, # "detail": True # } # logger.info("[CHATGPT] 请求={}".format(json.dumps(data, ensure_ascii=False))) # async with aiohttp.ClientSession() as session: # try: # async with session.post(url=api_url, headers=headers, data=json.dumps(data), timeout=600) as response: # response.raise_for_status() # response_data = await response.json() # logger.info("[CHATGPT] 响应={}".format(json.dumps(response_data, separators=(',', ':'), ensure_ascii=False))) # return response_data # except aiohttp.ClientError as e: # logger.error(f"[CHATGPT] 请求失败: {e}") # raise async def gpt_client_async(request, messages: list, wixd: str, friend_wxid: str): max_retries = 3 retry_delay = 5 # 重试间隔时间(秒) for attempt in range(max_retries): try: c = await request.app.state.gewe_service.get_wxchat_config_from_cache_async(wixd) api_key = c.get('agentTokenId', "sk-jr69ONIehfGKe9JFphuNk4DU5Y5wooHKHhQv7oSnFzVbwCnW65fXO9kvH") base_url="http://106.15.182.218:3000" environment = os.environ.get('environment', 'default') if environment != 'default': base_url="http://172.19.42.59:3000" api_url = f"{base_url}/api/v1/chat/completions" headers = { "Content-Type": "application/json", "Authorization": f"Bearer {api_key}" } session_id = f'{wixd}-{friend_wxid}' data = { "model": "", "messages": messages, "chatId": session_id, "detail": True } logger.info("[CHATGPT] 请求={}".format(json.dumps(data, ensure_ascii=False))) async with aiohttp.ClientSession() as session: async with session.post(url=api_url, headers=headers, data=json.dumps(data), timeout=1200) as response: response.raise_for_status() response_data = await response.json() return response_data except (ClientError, asyncio.TimeoutError) as e: logger.error(f"[CHATGPT] 请求失败(尝试 {attempt + 1}/{max_retries}): {e}") if attempt < max_retries - 1: await asyncio.sleep(retry_delay) else: raise async def handle_add_messages_async(request: Request,token_id,msg,wxid): wx_config =await request.app.state.gewe_service.get_wxchat_config_from_cache_async(wxid) # if not bool(wx_config.get("agentEnabled",False)): # logger.info(f'微信ID {wxid} 未托管,不处理') # return app_id = msg.get("Appid") msg_data = msg.get("Data") msg_type = msg_data.get("MsgType",None) from_wxid = msg_data["FromUserName"]["string"] to_wxid = msg_data["ToUserName"]["string"] msg_push_content=msg_data.get("PushContent") validated_config = AgentConfig.model_validate(wx_config) if not validated_config.agentEnabled: logger.info(f'微信ID {wxid} 未托管,不处理 {msg_type}') return handlers = { 1: handle_text_async, 3: handle_image_async, 34: handle_voice_async, 42: handle_name_card_async, 49: handle_xml_async, 37: handle_add_friend_notice_async, 10002: handle_10002_msg_async, 10000: handle_10000_msg_async } # (扫码进群情况)判断受否是群聊,并添加到通信录 if check_chatroom(from_wxid) or check_chatroom(to_wxid): logger.info('群信息') chatroom_id=from_wxid ret,msg,data=await request.app.state.gewe_service.save_contract_list_async(token_id,app_id,chatroom_id,3) logger.info(f'保存到通讯录 chatroom_id {chatroom_id} {msg}') await request.app.state.gewe_service.update_group_info_to_cache_async(token_id,app_id,wxid,chatroom_id) await request.app.state.gewe_service.update_group_members_to_cache_async(token_id,app_id,wxid,chatroom_id) handlers[1]=handle_text_group_async handlers[3]=handle_image_group_async handlers[34]=handle_voice_group_async handler = handlers.get(msg_type) if handler: return await handler(request,token_id,app_id, wxid,msg_data,from_wxid, to_wxid) else: logger.warning(f"微信回调消息类型 {msg_type} 未处理") async def handle_mod_contacts_async(request: Request,token_id,msg,wxid): ''' 好友通过验证及好友资料变更的通知消息 ''' wxid = msg.get("Wxid") msg_data = msg.get("Data") app_id = msg.get("Appid") # #handle_mod_contacts(token_id,app_id,wxid,msg_data) # user_name=msg_data.get("UserName",{}).get("string","") if not check_chatroom(user_name): # loop = asyncio.get_event_loop() # future = asyncio.run_coroutine_threadsafe( # handle_mod_contacts_worker_async(request,token_id,app_id,wxid,msg_data), # loop # ) await handle_mod_contacts_worker_async(request,token_id,app_id,wxid,msg_data) #判断是否好友关系 # is_friends=True # if is_friends: # loop = asyncio.get_event_loop() # future = asyncio.run_coroutine_threadsafe( # handle_mod_contacts_worker_async(request,token_id,app_id,wxid,msg_data), # loop # ) # contact_wxid =msg_data.get("UserName",{}).get("string","") #msg_data["UserName"]["string"] # nickname= msg_data.get("NickName",{}).get("string","")#msg_data["NickName"]["string"] # city=msg_data.get("City",None) # signature=msg_data.get("Signature",None) # province=msg_data.get("Province",None) # bigHeadImgUrl=msg_data.get("SnsUserInfo",{}).get("SnsBgimgId",None) #msg_data["SnsUserInfo"]["SnsBgimgId"] # country=msg_data.get("Country",None) # sex=msg_data.get("Sex",None) # pyInitial= msg_data.get("PyInitial",{}).get("string",None)#msg_data["PyInitial"]["string"] # quanPin=msg_data.get("QuanPin",{}).get("string",None) #msg_data["QuanPin"]["string"] # remark=msg_data.get("Remark",{}).get("string",None) # remarkPyInitial=msg_data.get("RemarkPyInitial",{}).get("string",None) # remarkQuanPin=msg_data.get("RemarkQuanPin",{}).get("string",None) # smallHeadImgUrl=msg_data.get("smallHeadImgUrl",None) # #推动到kafka # contact_data = { # "alias": None, # "bigHeadImgUrl": bigHeadImgUrl, # "cardImgUrl": None, # "city": city, # "country": country, # "description": None, # "labelList": None, # "nickName": nickname, # "phoneNumList": None, # "province": province, # "pyInitial": pyInitial, # "quanPin": quanPin, # "remark": remark, # "remarkPyInitial": remarkPyInitial, # "remarkQuanPin": remarkQuanPin, # "sex": sex, # "signature": signature, # "smallHeadImgUrl": smallHeadImgUrl, # "snsBgImg": None, # "userName": contact_wxid # } # input_message=wx_mod_contact_message(wxid,contact_data) # await request.app.state.kafka_service.send_message_async(input_message) # else: # logger.info('删除好友通知') # contact_wxid =msg_data.get("UserName",{}).get("string","") # # 推送到kafka # input_message=wx_del_contact_message(wxid,contact_wxid) # await request.app.state.kafka_service.send_message_async(input_message) else: logger.info('群信息变更通知') chatroom_id=user_name ret,msg,data=await request.app.state.gewe_service.save_contract_list_async(token_id,app_id,chatroom_id,3) logger.info(f'保存到通讯录 chatroom_id {chatroom_id} {msg}') await request.app.state.gewe_service.update_group_info_to_cache_async(token_id,app_id,wxid,chatroom_id) await request.app.state.gewe_service.update_group_members_to_cache_async(token_id,app_id,wxid,chatroom_id) # group_info_members=await request.app.state.gewe_service.get_group_info_members_from_cache_async(wxid,chatroom_id) # k_message=wx_mod_group_info_members_message(wxid,group_info_members) # 全量推送 #all_groups_info_menbers=await request.app.state.gewe_service.get_groups_info_members_from_cache_async(wxid) #k_message=wx_groups_info_members_message(wxid,all_groups_info_menbers) k_message=wx_groups_info_members_key_message(wxid) await request.app.state.kafka_service.send_message_async(k_message) async def handle_del_contacts_async(request: Request,token_id,msg,wxid): ''' 删除好友通知/退出群聊 ''' msg_data = msg.get("Data") username=msg_data["UserName"]["string"] if check_chatroom(username): logger.info('退出群聊') wxid = msg.get("Wxid") chatroom_id=username await request.app.state.redis_service.delete_hash_field(f'__AI_OPS_WX__:GROUPS_INFO:{wxid}',chatroom_id) await request.app.state.redis_service.delete_hash_field(f'__AI_OPS_WX__:GROUPS_MEMBERS:{wxid}',chatroom_id) logger.info(f'清除 chatroom_id{chatroom_id} 数据') # 推送删除群资料到kafka k_message=wx_del_group_message(wxid,chatroom_id) await request.app.state.kafka_service.send_message_async(k_message) else: logger.info('删除好友通知') # 推送到kafka input_message=wx_del_contact_message(wxid,username) await request.app.state.kafka_service.send_message_async(input_message) async def handle_offline_async(request: Request,token_id,msg,wxid): ''' 已经离线 ''' wxid = msg.get("Wxid") app_id = msg.get("Appid") logger.warning(f'微信ID {wxid}在设备{app_id}已经离线') k,r=await request.app.state.gewe_service.get_login_info_by_app_id_async(app_id) print(k) await request.app.state.redis_service.update_hash_field(k,'status',0) await request.app.state.redis_service.update_hash_field(k,'modify_at',int(time.time())) # 推送到kafka input_message=wx_offline_message(app_id,wxid) await request.app.state.kafka_service.send_message_async(input_message) async def handle_mod_contacts_worker_async(request:Request,token_id,app_id,wxid,msg_data): ''' 好友通过验证及好友资料变更的通知消息 ''' logger.info('好友通过验证及好友资料变更的通知消息') if not check_chatroom(msg_data["UserName"]["string"]): contact_wxid = msg_data["UserName"]["string"] ret,msg,contacts_list = await request.app.state.gewe_service.fetch_contacts_list_async(token_id, app_id) friend_wxids = [c for c in contacts_list['friends'] if c not in ['fmessage', 'medianote','weixin','weixingongzhong','tmessage']] # 可以调整截取范围 print(f'{wxid}的好友资料变更,数量 {len(friend_wxids)}') data = await request.app.state.gewe_service.save_contacts_brief_to_cache_async(token_id, app_id, wxid, friend_wxids) #k_message = wx_all_contacts_message(wxid, data) k_message = wx_all_contacts_key_message(wxid) await request.app.state.kafka_service.send_message_async(k_message) else: logger.info('群聊好友通过验证及好友资料变更的通知消息') async def handle_text_async(request: Request,token_id,app_id, wxid,msg_data,from_wxid, to_wxid): ''' 私聊文本消息 ''' config=await request.app.state.gewe_service.get_wxchat_config_from_cache_async(wxid) validated_config = AgentConfig.model_validate(config) if not validated_config.privateGroupChatEnabled: logger.warning(f"微信号 {wxid} 关闭好友和群自动回复功能,{handle_text_async.__name__} 不回复消息") return # 判断是否转人工处理功能 is_human_handle_msg= await request.app.state.gewe_service.is_human_handle_msg_async(wxid) if is_human_handle_msg: logger.warning(f'微信号 {wxid} 暂时工人接管30分钟中') return msg_content=msg_data["Content"]["string"] if wxid == from_wxid: #手动发送消息 logger.info("Active message sending detected") await request.app.state.gewe_service.save_contacts_brief_to_cache_async(token_id,app_id,wxid,[to_wxid]) callback_to_user=msg_data["ToUserName"]["string"] # 转人工处理功能 await request.app.state.gewe_service.set_human_handle_msg_async(wxid,60*30) # 推送到kafka input_wx_content_dialogue_message=[{"type": "text", "text": msg_content}] input_message=dialogue_message(from_wxid,to_wxid,input_wx_content_dialogue_message) await request.app.state.kafka_service.send_message_async(input_message) logger.info("发送对话 %s",input_message) else: callback_to_user=msg_data["FromUserName"]["string"] # 判断哪些关键词存在于 msg_content 中 keywords = ["预约", "报价", "购买", "价钱"] found_keywords = [keyword for keyword in keywords if keyword in msg_content] if found_keywords: await request.app.state.gewe_service.set_human_handle_msg_with_contact_wxid_async(wxid,callback_to_user,60*30) logger.info(f"{wxid} 收到 {callback_to_user} 私聊消息匹配到关键词:{', '.join(found_keywords)}") # 回复好友 await request.app.state.gewe_service.post_text_async(token_id, app_id, callback_to_user, "我这边目前有点忙,稍后回复您好吗?") # 推送到助理 print('推送到助理') return # 是否在转人工处理 is_human_handle_msg_with_contact_wxid= await request.app.state.gewe_service.is_human_handle_msg_with_contact_wxid_async(wxid,callback_to_user) if is_human_handle_msg_with_contact_wxid: logger.warning(f'微信号 {wxid} 与 {callback_to_user} 有关键字匹配,暂时工人接管30分钟中,请查看长服AI商机提醒助理') return # 创建并启动任务协程,将参数传递给 ai_chat_text 函数 task = asyncio.create_task( ai_chat_text_async(request,token_id, app_id, wxid, msg_data, msg_content) ) # 设置定时器,1秒后检查任务是否超时。这里需要使用 lambda 来传递参数 timeout_timer = asyncio.create_task( check_timeout_async(task, request,token_id, wxid, app_id, callback_to_user) ) # 等待任务协程完成 await task # 取消定时器 timeout_timer.cancel() async def check_timeout_async(task: asyncio.Task, request: Request,token_id, wxid, app_id, callback_to_user): await asyncio.sleep(timeout_duration) # 等待超时时间 if not task.done(): print(f"任务运行时间超过{timeout_duration}秒,token_id={token_id}, app_id={app_id}, callback_to_user={callback_to_user}") wx_config = await request.app.state.gewe_service.get_wxchat_config_from_cache_async(wxid) if bool(wx_config.get("chatWaitingMsgEnabled", True)): # if callback_to_user == 'wxid_mesh33pw13e721': # logger.info(f'wxid_mesh33pw13e721 不发送到微信有关等待的AI回复到微信') # else: # phrases = ["稍等一下", "辛苦等等", "马上就好", "很快就好", "请稍后", "请等会", "稍等1分钟","请别急,在整理","就好了"] # # 随机选择一个短语 # random_phrase = random.choice(phrases) # await request.app.state.gewe_service.post_text_async(token_id, app_id, callback_to_user, random_phrase) phrases = ["稍等一下", "辛苦等等", "马上就好", "很快就好", "请稍后", "请等会", "稍等1分钟","请别急,在整理","就好了"] random_phrase = random.choice(phrases) await request.app.state.gewe_service.post_text_async(token_id, app_id, callback_to_user, random_phrase) #await request.app.state.gewe_service.post_text_async(token_id, app_id, callback_to_user, "亲,我正在组织回复的信息,请稍等一会") async def ai_chat_text_async(request: Request,token_id, app_id, wxid, msg_data, msg_content): start_time = time.time() # 记录任务开始时间 callback_to_user = msg_data["FromUserName"]["string"] k,loginfo=await request.app.state.gewe_service.get_login_info_by_wxid_async(wxid) wxid_nickname=loginfo.get("nickName") contacts_brief:list=await request.app.state.gewe_service.get_contacts_brief_from_cache_async(wxid) callback_to_user_nickname=next(filter(lambda x:x.get("userName") == callback_to_user,contacts_brief),None).get("nickName") hash_key = f'__AI_OPS_WX__:MESSAGES:{wxid}:{callback_to_user}' prompt = {"role": "user", "content": [{ "type": "text", "text": msg_content }]} messages_to_send = await request.app.state.gewe_service.save_session_messages_to_cache_async(hash_key, prompt) # 收到的对话 input_wx_content_dialogue_message = [{"type": "text", "text": msg_content}] input_message = dialogue_message(callback_to_user, wxid, input_wx_content_dialogue_message) await request.app.state.kafka_service.send_message_async(input_message) logger.info("发送对话 %s", input_message) cache_data = USER_INTERACTIVE_CACHE.get(wxid) if cache_data and cache_data.get('interactive'): o = get_first_char_if_digit(msg_content) if o is not None: userSelectOptions = cache_data.get('options') if o < len(userSelectOptions): o = o - 1 msg_content = userSelectOptions[o].get("value") messages_to_send = [{"role": "user", "content": msg_content}] else: messages_to_send = [{"role": "user", "content": msg_content}] else: messages_to_send = [{"role": "user", "content": msg_content}] res = await gpt_client_async(request,messages_to_send, wxid, callback_to_user) reply_content = remove_markdown_symbol(res["choices"][0]["message"]["content"]) description = '' userSelectOptions = [] if isinstance(reply_content, list) and any(item.get("type") == "interactive" for item in reply_content): for item in reply_content: if item["type"] == "interactive" and item["interactive"]["type"] == "userSelect": params = item["interactive"]["params"] description = params.get("description") userSelectOptions = params.get("userSelectOptions", []) values_string = "\n".join(option["value"] for option in userSelectOptions) if description is not None: USER_INTERACTIVE_CACHE[wxid] = { "interactive": True, "options": userSelectOptions, } reply_content = description + '------------------------------\n' + values_string elif isinstance(reply_content, list) and any(item.get("type") == "text" for item in reply_content): USER_INTERACTIVE_CACHE[wxid] = { "interactive": False } text = '' for item in reply_content: if item["type"] == "text": text = item["text"]["content"] if text == '': # 去除上次上一轮对话再次请求 cache_messages_str = await request.app.state.redis_service.get_hash_field(hash_key, "data") cache_messages = json.loads(cache_messages_str) if cache_messages_str else [] if len(cache_messages) >= 3: cache_messages = cache_messages[:-3] await request.app.state.redis_service.update_hash_field(hash_key, "data", json.dumps(cache_messages, ensure_ascii=False)) messages_to_send = await request.app.state.gewe_service.save_session_messages_to_cache_async(hash_key, prompt) res = await gpt_client_async(request,messages_to_send, wxid, callback_to_user) reply_content = remove_markdown_symbol(res["choices"][0]["message"]["content"]) if isinstance(reply_content, list): reply_content = remove_markdown_symbol(reply_content[0].get('text').get("content")) else: reply_content = text else: USER_INTERACTIVE_CACHE[wxid] = { "interactive": False } reply_content = remove_markdown_symbol(res["choices"][0]["message"]["content"]) if reply_content == '不回复': end_time = time.time() # 记录任务结束时间 execution_time = end_time - start_time # 计算执行时间 logger.warning(f"AI回答任务完成,耗时 {execution_time:.2f} 秒,AI回答<不回复>,跳过微信回复") await request.app.state.gewe_service.save_session_messages_to_cache_async(hash_key, {"role": "assistant", "content": reply_content}) return # 昵称替换 replacements = { '{昵称}': wxid_nickname, '{好友昵称}': callback_to_user_nickname } reply_content=replace_placeholders(reply_content, replacements) # 判断图片url img_urls,reply_content=extract_and_replace_image_urls(reply_content) if img_urls: await request.app.state.gewe_service.post_text_async(token_id, app_id, callback_to_user, reply_content) await asyncio.sleep(random.uniform(1.5, 3)) for img_url in img_urls: await request.app.state.gewe_service.post_image_async(token_id, app_id, callback_to_user, img_url) await asyncio.sleep(random.uniform(1.5, 3)) # 判断视频url video_urls,reply_content=extract_and_replace_video_urls(reply_content) if video_urls: await request.app.state.gewe_service.post_text_async(token_id, app_id, callback_to_user, reply_content) await asyncio.sleep(random.uniform(1.5, 3)) for video_url in video_urls: parsed_url = urlparse(video_url) filename = os.path.basename(parsed_url.path) tmp_file_path = os.path.join(os.getcwd(),'tmp', filename) # 拼接完整路径 thumbnail_path=tmp_file_path.replace('.mp4','.jpg') video_thumb_url,video_duration =download_video_and_get_thumbnail(video_url,thumbnail_path) logger.info(f'{wxid} 视频缩略图 {video_thumb_url} 时长 {video_duration}') ret,ret_msg,res = await request.app.state.gewe_service.post_video_async(token_id, app_id, callback_to_user, video_url,video_thumb_url,video_duration) if ret!=200: logger.warning(f'{wxid} 发送视频{video_url} 到 {callback_to_user} 失败,{ret_msg}') await asyncio.sleep(random.uniform(1.5, 3)) # 发送AI微信回复 await request.app.state.gewe_service.post_text_async(token_id, app_id, callback_to_user, reply_content) await request.app.state.gewe_service.save_session_messages_to_cache_async(hash_key, {"role": "assistant", "content": reply_content}) # 回复的对话 input_wx_content_dialogue_message = [{"type": "text", "text": reply_content}] input_message = dialogue_message(wxid, callback_to_user, input_wx_content_dialogue_message, True) await request.app.state.kafka_service.send_message_async(input_message) logger.info("发送对话 %s", input_message) end_time = time.time() # 记录任务结束时间 execution_time = end_time - start_time # 计算执行时间 logger.info(f"AI回答任务完成,耗时 {execution_time:.2f} 秒") async def handle_text_group_async(request: Request,token_id,app_id, wxid,msg_data,from_wxid, to_wxid): ''' 群聊文本消息 ''' msg_content=msg_data["Content"]["string"] msg_push_content=msg_data.get("PushContent") k,login_info=await request.app.state.gewe_service.get_login_info_by_app_id_async(app_id) nickname=login_info.get("nickName") config=await request.app.state.gewe_service.get_wxchat_config_from_cache_async(wxid) validated_config = AgentConfig.model_validate(config) if not validated_config.privateGroupChatEnabled: logger.warning(f"微信号 {wxid} 关闭好友和群自动回复功能,{handle_text_group_async.__name__} 不回复消息") return if wxid == from_wxid: #手动发送消息 logger.info("Active message sending detected") await request.app.state.gewe_service.save_contacts_brief_to_cache_async(token_id,app_id,wxid,[to_wxid]) callback_to_user=msg_data["ToUserName"]["string"] input_wx_content_dialogue_message=[{"type": "text", "text": msg_content}] input_message=dialogue_message(from_wxid,to_wxid,input_wx_content_dialogue_message) await request.app.state.kafka_service.send_message_async(input_message) logger.info("发送对话 %s",input_message) else: c = await request.app.state.gewe_service.get_wxchat_config_from_cache_async(wxid) chatroom_id_white_list = c.get("chatroomIdWhiteList", []) if not chatroom_id_white_list: logger.info('白名单为空或未定义,不处理') return if from_wxid not in chatroom_id_white_list: logger.info(f'群ID {from_wxid} 不在白名单中,不处理') return if '在群聊中@了你' in msg_push_content or '@'+nickname in msg_push_content: callback_to_user=msg_data["FromUserName"]["string"] hash_key = f'__AI_OPS_WX__:MESSAGES:{wxid}:{callback_to_user}' prompt={"role": "user", "content": [{ "type": "text", "text": msg_content }]} messages_to_send=await request.app.state.gewe_service.save_session_messages_to_cache_async(hash_key, prompt) # 收到的对话 input_wx_content_dialogue_message=[{"type": "text", "text": msg_content}] input_message=dialogue_message(callback_to_user,wxid,input_wx_content_dialogue_message) await request.app.state.kafka_service.send_message_async(input_message) logger.info("发送对话 %s",input_message) cache_data = USER_INTERACTIVE_CACHE.get(wxid) if cache_data and cache_data.get('interactive') : o=get_first_char_if_digit(msg_content) if o is not None: userSelectOptions=cache_data.get('options') if o < len(userSelectOptions): o=o-1 msg_content=userSelectOptions[o].get("value") messages_to_send=[{"role": "user", "content": msg_content}] else: messages_to_send=[{"role": "user", "content": msg_content}] else: messages_to_send=[{"role": "user", "content": msg_content}] res=await gpt_client_async(request,messages_to_send,wxid,callback_to_user) reply_content=remove_markdown_symbol(res["choices"][0]["message"]["content"]) description = '' userSelectOptions = [] if isinstance(reply_content, list) and any(item.get("type") == "interactive" for item in reply_content): for item in reply_content: if item["type"] == "interactive" and item["interactive"]["type"] == "userSelect": params = item["interactive"]["params"] description = params.get("description") userSelectOptions = params.get("userSelectOptions", []) values_string = "\n".join(option["value"] for option in userSelectOptions) if description is not None: USER_INTERACTIVE_CACHE[wxid] = { "interactive":True, "options": userSelectOptions, } reply_content=description + '------------------------------\n'+values_string elif isinstance(reply_content, list) and any(item.get("type") == "text" for item in reply_content): USER_INTERACTIVE_CACHE[wxid] = { "interactive":False } text='' for item in reply_content: if item["type"] == "text": text=item["text"]["content"] if text=='': # 去除上次上一轮对话再次请求 cache_messages_str=await request.app.state.redis_service.get_hash_field(hash_key,"data") cache_messages = json.loads(cache_messages_str) if cache_messages_str else [] if len(cache_messages) >= 3: cache_messages = cache_messages[:-3] await request.app.state.redis_service.update_hash_field(hash_key,"data",json.dumps(cache_messages,ensure_ascii=False)) messages_to_send=await request.app.state.gewe_service.save_session_messages_to_cache_async(hash_key, prompt) res=await gpt_client_async(request,messages_to_send,wxid,callback_to_user) reply_content=remove_markdown_symbol(res["choices"][0]["message"]["content"]) else: reply_content=text else: USER_INTERACTIVE_CACHE[wxid] = { "interactive":False } reply_content=res["choices"][0]["message"]["content"] reply_content='@'+extract_nickname(msg_push_content) + reply_content await request.app.state.gewe_service.post_text_async(token_id,app_id,callback_to_user,reply_content) await request.app.state.gewe_service.save_session_messages_to_cache_async(hash_key, {"role": "assistant", "content": reply_content}) # 回复的对话 reply_content=f'{wxid}:\n'+ reply_content input_wx_content_dialogue_message=[{"type": "text", "text": reply_content}] input_message=dialogue_message(wxid,callback_to_user,input_wx_content_dialogue_message,True) await request.app.state.kafka_service.send_message_async(input_message) logger.info("发送对话 %s",input_message) else: logger.info('群聊公开消息') callback_to_user=msg_data["FromUserName"]["string"] group_dialogue_message=[{"type": "text", "text": msg_content}] input_message=dialogue_message(callback_to_user,wxid,group_dialogue_message) await request.app.state.kafka_service.send_message_async(input_message) logger.info("发送对话 %s",input_message) return async def handle_image_async(request: Request,token_id,app_id, wxid,msg_data,from_wxid, to_wxid): ''' 私聊图片消息 ''' msg_content=msg_data["Content"]["string"] callback_to_user=from_wxid hash_key = f'__AI_OPS_WX__:MESSAGES:{wxid}:{callback_to_user}' wx_img_url= await request.app.state.gewe_service.download_image_msg_async(token_id,app_id,msg_content) oss_access_key_id="LTAI5tRTG6pLhTpKACJYoPR5" oss_access_key_secret="E7dMzeeMxq4VQvLg7Tq7uKf3XWpYfN" oss_endpoint="http://oss-cn-shanghai.aliyuncs.com" oss_bucket_name="cow-agent" oss_prefix="cow" img_url=upload_oss(oss_access_key_id, oss_access_key_secret, oss_endpoint, oss_bucket_name, wx_img_url, oss_prefix) prompt={ "role": "user", "content": [{ "type": "image_url", "image_url": {"url": img_url} }] } await request.app.state.gewe_service.save_session_messages_to_cache_async(hash_key, prompt) await request.app.state.gewe_service.post_text_async(token_id,app_id,callback_to_user,'已经上传了图片,有什么可以为您服务') logger.info(f"上传图片 URL: {img_url}") wx_content_dialogue_message=[{"type": "image_url", "image_url": {"url": img_url}}] input_message=dialogue_message(wxid,callback_to_user,wx_content_dialogue_message) await request.app.state.kafka_service.send_message_async(input_message) logger.info("发送对话 %s",input_message) async def handle_image_group_async(request: Request,token_id,app_id, wxid,msg_data,from_wxid, to_wxid): logger.info('群聊图片消息') msg_content=msg_data["Content"]["string"] callback_to_user=msg_data["FromUserName"]["string"] aeskey = re.search(r'aeskey="([^"]+)"', msg_content).group(1) cdnthumburl = re.search(r'cdnthumburl="([^"]+)"', msg_content).group(1) md5 = re.search(r'md5="([^"]+)"', msg_content).group(1) cdnthumblength = re.search(r'cdnthumblength="([^"]+)"', msg_content).group(1) cdnthumbheight = re.search(r'cdnthumbheight="([^"]+)"', msg_content).group(1) cdnthumbwidth = re.search(r'cdnthumbwidth="([^"]+)"', msg_content).group(1) length = re.search(r'length="([^"]+)"', msg_content).group(1) img_xml=f'\n\n\t\n\t\n\t\n' wx_img_url= await request.app.state.gewe_service.download_image_msg_async(token_id,app_id,img_xml) oss_url=wx_img_url_to_oss_url(wx_img_url) reply_content = re.sub(r'<\?xml.*', f'{oss_url}', msg_content, flags=re.DOTALL) #reply_content=f'{wxid}:\n{oss_url}' input_wx_content_dialogue_message=[{"type": "text", "text": reply_content}] input_message=dialogue_message(callback_to_user,wxid,input_wx_content_dialogue_message,False) await request.app.state.kafka_service.send_message_async(input_message) logger.info("发送对话 %s",input_message) async def handle_voice_async(request: Request,token_id,app_id, wxid,msg_data,from_wxid, to_wxid): ''' 私聊语音消息 ''' callback_to_user=from_wxid msg_content=msg_data["Content"]["string"] msg_id=msg_data["MsgId"] config=await request.app.state.gewe_service.get_wxchat_config_from_cache_async(wxid) validated_config = AgentConfig.model_validate(config) if not validated_config.privateGroupChatEnabled: logger.warning(f"微信号 {wxid} 关闭好友和群自动回复功能,{handle_voice_async.__name__} 不回复消息") return file_url=await request.app.state.gewe_service.download_audio_msg_async(token_id,app_id,msg_id,msg_content) react_silk_path=await save_to_local_from_url_async(file_url) react_wav_path = os.path.splitext(react_silk_path)[0] + ".wav" audio_convert.any_to_wav(react_silk_path,react_wav_path) react_voice_text=AliVoice().voiceToText(react_wav_path) os.remove(react_silk_path) os.remove(react_wav_path) hash_key = f'__AI_OPS_WX__:MESSAGES:{wxid}:{callback_to_user}' messages=await request.app.state.gewe_service.save_session_messages_to_cache_async(hash_key, {"role": "user", "content": react_voice_text}) ai_res=await gpt_client_async(request,messages,wxid,callback_to_user) ai_res_content=remove_markdown_symbol(ai_res["choices"][0]["message"]["content"]) has_url=contains_url(ai_res_content) if not has_url: voice_during,voice_url=wx_voice(ai_res_content) if voice_during < 60 * 1000: ret,ret_msg,res=await request.app.state.gewe_service.post_voice_async(token_id,app_id,callback_to_user,voice_url,voice_during) else: ret,ret_msg,res=await request.app.state.gewe_service.post_text_async(token_id,app_id,callback_to_user,ai_res_content) logger.warning(f'回应语音消息长度 {voice_during/1000}秒,超过60秒,转为文本回复') if ret==200: logger.info((f'{wxid} 向 {callback_to_user} 发送语音文本【{ai_res_content}】{ret_msg}')) else: logger.warning((f'{wxid} 向 {callback_to_user} 发送语音文本【{ai_res_content}】{ret_msg}')) ret,ret_msg,res==await request.app.state.gewe_service.post_text_async(token_id,app_id,callback_to_user,ai_res_content) logger.info((f'{wxid} 向 {callback_to_user} 发送文本【{ai_res_content}】{ret_msg}')) else: logger.info(f"回复内容包含网址,不发送语音,回复文字内容:{ai_res_content}") ret,ret_msg,res=await request.app.state.gewe_service.post_text_async(token_id,app_id,callback_to_user,ai_res_content) await request.app.state.gewe_service.save_session_messages_to_cache_async(hash_key, {"role": "assistant", "content": ai_res_content}) # 构造对话消息并发送到 Kafka input_wx_content_dialogue_message = [{"type": "text", "text": ai_res_content}] input_message = dialogue_message(wxid, callback_to_user, input_wx_content_dialogue_message,True) await request.app.state.kafka_service.send_message_async(input_message) logger.info("发送对话 %s", input_message) async def handle_voice_group_async(request: Request,token_id,app_id, wxid,msg_data,from_wxid, to_wxid): logger.info('群聊语音消息') async def handle_name_card_async(request: Request,token_id,app_id, wxid,msg_data,from_wxid, to_wxid): logger.info('名片消息') try: msg_content_xml=msg_data["Content"]["string"] # 解析XML字符串 root = ET.fromstring(msg_content_xml) # 提取alias属性 alias_value = root.get("alias") # 加好友资料 scene = int(root.get("scene")) v3 = root.get("username") v4 = root.get("antispamticket") logger.info(f"alias_value: {alias_value}, scene: {scene}, v3: {v3}, v4: {v4}") # 判断appid 是否已经创建3天 k, login_info = await request.app.state.gewe_service.get_login_info_by_wxid_async(wxid) creation_timestamp=int(login_info.get('create_at',time.time())) current_timestamp = time.time() three_days_seconds = 3 * 24 * 60 * 60 # 三天的秒数 diff_flag=(current_timestamp - creation_timestamp) >= three_days_seconds if not diff_flag: log_content=f'名片添加好友功能,{wxid} 用户创建不够三天,不能使用该功能' logger.warning(log_content) return # 将加好友资料添加到待加好友队列 #gewe_chat.wxchat.enqueue_to_add_contacts(wxid,scene,v3,v4) _,loginfo=await request.app.state.gewe_service.get_login_info_by_wxid_async(wxid) nickname=loginfo.get('nickName') add_contact_content=f'您好,我是{nickname}' #gewe_chat.wxchat.add_contacts(token_id,app_id,scene,Models.OperationType.ADD_FRIEND,v3,v4,add_contact_content) await request.app.state.gewe_service.add_contacts_async(token_id,app_id,scene,OperationType.ADD_FRIEND.value,v3,v4,add_contact_content) except ET.ParseError as e: logger.error(f"XML解析错误: {e}") except KeyError as e: logger.error(f"字典键错误: {e}") except Exception as e: logger.error(f"未知错误: {e}") async def handle_xml_async(request: Request,token_id,app_id, wxid,msg_data,from_wxid, to_wxid): ''' 处理xml ''' try: msg_content_xml=msg_data["Content"]["string"] root = ET.fromstring(msg_content_xml) type_value = int(root.find(".//appmsg/type").text) handlers = { 57: handle_xml_reference_async, 5: handle_xml_invite_group_async } handler = handlers.get(type_value) if handler: return await handler(request,token_id,app_id, wxid,msg_data,from_wxid, to_wxid) # elif "邀请你加入了群聊" in msg_content_xml: # 邀请加入群聊 # logger.warning(f"xml消息 {type_value} 邀请你加入了群聊.todo") else: print(f"xml消息 {type_value} 未解析") except ET.ParseError as e: logger.error(f"解析XML失败: {e}") except Exception as e: logger.error(f"未知错误: {e}") return async def handle_xml_reference_async(request: Request,token_id,app_id, wxid,msg_data,from_wxid, to_wxid): ''' 引用消息 判断此类消息的逻辑:$.Data.MsgType=49 并且 解析$.Data.Content.string中的xml msg.appmsg.type=57 ''' callback_to_user=from_wxid hash_key = f'__AI_OPS_WX__:MESSAGES:{wxid}:{callback_to_user}' msg_content= msg_data["PushContent"] prompt={"role": "user", "content": [{ "type": "text", "text": msg_content }]} # 收到的对话 messages_to_send=await request.app.state.gewe_service.save_session_messages_to_cache_async(hash_key, prompt) input_wx_content_dialogue_message=[{"type": "text", "text": msg_content}] input_message=dialogue_message(callback_to_user,wxid,input_wx_content_dialogue_message) await request.app.state.kafka_service.send_message_async(input_message) logger.info("发送对话 %s",input_message) # 回复的对话 res=await gpt_client_async(request,messages_to_send,wxid,callback_to_user) reply_content=remove_markdown_symbol(res["choices"][0]["message"]["content"]) input_wx_content_dialogue_message=[{"type": "text", "text": reply_content}] input_message=dialogue_message(wxid,callback_to_user,input_wx_content_dialogue_message,True) await request.app.state.kafka_service.kafka_client.produce_message(input_message) logger.info("发送对话 %s",input_message) await request.app.state.gewe_service.save_session_messages_to_cache_async(hash_key, {"role": "assistant", "content": reply_content}) await request.app.state.gewe_service.post_text_async(token_id,app_id,callback_to_user,reply_content) async def handle_xml_invite_group_async (request: Request,token_id,app_id, wxid,msg_data,from_wxid, to_wxid): ''' 群聊邀请 判断此类消息的逻辑:$.Data.MsgType=49 并且 解析$.Data.Content.string中的xml msg.appmsg.title=邀请你加入群聊(根据手机设置的系统语言title会有调整,不同语言关键字不同) ''' logger.info(f'{wxid} 群聊邀请') msg_content_xml=msg_data["Content"]["string"] root = ET.fromstring(msg_content_xml) title_value = root.find(".//appmsg/title").text if '邀请你加入群聊' in title_value: invite_url = root.find('.//url').text ret,msg,data=await request.app.state.gewe_service.agree_join_room_async(token_id,app_id,invite_url) if ret==200: logger.info(f'群聊邀请,同意加入群聊 {msg} {data}') chatroom_id=data.get('chatroomId','') # if not chatroom_id: # logger.warning(f'群聊邀请,同意加入群聊失败 {msg} {data}') # return ret,msg,data=await request.app.state.gewe_service.save_contract_list_async(token_id,app_id,chatroom_id,3) logger.info(f'群聊邀请,保存到通讯录 chatroom_id {chatroom_id} {msg}') await request.app.state.gewe_service.update_group_info_to_cache_async(token_id,app_id,wxid,chatroom_id) await request.app.state.gewe_service.update_group_members_to_cache_async(token_id,app_id,wxid,chatroom_id) # # 单个群信息推送到kafka # group_info_members=await request.app.state.gewe_service.get_group_info_members_from_cache_async(wxid,chatroom_id) # k_message=wx_mod_group_info_members_message(wxid,group_info_members) # 全量群信息推送到kafka #all_groups_info_menbers=await request.app.state.gewe_service.get_groups_info_members_from_cache_async(wxid) #k_message=wx_groups_info_members_message(wxid,all_groups_info_menbers) k_message=wx_groups_info_members_key_message(wxid) await request.app.state.kafka_service.send_message_async(k_message) else: logger.warning(f'群聊邀请,同意加入群聊失败 {msg} {data}') async def handle_add_friend_notice_async(request: Request,token_id,app_id, wxid,msg_data,from_wxid, to_wxid): ''' 好友添加请求通知 ''' logger.info('好友添加请求通知') try: msg_content_xml=msg_data["Content"]["string"] root = ET.fromstring(msg_content_xml) msg_content = root.attrib.get('content', None) v3= root.attrib.get('encryptusername', None) v4= root.attrib.get('ticket', None) scene=root.attrib.get('scene', None) to_contact_wxid=root.attrib.get('fromusername', None) wxid=msg_data["ToUserName"]["string"] # 自动同意好友 # print(v3) # print(v4) # print(scene) # print(msg_content) # 操作类型,2添加好友 3同意好友 4拒绝好友 #option=2 option=3 reply_add_contact_contact="亲,我是你的好友" ret,ret_msg=await request.app.state.gewe_service.add_contacts_async(token_id,app_id,scene,option,v3,v4,reply_add_contact_contact) if ret==200: logger.info('自动添加好友成功') # 好友发送的文字 hash_key = f'__AI_OPS_WX__:MESSAGES:{wxid}:{to_contact_wxid}' prompt={"role": "user", "content": [{"type": "text","text": msg_content}]} messages_to_send=await request.app.state.gewe_service.save_session_messages_to_cache_async(hash_key, prompt) input_wx_content_dialogue_message=[{"type": "text", "text": msg_content}] input_message=dialogue_message(to_contact_wxid,wxid,input_wx_content_dialogue_message) await request.app.state.gewe_service.send_message_async(input_message) logger.info("发送对话 %s",input_message) callback_to_user=to_contact_wxid res=await gpt_client_async(messages_to_send,wxid,callback_to_user) reply_content=remove_markdown_symbol(res["choices"][0]["message"]["content"]) #保存好友信息 await request.app.state.gewe_service.save_contacts_brief_to_cache_async(token_id,app_id, wxid,[to_contact_wxid]) # 保存到缓存 await request.app.state.gewe_service.save_session_messages_to_cache_async(hash_key, {"role": "assistant", "content": reply_content}) # 发送信息 await request.app.state.gewe_service.post_text_async(token_id,app_id, to_contact_wxid,reply_content) # 发送到kafka input_wx_content_dialogue_message=[{"type": "text", "text": reply_content}] input_message=dialogue_message(wxid,to_contact_wxid,input_wx_content_dialogue_message,True) request.app.state.kafka_service.send_message_async(input_message) logger.info("发送对话 %s",input_message) else: logger.warning("添加好友失败") except ET.ParseError as e: logger.error(f"解析XML失败: {e}") except Exception as e: logger.error(f"未知错误: {e}") return async def handle_10002_msg_async(request: Request,token_id,app_id, wxid,msg_data,from_wxid, to_wxid): ''' 群聊邀请 撤回消息 拍一拍消息 地理位置 踢出群聊通知 解散群聊通知 发布群公告 ''' try: msg_content_xml=msg_data["Content"]["string"] # 群聊邀请 if '邀请你加入了群聊' in msg_content_xml and check_chatroom(msg_data["FromUserName"]["string"]): chatroom_id=msg_data["FromUserName"]["string"] ret,msg,data=await request.app.state.gewe_service.save_contract_list_async(token_id,app_id,chatroom_id,3) logger.info(f'群聊邀请,保存到通讯录 chatroom_id {chatroom_id} {msg}') await request.app.state.gewe_service.update_group_info_to_cache_async(token_id,app_id,wxid,chatroom_id) await request.app.state.gewe_service.update_group_members_to_cache_async(token_id,app_id,wxid,chatroom_id) # #推送群资料到kafka # group_info_members=await request.app.state.gewe_service.get_group_info_members_from_cache_async(wxid,chatroom_id) # k_message=wx_mod_group_info_members_message(wxid,group_info_members) # await request.app.state.kafka_service.send_message_async(k_message) # 全量群信息推送到kafka #all_groups_info_menbers=await request.app.state.gewe_service.get_groups_info_members_from_cache_async(wxid) #k_message=wx_groups_info_members_message(wxid,all_groups_info_menbers) k_message=wx_groups_info_members_key_message(wxid) await request.app.state.kafka_service.send_message_async(k_message) if '移出了群聊' in msg_content_xml and 'sysmsgtemplate' in msg_content_xml : chatroom_id=msg_data["FromUserName"]["string"] ret,msg,data=await request.app.state.gewe_service.save_contract_list_async(token_id,app_id,chatroom_id,2) logger.info(f'移出群聊,移除从通讯录 chatroom_id {chatroom_id} {msg}') await request.app.state.redis_service.delete_hash_field(f'__AI_OPS_WX__:GROUPS_INFO:{wxid}',chatroom_id) await request.app.state.redis_service.delete_hash_field(f'__AI_OPS_WX__:GROUPS_MEMBERS:{wxid}',chatroom_id) logger.info(f'清除 chatroom_id{chatroom_id} 数据') # 推送删除群资料到kafka k_message=wx_del_group_message(wxid,chatroom_id) await request.app.state.kafka_service.send_message_async(k_message) if '已解散该群聊' in msg_content_xml and 'sysmsgtemplate' in msg_content_xml : chatroom_id=msg_data["FromUserName"]["string"] ret,msg,data=await request.app.state.gewe_service.save_contract_list_async(token_id,app_id,chatroom_id,2) logger.info(f'解散群聊,移除从通讯录 chatroom_id {chatroom_id} {msg}') await request.app.state.redis_service.delete_hash_field(f'__AI_OPS_WX__:GROUPS_INFO:{wxid}',chatroom_id) await request.app.state.redis_service.delete_hash_field(f'__AI_OPS_WX__:GROUPS_MEMBERS:{wxid}',chatroom_id) logger.info(f'清除 chatroom_id{chatroom_id} 数据') # 推送删除群资料到kafka k_message=wx_del_group_message(wxid,chatroom_id) await request.app.state.kafka_service.send_message_async(k_message) if 'mmchatroombarannouncememt' in msg_content_xml : chatroom_id=msg_data["FromUserName"]["string"] logger.info(f'发布群公告 chatroom_id {chatroom_id}') await request.app.state.gewe_service.update_group_info_to_cache_async(token_id,app_id,wxid,chatroom_id) await request.app.state.gewe_service.update_group_members_to_cache_async(token_id,app_id,wxid,chatroom_id) # #推送群资料到kafka # group_info_members=await request.app.state.gewe_service.get_group_info_members_from_cache_async(wxid,chatroom_id) # k_message=wx_mod_group_info_members_message(wxid,group_info_members) # await request.app.state.kafka_service.send_message_async(k_message)、 # 全量群信息推送到kafka #all_groups_info_menbers=await request.app.state.gewe_service.get_groups_info_members_from_cache_async(wxid) #k_message=wx_groups_info_members_message(wxid,all_groups_info_menbers) k_message=wx_groups_info_members_key_message(wxid) await request.app.state.kafka_service.send_message_async(k_message) if 'roomtoolstips' in msg_content_xml : chatroom_id=msg_data["FromUserName"]["string"] logger.info(f'群待办 chatroom_id {chatroom_id} ') await request.app.state.gewe_service.update_group_info_to_cache_async(token_id,app_id,wxid,chatroom_id) await request.app.state.gewe_service.update_group_members_to_cache_async(token_id,app_id,wxid,chatroom_id) # #推送群资料到kafka # group_info_members=await request.app.state.gewe_service.get_group_info_members_from_cache_async(wxid,chatroom_id) # k_message=wx_mod_group_info_members_message(wxid,group_info_members) # await request.app.state.kafka_service.send_message_async(k_message) # 全量群信息推送到kafka #all_groups_info_menbers=await request.app.state.gewe_service.get_groups_info_members_from_cache_async(wxid) #k_message=wx_groups_info_members_message(wxid,all_groups_info_menbers) k_message=wx_groups_info_members_key_message(wxid) await request.app.state.kafka_service.send_message_async(k_message) except ET.ParseError as e: logger.error(f"解析XML失败: {e}") except Exception as e: logger.error(f"未知错误: {e}") return async def handle_10000_msg_async(request: Request,token_id,app_id, wxid,msg_data,from_wxid, to_wxid): ''' 修改群名称 更换群主通知 被移除群聊通知 ''' content=msg_data.get("Content","").get("string","") if '修改群名' or '新群主' or '被移除群聊通知' in content and check_chatroom(from_wxid): chatroom_id=from_wxid logger.info(f'{content} chatroom_id {chatroom_id} ') await request.app.state.gewe_service.update_group_info_to_cache_async(token_id,app_id,wxid,chatroom_id) await request.app.state.gewe_service.update_group_members_to_cache_async(token_id,app_id,wxid,chatroom_id) # group_info_members=await request.app.state.gewe_service.get_group_info_members_from_cache_async(wxid,chatroom_id) # k_message=wx_mod_group_info_members_message(wxid,group_info_members) # await request.app.state.kafka_service.send_message_async(k_message) # 全量群信息推送到kafka #all_groups_info_menbers=await request.app.state.gewe_service.get_groups_info_members_from_cache_async(wxid) #k_message=wx_groups_info_members_message(wxid,all_groups_info_menbers) k_message=wx_groups_info_members_key_message(wxid) await request.app.state.kafka_service.send_message_async(k_message) return