You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

904 lines
39KB

  1. from flask_restful import Resource, reqparse
  2. from flask import jsonify,request
  3. import requests,json,re
  4. from wechat import gewe_chat
  5. from voice.ali.ali_voice import AliVoice
  6. from common import utils,redis_helper,memory,kafka_helper
  7. from common.log import logger
  8. import xml.etree.ElementTree as ET
  9. import threading,time
  10. import os
  11. from voice import audio_convert
# Delay, in seconds, before handle_text's timer fires check_timeout(), which
# sends a "please wait" notice if the AI reply thread is still running.
timeout_duration = 8.0
  13. class MessagesResource(Resource):
  14. def __init__(self):
  15. self.parser = reqparse.RequestParser()
  16. def post(self):
  17. msg = request.get_json()
  18. logger.info(f"收到微信回调消息: {msg}")
  19. type_name =msg.get("TypeName")
  20. app_id = msg.get("Appid")
  21. # token_id = "f828cb3c-1039-489f-b9ae-7494d1778a15"
  22. token_id=get_token_id_by_app_id(app_id)
  23. if token_id=="":
  24. logger.warning('找不到登录信息,不处理')
  25. return jsonify({"message": "收到微信回调消息"})
  26. wxid = msg.get("Wxid",'')
  27. wx_config = gewe_chat.wxchat.get_wxchat_config_from_cache(wxid)
  28. if not bool(wx_config.get("agentEnabled",False)):
  29. logger.info(f'微信ID {wxid} 未托管,不处理')
  30. return jsonify({"message": "收到微信回调消息"})
  31. if type_name=='AddMsg':
  32. #wxid = msg.get("Wxid")
  33. msg_data = msg.get("Data")
  34. msg_type = msg_data.get("MsgType",None)
  35. from_wxid = msg_data["FromUserName"]["string"]
  36. to_wxid = msg_data["ToUserName"]["string"]
  37. msg_push_content=msg_data.get("PushContent") #群发
  38. #wx_config = gewe_chat.wxchat.get_wxchat_config_from_cache(wxid)
  39. handlers = {
  40. 1: handle_text,
  41. 3: handle_image,
  42. 34: handle_voice,
  43. 49: handle_xml,
  44. 37: handle_add_friend_notice,
  45. 10002: handle_10002_msg
  46. }
  47. # (扫码进群情况)判断受否是群聊,并添加到通信录
  48. if check_chatroom(from_wxid) or check_chatroom(to_wxid):
  49. logger.info('群信息')
  50. chatroom_id=from_wxid
  51. ret,msg,data=gewe_chat.wxchat.save_contract_list(token_id,app_id,chatroom_id,3)
  52. logger.info(f'保存到通讯录 chatroom_id {chatroom_id} {msg}')
  53. gewe_chat.wxchat.update_group_info_to_cache(token_id,app_id,wxid,chatroom_id)
  54. gewe_chat.wxchat.update_group_members_to_cache(token_id,app_id,wxid,chatroom_id)
  55. handlers[1]=handle_text_group
  56. handlers[3]=handle_image_group
  57. handlers[34]=handle_voice_group
  58. handler = handlers.get(msg_type)
  59. if handler:
  60. return handler(token_id,app_id, wxid,msg_data,from_wxid, to_wxid)
  61. else:
  62. logger.warning(f"微信回调消息类型 {msg_type} 未处理")
  63. elif type_name=='ModContacts':
  64. '''
  65. 好友通过验证及好友资料变更的通知消息
  66. '''
  67. wxid = msg.get("Wxid")
  68. msg_data = msg.get("Data")
  69. handle_mod_contacts(token_id,app_id,wxid,msg_data)
  70. elif type_name=="DelContacts":
  71. '''
  72. 删除好友通知/退出群聊
  73. '''
  74. msg_data = msg.get("Data")
  75. username=msg_data["UserName"]["string"]
  76. if check_chatroom(username):
  77. logger.info('退出群聊')
  78. wxid = msg.get("Wxid")
  79. chatroom_id=username
  80. redis_helper.redis_helper.delete_hash_field(f'__AI_OPS_WX__:GROUPS_INFO:{wxid}',chatroom_id)
  81. logger.info(f'清除 chatroom_id{chatroom_id} 数据')
  82. else:
  83. logger.info('删除好友通知')
  84. elif type_name=="Offline":
  85. '''
  86. 已经离线
  87. '''
  88. wxid = msg.get("Wxid")
  89. logger.warning(f'微信ID {wxid}在设备{app_id}已经离线')
  90. k,r=get_login_info_by_app_id(app_id)
  91. print(k)
  92. redis_helper.redis_helper.update_hash_field(k,'status',0)
  93. redis_helper.redis_helper.update_hash_field(k,'modify_at',int(time.time()))
  94. else:
  95. logger.warning(f"未知消息类型")
  96. return jsonify({"message": "收到微信回调消息"})
  97. def handle_text(token_id,app_id, wxid,msg_data,from_wxid, to_wxid):
  98. '''
  99. 私聊文本消息
  100. '''
  101. msg_content=msg_data["Content"]["string"]
  102. if wxid == from_wxid: #手动发送消息
  103. logger.info("Active message sending detected")
  104. gewe_chat.wxchat.save_contacts_brief_to_cache(token_id,app_id,wxid,[to_wxid])
  105. callback_to_user=msg_data["ToUserName"]["string"]
  106. input_wx_content_dialogue_message=[{"type": "text", "text": msg_content}]
  107. input_message=utils.dialogue_message(from_wxid,to_wxid,input_wx_content_dialogue_message)
  108. kafka_helper.kafka_client.produce_message(input_message)
  109. logger.info("发送对话 %s",input_message)
  110. else:
  111. callback_to_user=msg_data["FromUserName"]["string"]
  112. # hash_key = f'__AI_OPS_WX__:MESSAGES:{wxid}:{callback_to_user}'
  113. # prompt={"role": "user", "content": [{
  114. # "type": "text",
  115. # "text": msg_content
  116. # }]}
  117. # messages_to_send=gewe_chat.wxchat.save_session_messages_to_cache(hash_key, prompt)
  118. # # 收到的对话
  119. # input_wx_content_dialogue_message=[{"type": "text", "text": msg_content}]
  120. # input_message=utils.dialogue_message(callback_to_user,wxid,input_wx_content_dialogue_message)
  121. # kafka_helper.kafka_client.produce_message(input_message)
  122. # logger.info("发送对话 %s",input_message)
  123. # cache_data = memory.USER_INTERACTIVE_CACHE.get(wxid)
  124. # if cache_data and cache_data.get('interactive') :
  125. # o=get_first_char_if_digit(msg_content)
  126. # if o is not None:
  127. # userSelectOptions=cache_data.get('options')
  128. # if o < len(userSelectOptions):
  129. # o=o-1
  130. # msg_content=userSelectOptions[o].get("value")
  131. # messages_to_send=[{"role": "user", "content": msg_content}]
  132. # else:
  133. # messages_to_send=[{"role": "user", "content": msg_content}]
  134. # else:
  135. # messages_to_send=[{"role": "user", "content": msg_content}]
  136. # res=fast_gpt_api(messages_to_send,wxid,callback_to_user)
  137. # reply_content=res["choices"][0]["message"]["content"]
  138. # description = ''
  139. # userSelectOptions = []
  140. # if isinstance(reply_content, list) and any(item.get("type") == "interactive" for item in reply_content):
  141. # for item in reply_content:
  142. # if item["type"] == "interactive" and item["interactive"]["type"] == "userSelect":
  143. # params = item["interactive"]["params"]
  144. # description = params.get("description")
  145. # userSelectOptions = params.get("userSelectOptions", [])
  146. # values_string = "\n".join(option["value"] for option in userSelectOptions)
  147. # if description is not None:
  148. # memory.USER_INTERACTIVE_CACHE[wxid] = {
  149. # "interactive":True,
  150. # "options": userSelectOptions,
  151. # }
  152. # reply_content=description + '------------------------------\n'+values_string
  153. # elif isinstance(reply_content, list) and any(item.get("type") == "text" for item in reply_content):
  154. # memory.USER_INTERACTIVE_CACHE[wxid] = {
  155. # "interactive":False
  156. # }
  157. # text=''
  158. # for item in reply_content:
  159. # if item["type"] == "text":
  160. # text=item["text"]["content"]
  161. # if text=='':
  162. # # 去除上次上一轮对话再次请求
  163. # cache_messages_str=redis_helper.redis_helper.get_hash_field(hash_key,"data")
  164. # cache_messages = json.loads(cache_messages_str) if cache_messages_str else []
  165. # if len(cache_messages) >= 3:
  166. # cache_messages = cache_messages[:-3]
  167. # redis_helper.redis_helper.update_hash_field(hash_key,"data",json.dumps(cache_messages,ensure_ascii=False))
  168. # messages_to_send=gewe_chat.wxchat.save_session_messages_to_cache(hash_key, prompt)
  169. # res=fast_gpt_api(messages_to_send,wxid,callback_to_user)
  170. # reply_content=res["choices"][0]["message"]["content"]
  171. # if isinstance(reply_content, list) :
  172. # reply_content=reply_content[0].get('text').get("content")
  173. # else:
  174. # reply_content=text
  175. # else:
  176. # memory.USER_INTERACTIVE_CACHE[wxid] = {
  177. # "interactive":False
  178. # }
  179. # reply_content=res["choices"][0]["message"]["content"]
  180. # gewe_chat.wxchat.post_text(token_id,app_id,callback_to_user,reply_content)
  181. # gewe_chat.wxchat.save_session_messages_to_cache(hash_key, {"role": "assistant", "content": reply_content})
  182. # # 回复的对话
  183. # input_wx_content_dialogue_message=[{"type": "text", "text": reply_content}]
  184. # input_message=utils.dialogue_message(wxid,callback_to_user,input_wx_content_dialogue_message,True)
  185. # kafka_helper.kafka_client.produce_message(input_message)
  186. # logger.info("发送对话 %s",input_message)
  187. # 创建并启动任务线程,将参数传递给 ai_chat_text 函数
  188. task_thread = threading.Thread(
  189. target=ai_chat_text,
  190. args=(token_id,app_id,wxid,msg_data,msg_content)
  191. )
  192. task_thread.start()
  193. # 设置定时器,1秒后检查任务是否超时。这里需要使用 lambda 来传递参数
  194. timeout_timer = threading.Timer(
  195. timeout_duration,
  196. lambda:check_timeout(task_thread, token_id, wxid,app_id, callback_to_user)
  197. )
  198. timeout_timer.start()
  199. # 等待任务线程完成
  200. task_thread.join()
  201. # 取消定时器
  202. timeout_timer.cancel()
  203. def check_timeout( task_thread:threading.Thread, token_id,wxid, app_id, callback_to_user):
  204. if task_thread.is_alive():
  205. print(f"任务运行时间超过{timeout_duration}秒,token_id={token_id}, app_id={app_id}, callback_to_user={callback_to_user}")
  206. wx_config = gewe_chat.wxchat.get_wxchat_config_from_cache(wxid)
  207. if bool(wx_config.get("chatWaitingMsgEnabled",True)):
  208. gewe_chat.wxchat.post_text(token_id,app_id,callback_to_user,"亲,我正在组织回复的信息,请稍等一会")
  209. def ai_chat_text(token_id,app_id,wxid,msg_data,msg_content):
  210. start_time = time.time() # 记录任务开始时间
  211. callback_to_user=msg_data["FromUserName"]["string"]
  212. hash_key = f'__AI_OPS_WX__:MESSAGES:{wxid}:{callback_to_user}'
  213. prompt={"role": "user", "content": [{
  214. "type": "text",
  215. "text": msg_content
  216. }]}
  217. messages_to_send=gewe_chat.wxchat.save_session_messages_to_cache(hash_key, prompt)
  218. # 收到的对话
  219. input_wx_content_dialogue_message=[{"type": "text", "text": msg_content}]
  220. input_message=utils.dialogue_message(callback_to_user,wxid,input_wx_content_dialogue_message)
  221. kafka_helper.kafka_client.produce_message(input_message)
  222. logger.info("发送对话 %s",input_message)
  223. cache_data = memory.USER_INTERACTIVE_CACHE.get(wxid)
  224. if cache_data and cache_data.get('interactive') :
  225. o=get_first_char_if_digit(msg_content)
  226. if o is not None:
  227. userSelectOptions=cache_data.get('options')
  228. if o < len(userSelectOptions):
  229. o=o-1
  230. msg_content=userSelectOptions[o].get("value")
  231. messages_to_send=[{"role": "user", "content": msg_content}]
  232. else:
  233. messages_to_send=[{"role": "user", "content": msg_content}]
  234. else:
  235. messages_to_send=[{"role": "user", "content": msg_content}]
  236. res=fast_gpt_api(messages_to_send,wxid,callback_to_user)
  237. reply_content=remove_markdown_symbol(res["choices"][0]["message"]["content"])
  238. description = ''
  239. userSelectOptions = []
  240. if isinstance(reply_content, list) and any(item.get("type") == "interactive" for item in reply_content):
  241. for item in reply_content:
  242. if item["type"] == "interactive" and item["interactive"]["type"] == "userSelect":
  243. params = item["interactive"]["params"]
  244. description = params.get("description")
  245. userSelectOptions = params.get("userSelectOptions", [])
  246. values_string = "\n".join(option["value"] for option in userSelectOptions)
  247. if description is not None:
  248. memory.USER_INTERACTIVE_CACHE[wxid] = {
  249. "interactive":True,
  250. "options": userSelectOptions,
  251. }
  252. reply_content=description + '------------------------------\n'+values_string
  253. elif isinstance(reply_content, list) and any(item.get("type") == "text" for item in reply_content):
  254. memory.USER_INTERACTIVE_CACHE[wxid] = {
  255. "interactive":False
  256. }
  257. text=''
  258. for item in reply_content:
  259. if item["type"] == "text":
  260. text=item["text"]["content"]
  261. if text=='':
  262. # 去除上次上一轮对话再次请求
  263. cache_messages_str=redis_helper.redis_helper.get_hash_field(hash_key,"data")
  264. cache_messages = json.loads(cache_messages_str) if cache_messages_str else []
  265. if len(cache_messages) >= 3:
  266. cache_messages = cache_messages[:-3]
  267. redis_helper.redis_helper.update_hash_field(hash_key,"data",json.dumps(cache_messages,ensure_ascii=False))
  268. messages_to_send=gewe_chat.wxchat.save_session_messages_to_cache(hash_key, prompt)
  269. res=fast_gpt_api(messages_to_send,wxid,callback_to_user)
  270. reply_content=remove_markdown_symbol(res["choices"][0]["message"]["content"])
  271. if isinstance(reply_content, list) :
  272. reply_content=remove_markdown_symbol(reply_content[0].get('text').get("content"))
  273. else:
  274. reply_content=text
  275. else:
  276. memory.USER_INTERACTIVE_CACHE[wxid] = {
  277. "interactive":False
  278. }
  279. reply_content=remove_markdown_symbol(res["choices"][0]["message"]["content"])
  280. gewe_chat.wxchat.post_text(token_id,app_id,callback_to_user,reply_content)
  281. gewe_chat.wxchat.save_session_messages_to_cache(hash_key, {"role": "assistant", "content": reply_content})
  282. # 回复的对话
  283. input_wx_content_dialogue_message=[{"type": "text", "text": reply_content}]
  284. input_message=utils.dialogue_message(wxid,callback_to_user,input_wx_content_dialogue_message,True)
  285. kafka_helper.kafka_client.produce_message(input_message)
  286. logger.info("发送对话 %s",input_message)
  287. end_time = time.time() # 记录任务结束时间
  288. execution_time = end_time - start_time # 计算执行时间
  289. logger.info(f"AI回答任务完成,耗时 {execution_time:.2f} 秒")
  290. def handle_text_group(token_id,app_id, wxid,msg_data,from_wxid, to_wxid):
  291. '''
  292. 群聊文本消息
  293. '''
  294. msg_content=msg_data["Content"]["string"]
  295. msg_push_content=msg_data.get("PushContent")
  296. k,login_info=get_login_info_by_app_id(app_id)
  297. nickname=login_info.get("nickName")
  298. if wxid == from_wxid: #手动发送消息
  299. logger.info("Active message sending detected")
  300. gewe_chat.wxchat.save_contacts_brief_to_cache(token_id,app_id,wxid,[to_wxid])
  301. callback_to_user=msg_data["ToUserName"]["string"]
  302. input_wx_content_dialogue_message=[{"type": "text", "text": msg_content}]
  303. input_message=utils.dialogue_message(from_wxid,to_wxid,input_wx_content_dialogue_message)
  304. kafka_helper.kafka_client.produce_message(input_message)
  305. logger.info("发送对话 %s",input_message)
  306. else:
  307. c = gewe_chat.wxchat.get_wxchat_config_from_cache(wxid)
  308. chatroom_id_white_list = c.get("chatroomIdWhiteList", [])
  309. if not chatroom_id_white_list:
  310. logger.info('白名单为空或未定义,不处理')
  311. return
  312. if from_wxid not in chatroom_id_white_list:
  313. logger.info('群ID不在白名单中,不处理')
  314. return
  315. if '在群聊中@了你' in msg_push_content or '@'+nickname in msg_push_content:
  316. callback_to_user=msg_data["FromUserName"]["string"]
  317. hash_key = f'__AI_OPS_WX__:MESSAGES:{wxid}:{callback_to_user}'
  318. prompt={"role": "user", "content": [{
  319. "type": "text",
  320. "text": msg_content
  321. }]}
  322. messages_to_send=gewe_chat.wxchat.save_session_messages_to_cache(hash_key, prompt)
  323. # 收到的对话
  324. input_wx_content_dialogue_message=[{"type": "text", "text": msg_content}]
  325. input_message=utils.dialogue_message(callback_to_user,wxid,input_wx_content_dialogue_message)
  326. kafka_helper.kafka_client.produce_message(input_message)
  327. logger.info("发送对话 %s",input_message)
  328. cache_data = memory.USER_INTERACTIVE_CACHE.get(wxid)
  329. if cache_data and cache_data.get('interactive') :
  330. o=get_first_char_if_digit(msg_content)
  331. if o is not None:
  332. userSelectOptions=cache_data.get('options')
  333. if o < len(userSelectOptions):
  334. o=o-1
  335. msg_content=userSelectOptions[o].get("value")
  336. messages_to_send=[{"role": "user", "content": msg_content}]
  337. else:
  338. messages_to_send=[{"role": "user", "content": msg_content}]
  339. else:
  340. messages_to_send=[{"role": "user", "content": msg_content}]
  341. res=fast_gpt_api(messages_to_send,wxid,callback_to_user)
  342. reply_content=remove_markdown_symbol(res["choices"][0]["message"]["content"])
  343. description = ''
  344. userSelectOptions = []
  345. if isinstance(reply_content, list) and any(item.get("type") == "interactive" for item in reply_content):
  346. for item in reply_content:
  347. if item["type"] == "interactive" and item["interactive"]["type"] == "userSelect":
  348. params = item["interactive"]["params"]
  349. description = params.get("description")
  350. userSelectOptions = params.get("userSelectOptions", [])
  351. values_string = "\n".join(option["value"] for option in userSelectOptions)
  352. if description is not None:
  353. memory.USER_INTERACTIVE_CACHE[wxid] = {
  354. "interactive":True,
  355. "options": userSelectOptions,
  356. }
  357. reply_content=description + '------------------------------\n'+values_string
  358. elif isinstance(reply_content, list) and any(item.get("type") == "text" for item in reply_content):
  359. memory.USER_INTERACTIVE_CACHE[wxid] = {
  360. "interactive":False
  361. }
  362. text=''
  363. for item in reply_content:
  364. if item["type"] == "text":
  365. text=item["text"]["content"]
  366. if text=='':
  367. # 去除上次上一轮对话再次请求
  368. cache_messages_str=redis_helper.redis_helper.get_hash_field(hash_key,"data")
  369. cache_messages = json.loads(cache_messages_str) if cache_messages_str else []
  370. if len(cache_messages) >= 3:
  371. cache_messages = cache_messages[:-3]
  372. redis_helper.redis_helper.update_hash_field(hash_key,"data",json.dumps(cache_messages,ensure_ascii=False))
  373. messages_to_send=gewe_chat.wxchat.save_session_messages_to_cache(hash_key, prompt)
  374. res=fast_gpt_api(messages_to_send,wxid,callback_to_user)
  375. reply_content=remove_markdown_symbol(res["choices"][0]["message"]["content"])
  376. else:
  377. reply_content=text
  378. else:
  379. memory.USER_INTERACTIVE_CACHE[wxid] = {
  380. "interactive":False
  381. }
  382. reply_content=res["choices"][0]["message"]["content"]
  383. reply_content='@'+extract_nickname(msg_push_content) + reply_content
  384. gewe_chat.wxchat.post_text(token_id,app_id,callback_to_user,reply_content)
  385. gewe_chat.wxchat.save_session_messages_to_cache(hash_key, {"role": "assistant", "content": reply_content})
  386. # 回复的对话
  387. input_wx_content_dialogue_message=[{"type": "text", "text": reply_content}]
  388. input_message=utils.dialogue_message(wxid,callback_to_user,input_wx_content_dialogue_message,True)
  389. kafka_helper.kafka_client.produce_message(input_message)
  390. logger.info("发送对话 %s",input_message)
  391. else:
  392. logger.info('群聊公开消息')
  393. callback_to_user=msg_data["FromUserName"]["string"]
  394. dialogue_message=[{"type": "text", "text": msg_content}]
  395. input_message=utils.dialogue_message(callback_to_user,wxid,dialogue_message)
  396. kafka_helper.kafka_client.produce_message(input_message)
  397. logger.info("发送对话 %s",input_message)
  398. return
  399. def handle_image(token_id,app_id, wxid,msg_data,from_wxid, to_wxid):
  400. '''
  401. 图片消息
  402. '''
  403. msg_content=msg_data["Content"]["string"]
  404. callback_to_user=from_wxid
  405. hash_key = f'__AI_OPS_WX__:MESSAGES:{wxid}:{callback_to_user}'
  406. wx_img_url=gewe_chat.wxchat.download_image_msg(token_id,app_id,msg_content)
  407. oss_access_key_id="LTAI5tRTG6pLhTpKACJYoPR5"
  408. oss_access_key_secret="E7dMzeeMxq4VQvLg7Tq7uKf3XWpYfN"
  409. oss_endpoint="http://oss-cn-shanghai.aliyuncs.com"
  410. oss_bucket_name="cow-agent"
  411. oss_prefix="cow"
  412. img_url=utils.upload_oss(oss_access_key_id, oss_access_key_secret, oss_endpoint, oss_bucket_name, wx_img_url, oss_prefix)
  413. prompt={
  414. "role": "user",
  415. "content": [{
  416. "type": "image_url",
  417. "image_url": {"url": img_url}
  418. }]
  419. }
  420. gewe_chat.wxchat.save_session_messages_to_cache(hash_key, prompt)
  421. gewe_chat.wxchat.post_text(token_id,app_id,callback_to_user,'已经上传了图片,有什么可以为您服务')
  422. logger.info(f"上传图片 URL: {img_url}")
  423. wx_content_dialogue_message=[{"type": "image_url", "image_url": {"url": img_url}}]
  424. input_message=utils.dialogue_message(wxid,callback_to_user,wx_content_dialogue_message)
  425. kafka_helper.kafka_client.produce_message(input_message)
  426. logger.info("发送对话 %s",input_message)
def handle_image_group(token_id,app_id, wxid,msg_data,from_wxid, to_wxid):
    """Handle a group-chat image message; currently it only logs the event."""
    logger.info('群聊图片消息')
  429. def handle_voice(token_id,app_id, wxid,msg_data,from_wxid, to_wxid):
  430. '''
  431. 语音消息
  432. '''
  433. callback_to_user=from_wxid
  434. msg_content=msg_data["Content"]["string"]
  435. msg_id=msg_data["MsgId"]
  436. file_url=gewe_chat.wxchat.download_audio_msg(token_id,app_id,msg_id,msg_content)
  437. react_silk_path=utils.save_to_local_from_url(file_url)
  438. react_wav_path = os.path.splitext(react_silk_path)[0] + ".wav"
  439. audio_convert.any_to_wav(react_silk_path,react_wav_path)
  440. react_voice_text=AliVoice().voiceToText(react_wav_path)
  441. os.remove(react_silk_path)
  442. os.remove(react_wav_path)
  443. hash_key = f'__AI_OPS_WX__:MESSAGES:{wxid}:{callback_to_user}'
  444. messages=gewe_chat.wxchat.save_session_messages_to_cache(hash_key, {"role": "user", "content": react_voice_text})
  445. ai_res=fast_gpt_api(messages,wxid,callback_to_user)
  446. ai_res_content=remove_markdown_symbol(ai_res["choices"][0]["message"]["content"])
  447. has_url=contains_url(ai_res_content)
  448. if not has_url:
  449. voice_during,voice_url=utils.wx_voice(ai_res_content)
  450. if voice_during < 60 * 1000:
  451. ret,ret_msg,res=gewe_chat.wxchat.post_voice(token_id,app_id,callback_to_user,voice_url,voice_during)
  452. else:
  453. ret,ret_msg,res=gewe_chat.wxchat.post_text(token_id,app_id,callback_to_user,ai_res_content)
  454. logger.warning(f'回应语音消息长度 {voice_during/1000}秒,超过60秒,转为文本回复')
  455. if ret==200:
  456. logger.info((f'{wxid} 向 {callback_to_user} 发送语音文本【{ai_res_content}】{ret_msg}'))
  457. else:
  458. logger.warning((f'{wxid} 向 {callback_to_user} 发送语音文本【{ai_res_content}】{ret_msg}'))
  459. ret,ret_msg,res=gewe_chat.wxchat.post_text(token_id,app_id,callback_to_user,ai_res_content)
  460. logger.info((f'{wxid} 向 {callback_to_user} 发送文本【{ai_res_content}】{ret_msg}'))
  461. else:
  462. logger.info(f"回复内容包含网址,不发送语音,回复文字内容:{ai_res_content}")
  463. ret,ret_msg,res=gewe_chat.wxchat.post_text(token_id,app_id,callback_to_user,ai_res_content)
  464. gewe_chat.wxchat.save_session_messages_to_cache(hash_key, {"role": "assistant", "content": ai_res_content})
  465. # 构造对话消息并发送到 Kafka
  466. input_wx_content_dialogue_message = [{"type": "text", "text": ai_res_content}]
  467. input_message = utils.dialogue_message(wxid, callback_to_user, input_wx_content_dialogue_message,True)
  468. kafka_helper.kafka_client.produce_message(input_message)
  469. logger.info("发送对话 %s", input_message)
def handle_voice_group(token_id,app_id, wxid,msg_data,from_wxid, to_wxid):
    """Handle a group-chat voice message; currently it only logs the event."""
    logger.info('语音消息')
  472. def handle_xml(token_id,app_id, wxid,msg_data,from_wxid, to_wxid):
  473. '''
  474. 处理xml
  475. '''
  476. msg_content_xml=msg_data["Content"]["string"]
  477. root = ET.fromstring(msg_content_xml)
  478. type_value = root.find(".//appmsg/type").text
  479. handlers = {
  480. 57: handle_xml_reference,
  481. }
  482. handler = handlers.get(type_value)
  483. if handler:
  484. return handler(token_id,app_id, wxid,msg_data,from_wxid, to_wxid)
  485. elif "邀请你加入了群聊" in msg_content_xml: # 邀请加入群聊
  486. logger.warning(f"xml消息 {type_value} 邀请你加入了群聊.todo")
  487. else:
  488. print(f"xml消息 {type_value} 未解析")
  489. def handle_xml_reference(token_id,app_id, wxid,msg_data,from_wxid, to_wxid):
  490. '''
  491. 判断此类消息的逻辑:$.Data.MsgType=49 并且 解析$.Data.Content.string中的xml msg.appmsg.type=57
  492. '''
  493. callback_to_user=from_wxid
  494. hash_key = f'__AI_OPS_WX__:MESSAGES:{wxid}:{callback_to_user}'
  495. msg_content= msg_data["PushContent"]
  496. prompt={"role": "user", "content": [{
  497. "type": "text",
  498. "text": msg_content
  499. }]}
  500. # 收到的对话
  501. messages_to_send=gewe_chat.wxchat.save_session_messages_to_cache(hash_key, prompt)
  502. input_wx_content_dialogue_message=[{"type": "text", "text": msg_content}]
  503. input_message=utils.dialogue_message(callback_to_user,wxid,input_wx_content_dialogue_message)
  504. kafka_helper.kafka_client.produce_message(input_message)
  505. logger.info("发送对话 %s",input_message)
  506. # 回复的对话
  507. res=fast_gpt_api(messages_to_send,wxid,callback_to_user)
  508. reply_content=remove_markdown_symbol(res["choices"][0]["message"]["content"])
  509. input_wx_content_dialogue_message=[{"type": "text", "text": reply_content}]
  510. input_message=utils.dialogue_message(wxid,callback_to_user,input_wx_content_dialogue_message,True)
  511. kafka_helper.kafka_client.produce_message(input_message)
  512. logger.info("发送对话 %s",input_message)
  513. gewe_chat.wxchat.save_session_messages_to_cache(hash_key, {"role": "assistant", "content": reply_content})
  514. gewe_chat.wxchat.post_text(token_id,app_id,callback_to_user,reply_content)
  515. def handle_add_friend_notice(token_id,app_id, wxid,msg_data,from_wxid, to_wxid):
  516. '''
  517. 好友添加请求通知
  518. '''
  519. logger.info('好友添加请求通知')
  520. msg_content_xml=msg_data["Content"]["string"]
  521. root = ET.fromstring(msg_content_xml)
  522. msg_content = root.attrib.get('content', None)
  523. v3= root.attrib.get('encryptusername', None)
  524. v4= root.attrib.get('ticket', None)
  525. scene=root.attrib.get('scene', None)
  526. to_contact_wxid=root.attrib.get('fromusername', None)
  527. wxid=msg_data["ToUserName"]["string"]
  528. # 自动同意好友
  529. # print(v3)
  530. # print(v4)
  531. # print(scene)
  532. # print(msg_content)
  533. # 操作类型,2添加好友 3同意好友 4拒绝好友
  534. #option=2
  535. option=3
  536. reply_add_contact_contact="亲,我是你的好友"
  537. ret,ret_msg=gewe_chat.wxchat.add_contacts(token_id,app_id,scene,option,v3,v4,reply_add_contact_contact)
  538. if ret==200:
  539. logger.info('自动添加好友成功')
  540. # 好友发送的文字
  541. hash_key = f'__AI_OPS_WX__:MESSAGES:{wxid}:{to_contact_wxid}'
  542. prompt={"role": "user", "content": [{"type": "text","text": msg_content}]}
  543. messages_to_send=gewe_chat.wxchat.save_session_messages_to_cache(hash_key, prompt)
  544. input_wx_content_dialogue_message=[{"type": "text", "text": msg_content}]
  545. input_message=utils.dialogue_message(to_contact_wxid,wxid,input_wx_content_dialogue_message)
  546. kafka_helper.kafka_client.produce_message(input_message)
  547. logger.info("发送对话 %s",input_message)
  548. callback_to_user=to_contact_wxid
  549. res=fast_gpt_api(messages_to_send,wxid,callback_to_user)
  550. reply_content=remove_markdown_symbol(res["choices"][0]["message"]["content"])
  551. #保存好友信息
  552. gewe_chat.wxchat.save_contacts_brief_to_cache(token_id,app_id, wxid,[to_contact_wxid])
  553. # 保存到缓存
  554. gewe_chat.wxchat.save_session_messages_to_cache(hash_key, {"role": "assistant", "content": reply_content})
  555. # 发送信息
  556. gewe_chat.wxchat.post_text(token_id,app_id, to_contact_wxid,reply_content)
  557. # 发送到kafka
  558. input_wx_content_dialogue_message=[{"type": "text", "text": reply_content}]
  559. input_message=utils.dialogue_message(wxid,to_contact_wxid,input_wx_content_dialogue_message,True)
  560. kafka_helper.kafka_client.produce_message(input_message)
  561. logger.info("发送对话 %s",input_message)
  562. else:
  563. logger.warning("添加好友失败")
  564. def handle_10002_msg(token_id,app_id, wxid,msg_data,from_wxid, to_wxid):
  565. '''
  566. 群聊邀请
  567. 撤回消息
  568. 拍一拍消息
  569. 地理位置
  570. 踢出群聊通知
  571. 解散群聊通知
  572. 发布群公告
  573. '''
  574. msg_content_xml=msg_data["Content"]["string"]
  575. # 群聊邀请
  576. if '邀请你加入了群聊' in msg_content_xml and check_chatroom(msg_data["FromUserName"]["string"]):
  577. chatroom_id=msg_data["FromUserName"]["string"]
  578. ret,msg,data=gewe_chat.wxchat.save_contract_list(token_id,app_id,chatroom_id,3)
  579. logger.info(f'群聊邀请,保存到通讯录 chatroom_id {chatroom_id} {msg}')
  580. gewe_chat.wxchat.update_group_info_to_cache(token_id,app_id,wxid,chatroom_id)
  581. gewe_chat.wxchat.update_group_members_to_cache(token_id,app_id,wxid,chatroom_id)
  582. if '移出了群聊' in msg_content_xml and 'sysmsgtemplate' in msg_content_xml :
  583. chatroom_id=msg_data["FromUserName"]["string"]
  584. ret,msg,data=gewe_chat.wxchat.save_contract_list(token_id,app_id,chatroom_id,2)
  585. logger.info(f'踢出群聊,移除从通讯录 chatroom_id {chatroom_id} {msg}')
  586. redis_helper.redis_helper.delete_hash_field(f'__AI_OPS_WX__:GROUPS_INFO:{wxid}',chatroom_id)
  587. logger.info(f'清除 chatroom_id{chatroom_id} 数据')
  588. if '已解散该群聊' in msg_content_xml and 'sysmsgtemplate' in msg_content_xml :
  589. chatroom_id=msg_data["FromUserName"]["string"]
  590. ret,msg,data=gewe_chat.wxchat.save_contract_list(token_id,app_id,chatroom_id,2)
  591. logger.info(f'解散群聊,移除从通讯录 chatroom_id {chatroom_id} {msg}')
  592. redis_helper.redis_helper.delete_hash_field(f'__AI_OPS_WX__:GROUPS_INFO:{wxid}',chatroom_id)
  593. logger.info(f'清除 chatroom_id{chatroom_id} 数据')
  594. print('撤回消息,拍一拍消息,地理位置')
  595. def handle_mod_contacts(token_id,app_id,wxid,msg_data):
  596. '''
  597. 好友通过验证及好友资料变更的通知消息
  598. '''
  599. logger.info('好友通过验证及好友资料变更的通知消息')
  600. if not check_chatroom(msg_data["UserName"]["string"]):
  601. contact_wxid = msg_data["UserName"]["string"]
  602. # 更新好友信息
  603. # 检查好友关系,不是好友则删除
  604. # ret,msg,check_relation=gewe_chat.wxchat.check_relation(token_id, app_id,[contact_wxid])
  605. # first_item = check_relation[0]
  606. # check_relation_status=first_item.get('relation')
  607. # logger.info(f'{wxid} 好友 {contact_wxid} 关系检查:{check_relation_status}')
  608. # if check_relation_status != 0:
  609. # gewe_chat.wxchat.delete_contacts_brief_from_cache(wxid, [contact_wxid])
  610. # logger.info(f'好友关系异常:{check_relation_status},删除好友 {contact_wxid} 信息')
  611. # else:
  612. # gewe_chat.wxchat.save_contacts_brief_to_cache(token_id, app_id, wxid, [contact_wxid])
  613. ret,msg,contacts_list = gewe_chat.wxchat.fetch_contacts_list(token_id, app_id)
  614. friend_wxids = contacts_list['friends'][3:] # 可以调整截取范围
  615. print(friend_wxids)
  616. #friend_wxids.remove('weixin')
  617. gewe_chat.wxchat.save_contacts_brief_to_cache(token_id, app_id, wxid, friend_wxids)
  618. else:
  619. logger.info('群聊好友通过验证及好友资料变更的通知消息')
  620. def get_messages_from_cache(hash_key,object:object)->list:
  621. '''
  622. 对话列表
  623. '''
  624. messages=redis_helper.redis_helper.get_hash(hash_key)
  625. wxid=hash_key.split(':')[-1]
  626. if not messages:
  627. messages=[{"role": "system", "content": ""}]
  628. messages.append(object)
  629. redis_helper.redis_helper.set_hash(hash_key,{"data":json.dumps(messages,ensure_ascii=False)},3600)
  630. else:
  631. messages_str=redis_helper.redis_helper.get_hash_field(hash_key,"data")
  632. messages = json.loads(messages_str) if messages_str else []
  633. #判断是否含有图片
  634. last_message = messages[-1]
  635. content = last_message.get("content", [])
  636. if isinstance(content, list) and content:
  637. last_content_type = content[-1].get("type")
  638. if last_content_type == 'image_url':
  639. content.append(object['content'][0])
  640. messages[-1]['content']=content
  641. else:
  642. messages.append(object)
  643. else:
  644. messages.append(object)
  645. redis_helper.redis_helper.set_hash(hash_key,{"data":json.dumps(messages,ensure_ascii=False)},3600)
  646. return messages
  647. def fast_gpt_api(messages:list,wixd:str,friend_wxid:str):
  648. c=gewe_chat.wxchat.get_wxchat_config_from_cache(wixd)
  649. api_key=c.get('agentTokenId',"sk-jr69ONIehfGKe9JFphuNk4DU5Y5wooHKHhQv7oSnFzVbwCnW65fXO9kvH")
  650. print(f'流程key:{api_key}\n')
  651. #api_key="sk-jr69ONIehfGKe9JFphuNk4DU5Y5wooHKHhQv7oSnFzVbwCnW65fXO9kvH" #测试
  652. #api_key="sk-uJDBdKmJVb2cmfldGOvlIY6Qx0AzqWMPD3lS1IzgQYzHNOXv9SKNI" #开发2
  653. api_url = "http://106.15.182.218:3000/api/v1/chat/completions"
  654. headers = {
  655. "Content-Type": "application/json",
  656. "Authorization": f"Bearer {api_key}"
  657. }
  658. session_id=f'{wixd}-{friend_wxid}'
  659. data={
  660. "model": "",
  661. "messages":messages,
  662. "chatId": session_id,
  663. "detail": True
  664. }
  665. #print(json.dumps(data,ensure_ascii=False))
  666. logger.info("[CHATGPT] 请求={}".format(json.dumps(data, ensure_ascii=False)))
  667. response = requests.post(url=api_url, headers=headers, data=json.dumps(data), timeout=600)
  668. response.raise_for_status()
  669. response_data = response.json()
  670. logger.info("[CHATGPT] 响应={}".format(json.dumps(response_data, separators=(',', ':'),ensure_ascii=False)))
  671. #print(response_data)
  672. return response_data
  673. def get_token_id_by_app_id(app_id: str) -> str:
  674. # 使用 SCAN 避免一次性返回所有的匹配键,逐步扫描
  675. cursor = 0
  676. while True:
  677. cursor, login_keys = redis_helper.redis_helper.client.scan(cursor, match='__AI_OPS_WX__:LOGININFO:*')
  678. # 批量获取所有键的 hash 数据
  679. for k in login_keys:
  680. r = redis_helper.redis_helper.get_hash(k)
  681. if r.get("appId") == app_id:
  682. return r.get("tokenId", "")
  683. # 如果游标为 0,则表示扫描完成
  684. if cursor == 0:
  685. break
  686. return ""
  687. def get_login_info_by_app_id(app_id: str) ->dict:
  688. # 使用 SCAN 避免一次性返回所有的匹配键,逐步扫描
  689. cursor = 0
  690. while True:
  691. cursor, login_keys = redis_helper.redis_helper.client.scan(cursor, match='__AI_OPS_WX__:LOGININFO:*')
  692. # 批量获取所有键的 hash 数据
  693. for k in login_keys:
  694. r = redis_helper.redis_helper.get_hash(k)
  695. if r.get("appId") == app_id:
  696. return k,r
  697. # 如果游标为 0,则表示扫描完成
  698. if cursor == 0:
  699. break
  700. return ""
  701. def contains_url(text):
  702. # 定义检测网址的正则表达式
  703. url_pattern = re.compile(
  704. r'http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\\(\\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+'
  705. )
  706. # 检查字符串是否包含网址
  707. return bool(url_pattern.search(text))
  708. def get_first_char_if_digit(s):
  709. if s and s[0].isdigit(): # 判断字符串是否非空且首字符为数字
  710. return int(s[0]) # 返回数字形式
  711. return None # 如果不是数字则返回 None
  712. def remove_at_mention_regex(text):
  713. # 使用正则表达式去掉“在群聊中@了你”
  714. return re.sub(r"在群聊中@了你", "", text)
  715. def extract_nickname(text)->str:
  716. if "在群聊中@了你" in text:
  717. # 如果包含 "在群聊中@了你",提取其前面的名字
  718. match = re.search(r"^(.*?)在群聊中@了你", text)
  719. if match:
  720. return match.group(1).strip()
  721. elif ": @" in text:
  722. # 如果包含 ": @",提取其前面的名字
  723. return text.split(": @")[0].strip()
  724. return ''
  725. def check_chatroom(userName):
  726. pattern = r'^\d+@chatroom$'
  727. if re.match(pattern, userName):
  728. return True
  729. return False
  730. # def remove_markdown_symbol(text: str):
  731. # # 移除markdown格式,目前先移除**
  732. # if not text or not isinstance(text, str):
  733. # return text
  734. # return re.sub(r'\*\*(.*?)\*\*', r'\1', text)
  735. def remove_markdown_symbol(text: str):
  736. # 移除markdown格式,目前先移除**
  737. if not text or not isinstance(text, str):
  738. return text
  739. # 去除加粗、斜体等格式
  740. #text = re.sub(r'\*\*([^*]+)\*\*', r'\1', text) # 去除加粗
  741. text=re.sub(r'\*\*(.*?)\*\*', r'\1', text)
  742. text = re.sub(r'\*([^*]+)\*', r'\1', text) # 去除斜体
  743. text = re.sub(r'__([^_]+)__', r'\1', text) # 去除加粗(下划线)
  744. text = re.sub(r'_(.*?)_', r'\1', text) # 去除斜体(下划线)
  745. # 去除行内代码块
  746. text = re.sub(r'`([^`]+)`', r'\1', text)
  747. # 去除换行符\n,或者多余的空格
  748. #text = re.sub(r'\n+', ' ', text)
  749. # 去除列表编号等
  750. #text = re.sub(r'^\d+\.\s*', '', text, flags=re.MULTILINE)
  751. text = re.sub('[\\\`\*\_\[\]\#\+\-\!\>]', '', text)
  752. print(text)
  753. return text