From a326383da7c85baa2de39348fb8937007a4cb3eb Mon Sep 17 00:00:00 2001 From: H Vs Date: Mon, 7 Apr 2025 14:21:24 +0800 Subject: [PATCH] =?UTF-8?q?=E6=A0=87=E7=AD=BE=E8=AE=BE=E7=BD=AE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app/endpoints/label_endpoint.py | 99 ++++++++++++++++++++---- app/endpoints/pipeline_endpoint.py | 1 + celery_app.py | 2 +- services/gewe_service.py | 118 ++++++++++++++++++++++++++++- 4 files changed, 202 insertions(+), 18 deletions(-) diff --git a/app/endpoints/label_endpoint.py b/app/endpoints/label_endpoint.py index c7124b7..d5c7f04 100644 --- a/app/endpoints/label_endpoint.py +++ b/app/endpoints/label_endpoint.py @@ -32,36 +32,109 @@ class LabelModifyRequest(BaseModel): async def add_label(request: Request, body: LabelAddRequest): wxid = body.wxid label_name = body.labelName + if not wxid: + return {"code": 400, "message": "wxid 不能为空"} + + k,loginfo=await request.app.state.gewe_service.get_login_info_by_wxid_async(wxid) - return { "labelName":label_name,"labelId":random.randint(1,100)} + if not k: + return {"code":404,"message":f"{wxid} 没有对应的登录信息"} + + login_status=loginfo.get('status','0') + + if login_status != '1': + return {"code": 401, "message": f"{wxid} 已经离线"} + + token_id=loginfo.get('tokenId','') + app_id=loginfo.get('appId','') + + ret,msg,data=await request.app.state.gewe_service.label_add_async(token_id,app_id,label_name) + if ret!=200: + return {'code':ret,'message':msg} + + return { "labelName":data.get('labelName'),"labelId":data.get('labelId')} @label_router.post("/delete", response_model=None) async def delete_label(request: Request, body: LabelDelRequest, ): - wxid = body.wxid - label_ids= body.labelIds + wxid = body.wxid + label_ids= body.labelIds + if not wxid: + return {"code": 400, "message": "wxid 不能为空"} + + k,loginfo=await request.app.state.gewe_service.get_login_info_by_wxid_async(wxid) + + if not k: + return {"code":404,"message":f"{wxid} 没有对应的登录信息"} + + login_status=loginfo.get('status','0') + + if login_status != '1': + return {"code": 401, "message": f"{wxid} 已经离线"} + + token_id=loginfo.get('tokenId','') + app_id=loginfo.get('appId','') + + ret,msg,data=await request.app.state.gewe_service.label_delete_async(token_id,app_id,label_ids) + + if ret!=200: + return {'code':ret,'message':msg} - return {"message":"操作成功"} + return {"message":"操作成功"} @label_router.post("/list", response_model=None) async def list_label(request: Request, body: LabelListRequest, ): wxid = body.wxid + if not wxid: + return {"code": 400, "message": "wxid 不能为空"} + + k,loginfo=await request.app.state.gewe_service.get_login_info_by_wxid_async(wxid) + + if not k: + return {"code":404,"message":f"{wxid} 没有对应的登录信息"} + + login_status=loginfo.get('status','0') + + if login_status != '1': + return {"code": 401, "message": f"{wxid} 已经离线"} + + token_id=loginfo.get('tokenId','') + app_id=loginfo.get('appId','') + + ret,msg,data=await request.app.state.gewe_service.label_list_async(token_id,app_id) + if ret!=200: + return {'code':ret,'message':msg} + + return {"labelList":data.get('labelList',[]) if data!=None else []} - return { - "labelList":[{ - "labelName": "朋友", - "labelId": 1 - },{ - "labelName": "邻居", - "labelId": 2 - }] - } @label_router.post("/modifymembers", response_model=None) async def modifymembers_label(request: Request, body: LabelModifyRequest, ): wxid = body.wxid label_ids= body.labelIds wxids= body.wxIds + + wxid = body.wxid + if not wxid: + return {"code": 400, "message": "wxid 不能为空"} + + k,loginfo=await request.app.state.gewe_service.get_login_info_by_wxid_async(wxid) + + if not k: + return {"code":404,"message":f"{wxid} 没有对应的登录信息"} + + login_status=loginfo.get('status','0') + + if login_status != '1': + return {"code": 401, "message": f"{wxid} 已经离线"} + + token_id=loginfo.get('tokenId','') + app_id=loginfo.get('appId','') + + ret,msg,data=await request.app.state.gewe_service.label_modify_members_async(token_id,app_id,label_ids,wxids) + if ret!=200: + return {'code':ret,'message':msg} + return {"message":"操作成功"} diff --git a/app/endpoints/pipeline_endpoint.py b/app/endpoints/pipeline_endpoint.py index fcf8c8f..5f155f3 100644 --- a/app/endpoints/pipeline_endpoint.py +++ b/app/endpoints/pipeline_endpoint.py @@ -660,6 +660,7 @@ async def handle_text_group_async(request: Request,token_id,app_id, wxid,msg_dat await request.app.state.gewe_service.post_text_async(token_id,app_id,callback_to_user,reply_content) await request.app.state.gewe_service.save_session_messages_to_cache_async(hash_key, {"role": "assistant", "content": reply_content}) # 回复的对话 + reply_content=f'{wxid}:\n'+ reply_content input_wx_content_dialogue_message=[{"type": "text", "text": reply_content}] input_message=dialogue_message(wxid,callback_to_user,input_wx_content_dialogue_message,True) await request.app.state.kafka_service.send_message_async(input_message) diff --git a/celery_app.py b/celery_app.py index 69b1d35..30fa0db 100644 --- a/celery_app.py +++ b/celery_app.py @@ -36,7 +36,7 @@ elif environment == 'test': scheduled_task_add_contacts_from_chatrooms_interval = 60*11 else: scheduled_task_sync_wx_info_interval = 6000 - scheduled_task_add_contacts_from_chatrooms_interval=10 + scheduled_task_add_contacts_from_chatrooms_interval=6000 # 定义定时任务列表 (任务 ID, 任务名称, 执行间隔秒, 任务参数) diff --git a/services/gewe_service.py b/services/gewe_service.py index f3af170..681159f 100644 --- a/services/gewe_service.py +++ b/services/gewe_service.py @@ -10,6 +10,9 @@ import time,datetime import uuid from fastapi import FastAPI, Depends from common.singleton import singleton + +from aiohttp import ClientError +from json.decoder import JSONDecodeError from common.log import logger from model.models import AddGroupContactsHistory from services.redis_service import RedisService @@ -765,6 +768,116 @@ class GeWeService: response_object = await response.json() return response_object.get('ret', None), response_object.get('msg', None), response_object.get('data', None) + ############################### 标签模块 ############################### + async def label_add_async(self, token_id, app_id, label_name): + api_url = f"{self.base_url}/v2/api/label/add" + headers = { + 'X-GEWE-TOKEN': token_id, + 'Content-Type': 'application/json' + } + data = { + "appId": app_id, + "labelName": label_name, + } + async with aiohttp.ClientSession() as session: + async with session.post(api_url, headers=headers, json=data) as response: + response_object = await response.json() + return response_object.get('ret', None), response_object.get('msg', None), response_object.get('data', None) + + async def label_delete_async(self, token_id, app_id, label_ids: list): + api_url = f"{self.base_url}/v2/api/label/delete" + label_ids_str = ','.join(map(str, label_ids)) + headers = { + 'X-GEWE-TOKEN': token_id, + 'Content-Type': 'application/json' + } + data = { + "appId": app_id, + "labelIds": label_ids_str, + } + async with aiohttp.ClientSession() as session: + async with session.post(api_url, headers=headers, json=data) as response: + response_object = await response.json() + return response_object.get('ret', None), response_object.get('msg', None), response_object.get('data', None) + + # async def label_list_async(self, token_id, app_id): + # api_url = f"{self.base_url}/v2/api/label/list" + # headers = { + # 'X-GEWE-TOKEN': token_id, + # 'Content-Type': 'application/json' + # } + # data = { + # "appId": app_id, + # } + # async with aiohttp.ClientSession() as session: + # async with session.post(api_url, headers=headers, json=data) as response: + # response_object = await response.json() + # return response_object.get('ret', None), response_object.get('msg', None), response_object.get('data', None) + + async def label_list_async(self, token_id, app_id): + api_url = f"{self.base_url}/v2/api/label/list" + headers = { + 'X-GEWE-TOKEN': token_id, + 'Content-Type': 'application/json' + } + data = { + "appId": app_id, + } + + try: + async with aiohttp.ClientSession() as session: + try: + async with session.post(api_url, headers=headers, json=data) as response: + # 检查响应状态码 + if response.status != 200: + return response.status, f"HTTP Error: {response.status}", None + + try: + response_object = await response.json() + except JSONDecodeError: + return 501, "Invalid JSON response", None + + # 检查返回的数据结构 + if not isinstance(response_object, dict): + return 501, "Invalid response format", None + + # 返回处理后的数据 + return response_object.get('ret', None), response_object.get('msg', None), response_object.get('data', None) + + except ClientError as e: + return 501, f"Network error: {str(e)}", None + + except Exception as e: + return None, f"Unexpected error: {str(e)}", None + + async def label_modify_members_async(self, token_id, app_id,label_ids: list,wx_ids: list): + ''' + 注意 + 由于好友标签信息存储在用户客户端,因此每次在修改时都需要进行全量修改。举例来说,考虑好友A(wxid_asdfaihp123),该好友已经被标记为标签ID为1和2。 + + 在添加标签ID为3时,传递的参数如下:labelIds:1,2,3,wxIds:[wxid_asdfaihp123]。这表示要给好友A添加标签ID为3,同时保留已有的标签ID 1和2。 + + 而在删除标签ID为1时,传递的参数如下:labelIds:2,3 ,wxIds:[wxid_asdfaihp123]。这表示要将好友A的标签ID 1删除,而保留标签ID 2。 + ''' + + api_url = f"{self.base_url}/v2/api/label/modifyMemberList" + label_ids_str = ','.join(map(str, label_ids)) + + headers = { + 'X-GEWE-TOKEN': token_id, + 'Content-Type': 'application/json' + } + data = { + "appId": app_id, + "wxIds": wx_ids, + "labelIds": label_ids_str + } + async with aiohttp.ClientSession() as session: + async with session.post(api_url, headers=headers, json=data) as response: + response_object = await response.json() + return response_object.get('ret', None), response_object.get('msg', None), response_object.get('data', None) + + ############################### 其他 ############################### async def save_session_messages_to_cache_async(self, hash_key,item:object)->list: @@ -1105,7 +1218,6 @@ class GeWeService: return "" - async def save_login_wx_captch_code_to_cache_async(self,tel,captch_code): hash_key = f"__AI_OPS_WX__:WXCAPTCHCODE:{tel}" await self.redis_service.set_hash(hash_key,{"data":captch_code},30) @@ -1177,9 +1289,7 @@ class GeWeService: if len(today_list) == 200: return True return False - - - + async def enqueue_to_add_contacts_async(self,wxid,scene:int,v3,v4): """ 入列待添加好友