You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

940 lines
41KB

  1. from flask_restful import Resource, reqparse
  2. from flask import jsonify,request
  3. import requests,json,re
  4. from wechat import gewe_chat
  5. from voice.ali.ali_voice import AliVoice
  6. from common import utils,redis_helper,memory,kafka_helper
  7. from common.log import logger
  8. import xml.etree.ElementTree as ET
  9. import threading,time
  10. import os
  11. from voice import audio_convert
  # Delay, in seconds, passed to the threading.Timer in handle_text; when it
  # elapses while the AI reply thread is still running, check_timeout fires.
  timeout_duration = 8.0
  class MessagesResource(Resource):
      """Webhook resource that receives WeChat callback events and dispatches them.

      ``post`` resolves the login token for the reporting device, lets
      self-sent hosting commands be processed, and then routes the event by its
      ``TypeName`` (``AddMsg``, ``ModContacts``, ``DelContacts``, ``Offline``).
      """

      def __init__(self):
          self.parser = reqparse.RequestParser()

      def post(self):
          """Handle one callback event posted as JSON.

          Returns:
              The message handler's own return value when it produces one;
              otherwise a JSON acknowledgement.
          """
          msg = request.get_json()
          logger.info(f"收到微信回调消息: {msg}")
          if not isinstance(msg, dict):
              # Nothing to dispatch on; acknowledge instead of failing on .get().
              logger.warning('回调消息为空或格式错误,不处理')
              return jsonify({"message": "收到微信回调消息"})

          type_name = msg.get("TypeName")
          app_id = msg.get("Appid")
          token_id = get_token_id_by_app_id(app_id)
          if not token_id:
              logger.warning('找不到登录信息,不处理')
              return jsonify({"message": "收到微信回调消息"})

          wxid = msg.get("Wxid", '')
          # Tolerate a missing cache entry: treat it as "hosting disabled".
          wx_config = gewe_chat.wxchat.get_wxchat_config_from_cache(wxid) or {}

          # Self-sent commands are handled before the hosting check so that
          # they still work while hosting is switched off.
          if type_name == 'AddMsg':
              handle_self_cmd(wxid, msg)

          if not bool(wx_config.get("agentEnabled", False)):
              logger.info(f'微信ID {wxid} 未托管,不处理')
              return jsonify({"message": "收到微信回调消息"})

          if type_name == 'AddMsg':
              msg_data = msg.get("Data")
              msg_type = msg_data.get("MsgType", None)
              from_wxid = msg_data["FromUserName"]["string"]
              to_wxid = msg_data["ToUserName"]["string"]

              handlers = {
                  1: handle_text,
                  3: handle_image,
                  34: handle_voice,
                  49: handle_xml,
                  37: handle_add_friend_notice,
                  10002: handle_10002_msg
              }

              # Group traffic (including joining by QR code): make sure the
              # chatroom is saved to the contact list and its caches refreshed.
              # The chatroom id is whichever side of the message is a chatroom;
              # for messages the owner sends into a group that is the receiver.
              if check_chatroom(from_wxid):
                  chatroom_id = from_wxid
              elif check_chatroom(to_wxid):
                  chatroom_id = to_wxid
              else:
                  chatroom_id = None

              if chatroom_id is not None:
                  logger.info('群信息')
                  # Use a distinct name so the request payload `msg` is not shadowed.
                  ret, ret_msg, data = gewe_chat.wxchat.save_contract_list(token_id, app_id, chatroom_id, 3)
                  logger.info(f'保存到通讯录 chatroom_id {chatroom_id} {ret_msg}')
                  gewe_chat.wxchat.update_group_info_to_cache(token_id, app_id, wxid, chatroom_id)
                  gewe_chat.wxchat.update_group_members_to_cache(token_id, app_id, wxid, chatroom_id)
                  handlers[1] = handle_text_group
                  handlers[3] = handle_image_group
                  handlers[34] = handle_voice_group

              handler = handlers.get(msg_type)
              if handler:
                  result = handler(token_id, app_id, wxid, msg_data, from_wxid, to_wxid)
                  if result is not None:
                      return result
                  # Handlers that return nothing fall through to the common
                  # acknowledgement below.
              else:
                  logger.warning(f"微信回调消息类型 {msg_type} 未处理")
          elif type_name == 'ModContacts':
              # Friend verification passed / friend profile changed.
              wxid = msg.get("Wxid")
              msg_data = msg.get("Data")
              handle_mod_contacts(token_id, app_id, wxid, msg_data)
          elif type_name == "DelContacts":
              # Friend deleted, or the account left a group chat.
              msg_data = msg.get("Data")
              username = msg_data["UserName"]["string"]
              if check_chatroom(username):
                  logger.info('退出群聊')
                  wxid = msg.get("Wxid")
                  chatroom_id = username
                  redis_helper.redis_helper.delete_hash_field(f'__AI_OPS_WX__:GROUPS_INFO:{wxid}', chatroom_id)
                  logger.info(f'清除 chatroom_id{chatroom_id} 数据')
              else:
                  logger.info('删除好友通知')
          elif type_name == "Offline":
              # The account went offline on this device: mark the login record.
              wxid = msg.get("Wxid")
              logger.warning(f'微信ID {wxid}在设备{app_id}已经离线')
              k, r = get_login_info_by_app_id(app_id)
              logger.debug("login info key: %s", k)
              redis_helper.redis_helper.update_hash_field(k, 'status', 0)
              redis_helper.redis_helper.update_hash_field(k, 'modify_at', int(time.time()))
          else:
              logger.warning(f"未知消息类型")

          return jsonify({"message": "收到微信回调消息"})
  def handle_self_cmd(wxid, msg):
      """Process hosting on/off commands that the logged-in account sends itself.

      When the sender of the message equals ``wxid`` and the text is one of the
      two control phrases, the cached account config is loaded, its
      ``agentEnabled`` flag is set accordingly and the config is saved back.
      Any other message is ignored.
      """
      payload = msg.get("Data")
      sender = payload["FromUserName"]["string"]
      text = payload["Content"]["string"]

      # Only messages written by the account owner can be commands.
      if sender != wxid:
          return

      switch_by_phrase = {
          '启用托管': True,
          '关闭托管': False
      }
      if text not in switch_by_phrase:
          return

      cached_config = gewe_chat.wxchat.get_wxchat_config_from_cache(wxid)
      if not cached_config:
          return

      # Imported lazily, as in the original code.
      from model import Models
      enabled = switch_by_phrase[text]
      agent_config = Models.AgentConfig.model_validate(cached_config)
      agent_config.agentEnabled = enabled
      gewe_chat.wxchat.save_wxchat_config(wxid, agent_config.model_dump())
      logger.info(f'{wxid} {"启动" if enabled else "关闭"}托管')
  def handle_text(token_id, app_id, wxid, msg_data, from_wxid, to_wxid):
      """Handle a private (one-to-one) text message.

      Messages sent manually by the logged-in account are only recorded: the
      receiver is saved to the contact cache and the dialogue is published to
      Kafka. Messages from a contact are answered by ``ai_chat_text`` running in
      a worker thread, while a timer calls ``check_timeout`` after
      ``timeout_duration`` seconds if the worker is still running.

      The call blocks until the worker thread has finished.
      """
      msg_content = msg_data["Content"]["string"]

      if wxid == from_wxid:
          # Message typed by the account owner: record it, do not answer it.
          logger.info("Active message sending detected")
          gewe_chat.wxchat.save_contacts_brief_to_cache(token_id, app_id, wxid, [to_wxid])
          input_wx_content_dialogue_message = [{"type": "text", "text": msg_content}]
          input_message = utils.dialogue_message(from_wxid, to_wxid, input_wx_content_dialogue_message)
          kafka_helper.kafka_client.produce_message(input_message)
          logger.info("发送对话 %s", input_message)
          return

      callback_to_user = msg_data["FromUserName"]["string"]

      # Run the AI round trip in its own thread so the timer below can observe
      # whether it is still alive.
      task_thread = threading.Thread(
          target=ai_chat_text,
          args=(token_id, app_id, wxid, msg_data, msg_content)
      )
      task_thread.start()

      # Fires once after timeout_duration seconds; check_timeout itself tests
      # whether the worker is still running.
      timeout_timer = threading.Timer(
          timeout_duration,
          check_timeout,
          args=(task_thread, token_id, wxid, app_id, callback_to_user)
      )
      timeout_timer.start()

      try:
          # Wait for the AI reply to be produced and sent.
          task_thread.join()
      finally:
          # Always stop the timer so it cannot fire after the reply went out.
          timeout_timer.cancel()
  def check_timeout(task_thread: threading.Thread, token_id, wxid, app_id, callback_to_user):
      """Send a "please wait" text if the reply task is still running.

      Does nothing when ``task_thread`` has already finished. Otherwise the
      event is logged and, unless the account config sets
      ``chatWaitingMsgEnabled`` to a falsy value, a waiting notice is posted to
      ``callback_to_user``.
      """
      if not task_thread.is_alive():
          return

      logger.warning(f"任务运行时间超过{timeout_duration}秒,token_id={token_id}, app_id={app_id}, callback_to_user={callback_to_user}")
      # A missing cache entry falls back to the default (notice enabled).
      wx_config = gewe_chat.wxchat.get_wxchat_config_from_cache(wxid) or {}
      if bool(wx_config.get("chatWaitingMsgEnabled", True)):
          gewe_chat.wxchat.post_text(token_id, app_id, callback_to_user, "亲,我正在组织回复的信息,请稍等一会")
  def ai_chat_text(token_id, app_id, wxid, msg_data, msg_content):
      """Answer a private text message through the AI workflow.

      Steps performed:
        1. Append the user message to the cached session and publish it to Kafka.
        2. If the previous AI turn offered a selection list and the message
           starts with a digit, replace the message with the chosen option.
        3. Call ``fast_gpt_api`` and turn the answer (plain text, a list of text
           parts, or an interactive selection) into a reply string.
        4. Send the reply, cache it as the assistant turn and publish it to Kafka.
      """
      start_time = time.time()  # used only for the duration log at the end
      callback_to_user = msg_data["FromUserName"]["string"]
      hash_key = f'__AI_OPS_WX__:MESSAGES:{wxid}:{callback_to_user}'

      prompt = {"role": "user", "content": [{
          "type": "text",
          "text": msg_content
      }]}
      messages_to_send = gewe_chat.wxchat.save_session_messages_to_cache(hash_key, prompt)

      # Incoming dialogue.
      input_wx_content_dialogue_message = [{"type": "text", "text": msg_content}]
      input_message = utils.dialogue_message(callback_to_user, wxid, input_wx_content_dialogue_message)
      kafka_helper.kafka_client.produce_message(input_message)
      logger.info("发送对话 %s", input_message)

      cache_data = memory.USER_INTERACTIVE_CACHE.get(wxid)
      if cache_data and cache_data.get('interactive'):
          # presumably returns the leading digit as an int, or None — confirm
          # against get_first_char_if_digit.
          o = get_first_char_if_digit(msg_content)
          if o is not None:
              userSelectOptions = cache_data.get('options') or []
              # Options are numbered from 1 for the user: accept 1..len, which
              # includes the last option and rejects 0 (it would otherwise wrap
              # around to index -1).
              if 1 <= o <= len(userSelectOptions):
                  msg_content = userSelectOptions[o - 1].get("value")
          # While a selection is pending only the (possibly replaced) message is
          # sent, without the cached history.
          messages_to_send = [{"role": "user", "content": msg_content}]

      res = fast_gpt_api(messages_to_send, wxid, callback_to_user)
      reply_content = remove_markdown_symbol(res["choices"][0]["message"]["content"])

      description = ''
      userSelectOptions = []
      values_string = ''  # stays defined even if no userSelect item is found

      if isinstance(reply_content, list) and any(item.get("type") == "interactive" for item in reply_content):
          for item in reply_content:
              if item["type"] == "interactive" and item["interactive"]["type"] == "userSelect":
                  params = item["interactive"]["params"]
                  description = params.get("description")
                  userSelectOptions = params.get("userSelectOptions", [])
                  values_string = "\n".join(option["value"] for option in userSelectOptions)
          # NOTE(review): when description is None the reply stays a list —
          # confirm whether that case can occur.
          if description is not None:
              # Remember the options so the next message can pick one by number.
              memory.USER_INTERACTIVE_CACHE[wxid] = {
                  "interactive": True,
                  "options": userSelectOptions,
              }
              reply_content = description + '------------------------------\n' + values_string
      elif isinstance(reply_content, list) and any(item.get("type") == "text" for item in reply_content):
          memory.USER_INTERACTIVE_CACHE[wxid] = {
              "interactive": False
          }
          text = ''
          for item in reply_content:
              if item["type"] == "text":
                  text = item["text"]["content"]
          if text == '':
              # Empty answer: drop the last round from the cached session and
              # ask again.
              cache_messages_str = redis_helper.redis_helper.get_hash_field(hash_key, "data")
              cache_messages = json.loads(cache_messages_str) if cache_messages_str else []
              if len(cache_messages) >= 3:
                  cache_messages = cache_messages[:-3]
              redis_helper.redis_helper.update_hash_field(hash_key, "data", json.dumps(cache_messages, ensure_ascii=False))
              messages_to_send = gewe_chat.wxchat.save_session_messages_to_cache(hash_key, prompt)
              res = fast_gpt_api(messages_to_send, wxid, callback_to_user)
              reply_content = remove_markdown_symbol(res["choices"][0]["message"]["content"])
              if isinstance(reply_content, list):
                  reply_content = remove_markdown_symbol(reply_content[0].get('text').get("content"))
          else:
              reply_content = text
      else:
          # Plain answer: reply_content already holds the cleaned text.
          memory.USER_INTERACTIVE_CACHE[wxid] = {
              "interactive": False
          }

      gewe_chat.wxchat.post_text(token_id, app_id, callback_to_user, reply_content)
      gewe_chat.wxchat.save_session_messages_to_cache(hash_key, {"role": "assistant", "content": reply_content})

      # Outgoing (AI) dialogue.
      input_wx_content_dialogue_message = [{"type": "text", "text": reply_content}]
      input_message = utils.dialogue_message(wxid, callback_to_user, input_wx_content_dialogue_message, True)
      kafka_helper.kafka_client.produce_message(input_message)
      logger.info("发送对话 %s", input_message)

      end_time = time.time()
      execution_time = end_time - start_time
      logger.info(f"AI回答任务完成,耗时 {execution_time:.2f} 秒")
  def handle_text_group(token_id, app_id, wxid, msg_data, from_wxid, to_wxid):
      """Handle a text message in a group chat.

      * Messages sent by the logged-in account are recorded (contact cache and
        Kafka) and not answered.
      * Messages from groups outside ``chatroomIdWhiteList`` are ignored.
      * Messages that mention the account are answered through the AI workflow,
        the reply being prefixed with ``@<sender nickname>``.
      * All other group messages are only published to Kafka.
      """
      msg_content = msg_data["Content"]["string"]
      # PushContent may be missing; normalise it so the substring tests below
      # cannot raise on None.
      msg_push_content = msg_data.get("PushContent") or ''

      if wxid == from_wxid:
          # Message typed by the account owner: record it, do not answer it.
          logger.info("Active message sending detected")
          gewe_chat.wxchat.save_contacts_brief_to_cache(token_id, app_id, wxid, [to_wxid])
          input_wx_content_dialogue_message = [{"type": "text", "text": msg_content}]
          input_message = utils.dialogue_message(from_wxid, to_wxid, input_wx_content_dialogue_message)
          kafka_helper.kafka_client.produce_message(input_message)
          logger.info("发送对话 %s", input_message)
          return

      c = gewe_chat.wxchat.get_wxchat_config_from_cache(wxid)
      chatroom_id_white_list = c.get("chatroomIdWhiteList", [])
      if not chatroom_id_white_list:
          logger.info('白名单为空或未定义,不处理')
          return
      if from_wxid not in chatroom_id_white_list:
          logger.info(f'群ID {from_wxid} 不在白名单中,不处理')
          return

      _, login_info = get_login_info_by_app_id(app_id)
      nickname = (login_info or {}).get("nickName")
      # Only build the "@nickname" test when a nickname is actually known.
      mentioned = '在群聊中@了你' in msg_push_content or bool(
          nickname and '@' + nickname in msg_push_content
      )

      callback_to_user = msg_data["FromUserName"]["string"]

      if not mentioned:
          logger.info('群聊公开消息')
          dialogue_message = [{"type": "text", "text": msg_content}]
          input_message = utils.dialogue_message(callback_to_user, wxid, dialogue_message)
          kafka_helper.kafka_client.produce_message(input_message)
          logger.info("发送对话 %s", input_message)
          return

      hash_key = f'__AI_OPS_WX__:MESSAGES:{wxid}:{callback_to_user}'
      prompt = {"role": "user", "content": [{
          "type": "text",
          "text": msg_content
      }]}
      messages_to_send = gewe_chat.wxchat.save_session_messages_to_cache(hash_key, prompt)

      # Incoming dialogue.
      input_wx_content_dialogue_message = [{"type": "text", "text": msg_content}]
      input_message = utils.dialogue_message(callback_to_user, wxid, input_wx_content_dialogue_message)
      kafka_helper.kafka_client.produce_message(input_message)
      logger.info("发送对话 %s", input_message)

      cache_data = memory.USER_INTERACTIVE_CACHE.get(wxid)
      if cache_data and cache_data.get('interactive'):
          # presumably returns the leading digit as an int, or None — confirm
          # against get_first_char_if_digit.
          o = get_first_char_if_digit(msg_content)
          if o is not None:
              userSelectOptions = cache_data.get('options') or []
              # Options are numbered from 1 for the user: accept 1..len, which
              # includes the last option and rejects 0 (index -1 otherwise).
              if 1 <= o <= len(userSelectOptions):
                  msg_content = userSelectOptions[o - 1].get("value")
          messages_to_send = [{"role": "user", "content": msg_content}]

      res = fast_gpt_api(messages_to_send, wxid, callback_to_user)
      reply_content = remove_markdown_symbol(res["choices"][0]["message"]["content"])

      description = ''
      userSelectOptions = []
      values_string = ''  # stays defined even if no userSelect item is found

      if isinstance(reply_content, list) and any(item.get("type") == "interactive" for item in reply_content):
          for item in reply_content:
              if item["type"] == "interactive" and item["interactive"]["type"] == "userSelect":
                  params = item["interactive"]["params"]
                  description = params.get("description")
                  userSelectOptions = params.get("userSelectOptions", [])
                  values_string = "\n".join(option["value"] for option in userSelectOptions)
          if description is not None:
              # Remember the options so the next message can pick one by number.
              memory.USER_INTERACTIVE_CACHE[wxid] = {
                  "interactive": True,
                  "options": userSelectOptions,
              }
              reply_content = description + '------------------------------\n' + values_string
      elif isinstance(reply_content, list) and any(item.get("type") == "text" for item in reply_content):
          memory.USER_INTERACTIVE_CACHE[wxid] = {
              "interactive": False
          }
          text = ''
          for item in reply_content:
              if item["type"] == "text":
                  text = item["text"]["content"]
          if text == '':
              # Empty answer: drop the last round from the cached session and
              # ask again.
              cache_messages_str = redis_helper.redis_helper.get_hash_field(hash_key, "data")
              cache_messages = json.loads(cache_messages_str) if cache_messages_str else []
              if len(cache_messages) >= 3:
                  cache_messages = cache_messages[:-3]
              redis_helper.redis_helper.update_hash_field(hash_key, "data", json.dumps(cache_messages, ensure_ascii=False))
              messages_to_send = gewe_chat.wxchat.save_session_messages_to_cache(hash_key, prompt)
              res = fast_gpt_api(messages_to_send, wxid, callback_to_user)
              reply_content = remove_markdown_symbol(res["choices"][0]["message"]["content"])
              if isinstance(reply_content, list):
                  # Same unwrapping as the private-chat path.
                  reply_content = remove_markdown_symbol(reply_content[0].get('text').get("content"))
          else:
              reply_content = text
      else:
          # Plain answer: keep the markdown-stripped text computed above, as
          # the private-chat path does.
          memory.USER_INTERACTIVE_CACHE[wxid] = {
              "interactive": False
          }

      reply_content = '@' + extract_nickname(msg_push_content) + reply_content
      gewe_chat.wxchat.post_text(token_id, app_id, callback_to_user, reply_content)
      gewe_chat.wxchat.save_session_messages_to_cache(hash_key, {"role": "assistant", "content": reply_content})

      # Outgoing (AI) dialogue.
      input_wx_content_dialogue_message = [{"type": "text", "text": reply_content}]
      input_message = utils.dialogue_message(wxid, callback_to_user, input_wx_content_dialogue_message, True)
      kafka_helper.kafka_client.produce_message(input_message)
      logger.info("发送对话 %s", input_message)
      return
  def handle_image(token_id, app_id, wxid, msg_data, from_wxid, to_wxid):
      """Handle a private image message.

      The image is downloaded through the WeChat gateway, uploaded to OSS, and
      stored in the cached session as an ``image_url`` user turn. The sender
      gets a short confirmation text and the image is published to Kafka.
      """
      msg_content = msg_data["Content"]["string"]
      callback_to_user = from_wxid
      hash_key = f'__AI_OPS_WX__:MESSAGES:{wxid}:{callback_to_user}'

      wx_img_url = gewe_chat.wxchat.download_image_msg(token_id, app_id, msg_content)

      # SECURITY: these credentials were hard-coded in the source. They can now
      # be supplied through the environment; the literals remain only as
      # fallbacks so existing deployments keep working. They should be rotated
      # and removed from the code base.
      oss_access_key_id = os.environ.get("OSS_ACCESS_KEY_ID", "LTAI5tRTG6pLhTpKACJYoPR5")
      oss_access_key_secret = os.environ.get("OSS_ACCESS_KEY_SECRET", "E7dMzeeMxq4VQvLg7Tq7uKf3XWpYfN")
      oss_endpoint = os.environ.get("OSS_ENDPOINT", "http://oss-cn-shanghai.aliyuncs.com")
      oss_bucket_name = os.environ.get("OSS_BUCKET_NAME", "cow-agent")
      oss_prefix = os.environ.get("OSS_PREFIX", "cow")

      img_url = utils.upload_oss(oss_access_key_id, oss_access_key_secret, oss_endpoint, oss_bucket_name, wx_img_url, oss_prefix)

      prompt = {
          "role": "user",
          "content": [{
              "type": "image_url",
              "image_url": {"url": img_url}
          }]
      }
      gewe_chat.wxchat.save_session_messages_to_cache(hash_key, prompt)
      gewe_chat.wxchat.post_text(token_id, app_id, callback_to_user, '已经上传了图片,有什么可以为您服务')
      logger.info(f"上传图片 URL: {img_url}")

      # The image came from the contact, so it is published as contact -> account,
      # matching how the other handlers publish incoming messages.
      wx_content_dialogue_message = [{"type": "image_url", "image_url": {"url": img_url}}]
      input_message = utils.dialogue_message(callback_to_user, wxid, wx_content_dialogue_message)
      kafka_helper.kafka_client.produce_message(input_message)
      logger.info("发送对话 %s", input_message)
  def handle_image_group(token_id, app_id, wxid, msg_data, from_wxid, to_wxid):
      """Handle an image message in a group chat; currently it is only logged."""
      logger.info('群聊图片消息')
  def handle_voice(token_id, app_id, wxid, msg_data, from_wxid, to_wxid):
      """Handle a private voice message.

      The audio is downloaded, converted to WAV and transcribed; the transcript
      is sent to the AI workflow. The answer is returned as a voice message
      unless it contains a URL or the synthesized audio is 60 seconds or
      longer, in which case plain text is sent. The answer is cached as the
      assistant turn and published to Kafka.
      """
      callback_to_user = from_wxid
      msg_content = msg_data["Content"]["string"]
      msg_id = msg_data["MsgId"]

      file_url = gewe_chat.wxchat.download_audio_msg(token_id, app_id, msg_id, msg_content)
      react_silk_path = utils.save_to_local_from_url(file_url)
      react_wav_path = os.path.splitext(react_silk_path)[0] + ".wav"
      try:
          audio_convert.any_to_wav(react_silk_path, react_wav_path)
          react_voice_text = AliVoice().voiceToText(react_wav_path)
      finally:
          # Remove the temporary audio files even when conversion or
          # recognition fails, so failed requests do not leave files behind.
          for temp_path in (react_silk_path, react_wav_path):
              if os.path.exists(temp_path):
                  os.remove(temp_path)

      hash_key = f'__AI_OPS_WX__:MESSAGES:{wxid}:{callback_to_user}'
      messages = gewe_chat.wxchat.save_session_messages_to_cache(hash_key, {"role": "user", "content": react_voice_text})
      ai_res = fast_gpt_api(messages, wxid, callback_to_user)
      ai_res_content = remove_markdown_symbol(ai_res["choices"][0]["message"]["content"])

      has_url = contains_url(ai_res_content)
      if not has_url:
          voice_during, voice_url = utils.wx_voice(ai_res_content)
          # voice_during is compared against 60 * 1000, i.e. treated as milliseconds.
          if voice_during < 60 * 1000:
              ret, ret_msg, res = gewe_chat.wxchat.post_voice(token_id, app_id, callback_to_user, voice_url, voice_during)
          else:
              ret, ret_msg, res = gewe_chat.wxchat.post_text(token_id, app_id, callback_to_user, ai_res_content)
              logger.warning(f'回应语音消息长度 {voice_during/1000}秒,超过60秒,转为文本回复')
          if ret == 200:
              logger.info((f'{wxid} 向 {callback_to_user} 发送语音文本【{ai_res_content}】{ret_msg}'))
          else:
              # Sending failed: fall back to a plain text reply.
              logger.warning((f'{wxid} 向 {callback_to_user} 发送语音文本【{ai_res_content}】{ret_msg}'))
              ret, ret_msg, res = gewe_chat.wxchat.post_text(token_id, app_id, callback_to_user, ai_res_content)
              logger.info((f'{wxid} 向 {callback_to_user} 发送文本【{ai_res_content}】{ret_msg}'))
      else:
          logger.info(f"回复内容包含网址,不发送语音,回复文字内容:{ai_res_content}")
          ret, ret_msg, res = gewe_chat.wxchat.post_text(token_id, app_id, callback_to_user, ai_res_content)

      gewe_chat.wxchat.save_session_messages_to_cache(hash_key, {"role": "assistant", "content": ai_res_content})

      # Build the dialogue message for the AI reply and publish it to Kafka.
      input_wx_content_dialogue_message = [{"type": "text", "text": ai_res_content}]
      input_message = utils.dialogue_message(wxid, callback_to_user, input_wx_content_dialogue_message, True)
      kafka_helper.kafka_client.produce_message(input_message)
      logger.info("发送对话 %s", input_message)
  def handle_voice_group(token_id, app_id, wxid, msg_data, from_wxid, to_wxid):
      """Handle a voice message in a group chat; currently it is only logged."""
      logger.info('语音消息')
  def handle_xml(token_id, app_id, wxid, msg_data, from_wxid, to_wxid):
      """Dispatch an XML (MsgType 49) message by its ``appmsg/type`` value.

      Type 57 is routed to ``handle_xml_reference`` and its result is returned.
      Group invitations and any other type are only logged.
      """
      msg_content_xml = msg_data["Content"]["string"]
      root = ET.fromstring(msg_content_xml)

      # The <type> node may be absent; its text is a string such as "57".
      type_node = root.find(".//appmsg/type")
      type_value = type_node.text if type_node is not None else None

      # The dispatch table is keyed by integers, so the text must be converted
      # before the lookup; otherwise "57" never matches 57.
      try:
          type_code = int(type_value)
      except (TypeError, ValueError):
          type_code = None

      handlers = {
          57: handle_xml_reference,
      }
      handler = handlers.get(type_code)
      if handler:
          return handler(token_id, app_id, wxid, msg_data, from_wxid, to_wxid)
      elif "邀请你加入了群聊" in msg_content_xml:
          # Invitation to join a group chat: not implemented yet.
          logger.warning(f"xml消息 {type_value} 邀请你加入了群聊.todo")
      else:
          logger.warning(f"xml消息 {type_value} 未解析")
  def handle_xml_reference(token_id, app_id, wxid, msg_data, from_wxid, to_wxid):
      """Answer a message that arrived as MsgType 49 with appmsg type 57.

      The ``PushContent`` text is used as the user turn: it is cached,
      published to Kafka and sent to the AI workflow; the answer is published,
      cached as the assistant turn and posted back to the sender.
      """
      peer = from_wxid
      session_key = f'__AI_OPS_WX__:MESSAGES:{wxid}:{peer}'
      pushed_text = msg_data["PushContent"]

      # Incoming dialogue.
      user_turn = {"role": "user", "content": [{"type": "text", "text": pushed_text}]}
      history = gewe_chat.wxchat.save_session_messages_to_cache(session_key, user_turn)
      inbound = utils.dialogue_message(peer, wxid, [{"type": "text", "text": pushed_text}])
      kafka_helper.kafka_client.produce_message(inbound)
      logger.info("发送对话 %s", inbound)

      # Outgoing (AI) dialogue.
      api_result = fast_gpt_api(history, wxid, peer)
      answer = remove_markdown_symbol(api_result["choices"][0]["message"]["content"])
      outbound = utils.dialogue_message(wxid, peer, [{"type": "text", "text": answer}], True)
      kafka_helper.kafka_client.produce_message(outbound)
      logger.info("发送对话 %s", outbound)

      assistant_turn = {"role": "assistant", "content": answer}
      gewe_chat.wxchat.save_session_messages_to_cache(session_key, assistant_turn)
      gewe_chat.wxchat.post_text(token_id, app_id, peer, answer)
  def handle_add_friend_notice(token_id, app_id, wxid, msg_data, from_wxid, to_wxid):
      """Accept an incoming friend request and answer its greeting text.

      The request XML's root attributes supply the values passed to
      ``add_contacts``. When that call returns 200, the greeting is treated as
      the first user turn: it is cached, published to Kafka and answered through
      the AI workflow, and the answer is sent to the new contact.
      """
      logger.info('好友添加请求通知')

      request_root = ET.fromstring(msg_data["Content"]["string"])
      attrs = request_root.attrib
      greeting = attrs.get('content')
      v3 = attrs.get('encryptusername')
      v4 = attrs.get('ticket')
      scene = attrs.get('scene')
      new_friend = attrs.get('fromusername')
      # The account that received the request is taken from the message itself.
      wxid = msg_data["ToUserName"]["string"]

      # Operation type: 2 = add friend, 3 = accept friend, 4 = reject friend.
      option = 3
      reply_add_contact_contact = "亲,我是你的好友"
      ret, ret_msg = gewe_chat.wxchat.add_contacts(token_id, app_id, scene, option, v3, v4, reply_add_contact_contact)
      if ret != 200:
          logger.warning("添加好友失败")
          return

      logger.info('自动添加好友成功')

      # The text the new friend sent with the request.
      session_key = f'__AI_OPS_WX__:MESSAGES:{wxid}:{new_friend}'
      user_turn = {"role": "user", "content": [{"type": "text", "text": greeting}]}
      history = gewe_chat.wxchat.save_session_messages_to_cache(session_key, user_turn)
      inbound = utils.dialogue_message(new_friend, wxid, [{"type": "text", "text": greeting}])
      kafka_helper.kafka_client.produce_message(inbound)
      logger.info("发送对话 %s", inbound)

      api_result = fast_gpt_api(history, wxid, new_friend)
      answer = remove_markdown_symbol(api_result["choices"][0]["message"]["content"])

      # Save the friend's brief info.
      gewe_chat.wxchat.save_contacts_brief_to_cache(token_id, app_id, wxid, [new_friend])
      # Cache the assistant turn.
      gewe_chat.wxchat.save_session_messages_to_cache(session_key, {"role": "assistant", "content": answer})
      # Send the reply.
      gewe_chat.wxchat.post_text(token_id, app_id, new_friend, answer)
      # Publish the reply to Kafka.
      outbound = utils.dialogue_message(wxid, new_friend, [{"type": "text", "text": answer}], True)
      kafka_helper.kafka_client.produce_message(outbound)
      logger.info("发送对话 %s", outbound)
  def handle_10002_msg(token_id, app_id, wxid, msg_data, from_wxid, to_wxid):
      """Handle MsgType 10002 system notifications.

      This message type covers group invitations, recalled messages, "pat"
      messages, locations, removal from a group, group dissolution and group
      announcements. Only three are acted on here: a group invitation saves the
      chatroom and refreshes its caches; removal and dissolution remove the
      chatroom from the contact list and delete its cached group info.
      """
      xml_text = msg_data["Content"]["string"]
      is_template = 'sysmsgtemplate' in xml_text

      # Group invitation.
      if '邀请你加入了群聊' in xml_text:
          room = msg_data["FromUserName"]["string"]
          if check_chatroom(room):
              ret, msg, data = gewe_chat.wxchat.save_contract_list(token_id, app_id, room, 3)
              logger.info(f'群聊邀请,保存到通讯录 chatroom_id {room} {msg}')
              gewe_chat.wxchat.update_group_info_to_cache(token_id, app_id, wxid, room)
              gewe_chat.wxchat.update_group_members_to_cache(token_id, app_id, wxid, room)

      # Removed from the group.
      if is_template and '移出了群聊' in xml_text:
          room = msg_data["FromUserName"]["string"]
          ret, msg, data = gewe_chat.wxchat.save_contract_list(token_id, app_id, room, 2)
          logger.info(f'踢出群聊,移除从通讯录 chatroom_id {room} {msg}')
          redis_helper.redis_helper.delete_hash_field(f'__AI_OPS_WX__:GROUPS_INFO:{wxid}', room)
          logger.info(f'清除 chatroom_id{room} 数据')

      # Group dissolved.
      if is_template and '已解散该群聊' in xml_text:
          room = msg_data["FromUserName"]["string"]
          ret, msg, data = gewe_chat.wxchat.save_contract_list(token_id, app_id, room, 2)
          logger.info(f'解散群聊,移除从通讯录 chatroom_id {room} {msg}')
          redis_helper.redis_helper.delete_hash_field(f'__AI_OPS_WX__:GROUPS_INFO:{wxid}', room)
          logger.info(f'清除 chatroom_id{room} 数据')

      print('撤回消息,拍一拍消息,地理位置')
  def handle_mod_contacts(token_id, app_id, wxid, msg_data):
      """Refresh the cached friend briefs after a contact-change notification.

      For a non-chatroom contact the full contact list is fetched and the
      friends from index 3 onward are saved to the contact cache. Notifications
      about chatrooms are only logged.
      """
      logger.info('好友通过验证及好友资料变更的通知消息')

      if check_chatroom(msg_data["UserName"]["string"]):
          logger.info('群聊好友通过验证及好友资料变更的通知消息')
          return

      ret, msg, contacts_list = gewe_chat.wxchat.fetch_contacts_list(token_id, app_id)
      # The first three entries are skipped; the slice range can be adjusted.
      friend_wxids = contacts_list['friends'][3:]
      print(friend_wxids)
      gewe_chat.wxchat.save_contacts_brief_to_cache(token_id, app_id, wxid, friend_wxids)
  def get_messages_from_cache(hash_key, object: object) -> list:
      """Append a message to the cached conversation and return the full list.

      Args:
          hash_key: Redis hash key of the conversation.
          object: The new message dict (with ``role`` and ``content``).

      Returns:
          The updated message list, which is also written back to the hash's
          ``data`` field with a 3600 expiry argument.

      A new conversation starts with an empty system message. When the last
      cached message has list content ending in an ``image_url`` part, the new
      content is merged into that message instead of being appended as a
      separate message.
      """
      if not redis_helper.redis_helper.get_hash(hash_key):
          messages = [{"role": "system", "content": ""}, object]
      else:
          messages_str = redis_helper.redis_helper.get_hash_field(hash_key, "data")
          messages = json.loads(messages_str) if messages_str else []

          # Guard against an empty cached list before looking at the last entry.
          last_content = messages[-1].get("content", []) if messages else None
          ends_with_image = (
              isinstance(last_content, list)
              and bool(last_content)
              and last_content[-1].get("type") == 'image_url'
          )

          new_content = object.get('content')
          if ends_with_image and isinstance(new_content, list) and new_content:
              # Merge the new parts into the message that carries the image.
              last_content.extend(new_content)
              messages[-1]['content'] = last_content
          elif ends_with_image and isinstance(new_content, str):
              # Plain-string content is wrapped as a text part; indexing it
              # would only have picked up its first character.
              last_content.append({"type": "text", "text": new_content})
              messages[-1]['content'] = last_content
          else:
              messages.append(object)

      redis_helper.redis_helper.set_hash(hash_key, {"data": json.dumps(messages, ensure_ascii=False)}, 3600)
      return messages
  671. def fast_gpt_api(messages:list,wixd:str,friend_wxid:str):
  672. c=gewe_chat.wxchat.get_wxchat_config_from_cache(wixd)
  673. api_key=c.get('agentTokenId',"sk-jr69ONIehfGKe9JFphuNk4DU5Y5wooHKHhQv7oSnFzVbwCnW65fXO9kvH")
  674. print(f'流程key:{api_key}\n')
  675. #api_key="sk-jr69ONIehfGKe9JFphuNk4DU5Y5wooHKHhQv7oSnFzVbwCnW65fXO9kvH" #测试
  676. #api_key="sk-uJDBdKmJVb2cmfldGOvlIY6Qx0AzqWMPD3lS1IzgQYzHNOXv9SKNI" #开发2
  677. api_url = "http://106.15.182.218:3000/api/v1/chat/completions"
  678. headers = {
  679. "Content-Type": "application/json",
  680. "Authorization": f"Bearer {api_key}"
  681. }
  682. session_id=f'{wixd}-{friend_wxid}'
  683. data={
  684. "model": "",
  685. "messages":messages,
  686. "chatId": session_id,
  687. "detail": True
  688. }
  689. #print(json.dumps(data,ensure_ascii=False))
  690. logger.info("[CHATGPT] 请求={}".format(json.dumps(data, ensure_ascii=False)))
  691. response = requests.post(url=api_url, headers=headers, data=json.dumps(data), timeout=600)
  692. response.raise_for_status()
  693. response_data = response.json()
  694. logger.info("[CHATGPT] 响应={}".format(json.dumps(response_data, separators=(',', ':'),ensure_ascii=False)))
  695. #print(response_data)
  696. return response_data
  697. def get_token_id_by_app_id(app_id: str) -> str:
  698. # 使用 SCAN 避免一次性返回所有的匹配键,逐步扫描
  699. cursor = 0
  700. while True:
  701. cursor, login_keys = redis_helper.redis_helper.client.scan(cursor, match='__AI_OPS_WX__:LOGININFO:*')
  702. # 批量获取所有键的 hash 数据
  703. for k in login_keys:
  704. r = redis_helper.redis_helper.get_hash(k)
  705. if r.get("appId") == app_id:
  706. return r.get("tokenId", "")
  707. # 如果游标为 0,则表示扫描完成
  708. if cursor == 0:
  709. break
  710. return ""
  711. def get_login_info_by_app_id(app_id: str) ->dict:
  712. # 使用 SCAN 避免一次性返回所有的匹配键,逐步扫描
  713. cursor = 0
  714. while True:
  715. cursor, login_keys = redis_helper.redis_helper.client.scan(cursor, match='__AI_OPS_WX__:LOGININFO:*')
  716. # 批量获取所有键的 hash 数据
  717. for k in login_keys:
  718. r = redis_helper.redis_helper.get_hash(k)
  719. if r.get("appId") == app_id:
  720. return k,r
  721. # 如果游标为 0,则表示扫描完成
  722. if cursor == 0:
  723. break
  724. return ""
  725. def contains_url(text):
  726. # 定义检测网址的正则表达式
  727. url_pattern = re.compile(
  728. r'http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\\(\\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+'
  729. )
  730. # 检查字符串是否包含网址
  731. return bool(url_pattern.search(text))
  732. def get_first_char_if_digit(s):
  733. if s and s[0].isdigit(): # 判断字符串是否非空且首字符为数字
  734. return int(s[0]) # 返回数字形式
  735. return None # 如果不是数字则返回 None
  736. def remove_at_mention_regex(text):
  737. # 使用正则表达式去掉“在群聊中@了你”
  738. return re.sub(r"在群聊中@了你", "", text)
  739. def extract_nickname(text)->str:
  740. if "在群聊中@了你" in text:
  741. # 如果包含 "在群聊中@了你",提取其前面的名字
  742. match = re.search(r"^(.*?)在群聊中@了你", text)
  743. if match:
  744. return match.group(1).strip()
  745. elif ": @" in text:
  746. # 如果包含 ": @",提取其前面的名字
  747. return text.split(": @")[0].strip()
  748. return ''
  749. def check_chatroom(userName):
  750. pattern = r'^\d+@chatroom$'
  751. if re.match(pattern, userName):
  752. return True
  753. return False
  754. # def remove_markdown_symbol(text: str):
  755. # # 移除markdown格式,目前先移除**
  756. # if not text or not isinstance(text, str):
  757. # return text
  758. # return re.sub(r'\*\*(.*?)\*\*', r'\1', text)
  759. def remove_markdown_symbol(text: str):
  760. # 移除markdown格式,目前先移除**
  761. if not text or not isinstance(text, str):
  762. return text
  763. # 去除加粗、斜体等格式
  764. #text = re.sub(r'\*\*([^*]+)\*\*', r'\1', text) # 去除加粗
  765. text=re.sub(r'\*\*(.*?)\*\*', r'\1', text)
  766. text = re.sub(r'\*([^*]+)\*', r'\1', text) # 去除斜体
  767. text = re.sub(r'__([^_]+)__', r'\1', text) # 去除加粗(下划线)
  768. text = re.sub(r'_(.*?)_', r'\1', text) # 去除斜体(下划线)
  769. # 去除行内代码块
  770. text = re.sub(r'`([^`]+)`', r'\1', text)
  771. # 去除换行符\n,或者多余的空格
  772. #text = re.sub(r'\n+', ' ', text)
  773. # 去除列表编号等
  774. #text = re.sub(r'^\d+\.\s*', '', text, flags=re.MULTILINE)
  775. #text = re.sub('[\\\`\*\_\[\]\#\+\-\!\>]', '', text)
  776. text = re.sub('[\\\`\*\_\[\]\#\+\!\>]', '', text)
  777. print(text)
  778. return text