Sfoglia il codice sorgente

kafka朋友圈处理

1257
H Vs 2 settimane fa
parent
commit
8f93da1672
2 ha cambiato i file con 150 aggiunte e 25 eliminazioni
  1. +109
    -11
      services/biz_service.py
  2. +41
    -14
      services/gewe_service.py

+ 109
- 11
services/biz_service.py Vedi File

@@ -67,8 +67,6 @@ class BizService():
# pass
# else:
# print(f'kakfa 未处理息类型 {msg_type_data}')
match msg_type_data:
case 'login':
await self.login_handler_async(content_data)
@@ -76,10 +74,16 @@ class BizService():
await self.group_sending_handler_async(content_data)
case 'sns-sendtext-forward':
await self.sns_sendtext_forward_handler_async(content_data)
case 'sns-sendtext':
await self.sns_sendtext_handler_async(content_data)
case 'sns-sendimages-forward':
await self.sns_sendimages_forward_handler_async(content_data)
case 'sns-sendimages':
await self.sns_sendimages_handler_async(content_data)
case 'sns-sendvideo-forward':
await self.sns_sendvideo_forward_handler_async(content_data)
await self.sns_sendvideo_forward_handler_async(content_data)
case 'sns-sendvideo':
await self.sns_sendvideo_handler_async(content_data)
case _:
logger.warning(f'kakfa 未处理息类型 {msg_type_data}')

@@ -289,24 +293,118 @@ class BizService():

async def sns_sendtext_forward_handler_async(self,content_data):
wxids=content_data.get('wxids',[])
wx_sns_content=content_data.get("wx_sns_content","")
if not wx_sns_content:
logger.warning(f'转发文本消息为空不处理 {wx_sns_content}')
wx_sns_content_text=content_data.get("wx_sns_content",{}).get("text","")
if not wx_sns_content_text:
logger.warning(f'转发文本消息为空不处理 {wx_sns_content_text}')
return
if not wxids:
logger.warning(f'wxids 空列表不处理 {wxids}')
return
#wxids_first=wxids[0]

tasks = []

for wxid in wxids:
tasks.append(self.wxchat.send_text_sns_async(self.token_id, self.app_id, wx_sns_content))
k,loginfo=await self.wxchat.get_login_info_by_wxid_async(wxid)
app_id=loginfo.get('appId','')
token_id=loginfo.get('tokenId','')
tasks.append(self.wxchat.send_text_sns_async(token_id, app_id, wx_sns_content_text))
await asyncio.gather(*tasks)

async def sns_sendimages_forward_handler_async(self,content_data):
pass
wxids=content_data.get('wxids',[])
wx_sns_content_text=content_data.get("wx_sns_content",{}).get("text","")
wx_sns_content_imgs=content_data.get("wx_sns_content",{}).get("imageUrls",[])
if not wx_sns_content_imgs:
logger.warning(f'转发图片消息为空不处理 {wx_sns_content_imgs}')
return
tasks = []
for wxid in wxids:
k,loginfo=await self.wxchat.get_login_info_by_wxid_async(wxid)
app_id=loginfo.get('appId','')
token_id=loginfo.get('tokenId','')
tasks.append(self.wxchat.upload_send_image_sns_async(token_id, app_id, wx_sns_content_text,wx_sns_content_imgs))
await asyncio.gather(*tasks)

async def sns_sendvideo_forward_handler_async(self,content_data):
pass
wxids=content_data.get('wxids',[])
wx_sns_content_text=content_data.get("wx_sns_content",{}).get("text","")
wx_sns_content_video_url=content_data.get("wx_sns_content",{}).get("videoUrl","")
wx_sns_content_thumb_url=content_data.get("wx_sns_content",{}).get("videoThumbUrl","")

if not wx_sns_content_thumb_url:
logger.warning(f'转发视频缩略图消息为空不处理 {wx_sns_content_thumb_url}')
return
if not wx_sns_content_video_url:
logger.warning(f'转发视频消息为空不处理 {wx_sns_content_video_url}')
return
if not wx_sns_content_thumb_url:
logger.warning(f'转发视频缩略图消息为空不处理 {wx_sns_content_thumb_url}')
return

tasks = []
for wxid in wxids:
k,loginfo=await self.wxchat.get_login_info_by_wxid_async(wxid)
app_id=loginfo.get('appId','')
token_id=loginfo.get('tokenId','')
tasks.append(self.wxchat.upload_send_video_sns_async(token_id, app_id,wx_sns_content_text,wx_sns_content_video_url,wx_sns_content_thumb_url))
await asyncio.gather(*tasks)

async def sns_sendtext_handler_async(self,content_data):
wxid=content_data.get('wxid','')
wx_sns_content_text=content_data.get("wx_sns_content",{}).get("text","")
if not wx_sns_content_text:
logger.warning(f'转发文本消息为空不处理 {wx_sns_content_text}')
return
if not wxid:
logger.warning(f'wxid 空不处理 {wxid}')
return
k,loginfo=await self.wxchat.get_login_info_by_wxid_async(wxid)
app_id=loginfo.get('appId','')
token_id=loginfo.get('tokenId','')
await self.wxchat.send_text_sns_async(token_id, app_id, wx_sns_content_text)

async def sns_sendimages_handler_async(self,content_data):
wxid=content_data.get('wxid','')
wx_sns_content_text=content_data.get("wx_sns_content",{}).get("text","")
wx_sns_content_imgs=content_data.get("wx_sns_content",{}).get("imageUrls",[])
if not wx_sns_content_imgs:
logger.warning(f'转发图片消息为空不处理 {wx_sns_content_imgs}')
return
if not wxid:
logger.warning(f'wxid 空不处理 {wxid}')
return
k,loginfo=await self.wxchat.get_login_info_by_wxid_async(wxid)
app_id=loginfo.get('appId','')
token_id=loginfo.get('tokenId','')
await self.wxchat.upload_send_image_sns_async(token_id, app_id, wx_sns_content_text,wx_sns_content_imgs)
async def sns_sendvideo_handler_async(self,content_data):
wxid=content_data.get('wxid','')
wx_sns_content_text=content_data.get("wx_sns_content",{}).get("text","")
wx_sns_content_video_url=content_data.get("wx_sns_content",{}).get("videoUrl","")
wx_sns_content_thumb_url=content_data.get("wx_sns_content",{}).get("videoThumbUrl","")

if not wx_sns_content_thumb_url:
logger.warning(f'转发视频缩略图消息为空不处理 {wx_sns_content_thumb_url}')
return
if not wx_sns_content_video_url:
logger.warning(f'转发视频消息为空不处理 {wx_sns_content_video_url}')
return
if not wx_sns_content_thumb_url:
logger.warning(f'转发视频缩略图消息为空不处理 {wx_sns_content_thumb_url}')
return
if not wxid:
logger.warning(f'wxid 空不处理 {wxid}')
return
k,loginfo=await self.wxchat.get_login_info_by_wxid_async(wxid)
app_id=loginfo.get('appId','')
token_id=loginfo.get('tokenId','')
await self.wxchat.upload_send_video_sns_async(token_id, app_id,wx_sns_content_text,wx_sns_content_video_url,wx_sns_content_thumb_url)

+ 41
- 14
services/gewe_service.py Vedi File

@@ -708,42 +708,56 @@ class GeWeService:
async with session.post(api_url, headers=headers, json=data) as response:
response_object = await response.json()
return response_object.get('ret', None), response_object.get('msg', None), response_object.get('data', None)
async def send_video_sns_async(self, token_id, app_id, content: str, video_info: dict):
async def upload_sns_image_async(self, token_id, app_id, img_urls: list):
'''
发送视频朋友圈
上传朋友圈图片
'''
api_url = f"{self.base_url}/v2/api/sns/sendVideoSns"
api_url = f"{self.base_url}/v2/api/sns/uploadSnsImage"
headers = {
'X-GEWE-TOKEN': token_id,
'Content-Type': 'application/json'
}
data = {
"appId": app_id,
"content": content,
"allowWxIds": [],
"atWxIds": [],
"disableWxIds": [],
"videoInfo": video_info,
"privacy": False
"imgUrls": img_urls
}
async with aiohttp.ClientSession() as session:
async with session.post(api_url, headers=headers, json=data) as response:
response_object = await response.json()
return response_object.get('ret', None), response_object.get('msg', None), response_object.get('data', None)

async def upload_sns_image_async(self, token_id, app_id, img_urls: list):
async def upload_send_image_sns_async(self, token_id, app_id, content, img_infos: list):
'''
上传朋友圈图片
上传并发送图片朋友圈
'''
api_url = f"{self.base_url}/v2/api/sns/uploadSnsImage"
ret, msg, data = await self.upload_sns_image_async(token_id, app_id, img_infos)
if ret != 200:
logger.warning(f'上传图片失败 {ret} {msg} {data}')
return ret, msg, data
ret, msg, data = await self.send_image_sns_async(token_id, app_id, content, data)
if ret != 200:
logger.warning(f'发送图片失败 {ret} {msg} {data}')
return ret, msg, data


async def send_video_sns_async(self, token_id, app_id, content: str, video_info: dict):
'''
发送视频朋友圈
'''
api_url = f"{self.base_url}/v2/api/sns/sendVideoSns"
headers = {
'X-GEWE-TOKEN': token_id,
'Content-Type': 'application/json'
}
data = {
"appId": app_id,
"imgUrls": img_urls
"content": content,
"allowWxIds": [],
"atWxIds": [],
"disableWxIds": [],
"videoInfo": video_info,
"privacy": False
}
async with aiohttp.ClientSession() as session:
async with session.post(api_url, headers=headers, json=data) as response:
@@ -769,6 +783,19 @@ class GeWeService:
response_object = await response.json()
return response_object.get('ret', None), response_object.get('msg', None), response_object.get('data', None)

async def upload_send_video_sns_async(self, token_id, app_id, content: str, video_url: str, video_thumb_url: str):
'''
上传并发送视频朋友圈
'''
ret, msg, data = self.upload_sns_video_async(token_id, app_id, video_url, video_thumb_url)
if ret != 200:
logger.warning(f'上传视频失败 {ret} {msg} {data}')
return ret, msg, data
ret,msg,data=self.send_video_sns_async(token_id, app_id, content, data)
if ret != 200:
logger.warning(f'发送视频失败 {ret} {msg} {data}')
return ret,msg,data

async def get_sns_list_async(self, token_id, app_id, sns_id: str):
api_url = f"{self.base_url}/v2/api/sns/snsList"
headers = {


Loading…
Annulla
Salva