Vous ne pouvez pas sélectionner plus de 25 sujets. Les noms de sujets doivent commencer par une lettre ou un nombre, peuvent contenir des tirets ('-') et peuvent comporter jusqu'à 35 caractères.

935 lignes
44KB

  1. from voice.ali.ali_voice import AliVoice
  2. from common.log import logger
  3. import xml.etree.ElementTree as ET
  4. import os,json,asyncio,aiohttp
  5. from voice import audio_convert
  6. from fastapi import APIRouter,Request
  7. from pydantic import BaseModel
  8. from fastapi import APIRouter, Depends
  9. from typing import Dict, Any
  10. from model.models import AgentConfig,OperationType
  11. from common.utils import *
  12. from common.memory import *
  13. timeout_duration = 2.0
  14. messages_router = APIRouter()
  15. WX_BACKLIST=['fmessage', 'medianote','weixin','weixingongzhong','tmessage']
  16. @messages_router.post("/messages",response_model=None)
  17. async def get_chatroominfo(request: Request, body: Dict[str, Any]):
  18. logger.info(f"收到微信回调消息: {json.dumps(msg, separators=(',', ':'),ensure_ascii=False)}")
  19. try:
  20. msg = body
  21. type_name =msg.get("TypeName")
  22. app_id = msg.get("Appid")
  23. k, loginfo = await request.app.state.gewe_service.get_login_info_by_app_id_async(app_id)
  24. if not k:
  25. logger.warning('找不到登录信息,不处理')
  26. return {"message": f"收到微信回调消息: {type_name}"}
  27. token_id=loginfo.get('tokenId','')
  28. wxid = msg.get("Wxid",'')
  29. if type_name == 'AddMsg':
  30. await handle_self_cmd_async(request,wxid,msg)
  31. msg_data = msg.get("Data")
  32. from_wxid = msg_data["FromUserName"]["string"]
  33. config=await request.app.state.redis_service.get_hash(f"__AI_OPS_WX__:WXCHAT_CONFIG")
  34. wxids=config.keys()
  35. WX_BACKLIST.extend(wxids)
  36. if from_wxid in WX_BACKLIST:
  37. logger.warning(f'微信ID {wxid} 在黑名单,不处理')
  38. return {"message": "收到微信回调消息"}
  39. await handle_messages_async(request,token_id,msg)
  40. return {"message": "收到微信回调消息,处理完成"}
  41. except Exception as e:
  42. logger.error(f"无法解析微信回调消息: {body} {e}")
  43. return {"message": "无法解析微信回调消息"}
  44. async def handle_self_cmd_async(request: Request,wxid,msg):
  45. '''
  46. 个人微信命令处理
  47. 如果是个人微信的指令,wxid == from_wxid
  48. commands = {
  49. '启用托管': True,
  50. '关闭托管': False
  51. }
  52. '''
  53. msg_data=msg.get("Data")
  54. from_wxid=msg_data["FromUserName"]["string"]
  55. msg_content=msg_data["Content"]["string"]
  56. if wxid == from_wxid:
  57. commands = {
  58. '启用托管': True,
  59. '关闭托管': False
  60. }
  61. if msg_content in commands:
  62. c_dict = await request.app.state.gewe_service.get_wxchat_config_from_cache_async(wxid)
  63. if c_dict:
  64. agent_config=AgentConfig.model_validate(c_dict)
  65. agent_config.agentEnabled=commands[msg_content]
  66. await request.app.state.gewe_service.wxchat.save_wxchat_config_async(wxid, agent_config.model_dump())
  67. logger.info(f'{wxid} {"启动" if commands[msg_content] else "关闭"}托管')
  68. async def handle_messages_async(request: Request,token_id,msg):
  69. #msg_data=msg.get("Data")
  70. type_name =msg.get("TypeName")
  71. # app_id = msg.get("Appid")
  72. # from_wxid=msg_data["FromUserName"]["string"]
  73. # msg_content=msg_data["Content"]["string"]
  74. wxid = msg.get("Wxid",'')
  75. match type_name:
  76. case 'AddMsg':
  77. await handle_add_messages_async(request,token_id,msg,wxid)
  78. case 'ModContacts':
  79. await handle_mod_contacts_async(request,token_id,msg,wxid)
  80. case 'DelContacts':
  81. await handle_del_contacts_async(request,token_id,msg,wxid)
  82. case 'Offline':
  83. await handle_offline_async(request,token_id,msg,wxid)
  84. case _:
  85. logger.warning(f'未知消息类型:{type_name}')
  86. async def gpt_client_async(request,messages: list, wixd: str, friend_wxid: str):
  87. c = await request.app.state.gewe_service.get_wxchat_config_from_cache_async(wixd)
  88. api_key = c.get('agentTokenId', "sk-jr69ONIehfGKe9JFphuNk4DU5Y5wooHKHhQv7oSnFzVbwCnW65fXO9kvH")
  89. print(f'流程key:{api_key}\n')
  90. #api_key="sk-jr69ONIehfGKe9JFphuNk4DU5Y5wooHKHhQv7oSnFzVbwCnW65fXO9kvH" #测试
  91. #api_key="sk-uJDBdKmJVb2cmfldGOvlIY6Qx0AzqWMPD3lS1IzgQYzHNOXv9SKNI" #开发2
  92. api_url = "http://106.15.182.218:3000/api/v1/chat/completions"
  93. headers = {
  94. "Content-Type": "application/json",
  95. "Authorization": f"Bearer {api_key}"
  96. }
  97. session_id = f'{wixd}-{friend_wxid}'
  98. data = {
  99. "model": "",
  100. "messages": messages,
  101. "chatId": session_id,
  102. "detail": True
  103. }
  104. logger.info("[CHATGPT] 请求={}".format(json.dumps(data, ensure_ascii=False)))
  105. async with aiohttp.ClientSession() as session:
  106. try:
  107. async with session.post(url=api_url, headers=headers, data=json.dumps(data), timeout=600) as response:
  108. response.raise_for_status()
  109. response_data = await response.json()
  110. logger.info("[CHATGPT] 响应={}".format(json.dumps(response_data, separators=(',', ':'), ensure_ascii=False)))
  111. return response_data
  112. except aiohttp.ClientError as e:
  113. logger.error(f"请求失败: {e}")
  114. raise
  115. async def handle_add_messages_async(request: Request,token_id,msg,wxid):
  116. wx_config =await request.app.state.gewe_service.get_wxchat_config_from_cache_async(wxid)
  117. if not bool(wx_config.get("agentEnabled",False)):
  118. logger.info(f'微信ID {wxid} 未托管,不处理')
  119. return
  120. app_id = msg.get("Appid")
  121. msg_data = msg.get("Data")
  122. msg_type = msg_data.get("MsgType",None)
  123. from_wxid = msg_data["FromUserName"]["string"]
  124. to_wxid = msg_data["ToUserName"]["string"]
  125. msg_push_content=msg_data.get("PushContent")
  126. handlers = {
  127. 1: handle_text_async,
  128. 3: handle_image_async,
  129. 34: handle_voice_async,
  130. 42: handle_name_card_async,
  131. 49: handle_xml_async,
  132. 37: handle_add_friend_notice_async,
  133. 10002: handle_10002_msg
  134. }
  135. # (扫码进群情况)判断受否是群聊,并添加到通信录
  136. if check_chatroom(from_wxid) or check_chatroom(to_wxid):
  137. logger.info('群信息')
  138. chatroom_id=from_wxid
  139. ret,msg,data=await request.app.state.gewe_service.save_contract_list_async(token_id,app_id,chatroom_id,3)
  140. logger.info(f'保存到通讯录 chatroom_id {chatroom_id} {msg}')
  141. await request.app.state.gewe_service.update_group_info_to_cache_async(token_id,app_id,wxid,chatroom_id)
  142. await request.app.state.gewe_service.update_group_members_to_cache_async(token_id,app_id,wxid,chatroom_id)
  143. handlers[1]=handle_text_group_async
  144. handlers[3]=handle_image_group_async
  145. handlers[34]=handle_voice_group_async
  146. handler = handlers.get(msg_type)
  147. if handler:
  148. return await handler(request,token_id,app_id, wxid,msg_data,from_wxid, to_wxid)
  149. else:
  150. logger.warning(f"微信回调消息类型 {msg_type} 未处理")
  151. async def handle_mod_contacts_async(request: Request,token_id,msg,wxid):
  152. '''
  153. 好友通过验证及好友资料变更的通知消息
  154. '''
  155. wxid = msg.get("Wxid")
  156. msg_data = msg.get("Data")
  157. app_id = msg.get("Appid")
  158. #
  159. #handle_mod_contacts(token_id,app_id,wxid,msg_data)
  160. #
  161. loop = asyncio.get_event_loop()
  162. future = asyncio.run_coroutine_threadsafe(
  163. handle_mod_contacts_worker_async(request,token_id,app_id,wxid,msg_data),
  164. loop
  165. )
  166. contact_wxid =msg_data.get("UserName",{}).get("string","") #msg_data["UserName"]["string"]
  167. nickname= msg_data.get("NickName",{}).get("string","")#msg_data["NickName"]["string"]
  168. city=msg_data.get("City",None)
  169. signature=msg_data.get("Signature",None)
  170. province=msg_data.get("Province",None)
  171. bigHeadImgUrl=msg_data.get("SnsUserInfo",{}).get("SnsBgimgId",None) #msg_data["SnsUserInfo"]["SnsBgimgId"]
  172. country=msg_data.get("Country",None)
  173. sex=msg_data.get("Sex",None)
  174. pyInitial= msg_data.get("PyInitial",{}).get("string",None)#msg_data["PyInitial"]["string"]
  175. quanPin=msg_data.get("QuanPin",{}).get("string",None) #msg_data["QuanPin"]["string"]
  176. remark=msg_data.get("Remark",{}).get("string",None)
  177. remarkPyInitial=msg_data.get("RemarkPyInitial",{}).get("string",None)
  178. remarkQuanPin=msg_data.get("RemarkQuanPin",{}).get("string",None)
  179. smallHeadImgUrl=msg_data.get("smallHeadImgUrl",None)
  180. # data=gewe_chat.wxchat.get_brief_info(token_id,app_id,[contact_wxid])
  181. # contact=data[0]
  182. # alias=contact.get("alias")
  183. #推动到kafka
  184. contact_data = {
  185. "alias": None,
  186. "bigHeadImgUrl": bigHeadImgUrl,
  187. "cardImgUrl": None,
  188. "city": city,
  189. "country": country,
  190. "description": None,
  191. "labelList": None,
  192. "nickName": nickname,
  193. "phoneNumList": None,
  194. "province": province,
  195. "pyInitial": pyInitial,
  196. "quanPin": quanPin,
  197. "remark": remark,
  198. "remarkPyInitial": remarkPyInitial,
  199. "remarkQuanPin": remarkQuanPin,
  200. "sex": sex,
  201. "signature": signature,
  202. "smallHeadImgUrl": smallHeadImgUrl,
  203. "snsBgImg": None,
  204. "userName": contact_wxid
  205. }
  206. input_message=wx_mod_contact_message(wxid,contact_data)
  207. await request.app.state.kafka_service.send_message_async(input_message)
  208. async def handle_del_contacts_async(request: Request,token_id,msg,wxid):
  209. '''
  210. 删除好友通知/退出群聊
  211. '''
  212. msg_data = msg.get("Data")
  213. username=msg_data["UserName"]["string"]
  214. if check_chatroom(username):
  215. logger.info('退出群聊')
  216. wxid = msg.get("Wxid")
  217. chatroom_id=username
  218. await request.app.state.redis_service.delete_hash_field(f'__AI_OPS_WX__:GROUPS_INFO:{wxid}',chatroom_id)
  219. await request.app.state.redis_service.delete_hash_field(f'__AI_OPS_WX__:GROUPS_MEMBERS:{wxid}',chatroom_id)
  220. logger.info(f'清除 chatroom_id{chatroom_id} 数据')
  221. # 推送删除群资料到kafka
  222. k_message=wx_del_group_message(wxid,chatroom_id)
  223. await request.app.state.kafka_service.send_message_async(k_message)
  224. else:
  225. logger.info('删除好友通知')
  226. # 推送到kafka
  227. input_message=wx_del_contact_message(wxid,username)
  228. await request.app.state.kafka_service.send_message_async(input_message)
  229. async def handle_offline_async(request: Request,token_id,msg,wxid):
  230. '''
  231. 已经离线
  232. '''
  233. wxid = msg.get("Wxid")
  234. app_id = msg.get("Appid")
  235. logger.warning(f'微信ID {wxid}在设备{app_id}已经离线')
  236. k,r=await request.app.state.gewe_service.get_login_info_by_app_id_async(app_id)
  237. print(k)
  238. await request.app.state.redis_service.update_hash_field(k,'status',0)
  239. await request.app.state.redis_service.update_hash_field(k,'modify_at',int(time.time()))
  240. # 推送到kafka
  241. input_message=wx_offline_message(app_id,wxid)
  242. await request.app.state.kafka_service.send_message_async(input_message)
  243. async def handle_mod_contacts_worker_async(request:Request,token_id,app_id,wxid,msg_data):
  244. '''
  245. 好友通过验证及好友资料变更的通知消息
  246. '''
  247. logger.info('好友通过验证及好友资料变更的通知消息')
  248. if not check_chatroom(msg_data["UserName"]["string"]):
  249. contact_wxid = msg_data["UserName"]["string"]
  250. # 更新好友信息
  251. # 检查好友关系,不是好友则删除
  252. # ret,msg,check_relation=gewe_chat.wxchat.check_relation(token_id, app_id,[contact_wxid])
  253. # first_item = check_relation[0]
  254. # check_relation_status=first_item.get('relation')
  255. # logger.info(f'{wxid} 好友 {contact_wxid} 关系检查:{check_relation_status}')
  256. # if check_relation_status != 0:
  257. # gewe_chat.wxchat.delete_contacts_brief_from_cache(wxid, [contact_wxid])
  258. # logger.info(f'好友关系异常:{check_relation_status},删除好友 {contact_wxid} 信息')
  259. # else:
  260. # gewe_chat.wxchat.save_contacts_brief_to_cache(token_id, app_id, wxid, [contact_wxid])
  261. ret,msg,contacts_list = await request.app.state.gewe_service.fetch_contacts_list_async(token_id, app_id)
  262. # friend_wxids = contacts_list['friends'][3:] # 可以调整截取范围
  263. # print(friend_wxids)
  264. #friend_wxids.remove('fmessage')
  265. #friend_wxids.remove('weixin')
  266. friend_wxids = [c for c in contacts_list['friends'] if c not in ['fmessage', 'medianote','weixin','weixingongzhong','tmessage']] # 可以调整截取范围
  267. print(f'{wxid}的好友数量 {len(friend_wxids)}')
  268. await request.app.state.gewe_service.save_contacts_brief_to_cache_async(token_id, app_id, wxid, friend_wxids)
  269. else:
  270. logger.info('群聊好友通过验证及好友资料变更的通知消息')
  271. async def handle_text_async(request: Request,token_id,app_id, wxid,msg_data,from_wxid, to_wxid):
  272. '''
  273. 私聊文本消息
  274. '''
  275. msg_content=msg_data["Content"]["string"]
  276. if wxid == from_wxid: #手动发送消息
  277. logger.info("Active message sending detected")
  278. await request.app.state.gewe_service.save_contacts_brief_to_cache_async(token_id,app_id,wxid,[to_wxid])
  279. callback_to_user=msg_data["ToUserName"]["string"]
  280. input_wx_content_dialogue_message=[{"type": "text", "text": msg_content}]
  281. input_message=dialogue_message(from_wxid,to_wxid,input_wx_content_dialogue_message)
  282. await request.app.state.kafaka_service.send_message_async(input_message)
  283. logger.info("发送对话 %s",input_message)
  284. else:
  285. callback_to_user=msg_data["FromUserName"]["string"]
  286. # 创建并启动任务协程,将参数传递给 ai_chat_text 函数
  287. task = asyncio.create_task(
  288. ai_chat_text_async(request,token_id, app_id, wxid, msg_data, msg_content)
  289. )
  290. # 设置定时器,1秒后检查任务是否超时。这里需要使用 lambda 来传递参数
  291. timeout_timer = asyncio.create_task(
  292. check_timeout_async(task, request,token_id, wxid, app_id, callback_to_user)
  293. )
  294. # 等待任务协程完成
  295. await task
  296. # 取消定时器
  297. timeout_timer.cancel()
  298. async def check_timeout_async(task: asyncio.Task, request: Request,token_id, wxid, app_id, callback_to_user):
  299. await asyncio.sleep(timeout_duration) # 等待超时时间
  300. if not task.done():
  301. print(f"任务运行时间超过{timeout_duration}秒,token_id={token_id}, app_id={app_id}, callback_to_user={callback_to_user}")
  302. wx_config = await request.app.state.gewe_service.get_wxchat_config_from_cache_async(wxid)
  303. if bool(wx_config.get("chatWaitingMsgEnabled", True)):
  304. await request.app.state.gewe_service.post_text_async(token_id, app_id, callback_to_user, "亲,我正在组织回复的信息,请稍等一会")
  305. async def ai_chat_text_async(request: Request,token_id, app_id, wxid, msg_data, msg_content):
  306. start_time = time.time() # 记录任务开始时间
  307. callback_to_user = msg_data["FromUserName"]["string"]
  308. hash_key = f'__AI_OPS_WX__:MESSAGES:{wxid}:{callback_to_user}'
  309. prompt = {"role": "user", "content": [{
  310. "type": "text",
  311. "text": msg_content
  312. }]}
  313. messages_to_send = await request.app.state.gewe_service.save_session_messages_to_cache_async(hash_key, prompt)
  314. # 收到的对话
  315. input_wx_content_dialogue_message = [{"type": "text", "text": msg_content}]
  316. input_message = dialogue_message(callback_to_user, wxid, input_wx_content_dialogue_message)
  317. await request.app.state.kafka_service.send_message_async(input_message)
  318. logger.info("发送对话 %s", input_message)
  319. cache_data = USER_INTERACTIVE_CACHE.get(wxid)
  320. if cache_data and cache_data.get('interactive'):
  321. o = get_first_char_if_digit(msg_content)
  322. if o is not None:
  323. userSelectOptions = cache_data.get('options')
  324. if o < len(userSelectOptions):
  325. o = o - 1
  326. msg_content = userSelectOptions[o].get("value")
  327. messages_to_send = [{"role": "user", "content": msg_content}]
  328. else:
  329. messages_to_send = [{"role": "user", "content": msg_content}]
  330. else:
  331. messages_to_send = [{"role": "user", "content": msg_content}]
  332. res = await gpt_client_async(request,messages_to_send, wxid, callback_to_user)
  333. reply_content = remove_markdown_symbol(res["choices"][0]["message"]["content"])
  334. description = ''
  335. userSelectOptions = []
  336. if isinstance(reply_content, list) and any(item.get("type") == "interactive" for item in reply_content):
  337. for item in reply_content:
  338. if item["type"] == "interactive" and item["interactive"]["type"] == "userSelect":
  339. params = item["interactive"]["params"]
  340. description = params.get("description")
  341. userSelectOptions = params.get("userSelectOptions", [])
  342. values_string = "\n".join(option["value"] for option in userSelectOptions)
  343. if description is not None:
  344. USER_INTERACTIVE_CACHE[wxid] = {
  345. "interactive": True,
  346. "options": userSelectOptions,
  347. }
  348. reply_content = description + '------------------------------\n' + values_string
  349. elif isinstance(reply_content, list) and any(item.get("type") == "text" for item in reply_content):
  350. USER_INTERACTIVE_CACHE[wxid] = {
  351. "interactive": False
  352. }
  353. text = ''
  354. for item in reply_content:
  355. if item["type"] == "text":
  356. text = item["text"]["content"]
  357. if text == '':
  358. # 去除上次上一轮对话再次请求
  359. cache_messages_str = await request.app.state.redis_service.get_hash_field(hash_key, "data")
  360. cache_messages = json.loads(cache_messages_str) if cache_messages_str else []
  361. if len(cache_messages) >= 3:
  362. cache_messages = cache_messages[:-3]
  363. await request.app.state.redis_service.update_hash_field(hash_key, "data", json.dumps(cache_messages, ensure_ascii=False))
  364. messages_to_send = await request.app.state.redis_service.save_session_messages_to_cache_async(hash_key, prompt)
  365. res = await gpt_client_async(request,messages_to_send, wxid, callback_to_user)
  366. reply_content = remove_markdown_symbol(res["choices"][0]["message"]["content"])
  367. if isinstance(reply_content, list):
  368. reply_content = remove_markdown_symbol(reply_content[0].get('text').get("content"))
  369. else:
  370. reply_content = text
  371. else:
  372. USER_INTERACTIVE_CACHE[wxid] = {
  373. "interactive": False
  374. }
  375. reply_content = remove_markdown_symbol(res["choices"][0]["message"]["content"])
  376. await request.app.state.gewe_service.post_text_async(token_id, app_id, callback_to_user, reply_content)
  377. await request.app.state.gewe_service.save_session_messages_to_cache_async(hash_key, {"role": "assistant", "content": reply_content})
  378. # 回复的对话
  379. input_wx_content_dialogue_message = [{"type": "text", "text": reply_content}]
  380. input_message = dialogue_message(wxid, callback_to_user, input_wx_content_dialogue_message, True)
  381. await request.app.state.kafka_service.send_message_async(input_message)
  382. logger.info("发送对话 %s", input_message)
  383. end_time = time.time() # 记录任务结束时间
  384. execution_time = end_time - start_time # 计算执行时间
  385. logger.info(f"AI回答任务完成,耗时 {execution_time:.2f} 秒")
  386. async def handle_text_group_async(request: Request,token_id,app_id, wxid,msg_data,from_wxid, to_wxid):
  387. '''
  388. 群聊文本消息
  389. '''
  390. msg_content=msg_data["Content"]["string"]
  391. msg_push_content=msg_data.get("PushContent")
  392. k,login_info=await request.app.state.gewe_service.get_login_info_by_app_id_async(app_id)
  393. nickname=login_info.get("nickName")
  394. if wxid == from_wxid: #手动发送消息
  395. logger.info("Active message sending detected")
  396. await request.app.state.gewe_service.save_contacts_brief_to_cache_async(token_id,app_id,wxid,[to_wxid])
  397. callback_to_user=msg_data["ToUserName"]["string"]
  398. input_wx_content_dialogue_message=[{"type": "text", "text": msg_content}]
  399. input_message=dialogue_message(from_wxid,to_wxid,input_wx_content_dialogue_message)
  400. await request.app.state.kafka_service.send_message_async(input_message)
  401. logger.info("发送对话 %s",input_message)
  402. else:
  403. c = await request.app.state.gewe_service.get_wxchat_config_from_cache_async(wxid)
  404. chatroom_id_white_list = c.get("chatroomIdWhiteList", [])
  405. if not chatroom_id_white_list:
  406. logger.info('白名单为空或未定义,不处理')
  407. return
  408. if from_wxid not in chatroom_id_white_list:
  409. logger.info(f'群ID {from_wxid} 不在白名单中,不处理')
  410. return
  411. if '在群聊中@了你' in msg_push_content or '@'+nickname in msg_push_content:
  412. callback_to_user=msg_data["FromUserName"]["string"]
  413. hash_key = f'__AI_OPS_WX__:MESSAGES:{wxid}:{callback_to_user}'
  414. prompt={"role": "user", "content": [{
  415. "type": "text",
  416. "text": msg_content
  417. }]}
  418. messages_to_send=await request.app.state.gewe_service.save_session_messages_to_cache_async(hash_key, prompt)
  419. # 收到的对话
  420. input_wx_content_dialogue_message=[{"type": "text", "text": msg_content}]
  421. input_message=dialogue_message(callback_to_user,wxid,input_wx_content_dialogue_message)
  422. await request.app.state.kafka_service.send_message_async(input_message)
  423. logger.info("发送对话 %s",input_message)
  424. cache_data = USER_INTERACTIVE_CACHE.get(wxid)
  425. if cache_data and cache_data.get('interactive') :
  426. o=get_first_char_if_digit(msg_content)
  427. if o is not None:
  428. userSelectOptions=cache_data.get('options')
  429. if o < len(userSelectOptions):
  430. o=o-1
  431. msg_content=userSelectOptions[o].get("value")
  432. messages_to_send=[{"role": "user", "content": msg_content}]
  433. else:
  434. messages_to_send=[{"role": "user", "content": msg_content}]
  435. else:
  436. messages_to_send=[{"role": "user", "content": msg_content}]
  437. res=await gpt_client_async(request,messages_to_send,wxid,callback_to_user)
  438. reply_content=remove_markdown_symbol(res["choices"][0]["message"]["content"])
  439. description = ''
  440. userSelectOptions = []
  441. if isinstance(reply_content, list) and any(item.get("type") == "interactive" for item in reply_content):
  442. for item in reply_content:
  443. if item["type"] == "interactive" and item["interactive"]["type"] == "userSelect":
  444. params = item["interactive"]["params"]
  445. description = params.get("description")
  446. userSelectOptions = params.get("userSelectOptions", [])
  447. values_string = "\n".join(option["value"] for option in userSelectOptions)
  448. if description is not None:
  449. USER_INTERACTIVE_CACHE[wxid] = {
  450. "interactive":True,
  451. "options": userSelectOptions,
  452. }
  453. reply_content=description + '------------------------------\n'+values_string
  454. elif isinstance(reply_content, list) and any(item.get("type") == "text" for item in reply_content):
  455. USER_INTERACTIVE_CACHE[wxid] = {
  456. "interactive":False
  457. }
  458. text=''
  459. for item in reply_content:
  460. if item["type"] == "text":
  461. text=item["text"]["content"]
  462. if text=='':
  463. # 去除上次上一轮对话再次请求
  464. cache_messages_str=await request.app.state.redis_service.get_hash_field(hash_key,"data")
  465. cache_messages = json.loads(cache_messages_str) if cache_messages_str else []
  466. if len(cache_messages) >= 3:
  467. cache_messages = cache_messages[:-3]
  468. await request.app.state.redis_service.update_hash_field(hash_key,"data",json.dumps(cache_messages,ensure_ascii=False))
  469. messages_to_send=await request.app.state.gewe_service.save_session_messages_to_cache_async(hash_key, prompt)
  470. res=await gpt_client_async(request,messages_to_send,wxid,callback_to_user)
  471. reply_content=remove_markdown_symbol(res["choices"][0]["message"]["content"])
  472. else:
  473. reply_content=text
  474. else:
  475. USER_INTERACTIVE_CACHE[wxid] = {
  476. "interactive":False
  477. }
  478. reply_content=res["choices"][0]["message"]["content"]
  479. reply_content='@'+extract_nickname(msg_push_content) + reply_content
  480. await request.app.state.gewe_service.post_text_async(token_id,app_id,callback_to_user,reply_content)
  481. await request.app.state.gewe_service.save_session_messages_to_cache_async(hash_key, {"role": "assistant", "content": reply_content})
  482. # 回复的对话
  483. input_wx_content_dialogue_message=[{"type": "text", "text": reply_content}]
  484. input_message=dialogue_message(wxid,callback_to_user,input_wx_content_dialogue_message,True)
  485. await request.app.state.kafka_service.send_message_async(input_message)
  486. logger.info("发送对话 %s",input_message)
  487. else:
  488. logger.info('群聊公开消息')
  489. callback_to_user=msg_data["FromUserName"]["string"]
  490. group_dialogue_message=[{"type": "text", "text": msg_content}]
  491. input_message=dialogue_message(callback_to_user,wxid,group_dialogue_message)
  492. await request.app.state.kafka_service.send_message_async(input_message)
  493. logger.info("发送对话 %s",input_message)
  494. return
  495. async def handle_image_async(request: Request,token_id,app_id, wxid,msg_data,from_wxid, to_wxid):
  496. '''
  497. 私聊图片消息
  498. '''
  499. msg_content=msg_data["Content"]["string"]
  500. callback_to_user=from_wxid
  501. hash_key = f'__AI_OPS_WX__:MESSAGES:{wxid}:{callback_to_user}'
  502. wx_img_url= await request.app.state.gewe_service.download_image_msg_async(token_id,app_id,msg_content)
  503. oss_access_key_id="LTAI5tRTG6pLhTpKACJYoPR5"
  504. oss_access_key_secret="E7dMzeeMxq4VQvLg7Tq7uKf3XWpYfN"
  505. oss_endpoint="http://oss-cn-shanghai.aliyuncs.com"
  506. oss_bucket_name="cow-agent"
  507. oss_prefix="cow"
  508. img_url=upload_oss(oss_access_key_id, oss_access_key_secret, oss_endpoint, oss_bucket_name, wx_img_url, oss_prefix)
  509. prompt={
  510. "role": "user",
  511. "content": [{
  512. "type": "image_url",
  513. "image_url": {"url": img_url}
  514. }]
  515. }
  516. await request.app.state.gewe_service.save_session_messages_to_cache_async(hash_key, prompt)
  517. await request.app.state.gewe_service.post_text_async(token_id,app_id,callback_to_user,'已经上传了图片,有什么可以为您服务')
  518. logger.info(f"上传图片 URL: {img_url}")
  519. wx_content_dialogue_message=[{"type": "image_url", "image_url": {"url": img_url}}]
  520. input_message=dialogue_message(wxid,callback_to_user,wx_content_dialogue_message)
  521. await request.app.state.kafka_service.send_message_async(input_message)
  522. logger.info("发送对话 %s",input_message)
  523. async def handle_image_group_async(request: Request,token_id,app_id, wxid,msg_data,from_wxid, to_wxid):
  524. logger.info('群聊图片消息')
  525. async def handle_voice_async(request: Request,token_id,app_id, wxid,msg_data,from_wxid, to_wxid):
  526. '''
  527. 私聊语音消息
  528. '''
  529. callback_to_user=from_wxid
  530. msg_content=msg_data["Content"]["string"]
  531. msg_id=msg_data["MsgId"]
  532. file_url=await request.app.state.gewe_service.download_audio_msg_async(token_id,app_id,msg_id,msg_content)
  533. react_silk_path=await save_to_local_from_url_async(file_url)
  534. react_wav_path = os.path.splitext(react_silk_path)[0] + ".wav"
  535. audio_convert.any_to_wav(react_silk_path,react_wav_path)
  536. react_voice_text=AliVoice().voiceToText(react_wav_path)
  537. os.remove(react_silk_path)
  538. os.remove(react_wav_path)
  539. hash_key = f'__AI_OPS_WX__:MESSAGES:{wxid}:{callback_to_user}'
  540. messages=await request.app.state.gewe_service.save_session_messages_to_cache_async(hash_key, {"role": "user", "content": react_voice_text})
  541. ai_res=await gpt_client_async(request,messages,wxid,callback_to_user)
  542. ai_res_content=remove_markdown_symbol(ai_res["choices"][0]["message"]["content"])
  543. has_url=contains_url(ai_res_content)
  544. if not has_url:
  545. voice_during,voice_url=wx_voice(ai_res_content)
  546. if voice_during < 60 * 1000:
  547. ret,ret_msg,res=await request.app.state.gewe_service.post_voice_async(token_id,app_id,callback_to_user,voice_url,voice_during)
  548. else:
  549. ret,ret_msg,res=await request.app.state.gewe_service.post_text_async(token_id,app_id,callback_to_user,ai_res_content)
  550. logger.warning(f'回应语音消息长度 {voice_during/1000}秒,超过60秒,转为文本回复')
  551. if ret==200:
  552. logger.info((f'{wxid} 向 {callback_to_user} 发送语音文本【{ai_res_content}】{ret_msg}'))
  553. else:
  554. logger.warning((f'{wxid} 向 {callback_to_user} 发送语音文本【{ai_res_content}】{ret_msg}'))
  555. ret,ret_msg,res==await request.app.state.gewe_service.post_text_async(token_id,app_id,callback_to_user,ai_res_content)
  556. logger.info((f'{wxid} 向 {callback_to_user} 发送文本【{ai_res_content}】{ret_msg}'))
  557. else:
  558. logger.info(f"回复内容包含网址,不发送语音,回复文字内容:{ai_res_content}")
  559. ret,ret_msg,res=await request.app.state.gewe_service.post_text_async(token_id,app_id,callback_to_user,ai_res_content)
  560. await request.app.state.gewe_service.save_session_messages_to_cache_async(hash_key, {"role": "assistant", "content": ai_res_content})
  561. # 构造对话消息并发送到 Kafka
  562. input_wx_content_dialogue_message = [{"type": "text", "text": ai_res_content}]
  563. input_message = dialogue_message(wxid, callback_to_user, input_wx_content_dialogue_message,True)
  564. await request.app.state.kafka_service.send_message_async(input_message)
  565. logger.info("发送对话 %s", input_message)
  566. async def handle_voice_group_async(request: Request,token_id,app_id, wxid,msg_data,from_wxid, to_wxid):
  567. logger.info('群聊语音消息')
  568. async def handle_name_card_async(request: Request,token_id,app_id, wxid,msg_data,from_wxid, to_wxid):
  569. logger.info('名片消息')
  570. try:
  571. msg_content_xml=msg_data["Content"]["string"]
  572. # 解析XML字符串
  573. root = ET.fromstring(msg_content_xml)
  574. # 提取alias属性
  575. alias_value = root.get("alias")
  576. # 加好友资料
  577. scene = int(root.get("scene"))
  578. v3 = root.get("username")
  579. v4 = root.get("antispamticket")
  580. logger.info(f"alias_value: {alias_value}, scene: {scene}, v3: {v3}, v4: {v4}")
  581. # 判断appid 是否已经创建3天
  582. k, login_info = await request.app.state.gewe_service.get_login_info_by_wxid_async(wxid)
  583. creation_timestamp=int(login_info.get('create_at',time.time()))
  584. current_timestamp = time.time()
  585. three_days_seconds = 3 * 24 * 60 * 60 # 三天的秒数
  586. diff_flag=(current_timestamp - creation_timestamp) >= three_days_seconds
  587. if not diff_flag:
  588. log_content=f'名片添加好友功能,{wxid} 用户创建不够三天,不能使用该功能'
  589. logger.warning(log_content)
  590. return
  591. # 将加好友资料添加到待加好友队列
  592. #gewe_chat.wxchat.enqueue_to_add_contacts(wxid,scene,v3,v4)
  593. _,loginfo=await request.app.state.gewe_service.get_login_info_by_wxid_async(wxid)
  594. nickname=loginfo.get('nickName')
  595. add_contact_content=f'您好,我是{nickname}'
  596. #gewe_chat.wxchat.add_contacts(token_id,app_id,scene,Models.OperationType.ADD_FRIEND,v3,v4,add_contact_content)
  597. await request.app.state.gewe_service.add_contacts_async(token_id,app_id,scene,OperationType.ADD_FRIEND.value,v3,v4,add_contact_content)
  598. except ET.ParseError as e:
  599. logger.error(f"XML解析错误: {e}")
  600. except KeyError as e:
  601. logger.error(f"字典键错误: {e}")
  602. except Exception as e:
  603. logger.error(f"未知错误: {e}")
  604. async def handle_xml_async(request: Request,token_id,app_id, wxid,msg_data,from_wxid, to_wxid):
  605. '''
  606. 处理xml
  607. '''
  608. try:
  609. msg_content_xml=msg_data["Content"]["string"]
  610. root = ET.fromstring(msg_content_xml)
  611. type_value = int(root.find(".//appmsg/type").text)
  612. handlers = {
  613. 57: handle_xml_reference_async,
  614. 5: handle_xml_invite_group_async
  615. }
  616. handler = handlers.get(type_value)
  617. if handler:
  618. return await handler(request,token_id,app_id, wxid,msg_data,from_wxid, to_wxid)
  619. # elif "邀请你加入了群聊" in msg_content_xml: # 邀请加入群聊
  620. # logger.warning(f"xml消息 {type_value} 邀请你加入了群聊.todo")
  621. else:
  622. print(f"xml消息 {type_value} 未解析")
  623. except ET.ParseError as e:
  624. logger.error(f"解析XML失败: {e}")
  625. except Exception as e:
  626. logger.error(f"未知错误: {e}")
  627. return
  628. async def handle_xml_reference_async(request: Request,token_id,app_id, wxid,msg_data,from_wxid, to_wxid):
  629. '''
  630. 引用消息
  631. 判断此类消息的逻辑:$.Data.MsgType=49 并且 解析$.Data.Content.string中的xml msg.appmsg.type=57
  632. '''
  633. callback_to_user=from_wxid
  634. hash_key = f'__AI_OPS_WX__:MESSAGES:{wxid}:{callback_to_user}'
  635. msg_content= msg_data["PushContent"]
  636. prompt={"role": "user", "content": [{
  637. "type": "text",
  638. "text": msg_content
  639. }]}
  640. # 收到的对话
  641. messages_to_send=await request.app.state.gewe_service.save_session_messages_to_cache_async(hash_key, prompt)
  642. input_wx_content_dialogue_message=[{"type": "text", "text": msg_content}]
  643. input_message=dialogue_message(callback_to_user,wxid,input_wx_content_dialogue_message)
  644. await request.app.state.kafka_service.send_message_async(input_message)
  645. logger.info("发送对话 %s",input_message)
  646. # 回复的对话
  647. res=await gpt_client_async(request,messages_to_send,wxid,callback_to_user)
  648. reply_content=remove_markdown_symbol(res["choices"][0]["message"]["content"])
  649. input_wx_content_dialogue_message=[{"type": "text", "text": reply_content}]
  650. input_message=dialogue_message(wxid,callback_to_user,input_wx_content_dialogue_message,True)
  651. await request.app.state.kafka_service.kafka_client.produce_message(input_message)
  652. logger.info("发送对话 %s",input_message)
  653. await request.app.state.kafka_service.save_session_messages_to_cache_async(hash_key, {"role": "assistant", "content": reply_content})
  654. await request.app.state.kafka_service.post_text_async(token_id,app_id,callback_to_user,reply_content)
  655. async def handle_xml_invite_group_async (request: Request,token_id,app_id, wxid,msg_data,from_wxid, to_wxid):
  656. '''
  657. 群聊邀请
  658. 判断此类消息的逻辑:$.Data.MsgType=49
  659. 并且 解析$.Data.Content.string中的xml msg.appmsg.title=邀请你加入群聊(根据手机设置的系统语言title会有调整,不同语言关键字不同)
  660. '''
  661. logger.info(f'{wxid} 群聊邀请')
  662. msg_content_xml=msg_data["Content"]["string"]
  663. root = ET.fromstring(msg_content_xml)
  664. title_value = root.find(".//appmsg/title").text
  665. if '邀请你加入群聊' in title_value:
  666. invite_url = root.find('.//url').text
  667. ret,msg,data=await request.app.state.gewe_service.agree_join_room_async(token_id,app_id,invite_url)
  668. if ret==200:
  669. logger.info(f'群聊邀请,同意加入群聊 {msg} {data}')
  670. chatroom_id=data.get('chatroomId','')
  671. # if not chatroom_id:
  672. # logger.warning(f'群聊邀请,同意加入群聊失败 {msg} {data}')
  673. # return
  674. ret,msg,data=await request.app.state.gewe_service.save_contract_list_async(token_id,app_id,chatroom_id,3)
  675. logger.info(f'群聊邀请,保存到通讯录 chatroom_id {chatroom_id} {msg}')
  676. await request.app.state.gewe_service.update_group_info_to_cache_async(token_id,app_id,wxid,chatroom_id)
  677. await request.app.state.gewe_service.update_group_members_to_cache_async(token_id,app_id,wxid,chatroom_id)
  678. # 单个群信息推送到kafka
  679. group_info_members=await request.app.state.gewe_service.get_group_info_members_from_cache_async(wxid,chatroom_id)
  680. k_message=wx_mod_group_info_members_message(wxid,group_info_members)
  681. await request.app.state.kafka_service.send_message_async(k_message)
  682. else:
  683. logger.warning(f'群聊邀请,同意加入群聊失败 {msg} {data}')
  684. async def handle_add_friend_notice_async(request: Request,token_id,app_id, wxid,msg_data,from_wxid, to_wxid):
  685. '''
  686. 好友添加请求通知
  687. '''
  688. logger.info('好友添加请求通知')
  689. try:
  690. msg_content_xml=msg_data["Content"]["string"]
  691. root = ET.fromstring(msg_content_xml)
  692. msg_content = root.attrib.get('content', None)
  693. v3= root.attrib.get('encryptusername', None)
  694. v4= root.attrib.get('ticket', None)
  695. scene=root.attrib.get('scene', None)
  696. to_contact_wxid=root.attrib.get('fromusername', None)
  697. wxid=msg_data["ToUserName"]["string"]
  698. # 自动同意好友
  699. # print(v3)
  700. # print(v4)
  701. # print(scene)
  702. # print(msg_content)
  703. # 操作类型,2添加好友 3同意好友 4拒绝好友
  704. #option=2
  705. option=3
  706. reply_add_contact_contact="亲,我是你的好友"
  707. ret,ret_msg=await request.app.state.gewe_service.add_contacts_async(token_id,app_id,scene,option,v3,v4,reply_add_contact_contact)
  708. if ret==200:
  709. logger.info('自动添加好友成功')
  710. # 好友发送的文字
  711. hash_key = f'__AI_OPS_WX__:MESSAGES:{wxid}:{to_contact_wxid}'
  712. prompt={"role": "user", "content": [{"type": "text","text": msg_content}]}
  713. messages_to_send=await request.app.state.gewe_service.save_session_messages_to_cache_async(hash_key, prompt)
  714. input_wx_content_dialogue_message=[{"type": "text", "text": msg_content}]
  715. input_message=dialogue_message(to_contact_wxid,wxid,input_wx_content_dialogue_message)
  716. await request.app.state.gewe_service.send_message_async(input_message)
  717. logger.info("发送对话 %s",input_message)
  718. callback_to_user=to_contact_wxid
  719. res=await gpt_client_async(messages_to_send,wxid,callback_to_user)
  720. reply_content=remove_markdown_symbol(res["choices"][0]["message"]["content"])
  721. #保存好友信息
  722. await request.app.state.gewe_service.save_contacts_brief_to_cache_async(token_id,app_id, wxid,[to_contact_wxid])
  723. # 保存到缓存
  724. await request.app.state.gewe_service.save_session_messages_to_cache_async(hash_key, {"role": "assistant", "content": reply_content})
  725. # 发送信息
  726. await request.app.state.gewe_service.post_text_async(token_id,app_id, to_contact_wxid,reply_content)
  727. # 发送到kafka
  728. input_wx_content_dialogue_message=[{"type": "text", "text": reply_content}]
  729. input_message=dialogue_message(wxid,to_contact_wxid,input_wx_content_dialogue_message,True)
  730. request.app.state.kafka_service.send_message_async(input_message)
  731. logger.info("发送对话 %s",input_message)
  732. else:
  733. logger.warning("添加好友失败")
  734. except ET.ParseError as e:
  735. logger.error(f"解析XML失败: {e}")
  736. except Exception as e:
  737. logger.error(f"未知错误: {e}")
  738. return
  739. async def handle_10002_msg(request: Request,token_id,app_id, wxid,msg_data,from_wxid, to_wxid):
  740. '''
  741. 群聊邀请
  742. 撤回消息
  743. 拍一拍消息
  744. 地理位置
  745. 踢出群聊通知
  746. 解散群聊通知
  747. 发布群公告
  748. '''
  749. try:
  750. msg_content_xml=msg_data["Content"]["string"]
  751. # 群聊邀请
  752. if '邀请你加入了群聊' in msg_content_xml and check_chatroom(msg_data["FromUserName"]["string"]):
  753. chatroom_id=msg_data["FromUserName"]["string"]
  754. ret,msg,data=await request.app.state.gewe_service.save_contract_list_async(token_id,app_id,chatroom_id,3)
  755. logger.info(f'群聊邀请,保存到通讯录 chatroom_id {chatroom_id} {msg}')
  756. await request.app.state.gewe_service.update_group_info_to_cache_async(token_id,app_id,wxid,chatroom_id)
  757. await request.app.state.gewe_service.update_group_members_to_cache_async(token_id,app_id,wxid,chatroom_id)
  758. #推送群资料到kafka
  759. group_info_members=await request.app.state.gewe_service.get_group_info_members_from_cache_async(wxid,chatroom_id)
  760. k_message=wx_mod_group_info_members_message(wxid,group_info_members)
  761. await request.app.state.kafka_service.send_message_async(k_message)
  762. if '移出了群聊' in msg_content_xml and 'sysmsgtemplate' in msg_content_xml :
  763. chatroom_id=msg_data["FromUserName"]["string"]
  764. ret,msg,data=await request.app.state.gewe_service.save_contract_list_async(token_id,app_id,chatroom_id,2)
  765. logger.info(f'踢出群聊,移除从通讯录 chatroom_id {chatroom_id} {msg}')
  766. await request.app.state.redis_service.delete_hash_field(f'__AI_OPS_WX__:GROUPS_INFO:{wxid}',chatroom_id)
  767. await request.app.state.redis_service.delete_hash_field(f'__AI_OPS_WX__:GROUPS_MEMBERS:{wxid}',chatroom_id)
  768. logger.info(f'清除 chatroom_id{chatroom_id} 数据')
  769. # 推送删除群资料到kafka
  770. k_message=wx_del_group_message(wxid,chatroom_id)
  771. await request.app.state.kafka_service.send_message_async(k_message)
  772. if '已解散该群聊' in msg_content_xml and 'sysmsgtemplate' in msg_content_xml :
  773. chatroom_id=msg_data["FromUserName"]["string"]
  774. ret,msg,data=await request.app.state.gewe_service.save_contract_list_async(token_id,app_id,chatroom_id,2)
  775. logger.info(f'解散群聊,移除从通讯录 chatroom_id {chatroom_id} {msg}')
  776. await request.app.state.redis_service.delete_hash_field(f'__AI_OPS_WX__:GROUPS_INFO:{wxid}',chatroom_id)
  777. await request.app.state.redis_service.delete_hash_field(f'__AI_OPS_WX__:GROUPS_MEMBERS:{wxid}',chatroom_id)
  778. logger.info(f'清除 chatroom_id{chatroom_id} 数据')
  779. # 推送删除群资料到kafka
  780. k_message=wx_del_group_message(wxid,chatroom_id)
  781. await request.app.state.kafka_service.send_message_async(k_message)
  782. except ET.ParseError as e:
  783. logger.error(f"解析XML失败: {e}")
  784. except Exception as e:
  785. logger.error(f"未知错误: {e}")
  786. return