You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

1237 lines
61KB

  1. from voice.ali.ali_voice import AliVoice
  2. from common.log import logger
  3. import xml.etree.ElementTree as ET
  4. import os,json,asyncio,aiohttp,random
  5. from aiohttp import ClientError
  6. from voice import audio_convert
  7. from fastapi import APIRouter,Request
  8. from pydantic import BaseModel
  9. from fastapi import APIRouter, Depends
  10. from typing import Dict, Any
  11. from model.models import AgentConfig,OperationType
  12. from common.utils import *
  13. from common.memory import *
  14. import traceback
  15. import sys
# Seconds check_timeout_async sleeps before checking whether the AI reply task is still running.
timeout_duration = 8.0
# Router on which the /messages callback endpoint below is registered.
messages_router = APIRouter()
#WX_BACKLIST=['fmessage', 'medianote','weixin','weixingongzhong','tmessage']
# Sender IDs whose AddMsg callbacks get_messages ignores (merged there with the
# wxids found in the WXCHAT_CONFIG Redis hash).
WX_BACKLIST=['medianote','weixin','weixingongzhong','tmessage']
  20. @messages_router.post("/messages",response_model=None)
  21. async def get_messages(request: Request, body: Dict[str, Any]):
  22. logger.info(f"收到微信回调消息: {json.dumps(body, separators=(',', ':'),ensure_ascii=False)}")
  23. try:
  24. msg = body
  25. type_name =msg.get("TypeName")
  26. app_id = msg.get("Appid")
  27. k, loginfo = await request.app.state.gewe_service.get_login_info_by_app_id_async(app_id)
  28. if not k:
  29. logger.warning('找不到登录信息,不处理')
  30. return {"message": f"收到微信回调消息: {type_name}"}
  31. token_id=loginfo.get('tokenId','')
  32. wxid = msg.get("Wxid",'')
  33. if type_name == 'AddMsg':
  34. await handle_self_cmd_async(request,wxid,msg)
  35. msg_data = msg.get("Data")
  36. from_wxid = msg_data["FromUserName"]["string"]
  37. config=await request.app.state.redis_service.get_hash(f"__AI_OPS_WX__:WXCHAT_CONFIG")
  38. wxids = list(config.keys())
  39. meged_backlist_wxids=wxids+WX_BACKLIST
  40. # 公众号ID已gh_开头
  41. if (from_wxid in meged_backlist_wxids and from_wxid != wxid) or 'gh_' in from_wxid:
  42. logger.warning(f'来自微信ID {from_wxid} 在黑名单,发送给 {wxid} ,不处理')
  43. return {"message": "收到微信回调消息"}
  44. await handle_messages_async(request,token_id,msg)
  45. return {"message": "收到微信回调消息,处理完成"}
  46. except Exception as e:
  47. # 获取当前的堆栈跟踪
  48. tb = sys.exc_info()[2]
  49. # 为异常附加堆栈跟踪
  50. e = e.with_traceback(tb)
  51. # 输出详细的错误信息
  52. logger.error(f"处理微信回调消息出错: {json.dumps(body, separators=(',', ':'),ensure_ascii=False)}")
  53. logger.error(f"异常类型: {type(e).__name__}")
  54. logger.error(f"异常信息: {str(e)}")
  55. logger.error(f"堆栈跟踪: {traceback.format_exc()}")
  56. return {"message": "处理微信回调消息错误"}
  57. async def handle_self_cmd_async(request: Request,wxid,msg):
  58. '''
  59. 个人微信命令处理
  60. 如果是个人微信的指令,wxid == from_wxid
  61. commands = {
  62. '启用托管': True,
  63. '关闭托管': False
  64. }
  65. '''
  66. msg_data=msg.get("Data")
  67. from_wxid=msg_data["FromUserName"]["string"]
  68. msg_content=msg_data["Content"]["string"]
  69. if wxid == from_wxid:
  70. commands = {
  71. '启用托管': True,
  72. '关闭托管': False
  73. }
  74. if msg_content in commands:
  75. c_dict = await request.app.state.gewe_service.get_wxchat_config_from_cache_async(wxid)
  76. if c_dict:
  77. agent_config=AgentConfig.model_validate(c_dict)
  78. agent_config.agentEnabled=commands[msg_content]
  79. await request.app.state.gewe_service.wxchat.save_wxchat_config_async(wxid, agent_config.model_dump())
  80. logger.info(f'{wxid} {"启动" if commands[msg_content] else "关闭"}托管')
  81. async def handle_messages_async(request: Request,token_id,msg):
  82. #msg_data=msg.get("Data")
  83. type_name =msg.get("TypeName")
  84. # app_id = msg.get("Appid")
  85. # from_wxid=msg_data["FromUserName"]["string"]
  86. # msg_content=msg_data["Content"]["string"]
  87. wxid = msg.get("Wxid",'')
  88. match type_name:
  89. case 'AddMsg':
  90. await handle_add_messages_async(request,token_id,msg,wxid)
  91. case 'ModContacts':
  92. await handle_mod_contacts_async(request,token_id,msg,wxid)
  93. case 'DelContacts':
  94. await handle_del_contacts_async(request,token_id,msg,wxid)
  95. case 'Offline':
  96. await handle_offline_async(request,token_id,msg,wxid)
  97. case _:
  98. logger.warning(f'未知消息类型:{type_name}')
  99. # async def gpt_client_async(request,messages: list, wixd: str, friend_wxid: str):
  100. # c = await request.app.state.gewe_service.get_wxchat_config_from_cache_async(wixd)
  101. # api_key = c.get('agentTokenId', "sk-jr69ONIehfGKe9JFphuNk4DU5Y5wooHKHhQv7oSnFzVbwCnW65fXO9kvH")
  102. # print(f'流程key:{api_key}\n')
  103. # #api_key="sk-jr69ONIehfGKe9JFphuNk4DU5Y5wooHKHhQv7oSnFzVbwCnW65fXO9kvH" #测试
  104. # #api_key="sk-uJDBdKmJVb2cmfldGOvlIY6Qx0AzqWMPD3lS1IzgQYzHNOXv9SKNI" #开发2
  105. # api_url = "http://106.15.182.218:3000/api/v1/chat/completions"
  106. # headers = {
  107. # "Content-Type": "application/json",
  108. # "Authorization": f"Bearer {api_key}"
  109. # }
  110. # session_id = f'{wixd}-{friend_wxid}'
  111. # data = {
  112. # "model": "",
  113. # "messages": messages,
  114. # "chatId": session_id,
  115. # "detail": True
  116. # }
  117. # logger.info("[CHATGPT] 请求={}".format(json.dumps(data, ensure_ascii=False)))
  118. # async with aiohttp.ClientSession() as session:
  119. # try:
  120. # async with session.post(url=api_url, headers=headers, data=json.dumps(data), timeout=600) as response:
  121. # response.raise_for_status()
  122. # response_data = await response.json()
  123. # logger.info("[CHATGPT] 响应={}".format(json.dumps(response_data, separators=(',', ':'), ensure_ascii=False)))
  124. # return response_data
  125. # except aiohttp.ClientError as e:
  126. # logger.error(f"[CHATGPT] 请求失败: {e}")
  127. # raise
  128. async def gpt_client_async(request, messages: list, wixd: str, friend_wxid: str):
  129. max_retries = 3
  130. retry_delay = 5 # 重试间隔时间(秒)
  131. for attempt in range(max_retries):
  132. try:
  133. c = await request.app.state.gewe_service.get_wxchat_config_from_cache_async(wixd)
  134. api_key = c.get('agentTokenId', "sk-jr69ONIehfGKe9JFphuNk4DU5Y5wooHKHhQv7oSnFzVbwCnW65fXO9kvH")
  135. base_url="http://106.15.182.218:3000"
  136. environment = os.environ.get('environment', 'default')
  137. if environment != 'default':
  138. base_url="http://172.19.42.59:3000"
  139. api_url = f"{base_url}/api/v1/chat/completions"
  140. headers = {
  141. "Content-Type": "application/json",
  142. "Authorization": f"Bearer {api_key}"
  143. }
  144. session_id = f'{wixd}-{friend_wxid}'
  145. data = {
  146. "model": "",
  147. "messages": messages,
  148. "chatId": session_id,
  149. "detail": True
  150. }
  151. logger.info("[CHATGPT] 请求={}".format(json.dumps(data, ensure_ascii=False)))
  152. async with aiohttp.ClientSession() as session:
  153. async with session.post(url=api_url, headers=headers, data=json.dumps(data), timeout=1200) as response:
  154. response.raise_for_status()
  155. response_data = await response.json()
  156. return response_data
  157. except (ClientError, asyncio.TimeoutError) as e:
  158. logger.error(f"[CHATGPT] 请求失败(尝试 {attempt + 1}/{max_retries}): {e}")
  159. if attempt < max_retries - 1:
  160. await asyncio.sleep(retry_delay)
  161. else:
  162. raise
  163. async def handle_add_messages_async(request: Request,token_id,msg,wxid):
  164. wx_config =await request.app.state.gewe_service.get_wxchat_config_from_cache_async(wxid)
  165. # if not bool(wx_config.get("agentEnabled",False)):
  166. # logger.info(f'微信ID {wxid} 未托管,不处理')
  167. # return
  168. app_id = msg.get("Appid")
  169. msg_data = msg.get("Data")
  170. msg_type = msg_data.get("MsgType",None)
  171. from_wxid = msg_data["FromUserName"]["string"]
  172. to_wxid = msg_data["ToUserName"]["string"]
  173. msg_push_content=msg_data.get("PushContent")
  174. validated_config = AgentConfig.model_validate(wx_config)
  175. if not validated_config.agentEnabled:
  176. logger.info(f'微信ID {wxid} 未托管,不处理 {msg_type}')
  177. return
  178. handlers = {
  179. 1: handle_text_async,
  180. 3: handle_image_async,
  181. 34: handle_voice_async,
  182. 42: handle_name_card_async,
  183. 49: handle_xml_async,
  184. 37: handle_add_friend_notice_async,
  185. 10002: handle_10002_msg_async,
  186. 10000: handle_10000_msg_async
  187. }
  188. # (扫码进群情况)判断受否是群聊,并添加到通信录
  189. if check_chatroom(from_wxid) or check_chatroom(to_wxid):
  190. logger.info('群信息')
  191. chatroom_id=from_wxid
  192. ret,msg,data=await request.app.state.gewe_service.save_contract_list_async(token_id,app_id,chatroom_id,3)
  193. logger.info(f'保存到通讯录 chatroom_id {chatroom_id} {msg}')
  194. await request.app.state.gewe_service.update_group_info_to_cache_async(token_id,app_id,wxid,chatroom_id)
  195. await request.app.state.gewe_service.update_group_members_to_cache_async(token_id,app_id,wxid,chatroom_id)
  196. handlers[1]=handle_text_group_async
  197. handlers[3]=handle_image_group_async
  198. handlers[34]=handle_voice_group_async
  199. handler = handlers.get(msg_type)
  200. if handler:
  201. return await handler(request,token_id,app_id, wxid,msg_data,from_wxid, to_wxid)
  202. else:
  203. logger.warning(f"微信回调消息类型 {msg_type} 未处理")
  204. async def handle_mod_contacts_async(request: Request,token_id,msg,wxid):
  205. '''
  206. 好友通过验证及好友资料变更的通知消息
  207. '''
  208. wxid = msg.get("Wxid")
  209. msg_data = msg.get("Data")
  210. app_id = msg.get("Appid")
  211. #
  212. #handle_mod_contacts(token_id,app_id,wxid,msg_data)
  213. #
  214. user_name=msg_data.get("UserName",{}).get("string","")
  215. if not check_chatroom(user_name):
  216. # loop = asyncio.get_event_loop()
  217. # future = asyncio.run_coroutine_threadsafe(
  218. # handle_mod_contacts_worker_async(request,token_id,app_id,wxid,msg_data),
  219. # loop
  220. # )
  221. await handle_mod_contacts_worker_async(request,token_id,app_id,wxid,msg_data)
  222. #判断是否好友关系
  223. # is_friends=True
  224. # if is_friends:
  225. # loop = asyncio.get_event_loop()
  226. # future = asyncio.run_coroutine_threadsafe(
  227. # handle_mod_contacts_worker_async(request,token_id,app_id,wxid,msg_data),
  228. # loop
  229. # )
  230. # contact_wxid =msg_data.get("UserName",{}).get("string","") #msg_data["UserName"]["string"]
  231. # nickname= msg_data.get("NickName",{}).get("string","")#msg_data["NickName"]["string"]
  232. # city=msg_data.get("City",None)
  233. # signature=msg_data.get("Signature",None)
  234. # province=msg_data.get("Province",None)
  235. # bigHeadImgUrl=msg_data.get("SnsUserInfo",{}).get("SnsBgimgId",None) #msg_data["SnsUserInfo"]["SnsBgimgId"]
  236. # country=msg_data.get("Country",None)
  237. # sex=msg_data.get("Sex",None)
  238. # pyInitial= msg_data.get("PyInitial",{}).get("string",None)#msg_data["PyInitial"]["string"]
  239. # quanPin=msg_data.get("QuanPin",{}).get("string",None) #msg_data["QuanPin"]["string"]
  240. # remark=msg_data.get("Remark",{}).get("string",None)
  241. # remarkPyInitial=msg_data.get("RemarkPyInitial",{}).get("string",None)
  242. # remarkQuanPin=msg_data.get("RemarkQuanPin",{}).get("string",None)
  243. # smallHeadImgUrl=msg_data.get("smallHeadImgUrl",None)
  244. # #推动到kafka
  245. # contact_data = {
  246. # "alias": None,
  247. # "bigHeadImgUrl": bigHeadImgUrl,
  248. # "cardImgUrl": None,
  249. # "city": city,
  250. # "country": country,
  251. # "description": None,
  252. # "labelList": None,
  253. # "nickName": nickname,
  254. # "phoneNumList": None,
  255. # "province": province,
  256. # "pyInitial": pyInitial,
  257. # "quanPin": quanPin,
  258. # "remark": remark,
  259. # "remarkPyInitial": remarkPyInitial,
  260. # "remarkQuanPin": remarkQuanPin,
  261. # "sex": sex,
  262. # "signature": signature,
  263. # "smallHeadImgUrl": smallHeadImgUrl,
  264. # "snsBgImg": None,
  265. # "userName": contact_wxid
  266. # }
  267. # input_message=wx_mod_contact_message(wxid,contact_data)
  268. # await request.app.state.kafka_service.send_message_async(input_message)
  269. # else:
  270. # logger.info('删除好友通知')
  271. # contact_wxid =msg_data.get("UserName",{}).get("string","")
  272. # # 推送到kafka
  273. # input_message=wx_del_contact_message(wxid,contact_wxid)
  274. # await request.app.state.kafka_service.send_message_async(input_message)
  275. else:
  276. logger.info('群信息变更通知')
  277. chatroom_id=user_name
  278. ret,msg,data=await request.app.state.gewe_service.save_contract_list_async(token_id,app_id,chatroom_id,3)
  279. logger.info(f'保存到通讯录 chatroom_id {chatroom_id} {msg}')
  280. await request.app.state.gewe_service.update_group_info_to_cache_async(token_id,app_id,wxid,chatroom_id)
  281. await request.app.state.gewe_service.update_group_members_to_cache_async(token_id,app_id,wxid,chatroom_id)
  282. # group_info_members=await request.app.state.gewe_service.get_group_info_members_from_cache_async(wxid,chatroom_id)
  283. # k_message=wx_mod_group_info_members_message(wxid,group_info_members)
  284. # 全量推送
  285. #all_groups_info_menbers=await request.app.state.gewe_service.get_groups_info_members_from_cache_async(wxid)
  286. #k_message=wx_groups_info_members_message(wxid,all_groups_info_menbers)
  287. k_message=wx_groups_info_members_key_message(wxid)
  288. await request.app.state.kafka_service.send_message_async(k_message)
  289. async def handle_del_contacts_async(request: Request,token_id,msg,wxid):
  290. '''
  291. 删除好友通知/退出群聊
  292. '''
  293. msg_data = msg.get("Data")
  294. username=msg_data["UserName"]["string"]
  295. if check_chatroom(username):
  296. logger.info('退出群聊')
  297. wxid = msg.get("Wxid")
  298. chatroom_id=username
  299. await request.app.state.redis_service.delete_hash_field(f'__AI_OPS_WX__:GROUPS_INFO:{wxid}',chatroom_id)
  300. await request.app.state.redis_service.delete_hash_field(f'__AI_OPS_WX__:GROUPS_MEMBERS:{wxid}',chatroom_id)
  301. logger.info(f'清除 chatroom_id{chatroom_id} 数据')
  302. # 推送删除群资料到kafka
  303. k_message=wx_del_group_message(wxid,chatroom_id)
  304. await request.app.state.kafka_service.send_message_async(k_message)
  305. else:
  306. logger.info('删除好友通知')
  307. # 推送到kafka
  308. input_message=wx_del_contact_message(wxid,username)
  309. await request.app.state.kafka_service.send_message_async(input_message)
  310. async def handle_offline_async(request: Request,token_id,msg,wxid):
  311. '''
  312. 已经离线
  313. '''
  314. wxid = msg.get("Wxid")
  315. app_id = msg.get("Appid")
  316. logger.warning(f'微信ID {wxid}在设备{app_id}已经离线')
  317. k,r=await request.app.state.gewe_service.get_login_info_by_app_id_async(app_id)
  318. print(k)
  319. await request.app.state.redis_service.update_hash_field(k,'status',0)
  320. await request.app.state.redis_service.update_hash_field(k,'modify_at',int(time.time()))
  321. # 推送到kafka
  322. input_message=wx_offline_message(app_id,wxid)
  323. await request.app.state.kafka_service.send_message_async(input_message)
  324. async def handle_mod_contacts_worker_async(request:Request,token_id,app_id,wxid,msg_data):
  325. '''
  326. 好友通过验证及好友资料变更的通知消息
  327. '''
  328. logger.info('好友通过验证及好友资料变更的通知消息')
  329. if not check_chatroom(msg_data["UserName"]["string"]):
  330. contact_wxid = msg_data["UserName"]["string"]
  331. ret,msg,contacts_list = await request.app.state.gewe_service.fetch_contacts_list_async(token_id, app_id)
  332. friend_wxids = [c for c in contacts_list['friends'] if c not in ['fmessage', 'medianote','weixin','weixingongzhong','tmessage']] # 可以调整截取范围
  333. print(f'{wxid}的好友资料变更,数量 {len(friend_wxids)}')
  334. data = await request.app.state.gewe_service.save_contacts_brief_to_cache_async(token_id, app_id, wxid, friend_wxids)
  335. #k_message = wx_all_contacts_message(wxid, data)
  336. k_message = wx_all_contacts_key_message(wxid)
  337. await request.app.state.kafka_service.send_message_async(k_message)
  338. else:
  339. logger.info('群聊好友通过验证及好友资料变更的通知消息')
  340. async def handle_text_async(request: Request,token_id,app_id, wxid,msg_data,from_wxid, to_wxid):
  341. '''
  342. 私聊文本消息
  343. '''
  344. config=await request.app.state.gewe_service.get_wxchat_config_from_cache_async(wxid)
  345. validated_config = AgentConfig.model_validate(config)
  346. if not validated_config.privateGroupChatEnabled:
  347. logger.warning(f"微信号 {wxid} 关闭好友和群自动回复功能,{handle_text_async.__name__} 不回复消息")
  348. return
  349. # 判断是否转人工处理功能
  350. is_human_handle_msg= await request.app.state.gewe_service.is_human_handle_msg_async(wxid)
  351. if is_human_handle_msg:
  352. logger.warning(f'微信号 {wxid} 暂时工人接管30分钟中')
  353. return
  354. msg_content=msg_data["Content"]["string"]
  355. if wxid == from_wxid: #手动发送消息
  356. logger.info("Active message sending detected")
  357. await request.app.state.gewe_service.save_contacts_brief_to_cache_async(token_id,app_id,wxid,[to_wxid])
  358. callback_to_user=msg_data["ToUserName"]["string"]
  359. # 转人工处理功能
  360. await request.app.state.gewe_service.set_human_handle_msg_async(wxid,60*30)
  361. # 推送到kafka
  362. input_wx_content_dialogue_message=[{"type": "text", "text": msg_content}]
  363. input_message=dialogue_message(from_wxid,to_wxid,input_wx_content_dialogue_message)
  364. await request.app.state.kafka_service.send_message_async(input_message)
  365. logger.info("发送对话 %s",input_message)
  366. else:
  367. callback_to_user=msg_data["FromUserName"]["string"]
  368. # 判断哪些关键词存在于 msg_content 中
  369. keywords = ["预约", "报价", "购买", "价钱"]
  370. found_keywords = [keyword for keyword in keywords if keyword in msg_content]
  371. if found_keywords:
  372. await request.app.state.gewe_service.set_human_handle_msg_with_contact_wxid_async(wxid,callback_to_user,60*30)
  373. logger.info(f"{wxid} 收到 {callback_to_user} 私聊消息匹配到关键词:{', '.join(found_keywords)}")
  374. # 回复好友
  375. await request.app.state.gewe_service.post_text_async(token_id, app_id, callback_to_user, "我这边目前有点忙,稍后回复您好吗?")
  376. await asyncio.sleep(random.uniform(1.5,3))
  377. # 推送到助理
  378. print('推送到助理')
  379. '''
  380. 长服AI商机提醒助理
  381. 管家助理
  382. 18664262743
  383. wxid_9pocbage7cdb22
  384. '''
  385. await request.app.state.gewe_service.post_text_async(token_id, app_id, 'wxid_9pocbage7cdb22', msg_content)
  386. return
  387. # 是否在转人工处理
  388. is_human_handle_msg_with_contact_wxid= await request.app.state.gewe_service.is_human_handle_msg_with_contact_wxid_async(wxid,callback_to_user)
  389. if is_human_handle_msg_with_contact_wxid:
  390. logger.warning(f'微信号 {wxid} 与 {callback_to_user} 有关键字匹配,暂时工人接管30分钟中,请查看长服AI商机提醒助理')
  391. return
  392. # 创建并启动任务协程,将参数传递给 ai_chat_text 函数
  393. task = asyncio.create_task(
  394. ai_chat_text_async(request,token_id, app_id, wxid, msg_data, msg_content)
  395. )
  396. # 设置定时器,1秒后检查任务是否超时。这里需要使用 lambda 来传递参数
  397. timeout_timer = asyncio.create_task(
  398. check_timeout_async(task, request,token_id, wxid, app_id, callback_to_user)
  399. )
  400. # 等待任务协程完成
  401. await task
  402. # 取消定时器
  403. timeout_timer.cancel()
  404. async def check_timeout_async(task: asyncio.Task, request: Request,token_id, wxid, app_id, callback_to_user):
  405. await asyncio.sleep(timeout_duration) # 等待超时时间
  406. if not task.done():
  407. print(f"任务运行时间超过{timeout_duration}秒,token_id={token_id}, app_id={app_id}, callback_to_user={callback_to_user}")
  408. wx_config = await request.app.state.gewe_service.get_wxchat_config_from_cache_async(wxid)
  409. if bool(wx_config.get("chatWaitingMsgEnabled", True)):
  410. # if callback_to_user == 'wxid_mesh33pw13e721':
  411. # logger.info(f'wxid_mesh33pw13e721 不发送到微信有关等待的AI回复到微信')
  412. # else:
  413. # phrases = ["稍等一下", "辛苦等等", "马上就好", "很快就好", "请稍后", "请等会", "稍等1分钟","请别急,在整理","就好了"]
  414. # # 随机选择一个短语
  415. # random_phrase = random.choice(phrases)
  416. # await request.app.state.gewe_service.post_text_async(token_id, app_id, callback_to_user, random_phrase)
  417. phrases = ["稍等一下", "辛苦等等", "马上就好", "很快就好", "请稍后", "请等会", "稍等1分钟","请别急,在整理","就好了"]
  418. random_phrase = random.choice(phrases)
  419. await request.app.state.gewe_service.post_text_async(token_id, app_id, callback_to_user, random_phrase)
  420. #await request.app.state.gewe_service.post_text_async(token_id, app_id, callback_to_user, "亲,我正在组织回复的信息,请稍等一会")
  421. async def ai_chat_text_async(request: Request,token_id, app_id, wxid, msg_data, msg_content):
  422. start_time = time.time() # 记录任务开始时间
  423. callback_to_user = msg_data["FromUserName"]["string"]
  424. k,loginfo=await request.app.state.gewe_service.get_login_info_by_wxid_async(wxid)
  425. wxid_nickname=loginfo.get("nickName")
  426. contacts_brief:list=await request.app.state.gewe_service.get_contacts_brief_from_cache_async(wxid)
  427. #callback_to_user_nickname=next(filter(lambda x:x.get("userName") == callback_to_user,contacts_brief),None).get("nickName")
  428. callback_to_user_nickname = next(filter(lambda x: x.get("userName") == callback_to_user, contacts_brief), {}).get("nickName", "")
  429. hash_key = f'__AI_OPS_WX__:MESSAGES:{wxid}:{callback_to_user}'
  430. prompt = {"role": "user", "content": [{
  431. "type": "text",
  432. "text": msg_content
  433. }]}
  434. messages_to_send = await request.app.state.gewe_service.save_session_messages_to_cache_async(hash_key, prompt)
  435. # 收到的对话
  436. input_wx_content_dialogue_message = [{"type": "text", "text": msg_content}]
  437. input_message = dialogue_message(callback_to_user, wxid, input_wx_content_dialogue_message)
  438. await request.app.state.kafka_service.send_message_async(input_message)
  439. logger.info("发送对话 %s", input_message)
  440. cache_data = USER_INTERACTIVE_CACHE.get(wxid)
  441. if cache_data and cache_data.get('interactive'):
  442. o = get_first_char_if_digit(msg_content)
  443. if o is not None:
  444. userSelectOptions = cache_data.get('options')
  445. if o < len(userSelectOptions):
  446. o = o - 1
  447. msg_content = userSelectOptions[o].get("value")
  448. messages_to_send = [{"role": "user", "content": msg_content}]
  449. else:
  450. messages_to_send = [{"role": "user", "content": msg_content}]
  451. else:
  452. messages_to_send = [{"role": "user", "content": msg_content}]
  453. res = await gpt_client_async(request,messages_to_send, wxid, callback_to_user)
  454. reply_content = remove_markdown_symbol(res["choices"][0]["message"]["content"])
  455. description = ''
  456. userSelectOptions = []
  457. if isinstance(reply_content, list) and any(item.get("type") == "interactive" for item in reply_content):
  458. for item in reply_content:
  459. if item["type"] == "interactive" and item["interactive"]["type"] == "userSelect":
  460. params = item["interactive"]["params"]
  461. description = params.get("description")
  462. userSelectOptions = params.get("userSelectOptions", [])
  463. values_string = "\n".join(option["value"] for option in userSelectOptions)
  464. if description is not None:
  465. USER_INTERACTIVE_CACHE[wxid] = {
  466. "interactive": True,
  467. "options": userSelectOptions,
  468. }
  469. reply_content = description + '------------------------------\n' + values_string
  470. elif isinstance(reply_content, list) and any(item.get("type") == "text" for item in reply_content):
  471. USER_INTERACTIVE_CACHE[wxid] = {
  472. "interactive": False
  473. }
  474. text = ''
  475. for item in reply_content:
  476. if item["type"] == "text":
  477. text = item["text"]["content"]
  478. if text == '':
  479. # 去除上次上一轮对话再次请求
  480. cache_messages_str = await request.app.state.redis_service.get_hash_field(hash_key, "data")
  481. cache_messages = json.loads(cache_messages_str) if cache_messages_str else []
  482. if len(cache_messages) >= 3:
  483. cache_messages = cache_messages[:-3]
  484. await request.app.state.redis_service.update_hash_field(hash_key, "data", json.dumps(cache_messages, ensure_ascii=False))
  485. messages_to_send = await request.app.state.gewe_service.save_session_messages_to_cache_async(hash_key, prompt)
  486. res = await gpt_client_async(request,messages_to_send, wxid, callback_to_user)
  487. reply_content = remove_markdown_symbol(res["choices"][0]["message"]["content"])
  488. if isinstance(reply_content, list):
  489. reply_content = remove_markdown_symbol(reply_content[0].get('text').get("content"))
  490. else:
  491. reply_content = text
  492. else:
  493. USER_INTERACTIVE_CACHE[wxid] = {
  494. "interactive": False
  495. }
  496. reply_content = remove_markdown_symbol(res["choices"][0]["message"]["content"])
  497. if reply_content == '不回复':
  498. end_time = time.time() # 记录任务结束时间
  499. execution_time = end_time - start_time # 计算执行时间
  500. logger.warning(f"AI回答任务完成,耗时 {execution_time:.2f} 秒,AI回答<不回复>,跳过微信回复")
  501. await request.app.state.gewe_service.save_session_messages_to_cache_async(hash_key, {"role": "assistant", "content": reply_content})
  502. return
  503. # 昵称替换
  504. replacements = {
  505. '{昵称}': wxid_nickname,
  506. '{好友昵称}': callback_to_user_nickname
  507. }
  508. reply_content=replace_placeholders(reply_content, replacements)
  509. # 判断图片url
  510. img_urls,reply_content=extract_and_replace_image_urls(reply_content)
  511. if img_urls:
  512. await request.app.state.gewe_service.post_text_async(token_id, app_id, callback_to_user, reply_content)
  513. await asyncio.sleep(random.uniform(1.5, 3))
  514. for img_url in img_urls:
  515. await request.app.state.gewe_service.post_image_async(token_id, app_id, callback_to_user, img_url)
  516. await asyncio.sleep(random.uniform(1.5, 3))
  517. # 判断视频url
  518. video_urls,reply_content=extract_and_replace_video_urls(reply_content)
  519. if video_urls:
  520. await request.app.state.gewe_service.post_text_async(token_id, app_id, callback_to_user, reply_content)
  521. await asyncio.sleep(random.uniform(1.5, 3))
  522. for video_url in video_urls:
  523. parsed_url = urlparse(video_url)
  524. filename = os.path.basename(parsed_url.path)
  525. tmp_file_path = os.path.join(os.getcwd(),'tmp', filename) # 拼接完整路径
  526. thumbnail_path=tmp_file_path.replace('.mp4','.jpg')
  527. video_thumb_url,video_duration =download_video_and_get_thumbnail(video_url,thumbnail_path)
  528. logger.info(f'{wxid} 视频缩略图 {video_thumb_url} 时长 {video_duration}')
  529. ret,ret_msg,res = await request.app.state.gewe_service.post_video_async(token_id, app_id, callback_to_user, video_url,video_thumb_url,video_duration)
  530. if ret!=200:
  531. logger.warning(f'{wxid} 发送视频{video_url} 到 {callback_to_user} 失败,{ret_msg}')
  532. await asyncio.sleep(random.uniform(1.5, 3))
  533. # 发送AI微信回复
  534. await request.app.state.gewe_service.post_text_async(token_id, app_id, callback_to_user, reply_content)
  535. await request.app.state.gewe_service.save_session_messages_to_cache_async(hash_key, {"role": "assistant", "content": reply_content})
  536. # 回复的对话
  537. input_wx_content_dialogue_message = [{"type": "text", "text": reply_content}]
  538. input_message = dialogue_message(wxid, callback_to_user, input_wx_content_dialogue_message, True)
  539. await request.app.state.kafka_service.send_message_async(input_message)
  540. logger.info("发送对话 %s", input_message)
  541. end_time = time.time() # 记录任务结束时间
  542. execution_time = end_time - start_time # 计算执行时间
  543. logger.info(f"AI回答任务完成,耗时 {execution_time:.2f} 秒")
  544. async def handle_text_group_async(request: Request,token_id,app_id, wxid,msg_data,from_wxid, to_wxid):
  545. '''
  546. 群聊文本消息
  547. '''
  548. msg_content=msg_data["Content"]["string"]
  549. msg_push_content=msg_data.get("PushContent")
  550. k,login_info=await request.app.state.gewe_service.get_login_info_by_app_id_async(app_id)
  551. nickname=login_info.get("nickName")
  552. config=await request.app.state.gewe_service.get_wxchat_config_from_cache_async(wxid)
  553. validated_config = AgentConfig.model_validate(config)
  554. if not validated_config.privateGroupChatEnabled:
  555. logger.warning(f"微信号 {wxid} 关闭好友和群自动回复功能,{handle_text_group_async.__name__} 不回复消息")
  556. return
  557. if wxid == from_wxid: #手动发送消息
  558. logger.info("Active message sending detected")
  559. await request.app.state.gewe_service.save_contacts_brief_to_cache_async(token_id,app_id,wxid,[to_wxid])
  560. callback_to_user=msg_data["ToUserName"]["string"]
  561. input_wx_content_dialogue_message=[{"type": "text", "text": msg_content}]
  562. input_message=dialogue_message(from_wxid,to_wxid,input_wx_content_dialogue_message)
  563. await request.app.state.kafka_service.send_message_async(input_message)
  564. logger.info("发送对话 %s",input_message)
  565. else:
  566. c = await request.app.state.gewe_service.get_wxchat_config_from_cache_async(wxid)
  567. chatroom_id_white_list = c.get("chatroomIdWhiteList", [])
  568. if not chatroom_id_white_list:
  569. logger.info('白名单为空或未定义,不处理')
  570. return
  571. if from_wxid not in chatroom_id_white_list:
  572. logger.info(f'群ID {from_wxid} 不在白名单中,不处理')
  573. return
  574. if '在群聊中@了你' in msg_push_content or '@'+nickname in msg_push_content:
  575. callback_to_user=msg_data["FromUserName"]["string"]
  576. hash_key = f'__AI_OPS_WX__:MESSAGES:{wxid}:{callback_to_user}'
  577. prompt={"role": "user", "content": [{
  578. "type": "text",
  579. "text": msg_content
  580. }]}
  581. messages_to_send=await request.app.state.gewe_service.save_session_messages_to_cache_async(hash_key, prompt)
  582. # 收到的对话
  583. input_wx_content_dialogue_message=[{"type": "text", "text": msg_content}]
  584. input_message=dialogue_message(callback_to_user,wxid,input_wx_content_dialogue_message)
  585. await request.app.state.kafka_service.send_message_async(input_message)
  586. logger.info("发送对话 %s",input_message)
  587. cache_data = USER_INTERACTIVE_CACHE.get(wxid)
  588. if cache_data and cache_data.get('interactive') :
  589. o=get_first_char_if_digit(msg_content)
  590. if o is not None:
  591. userSelectOptions=cache_data.get('options')
  592. if o < len(userSelectOptions):
  593. o=o-1
  594. msg_content=userSelectOptions[o].get("value")
  595. messages_to_send=[{"role": "user", "content": msg_content}]
  596. else:
  597. messages_to_send=[{"role": "user", "content": msg_content}]
  598. else:
  599. messages_to_send=[{"role": "user", "content": msg_content}]
  600. res=await gpt_client_async(request,messages_to_send,wxid,callback_to_user)
  601. reply_content=remove_markdown_symbol(res["choices"][0]["message"]["content"])
  602. description = ''
  603. userSelectOptions = []
  604. if isinstance(reply_content, list) and any(item.get("type") == "interactive" for item in reply_content):
  605. for item in reply_content:
  606. if item["type"] == "interactive" and item["interactive"]["type"] == "userSelect":
  607. params = item["interactive"]["params"]
  608. description = params.get("description")
  609. userSelectOptions = params.get("userSelectOptions", [])
  610. values_string = "\n".join(option["value"] for option in userSelectOptions)
  611. if description is not None:
  612. USER_INTERACTIVE_CACHE[wxid] = {
  613. "interactive":True,
  614. "options": userSelectOptions,
  615. }
  616. reply_content=description + '------------------------------\n'+values_string
  617. elif isinstance(reply_content, list) and any(item.get("type") == "text" for item in reply_content):
  618. USER_INTERACTIVE_CACHE[wxid] = {
  619. "interactive":False
  620. }
  621. text=''
  622. for item in reply_content:
  623. if item["type"] == "text":
  624. text=item["text"]["content"]
  625. if text=='':
  626. # 去除上次上一轮对话再次请求
  627. cache_messages_str=await request.app.state.redis_service.get_hash_field(hash_key,"data")
  628. cache_messages = json.loads(cache_messages_str) if cache_messages_str else []
  629. if len(cache_messages) >= 3:
  630. cache_messages = cache_messages[:-3]
  631. await request.app.state.redis_service.update_hash_field(hash_key,"data",json.dumps(cache_messages,ensure_ascii=False))
  632. messages_to_send=await request.app.state.gewe_service.save_session_messages_to_cache_async(hash_key, prompt)
  633. res=await gpt_client_async(request,messages_to_send,wxid,callback_to_user)
  634. reply_content=remove_markdown_symbol(res["choices"][0]["message"]["content"])
  635. else:
  636. reply_content=text
  637. else:
  638. USER_INTERACTIVE_CACHE[wxid] = {
  639. "interactive":False
  640. }
  641. reply_content=res["choices"][0]["message"]["content"]
  642. reply_content='@'+extract_nickname(msg_push_content) + reply_content
  643. await request.app.state.gewe_service.post_text_async(token_id,app_id,callback_to_user,reply_content)
  644. await request.app.state.gewe_service.save_session_messages_to_cache_async(hash_key, {"role": "assistant", "content": reply_content})
  645. # 回复的对话
  646. reply_content=f'{wxid}:\n'+ reply_content
  647. input_wx_content_dialogue_message=[{"type": "text", "text": reply_content}]
  648. input_message=dialogue_message(wxid,callback_to_user,input_wx_content_dialogue_message,True)
  649. await request.app.state.kafka_service.send_message_async(input_message)
  650. logger.info("发送对话 %s",input_message)
  651. else:
  652. logger.info('群聊公开消息')
  653. callback_to_user=msg_data["FromUserName"]["string"]
  654. group_dialogue_message=[{"type": "text", "text": msg_content}]
  655. input_message=dialogue_message(callback_to_user,wxid,group_dialogue_message)
  656. await request.app.state.kafka_service.send_message_async(input_message)
  657. logger.info("发送对话 %s",input_message)
  658. return
  659. async def handle_image_async(request: Request,token_id,app_id, wxid,msg_data,from_wxid, to_wxid):
  660. '''
  661. 私聊图片消息
  662. '''
  663. msg_content=msg_data["Content"]["string"]
  664. callback_to_user=from_wxid
  665. hash_key = f'__AI_OPS_WX__:MESSAGES:{wxid}:{callback_to_user}'
  666. wx_img_url= await request.app.state.gewe_service.download_image_msg_async(token_id,app_id,msg_content)
  667. if not wx_img_url:
  668. logger.warning(f'{wxid} 下载 {callback_to_user} 图片失败')
  669. return
  670. oss_access_key_id="LTAI5tRTG6pLhTpKACJYoPR5"
  671. oss_access_key_secret="E7dMzeeMxq4VQvLg7Tq7uKf3XWpYfN"
  672. oss_endpoint="http://oss-cn-shanghai.aliyuncs.com"
  673. oss_bucket_name="cow-agent"
  674. oss_prefix="cow"
  675. img_url=upload_oss(oss_access_key_id, oss_access_key_secret, oss_endpoint, oss_bucket_name, wx_img_url, oss_prefix)
  676. prompt={
  677. "role": "user",
  678. "content": [{
  679. "type": "image_url",
  680. "image_url": {"url": img_url}
  681. }]
  682. }
  683. await request.app.state.gewe_service.save_session_messages_to_cache_async(hash_key, prompt)
  684. await request.app.state.gewe_service.post_text_async(token_id,app_id,callback_to_user,'已经上传了图片,有什么可以为您服务')
  685. logger.info(f"上传图片 URL: {img_url}")
  686. wx_content_dialogue_message=[{"type": "image_url", "image_url": {"url": img_url}}]
  687. input_message=dialogue_message(wxid,callback_to_user,wx_content_dialogue_message)
  688. await request.app.state.kafka_service.send_message_async(input_message)
  689. logger.info("发送对话 %s",input_message)
  690. async def handle_image_group_async(request: Request,token_id,app_id, wxid,msg_data,from_wxid, to_wxid):
  691. logger.info('群聊图片消息')
  692. msg_content=msg_data["Content"]["string"]
  693. callback_to_user=msg_data["FromUserName"]["string"]
  694. aeskey = re.search(r'aeskey="([^"]+)"', msg_content).group(1)
  695. cdnthumburl = re.search(r'cdnthumburl="([^"]+)"', msg_content).group(1)
  696. md5 = re.search(r'md5="([^"]+)"', msg_content).group(1)
  697. cdnthumblength = re.search(r'cdnthumblength="([^"]+)"', msg_content).group(1)
  698. cdnthumbheight = re.search(r'cdnthumbheight="([^"]+)"', msg_content).group(1)
  699. cdnthumbwidth = re.search(r'cdnthumbwidth="([^"]+)"', msg_content).group(1)
  700. length = re.search(r'length="([^"]+)"', msg_content).group(1)
  701. img_xml=f'<?xml version=\"1.0\"?>\n<msg>\n\t<img aeskey=\"{aeskey}\" encryver=\"1\" cdnthumbaeskey=\"{aeskey}\" cdnthumburl=\"{cdnthumburl}\" cdnthumblength=\"{cdnthumblength}\" cdnthumbheight=\"{cdnthumbheight}\" cdnthumbwidth=\"{cdnthumbwidth}\" cdnmidheight=\"0\" cdnmidwidth=\"0\" cdnhdheight=\"0\" cdnhdwidth=\"0\" cdnmidimgurl=\"{cdnthumburl}\" length=\"{length}\" md5=\"{md5}\" />\n\t<platform_signature></platform_signature>\n\t<imgdatahash></imgdatahash>\n</msg>'
  702. wx_img_url= await request.app.state.gewe_service.download_image_msg_async(token_id,app_id,img_xml)
  703. if not wx_img_url:
  704. logger.warning(f'{wxid} 下载 {callback_to_user} 图片失败')
  705. return
  706. oss_url=wx_img_url_to_oss_url(wx_img_url)
  707. reply_content = re.sub(r'<\?xml.*', f'{oss_url}', msg_content, flags=re.DOTALL)
  708. #reply_content=f'{wxid}:\n{oss_url}'
  709. input_wx_content_dialogue_message=[{"type": "text", "text": reply_content}]
  710. input_message=dialogue_message(callback_to_user,wxid,input_wx_content_dialogue_message,False)
  711. await request.app.state.kafka_service.send_message_async(input_message)
  712. logger.info("发送对话 %s",input_message)
  713. async def handle_voice_async(request: Request,token_id,app_id, wxid,msg_data,from_wxid, to_wxid):
  714. '''
  715. 私聊语音消息
  716. '''
  717. callback_to_user=from_wxid
  718. msg_content=msg_data["Content"]["string"]
  719. msg_id=msg_data["MsgId"]
  720. config=await request.app.state.gewe_service.get_wxchat_config_from_cache_async(wxid)
  721. validated_config = AgentConfig.model_validate(config)
  722. if not validated_config.privateGroupChatEnabled:
  723. logger.warning(f"微信号 {wxid} 关闭好友和群自动回复功能,{handle_voice_async.__name__} 不回复消息")
  724. return
  725. file_url=await request.app.state.gewe_service.download_audio_msg_async(token_id,app_id,msg_id,msg_content)
  726. react_silk_path=await save_to_local_from_url_async(file_url)
  727. react_wav_path = os.path.splitext(react_silk_path)[0] + ".wav"
  728. audio_convert.any_to_wav(react_silk_path,react_wav_path)
  729. react_voice_text=AliVoice().voiceToText(react_wav_path)
  730. os.remove(react_silk_path)
  731. os.remove(react_wav_path)
  732. hash_key = f'__AI_OPS_WX__:MESSAGES:{wxid}:{callback_to_user}'
  733. messages=await request.app.state.gewe_service.save_session_messages_to_cache_async(hash_key, {"role": "user", "content": react_voice_text})
  734. ai_res=await gpt_client_async(request,messages,wxid,callback_to_user)
  735. ai_res_content=remove_markdown_symbol(ai_res["choices"][0]["message"]["content"])
  736. has_url=contains_url(ai_res_content)
  737. if not has_url:
  738. voice_during,voice_url=wx_voice(ai_res_content)
  739. if voice_during < 60 * 1000:
  740. ret,ret_msg,res=await request.app.state.gewe_service.post_voice_async(token_id,app_id,callback_to_user,voice_url,voice_during)
  741. else:
  742. ret,ret_msg,res=await request.app.state.gewe_service.post_text_async(token_id,app_id,callback_to_user,ai_res_content)
  743. logger.warning(f'回应语音消息长度 {voice_during/1000}秒,超过60秒,转为文本回复')
  744. if ret==200:
  745. logger.info((f'{wxid} 向 {callback_to_user} 发送语音文本【{ai_res_content}】{ret_msg}'))
  746. else:
  747. logger.warning((f'{wxid} 向 {callback_to_user} 发送语音文本【{ai_res_content}】{ret_msg}'))
  748. ret,ret_msg,res==await request.app.state.gewe_service.post_text_async(token_id,app_id,callback_to_user,ai_res_content)
  749. logger.info((f'{wxid} 向 {callback_to_user} 发送文本【{ai_res_content}】{ret_msg}'))
  750. else:
  751. logger.info(f"回复内容包含网址,不发送语音,回复文字内容:{ai_res_content}")
  752. ret,ret_msg,res=await request.app.state.gewe_service.post_text_async(token_id,app_id,callback_to_user,ai_res_content)
  753. await request.app.state.gewe_service.save_session_messages_to_cache_async(hash_key, {"role": "assistant", "content": ai_res_content})
  754. # 构造对话消息并发送到 Kafka
  755. input_wx_content_dialogue_message = [{"type": "text", "text": ai_res_content}]
  756. input_message = dialogue_message(wxid, callback_to_user, input_wx_content_dialogue_message,True)
  757. await request.app.state.kafka_service.send_message_async(input_message)
  758. logger.info("发送对话 %s", input_message)
  759. async def handle_voice_group_async(request: Request,token_id,app_id, wxid,msg_data,from_wxid, to_wxid):
  760. logger.info('群聊语音消息')
  761. async def handle_name_card_async(request: Request,token_id,app_id, wxid,msg_data,from_wxid, to_wxid):
  762. logger.info('名片消息')
  763. try:
  764. msg_content_xml=msg_data["Content"]["string"]
  765. # 解析XML字符串
  766. root = ET.fromstring(msg_content_xml)
  767. # 提取alias属性
  768. alias_value = root.get("alias")
  769. # 加好友资料
  770. scene = int(root.get("scene"))
  771. v3 = root.get("username")
  772. v4 = root.get("antispamticket")
  773. logger.info(f"alias_value: {alias_value}, scene: {scene}, v3: {v3}, v4: {v4}")
  774. # 判断appid 是否已经创建3天
  775. k, login_info = await request.app.state.gewe_service.get_login_info_by_wxid_async(wxid)
  776. creation_timestamp=int(login_info.get('create_at',time.time()))
  777. current_timestamp = time.time()
  778. three_days_seconds = 3 * 24 * 60 * 60 # 三天的秒数
  779. diff_flag=(current_timestamp - creation_timestamp) >= three_days_seconds
  780. if not diff_flag:
  781. log_content=f'名片添加好友功能,{wxid} 用户创建不够三天,不能使用该功能'
  782. logger.warning(log_content)
  783. return
  784. # 将加好友资料添加到待加好友队列
  785. #gewe_chat.wxchat.enqueue_to_add_contacts(wxid,scene,v3,v4)
  786. _,loginfo=await request.app.state.gewe_service.get_login_info_by_wxid_async(wxid)
  787. nickname=loginfo.get('nickName')
  788. add_contact_content=f'您好,我是{nickname}'
  789. #gewe_chat.wxchat.add_contacts(token_id,app_id,scene,Models.OperationType.ADD_FRIEND,v3,v4,add_contact_content)
  790. await request.app.state.gewe_service.add_contacts_async(token_id,app_id,scene,OperationType.ADD_FRIEND.value,v3,v4,add_contact_content)
  791. except ET.ParseError as e:
  792. logger.error(f"XML解析错误: {e}")
  793. except KeyError as e:
  794. logger.error(f"字典键错误: {e}")
  795. except Exception as e:
  796. logger.error(f"未知错误: {e}")
  797. async def handle_xml_async(request: Request,token_id,app_id, wxid,msg_data,from_wxid, to_wxid):
  798. '''
  799. 处理xml
  800. '''
  801. try:
  802. msg_content_xml=msg_data["Content"]["string"]
  803. root = ET.fromstring(msg_content_xml)
  804. type_value = int(root.find(".//appmsg/type").text)
  805. handlers = {
  806. 57: handle_xml_reference_async,
  807. 5: handle_xml_invite_group_async
  808. }
  809. handler = handlers.get(type_value)
  810. if handler:
  811. return await handler(request,token_id,app_id, wxid,msg_data,from_wxid, to_wxid)
  812. # elif "邀请你加入了群聊" in msg_content_xml: # 邀请加入群聊
  813. # logger.warning(f"xml消息 {type_value} 邀请你加入了群聊.todo")
  814. else:
  815. print(f"xml消息 {type_value} 未解析")
  816. except ET.ParseError as e:
  817. logger.error(f"解析XML失败: {e}")
  818. except Exception as e:
  819. logger.error(f"未知错误: {e}")
  820. return
  821. async def handle_xml_reference_async(request: Request,token_id,app_id, wxid,msg_data,from_wxid, to_wxid):
  822. '''
  823. 引用消息
  824. 判断此类消息的逻辑:$.Data.MsgType=49 并且 解析$.Data.Content.string中的xml msg.appmsg.type=57
  825. '''
  826. callback_to_user=from_wxid
  827. hash_key = f'__AI_OPS_WX__:MESSAGES:{wxid}:{callback_to_user}'
  828. msg_content= msg_data["PushContent"]
  829. prompt={"role": "user", "content": [{
  830. "type": "text",
  831. "text": msg_content
  832. }]}
  833. # 收到的对话
  834. messages_to_send=await request.app.state.gewe_service.save_session_messages_to_cache_async(hash_key, prompt)
  835. input_wx_content_dialogue_message=[{"type": "text", "text": msg_content}]
  836. input_message=dialogue_message(callback_to_user,wxid,input_wx_content_dialogue_message)
  837. await request.app.state.kafka_service.send_message_async(input_message)
  838. logger.info("发送对话 %s",input_message)
  839. # 回复的对话
  840. res=await gpt_client_async(request,messages_to_send,wxid,callback_to_user)
  841. reply_content=remove_markdown_symbol(res["choices"][0]["message"]["content"])
  842. input_wx_content_dialogue_message=[{"type": "text", "text": reply_content}]
  843. input_message=dialogue_message(wxid,callback_to_user,input_wx_content_dialogue_message,True)
  844. await request.app.state.kafka_service.kafka_client.produce_message(input_message)
  845. logger.info("发送对话 %s",input_message)
  846. await request.app.state.gewe_service.save_session_messages_to_cache_async(hash_key, {"role": "assistant", "content": reply_content})
  847. await request.app.state.gewe_service.post_text_async(token_id,app_id,callback_to_user,reply_content)
  848. async def handle_xml_invite_group_async (request: Request,token_id,app_id, wxid,msg_data,from_wxid, to_wxid):
  849. '''
  850. 群聊邀请
  851. 判断此类消息的逻辑:$.Data.MsgType=49
  852. 并且 解析$.Data.Content.string中的xml msg.appmsg.title=邀请你加入群聊(根据手机设置的系统语言title会有调整,不同语言关键字不同)
  853. '''
  854. logger.info(f'{wxid} 群聊邀请')
  855. msg_content_xml=msg_data["Content"]["string"]
  856. root = ET.fromstring(msg_content_xml)
  857. title_value = root.find(".//appmsg/title").text
  858. if '邀请你加入群聊' in title_value:
  859. invite_url = root.find('.//url').text
  860. ret,msg,data=await request.app.state.gewe_service.agree_join_room_async(token_id,app_id,invite_url)
  861. if ret==200:
  862. logger.info(f'群聊邀请,同意加入群聊 {msg} {data}')
  863. chatroom_id=data.get('chatroomId','')
  864. # if not chatroom_id:
  865. # logger.warning(f'群聊邀请,同意加入群聊失败 {msg} {data}')
  866. # return
  867. ret,msg,data=await request.app.state.gewe_service.save_contract_list_async(token_id,app_id,chatroom_id,3)
  868. logger.info(f'群聊邀请,保存到通讯录 chatroom_id {chatroom_id} {msg}')
  869. await request.app.state.gewe_service.update_group_info_to_cache_async(token_id,app_id,wxid,chatroom_id)
  870. await request.app.state.gewe_service.update_group_members_to_cache_async(token_id,app_id,wxid,chatroom_id)
  871. # # 单个群信息推送到kafka
  872. # group_info_members=await request.app.state.gewe_service.get_group_info_members_from_cache_async(wxid,chatroom_id)
  873. # k_message=wx_mod_group_info_members_message(wxid,group_info_members)
  874. # 全量群信息推送到kafka
  875. #all_groups_info_menbers=await request.app.state.gewe_service.get_groups_info_members_from_cache_async(wxid)
  876. #k_message=wx_groups_info_members_message(wxid,all_groups_info_menbers)
  877. k_message=wx_groups_info_members_key_message(wxid)
  878. await request.app.state.kafka_service.send_message_async(k_message)
  879. else:
  880. logger.warning(f'群聊邀请,同意加入群聊失败 {msg} {data}')
  881. async def handle_add_friend_notice_async(request: Request,token_id,app_id, wxid,msg_data,from_wxid, to_wxid):
  882. '''
  883. 好友添加请求通知
  884. '''
  885. logger.info('好友添加请求通知')
  886. try:
  887. msg_content_xml=msg_data["Content"]["string"]
  888. root = ET.fromstring(msg_content_xml)
  889. msg_content = root.attrib.get('content', None)
  890. v3= root.attrib.get('encryptusername', None)
  891. v4= root.attrib.get('ticket', None)
  892. scene=root.attrib.get('scene', None)
  893. to_contact_wxid=root.attrib.get('fromusername', None)
  894. wxid=msg_data["ToUserName"]["string"]
  895. # 自动同意好友
  896. # print(v3)
  897. # print(v4)
  898. # print(scene)
  899. # print(msg_content)
  900. # 操作类型,2添加好友 3同意好友 4拒绝好友
  901. #option=2
  902. option=3
  903. reply_add_contact_contact="亲,我是你的好友"
  904. ret,ret_msg=await request.app.state.gewe_service.add_contacts_async(token_id,app_id,scene,option,v3,v4,reply_add_contact_contact)
  905. if ret==200:
  906. logger.info('自动添加好友成功')
  907. # 好友发送的文字
  908. hash_key = f'__AI_OPS_WX__:MESSAGES:{wxid}:{to_contact_wxid}'
  909. prompt={"role": "user", "content": [{"type": "text","text": msg_content}]}
  910. messages_to_send=await request.app.state.gewe_service.save_session_messages_to_cache_async(hash_key, prompt)
  911. input_wx_content_dialogue_message=[{"type": "text", "text": msg_content}]
  912. input_message=dialogue_message(to_contact_wxid,wxid,input_wx_content_dialogue_message)
  913. await request.app.state.kafka_service.send_message_async(input_message)
  914. logger.info("发送对话 %s",input_message)
  915. callback_to_user=to_contact_wxid
  916. res=await gpt_client_async(messages_to_send,wxid,callback_to_user)
  917. reply_content=remove_markdown_symbol(res["choices"][0]["message"]["content"])
  918. #保存好友信息
  919. await request.app.state.gewe_service.save_contacts_brief_to_cache_async(token_id,app_id, wxid,[to_contact_wxid])
  920. # 保存到缓存
  921. await request.app.state.gewe_service.save_session_messages_to_cache_async(hash_key, {"role": "assistant", "content": reply_content})
  922. # 发送信息
  923. await request.app.state.gewe_service.post_text_async(token_id,app_id, to_contact_wxid,reply_content)
  924. # 发送到kafka
  925. input_wx_content_dialogue_message=[{"type": "text", "text": reply_content}]
  926. input_message=dialogue_message(wxid,to_contact_wxid,input_wx_content_dialogue_message,True)
  927. request.app.state.kafka_service.send_message_async(input_message)
  928. logger.info("发送对话 %s",input_message)
  929. else:
  930. logger.warning("添加好友失败")
  931. except ET.ParseError as e:
  932. logger.error(f"解析XML失败: {e}")
  933. except Exception as e:
  934. logger.error(f"未知错误: {e}")
  935. return
  936. async def handle_10002_msg_async(request: Request,token_id,app_id, wxid,msg_data,from_wxid, to_wxid):
  937. '''
  938. 群聊邀请
  939. 撤回消息
  940. 拍一拍消息
  941. 地理位置
  942. 踢出群聊通知
  943. 解散群聊通知
  944. 发布群公告
  945. '''
  946. try:
  947. msg_content_xml=msg_data["Content"]["string"]
  948. # 群聊邀请
  949. if '邀请你加入了群聊' in msg_content_xml and check_chatroom(msg_data["FromUserName"]["string"]):
  950. chatroom_id=msg_data["FromUserName"]["string"]
  951. ret,msg,data=await request.app.state.gewe_service.save_contract_list_async(token_id,app_id,chatroom_id,3)
  952. logger.info(f'群聊邀请,保存到通讯录 chatroom_id {chatroom_id} {msg}')
  953. await request.app.state.gewe_service.update_group_info_to_cache_async(token_id,app_id,wxid,chatroom_id)
  954. await request.app.state.gewe_service.update_group_members_to_cache_async(token_id,app_id,wxid,chatroom_id)
  955. # #推送群资料到kafka
  956. # group_info_members=await request.app.state.gewe_service.get_group_info_members_from_cache_async(wxid,chatroom_id)
  957. # k_message=wx_mod_group_info_members_message(wxid,group_info_members)
  958. # await request.app.state.kafka_service.send_message_async(k_message)
  959. # 全量群信息推送到kafka
  960. #all_groups_info_menbers=await request.app.state.gewe_service.get_groups_info_members_from_cache_async(wxid)
  961. #k_message=wx_groups_info_members_message(wxid,all_groups_info_menbers)
  962. k_message=wx_groups_info_members_key_message(wxid)
  963. await request.app.state.kafka_service.send_message_async(k_message)
  964. if '移出了群聊' in msg_content_xml and 'sysmsgtemplate' in msg_content_xml :
  965. chatroom_id=msg_data["FromUserName"]["string"]
  966. ret,msg,data=await request.app.state.gewe_service.save_contract_list_async(token_id,app_id,chatroom_id,2)
  967. logger.info(f'移出群聊,移除从通讯录 chatroom_id {chatroom_id} {msg}')
  968. await request.app.state.redis_service.delete_hash_field(f'__AI_OPS_WX__:GROUPS_INFO:{wxid}',chatroom_id)
  969. await request.app.state.redis_service.delete_hash_field(f'__AI_OPS_WX__:GROUPS_MEMBERS:{wxid}',chatroom_id)
  970. logger.info(f'清除 chatroom_id{chatroom_id} 数据')
  971. # 推送删除群资料到kafka
  972. k_message=wx_del_group_message(wxid,chatroom_id)
  973. await request.app.state.kafka_service.send_message_async(k_message)
  974. if '已解散该群聊' in msg_content_xml and 'sysmsgtemplate' in msg_content_xml :
  975. chatroom_id=msg_data["FromUserName"]["string"]
  976. ret,msg,data=await request.app.state.gewe_service.save_contract_list_async(token_id,app_id,chatroom_id,2)
  977. logger.info(f'解散群聊,移除从通讯录 chatroom_id {chatroom_id} {msg}')
  978. await request.app.state.redis_service.delete_hash_field(f'__AI_OPS_WX__:GROUPS_INFO:{wxid}',chatroom_id)
  979. await request.app.state.redis_service.delete_hash_field(f'__AI_OPS_WX__:GROUPS_MEMBERS:{wxid}',chatroom_id)
  980. logger.info(f'清除 chatroom_id{chatroom_id} 数据')
  981. # 推送删除群资料到kafka
  982. k_message=wx_del_group_message(wxid,chatroom_id)
  983. await request.app.state.kafka_service.send_message_async(k_message)
  984. if 'mmchatroombarannouncememt' in msg_content_xml :
  985. chatroom_id=msg_data["FromUserName"]["string"]
  986. logger.info(f'发布群公告 chatroom_id {chatroom_id}')
  987. await request.app.state.gewe_service.update_group_info_to_cache_async(token_id,app_id,wxid,chatroom_id)
  988. await request.app.state.gewe_service.update_group_members_to_cache_async(token_id,app_id,wxid,chatroom_id)
  989. # #推送群资料到kafka
  990. # group_info_members=await request.app.state.gewe_service.get_group_info_members_from_cache_async(wxid,chatroom_id)
  991. # k_message=wx_mod_group_info_members_message(wxid,group_info_members)
  992. # await request.app.state.kafka_service.send_message_async(k_message)、
  993. # 全量群信息推送到kafka
  994. #all_groups_info_menbers=await request.app.state.gewe_service.get_groups_info_members_from_cache_async(wxid)
  995. #k_message=wx_groups_info_members_message(wxid,all_groups_info_menbers)
  996. k_message=wx_groups_info_members_key_message(wxid)
  997. await request.app.state.kafka_service.send_message_async(k_message)
  998. if 'roomtoolstips' in msg_content_xml :
  999. chatroom_id=msg_data["FromUserName"]["string"]
  1000. logger.info(f'群待办 chatroom_id {chatroom_id} ')
  1001. await request.app.state.gewe_service.update_group_info_to_cache_async(token_id,app_id,wxid,chatroom_id)
  1002. await request.app.state.gewe_service.update_group_members_to_cache_async(token_id,app_id,wxid,chatroom_id)
  1003. # #推送群资料到kafka
  1004. # group_info_members=await request.app.state.gewe_service.get_group_info_members_from_cache_async(wxid,chatroom_id)
  1005. # k_message=wx_mod_group_info_members_message(wxid,group_info_members)
  1006. # await request.app.state.kafka_service.send_message_async(k_message)
  1007. # 全量群信息推送到kafka
  1008. #all_groups_info_menbers=await request.app.state.gewe_service.get_groups_info_members_from_cache_async(wxid)
  1009. #k_message=wx_groups_info_members_message(wxid,all_groups_info_menbers)
  1010. k_message=wx_groups_info_members_key_message(wxid)
  1011. await request.app.state.kafka_service.send_message_async(k_message)
  1012. except ET.ParseError as e:
  1013. logger.error(f"解析XML失败: {e}")
  1014. except Exception as e:
  1015. logger.error(f"未知错误: {e}")
  1016. return
  1017. async def handle_10000_msg_async(request: Request,token_id,app_id, wxid,msg_data,from_wxid, to_wxid):
  1018. '''
  1019. 修改群名称
  1020. 更换群主通知
  1021. 被移除群聊通知
  1022. '''
  1023. content=msg_data.get("Content","").get("string","")
  1024. if '修改群名' or '新群主' or '被移除群聊通知' in content and check_chatroom(from_wxid):
  1025. chatroom_id=from_wxid
  1026. logger.info(f'{content} chatroom_id {chatroom_id} ')
  1027. await request.app.state.gewe_service.update_group_info_to_cache_async(token_id,app_id,wxid,chatroom_id)
  1028. await request.app.state.gewe_service.update_group_members_to_cache_async(token_id,app_id,wxid,chatroom_id)
  1029. # group_info_members=await request.app.state.gewe_service.get_group_info_members_from_cache_async(wxid,chatroom_id)
  1030. # k_message=wx_mod_group_info_members_message(wxid,group_info_members)
  1031. # await request.app.state.kafka_service.send_message_async(k_message)
  1032. # 全量群信息推送到kafka
  1033. #all_groups_info_menbers=await request.app.state.gewe_service.get_groups_info_members_from_cache_async(wxid)
  1034. #k_message=wx_groups_info_members_message(wxid,all_groups_info_menbers)
  1035. k_message=wx_groups_info_members_key_message(wxid)
  1036. await request.app.state.kafka_service.send_message_async(k_message)
  1037. return