@@ -139,6 +139,11 @@ def wx_add_contacts_from_chatroom_message(wxid:str,chatroom_id:str,contact_wixd: | |||||
return json.dumps(data, separators=(',', ':'), ensure_ascii=False) | return json.dumps(data, separators=(',', ':'), ensure_ascii=False) | ||||
def wx_add_contacts_from_chatroom_task_status(wxid:str,chatroom_id:str,status:int): | |||||
content = {"wxid": wxid,"chatroomId":chatroom_id,"status":status} | |||||
data=kafka_base_message("add-contacts-from-chatroom-task-status",content) | |||||
return json.dumps(data, separators=(',', ':'), ensure_ascii=False) | |||||
def wx_del_group_message(wxid:str,chatroom_id:str)->str: | def wx_del_group_message(wxid:str,chatroom_id:str)->str: | ||||
content = {"wxid": wxid,"chatroom_id":chatroom_id} | content = {"wxid": wxid,"chatroom_id":chatroom_id} | ||||
data=kafka_base_message("del-group",content) | data=kafka_base_message("del-group",content) | ||||
@@ -1395,6 +1395,34 @@ class GeWeService: | |||||
cache= await self.redis_service.get_hash_field(hash_key,"data") | cache= await self.redis_service.get_hash_field(hash_key,"data") | ||||
return json.loads(cache) if cache else [] | return json.loads(cache) if cache else [] | ||||
async def wx_add_contacts_from_chatroom_task_status_async(self,wxid,chatroom_id)->int: | |||||
history_hash_key = f'__AI_OPS_WX__:GROUPS_ADD_CONTACT_HISTORY:{wxid}:{chatroom_id}' | |||||
cache=await self.redis_service.get_hash(history_hash_key) | |||||
cache.keys() | |||||
group=await self. get_group_members_from_cache_async(wxid,chatroom_id) | |||||
chatroom_member_list = group.get('memberList', []) | |||||
chatroom_owner_wxid = group.get('chatroomOwner', None) | |||||
admin_wxids = group.get('adminWxid', []) | |||||
admin_wxids = group.get('adminWxid') | |||||
if admin_wxids is None: | |||||
admin_wxids = [] | |||||
if chatroom_owner_wxid: | |||||
admin_wxids.append(chatroom_owner_wxid) | |||||
#unavailable_wixds=set(admin_wxids) | |||||
available_members=[m["wxid"] for m in chatroom_member_list if m["wxid"] not in admin_wxids] | |||||
if len(available_members) > len(cache.keys): | |||||
return 1 | |||||
for key, value in cache.items(): | |||||
value_data_list = json.loads(value) | |||||
if len(value_data_list) <2: | |||||
return 1 | |||||
return 2 | |||||
# 依赖项:获取 GeWeChatCom 单例 | # 依赖项:获取 GeWeChatCom 单例 | ||||
async def get_gewe_service(app: FastAPI = Depends()) -> GeWeService: | async def get_gewe_service(app: FastAPI = Depends()) -> GeWeService: |
@@ -739,6 +739,12 @@ def scheduled_task_add_contacts_from_chatrooms(self, redis_config, kafka_config, | |||||
k_message = wx_add_contacts_from_chatroom_message(history.wxid,history.chatroomId,history.contactWixd,history.addTime) | k_message = wx_add_contacts_from_chatroom_message(history.wxid,history.chatroomId,history.contactWixd,history.addTime) | ||||
await kafka_service.send_message_async(k_message) | await kafka_service.send_message_async(k_message) | ||||
await asyncio.sleep(random.uniform(1.5, 3)) | await asyncio.sleep(random.uniform(1.5, 3)) | ||||
# 任务推送到kafka | |||||
task_status=await gewe_service.wx_add_contacts_from_chatroom_task_status_async(wxid,chatroom_id) | |||||
wx_add_contacts_from_chatroom_task_status(wxid,chatroom_id,task_status) | |||||
await kafka_service.send_message_async(k_message) | |||||
# 下一个群 | |||||
await asyncio.sleep(random.uniform(1.5, 3)) | await asyncio.sleep(random.uniform(1.5, 3)) | ||||