|
- # encoding:utf-8
-
- """
- wechat channel
- """
-
- import io
- import json
- import os
- import threading
- import time
- import requests
-
- from bridge.context import *
- from bridge.reply import *
- from channel.chat_channel import ChatChannel
- from channel import chat_channel
- from channel.wechat.wechat_message import *
- from common.expired_dict import ExpiredDict
- from common.log import logger
- from common.singleton import singleton
- from common.time_check import time_checker
- from common.utils import convert_webp_to_png
- from config import conf, get_appdata_dir
- from lib import itchat
- from lib.itchat.content import *
- from urllib.parse import urlparse
-
- import asyncio
- import threading
-
- from common import kafka_helper, redis_helper
-
- from confluent_kafka import Consumer, KafkaException
- import json,time,re
- import pickle
- from datetime import datetime
- import oss2
- import random
-
-
-
- # from common.kafka_client import KafkaClient
-
-
- @itchat.msg_register([TEXT, VOICE, PICTURE, NOTE, ATTACHMENT, SHARING])
- def handler_single_msg(msg):
- try:
- cmsg = WechatMessage(msg, False)
- except NotImplementedError as e:
- logger.debug("[WX]single message {} skipped: {}".format(msg["MsgId"], e))
- return None
- WechatChannel().handle_single(cmsg)
- return None
-
-
- @itchat.msg_register([TEXT, VOICE, PICTURE, NOTE, ATTACHMENT, SHARING], isGroupChat=True)
- def handler_group_msg(msg):
- try:
- cmsg = WechatMessage(msg, True)
- except NotImplementedError as e:
- logger.debug("[WX]group message {} skipped: {}".format(msg["MsgId"], e))
- return None
- WechatChannel().handle_group(cmsg)
- return None
-
-
- def _check(func):
- def wrapper(self, cmsg: ChatMessage):
- msgId = cmsg.msg_id
- if msgId in self.receivedMsgs:
- logger.info("Wechat message {} already received, ignore".format(msgId))
- return
- self.receivedMsgs[msgId] = True
- create_time = cmsg.create_time # 消息时间戳
- if conf().get("hot_reload") == True and int(create_time) < int(time.time()) - 60: # 跳过1分钟前的历史消息
- logger.debug("[WX]history message {} skipped".format(msgId))
- return
- if cmsg.my_msg and not cmsg.is_group:
- logger.debug("[WX]my message {} skipped".format(msgId))
- return
- return func(self, cmsg)
-
- return wrapper
-
-
- # 可用的二维码生成接口
- # https://api.qrserver.com/v1/create-qr-code/?size=400×400&data=https://www.abc.com
- # https://api.isoyu.com/qr/?m=1&e=L&p=20&url=https://www.abc.com
- def qrCallback(uuid, status, qrcode):
- # logger.debug("qrCallback: {} {}".format(uuid,status))
- if status == "0":
- try:
- from PIL import Image
-
- img = Image.open(io.BytesIO(qrcode))
- _thread = threading.Thread(target=img.show, args=("QRCode",))
- _thread.setDaemon(True)
- _thread.start()
- except Exception as e:
- pass
-
- import qrcode
-
- url = f"https://login.weixin.qq.com/l/{uuid}"
-
- qr_api1 = "https://api.isoyu.com/qr/?m=1&e=L&p=20&url={}".format(url)
- qr_api2 = "https://api.qrserver.com/v1/create-qr-code/?size=400×400&data={}".format(url)
- qr_api3 = "https://api.pwmqr.com/qrcode/create/?url={}".format(url)
- qr_api4 = "https://my.tv.sohu.com/user/a/wvideo/getQRCode.do?text={}".format(url)
- print("You can also scan QRCode in any website below:")
- print(qr_api3)
- print(qr_api4)
- print(qr_api2)
- print(qr_api1)
- _send_qr_code([qr_api3, qr_api4, qr_api2, qr_api1])
- qr = qrcode.QRCode(border=1)
- qr.add_data(url)
- qr.make(fit=True)
- qr.print_ascii(invert=True)
-
-
- @singleton
- class WechatChannel(ChatChannel):
- NOT_SUPPORT_REPLYTYPE = []
-
- def __init__(self):
- super().__init__()
- self.receivedMsgs = ExpiredDict(conf().get("expires_in_seconds", 3600))
- self.auto_login_times = 0
-
- def startup(self):
- try:
- itchat.instance.receivingRetryCount = 600 # 修改断线超时时间
- # login by scan QRCode
- hotReload = conf().get("hot_reload", False)
- status_path = os.path.join(get_appdata_dir(), "itchat","itchat.pkl")
- # with open(status_path, 'rb') as file:
- # data = pickle.load(file)
- # logger.info(data)
- itchat.auto_login(
- enableCmdQR=2,
- hotReload=hotReload,
- statusStorageDir=status_path,
- qrCallback=qrCallback,
- exitCallback=self.exitCallback,
- loginCallback=self.loginCallback
- )
- self.user_id = itchat.instance.storageClass.userName
- self.name = itchat.instance.storageClass.nickName
- logger.info("Wechat login success, user_id: {}, nickname: {}".format(self.user_id, self.name))
-
- # 创建一个线程来运行 consume_messages
- kafka_thread = threading.Thread(target=kafka_helper.kafka_client.consume_messages, args=(wx_messages_process_callback, self.name))
-
- kafka_thread.start()
- logger.info("启动kafka")
-
- # 好友定时同步
- agent_nickname=self.name
- friend_thread =threading.Thread(target=ten_mins_change_save_friends, args=(agent_nickname,))
- friend_thread.start()
-
- # 立刻同步
- agent_info=fetch_agent_info(agent_nickname)
- agent_tel=agent_info.get("agent_tel",None)
- # friends=itchat.get_contact(update=True)[1:]
- friends=itchat.get_friends(update=True)[1:]
- save_friends_to_redis(agent_tel,agent_nickname, friends)
- logger.info("启动好友同步")
- # start message listener
-
- logger.info("启动itchat")
- itchat.run()
-
- except Exception as e:
- logger.exception(e)
-
- def exitCallback(self):
- print('主动退出')
- try:
- from common.linkai_client import chat_client
- if chat_client.client_id and conf().get("use_linkai"):
- print('退出')
- _send_logout()
- time.sleep(2)
- self.auto_login_times += 1
- if self.auto_login_times < 100:
- chat_channel.handler_pool._shutdown = False
- self.startup()
- except Exception as e:
- pass
-
- def loginCallback(self):
- logger.debug("Login success")
- print('登录成功')
- # 同步
- _send_login_success()
-
- # handle_* 系列函数处理收到的消息后构造Context,然后传入produce函数中处理Context和发送回复
- # Context包含了消息的所有信息,包括以下属性
- # type 消息类型, 包括TEXT、VOICE、IMAGE_CREATE
- # content 消息内容,如果是TEXT类型,content就是文本内容,如果是VOICE类型,content就是语音文件名,如果是IMAGE_CREATE类型,content就是图片生成命令
- # kwargs 附加参数字典,包含以下的key:
- # session_id: 会话id
- # isgroup: 是否是群聊
- # receiver: 需要回复的对象
- # msg: ChatMessage消息对象
- # origin_ctype: 原始消息类型,语音转文字后,私聊时如果匹配前缀失败,会根据初始消息是否是语音来放宽触发规则
- # desire_rtype: 希望回复类型,默认是文本回复,设置为ReplyType.VOICE是语音回复
- @time_checker
- @_check
- def handle_single(self, cmsg: ChatMessage):
- # filter system message
- if cmsg.other_user_id in ["weixin"]:
- return
- if cmsg.ctype == ContextType.VOICE:
- if conf().get("speech_recognition") != True:
- return
- logger.debug("[WX]receive voice msg: {}".format(cmsg.content))
- elif cmsg.ctype == ContextType.IMAGE:
- logger.debug("[WX]receive image msg: {}".format(cmsg.content))
- elif cmsg.ctype == ContextType.PATPAT:
- logger.debug("[WX]receive patpat msg: {}".format(cmsg.content))
- elif cmsg.ctype == ContextType.TEXT:
- logger.debug("[WX]receive text msg: {}, cmsg={}".format(json.dumps(cmsg._rawmsg, ensure_ascii=False), cmsg))
- # content = cmsg.content # 消息内容
- # from_user_nickname = cmsg.from_user_nickname # 发送方昵称
- # to_user_nickname = cmsg.to_user_nickname # 接收方昵称
-
- # wx_content_dialogue_message=[{"type": "text", "text": content}]
- # message=dialogue_message(from_user_nickname,to_user_nickname,wx_content_dialogue_message)
- # kafka_helper.kafka_client.produce_message(message)
- # logger.info("发送对话 %s", json.dumps(message, ensure_ascii=False))
-
- input_content = cmsg.content
- input_from_user_nickname = cmsg.from_user_nickname
- input_to_user_nickname = cmsg.to_user_nickname
-
- input_wx_content_dialogue_message=[{"type": "text", "text": input_content}]
- input_message=dialogue_message(input_from_user_nickname,input_to_user_nickname,input_wx_content_dialogue_message)
- kafka_helper.kafka_client.produce_message(input_message)
- logger.info("发送对话 %s",input_message)
-
- else:
- logger.debug("[WX]receive msg: {}, cmsg={}".format(cmsg.content, cmsg))
- context = self._compose_context(cmsg.ctype, cmsg.content, isgroup=False, msg=cmsg)
- if context:
- self.produce(context)
-
- @time_checker
- @_check
- def handle_group(self, cmsg: ChatMessage):
- if cmsg.ctype == ContextType.VOICE:
- if conf().get("group_speech_recognition") != True:
- return
- logger.debug("[WX]receive voice for group msg: {}".format(cmsg.content))
- elif cmsg.ctype == ContextType.IMAGE:
- logger.debug("[WX]receive image for group msg: {}".format(cmsg.content))
- elif cmsg.ctype in [ContextType.JOIN_GROUP, ContextType.PATPAT, ContextType.ACCEPT_FRIEND, ContextType.EXIT_GROUP]:
- logger.debug("[WX]receive note msg: {}".format(cmsg.content))
- elif cmsg.ctype == ContextType.TEXT:
- # logger.debug("[WX]receive group msg: {}, cmsg={}".format(json.dumps(cmsg._rawmsg, ensure_ascii=False), cmsg))
- pass
- elif cmsg.ctype == ContextType.FILE:
- logger.debug(f"[WX]receive attachment msg, file_name={cmsg.content}")
- else:
- logger.debug("[WX]receive group msg: {}".format(cmsg.content))
- context = self._compose_context(cmsg.ctype, cmsg.content, isgroup=True, msg=cmsg, no_need_at=conf().get("no_need_at", False))
- if context:
- self.produce(context)
-
- # 统一的发送函数,每个Channel自行实现,根据reply的type字段发送不同类型的消息
- def send(self, reply: Reply, context: Context):
- receiver = context["receiver"]
- if reply.type == ReplyType.TEXT:
- sent_res=itchat.send(reply.content, toUserName=receiver)
- logger.info("[WX] sendMsg={}, receiver={} {}".format(reply, receiver,sent_res.get('BaseResponse',{}).get('RawMsg')))
- msg: ChatMessage = context["msg"]
-
- is_group=msg.is_group
- if not is_group:
- # 响应用户
- output_content=reply.content
- output_from_user_nickname=msg.to_user_nickname # 回复翻转
- output_to_user_nickname=msg.from_user_nickname # 回复翻转
-
- output_wx_content_dialogue_message=[{"type": "text", "text": output_content}]
- output_message=dialogue_message(output_from_user_nickname,output_to_user_nickname,output_wx_content_dialogue_message)
- kafka_helper.kafka_client.produce_message(output_message)
- logger.info("发送对话 %s", output_message)
- elif reply.type == ReplyType.ERROR or reply.type == ReplyType.INFO:
- itchat.send(reply.content, toUserName=receiver)
- logger.info("[WX] sendMsg={}, receiver={}".format(reply, receiver))
- elif reply.type == ReplyType.VOICE:
- itchat.send_file(reply.content, toUserName=receiver)
- logger.info("[WX] sendFile={}, receiver={}".format(reply.content, receiver))
- elif reply.type == ReplyType.IMAGE_URL: # 从网络下载图片
- img_url = reply.content
- logger.debug(f"[WX] start download image, img_url={img_url}")
- pic_res = requests.get(img_url, stream=True)
- image_storage = io.BytesIO()
- size = 0
- for block in pic_res.iter_content(1024):
- size += len(block)
- image_storage.write(block)
- logger.info(f"[WX] download image success, size={size}, img_url={img_url}")
- image_storage.seek(0)
- if ".webp" in img_url:
- try:
- image_storage = convert_webp_to_png(image_storage)
- except Exception as e:
- logger.error(f"Failed to convert image: {e}")
- return
- itchat.send_image(image_storage, toUserName=receiver)
- logger.info("[WX] sendImage url={}, receiver={}".format(img_url, receiver))
- elif reply.type == ReplyType.IMAGE: # 从文件读取图片
- image_storage = reply.content
- image_storage.seek(0)
- itchat.send_image(image_storage, toUserName=receiver)
- logger.info("[WX] sendImage, receiver={}".format(receiver))
- elif reply.type == ReplyType.FILE: # 新增文件回复类型
- file_storage = reply.content
- itchat.send_file(file_storage, toUserName=receiver)
- logger.info("[WX] sendFile, receiver={}".format(receiver))
-
-
- # msg: ChatMessage = context["msg"]
- # # content=msg["content"]
-
-
- # is_group=msg.is_group
- # if not is_group:
- # # print(f'响应:{content}')
- # # 用户输入
- # # input_content=msg.content
- # # input_from_user_nickname=msg.from_user_nickname
- # # input_to_user_nickname=msg.to_user_nickname
-
- # # input_wx_content_dialogue_message=[{"type": "text", "text": input_content}]
- # # input_message=dialogue_message(input_from_user_nickname,input_to_user_nickname,input_wx_content_dialogue_message)
- # # kafka_helper.kafka_client.produce_message(input_message)
- # # logger.info("发送对话 %s", json.dumps(input_message, separators=(',', ':'), ensure_ascii=False))
-
- # # 响应用户
- # output_content=reply.content
- # output_from_user_nickname=msg.to_user_nickname # 回复翻转
- # output_to_user_nickname=msg.from_user_nickname # 回复翻转
-
- # output_wx_content_dialogue_message=[{"type": "file", "text": output_content}]
- # output_message=dialogue_message(output_from_user_nickname,output_to_user_nickname,output_wx_content_dialogue_message)
- # kafka_helper.kafka_client.produce_message(output_message)
- # logger.info("发送对话 %s", output_message)
-
-
- elif reply.type == ReplyType.VIDEO: # 新增视频回复类型
- video_storage = reply.content
- itchat.send_video(video_storage, toUserName=receiver)
- logger.info("[WX] sendFile, receiver={}".format(receiver))
- elif reply.type == ReplyType.VIDEO_URL: # 新增视频URL回复类型
- video_url = reply.content
- logger.debug(f"[WX] start download video, video_url={video_url}")
- video_res = requests.get(video_url, stream=True)
- video_storage = io.BytesIO()
- size = 0
- for block in video_res.iter_content(1024):
- size += len(block)
- video_storage.write(block)
- logger.info(f"[WX] download video success, size={size}, video_url={video_url}")
- video_storage.seek(0)
- itchat.send_video(video_storage, toUserName=receiver)
- logger.info("[WX] sendVideo url={}, receiver={}".format(video_url, receiver))
-
- def _send_login_success():
- try:
- from common.linkai_client import chat_client
- if chat_client.client_id:
- chat_client.send_login_success()
-
- except Exception as e:
- pass
-
-
- def _send_logout():
- try:
- from common.linkai_client import chat_client
- if chat_client.client_id:
- chat_client.send_logout()
- except Exception as e:
- pass
-
-
- def _send_qr_code(qrcode_list: list):
- try:
- from common.linkai_client import chat_client
- if chat_client.client_id:
- chat_client.send_qrcode(qrcode_list)
- except Exception as e:
- pass
-
-
- def clean_json_string(json_str):
-
- # 删除所有控制字符(非打印字符),包括换行符、回车符等
- return re.sub(r'[\x00-\x1f\x7f]', '', json_str)
-
- def save_friends_to_redis(agent_tel,agent_nickname, friends):
- contact_list = []
- for friend in friends:
- friend_data = {
- "UserName": friend.UserName,
- "NickName": friend.NickName,
- "Signature": friend.Signature,
- "Province": friend.Province,
- "City": friend.City,
- "Sex": str(friend.Sex), # 性别可转换为字符串存储
- "Alias": friend.Alias
- }
- contact_list.append(friend_data) # 将每个朋友的信息加入到列表中
-
- agent_contact_list = {
- "AgentTel":agent_tel,
- "agent_nick_name": agent_nickname,
- "contact_list": contact_list # 将朋友列表添加到字典中
- }
-
- # 将联系人信息保存到 Redis,使用一个合适的 key
- hash_key = f"__AI_OPS_WX__:CONTACTLIST"
- redis_helper.redis_helper.update_hash_field(hash_key, agent_tel, json.dumps(agent_contact_list, ensure_ascii=False)) # 设置有效期为 600 秒
-
- def hourly_change_save_friends(agent_nickname):
- last_hour = datetime.now().hour # 获取当前小时
- while True:
- current_hour = datetime.now().hour
- if current_hour != last_hour: # 检测小时是否变化
- friends=itchat.get_friends(update=True)[1:]
-
- agent_info=fetch_agent_info(agent_nickname)
- agent_tel=agent_info.get("agent_tel",None)
- save_friends_to_redis(agent_tel,agent_nickname, friends)
- last_hour = current_hour
- time.sleep(1) # 每秒检查一次
-
- def ten_mins_change_save_friends(agent_nickname):
- last_check_minute = datetime.now().minute # 获取当前分钟
- while True:
- current_minute = datetime.now().minute
- if current_minute % 10 == 0 and current_minute != last_check_minute: # 检测每10分钟变化
- friends = itchat.get_friends(update=True)[1:]
-
- agent_info = fetch_agent_info(agent_nickname)
- agent_tel = agent_info.get("agent_tel", None)
- save_friends_to_redis(agent_tel, agent_nickname, friends)
- last_check_minute = current_minute # 更新最后检查的分钟
-
- time.sleep(60) # 每分钟检查一次
-
- def wx_messages_process_callback(user_nickname,message):
- """
- 处理消费到的 Kafka 消息(基础示例)
- :param message: Kafka 消费到的消息内容
- """
- # print(user_nickname)
- # print(f"Processing message: {message}")
- # return True
-
- msg_content= message
- cleaned_content = clean_json_string(msg_content)
- content=json.loads(cleaned_content)
- data = content.get("data", {})
- msg_type_data=data.get("msg_type",None)
- content_data = data.get("content",{})
- agent_nickname_data=content_data.get("agent_nickname",None)
- agent_tel=content_data.get("agent_tel",None)
-
- if user_nickname == agent_nickname_data and msg_type_data=='group-sending':
- friends=itchat.get_friends(update=True)[1:]
- contact_list_content_data=content_data.get("contact_list",None)
-
- # 更新好友缓存
- save_friends_to_redis(agent_tel,agent_nickname_data,friends)
-
- # 遍历要群发的好友
- for contact in contact_list_content_data:
- nickname = contact.get("nickname",None)
- if(nickname not in [friend['NickName'] for friend in friends]):
- logger.warning(f'微信中没有 {nickname} 的昵称,将不会发送消息')
-
- for friend in friends:
- if friend.get("NickName",None) == nickname:
- wx_content_list=content_data.get("wx_content",[])
- for wx_content in wx_content_list:
- # 处理文字
- if wx_content.get("type",None) == 'text':
- wx_content_text=wx_content.get("text",None)
- sent_res=itchat.send(wx_content_text, toUserName=friend.get("UserName",None))
-
- logger.info(f"{user_nickname} 向 {nickname} 发送文字【 {wx_content_text} 】 {sent_res.get('BaseResponse',{}).get('RawMsg')}")
-
- # // 发送kafka
- wx_content_dialogue_message=[{"type": "text", "text": wx_content_text}]
- message=dialogue_message(agent_nickname_data,friend.get("NickName",None),wx_content_dialogue_message)
- kafka_helper.kafka_client.produce_message(message)
- logger.info("发送对话 %s",message)
- # 等待随机时间
- time.sleep(random.uniform(5, 15))
- # 处理图片
- elif wx_content.get("type",None) == 'image_url':
- print('发送图片')
- image_url= wx_content.get("image_url",{})
- url=image_url.get("url",None)
-
- # 网络图片
- logger.debug(f"[WX] start download image, img_url={url}")
- pic_res = requests.get(url, stream=True)
- image_storage = io.BytesIO()
- size = 0
- for block in pic_res.iter_content(1024):
- size += len(block)
- image_storage.write(block)
- logger.info(f"[WX] download image success, size={size}, img_url={url}")
- image_storage.seek(0)
- if ".webp" in url:
- try:
- image_storage = convert_webp_to_png(image_storage)
- except Exception as e:
- logger.error(f"Failed to convert image: {e}")
- return
-
- sent_res=itchat.send_image(image_storage, toUserName=friend.get("UserName",None))
- logger.info(f"{user_nickname} 向 {nickname} 发送图片【 {url} 】{sent_res.get('BaseResponse',{}).get('RawMsg')}")
-
- # // 发送kafka
- wx_content_dialogue_message=[{"type": "image_url", "image_url": {"url": url}}]
- message=dialogue_message(agent_nickname_data,friend.get("NickName",None),wx_content_dialogue_message)
- kafka_helper.kafka_client.produce_message(message)
- logger.info("发送对话 %s",message)
- # 等待随机时间
- time.sleep(random.uniform(5, 15))
- #处理文件
- elif wx_content.get("type",None) == 'file':
- print('处理文件')
- file_url= wx_content.get("file_url",{})
- url=file_url.get("url",None)
-
- # 提取路径部分
- parsed_url = urlparse(url).path
-
- # 获取文件名和扩展名
- filename = os.path.basename(parsed_url) # 文件名(包含扩展名)
- name, ext = os.path.splitext(filename) # 分离文件名和扩展名
- if ext in ['.pdf']:
- print('处理PDF文件')
-
- tmp_file_path=save_to_local_from_url(url)
-
- sent_res=itchat.send_file(tmp_file_path, toUserName=friend.get("UserName",None))
- logger.info(f'删除本地{ext}文件: {tmp_file_path}')
- os.remove(tmp_file_path)
- logger.info(f"{user_nickname} 向 {nickname} 发送 {ext} 文件【 {url} 】{sent_res.get('BaseResponse',{}).get('RawMsg')}")
- # // 发送kafka
- wx_content_dialogue_message=[{"type": "file", "file_url": {"url": url}}]
- message=dialogue_message(agent_nickname_data,friend.get("NickName",None),wx_content_dialogue_message)
- kafka_helper.kafka_client.produce_message(message)
- logger.info("发送对话 %s",message)
- # 等待随机时间
- time.sleep(random.uniform(5, 15))
-
- elif ext in ['.mp4']:
-
- print('处理MP4文件')
- tmp_file_path=save_to_local_from_url(url)
- itchat.send_file(tmp_file_path, toUserName=friend.get("UserName",None))
- logger.info(f'删除本地{ext}文件: {tmp_file_path}')
- os.remove(tmp_file_path)
- logger.info(f"{user_nickname} 向 {nickname} 发送 {ext} 文件【 {url} 】")
- # // 发送kafka
- wx_content_dialogue_message=[{"type": "file", "file_url": {"url": url}}]
- message=dialogue_message(agent_nickname_data,friend.get("NickName",None),wx_content_dialogue_message)
- kafka_helper.kafka_client.produce_message(message)
- logger.info("发送对话 %s",message)
- # 等待随机时间
- time.sleep(random.uniform(5, 15))
-
- else:
- logger.warning(f'暂不支持 {ext} 文件的处理')
-
- return True
- else:
- return False
-
-
-
-
- def dialogue_message(nickname_from,nickname_to,wx_content):
- """
- 构造消息的 JSON 数据
- :param contents: list,包含多个消息内容,每个内容为字典,如:
- [{"type": "text", "text": "AAAAAAA"},
- {"type": "image_url", "image_url": {"url": "https://AAAAA.jpg"}},
- {"type":"file","file_url":{"url":"https://AAAAA.pdf"}}
- ]
- :return: JSON 字符串
- """
-
- # 获取当前时间戳,精确到毫秒
- current_timestamp = int(time.time() * 1000)
-
- # 获取当前时间,格式化为 "YYYY-MM-DD HH:MM:SS"
- current_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
-
- # 构造 JSON 数据
- data = {
- "messageId": str(current_timestamp),
- "topic": "topic.aiops.wx",
- "time": current_time,
- "data": {
- "msg_type": "dialogue",
- "content": {
- "nickname_from": nickname_from,
- "nickname_to": nickname_to,
- "wx_content":wx_content
- }
- }
- }
-
- return json.dumps(data, separators=(',', ':'), ensure_ascii=False)
-
- def fetch_agent_info(agent_nickname):
-
- if os.environ.get('environment', 'default')=='default':
- return {
- "agent_nickname": agent_nickname,
- "agent_tel": "19200137635"
- }
-
- aiops_api=conf().get("aiops_api")
- # 定义请求URL
- url = f"{aiops_api}/business/Agent/smartinfobyname"
-
- # 定义请求头
- headers = {
- "accept": "*/*",
- "Content-Type": "application/json"
- }
-
- # 定义请求数据
- data = {
- "smartName": agent_nickname
- }
-
- try:
- # 发送POST请求
- response = requests.post(url, headers=headers, data=json.dumps(data))
-
- # 确认响应状态码
- if response.status_code == 200:
- response_data = response.json()
- if response_data.get("code") == 200:
- # 提取 smartName 和 smartPhone
- data = response_data.get("data", {})
- return {
- "agent_nickname": data.get("smartName"),
- "agent_tel": data.get("smartPhone")
- }
- else:
- logger.error(f"API 返回错误信息: {response_data.get('msg')}")
- return None
- else:
- logger.error(f"请求失败,状态码:{response.status_code}")
- return None
- except Exception as e:
- logger.error(f"请求出错: {e}")
- return None
-
-
- def save_to_local_from_url(url):
- '''
- 从url保存到本地tmp目录
- '''
-
- parsed_url = urlparse(url)
- # 从 URL 提取文件名
- filename = os.path.basename(parsed_url.path)
- # tmp_dir = os.path(__file__) # 获取系统临时目录
- # print(tmp_dir)
- tmp_file_path = os.path.join(os.getcwd(),'tmp', filename) # 拼接完整路径
-
- # 检查是否存在同名文件
- if os.path.exists(tmp_file_path):
- logger.info(f"文件已存在,将覆盖:{tmp_file_path}")
-
- # 下载文件并保存到临时目录
- response = requests.get(url, stream=True)
- with open(tmp_file_path, 'wb') as f:
- for chunk in response.iter_content(chunk_size=1024):
- if chunk: # 检查是否有内容
- f.write(chunk)
-
- return tmp_file_path
-
- def upload_oss(access_key_id, access_key_secret, endpoint, bucket_name, local_file_path, oss_file_name, expiration_days=7):
- """
- 上传文件到阿里云OSS并设置生命周期规则,同时返回文件的公共访问地址。
-
- :param access_key_id: 阿里云AccessKey ID
- :param access_key_secret: 阿里云AccessKey Secret
- :param endpoint: OSS区域对应的Endpoint
- :param bucket_name: OSS中的Bucket名称
- :param local_file_path: 本地文件路径
- :param oss_file_name: OSS中的文件存储路径
- :param expiration_days: 文件保存天数,默认7天后删除
- :return: 文件的公共访问地址
- """
-
- # 创建Bucket实例
- auth = oss2.Auth(access_key_id, access_key_secret)
- bucket = oss2.Bucket(auth, endpoint, bucket_name)
-
- ### 1. 设置生命周期规则 ###
- rule_id = f'delete_after_{expiration_days}_days' # 规则ID
- prefix = oss_file_name.split('/')[0] + '/' # 设置规则应用的前缀为文件所在目录
-
- # 定义生命周期规则
- rule = oss2.models.LifecycleRule(rule_id, prefix, status=oss2.models.LifecycleRule.ENABLED,
- expiration=oss2.models.LifecycleExpiration(days=expiration_days))
-
- # 设置Bucket的生命周期
- lifecycle = oss2.models.BucketLifecycle([rule])
- bucket.put_bucket_lifecycle(lifecycle)
-
- print(f"已设置生命周期规则:文件将在{expiration_days}天后自动删除")
-
- ### 2. 上传文件到OSS ###
- bucket.put_object_from_file(oss_file_name, local_file_path)
-
- ### 3. 构建公共访问URL ###
- file_url = f"http://{bucket_name}.{endpoint.replace('http://', '')}/{oss_file_name}"
-
- print(f"文件上传成功,公共访问地址:{file_url}")
-
- return file_url
|