@@ -99,9 +99,11 @@ from common.log import logger | |||
# } | |||
scheduled_task_sync_wx_info_interval = 6000 | |||
scheduled_task_add_contacts_from_chatrooms_interval=10 | |||
environment = os.environ.get('environment', 'default') | |||
if environment != 'default': | |||
scheduled_task_sync_wx_info_interval = 60*11 | |||
scheduled_task_add_contacts_from_chatrooms_interval = 3600*24 | |||
# 定义定时任务列表 (任务 ID, 任务名称, 执行间隔秒, 任务参数) | |||
@@ -142,6 +144,9 @@ tasks_schedule = [ | |||
# crontab(minute=random_minute, hour='*'), # 每小时在随机分钟执行 | |||
# # 如果想每天在随机时间执行,可以用: crontab(minute=random_minute, hour=random_hour) | |||
# [redis_config]), | |||
("redbeat:scheduled_task_add_contacts_from_chatrooms", "tasks.scheduled_task_add_contacts_from_chatrooms", | |||
celery.schedules.schedule(timedelta(seconds=scheduled_task_add_contacts_from_chatrooms_interval)), | |||
[redis_config, kafka_config, gewe_config]), | |||
] | |||
# # 注册 RedBeat 任务 | |||
@@ -164,81 +169,20 @@ for task_id, task_name, schedule_obj, task_args in tasks_schedule: | |||
logger.info(f"scheduled_task_sync_wx_info 定时任务执行成功!") | |||
# def setup_schedule(): | |||
# # 初始化第一次执行(例如:15秒后执行) | |||
# initial_run_in = 5 | |||
# entry = RedBeatSchedulerEntry( | |||
# name='random-task', | |||
# task='tasks.random_scheduled_task', | |||
# schedule=timedelta(seconds=initial_run_in), | |||
# app=celery_app | |||
# ) | |||
# entry.save() | |||
# print(f"Initial task scheduled in {initial_run_in} seconds") | |||
# setup_schedule() | |||
# for task_id, task_name, schedule_obj, task_args in tasks_schedule: | |||
# entry_key = f'redbeat:{task_id}' # RedBeat 任务的键 | |||
# existing_entry = None | |||
# try: | |||
# existing_entry = RedBeatSchedulerEntry.from_key(entry_key, app=celery_app) | |||
# except KeyError: | |||
# # 任务不存在 | |||
# pass | |||
# print(existing_entry) | |||
# if existing_entry: | |||
# print(f"任务 `{task_name}` 已存在,跳过注册") | |||
# logger.info(f"任务 `{task_name}` 已存在,跳过注册") | |||
# else: | |||
# redbeat_entry = RedBeatSchedulerEntry( | |||
# name=task_id, | |||
# task=task_name, | |||
# schedule=schedule_obj, # 现在使用 schedule_obj 而不是创建新的 schedule | |||
# args=task_args, | |||
# app=celery_app | |||
# ) | |||
# redbeat_entry.save() | |||
# if task_name == "tasks.add_friends_task": | |||
# if isinstance(schedule_obj, crontab): | |||
# print(f"已注册 `{task_name}` 任务,将在每小时的第 {schedule_obj._orig_minute} 分钟执行") | |||
# logger.info(f"已注册 `{task_name}` 任务,将在每小时的第 {schedule_obj._orig_minute} 分钟执行") | |||
# else: | |||
# print("scheduled_task_sync_wx_info 定时任务执行成功!") | |||
# logger.info("scheduled_task_sync_wx_info 定时任务执行成功!") | |||
# # **仅在首次启动时,手动注册 RedBeat 任务** | |||
# def register_initial_tasks(): | |||
# try: | |||
# # 生成一个随机的首次任务执行间隔(5-15 分钟) | |||
# initial_interval = random.randint(1, 3) | |||
# #initial_interval= | |||
# # RedBeat 任务注册 | |||
# redbeat_entry = RedBeatSchedulerEntry( | |||
# name="redbeat:add_friends_task", | |||
# task="tasks.add_friends_task", | |||
# schedule=celery.schedules.schedule(timedelta(seconds=initial_interval)), | |||
# args=[redis_config], | |||
# app=celery_app | |||
# ) | |||
# redbeat_entry.save() | |||
# print(f"已注册 `tasks.add_friends_task` 任务,首次将在 {initial_interval} 秒后执行") | |||
# except Exception as e: | |||
# print(f"任务注册失败: {e}") | |||
# register_initial_tasks() | |||
#add_friends_task.apply_async(args=[redis_config]) | |||
def setup_schedule(): | |||
# 初始化第一次执行(例如:15秒后执行) | |||
initial_run_in = 5 | |||
entry = RedBeatSchedulerEntry( | |||
name='random-task', | |||
task='tasks.random_scheduled_task', | |||
schedule=timedelta(seconds=initial_run_in), | |||
app=celery_app | |||
) | |||
entry.save() | |||
print(f"Initial task scheduled in {initial_run_in} seconds") | |||
#trigger_initial_task() | |||
setup_schedule() |
@@ -39,7 +39,7 @@ if __name__ == "__main__": | |||
celery_beat_process = start_celery_beat() | |||
# 等待子进程完成 | |||
fastapi_process.wait() | |||
#fastapi_process.wait() | |||
celery_worker_process.wait() | |||
celery_beat_process.wait() | |||
@@ -508,7 +508,7 @@ class GeWeService: | |||
response_object = await response.json() | |||
return response_object.get('ret', None), response_object.get('msg', None), response_object.get('data', None) | |||
async def add_group_member_as_friend(self, token_id, app_id, chatroom_id, member_wxid, content): | |||
async def add_group_member_as_friend_async(self, token_id, app_id, chatroom_id, member_wxid, content): | |||
''' | |||
添加群成员为好友 | |||
''' | |||
@@ -875,7 +875,6 @@ class GeWeService: | |||
await self.redis_service.update_hash_field(hash_key, chatroom_id, json.dumps(data, ensure_ascii=False)) | |||
await asyncio.sleep(0.1) | |||
async def save_groups_members_to_cache_async(self, token_id, app_id, wxid, chatroom_ids: list): | |||
""" | |||
将群成员保存到 Redis 缓存。 | |||
@@ -1128,7 +1127,8 @@ class GeWeService: | |||
cache = await self.redis_service.get_hash(hash_key) | |||
wxids = [key for key, value in cache.items() if len(json.loads(value)) == 2] | |||
return wxids | |||
async def enqueue_to_add_contacts_async(self,wxid,scene:int,v3,v4): | |||
""" | |||
入列待添加好友 | |||
@@ -11,6 +11,7 @@ from services.kafka_service import KafkaService | |||
from services.gewe_service import GeWeService | |||
from common.log import logger | |||
import asyncio,random | |||
from model.models import AddGroupContactsHistory | |||
@celery_app.task(name='tasks.add_task', bind=True, acks_late=True) | |||
@@ -168,7 +169,7 @@ def scheduled_task_sync_wx_info(self, redis_config, kafka_config, gewe_config): | |||
wxid = r.get("wxid") | |||
status = r.get('status') | |||
if status == '0': | |||
logger.warning(f"微信号 {wxid} 已经离线: {ret}-{msg}") | |||
logger.warning(f"微信号 {wxid} 已经离线") | |||
continue | |||
ret, msg, profile = await gewe_service.get_profile_async(token_id, app_id) | |||
if ret != 200: | |||
@@ -197,6 +198,85 @@ def scheduled_task_sync_wx_info(self, redis_config, kafka_config, gewe_config): | |||
@celery_app.task(name='tasks.scheduled_task_add_contacts_from_chatrooms', bind=True, acks_late=True) | |||
def scheduled_task_add_contacts_from_chatrooms(self, redis_config, kafka_config, gewe_config): | |||
async def task(): | |||
try: | |||
redis_service = RedisService() | |||
await redis_service.init(**redis_config) | |||
gewe_service = await GeWeService.get_instance(redis_service,gewe_config['api_url']) | |||
login_keys = [] | |||
async for key in redis_service.client.scan_iter(match='__AI_OPS_WX__:LOGININFO:*'): | |||
login_keys.append(key) | |||
print(login_keys) | |||
for k in login_keys: | |||
r = await redis_service.get_hash(k) | |||
app_id = r.get("appId") | |||
token_id = r.get("tokenId") | |||
wxid = r.get("wxid") | |||
status = r.get('status') | |||
if status == '0': | |||
logger.warning(f"微信号 {wxid} 已经离线,群成员不能定时添加") | |||
continue | |||
c = await gewe_service.get_wxchat_config_from_cache_async(wxid) | |||
contacts = await gewe_service.get_contacts_brief_from_cache_async(wxid) | |||
contact_wxids = [c.get('userName') for c in contacts] | |||
chatrooms = c.get('addContactsFromChatroomIdWhiteList', []) | |||
for chatroom_id in chatrooms: | |||
chatroom = await gewe_service.get_group_info_from_cache_async(wxid, chatroom_id) | |||
chatroom_member=await gewe_service.get_group_members_from_cache_async(wxid, chatroom_id) | |||
chatroom_nickname = chatroom.get('nickName') | |||
chatroom_owner_wxid = chatroom_member.get('chatroomOwner', None) | |||
admin_wxid = chatroom_member.get('adminWxid', None) | |||
logger.info(f'{chatroom_nickname} 的群主是 {chatroom_owner_wxid},管理员是{admin_wxid}') | |||
contact_wxids_set = set(contact_wxids) | |||
if admin_wxid is not None: | |||
contact_wxids_set.add(admin_wxid) | |||
if chatroom_owner_wxid is not None: | |||
contact_wxids_set.add(chatroom_owner_wxid) | |||
contact_wxids_set.add(wxid) | |||
unavailable_wixds=await gewe_service.check_wixd_group_add_contacts_history_async(wxid,chatroom_id) | |||
contact_wxids_set.update(unavailable_wixds) | |||
chatroot_member_list = chatroom.get('memberList', []) | |||
remaining_chatroot_members = [x for x in chatroot_member_list if x.get('wxid') not in contact_wxids_set] | |||
nickname = next((member['nickName'] for member in chatroot_member_list if member['wxid'] == wxid), None) | |||
logger.info(f'{nickname}-{wxid} 在 {chatroom_nickname} 群里还可以邀请的好友有:{[x.get("nickName") for x in remaining_chatroot_members]}') | |||
for m in remaining_chatroot_members: | |||
ret, msg, data = await gewe_service.add_group_member_as_friend_async(token_id, app_id, chatroom_id, m.get('wxid'), f'我是群聊"{chatroom_nickname}"群的{nickname}') | |||
if ret==200: | |||
contact_wxids= m.get('wxid') | |||
history=AddGroupContactsHistory.model_validate({ | |||
"chatroomId":chatroom_id, | |||
"wxid":wxid, | |||
"contactWixd":contact_wxids, | |||
"addTime":int(time.time()) | |||
}) | |||
await gewe_service.save_group_add_contacts_history_async(wxid,chatroom_id,contact_wxids,history) | |||
else: | |||
logger.info(f'群好友邀请失败原因:{data}') | |||
logger.info(f'{nickname} 向 {chatroom_nickname}-{chatroom_id} 群的 {m.get("nickName")}-{m.get("wxid")} 发送好友邀请 {msg}') | |||
await asyncio.sleep(random.uniform(1.5, 3)) | |||
await asyncio.sleep(random.uniform(1.5, 3)) | |||
except Exception as e: | |||
logger.error(f"任务执行过程中发生异常: {e}") | |||
loop = asyncio.get_event_loop() | |||
if loop.is_closed(): | |||
loop = asyncio.new_event_loop() | |||
asyncio.set_event_loop(loop) | |||
loop.run_until_complete(task()) # 在现有事件循环中运行任务 | |||
REDIS_KEY_PATTERN = "friend_add_limit:{date}" | |||
REDIS_LAST_RUN_KEY = "last_run_time:add_friends_task" | |||
@@ -298,4 +378,5 @@ def random_scheduled_task(self,): | |||
app=celery_app | |||
) | |||
entry.save() | |||
return f"Scheduled next run in {next_run_in} seconds" | |||
return f"Scheduled next run in {next_run_in} seconds" | |||