Procházet zdrojové kódy

好友和群自动回复开关

1257
H Vs před 2 týdny
rodič
revize
7648fdd52a
7 změnil soubory, kde provedl 137 přidání a 22 odebrání
  1. +3
    -1
      app/endpoints/agent_endpoint.py
  2. +7
    -2
      app/endpoints/config_endpoint.py
  3. +29
    -3
      app/endpoints/pipeline_endpoint.py
  4. +1
    -0
      model/models.py
  5. +49
    -12
      services/biz_service.py
  6. +41
    -4
      services/gewe_service.py
  7. +7
    -0
      tasks.py

+ 3
- 1
app/endpoints/agent_endpoint.py Zobrazit soubor

@@ -143,7 +143,9 @@ async def waitting_login_result(request: Request, token_id, app_id,region_id, ag
"agentTokenId": agent_token_id,
"agentEnabled": False,
"addContactsFromChatroomIdWhiteList": [],
"chatWaitingMsgEnabled": True
"chatWaitingMsgEnabled": True,
"privateGroupChatEnabled": True

})
else:
login_info.update({"modify_at":int(time.time())})


+ 7
- 2
app/endpoints/config_endpoint.py Zobrazit soubor

@@ -42,8 +42,13 @@ async def get_config(request: Request, body: GetConfigRequest):
return {"code": 401, "message": f"{wxid} 已经离线"}
config=await request.app.state.gewe_service.get_wxchat_config_from_cache_async(wxid)

return config
print(config)
try:
# 使用 Pydantic 严格校验数据类型和结构
validated_config = AgentConfig.model_validate(config)
except ValidationError as e:
return {'code': 407, 'message': e.errors().__str__()}
return validated_config





+ 29
- 3
app/endpoints/pipeline_endpoint.py Zobrazit soubor

@@ -196,10 +196,12 @@ async def gpt_client_async(request, messages: list, wixd: str, friend_wxid: str)
async def handle_add_messages_async(request: Request,token_id,msg,wxid):

wx_config =await request.app.state.gewe_service.get_wxchat_config_from_cache_async(wxid)
# if not bool(wx_config.get("agentEnabled",False)):
# logger.info(f'微信ID {wxid} 未托管,不处理')
# return

if not bool(wx_config.get("agentEnabled",False)):
logger.info(f'微信ID {wxid} 未托管,不处理')
return

app_id = msg.get("Appid")
msg_data = msg.get("Data")
msg_type = msg_data.get("MsgType",None)
@@ -207,6 +209,12 @@ async def handle_add_messages_async(request: Request,token_id,msg,wxid):
to_wxid = msg_data["ToUserName"]["string"]
msg_push_content=msg_data.get("PushContent")

validated_config = AgentConfig.model_validate(wx_config)

if not validated_config.agentEnabled:
logger.info(f'微信ID {wxid} 未托管,不处理 {msg_type}')
return

handlers = {
1: handle_text_async,
3: handle_image_async,
@@ -393,6 +401,12 @@ async def handle_text_async(request: Request,token_id,app_id, wxid,msg_data,from
'''
私聊文本消息
'''
config=await request.aoo.state.gewe_service.get_wxchat_config_from_cache_async(wxid)
validated_config = AgentConfig.model_validate(config)
if not validated_config.privateGroupChatEnabled:
logger.warning(f"微信号 {wxid} 关闭好友和群自动回复功能,{handle_text_async.__name__} 不回复消息")
return
msg_content=msg_data["Content"]["string"]
if wxid == from_wxid: #手动发送消息
logger.info("Active message sending detected")
@@ -561,6 +575,12 @@ async def handle_text_group_async(request: Request,token_id,app_id, wxid,msg_dat
k,login_info=await request.app.state.gewe_service.get_login_info_by_app_id_async(app_id)
nickname=login_info.get("nickName")

config=await request.aoo.state.gewe_service.get_wxchat_config_from_cache_async(wxid)
validated_config = AgentConfig.model_validate(config)
if not validated_config.privateGroupChatEnabled:
logger.warning(f"微信号 {wxid} 关闭好友和群自动回复功能,{handle_text_group_async.__name__} 不回复消息")
return

if wxid == from_wxid: #手动发送消息
logger.info("Active message sending detected")
@@ -750,6 +770,12 @@ async def handle_voice_async(request: Request,token_id,app_id, wxid,msg_data,fro
msg_content=msg_data["Content"]["string"]
msg_id=msg_data["MsgId"]

config=await request.aoo.state.gewe_service.get_wxchat_config_from_cache_async(wxid)
validated_config = AgentConfig.model_validate(config)
if not validated_config.privateGroupChatEnabled:
logger.warning(f"微信号 {wxid} 关闭好友和群自动回复功能,{handle_voice_async.__name__} 不回复消息")
return


file_url=await request.app.state.gewe_service.download_audio_msg_async(token_id,app_id,msg_id,msg_content)
react_silk_path=await save_to_local_from_url_async(file_url)


+ 1
- 0
model/models.py Zobrazit soubor

@@ -15,6 +15,7 @@ class AgentConfig(BaseModel):
agentEnabled: bool
addContactsFromChatroomIdWhiteList: List[str] = []
chatWaitingMsgEnabled: bool
privateGroupChatEnabled: bool=True


@dataclass


+ 49
- 12
services/biz_service.py Zobrazit soubor

@@ -48,20 +48,42 @@ class BizService():
data = content.get("data", {})
msg_type_data = data.get("msg_type", None)
content_data = data.get("content", {})
if msg_type_data=="login":
await self.login_handler_async(content_data)
elif msg_type_data == 'group-sending':
print(f'处理消息类型group-sending')
await self.group_sending_handler_async(content_data)
elif msg_type_data == 'login_wx_captch_code':
pass
else:
print(f'kakfa 未处理息类型 {msg_type_data}')
# if msg_type_data=="login":
# await self.login_handler_async(content_data)
# elif msg_type_data == 'group-sending':
# print(f'处理消息类型group-sending')
# await self.group_sending_handler_async(content_data)
# elif msg_type_data == 'login_wx_captch_code':
# pass
# elif msg_type_data == 'sns-sendtext-forward':
# pass
# elif msg_type_data == 'sns-sendimages-forward':
# pass
# elif msg_type_data == 'sns-sendvideo-forward':
# pass
# else:
# print(f'kakfa 未处理息类型 {msg_type_data}')
match msg_type_data:
case 'login':
await self.login_handler_async(content_data)
case 'group-sending':
await self.group_sending_handler_async(content_data)
case 'sns-sendtext-forward':
await self.sns_sendtext_forward_handler_async(content_data)
case 'sns-sendimages-forward':
await self.sns_sendimages_forward_handler_async(content_data)
case 'sns-sendvideo-forward':
await self.sns_sendvideo_forward_handler_async(content_data)
case _:
logger.warning(f'kakfa 未处理息类型 {msg_type_data}')

except Exception as e:
print(f"处理消息时发生错误: {e}, 消息内容: {message}")

async def login_handler_async(self, content_data: dict):
tel=content_data.get('tel', '18733438393')
token_id=content_data.get('token_id', 'c50b7d57-2efa-4a53-8c11-104a06d1e1fa')
region_id=content_data.get('region_id', '440000')
@@ -126,7 +148,6 @@ class BizService():
elif wx_content["type"] == "file":
await self.send_file_message(token_id, app_id, agent_wxid, [intersection_wxid], wx_content.get("file_url", {}).get("url"))

async def send_text_message_async(self, token_id, app_id, agent_wxid, intersection_wxids, text):
for t in intersection_wxids:
# 发送文本消息
@@ -212,7 +233,6 @@ class BizService():
# 等待随机时间
await asyncio.sleep(random.uniform(1.5, 3))


async def send_file_message(self,token_id, app_id, agent_wxid, intersection_wxids, file_url):
parsed_url = urlparse(file_url)
path = parsed_url.path
@@ -227,7 +247,6 @@ class BizService():
await self.send_other_file_message(token_id, app_id, agent_wxid, intersection_wxids, file_url)
#time.sleep(random.uniform(1.5, 3))


async def send_video_message(self, token_id, app_id, agent_wxid, intersection_wxids, file_url):
for t in intersection_wxids:
# 发送视频消息
@@ -263,3 +282,21 @@ class BizService():

async def send_other_file_message(self, token_id, app_id, agent_wxid, intersection_wxids, file_url):
print('send_otherfile_message')

async def sns_sendtext_forward_handler_async(self,content_data):
wxids=content_data.get('wxids',[])
wx_sns_content=content_data.get("wx_sns_content","")
if not wx_sns_content:
logger.warning(f'转发文本消息为空不处理 {wx_sns_content}')
return
if not wxids:
logger.warning(f'wxids 空列表不处理 {wxids}')
return
wxids_first=wxids[0]


async def sns_sendimages_forward_handler_async(self,content_data):
pass

async def sns_sendvideo_forward_handler_async(self,content_data):
pass

+ 41
- 4
services/gewe_service.py Zobrazit soubor

@@ -14,6 +14,7 @@ from common.singleton import singleton
from aiohttp import ClientError
from json.decoder import JSONDecodeError
from common.log import logger
from common.utils import check_chatroom
from model.models import AddGroupContactsHistory
from services.redis_service import RedisService

@@ -768,6 +769,22 @@ class GeWeService:
response_object = await response.json()
return response_object.get('ret', None), response_object.get('msg', None), response_object.get('data', None)

async def get_sns_list_async(self, token_id, app_id, sns_id: str):
api_url = f"{self.base_url}/v2/api/sns/snsList"
headers = {
'X-GEWE-TOKEN': token_id,
'Content-Type': 'application/json'
}
data = {
"appId": app_id,
"snsId": sns_id,
}
async with aiohttp.ClientSession() as session:
async with session.post(api_url, headers=headers, json=data) as response:
response_object = await response.json()
return response_object.get('ret', None), response_object.get('msg', None), response_object.get('data', None)


############################### 标签模块 ###############################
async def label_add_async(self, token_id, app_id, label_name):
api_url = f"{self.base_url}/v2/api/label/add"
@@ -999,12 +1016,24 @@ class GeWeService:
# 删除缓存中不再需要的 chatroom_id 数据
for chatroom_id in chatrooms_to_delete:
await self.redis_service.delete_hash_field(hash_key, chatroom_id)

for chatroom_id in chatroom_ids:

if not check_chatroom(chatroom_id):
await self.redis_service.delete_hash_field(f'__AI_OPS_WX__:GROUPS_INFO:{wxid}',chatroom_id)
logger.info(f'{chatroom_id} 不是有效的群,不处理')

continue

# 获取群信息
ret, msg, data =await self.get_chatroom_info_async(token_id, app_id, chatroom_id)
if ret != 200:
continue

if not data.get('memberList', []):
await self.redis_service.delete_hash_field(f'__AI_OPS_WX__:GROUPS_INFO:{wxid}',chatroom_id)
logger.info(f'通过 成员列表为空 清理 {chatroom_id}群成员信息')
continue
# 更新缓存
await self.redis_service.update_hash_field(hash_key, chatroom_id, json.dumps(data, ensure_ascii=False))
await asyncio.sleep(0.1)
@@ -1045,8 +1074,14 @@ class GeWeService:
# 获取群信息
ret, msg, data = await self.get_group_memberlist_async(token_id, app_id, chatroom_id)
if ret != 200:
logger.error(f"获取{chatroom_id}群成员信息失败,错误信息:{ret} {msg}")
return
if msg in '获取群成员列表异常:null':
await self.redis_service.delete_hash_field(f'__AI_OPS_WX__:GROUPS_INFO:{wxid}',chatroom_id)
await self.redis_service.delete_hash_field(f'__AI_OPS_WX__:GROUPS_MEMBERS:{wxid}',chatroom_id)
logger.info(f'通过 [获取群成员列表异常:null] 清理 {chatroom_id}群成员信息')
return
else:
logger.error(f"获取{chatroom_id}群成员信息失败,错误信息:{ret} {msg}")
return
await self.redis_service.update_hash_field(hash_key, chatroom_id, json.dumps(data, ensure_ascii=False))

async def get_group_members_from_cache_async(self, wxid,chatroom_id)->dict:
@@ -1119,7 +1154,9 @@ class GeWeService:
"""
# Redis 缓存的 key
hash_key = f"__AI_OPS_WX__:GROUPS_INFO:{wxid}"

if not check_chatroom(chatroom_id):
await self.redis_service.delete_hash_field(f'__AI_OPS_WX__:GROUPS_INFO:{wxid}',chatroom_id)
return
# 获取群信息
ret, msg, data =await self.get_chatroom_info_async(token_id, app_id, chatroom_id)
if ret != 200:


+ 7
- 0
tasks.py Zobrazit soubor

@@ -14,6 +14,7 @@ from common.utils import *
import asyncio,random
from model.models import AddGroupContactsHistory
import logging
from model.models import AgentConfig

import logging

@@ -620,6 +621,12 @@ def scheduled_task_add_contacts_from_chatrooms(self, redis_config, kafka_config,
logger.warning(f"微信号 {wxid} 已经离线,群成员不能定时添加")
continue

config=await gewe_service.get_wxchat_config_from_cache_async(wxid)
validated_config = AgentConfig.model_validate(config)
if not validated_config.agentEnabled:
logger.warning(f"微信号 {wxid} 取消了托管,群成员不能定时添加")
continue

c = await gewe_service.get_wxchat_config_from_cache_async(wxid)
contacts = await gewe_service.get_contacts_brief_from_cache_async(wxid)


Načítá se…
Zrušit
Uložit