From 8f93da167275de97fee33a34d6dbf6c4e2621b91 Mon Sep 17 00:00:00 2001 From: H Vs Date: Thu, 10 Apr 2025 14:38:30 +0800 Subject: [PATCH] =?UTF-8?q?kafka=E6=9C=8B=E5=8F=8B=E5=9C=88=E5=A4=84?= =?UTF-8?q?=E7=90=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- services/biz_service.py | 120 +++++++++++++++++++++++++++++++++++---- services/gewe_service.py | 55 +++++++++++++----- 2 files changed, 150 insertions(+), 25 deletions(-) diff --git a/services/biz_service.py b/services/biz_service.py index a8243d0..3172067 100644 --- a/services/biz_service.py +++ b/services/biz_service.py @@ -67,8 +67,6 @@ class BizService(): # pass # else: # print(f'kakfa 未处理息类型 {msg_type_data}') - - match msg_type_data: case 'login': await self.login_handler_async(content_data) @@ -76,10 +74,16 @@ class BizService(): await self.group_sending_handler_async(content_data) case 'sns-sendtext-forward': await self.sns_sendtext_forward_handler_async(content_data) + case 'sns-sendtext': + await self.sns_sendtext_handler_async(content_data) case 'sns-sendimages-forward': await self.sns_sendimages_forward_handler_async(content_data) + case 'sns-sendimages': + await self.sns_sendimages_handler_async(content_data) case 'sns-sendvideo-forward': - await self.sns_sendvideo_forward_handler_async(content_data) + await self.sns_sendvideo_forward_handler_async(content_data) + case 'sns-sendvideo': + await self.sns_sendvideo_handler_async(content_data) case _: logger.warning(f'kakfa 未处理息类型 {msg_type_data}') @@ -289,24 +293,118 @@ class BizService(): async def sns_sendtext_forward_handler_async(self,content_data): wxids=content_data.get('wxids',[]) - wx_sns_content=content_data.get("wx_sns_content","") - if not wx_sns_content: - logger.warning(f'转发文本消息为空不处理 {wx_sns_content}') + wx_sns_content_text=content_data.get("wx_sns_content",{}).get("text","") + if not wx_sns_content_text: + logger.warning(f'转发文本消息为空不处理 {wx_sns_content_text}') return if not wxids: logger.warning(f'wxids 空列表不处理 {wxids}') return - #wxids_first=wxids[0] - tasks = [] for wxid in wxids: - tasks.append(self.wxchat.send_text_sns_async(self.token_id, self.app_id, wx_sns_content)) + k,loginfo=await self.wxchat.get_login_info_by_wxid_async(wxid) + app_id=loginfo.get('appId','') + token_id=loginfo.get('tokenId','') + tasks.append(self.wxchat.send_text_sns_async(token_id, app_id, wx_sns_content_text)) await asyncio.gather(*tasks) async def sns_sendimages_forward_handler_async(self,content_data): - pass + wxids=content_data.get('wxids',[]) + wx_sns_content_text=content_data.get("wx_sns_content",{}).get("text","") + wx_sns_content_imgs=content_data.get("wx_sns_content",{}).get("imageUrls",[]) + if not wx_sns_content_imgs: + logger.warning(f'转发图片消息为空不处理 {wx_sns_content_imgs}') + return + tasks = [] + for wxid in wxids: + k,loginfo=await self.wxchat.get_login_info_by_wxid_async(wxid) + app_id=loginfo.get('appId','') + token_id=loginfo.get('tokenId','') + tasks.append(self.wxchat.upload_send_image_sns_async(token_id, app_id, wx_sns_content_text,wx_sns_content_imgs)) + + await asyncio.gather(*tasks) async def sns_sendvideo_forward_handler_async(self,content_data): - pass + wxids=content_data.get('wxids',[]) + wx_sns_content_text=content_data.get("wx_sns_content",{}).get("text","") + wx_sns_content_video_url=content_data.get("wx_sns_content",{}).get("videoUrl","") + wx_sns_content_thumb_url=content_data.get("wx_sns_content",{}).get("videoThumbUrl","") + + if not wx_sns_content_thumb_url: + logger.warning(f'转发视频缩略图消息为空不处理 {wx_sns_content_thumb_url}') + return + + if not wx_sns_content_video_url: + logger.warning(f'转发视频消息为空不处理 {wx_sns_content_video_url}') + return + + if not wx_sns_content_thumb_url: + logger.warning(f'转发视频缩略图消息为空不处理 {wx_sns_content_thumb_url}') + return + + + tasks = [] + for wxid in wxids: + k,loginfo=await self.wxchat.get_login_info_by_wxid_async(wxid) + app_id=loginfo.get('appId','') + token_id=loginfo.get('tokenId','') + tasks.append(self.wxchat.upload_send_video_sns_async(token_id, app_id,wx_sns_content_text,wx_sns_content_video_url,wx_sns_content_thumb_url)) + + await asyncio.gather(*tasks) + + async def sns_sendtext_handler_async(self,content_data): + wxid=content_data.get('wxid','') + wx_sns_content_text=content_data.get("wx_sns_content",{}).get("text","") + if not wx_sns_content_text: + logger.warning(f'转发文本消息为空不处理 {wx_sns_content_text}') + return + if not wxid: + logger.warning(f'wxid 空不处理 {wxid}') + return + + k,loginfo=await self.wxchat.get_login_info_by_wxid_async(wxid) + app_id=loginfo.get('appId','') + token_id=loginfo.get('tokenId','') + await self.wxchat.send_text_sns_async(token_id, app_id, wx_sns_content_text) + + async def sns_sendimages_handler_async(self,content_data): + wxid=content_data.get('wxid','') + wx_sns_content_text=content_data.get("wx_sns_content",{}).get("text","") + wx_sns_content_imgs=content_data.get("wx_sns_content",{}).get("imageUrls",[]) + if not wx_sns_content_imgs: + logger.warning(f'转发图片消息为空不处理 {wx_sns_content_imgs}') + return + if not wxid: + logger.warning(f'wxid 空不处理 {wxid}') + return + k,loginfo=await self.wxchat.get_login_info_by_wxid_async(wxid) + app_id=loginfo.get('appId','') + token_id=loginfo.get('tokenId','') + await self.wxchat.upload_send_image_sns_async(token_id, app_id, wx_sns_content_text,wx_sns_content_imgs) + + async def sns_sendvideo_handler_async(self,content_data): + wxid=content_data.get('wxid','') + wx_sns_content_text=content_data.get("wx_sns_content",{}).get("text","") + wx_sns_content_video_url=content_data.get("wx_sns_content",{}).get("videoUrl","") + wx_sns_content_thumb_url=content_data.get("wx_sns_content",{}).get("videoThumbUrl","") + + if not wx_sns_content_thumb_url: + logger.warning(f'转发视频缩略图消息为空不处理 {wx_sns_content_thumb_url}') + return + + if not wx_sns_content_video_url: + logger.warning(f'转发视频消息为空不处理 {wx_sns_content_video_url}') + return + + if not wx_sns_content_thumb_url: + logger.warning(f'转发视频缩略图消息为空不处理 {wx_sns_content_thumb_url}') + return + if not wxid: + logger.warning(f'wxid 空不处理 {wxid}') + return + k,loginfo=await self.wxchat.get_login_info_by_wxid_async(wxid) + app_id=loginfo.get('appId','') + token_id=loginfo.get('tokenId','') + await self.wxchat.upload_send_video_sns_async(token_id, app_id,wx_sns_content_text,wx_sns_content_video_url,wx_sns_content_thumb_url) diff --git a/services/gewe_service.py b/services/gewe_service.py index 5fa7915..03c4e0f 100644 --- a/services/gewe_service.py +++ b/services/gewe_service.py @@ -708,42 +708,56 @@ class GeWeService: async with session.post(api_url, headers=headers, json=data) as response: response_object = await response.json() return response_object.get('ret', None), response_object.get('msg', None), response_object.get('data', None) - - async def send_video_sns_async(self, token_id, app_id, content: str, video_info: dict): + + async def upload_sns_image_async(self, token_id, app_id, img_urls: list): ''' - 发送视频朋友圈 + 上传朋友圈图片 ''' - api_url = f"{self.base_url}/v2/api/sns/sendVideoSns" + api_url = f"{self.base_url}/v2/api/sns/uploadSnsImage" headers = { 'X-GEWE-TOKEN': token_id, 'Content-Type': 'application/json' } data = { "appId": app_id, - "content": content, - "allowWxIds": [], - "atWxIds": [], - "disableWxIds": [], - "videoInfo": video_info, - "privacy": False + "imgUrls": img_urls } async with aiohttp.ClientSession() as session: async with session.post(api_url, headers=headers, json=data) as response: response_object = await response.json() return response_object.get('ret', None), response_object.get('msg', None), response_object.get('data', None) - async def upload_sns_image_async(self, token_id, app_id, img_urls: list): + async def upload_send_image_sns_async(self, token_id, app_id, content, img_infos: list): ''' - 上传朋友圈图片 + 上传并发送图片朋友圈 ''' - api_url = f"{self.base_url}/v2/api/sns/uploadSnsImage" + ret, msg, data = await self.upload_sns_image_async(token_id, app_id, img_infos) + if ret != 200: + logger.warning(f'上传图片失败 {ret} {msg} {data}') + return ret, msg, data + ret, msg, data = await self.send_image_sns_async(token_id, app_id, content, data) + if ret != 200: + logger.warning(f'发送图片失败 {ret} {msg} {data}') + return ret, msg, data + + + async def send_video_sns_async(self, token_id, app_id, content: str, video_info: dict): + ''' + 发送视频朋友圈 + ''' + api_url = f"{self.base_url}/v2/api/sns/sendVideoSns" headers = { 'X-GEWE-TOKEN': token_id, 'Content-Type': 'application/json' } data = { "appId": app_id, - "imgUrls": img_urls + "content": content, + "allowWxIds": [], + "atWxIds": [], + "disableWxIds": [], + "videoInfo": video_info, + "privacy": False } async with aiohttp.ClientSession() as session: async with session.post(api_url, headers=headers, json=data) as response: @@ -769,6 +783,19 @@ class GeWeService: response_object = await response.json() return response_object.get('ret', None), response_object.get('msg', None), response_object.get('data', None) + async def upload_send_video_sns_async(self, token_id, app_id, content: str, video_url: str, video_thumb_url: str): + ''' + 上传并发送视频朋友圈 + ''' + ret, msg, data = self.upload_sns_video_async(token_id, app_id, video_url, video_thumb_url) + if ret != 200: + logger.warning(f'上传视频失败 {ret} {msg} {data}') + return ret, msg, data + ret,msg,data=self.send_video_sns_async(token_id, app_id, content, data) + if ret != 200: + logger.warning(f'发送视频失败 {ret} {msg} {data}') + return ret,msg,data + async def get_sns_list_async(self, token_id, app_id, sns_id: str): api_url = f"{self.base_url}/v2/api/sns/snsList" headers = {