@@ -20,6 +20,7 @@ from common.expired_dict import ExpiredDict | |||||
from common.log import logger | from common.log import logger | ||||
from common.singleton import singleton | from common.singleton import singleton | ||||
from common.time_check import time_checker | from common.time_check import time_checker | ||||
from common.utils import convert_webp_to_png | |||||
from config import conf, get_appdata_dir | from config import conf, get_appdata_dir | ||||
from lib import itchat | from lib import itchat | ||||
from lib.itchat.content import * | from lib.itchat.content import * | ||||
@@ -228,9 +229,9 @@ class WechatChannel(ChatChannel): | |||||
image_storage.write(block) | image_storage.write(block) | ||||
logger.info(f"[WX] download image success, size={size}, img_url={img_url}") | logger.info(f"[WX] download image success, size={size}, img_url={img_url}") | ||||
image_storage.seek(0) | image_storage.seek(0) | ||||
if img_url.endswith(".webp"): | |||||
if ".webp" in img_url: | |||||
try: | try: | ||||
image_storage = _convert_webp_to_png(image_storage) | |||||
image_storage = convert_webp_to_png(image_storage) | |||||
except Exception as e: | except Exception as e: | ||||
logger.error(f"Failed to convert image: {e}") | logger.error(f"Failed to convert image: {e}") | ||||
return | return | ||||
@@ -289,16 +290,3 @@ def _send_qr_code(qrcode_list: list): | |||||
except Exception as e: | except Exception as e: | ||||
pass | pass | ||||
def _convert_webp_to_png(webp_image): | |||||
from PIL import Image | |||||
try: | |||||
webp_image.seek(0) | |||||
img = Image.open(webp_image).convert("RGBA") | |||||
png_image = io.BytesIO() | |||||
img.save(png_image, format="PNG") | |||||
png_image.seek(0) | |||||
return png_image | |||||
except Exception as e: | |||||
logger.error(f"Failed to convert WEBP to PNG: {e}") | |||||
raise |
@@ -17,7 +17,7 @@ from channel.wechatcom.wechatcomapp_client import WechatComAppClient | |||||
from channel.wechatcom.wechatcomapp_message import WechatComAppMessage | from channel.wechatcom.wechatcomapp_message import WechatComAppMessage | ||||
from common.log import logger | from common.log import logger | ||||
from common.singleton import singleton | from common.singleton import singleton | ||||
from common.utils import compress_imgfile, fsize, split_string_by_utf8_length | |||||
from common.utils import compress_imgfile, fsize, split_string_by_utf8_length, convert_webp_to_png | |||||
from config import conf, subscribe_msg | from config import conf, subscribe_msg | ||||
from voice.audio_convert import any_to_amr, split_audio | from voice.audio_convert import any_to_amr, split_audio | ||||
@@ -99,6 +99,12 @@ class WechatComAppChannel(ChatChannel): | |||||
image_storage = compress_imgfile(image_storage, 10 * 1024 * 1024 - 1) | image_storage = compress_imgfile(image_storage, 10 * 1024 * 1024 - 1) | ||||
logger.info("[wechatcom] image compressed, sz={}".format(fsize(image_storage))) | logger.info("[wechatcom] image compressed, sz={}".format(fsize(image_storage))) | ||||
image_storage.seek(0) | image_storage.seek(0) | ||||
if ".webp" in img_url: | |||||
try: | |||||
image_storage = convert_webp_to_png(image_storage) | |||||
except Exception as e: | |||||
logger.error(f"Failed to convert image: {e}") | |||||
return | |||||
try: | try: | ||||
response = self.client.media.upload("image", image_storage) | response = self.client.media.upload("image", image_storage) | ||||
logger.debug("[wechatcom] upload image response: {}".format(response)) | logger.debug("[wechatcom] upload image response: {}".format(response)) | ||||
@@ -2,7 +2,7 @@ import io | |||||
import os | import os | ||||
from urllib.parse import urlparse | from urllib.parse import urlparse | ||||
from PIL import Image | from PIL import Image | ||||
from common.log import logger | |||||
def fsize(file): | def fsize(file): | ||||
if isinstance(file, io.BytesIO): | if isinstance(file, io.BytesIO): | ||||
@@ -54,3 +54,17 @@ def split_string_by_utf8_length(string, max_length, max_split=0): | |||||
def get_path_suffix(path): | def get_path_suffix(path): | ||||
path = urlparse(path).path | path = urlparse(path).path | ||||
return os.path.splitext(path)[-1].lstrip('.') | return os.path.splitext(path)[-1].lstrip('.') | ||||
def convert_webp_to_png(webp_image): | |||||
from PIL import Image | |||||
try: | |||||
webp_image.seek(0) | |||||
img = Image.open(webp_image).convert("RGBA") | |||||
png_image = io.BytesIO() | |||||
img.save(png_image, format="PNG") | |||||
png_image.seek(0) | |||||
return png_image | |||||
except Exception as e: | |||||
logger.error(f"Failed to convert WEBP to PNG: {e}") | |||||
raise |