You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

921 lines
40KB

  1. from flask_restful import Resource, reqparse
  2. from flask import jsonify,request
  3. import requests,json,re
  4. from wechat import gewe_chat
  5. from voice.ali.ali_voice import AliVoice
  6. from common import utils,redis_helper,memory,kafka_helper
  7. from common.log import logger
  8. import xml.etree.ElementTree as ET
  9. import threading,time
  10. import os
  11. from voice import audio_convert
  12. timeout_duration = 4.0
  13. class MessagesResource(Resource):
  14. def __init__(self):
  15. self.parser = reqparse.RequestParser()
  16. def post(self):
  17. msg = request.get_json()
  18. logger.info(f"收到微信回调消息: {msg}")
  19. type_name =msg.get("TypeName")
  20. app_id = msg.get("Appid")
  21. # token_id = "f828cb3c-1039-489f-b9ae-7494d1778a15"
  22. token_id=get_token_id_by_app_id(app_id)
  23. if token_id=="":
  24. logger.warning('找不到登录信息,不处理')
  25. return jsonify({"message": "收到微信回调消息"})
  26. wx_config = gewe_chat.wxchat.get_wxchat_config_from_cache(wxid)
  27. if not bool(wx_config.get("agentEnabled",False)):
  28. logger.info('智能体未启用,不处理')
  29. return jsonify({"message": "收到微信回调消息"})
  30. if type_name=='AddMsg':
  31. wxid = msg.get("Wxid")
  32. msg_data = msg.get("Data")
  33. msg_type = msg_data.get("MsgType",None)
  34. from_wxid = msg_data["FromUserName"]["string"]
  35. to_wxid = msg_data["ToUserName"]["string"]
  36. msg_push_content=msg_data.get("PushContent") #群发
  37. #wx_config = gewe_chat.wxchat.get_wxchat_config_from_cache(wxid)
  38. handlers = {
  39. 1: handle_text,
  40. 3: handle_image,
  41. 34: handle_voice,
  42. 49: handle_xml,
  43. 37: handle_add_friend_notice,
  44. 10002: handle_10002_msg
  45. }
  46. # (扫码进群情况)判断受否是群聊,并添加到通信录
  47. if check_chatroom(from_wxid) or check_chatroom(to_wxid):
  48. logger.info('群信息')
  49. chatroom_id=from_wxid
  50. ret,msg,data=gewe_chat.wxchat.save_contract_list(token_id,app_id,chatroom_id,3)
  51. logger.info(f'保存到通讯录 chatroom_id {chatroom_id} {msg}')
  52. gewe_chat.wxchat.update_groups_info_to_cache(token_id,app_id,wxid,chatroom_id)
  53. handlers[1]=handle_text_group
  54. handlers[3]=handle_image_group
  55. handlers[34]=handle_voice_group
  56. handler = handlers.get(msg_type)
  57. if handler:
  58. return handler(token_id,app_id, wxid,msg_data,from_wxid, to_wxid)
  59. else:
  60. logger.warning(f"微信回调消息类型 {msg_type} 未处理")
  61. elif type_name=='ModContacts':
  62. '''
  63. 好友通过验证及好友资料变更的通知消息
  64. '''
  65. wxid = msg.get("Wxid")
  66. msg_data = msg.get("Data")
  67. handle_mod_contacts(token_id,app_id,wxid,msg_data)
  68. elif type_name=="DelContacts":
  69. '''
  70. 删除好友通知/退出群聊
  71. '''
  72. msg_data = msg.get("Data")
  73. username=msg_data["UserName"]["string"]
  74. if check_chatroom(username):
  75. logger.info('退出群聊')
  76. wxid = msg.get("Wxid")
  77. chatroom_id=username
  78. redis_helper.redis_helper.delete_hash_field(f'__AI_OPS_WX__:GROUPS_INFO:{wxid}',chatroom_id)
  79. logger.info(f'清除 chatroom_id{chatroom_id} 数据')
  80. else:
  81. logger.info('删除好友通知')
  82. elif type_name=="Offline":
  83. '''
  84. 已经离线
  85. '''
  86. wxid = msg.get("Wxid")
  87. logger.warning(f'微信ID {wxid}在设备{app_id}已经离线')
  88. k,r=get_login_info_by_app_id(app_id)
  89. print(k)
  90. redis_helper.redis_helper.update_hash_field(k,'status',0)
  91. redis_helper.redis_helper.update_hash_field(k,'modify_at',int(time.time()))
  92. else:
  93. logger.warning(f"未知消息类型")
  94. return jsonify({"message": "收到微信回调消息"})
  95. def handle_text(token_id,app_id, wxid,msg_data,from_wxid, to_wxid):
  96. '''
  97. 私聊文本消息
  98. '''
  99. msg_content=msg_data["Content"]["string"]
  100. if wxid == from_wxid: #手动发送消息
  101. logger.info("Active message sending detected")
  102. gewe_chat.wxchat.save_contacts_brief_to_cache(token_id,app_id,wxid,[to_wxid])
  103. callback_to_user=msg_data["ToUserName"]["string"]
  104. input_wx_content_dialogue_message=[{"type": "text", "text": msg_content}]
  105. input_message=utils.dialogue_message(from_wxid,to_wxid,input_wx_content_dialogue_message)
  106. kafka_helper.kafka_client.produce_message(input_message)
  107. logger.info("发送对话 %s",input_message)
  108. else:
  109. callback_to_user=msg_data["FromUserName"]["string"]
  110. # hash_key = f'__AI_OPS_WX__:MESSAGES:{wxid}:{callback_to_user}'
  111. # prompt={"role": "user", "content": [{
  112. # "type": "text",
  113. # "text": msg_content
  114. # }]}
  115. # messages_to_send=gewe_chat.wxchat.save_session_messages_to_cache(hash_key, prompt)
  116. # # 收到的对话
  117. # input_wx_content_dialogue_message=[{"type": "text", "text": msg_content}]
  118. # input_message=utils.dialogue_message(callback_to_user,wxid,input_wx_content_dialogue_message)
  119. # kafka_helper.kafka_client.produce_message(input_message)
  120. # logger.info("发送对话 %s",input_message)
  121. # cache_data = memory.USER_INTERACTIVE_CACHE.get(wxid)
  122. # if cache_data and cache_data.get('interactive') :
  123. # o=get_first_char_if_digit(msg_content)
  124. # if o is not None:
  125. # userSelectOptions=cache_data.get('options')
  126. # if o < len(userSelectOptions):
  127. # o=o-1
  128. # msg_content=userSelectOptions[o].get("value")
  129. # messages_to_send=[{"role": "user", "content": msg_content}]
  130. # else:
  131. # messages_to_send=[{"role": "user", "content": msg_content}]
  132. # else:
  133. # messages_to_send=[{"role": "user", "content": msg_content}]
  134. # res=fast_gpt_api(messages_to_send,wxid,callback_to_user)
  135. # reply_content=res["choices"][0]["message"]["content"]
  136. # description = ''
  137. # userSelectOptions = []
  138. # if isinstance(reply_content, list) and any(item.get("type") == "interactive" for item in reply_content):
  139. # for item in reply_content:
  140. # if item["type"] == "interactive" and item["interactive"]["type"] == "userSelect":
  141. # params = item["interactive"]["params"]
  142. # description = params.get("description")
  143. # userSelectOptions = params.get("userSelectOptions", [])
  144. # values_string = "\n".join(option["value"] for option in userSelectOptions)
  145. # if description is not None:
  146. # memory.USER_INTERACTIVE_CACHE[wxid] = {
  147. # "interactive":True,
  148. # "options": userSelectOptions,
  149. # }
  150. # reply_content=description + '------------------------------\n'+values_string
  151. # elif isinstance(reply_content, list) and any(item.get("type") == "text" for item in reply_content):
  152. # memory.USER_INTERACTIVE_CACHE[wxid] = {
  153. # "interactive":False
  154. # }
  155. # text=''
  156. # for item in reply_content:
  157. # if item["type"] == "text":
  158. # text=item["text"]["content"]
  159. # if text=='':
  160. # # 去除上次上一轮对话再次请求
  161. # cache_messages_str=redis_helper.redis_helper.get_hash_field(hash_key,"data")
  162. # cache_messages = json.loads(cache_messages_str) if cache_messages_str else []
  163. # if len(cache_messages) >= 3:
  164. # cache_messages = cache_messages[:-3]
  165. # redis_helper.redis_helper.update_hash_field(hash_key,"data",json.dumps(cache_messages,ensure_ascii=False))
  166. # messages_to_send=gewe_chat.wxchat.save_session_messages_to_cache(hash_key, prompt)
  167. # res=fast_gpt_api(messages_to_send,wxid,callback_to_user)
  168. # reply_content=res["choices"][0]["message"]["content"]
  169. # if isinstance(reply_content, list) :
  170. # reply_content=reply_content[0].get('text').get("content")
  171. # else:
  172. # reply_content=text
  173. # else:
  174. # memory.USER_INTERACTIVE_CACHE[wxid] = {
  175. # "interactive":False
  176. # }
  177. # reply_content=res["choices"][0]["message"]["content"]
  178. # gewe_chat.wxchat.post_text(token_id,app_id,callback_to_user,reply_content)
  179. # gewe_chat.wxchat.save_session_messages_to_cache(hash_key, {"role": "assistant", "content": reply_content})
  180. # # 回复的对话
  181. # input_wx_content_dialogue_message=[{"type": "text", "text": reply_content}]
  182. # input_message=utils.dialogue_message(wxid,callback_to_user,input_wx_content_dialogue_message,True)
  183. # kafka_helper.kafka_client.produce_message(input_message)
  184. # logger.info("发送对话 %s",input_message)
  185. # 创建并启动任务线程,将参数传递给 ai_chat_text 函数
  186. task_thread = threading.Thread(
  187. target=ai_chat_text,
  188. args=(token_id,app_id,wxid,msg_data,msg_content)
  189. )
  190. task_thread.start()
  191. # 设置定时器,1秒后检查任务是否超时。这里需要使用 lambda 来传递参数
  192. timeout_timer = threading.Timer(
  193. timeout_duration,
  194. lambda:check_timeout(task_thread, token_id, wxid,app_id, callback_to_user)
  195. )
  196. timeout_timer.start()
  197. # 等待任务线程完成
  198. task_thread.join()
  199. # 取消定时器
  200. timeout_timer.cancel()
  201. def check_timeout( task_thread:threading.Thread, token_id,wxid, app_id, callback_to_user):
  202. if task_thread.is_alive():
  203. print(f"任务运行时间超过{timeout_duration}秒,token_id={token_id}, app_id={app_id}, callback_to_user={callback_to_user}")
  204. wx_config = gewe_chat.wxchat.get_wxchat_config_from_cache(wxid)
  205. if bool(wx_config.get("chatWaitingMsgEnabled",True)):
  206. gewe_chat.wxchat.post_text(token_id,app_id,callback_to_user,"亲,我正在组织回复的信息,请稍等一会")
  207. def ai_chat_text(token_id,app_id,wxid,msg_data,msg_content):
  208. start_time = time.time() # 记录任务开始时间
  209. callback_to_user=msg_data["FromUserName"]["string"]
  210. hash_key = f'__AI_OPS_WX__:MESSAGES:{wxid}:{callback_to_user}'
  211. prompt={"role": "user", "content": [{
  212. "type": "text",
  213. "text": msg_content
  214. }]}
  215. messages_to_send=gewe_chat.wxchat.save_session_messages_to_cache(hash_key, prompt)
  216. # 收到的对话
  217. input_wx_content_dialogue_message=[{"type": "text", "text": msg_content}]
  218. input_message=utils.dialogue_message(callback_to_user,wxid,input_wx_content_dialogue_message)
  219. kafka_helper.kafka_client.produce_message(input_message)
  220. logger.info("发送对话 %s",input_message)
  221. cache_data = memory.USER_INTERACTIVE_CACHE.get(wxid)
  222. if cache_data and cache_data.get('interactive') :
  223. o=get_first_char_if_digit(msg_content)
  224. if o is not None:
  225. userSelectOptions=cache_data.get('options')
  226. if o < len(userSelectOptions):
  227. o=o-1
  228. msg_content=userSelectOptions[o].get("value")
  229. messages_to_send=[{"role": "user", "content": msg_content}]
  230. else:
  231. messages_to_send=[{"role": "user", "content": msg_content}]
  232. else:
  233. messages_to_send=[{"role": "user", "content": msg_content}]
  234. res=fast_gpt_api(messages_to_send,wxid,callback_to_user)
  235. reply_content=res["choices"][0]["message"]["content"]
  236. description = ''
  237. userSelectOptions = []
  238. if isinstance(reply_content, list) and any(item.get("type") == "interactive" for item in reply_content):
  239. for item in reply_content:
  240. if item["type"] == "interactive" and item["interactive"]["type"] == "userSelect":
  241. params = item["interactive"]["params"]
  242. description = params.get("description")
  243. userSelectOptions = params.get("userSelectOptions", [])
  244. values_string = "\n".join(option["value"] for option in userSelectOptions)
  245. if description is not None:
  246. memory.USER_INTERACTIVE_CACHE[wxid] = {
  247. "interactive":True,
  248. "options": userSelectOptions,
  249. }
  250. reply_content=description + '------------------------------\n'+values_string
  251. elif isinstance(reply_content, list) and any(item.get("type") == "text" for item in reply_content):
  252. memory.USER_INTERACTIVE_CACHE[wxid] = {
  253. "interactive":False
  254. }
  255. text=''
  256. for item in reply_content:
  257. if item["type"] == "text":
  258. text=item["text"]["content"]
  259. if text=='':
  260. # 去除上次上一轮对话再次请求
  261. cache_messages_str=redis_helper.redis_helper.get_hash_field(hash_key,"data")
  262. cache_messages = json.loads(cache_messages_str) if cache_messages_str else []
  263. if len(cache_messages) >= 3:
  264. cache_messages = cache_messages[:-3]
  265. redis_helper.redis_helper.update_hash_field(hash_key,"data",json.dumps(cache_messages,ensure_ascii=False))
  266. messages_to_send=gewe_chat.wxchat.save_session_messages_to_cache(hash_key, prompt)
  267. res=fast_gpt_api(messages_to_send,wxid,callback_to_user)
  268. reply_content=res["choices"][0]["message"]["content"]
  269. if isinstance(reply_content, list) :
  270. reply_content=reply_content[0].get('text').get("content")
  271. else:
  272. reply_content=text
  273. else:
  274. memory.USER_INTERACTIVE_CACHE[wxid] = {
  275. "interactive":False
  276. }
  277. reply_content=res["choices"][0]["message"]["content"]
  278. gewe_chat.wxchat.post_text(token_id,app_id,callback_to_user,reply_content)
  279. gewe_chat.wxchat.save_session_messages_to_cache(hash_key, {"role": "assistant", "content": reply_content})
  280. # 回复的对话
  281. input_wx_content_dialogue_message=[{"type": "text", "text": reply_content}]
  282. input_message=utils.dialogue_message(wxid,callback_to_user,input_wx_content_dialogue_message,True)
  283. kafka_helper.kafka_client.produce_message(input_message)
  284. logger.info("发送对话 %s",input_message)
  285. end_time = time.time() # 记录任务结束时间
  286. execution_time = end_time - start_time # 计算执行时间
  287. logger.info(f"AI回答任务完成,耗时 {execution_time:.2f} 秒")
  288. def handle_text_group(token_id,app_id, wxid,msg_data,from_wxid, to_wxid):
  289. '''
  290. 群聊文本消息
  291. '''
  292. msg_content=msg_data["Content"]["string"]
  293. msg_push_content=msg_data.get("PushContent")
  294. k,login_info=get_login_info_by_app_id(app_id)
  295. nickname=login_info.get("nickName")
  296. if wxid == from_wxid: #手动发送消息
  297. logger.info("Active message sending detected")
  298. gewe_chat.wxchat.save_contacts_brief_to_cache(token_id,app_id,wxid,[to_wxid])
  299. callback_to_user=msg_data["ToUserName"]["string"]
  300. input_wx_content_dialogue_message=[{"type": "text", "text": msg_content}]
  301. input_message=utils.dialogue_message(from_wxid,to_wxid,input_wx_content_dialogue_message)
  302. kafka_helper.kafka_client.produce_message(input_message)
  303. logger.info("发送对话 %s",input_message)
  304. else:
  305. c = gewe_chat.wxchat.get_wxchat_config_from_cache(wxid)
  306. chatroom_id_white_list = c.get("chatroomIdWhiteList", [])
  307. if not chatroom_id_white_list:
  308. logger.info('白名单为空或未定义,不处理')
  309. return
  310. if from_wxid not in chatroom_id_white_list:
  311. logger.info('群ID不在白名单中,不处理')
  312. return
  313. if '在群聊中@了你' in msg_push_content or '@'+nickname in msg_push_content:
  314. callback_to_user=msg_data["FromUserName"]["string"]
  315. hash_key = f'__AI_OPS_WX__:MESSAGES:{wxid}:{callback_to_user}'
  316. prompt={"role": "user", "content": [{
  317. "type": "text",
  318. "text": msg_content
  319. }]}
  320. messages_to_send=gewe_chat.wxchat.save_session_messages_to_cache(hash_key, prompt)
  321. # 收到的对话
  322. input_wx_content_dialogue_message=[{"type": "text", "text": msg_content}]
  323. input_message=utils.dialogue_message(callback_to_user,wxid,input_wx_content_dialogue_message)
  324. kafka_helper.kafka_client.produce_message(input_message)
  325. logger.info("发送对话 %s",input_message)
  326. cache_data = memory.USER_INTERACTIVE_CACHE.get(wxid)
  327. if cache_data and cache_data.get('interactive') :
  328. o=get_first_char_if_digit(msg_content)
  329. if o is not None:
  330. userSelectOptions=cache_data.get('options')
  331. if o < len(userSelectOptions):
  332. o=o-1
  333. msg_content=userSelectOptions[o].get("value")
  334. messages_to_send=[{"role": "user", "content": msg_content}]
  335. else:
  336. messages_to_send=[{"role": "user", "content": msg_content}]
  337. else:
  338. messages_to_send=[{"role": "user", "content": msg_content}]
  339. res=fast_gpt_api(messages_to_send,wxid,callback_to_user)
  340. reply_content=res["choices"][0]["message"]["content"]
  341. description = ''
  342. userSelectOptions = []
  343. if isinstance(reply_content, list) and any(item.get("type") == "interactive" for item in reply_content):
  344. for item in reply_content:
  345. if item["type"] == "interactive" and item["interactive"]["type"] == "userSelect":
  346. params = item["interactive"]["params"]
  347. description = params.get("description")
  348. userSelectOptions = params.get("userSelectOptions", [])
  349. values_string = "\n".join(option["value"] for option in userSelectOptions)
  350. if description is not None:
  351. memory.USER_INTERACTIVE_CACHE[wxid] = {
  352. "interactive":True,
  353. "options": userSelectOptions,
  354. }
  355. reply_content=description + '------------------------------\n'+values_string
  356. elif isinstance(reply_content, list) and any(item.get("type") == "text" for item in reply_content):
  357. memory.USER_INTERACTIVE_CACHE[wxid] = {
  358. "interactive":False
  359. }
  360. text=''
  361. for item in reply_content:
  362. if item["type"] == "text":
  363. text=item["text"]["content"]
  364. if text=='':
  365. # 去除上次上一轮对话再次请求
  366. cache_messages_str=redis_helper.redis_helper.get_hash_field(hash_key,"data")
  367. cache_messages = json.loads(cache_messages_str) if cache_messages_str else []
  368. if len(cache_messages) >= 3:
  369. cache_messages = cache_messages[:-3]
  370. redis_helper.redis_helper.update_hash_field(hash_key,"data",json.dumps(cache_messages,ensure_ascii=False))
  371. messages_to_send=gewe_chat.wxchat.save_session_messages_to_cache(hash_key, prompt)
  372. res=fast_gpt_api(messages_to_send,wxid,callback_to_user)
  373. reply_content=res["choices"][0]["message"]["content"]
  374. else:
  375. reply_content=text
  376. else:
  377. memory.USER_INTERACTIVE_CACHE[wxid] = {
  378. "interactive":False
  379. }
  380. reply_content=res["choices"][0]["message"]["content"]
  381. reply_content='@'+extract_nickname(msg_push_content) + reply_content
  382. gewe_chat.wxchat.post_text(token_id,app_id,callback_to_user,reply_content)
  383. gewe_chat.wxchat.save_session_messages_to_cache(hash_key, {"role": "assistant", "content": reply_content})
  384. # 回复的对话
  385. input_wx_content_dialogue_message=[{"type": "text", "text": reply_content}]
  386. input_message=utils.dialogue_message(wxid,callback_to_user,input_wx_content_dialogue_message,True)
  387. kafka_helper.kafka_client.produce_message(input_message)
  388. logger.info("发送对话 %s",input_message)
  389. else:
  390. logger.info('群聊公开消息')
  391. callback_to_user=msg_data["FromUserName"]["string"]
  392. dialogue_message=[{"type": "text", "text": msg_content}]
  393. input_message=utils.dialogue_message(callback_to_user,wxid,dialogue_message)
  394. kafka_helper.kafka_client.produce_message(input_message)
  395. logger.info("发送对话 %s",input_message)
  396. return
  397. def handle_image(token_id,app_id, wxid,msg_data,from_wxid, to_wxid):
  398. '''
  399. 图片消息
  400. '''
  401. msg_content=msg_data["Content"]["string"]
  402. callback_to_user=from_wxid
  403. hash_key = f'__AI_OPS_WX__:MESSAGES:{wxid}:{callback_to_user}'
  404. wx_img_url=gewe_chat.wxchat.download_image_msg(token_id,app_id,msg_content)
  405. oss_access_key_id="LTAI5tRTG6pLhTpKACJYoPR5"
  406. oss_access_key_secret="E7dMzeeMxq4VQvLg7Tq7uKf3XWpYfN"
  407. oss_endpoint="http://oss-cn-shanghai.aliyuncs.com"
  408. oss_bucket_name="cow-agent"
  409. oss_prefix="cow"
  410. img_url=utils.upload_oss(oss_access_key_id, oss_access_key_secret, oss_endpoint, oss_bucket_name, wx_img_url, oss_prefix)
  411. prompt={
  412. "role": "user",
  413. "content": [{
  414. "type": "image_url",
  415. "image_url": {"url": img_url}
  416. }]
  417. }
  418. gewe_chat.wxchat.save_session_messages_to_cache(hash_key, prompt)
  419. gewe_chat.wxchat.post_text(token_id,app_id,callback_to_user,'已经上传了图片,有什么可以为您服务')
  420. logger.info(f"上传图片 URL: {img_url}")
  421. wx_content_dialogue_message=[{"type": "image_url", "image_url": {"url": img_url}}]
  422. input_message=utils.dialogue_message(wxid,callback_to_user,wx_content_dialogue_message)
  423. kafka_helper.kafka_client.produce_message(input_message)
  424. logger.info("发送对话 %s",input_message)
  425. def handle_image_group(token_id,app_id, wxid,msg_data,from_wxid, to_wxid):
  426. logger.info('群聊图片消息')
  427. def handle_voice(token_id,app_id, wxid,msg_data,from_wxid, to_wxid):
  428. '''
  429. 语音消息
  430. '''
  431. callback_to_user=from_wxid
  432. msg_content=msg_data["Content"]["string"]
  433. msg_id=msg_data["MsgId"]
  434. file_url=gewe_chat.wxchat.download_audio_msg(token_id,app_id,msg_id,msg_content)
  435. react_silk_path=utils.save_to_local_from_url(file_url)
  436. react_wav_path = os.path.splitext(react_silk_path)[0] + ".wav"
  437. audio_convert.any_to_wav(react_silk_path,react_wav_path)
  438. react_voice_text=AliVoice().voiceToText(react_wav_path)
  439. os.remove(react_silk_path)
  440. os.remove(react_wav_path)
  441. hash_key = f'__AI_OPS_WX__:MESSAGES:{wxid}:{callback_to_user}'
  442. messages=gewe_chat.wxchat.save_session_messages_to_cache(hash_key, {"role": "user", "content": react_voice_text})
  443. ai_res=fast_gpt_api(messages,wxid,callback_to_user)
  444. ai_res_content=ai_res["choices"][0]["message"]["content"]
  445. has_url=contains_url(ai_res_content)
  446. if not has_url:
  447. voice_during,voice_url=utils.wx_voice(ai_res_content)
  448. if voice_during < 60 * 1000:
  449. ret,ret_msg,res=gewe_chat.wxchat.post_voice(token_id,app_id,callback_to_user,voice_url,voice_during)
  450. else:
  451. ret,ret_msg,res=gewe_chat.wxchat.post_text(token_id,app_id,callback_to_user,ai_res_content)
  452. logger.warning(f'回应语音消息长度 {voice_during/1000}秒,超过60秒,转为文本回复')
  453. if ret==200:
  454. logger.info((f'{wxid} 向 {callback_to_user} 发送语音文本【{ai_res_content}】{ret_msg}'))
  455. else:
  456. logger.warning((f'{wxid} 向 {callback_to_user} 发送语音文本【{ai_res_content}】{ret_msg}'))
  457. ret,ret_msg,res=gewe_chat.wxchat.post_text(token_id,app_id,callback_to_user,ai_res_content)
  458. logger.info((f'{wxid} 向 {callback_to_user} 发送文本【{ai_res_content}】{ret_msg}'))
  459. else:
  460. logger.info(f"回复内容包含网址,不发送语音,回复文字内容:{ai_res_content}")
  461. ret,ret_msg,res=gewe_chat.wxchat.post_text(token_id,app_id,callback_to_user,ai_res_content)
  462. gewe_chat.wxchat.save_session_messages_to_cache(hash_key, {"role": "assistant", "content": ai_res_content})
  463. # 构造对话消息并发送到 Kafka
  464. input_wx_content_dialogue_message = [{"type": "text", "text": ai_res_content}]
  465. input_message = utils.dialogue_message(wxid, callback_to_user, input_wx_content_dialogue_message,True)
  466. kafka_helper.kafka_client.produce_message(input_message)
  467. logger.info("发送对话 %s", input_message)
  468. def handle_voice_group(token_id,app_id, wxid,msg_data,from_wxid, to_wxid):
  469. logger.info('语音消息')
  470. def handle_xml(token_id,app_id, wxid,msg_data,from_wxid, to_wxid):
  471. '''
  472. 处理xml
  473. '''
  474. msg_content_xml=msg_data["Content"]["string"]
  475. root = ET.fromstring(msg_content_xml)
  476. type_value = root.find(".//appmsg/type").text
  477. handlers = {
  478. 57: handle_xml_reference,
  479. }
  480. handler = handlers.get(type_value)
  481. if handler:
  482. return handler(token_id,app_id, wxid,msg_data,from_wxid, to_wxid)
  483. elif "邀请你加入了群聊" in msg_content_xml: # 邀请加入群聊
  484. logger.warning(f"xml消息 {type_value} 邀请你加入了群聊.todo")
  485. else:
  486. print(f"xml消息 {type_value} 未解析")
  487. def handle_xml_reference(token_id,app_id, wxid,msg_data,from_wxid, to_wxid):
  488. '''
  489. 判断此类消息的逻辑:$.Data.MsgType=49 并且 解析$.Data.Content.string中的xml msg.appmsg.type=57
  490. '''
  491. callback_to_user=from_wxid
  492. hash_key = f'__AI_OPS_WX__:MESSAGES:{wxid}:{callback_to_user}'
  493. msg_content= msg_data["PushContent"]
  494. prompt={"role": "user", "content": [{
  495. "type": "text",
  496. "text": msg_content
  497. }]}
  498. # 收到的对话
  499. messages_to_send=gewe_chat.wxchat.save_session_messages_to_cache(hash_key, prompt)
  500. input_wx_content_dialogue_message=[{"type": "text", "text": msg_content}]
  501. input_message=utils.dialogue_message(callback_to_user,wxid,input_wx_content_dialogue_message)
  502. kafka_helper.kafka_client.produce_message(input_message)
  503. logger.info("发送对话 %s",input_message)
  504. # 回复的对话
  505. res=fast_gpt_api(messages_to_send,wxid,callback_to_user)
  506. reply_content=res["choices"][0]["message"]["content"]
  507. input_wx_content_dialogue_message=[{"type": "text", "text": reply_content}]
  508. input_message=utils.dialogue_message(wxid,callback_to_user,input_wx_content_dialogue_message,True)
  509. kafka_helper.kafka_client.produce_message(input_message)
  510. logger.info("发送对话 %s",input_message)
  511. gewe_chat.wxchat.save_session_messages_to_cache(hash_key, {"role": "assistant", "content": reply_content})
  512. gewe_chat.wxchat.post_text(token_id,app_id,callback_to_user,reply_content)
  513. def handle_add_friend_notice(token_id,app_id, wxid,msg_data,from_wxid, to_wxid):
  514. '''
  515. 好友添加请求通知
  516. '''
  517. logger.info('好友添加请求通知')
  518. msg_content_xml=msg_data["Content"]["string"]
  519. root = ET.fromstring(msg_content_xml)
  520. msg_content = root.attrib.get('content', None)
  521. v3= root.attrib.get('encryptusername', None)
  522. v4= root.attrib.get('ticket', None)
  523. scene=root.attrib.get('scene', None)
  524. to_contact_wxid=root.attrib.get('fromusername', None)
  525. wxid=msg_data["ToUserName"]["string"]
  526. # 自动同意好友
  527. # print(v3)
  528. # print(v4)
  529. # print(scene)
  530. # print(msg_content)
  531. # 操作类型,2添加好友 3同意好友 4拒绝好友
  532. #option=2
  533. option=3
  534. reply_add_contact_contact="亲,我是你的美丽顾问"
  535. ret,ret_msg=gewe_chat.wxchat.add_contacts(token_id,app_id,scene,option,v3,v4,reply_add_contact_contact)
  536. if ret==200:
  537. logger.info('自动添加好友成功')
  538. # 好友发送的文字
  539. hash_key = f'__AI_OPS_WX__:MESSAGES:{wxid}:{to_contact_wxid}'
  540. prompt={"role": "user", "content": [{"type": "text","text": msg_content}]}
  541. messages_to_send=gewe_chat.wxchat.save_session_messages_to_cache(hash_key, prompt)
  542. input_wx_content_dialogue_message=[{"type": "text", "text": msg_content}]
  543. input_message=utils.dialogue_message(to_contact_wxid,wxid,input_wx_content_dialogue_message)
  544. kafka_helper.kafka_client.produce_message(input_message)
  545. logger.info("发送对话 %s",input_message)
  546. callback_to_user=to_contact_wxid
  547. res=fast_gpt_api(messages_to_send,wxid,callback_to_user)
  548. reply_content=res["choices"][0]["message"]["content"]
  549. #保存好友信息
  550. gewe_chat.wxchat.save_contacts_brief_to_cache(token_id,app_id, wxid,[to_contact_wxid])
  551. # 保存到缓存
  552. gewe_chat.wxchat.save_session_messages_to_cache(hash_key, {"role": "assistant", "content": reply_content})
  553. # 发送信息
  554. gewe_chat.wxchat.post_text(token_id,app_id, to_contact_wxid,reply_content)
  555. # 发送到kafka
  556. input_wx_content_dialogue_message=[{"type": "text", "text": reply_content}]
  557. input_message=utils.dialogue_message(wxid,to_contact_wxid,input_wx_content_dialogue_message,True)
  558. kafka_helper.kafka_client.produce_message(input_message)
  559. logger.info("发送对话 %s",input_message)
  560. else:
  561. logger.warning("添加好友失败")
  562. def handle_10002_msg(token_id,app_id, wxid,msg_data,from_wxid, to_wxid):
  563. '''
  564. 群聊邀请
  565. 撤回消息
  566. 拍一拍消息
  567. 地理位置
  568. 踢出群聊通知
  569. 解散群聊通知
  570. 发布群公告
  571. '''
  572. msg_content_xml=msg_data["Content"]["string"]
  573. # 群聊邀请
  574. if '邀请你加入了群聊' in msg_content_xml and check_chatroom(msg_data["FromUserName"]["string"]):
  575. chatroom_id=msg_data["FromUserName"]["string"]
  576. ret,msg,data=gewe_chat.wxchat.save_contract_list(token_id,app_id,chatroom_id,3)
  577. logger.info(f'群聊邀请,保存到通讯录 chatroom_id {chatroom_id} {msg}')
  578. gewe_chat.wxchat.update_groups_info_to_cache(token_id,app_id,wxid,chatroom_id)
  579. if '移出了群聊' in msg_content_xml and 'sysmsgtemplate' in msg_content_xml :
  580. chatroom_id=msg_data["FromUserName"]["string"]
  581. ret,msg,data=gewe_chat.wxchat.save_contract_list(token_id,app_id,chatroom_id,2)
  582. logger.info(f'踢出群聊,移除从通讯录 chatroom_id {chatroom_id} {msg}')
  583. redis_helper.redis_helper.delete_hash_field(f'__AI_OPS_WX__:GROUPS_INFO:{wxid}',chatroom_id)
  584. logger.info(f'清除 chatroom_id{chatroom_id} 数据')
  585. if '已解散该群聊' in msg_content_xml and 'sysmsgtemplate' in msg_content_xml :
  586. chatroom_id=msg_data["FromUserName"]["string"]
  587. ret,msg,data=gewe_chat.wxchat.save_contract_list(token_id,app_id,chatroom_id,2)
  588. logger.info(f'解散群聊,移除从通讯录 chatroom_id {chatroom_id} {msg}')
  589. redis_helper.redis_helper.delete_hash_field(f'__AI_OPS_WX__:GROUPS_INFO:{wxid}',chatroom_id)
  590. logger.info(f'清除 chatroom_id{chatroom_id} 数据')
  591. print('撤回消息,拍一拍消息,地理位置')
  592. def handle_mod_contacts(token_id,app_id,wxid,msg_data):
  593. '''
  594. 好友通过验证及好友资料变更的通知消息
  595. '''
  596. logger.info('好友通过验证及好友资料变更的通知消息')
  597. if not check_chatroom(msg_data["UserName"]["string"]):
  598. contact_wxid = msg_data["UserName"]["string"]
  599. # 更新好友信息
  600. ret,msg,contacts_list = gewe_chat.wxchat.fetch_contacts_list(token_id, app_id)
  601. friend_wxids = contacts_list['friends'][3:] # 可以调整截取范围
  602. print(friend_wxids)
  603. #friend_wxids.remove('weixin')
  604. gewe_chat.wxchat.save_contacts_brief_to_cache(token_id, app_id, wxid, friend_wxids)
  605. # keys=redis_helper.redis_helper.client.keys('__AI_OPS_WX__:LOGININFO:*')
  606. # wxid=""
  607. # for k in keys:
  608. # key=k.decode('utf-8')
  609. # hash_key=f"__AI_OPS_WX__:LOGININFO:{key}"
  610. # cache_app_id=redis_helper.redis_helper.get_hash_field(hash_key,"appId")
  611. # if app_id==cache_app_id:
  612. # wxid=redis_helper.redis_helper.get_hash_field(hash_key,"wxid")
  613. # break
  614. # print(wxid)
  615. # print('~~~~~~~~~~~~~~~~~~~~~~~~~~~\n')
  616. # if wxid!="":
  617. # print('~~~~~~~~~~~~~~~~~~~~~~~~~~~\n')
  618. # hash_key=f"__AI_OPS_WX__:CONTACTS_BRIEF:{wxid}"
  619. # ret,msg,contacts_list = gewe_chat.wxchat.fetch_contacts_list(token_id, app_id)
  620. # friend_wxids = contacts_list['friends'][3:] # 可以调整截取范围
  621. # print(friend_wxids)
  622. # #friend_wxids.remove('weixin')
  623. # gewe_chat.wxchat.save_contacts_brief_to_cache(token_id, app_id, wxid, friend_wxids)
  624. # cache_str=redis_helper.redis_helper.get_hash_field(hash_key,"data")
  625. # cache = json.loads(cache_str) if cache_str else []
  626. # for c in cache:
  627. # if c["userName"]==contact_wxid:
  628. # c["nickName"] = msg_data["NickName"]
  629. # c["snsBgImg"] = msg_data["SnsBgimgId"]
  630. # c["smallHeadImgUrl"] = msg_data["SmallHeadImgUrl"]
  631. # c["signature"]= msg_data["Signature"]
  632. # # 更新缓存,如果有修改过的话
  633. # if cache:
  634. # updated_cache_str = json.dumps(cache)
  635. # redis_helper.redis_helper.set_hash_field(hash_key, "data", updated_cache_str)
  636. else:
  637. logger.info('群聊好友通过验证及好友资料变更的通知消息')
  638. def get_messages_from_cache(hash_key,object:object)->list:
  639. '''
  640. 对话列表
  641. '''
  642. messages=redis_helper.redis_helper.get_hash(hash_key)
  643. wxid=hash_key.split(':')[-1]
  644. if not messages:
  645. messages=[{"role": "system", "content": ""}]
  646. messages.append(object)
  647. redis_helper.redis_helper.set_hash(hash_key,{"data":json.dumps(messages,ensure_ascii=False)},3600)
  648. else:
  649. messages_str=redis_helper.redis_helper.get_hash_field(hash_key,"data")
  650. messages = json.loads(messages_str) if messages_str else []
  651. #判断是否含有图片
  652. last_message = messages[-1]
  653. content = last_message.get("content", [])
  654. if isinstance(content, list) and content:
  655. last_content_type = content[-1].get("type")
  656. if last_content_type == 'image_url':
  657. content.append(object['content'][0])
  658. messages[-1]['content']=content
  659. else:
  660. messages.append(object)
  661. else:
  662. messages.append(object)
  663. redis_helper.redis_helper.set_hash(hash_key,{"data":json.dumps(messages,ensure_ascii=False)},3600)
  664. return messages
  665. # def fast_gpt_api(messages:list,session_id:str):
  666. # api_key="sk-jr69ONIehfGKe9JFphuNk4DU5Y5wooHKHhQv7oSnFzVbwCnW65fXO9kvH" #测试
  667. # #api_key="sk-uJDBdKmJVb2cmfldGOvlIY6Qx0AzqWMPD3lS1IzgQYzHNOXv9SKNI" #开发2
  668. # api_url = "http://106.15.182.218:3000/api/v1/chat/completions"
  669. # headers = {
  670. # "Content-Type": "application/json",
  671. # "Authorization": f"Bearer {api_key}"
  672. # }
  673. # data={
  674. # "model": "",
  675. # "messages":messages,
  676. # "chatId": session_id,
  677. # "detail": True
  678. # }
  679. # #print(json.dumps(data,ensure_ascii=False))
  680. # logger.info("[CHATGPT] 请求={}".format(json.dumps(data, ensure_ascii=False)))
  681. # response = requests.post(url=api_url, headers=headers, data=json.dumps(data), timeout=600)
  682. # response.raise_for_status()
  683. # response_data = response.json()
  684. # logger.info("[CHATGPT] 响应={}".format(json.dumps(response_data, separators=(',', ':'),ensure_ascii=False)))
  685. # print(response_data)
  686. # return response_data
  687. def fast_gpt_api(messages:list,wixd:str,friend_wxid:str):
  688. c=gewe_chat.wxchat.get_wxchat_config_from_cache(wixd)
  689. api_key=c.get('agentTokenId',"sk-jr69ONIehfGKe9JFphuNk4DU5Y5wooHKHhQv7oSnFzVbwCnW65fXO9kvH")
  690. print(f'流程key:{api_key}\n~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~\n')
  691. #api_key="sk-jr69ONIehfGKe9JFphuNk4DU5Y5wooHKHhQv7oSnFzVbwCnW65fXO9kvH" #测试
  692. #api_key="sk-uJDBdKmJVb2cmfldGOvlIY6Qx0AzqWMPD3lS1IzgQYzHNOXv9SKNI" #开发2
  693. api_url = "http://106.15.182.218:3000/api/v1/chat/completions"
  694. headers = {
  695. "Content-Type": "application/json",
  696. "Authorization": f"Bearer {api_key}"
  697. }
  698. session_id=f'{wixd}-{friend_wxid}'
  699. data={
  700. "model": "",
  701. "messages":messages,
  702. "chatId": session_id,
  703. "detail": True
  704. }
  705. #print(json.dumps(data,ensure_ascii=False))
  706. logger.info("[CHATGPT] 请求={}".format(json.dumps(data, ensure_ascii=False)))
  707. response = requests.post(url=api_url, headers=headers, data=json.dumps(data), timeout=600)
  708. response.raise_for_status()
  709. response_data = response.json()
  710. logger.info("[CHATGPT] 响应={}".format(json.dumps(response_data, separators=(',', ':'),ensure_ascii=False)))
  711. #print(response_data)
  712. return response_data
  713. def get_token_id_by_app_id(app_id: str) -> str:
  714. # 使用 SCAN 避免一次性返回所有的匹配键,逐步扫描
  715. cursor = 0
  716. while True:
  717. cursor, login_keys = redis_helper.redis_helper.client.scan(cursor, match='__AI_OPS_WX__:LOGININFO:*')
  718. # 批量获取所有键的 hash 数据
  719. for k in login_keys:
  720. r = redis_helper.redis_helper.get_hash(k)
  721. if r.get("appId") == app_id:
  722. return r.get("tokenId", "")
  723. # 如果游标为 0,则表示扫描完成
  724. if cursor == 0:
  725. break
  726. return ""
  727. def get_login_info_by_app_id(app_id: str) ->dict:
  728. # 使用 SCAN 避免一次性返回所有的匹配键,逐步扫描
  729. cursor = 0
  730. while True:
  731. cursor, login_keys = redis_helper.redis_helper.client.scan(cursor, match='__AI_OPS_WX__:LOGININFO:*')
  732. # 批量获取所有键的 hash 数据
  733. for k in login_keys:
  734. r = redis_helper.redis_helper.get_hash(k)
  735. if r.get("appId") == app_id:
  736. return k,r
  737. # 如果游标为 0,则表示扫描完成
  738. if cursor == 0:
  739. break
  740. return ""
  741. def contains_url(text):
  742. # 定义检测网址的正则表达式
  743. url_pattern = re.compile(
  744. r'http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\\(\\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+'
  745. )
  746. # 检查字符串是否包含网址
  747. return bool(url_pattern.search(text))
  748. def get_first_char_if_digit(s):
  749. if s and s[0].isdigit(): # 判断字符串是否非空且首字符为数字
  750. return int(s[0]) # 返回数字形式
  751. return None # 如果不是数字则返回 None
  752. def remove_at_mention_regex(text):
  753. # 使用正则表达式去掉“在群聊中@了你”
  754. return re.sub(r"在群聊中@了你", "", text)
  755. def extract_nickname(text)->str:
  756. if "在群聊中@了你" in text:
  757. # 如果包含 "在群聊中@了你",提取其前面的名字
  758. match = re.search(r"^(.*?)在群聊中@了你", text)
  759. if match:
  760. return match.group(1).strip()
  761. elif ": @" in text:
  762. # 如果包含 ": @",提取其前面的名字
  763. return text.split(": @")[0].strip()
  764. return ''
  765. def check_chatroom(userName):
  766. pattern = r'^\d+@chatroom$'
  767. if re.match(pattern, userName):
  768. return True
  769. return False