Вы не можете выбрать более 25 тем Темы должны начинаться с буквы или цифры, могут содержать дефисы(-) и должны содержать не более 35 символов.

737 lines
31KB

  1. # encoding:utf-8
  2. """
  3. wechat channel
  4. """
  5. import io
  6. import json
  7. import os
  8. import threading
  9. import time
  10. import requests
  11. from bridge.context import *
  12. from bridge.reply import *
  13. from channel.chat_channel import ChatChannel
  14. from channel import chat_channel
  15. from channel.wechat.wechat_message import *
  16. from common.expired_dict import ExpiredDict
  17. from common.log import logger
  18. from common.singleton import singleton
  19. from common.time_check import time_checker
  20. from common.utils import convert_webp_to_png
  21. from config import conf, get_appdata_dir
  22. from lib import itchat
  23. from lib.itchat.content import *
  24. from urllib.parse import urlparse
  25. import threading
  26. from common import kafka_helper, redis_helper
  27. from confluent_kafka import Consumer, KafkaException
  28. import json,time,re
  29. import pickle
  30. from datetime import datetime
  31. import oss2
  32. # from common.kafka_client import KafkaClient
  33. @itchat.msg_register([TEXT, VOICE, PICTURE, NOTE, ATTACHMENT, SHARING])
  34. def handler_single_msg(msg):
  35. try:
  36. cmsg = WechatMessage(msg, False)
  37. except NotImplementedError as e:
  38. logger.debug("[WX]single message {} skipped: {}".format(msg["MsgId"], e))
  39. return None
  40. WechatChannel().handle_single(cmsg)
  41. return None
  42. @itchat.msg_register([TEXT, VOICE, PICTURE, NOTE, ATTACHMENT, SHARING], isGroupChat=True)
  43. def handler_group_msg(msg):
  44. try:
  45. cmsg = WechatMessage(msg, True)
  46. except NotImplementedError as e:
  47. logger.debug("[WX]group message {} skipped: {}".format(msg["MsgId"], e))
  48. return None
  49. WechatChannel().handle_group(cmsg)
  50. return None
  51. def _check(func):
  52. def wrapper(self, cmsg: ChatMessage):
  53. msgId = cmsg.msg_id
  54. if msgId in self.receivedMsgs:
  55. logger.info("Wechat message {} already received, ignore".format(msgId))
  56. return
  57. self.receivedMsgs[msgId] = True
  58. create_time = cmsg.create_time # 消息时间戳
  59. if conf().get("hot_reload") == True and int(create_time) < int(time.time()) - 60: # 跳过1分钟前的历史消息
  60. logger.debug("[WX]history message {} skipped".format(msgId))
  61. return
  62. if cmsg.my_msg and not cmsg.is_group:
  63. logger.debug("[WX]my message {} skipped".format(msgId))
  64. return
  65. return func(self, cmsg)
  66. return wrapper
  67. # 可用的二维码生成接口
  68. # https://api.qrserver.com/v1/create-qr-code/?size=400×400&data=https://www.abc.com
  69. # https://api.isoyu.com/qr/?m=1&e=L&p=20&url=https://www.abc.com
  70. def qrCallback(uuid, status, qrcode):
  71. # logger.debug("qrCallback: {} {}".format(uuid,status))
  72. if status == "0":
  73. try:
  74. from PIL import Image
  75. img = Image.open(io.BytesIO(qrcode))
  76. _thread = threading.Thread(target=img.show, args=("QRCode",))
  77. _thread.setDaemon(True)
  78. _thread.start()
  79. except Exception as e:
  80. pass
  81. import qrcode
  82. url = f"https://login.weixin.qq.com/l/{uuid}"
  83. qr_api1 = "https://api.isoyu.com/qr/?m=1&e=L&p=20&url={}".format(url)
  84. qr_api2 = "https://api.qrserver.com/v1/create-qr-code/?size=400×400&data={}".format(url)
  85. qr_api3 = "https://api.pwmqr.com/qrcode/create/?url={}".format(url)
  86. qr_api4 = "https://my.tv.sohu.com/user/a/wvideo/getQRCode.do?text={}".format(url)
  87. print("You can also scan QRCode in any website below:")
  88. print(qr_api3)
  89. print(qr_api4)
  90. print(qr_api2)
  91. print(qr_api1)
  92. _send_qr_code([qr_api3, qr_api4, qr_api2, qr_api1])
  93. qr = qrcode.QRCode(border=1)
  94. qr.add_data(url)
  95. qr.make(fit=True)
  96. qr.print_ascii(invert=True)
  97. @singleton
  98. class WechatChannel(ChatChannel):
  99. NOT_SUPPORT_REPLYTYPE = []
  100. def __init__(self):
  101. super().__init__()
  102. self.receivedMsgs = ExpiredDict(conf().get("expires_in_seconds", 3600))
  103. self.auto_login_times = 0
  104. def startup(self):
  105. try:
  106. itchat.instance.receivingRetryCount = 600 # 修改断线超时时间
  107. # login by scan QRCode
  108. hotReload = conf().get("hot_reload", False)
  109. status_path = os.path.join(get_appdata_dir(), "itchat","itchat.pkl")
  110. # with open(status_path, 'rb') as file:
  111. # data = pickle.load(file)
  112. # logger.info(data)
  113. itchat.auto_login(
  114. enableCmdQR=2,
  115. hotReload=hotReload,
  116. statusStorageDir=status_path,
  117. qrCallback=qrCallback,
  118. exitCallback=self.exitCallback,
  119. loginCallback=self.loginCallback
  120. )
  121. self.user_id = itchat.instance.storageClass.userName
  122. self.name = itchat.instance.storageClass.nickName
  123. logger.info("Wechat login success, user_id: {}, nickname: {}".format(self.user_id, self.name))
  124. # 创建一个线程来运行 consume_messages
  125. kafka_thread = threading.Thread(target=kafka_helper.kafka_client.consume_messages, args=(wx_messages_process_callback, self.name))
  126. kafka_thread.start()
  127. logger.info("启动kafka")
  128. # 好友定时同步
  129. agent_nickname=self.name
  130. friend_thread =threading.Thread(target=hourly_change_save_friends, args=(agent_nickname,))
  131. friend_thread.start()
  132. # 立刻同步
  133. agent_info=fetch_agent_info(agent_nickname)
  134. agent_tel=agent_info.get("agent_tel",None)
  135. # friends=itchat.get_contact(update=True)[1:]
  136. friends=itchat.get_friends(update=True)[1:]
  137. save_friends_to_redis(agent_tel,agent_nickname, friends)
  138. logger.info("启动好友同步")
  139. # start message listener
  140. logger.info("启动itchat")
  141. itchat.run()
  142. except Exception as e:
  143. logger.exception(e)
  144. def exitCallback(self):
  145. print('主动退出')
  146. try:
  147. from common.linkai_client import chat_client
  148. if chat_client.client_id and conf().get("use_linkai"):
  149. print('退出')
  150. _send_logout()
  151. time.sleep(2)
  152. self.auto_login_times += 1
  153. if self.auto_login_times < 100:
  154. chat_channel.handler_pool._shutdown = False
  155. self.startup()
  156. except Exception as e:
  157. pass
  158. def loginCallback(self):
  159. logger.debug("Login success")
  160. print('登录成功')
  161. # 同步
  162. _send_login_success()
  163. # handle_* 系列函数处理收到的消息后构造Context,然后传入produce函数中处理Context和发送回复
  164. # Context包含了消息的所有信息,包括以下属性
  165. # type 消息类型, 包括TEXT、VOICE、IMAGE_CREATE
  166. # content 消息内容,如果是TEXT类型,content就是文本内容,如果是VOICE类型,content就是语音文件名,如果是IMAGE_CREATE类型,content就是图片生成命令
  167. # kwargs 附加参数字典,包含以下的key:
  168. # session_id: 会话id
  169. # isgroup: 是否是群聊
  170. # receiver: 需要回复的对象
  171. # msg: ChatMessage消息对象
  172. # origin_ctype: 原始消息类型,语音转文字后,私聊时如果匹配前缀失败,会根据初始消息是否是语音来放宽触发规则
  173. # desire_rtype: 希望回复类型,默认是文本回复,设置为ReplyType.VOICE是语音回复
  174. @time_checker
  175. @_check
  176. def handle_single(self, cmsg: ChatMessage):
  177. # filter system message
  178. if cmsg.other_user_id in ["weixin"]:
  179. return
  180. if cmsg.ctype == ContextType.VOICE:
  181. if conf().get("speech_recognition") != True:
  182. return
  183. logger.debug("[WX]receive voice msg: {}".format(cmsg.content))
  184. elif cmsg.ctype == ContextType.IMAGE:
  185. logger.debug("[WX]receive image msg: {}".format(cmsg.content))
  186. elif cmsg.ctype == ContextType.PATPAT:
  187. logger.debug("[WX]receive patpat msg: {}".format(cmsg.content))
  188. elif cmsg.ctype == ContextType.TEXT:
  189. logger.debug("[WX]receive text msg: {}, cmsg={}".format(json.dumps(cmsg._rawmsg, ensure_ascii=False), cmsg))
  190. # content = cmsg.content # 消息内容
  191. # from_user_nickname = cmsg.from_user_nickname # 发送方昵称
  192. # to_user_nickname = cmsg.to_user_nickname # 接收方昵称
  193. # wx_content_dialogue_message=[{"type": "text", "text": content}]
  194. # message=dialogue_message(from_user_nickname,to_user_nickname,wx_content_dialogue_message)
  195. # kafka_helper.kafka_client.produce_message(message)
  196. # logger.info("发送对话 %s", json.dumps(message, ensure_ascii=False))
  197. input_content = cmsg.content
  198. input_from_user_nickname = cmsg.from_user_nickname
  199. input_to_user_nickname = cmsg.to_user_nickname
  200. input_wx_content_dialogue_message=[{"type": "text", "text": input_content}]
  201. input_message=dialogue_message(input_from_user_nickname,input_to_user_nickname,input_wx_content_dialogue_message)
  202. kafka_helper.kafka_client.produce_message(input_message)
  203. logger.info("发送对话 %s",input_message)
  204. else:
  205. logger.debug("[WX]receive msg: {}, cmsg={}".format(cmsg.content, cmsg))
  206. context = self._compose_context(cmsg.ctype, cmsg.content, isgroup=False, msg=cmsg)
  207. if context:
  208. self.produce(context)
  209. @time_checker
  210. @_check
  211. def handle_group(self, cmsg: ChatMessage):
  212. if cmsg.ctype == ContextType.VOICE:
  213. if conf().get("group_speech_recognition") != True:
  214. return
  215. logger.debug("[WX]receive voice for group msg: {}".format(cmsg.content))
  216. elif cmsg.ctype == ContextType.IMAGE:
  217. logger.debug("[WX]receive image for group msg: {}".format(cmsg.content))
  218. elif cmsg.ctype in [ContextType.JOIN_GROUP, ContextType.PATPAT, ContextType.ACCEPT_FRIEND, ContextType.EXIT_GROUP]:
  219. logger.debug("[WX]receive note msg: {}".format(cmsg.content))
  220. elif cmsg.ctype == ContextType.TEXT:
  221. # logger.debug("[WX]receive group msg: {}, cmsg={}".format(json.dumps(cmsg._rawmsg, ensure_ascii=False), cmsg))
  222. pass
  223. elif cmsg.ctype == ContextType.FILE:
  224. logger.debug(f"[WX]receive attachment msg, file_name={cmsg.content}")
  225. else:
  226. logger.debug("[WX]receive group msg: {}".format(cmsg.content))
  227. context = self._compose_context(cmsg.ctype, cmsg.content, isgroup=True, msg=cmsg, no_need_at=conf().get("no_need_at", False))
  228. if context:
  229. self.produce(context)
  230. # 统一的发送函数,每个Channel自行实现,根据reply的type字段发送不同类型的消息
  231. def send(self, reply: Reply, context: Context):
  232. receiver = context["receiver"]
  233. if reply.type == ReplyType.TEXT:
  234. itchat.send(reply.content, toUserName=receiver)
  235. logger.info("[WX] sendMsg={}, receiver={}".format(reply, receiver))
  236. # logger.info(context)
  237. # logger.info(context["msg"])
  238. # // 发送kafka
  239. # msg=context["msg"]
  240. msg: ChatMessage = context["msg"]
  241. # content=msg["content"]
  242. is_group=msg.is_group
  243. if not is_group:
  244. # print(f'响应:{content}')
  245. # 用户输入
  246. # input_content=msg.content
  247. # input_from_user_nickname=msg.from_user_nickname
  248. # input_to_user_nickname=msg.to_user_nickname
  249. # input_wx_content_dialogue_message=[{"type": "text", "text": input_content}]
  250. # input_message=dialogue_message(input_from_user_nickname,input_to_user_nickname,input_wx_content_dialogue_message)
  251. # kafka_helper.kafka_client.produce_message(input_message)
  252. # logger.info("发送对话 %s", json.dumps(input_message, separators=(',', ':'), ensure_ascii=False))
  253. # 响应用户
  254. output_content=reply.content
  255. output_from_user_nickname=msg.to_user_nickname # 回复翻转
  256. output_to_user_nickname=msg.from_user_nickname # 回复翻转
  257. output_wx_content_dialogue_message=[{"type": "text", "text": output_content}]
  258. output_message=dialogue_message(output_from_user_nickname,output_to_user_nickname,output_wx_content_dialogue_message)
  259. kafka_helper.kafka_client.produce_message(output_message)
  260. logger.info("发送对话 %s", output_message)
  261. elif reply.type == ReplyType.ERROR or reply.type == ReplyType.INFO:
  262. itchat.send(reply.content, toUserName=receiver)
  263. logger.info("[WX] sendMsg={}, receiver={}".format(reply, receiver))
  264. elif reply.type == ReplyType.VOICE:
  265. itchat.send_file(reply.content, toUserName=receiver)
  266. logger.info("[WX] sendFile={}, receiver={}".format(reply.content, receiver))
  267. elif reply.type == ReplyType.IMAGE_URL: # 从网络下载图片
  268. img_url = reply.content
  269. logger.debug(f"[WX] start download image, img_url={img_url}")
  270. pic_res = requests.get(img_url, stream=True)
  271. image_storage = io.BytesIO()
  272. size = 0
  273. for block in pic_res.iter_content(1024):
  274. size += len(block)
  275. image_storage.write(block)
  276. logger.info(f"[WX] download image success, size={size}, img_url={img_url}")
  277. image_storage.seek(0)
  278. if ".webp" in img_url:
  279. try:
  280. image_storage = convert_webp_to_png(image_storage)
  281. except Exception as e:
  282. logger.error(f"Failed to convert image: {e}")
  283. return
  284. itchat.send_image(image_storage, toUserName=receiver)
  285. logger.info("[WX] sendImage url={}, receiver={}".format(img_url, receiver))
  286. elif reply.type == ReplyType.IMAGE: # 从文件读取图片
  287. image_storage = reply.content
  288. image_storage.seek(0)
  289. itchat.send_image(image_storage, toUserName=receiver)
  290. logger.info("[WX] sendImage, receiver={}".format(receiver))
  291. elif reply.type == ReplyType.FILE: # 新增文件回复类型
  292. file_storage = reply.content
  293. itchat.send_file(file_storage, toUserName=receiver)
  294. logger.info("[WX] sendFile, receiver={}".format(receiver))
  295. # msg: ChatMessage = context["msg"]
  296. # # content=msg["content"]
  297. # is_group=msg.is_group
  298. # if not is_group:
  299. # # print(f'响应:{content}')
  300. # # 用户输入
  301. # # input_content=msg.content
  302. # # input_from_user_nickname=msg.from_user_nickname
  303. # # input_to_user_nickname=msg.to_user_nickname
  304. # # input_wx_content_dialogue_message=[{"type": "text", "text": input_content}]
  305. # # input_message=dialogue_message(input_from_user_nickname,input_to_user_nickname,input_wx_content_dialogue_message)
  306. # # kafka_helper.kafka_client.produce_message(input_message)
  307. # # logger.info("发送对话 %s", json.dumps(input_message, separators=(',', ':'), ensure_ascii=False))
  308. # # 响应用户
  309. # output_content=reply.content
  310. # output_from_user_nickname=msg.to_user_nickname # 回复翻转
  311. # output_to_user_nickname=msg.from_user_nickname # 回复翻转
  312. # output_wx_content_dialogue_message=[{"type": "file", "text": output_content}]
  313. # output_message=dialogue_message(output_from_user_nickname,output_to_user_nickname,output_wx_content_dialogue_message)
  314. # kafka_helper.kafka_client.produce_message(output_message)
  315. # logger.info("发送对话 %s", output_message)
  316. elif reply.type == ReplyType.VIDEO: # 新增视频回复类型
  317. video_storage = reply.content
  318. itchat.send_video(video_storage, toUserName=receiver)
  319. logger.info("[WX] sendFile, receiver={}".format(receiver))
  320. elif reply.type == ReplyType.VIDEO_URL: # 新增视频URL回复类型
  321. video_url = reply.content
  322. logger.debug(f"[WX] start download video, video_url={video_url}")
  323. video_res = requests.get(video_url, stream=True)
  324. video_storage = io.BytesIO()
  325. size = 0
  326. for block in video_res.iter_content(1024):
  327. size += len(block)
  328. video_storage.write(block)
  329. logger.info(f"[WX] download video success, size={size}, video_url={video_url}")
  330. video_storage.seek(0)
  331. itchat.send_video(video_storage, toUserName=receiver)
  332. logger.info("[WX] sendVideo url={}, receiver={}".format(video_url, receiver))
  333. def _send_login_success():
  334. try:
  335. from common.linkai_client import chat_client
  336. if chat_client.client_id:
  337. chat_client.send_login_success()
  338. except Exception as e:
  339. pass
  340. def _send_logout():
  341. try:
  342. from common.linkai_client import chat_client
  343. if chat_client.client_id:
  344. chat_client.send_logout()
  345. except Exception as e:
  346. pass
  347. def _send_qr_code(qrcode_list: list):
  348. try:
  349. from common.linkai_client import chat_client
  350. if chat_client.client_id:
  351. chat_client.send_qrcode(qrcode_list)
  352. except Exception as e:
  353. pass
  354. def clean_json_string(json_str):
  355. # 删除所有控制字符(非打印字符),包括换行符、回车符等
  356. return re.sub(r'[\x00-\x1f\x7f]', '', json_str)
  357. def save_friends_to_redis(agent_tel,agent_nickname, friends):
  358. contact_list = []
  359. for friend in friends:
  360. friend_data = {
  361. "UserName": friend.UserName,
  362. "NickName": friend.NickName,
  363. "Signature": friend.Signature,
  364. "Province": friend.Province,
  365. "City": friend.City,
  366. "Sex": str(friend.Sex), # 性别可转换为字符串存储
  367. "Alias": friend.Alias
  368. }
  369. contact_list.append(friend_data) # 将每个朋友的信息加入到列表中
  370. agent_contact_list = {
  371. "AgentTel":agent_tel,
  372. "agent_nick_name": agent_nickname,
  373. "contact_list": contact_list # 将朋友列表添加到字典中
  374. }
  375. # 将联系人信息保存到 Redis,使用一个合适的 key
  376. hash_key = f"__AI_OPS_WX__:CONTACTLIST"
  377. redis_helper.redis_helper.update_hash_field(hash_key, agent_tel, json.dumps(agent_contact_list, ensure_ascii=False)) # 设置有效期为 600 秒
  378. def hourly_change_save_friends(agent_nickname):
  379. last_hour = datetime.now().hour # 获取当前小时
  380. while True:
  381. current_hour = datetime.now().hour
  382. if current_hour != last_hour: # 检测小时是否变化
  383. friends=itchat.get_friends(update=True)[1:]
  384. agent_info=fetch_agent_info(agent_nickname)
  385. agent_tel=agent_info.get("agent_tel",None)
  386. save_friends_to_redis(agent_tel,agent_nickname, friends)
  387. last_hour = current_hour
  388. time.sleep(1) # 每秒检查一次
  389. def wx_messages_process_callback(user_nickname,message):
  390. """
  391. 处理消费到的 Kafka 消息(基础示例)
  392. :param message: Kafka 消费到的消息内容
  393. """
  394. # print(user_nickname)
  395. # print(f"Processing message: {message}")
  396. # return True
  397. msg_content= message
  398. cleaned_content = clean_json_string(msg_content)
  399. content=json.loads(cleaned_content)
  400. data = content.get("data", {})
  401. msg_type_data=data.get("msg_type",None)
  402. content_data = data.get("content",{})
  403. agent_nickname_data=content_data.get("agent_nickname",None)
  404. agent_tel=content_data.get("agent_tel",None)
  405. if user_nickname == agent_nickname_data and msg_type_data=='group-sending':
  406. friends=itchat.get_friends(update=True)[1:]
  407. contact_list_content_data=content_data.get("contact_list",None)
  408. # 更新好友缓存
  409. save_friends_to_redis(agent_tel,agent_nickname_data,friends)
  410. # 遍历要群发的好友
  411. for contact in contact_list_content_data:
  412. nickname = contact.get("nickname",None)
  413. if(nickname not in [friend['NickName'] for friend in friends]):
  414. logger.warning(f'微信中没有 {nickname} 的昵称,将不会发送消息')
  415. for friend in friends:
  416. if friend.get("NickName",None) == nickname:
  417. wx_content_list=content_data.get("wx_content",[])
  418. for wx_content in wx_content_list:
  419. # 处理文件
  420. if wx_content.get("type",None) == 'text':
  421. wx_content_text=wx_content.get("text",None)
  422. itchat.send(wx_content_text, toUserName=friend.get("UserName",None))
  423. logger.info(f"{user_nickname} 向 {nickname} 发送文字【 {wx_content_text} 】")
  424. # // 发送kafka
  425. wx_content_dialogue_message=[{"type": "text", "text": wx_content_text}]
  426. message=dialogue_message(agent_nickname_data,friend.get("NickName",None),wx_content_dialogue_message)
  427. kafka_helper.kafka_client.produce_message(message)
  428. logger.info("发送对话 %s",message)
  429. time.sleep(10)
  430. # 处理图片
  431. elif wx_content.get("type",None) == 'image_url':
  432. print('发送图片')
  433. image_url= wx_content.get("image_url",{})
  434. url=image_url.get("url",None)
  435. # 网络图片
  436. logger.debug(f"[WX] start download image, img_url={url}")
  437. pic_res = requests.get(url, stream=True)
  438. image_storage = io.BytesIO()
  439. size = 0
  440. for block in pic_res.iter_content(1024):
  441. size += len(block)
  442. image_storage.write(block)
  443. logger.info(f"[WX] download image success, size={size}, img_url={url}")
  444. image_storage.seek(0)
  445. if ".webp" in url:
  446. try:
  447. image_storage = convert_webp_to_png(image_storage)
  448. except Exception as e:
  449. logger.error(f"Failed to convert image: {e}")
  450. return
  451. itchat.send_image(image_storage, toUserName=friend.get("UserName",None))
  452. logger.info(f"{user_nickname} 向 {nickname} 发送图片【 {url} 】")
  453. # // 发送kafka
  454. wx_content_dialogue_message=[{"type": "image_url", "image_url": {"url": url}}]
  455. message=dialogue_message(agent_nickname_data,friend.get("NickName",None),wx_content_dialogue_message)
  456. kafka_helper.kafka_client.produce_message(message)
  457. logger.info("发送对话 %s",message)
  458. time.sleep(10)
  459. #处理文件
  460. elif wx_content.get("type",None) == 'file':
  461. print('处理文件')
  462. file_url= wx_content.get("file_url",{})
  463. url=file_url.get("url",None)
  464. # 提取路径部分
  465. parsed_url = urlparse(url).path
  466. # 获取文件名和扩展名
  467. filename = os.path.basename(parsed_url) # 文件名(包含扩展名)
  468. name, ext = os.path.splitext(filename) # 分离文件名和扩展名
  469. if ext in ['.pdf']:
  470. print('处理PDF文件')
  471. tmp_file_path=save_to_local_from_url(url)
  472. itchat.send_file(tmp_file_path, toUserName=friend.get("UserName",None))
  473. logger.info(f'删除本地{ext}文件: {tmp_file_path}')
  474. os.remove(tmp_file_path)
  475. logger.info(f"{user_nickname} 向 {nickname} 发送 {ext} 文件【 {url} 】")
  476. # // 发送kafka
  477. wx_content_dialogue_message=[{"type": "file", "file_url": {"url": url}}]
  478. message=dialogue_message(agent_nickname_data,friend.get("NickName",None),wx_content_dialogue_message)
  479. kafka_helper.kafka_client.produce_message(message)
  480. logger.info("发送对话 %s",message)
  481. time.sleep(10)
  482. elif ext in ['.mp4']:
  483. print('处理MP4文件')
  484. tmp_file_path=save_to_local_from_url(url)
  485. itchat.send_file(tmp_file_path, toUserName=friend.get("UserName",None))
  486. logger.info(f'删除本地{ext}文件: {tmp_file_path}')
  487. os.remove(tmp_file_path)
  488. logger.info(f"{user_nickname} 向 {nickname} 发送 {ext} 文件【 {url} 】")
  489. # // 发送kafka
  490. wx_content_dialogue_message=[{"type": "file", "file_url": {"url": url}}]
  491. message=dialogue_message(agent_nickname_data,friend.get("NickName",None),wx_content_dialogue_message)
  492. kafka_helper.kafka_client.produce_message(message)
  493. logger.info("发送对话 %s",message)
  494. time.sleep(10)
  495. else:
  496. logger.warning(f'暂不支持 {ext} 文件的处理')
  497. return True
  498. else:
  499. return False
  500. def dialogue_message(nickname_from,nickname_to,wx_content):
  501. """
  502. 构造消息的 JSON 数据
  503. :param contents: list,包含多个消息内容,每个内容为字典,如:
  504. [{"type": "text", "text": "AAAAAAA"},
  505. {"type": "image_url", "image_url": {"url": "https://AAAAA.jpg"}},
  506. {"type":"file","file_url":{"url":"https://AAAAA.pdf"}}
  507. ]
  508. :return: JSON 字符串
  509. """
  510. # 获取当前时间戳,精确到毫秒
  511. current_timestamp = int(time.time() * 1000)
  512. # 获取当前时间,格式化为 "YYYY-MM-DD HH:MM:SS"
  513. current_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
  514. # 构造 JSON 数据
  515. data = {
  516. "messageId": str(current_timestamp),
  517. "topic": "topic.aiops.wx",
  518. "time": current_time,
  519. "data": {
  520. "msg_type": "dialogue",
  521. "content": {
  522. "nickname_from": nickname_from,
  523. "nickname_to": nickname_to,
  524. "wx_content":wx_content
  525. }
  526. }
  527. }
  528. return json.dumps(data, separators=(',', ':'), ensure_ascii=False)
  529. def fetch_agent_info(agent_nickname):
  530. if os.environ.get('environment', 'default')=='default':
  531. return {
  532. "agent_nickname": agent_nickname,
  533. "agent_tel": "19200137635"
  534. }
  535. aiops_api=conf().get("aiops_api")
  536. # 定义请求URL
  537. url = f"{aiops_api}/business/Agent/smartinfobyname"
  538. # 定义请求头
  539. headers = {
  540. "accept": "*/*",
  541. "Content-Type": "application/json"
  542. }
  543. # 定义请求数据
  544. data = {
  545. "smartName": agent_nickname
  546. }
  547. try:
  548. # 发送POST请求
  549. response = requests.post(url, headers=headers, data=json.dumps(data))
  550. # 确认响应状态码
  551. if response.status_code == 200:
  552. response_data = response.json()
  553. if response_data.get("code") == 200:
  554. # 提取 smartName 和 smartPhone
  555. data = response_data.get("data", {})
  556. return {
  557. "agent_nickname": data.get("smartName"),
  558. "agent_tel": data.get("smartPhone")
  559. }
  560. else:
  561. logger.error(f"API 返回错误信息: {response_data.get('msg')}")
  562. return None
  563. else:
  564. logger.error(f"请求失败,状态码:{response.status_code}")
  565. return None
  566. except Exception as e:
  567. logger.error(f"请求出错: {e}")
  568. return None
  569. def save_to_local_from_url(url):
  570. '''
  571. 从url保存到本地tmp目录
  572. '''
  573. parsed_url = urlparse(url)
  574. # 从 URL 提取文件名
  575. filename = os.path.basename(parsed_url.path)
  576. # tmp_dir = os.path(__file__) # 获取系统临时目录
  577. # print(tmp_dir)
  578. tmp_file_path = os.path.join(os.getcwd(),'tmp', filename) # 拼接完整路径
  579. # 检查是否存在同名文件
  580. if os.path.exists(tmp_file_path):
  581. logger.info(f"文件已存在,将覆盖:{tmp_file_path}")
  582. # 下载文件并保存到临时目录
  583. response = requests.get(url, stream=True)
  584. with open(tmp_file_path, 'wb') as f:
  585. for chunk in response.iter_content(chunk_size=1024):
  586. if chunk: # 检查是否有内容
  587. f.write(chunk)
  588. return tmp_file_path
  589. def upload_oss(access_key_id, access_key_secret, endpoint, bucket_name, local_file_path, oss_file_name, expiration_days=7):
  590. """
  591. 上传文件到阿里云OSS并设置生命周期规则,同时返回文件的公共访问地址。
  592. :param access_key_id: 阿里云AccessKey ID
  593. :param access_key_secret: 阿里云AccessKey Secret
  594. :param endpoint: OSS区域对应的Endpoint
  595. :param bucket_name: OSS中的Bucket名称
  596. :param local_file_path: 本地文件路径
  597. :param oss_file_name: OSS中的文件存储路径
  598. :param expiration_days: 文件保存天数,默认7天后删除
  599. :return: 文件的公共访问地址
  600. """
  601. # 创建Bucket实例
  602. auth = oss2.Auth(access_key_id, access_key_secret)
  603. bucket = oss2.Bucket(auth, endpoint, bucket_name)
  604. ### 1. 设置生命周期规则 ###
  605. rule_id = f'delete_after_{expiration_days}_days' # 规则ID
  606. prefix = oss_file_name.split('/')[0] + '/' # 设置规则应用的前缀为文件所在目录
  607. # 定义生命周期规则
  608. rule = oss2.models.LifecycleRule(rule_id, prefix, status=oss2.models.LifecycleRule.ENABLED,
  609. expiration=oss2.models.LifecycleExpiration(days=expiration_days))
  610. # 设置Bucket的生命周期
  611. lifecycle = oss2.models.BucketLifecycle([rule])
  612. bucket.put_bucket_lifecycle(lifecycle)
  613. print(f"已设置生命周期规则:文件将在{expiration_days}天后自动删除")
  614. ### 2. 上传文件到OSS ###
  615. bucket.put_object_from_file(oss_file_name, local_file_path)
  616. ### 3. 构建公共访问URL ###
  617. file_url = f"http://{bucket_name}.{endpoint.replace('http://', '')}/{oss_file_name}"
  618. print(f"文件上传成功,公共访问地址:{file_url}")
  619. return file_url