選択できるのは25トピックまでです。 トピックは、先頭が英数字で、英数字とダッシュ('-')を使用した35文字以内のものにしてください。

messages_resource.py 40KB

4ヶ月前
3ヶ月前
4ヶ月前
4ヶ月前
2ヶ月前
4ヶ月前
2ヶ月前
4ヶ月前
3ヶ月前
4ヶ月前
3ヶ月前
4ヶ月前
3ヶ月前
4ヶ月前
3ヶ月前
2ヶ月前
4ヶ月前
3ヶ月前
4ヶ月前
3ヶ月前
4ヶ月前
3ヶ月前
4ヶ月前
3ヶ月前
3ヶ月前
4ヶ月前
3ヶ月前
4ヶ月前
3ヶ月前
4ヶ月前
3ヶ月前
4ヶ月前
3ヶ月前
4ヶ月前
3ヶ月前
4ヶ月前
3ヶ月前
4ヶ月前
3ヶ月前
3ヶ月前
3ヶ月前
4ヶ月前
3ヶ月前
4ヶ月前
2ヶ月前
4ヶ月前
2ヶ月前
4ヶ月前
2ヶ月前
4ヶ月前
2ヶ月前
4ヶ月前
2ヶ月前
4ヶ月前
2ヶ月前
4ヶ月前
2ヶ月前
2ヶ月前
2ヶ月前
2ヶ月前
4ヶ月前
2ヶ月前
2ヶ月前
3ヶ月前
2ヶ月前
4ヶ月前
2ヶ月前
3ヶ月前
4ヶ月前
3ヶ月前
3ヶ月前
3ヶ月前
3ヶ月前
3ヶ月前
3ヶ月前
3ヶ月前
3ヶ月前
3ヶ月前
3ヶ月前
3ヶ月前
3ヶ月前
3ヶ月前
4ヶ月前
3ヶ月前
4ヶ月前
4ヶ月前
3ヶ月前
4ヶ月前
3ヶ月前
4ヶ月前
3ヶ月前
3ヶ月前
4ヶ月前
3ヶ月前
3ヶ月前
4ヶ月前
3ヶ月前
3ヶ月前
2ヶ月前
3ヶ月前
4ヶ月前
3ヶ月前
3ヶ月前
3ヶ月前
3ヶ月前
3ヶ月前
3ヶ月前
4ヶ月前
3ヶ月前
3ヶ月前
4ヶ月前
3ヶ月前
4ヶ月前
3ヶ月前
4ヶ月前
3ヶ月前
4ヶ月前
4ヶ月前
3ヶ月前
4ヶ月前
3ヶ月前
4ヶ月前
3ヶ月前
4ヶ月前
3ヶ月前
4ヶ月前
3ヶ月前
4ヶ月前
3ヶ月前
4ヶ月前
3ヶ月前
3ヶ月前
4ヶ月前
3ヶ月前
4ヶ月前
3ヶ月前
3ヶ月前
4ヶ月前
3ヶ月前
4ヶ月前
3ヶ月前
3ヶ月前
4ヶ月前
3ヶ月前
4ヶ月前
3ヶ月前
4ヶ月前
3ヶ月前
4ヶ月前
3ヶ月前
4ヶ月前
3ヶ月前
3ヶ月前
3ヶ月前
3ヶ月前
3ヶ月前
3ヶ月前
4ヶ月前
3ヶ月前
4ヶ月前
3ヶ月前
3ヶ月前
4ヶ月前
4ヶ月前
3ヶ月前
4ヶ月前
3ヶ月前
4ヶ月前
3ヶ月前
3ヶ月前
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921
  1. from flask_restful import Resource, reqparse
  2. from flask import jsonify,request
  3. import requests,json,re
  4. from wechat import gewe_chat
  5. from voice.ali.ali_voice import AliVoice
  6. from common import utils,redis_helper,memory,kafka_helper
  7. from common.log import logger
  8. import xml.etree.ElementTree as ET
  9. import threading,time
  10. import os
  11. from voice import audio_convert
  12. timeout_duration = 4.0
  13. class MessagesResource(Resource):
  14. def __init__(self):
  15. self.parser = reqparse.RequestParser()
  16. def post(self):
  17. msg = request.get_json()
  18. logger.info(f"收到微信回调消息: {msg}")
  19. type_name =msg.get("TypeName")
  20. app_id = msg.get("Appid")
  21. # token_id = "f828cb3c-1039-489f-b9ae-7494d1778a15"
  22. token_id=get_token_id_by_app_id(app_id)
  23. if token_id=="":
  24. logger.warning('找不到登录信息,不处理')
  25. return jsonify({"message": "收到微信回调消息"})
  26. if type_name=='AddMsg':
  27. wxid = msg.get("Wxid")
  28. msg_data = msg.get("Data")
  29. msg_type = msg_data.get("MsgType",None)
  30. from_wxid = msg_data["FromUserName"]["string"]
  31. to_wxid = msg_data["ToUserName"]["string"]
  32. msg_push_content=msg_data.get("PushContent") #群发
  33. wx_config = gewe_chat.wxchat.get_wxchat_config_from_cache(wxid)
  34. if not bool(wx_config.get("agentEnabled",False)):
  35. logger.warning(f'{wxid} 智能体已关闭')
  36. handlers = {
  37. 49: handle_xml,
  38. 37: handle_add_friend_notice,
  39. 10002: handle_10002_msg
  40. }
  41. else:
  42. handlers = {
  43. 1: handle_text,
  44. 3: handle_image,
  45. 34: handle_voice,
  46. 49: handle_xml,
  47. 37: handle_add_friend_notice,
  48. 10002: handle_10002_msg
  49. }
  50. # (扫码进群情况)判断受否是群聊,并添加到通信录
  51. if check_chatroom(from_wxid) or check_chatroom(to_wxid):
  52. logger.info('群信息')
  53. chatroom_id=from_wxid
  54. ret,msg,data=gewe_chat.wxchat.save_contract_list(token_id,app_id,chatroom_id,3)
  55. logger.info(f'保存到通讯录 chatroom_id {chatroom_id} {msg}')
  56. gewe_chat.wxchat.update_groups_info_to_cache(token_id,app_id,wxid,chatroom_id)
  57. handlers[1]=handle_text_group
  58. handlers[3]=handle_image_group
  59. handlers[34]=handle_voice_group
  60. handler = handlers.get(msg_type)
  61. if handler:
  62. return handler(token_id,app_id, wxid,msg_data,from_wxid, to_wxid)
  63. else:
  64. logger.warning(f"微信回调消息类型 {msg_type} 未处理")
  65. elif type_name=='ModContacts':
  66. '''
  67. 好友通过验证及好友资料变更的通知消息
  68. '''
  69. wxid = msg.get("Wxid")
  70. msg_data = msg.get("Data")
  71. handle_mod_contacts(token_id,app_id,wxid,msg_data)
  72. elif type_name=="DelContacts":
  73. '''
  74. 删除好友通知/退出群聊
  75. '''
  76. msg_data = msg.get("Data")
  77. username=msg_data["UserName"]["string"]
  78. if check_chatroom(username):
  79. logger.info('退出群聊')
  80. wxid = msg.get("Wxid")
  81. chatroom_id=username
  82. redis_helper.redis_helper.delete_hash_field(f'__AI_OPS_WX__:GROUPS_INFO:{wxid}',chatroom_id)
  83. logger.info(f'清除 chatroom_id{chatroom_id} 数据')
  84. else:
  85. logger.info('删除好友通知')
  86. elif type_name=="Offline":
  87. '''
  88. 已经离线
  89. '''
  90. wxid = msg.get("Wxid")
  91. logger.warning(f'微信ID {wxid}在设备{app_id}已经离线')
  92. k,r=get_login_info_by_app_id(app_id)
  93. print(k)
  94. redis_helper.redis_helper.update_hash_field(k,'status',0)
  95. else:
  96. logger.warning(f"未知消息类型")
  97. return jsonify({"message": "收到微信回调消息"})
  98. def handle_text(token_id,app_id, wxid,msg_data,from_wxid, to_wxid):
  99. '''
  100. 私聊文本消息
  101. '''
  102. msg_content=msg_data["Content"]["string"]
  103. if wxid == from_wxid: #手动发送消息
  104. logger.info("Active message sending detected")
  105. gewe_chat.wxchat.save_contacts_brief_to_cache(token_id,app_id,wxid,[to_wxid])
  106. callback_to_user=msg_data["ToUserName"]["string"]
  107. input_wx_content_dialogue_message=[{"type": "text", "text": msg_content}]
  108. input_message=utils.dialogue_message(from_wxid,to_wxid,input_wx_content_dialogue_message)
  109. kafka_helper.kafka_client.produce_message(input_message)
  110. logger.info("发送对话 %s",input_message)
  111. else:
  112. callback_to_user=msg_data["FromUserName"]["string"]
  113. # hash_key = f'__AI_OPS_WX__:MESSAGES:{wxid}:{callback_to_user}'
  114. # prompt={"role": "user", "content": [{
  115. # "type": "text",
  116. # "text": msg_content
  117. # }]}
  118. # messages_to_send=gewe_chat.wxchat.save_session_messages_to_cache(hash_key, prompt)
  119. # # 收到的对话
  120. # input_wx_content_dialogue_message=[{"type": "text", "text": msg_content}]
  121. # input_message=utils.dialogue_message(callback_to_user,wxid,input_wx_content_dialogue_message)
  122. # kafka_helper.kafka_client.produce_message(input_message)
  123. # logger.info("发送对话 %s",input_message)
  124. # cache_data = memory.USER_INTERACTIVE_CACHE.get(wxid)
  125. # if cache_data and cache_data.get('interactive') :
  126. # o=get_first_char_if_digit(msg_content)
  127. # if o is not None:
  128. # userSelectOptions=cache_data.get('options')
  129. # if o < len(userSelectOptions):
  130. # o=o-1
  131. # msg_content=userSelectOptions[o].get("value")
  132. # messages_to_send=[{"role": "user", "content": msg_content}]
  133. # else:
  134. # messages_to_send=[{"role": "user", "content": msg_content}]
  135. # else:
  136. # messages_to_send=[{"role": "user", "content": msg_content}]
  137. # res=fast_gpt_api(messages_to_send,wxid,callback_to_user)
  138. # reply_content=res["choices"][0]["message"]["content"]
  139. # description = ''
  140. # userSelectOptions = []
  141. # if isinstance(reply_content, list) and any(item.get("type") == "interactive" for item in reply_content):
  142. # for item in reply_content:
  143. # if item["type"] == "interactive" and item["interactive"]["type"] == "userSelect":
  144. # params = item["interactive"]["params"]
  145. # description = params.get("description")
  146. # userSelectOptions = params.get("userSelectOptions", [])
  147. # values_string = "\n".join(option["value"] for option in userSelectOptions)
  148. # if description is not None:
  149. # memory.USER_INTERACTIVE_CACHE[wxid] = {
  150. # "interactive":True,
  151. # "options": userSelectOptions,
  152. # }
  153. # reply_content=description + '------------------------------\n'+values_string
  154. # elif isinstance(reply_content, list) and any(item.get("type") == "text" for item in reply_content):
  155. # memory.USER_INTERACTIVE_CACHE[wxid] = {
  156. # "interactive":False
  157. # }
  158. # text=''
  159. # for item in reply_content:
  160. # if item["type"] == "text":
  161. # text=item["text"]["content"]
  162. # if text=='':
  163. # # 去除上次上一轮对话再次请求
  164. # cache_messages_str=redis_helper.redis_helper.get_hash_field(hash_key,"data")
  165. # cache_messages = json.loads(cache_messages_str) if cache_messages_str else []
  166. # if len(cache_messages) >= 3:
  167. # cache_messages = cache_messages[:-3]
  168. # redis_helper.redis_helper.update_hash_field(hash_key,"data",json.dumps(cache_messages,ensure_ascii=False))
  169. # messages_to_send=gewe_chat.wxchat.save_session_messages_to_cache(hash_key, prompt)
  170. # res=fast_gpt_api(messages_to_send,wxid,callback_to_user)
  171. # reply_content=res["choices"][0]["message"]["content"]
  172. # if isinstance(reply_content, list) :
  173. # reply_content=reply_content[0].get('text').get("content")
  174. # else:
  175. # reply_content=text
  176. # else:
  177. # memory.USER_INTERACTIVE_CACHE[wxid] = {
  178. # "interactive":False
  179. # }
  180. # reply_content=res["choices"][0]["message"]["content"]
  181. # gewe_chat.wxchat.post_text(token_id,app_id,callback_to_user,reply_content)
  182. # gewe_chat.wxchat.save_session_messages_to_cache(hash_key, {"role": "assistant", "content": reply_content})
  183. # # 回复的对话
  184. # input_wx_content_dialogue_message=[{"type": "text", "text": reply_content}]
  185. # input_message=utils.dialogue_message(wxid,callback_to_user,input_wx_content_dialogue_message,True)
  186. # kafka_helper.kafka_client.produce_message(input_message)
  187. # logger.info("发送对话 %s",input_message)
  188. # 创建并启动任务线程,将参数传递给 ai_chat_text 函数
  189. task_thread = threading.Thread(
  190. target=ai_chat_text,
  191. args=(token_id,app_id,wxid,msg_data,msg_content)
  192. )
  193. task_thread.start()
  194. # 设置定时器,1秒后检查任务是否超时。这里需要使用 lambda 来传递参数
  195. timeout_timer = threading.Timer(
  196. timeout_duration,
  197. lambda:check_timeout(task_thread, token_id, wxid,app_id, callback_to_user)
  198. )
  199. timeout_timer.start()
  200. # 等待任务线程完成
  201. task_thread.join()
  202. # 取消定时器
  203. timeout_timer.cancel()
  204. def check_timeout( task_thread:threading.Thread, token_id,wxid, app_id, callback_to_user):
  205. if task_thread.is_alive():
  206. print(f"任务运行时间超过{timeout_duration}秒,token_id={token_id}, app_id={app_id}, callback_to_user={callback_to_user}")
  207. wx_config = gewe_chat.wxchat.get_wxchat_config_from_cache(wxid)
  208. if bool(wx_config.get("chatWaitingMsgEnabled",True)):
  209. gewe_chat.wxchat.post_text(token_id,app_id,callback_to_user,"亲,我正在组织回复的信息,请稍等一会")
  210. def ai_chat_text(token_id,app_id,wxid,msg_data,msg_content):
  211. start_time = time.time() # 记录任务开始时间
  212. callback_to_user=msg_data["FromUserName"]["string"]
  213. hash_key = f'__AI_OPS_WX__:MESSAGES:{wxid}:{callback_to_user}'
  214. prompt={"role": "user", "content": [{
  215. "type": "text",
  216. "text": msg_content
  217. }]}
  218. messages_to_send=gewe_chat.wxchat.save_session_messages_to_cache(hash_key, prompt)
  219. # 收到的对话
  220. input_wx_content_dialogue_message=[{"type": "text", "text": msg_content}]
  221. input_message=utils.dialogue_message(callback_to_user,wxid,input_wx_content_dialogue_message)
  222. kafka_helper.kafka_client.produce_message(input_message)
  223. logger.info("发送对话 %s",input_message)
  224. cache_data = memory.USER_INTERACTIVE_CACHE.get(wxid)
  225. if cache_data and cache_data.get('interactive') :
  226. o=get_first_char_if_digit(msg_content)
  227. if o is not None:
  228. userSelectOptions=cache_data.get('options')
  229. if o < len(userSelectOptions):
  230. o=o-1
  231. msg_content=userSelectOptions[o].get("value")
  232. messages_to_send=[{"role": "user", "content": msg_content}]
  233. else:
  234. messages_to_send=[{"role": "user", "content": msg_content}]
  235. else:
  236. messages_to_send=[{"role": "user", "content": msg_content}]
  237. res=fast_gpt_api(messages_to_send,wxid,callback_to_user)
  238. reply_content=res["choices"][0]["message"]["content"]
  239. description = ''
  240. userSelectOptions = []
  241. if isinstance(reply_content, list) and any(item.get("type") == "interactive" for item in reply_content):
  242. for item in reply_content:
  243. if item["type"] == "interactive" and item["interactive"]["type"] == "userSelect":
  244. params = item["interactive"]["params"]
  245. description = params.get("description")
  246. userSelectOptions = params.get("userSelectOptions", [])
  247. values_string = "\n".join(option["value"] for option in userSelectOptions)
  248. if description is not None:
  249. memory.USER_INTERACTIVE_CACHE[wxid] = {
  250. "interactive":True,
  251. "options": userSelectOptions,
  252. }
  253. reply_content=description + '------------------------------\n'+values_string
  254. elif isinstance(reply_content, list) and any(item.get("type") == "text" for item in reply_content):
  255. memory.USER_INTERACTIVE_CACHE[wxid] = {
  256. "interactive":False
  257. }
  258. text=''
  259. for item in reply_content:
  260. if item["type"] == "text":
  261. text=item["text"]["content"]
  262. if text=='':
  263. # 去除上次上一轮对话再次请求
  264. cache_messages_str=redis_helper.redis_helper.get_hash_field(hash_key,"data")
  265. cache_messages = json.loads(cache_messages_str) if cache_messages_str else []
  266. if len(cache_messages) >= 3:
  267. cache_messages = cache_messages[:-3]
  268. redis_helper.redis_helper.update_hash_field(hash_key,"data",json.dumps(cache_messages,ensure_ascii=False))
  269. messages_to_send=gewe_chat.wxchat.save_session_messages_to_cache(hash_key, prompt)
  270. res=fast_gpt_api(messages_to_send,wxid,callback_to_user)
  271. reply_content=res["choices"][0]["message"]["content"]
  272. if isinstance(reply_content, list) :
  273. reply_content=reply_content[0].get('text').get("content")
  274. else:
  275. reply_content=text
  276. else:
  277. memory.USER_INTERACTIVE_CACHE[wxid] = {
  278. "interactive":False
  279. }
  280. reply_content=res["choices"][0]["message"]["content"]
  281. gewe_chat.wxchat.post_text(token_id,app_id,callback_to_user,reply_content)
  282. gewe_chat.wxchat.save_session_messages_to_cache(hash_key, {"role": "assistant", "content": reply_content})
  283. # 回复的对话
  284. input_wx_content_dialogue_message=[{"type": "text", "text": reply_content}]
  285. input_message=utils.dialogue_message(wxid,callback_to_user,input_wx_content_dialogue_message,True)
  286. kafka_helper.kafka_client.produce_message(input_message)
  287. logger.info("发送对话 %s",input_message)
  288. end_time = time.time() # 记录任务结束时间
  289. execution_time = end_time - start_time # 计算执行时间
  290. logger.info(f"AI回答任务完成,耗时 {execution_time:.2f} 秒")
  291. def handle_text_group(token_id,app_id, wxid,msg_data,from_wxid, to_wxid):
  292. '''
  293. 群聊文本消息
  294. '''
  295. msg_content=msg_data["Content"]["string"]
  296. msg_push_content=msg_data.get("PushContent")
  297. k,login_info=get_login_info_by_app_id(app_id)
  298. nickname=login_info.get("nickName")
  299. if wxid == from_wxid: #手动发送消息
  300. logger.info("Active message sending detected")
  301. gewe_chat.wxchat.save_contacts_brief_to_cache(token_id,app_id,wxid,[to_wxid])
  302. callback_to_user=msg_data["ToUserName"]["string"]
  303. input_wx_content_dialogue_message=[{"type": "text", "text": msg_content}]
  304. input_message=utils.dialogue_message(from_wxid,to_wxid,input_wx_content_dialogue_message)
  305. kafka_helper.kafka_client.produce_message(input_message)
  306. logger.info("发送对话 %s",input_message)
  307. else:
  308. c = gewe_chat.wxchat.get_wxchat_config_from_cache(wxid)
  309. chatroom_id_white_list = c.get("chatroomIdWhiteList", [])
  310. if not chatroom_id_white_list:
  311. logger.info('白名单为空或未定义,不处理')
  312. return
  313. if from_wxid not in chatroom_id_white_list:
  314. logger.info('群ID不在白名单中,不处理')
  315. return
  316. if '在群聊中@了你' in msg_push_content or '@'+nickname in msg_push_content:
  317. callback_to_user=msg_data["FromUserName"]["string"]
  318. hash_key = f'__AI_OPS_WX__:MESSAGES:{wxid}:{callback_to_user}'
  319. prompt={"role": "user", "content": [{
  320. "type": "text",
  321. "text": msg_content
  322. }]}
  323. messages_to_send=gewe_chat.wxchat.save_session_messages_to_cache(hash_key, prompt)
  324. # 收到的对话
  325. input_wx_content_dialogue_message=[{"type": "text", "text": msg_content}]
  326. input_message=utils.dialogue_message(callback_to_user,wxid,input_wx_content_dialogue_message)
  327. kafka_helper.kafka_client.produce_message(input_message)
  328. logger.info("发送对话 %s",input_message)
  329. cache_data = memory.USER_INTERACTIVE_CACHE.get(wxid)
  330. if cache_data and cache_data.get('interactive') :
  331. o=get_first_char_if_digit(msg_content)
  332. if o is not None:
  333. userSelectOptions=cache_data.get('options')
  334. if o < len(userSelectOptions):
  335. o=o-1
  336. msg_content=userSelectOptions[o].get("value")
  337. messages_to_send=[{"role": "user", "content": msg_content}]
  338. else:
  339. messages_to_send=[{"role": "user", "content": msg_content}]
  340. else:
  341. messages_to_send=[{"role": "user", "content": msg_content}]
  342. res=fast_gpt_api(messages_to_send,wxid,callback_to_user)
  343. reply_content=res["choices"][0]["message"]["content"]
  344. description = ''
  345. userSelectOptions = []
  346. if isinstance(reply_content, list) and any(item.get("type") == "interactive" for item in reply_content):
  347. for item in reply_content:
  348. if item["type"] == "interactive" and item["interactive"]["type"] == "userSelect":
  349. params = item["interactive"]["params"]
  350. description = params.get("description")
  351. userSelectOptions = params.get("userSelectOptions", [])
  352. values_string = "\n".join(option["value"] for option in userSelectOptions)
  353. if description is not None:
  354. memory.USER_INTERACTIVE_CACHE[wxid] = {
  355. "interactive":True,
  356. "options": userSelectOptions,
  357. }
  358. reply_content=description + '------------------------------\n'+values_string
  359. elif isinstance(reply_content, list) and any(item.get("type") == "text" for item in reply_content):
  360. memory.USER_INTERACTIVE_CACHE[wxid] = {
  361. "interactive":False
  362. }
  363. text=''
  364. for item in reply_content:
  365. if item["type"] == "text":
  366. text=item["text"]["content"]
  367. if text=='':
  368. # 去除上次上一轮对话再次请求
  369. cache_messages_str=redis_helper.redis_helper.get_hash_field(hash_key,"data")
  370. cache_messages = json.loads(cache_messages_str) if cache_messages_str else []
  371. if len(cache_messages) >= 3:
  372. cache_messages = cache_messages[:-3]
  373. redis_helper.redis_helper.update_hash_field(hash_key,"data",json.dumps(cache_messages,ensure_ascii=False))
  374. messages_to_send=gewe_chat.wxchat.save_session_messages_to_cache(hash_key, prompt)
  375. res=fast_gpt_api(messages_to_send,wxid,callback_to_user)
  376. reply_content=res["choices"][0]["message"]["content"]
  377. else:
  378. reply_content=text
  379. else:
  380. memory.USER_INTERACTIVE_CACHE[wxid] = {
  381. "interactive":False
  382. }
  383. reply_content=res["choices"][0]["message"]["content"]
  384. reply_content='@'+extract_nickname(msg_push_content) + reply_content
  385. gewe_chat.wxchat.post_text(token_id,app_id,callback_to_user,reply_content)
  386. gewe_chat.wxchat.save_session_messages_to_cache(hash_key, {"role": "assistant", "content": reply_content})
  387. # 回复的对话
  388. input_wx_content_dialogue_message=[{"type": "text", "text": reply_content}]
  389. input_message=utils.dialogue_message(wxid,callback_to_user,input_wx_content_dialogue_message,True)
  390. kafka_helper.kafka_client.produce_message(input_message)
  391. logger.info("发送对话 %s",input_message)
  392. else:
  393. logger.info('群聊公开消息')
  394. callback_to_user=msg_data["FromUserName"]["string"]
  395. dialogue_message=[{"type": "text", "text": msg_content}]
  396. input_message=utils.dialogue_message(callback_to_user,wxid,dialogue_message)
  397. kafka_helper.kafka_client.produce_message(input_message)
  398. logger.info("发送对话 %s",input_message)
  399. return
  400. def handle_image(token_id,app_id, wxid,msg_data,from_wxid, to_wxid):
  401. '''
  402. 图片消息
  403. '''
  404. msg_content=msg_data["Content"]["string"]
  405. callback_to_user=from_wxid
  406. hash_key = f'__AI_OPS_WX__:MESSAGES:{wxid}:{callback_to_user}'
  407. wx_img_url=gewe_chat.wxchat.download_image_msg(token_id,app_id,msg_content)
  408. oss_access_key_id="LTAI5tRTG6pLhTpKACJYoPR5"
  409. oss_access_key_secret="E7dMzeeMxq4VQvLg7Tq7uKf3XWpYfN"
  410. oss_endpoint="http://oss-cn-shanghai.aliyuncs.com"
  411. oss_bucket_name="cow-agent"
  412. oss_prefix="cow"
  413. img_url=utils.upload_oss(oss_access_key_id, oss_access_key_secret, oss_endpoint, oss_bucket_name, wx_img_url, oss_prefix)
  414. prompt={
  415. "role": "user",
  416. "content": [{
  417. "type": "image_url",
  418. "image_url": {"url": img_url}
  419. }]
  420. }
  421. gewe_chat.wxchat.save_session_messages_to_cache(hash_key, prompt)
  422. gewe_chat.wxchat.post_text(token_id,app_id,callback_to_user,'已经上传了图片,有什么可以为您服务')
  423. logger.info(f"上传图片 URL: {img_url}")
  424. wx_content_dialogue_message=[{"type": "image_url", "image_url": {"url": img_url}}]
  425. input_message=utils.dialogue_message(wxid,callback_to_user,wx_content_dialogue_message)
  426. kafka_helper.kafka_client.produce_message(input_message)
  427. logger.info("发送对话 %s",input_message)
  428. def handle_image_group(token_id,app_id, wxid,msg_data,from_wxid, to_wxid):
  429. logger.info('群聊图片消息')
  430. def handle_voice(token_id,app_id, wxid,msg_data,from_wxid, to_wxid):
  431. '''
  432. 语音消息
  433. '''
  434. callback_to_user=from_wxid
  435. msg_content=msg_data["Content"]["string"]
  436. msg_id=msg_data["MsgId"]
  437. file_url=gewe_chat.wxchat.download_audio_msg(token_id,app_id,msg_id,msg_content)
  438. react_silk_path=utils.save_to_local_from_url(file_url)
  439. react_wav_path = os.path.splitext(react_silk_path)[0] + ".wav"
  440. audio_convert.any_to_wav(react_silk_path,react_wav_path)
  441. react_voice_text=AliVoice().voiceToText(react_wav_path)
  442. os.remove(react_silk_path)
  443. os.remove(react_wav_path)
  444. hash_key = f'__AI_OPS_WX__:MESSAGES:{wxid}:{callback_to_user}'
  445. messages=gewe_chat.wxchat.save_session_messages_to_cache(hash_key, {"role": "user", "content": react_voice_text})
  446. ai_res=fast_gpt_api(messages,wxid,callback_to_user)
  447. ai_res_content=ai_res["choices"][0]["message"]["content"]
  448. has_url=contains_url(ai_res_content)
  449. if not has_url:
  450. voice_during,voice_url=utils.wx_voice(ai_res_content)
  451. if voice_during < 60 * 1000:
  452. ret,ret_msg,res=gewe_chat.wxchat.post_voice(token_id,app_id,callback_to_user,voice_url,voice_during)
  453. else:
  454. ret,ret_msg,res=gewe_chat.wxchat.post_text(token_id,app_id,callback_to_user,ai_res_content)
  455. logger.warning(f'回应语音消息长度 {voice_during/1000}秒,超过60秒,转为文本回复')
  456. if ret==200:
  457. logger.info((f'{wxid} 向 {callback_to_user} 发送语音文本【{ai_res_content}】{ret_msg}'))
  458. else:
  459. logger.warning((f'{wxid} 向 {callback_to_user} 发送语音文本【{ai_res_content}】{ret_msg}'))
  460. ret,ret_msg,res=gewe_chat.wxchat.post_text(token_id,app_id,callback_to_user,ai_res_content)
  461. logger.info((f'{wxid} 向 {callback_to_user} 发送文本【{ai_res_content}】{ret_msg}'))
  462. else:
  463. logger.info(f"回复内容包含网址,不发送语音,回复文字内容:{ai_res_content}")
  464. ret,ret_msg,res=gewe_chat.wxchat.post_text(token_id,app_id,callback_to_user,ai_res_content)
  465. gewe_chat.wxchat.save_session_messages_to_cache(hash_key, {"role": "assistant", "content": ai_res_content})
  466. # 构造对话消息并发送到 Kafka
  467. input_wx_content_dialogue_message = [{"type": "text", "text": ai_res_content}]
  468. input_message = utils.dialogue_message(wxid, callback_to_user, input_wx_content_dialogue_message,True)
  469. kafka_helper.kafka_client.produce_message(input_message)
  470. logger.info("发送对话 %s", input_message)
  471. def handle_voice_group(token_id,app_id, wxid,msg_data,from_wxid, to_wxid):
  472. logger.info('语音消息')
  473. def handle_xml(token_id,app_id, wxid,msg_data,from_wxid, to_wxid):
  474. '''
  475. 处理xml
  476. '''
  477. msg_content_xml=msg_data["Content"]["string"]
  478. root = ET.fromstring(msg_content_xml)
  479. type_value = root.find(".//appmsg/type").text
  480. handlers = {
  481. 57: handle_xml_reference,
  482. }
  483. handler = handlers.get(type_value)
  484. if handler:
  485. return handler(token_id,app_id, wxid,msg_data,from_wxid, to_wxid)
  486. elif "邀请你加入了群聊" in msg_content_xml: # 邀请加入群聊
  487. logger.warning(f"xml消息 {type_value} 邀请你加入了群聊.todo")
  488. else:
  489. print(f"xml消息 {type_value} 未解析")
  490. def handle_xml_reference(token_id,app_id, wxid,msg_data,from_wxid, to_wxid):
  491. '''
  492. 判断此类消息的逻辑:$.Data.MsgType=49 并且 解析$.Data.Content.string中的xml msg.appmsg.type=57
  493. '''
  494. callback_to_user=from_wxid
  495. hash_key = f'__AI_OPS_WX__:MESSAGES:{wxid}:{callback_to_user}'
  496. msg_content= msg_data["PushContent"]
  497. prompt={"role": "user", "content": [{
  498. "type": "text",
  499. "text": msg_content
  500. }]}
  501. # 收到的对话
  502. messages_to_send=gewe_chat.wxchat.save_session_messages_to_cache(hash_key, prompt)
  503. input_wx_content_dialogue_message=[{"type": "text", "text": msg_content}]
  504. input_message=utils.dialogue_message(callback_to_user,wxid,input_wx_content_dialogue_message)
  505. kafka_helper.kafka_client.produce_message(input_message)
  506. logger.info("发送对话 %s",input_message)
  507. # 回复的对话
  508. res=fast_gpt_api(messages_to_send,wxid,callback_to_user)
  509. reply_content=res["choices"][0]["message"]["content"]
  510. input_wx_content_dialogue_message=[{"type": "text", "text": reply_content}]
  511. input_message=utils.dialogue_message(wxid,callback_to_user,input_wx_content_dialogue_message,True)
  512. kafka_helper.kafka_client.produce_message(input_message)
  513. logger.info("发送对话 %s",input_message)
  514. gewe_chat.wxchat.save_session_messages_to_cache(hash_key, {"role": "assistant", "content": reply_content})
  515. gewe_chat.wxchat.post_text(token_id,app_id,callback_to_user,reply_content)
  516. def handle_add_friend_notice(token_id,app_id, wxid,msg_data,from_wxid, to_wxid):
  517. '''
  518. 好友添加请求通知
  519. '''
  520. logger.info('好友添加请求通知')
  521. msg_content_xml=msg_data["Content"]["string"]
  522. root = ET.fromstring(msg_content_xml)
  523. msg_content = root.attrib.get('content', None)
  524. v3= root.attrib.get('encryptusername', None)
  525. v4= root.attrib.get('ticket', None)
  526. scene=root.attrib.get('scene', None)
  527. to_contact_wxid=root.attrib.get('fromusername', None)
  528. wxid=msg_data["ToUserName"]["string"]
  529. # 自动同意好友
  530. # print(v3)
  531. # print(v4)
  532. # print(scene)
  533. # print(msg_content)
  534. # 操作类型,2添加好友 3同意好友 4拒绝好友
  535. #option=2
  536. option=3
  537. reply_add_contact_contact="亲,我是你的美丽顾问"
  538. ret,ret_msg=gewe_chat.wxchat.add_contacts(token_id,app_id,scene,option,v3,v4,reply_add_contact_contact)
  539. if ret==200:
  540. logger.info('自动添加好友成功')
  541. # 好友发送的文字
  542. hash_key = f'__AI_OPS_WX__:MESSAGES:{wxid}:{to_contact_wxid}'
  543. prompt={"role": "user", "content": [{"type": "text","text": msg_content}]}
  544. messages_to_send=gewe_chat.wxchat.save_session_messages_to_cache(hash_key, prompt)
  545. input_wx_content_dialogue_message=[{"type": "text", "text": msg_content}]
  546. input_message=utils.dialogue_message(to_contact_wxid,wxid,input_wx_content_dialogue_message)
  547. kafka_helper.kafka_client.produce_message(input_message)
  548. logger.info("发送对话 %s",input_message)
  549. callback_to_user=to_contact_wxid
  550. res=fast_gpt_api(messages_to_send,wxid,callback_to_user)
  551. reply_content=res["choices"][0]["message"]["content"]
  552. #保存好友信息
  553. gewe_chat.wxchat.save_contacts_brief_to_cache(token_id,app_id, wxid,[to_contact_wxid])
  554. # 保存到缓存
  555. gewe_chat.wxchat.save_session_messages_to_cache(hash_key, {"role": "assistant", "content": reply_content})
  556. # 发送信息
  557. gewe_chat.wxchat.post_text(token_id,app_id, to_contact_wxid,reply_content)
  558. # 发送到kafka
  559. input_wx_content_dialogue_message=[{"type": "text", "text": reply_content}]
  560. input_message=utils.dialogue_message(wxid,to_contact_wxid,input_wx_content_dialogue_message,True)
  561. kafka_helper.kafka_client.produce_message(input_message)
  562. logger.info("发送对话 %s",input_message)
  563. else:
  564. logger.warning("添加好友失败")
  565. def handle_10002_msg(token_id,app_id, wxid,msg_data,from_wxid, to_wxid):
  566. '''
  567. 群聊邀请
  568. 撤回消息
  569. 拍一拍消息
  570. 地理位置
  571. 踢出群聊通知
  572. 解散群聊通知
  573. 发布群公告
  574. '''
  575. msg_content_xml=msg_data["Content"]["string"]
  576. # 群聊邀请
  577. if '邀请你加入了群聊' in msg_content_xml and check_chatroom(msg_data["FromUserName"]["string"]):
  578. chatroom_id=msg_data["FromUserName"]["string"]
  579. ret,msg,data=gewe_chat.wxchat.save_contract_list(token_id,app_id,chatroom_id,3)
  580. logger.info(f'群聊邀请,保存到通讯录 chatroom_id {chatroom_id} {msg}')
  581. gewe_chat.wxchat.update_groups_info_to_cache(token_id,app_id,wxid,chatroom_id)
  582. if '移出了群聊' in msg_content_xml and 'sysmsgtemplate' in msg_content_xml :
  583. chatroom_id=msg_data["FromUserName"]["string"]
  584. ret,msg,data=gewe_chat.wxchat.save_contract_list(token_id,app_id,chatroom_id,2)
  585. logger.info(f'踢出群聊,移除从通讯录 chatroom_id {chatroom_id} {msg}')
  586. redis_helper.redis_helper.delete_hash_field(f'__AI_OPS_WX__:GROUPS_INFO:{wxid}',chatroom_id)
  587. logger.info(f'清除 chatroom_id{chatroom_id} 数据')
  588. if '已解散该群聊' in msg_content_xml and 'sysmsgtemplate' in msg_content_xml :
  589. chatroom_id=msg_data["FromUserName"]["string"]
  590. ret,msg,data=gewe_chat.wxchat.save_contract_list(token_id,app_id,chatroom_id,2)
  591. logger.info(f'解散群聊,移除从通讯录 chatroom_id {chatroom_id} {msg}')
  592. redis_helper.redis_helper.delete_hash_field(f'__AI_OPS_WX__:GROUPS_INFO:{wxid}',chatroom_id)
  593. logger.info(f'清除 chatroom_id{chatroom_id} 数据')
  594. print('撤回消息,拍一拍消息,地理位置')
  595. def handle_mod_contacts(token_id,app_id,wxid,msg_data):
  596. '''
  597. 好友通过验证及好友资料变更的通知消息
  598. '''
  599. logger.info('好友通过验证及好友资料变更的通知消息')
  600. if not check_chatroom(msg_data["UserName"]["string"]):
  601. contact_wxid = msg_data["UserName"]["string"]
  602. # 更新好友信息
  603. ret,msg,contacts_list = gewe_chat.wxchat.fetch_contacts_list(token_id, app_id)
  604. friend_wxids = contacts_list['friends'][3:] # 可以调整截取范围
  605. print(friend_wxids)
  606. #friend_wxids.remove('weixin')
  607. gewe_chat.wxchat.save_contacts_brief_to_cache(token_id, app_id, wxid, friend_wxids)
  608. # keys=redis_helper.redis_helper.client.keys('__AI_OPS_WX__:LOGININFO:*')
  609. # wxid=""
  610. # for k in keys:
  611. # key=k.decode('utf-8')
  612. # hash_key=f"__AI_OPS_WX__:LOGININFO:{key}"
  613. # cache_app_id=redis_helper.redis_helper.get_hash_field(hash_key,"appId")
  614. # if app_id==cache_app_id:
  615. # wxid=redis_helper.redis_helper.get_hash_field(hash_key,"wxid")
  616. # break
  617. # print(wxid)
  618. # print('~~~~~~~~~~~~~~~~~~~~~~~~~~~\n')
  619. # if wxid!="":
  620. # print('~~~~~~~~~~~~~~~~~~~~~~~~~~~\n')
  621. # hash_key=f"__AI_OPS_WX__:CONTACTS_BRIEF:{wxid}"
  622. # ret,msg,contacts_list = gewe_chat.wxchat.fetch_contacts_list(token_id, app_id)
  623. # friend_wxids = contacts_list['friends'][3:] # 可以调整截取范围
  624. # print(friend_wxids)
  625. # #friend_wxids.remove('weixin')
  626. # gewe_chat.wxchat.save_contacts_brief_to_cache(token_id, app_id, wxid, friend_wxids)
  627. # cache_str=redis_helper.redis_helper.get_hash_field(hash_key,"data")
  628. # cache = json.loads(cache_str) if cache_str else []
  629. # for c in cache:
  630. # if c["userName"]==contact_wxid:
  631. # c["nickName"] = msg_data["NickName"]
  632. # c["snsBgImg"] = msg_data["SnsBgimgId"]
  633. # c["smallHeadImgUrl"] = msg_data["SmallHeadImgUrl"]
  634. # c["signature"]= msg_data["Signature"]
  635. # # 更新缓存,如果有修改过的话
  636. # if cache:
  637. # updated_cache_str = json.dumps(cache)
  638. # redis_helper.redis_helper.set_hash_field(hash_key, "data", updated_cache_str)
  639. else:
  640. logger.info('群聊好友通过验证及好友资料变更的通知消息')
  641. def get_messages_from_cache(hash_key,object:object)->list:
  642. '''
  643. 对话列表
  644. '''
  645. messages=redis_helper.redis_helper.get_hash(hash_key)
  646. wxid=hash_key.split(':')[-1]
  647. if not messages:
  648. messages=[{"role": "system", "content": ""}]
  649. messages.append(object)
  650. redis_helper.redis_helper.set_hash(hash_key,{"data":json.dumps(messages,ensure_ascii=False)},3600)
  651. else:
  652. messages_str=redis_helper.redis_helper.get_hash_field(hash_key,"data")
  653. messages = json.loads(messages_str) if messages_str else []
  654. #判断是否含有图片
  655. last_message = messages[-1]
  656. content = last_message.get("content", [])
  657. if isinstance(content, list) and content:
  658. last_content_type = content[-1].get("type")
  659. if last_content_type == 'image_url':
  660. content.append(object['content'][0])
  661. messages[-1]['content']=content
  662. else:
  663. messages.append(object)
  664. else:
  665. messages.append(object)
  666. redis_helper.redis_helper.set_hash(hash_key,{"data":json.dumps(messages,ensure_ascii=False)},3600)
  667. return messages
  668. # def fast_gpt_api(messages:list,session_id:str):
  669. # api_key="sk-jr69ONIehfGKe9JFphuNk4DU5Y5wooHKHhQv7oSnFzVbwCnW65fXO9kvH" #测试
  670. # #api_key="sk-uJDBdKmJVb2cmfldGOvlIY6Qx0AzqWMPD3lS1IzgQYzHNOXv9SKNI" #开发2
  671. # api_url = "http://106.15.182.218:3000/api/v1/chat/completions"
  672. # headers = {
  673. # "Content-Type": "application/json",
  674. # "Authorization": f"Bearer {api_key}"
  675. # }
  676. # data={
  677. # "model": "",
  678. # "messages":messages,
  679. # "chatId": session_id,
  680. # "detail": True
  681. # }
  682. # #print(json.dumps(data,ensure_ascii=False))
  683. # logger.info("[CHATGPT] 请求={}".format(json.dumps(data, ensure_ascii=False)))
  684. # response = requests.post(url=api_url, headers=headers, data=json.dumps(data), timeout=600)
  685. # response.raise_for_status()
  686. # response_data = response.json()
  687. # logger.info("[CHATGPT] 响应={}".format(json.dumps(response_data, separators=(',', ':'),ensure_ascii=False)))
  688. # print(response_data)
  689. # return response_data
  690. def fast_gpt_api(messages:list,wixd:str,friend_wxid:str):
  691. c=gewe_chat.wxchat.get_wxchat_config_from_cache(wixd)
  692. api_key=c.get('agentTokenId',"sk-jr69ONIehfGKe9JFphuNk4DU5Y5wooHKHhQv7oSnFzVbwCnW65fXO9kvH")
  693. print(f'流程key:{api_key}\n~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~\n')
  694. #api_key="sk-jr69ONIehfGKe9JFphuNk4DU5Y5wooHKHhQv7oSnFzVbwCnW65fXO9kvH" #测试
  695. #api_key="sk-uJDBdKmJVb2cmfldGOvlIY6Qx0AzqWMPD3lS1IzgQYzHNOXv9SKNI" #开发2
  696. api_url = "http://106.15.182.218:3000/api/v1/chat/completions"
  697. headers = {
  698. "Content-Type": "application/json",
  699. "Authorization": f"Bearer {api_key}"
  700. }
  701. session_id=f'{wixd}-{friend_wxid}'
  702. data={
  703. "model": "",
  704. "messages":messages,
  705. "chatId": session_id,
  706. "detail": True
  707. }
  708. #print(json.dumps(data,ensure_ascii=False))
  709. logger.info("[CHATGPT] 请求={}".format(json.dumps(data, ensure_ascii=False)))
  710. response = requests.post(url=api_url, headers=headers, data=json.dumps(data), timeout=600)
  711. response.raise_for_status()
  712. response_data = response.json()
  713. logger.info("[CHATGPT] 响应={}".format(json.dumps(response_data, separators=(',', ':'),ensure_ascii=False)))
  714. #print(response_data)
  715. return response_data
  716. def get_token_id_by_app_id(app_id: str) -> str:
  717. # 使用 SCAN 避免一次性返回所有的匹配键,逐步扫描
  718. cursor = 0
  719. while True:
  720. cursor, login_keys = redis_helper.redis_helper.client.scan(cursor, match='__AI_OPS_WX__:LOGININFO:*')
  721. # 批量获取所有键的 hash 数据
  722. for k in login_keys:
  723. r = redis_helper.redis_helper.get_hash(k)
  724. if r.get("appId") == app_id:
  725. return r.get("tokenId", "")
  726. # 如果游标为 0,则表示扫描完成
  727. if cursor == 0:
  728. break
  729. return ""
  730. def get_login_info_by_app_id(app_id: str) ->dict:
  731. # 使用 SCAN 避免一次性返回所有的匹配键,逐步扫描
  732. cursor = 0
  733. while True:
  734. cursor, login_keys = redis_helper.redis_helper.client.scan(cursor, match='__AI_OPS_WX__:LOGININFO:*')
  735. # 批量获取所有键的 hash 数据
  736. for k in login_keys:
  737. r = redis_helper.redis_helper.get_hash(k)
  738. if r.get("appId") == app_id:
  739. return k,r
  740. # 如果游标为 0,则表示扫描完成
  741. if cursor == 0:
  742. break
  743. return ""
  744. def contains_url(text):
  745. # 定义检测网址的正则表达式
  746. url_pattern = re.compile(
  747. r'http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\\(\\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+'
  748. )
  749. # 检查字符串是否包含网址
  750. return bool(url_pattern.search(text))
  751. def get_first_char_if_digit(s):
  752. if s and s[0].isdigit(): # 判断字符串是否非空且首字符为数字
  753. return int(s[0]) # 返回数字形式
  754. return None # 如果不是数字则返回 None
  755. def remove_at_mention_regex(text):
  756. # 使用正则表达式去掉“在群聊中@了你”
  757. return re.sub(r"在群聊中@了你", "", text)
  758. def extract_nickname(text)->str:
  759. if "在群聊中@了你" in text:
  760. # 如果包含 "在群聊中@了你",提取其前面的名字
  761. match = re.search(r"^(.*?)在群聊中@了你", text)
  762. if match:
  763. return match.group(1).strip()
  764. elif ": @" in text:
  765. # 如果包含 ": @",提取其前面的名字
  766. return text.split(": @")[0].strip()
  767. return ''
  768. def check_chatroom(userName):
  769. pattern = r'^\d+@chatroom$'
  770. if re.match(pattern, userName):
  771. return True
  772. return False