您最多选择25个主题 主题必须以字母或数字开头,可以包含连字符 (-),并且长度不得超过35个字符

922 行
40KB

  1. from flask_restful import Resource, reqparse
  2. from flask import jsonify,request
  3. import requests,json,re
  4. from wechat import gewe_chat
  5. from voice.ali.ali_voice import AliVoice
  6. from common import utils,redis_helper,memory,kafka_helper
  7. from common.log import logger
  8. import xml.etree.ElementTree as ET
  9. import threading,time
  10. import os
  11. from voice import audio_convert
  12. timeout_duration = 4.0
  13. class MessagesResource(Resource):
  14. def __init__(self):
  15. self.parser = reqparse.RequestParser()
  16. def post(self):
  17. msg = request.get_json()
  18. logger.info(f"收到微信回调消息: {msg}")
  19. type_name =msg.get("TypeName")
  20. app_id = msg.get("Appid")
  21. # token_id = "f828cb3c-1039-489f-b9ae-7494d1778a15"
  22. token_id=get_token_id_by_app_id(app_id)
  23. if token_id=="":
  24. logger.warning('找不到登录信息,不处理')
  25. return jsonify({"message": "收到微信回调消息"})
  26. if type_name=='AddMsg':
  27. wxid = msg.get("Wxid")
  28. msg_data = msg.get("Data")
  29. msg_type = msg_data.get("MsgType",None)
  30. from_wxid = msg_data["FromUserName"]["string"]
  31. to_wxid = msg_data["ToUserName"]["string"]
  32. msg_push_content=msg_data.get("PushContent") #群发
  33. wx_config = gewe_chat.wxchat.get_wxchat_config_from_cache(wxid)
  34. if not bool(wx_config.get("agentEnabled",False)):
  35. logger.warning(f'{wxid} 智能体已关闭')
  36. handlers = {
  37. 49: handle_xml,
  38. 37: handle_add_friend_notice,
  39. 10002: handle_10002_msg
  40. }
  41. else:
  42. handlers = {
  43. 1: handle_text,
  44. 3: handle_image,
  45. 34: handle_voice,
  46. 49: handle_xml,
  47. 37: handle_add_friend_notice,
  48. 10002: handle_10002_msg
  49. }
  50. # (扫码进群情况)判断受否是群聊,并添加到通信录
  51. if check_chatroom(from_wxid) or check_chatroom(to_wxid):
  52. logger.info('群信息')
  53. chatroom_id=from_wxid
  54. ret,msg,data=gewe_chat.wxchat.save_contract_list(token_id,app_id,chatroom_id,3)
  55. logger.info(f'保存到通讯录 chatroom_id {chatroom_id} {msg}')
  56. gewe_chat.wxchat.update_groups_info_to_cache(token_id,app_id,wxid,chatroom_id)
  57. handlers[1]=handle_text_group
  58. handlers[3]=handle_image_group
  59. handlers[34]=handle_voice_group
  60. handler = handlers.get(msg_type)
  61. if handler:
  62. return handler(token_id,app_id, wxid,msg_data,from_wxid, to_wxid)
  63. else:
  64. logger.warning(f"微信回调消息类型 {msg_type} 未处理")
  65. elif type_name=='ModContacts':
  66. '''
  67. 好友通过验证及好友资料变更的通知消息
  68. '''
  69. wxid = msg.get("Wxid")
  70. msg_data = msg.get("Data")
  71. handle_mod_contacts(token_id,app_id,wxid,msg_data)
  72. elif type_name=="DelContacts":
  73. '''
  74. 删除好友通知/退出群聊
  75. '''
  76. msg_data = msg.get("Data")
  77. username=msg_data["UserName"]["string"]
  78. if check_chatroom(username):
  79. logger.info('退出群聊')
  80. wxid = msg.get("Wxid")
  81. chatroom_id=username
  82. redis_helper.redis_helper.delete_hash_field(f'__AI_OPS_WX__:GROUPS_INFO:{wxid}',chatroom_id)
  83. logger.info(f'清除 chatroom_id{chatroom_id} 数据')
  84. else:
  85. logger.info('删除好友通知')
  86. elif type_name=="Offline":
  87. '''
  88. 已经离线
  89. '''
  90. wxid = msg.get("Wxid")
  91. logger.warning(f'微信ID {wxid}在设备{app_id}已经离线')
  92. k,r=get_login_info_by_app_id(app_id)
  93. print(k)
  94. redis_helper.redis_helper.update_hash_field(k,'status',0)
  95. redis_helper.redis_helper.update_hash_field(k,'modify_at',int(time.time()))
  96. else:
  97. logger.warning(f"未知消息类型")
  98. return jsonify({"message": "收到微信回调消息"})
  99. def handle_text(token_id,app_id, wxid,msg_data,from_wxid, to_wxid):
  100. '''
  101. 私聊文本消息
  102. '''
  103. msg_content=msg_data["Content"]["string"]
  104. if wxid == from_wxid: #手动发送消息
  105. logger.info("Active message sending detected")
  106. gewe_chat.wxchat.save_contacts_brief_to_cache(token_id,app_id,wxid,[to_wxid])
  107. callback_to_user=msg_data["ToUserName"]["string"]
  108. input_wx_content_dialogue_message=[{"type": "text", "text": msg_content}]
  109. input_message=utils.dialogue_message(from_wxid,to_wxid,input_wx_content_dialogue_message)
  110. kafka_helper.kafka_client.produce_message(input_message)
  111. logger.info("发送对话 %s",input_message)
  112. else:
  113. callback_to_user=msg_data["FromUserName"]["string"]
  114. # hash_key = f'__AI_OPS_WX__:MESSAGES:{wxid}:{callback_to_user}'
  115. # prompt={"role": "user", "content": [{
  116. # "type": "text",
  117. # "text": msg_content
  118. # }]}
  119. # messages_to_send=gewe_chat.wxchat.save_session_messages_to_cache(hash_key, prompt)
  120. # # 收到的对话
  121. # input_wx_content_dialogue_message=[{"type": "text", "text": msg_content}]
  122. # input_message=utils.dialogue_message(callback_to_user,wxid,input_wx_content_dialogue_message)
  123. # kafka_helper.kafka_client.produce_message(input_message)
  124. # logger.info("发送对话 %s",input_message)
  125. # cache_data = memory.USER_INTERACTIVE_CACHE.get(wxid)
  126. # if cache_data and cache_data.get('interactive') :
  127. # o=get_first_char_if_digit(msg_content)
  128. # if o is not None:
  129. # userSelectOptions=cache_data.get('options')
  130. # if o < len(userSelectOptions):
  131. # o=o-1
  132. # msg_content=userSelectOptions[o].get("value")
  133. # messages_to_send=[{"role": "user", "content": msg_content}]
  134. # else:
  135. # messages_to_send=[{"role": "user", "content": msg_content}]
  136. # else:
  137. # messages_to_send=[{"role": "user", "content": msg_content}]
  138. # res=fast_gpt_api(messages_to_send,wxid,callback_to_user)
  139. # reply_content=res["choices"][0]["message"]["content"]
  140. # description = ''
  141. # userSelectOptions = []
  142. # if isinstance(reply_content, list) and any(item.get("type") == "interactive" for item in reply_content):
  143. # for item in reply_content:
  144. # if item["type"] == "interactive" and item["interactive"]["type"] == "userSelect":
  145. # params = item["interactive"]["params"]
  146. # description = params.get("description")
  147. # userSelectOptions = params.get("userSelectOptions", [])
  148. # values_string = "\n".join(option["value"] for option in userSelectOptions)
  149. # if description is not None:
  150. # memory.USER_INTERACTIVE_CACHE[wxid] = {
  151. # "interactive":True,
  152. # "options": userSelectOptions,
  153. # }
  154. # reply_content=description + '------------------------------\n'+values_string
  155. # elif isinstance(reply_content, list) and any(item.get("type") == "text" for item in reply_content):
  156. # memory.USER_INTERACTIVE_CACHE[wxid] = {
  157. # "interactive":False
  158. # }
  159. # text=''
  160. # for item in reply_content:
  161. # if item["type"] == "text":
  162. # text=item["text"]["content"]
  163. # if text=='':
  164. # # 去除上次上一轮对话再次请求
  165. # cache_messages_str=redis_helper.redis_helper.get_hash_field(hash_key,"data")
  166. # cache_messages = json.loads(cache_messages_str) if cache_messages_str else []
  167. # if len(cache_messages) >= 3:
  168. # cache_messages = cache_messages[:-3]
  169. # redis_helper.redis_helper.update_hash_field(hash_key,"data",json.dumps(cache_messages,ensure_ascii=False))
  170. # messages_to_send=gewe_chat.wxchat.save_session_messages_to_cache(hash_key, prompt)
  171. # res=fast_gpt_api(messages_to_send,wxid,callback_to_user)
  172. # reply_content=res["choices"][0]["message"]["content"]
  173. # if isinstance(reply_content, list) :
  174. # reply_content=reply_content[0].get('text').get("content")
  175. # else:
  176. # reply_content=text
  177. # else:
  178. # memory.USER_INTERACTIVE_CACHE[wxid] = {
  179. # "interactive":False
  180. # }
  181. # reply_content=res["choices"][0]["message"]["content"]
  182. # gewe_chat.wxchat.post_text(token_id,app_id,callback_to_user,reply_content)
  183. # gewe_chat.wxchat.save_session_messages_to_cache(hash_key, {"role": "assistant", "content": reply_content})
  184. # # 回复的对话
  185. # input_wx_content_dialogue_message=[{"type": "text", "text": reply_content}]
  186. # input_message=utils.dialogue_message(wxid,callback_to_user,input_wx_content_dialogue_message,True)
  187. # kafka_helper.kafka_client.produce_message(input_message)
  188. # logger.info("发送对话 %s",input_message)
  189. # 创建并启动任务线程,将参数传递给 ai_chat_text 函数
  190. task_thread = threading.Thread(
  191. target=ai_chat_text,
  192. args=(token_id,app_id,wxid,msg_data,msg_content)
  193. )
  194. task_thread.start()
  195. # 设置定时器,1秒后检查任务是否超时。这里需要使用 lambda 来传递参数
  196. timeout_timer = threading.Timer(
  197. timeout_duration,
  198. lambda:check_timeout(task_thread, token_id, wxid,app_id, callback_to_user)
  199. )
  200. timeout_timer.start()
  201. # 等待任务线程完成
  202. task_thread.join()
  203. # 取消定时器
  204. timeout_timer.cancel()
  205. def check_timeout( task_thread:threading.Thread, token_id,wxid, app_id, callback_to_user):
  206. if task_thread.is_alive():
  207. print(f"任务运行时间超过{timeout_duration}秒,token_id={token_id}, app_id={app_id}, callback_to_user={callback_to_user}")
  208. wx_config = gewe_chat.wxchat.get_wxchat_config_from_cache(wxid)
  209. if bool(wx_config.get("chatWaitingMsgEnabled",True)):
  210. gewe_chat.wxchat.post_text(token_id,app_id,callback_to_user,"亲,我正在组织回复的信息,请稍等一会")
  211. def ai_chat_text(token_id,app_id,wxid,msg_data,msg_content):
  212. start_time = time.time() # 记录任务开始时间
  213. callback_to_user=msg_data["FromUserName"]["string"]
  214. hash_key = f'__AI_OPS_WX__:MESSAGES:{wxid}:{callback_to_user}'
  215. prompt={"role": "user", "content": [{
  216. "type": "text",
  217. "text": msg_content
  218. }]}
  219. messages_to_send=gewe_chat.wxchat.save_session_messages_to_cache(hash_key, prompt)
  220. # 收到的对话
  221. input_wx_content_dialogue_message=[{"type": "text", "text": msg_content}]
  222. input_message=utils.dialogue_message(callback_to_user,wxid,input_wx_content_dialogue_message)
  223. kafka_helper.kafka_client.produce_message(input_message)
  224. logger.info("发送对话 %s",input_message)
  225. cache_data = memory.USER_INTERACTIVE_CACHE.get(wxid)
  226. if cache_data and cache_data.get('interactive') :
  227. o=get_first_char_if_digit(msg_content)
  228. if o is not None:
  229. userSelectOptions=cache_data.get('options')
  230. if o < len(userSelectOptions):
  231. o=o-1
  232. msg_content=userSelectOptions[o].get("value")
  233. messages_to_send=[{"role": "user", "content": msg_content}]
  234. else:
  235. messages_to_send=[{"role": "user", "content": msg_content}]
  236. else:
  237. messages_to_send=[{"role": "user", "content": msg_content}]
  238. res=fast_gpt_api(messages_to_send,wxid,callback_to_user)
  239. reply_content=res["choices"][0]["message"]["content"]
  240. description = ''
  241. userSelectOptions = []
  242. if isinstance(reply_content, list) and any(item.get("type") == "interactive" for item in reply_content):
  243. for item in reply_content:
  244. if item["type"] == "interactive" and item["interactive"]["type"] == "userSelect":
  245. params = item["interactive"]["params"]
  246. description = params.get("description")
  247. userSelectOptions = params.get("userSelectOptions", [])
  248. values_string = "\n".join(option["value"] for option in userSelectOptions)
  249. if description is not None:
  250. memory.USER_INTERACTIVE_CACHE[wxid] = {
  251. "interactive":True,
  252. "options": userSelectOptions,
  253. }
  254. reply_content=description + '------------------------------\n'+values_string
  255. elif isinstance(reply_content, list) and any(item.get("type") == "text" for item in reply_content):
  256. memory.USER_INTERACTIVE_CACHE[wxid] = {
  257. "interactive":False
  258. }
  259. text=''
  260. for item in reply_content:
  261. if item["type"] == "text":
  262. text=item["text"]["content"]
  263. if text=='':
  264. # 去除上次上一轮对话再次请求
  265. cache_messages_str=redis_helper.redis_helper.get_hash_field(hash_key,"data")
  266. cache_messages = json.loads(cache_messages_str) if cache_messages_str else []
  267. if len(cache_messages) >= 3:
  268. cache_messages = cache_messages[:-3]
  269. redis_helper.redis_helper.update_hash_field(hash_key,"data",json.dumps(cache_messages,ensure_ascii=False))
  270. messages_to_send=gewe_chat.wxchat.save_session_messages_to_cache(hash_key, prompt)
  271. res=fast_gpt_api(messages_to_send,wxid,callback_to_user)
  272. reply_content=res["choices"][0]["message"]["content"]
  273. if isinstance(reply_content, list) :
  274. reply_content=reply_content[0].get('text').get("content")
  275. else:
  276. reply_content=text
  277. else:
  278. memory.USER_INTERACTIVE_CACHE[wxid] = {
  279. "interactive":False
  280. }
  281. reply_content=res["choices"][0]["message"]["content"]
  282. gewe_chat.wxchat.post_text(token_id,app_id,callback_to_user,reply_content)
  283. gewe_chat.wxchat.save_session_messages_to_cache(hash_key, {"role": "assistant", "content": reply_content})
  284. # 回复的对话
  285. input_wx_content_dialogue_message=[{"type": "text", "text": reply_content}]
  286. input_message=utils.dialogue_message(wxid,callback_to_user,input_wx_content_dialogue_message,True)
  287. kafka_helper.kafka_client.produce_message(input_message)
  288. logger.info("发送对话 %s",input_message)
  289. end_time = time.time() # 记录任务结束时间
  290. execution_time = end_time - start_time # 计算执行时间
  291. logger.info(f"AI回答任务完成,耗时 {execution_time:.2f} 秒")
  292. def handle_text_group(token_id,app_id, wxid,msg_data,from_wxid, to_wxid):
  293. '''
  294. 群聊文本消息
  295. '''
  296. msg_content=msg_data["Content"]["string"]
  297. msg_push_content=msg_data.get("PushContent")
  298. k,login_info=get_login_info_by_app_id(app_id)
  299. nickname=login_info.get("nickName")
  300. if wxid == from_wxid: #手动发送消息
  301. logger.info("Active message sending detected")
  302. gewe_chat.wxchat.save_contacts_brief_to_cache(token_id,app_id,wxid,[to_wxid])
  303. callback_to_user=msg_data["ToUserName"]["string"]
  304. input_wx_content_dialogue_message=[{"type": "text", "text": msg_content}]
  305. input_message=utils.dialogue_message(from_wxid,to_wxid,input_wx_content_dialogue_message)
  306. kafka_helper.kafka_client.produce_message(input_message)
  307. logger.info("发送对话 %s",input_message)
  308. else:
  309. c = gewe_chat.wxchat.get_wxchat_config_from_cache(wxid)
  310. chatroom_id_white_list = c.get("chatroomIdWhiteList", [])
  311. if not chatroom_id_white_list:
  312. logger.info('白名单为空或未定义,不处理')
  313. return
  314. if from_wxid not in chatroom_id_white_list:
  315. logger.info('群ID不在白名单中,不处理')
  316. return
  317. if '在群聊中@了你' in msg_push_content or '@'+nickname in msg_push_content:
  318. callback_to_user=msg_data["FromUserName"]["string"]
  319. hash_key = f'__AI_OPS_WX__:MESSAGES:{wxid}:{callback_to_user}'
  320. prompt={"role": "user", "content": [{
  321. "type": "text",
  322. "text": msg_content
  323. }]}
  324. messages_to_send=gewe_chat.wxchat.save_session_messages_to_cache(hash_key, prompt)
  325. # 收到的对话
  326. input_wx_content_dialogue_message=[{"type": "text", "text": msg_content}]
  327. input_message=utils.dialogue_message(callback_to_user,wxid,input_wx_content_dialogue_message)
  328. kafka_helper.kafka_client.produce_message(input_message)
  329. logger.info("发送对话 %s",input_message)
  330. cache_data = memory.USER_INTERACTIVE_CACHE.get(wxid)
  331. if cache_data and cache_data.get('interactive') :
  332. o=get_first_char_if_digit(msg_content)
  333. if o is not None:
  334. userSelectOptions=cache_data.get('options')
  335. if o < len(userSelectOptions):
  336. o=o-1
  337. msg_content=userSelectOptions[o].get("value")
  338. messages_to_send=[{"role": "user", "content": msg_content}]
  339. else:
  340. messages_to_send=[{"role": "user", "content": msg_content}]
  341. else:
  342. messages_to_send=[{"role": "user", "content": msg_content}]
  343. res=fast_gpt_api(messages_to_send,wxid,callback_to_user)
  344. reply_content=res["choices"][0]["message"]["content"]
  345. description = ''
  346. userSelectOptions = []
  347. if isinstance(reply_content, list) and any(item.get("type") == "interactive" for item in reply_content):
  348. for item in reply_content:
  349. if item["type"] == "interactive" and item["interactive"]["type"] == "userSelect":
  350. params = item["interactive"]["params"]
  351. description = params.get("description")
  352. userSelectOptions = params.get("userSelectOptions", [])
  353. values_string = "\n".join(option["value"] for option in userSelectOptions)
  354. if description is not None:
  355. memory.USER_INTERACTIVE_CACHE[wxid] = {
  356. "interactive":True,
  357. "options": userSelectOptions,
  358. }
  359. reply_content=description + '------------------------------\n'+values_string
  360. elif isinstance(reply_content, list) and any(item.get("type") == "text" for item in reply_content):
  361. memory.USER_INTERACTIVE_CACHE[wxid] = {
  362. "interactive":False
  363. }
  364. text=''
  365. for item in reply_content:
  366. if item["type"] == "text":
  367. text=item["text"]["content"]
  368. if text=='':
  369. # 去除上次上一轮对话再次请求
  370. cache_messages_str=redis_helper.redis_helper.get_hash_field(hash_key,"data")
  371. cache_messages = json.loads(cache_messages_str) if cache_messages_str else []
  372. if len(cache_messages) >= 3:
  373. cache_messages = cache_messages[:-3]
  374. redis_helper.redis_helper.update_hash_field(hash_key,"data",json.dumps(cache_messages,ensure_ascii=False))
  375. messages_to_send=gewe_chat.wxchat.save_session_messages_to_cache(hash_key, prompt)
  376. res=fast_gpt_api(messages_to_send,wxid,callback_to_user)
  377. reply_content=res["choices"][0]["message"]["content"]
  378. else:
  379. reply_content=text
  380. else:
  381. memory.USER_INTERACTIVE_CACHE[wxid] = {
  382. "interactive":False
  383. }
  384. reply_content=res["choices"][0]["message"]["content"]
  385. reply_content='@'+extract_nickname(msg_push_content) + reply_content
  386. gewe_chat.wxchat.post_text(token_id,app_id,callback_to_user,reply_content)
  387. gewe_chat.wxchat.save_session_messages_to_cache(hash_key, {"role": "assistant", "content": reply_content})
  388. # 回复的对话
  389. input_wx_content_dialogue_message=[{"type": "text", "text": reply_content}]
  390. input_message=utils.dialogue_message(wxid,callback_to_user,input_wx_content_dialogue_message,True)
  391. kafka_helper.kafka_client.produce_message(input_message)
  392. logger.info("发送对话 %s",input_message)
  393. else:
  394. logger.info('群聊公开消息')
  395. callback_to_user=msg_data["FromUserName"]["string"]
  396. dialogue_message=[{"type": "text", "text": msg_content}]
  397. input_message=utils.dialogue_message(callback_to_user,wxid,dialogue_message)
  398. kafka_helper.kafka_client.produce_message(input_message)
  399. logger.info("发送对话 %s",input_message)
  400. return
  401. def handle_image(token_id,app_id, wxid,msg_data,from_wxid, to_wxid):
  402. '''
  403. 图片消息
  404. '''
  405. msg_content=msg_data["Content"]["string"]
  406. callback_to_user=from_wxid
  407. hash_key = f'__AI_OPS_WX__:MESSAGES:{wxid}:{callback_to_user}'
  408. wx_img_url=gewe_chat.wxchat.download_image_msg(token_id,app_id,msg_content)
  409. oss_access_key_id="LTAI5tRTG6pLhTpKACJYoPR5"
  410. oss_access_key_secret="E7dMzeeMxq4VQvLg7Tq7uKf3XWpYfN"
  411. oss_endpoint="http://oss-cn-shanghai.aliyuncs.com"
  412. oss_bucket_name="cow-agent"
  413. oss_prefix="cow"
  414. img_url=utils.upload_oss(oss_access_key_id, oss_access_key_secret, oss_endpoint, oss_bucket_name, wx_img_url, oss_prefix)
  415. prompt={
  416. "role": "user",
  417. "content": [{
  418. "type": "image_url",
  419. "image_url": {"url": img_url}
  420. }]
  421. }
  422. gewe_chat.wxchat.save_session_messages_to_cache(hash_key, prompt)
  423. gewe_chat.wxchat.post_text(token_id,app_id,callback_to_user,'已经上传了图片,有什么可以为您服务')
  424. logger.info(f"上传图片 URL: {img_url}")
  425. wx_content_dialogue_message=[{"type": "image_url", "image_url": {"url": img_url}}]
  426. input_message=utils.dialogue_message(wxid,callback_to_user,wx_content_dialogue_message)
  427. kafka_helper.kafka_client.produce_message(input_message)
  428. logger.info("发送对话 %s",input_message)
  429. def handle_image_group(token_id,app_id, wxid,msg_data,from_wxid, to_wxid):
  430. logger.info('群聊图片消息')
  431. def handle_voice(token_id,app_id, wxid,msg_data,from_wxid, to_wxid):
  432. '''
  433. 语音消息
  434. '''
  435. callback_to_user=from_wxid
  436. msg_content=msg_data["Content"]["string"]
  437. msg_id=msg_data["MsgId"]
  438. file_url=gewe_chat.wxchat.download_audio_msg(token_id,app_id,msg_id,msg_content)
  439. react_silk_path=utils.save_to_local_from_url(file_url)
  440. react_wav_path = os.path.splitext(react_silk_path)[0] + ".wav"
  441. audio_convert.any_to_wav(react_silk_path,react_wav_path)
  442. react_voice_text=AliVoice().voiceToText(react_wav_path)
  443. os.remove(react_silk_path)
  444. os.remove(react_wav_path)
  445. hash_key = f'__AI_OPS_WX__:MESSAGES:{wxid}:{callback_to_user}'
  446. messages=gewe_chat.wxchat.save_session_messages_to_cache(hash_key, {"role": "user", "content": react_voice_text})
  447. ai_res=fast_gpt_api(messages,wxid,callback_to_user)
  448. ai_res_content=ai_res["choices"][0]["message"]["content"]
  449. has_url=contains_url(ai_res_content)
  450. if not has_url:
  451. voice_during,voice_url=utils.wx_voice(ai_res_content)
  452. if voice_during < 60 * 1000:
  453. ret,ret_msg,res=gewe_chat.wxchat.post_voice(token_id,app_id,callback_to_user,voice_url,voice_during)
  454. else:
  455. ret,ret_msg,res=gewe_chat.wxchat.post_text(token_id,app_id,callback_to_user,ai_res_content)
  456. logger.warning(f'回应语音消息长度 {voice_during/1000}秒,超过60秒,转为文本回复')
  457. if ret==200:
  458. logger.info((f'{wxid} 向 {callback_to_user} 发送语音文本【{ai_res_content}】{ret_msg}'))
  459. else:
  460. logger.warning((f'{wxid} 向 {callback_to_user} 发送语音文本【{ai_res_content}】{ret_msg}'))
  461. ret,ret_msg,res=gewe_chat.wxchat.post_text(token_id,app_id,callback_to_user,ai_res_content)
  462. logger.info((f'{wxid} 向 {callback_to_user} 发送文本【{ai_res_content}】{ret_msg}'))
  463. else:
  464. logger.info(f"回复内容包含网址,不发送语音,回复文字内容:{ai_res_content}")
  465. ret,ret_msg,res=gewe_chat.wxchat.post_text(token_id,app_id,callback_to_user,ai_res_content)
  466. gewe_chat.wxchat.save_session_messages_to_cache(hash_key, {"role": "assistant", "content": ai_res_content})
  467. # 构造对话消息并发送到 Kafka
  468. input_wx_content_dialogue_message = [{"type": "text", "text": ai_res_content}]
  469. input_message = utils.dialogue_message(wxid, callback_to_user, input_wx_content_dialogue_message,True)
  470. kafka_helper.kafka_client.produce_message(input_message)
  471. logger.info("发送对话 %s", input_message)
  472. def handle_voice_group(token_id,app_id, wxid,msg_data,from_wxid, to_wxid):
  473. logger.info('语音消息')
  474. def handle_xml(token_id,app_id, wxid,msg_data,from_wxid, to_wxid):
  475. '''
  476. 处理xml
  477. '''
  478. msg_content_xml=msg_data["Content"]["string"]
  479. root = ET.fromstring(msg_content_xml)
  480. type_value = root.find(".//appmsg/type").text
  481. handlers = {
  482. 57: handle_xml_reference,
  483. }
  484. handler = handlers.get(type_value)
  485. if handler:
  486. return handler(token_id,app_id, wxid,msg_data,from_wxid, to_wxid)
  487. elif "邀请你加入了群聊" in msg_content_xml: # 邀请加入群聊
  488. logger.warning(f"xml消息 {type_value} 邀请你加入了群聊.todo")
  489. else:
  490. print(f"xml消息 {type_value} 未解析")
  491. def handle_xml_reference(token_id,app_id, wxid,msg_data,from_wxid, to_wxid):
  492. '''
  493. 判断此类消息的逻辑:$.Data.MsgType=49 并且 解析$.Data.Content.string中的xml msg.appmsg.type=57
  494. '''
  495. callback_to_user=from_wxid
  496. hash_key = f'__AI_OPS_WX__:MESSAGES:{wxid}:{callback_to_user}'
  497. msg_content= msg_data["PushContent"]
  498. prompt={"role": "user", "content": [{
  499. "type": "text",
  500. "text": msg_content
  501. }]}
  502. # 收到的对话
  503. messages_to_send=gewe_chat.wxchat.save_session_messages_to_cache(hash_key, prompt)
  504. input_wx_content_dialogue_message=[{"type": "text", "text": msg_content}]
  505. input_message=utils.dialogue_message(callback_to_user,wxid,input_wx_content_dialogue_message)
  506. kafka_helper.kafka_client.produce_message(input_message)
  507. logger.info("发送对话 %s",input_message)
  508. # 回复的对话
  509. res=fast_gpt_api(messages_to_send,wxid,callback_to_user)
  510. reply_content=res["choices"][0]["message"]["content"]
  511. input_wx_content_dialogue_message=[{"type": "text", "text": reply_content}]
  512. input_message=utils.dialogue_message(wxid,callback_to_user,input_wx_content_dialogue_message,True)
  513. kafka_helper.kafka_client.produce_message(input_message)
  514. logger.info("发送对话 %s",input_message)
  515. gewe_chat.wxchat.save_session_messages_to_cache(hash_key, {"role": "assistant", "content": reply_content})
  516. gewe_chat.wxchat.post_text(token_id,app_id,callback_to_user,reply_content)
  517. def handle_add_friend_notice(token_id,app_id, wxid,msg_data,from_wxid, to_wxid):
  518. '''
  519. 好友添加请求通知
  520. '''
  521. logger.info('好友添加请求通知')
  522. msg_content_xml=msg_data["Content"]["string"]
  523. root = ET.fromstring(msg_content_xml)
  524. msg_content = root.attrib.get('content', None)
  525. v3= root.attrib.get('encryptusername', None)
  526. v4= root.attrib.get('ticket', None)
  527. scene=root.attrib.get('scene', None)
  528. to_contact_wxid=root.attrib.get('fromusername', None)
  529. wxid=msg_data["ToUserName"]["string"]
  530. # 自动同意好友
  531. # print(v3)
  532. # print(v4)
  533. # print(scene)
  534. # print(msg_content)
  535. # 操作类型,2添加好友 3同意好友 4拒绝好友
  536. #option=2
  537. option=3
  538. reply_add_contact_contact="亲,我是你的美丽顾问"
  539. ret,ret_msg=gewe_chat.wxchat.add_contacts(token_id,app_id,scene,option,v3,v4,reply_add_contact_contact)
  540. if ret==200:
  541. logger.info('自动添加好友成功')
  542. # 好友发送的文字
  543. hash_key = f'__AI_OPS_WX__:MESSAGES:{wxid}:{to_contact_wxid}'
  544. prompt={"role": "user", "content": [{"type": "text","text": msg_content}]}
  545. messages_to_send=gewe_chat.wxchat.save_session_messages_to_cache(hash_key, prompt)
  546. input_wx_content_dialogue_message=[{"type": "text", "text": msg_content}]
  547. input_message=utils.dialogue_message(to_contact_wxid,wxid,input_wx_content_dialogue_message)
  548. kafka_helper.kafka_client.produce_message(input_message)
  549. logger.info("发送对话 %s",input_message)
  550. callback_to_user=to_contact_wxid
  551. res=fast_gpt_api(messages_to_send,wxid,callback_to_user)
  552. reply_content=res["choices"][0]["message"]["content"]
  553. #保存好友信息
  554. gewe_chat.wxchat.save_contacts_brief_to_cache(token_id,app_id, wxid,[to_contact_wxid])
  555. # 保存到缓存
  556. gewe_chat.wxchat.save_session_messages_to_cache(hash_key, {"role": "assistant", "content": reply_content})
  557. # 发送信息
  558. gewe_chat.wxchat.post_text(token_id,app_id, to_contact_wxid,reply_content)
  559. # 发送到kafka
  560. input_wx_content_dialogue_message=[{"type": "text", "text": reply_content}]
  561. input_message=utils.dialogue_message(wxid,to_contact_wxid,input_wx_content_dialogue_message,True)
  562. kafka_helper.kafka_client.produce_message(input_message)
  563. logger.info("发送对话 %s",input_message)
  564. else:
  565. logger.warning("添加好友失败")
  566. def handle_10002_msg(token_id,app_id, wxid,msg_data,from_wxid, to_wxid):
  567. '''
  568. 群聊邀请
  569. 撤回消息
  570. 拍一拍消息
  571. 地理位置
  572. 踢出群聊通知
  573. 解散群聊通知
  574. 发布群公告
  575. '''
  576. msg_content_xml=msg_data["Content"]["string"]
  577. # 群聊邀请
  578. if '邀请你加入了群聊' in msg_content_xml and check_chatroom(msg_data["FromUserName"]["string"]):
  579. chatroom_id=msg_data["FromUserName"]["string"]
  580. ret,msg,data=gewe_chat.wxchat.save_contract_list(token_id,app_id,chatroom_id,3)
  581. logger.info(f'群聊邀请,保存到通讯录 chatroom_id {chatroom_id} {msg}')
  582. gewe_chat.wxchat.update_groups_info_to_cache(token_id,app_id,wxid,chatroom_id)
  583. if '移出了群聊' in msg_content_xml and 'sysmsgtemplate' in msg_content_xml :
  584. chatroom_id=msg_data["FromUserName"]["string"]
  585. ret,msg,data=gewe_chat.wxchat.save_contract_list(token_id,app_id,chatroom_id,2)
  586. logger.info(f'踢出群聊,移除从通讯录 chatroom_id {chatroom_id} {msg}')
  587. redis_helper.redis_helper.delete_hash_field(f'__AI_OPS_WX__:GROUPS_INFO:{wxid}',chatroom_id)
  588. logger.info(f'清除 chatroom_id{chatroom_id} 数据')
  589. if '已解散该群聊' in msg_content_xml and 'sysmsgtemplate' in msg_content_xml :
  590. chatroom_id=msg_data["FromUserName"]["string"]
  591. ret,msg,data=gewe_chat.wxchat.save_contract_list(token_id,app_id,chatroom_id,2)
  592. logger.info(f'解散群聊,移除从通讯录 chatroom_id {chatroom_id} {msg}')
  593. redis_helper.redis_helper.delete_hash_field(f'__AI_OPS_WX__:GROUPS_INFO:{wxid}',chatroom_id)
  594. logger.info(f'清除 chatroom_id{chatroom_id} 数据')
  595. print('撤回消息,拍一拍消息,地理位置')
  596. def handle_mod_contacts(token_id,app_id,wxid,msg_data):
  597. '''
  598. 好友通过验证及好友资料变更的通知消息
  599. '''
  600. logger.info('好友通过验证及好友资料变更的通知消息')
  601. if not check_chatroom(msg_data["UserName"]["string"]):
  602. contact_wxid = msg_data["UserName"]["string"]
  603. # 更新好友信息
  604. ret,msg,contacts_list = gewe_chat.wxchat.fetch_contacts_list(token_id, app_id)
  605. friend_wxids = contacts_list['friends'][3:] # 可以调整截取范围
  606. print(friend_wxids)
  607. #friend_wxids.remove('weixin')
  608. gewe_chat.wxchat.save_contacts_brief_to_cache(token_id, app_id, wxid, friend_wxids)
  609. # keys=redis_helper.redis_helper.client.keys('__AI_OPS_WX__:LOGININFO:*')
  610. # wxid=""
  611. # for k in keys:
  612. # key=k.decode('utf-8')
  613. # hash_key=f"__AI_OPS_WX__:LOGININFO:{key}"
  614. # cache_app_id=redis_helper.redis_helper.get_hash_field(hash_key,"appId")
  615. # if app_id==cache_app_id:
  616. # wxid=redis_helper.redis_helper.get_hash_field(hash_key,"wxid")
  617. # break
  618. # print(wxid)
  619. # print('~~~~~~~~~~~~~~~~~~~~~~~~~~~\n')
  620. # if wxid!="":
  621. # print('~~~~~~~~~~~~~~~~~~~~~~~~~~~\n')
  622. # hash_key=f"__AI_OPS_WX__:CONTACTS_BRIEF:{wxid}"
  623. # ret,msg,contacts_list = gewe_chat.wxchat.fetch_contacts_list(token_id, app_id)
  624. # friend_wxids = contacts_list['friends'][3:] # 可以调整截取范围
  625. # print(friend_wxids)
  626. # #friend_wxids.remove('weixin')
  627. # gewe_chat.wxchat.save_contacts_brief_to_cache(token_id, app_id, wxid, friend_wxids)
  628. # cache_str=redis_helper.redis_helper.get_hash_field(hash_key,"data")
  629. # cache = json.loads(cache_str) if cache_str else []
  630. # for c in cache:
  631. # if c["userName"]==contact_wxid:
  632. # c["nickName"] = msg_data["NickName"]
  633. # c["snsBgImg"] = msg_data["SnsBgimgId"]
  634. # c["smallHeadImgUrl"] = msg_data["SmallHeadImgUrl"]
  635. # c["signature"]= msg_data["Signature"]
  636. # # 更新缓存,如果有修改过的话
  637. # if cache:
  638. # updated_cache_str = json.dumps(cache)
  639. # redis_helper.redis_helper.set_hash_field(hash_key, "data", updated_cache_str)
  640. else:
  641. logger.info('群聊好友通过验证及好友资料变更的通知消息')
  642. def get_messages_from_cache(hash_key,object:object)->list:
  643. '''
  644. 对话列表
  645. '''
  646. messages=redis_helper.redis_helper.get_hash(hash_key)
  647. wxid=hash_key.split(':')[-1]
  648. if not messages:
  649. messages=[{"role": "system", "content": ""}]
  650. messages.append(object)
  651. redis_helper.redis_helper.set_hash(hash_key,{"data":json.dumps(messages,ensure_ascii=False)},3600)
  652. else:
  653. messages_str=redis_helper.redis_helper.get_hash_field(hash_key,"data")
  654. messages = json.loads(messages_str) if messages_str else []
  655. #判断是否含有图片
  656. last_message = messages[-1]
  657. content = last_message.get("content", [])
  658. if isinstance(content, list) and content:
  659. last_content_type = content[-1].get("type")
  660. if last_content_type == 'image_url':
  661. content.append(object['content'][0])
  662. messages[-1]['content']=content
  663. else:
  664. messages.append(object)
  665. else:
  666. messages.append(object)
  667. redis_helper.redis_helper.set_hash(hash_key,{"data":json.dumps(messages,ensure_ascii=False)},3600)
  668. return messages
  669. # def fast_gpt_api(messages:list,session_id:str):
  670. # api_key="sk-jr69ONIehfGKe9JFphuNk4DU5Y5wooHKHhQv7oSnFzVbwCnW65fXO9kvH" #测试
  671. # #api_key="sk-uJDBdKmJVb2cmfldGOvlIY6Qx0AzqWMPD3lS1IzgQYzHNOXv9SKNI" #开发2
  672. # api_url = "http://106.15.182.218:3000/api/v1/chat/completions"
  673. # headers = {
  674. # "Content-Type": "application/json",
  675. # "Authorization": f"Bearer {api_key}"
  676. # }
  677. # data={
  678. # "model": "",
  679. # "messages":messages,
  680. # "chatId": session_id,
  681. # "detail": True
  682. # }
  683. # #print(json.dumps(data,ensure_ascii=False))
  684. # logger.info("[CHATGPT] 请求={}".format(json.dumps(data, ensure_ascii=False)))
  685. # response = requests.post(url=api_url, headers=headers, data=json.dumps(data), timeout=600)
  686. # response.raise_for_status()
  687. # response_data = response.json()
  688. # logger.info("[CHATGPT] 响应={}".format(json.dumps(response_data, separators=(',', ':'),ensure_ascii=False)))
  689. # print(response_data)
  690. # return response_data
  691. def fast_gpt_api(messages:list,wixd:str,friend_wxid:str):
  692. c=gewe_chat.wxchat.get_wxchat_config_from_cache(wixd)
  693. api_key=c.get('agentTokenId',"sk-jr69ONIehfGKe9JFphuNk4DU5Y5wooHKHhQv7oSnFzVbwCnW65fXO9kvH")
  694. print(f'流程key:{api_key}\n~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~\n')
  695. #api_key="sk-jr69ONIehfGKe9JFphuNk4DU5Y5wooHKHhQv7oSnFzVbwCnW65fXO9kvH" #测试
  696. #api_key="sk-uJDBdKmJVb2cmfldGOvlIY6Qx0AzqWMPD3lS1IzgQYzHNOXv9SKNI" #开发2
  697. api_url = "http://106.15.182.218:3000/api/v1/chat/completions"
  698. headers = {
  699. "Content-Type": "application/json",
  700. "Authorization": f"Bearer {api_key}"
  701. }
  702. session_id=f'{wixd}-{friend_wxid}'
  703. data={
  704. "model": "",
  705. "messages":messages,
  706. "chatId": session_id,
  707. "detail": True
  708. }
  709. #print(json.dumps(data,ensure_ascii=False))
  710. logger.info("[CHATGPT] 请求={}".format(json.dumps(data, ensure_ascii=False)))
  711. response = requests.post(url=api_url, headers=headers, data=json.dumps(data), timeout=600)
  712. response.raise_for_status()
  713. response_data = response.json()
  714. logger.info("[CHATGPT] 响应={}".format(json.dumps(response_data, separators=(',', ':'),ensure_ascii=False)))
  715. #print(response_data)
  716. return response_data
  717. def get_token_id_by_app_id(app_id: str) -> str:
  718. # 使用 SCAN 避免一次性返回所有的匹配键,逐步扫描
  719. cursor = 0
  720. while True:
  721. cursor, login_keys = redis_helper.redis_helper.client.scan(cursor, match='__AI_OPS_WX__:LOGININFO:*')
  722. # 批量获取所有键的 hash 数据
  723. for k in login_keys:
  724. r = redis_helper.redis_helper.get_hash(k)
  725. if r.get("appId") == app_id:
  726. return r.get("tokenId", "")
  727. # 如果游标为 0,则表示扫描完成
  728. if cursor == 0:
  729. break
  730. return ""
  731. def get_login_info_by_app_id(app_id: str) ->dict:
  732. # 使用 SCAN 避免一次性返回所有的匹配键,逐步扫描
  733. cursor = 0
  734. while True:
  735. cursor, login_keys = redis_helper.redis_helper.client.scan(cursor, match='__AI_OPS_WX__:LOGININFO:*')
  736. # 批量获取所有键的 hash 数据
  737. for k in login_keys:
  738. r = redis_helper.redis_helper.get_hash(k)
  739. if r.get("appId") == app_id:
  740. return k,r
  741. # 如果游标为 0,则表示扫描完成
  742. if cursor == 0:
  743. break
  744. return ""
  745. def contains_url(text):
  746. # 定义检测网址的正则表达式
  747. url_pattern = re.compile(
  748. r'http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\\(\\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+'
  749. )
  750. # 检查字符串是否包含网址
  751. return bool(url_pattern.search(text))
  752. def get_first_char_if_digit(s):
  753. if s and s[0].isdigit(): # 判断字符串是否非空且首字符为数字
  754. return int(s[0]) # 返回数字形式
  755. return None # 如果不是数字则返回 None
  756. def remove_at_mention_regex(text):
  757. # 使用正则表达式去掉“在群聊中@了你”
  758. return re.sub(r"在群聊中@了你", "", text)
  759. def extract_nickname(text)->str:
  760. if "在群聊中@了你" in text:
  761. # 如果包含 "在群聊中@了你",提取其前面的名字
  762. match = re.search(r"^(.*?)在群聊中@了你", text)
  763. if match:
  764. return match.group(1).strip()
  765. elif ": @" in text:
  766. # 如果包含 ": @",提取其前面的名字
  767. return text.split(": @")[0].strip()
  768. return ''
  769. def check_chatroom(userName):
  770. pattern = r'^\d+@chatroom$'
  771. if re.match(pattern, userName):
  772. return True
  773. return False