diff --git a/tasks.py b/tasks.py index ee88436..c0a9ef6 100644 --- a/tasks.py +++ b/tasks.py @@ -207,324 +207,6 @@ def scheduled_task_sync_wx_info(self, redis_config, kafka_config, gewe_config): loop.run_until_complete(task()) # 在现有事件循环中运行任务 - -#@celery_app.task(name='tasks.scheduled_task_add_contacts_from_chatrooms_p', bind=True, acks_late=True) -def scheduled_task_add_contacts_from_chatrooms_p(self, redis_config, kafka_config, gewe_config): - async def task(): - try: - now = datetime.now() - if now.hour < 8: - logger.info(f"定时群成员定时添好友任务不启动,当前时间为 {now.strftime('%Y-%m-%d %H:%M:%S')},早于8点") - return - logger.info('定时群成员定时添好友任务开始') - redis_service = RedisService() - await redis_service.init(**redis_config) - gewe_service = await GeWeService.get_instance(redis_service,gewe_config['api_url']) - login_keys = [] - async for key in redis_service.client.scan_iter(match='__AI_OPS_WX__:LOGININFO:*'): - login_keys.append(key) - - #print(login_keys) - for k in login_keys: - r = await redis_service.get_hash(k) - app_id = r.get("appId") - token_id = r.get("tokenId") - wxid = r.get("wxid") - status = r.get('status') - if status == '0': - logger.warning(f"微信号 {wxid} 已经离线,群成员不能定时添加") - continue - - c = await gewe_service.get_wxchat_config_from_cache_async(wxid) - contacts = await gewe_service.get_contacts_brief_from_cache_async(wxid) - - contact_wxids = [c.get('userName') for c in contacts] - chatrooms = c.get('addContactsFromChatroomIdWhiteList', []) - logger.info(f'{wxid} 定时群成员定时添好友任务开始') - for chatroom_id in chatrooms: - chatroom = await gewe_service.get_group_info_from_cache_async(wxid, chatroom_id) - chatroom_member=await gewe_service.get_group_members_from_cache_async(wxid, chatroom_id) - - chatroom_nickname = chatroom.get('nickName') - chatroom_owner_wxid = chatroom_member.get('chatroomOwner', None) - - admin_wxids = chatroom_member.get('adminWxid', []) - - admin_wxids = chatroom_member.get('adminWxid') - if admin_wxids is None: - admin_wxids = [] # 如果 admin_wxids 是 None,将其初始化为空列表 - - # 判断当天群成员是否已经加了200个好友 - is_add_group_200_times = await gewe_service.is_group_add_contacts_history_one_day_200_async(wxid, chatroom_id) - if is_add_group_200_times: - logger.info(f"{wxid}在 {chatroom_nickname} 群成员已经加了200个好友,不再添加,群id:{chatroom_id}") - continue - - logger.info(f'{chatroom_nickname} 的群主是 {chatroom_owner_wxid},管理员是{admin_wxids}') - contact_wxids_set = set(contact_wxids) - # for admin_wxid in admin_wxids: - # contact_wxids_set.add(admin_wxid) - if admin_wxids: - contact_wxids_set.update(set(admin_wxids)) - - contact_wxids_set.update(set(admin_wxids)) - if chatroom_owner_wxid is not None: - contact_wxids_set.add(chatroom_owner_wxid) - - contact_wxids_set.add(wxid) - - # unavailable_wixds=await gewe_service.check_wixd_group_add_contacts_history_async(wxid,chatroom_id) - # contact_wxids_set.update(unavailable_wixds) - - chatroot_member_list = chatroom.get('memberList', []) - remaining_chatroot_members = [x for x in chatroot_member_list if x.get('wxid') not in contact_wxids_set] - - nickname = next((member['nickName'] for member in chatroot_member_list if member['wxid'] == wxid), None) - - logger.info(f'{nickname}-{wxid} 在 {chatroom_nickname} 群里还可以邀请的好友有:{[x.get("nickName") for x in remaining_chatroot_members]}') - for m in remaining_chatroot_members: - contact_wxid= m.get('wxid') - member_nickname=m.get("nickName") - group_add_contacts_history = await gewe_service.get_group_add_contacts_history_async(wxid,chatroom_id,contact_wxid) - sorted_history = sorted(group_add_contacts_history, key=lambda x: x.addTime, reverse=True) - - # 已经邀请过两次,不再邀请 - if len(sorted_history)==2: - logger.info(f'{nickname}-{wxid}在{chatroom_nickname}-{chatroom_id} 群的 {member_nickname}-{contact_wxid} 已经邀请过2次,不再邀请') - continue - - # 当天邀请过,不再邀请 - if len(sorted_history) > 0: - last_add_time = sorted_history[0].addTime - def is_add_time_more_than_one_day(addTime: int) -> bool: - """ - 判断 addTime 是否与当前时间相隔大于 3600 × 24 秒 - :param addTime: Unix 时间戳 - :return: 如果 addTime 与当前时间相隔大于 3600 × 24 秒,返回 True;否则返回 False - """ - # 获取当前时间的时间戳 - current_time = time.time() - - # 计算时间戳差值 - time_difference = abs(current_time - addTime) - - # 检查是否大于 3600 × 24 秒 - return time_difference > 3600 * 24 - - is_more_than_one_day= is_add_time_more_than_one_day(last_add_time) - - if not is_more_than_one_day: - logger.info(f'{nickname}-{wxid}在{chatroom_nickname}-{chatroom_id} 群的{member_nickname}-{contact_wxid}已经当天邀请,不再邀请') - continue - - - - ret, msg, data = await gewe_service.add_group_member_as_friend_async(token_id, app_id, chatroom_id, m.get('wxid'), f'我是群聊"{chatroom_nickname}"群的{nickname}') - # if ret==200: - # history=AddGroupContactsHistory.model_validate({ - # "chatroomId":chatroom_id, - # "wxid":wxid, - # "contactWixd":contact_wxid, - # "addTime":int(time.time()) - # }) - # await gewe_service.save_group_add_contacts_history_async(wxid,chatroom_id,contact_wxid,history) - # else: - # logger.info(f'群好友邀请失败原因:{data}') - if ret!=200: - logger.warning(f'群好友邀请失败原因:{ret} {data}') - - history=AddGroupContactsHistory.model_validate({ - "chatroomId":chatroom_id, - "wxid":wxid, - "contactWixd":contact_wxid, - "addTime":int(time.time()) - }) - await gewe_service.save_group_add_contacts_history_async(wxid,chatroom_id,contact_wxid,history) - - logger.info(f'{nickname} 向 {chatroom_nickname}-{chatroom_id} 群的 {m.get("nickName")}-{m.get("wxid")} 发送好友邀请 {msg}') - await asyncio.sleep(random.uniform(1.5, 3)) - await asyncio.sleep(random.uniform(1.5, 3)) - - - except Exception as e: - logger.error(f"任务执行过程中发生异常: {e}") - - loop = asyncio.get_event_loop() - if loop.is_closed(): - loop = asyncio.new_event_loop() - asyncio.set_event_loop(loop) - - loop.run_until_complete(task()) # 在现有事件循环中运行任务 - - - -#@celery_app.task(name='tasks.scheduled_task_add_contacts_from_chatrooms_p2', bind=True, acks_late=True) -def scheduled_task_add_contacts_from_chatrooms_p2(self, redis_config, kafka_config, gewe_config): - - ''' - 关于群加好友的请求规则:一次30人,间隔2小时做1次,一天做3次,即最多90人/天。 - ''' - async def task(): - try: - now = datetime.now() - if now.hour < 8: - logger.info(f"定时群成员定时添好友任务不启动,当前时间为 {now.strftime('%Y-%m-%d %H:%M:%S')},早于8点") - return - - logger.info('定时群成员定时添好友任务开始') - redis_service = RedisService() - await redis_service.init(**redis_config) - - gewe_service = await GeWeService.get_instance(redis_service,gewe_config['api_url']) - - KAFKA_BOOTSTRAP_SERVERS=kafka_config['bootstrap_servers'] - KAFKA_TOPIC=kafka_config['topic'] - KAFKA_GROUP_ID=kafka_config['group_id'] - - kafka_service= KafkaService(KAFKA_BOOTSTRAP_SERVERS, KAFKA_TOPIC, KAFKA_TOPIC,KAFKA_GROUP_ID) - await kafka_service.start() - - login_keys = [] - async for key in redis_service.client.scan_iter(match='__AI_OPS_WX__:LOGININFO:*'): - login_keys.append(key) - - wixd_add_contacts_from_chatrooms_times = {} - #print(login_keys) - for k in login_keys: - r = await redis_service.get_hash(k) - app_id = r.get("appId") - token_id = r.get("tokenId") - wxid = r.get("wxid") - status = r.get('status') - if status == '0': - logger.warning(f"微信号 {wxid} 已经离线,群成员不能定时添加") - continue - - c = await gewe_service.get_wxchat_config_from_cache_async(wxid) - contacts = await gewe_service.get_contacts_brief_from_cache_async(wxid) - - contact_wxids = [c.get('userName') for c in contacts] - chatrooms = c.get('addContactsFromChatroomIdWhiteList', []) - logger.info(f'{wxid} 定时群成员定时添好友任务开始') - wixd_add_contacts_from_chatrooms_times[wxid] = 0 - for chatroom_id in chatrooms: - chatroom = await gewe_service.get_group_info_from_cache_async(wxid, chatroom_id) - chatroom_member=await gewe_service.get_group_members_from_cache_async(wxid, chatroom_id) - - chatroom_nickname = chatroom.get('nickName') - chatroom_owner_wxid = chatroom_member.get('chatroomOwner', None) - - admin_wxids = chatroom_member.get('adminWxid', []) - - admin_wxids = chatroom_member.get('adminWxid') - if admin_wxids is None: - admin_wxids = [] # 如果 admin_wxids 是 None,将其初始化为空列表 - - logger.info(f'{chatroom_nickname} 的群主是 {chatroom_owner_wxid},管理员是{admin_wxids}') - contact_wxids_set = set(contact_wxids) - # for admin_wxid in admin_wxids: - # contact_wxids_set.add(admin_wxid) - if admin_wxids: - contact_wxids_set.update(set(admin_wxids)) - if chatroom_owner_wxid is not None: - contact_wxids_set.add(chatroom_owner_wxid) - - contact_wxids_set.add(wxid) - - unavailable_wixds=await gewe_service.check_wixd_group_add_contacts_history_async(wxid,chatroom_id) - if unavailable_wixds: - contact_wxids_set.update(set(unavailable_wixds)) - - chatroom_member_list = chatroom.get('memberList', []) - if chatroom_member_list is None: - chatroom_member_list = [] # 如果 memberList 是 None,将其初始化为空列表 - elif not isinstance(chatroom_member_list, list): - chatroom_member_list = list(chatroom_member_list) # 如果 memberList 不是列表,将其转换为列表 - - remaining_chatroot_members = [x for x in chatroom_member_list if x.get('wxid') not in contact_wxids_set] - - nickname = next((member['nickName'] for member in chatroom_member_list if member['wxid'] == wxid), None) - - logger.info(f'{nickname}-{wxid} 在 {chatroom_nickname} 群里还可以邀请的好友有:{[x.get("nickName") for x in remaining_chatroot_members]}') - for m in remaining_chatroot_members: - # 判断本次任务是否已经邀请了30个好友 - if wixd_add_contacts_from_chatrooms_times[wxid] == 30: - logger.info(f"{wxid} 本次任务已经邀请了30人,不再邀请") - return - # 判断当天群成员是否已经加了90个好友 - is_add_group_90_times = await gewe_service.is_group_add_contacts_history_one_day_90_async(wxid) - if is_add_group_90_times: - logger.info(f"当天 {wxid} 所有群的成员已经加了90个好友,不再添加") - return - - contact_wxid= m.get('wxid') - member_nickname=m.get("nickName") - group_add_contacts_history = await gewe_service.get_group_add_contacts_history_async(wxid,chatroom_id,contact_wxid) - sorted_history = sorted(group_add_contacts_history, key=lambda x: x.addTime, reverse=True) - - # 已经邀请过两次,不再邀请 - if len(sorted_history)==2: - logger.info(f'{nickname}-{wxid}在{chatroom_nickname}-{chatroom_id} 群的 {member_nickname}-{contact_wxid} 已经邀请过2次,不再邀请') - continue - - # 当天邀请过,不再邀请 - if len(sorted_history) > 0: - last_add_time = sorted_history[0].addTime - def is_add_time_more_than_one_day(addTime: int) -> bool: - """ - 判断 addTime 是否与当前时间相隔大于 3600 × 24 秒 - :param addTime: Unix 时间戳 - :return: 如果 addTime 与当前时间相隔大于 3600 × 24 秒,返回 True;否则返回 False - """ - # 获取当前时间的时间戳 - current_time = time.time() - - # 计算时间戳差值 - time_difference = abs(current_time - addTime) - - # 检查是否大于 3600 × 24 秒 - return time_difference > 3600 * 24 - - is_more_than_one_day= is_add_time_more_than_one_day(last_add_time) - - if not is_more_than_one_day: - logger.info(f'{nickname}-{wxid}在{chatroom_nickname}-{chatroom_id} 群的{member_nickname}-{contact_wxid}已经当天邀请,不再邀请') - continue - - - - ret, msg, data = await gewe_service.add_group_member_as_friend_async(token_id, app_id, chatroom_id, m.get('wxid'), f'我是群聊"{chatroom_nickname}"群的{nickname}') - if ret!=200: - logger.warning(f'群好友邀请失败原因:{ret} {data}') - - history=AddGroupContactsHistory.model_validate({ - "chatroomId":chatroom_id, - "wxid":wxid, - "contactWixd":contact_wxid, - "addTime":int(time.time()) - }) - await gewe_service.save_group_add_contacts_history_async(wxid,chatroom_id,contact_wxid,history) - wixd_add_contacts_from_chatrooms_times[wxid]+=1 - logger.info(f'{nickname} 向 {chatroom_nickname}-{chatroom_id} 群的 {m.get("nickName")}-{m.get("wxid")} 发送好友邀请 {msg}') - # 推送到kafka - - k_message = wx_add_contacts_from_chatroom_message(history.wxid,history.chatroomId,history.contactWixd,history.addTime) - await kafka_service.send_message_async(k_message) - - await asyncio.sleep(random.uniform(1.5, 3)) - await asyncio.sleep(random.uniform(1.5, 3)) - - - except Exception as e: - logger.error(f"任务执行过程中发生异常: {e}") - - loop = asyncio.get_event_loop() - if loop.is_closed(): - loop = asyncio.new_event_loop() - asyncio.set_event_loop(loop) - - loop.run_until_complete(task()) # 在现有事件循环中运行任务 - @celery_app.task(name='tasks.scheduled_task_add_contacts_from_chatrooms', bind=True, acks_late=True) def scheduled_task_add_contacts_from_chatrooms(self, redis_config, kafka_config, gewe_config): @@ -731,39 +413,41 @@ def scheduled_task_add_contacts_from_chatrooms(self, redis_config, kafka_config, contact_wxid= m.get('wxid') member_nickname=m.get("nickName") group_add_contacts_history = await gewe_service.get_group_add_contacts_history_async(wxid,chatroom_id,contact_wxid) - sorted_history = sorted(group_add_contacts_history, key=lambda x: x.addTime, reverse=True) - - # 已经邀请过两次,不再邀请 - if len(sorted_history)==2: - logger.info(f'{nickname}-{wxid}在{chatroom_nickname}-{chatroom_id} 群的 {member_nickname}-{contact_wxid} 已经邀请过2次,不再邀请') - continue - - # 当天邀请过,不再邀请 - if len(sorted_history) > 0: - last_add_time = sorted_history[0].addTime - def is_add_time_more_than_one_day(addTime: int) -> bool: - """ - 判断 addTime 是否与当前时间相隔大于 3600 × 24 秒 - :param addTime: Unix 时间戳 - :return: 如果 addTime 与当前时间相隔大于 3600 × 24 秒,返回 True;否则返回 False - """ - # 获取当前时间的时间戳 - current_time = time.time() - - # 计算时间戳差值 - time_difference = abs(current_time - addTime) - - # 检查是否大于 3600 × 24 秒 - return time_difference > 3600 * 24 - is_more_than_one_day= is_add_time_more_than_one_day(last_add_time) - - if not is_more_than_one_day: - logger.info(f'{nickname}-{wxid}在{chatroom_nickname}-{chatroom_id} 群的 {member_nickname}-{contact_wxid} 已经当天邀请,不再邀请') + if group_add_contacts_history: + sorted_history = sorted(group_add_contacts_history, key=lambda x: x.addTime, reverse=True) + + # 已经邀请过两次,不再邀请 + if len(sorted_history)==2: + logger.info(f'{nickname}-{wxid}在{chatroom_nickname}-{chatroom_id} 群的 {member_nickname}-{contact_wxid} 已经邀请过2次,不再邀请') continue - - + # 当天邀请过,不再邀请 + if len(sorted_history) > 0: + last_add_time = sorted_history[0].addTime + def is_add_time_more_than_one_day(addTime: int) -> bool: + """ + 判断 addTime 是否与当前时间相隔大于 3600 × 24 秒 + :param addTime: Unix 时间戳 + :return: 如果 addTime 与当前时间相隔大于 3600 × 24 秒,返回 True;否则返回 False + """ + # 获取当前时间的时间戳 + current_time = time.time() + + # 计算时间戳差值 + time_difference = abs(current_time - addTime) + + # 检查是否大于 3600 × 24 秒 + return time_difference > 3600 * 24 + + is_more_than_one_day= is_add_time_more_than_one_day(last_add_time) + + if not is_more_than_one_day: + logger.info(f'{nickname}-{wxid}在{chatroom_nickname}-{chatroom_id} 群的 {member_nickname}-{contact_wxid} 已经当天邀请,不再邀请') + continue + + + ret, msg, data = await gewe_service.add_group_member_as_friend_async(token_id, app_id, chatroom_id, m.get('wxid'), f'我是群聊"{chatroom_nickname}"群的{nickname}') if ret!=200: logger.warning(f'群好友邀请失败原因:{ret} {data}')