You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

1048 satır
45KB

  1. from flask_restful import Resource, reqparse
  2. from flask import jsonify,request
  3. import requests,json,re
  4. from wechat import gewe_chat
  5. from voice.ali.ali_voice import AliVoice
  6. from common import utils,redis_helper,memory,kafka_helper
  7. from common.log import logger
  8. import xml.etree.ElementTree as ET
  9. import threading,time
  10. from model import Models
  11. import os
  12. from voice import audio_convert
  # Seconds the AI reply worker may run before check_timeout() is fired by the timer in handle_text().
  13. timeout_duration = 8.0
  14. class MessagesResource(Resource):
  15. def __init__(self):
  16. self.parser = reqparse.RequestParser()
  17. def post(self):
  18. msg = request.get_json()
  19. logger.info(f"收到微信回调消息: {json.dumps(msg, separators=(',', ':'),ensure_ascii=False)}")
  20. type_name =msg.get("TypeName")
  21. app_id = msg.get("Appid")
  22. # token_id = "f828cb3c-1039-489f-b9ae-7494d1778a15"
  23. token_id=get_token_id_by_app_id(app_id)
  24. if token_id=="":
  25. logger.warning('找不到登录信息,不处理')
  26. return jsonify({"message": "收到微信回调消息"})
  27. wxid = msg.get("Wxid",'')
  28. # 自发命令
  29. # if type_name=='AddMsg':
  30. # from_wxid = msg_data["FromUserName"]["string"]
  31. # if wxid=='from_wxid':
  32. # pass
  33. if type_name == 'AddMsg':
  34. handle_self_cmd(wxid,msg)
  35. wx_config = gewe_chat.wxchat.get_wxchat_config_from_cache(wxid)
  36. # if not bool(wx_config.get("agentEnabled",False)):
  37. # logger.info(f'微信ID {wxid} 未托管,不处理')
  38. # return jsonify({"message": "收到微信回调消息"})
  39. if type_name=='AddMsg':
  40. if not bool(wx_config.get("agentEnabled",False)):
  41. logger.info(f'微信ID {wxid} 未托管,不处理')
  42. return jsonify({"message": "收到微信回调消息"})
  43. #wxid = msg.get("Wxid")
  44. msg_data = msg.get("Data")
  45. msg_type = msg_data.get("MsgType",None)
  46. from_wxid = msg_data["FromUserName"]["string"]
  47. to_wxid = msg_data["ToUserName"]["string"]
  48. msg_push_content=msg_data.get("PushContent") #群发
  49. #wx_config = gewe_chat.wxchat.get_wxchat_config_from_cache(wxid)
  50. handlers = {
  51. 1: handle_text,
  52. 3: handle_image,
  53. 34: handle_voice,
  54. 42: handle_name_card,
  55. 49: handle_xml,
  56. 37: handle_add_friend_notice,
  57. 10002: handle_10002_msg
  58. }
  59. # (扫码进群情况)判断受否是群聊,并添加到通信录
  60. if check_chatroom(from_wxid) or check_chatroom(to_wxid):
  61. logger.info('群信息')
  62. chatroom_id=from_wxid
  63. ret,msg,data=gewe_chat.wxchat.save_contract_list(token_id,app_id,chatroom_id,3)
  64. logger.info(f'保存到通讯录 chatroom_id {chatroom_id} {msg}')
  65. gewe_chat.wxchat.update_group_info_to_cache(token_id,app_id,wxid,chatroom_id)
  66. gewe_chat.wxchat.update_group_members_to_cache(token_id,app_id,wxid,chatroom_id)
  67. handlers[1]=handle_text_group
  68. handlers[3]=handle_image_group
  69. handlers[34]=handle_voice_group
  70. handler = handlers.get(msg_type)
  71. if handler:
  72. return handler(token_id,app_id, wxid,msg_data,from_wxid, to_wxid)
  73. else:
  74. logger.warning(f"微信回调消息类型 {msg_type} 未处理")
  75. elif type_name=='ModContacts':
  76. '''
  77. 好友通过验证及好友资料变更的通知消息
  78. '''
  79. wxid = msg.get("Wxid")
  80. msg_data = msg.get("Data")
  81. #
  82. #handle_mod_contacts(token_id,app_id,wxid,msg_data)
  83. #
  84. threading.Thread(target=handle_mod_contacts, args=(token_id,app_id,wxid,msg_data)).start()
  85. elif type_name=="DelContacts":
  86. '''
  87. 删除好友通知/退出群聊
  88. '''
  89. msg_data = msg.get("Data")
  90. username=msg_data["UserName"]["string"]
  91. if check_chatroom(username):
  92. logger.info('退出群聊')
  93. wxid = msg.get("Wxid")
  94. chatroom_id=username
  95. redis_helper.redis_helper.delete_hash_field(f'__AI_OPS_WX__:GROUPS_INFO:{wxid}',chatroom_id)
  96. logger.info(f'清除 chatroom_id{chatroom_id} 数据')
  97. else:
  98. logger.info('删除好友通知')
  99. elif type_name=="Offline":
  100. '''
  101. 已经离线
  102. '''
  103. wxid = msg.get("Wxid")
  104. logger.warning(f'微信ID {wxid}在设备{app_id}已经离线')
  105. k,r=get_login_info_by_app_id(app_id)
  106. print(k)
  107. redis_helper.redis_helper.update_hash_field(k,'status',0)
  108. redis_helper.redis_helper.update_hash_field(k,'modify_at',int(time.time()))
  109. else:
  110. logger.warning(f"未知消息类型")
  111. return jsonify({"message": "收到微信回调消息"})
  112. def handle_name_card(token_id,app_id, wxid,msg_data,from_wxid, to_wxid):
  113. logger.info('名片消息')
  114. try:
  115. msg_content_xml=msg_data["Content"]["string"]
  116. # 解析XML字符串
  117. root = ET.fromstring(msg_content_xml)
  118. # 提取alias属性
  119. alias_value = root.get("alias")
  120. # 加好友资料
  121. scene = int(root.get("scene"))
  122. v3 = root.get("username")
  123. v4 = root.get("antispamticket")
  124. logger.info(f"alias_value: {alias_value}, scene: {scene}, v3: {v3}, v4: {v4}")
  125. # 判断appid 是否已经创建3天
  126. k, login_info = utils.get_login_info_by_wxid(wxid)
  127. creation_timestamp=int(login_info.get('create_at',time.time()))
  128. current_timestamp = time.time()
  129. three_days_seconds = 3 * 24 * 60 * 60 # 三天的秒数
  130. diff_flag=(current_timestamp - creation_timestamp) >= three_days_seconds
  131. if not diff_flag:
  132. log_content=f'名片添加好友功能,{wxid} 用户创建不够三天,不能使用该功能'
  133. logger.warning(log_content)
  134. response=jsonify({'code': 401, 'message':log_content})
  135. response.status_code = 401
  136. return response
  137. # 将加好友资料添加到待加好友队列
  138. #gewe_chat.wxchat.enqueue_to_add_contacts(wxid,scene,v3,v4)
  139. _,loginfo=utils.get_login_info_by_wxid(wxid)
  140. nickname=loginfo.get('nickName')
  141. add_contact_content=f'您好,我是{nickname}'
  142. #gewe_chat.wxchat.add_contacts(token_id,app_id,scene,Models.OperationType.ADD_FRIEND,v3,v4,add_contact_content)
  143. gewe_chat.wxchat.add_contacts(token_id,app_id,scene,Models.OperationType.ADD_FRIEND.value,v3,v4,add_contact_content)
  144. except ET.ParseError as e:
  145. logger.error(f"XML解析错误: {e}")
  146. except KeyError as e:
  147. logger.error(f"字典键错误: {e}")
  148. except Exception as e:
  149. logger.error(f"未知错误: {e}")
  150. def handle_self_cmd(wxid,msg):
  151. '''
  152. 个人微信命令处理
  153. 如果是个人微信的指令,wxid == from_wxid
  154. commands = {
  155. '启用托管': True,
  156. '关闭托管': False
  157. }
  158. '''
  159. msg_data=msg.get("Data")
  160. from_wxid=msg_data["FromUserName"]["string"]
  161. msg_content=msg_data["Content"]["string"]
  162. if wxid == from_wxid:
  163. commands = {
  164. '启用托管': True,
  165. '关闭托管': False
  166. }
  167. if msg_content in commands:
  168. c_dict = gewe_chat.wxchat.get_wxchat_config_from_cache(wxid)
  169. if c_dict:
  170. from model import Models
  171. agent_config=Models.AgentConfig.model_validate(c_dict)
  172. agent_config.agentEnabled=commands[msg_content]
  173. gewe_chat.wxchat.save_wxchat_config(wxid, agent_config.model_dump())
  174. logger.info(f'{wxid} {"启动" if commands[msg_content] else "关闭"}托管')
  175. def handle_text(token_id,app_id, wxid,msg_data,from_wxid, to_wxid):
  176. '''
  177. 私聊文本消息
  178. '''
  179. msg_content=msg_data["Content"]["string"]
  180. if wxid == from_wxid: #手动发送消息
  181. logger.info("Active message sending detected")
  182. gewe_chat.wxchat.save_contacts_brief_to_cache(token_id,app_id,wxid,[to_wxid])
  183. callback_to_user=msg_data["ToUserName"]["string"]
  184. input_wx_content_dialogue_message=[{"type": "text", "text": msg_content}]
  185. input_message=utils.dialogue_message(from_wxid,to_wxid,input_wx_content_dialogue_message)
  186. kafka_helper.kafka_client.produce_message(input_message)
  187. logger.info("发送对话 %s",input_message)
  188. else:
  189. callback_to_user=msg_data["FromUserName"]["string"]
  190. # hash_key = f'__AI_OPS_WX__:MESSAGES:{wxid}:{callback_to_user}'
  191. # prompt={"role": "user", "content": [{
  192. # "type": "text",
  193. # "text": msg_content
  194. # }]}
  195. # messages_to_send=gewe_chat.wxchat.save_session_messages_to_cache(hash_key, prompt)
  196. # # 收到的对话
  197. # input_wx_content_dialogue_message=[{"type": "text", "text": msg_content}]
  198. # input_message=utils.dialogue_message(callback_to_user,wxid,input_wx_content_dialogue_message)
  199. # kafka_helper.kafka_client.produce_message(input_message)
  200. # logger.info("发送对话 %s",input_message)
  201. # cache_data = memory.USER_INTERACTIVE_CACHE.get(wxid)
  202. # if cache_data and cache_data.get('interactive') :
  203. # o=get_first_char_if_digit(msg_content)
  204. # if o is not None:
  205. # userSelectOptions=cache_data.get('options')
  206. # if o < len(userSelectOptions):
  207. # o=o-1
  208. # msg_content=userSelectOptions[o].get("value")
  209. # messages_to_send=[{"role": "user", "content": msg_content}]
  210. # else:
  211. # messages_to_send=[{"role": "user", "content": msg_content}]
  212. # else:
  213. # messages_to_send=[{"role": "user", "content": msg_content}]
  214. # res=fast_gpt_api(messages_to_send,wxid,callback_to_user)
  215. # reply_content=res["choices"][0]["message"]["content"]
  216. # description = ''
  217. # userSelectOptions = []
  218. # if isinstance(reply_content, list) and any(item.get("type") == "interactive" for item in reply_content):
  219. # for item in reply_content:
  220. # if item["type"] == "interactive" and item["interactive"]["type"] == "userSelect":
  221. # params = item["interactive"]["params"]
  222. # description = params.get("description")
  223. # userSelectOptions = params.get("userSelectOptions", [])
  224. # values_string = "\n".join(option["value"] for option in userSelectOptions)
  225. # if description is not None:
  226. # memory.USER_INTERACTIVE_CACHE[wxid] = {
  227. # "interactive":True,
  228. # "options": userSelectOptions,
  229. # }
  230. # reply_content=description + '------------------------------\n'+values_string
  231. # elif isinstance(reply_content, list) and any(item.get("type") == "text" for item in reply_content):
  232. # memory.USER_INTERACTIVE_CACHE[wxid] = {
  233. # "interactive":False
  234. # }
  235. # text=''
  236. # for item in reply_content:
  237. # if item["type"] == "text":
  238. # text=item["text"]["content"]
  239. # if text=='':
  240. # # 去除上次上一轮对话再次请求
  241. # cache_messages_str=redis_helper.redis_helper.get_hash_field(hash_key,"data")
  242. # cache_messages = json.loads(cache_messages_str) if cache_messages_str else []
  243. # if len(cache_messages) >= 3:
  244. # cache_messages = cache_messages[:-3]
  245. # redis_helper.redis_helper.update_hash_field(hash_key,"data",json.dumps(cache_messages,ensure_ascii=False))
  246. # messages_to_send=gewe_chat.wxchat.save_session_messages_to_cache(hash_key, prompt)
  247. # res=fast_gpt_api(messages_to_send,wxid,callback_to_user)
  248. # reply_content=res["choices"][0]["message"]["content"]
  249. # if isinstance(reply_content, list) :
  250. # reply_content=reply_content[0].get('text').get("content")
  251. # else:
  252. # reply_content=text
  253. # else:
  254. # memory.USER_INTERACTIVE_CACHE[wxid] = {
  255. # "interactive":False
  256. # }
  257. # reply_content=res["choices"][0]["message"]["content"]
  258. # gewe_chat.wxchat.post_text(token_id,app_id,callback_to_user,reply_content)
  259. # gewe_chat.wxchat.save_session_messages_to_cache(hash_key, {"role": "assistant", "content": reply_content})
  260. # # 回复的对话
  261. # input_wx_content_dialogue_message=[{"type": "text", "text": reply_content}]
  262. # input_message=utils.dialogue_message(wxid,callback_to_user,input_wx_content_dialogue_message,True)
  263. # kafka_helper.kafka_client.produce_message(input_message)
  264. # logger.info("发送对话 %s",input_message)
  265. # 创建并启动任务线程,将参数传递给 ai_chat_text 函数
  266. task_thread = threading.Thread(
  267. target=ai_chat_text,
  268. args=(token_id,app_id,wxid,msg_data,msg_content)
  269. )
  270. task_thread.start()
  271. # 设置定时器,1秒后检查任务是否超时。这里需要使用 lambda 来传递参数
  272. timeout_timer = threading.Timer(
  273. timeout_duration,
  274. lambda:check_timeout(task_thread, token_id, wxid,app_id, callback_to_user)
  275. )
  276. timeout_timer.start()
  277. # 等待任务线程完成
  278. task_thread.join()
  279. # 取消定时器
  280. timeout_timer.cancel()
  281. def check_timeout(task_thread:threading.Thread, token_id,wxid, app_id, callback_to_user):
  282. if task_thread.is_alive():
  283. print(f"任务运行时间超过{timeout_duration}秒,token_id={token_id}, app_id={app_id}, callback_to_user={callback_to_user}")
  284. wx_config = gewe_chat.wxchat.get_wxchat_config_from_cache(wxid)
  285. if bool(wx_config.get("chatWaitingMsgEnabled",True)):
  286. gewe_chat.wxchat.post_text(token_id,app_id,callback_to_user,"亲,我正在组织回复的信息,请稍等一会")
  287. def ai_chat_text(token_id,app_id,wxid,msg_data,msg_content):
  288. start_time = time.time() # 记录任务开始时间
  289. callback_to_user=msg_data["FromUserName"]["string"]
  290. hash_key = f'__AI_OPS_WX__:MESSAGES:{wxid}:{callback_to_user}'
  291. prompt={"role": "user", "content": [{
  292. "type": "text",
  293. "text": msg_content
  294. }]}
  295. messages_to_send=gewe_chat.wxchat.save_session_messages_to_cache(hash_key, prompt)
  296. # 收到的对话
  297. input_wx_content_dialogue_message=[{"type": "text", "text": msg_content}]
  298. input_message=utils.dialogue_message(callback_to_user,wxid,input_wx_content_dialogue_message)
  299. kafka_helper.kafka_client.produce_message(input_message)
  300. logger.info("发送对话 %s",input_message)
  301. cache_data = memory.USER_INTERACTIVE_CACHE.get(wxid)
  302. if cache_data and cache_data.get('interactive') :
  303. o=get_first_char_if_digit(msg_content)
  304. if o is not None:
  305. userSelectOptions=cache_data.get('options')
  306. if o < len(userSelectOptions):
  307. o=o-1
  308. msg_content=userSelectOptions[o].get("value")
  309. messages_to_send=[{"role": "user", "content": msg_content}]
  310. else:
  311. messages_to_send=[{"role": "user", "content": msg_content}]
  312. else:
  313. messages_to_send=[{"role": "user", "content": msg_content}]
  314. res=fast_gpt_api(messages_to_send,wxid,callback_to_user)
  315. reply_content=remove_markdown_symbol(res["choices"][0]["message"]["content"])
  316. description = ''
  317. userSelectOptions = []
  318. if isinstance(reply_content, list) and any(item.get("type") == "interactive" for item in reply_content):
  319. for item in reply_content:
  320. if item["type"] == "interactive" and item["interactive"]["type"] == "userSelect":
  321. params = item["interactive"]["params"]
  322. description = params.get("description")
  323. userSelectOptions = params.get("userSelectOptions", [])
  324. values_string = "\n".join(option["value"] for option in userSelectOptions)
  325. if description is not None:
  326. memory.USER_INTERACTIVE_CACHE[wxid] = {
  327. "interactive":True,
  328. "options": userSelectOptions,
  329. }
  330. reply_content=description + '------------------------------\n'+values_string
  331. elif isinstance(reply_content, list) and any(item.get("type") == "text" for item in reply_content):
  332. memory.USER_INTERACTIVE_CACHE[wxid] = {
  333. "interactive":False
  334. }
  335. text=''
  336. for item in reply_content:
  337. if item["type"] == "text":
  338. text=item["text"]["content"]
  339. if text=='':
  340. # 去除上次上一轮对话再次请求
  341. cache_messages_str=redis_helper.redis_helper.get_hash_field(hash_key,"data")
  342. cache_messages = json.loads(cache_messages_str) if cache_messages_str else []
  343. if len(cache_messages) >= 3:
  344. cache_messages = cache_messages[:-3]
  345. redis_helper.redis_helper.update_hash_field(hash_key,"data",json.dumps(cache_messages,ensure_ascii=False))
  346. messages_to_send=gewe_chat.wxchat.save_session_messages_to_cache(hash_key, prompt)
  347. res=fast_gpt_api(messages_to_send,wxid,callback_to_user)
  348. reply_content=remove_markdown_symbol(res["choices"][0]["message"]["content"])
  349. if isinstance(reply_content, list) :
  350. reply_content=remove_markdown_symbol(reply_content[0].get('text').get("content"))
  351. else:
  352. reply_content=text
  353. else:
  354. memory.USER_INTERACTIVE_CACHE[wxid] = {
  355. "interactive":False
  356. }
  357. reply_content=remove_markdown_symbol(res["choices"][0]["message"]["content"])
  358. gewe_chat.wxchat.post_text(token_id,app_id,callback_to_user,reply_content)
  359. gewe_chat.wxchat.save_session_messages_to_cache(hash_key, {"role": "assistant", "content": reply_content})
  360. # 回复的对话
  361. input_wx_content_dialogue_message=[{"type": "text", "text": reply_content}]
  362. input_message=utils.dialogue_message(wxid,callback_to_user,input_wx_content_dialogue_message,True)
  363. kafka_helper.kafka_client.produce_message(input_message)
  364. logger.info("发送对话 %s",input_message)
  365. end_time = time.time() # 记录任务结束时间
  366. execution_time = end_time - start_time # 计算执行时间
  367. logger.info(f"AI回答任务完成,耗时 {execution_time:.2f} 秒")
  368. def handle_text_group(token_id,app_id, wxid,msg_data,from_wxid, to_wxid):
  369. '''
  370. 群聊文本消息
  371. '''
  372. msg_content=msg_data["Content"]["string"]
  373. msg_push_content=msg_data.get("PushContent")
  374. k,login_info=get_login_info_by_app_id(app_id)
  375. nickname=login_info.get("nickName")
  376. if wxid == from_wxid: #手动发送消息
  377. logger.info("Active message sending detected")
  378. gewe_chat.wxchat.save_contacts_brief_to_cache(token_id,app_id,wxid,[to_wxid])
  379. callback_to_user=msg_data["ToUserName"]["string"]
  380. input_wx_content_dialogue_message=[{"type": "text", "text": msg_content}]
  381. input_message=utils.dialogue_message(from_wxid,to_wxid,input_wx_content_dialogue_message)
  382. kafka_helper.kafka_client.produce_message(input_message)
  383. logger.info("发送对话 %s",input_message)
  384. else:
  385. c = gewe_chat.wxchat.get_wxchat_config_from_cache(wxid)
  386. chatroom_id_white_list = c.get("chatroomIdWhiteList", [])
  387. if not chatroom_id_white_list:
  388. logger.info('白名单为空或未定义,不处理')
  389. return
  390. if from_wxid not in chatroom_id_white_list:
  391. logger.info(f'群ID {from_wxid} 不在白名单中,不处理')
  392. return
  393. if '在群聊中@了你' in msg_push_content or '@'+nickname in msg_push_content:
  394. callback_to_user=msg_data["FromUserName"]["string"]
  395. hash_key = f'__AI_OPS_WX__:MESSAGES:{wxid}:{callback_to_user}'
  396. prompt={"role": "user", "content": [{
  397. "type": "text",
  398. "text": msg_content
  399. }]}
  400. messages_to_send=gewe_chat.wxchat.save_session_messages_to_cache(hash_key, prompt)
  401. # 收到的对话
  402. input_wx_content_dialogue_message=[{"type": "text", "text": msg_content}]
  403. input_message=utils.dialogue_message(callback_to_user,wxid,input_wx_content_dialogue_message)
  404. kafka_helper.kafka_client.produce_message(input_message)
  405. logger.info("发送对话 %s",input_message)
  406. cache_data = memory.USER_INTERACTIVE_CACHE.get(wxid)
  407. if cache_data and cache_data.get('interactive') :
  408. o=get_first_char_if_digit(msg_content)
  409. if o is not None:
  410. userSelectOptions=cache_data.get('options')
  411. if o < len(userSelectOptions):
  412. o=o-1
  413. msg_content=userSelectOptions[o].get("value")
  414. messages_to_send=[{"role": "user", "content": msg_content}]
  415. else:
  416. messages_to_send=[{"role": "user", "content": msg_content}]
  417. else:
  418. messages_to_send=[{"role": "user", "content": msg_content}]
  419. res=fast_gpt_api(messages_to_send,wxid,callback_to_user)
  420. reply_content=remove_markdown_symbol(res["choices"][0]["message"]["content"])
  421. description = ''
  422. userSelectOptions = []
  423. if isinstance(reply_content, list) and any(item.get("type") == "interactive" for item in reply_content):
  424. for item in reply_content:
  425. if item["type"] == "interactive" and item["interactive"]["type"] == "userSelect":
  426. params = item["interactive"]["params"]
  427. description = params.get("description")
  428. userSelectOptions = params.get("userSelectOptions", [])
  429. values_string = "\n".join(option["value"] for option in userSelectOptions)
  430. if description is not None:
  431. memory.USER_INTERACTIVE_CACHE[wxid] = {
  432. "interactive":True,
  433. "options": userSelectOptions,
  434. }
  435. reply_content=description + '------------------------------\n'+values_string
  436. elif isinstance(reply_content, list) and any(item.get("type") == "text" for item in reply_content):
  437. memory.USER_INTERACTIVE_CACHE[wxid] = {
  438. "interactive":False
  439. }
  440. text=''
  441. for item in reply_content:
  442. if item["type"] == "text":
  443. text=item["text"]["content"]
  444. if text=='':
  445. # 去除上次上一轮对话再次请求
  446. cache_messages_str=redis_helper.redis_helper.get_hash_field(hash_key,"data")
  447. cache_messages = json.loads(cache_messages_str) if cache_messages_str else []
  448. if len(cache_messages) >= 3:
  449. cache_messages = cache_messages[:-3]
  450. redis_helper.redis_helper.update_hash_field(hash_key,"data",json.dumps(cache_messages,ensure_ascii=False))
  451. messages_to_send=gewe_chat.wxchat.save_session_messages_to_cache(hash_key, prompt)
  452. res=fast_gpt_api(messages_to_send,wxid,callback_to_user)
  453. reply_content=remove_markdown_symbol(res["choices"][0]["message"]["content"])
  454. else:
  455. reply_content=text
  456. else:
  457. memory.USER_INTERACTIVE_CACHE[wxid] = {
  458. "interactive":False
  459. }
  460. reply_content=res["choices"][0]["message"]["content"]
  461. reply_content='@'+extract_nickname(msg_push_content) + reply_content
  462. gewe_chat.wxchat.post_text(token_id,app_id,callback_to_user,reply_content)
  463. gewe_chat.wxchat.save_session_messages_to_cache(hash_key, {"role": "assistant", "content": reply_content})
  464. # 回复的对话
  465. input_wx_content_dialogue_message=[{"type": "text", "text": reply_content}]
  466. input_message=utils.dialogue_message(wxid,callback_to_user,input_wx_content_dialogue_message,True)
  467. kafka_helper.kafka_client.produce_message(input_message)
  468. logger.info("发送对话 %s",input_message)
  469. else:
  470. logger.info('群聊公开消息')
  471. callback_to_user=msg_data["FromUserName"]["string"]
  472. dialogue_message=[{"type": "text", "text": msg_content}]
  473. input_message=utils.dialogue_message(callback_to_user,wxid,dialogue_message)
  474. kafka_helper.kafka_client.produce_message(input_message)
  475. logger.info("发送对话 %s",input_message)
  476. return
  477. def handle_image(token_id,app_id, wxid,msg_data,from_wxid, to_wxid):
  478. '''
  479. 图片消息
  480. '''
  481. msg_content=msg_data["Content"]["string"]
  482. callback_to_user=from_wxid
  483. hash_key = f'__AI_OPS_WX__:MESSAGES:{wxid}:{callback_to_user}'
  484. wx_img_url=gewe_chat.wxchat.download_image_msg(token_id,app_id,msg_content)
  485. oss_access_key_id="LTAI5tRTG6pLhTpKACJYoPR5"
  486. oss_access_key_secret="E7dMzeeMxq4VQvLg7Tq7uKf3XWpYfN"
  487. oss_endpoint="http://oss-cn-shanghai.aliyuncs.com"
  488. oss_bucket_name="cow-agent"
  489. oss_prefix="cow"
  490. img_url=utils.upload_oss(oss_access_key_id, oss_access_key_secret, oss_endpoint, oss_bucket_name, wx_img_url, oss_prefix)
  491. prompt={
  492. "role": "user",
  493. "content": [{
  494. "type": "image_url",
  495. "image_url": {"url": img_url}
  496. }]
  497. }
  498. gewe_chat.wxchat.save_session_messages_to_cache(hash_key, prompt)
  499. gewe_chat.wxchat.post_text(token_id,app_id,callback_to_user,'已经上传了图片,有什么可以为您服务')
  500. logger.info(f"上传图片 URL: {img_url}")
  501. wx_content_dialogue_message=[{"type": "image_url", "image_url": {"url": img_url}}]
  502. input_message=utils.dialogue_message(wxid,callback_to_user,wx_content_dialogue_message)
  503. kafka_helper.kafka_client.produce_message(input_message)
  504. logger.info("发送对话 %s",input_message)
  505. def handle_image_group(token_id,app_id, wxid,msg_data,from_wxid, to_wxid):
  506. logger.info('群聊图片消息')
  507. def handle_voice(token_id,app_id, wxid,msg_data,from_wxid, to_wxid):
  508. '''
  509. 语音消息
  510. '''
  511. callback_to_user=from_wxid
  512. msg_content=msg_data["Content"]["string"]
  513. msg_id=msg_data["MsgId"]
  514. file_url=gewe_chat.wxchat.download_audio_msg(token_id,app_id,msg_id,msg_content)
  515. react_silk_path=utils.save_to_local_from_url(file_url)
  516. react_wav_path = os.path.splitext(react_silk_path)[0] + ".wav"
  517. audio_convert.any_to_wav(react_silk_path,react_wav_path)
  518. react_voice_text=AliVoice().voiceToText(react_wav_path)
  519. os.remove(react_silk_path)
  520. os.remove(react_wav_path)
  521. hash_key = f'__AI_OPS_WX__:MESSAGES:{wxid}:{callback_to_user}'
  522. messages=gewe_chat.wxchat.save_session_messages_to_cache(hash_key, {"role": "user", "content": react_voice_text})
  523. ai_res=fast_gpt_api(messages,wxid,callback_to_user)
  524. ai_res_content=remove_markdown_symbol(ai_res["choices"][0]["message"]["content"])
  525. has_url=contains_url(ai_res_content)
  526. if not has_url:
  527. voice_during,voice_url=utils.wx_voice(ai_res_content)
  528. if voice_during < 60 * 1000:
  529. ret,ret_msg,res=gewe_chat.wxchat.post_voice(token_id,app_id,callback_to_user,voice_url,voice_during)
  530. else:
  531. ret,ret_msg,res=gewe_chat.wxchat.post_text(token_id,app_id,callback_to_user,ai_res_content)
  532. logger.warning(f'回应语音消息长度 {voice_during/1000}秒,超过60秒,转为文本回复')
  533. if ret==200:
  534. logger.info((f'{wxid} 向 {callback_to_user} 发送语音文本【{ai_res_content}】{ret_msg}'))
  535. else:
  536. logger.warning((f'{wxid} 向 {callback_to_user} 发送语音文本【{ai_res_content}】{ret_msg}'))
  537. ret,ret_msg,res=gewe_chat.wxchat.post_text(token_id,app_id,callback_to_user,ai_res_content)
  538. logger.info((f'{wxid} 向 {callback_to_user} 发送文本【{ai_res_content}】{ret_msg}'))
  539. else:
  540. logger.info(f"回复内容包含网址,不发送语音,回复文字内容:{ai_res_content}")
  541. ret,ret_msg,res=gewe_chat.wxchat.post_text(token_id,app_id,callback_to_user,ai_res_content)
  542. gewe_chat.wxchat.save_session_messages_to_cache(hash_key, {"role": "assistant", "content": ai_res_content})
  543. # 构造对话消息并发送到 Kafka
  544. input_wx_content_dialogue_message = [{"type": "text", "text": ai_res_content}]
  545. input_message = utils.dialogue_message(wxid, callback_to_user, input_wx_content_dialogue_message,True)
  546. kafka_helper.kafka_client.produce_message(input_message)
  547. logger.info("发送对话 %s", input_message)
  548. def handle_voice_group(token_id,app_id, wxid,msg_data,from_wxid, to_wxid):
  549. logger.info('语音消息')
  550. def handle_xml(token_id,app_id, wxid,msg_data,from_wxid, to_wxid):
  551. '''
  552. 处理xml
  553. '''
  554. msg_content_xml=msg_data["Content"]["string"]
  555. root = ET.fromstring(msg_content_xml)
  556. type_value = int(root.find(".//appmsg/type").text)
  557. handlers = {
  558. 57: handle_xml_reference,
  559. 5: handle_xml_invite_group
  560. }
  561. handler = handlers.get(type_value)
  562. if handler:
  563. return handler(token_id,app_id, wxid,msg_data,from_wxid, to_wxid)
  564. # elif "邀请你加入了群聊" in msg_content_xml: # 邀请加入群聊
  565. # logger.warning(f"xml消息 {type_value} 邀请你加入了群聊.todo")
  566. else:
  567. print(f"xml消息 {type_value} 未解析")
  568. def handle_xml_reference(token_id,app_id, wxid,msg_data,from_wxid, to_wxid):
  569. '''
  570. 引用消息
  571. 判断此类消息的逻辑:$.Data.MsgType=49 并且 解析$.Data.Content.string中的xml msg.appmsg.type=57
  572. '''
  573. callback_to_user=from_wxid
  574. hash_key = f'__AI_OPS_WX__:MESSAGES:{wxid}:{callback_to_user}'
  575. msg_content= msg_data["PushContent"]
  576. prompt={"role": "user", "content": [{
  577. "type": "text",
  578. "text": msg_content
  579. }]}
  580. # 收到的对话
  581. messages_to_send=gewe_chat.wxchat.save_session_messages_to_cache(hash_key, prompt)
  582. input_wx_content_dialogue_message=[{"type": "text", "text": msg_content}]
  583. input_message=utils.dialogue_message(callback_to_user,wxid,input_wx_content_dialogue_message)
  584. kafka_helper.kafka_client.produce_message(input_message)
  585. logger.info("发送对话 %s",input_message)
  586. # 回复的对话
  587. res=fast_gpt_api(messages_to_send,wxid,callback_to_user)
  588. reply_content=remove_markdown_symbol(res["choices"][0]["message"]["content"])
  589. input_wx_content_dialogue_message=[{"type": "text", "text": reply_content}]
  590. input_message=utils.dialogue_message(wxid,callback_to_user,input_wx_content_dialogue_message,True)
  591. kafka_helper.kafka_client.produce_message(input_message)
  592. logger.info("发送对话 %s",input_message)
  593. gewe_chat.wxchat.save_session_messages_to_cache(hash_key, {"role": "assistant", "content": reply_content})
  594. gewe_chat.wxchat.post_text(token_id,app_id,callback_to_user,reply_content)
  595. def handle_xml_invite_group(token_id,app_id, wxid,msg_data,from_wxid, to_wxid):
  596. '''
  597. 群聊邀请
  598. 判断此类消息的逻辑:$.Data.MsgType=49
  599. 并且 解析$.Data.Content.string中的xml msg.appmsg.title=邀请你加入群聊(根据手机设置的系统语言title会有调整,不同语言关键字不同)
  600. '''
  601. logger.info(f'{wxid} 群聊邀请')
  602. msg_content_xml=msg_data["Content"]["string"]
  603. root = ET.fromstring(msg_content_xml)
  604. title_value = root.find(".//appmsg/title").text
  605. if '邀请你加入群聊' in title_value:
  606. invite_url = root.find('.//url').text
  607. ret,msg,data=gewe_chat.wxchat.agree_join_room(token_id,app_id,invite_url)
  608. if ret==200:
  609. logger.info(f'群聊邀请,同意加入群聊 {msg} {data}')
  610. chatroom_id=data.get('chatroomId','')
  611. # if not chatroom_id:
  612. # logger.warning(f'群聊邀请,同意加入群聊失败 {msg} {data}')
  613. # return
  614. ret,msg,data=gewe_chat.wxchat.save_contract_list(token_id,app_id,chatroom_id,3)
  615. logger.info(f'群聊邀请,保存到通讯录 chatroom_id {chatroom_id} {msg}')
  616. gewe_chat.wxchat.update_group_info_to_cache(token_id,app_id,wxid,chatroom_id)
  617. gewe_chat.wxchat.update_group_members_to_cache(token_id,app_id,wxid,chatroom_id)
  618. else:
  619. logger.warning(f'群聊邀请,同意加入群聊失败 {msg} {data}')
  620. def handle_add_friend_notice(token_id,app_id, wxid,msg_data,from_wxid, to_wxid):
  621. '''
  622. 好友添加请求通知
  623. '''
  624. logger.info('好友添加请求通知')
  625. msg_content_xml=msg_data["Content"]["string"]
  626. root = ET.fromstring(msg_content_xml)
  627. msg_content = root.attrib.get('content', None)
  628. v3= root.attrib.get('encryptusername', None)
  629. v4= root.attrib.get('ticket', None)
  630. scene=root.attrib.get('scene', None)
  631. to_contact_wxid=root.attrib.get('fromusername', None)
  632. wxid=msg_data["ToUserName"]["string"]
  633. # 自动同意好友
  634. # print(v3)
  635. # print(v4)
  636. # print(scene)
  637. # print(msg_content)
  638. # 操作类型,2添加好友 3同意好友 4拒绝好友
  639. #option=2
  640. option=3
  641. reply_add_contact_contact="亲,我是你的好友"
  642. ret,ret_msg=gewe_chat.wxchat.add_contacts(token_id,app_id,scene,option,v3,v4,reply_add_contact_contact)
  643. if ret==200:
  644. logger.info('自动添加好友成功')
  645. # 好友发送的文字
  646. hash_key = f'__AI_OPS_WX__:MESSAGES:{wxid}:{to_contact_wxid}'
  647. prompt={"role": "user", "content": [{"type": "text","text": msg_content}]}
  648. messages_to_send=gewe_chat.wxchat.save_session_messages_to_cache(hash_key, prompt)
  649. input_wx_content_dialogue_message=[{"type": "text", "text": msg_content}]
  650. input_message=utils.dialogue_message(to_contact_wxid,wxid,input_wx_content_dialogue_message)
  651. kafka_helper.kafka_client.produce_message(input_message)
  652. logger.info("发送对话 %s",input_message)
  653. callback_to_user=to_contact_wxid
  654. res=fast_gpt_api(messages_to_send,wxid,callback_to_user)
  655. reply_content=remove_markdown_symbol(res["choices"][0]["message"]["content"])
  656. #保存好友信息
  657. gewe_chat.wxchat.save_contacts_brief_to_cache(token_id,app_id, wxid,[to_contact_wxid])
  658. # 保存到缓存
  659. gewe_chat.wxchat.save_session_messages_to_cache(hash_key, {"role": "assistant", "content": reply_content})
  660. # 发送信息
  661. gewe_chat.wxchat.post_text(token_id,app_id, to_contact_wxid,reply_content)
  662. # 发送到kafka
  663. input_wx_content_dialogue_message=[{"type": "text", "text": reply_content}]
  664. input_message=utils.dialogue_message(wxid,to_contact_wxid,input_wx_content_dialogue_message,True)
  665. kafka_helper.kafka_client.produce_message(input_message)
  666. logger.info("发送对话 %s",input_message)
  667. else:
  668. logger.warning("添加好友失败")
  669. def handle_10002_msg(token_id,app_id, wxid,msg_data,from_wxid, to_wxid):
  670. '''
  671. 群聊邀请
  672. 撤回消息
  673. 拍一拍消息
  674. 地理位置
  675. 踢出群聊通知
  676. 解散群聊通知
  677. 发布群公告
  678. '''
  679. msg_content_xml=msg_data["Content"]["string"]
  680. # 群聊邀请
  681. if '邀请你加入了群聊' in msg_content_xml and check_chatroom(msg_data["FromUserName"]["string"]):
  682. chatroom_id=msg_data["FromUserName"]["string"]
  683. ret,msg,data=gewe_chat.wxchat.save_contract_list(token_id,app_id,chatroom_id,3)
  684. logger.info(f'群聊邀请,保存到通讯录 chatroom_id {chatroom_id} {msg}')
  685. gewe_chat.wxchat.update_group_info_to_cache(token_id,app_id,wxid,chatroom_id)
  686. gewe_chat.wxchat.update_group_members_to_cache(token_id,app_id,wxid,chatroom_id)
  687. if '移出了群聊' in msg_content_xml and 'sysmsgtemplate' in msg_content_xml :
  688. chatroom_id=msg_data["FromUserName"]["string"]
  689. ret,msg,data=gewe_chat.wxchat.save_contract_list(token_id,app_id,chatroom_id,2)
  690. logger.info(f'踢出群聊,移除从通讯录 chatroom_id {chatroom_id} {msg}')
  691. redis_helper.redis_helper.delete_hash_field(f'__AI_OPS_WX__:GROUPS_INFO:{wxid}',chatroom_id)
  692. logger.info(f'清除 chatroom_id{chatroom_id} 数据')
  693. if '已解散该群聊' in msg_content_xml and 'sysmsgtemplate' in msg_content_xml :
  694. chatroom_id=msg_data["FromUserName"]["string"]
  695. ret,msg,data=gewe_chat.wxchat.save_contract_list(token_id,app_id,chatroom_id,2)
  696. logger.info(f'解散群聊,移除从通讯录 chatroom_id {chatroom_id} {msg}')
  697. redis_helper.redis_helper.delete_hash_field(f'__AI_OPS_WX__:GROUPS_INFO:{wxid}',chatroom_id)
  698. logger.info(f'清除 chatroom_id{chatroom_id} 数据')
  699. print('撤回消息,拍一拍消息,地理位置')
  700. def handle_mod_contacts(token_id,app_id,wxid,msg_data):
  701. '''
  702. 好友通过验证及好友资料变更的通知消息
  703. '''
  704. logger.info('好友通过验证及好友资料变更的通知消息')
  705. if not check_chatroom(msg_data["UserName"]["string"]):
  706. contact_wxid = msg_data["UserName"]["string"]
  707. # 更新好友信息
  708. # 检查好友关系,不是好友则删除
  709. # ret,msg,check_relation=gewe_chat.wxchat.check_relation(token_id, app_id,[contact_wxid])
  710. # first_item = check_relation[0]
  711. # check_relation_status=first_item.get('relation')
  712. # logger.info(f'{wxid} 好友 {contact_wxid} 关系检查:{check_relation_status}')
  713. # if check_relation_status != 0:
  714. # gewe_chat.wxchat.delete_contacts_brief_from_cache(wxid, [contact_wxid])
  715. # logger.info(f'好友关系异常:{check_relation_status},删除好友 {contact_wxid} 信息')
  716. # else:
  717. # gewe_chat.wxchat.save_contacts_brief_to_cache(token_id, app_id, wxid, [contact_wxid])
  718. ret,msg,contacts_list = gewe_chat.wxchat.fetch_contacts_list(token_id, app_id)
  719. # friend_wxids = contacts_list['friends'][3:] # 可以调整截取范围
  720. # print(friend_wxids)
  721. #friend_wxids.remove('fmessage')
  722. #friend_wxids.remove('weixin')
  723. friend_wxids = [c for c in contacts_list['friends'] if c not in ['fmessage', 'medianote','weixin','weixingongzhong']] # 可以调整截取范围
  724. print(f'{wxid}的好友数量 {len(friend_wxids)}')
  725. gewe_chat.wxchat.save_contacts_brief_to_cache(token_id, app_id, wxid, friend_wxids)
  726. else:
  727. logger.info('群聊好友通过验证及好友资料变更的通知消息')
  728. def get_messages_from_cache(hash_key,object:object)->list:
  729. '''
  730. 对话列表
  731. '''
  732. messages=redis_helper.redis_helper.get_hash(hash_key)
  733. wxid=hash_key.split(':')[-1]
  734. if not messages:
  735. messages=[{"role": "system", "content": ""}]
  736. messages.append(object)
  737. redis_helper.redis_helper.set_hash(hash_key,{"data":json.dumps(messages,ensure_ascii=False)},3600)
  738. else:
  739. messages_str=redis_helper.redis_helper.get_hash_field(hash_key,"data")
  740. messages = json.loads(messages_str) if messages_str else []
  741. #判断是否含有图片
  742. last_message = messages[-1]
  743. content = last_message.get("content", [])
  744. if isinstance(content, list) and content:
  745. last_content_type = content[-1].get("type")
  746. if last_content_type == 'image_url':
  747. content.append(object['content'][0])
  748. messages[-1]['content']=content
  749. else:
  750. messages.append(object)
  751. else:
  752. messages.append(object)
  753. redis_helper.redis_helper.set_hash(hash_key,{"data":json.dumps(messages,ensure_ascii=False)},3600)
  754. return messages
  755. def fast_gpt_api(messages:list,wixd:str,friend_wxid:str):
  756. c=gewe_chat.wxchat.get_wxchat_config_from_cache(wixd)
  757. api_key=c.get('agentTokenId',"sk-jr69ONIehfGKe9JFphuNk4DU5Y5wooHKHhQv7oSnFzVbwCnW65fXO9kvH")
  758. print(f'流程key:{api_key}\n')
  759. #api_key="sk-jr69ONIehfGKe9JFphuNk4DU5Y5wooHKHhQv7oSnFzVbwCnW65fXO9kvH" #测试
  760. #api_key="sk-uJDBdKmJVb2cmfldGOvlIY6Qx0AzqWMPD3lS1IzgQYzHNOXv9SKNI" #开发2
  761. api_url = "http://106.15.182.218:3000/api/v1/chat/completions"
  762. headers = {
  763. "Content-Type": "application/json",
  764. "Authorization": f"Bearer {api_key}"
  765. }
  766. session_id=f'{wixd}-{friend_wxid}'
  767. data={
  768. "model": "",
  769. "messages":messages,
  770. "chatId": session_id,
  771. "detail": True
  772. }
  773. #print(json.dumps(data,ensure_ascii=False))
  774. logger.info("[CHATGPT] 请求={}".format(json.dumps(data, ensure_ascii=False)))
  775. response = requests.post(url=api_url, headers=headers, data=json.dumps(data), timeout=600)
  776. response.raise_for_status()
  777. response_data = response.json()
  778. logger.info("[CHATGPT] 响应={}".format(json.dumps(response_data, separators=(',', ':'),ensure_ascii=False)))
  779. #print(response_data)
  780. return response_data
  781. def get_token_id_by_app_id(app_id: str) -> str:
  782. # 使用 SCAN 避免一次性返回所有的匹配键,逐步扫描
  783. cursor = 0
  784. while True:
  785. cursor, login_keys = redis_helper.redis_helper.client.scan(cursor, match='__AI_OPS_WX__:LOGININFO:*')
  786. # 批量获取所有键的 hash 数据
  787. for k in login_keys:
  788. r = redis_helper.redis_helper.get_hash(k)
  789. if r.get("appId") == app_id:
  790. return r.get("tokenId", "")
  791. # 如果游标为 0,则表示扫描完成
  792. if cursor == 0:
  793. break
  794. return ""
  795. def get_login_info_by_app_id(app_id: str) ->dict:
  796. # 使用 SCAN 避免一次性返回所有的匹配键,逐步扫描
  797. cursor = 0
  798. while True:
  799. cursor, login_keys = redis_helper.redis_helper.client.scan(cursor, match='__AI_OPS_WX__:LOGININFO:*')
  800. # 批量获取所有键的 hash 数据
  801. for k in login_keys:
  802. r = redis_helper.redis_helper.get_hash(k)
  803. if r.get("appId") == app_id:
  804. return k,r
  805. # 如果游标为 0,则表示扫描完成
  806. if cursor == 0:
  807. break
  808. return ""
  809. def contains_url(text):
  810. # 定义检测网址的正则表达式
  811. url_pattern = re.compile(
  812. r'http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\\(\\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+'
  813. )
  814. # 检查字符串是否包含网址
  815. return bool(url_pattern.search(text))
  816. def get_first_char_if_digit(s):
  817. if s and s[0].isdigit(): # 判断字符串是否非空且首字符为数字
  818. return int(s[0]) # 返回数字形式
  819. return None # 如果不是数字则返回 None
  820. def remove_at_mention_regex(text):
  821. # 使用正则表达式去掉“在群聊中@了你”
  822. return re.sub(r"在群聊中@了你", "", text)
  823. def extract_nickname(text)->str:
  824. if "在群聊中@了你" in text:
  825. # 如果包含 "在群聊中@了你",提取其前面的名字
  826. match = re.search(r"^(.*?)在群聊中@了你", text)
  827. if match:
  828. return match.group(1).strip()
  829. elif ": @" in text:
  830. # 如果包含 ": @",提取其前面的名字
  831. return text.split(": @")[0].strip()
  832. return ''
  833. def check_chatroom(userName):
  834. pattern = r'^\d+@chatroom$'
  835. if re.match(pattern, userName):
  836. return True
  837. return False
  838. # def remove_markdown_symbol(text: str):
  839. # # 移除markdown格式,目前先移除**
  840. # if not text or not isinstance(text, str):
  841. # return text
  842. # return re.sub(r'\*\*(.*?)\*\*', r'\1', text)
  843. def remove_markdown_symbol(text: str):
  844. # 移除markdown格式,目前先移除**
  845. if not text or not isinstance(text, str):
  846. return text
  847. # 去除加粗、斜体等格式
  848. #text = re.sub(r'\*\*([^*]+)\*\*', r'\1', text) # 去除加粗
  849. text=re.sub(r'\*\*(.*?)\*\*', r'\1', text)
  850. text = re.sub(r'\*([^*]+)\*', r'\1', text) # 去除斜体
  851. text = re.sub(r'__([^_]+)__', r'\1', text) # 去除加粗(下划线)
  852. text = re.sub(r'_(.*?)_', r'\1', text) # 去除斜体(下划线)
  853. # 去除行内代码块
  854. text = re.sub(r'`([^`]+)`', r'\1', text)
  855. # 去除换行符\n,或者多余的空格
  856. #text = re.sub(r'\n+', ' ', text)
  857. # 去除列表编号等
  858. #text = re.sub(r'^\d+\.\s*', '', text, flags=re.MULTILINE)
  859. #text = re.sub('[\\\`\*\_\[\]\#\+\-\!\>]', '', text)
  860. text = re.sub('[\\\`\*\_\[\]\#\+\!\>]', '', text)
  861. print(text)
  862. return text