Ви не можете вибрати більше 25 тем Теми мають розпочинатися з літери або цифри, можуть містити дефіси (-) і не повинні перевищувати 35 символів.

3 місяці тому
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747
  1. # encoding:utf-8
  2. """
  3. wechat channel
  4. """
  5. import io
  6. import json
  7. import os
  8. import threading
  9. import time
  10. import requests
  11. from bridge.context import *
  12. from bridge.reply import *
  13. from channel.chat_channel import ChatChannel
  14. from channel import chat_channel
  15. from channel.wechat.wechat_message import *
  16. from common.expired_dict import ExpiredDict
  17. from common.log import logger
  18. from common.singleton import singleton
  19. from common.time_check import time_checker
  20. from common.utils import convert_webp_to_png
  21. from config import conf, get_appdata_dir
  22. # from lib import itchat
  23. # from lib.itchat.content import *
  24. # import itchat
  25. # from itchat.content import *
  26. from urllib.parse import urlparse
  27. import asyncio
  28. import threading
  29. from common import kafka_helper, redis_helper
  30. from confluent_kafka import Consumer, KafkaException
  31. import json,time,re
  32. import pickle
  33. from datetime import datetime
  34. import oss2
  35. import random
  36. # from common.kafka_client import KafkaClient
  37. # @itchat.msg_register([TEXT, VOICE, PICTURE, NOTE, ATTACHMENT, SHARING])
  38. # def handler_single_msg(msg):
  39. # try:
  40. # cmsg = WechatMessage(msg, False)
  41. # except NotImplementedError as e:
  42. # logger.debug("[WX]single message {} skipped: {}".format(msg["MsgId"], e))
  43. # return None
  44. # WechatChannel().handle_single(cmsg)
  45. # return None
  46. # @itchat.msg_register([TEXT, VOICE, PICTURE, NOTE, ATTACHMENT, SHARING], isGroupChat=True)
  47. # def handler_group_msg(msg):
  48. # try:
  49. # cmsg = WechatMessage(msg, True)
  50. # except NotImplementedError as e:
  51. # logger.debug("[WX]group message {} skipped: {}".format(msg["MsgId"], e))
  52. # return None
  53. # WechatChannel().handle_group(cmsg)
  54. # return None
  55. # def _check(func):
  56. # def wrapper(self, cmsg: ChatMessage):
  57. # msgId = cmsg.msg_id
  58. # if msgId in self.receivedMsgs:
  59. # logger.info("Wechat message {} already received, ignore".format(msgId))
  60. # return
  61. # self.receivedMsgs[msgId] = True
  62. # create_time = cmsg.create_time # 消息时间戳
  63. # if conf().get("hot_reload") == True and int(create_time) < int(time.time()) - 60: # 跳过1分钟前的历史消息
  64. # logger.debug("[WX]history message {} skipped".format(msgId))
  65. # return
  66. # if cmsg.my_msg and not cmsg.is_group:
  67. # logger.debug("[WX]my message {} skipped".format(msgId))
  68. # return
  69. # return func(self, cmsg)
  70. # return wrapper
  71. # # 可用的二维码生成接口
  72. # # https://api.qrserver.com/v1/create-qr-code/?size=400×400&data=https://www.abc.com
  73. # # https://api.isoyu.com/qr/?m=1&e=L&p=20&url=https://www.abc.com
  74. # def qrCallback(uuid, status, qrcode):
  75. # # logger.debug("qrCallback: {} {}".format(uuid,status))
  76. # if status == "0":
  77. # try:
  78. # from PIL import Image
  79. # img = Image.open(io.BytesIO(qrcode))
  80. # _thread = threading.Thread(target=img.show, args=("QRCode",))
  81. # _thread.setDaemon(True)
  82. # _thread.start()
  83. # except Exception as e:
  84. # pass
  85. # import qrcode
  86. # url = f"https://login.weixin.qq.com/l/{uuid}"
  87. # qr_api1 = "https://api.isoyu.com/qr/?m=1&e=L&p=20&url={}".format(url)
  88. # qr_api2 = "https://api.qrserver.com/v1/create-qr-code/?size=400×400&data={}".format(url)
  89. # qr_api3 = "https://api.pwmqr.com/qrcode/create/?url={}".format(url)
  90. # qr_api4 = "https://my.tv.sohu.com/user/a/wvideo/getQRCode.do?text={}".format(url)
  91. # print("You can also scan QRCode in any website below:")
  92. # print(qr_api3)
  93. # print(qr_api4)
  94. # print(qr_api2)
  95. # print(qr_api1)
  96. # _send_qr_code([qr_api3, qr_api4, qr_api2, qr_api1])
  97. # qr = qrcode.QRCode(border=1)
  98. # qr.add_data(url)
  99. # qr.make(fit=True)
  100. # qr.print_ascii(invert=True)
  101. @singleton
  102. class WechatChannel(ChatChannel):
  103. NOT_SUPPORT_REPLYTYPE = []
  104. def __init__(self):
  105. super().__init__()
  106. self.receivedMsgs = ExpiredDict(conf().get("expires_in_seconds", 3600))
  107. self.auto_login_times = 0
  108. def startup(self):
  109. try:
  110. print('启动')
  111. # itchat.instance.receivingRetryCount = 600 # 修改断线超时时间
  112. # # login by scan QRCode
  113. # hotReload = conf().get("hot_reload", False)
  114. # status_path = os.path.join(get_appdata_dir(), "itchat","itchat.pkl")
  115. # # with open(status_path, 'rb') as file:
  116. # # data = pickle.load(file)
  117. # # logger.info(data)
  118. # itchat.auto_login(
  119. # enableCmdQR=2,
  120. # hotReload=hotReload,
  121. # statusStorageDir=status_path,
  122. # qrCallback=qrCallback,
  123. # exitCallback=self.exitCallback,
  124. # loginCallback=self.loginCallback
  125. # )
  126. # self.user_id = itchat.instance.storageClass.userName
  127. # self.name = itchat.instance.storageClass.nickName
  128. # logger.info("Wechat login success, user_id: {}, nickname: {}".format(self.user_id, self.name))
  129. # # 创建一个线程来运行 consume_messages
  130. # kafka_thread = threading.Thread(target=kafka_helper.kafka_client.consume_messages, args=(wx_messages_process_callback, self.name))
  131. # kafka_thread.start()
  132. # logger.info("启动kafka")
  133. # # 好友定时同步
  134. # agent_nickname=self.name
  135. # friend_thread =threading.Thread(target=ten_mins_change_save_friends, args=(agent_nickname,))
  136. # friend_thread.start()
  137. # # 立刻同步
  138. # agent_info=fetch_agent_info(agent_nickname)
  139. # agent_tel=agent_info.get("agent_tel",None)
  140. # # friends=itchat.get_contact(update=True)[1:]
  141. # friends=itchat.get_friends(update=True)[1:]
  142. # save_friends_to_redis(agent_tel,agent_nickname, friends)
  143. # logger.info("启动好友同步")
  144. # # start message listener
  145. # logger.info("启动itchat")
  146. # itchat.run()
  147. except Exception as e:
  148. logger.exception(e)
  149. # def exitCallback(self):
  150. # print('主动退出')
  151. # try:
  152. # from common.linkai_client import chat_client
  153. # if chat_client.client_id and conf().get("use_linkai"):
  154. # print('退出')
  155. # _send_logout()
  156. # time.sleep(2)
  157. # self.auto_login_times += 1
  158. # if self.auto_login_times < 100:
  159. # chat_channel.handler_pool._shutdown = False
  160. # self.startup()
  161. # except Exception as e:
  162. # pass
  163. # def loginCallback(self):
  164. # logger.debug("Login success")
  165. # print('登录成功')
  166. # # 同步
  167. # _send_login_success()
  168. # # handle_* 系列函数处理收到的消息后构造Context,然后传入produce函数中处理Context和发送回复
  169. # # Context包含了消息的所有信息,包括以下属性
  170. # # type 消息类型, 包括TEXT、VOICE、IMAGE_CREATE
  171. # # content 消息内容,如果是TEXT类型,content就是文本内容,如果是VOICE类型,content就是语音文件名,如果是IMAGE_CREATE类型,content就是图片生成命令
  172. # # kwargs 附加参数字典,包含以下的key:
  173. # # session_id: 会话id
  174. # # isgroup: 是否是群聊
  175. # # receiver: 需要回复的对象
  176. # # msg: ChatMessage消息对象
  177. # # origin_ctype: 原始消息类型,语音转文字后,私聊时如果匹配前缀失败,会根据初始消息是否是语音来放宽触发规则
  178. # # desire_rtype: 希望回复类型,默认是文本回复,设置为ReplyType.VOICE是语音回复
  179. # @time_checker
  180. # @_check
  181. # def handle_single(self, cmsg: ChatMessage):
  182. # # filter system message
  183. # if cmsg.other_user_id in ["weixin"]:
  184. # return
  185. # if cmsg.ctype == ContextType.VOICE:
  186. # if conf().get("speech_recognition") != True:
  187. # return
  188. # logger.debug("[WX]receive voice msg: {}".format(cmsg.content))
  189. # elif cmsg.ctype == ContextType.IMAGE:
  190. # logger.debug("[WX]receive image msg: {}".format(cmsg.content))
  191. # elif cmsg.ctype == ContextType.PATPAT:
  192. # logger.debug("[WX]receive patpat msg: {}".format(cmsg.content))
  193. # elif cmsg.ctype == ContextType.TEXT:
  194. # logger.debug("[WX]receive text msg: {}, cmsg={}".format(json.dumps(cmsg._rawmsg, ensure_ascii=False), cmsg))
  195. # # content = cmsg.content # 消息内容
  196. # # from_user_nickname = cmsg.from_user_nickname # 发送方昵称
  197. # # to_user_nickname = cmsg.to_user_nickname # 接收方昵称
  198. # # wx_content_dialogue_message=[{"type": "text", "text": content}]
  199. # # message=dialogue_message(from_user_nickname,to_user_nickname,wx_content_dialogue_message)
  200. # # kafka_helper.kafka_client.produce_message(message)
  201. # # logger.info("发送对话 %s", json.dumps(message, ensure_ascii=False))
  202. # input_content = cmsg.content
  203. # input_from_user_nickname = cmsg.from_user_nickname
  204. # input_to_user_nickname = cmsg.to_user_nickname
  205. # input_wx_content_dialogue_message=[{"type": "text", "text": input_content}]
  206. # input_message=dialogue_message(input_from_user_nickname,input_to_user_nickname,input_wx_content_dialogue_message)
  207. # kafka_helper.kafka_client.produce_message(input_message)
  208. # logger.info("发送对话 %s",input_message)
  209. # else:
  210. # logger.debug("[WX]receive msg: {}, cmsg={}".format(cmsg.content, cmsg))
  211. # context = self._compose_context(cmsg.ctype, cmsg.content, isgroup=False, msg=cmsg)
  212. # if context:
  213. # self.produce(context)
  214. # @time_checker
  215. # @_check
  216. # def handle_group(self, cmsg: ChatMessage):
  217. # if cmsg.ctype == ContextType.VOICE:
  218. # if conf().get("group_speech_recognition") != True:
  219. # return
  220. # logger.debug("[WX]receive voice for group msg: {}".format(cmsg.content))
  221. # elif cmsg.ctype == ContextType.IMAGE:
  222. # logger.debug("[WX]receive image for group msg: {}".format(cmsg.content))
  223. # elif cmsg.ctype in [ContextType.JOIN_GROUP, ContextType.PATPAT, ContextType.ACCEPT_FRIEND, ContextType.EXIT_GROUP]:
  224. # logger.debug("[WX]receive note msg: {}".format(cmsg.content))
  225. # elif cmsg.ctype == ContextType.TEXT:
  226. # # logger.debug("[WX]receive group msg: {}, cmsg={}".format(json.dumps(cmsg._rawmsg, ensure_ascii=False), cmsg))
  227. # pass
  228. # elif cmsg.ctype == ContextType.FILE:
  229. # logger.debug(f"[WX]receive attachment msg, file_name={cmsg.content}")
  230. # else:
  231. # logger.debug("[WX]receive group msg: {}".format(cmsg.content))
  232. # context = self._compose_context(cmsg.ctype, cmsg.content, isgroup=True, msg=cmsg, no_need_at=conf().get("no_need_at", False))
  233. # if context:
  234. # self.produce(context)
  235. # # 统一的发送函数,每个Channel自行实现,根据reply的type字段发送不同类型的消息
  236. # def send(self, reply: Reply, context: Context):
  237. receiver = context["receiver"]
  238. if reply.type == ReplyType.TEXT:
  239. sent_res=itchat.send(reply.content, toUserName=receiver)
  240. logger.info("[WX] sendMsg={}, receiver={} {}".format(reply, receiver,sent_res.get('BaseResponse',{}).get('RawMsg')))
  241. msg: ChatMessage = context["msg"]
  242. is_group=msg.is_group
  243. if not is_group:
  244. # 响应用户
  245. output_content=reply.content
  246. output_from_user_nickname=msg.to_user_nickname # 回复翻转
  247. output_to_user_nickname=msg.from_user_nickname # 回复翻转
  248. output_wx_content_dialogue_message=[{"type": "text", "text": output_content}]
  249. output_message=dialogue_message(output_from_user_nickname,output_to_user_nickname,output_wx_content_dialogue_message)
  250. kafka_helper.kafka_client.produce_message(output_message)
  251. logger.info("发送对话 %s", output_message)
  252. elif reply.type == ReplyType.ERROR or reply.type == ReplyType.INFO:
  253. itchat.send(reply.content, toUserName=receiver)
  254. logger.info("[WX] sendMsg={}, receiver={}".format(reply, receiver))
  255. elif reply.type == ReplyType.VOICE:
  256. itchat.send_file(reply.content, toUserName=receiver)
  257. logger.info("[WX] sendFile={}, receiver={}".format(reply.content, receiver))
  258. elif reply.type == ReplyType.IMAGE_URL: # 从网络下载图片
  259. img_url = reply.content
  260. logger.debug(f"[WX] start download image, img_url={img_url}")
  261. pic_res = requests.get(img_url, stream=True)
  262. image_storage = io.BytesIO()
  263. size = 0
  264. for block in pic_res.iter_content(1024):
  265. size += len(block)
  266. image_storage.write(block)
  267. logger.info(f"[WX] download image success, size={size}, img_url={img_url}")
  268. image_storage.seek(0)
  269. if ".webp" in img_url:
  270. try:
  271. image_storage = convert_webp_to_png(image_storage)
  272. except Exception as e:
  273. logger.error(f"Failed to convert image: {e}")
  274. return
  275. itchat.send_image(image_storage, toUserName=receiver)
  276. logger.info("[WX] sendImage url={}, receiver={}".format(img_url, receiver))
  277. elif reply.type == ReplyType.IMAGE: # 从文件读取图片
  278. image_storage = reply.content
  279. image_storage.seek(0)
  280. itchat.send_image(image_storage, toUserName=receiver)
  281. logger.info("[WX] sendImage, receiver={}".format(receiver))
  282. elif reply.type == ReplyType.FILE: # 新增文件回复类型
  283. file_storage = reply.content
  284. itchat.send_file(file_storage, toUserName=receiver)
  285. logger.info("[WX] sendFile, receiver={}".format(receiver))
  286. # msg: ChatMessage = context["msg"]
  287. # # content=msg["content"]
  288. # is_group=msg.is_group
  289. # if not is_group:
  290. # # print(f'响应:{content}')
  291. # # 用户输入
  292. # # input_content=msg.content
  293. # # input_from_user_nickname=msg.from_user_nickname
  294. # # input_to_user_nickname=msg.to_user_nickname
  295. # # input_wx_content_dialogue_message=[{"type": "text", "text": input_content}]
  296. # # input_message=dialogue_message(input_from_user_nickname,input_to_user_nickname,input_wx_content_dialogue_message)
  297. # # kafka_helper.kafka_client.produce_message(input_message)
  298. # # logger.info("发送对话 %s", json.dumps(input_message, separators=(',', ':'), ensure_ascii=False))
  299. # # 响应用户
  300. # output_content=reply.content
  301. # output_from_user_nickname=msg.to_user_nickname # 回复翻转
  302. # output_to_user_nickname=msg.from_user_nickname # 回复翻转
  303. # output_wx_content_dialogue_message=[{"type": "file", "text": output_content}]
  304. # output_message=dialogue_message(output_from_user_nickname,output_to_user_nickname,output_wx_content_dialogue_message)
  305. # kafka_helper.kafka_client.produce_message(output_message)
  306. # logger.info("发送对话 %s", output_message)
  307. elif reply.type == ReplyType.VIDEO: # 新增视频回复类型
  308. video_storage = reply.content
  309. itchat.send_video(video_storage, toUserName=receiver)
  310. logger.info("[WX] sendFile, receiver={}".format(receiver))
  311. elif reply.type == ReplyType.VIDEO_URL: # 新增视频URL回复类型
  312. video_url = reply.content
  313. logger.debug(f"[WX] start download video, video_url={video_url}")
  314. video_res = requests.get(video_url, stream=True)
  315. video_storage = io.BytesIO()
  316. size = 0
  317. for block in video_res.iter_content(1024):
  318. size += len(block)
  319. video_storage.write(block)
  320. logger.info(f"[WX] download video success, size={size}, video_url={video_url}")
  321. video_storage.seek(0)
  322. itchat.send_video(video_storage, toUserName=receiver)
  323. logger.info("[WX] sendVideo url={}, receiver={}".format(video_url, receiver))
  324. # def _send_login_success():
  325. # try:
  326. # from common.linkai_client import chat_client
  327. # if chat_client.client_id:
  328. # chat_client.send_login_success()
  329. # except Exception as e:
  330. # pass
  331. # def _send_logout():
  332. # try:
  333. # from common.linkai_client import chat_client
  334. # if chat_client.client_id:
  335. # chat_client.send_logout()
  336. # except Exception as e:
  337. # pass
  338. # def _send_qr_code(qrcode_list: list):
  339. # try:
  340. # from common.linkai_client import chat_client
  341. # if chat_client.client_id:
  342. # chat_client.send_qrcode(qrcode_list)
  343. # except Exception as e:
  344. # pass
  345. # def clean_json_string(json_str):
  346. # # 删除所有控制字符(非打印字符),包括换行符、回车符等
  347. # return re.sub(r'[\x00-\x1f\x7f]', '', json_str)
  348. # def save_friends_to_redis(agent_tel,agent_nickname, friends):
  349. # contact_list = []
  350. # for friend in friends:
  351. # friend_data = {
  352. # "UserName": friend.UserName,
  353. # "NickName": friend.NickName,
  354. # "Signature": friend.Signature,
  355. # "Province": friend.Province,
  356. # "City": friend.City,
  357. # "Sex": str(friend.Sex), # 性别可转换为字符串存储
  358. # "Alias": friend.Alias
  359. # }
  360. # contact_list.append(friend_data) # 将每个朋友的信息加入到列表中
  361. # agent_contact_list = {
  362. # "AgentTel":agent_tel,
  363. # "agent_nick_name": agent_nickname,
  364. # "contact_list": contact_list # 将朋友列表添加到字典中
  365. # }
  366. # # 将联系人信息保存到 Redis,使用一个合适的 key
  367. # hash_key = f"__AI_OPS_WX__:CONTACTLIST"
  368. # redis_helper.redis_helper.update_hash_field(hash_key, agent_tel, json.dumps(agent_contact_list, ensure_ascii=False)) # 设置有效期为 600 秒
  369. # def hourly_change_save_friends(agent_nickname):
  370. # last_hour = datetime.now().hour # 获取当前小时
  371. # while True:
  372. # current_hour = datetime.now().hour
  373. # if current_hour != last_hour: # 检测小时是否变化
  374. # friends=itchat.get_friends(update=True)[1:]
  375. # agent_info=fetch_agent_info(agent_nickname)
  376. # agent_tel=agent_info.get("agent_tel",None)
  377. # save_friends_to_redis(agent_tel,agent_nickname, friends)
  378. # last_hour = current_hour
  379. # time.sleep(1) # 每秒检查一次
  380. # def ten_mins_change_save_friends(agent_nickname):
  381. # last_check_minute = datetime.now().minute # 获取当前分钟
  382. # while True:
  383. # current_minute = datetime.now().minute
  384. # if current_minute % 10 == 0 and current_minute != last_check_minute: # 检测每10分钟变化
  385. # friends = itchat.get_friends(update=True)[1:]
  386. # agent_info = fetch_agent_info(agent_nickname)
  387. # agent_tel = agent_info.get("agent_tel", None)
  388. # save_friends_to_redis(agent_tel, agent_nickname, friends)
  389. # last_check_minute = current_minute # 更新最后检查的分钟
  390. # time.sleep(60) # 每分钟检查一次
  391. # def wx_messages_process_callback(user_nickname,message):
  392. # """
  393. # 处理消费到的 Kafka 消息(基础示例)
  394. # :param message: Kafka 消费到的消息内容
  395. # """
  396. # # print(user_nickname)
  397. # # print(f"Processing message: {message}")
  398. # # return True
  399. # msg_content= message
  400. # cleaned_content = clean_json_string(msg_content)
  401. # content=json.loads(cleaned_content)
  402. # data = content.get("data", {})
  403. # msg_type_data=data.get("msg_type",None)
  404. # content_data = data.get("content",{})
  405. # agent_nickname_data=content_data.get("agent_nickname",None)
  406. # agent_tel=content_data.get("agent_tel",None)
  407. # if user_nickname == agent_nickname_data and msg_type_data=='group-sending':
  408. # friends=itchat.get_friends(update=True)[1:]
  409. # contact_list_content_data=content_data.get("contact_list",None)
  410. # # 更新好友缓存
  411. # save_friends_to_redis(agent_tel,agent_nickname_data,friends)
  412. # # 遍历要群发的好友
  413. # for contact in contact_list_content_data:
  414. # nickname = contact.get("nickname",None)
  415. # if(nickname not in [friend['NickName'] for friend in friends]):
  416. # logger.warning(f'微信中没有 {nickname} 的昵称,将不会发送消息')
  417. # for friend in friends:
  418. # if friend.get("NickName",None) == nickname:
  419. # wx_content_list=content_data.get("wx_content",[])
  420. # for wx_content in wx_content_list:
  421. # # 处理文字
  422. # if wx_content.get("type",None) == 'text':
  423. # wx_content_text=wx_content.get("text",None)
  424. # sent_res=itchat.send(wx_content_text, toUserName=friend.get("UserName",None))
  425. # logger.info(f"{user_nickname} 向 {nickname} 发送文字【 {wx_content_text} 】 {sent_res.get('BaseResponse',{}).get('RawMsg')}")
  426. # # // 发送kafka
  427. # wx_content_dialogue_message=[{"type": "text", "text": wx_content_text}]
  428. # message=dialogue_message(agent_nickname_data,friend.get("NickName",None),wx_content_dialogue_message)
  429. # kafka_helper.kafka_client.produce_message(message)
  430. # logger.info("发送对话 %s",message)
  431. # # 等待随机时间
  432. # time.sleep(random.uniform(5, 15))
  433. # # 处理图片
  434. # elif wx_content.get("type",None) == 'image_url':
  435. # print('发送图片')
  436. # image_url= wx_content.get("image_url",{})
  437. # url=image_url.get("url",None)
  438. # # 网络图片
  439. # logger.debug(f"[WX] start download image, img_url={url}")
  440. # pic_res = requests.get(url, stream=True)
  441. # image_storage = io.BytesIO()
  442. # size = 0
  443. # for block in pic_res.iter_content(1024):
  444. # size += len(block)
  445. # image_storage.write(block)
  446. # logger.info(f"[WX] download image success, size={size}, img_url={url}")
  447. # image_storage.seek(0)
  448. # if ".webp" in url:
  449. # try:
  450. # image_storage = convert_webp_to_png(image_storage)
  451. # except Exception as e:
  452. # logger.error(f"Failed to convert image: {e}")
  453. # return
  454. # sent_res=itchat.send_image(image_storage, toUserName=friend.get("UserName",None))
  455. # logger.info(f"{user_nickname} 向 {nickname} 发送图片【 {url} 】{sent_res.get('BaseResponse',{}).get('RawMsg')}")
  456. # # // 发送kafka
  457. # wx_content_dialogue_message=[{"type": "image_url", "image_url": {"url": url}}]
  458. # message=dialogue_message(agent_nickname_data,friend.get("NickName",None),wx_content_dialogue_message)
  459. # kafka_helper.kafka_client.produce_message(message)
  460. # logger.info("发送对话 %s",message)
  461. # # 等待随机时间
  462. # time.sleep(random.uniform(5, 15))
  463. # #处理文件
  464. # elif wx_content.get("type",None) == 'file':
  465. # print('处理文件')
  466. # file_url= wx_content.get("file_url",{})
  467. # url=file_url.get("url",None)
  468. # # 提取路径部分
  469. # parsed_url = urlparse(url).path
  470. # # 获取文件名和扩展名
  471. # filename = os.path.basename(parsed_url) # 文件名(包含扩展名)
  472. # name, ext = os.path.splitext(filename) # 分离文件名和扩展名
  473. # if ext in ['.pdf']:
  474. # print('处理PDF文件')
  475. # tmp_file_path=save_to_local_from_url(url)
  476. # sent_res=itchat.send_file(tmp_file_path, toUserName=friend.get("UserName",None))
  477. # logger.info(f'删除本地{ext}文件: {tmp_file_path}')
  478. # os.remove(tmp_file_path)
  479. # logger.info(f"{user_nickname} 向 {nickname} 发送 {ext} 文件【 {url} 】{sent_res.get('BaseResponse',{}).get('RawMsg')}")
  480. # # // 发送kafka
  481. # wx_content_dialogue_message=[{"type": "file", "file_url": {"url": url}}]
  482. # message=dialogue_message(agent_nickname_data,friend.get("NickName",None),wx_content_dialogue_message)
  483. # kafka_helper.kafka_client.produce_message(message)
  484. # logger.info("发送对话 %s",message)
  485. # # 等待随机时间
  486. # time.sleep(random.uniform(5, 15))
  487. # elif ext in ['.mp4']:
  488. # print('处理MP4文件')
  489. # tmp_file_path=save_to_local_from_url(url)
  490. # itchat.send_file(tmp_file_path, toUserName=friend.get("UserName",None))
  491. # logger.info(f'删除本地{ext}文件: {tmp_file_path}')
  492. # os.remove(tmp_file_path)
  493. # logger.info(f"{user_nickname} 向 {nickname} 发送 {ext} 文件【 {url} 】")
  494. # # // 发送kafka
  495. # wx_content_dialogue_message=[{"type": "file", "file_url": {"url": url}}]
  496. # message=dialogue_message(agent_nickname_data,friend.get("NickName",None),wx_content_dialogue_message)
  497. # kafka_helper.kafka_client.produce_message(message)
  498. # logger.info("发送对话 %s",message)
  499. # # 等待随机时间
  500. # time.sleep(random.uniform(5, 15))
  501. # else:
  502. # logger.warning(f'暂不支持 {ext} 文件的处理')
  503. # return True
  504. # else:
  505. # return False
  506. # def dialogue_message(nickname_from,nickname_to,wx_content):
  507. # """
  508. # 构造消息的 JSON 数据
  509. # :param contents: list,包含多个消息内容,每个内容为字典,如:
  510. # [{"type": "text", "text": "AAAAAAA"},
  511. # {"type": "image_url", "image_url": {"url": "https://AAAAA.jpg"}},
  512. # {"type":"file","file_url":{"url":"https://AAAAA.pdf"}}
  513. # ]
  514. # :return: JSON 字符串
  515. # """
  516. # # 获取当前时间戳,精确到毫秒
  517. # current_timestamp = int(time.time() * 1000)
  518. # # 获取当前时间,格式化为 "YYYY-MM-DD HH:MM:SS"
  519. # current_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
  520. # # 构造 JSON 数据
  521. # data = {
  522. # "messageId": str(current_timestamp),
  523. # "topic": "topic.aiops.wx",
  524. # "time": current_time,
  525. # "data": {
  526. # "msg_type": "dialogue",
  527. # "content": {
  528. # "nickname_from": nickname_from,
  529. # "nickname_to": nickname_to,
  530. # "wx_content":wx_content
  531. # }
  532. # }
  533. # }
  534. # return json.dumps(data, separators=(',', ':'), ensure_ascii=False)
  535. # def fetch_agent_info(agent_nickname):
  536. # if os.environ.get('environment', 'default')=='default':
  537. # return {
  538. # "agent_nickname": agent_nickname,
  539. # "agent_tel": "19200137635"
  540. # }
  541. # aiops_api=conf().get("aiops_api")
  542. # # 定义请求URL
  543. # url = f"{aiops_api}/business/Agent/smartinfobyname"
  544. # # 定义请求头
  545. # headers = {
  546. # "accept": "*/*",
  547. # "Content-Type": "application/json"
  548. # }
  549. # # 定义请求数据
  550. # data = {
  551. # "smartName": agent_nickname
  552. # }
  553. # try:
  554. # # 发送POST请求
  555. # response = requests.post(url, headers=headers, data=json.dumps(data))
  556. # # 确认响应状态码
  557. # if response.status_code == 200:
  558. # response_data = response.json()
  559. # if response_data.get("code") == 200:
  560. # # 提取 smartName 和 smartPhone
  561. # data = response_data.get("data", {})
  562. # return {
  563. # "agent_nickname": data.get("smartName"),
  564. # "agent_tel": data.get("smartPhone")
  565. # }
  566. # else:
  567. # logger.error(f"API 返回错误信息: {response_data.get('msg')}")
  568. # return None
  569. # else:
  570. # logger.error(f"请求失败,状态码:{response.status_code}")
  571. # return None
  572. # except Exception as e:
  573. # logger.error(f"请求出错: {e}")
  574. # return None
  575. # def save_to_local_from_url(url):
  576. # '''
  577. # 从url保存到本地tmp目录
  578. # '''
  579. # parsed_url = urlparse(url)
  580. # # 从 URL 提取文件名
  581. # filename = os.path.basename(parsed_url.path)
  582. # # tmp_dir = os.path(__file__) # 获取系统临时目录
  583. # # print(tmp_dir)
  584. # tmp_file_path = os.path.join(os.getcwd(),'tmp', filename) # 拼接完整路径
  585. # # 检查是否存在同名文件
  586. # if os.path.exists(tmp_file_path):
  587. # logger.info(f"文件已存在,将覆盖:{tmp_file_path}")
  588. # # 下载文件并保存到临时目录
  589. # response = requests.get(url, stream=True)
  590. # with open(tmp_file_path, 'wb') as f:
  591. # for chunk in response.iter_content(chunk_size=1024):
  592. # if chunk: # 检查是否有内容
  593. # f.write(chunk)
  594. # return tmp_file_path
  595. # def upload_oss(access_key_id, access_key_secret, endpoint, bucket_name, local_file_path, oss_file_name, expiration_days=7):
  596. # """
  597. # 上传文件到阿里云OSS并设置生命周期规则,同时返回文件的公共访问地址。
  598. # :param access_key_id: 阿里云AccessKey ID
  599. # :param access_key_secret: 阿里云AccessKey Secret
  600. # :param endpoint: OSS区域对应的Endpoint
  601. # :param bucket_name: OSS中的Bucket名称
  602. # :param local_file_path: 本地文件路径
  603. # :param oss_file_name: OSS中的文件存储路径
  604. # :param expiration_days: 文件保存天数,默认7天后删除
  605. # :return: 文件的公共访问地址
  606. # """
  607. # # 创建Bucket实例
  608. # auth = oss2.Auth(access_key_id, access_key_secret)
  609. # bucket = oss2.Bucket(auth, endpoint, bucket_name)
  610. # ### 1. 设置生命周期规则 ###
  611. # rule_id = f'delete_after_{expiration_days}_days' # 规则ID
  612. # prefix = oss_file_name.split('/')[0] + '/' # 设置规则应用的前缀为文件所在目录
  613. # # 定义生命周期规则
  614. # rule = oss2.models.LifecycleRule(rule_id, prefix, status=oss2.models.LifecycleRule.ENABLED,
  615. # expiration=oss2.models.LifecycleExpiration(days=expiration_days))
  616. # # 设置Bucket的生命周期
  617. # lifecycle = oss2.models.BucketLifecycle([rule])
  618. # bucket.put_bucket_lifecycle(lifecycle)
  619. # print(f"已设置生命周期规则:文件将在{expiration_days}天后自动删除")
  620. # ### 2. 上传文件到OSS ###
  621. # bucket.put_object_from_file(oss_file_name, local_file_path)
  622. # ### 3. 构建公共访问URL ###
  623. # file_url = f"http://{bucket_name}.{endpoint.replace('http://', '')}/{oss_file_name}"
  624. # print(f"文件上传成功,公共访问地址:{file_url}")
  625. # return file_url