|
1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153 |
- from celery_app import celery_app
- from fastapi import Request,FastAPI
- import time,datetime
- from celery import Celery
- import celery.schedules
- from redbeat import RedBeatSchedulerEntry
- from datetime import timedelta
-
- from services.redis_service import RedisService
- from services.kafka_service import KafkaService
- from services.gewe_service import GeWeService
- # from common.log import logger
- from common.utils import *
- import asyncio,random
- from model.models import AddGroupContactsHistory
- import logging
- from model.models import AgentConfig
-
- import logging
- import sys,traceback
-
- logger = logging.getLogger('redbeat')
-
-
- @celery_app.task(name='tasks.add_task', bind=True, acks_late=True)
- def add_task(self, x, y):
-
- time.sleep(5) # 模拟长时间计算
- logger.info('add')
- return x + y
-
-
- @celery_app.task(name='tasks.mul_task', bind=True, acks_late=True)
- def mul_task(self, x, y):
- time.sleep(5) # 模拟长时间计算
- return x * y
-
-
- # @celery.task(name='app.tasks.sync_contacts', bind=True, acks_late=True)
- # async def sync_contacts_task(self,app):
- # login_keys = list(await app.state.redis_service.client.scan_iter(match='__AI_OPS_WX__:LOGININFO:*'))
- # return login_keys
- # # for k in login_keys:
- # # print(k)
-
- @celery_app.task(name='tasks.sync_contacts', bind=True, acks_late=True)
- async def sync_contacts_task(self, redis_service):
- # Use the redis_service passed as an argument
- login_keys = list(await redis_service.client.scan_iter(match='__AI_OPS_WX__:LOGININFO:*'))
- return login_keys
-
-
- @celery_app.task(name='tasks.background_worker_task', bind=True, acks_late=True)
- def background_worker_task(self, redis_config, kafka_config, gewe_config):
- async def task():
- redis_service = RedisService()
- await redis_service.init(**redis_config)
- login_keys = []
- async for key in redis_service.client.scan_iter(match='__AI_OPS_WX__:LOGININFO:*'):
- login_keys.append(key)
- print(login_keys)
-
- asyncio.run(task())
-
- # @celery.task(name='tasks.background_worker_task', bind=True, acks_late=True)
- # async def background_worker_task(self, redis_config, kafka_config, gewe_config):
- # # Initialize services inside the task
- # redis_service = RedisService()
- # await redis_service.init(**redis_config)
-
- # login_keys = []
- # async for key in redis_service.client.scan_iter(match='__AI_OPS_WX__:LOGININFO:*'): # 使用 async for 遍历异步生成器
- # login_keys.append(key)
-
- # print(login_keys)
-
- # kafka_service = KafkaService(**kafka_config)
- # await kafka_service.start()
-
- # gewe_service = await GeWeService.get_instance(None, gewe_config['api_url'])
-
- # # Task logic
- # lock_name = "background_wxchat_thread_lock"
- # lock_identifier = str(time.time())
- # while True:
- # if await redis_service.acquire_lock(lock_name, timeout=10):
- # try:
- # logger.info("分布式锁已成功获取")
- # # Perform task logic
- # finally:
- # await redis_service.release_lock(lock_name, lock_identifier)
- # break
- # else:
- # logger.info("获取分布式锁失败,等待10秒后重试...")
- # await asyncio.sleep(10)
-
-
-
- # @celery_app.task(name='tasks.scheduled_task', bind=True, acks_late=True)
- # def scheduled_task(self):
- # print("定时任务执行成功!~~~~~~~~~~~~~~~~~")
- # return "Hello from Celery Beat + RedBeat!"
-
-
- # @celery_app.task(name='tasks.scheduled_task_sync_wx', bind=True, acks_late=True)
- # def scheduled_task_sync_wx(self,redis_service,kafka_service,gewe_service):
- # print("scheduled_task_sync_wx 定时任务执行成功!")
- # return "Hello from Celery Beat + RedBeat!"
-
- # @celery_app.task(name='tasks.scheduled_task_sync_wx_info_1', bind=True, acks_late=True)
- # def scheduled_task_sync_wx_info_1(self,redis_config, kafka_config, gewe_config):
- # '''
- # 定时获取微信号资料
- # '''
- # loop = asyncio.new_event_loop()
- # asyncio.set_event_loop(loop)
- # async def task():
- # try:
- # redis_service = RedisService()
- # await redis_service.init(**redis_config)
- # # gewe_service = await GeWeService.get_instance(None, gewe_config['api_url'])
- # login_keys = []
- # async for key in redis_service.client.scan_iter(match='__AI_OPS_WX__:LOGININFO:*'):
- # login_keys.append(key)
-
- # print(login_keys)
- # # for k in login_keys:
- # # r = await redis_service.get_hash(k)
- # # app_id = r.get("appId")
- # # token_id = r.get("tokenId")
- # # wxid = r.get("wxid")
- # # status = r.get('status')
- # # if status == '0':
- # # continue
- # # ret, msg, profile = await gewe_service.get_profile_async(token_id, app_id)
- # # if ret != 200:
- # # logger.warning(f"同步微信号 {wxid} 资料失败: {ret}-{msg}")
- # # continue
- # # nickname=profile.get("nickName")
- # # head_img_url=profile.get("smallHeadImgUrl")
- # # r.update({"nickName":nickname,"headImgUrl":head_img_url,"modify_at":int(time.time())})
- # # cleaned_login_info = {k: (v if v is not None else '') for k, v in r.items()}
- # # await redis_service.set_hash(k, cleaned_login_info)
- # # logger.info(f"同步微信号 {wxid} 资料 成功")
- # # redis_service.update_hash_field(k,"nickName",nickname)
- # # redis_service.update_hash_field(k,"headImgUrl",head_img_url)
- # # redis_service.update_hash_field(k,"modify_at",int(time.time()))
- # except Exception as e:
- # logger.error(f"任务执行过程中发生异常: {e}")
-
- # print("scheduled_task_sync_wx_info 定时任务执行成功!")
- # return "Hello from Celery Beat + RedBeat!"
-
- # loop.run_until_complete(task())
- # loop.close()
-
-
-
- # @celery_app.task(name='tasks.scheduled_task_sync_wx_info', bind=True, acks_late=True)
- # def scheduled_task_sync_wx_info(self, redis_config, kafka_config, gewe_config):
- # '''
- # 定时获取微信号资料
- # '''
- # async def task():
- # try:
- # redis_service = RedisService()
- # await redis_service.init(**redis_config)
- # gewe_service = await GeWeService.get_instance(redis_service,gewe_config['api_url'])
-
- # login_keys = []
- # async for key in redis_service.client.scan_iter(match='__AI_OPS_WX__:LOGININFO:*'):
- # login_keys.append(key)
-
- # #print(login_keys)
- # for k in login_keys:
- # r = await redis_service.get_hash(k)
- # app_id = r.get("appId")
- # token_id = r.get("tokenId")
- # wxid = r.get("wxid")
- # status = r.get('status')
- # if status == '0':
- # logger.warning(f"微信号 {wxid} 已经离线")
- # continue
- # ret, msg, profile = await gewe_service.get_profile_async(token_id, app_id)
- # if ret != 200:
- # logger.warning(f"同步微信号 {wxid} 资料失败: {ret}-{msg}")
- # continue
- # nickname=profile.get("nickName")
- # head_img_url=profile.get("smallHeadImgUrl")
- # # print(nickname)
-
- # nickname=profile.get("nickName")
- # head_img_url=profile.get("smallHeadImgUrl")
- # r.update({"nickName":nickname,"headImgUrl":head_img_url,"modify_at":int(time.time())})
- # cleaned_login_info = {k: (v if v is not None else '') for k, v in r.items()}
- # await redis_service.set_hash(k, cleaned_login_info)
- # logger.info(f"定时同步微信号{wxid}-昵称{nickname} 资料成功")
-
- # except Exception as e:
- # logger.error(f"任务执行过程中发生异常: {e}")
-
- # loop = asyncio.get_event_loop()
- # if loop.is_closed():
- # loop = asyncio.new_event_loop()
- # asyncio.set_event_loop(loop)
-
- # loop.run_until_complete(task()) # 在现有事件循环中运行任务
-
-
- @celery_app.task(name='tasks.scheduled_task_sync_wx_info', bind=True, acks_late=True)
- def scheduled_task_sync_wx_info(self, redis_config, kafka_config, gewe_config):
- '''
- 定时获取微信号资料
- '''
- async def process_key(redis_service, gewe_service, semaphore, key):
- async with semaphore: # 使用 Semaphore 控制并发
- try:
- r = await redis_service.get_hash(key)
- app_id = r.get("appId")
- token_id = r.get("tokenId")
- wxid = r.get("wxid")
- status = r.get('status')
- if status == '0':
- logger.warning(f"微信号 {wxid} 已经离线")
- return
- ret, msg, profile = await gewe_service.get_profile_async(token_id, app_id)
- if ret != 200:
- logger.warning(f"同步微信号 {wxid} 资料失败: {ret}-{msg}")
- return
- nickname = profile.get("nickName")
- head_img_url = profile.get("smallHeadImgUrl")
- r.update({"nickName": nickname, "headImgUrl": head_img_url, "modify_at": int(time.time())})
- cleaned_login_info = {k: (v if v is not None else '') for k, v in r.items()}
- await redis_service.set_hash(key, cleaned_login_info)
- logger.info(f"定时同步微信号{wxid}-昵称{nickname} 资料成功")
- except Exception as e:
- logger.error(f"处理键 {key} 时发生异常: {e}")
-
- async def task():
- try:
- redis_service = RedisService()
- await redis_service.init(**redis_config)
- gewe_service = await GeWeService.get_instance(redis_service, gewe_config['api_url'])
- login_keys = []
- async for key in redis_service.client.scan_iter(match='__AI_OPS_WX__:LOGININFO:*'):
- login_keys.append(key)
-
- # 设置 Semaphore,限制并发数为 10
- semaphore = asyncio.Semaphore(10)
- # 使用 asyncio.gather 并发处理所有键
- await asyncio.gather(*[process_key(redis_service, gewe_service, semaphore, key) for key in login_keys])
- except Exception as e:
- logger.error(f"任务执行过程中发生异常: {e}")
-
- loop = asyncio.get_event_loop()
- if loop.is_closed():
- loop = asyncio.new_event_loop()
- asyncio.set_event_loop(loop)
- loop.run_until_complete(task()) # 在现有事件循环中运行任务
-
-
-
-
- # @celery_app.task(name='tasks.scheduled_task_add_contacts_from_chatrooms', bind=True, acks_late=True)
- # def scheduled_task_add_contacts_from_chatrooms(self, redis_config, kafka_config, gewe_config):
-
- # '''
- # 关于群加好友的请求规则: 每个智能体一次最多30人,间隔2小时做1次,即最多90人/天。
- # 加好友规则:每天处理次数、间隔时间(分钟)、每次加好友人数这3个参数都可以设置。目前默认只是上面的设置。
- # '''
- # async def task():
- # try:
- # now = datetime.now()
- # if now.hour < 8:
- # logger.info(f"定时群成员定时添好友任务不启动,当前时间为 {now.strftime('%Y-%m-%d %H:%M:%S')},早于8点")
- # return
-
- # logger.info('定时群成员定时添好友任务开始')
- # redis_service = RedisService()
- # await redis_service.init(**redis_config)
-
- # gewe_service = await GeWeService.get_instance(redis_service,gewe_config['api_url'])
-
- # KAFKA_BOOTSTRAP_SERVERS=kafka_config['bootstrap_servers']
- # KAFKA_TOPIC=kafka_config['topic']
- # KAFKA_GROUP_ID=kafka_config['group_id']
-
- # kafka_service= KafkaService(KAFKA_BOOTSTRAP_SERVERS, KAFKA_TOPIC, KAFKA_TOPIC,KAFKA_GROUP_ID)
- # await kafka_service.start_producer()
-
- # global_config=await gewe_service.get_global_config_from_cache_async()
- # scheduled_task_add_contacts_from_chatrooms_config=global_config.get('scheduledTaskAddContactsFromChatrooms',{})
-
- # oneday_add_contacts_total=90
- # once_add_contacts_total=30
- # #oneday_times=3
-
- # if scheduled_task_add_contacts_from_chatrooms_config:
- # oneday_add_contacts_total=scheduled_task_add_contacts_from_chatrooms_config.get('oneDayAddContactsTotal',90)
- # once_add_contacts_total=scheduled_task_add_contacts_from_chatrooms_config.get('onceAddContactsTotal',30)
- # #oneday_times=scheduled_task_add_contacts_from_chatrooms_config.get('oneDayTimes',3)
-
- # # cache_task_run_time_logs= await gewe_service.get_task_run_time_async('scheduled_task_add_contacts_from_chatrooms')
- # # if cache_task_run_time_logs:
- # # sorted_tasks = sorted(cache_task_run_time_logs, key=lambda x: x.get("runTime"), reverse=True)
- # # last_run_time=sorted_tasks[0].get("runTime")
-
- # # if last_run_time > 1e12: # 毫秒级时间戳
- # # last_run_time = last_run_time / 1000 # 转换为秒
-
- # # # 将时间戳转换为 datetime 对象
- # # last_run_time = datetime.fromtimestamp(last_run_time)
-
- # # # 获取当前时间
- # # current_time = datetime.now()
-
- # # # 计算时间差
- # # time_difference = current_time - last_run_time
-
- # # # 判断是否相差2小时
- # # if time_difference < timedelta(hours=2):
- # # logger.info(f"上次定时群成员定时添好友任务在2小时内,不再执行")
- # # return
-
- # # time_difference_seconds = today_seconds_remaining()
- # # cache_task_run_time_logs.append({"runTime":int(time.time())})
- # # await gewe_service.save_task_run_time_async('scheduled_task_add_contacts_from_chatrooms',cache_task_run_time_logs,time_difference_seconds)
-
- # login_keys = []
- # async for key in redis_service.client.scan_iter(match='__AI_OPS_WX__:LOGININFO:*'):
- # login_keys.append(key)
-
- # wixd_add_contacts_from_chatrooms_times = {}
-
- # for k in login_keys:
- # r = await redis_service.get_hash(k)
- # app_id = r.get("appId")
- # token_id = r.get("tokenId")
- # wxid = r.get("wxid")
- # status = r.get('status')
- # if status == '0':
- # logger.warning(f"微信号 {wxid} 已经离线,群成员不能定时添加")
- # continue
-
- # config=await gewe_service.get_wxchat_config_from_cache_async(wxid)
- # validated_config = AgentConfig.model_validate(config)
- # if not validated_config.agentEnabled:
- # logger.warning(f"微信号 {wxid} 取消了托管,群成员不能定时添加")
- # continue
-
-
- # # 判断是否过于频繁
- # is_wx_expection = await gewe_service.get_wx_expection_async(wxid,"addGroupMemberAsFriend")
- # if is_wx_expection:
- # logger.info(f"{wxid} 本次任务接口addGroupMemberAsFriend异常, {is_wx_expection},本次群好友邀请任务未开始,跳过任务。")
- # continue
-
-
- # cache_task_run_time_wxid_logs= await gewe_service.get_task_run_time_by_wxid_async(wxid,'scheduled_task_add_contacts_from_chatrooms')
- # if cache_task_run_time_wxid_logs:
- # sorted_tasks = sorted(cache_task_run_time_wxid_logs, key=lambda x: x.get("runTime"), reverse=True)
- # last_run_time=sorted_tasks[0].get("runTime")
-
- # if last_run_time > 1e12: # 毫秒级时间戳
- # last_run_time = last_run_time / 1000 # 转换为秒
-
- # # 将时间戳转换为 datetime 对象
- # last_run_time = datetime.fromtimestamp(last_run_time)
-
- # # 获取当前时间
- # current_time = datetime.now()
-
- # # 计算时间差
- # time_difference = current_time - last_run_time
-
- # # 判断是否相差2小时
- # if time_difference < timedelta(hours=2):
- # logger.info(f"{wxid}上次定时群成员定时添好友任务在2小时内,不再执行")
- # continue
-
-
- # cache_task_run_time_wxid_logs.append({"runTime":int(time.time())})
- # await gewe_service.save_task_run_time_by_wxid_async(wxid,'scheduled_task_add_contacts_from_chatrooms',cache_task_run_time_wxid_logs,3600*2)
-
-
-
- # c = await gewe_service.get_wxchat_config_from_cache_async(wxid)
- # contacts = await gewe_service.get_contacts_brief_from_cache_async(wxid)
-
- # contact_wxids = [c.get('userName') for c in contacts]
- # chatrooms = c.get('addContactsFromChatroomIdWhiteList', [])
- # logger.info(f'{wxid} 定时群成员定时添好友任务开始')
- # wixd_add_contacts_from_chatrooms_times[wxid] = 0
- # for chatroom_id in chatrooms:
- # chatroom = await gewe_service.get_group_info_from_cache_async(wxid, chatroom_id)
- # chatroom_member=await gewe_service.get_group_members_from_cache_async(wxid, chatroom_id)
-
- # chatroom_nickname = chatroom.get('nickName')
- # chatroom_owner_wxid = chatroom_member.get('chatroomOwner', None)
-
- # admin_wxids = chatroom_member.get('adminWxid', [])
-
- # admin_wxids = chatroom_member.get('adminWxid')
- # if admin_wxids is None:
- # admin_wxids = [] # 如果 admin_wxids 是 None,将其初始化为空列表
-
- # logger.info(f'{chatroom_nickname} 的群主是 {chatroom_owner_wxid},管理员是{admin_wxids}')
- # contact_wxids_set = set(contact_wxids)
- # # for admin_wxid in admin_wxids:
- # # contact_wxids_set.add(admin_wxid)
- # if admin_wxids:
- # contact_wxids_set.update(set(admin_wxids))
- # if chatroom_owner_wxid is not None:
- # contact_wxids_set.add(chatroom_owner_wxid)
-
- # contact_wxids_set.add(wxid)
-
- # # unavailable_wixds=await gewe_service.check_wixd_group_add_contacts_history_async(wxid,chatroom_id)
- # # contact_wxids_set.update(set(unavailable_wixds))
-
- # # chatroom_member_list = chatroom.get('memberList', [])
-
-
- # unavailable_wixds=await gewe_service.check_wixd_group_add_contacts_history_async(wxid,chatroom_id)
- # if unavailable_wixds:
- # contact_wxids_set.update(set(unavailable_wixds))
-
- # chatroom_member_list = chatroom.get('memberList', [])
- # if chatroom_member_list is None:
- # chatroom_member_list = [] # 如果 memberList 是 None,将其初始化为空列表
- # elif not isinstance(chatroom_member_list, list):
- # chatroom_member_list = list(chatroom_member_list) # 如果 memberList 不是列表,将其转换为列表
-
-
- # remaining_chatroot_members = [x for x in chatroom_member_list if x.get('wxid') not in contact_wxids_set]
-
- # nickname = next((member['nickName'] for member in chatroom_member_list if member['wxid'] == wxid), None)
-
- # if not remaining_chatroot_members:
- # logger.info(f'{nickname}-{wxid} 在 {chatroom_nickname} 群里没有好友可以邀请')
- # # 任务状态推送到kafka
- # k_message=wx_add_contacts_from_chatroom_task_status_message(wxid,chatroom_id,2)
- # await kafka_service.send_message_async(k_message)
- # continue
-
- # logger.info(f'{nickname}-{wxid} 在 {chatroom_nickname} 群里还可以邀请的好友有:{[x.get("nickName") for x in remaining_chatroot_members]}')
- # for m in remaining_chatroot_members:
- # # 判断本次任务是否已经邀请了30个好友
- # if wixd_add_contacts_from_chatrooms_times[wxid] == once_add_contacts_total:
- # logger.info(f"{wxid} 本次任务已经邀请了{once_add_contacts_total}人,不再邀请")
- # continue
- # # 判断当天群成员是否已经加了90个好友
- # is_add_group_times = await gewe_service.is_group_add_contacts_history_one_day_async(wxid,oneday_add_contacts_total)
- # if is_add_group_times:
- # logger.info(f"当天 {wxid} 所有群的成员已经加了{oneday_add_contacts_total}个好友,不再添加")
- # continue
-
- # # 判断是否过于频繁
- # is_wx_expection = await gewe_service.get_wx_expection_async(wxid,"addGroupMemberAsFriend")
- # if is_wx_expection:
- # logger.info(f"{wxid} 本次任务接口addGroupMemberAsFriend异常,不再邀请,{is_wx_expection}")
- # continue
-
- # contact_wxid= m.get('wxid')
- # member_nickname=m.get("nickName")
- # group_add_contacts_history = await gewe_service.get_group_add_contacts_history_async(wxid,chatroom_id,contact_wxid)
-
- # if group_add_contacts_history:
- # sorted_history = sorted(group_add_contacts_history, key=lambda x: x.addTime, reverse=True)
-
- # # 已经邀请过两次,不再邀请
- # if len(sorted_history)==2:
- # logger.info(f'{nickname}-{wxid}在{chatroom_nickname}-{chatroom_id} 群的 {member_nickname}-{contact_wxid} 已经邀请过2次,不再邀请')
- # continue
-
- # # 当天邀请过,不再邀请
- # if len(sorted_history) > 0:
- # last_add_time = sorted_history[0].addTime
- # def is_add_time_more_than_one_day(addTime: int) -> bool:
- # """
- # 判断 addTime 是否与当前时间相隔大于 3600 × 24 秒
- # :param addTime: Unix 时间戳
- # :return: 如果 addTime 与当前时间相隔大于 3600 × 24 秒,返回 True;否则返回 False
- # """
- # # 获取当前时间的时间戳
- # current_time = time.time()
-
- # # 计算时间戳差值
- # time_difference = abs(current_time - addTime)
-
- # # 检查是否大于 3600 × 24 秒
- # return time_difference > 3600 * 24
-
- # is_more_than_one_day= is_add_time_more_than_one_day(last_add_time)
-
- # if not is_more_than_one_day:
- # logger.info(f'{nickname}-{wxid}在{chatroom_nickname}-{chatroom_id} 群的 {member_nickname}-{contact_wxid} 已经当天邀请,不再邀请')
- # continue
-
- # ret, msg, data = await gewe_service.add_group_member_as_friend_async(token_id, app_id, chatroom_id, m.get('wxid'), f'我是群聊"{chatroom_nickname}"群的{nickname}')
- # if ret!=200:
- # logger.warning(f'群好友邀请失败原因:{ret} {data}')
- # if msg in '操作过于频繁,请稍后再试。':
- # await gewe_service.save_wx_expection_async(wxid,"addGroupMemberAsFriend",msg,today_seconds_remaining())
- # logger.warning(f'{nickname}-{wxid} 操作过于频繁,本次群好友邀请任务未完成跳过。')
- # continue
-
- # history=AddGroupContactsHistory.model_validate({
- # "chatroomId":chatroom_id,
- # "wxid":wxid,
- # "contactWixd":contact_wxid,
- # "addTime":int(time.time())
- # })
- # await gewe_service.save_group_add_contacts_history_async(wxid,chatroom_id,contact_wxid,history)
- # wixd_add_contacts_from_chatrooms_times[wxid]+=1
- # logger.info(f'{nickname} 向 {chatroom_nickname}-{chatroom_id} 群的 {m.get("nickName")}-{m.get("wxid")} 发送好友邀请 {msg}')
- # # 推送到kafka
-
- # k_message = wx_add_contacts_from_chatroom_message(history.wxid,history.chatroomId,history.contactWixd,history.addTime)
- # await kafka_service.send_message_async(k_message)
- # #await asyncio.sleep(random.uniform(1.5, 3))
- # await asyncio.sleep(random.uniform(30,60))
-
- # # 任务状态推送到kafka
- # task_status=await gewe_service.wx_add_contacts_from_chatroom_task_status_async(wxid,chatroom_id)
- # k_message=wx_add_contacts_from_chatroom_task_status_message(wxid,chatroom_id,task_status)
- # await kafka_service.send_message_async(k_message)
- # # 下一个群
- # await asyncio.sleep(random.uniform(1.5, 3))
- # except Exception as e:
- # # 获取当前的堆栈跟踪
- # tb = sys.exc_info()[2]
- # # 为异常附加堆栈跟踪
- # e = e.with_traceback(tb)
- # # 输出详细的错误信息
- # logger.error(f"任务执行过程中发生异常: {e}\n异常类型: {type(e).__name__}\n异常信息: {str(e)}\n堆栈跟踪: {traceback.format_exc()}")
- # finally:
- # await kafka_service.stop_producer()
-
- # loop = asyncio.get_event_loop()
- # if loop.is_closed():
- # loop = asyncio.new_event_loop()
- # asyncio.set_event_loop(loop)
-
- # loop.run_until_complete(task()) # 在现有事件循环中运行任务
-
-
- @celery_app.task(name='tasks.scheduled_task_add_contacts_from_chatrooms', bind=True, acks_late=True)
- def scheduled_task_add_contacts_from_chatrooms(self, redis_config, kafka_config, gewe_config):
-
- '''
- 关于群加好友的请求规则: 每个智能体一次最多30人,间隔2小时做1次,即最多90人/天。
- 加好友规则:每天处理次数、间隔时间(分钟)、每次加好友人数这3个参数都可以设置。目前默认只是上面的设置。
- '''
- async def process_login_key(redis_service:RedisService, gewe_service: GeWeService, kafka_service:KafkaService, k, gewe_config, oneday_add_contacts_total, once_add_contacts_total, semaphore):
- async with semaphore: # 使用 Semaphore 控制并发
- r = await redis_service.get_hash(k)
- app_id = r.get("appId")
- token_id = r.get("tokenId")
- wxid = r.get("wxid")
- status = r.get('status')
-
- if status == '0':
- logger.warning(f"微信号 {wxid} 已经离线,群成员不能定时添加")
- return
-
- config = await gewe_service.get_wxchat_config_from_cache_async(wxid)
- validated_config = AgentConfig.model_validate(config)
- if not validated_config.agentEnabled:
- logger.warning(f"微信号 {wxid} 取消了托管,群成员不能定时添加")
- return
-
- # 判断是否过于频繁
- is_wx_expection = await gewe_service.get_wx_expection_async(wxid, "addGroupMemberAsFriend")
- if is_wx_expection:
- logger.info(f"{wxid} 本次任务接口addGroupMemberAsFriend异常, {is_wx_expection},本次群好友邀请任务未开始,跳过任务。")
- return
-
- cache_task_run_time_wxid_logs = await gewe_service.get_task_run_time_by_wxid_async(wxid, 'scheduled_task_add_contacts_from_chatrooms')
- if cache_task_run_time_wxid_logs:
- sorted_tasks = sorted(cache_task_run_time_wxid_logs, key=lambda x: x.get("runTime"), reverse=True)
- last_run_time = sorted_tasks[0].get("runTime")
- if last_run_time > 1e12: # 毫秒级时间戳
- last_run_time = last_run_time / 1000 # 转换为秒
-
- # 将时间戳转换为 datetime 对象
- last_run_time = datetime.fromtimestamp(last_run_time)
-
- # 获取当前时间
- current_time = datetime.now()
-
- # 计算时间差
- time_difference = current_time - last_run_time
-
- # 判断是否相差2小时
- if time_difference < timedelta(hours=2):
- logger.info(f"{wxid}上次定时群成员定时添好友任务在2小时内,不再执行")
- return
-
- cache_task_run_time_wxid_logs.append({"runTime": int(time.time())})
- await gewe_service.save_task_run_time_by_wxid_async(wxid, 'scheduled_task_add_contacts_from_chatrooms', cache_task_run_time_wxid_logs, 3600 * 2)
- c:dict = await gewe_service.get_wxchat_config_from_cache_async(wxid)
- contacts = await gewe_service.get_contacts_brief_from_cache_async(wxid)
-
- contact_wxids = [c.get('userName') for c in contacts]
- chatrooms = c.get('addContactsFromChatroomIdWhiteList', [])
- logger.info(f'{wxid} 定时群成员定时添好友任务开始')
- wixd_add_contacts_from_chatrooms_times = {wxid: 0}
- for chatroom_id in chatrooms:
- chatroom = await gewe_service.get_group_info_from_cache_async(wxid, chatroom_id)
- chatroom_member = await gewe_service.get_group_members_from_cache_async(wxid, chatroom_id)
-
- chatroom_nickname = chatroom.get('nickName')
- chatroom_owner_wxid = chatroom_member.get('chatroomOwner', None)
- admin_wxids = chatroom_member.get('adminWxid', [])
- admin_wxids = chatroom_member.get('adminWxid')
- if admin_wxids is None:
- admin_wxids = [] # 如果 admin_wxids 是 None,将其初始化为空列表
-
- logger.info(f'{chatroom_nickname} 的群主是 {chatroom_owner_wxid},管理员是{admin_wxids}')
- contact_wxids_set = set(contact_wxids)
- if admin_wxids:
- contact_wxids_set.update(set(admin_wxids))
- if chatroom_owner_wxid is not None:
- contact_wxids_set.add(chatroom_owner_wxid)
-
- contact_wxids_set.add(wxid)
- unavailable_wixds = await gewe_service.check_wixd_group_add_contacts_history_async(wxid, chatroom_id)
- if unavailable_wixds:
- contact_wxids_set.update(set(unavailable_wixds))
-
- chatroom_member_list = chatroom.get('memberList', [])
- if chatroom_member_list is None:
- chatroom_member_list = [] # 如果 memberList 是 None,将其初始化为空列表
- elif not isinstance(chatroom_member_list, list):
- chatroom_member_list = list(chatroom_member_list) # 如果 memberList 不是列表,将其转换为列表
-
- remaining_chatroot_members = [x for x in chatroom_member_list if x.get('wxid') not in contact_wxids_set]
-
- nickname = next((member['nickName'] for member in chatroom_member_list if member['wxid'] == wxid), None)
- if not remaining_chatroot_members:
- logger.info(f'{nickname}-{wxid} 在 {chatroom_nickname}-{chatroom_id} 群里没有好友可以邀请')
- # 任务状态推送到kafka
- k_message = wx_add_contacts_from_chatroom_task_status_message(wxid, chatroom_id, 2)
- await kafka_service.send_message_async(k_message)
- continue
-
- logger.info(f'{nickname}-{wxid} 在 {chatroom_nickname} 群里还可以邀请的好友有:{[x.get("nickName") for x in remaining_chatroot_members]}')
- for m in remaining_chatroot_members:
- # 判断本次任务是否已经邀请了30个好友
- if wixd_add_contacts_from_chatrooms_times[wxid] == once_add_contacts_total:
- logger.info(f"{wxid} 本次任务已经邀请了{once_add_contacts_total}人,不再邀请")
- return
-
- # 判断当天群成员是否已经加了90个好友
- is_add_group_times = await gewe_service.is_group_add_contacts_history_one_day_async(wxid, oneday_add_contacts_total)
- if is_add_group_times:
- logger.info(f"当天 {wxid} 所有群的成员已经加了{oneday_add_contacts_total}个好友,不再添加")
- return
-
- # 判断是否过于频繁
- is_wx_expection = await gewe_service.get_wx_expection_async(wxid, "addGroupMemberAsFriend")
- if is_wx_expection:
- logger.info(f"{wxid} 本次任务接口addGroupMemberAsFriend异常,不再邀请,{is_wx_expection}")
- return
-
- contact_wxid = m.get('wxid')
- member_nickname = m.get("nickName")
- group_add_contacts_history = await gewe_service.get_group_add_contacts_history_async(wxid, chatroom_id, contact_wxid)
-
- if group_add_contacts_history:
- sorted_history = sorted(group_add_contacts_history, key=lambda x: x.addTime, reverse=True)
-
- # 已经邀请过两次,不再邀请
- if len(sorted_history) == 2:
- logger.info(f'{nickname}-{wxid}在{chatroom_nickname}-{chatroom_id} 群的 {member_nickname}-{contact_wxid} 已经邀请过2次,不再邀请')
- continue
-
- # 24小时邀请过,不再邀请
- if len(sorted_history) > 0:
- last_add_time = sorted_history[0].addTime
-
- def is_add_time_more_than_one_day(addTime: int) -> bool:
- """
- 判断 addTime 是否与当前时间相隔大于 3600 × 24 秒
- :param addTime: Unix 时间戳
- :return: 如果 addTime 与当前时间相隔大于 3600 × 24 秒,返回 True;否则返回 False
- """
- # 获取当前时间的时间戳
- current_time = time.time()
-
- # 计算时间戳差值
- time_difference = abs(current_time - addTime)
-
- # 检查是否大于 3600 × 24 秒
- return time_difference > 3600 * 24
-
- is_more_than_one_day = is_add_time_more_than_one_day(last_add_time)
- if not is_more_than_one_day:
- logger.info(f'{nickname}-{wxid}在{chatroom_nickname}-{chatroom_id} 群的 {member_nickname}-{contact_wxid} 已经24小时邀请,不再邀请')
- continue
-
- ret, msg, data = await gewe_service.add_group_member_as_friend_async(token_id, app_id, chatroom_id, m.get('wxid'),
- f'我是群聊"{chatroom_nickname}"群的{nickname}')
-
- history = AddGroupContactsHistory.model_validate({
- "chatroomId": chatroom_id,
- "wxid": wxid,
- "contactWixd": contact_wxid,
- "addTime": int(time.time())
- })
-
- if ret != 200:
- logger.warning(f'{nickname}-{wxid} 在 {chatroom_nickname}-{chatroom_id} 群好友 {member_nickname}-{contact_wxid} 邀请失败原因:{ret} {msg} {data}')
- if '操作过于频繁' in data.get('msg'):
- await gewe_service.save_wx_expection_async(wxid, "addGroupMemberAsFriend", msg, today_seconds_remaining())
- await gewe_service.save_group_add_contacts_history_async(wxid, chatroom_id, contact_wxid, history)
- logger.warning(f'{nickname}-{wxid} 在 {chatroom_nickname}-{chatroom_id} 群的 {member_nickname}-{contact_wxid} 操作过于频繁,本次群好友邀请任务未完成跳过。当天不再处理该号群好友邀请任务')
- return
-
-
- await gewe_service.save_group_add_contacts_history_async(wxid, chatroom_id, contact_wxid, history)
- wixd_add_contacts_from_chatrooms_times[wxid] += 1
- logger.info(f'{nickname} 向 {chatroom_nickname}-{chatroom_id} 群的 {m.get("nickName")}-{m.get("wxid")} 发送好友邀请 {msg} \n {data} \n 当前已邀请好友数:{wixd_add_contacts_from_chatrooms_times[wxid]}')
- # 推送到kafka
- k_message = wx_add_contacts_from_chatroom_message(history.wxid, history.chatroomId, history.contactWixd, history.addTime)
- await kafka_service.send_message_async(k_message)
- # await asyncio.sleep(random.uniform(1.5, 3))
- # await asyncio.sleep(random.uniform(30, 60))
- await asyncio.sleep(random.uniform(270,300))
- # 任务状态推送到kafka
- task_status = await gewe_service.wx_add_contacts_from_chatroom_task_status_async(wxid, chatroom_id)
- k_message=wx_add_contacts_from_chatroom_task_status_message(wxid, chatroom_id, task_status)
- await kafka_service.send_message_async(k_message)
- # 下一个群
- await asyncio.sleep(random.uniform(1.5, 3))
-
- async def task():
- try:
- now = datetime.now()
- # if 10> now.hour < 8:
- # logger.info(f"定时群成员定时添好友任务不启动,当前时间为 {now.strftime('%Y-%m-%d %H:%M:%S')},早于8点")
- # return
-
- if now.hour < 8 or now.hour > 22:
- logger.info(f"定时群成员定时添好友任务不启动, 当前时间为 {now.strftime('%Y-%m-%d %H:%M:%S')},不在8点到22点之间")
- return
-
- logger.info('定时群成员定时添好友任务开始')
- redis_service = RedisService()
- await redis_service.init(**redis_config)
- gewe_service = await GeWeService.get_instance(redis_service, gewe_config['api_url'])
- KAFKA_BOOTSTRAP_SERVERS = kafka_config['bootstrap_servers']
- KAFKA_TOPIC = kafka_config['topic']
- KAFKA_GROUP_ID = kafka_config['group_id']
-
- kafka_service = KafkaService(KAFKA_BOOTSTRAP_SERVERS, KAFKA_TOPIC, KAFKA_TOPIC, KAFKA_GROUP_ID)
- await kafka_service.start_producer()
- global_config = await gewe_service.get_global_config_from_cache_async()
- scheduled_task_add_contacts_from_chatrooms_config = global_config.get('scheduledTaskAddContactsFromChatrooms', {})
-
- oneday_add_contacts_total = 90
- once_add_contacts_total = 30
- # oneday_times=3
- if scheduled_task_add_contacts_from_chatrooms_config:
- oneday_add_contacts_total = scheduled_task_add_contacts_from_chatrooms_config.get('oneDayAddContactsTotal', 90)
- once_add_contacts_total = scheduled_task_add_contacts_from_chatrooms_config.get('onceAddContactsTotal', 30)
- # oneday_times=scheduled_task_add_contacts_from_chatrooms_config.get('oneDayTimes',3)
-
- login_keys = []
- async for key in redis_service.client.scan_iter(match='__AI_OPS_WX__:LOGININFO:*'):
- login_keys.append(key)
-
- # 设置 Semaphore,限制并发任务数量
- semaphore = asyncio.Semaphore(10) # 例如,限制同时运行的任务数量为 10
- # 使用 asyncio.gather 并发处理每个 login_key
- await asyncio.gather(*[process_login_key(redis_service, gewe_service, kafka_service, k, gewe_config, oneday_add_contacts_total, once_add_contacts_total, semaphore) for k in login_keys])
-
- except Exception as e:
- # 获取当前的堆栈跟踪
- tb = sys.exc_info()[2]
- # 为异常附加堆栈跟踪
- e = e.with_traceback(tb)
- # 输出详细的错误信息
- logger.error(f"任务执行过程中发生异常: {e}\n异常类型: {type(e).__name__}\n异常信息: {str(e)}\n堆栈跟踪: {traceback.format_exc()}")
- finally:
- await kafka_service.stop_producer()
-
- loop = asyncio.get_event_loop()
- if loop.is_closed():
- loop = asyncio.new_event_loop()
- asyncio.set_event_loop(loop)
- loop.run_until_complete(task()) # 在现有事件循环中运行任务
-
-
-
- # @celery_app.task(name='tasks.scheduled_task_add_contacts_from_chatrooms', bind=True, acks_late=True)
- # def scheduled_task_add_contacts_from_chatrooms(self, redis_config, kafka_config, gewe_config):
-
- # '''
- # 关于群加好友的请求规则: 每个智能体一次最多30人,间隔2小时做1次,即最多90人/天。
- # 加好友规则:每天处理次数、间隔时间(分钟)、每次加好友人数这3个参数都可以设置。目前默认只是上面的设置。
- # '''
- # async def process_login_key(redis_service:RedisService, gewe_service: GeWeService, kafka_service:KafkaService, k, gewe_config, oneday_add_contacts_total, once_add_contacts_total):
- # r = await redis_service.get_hash(k)
- # app_id = r.get("appId")
- # token_id = r.get("tokenId")
- # wxid = r.get("wxid")
- # status = r.get('status')
-
- # if status == '0':
- # logger.warning(f"微信号 {wxid} 已经离线,群成员不能定时添加")
- # return
-
- # config = await gewe_service.get_wxchat_config_from_cache_async(wxid)
- # validated_config = AgentConfig.model_validate(config)
- # if not validated_config.agentEnabled:
- # logger.warning(f"微信号 {wxid} 取消了托管,群成员不能定时添加")
- # return
-
- # # 判断是否过于频繁
- # is_wx_expection = await gewe_service.get_wx_expection_async(wxid, "addGroupMemberAsFriend")
- # if is_wx_expection:
- # logger.info(f"{wxid} 本次任务接口addGroupMemberAsFriend异常, {is_wx_expection},本次群好友邀请任务未开始,跳过任务。")
- # return
-
- # cache_task_run_time_wxid_logs = await gewe_service.get_task_run_time_by_wxid_async(wxid, 'scheduled_task_add_contacts_from_chatrooms')
- # if cache_task_run_time_wxid_logs:
- # sorted_tasks = sorted(cache_task_run_time_wxid_logs, key=lambda x: x.get("runTime"), reverse=True)
- # last_run_time = sorted_tasks[0].get("runTime")
- # if last_run_time > 1e12: # 毫秒级时间戳
- # last_run_time = last_run_time / 1000 # 转换为秒
-
- # # 将时间戳转换为 datetime 对象
- # last_run_time = datetime.fromtimestamp(last_run_time)
-
- # # 获取当前时间
- # current_time = datetime.now()
-
- # # 计算时间差
- # time_difference = current_time - last_run_time
-
- # # 判断是否相差2小时
- # if time_difference < timedelta(hours=2):
- # logger.info(f"{wxid}上次定时群成员定时添好友任务在2小时内,不再执行")
- # return
-
- # cache_task_run_time_wxid_logs.append({"runTime": int(time.time())})
- # await gewe_service.save_task_run_time_by_wxid_async(wxid, 'scheduled_task_add_contacts_from_chatrooms', cache_task_run_time_wxid_logs, 3600 * 2)
- # c:dict = await gewe_service.get_wxchat_config_from_cache_async(wxid)
- # contacts = await gewe_service.get_contacts_brief_from_cache_async(wxid)
-
- # contact_wxids = [c.get('userName') for c in contacts]
- # chatrooms = c.get('addContactsFromChatroomIdWhiteList', [])
- # logger.info(f'{wxid} 定时群成员定时添好友任务开始')
- # wixd_add_contacts_from_chatrooms_times = {wxid: 0}
- # for chatroom_id in chatrooms:
- # chatroom = await gewe_service.get_group_info_from_cache_async(wxid, chatroom_id)
- # chatroom_member = await gewe_service.get_group_members_from_cache_async(wxid, chatroom_id)
-
- # chatroom_nickname = chatroom.get('nickName')
- # chatroom_owner_wxid = chatroom_member.get('chatroomOwner', None)
- # admin_wxids = chatroom_member.get('adminWxid', [])
- # admin_wxids = chatroom_member.get('adminWxid')
- # if admin_wxids is None:
- # admin_wxids = [] # 如果 admin_wxids 是 None,将其初始化为空列表
-
- # logger.info(f'{chatroom_nickname} 的群主是 {chatroom_owner_wxid},管理员是{admin_wxids}')
- # contact_wxids_set = set(contact_wxids)
- # if admin_wxids:
- # contact_wxids_set.update(set(admin_wxids))
- # if chatroom_owner_wxid is not None:
- # contact_wxids_set.add(chatroom_owner_wxid)
-
- # contact_wxids_set.add(wxid)
- # unavailable_wixds = await gewe_service.check_wixd_group_add_contacts_history_async(wxid, chatroom_id)
- # if unavailable_wixds:
- # contact_wxids_set.update(set(unavailable_wixds))
-
- # chatroom_member_list = chatroom.get('memberList', [])
- # if chatroom_member_list is None:
- # chatroom_member_list = [] # 如果 memberList 是 None,将其初始化为空列表
- # elif not isinstance(chatroom_member_list, list):
- # chatroom_member_list = list(chatroom_member_list) # 如果 memberList 不是列表,将其转换为列表
-
- # remaining_chatroot_members = [x for x in chatroom_member_list if x.get('wxid') not in contact_wxids_set]
-
- # nickname = next((member['nickName'] for member in chatroom_member_list if member['wxid'] == wxid), None)
- # if not remaining_chatroot_members:
- # logger.info(f'{nickname}-{wxid} 在 {chatroom_nickname} 群里没有好友可以邀请')
- # # 任务状态推送到kafka
- # k_message = wx_add_contacts_from_chatroom_task_status_message(wxid, chatroom_id, 2)
- # await kafka_service.send_message_async(k_message)
- # return
-
- # logger.info(f'{nickname}-{wxid} 在 {chatroom_nickname} 群里还可以邀请的好友有:{[x.get("nickName") for x in remaining_chatroot_members]}')
- # for m in remaining_chatroot_members:
- # # 判断本次任务是否已经邀请了30个好友
- # if wixd_add_contacts_from_chatrooms_times[wxid] == once_add_contacts_total:
- # logger.info(f"{wxid} 本次任务已经邀请了{once_add_contacts_total}人,不再邀请")
- # return
-
- # # 判断当天群成员是否已经加了90个好友
- # is_add_group_times = await gewe_service.is_group_add_contacts_history_one_day_async(wxid, oneday_add_contacts_total)
- # if is_add_group_times:
- # logger.info(f"当天 {wxid} 所有群的成员已经加了{oneday_add_contacts_total}个好友,不再添加")
- # return
-
- # # 判断是否过于频繁
- # is_wx_expection = await gewe_service.get_wx_expection_async(wxid, "addGroupMemberAsFriend")
- # if is_wx_expection:
- # logger.info(f"{wxid} 本次任务接口addGroupMemberAsFriend异常,不再邀请,{is_wx_expection}")
- # return
-
- # contact_wxid = m.get('wxid')
- # member_nickname = m.get("nickName")
- # group_add_contacts_history = await gewe_service.get_group_add_contacts_history_async(wxid, chatroom_id, contact_wxid)
-
- # if group_add_contacts_history:
- # sorted_history = sorted(group_add_contacts_history, key=lambda x: x.addTime, reverse=True)
-
- # # 已经邀请过两次,不再邀请
- # if len(sorted_history) == 2:
- # logger.info(f'{nickname}-{wxid}在{chatroom_nickname}-{chatroom_id} 群的 {member_nickname}-{contact_wxid} 已经邀请过2次,不再邀请')
- # return
-
- # # 当天邀请过,不再邀请
- # if len(sorted_history) > 0:
- # last_add_time = sorted_history[0].addTime
-
- # def is_add_time_more_than_one_day(addTime: int) -> bool:
- # """
- # 判断 addTime 是否与当前时间相隔大于 3600 × 24 秒
- # :param addTime: Unix 时间戳
- # :return: 如果 addTime 与当前时间相隔大于 3600 × 24 秒,返回 True;否则返回 False
- # """
- # # 获取当前时间的时间戳
- # current_time = time.time()
-
- # # 计算时间戳差值
- # time_difference = abs(current_time - addTime)
-
- # # 检查是否大于 3600 × 24 秒
- # return time_difference > 3600 * 24
-
- # is_more_than_one_day = is_add_time_more_than_one_day(last_add_time)
- # if not is_more_than_one_day:
- # logger.info(f'{nickname}-{wxid}在{chatroom_nickname}-{chatroom_id} 群的 {member_nickname}-{contact_wxid} 已经当天邀请,不再邀请')
- # return
-
- # ret, msg, data = await gewe_service.add_group_member_as_friend_async(token_id, app_id, chatroom_id, m.get('wxid'),
- # f'我是群聊"{chatroom_nickname}"群的{nickname}')
- # if ret != 200:
- # logger.warning(f'群好友邀请失败原因:{ret} {msg} {data}')
- # if '操作过于频繁' in data.get('msg'):
- # await gewe_service.save_wx_expection_async(wxid, "addGroupMemberAsFriend", msg, today_seconds_remaining())
- # logger.warning(f'{nickname}-{wxid} 操作过于频繁,本次群好友邀请任务未完成跳过。')
- # return
-
- # history = AddGroupContactsHistory.model_validate({
- # "chatroomId": chatroom_id,
- # "wxid": wxid,
- # "contactWixd": contact_wxid,
- # "addTime": int(time.time())
- # })
- # await gewe_service.save_group_add_contacts_history_async(wxid, chatroom_id, contact_wxid, history)
- # wixd_add_contacts_from_chatrooms_times[wxid] += 1
- # logger.info(f'{nickname} 向 {chatroom_nickname}-{chatroom_id} 群的 {m.get("nickName")}-{m.get("wxid")} 发送好友邀请 {msg}')
- # # 推送到kafka
- # k_message = wx_add_contacts_from_chatroom_message(history.wxid, history.chatroomId, history.contactWixd, history.addTime)
- # await kafka_service.send_message_async(k_message)
- # # await asyncio.sleep(random.uniform(1.5, 3))
- # await asyncio.sleep(random.uniform(30, 60))
- # # 任务状态推送到kafka
- # task_status = await gewe_service.wx_add_contacts_from_chatroom_task_status_async(wxid, chatroom_id)
- # wx_add_contacts_from_chatroom_task_status_message(wxid, chatroom_id, task_status)
- # await kafka_service.send_message_async(k_message)
- # # 下一个群
- # await asyncio.sleep(random.uniform(1.5, 3))
-
- # async def task():
- # try:
- # now = datetime.now()
- # if now.hour < 8:
- # logger.info(f"定时群成员定时添好友任务不启动,当前时间为 {now.strftime('%Y-%m-%d %H:%M:%S')},早于8点")
- # return
-
- # logger.info('定时群成员定时添好友任务开始')
- # redis_service = RedisService()
- # await redis_service.init(**redis_config)
- # gewe_service = await GeWeService.get_instance(redis_service, gewe_config['api_url'])
- # KAFKA_BOOTSTRAP_SERVERS = kafka_config['bootstrap_servers']
- # KAFKA_TOPIC = kafka_config['topic']
- # KAFKA_GROUP_ID = kafka_config['group_id']
-
- # kafka_service = KafkaService(KAFKA_BOOTSTRAP_SERVERS, KAFKA_TOPIC, KAFKA_TOPIC, KAFKA_GROUP_ID)
- # await kafka_service.start_producer()
- # global_config = await gewe_service.get_global_config_from_cache_async()
- # scheduled_task_add_contacts_from_chatrooms_config = global_config.get('scheduledTaskAddContactsFromChatrooms', {})
-
- # oneday_add_contacts_total = 90
- # once_add_contacts_total = 30
- # # oneday_times=3
- # if scheduled_task_add_contacts_from_chatrooms_config:
- # oneday_add_contacts_total = scheduled_task_add_contacts_from_chatrooms_config.get('oneDayAddContactsTotal', 90)
- # once_add_contacts_total = scheduled_task_add_contacts_from_chatrooms_config.get('onceAddContactsTotal', 30)
- # # oneday_times=scheduled_task_add_contacts_from_chatrooms_config.get('oneDayTimes',3)
-
- # login_keys = []
- # async for key in redis_service.client.scan_iter(match='__AI_OPS_WX__:LOGININFO:*'):
- # login_keys.append(key)
-
- # # 使用 asyncio.gather 并发处理每个 login_key
- # await asyncio.gather(*[process_login_key(redis_service, gewe_service, kafka_service, k, gewe_config, oneday_add_contacts_total, once_add_contacts_total) for k in login_keys])
-
- # except Exception as e:
- # # 获取当前的堆栈跟踪
- # tb = sys.exc_info()[2]
- # # 为异常附加堆栈跟踪
- # e = e.with_traceback(tb)
- # # 输出详细的错误信息
- # logger.error(f"任务执行过程中发生异常: {e}\n异常类型: {type(e).__name__}\n异常信息: {str(e)}\n堆栈跟踪: {traceback.format_exc()}")
- # finally:
- # await kafka_service.stop_producer()
-
- # loop = asyncio.get_event_loop()
- # if loop.is_closed():
- # loop = asyncio.new_event_loop()
- # asyncio.set_event_loop(loop)
- # loop.run_until_complete(task()) # 在现有事件循环中运行任务
-
-
-
-
-
-
- REDIS_KEY_PATTERN = "friend_add_limit:{date}"
- REDIS_LAST_RUN_KEY = "last_run_time:add_friends_task"
-
- @celery_app.task(name='tasks.add_friends_task', bind=True, acks_late=True)
- def add_friends_task(self,redis_config):
- """
- 限制每天最多 15 个,每 2 小时最多 8 个
- """
- async def task():
- redis_service = RedisService()
- await redis_service.init(**redis_config)
- today_str = datetime.now().strftime("%Y%m%d")
- redis_key = REDIS_KEY_PATTERN.format(date=today_str)
-
- # 获取当前总添加数量
- total_added = await redis_service.get_hash_field(redis_key, "total") or 0
- last_2h_added =await redis_service.get_hash_field(redis_key, "last_2h") or 0
-
- total_added = int(total_added)
- last_2h_added = int(last_2h_added)
-
- logger.info(f"当前添加好友总数: {total_added}, 过去2小时添加: {last_2h_added}")
-
- # 判断是否超过限制
- if total_added >= 15:
- logger.warning("今日好友添加已达上限!")
- return
-
- if last_2h_added >= 8:
- logger.warning("过去2小时添加已达上限!")
- return
-
- # 计算本次要添加的好友数量 (控制每天 5-15 个)
- max_add = min(15 - total_added, 8 - last_2h_added)
- if max_add <= 0:
- return
-
- num_to_add = min(max_add, 1) # 每次最多加 1 个
- logger.info(f"本次添加 {num_to_add} 位好友")
-
- # TODO: 调用好友添加逻辑 (接口 or 业务逻辑)
- # success = add_friends(num_to_add)
-
- success = num_to_add # 假设成功添加 num_to_add 个
-
- # 更新 Redis 计数
- if success > 0:
- await redis_service.increment_hash_field(redis_key, "total", success)
- await redis_service.increment_hash_field(redis_key, "last_2h", success)
-
- # 设置 Redis 过期时间 (每日记录存 1 天, 2 小时记录存 2 小时)
- await redis_service.expire(redis_key, 86400) # 24小时
- await redis_service.expire_field(redis_key, "last_2h", 7200) # 2小时
-
- logger.info(f"成功添加 {success} 位好友, 今日总数 {total_added + success}")
-
- # 生成一个新的随机时间(5-15 分钟之间)
- # next_interval = random.randint(10, 20)
-
- # # 计算新的执行时间
- # next_run_time = datetime.datetime.now() + timedelta(seconds=next_interval)
-
- # # 重新注册 RedBeat 任务,确保下次执行时间不同
- # redbeat_entry = RedBeatSchedulerEntry(
- # name="redbeat:add_friends_task",
- # task="tasks.add_friends_task",
- # schedule=celery.schedules.schedule(timedelta(seconds=next_interval)),
- # args=[redis_config],
- # app=celery_app
- # )
-
- # # 设置任务的下次执行时间
- # redbeat_entry.last_run_at = next_run_time
- # redbeat_entry.save()
-
- # logger.info(f"下次任务将在 {next_run_time} 执行(间隔 {next_interval} 秒)")
-
- loop = asyncio.get_event_loop()
-
- if loop.is_closed():
- loop = asyncio.new_event_loop()
- asyncio.set_event_loop(loop)
-
- loop.run_until_complete(task()) # 在现有事件循环中运行任务
-
- def today_seconds_remaining()->int:
- current_time = datetime.now()
-
- # 计算当天的结束时间(23:59:59)
- end_of_day = datetime(current_time.year, current_time.month, current_time.day, 23, 59, 59)
-
- # 计算时间差
- time_difference = end_of_day - current_time
-
- # 将时间差转换为秒数
- time_difference_seconds = int(time_difference.total_seconds())
-
- return time_difference_seconds
-
-
- @celery_app.task(name='tasks.random_scheduled_task', bind=True, acks_late=True)
- def random_scheduled_task(self,):
- print(f"Task executed at {datetime.now()}")
- # 随机生成下次执行时间(例如:10-60秒内的随机时间)
- next_run_in = random.randint(10, 60)
- print(f"Next execution will be in {next_run_in} seconds")
-
- # 设置下次执行时间
- entry = RedBeatSchedulerEntry(
- name='random-task',
- task='tasks.random_scheduled_task',
- schedule=timedelta(seconds=next_run_in),
- app=celery_app
- )
- entry.save()
- return f"Scheduled next run in {next_run_in} seconds"
-
|