You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

1012 lines
43KB

  1. from flask_restful import Resource, reqparse
  2. from flask import jsonify,request
  3. import requests,json,re
  4. from wechat import gewe_chat
  5. from voice.ali.ali_voice import AliVoice
  6. from common import utils,redis_helper,memory,kafka_helper
  7. from common.log import logger
  8. import xml.etree.ElementTree as ET
  9. import threading,time
  10. import os
  11. from voice import audio_convert
# Delay, in seconds, passed to the threading.Timer in handle_text; when it
# fires, check_timeout may send a "please wait" notice if the AI reply thread
# is still running.
timeout_duration = 8.0
  13. class MessagesResource(Resource):
  14. def __init__(self):
  15. self.parser = reqparse.RequestParser()
  16. def post(self):
  17. msg = request.get_json()
  18. logger.info(f"收到微信回调消息: {json.dumps(msg, separators=(',', ':'),ensure_ascii=False)}")
  19. type_name =msg.get("TypeName")
  20. app_id = msg.get("Appid")
  21. # token_id = "f828cb3c-1039-489f-b9ae-7494d1778a15"
  22. token_id=get_token_id_by_app_id(app_id)
  23. if token_id=="":
  24. logger.warning('找不到登录信息,不处理')
  25. return jsonify({"message": "收到微信回调消息"})
  26. wxid = msg.get("Wxid",'')
  27. # 自发命令
  28. # if type_name=='AddMsg':
  29. # from_wxid = msg_data["FromUserName"]["string"]
  30. # if wxid=='from_wxid':
  31. # pass
  32. if type_name == 'AddMsg':
  33. handle_self_cmd(wxid,msg)
  34. wx_config = gewe_chat.wxchat.get_wxchat_config_from_cache(wxid)
  35. # if not bool(wx_config.get("agentEnabled",False)):
  36. # logger.info(f'微信ID {wxid} 未托管,不处理')
  37. # return jsonify({"message": "收到微信回调消息"})
  38. if type_name=='AddMsg':
  39. if not bool(wx_config.get("agentEnabled",False)):
  40. logger.info(f'微信ID {wxid} 未托管,不处理')
  41. return jsonify({"message": "收到微信回调消息"})
  42. #wxid = msg.get("Wxid")
  43. msg_data = msg.get("Data")
  44. msg_type = msg_data.get("MsgType",None)
  45. from_wxid = msg_data["FromUserName"]["string"]
  46. to_wxid = msg_data["ToUserName"]["string"]
  47. msg_push_content=msg_data.get("PushContent") #群发
  48. #wx_config = gewe_chat.wxchat.get_wxchat_config_from_cache(wxid)
  49. handlers = {
  50. 1: handle_text,
  51. 3: handle_image,
  52. 34: handle_voice,
  53. 42: handle_name_card,
  54. 49: handle_xml,
  55. 37: handle_add_friend_notice,
  56. 10002: handle_10002_msg
  57. }
  58. # (扫码进群情况)判断受否是群聊,并添加到通信录
  59. if check_chatroom(from_wxid) or check_chatroom(to_wxid):
  60. logger.info('群信息')
  61. chatroom_id=from_wxid
  62. ret,msg,data=gewe_chat.wxchat.save_contract_list(token_id,app_id,chatroom_id,3)
  63. logger.info(f'保存到通讯录 chatroom_id {chatroom_id} {msg}')
  64. gewe_chat.wxchat.update_group_info_to_cache(token_id,app_id,wxid,chatroom_id)
  65. gewe_chat.wxchat.update_group_members_to_cache(token_id,app_id,wxid,chatroom_id)
  66. handlers[1]=handle_text_group
  67. handlers[3]=handle_image_group
  68. handlers[34]=handle_voice_group
  69. handler = handlers.get(msg_type)
  70. if handler:
  71. return handler(token_id,app_id, wxid,msg_data,from_wxid, to_wxid)
  72. else:
  73. logger.warning(f"微信回调消息类型 {msg_type} 未处理")
  74. elif type_name=='ModContacts':
  75. '''
  76. 好友通过验证及好友资料变更的通知消息
  77. '''
  78. wxid = msg.get("Wxid")
  79. msg_data = msg.get("Data")
  80. #
  81. #handle_mod_contacts(token_id,app_id,wxid,msg_data)
  82. #
  83. threading.Thread(target=handle_mod_contacts, args=(token_id,app_id,wxid,msg_data)).start()
  84. elif type_name=="DelContacts":
  85. '''
  86. 删除好友通知/退出群聊
  87. '''
  88. msg_data = msg.get("Data")
  89. username=msg_data["UserName"]["string"]
  90. if check_chatroom(username):
  91. logger.info('退出群聊')
  92. wxid = msg.get("Wxid")
  93. chatroom_id=username
  94. redis_helper.redis_helper.delete_hash_field(f'__AI_OPS_WX__:GROUPS_INFO:{wxid}',chatroom_id)
  95. logger.info(f'清除 chatroom_id{chatroom_id} 数据')
  96. else:
  97. logger.info('删除好友通知')
  98. elif type_name=="Offline":
  99. '''
  100. 已经离线
  101. '''
  102. wxid = msg.get("Wxid")
  103. logger.warning(f'微信ID {wxid}在设备{app_id}已经离线')
  104. k,r=get_login_info_by_app_id(app_id)
  105. print(k)
  106. redis_helper.redis_helper.update_hash_field(k,'status',0)
  107. redis_helper.redis_helper.update_hash_field(k,'modify_at',int(time.time()))
  108. else:
  109. logger.warning(f"未知消息类型")
  110. return jsonify({"message": "收到微信回调消息"})
  111. def handle_name_card(token_id,app_id, wxid,msg_data,from_wxid, to_wxid):
  112. logger.info('名片消息')
  113. msg_content_xml=msg_data["Content"]["string"]
  114. # 解析XML字符串
  115. root = ET.fromstring(msg_content_xml)
  116. # 提取alias属性
  117. alias_value = root.get("alias")
  118. def handle_self_cmd(wxid,msg):
  119. '''
  120. 个人微信命令处理
  121. 如果是个人微信的指令,wxid == from_wxid
  122. commands = {
  123. '启用托管': True,
  124. '关闭托管': False
  125. }
  126. '''
  127. msg_data=msg.get("Data")
  128. from_wxid=msg_data["FromUserName"]["string"]
  129. msg_content=msg_data["Content"]["string"]
  130. if wxid == from_wxid:
  131. commands = {
  132. '启用托管': True,
  133. '关闭托管': False
  134. }
  135. if msg_content in commands:
  136. c_dict = gewe_chat.wxchat.get_wxchat_config_from_cache(wxid)
  137. if c_dict:
  138. from model import Models
  139. agent_config=Models.AgentConfig.model_validate(c_dict)
  140. agent_config.agentEnabled=commands[msg_content]
  141. gewe_chat.wxchat.save_wxchat_config(wxid, agent_config.model_dump())
  142. logger.info(f'{wxid} {"启动" if commands[msg_content] else "关闭"}托管')
  143. def handle_text(token_id,app_id, wxid,msg_data,from_wxid, to_wxid):
  144. '''
  145. 私聊文本消息
  146. '''
  147. msg_content=msg_data["Content"]["string"]
  148. if wxid == from_wxid: #手动发送消息
  149. logger.info("Active message sending detected")
  150. gewe_chat.wxchat.save_contacts_brief_to_cache(token_id,app_id,wxid,[to_wxid])
  151. callback_to_user=msg_data["ToUserName"]["string"]
  152. input_wx_content_dialogue_message=[{"type": "text", "text": msg_content}]
  153. input_message=utils.dialogue_message(from_wxid,to_wxid,input_wx_content_dialogue_message)
  154. kafka_helper.kafka_client.produce_message(input_message)
  155. logger.info("发送对话 %s",input_message)
  156. else:
  157. callback_to_user=msg_data["FromUserName"]["string"]
  158. # hash_key = f'__AI_OPS_WX__:MESSAGES:{wxid}:{callback_to_user}'
  159. # prompt={"role": "user", "content": [{
  160. # "type": "text",
  161. # "text": msg_content
  162. # }]}
  163. # messages_to_send=gewe_chat.wxchat.save_session_messages_to_cache(hash_key, prompt)
  164. # # 收到的对话
  165. # input_wx_content_dialogue_message=[{"type": "text", "text": msg_content}]
  166. # input_message=utils.dialogue_message(callback_to_user,wxid,input_wx_content_dialogue_message)
  167. # kafka_helper.kafka_client.produce_message(input_message)
  168. # logger.info("发送对话 %s",input_message)
  169. # cache_data = memory.USER_INTERACTIVE_CACHE.get(wxid)
  170. # if cache_data and cache_data.get('interactive') :
  171. # o=get_first_char_if_digit(msg_content)
  172. # if o is not None:
  173. # userSelectOptions=cache_data.get('options')
  174. # if o < len(userSelectOptions):
  175. # o=o-1
  176. # msg_content=userSelectOptions[o].get("value")
  177. # messages_to_send=[{"role": "user", "content": msg_content}]
  178. # else:
  179. # messages_to_send=[{"role": "user", "content": msg_content}]
  180. # else:
  181. # messages_to_send=[{"role": "user", "content": msg_content}]
  182. # res=fast_gpt_api(messages_to_send,wxid,callback_to_user)
  183. # reply_content=res["choices"][0]["message"]["content"]
  184. # description = ''
  185. # userSelectOptions = []
  186. # if isinstance(reply_content, list) and any(item.get("type") == "interactive" for item in reply_content):
  187. # for item in reply_content:
  188. # if item["type"] == "interactive" and item["interactive"]["type"] == "userSelect":
  189. # params = item["interactive"]["params"]
  190. # description = params.get("description")
  191. # userSelectOptions = params.get("userSelectOptions", [])
  192. # values_string = "\n".join(option["value"] for option in userSelectOptions)
  193. # if description is not None:
  194. # memory.USER_INTERACTIVE_CACHE[wxid] = {
  195. # "interactive":True,
  196. # "options": userSelectOptions,
  197. # }
  198. # reply_content=description + '------------------------------\n'+values_string
  199. # elif isinstance(reply_content, list) and any(item.get("type") == "text" for item in reply_content):
  200. # memory.USER_INTERACTIVE_CACHE[wxid] = {
  201. # "interactive":False
  202. # }
  203. # text=''
  204. # for item in reply_content:
  205. # if item["type"] == "text":
  206. # text=item["text"]["content"]
  207. # if text=='':
  208. # # 去除上次上一轮对话再次请求
  209. # cache_messages_str=redis_helper.redis_helper.get_hash_field(hash_key,"data")
  210. # cache_messages = json.loads(cache_messages_str) if cache_messages_str else []
  211. # if len(cache_messages) >= 3:
  212. # cache_messages = cache_messages[:-3]
  213. # redis_helper.redis_helper.update_hash_field(hash_key,"data",json.dumps(cache_messages,ensure_ascii=False))
  214. # messages_to_send=gewe_chat.wxchat.save_session_messages_to_cache(hash_key, prompt)
  215. # res=fast_gpt_api(messages_to_send,wxid,callback_to_user)
  216. # reply_content=res["choices"][0]["message"]["content"]
  217. # if isinstance(reply_content, list) :
  218. # reply_content=reply_content[0].get('text').get("content")
  219. # else:
  220. # reply_content=text
  221. # else:
  222. # memory.USER_INTERACTIVE_CACHE[wxid] = {
  223. # "interactive":False
  224. # }
  225. # reply_content=res["choices"][0]["message"]["content"]
  226. # gewe_chat.wxchat.post_text(token_id,app_id,callback_to_user,reply_content)
  227. # gewe_chat.wxchat.save_session_messages_to_cache(hash_key, {"role": "assistant", "content": reply_content})
  228. # # 回复的对话
  229. # input_wx_content_dialogue_message=[{"type": "text", "text": reply_content}]
  230. # input_message=utils.dialogue_message(wxid,callback_to_user,input_wx_content_dialogue_message,True)
  231. # kafka_helper.kafka_client.produce_message(input_message)
  232. # logger.info("发送对话 %s",input_message)
  233. # 创建并启动任务线程,将参数传递给 ai_chat_text 函数
  234. task_thread = threading.Thread(
  235. target=ai_chat_text,
  236. args=(token_id,app_id,wxid,msg_data,msg_content)
  237. )
  238. task_thread.start()
  239. # 设置定时器,1秒后检查任务是否超时。这里需要使用 lambda 来传递参数
  240. timeout_timer = threading.Timer(
  241. timeout_duration,
  242. lambda:check_timeout(task_thread, token_id, wxid,app_id, callback_to_user)
  243. )
  244. timeout_timer.start()
  245. # 等待任务线程完成
  246. task_thread.join()
  247. # 取消定时器
  248. timeout_timer.cancel()
  249. def check_timeout(task_thread:threading.Thread, token_id,wxid, app_id, callback_to_user):
  250. if task_thread.is_alive():
  251. print(f"任务运行时间超过{timeout_duration}秒,token_id={token_id}, app_id={app_id}, callback_to_user={callback_to_user}")
  252. wx_config = gewe_chat.wxchat.get_wxchat_config_from_cache(wxid)
  253. if bool(wx_config.get("chatWaitingMsgEnabled",True)):
  254. gewe_chat.wxchat.post_text(token_id,app_id,callback_to_user,"亲,我正在组织回复的信息,请稍等一会")
  255. def ai_chat_text(token_id,app_id,wxid,msg_data,msg_content):
  256. start_time = time.time() # 记录任务开始时间
  257. callback_to_user=msg_data["FromUserName"]["string"]
  258. hash_key = f'__AI_OPS_WX__:MESSAGES:{wxid}:{callback_to_user}'
  259. prompt={"role": "user", "content": [{
  260. "type": "text",
  261. "text": msg_content
  262. }]}
  263. messages_to_send=gewe_chat.wxchat.save_session_messages_to_cache(hash_key, prompt)
  264. # 收到的对话
  265. input_wx_content_dialogue_message=[{"type": "text", "text": msg_content}]
  266. input_message=utils.dialogue_message(callback_to_user,wxid,input_wx_content_dialogue_message)
  267. kafka_helper.kafka_client.produce_message(input_message)
  268. logger.info("发送对话 %s",input_message)
  269. cache_data = memory.USER_INTERACTIVE_CACHE.get(wxid)
  270. if cache_data and cache_data.get('interactive') :
  271. o=get_first_char_if_digit(msg_content)
  272. if o is not None:
  273. userSelectOptions=cache_data.get('options')
  274. if o < len(userSelectOptions):
  275. o=o-1
  276. msg_content=userSelectOptions[o].get("value")
  277. messages_to_send=[{"role": "user", "content": msg_content}]
  278. else:
  279. messages_to_send=[{"role": "user", "content": msg_content}]
  280. else:
  281. messages_to_send=[{"role": "user", "content": msg_content}]
  282. res=fast_gpt_api(messages_to_send,wxid,callback_to_user)
  283. reply_content=remove_markdown_symbol(res["choices"][0]["message"]["content"])
  284. description = ''
  285. userSelectOptions = []
  286. if isinstance(reply_content, list) and any(item.get("type") == "interactive" for item in reply_content):
  287. for item in reply_content:
  288. if item["type"] == "interactive" and item["interactive"]["type"] == "userSelect":
  289. params = item["interactive"]["params"]
  290. description = params.get("description")
  291. userSelectOptions = params.get("userSelectOptions", [])
  292. values_string = "\n".join(option["value"] for option in userSelectOptions)
  293. if description is not None:
  294. memory.USER_INTERACTIVE_CACHE[wxid] = {
  295. "interactive":True,
  296. "options": userSelectOptions,
  297. }
  298. reply_content=description + '------------------------------\n'+values_string
  299. elif isinstance(reply_content, list) and any(item.get("type") == "text" for item in reply_content):
  300. memory.USER_INTERACTIVE_CACHE[wxid] = {
  301. "interactive":False
  302. }
  303. text=''
  304. for item in reply_content:
  305. if item["type"] == "text":
  306. text=item["text"]["content"]
  307. if text=='':
  308. # 去除上次上一轮对话再次请求
  309. cache_messages_str=redis_helper.redis_helper.get_hash_field(hash_key,"data")
  310. cache_messages = json.loads(cache_messages_str) if cache_messages_str else []
  311. if len(cache_messages) >= 3:
  312. cache_messages = cache_messages[:-3]
  313. redis_helper.redis_helper.update_hash_field(hash_key,"data",json.dumps(cache_messages,ensure_ascii=False))
  314. messages_to_send=gewe_chat.wxchat.save_session_messages_to_cache(hash_key, prompt)
  315. res=fast_gpt_api(messages_to_send,wxid,callback_to_user)
  316. reply_content=remove_markdown_symbol(res["choices"][0]["message"]["content"])
  317. if isinstance(reply_content, list) :
  318. reply_content=remove_markdown_symbol(reply_content[0].get('text').get("content"))
  319. else:
  320. reply_content=text
  321. else:
  322. memory.USER_INTERACTIVE_CACHE[wxid] = {
  323. "interactive":False
  324. }
  325. reply_content=remove_markdown_symbol(res["choices"][0]["message"]["content"])
  326. gewe_chat.wxchat.post_text(token_id,app_id,callback_to_user,reply_content)
  327. gewe_chat.wxchat.save_session_messages_to_cache(hash_key, {"role": "assistant", "content": reply_content})
  328. # 回复的对话
  329. input_wx_content_dialogue_message=[{"type": "text", "text": reply_content}]
  330. input_message=utils.dialogue_message(wxid,callback_to_user,input_wx_content_dialogue_message,True)
  331. kafka_helper.kafka_client.produce_message(input_message)
  332. logger.info("发送对话 %s",input_message)
  333. end_time = time.time() # 记录任务结束时间
  334. execution_time = end_time - start_time # 计算执行时间
  335. logger.info(f"AI回答任务完成,耗时 {execution_time:.2f} 秒")
  336. def handle_text_group(token_id,app_id, wxid,msg_data,from_wxid, to_wxid):
  337. '''
  338. 群聊文本消息
  339. '''
  340. msg_content=msg_data["Content"]["string"]
  341. msg_push_content=msg_data.get("PushContent")
  342. k,login_info=get_login_info_by_app_id(app_id)
  343. nickname=login_info.get("nickName")
  344. if wxid == from_wxid: #手动发送消息
  345. logger.info("Active message sending detected")
  346. gewe_chat.wxchat.save_contacts_brief_to_cache(token_id,app_id,wxid,[to_wxid])
  347. callback_to_user=msg_data["ToUserName"]["string"]
  348. input_wx_content_dialogue_message=[{"type": "text", "text": msg_content}]
  349. input_message=utils.dialogue_message(from_wxid,to_wxid,input_wx_content_dialogue_message)
  350. kafka_helper.kafka_client.produce_message(input_message)
  351. logger.info("发送对话 %s",input_message)
  352. else:
  353. c = gewe_chat.wxchat.get_wxchat_config_from_cache(wxid)
  354. chatroom_id_white_list = c.get("chatroomIdWhiteList", [])
  355. if not chatroom_id_white_list:
  356. logger.info('白名单为空或未定义,不处理')
  357. return
  358. if from_wxid not in chatroom_id_white_list:
  359. logger.info(f'群ID {from_wxid} 不在白名单中,不处理')
  360. return
  361. if '在群聊中@了你' in msg_push_content or '@'+nickname in msg_push_content:
  362. callback_to_user=msg_data["FromUserName"]["string"]
  363. hash_key = f'__AI_OPS_WX__:MESSAGES:{wxid}:{callback_to_user}'
  364. prompt={"role": "user", "content": [{
  365. "type": "text",
  366. "text": msg_content
  367. }]}
  368. messages_to_send=gewe_chat.wxchat.save_session_messages_to_cache(hash_key, prompt)
  369. # 收到的对话
  370. input_wx_content_dialogue_message=[{"type": "text", "text": msg_content}]
  371. input_message=utils.dialogue_message(callback_to_user,wxid,input_wx_content_dialogue_message)
  372. kafka_helper.kafka_client.produce_message(input_message)
  373. logger.info("发送对话 %s",input_message)
  374. cache_data = memory.USER_INTERACTIVE_CACHE.get(wxid)
  375. if cache_data and cache_data.get('interactive') :
  376. o=get_first_char_if_digit(msg_content)
  377. if o is not None:
  378. userSelectOptions=cache_data.get('options')
  379. if o < len(userSelectOptions):
  380. o=o-1
  381. msg_content=userSelectOptions[o].get("value")
  382. messages_to_send=[{"role": "user", "content": msg_content}]
  383. else:
  384. messages_to_send=[{"role": "user", "content": msg_content}]
  385. else:
  386. messages_to_send=[{"role": "user", "content": msg_content}]
  387. res=fast_gpt_api(messages_to_send,wxid,callback_to_user)
  388. reply_content=remove_markdown_symbol(res["choices"][0]["message"]["content"])
  389. description = ''
  390. userSelectOptions = []
  391. if isinstance(reply_content, list) and any(item.get("type") == "interactive" for item in reply_content):
  392. for item in reply_content:
  393. if item["type"] == "interactive" and item["interactive"]["type"] == "userSelect":
  394. params = item["interactive"]["params"]
  395. description = params.get("description")
  396. userSelectOptions = params.get("userSelectOptions", [])
  397. values_string = "\n".join(option["value"] for option in userSelectOptions)
  398. if description is not None:
  399. memory.USER_INTERACTIVE_CACHE[wxid] = {
  400. "interactive":True,
  401. "options": userSelectOptions,
  402. }
  403. reply_content=description + '------------------------------\n'+values_string
  404. elif isinstance(reply_content, list) and any(item.get("type") == "text" for item in reply_content):
  405. memory.USER_INTERACTIVE_CACHE[wxid] = {
  406. "interactive":False
  407. }
  408. text=''
  409. for item in reply_content:
  410. if item["type"] == "text":
  411. text=item["text"]["content"]
  412. if text=='':
  413. # 去除上次上一轮对话再次请求
  414. cache_messages_str=redis_helper.redis_helper.get_hash_field(hash_key,"data")
  415. cache_messages = json.loads(cache_messages_str) if cache_messages_str else []
  416. if len(cache_messages) >= 3:
  417. cache_messages = cache_messages[:-3]
  418. redis_helper.redis_helper.update_hash_field(hash_key,"data",json.dumps(cache_messages,ensure_ascii=False))
  419. messages_to_send=gewe_chat.wxchat.save_session_messages_to_cache(hash_key, prompt)
  420. res=fast_gpt_api(messages_to_send,wxid,callback_to_user)
  421. reply_content=remove_markdown_symbol(res["choices"][0]["message"]["content"])
  422. else:
  423. reply_content=text
  424. else:
  425. memory.USER_INTERACTIVE_CACHE[wxid] = {
  426. "interactive":False
  427. }
  428. reply_content=res["choices"][0]["message"]["content"]
  429. reply_content='@'+extract_nickname(msg_push_content) + reply_content
  430. gewe_chat.wxchat.post_text(token_id,app_id,callback_to_user,reply_content)
  431. gewe_chat.wxchat.save_session_messages_to_cache(hash_key, {"role": "assistant", "content": reply_content})
  432. # 回复的对话
  433. input_wx_content_dialogue_message=[{"type": "text", "text": reply_content}]
  434. input_message=utils.dialogue_message(wxid,callback_to_user,input_wx_content_dialogue_message,True)
  435. kafka_helper.kafka_client.produce_message(input_message)
  436. logger.info("发送对话 %s",input_message)
  437. else:
  438. logger.info('群聊公开消息')
  439. callback_to_user=msg_data["FromUserName"]["string"]
  440. dialogue_message=[{"type": "text", "text": msg_content}]
  441. input_message=utils.dialogue_message(callback_to_user,wxid,dialogue_message)
  442. kafka_helper.kafka_client.produce_message(input_message)
  443. logger.info("发送对话 %s",input_message)
  444. return
  445. def handle_image(token_id,app_id, wxid,msg_data,from_wxid, to_wxid):
  446. '''
  447. 图片消息
  448. '''
  449. msg_content=msg_data["Content"]["string"]
  450. callback_to_user=from_wxid
  451. hash_key = f'__AI_OPS_WX__:MESSAGES:{wxid}:{callback_to_user}'
  452. wx_img_url=gewe_chat.wxchat.download_image_msg(token_id,app_id,msg_content)
  453. oss_access_key_id="LTAI5tRTG6pLhTpKACJYoPR5"
  454. oss_access_key_secret="E7dMzeeMxq4VQvLg7Tq7uKf3XWpYfN"
  455. oss_endpoint="http://oss-cn-shanghai.aliyuncs.com"
  456. oss_bucket_name="cow-agent"
  457. oss_prefix="cow"
  458. img_url=utils.upload_oss(oss_access_key_id, oss_access_key_secret, oss_endpoint, oss_bucket_name, wx_img_url, oss_prefix)
  459. prompt={
  460. "role": "user",
  461. "content": [{
  462. "type": "image_url",
  463. "image_url": {"url": img_url}
  464. }]
  465. }
  466. gewe_chat.wxchat.save_session_messages_to_cache(hash_key, prompt)
  467. gewe_chat.wxchat.post_text(token_id,app_id,callback_to_user,'已经上传了图片,有什么可以为您服务')
  468. logger.info(f"上传图片 URL: {img_url}")
  469. wx_content_dialogue_message=[{"type": "image_url", "image_url": {"url": img_url}}]
  470. input_message=utils.dialogue_message(wxid,callback_to_user,wx_content_dialogue_message)
  471. kafka_helper.kafka_client.produce_message(input_message)
  472. logger.info("发送对话 %s",input_message)
  473. def handle_image_group(token_id,app_id, wxid,msg_data,from_wxid, to_wxid):
  474. logger.info('群聊图片消息')
  475. def handle_voice(token_id,app_id, wxid,msg_data,from_wxid, to_wxid):
  476. '''
  477. 语音消息
  478. '''
  479. callback_to_user=from_wxid
  480. msg_content=msg_data["Content"]["string"]
  481. msg_id=msg_data["MsgId"]
  482. file_url=gewe_chat.wxchat.download_audio_msg(token_id,app_id,msg_id,msg_content)
  483. react_silk_path=utils.save_to_local_from_url(file_url)
  484. react_wav_path = os.path.splitext(react_silk_path)[0] + ".wav"
  485. audio_convert.any_to_wav(react_silk_path,react_wav_path)
  486. react_voice_text=AliVoice().voiceToText(react_wav_path)
  487. os.remove(react_silk_path)
  488. os.remove(react_wav_path)
  489. hash_key = f'__AI_OPS_WX__:MESSAGES:{wxid}:{callback_to_user}'
  490. messages=gewe_chat.wxchat.save_session_messages_to_cache(hash_key, {"role": "user", "content": react_voice_text})
  491. ai_res=fast_gpt_api(messages,wxid,callback_to_user)
  492. ai_res_content=remove_markdown_symbol(ai_res["choices"][0]["message"]["content"])
  493. has_url=contains_url(ai_res_content)
  494. if not has_url:
  495. voice_during,voice_url=utils.wx_voice(ai_res_content)
  496. if voice_during < 60 * 1000:
  497. ret,ret_msg,res=gewe_chat.wxchat.post_voice(token_id,app_id,callback_to_user,voice_url,voice_during)
  498. else:
  499. ret,ret_msg,res=gewe_chat.wxchat.post_text(token_id,app_id,callback_to_user,ai_res_content)
  500. logger.warning(f'回应语音消息长度 {voice_during/1000}秒,超过60秒,转为文本回复')
  501. if ret==200:
  502. logger.info((f'{wxid} 向 {callback_to_user} 发送语音文本【{ai_res_content}】{ret_msg}'))
  503. else:
  504. logger.warning((f'{wxid} 向 {callback_to_user} 发送语音文本【{ai_res_content}】{ret_msg}'))
  505. ret,ret_msg,res=gewe_chat.wxchat.post_text(token_id,app_id,callback_to_user,ai_res_content)
  506. logger.info((f'{wxid} 向 {callback_to_user} 发送文本【{ai_res_content}】{ret_msg}'))
  507. else:
  508. logger.info(f"回复内容包含网址,不发送语音,回复文字内容:{ai_res_content}")
  509. ret,ret_msg,res=gewe_chat.wxchat.post_text(token_id,app_id,callback_to_user,ai_res_content)
  510. gewe_chat.wxchat.save_session_messages_to_cache(hash_key, {"role": "assistant", "content": ai_res_content})
  511. # 构造对话消息并发送到 Kafka
  512. input_wx_content_dialogue_message = [{"type": "text", "text": ai_res_content}]
  513. input_message = utils.dialogue_message(wxid, callback_to_user, input_wx_content_dialogue_message,True)
  514. kafka_helper.kafka_client.produce_message(input_message)
  515. logger.info("发送对话 %s", input_message)
  516. def handle_voice_group(token_id,app_id, wxid,msg_data,from_wxid, to_wxid):
  517. logger.info('语音消息')
  518. def handle_xml(token_id,app_id, wxid,msg_data,from_wxid, to_wxid):
  519. '''
  520. 处理xml
  521. '''
  522. msg_content_xml=msg_data["Content"]["string"]
  523. root = ET.fromstring(msg_content_xml)
  524. type_value = root.find(".//appmsg/type").text
  525. handlers = {
  526. 57: handle_xml_reference,
  527. 5: handle_xml_invite_group
  528. }
  529. handler = handlers.get(type_value)
  530. if handler:
  531. return handler(token_id,app_id, wxid,msg_data,from_wxid, to_wxid)
  532. # elif "邀请你加入了群聊" in msg_content_xml: # 邀请加入群聊
  533. # logger.warning(f"xml消息 {type_value} 邀请你加入了群聊.todo")
  534. else:
  535. print(f"xml消息 {type_value} 未解析")
  536. def handle_xml_reference(token_id,app_id, wxid,msg_data,from_wxid, to_wxid):
  537. '''
  538. 引用消息
  539. 判断此类消息的逻辑:$.Data.MsgType=49 并且 解析$.Data.Content.string中的xml msg.appmsg.type=57
  540. '''
  541. callback_to_user=from_wxid
  542. hash_key = f'__AI_OPS_WX__:MESSAGES:{wxid}:{callback_to_user}'
  543. msg_content= msg_data["PushContent"]
  544. prompt={"role": "user", "content": [{
  545. "type": "text",
  546. "text": msg_content
  547. }]}
  548. # 收到的对话
  549. messages_to_send=gewe_chat.wxchat.save_session_messages_to_cache(hash_key, prompt)
  550. input_wx_content_dialogue_message=[{"type": "text", "text": msg_content}]
  551. input_message=utils.dialogue_message(callback_to_user,wxid,input_wx_content_dialogue_message)
  552. kafka_helper.kafka_client.produce_message(input_message)
  553. logger.info("发送对话 %s",input_message)
  554. # 回复的对话
  555. res=fast_gpt_api(messages_to_send,wxid,callback_to_user)
  556. reply_content=remove_markdown_symbol(res["choices"][0]["message"]["content"])
  557. input_wx_content_dialogue_message=[{"type": "text", "text": reply_content}]
  558. input_message=utils.dialogue_message(wxid,callback_to_user,input_wx_content_dialogue_message,True)
  559. kafka_helper.kafka_client.produce_message(input_message)
  560. logger.info("发送对话 %s",input_message)
  561. gewe_chat.wxchat.save_session_messages_to_cache(hash_key, {"role": "assistant", "content": reply_content})
  562. gewe_chat.wxchat.post_text(token_id,app_id,callback_to_user,reply_content)
  563. def handle_xml_invite_group(token_id,app_id, wxid,msg_data,from_wxid, to_wxid):
  564. '''
  565. 群聊邀请
  566. 判断此类消息的逻辑:$.Data.MsgType=49
  567. 并且 解析$.Data.Content.string中的xml msg.appmsg.title=邀请你加入群聊(根据手机设置的系统语言title会有调整,不同语言关键字不同)
  568. '''
  569. logger.info(f'{wxid} 群聊邀请')
  570. msg_content_xml=msg_data["Content"]["string"]
  571. root = ET.fromstring(msg_content_xml)
  572. title_value = root.find(".//appmsg/title").text
  573. if '邀请你加入群聊' in title_value:
  574. invite_url = root.find('.//url').text
  575. ret,msg,data=gewe_chat.wxchat.agree_join_room(token_id,app_id,invite_url)
  576. if ret==200:
  577. logger.info(f'群聊邀请,同意加入群聊 {msg} {data}')
  578. chatroom_id=data.get('chatroomId','')
  579. # if not chatroom_id:
  580. # logger.warning(f'群聊邀请,同意加入群聊失败 {msg} {data}')
  581. # return
  582. ret,msg,data=gewe_chat.wxchat.save_contract_list(token_id,app_id,chatroom_id,3)
  583. logger.info(f'群聊邀请,保存到通讯录 chatroom_id {chatroom_id} {msg}')
  584. gewe_chat.wxchat.update_group_info_to_cache(token_id,app_id,wxid,chatroom_id)
  585. gewe_chat.wxchat.update_group_members_to_cache(token_id,app_id,wxid,chatroom_id)
  586. else:
  587. logger.warning(f'群聊邀请,同意加入群聊失败 {msg} {data}')
  588. def handle_add_friend_notice(token_id,app_id, wxid,msg_data,from_wxid, to_wxid):
  589. '''
  590. 好友添加请求通知
  591. '''
  592. logger.info('好友添加请求通知')
  593. msg_content_xml=msg_data["Content"]["string"]
  594. root = ET.fromstring(msg_content_xml)
  595. msg_content = root.attrib.get('content', None)
  596. v3= root.attrib.get('encryptusername', None)
  597. v4= root.attrib.get('ticket', None)
  598. scene=root.attrib.get('scene', None)
  599. to_contact_wxid=root.attrib.get('fromusername', None)
  600. wxid=msg_data["ToUserName"]["string"]
  601. # 自动同意好友
  602. # print(v3)
  603. # print(v4)
  604. # print(scene)
  605. # print(msg_content)
  606. # 操作类型,2添加好友 3同意好友 4拒绝好友
  607. #option=2
  608. option=3
  609. reply_add_contact_contact="亲,我是你的好友"
  610. ret,ret_msg=gewe_chat.wxchat.add_contacts(token_id,app_id,scene,option,v3,v4,reply_add_contact_contact)
  611. if ret==200:
  612. logger.info('自动添加好友成功')
  613. # 好友发送的文字
  614. hash_key = f'__AI_OPS_WX__:MESSAGES:{wxid}:{to_contact_wxid}'
  615. prompt={"role": "user", "content": [{"type": "text","text": msg_content}]}
  616. messages_to_send=gewe_chat.wxchat.save_session_messages_to_cache(hash_key, prompt)
  617. input_wx_content_dialogue_message=[{"type": "text", "text": msg_content}]
  618. input_message=utils.dialogue_message(to_contact_wxid,wxid,input_wx_content_dialogue_message)
  619. kafka_helper.kafka_client.produce_message(input_message)
  620. logger.info("发送对话 %s",input_message)
  621. callback_to_user=to_contact_wxid
  622. res=fast_gpt_api(messages_to_send,wxid,callback_to_user)
  623. reply_content=remove_markdown_symbol(res["choices"][0]["message"]["content"])
  624. #保存好友信息
  625. gewe_chat.wxchat.save_contacts_brief_to_cache(token_id,app_id, wxid,[to_contact_wxid])
  626. # 保存到缓存
  627. gewe_chat.wxchat.save_session_messages_to_cache(hash_key, {"role": "assistant", "content": reply_content})
  628. # 发送信息
  629. gewe_chat.wxchat.post_text(token_id,app_id, to_contact_wxid,reply_content)
  630. # 发送到kafka
  631. input_wx_content_dialogue_message=[{"type": "text", "text": reply_content}]
  632. input_message=utils.dialogue_message(wxid,to_contact_wxid,input_wx_content_dialogue_message,True)
  633. kafka_helper.kafka_client.produce_message(input_message)
  634. logger.info("发送对话 %s",input_message)
  635. else:
  636. logger.warning("添加好友失败")
  637. def handle_10002_msg(token_id,app_id, wxid,msg_data,from_wxid, to_wxid):
  638. '''
  639. 群聊邀请
  640. 撤回消息
  641. 拍一拍消息
  642. 地理位置
  643. 踢出群聊通知
  644. 解散群聊通知
  645. 发布群公告
  646. '''
  647. msg_content_xml=msg_data["Content"]["string"]
  648. # 群聊邀请
  649. if '邀请你加入了群聊' in msg_content_xml and check_chatroom(msg_data["FromUserName"]["string"]):
  650. chatroom_id=msg_data["FromUserName"]["string"]
  651. ret,msg,data=gewe_chat.wxchat.save_contract_list(token_id,app_id,chatroom_id,3)
  652. logger.info(f'群聊邀请,保存到通讯录 chatroom_id {chatroom_id} {msg}')
  653. gewe_chat.wxchat.update_group_info_to_cache(token_id,app_id,wxid,chatroom_id)
  654. gewe_chat.wxchat.update_group_members_to_cache(token_id,app_id,wxid,chatroom_id)
  655. if '移出了群聊' in msg_content_xml and 'sysmsgtemplate' in msg_content_xml :
  656. chatroom_id=msg_data["FromUserName"]["string"]
  657. ret,msg,data=gewe_chat.wxchat.save_contract_list(token_id,app_id,chatroom_id,2)
  658. logger.info(f'踢出群聊,移除从通讯录 chatroom_id {chatroom_id} {msg}')
  659. redis_helper.redis_helper.delete_hash_field(f'__AI_OPS_WX__:GROUPS_INFO:{wxid}',chatroom_id)
  660. logger.info(f'清除 chatroom_id{chatroom_id} 数据')
  661. if '已解散该群聊' in msg_content_xml and 'sysmsgtemplate' in msg_content_xml :
  662. chatroom_id=msg_data["FromUserName"]["string"]
  663. ret,msg,data=gewe_chat.wxchat.save_contract_list(token_id,app_id,chatroom_id,2)
  664. logger.info(f'解散群聊,移除从通讯录 chatroom_id {chatroom_id} {msg}')
  665. redis_helper.redis_helper.delete_hash_field(f'__AI_OPS_WX__:GROUPS_INFO:{wxid}',chatroom_id)
  666. logger.info(f'清除 chatroom_id{chatroom_id} 数据')
  667. print('撤回消息,拍一拍消息,地理位置')
  668. def handle_mod_contacts(token_id,app_id,wxid,msg_data):
  669. '''
  670. 好友通过验证及好友资料变更的通知消息
  671. '''
  672. logger.info('好友通过验证及好友资料变更的通知消息')
  673. if not check_chatroom(msg_data["UserName"]["string"]):
  674. contact_wxid = msg_data["UserName"]["string"]
  675. # 更新好友信息
  676. # 检查好友关系,不是好友则删除
  677. # ret,msg,check_relation=gewe_chat.wxchat.check_relation(token_id, app_id,[contact_wxid])
  678. # first_item = check_relation[0]
  679. # check_relation_status=first_item.get('relation')
  680. # logger.info(f'{wxid} 好友 {contact_wxid} 关系检查:{check_relation_status}')
  681. # if check_relation_status != 0:
  682. # gewe_chat.wxchat.delete_contacts_brief_from_cache(wxid, [contact_wxid])
  683. # logger.info(f'好友关系异常:{check_relation_status},删除好友 {contact_wxid} 信息')
  684. # else:
  685. # gewe_chat.wxchat.save_contacts_brief_to_cache(token_id, app_id, wxid, [contact_wxid])
  686. ret,msg,contacts_list = gewe_chat.wxchat.fetch_contacts_list(token_id, app_id)
  687. # friend_wxids = contacts_list['friends'][3:] # 可以调整截取范围
  688. # print(friend_wxids)
  689. #friend_wxids.remove('fmessage')
  690. #friend_wxids.remove('weixin')
  691. friend_wxids = [c for c in contacts_list['friends'] if c not in ['fmessage', 'medianote','weixin','weixingongzhong']] # 可以调整截取范围
  692. print(f'{wxid}的好友数量 {len(friend_wxids)}')
  693. gewe_chat.wxchat.save_contacts_brief_to_cache(token_id, app_id, wxid, friend_wxids)
  694. else:
  695. logger.info('群聊好友通过验证及好友资料变更的通知消息')
  696. def get_messages_from_cache(hash_key,object:object)->list:
  697. '''
  698. 对话列表
  699. '''
  700. messages=redis_helper.redis_helper.get_hash(hash_key)
  701. wxid=hash_key.split(':')[-1]
  702. if not messages:
  703. messages=[{"role": "system", "content": ""}]
  704. messages.append(object)
  705. redis_helper.redis_helper.set_hash(hash_key,{"data":json.dumps(messages,ensure_ascii=False)},3600)
  706. else:
  707. messages_str=redis_helper.redis_helper.get_hash_field(hash_key,"data")
  708. messages = json.loads(messages_str) if messages_str else []
  709. #判断是否含有图片
  710. last_message = messages[-1]
  711. content = last_message.get("content", [])
  712. if isinstance(content, list) and content:
  713. last_content_type = content[-1].get("type")
  714. if last_content_type == 'image_url':
  715. content.append(object['content'][0])
  716. messages[-1]['content']=content
  717. else:
  718. messages.append(object)
  719. else:
  720. messages.append(object)
  721. redis_helper.redis_helper.set_hash(hash_key,{"data":json.dumps(messages,ensure_ascii=False)},3600)
  722. return messages
  723. def fast_gpt_api(messages:list,wixd:str,friend_wxid:str):
  724. c=gewe_chat.wxchat.get_wxchat_config_from_cache(wixd)
  725. api_key=c.get('agentTokenId',"sk-jr69ONIehfGKe9JFphuNk4DU5Y5wooHKHhQv7oSnFzVbwCnW65fXO9kvH")
  726. print(f'流程key:{api_key}\n')
  727. #api_key="sk-jr69ONIehfGKe9JFphuNk4DU5Y5wooHKHhQv7oSnFzVbwCnW65fXO9kvH" #测试
  728. #api_key="sk-uJDBdKmJVb2cmfldGOvlIY6Qx0AzqWMPD3lS1IzgQYzHNOXv9SKNI" #开发2
  729. api_url = "http://106.15.182.218:3000/api/v1/chat/completions"
  730. headers = {
  731. "Content-Type": "application/json",
  732. "Authorization": f"Bearer {api_key}"
  733. }
  734. session_id=f'{wixd}-{friend_wxid}'
  735. data={
  736. "model": "",
  737. "messages":messages,
  738. "chatId": session_id,
  739. "detail": True
  740. }
  741. #print(json.dumps(data,ensure_ascii=False))
  742. logger.info("[CHATGPT] 请求={}".format(json.dumps(data, ensure_ascii=False)))
  743. response = requests.post(url=api_url, headers=headers, data=json.dumps(data), timeout=600)
  744. response.raise_for_status()
  745. response_data = response.json()
  746. logger.info("[CHATGPT] 响应={}".format(json.dumps(response_data, separators=(',', ':'),ensure_ascii=False)))
  747. #print(response_data)
  748. return response_data
  749. def get_token_id_by_app_id(app_id: str) -> str:
  750. # 使用 SCAN 避免一次性返回所有的匹配键,逐步扫描
  751. cursor = 0
  752. while True:
  753. cursor, login_keys = redis_helper.redis_helper.client.scan(cursor, match='__AI_OPS_WX__:LOGININFO:*')
  754. # 批量获取所有键的 hash 数据
  755. for k in login_keys:
  756. r = redis_helper.redis_helper.get_hash(k)
  757. if r.get("appId") == app_id:
  758. return r.get("tokenId", "")
  759. # 如果游标为 0,则表示扫描完成
  760. if cursor == 0:
  761. break
  762. return ""
  763. def get_login_info_by_app_id(app_id: str) ->dict:
  764. # 使用 SCAN 避免一次性返回所有的匹配键,逐步扫描
  765. cursor = 0
  766. while True:
  767. cursor, login_keys = redis_helper.redis_helper.client.scan(cursor, match='__AI_OPS_WX__:LOGININFO:*')
  768. # 批量获取所有键的 hash 数据
  769. for k in login_keys:
  770. r = redis_helper.redis_helper.get_hash(k)
  771. if r.get("appId") == app_id:
  772. return k,r
  773. # 如果游标为 0,则表示扫描完成
  774. if cursor == 0:
  775. break
  776. return ""
  777. def contains_url(text):
  778. # 定义检测网址的正则表达式
  779. url_pattern = re.compile(
  780. r'http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\\(\\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+'
  781. )
  782. # 检查字符串是否包含网址
  783. return bool(url_pattern.search(text))
  784. def get_first_char_if_digit(s):
  785. if s and s[0].isdigit(): # 判断字符串是否非空且首字符为数字
  786. return int(s[0]) # 返回数字形式
  787. return None # 如果不是数字则返回 None
  788. def remove_at_mention_regex(text):
  789. # 使用正则表达式去掉“在群聊中@了你”
  790. return re.sub(r"在群聊中@了你", "", text)
  791. def extract_nickname(text)->str:
  792. if "在群聊中@了你" in text:
  793. # 如果包含 "在群聊中@了你",提取其前面的名字
  794. match = re.search(r"^(.*?)在群聊中@了你", text)
  795. if match:
  796. return match.group(1).strip()
  797. elif ": @" in text:
  798. # 如果包含 ": @",提取其前面的名字
  799. return text.split(": @")[0].strip()
  800. return ''
  801. def check_chatroom(userName):
  802. pattern = r'^\d+@chatroom$'
  803. if re.match(pattern, userName):
  804. return True
  805. return False
  806. # def remove_markdown_symbol(text: str):
  807. # # 移除markdown格式,目前先移除**
  808. # if not text or not isinstance(text, str):
  809. # return text
  810. # return re.sub(r'\*\*(.*?)\*\*', r'\1', text)
  811. def remove_markdown_symbol(text: str):
  812. # 移除markdown格式,目前先移除**
  813. if not text or not isinstance(text, str):
  814. return text
  815. # 去除加粗、斜体等格式
  816. #text = re.sub(r'\*\*([^*]+)\*\*', r'\1', text) # 去除加粗
  817. text=re.sub(r'\*\*(.*?)\*\*', r'\1', text)
  818. text = re.sub(r'\*([^*]+)\*', r'\1', text) # 去除斜体
  819. text = re.sub(r'__([^_]+)__', r'\1', text) # 去除加粗(下划线)
  820. text = re.sub(r'_(.*?)_', r'\1', text) # 去除斜体(下划线)
  821. # 去除行内代码块
  822. text = re.sub(r'`([^`]+)`', r'\1', text)
  823. # 去除换行符\n,或者多余的空格
  824. #text = re.sub(r'\n+', ' ', text)
  825. # 去除列表编号等
  826. #text = re.sub(r'^\d+\.\s*', '', text, flags=re.MULTILINE)
  827. #text = re.sub('[\\\`\*\_\[\]\#\+\-\!\>]', '', text)
  828. text = re.sub('[\\\`\*\_\[\]\#\+\!\>]', '', text)
  829. print(text)
  830. return text