diff --git a/app/endpoints/pipeline_endpoint.py b/app/endpoints/pipeline_endpoint.py index 709e17b..7fea9c6 100644 --- a/app/endpoints/pipeline_endpoint.py +++ b/app/endpoints/pipeline_endpoint.py @@ -197,55 +197,62 @@ async def handle_mod_contacts_async(request: Request,token_id,msg,wxid): user_name=msg_data.get("UserName",{}).get("string","") if not check_chatroom(user_name): - loop = asyncio.get_event_loop() - future = asyncio.run_coroutine_threadsafe( - handle_mod_contacts_worker_async(request,token_id,app_id,wxid,msg_data), - loop - ) - - contact_wxid =msg_data.get("UserName",{}).get("string","") #msg_data["UserName"]["string"] - nickname= msg_data.get("NickName",{}).get("string","")#msg_data["NickName"]["string"] - city=msg_data.get("City",None) - signature=msg_data.get("Signature",None) - province=msg_data.get("Province",None) - bigHeadImgUrl=msg_data.get("SnsUserInfo",{}).get("SnsBgimgId",None) #msg_data["SnsUserInfo"]["SnsBgimgId"] - country=msg_data.get("Country",None) - sex=msg_data.get("Sex",None) - pyInitial= msg_data.get("PyInitial",{}).get("string",None)#msg_data["PyInitial"]["string"] - quanPin=msg_data.get("QuanPin",{}).get("string",None) #msg_data["QuanPin"]["string"] - remark=msg_data.get("Remark",{}).get("string",None) - remarkPyInitial=msg_data.get("RemarkPyInitial",{}).get("string",None) - remarkQuanPin=msg_data.get("RemarkQuanPin",{}).get("string",None) - smallHeadImgUrl=msg_data.get("smallHeadImgUrl",None) - - # data=gewe_chat.wxchat.get_brief_info(token_id,app_id,[contact_wxid]) - # contact=data[0] - # alias=contact.get("alias") - #推动到kafka - contact_data = { - "alias": None, - "bigHeadImgUrl": bigHeadImgUrl, - "cardImgUrl": None, - "city": city, - "country": country, - "description": None, - "labelList": None, - "nickName": nickname, - "phoneNumList": None, - "province": province, - "pyInitial": pyInitial, - "quanPin": quanPin, - "remark": remark, - "remarkPyInitial": remarkPyInitial, - "remarkQuanPin": remarkQuanPin, - "sex": sex, - "signature": signature, - "smallHeadImgUrl": smallHeadImgUrl, - "snsBgImg": None, - "userName": contact_wxid - } - input_message=wx_mod_contact_message(wxid,contact_data) - await request.app.state.kafka_service.send_message_async(input_message) + + #判断是否好友关系 + is_friends=True + if is_friends: + + loop = asyncio.get_event_loop() + future = asyncio.run_coroutine_threadsafe( + handle_mod_contacts_worker_async(request,token_id,app_id,wxid,msg_data), + loop + ) + + contact_wxid =msg_data.get("UserName",{}).get("string","") #msg_data["UserName"]["string"] + nickname= msg_data.get("NickName",{}).get("string","")#msg_data["NickName"]["string"] + city=msg_data.get("City",None) + signature=msg_data.get("Signature",None) + province=msg_data.get("Province",None) + bigHeadImgUrl=msg_data.get("SnsUserInfo",{}).get("SnsBgimgId",None) #msg_data["SnsUserInfo"]["SnsBgimgId"] + country=msg_data.get("Country",None) + sex=msg_data.get("Sex",None) + pyInitial= msg_data.get("PyInitial",{}).get("string",None)#msg_data["PyInitial"]["string"] + quanPin=msg_data.get("QuanPin",{}).get("string",None) #msg_data["QuanPin"]["string"] + remark=msg_data.get("Remark",{}).get("string",None) + remarkPyInitial=msg_data.get("RemarkPyInitial",{}).get("string",None) + remarkQuanPin=msg_data.get("RemarkQuanPin",{}).get("string",None) + smallHeadImgUrl=msg_data.get("smallHeadImgUrl",None) + #推动到kafka + contact_data = { + "alias": None, + "bigHeadImgUrl": bigHeadImgUrl, + "cardImgUrl": None, + "city": city, + "country": country, + "description": None, + "labelList": None, + "nickName": nickname, + "phoneNumList": None, + "province": province, + "pyInitial": pyInitial, + "quanPin": quanPin, + "remark": remark, + "remarkPyInitial": remarkPyInitial, + "remarkQuanPin": remarkQuanPin, + "sex": sex, + "signature": signature, + "smallHeadImgUrl": smallHeadImgUrl, + "snsBgImg": None, + "userName": contact_wxid + } + input_message=wx_mod_contact_message(wxid,contact_data) + await request.app.state.kafka_service.send_message_async(input_message) + else: + logger.info('删除好友通知') + contact_wxid =msg_data.get("UserName",{}).get("string","") + # 推送到kafka + input_message=wx_del_contact_message(wxid,contact_wxid) + await request.app.state.kafka_service.send_message_async(input_message) else: logger.info('群信息变更通知') chatroom_id=user_name @@ -303,27 +310,7 @@ async def handle_mod_contacts_worker_async(request:Request,token_id,app_id,wxid, logger.info('好友通过验证及好友资料变更的通知消息') if not check_chatroom(msg_data["UserName"]["string"]): contact_wxid = msg_data["UserName"]["string"] - - # 更新好友信息 - # 检查好友关系,不是好友则删除 - # ret,msg,check_relation=gewe_chat.wxchat.check_relation(token_id, app_id,[contact_wxid]) - # first_item = check_relation[0] - # check_relation_status=first_item.get('relation') - # logger.info(f'{wxid} 好友 {contact_wxid} 关系检查:{check_relation_status}') - # if check_relation_status != 0: - # gewe_chat.wxchat.delete_contacts_brief_from_cache(wxid, [contact_wxid]) - # logger.info(f'好友关系异常:{check_relation_status},删除好友 {contact_wxid} 信息') - # else: - # gewe_chat.wxchat.save_contacts_brief_to_cache(token_id, app_id, wxid, [contact_wxid]) - - ret,msg,contacts_list = await request.app.state.gewe_service.fetch_contacts_list_async(token_id, app_id) - # friend_wxids = contacts_list['friends'][3:] # 可以调整截取范围 - # print(friend_wxids) - - #friend_wxids.remove('fmessage') - #friend_wxids.remove('weixin') - friend_wxids = [c for c in contacts_list['friends'] if c not in ['fmessage', 'medianote','weixin','weixingongzhong','tmessage']] # 可以调整截取范围 print(f'{wxid}的好友数量 {len(friend_wxids)}') await request.app.state.gewe_service.save_contacts_brief_to_cache_async(token_id, app_id, wxid, friend_wxids) @@ -440,7 +427,7 @@ async def ai_chat_text_async(request: Request,token_id, app_id, wxid, msg_data, if len(cache_messages) >= 3: cache_messages = cache_messages[:-3] await request.app.state.redis_service.update_hash_field(hash_key, "data", json.dumps(cache_messages, ensure_ascii=False)) - messages_to_send = await request.app.state.redis_service.save_session_messages_to_cache_async(hash_key, prompt) + messages_to_send = await request.app.state.gewe_service.save_session_messages_to_cache_async(hash_key, prompt) res = await gpt_client_async(request,messages_to_send, wxid, callback_to_user) reply_content = remove_markdown_symbol(res["choices"][0]["message"]["content"]) @@ -792,8 +779,8 @@ async def handle_xml_reference_async(request: Request,token_id,app_id, wxid,msg_ input_message=dialogue_message(wxid,callback_to_user,input_wx_content_dialogue_message,True) await request.app.state.kafka_service.kafka_client.produce_message(input_message) logger.info("发送对话 %s",input_message) - await request.app.state.kafka_service.save_session_messages_to_cache_async(hash_key, {"role": "assistant", "content": reply_content}) - await request.app.state.kafka_service.post_text_async(token_id,app_id,callback_to_user,reply_content) + await request.app.state.gewe_service.save_session_messages_to_cache_async(hash_key, {"role": "assistant", "content": reply_content}) + await request.app.state.gewe_service.post_text_async(token_id,app_id,callback_to_user,reply_content) async def handle_xml_invite_group_async (request: Request,token_id,app_id, wxid,msg_data,from_wxid, to_wxid): '''