"""Celery task definitions: demo arithmetic tasks and Redis login-key scans."""

# Imports are kept in their original order on purpose: ``from common.utils
# import *`` may rebind names, and the imports after it must keep overriding
# whatever it exports.
from celery_app import celery_app
from fastapi import Request, FastAPI
import time
import datetime
from celery import Celery
import celery.schedules
from redbeat import RedBeatSchedulerEntry
from datetime import timedelta
from services.redis_service import RedisService
from services.kafka_service import KafkaService
from services.gewe_service import GeWeService
# from common.log import logger
from common.utils import *  # noqa: F401,F403
import asyncio
import random
from model.models import AddGroupContactsHistory, AgentConfig
import logging

# The scheduled tasks in this module call ``datetime.now()``,
# ``datetime.fromtimestamp()`` and ``datetime(...)``, i.e. they need the
# *class*. Bind it explicitly rather than depending on the star import above.
from datetime import datetime  # noqa: E402,F811

logger = logging.getLogger('redbeat')


@celery_app.task(name='tasks.add_task', bind=True, acks_late=True)
def add_task(self, x, y):
    """Return ``x + y`` after a five second pause (demo task)."""
    time.sleep(5)  # simulate a long-running computation
    logger.info('add')
    return x + y


@celery_app.task(name='tasks.mul_task', bind=True, acks_late=True)
def mul_task(self, x, y):
    """Return ``x * y`` after a five second pause (demo task)."""
    time.sleep(5)  # simulate a long-running computation
    return x * y


@celery_app.task(name='tasks.sync_contacts', bind=True, acks_late=True)
async def sync_contacts_task(self, redis_service):
    """Return every Redis key that matches the login-info pattern.

    :param redis_service: service object exposing an async Redis ``client``.
    :return: list of matching keys.

    NOTE(review): this is a coroutine function registered as a Celery task;
    a stock Celery worker presumably never awaits it, and ``redis_service``
    has to survive task-argument serialization -- confirm how it is invoked.
    """
    # scan_iter is consumed with ``async for`` everywhere else in this module,
    # so it is collected the same way here instead of being awaited directly.
    return [
        key
        async for key in redis_service.client.scan_iter(match='__AI_OPS_WX__:LOGININFO:*')
    ]


@celery_app.task(name='tasks.background_worker_task', bind=True, acks_late=True)
def background_worker_task(self, redis_config, kafka_config, gewe_config):
    """Connect to Redis and print every login-info key.

    :param redis_config: keyword arguments forwarded to ``RedisService.init``.
    :param kafka_config: accepted for signature compatibility; unused here.
    :param gewe_config: accepted for signature compatibility; unused here.
    """

    async def task():
        redis_service = RedisService()
        await redis_service.init(**redis_config)
        login_keys = [
            key
            async for key in redis_service.client.scan_iter(match='__AI_OPS_WX__:LOGININFO:*')
        ]
        print(login_keys)

    asyncio.run(task())
# Redis key pattern under which the login info of each WeChat account is stored.
_LOGIN_KEY_PATTERN = '__AI_OPS_WX__:LOGININFO:*'
# The add-contacts tasks refuse to run before this hour (local time).
_TASK_START_HOUR = 8
_ONE_DAY_SECONDS = 3600 * 24


def _run_coroutine(coro):
    """Run *coro* to completion on this thread's event loop and return its result.

    A fresh loop is created (and installed) when the current one is closed or
    when ``asyncio.get_event_loop()`` refuses to provide one.
    """
    try:
        loop = asyncio.get_event_loop()
    except RuntimeError:
        loop = None
    if loop is None or loop.is_closed():
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
    return loop.run_until_complete(coro)


async def _scan_login_keys(redis_service):
    """Return the list of Redis keys that hold account login info."""
    return [key async for key in redis_service.client.scan_iter(match=_LOGIN_KEY_PATTERN)]


async def _load_online_account(redis_service, login_key, offline_note=''):
    """Return the login hash stored at *login_key*, or ``None`` if the account is offline.

    An offline account (``status == '0'``) is logged as a warning;
    *offline_note* is appended to that message.
    """
    login_info = await redis_service.get_hash(login_key)
    if login_info.get('status') == '0':
        wxid = login_info.get("wxid")
        logger.warning(f"微信号 {wxid} 已经离线{offline_note}")
        return None
    return login_info


def _is_before_start_hour():
    """Return True (and log why) when the add-contacts tasks must not run yet."""
    now = datetime.now()
    if now.hour < _TASK_START_HOUR:
        logger.info(f"定时群成员定时添好友任务不启动,当前时间为 {now.strftime('%Y-%m-%d %H:%M:%S')},早于8点")
        return True
    return False


def _is_older_than_one_day(add_time: int) -> bool:
    """Return True when *add_time* (Unix seconds) is more than 24 hours from now."""
    return abs(time.time() - add_time) > _ONE_DAY_SECONDS


async def _load_chatroom(gewe_service, wxid, chatroom_id):
    """Fetch cached group info and return ``(chatroom, nickname, owner_wxid, admin_wxids)``."""
    chatroom = await gewe_service.get_group_info_from_cache_async(wxid, chatroom_id)
    chatroom_member = await gewe_service.get_group_members_from_cache_async(wxid, chatroom_id)
    admin_wxids = chatroom_member.get('adminWxid')
    if admin_wxids is None:
        admin_wxids = []
    return (
        chatroom,
        chatroom.get('nickName'),
        chatroom_member.get('chatroomOwner', None),
        admin_wxids,
    )


def _excluded_wxids(wxid, contact_wxids, owner_wxid, admin_wxids, unavailable_wxids=None):
    """Return the set of wxids that must not receive a friend request.

    That is: existing contacts, group admins, the group owner, the account
    itself and, when given, the wxids reported as unavailable.
    """
    excluded = set(contact_wxids)
    if admin_wxids:
        excluded.update(admin_wxids)
    if owner_wxid is not None:
        excluded.add(owner_wxid)
    excluded.add(wxid)
    if unavailable_wxids:
        excluded.update(unavailable_wxids)
    return excluded


def _pick_candidates(chatroom, wxid, excluded_wxids):
    """Return ``(own_nickname, members)`` for the members still eligible for an invite."""
    member_list = chatroom.get('memberList', [])
    if member_list is None:
        member_list = []
    elif not isinstance(member_list, list):
        member_list = list(member_list)
    candidates = [m for m in member_list if m.get('wxid') not in excluded_wxids]
    own_nickname = next((m['nickName'] for m in member_list if m['wxid'] == wxid), None)
    return own_nickname, candidates


async def _invite_member(gewe_service, token_id, app_id, wxid, nickname,
                         chatroom_id, chatroom_nickname, member):
    """Send one friend request to *member* unless its invite history forbids it.

    A member is skipped when it has already been invited twice, or when the
    most recent invite is less than 24 hours old. The attempt is recorded in
    the history even when the API answers with a non-200 code.

    :return: the saved ``AddGroupContactsHistory`` record, or ``None`` when skipped.
    """
    contact_wxid = member.get('wxid')
    member_nickname = member.get("nickName")
    history_records = sorted(
        await gewe_service.get_group_add_contacts_history_async(wxid, chatroom_id, contact_wxid),
        key=lambda record: record.addTime,
        reverse=True,
    )
    if len(history_records) >= 2:
        logger.info(f'{nickname}-{wxid}在{chatroom_nickname}-{chatroom_id} 群的 {member_nickname}-{contact_wxid} 已经邀请过2次,不再邀请')
        return None
    if history_records and not _is_older_than_one_day(history_records[0].addTime):
        logger.info(f'{nickname}-{wxid}在{chatroom_nickname}-{chatroom_id} 群的{member_nickname}-{contact_wxid}已经当天邀请,不再邀请')
        return None

    ret, msg, data = await gewe_service.add_group_member_as_friend_async(
        token_id, app_id, chatroom_id, contact_wxid,
        f'我是群聊"{chatroom_nickname}"群的{nickname}')
    if ret != 200:
        logger.warning(f'群好友邀请失败原因:{ret} {data}')
    history = AddGroupContactsHistory.model_validate({
        "chatroomId": chatroom_id,
        "wxid": wxid,
        "contactWixd": contact_wxid,
        "addTime": int(time.time()),
    })
    await gewe_service.save_group_add_contacts_history_async(wxid, chatroom_id, contact_wxid, history)
    logger.info(f'{nickname} 向 {chatroom_nickname}-{chatroom_id} 群的 {member_nickname}-{contact_wxid} 发送好友邀请 {msg}')
    return history


@celery_app.task(name='tasks.scheduled_task_sync_wx_info', bind=True, acks_late=True)
def scheduled_task_sync_wx_info(self, redis_config, kafka_config, gewe_config):
    """Periodically refresh the stored profile of every online WeChat account.

    :param redis_config: keyword arguments forwarded to ``RedisService.init``.
    :param kafka_config: accepted for signature compatibility; unused here.
    :param gewe_config: mapping providing ``api_url`` for the GeWe service.
    """

    async def task():
        try:
            redis_service = RedisService()
            await redis_service.init(**redis_config)
            gewe_service = await GeWeService.get_instance(redis_service, gewe_config['api_url'])
            login_keys = await _scan_login_keys(redis_service)
            print(login_keys)
            for login_key in login_keys:
                login_info = await _load_online_account(redis_service, login_key)
                if login_info is None:
                    continue
                wxid = login_info.get("wxid")
                ret, msg, profile = await gewe_service.get_profile_async(
                    login_info.get("tokenId"), login_info.get("appId"))
                if ret != 200:
                    logger.warning(f"同步微信号 {wxid} 资料失败: {ret}-{msg}")
                    continue
                nickname = profile.get("nickName")
                login_info.update({
                    "nickName": nickname,
                    "headImgUrl": profile.get("smallHeadImgUrl"),
                    "modify_at": int(time.time()),
                })
                # None values are written back as empty strings.
                cleaned_login_info = {
                    field: ('' if value is None else value)
                    for field, value in login_info.items()
                }
                await redis_service.set_hash(login_key, cleaned_login_info)
                logger.info(f"定时同步微信号{wxid}-昵称{nickname} 资料成功")
        except Exception as e:
            logger.exception(f"任务执行过程中发生异常: {e}")

    _run_coroutine(task())


# NOTE: Celery registration is disabled for this legacy variant.
#@celery_app.task(name='tasks.scheduled_task_add_contacts_from_chatrooms_p', bind=True, acks_late=True)
def scheduled_task_add_contacts_from_chatrooms_p(self, redis_config, kafka_config, gewe_config):
    """Legacy variant: invite members of white-listed groups, capped per group.

    A group is skipped once the service reports that the account has already
    added 200 of its members today. Nothing is published to Kafka.

    :param redis_config: keyword arguments forwarded to ``RedisService.init``.
    :param kafka_config: accepted for signature compatibility; unused here.
    :param gewe_config: mapping providing ``api_url`` for the GeWe service.
    """

    async def task():
        try:
            if _is_before_start_hour():
                return
            logger.info('定时群成员定时添好友任务开始')
            redis_service = RedisService()
            await redis_service.init(**redis_config)
            gewe_service = await GeWeService.get_instance(redis_service, gewe_config['api_url'])

            for login_key in await _scan_login_keys(redis_service):
                login_info = await _load_online_account(redis_service, login_key, ',群成员不能定时添加')
                if login_info is None:
                    continue
                app_id = login_info.get("appId")
                token_id = login_info.get("tokenId")
                wxid = login_info.get("wxid")

                config = await gewe_service.get_wxchat_config_from_cache_async(wxid)
                contacts = await gewe_service.get_contacts_brief_from_cache_async(wxid)
                contact_wxids = [contact.get('userName') for contact in contacts]
                chatrooms = config.get('addContactsFromChatroomIdWhiteList', [])
                logger.info(f'{wxid} 定时群成员定时添好友任务开始')

                for chatroom_id in chatrooms:
                    chatroom, chatroom_nickname, owner_wxid, admin_wxids = await _load_chatroom(
                        gewe_service, wxid, chatroom_id)
                    # Stop for this group once 200 of its members were added today.
                    if await gewe_service.is_group_add_contacts_history_one_day_200_async(wxid, chatroom_id):
                        logger.info(f"{wxid}在 {chatroom_nickname} 群成员已经加了200个好友,不再添加,群id:{chatroom_id}")
                        continue
                    logger.info(f'{chatroom_nickname} 的群主是 {owner_wxid},管理员是{admin_wxids}')

                    excluded = _excluded_wxids(wxid, contact_wxids, owner_wxid, admin_wxids)
                    nickname, candidates = _pick_candidates(chatroom, wxid, excluded)
                    logger.info(f'{nickname}-{wxid} 在 {chatroom_nickname} 群里还可以邀请的好友有:{[x.get("nickName") for x in candidates]}')

                    for member in candidates:
                        history = await _invite_member(
                            gewe_service, token_id, app_id, wxid, nickname,
                            chatroom_id, chatroom_nickname, member)
                        if history is None:
                            continue
                        await asyncio.sleep(random.uniform(1.5, 3))
                    # Pause before moving on to the next group.
                    await asyncio.sleep(random.uniform(1.5, 3))
        except Exception as e:
            logger.exception(f"任务执行过程中发生异常: {e}")

    _run_coroutine(task())


# NOTE: Celery registration is disabled for this legacy variant.
#@celery_app.task(name='tasks.scheduled_task_add_contacts_from_chatrooms_p2', bind=True, acks_late=True)
def scheduled_task_add_contacts_from_chatrooms_p2(self, redis_config, kafka_config, gewe_config):
    """Legacy variant with fixed limits: 30 invites per run, 90 per day.

    Rule for group friend requests: 30 people per run, one run every 2 hours,
    3 runs a day, i.e. at most 90 people per day. Every invite is published
    to Kafka.

    :param redis_config: keyword arguments forwarded to ``RedisService.init``.
    :param kafka_config: mapping with ``bootstrap_servers``, ``topic`` and ``group_id``.
    :param gewe_config: mapping providing ``api_url`` for the GeWe service.

    NOTE(review): the Kafka service started here is never stopped.
    NOTE(review): reaching a limit returns from the whole run, so accounts
    later in the scan are not processed -- confirm this is intended.
    """

    async def task():
        try:
            if _is_before_start_hour():
                return
            logger.info('定时群成员定时添好友任务开始')
            redis_service = RedisService()
            await redis_service.init(**redis_config)
            gewe_service = await GeWeService.get_instance(redis_service, gewe_config['api_url'])

            bootstrap_servers = kafka_config['bootstrap_servers']
            topic = kafka_config['topic']
            group_id = kafka_config['group_id']
            kafka_service = KafkaService(bootstrap_servers, topic, topic, group_id)
            await kafka_service.start()

            invites_sent = {}  # wxid -> number of invites sent during this run
            for login_key in await _scan_login_keys(redis_service):
                login_info = await _load_online_account(redis_service, login_key, ',群成员不能定时添加')
                if login_info is None:
                    continue
                app_id = login_info.get("appId")
                token_id = login_info.get("tokenId")
                wxid = login_info.get("wxid")

                config = await gewe_service.get_wxchat_config_from_cache_async(wxid)
                contacts = await gewe_service.get_contacts_brief_from_cache_async(wxid)
                contact_wxids = [contact.get('userName') for contact in contacts]
                chatrooms = config.get('addContactsFromChatroomIdWhiteList', [])
                logger.info(f'{wxid} 定时群成员定时添好友任务开始')
                invites_sent[wxid] = 0

                for chatroom_id in chatrooms:
                    chatroom, chatroom_nickname, owner_wxid, admin_wxids = await _load_chatroom(
                        gewe_service, wxid, chatroom_id)
                    logger.info(f'{chatroom_nickname} 的群主是 {owner_wxid},管理员是{admin_wxids}')

                    unavailable = await gewe_service.check_wixd_group_add_contacts_history_async(wxid, chatroom_id)
                    excluded = _excluded_wxids(wxid, contact_wxids, owner_wxid, admin_wxids, unavailable)
                    nickname, candidates = _pick_candidates(chatroom, wxid, excluded)
                    logger.info(f'{nickname}-{wxid} 在 {chatroom_nickname} 群里还可以邀请的好友有:{[x.get("nickName") for x in candidates]}')

                    for member in candidates:
                        # Per-run cap of 30 invites for this account.
                        if invites_sent[wxid] >= 30:
                            logger.info(f"{wxid} 本次任务已经邀请了30人,不再邀请")
                            return
                        # Per-day cap of 90 invites across all groups.
                        if await gewe_service.is_group_add_contacts_history_one_day_90_async(wxid):
                            logger.info(f"当天 {wxid} 所有群的成员已经加了90个好友,不再添加")
                            return

                        history = await _invite_member(
                            gewe_service, token_id, app_id, wxid, nickname,
                            chatroom_id, chatroom_nickname, member)
                        if history is None:
                            continue
                        invites_sent[wxid] += 1
                        # Publish the invite to Kafka.
                        k_message = wx_add_contacts_from_chatroom_message(
                            history.wxid, history.chatroomId, history.contactWixd, history.addTime)
                        await kafka_service.send_message_async(k_message)
                        await asyncio.sleep(random.uniform(1.5, 3))
                    # Pause before moving on to the next group.
                    await asyncio.sleep(random.uniform(1.5, 3))
        except Exception as e:
            logger.exception(f"任务执行过程中发生异常: {e}")

    _run_coroutine(task())


@celery_app.task(name='tasks.scheduled_task_add_contacts_from_chatrooms', bind=True, acks_late=True)
def scheduled_task_add_contacts_from_chatrooms(self, redis_config, kafka_config, gewe_config):
    """Invite members of white-listed groups as friends, within configured limits.

    Rule for group friend requests: 30 people per run, one run every 2 hours,
    3 runs a day, i.e. at most 90 people per day. Runs per day, invites per
    run and invites per day are read from the global config key
    ``scheduledTaskAddContactsFromChatrooms`` (``oneDayTimes``,
    ``onceAddContactsTotal``, ``oneDayAddContactsTotal``) and default to the
    values above. The 2 hour spacing between runs is fixed in this function.

    :param redis_config: keyword arguments forwarded to ``RedisService.init``.
    :param kafka_config: mapping with ``bootstrap_servers``, ``topic`` and ``group_id``.
    :param gewe_config: mapping providing ``api_url`` for the GeWe service.

    NOTE(review): reaching a per-run or per-day limit returns from the whole
    run, so accounts later in the scan are not processed -- confirm intended.
    """
    task_name = 'scheduled_task_add_contacts_from_chatrooms'

    async def task():
        # Bound up front so the ``finally`` clause is safe on early exits.
        kafka_service = None
        try:
            if _is_before_start_hour():
                return
            logger.info('定时群成员定时添好友任务开始')
            redis_service = RedisService()
            await redis_service.init(**redis_config)
            gewe_service = await GeWeService.get_instance(redis_service, gewe_config['api_url'])

            bootstrap_servers = kafka_config['bootstrap_servers']
            topic = kafka_config['topic']
            group_id = kafka_config['group_id']
            kafka_service = KafkaService(bootstrap_servers, topic, topic, group_id)
            await kafka_service.start_producer()

            global_config = await gewe_service.get_global_config_from_cache_async()
            limits = global_config.get('scheduledTaskAddContactsFromChatrooms', {}) or {}
            oneday_add_contacts_total = limits.get('oneDayAddContactsTotal', 90)
            once_add_contacts_total = limits.get('onceAddContactsTotal', 30)
            oneday_times = limits.get('oneDayTimes', 3)

            run_logs = await gewe_service.get_task_run_time_async(task_name) or []
            if len(run_logs) >= oneday_times:
                logger.info(f"今日定时群成员定时添好友任务已达上限 {oneday_times} 次!")
                return
            if run_logs:
                last_run_time = max(entry.get("runTime") for entry in run_logs)
                if last_run_time > 1e12:  # millisecond timestamp -> seconds
                    last_run_time = last_run_time / 1000
                if datetime.now() - datetime.fromtimestamp(last_run_time) < timedelta(hours=2):
                    logger.info(f"上次定时群成员定时添好友任务在2小时内,不再执行")
                    return

            # Record this run; the log expires at the end of the current day.
            now = datetime.now()
            end_of_day = datetime(now.year, now.month, now.day, 23, 59, 59)
            ttl_seconds = max(1, int((end_of_day - now).total_seconds()))
            run_logs.append({"runTime": int(time.time())})
            await gewe_service.save_task_run_time_async(task_name, run_logs, ttl_seconds)

            invites_sent = {}  # wxid -> number of invites sent during this run
            for login_key in await _scan_login_keys(redis_service):
                login_info = await _load_online_account(redis_service, login_key, ',群成员不能定时添加')
                if login_info is None:
                    continue
                app_id = login_info.get("appId")
                token_id = login_info.get("tokenId")
                wxid = login_info.get("wxid")

                config = await gewe_service.get_wxchat_config_from_cache_async(wxid)
                if not AgentConfig.model_validate(config).agentEnabled:
                    logger.warning(f"微信号 {wxid} 取消了托管,群成员不能定时添加")
                    continue
                contacts = await gewe_service.get_contacts_brief_from_cache_async(wxid)
                contact_wxids = [contact.get('userName') for contact in contacts]
                chatrooms = config.get('addContactsFromChatroomIdWhiteList', [])
                logger.info(f'{wxid} 定时群成员定时添好友任务开始')
                invites_sent[wxid] = 0

                for chatroom_id in chatrooms:
                    chatroom, chatroom_nickname, owner_wxid, admin_wxids = await _load_chatroom(
                        gewe_service, wxid, chatroom_id)
                    logger.info(f'{chatroom_nickname} 的群主是 {owner_wxid},管理员是{admin_wxids}')

                    unavailable = await gewe_service.check_wixd_group_add_contacts_history_async(wxid, chatroom_id)
                    excluded = _excluded_wxids(wxid, contact_wxids, owner_wxid, admin_wxids, unavailable)
                    nickname, candidates = _pick_candidates(chatroom, wxid, excluded)

                    if not candidates:
                        logger.info(f'{nickname}-{wxid} 在 {chatroom_nickname} 群里没有好友可以邀请')
                        # Publish the task status to Kafka.
                        k_message = wx_add_contacts_from_chatroom_task_status(wxid, chatroom_id, 2)
                        await kafka_service.send_message_async(k_message)
                        continue
                    logger.info(f'{nickname}-{wxid} 在 {chatroom_nickname} 群里还可以邀请的好友有:{[x.get("nickName") for x in candidates]}')

                    for member in candidates:
                        # Per-run cap for this account.
                        if invites_sent[wxid] >= once_add_contacts_total:
                            logger.info(f"{wxid} 本次任务已经邀请了{once_add_contacts_total}人,不再邀请")
                            return
                        # Per-day cap across all groups.
                        if await gewe_service.is_group_add_contacts_history_one_day_async(wxid, oneday_add_contacts_total):
                            logger.info(f"当天 {wxid} 所有群的成员已经加了{oneday_add_contacts_total}个好友,不再添加")
                            return

                        history = await _invite_member(
                            gewe_service, token_id, app_id, wxid, nickname,
                            chatroom_id, chatroom_nickname, member)
                        if history is None:
                            continue
                        invites_sent[wxid] += 1
                        # Publish the invite to Kafka.
                        k_message = wx_add_contacts_from_chatroom_message(
                            history.wxid, history.chatroomId, history.contactWixd, history.addTime)
                        await kafka_service.send_message_async(k_message)
                        await asyncio.sleep(random.uniform(1.5, 3))

                    # Publish this group's task status to Kafka. The message
                    # must be assigned here, otherwise the last invite message
                    # would be sent a second time.
                    task_status = await gewe_service.wx_add_contacts_from_chatroom_task_status_async(wxid, chatroom_id)
                    k_message = wx_add_contacts_from_chatroom_task_status(wxid, chatroom_id, task_status)
                    await kafka_service.send_message_async(k_message)
                    # Pause before moving on to the next group.
                    await asyncio.sleep(random.uniform(1.5, 3))
        except Exception as e:
            logger.exception(f"任务执行过程中发生异常: {e}")
        finally:
            if kafka_service is not None:
                await kafka_service.stop_producer()

    _run_coroutine(task())


REDIS_KEY_PATTERN = "friend_add_limit:{date}"
REDIS_LAST_RUN_KEY = "last_run_time:add_friends_task"

# Quotas enforced by add_friends_task.
_DAILY_FRIEND_LIMIT = 15
_TWO_HOUR_FRIEND_LIMIT = 8


@celery_app.task(name='tasks.add_friends_task', bind=True, acks_late=True)
def add_friends_task(self, redis_config):
    """Count one friend addition against the daily and two-hour quotas.

    At most 15 additions per day and 8 per two hours; each run adds at most
    one. The actual friend-adding call is still a TODO, so a run currently
    only updates the counters stored in the Redis hash for today.

    :param redis_config: keyword arguments forwarded to ``RedisService.init``.
    """

    async def task():
        redis_service = RedisService()
        await redis_service.init(**redis_config)
        redis_key = REDIS_KEY_PATTERN.format(date=datetime.now().strftime("%Y%m%d"))

        # Missing counters are treated as zero.
        total_added = int(await redis_service.get_hash_field(redis_key, "total") or 0)
        last_2h_added = int(await redis_service.get_hash_field(redis_key, "last_2h") or 0)
        logger.info(f"当前添加好友总数: {total_added}, 过去2小时添加: {last_2h_added}")

        if total_added >= _DAILY_FRIEND_LIMIT:
            logger.warning("今日好友添加已达上限!")
            return
        if last_2h_added >= _TWO_HOUR_FRIEND_LIMIT:
            logger.warning("过去2小时添加已达上限!")
            return

        max_add = min(_DAILY_FRIEND_LIMIT - total_added, _TWO_HOUR_FRIEND_LIMIT - last_2h_added)
        if max_add <= 0:
            return
        num_to_add = min(max_add, 1)  # add at most one friend per run
        logger.info(f"本次添加 {num_to_add} 位好友")

        # TODO: call the real friend-adding logic (API or business logic).
        success = num_to_add  # assume all of them were added

        if success > 0:
            await redis_service.increment_hash_field(redis_key, "total", success)
            await redis_service.increment_hash_field(redis_key, "last_2h", success)
            # The daily record is kept for 24 hours, the two-hour counter for 2 hours.
            await redis_service.expire(redis_key, 86400)
            await redis_service.expire_field(redis_key, "last_2h", 7200)
            logger.info(f"成功添加 {success} 位好友, 今日总数 {total_added + success}")

    _run_coroutine(task())


@celery_app.task(name='tasks.random_scheduled_task', bind=True, acks_late=True)
def random_scheduled_task(self,):
    """Re-register itself in RedBeat with a random 10-60 second interval.

    :return: a message stating the interval chosen for the next run.
    """
    print(f"Task executed at {datetime.now()}")

    next_run_in = random.randint(10, 60)
    print(f"Next execution will be in {next_run_in} seconds")

    # Save the schedule entry under a fixed name with the new interval.
    entry = RedBeatSchedulerEntry(
        name='random-task',
        task='tasks.random_scheduled_task',
        schedule=timedelta(seconds=next_run_in),
        app=celery_app,
    )
    entry.save()
    return f"Scheduled next run in {next_run_in} seconds"