diff --git a/app/endpoints/pipeline_endpoint.py b/app/endpoints/pipeline_endpoint.py index dd54d42..803e44c 100644 --- a/app/endpoints/pipeline_endpoint.py +++ b/app/endpoints/pipeline_endpoint.py @@ -42,7 +42,8 @@ async def get_chatroominfo(request: Request, body: Dict[str, Any]): config=await request.app.state.redis_service.get_hash(f"__AI_OPS_WX__:WXCHAT_CONFIG") wxids=config.keys() WX_BACKLIST.extend(wxids) - if from_wxid in WX_BACKLIST: + # 公众号ID已gh_开头 + if from_wxid in WX_BACKLIST or 'gh_' in from_wxid: logger.warning(f'微信ID {wxid} 在黑名单,不处理') return {"message": "收到微信回调消息"} @@ -135,7 +136,7 @@ async def gpt_client_async(request,messages: list, wixd: str, friend_wxid: str): logger.info("[CHATGPT] 响应={}".format(json.dumps(response_data, separators=(',', ':'), ensure_ascii=False))) return response_data except aiohttp.ClientError as e: - logger.error(f"请求失败: {e}") + logger.error(f"[CHATGPT] 请求失败: {e}") raise async def handle_add_messages_async(request: Request,token_id,msg,wxid): @@ -159,7 +160,8 @@ async def handle_add_messages_async(request: Request,token_id,msg,wxid): 42: handle_name_card_async, 49: handle_xml_async, 37: handle_add_friend_notice_async, - 10002: handle_10002_msg + 10002: handle_10002_msg_async, + 10000: handle_10000_msg_async } # (扫码进群情况)判断受否是群聊,并添加到通信录 @@ -191,55 +193,67 @@ async def handle_mod_contacts_async(request: Request,token_id,msg,wxid): #handle_mod_contacts(token_id,app_id,wxid,msg_data) # - loop = asyncio.get_event_loop() - future = asyncio.run_coroutine_threadsafe( - handle_mod_contacts_worker_async(request,token_id,app_id,wxid,msg_data), - loop - ) - - contact_wxid =msg_data.get("UserName",{}).get("string","") #msg_data["UserName"]["string"] - nickname= msg_data.get("NickName",{}).get("string","")#msg_data["NickName"]["string"] - city=msg_data.get("City",None) - signature=msg_data.get("Signature",None) - province=msg_data.get("Province",None) - bigHeadImgUrl=msg_data.get("SnsUserInfo",{}).get("SnsBgimgId",None) #msg_data["SnsUserInfo"]["SnsBgimgId"] - country=msg_data.get("Country",None) - sex=msg_data.get("Sex",None) - pyInitial= msg_data.get("PyInitial",{}).get("string",None)#msg_data["PyInitial"]["string"] - quanPin=msg_data.get("QuanPin",{}).get("string",None) #msg_data["QuanPin"]["string"] - remark=msg_data.get("Remark",{}).get("string",None) - remarkPyInitial=msg_data.get("RemarkPyInitial",{}).get("string",None) - remarkQuanPin=msg_data.get("RemarkQuanPin",{}).get("string",None) - smallHeadImgUrl=msg_data.get("smallHeadImgUrl",None) - - # data=gewe_chat.wxchat.get_brief_info(token_id,app_id,[contact_wxid]) - # contact=data[0] - # alias=contact.get("alias") - #推动到kafka - contact_data = { - "alias": None, - "bigHeadImgUrl": bigHeadImgUrl, - "cardImgUrl": None, - "city": city, - "country": country, - "description": None, - "labelList": None, - "nickName": nickname, - "phoneNumList": None, - "province": province, - "pyInitial": pyInitial, - "quanPin": quanPin, - "remark": remark, - "remarkPyInitial": remarkPyInitial, - "remarkQuanPin": remarkQuanPin, - "sex": sex, - "signature": signature, - "smallHeadImgUrl": smallHeadImgUrl, - "snsBgImg": None, - "userName": contact_wxid - } - input_message=wx_mod_contact_message(wxid,contact_data) - await request.app.state.kafka_service.send_message_async(input_message) + user_name=msg_data.get("UserName",{}).get("string","") + + if not check_chatroom(user_name): + loop = asyncio.get_event_loop() + future = asyncio.run_coroutine_threadsafe( + handle_mod_contacts_worker_async(request,token_id,app_id,wxid,msg_data), + loop + ) + + contact_wxid =msg_data.get("UserName",{}).get("string","") #msg_data["UserName"]["string"] + nickname= msg_data.get("NickName",{}).get("string","")#msg_data["NickName"]["string"] + city=msg_data.get("City",None) + signature=msg_data.get("Signature",None) + province=msg_data.get("Province",None) + bigHeadImgUrl=msg_data.get("SnsUserInfo",{}).get("SnsBgimgId",None) #msg_data["SnsUserInfo"]["SnsBgimgId"] + country=msg_data.get("Country",None) + sex=msg_data.get("Sex",None) + pyInitial= msg_data.get("PyInitial",{}).get("string",None)#msg_data["PyInitial"]["string"] + quanPin=msg_data.get("QuanPin",{}).get("string",None) #msg_data["QuanPin"]["string"] + remark=msg_data.get("Remark",{}).get("string",None) + remarkPyInitial=msg_data.get("RemarkPyInitial",{}).get("string",None) + remarkQuanPin=msg_data.get("RemarkQuanPin",{}).get("string",None) + smallHeadImgUrl=msg_data.get("smallHeadImgUrl",None) + + # data=gewe_chat.wxchat.get_brief_info(token_id,app_id,[contact_wxid]) + # contact=data[0] + # alias=contact.get("alias") + #推动到kafka + contact_data = { + "alias": None, + "bigHeadImgUrl": bigHeadImgUrl, + "cardImgUrl": None, + "city": city, + "country": country, + "description": None, + "labelList": None, + "nickName": nickname, + "phoneNumList": None, + "province": province, + "pyInitial": pyInitial, + "quanPin": quanPin, + "remark": remark, + "remarkPyInitial": remarkPyInitial, + "remarkQuanPin": remarkQuanPin, + "sex": sex, + "signature": signature, + "smallHeadImgUrl": smallHeadImgUrl, + "snsBgImg": None, + "userName": contact_wxid + } + input_message=wx_mod_contact_message(wxid,contact_data) + await request.app.state.kafka_service.send_message_async(input_message) + else: + logger.info('群信息变更通知') + chatroom_id=user_name + await request.app.state.gewe_service.update_group_info_to_cache_async(token_id,app_id,wxid,chatroom_id) + await request.app.state.gewe_service.update_group_members_to_cache_async(token_id,app_id,wxid,chatroom_id) + + group_info_members=await request.app.state.gewe_service.get_group_info_members_from_cache_async(wxid,chatroom_id) + k_message=wx_mod_group_info_members_message(wxid,group_info_members) + await request.app.state.kafka_service.send_message_async(k_message) async def handle_del_contacts_async(request: Request,token_id,msg,wxid): ''' @@ -358,6 +372,7 @@ async def check_timeout_async(task: asyncio.Task, request: Request,token_id, wxi await request.app.state.gewe_service.post_text_async(token_id, app_id, callback_to_user, "亲,我正在组织回复的信息,请稍等一会") async def ai_chat_text_async(request: Request,token_id, app_id, wxid, msg_data, msg_content): + start_time = time.time() # 记录任务开始时间 callback_to_user = msg_data["FromUserName"]["string"] hash_key = f'__AI_OPS_WX__:MESSAGES:{wxid}:{callback_to_user}' @@ -433,8 +448,11 @@ async def ai_chat_text_async(request: Request,token_id, app_id, wxid, msg_data, "interactive": False } reply_content = remove_markdown_symbol(res["choices"][0]["message"]["content"]) + + if callback_to_user not in 'wx_nBGqh6_Rw6pW8KXg0AudW': + logger.info(f'{callback_to_user} 不发送到微信有关{msg_content}的AI回复到微信') + await request.app.state.gewe_service.post_text_async(token_id, app_id, callback_to_user, reply_content) - await request.app.state.gewe_service.post_text_async(token_id, app_id, callback_to_user, reply_content) await request.app.state.gewe_service.save_session_messages_to_cache_async(hash_key, {"role": "assistant", "content": reply_content}) # 回复的对话 input_wx_content_dialogue_message = [{"type": "text", "text": reply_content}] @@ -871,7 +889,7 @@ async def handle_add_friend_notice_async(request: Request,token_id,app_id, wxid, logger.error(f"未知错误: {e}") return -async def handle_10002_msg(request: Request,token_id,app_id, wxid,msg_data,from_wxid, to_wxid): +async def handle_10002_msg_async(request: Request,token_id,app_id, wxid,msg_data,from_wxid, to_wxid): ''' 群聊邀请 @@ -901,7 +919,7 @@ async def handle_10002_msg(request: Request,token_id,app_id, wxid,msg_data,from_ if '移出了群聊' in msg_content_xml and 'sysmsgtemplate' in msg_content_xml : chatroom_id=msg_data["FromUserName"]["string"] ret,msg,data=await request.app.state.gewe_service.save_contract_list_async(token_id,app_id,chatroom_id,2) - logger.info(f'踢出群聊,移除从通讯录 chatroom_id {chatroom_id} {msg}') + logger.info(f'移出群聊,移除从通讯录 chatroom_id {chatroom_id} {msg}') await request.app.state.redis_service.delete_hash_field(f'__AI_OPS_WX__:GROUPS_INFO:{wxid}',chatroom_id) await request.app.state.redis_service.delete_hash_field(f'__AI_OPS_WX__:GROUPS_MEMBERS:{wxid}',chatroom_id) logger.info(f'清除 chatroom_id{chatroom_id} 数据') @@ -921,6 +939,28 @@ async def handle_10002_msg(request: Request,token_id,app_id, wxid,msg_data,from_ k_message=wx_del_group_message(wxid,chatroom_id) await request.app.state.kafka_service.send_message_async(k_message) + if 'mmchatroombarannouncememt' in msg_content_xml : + chatroom_id=msg_data["FromUserName"]["string"] + logger.info(f'发布群公告 chatroom_id {chatroom_id}') + await request.app.state.gewe_service.update_group_info_to_cache_async(token_id,app_id,wxid,chatroom_id) + await request.app.state.gewe_service.update_group_members_to_cache_async(token_id,app_id,wxid,chatroom_id) + + #推送群资料到kafka + group_info_members=await request.app.state.gewe_service.get_group_info_members_from_cache_async(wxid,chatroom_id) + k_message=wx_mod_group_info_members_message(wxid,group_info_members) + await request.app.state.kafka_service.send_message_async(k_message) + + if 'roomtoolstips' in msg_content_xml : + chatroom_id=msg_data["FromUserName"]["string"] + logger.info(f'群待办 chatroom_id {chatroom_id} ') + await request.app.state.gewe_service.update_group_info_to_cache_async(token_id,app_id,wxid,chatroom_id) + await request.app.state.gewe_service.update_group_members_to_cache_async(token_id,app_id,wxid,chatroom_id) + + #推送群资料到kafka + group_info_members=await request.app.state.gewe_service.get_group_info_members_from_cache_async(wxid,chatroom_id) + k_message=wx_mod_group_info_members_message(wxid,group_info_members) + await request.app.state.kafka_service.send_message_async(k_message) + except ET.ParseError as e: logger.error(f"解析XML失败: {e}") except Exception as e: @@ -929,6 +969,24 @@ async def handle_10002_msg(request: Request,token_id,app_id, wxid,msg_data,from_ +async def handle_10000_msg_async(request: Request,token_id,app_id, wxid,msg_data,from_wxid, to_wxid): + ''' + 修改群名称 + 更换群主通知 + 被移除群聊通知 + ''' + content=msg_data.get("Content","").get("string","") + + if '修改群名' or '新群主' or '被移除群聊通知' in content and check_chatroom(from_wxid): + logger.info(f'{content} chatroom_id {chatroom_id} ') + chatroom_id=from_wxid + await request.app.state.gewe_service.update_group_info_to_cache_async(token_id,app_id,wxid,chatroom_id) + await request.app.state.gewe_service.update_group_members_to_cache_async(token_id,app_id,wxid,chatroom_id) + + group_info_members=await request.app.state.gewe_service.get_group_info_members_from_cache_async(wxid,chatroom_id) + k_message=wx_mod_group_info_members_message(wxid,group_info_members) + await request.app.state.kafka_service.send_message_async(k_message) + return