@@ -36,4 +36,4 @@ client_config.json | |||||
!plugins.json | !plugins.json | ||||
tmp/ | tmp/ | ||||
logs/ | logs/ | ||||
cmd.txt | |||||
*.txt |
@@ -1,46 +1,50 @@ | |||||
from celery import Celery | |||||
#from celery import Celery | |||||
import celery.schedules | import celery.schedules | ||||
from redbeat import RedBeatSchedulerEntry | from redbeat import RedBeatSchedulerEntry | ||||
from datetime import timedelta | from datetime import timedelta | ||||
from celery.schedules import crontab | |||||
from celery_config import * | |||||
from services.redis_service import RedisService | from services.redis_service import RedisService | ||||
from services.kafka_service import KafkaService | from services.kafka_service import KafkaService | ||||
from services.biz_service import BizService | from services.biz_service import BizService | ||||
from config import load_config,conf | |||||
#from config import load_config,conf | |||||
from urllib.parse import quote | from urllib.parse import quote | ||||
import asyncio,os | |||||
load_config() | |||||
KAFKA_BOOTSTRAP_SERVERS = conf().get("kafka_bootstrap_servers") | |||||
KAFKA_TOPIC = 'topic.ai.ops.wx' | |||||
KAFKA_GROUP_ID = 'ai-ops-wx' | |||||
redis_host=conf().get("redis_host") | |||||
redis_port=conf().get("redis_port") | |||||
redis_password=conf().get("redis_password") | |||||
redis_db=conf().get("redis_db") | |||||
encoded_password = quote(redis_password) | |||||
# 配置 Celery | |||||
celery_app = Celery( | |||||
"worker", | |||||
broker=f"redis://:{encoded_password}@{redis_host}:{redis_port}/{redis_db}", | |||||
backend=f"redis://:{encoded_password}@{redis_host}:{redis_port}/{redis_db}", | |||||
include=['tasks'] | |||||
) | |||||
# 配置 redbeat 作为 Celery Beat 调度器 | |||||
celery_app.conf.update( | |||||
timezone="Asia/Shanghai", # 设定时区 | |||||
beat_scheduler="redbeat.RedBeatScheduler", # 使用 RedBeat 作为调度器 | |||||
redbeat_redis_url=f"redis://:{encoded_password}@{redis_host}:{redis_port}/{redis_db}" # redbeat 存储任务调度信息的 Redis | |||||
, | |||||
redbeat_lock_timeout=60, # 避免多个 Beat 实例冲突 | |||||
beat_max_loop_interval=5 # 让 Celery Beat 每 5 秒检查一次任务 | |||||
) | |||||
import asyncio,os,random,sys | |||||
#from tasks import add_friends_task | |||||
from common.log import logger | |||||
# load_config() | |||||
# KAFKA_BOOTSTRAP_SERVERS = conf().get("kafka_bootstrap_servers") | |||||
# KAFKA_TOPIC = 'topic.ai.ops.wx' | |||||
# KAFKA_GROUP_ID = 'ai-ops-wx' | |||||
# redis_host=conf().get("redis_host") | |||||
# redis_port=conf().get("redis_port") | |||||
# redis_password=conf().get("redis_password") | |||||
# redis_db=conf().get("redis_db") | |||||
# encoded_password = quote(redis_password) | |||||
# # 配置 Celery | |||||
# celery_app = Celery( | |||||
# "worker", | |||||
# broker=f"redis://:{encoded_password}@{redis_host}:{redis_port}/{redis_db}", | |||||
# backend=f"redis://:{encoded_password}@{redis_host}:{redis_port}/{redis_db}", | |||||
# include=['tasks'] | |||||
# ) | |||||
# # 配置 redbeat 作为 Celery Beat 调度器 | |||||
# celery_app.conf.update( | |||||
# timezone="Asia/Shanghai", # 设定时区 | |||||
# beat_scheduler="redbeat.RedBeatScheduler", # 使用 RedBeat 作为调度器 | |||||
# redbeat_redis_url=f"redis://:{encoded_password}@{redis_host}:{redis_port}/{redis_db}" # redbeat 存储任务调度信息的 Redis | |||||
# , | |||||
# redbeat_lock_timeout=60, # 避免多个 Beat 实例冲突 | |||||
# beat_max_loop_interval=5 # 让 Celery Beat 每 5 秒检查一次任务 | |||||
# ) | |||||
# task_name = "tasks.scheduled_task" | # task_name = "tasks.scheduled_task" | ||||
# # 任务执行间隔(每 10 秒执行一次) | # # 任务执行间隔(每 10 秒执行一次) | ||||
@@ -76,46 +80,165 @@ celery_app.conf.update( | |||||
# ) | # ) | ||||
# redbeat_entry.save() | # redbeat_entry.save() | ||||
# 获取配置文件中的 redis_config、kafka_config、gewe_config | |||||
redis_config = { | |||||
'host': redis_host, | |||||
'port': redis_port, | |||||
'password': redis_password, | |||||
'db': redis_db, | |||||
} | |||||
kafka_config = { | |||||
'bootstrap_servers': KAFKA_BOOTSTRAP_SERVERS, | |||||
'topic': KAFKA_TOPIC, | |||||
'group_id': KAFKA_GROUP_ID, | |||||
} | |||||
gewe_config = { | |||||
'api_url': "http://api.geweapi.com/gewe", | |||||
} | |||||
scheduled_task_sync_wx_info_interval = 10 | |||||
# # 获取配置文件中的 redis_config、kafka_config、gewe_config | |||||
# redis_config = { | |||||
# 'host': redis_host, | |||||
# 'port': redis_port, | |||||
# 'password': redis_password, | |||||
# 'db': redis_db, | |||||
# } | |||||
# kafka_config = { | |||||
# 'bootstrap_servers': KAFKA_BOOTSTRAP_SERVERS, | |||||
# 'topic': KAFKA_TOPIC, | |||||
# 'group_id': KAFKA_GROUP_ID, | |||||
# } | |||||
# gewe_config = { | |||||
# 'api_url': "http://api.geweapi.com/gewe", | |||||
# } | |||||
scheduled_task_sync_wx_info_interval = 6000 | |||||
environment = os.environ.get('environment', 'default') | environment = os.environ.get('environment', 'default') | ||||
if environment != 'default': | if environment != 'default': | ||||
scheduled_task_sync_wx_info_interval = 60*11 | scheduled_task_sync_wx_info_interval = 60*11 | ||||
# 定义定时任务列表 (任务 ID, 任务名称, 执行间隔秒, 任务参数) | # 定义定时任务列表 (任务 ID, 任务名称, 执行间隔秒, 任务参数) | ||||
tasks_schedule = [ | |||||
#("redbeat:scheduled_task", "tasks.scheduled_task", 3, []), | |||||
#("redbeat:scheduled_task_sync_wx", "tasks.scheduled_task_sync_wx", 15, []), | |||||
("redbeat:scheduled_task_sync_wx_info", "tasks.scheduled_task_sync_wx_info", scheduled_task_sync_wx_info_interval, [redis_config, kafka_config, gewe_config]), | |||||
# tasks_schedule = [ | |||||
# #("redbeat:scheduled_task", "tasks.scheduled_task", 3, []), | |||||
# #("redbeat:scheduled_task_sync_wx", "tasks.scheduled_task_sync_wx", 15, []), | |||||
# ("redbeat:scheduled_task_sync_wx_info", "tasks.scheduled_task_sync_wx_info", scheduled_task_sync_wx_info_interval, [redis_config, kafka_config, gewe_config]), | |||||
# #("redbeat:add_friends_task", "tasks.add_friends_task", random.randint(1, 10), [redis_config]), # 10分钟执行一次 | |||||
# ] | |||||
# # 注册 RedBeat 任务 | |||||
# for task_id, task_name, interval, task_args in tasks_schedule: | |||||
# redbeat_entry = RedBeatSchedulerEntry( | |||||
# name=task_id, | |||||
# task=task_name, | |||||
# schedule=celery.schedules.schedule(timedelta(seconds=interval)), | |||||
# args=task_args, | |||||
# app=celery_app | |||||
# ) | |||||
# redbeat_entry.save() | |||||
] | |||||
# # 为 add_friends_task 生成随机的分钟和小时 | |||||
random_minute = random.randint(0, 59) | |||||
random_hour = random.randint(0, 23) # 可以根据需要调整小时范围 | |||||
# 注册 RedBeat 任务 | |||||
for task_id, task_name, interval, task_args in tasks_schedule: | |||||
# 定义定时任务列表 (任务 ID, 任务名称, 执行间隔或调度, 任务参数) | |||||
tasks_schedule = [ | |||||
# 其他任务保持不变 | |||||
("redbeat:scheduled_task_sync_wx_info", "tasks.scheduled_task_sync_wx_info", | |||||
celery.schedules.schedule(timedelta(seconds=scheduled_task_sync_wx_info_interval)), | |||||
[redis_config, kafka_config, gewe_config]), | |||||
# # add_friends_task 使用 crontab 随机时间 | |||||
# ("redbeat:add_friends_task", "tasks.add_friends_task", | |||||
# crontab(minute=random_minute, hour='*'), # 每小时在随机分钟执行 | |||||
# # 如果想每天在随机时间执行,可以用: crontab(minute=random_minute, hour=random_hour) | |||||
# [redis_config]), | |||||
] | |||||
# # 注册 RedBeat 任务 | |||||
for task_id, task_name, schedule_obj, task_args in tasks_schedule: | |||||
redbeat_entry = RedBeatSchedulerEntry( | redbeat_entry = RedBeatSchedulerEntry( | ||||
name=task_id, | name=task_id, | ||||
task=task_name, | task=task_name, | ||||
schedule=celery.schedules.schedule(timedelta(seconds=interval)), | |||||
args=task_args, | |||||
schedule=schedule_obj, # 现在使用 schedule_obj 而不是创建新的 schedule | |||||
args=task_args, | |||||
app=celery_app | app=celery_app | ||||
) | ) | ||||
redbeat_entry.save() | redbeat_entry.save() | ||||
# 如果是 add_friends_task,打印其随机调度信息 | |||||
if task_name == "tasks.add_friends_task": | |||||
if isinstance(schedule_obj, crontab): | |||||
print(f"已注册 `{task_name}` 任务,将在每小时的第 {random_minute} 分钟执行") | |||||
logger.info(f"已注册 `{task_name}` 任务,将在每小时的第 {random_minute} 分钟执行") | |||||
else: | |||||
print('scheduled_task_sync_wx_info 定时任务执行成功!') | |||||
logger.info(f"scheduled_task_sync_wx_info 定时任务执行成功!") | |||||
# def setup_schedule(): | |||||
# # 初始化第一次执行(例如:15秒后执行) | |||||
# initial_run_in = 5 | |||||
# entry = RedBeatSchedulerEntry( | |||||
# name='random-task', | |||||
# task='tasks.random_scheduled_task', | |||||
# schedule=timedelta(seconds=initial_run_in), | |||||
# app=celery_app | |||||
# ) | |||||
# entry.save() | |||||
# print(f"Initial task scheduled in {initial_run_in} seconds") | |||||
# setup_schedule() | |||||
# for task_id, task_name, schedule_obj, task_args in tasks_schedule: | |||||
# entry_key = f'redbeat:{task_id}' # RedBeat 任务的键 | |||||
# existing_entry = None | |||||
# try: | |||||
# existing_entry = RedBeatSchedulerEntry.from_key(entry_key, app=celery_app) | |||||
# except KeyError: | |||||
# # 任务不存在 | |||||
# pass | |||||
# print(existing_entry) | |||||
# if existing_entry: | |||||
# print(f"任务 `{task_name}` 已存在,跳过注册") | |||||
# logger.info(f"任务 `{task_name}` 已存在,跳过注册") | |||||
# else: | |||||
# redbeat_entry = RedBeatSchedulerEntry( | |||||
# name=task_id, | |||||
# task=task_name, | |||||
# schedule=schedule_obj, # 现在使用 schedule_obj 而不是创建新的 schedule | |||||
# args=task_args, | |||||
# app=celery_app | |||||
# ) | |||||
# redbeat_entry.save() | |||||
# if task_name == "tasks.add_friends_task": | |||||
# if isinstance(schedule_obj, crontab): | |||||
# print(f"已注册 `{task_name}` 任务,将在每小时的第 {schedule_obj._orig_minute} 分钟执行") | |||||
# logger.info(f"已注册 `{task_name}` 任务,将在每小时的第 {schedule_obj._orig_minute} 分钟执行") | |||||
# else: | |||||
# print("scheduled_task_sync_wx_info 定时任务执行成功!") | |||||
# logger.info("scheduled_task_sync_wx_info 定时任务执行成功!") | |||||
# # **仅在首次启动时,手动注册 RedBeat 任务** | |||||
# def register_initial_tasks(): | |||||
# try: | |||||
# # 生成一个随机的首次任务执行间隔(5-15 分钟) | |||||
# initial_interval = random.randint(1, 3) | |||||
# #initial_interval= | |||||
# # RedBeat 任务注册 | |||||
# redbeat_entry = RedBeatSchedulerEntry( | |||||
# name="redbeat:add_friends_task", | |||||
# task="tasks.add_friends_task", | |||||
# schedule=celery.schedules.schedule(timedelta(seconds=initial_interval)), | |||||
# args=[redis_config], | |||||
# app=celery_app | |||||
# ) | |||||
# redbeat_entry.save() | |||||
# print(f"已注册 `tasks.add_friends_task` 任务,首次将在 {initial_interval} 秒后执行") | |||||
# except Exception as e: | |||||
# print(f"任务注册失败: {e}") | |||||
# register_initial_tasks() | |||||
#add_friends_task.apply_async(args=[redis_config]) | |||||
#trigger_initial_task() |
@@ -0,0 +1,55 @@ | |||||
from celery import Celery | |||||
from config import load_config,conf | |||||
from urllib.parse import quote | |||||
load_config() | |||||
KAFKA_BOOTSTRAP_SERVERS = conf().get("kafka_bootstrap_servers") | |||||
KAFKA_TOPIC = 'topic.ai.ops.wx' | |||||
KAFKA_GROUP_ID = 'ai-ops-wx' | |||||
redis_host=conf().get("redis_host") | |||||
redis_port=conf().get("redis_port") | |||||
redis_password=conf().get("redis_password") | |||||
redis_db=conf().get("redis_db") | |||||
encoded_password = quote(redis_password) | |||||
# 配置 Celery | |||||
celery_app = Celery( | |||||
"worker", | |||||
broker=f"redis://:{encoded_password}@{redis_host}:{redis_port}/{redis_db}", | |||||
backend=f"redis://:{encoded_password}@{redis_host}:{redis_port}/{redis_db}", | |||||
include=['tasks'] | |||||
) | |||||
# 配置 redbeat 作为 Celery Beat 调度器 | |||||
celery_app.conf.update( | |||||
timezone="Asia/Shanghai", # 设定时区 | |||||
beat_scheduler="redbeat.RedBeatScheduler", # 使用 RedBeat 作为调度器 | |||||
redbeat_redis_url=f"redis://:{encoded_password}@{redis_host}:{redis_port}/{redis_db}" # redbeat 存储任务调度信息的 Redis | |||||
, | |||||
redbeat_lock_timeout=60, # 避免多个 Beat 实例冲突 | |||||
beat_max_loop_interval=5 # 让 Celery Beat 每 5 秒检查一次任务 | |||||
) | |||||
# 获取配置文件中的 redis_config、kafka_config、gewe_config | |||||
redis_config = { | |||||
'host': redis_host, | |||||
'port': redis_port, | |||||
'password': redis_password, | |||||
'db': redis_db, | |||||
} | |||||
kafka_config = { | |||||
'bootstrap_servers': KAFKA_BOOTSTRAP_SERVERS, | |||||
'topic': KAFKA_TOPIC, | |||||
'group_id': KAFKA_GROUP_ID, | |||||
} | |||||
gewe_config = { | |||||
'api_url': "http://api.geweapi.com/gewe", | |||||
} |
@@ -1,6 +1,11 @@ | |||||
import subprocess | import subprocess | ||||
import sys | import sys | ||||
import os | import os | ||||
import time | |||||
import signal | |||||
processes = [] | |||||
def start_fastapi(): | def start_fastapi(): | ||||
""" 启动 FastAPI 服务 """ | """ 启动 FastAPI 服务 """ | ||||
@@ -14,14 +19,17 @@ def start_fastapi(): | |||||
def start_celery_worker(): | def start_celery_worker(): | ||||
""" 启动 Celery Worker """ | """ 启动 Celery Worker """ | ||||
if sys.platform == "win32": | if sys.platform == "win32": | ||||
process = subprocess.Popen(["celery", "-A", "celery_app", "worker", "--loglevel=info", "-P", "solo"]) | |||||
process = subprocess.Popen(["celery", "-A", "celery_app", "worker", "--loglevel=info", "-P", "solo"], | |||||
stdout=None, stderr=None) | |||||
else: | else: | ||||
process = subprocess.Popen(["celery", "-A", "celery_app", "worker", "--loglevel=info"]) | |||||
process = subprocess.Popen(["celery", "-A", "celery_app", "worker", "--loglevel=info"], | |||||
stdout=None, stderr=None) | |||||
return process | return process | ||||
def start_celery_beat(): | def start_celery_beat(): | ||||
""" 启动 Celery Beat,使用 RedBeat 作为调度器 """ | """ 启动 Celery Beat,使用 RedBeat 作为调度器 """ | ||||
process = subprocess.Popen(["celery", "-A", "celery_app", "beat", "--scheduler", "redbeat.RedBeatScheduler", "--loglevel=info"]) | |||||
process = subprocess.Popen(["celery", "-A", "celery_app", "beat", "--scheduler", "redbeat.RedBeatScheduler", "--loglevel=info"], | |||||
stdout=None, stderr=None) | |||||
return process | return process | ||||
if __name__ == "__main__": | if __name__ == "__main__": | ||||
@@ -34,3 +42,37 @@ if __name__ == "__main__": | |||||
fastapi_process.wait() | fastapi_process.wait() | ||||
celery_worker_process.wait() | celery_worker_process.wait() | ||||
celery_beat_process.wait() | celery_beat_process.wait() | ||||
# def signal_handler(sig, frame): | |||||
# """处理退出信号,确保子进程也被终止""" | |||||
# print('正在关闭所有服务...') | |||||
# for process in processes: | |||||
# process.terminate() # 尝试优雅终止 | |||||
# # 给进程一点时间来优雅终止 | |||||
# time.sleep(2) | |||||
# # 检查是否有进程仍在运行,如果有则强制终止 | |||||
# for process in processes: | |||||
# if process.poll() is None: # 进程仍在运行 | |||||
# process.kill() # 强制终止 | |||||
# sys.exit(0) | |||||
# if __name__ == "__main__": | |||||
# # 注册信号处理程序 | |||||
# signal.signal(signal.SIGINT, signal_handler) # Ctrl+C | |||||
# signal.signal(signal.SIGTERM, signal_handler) # 终止信号 | |||||
# # 启动 FastAPI、Celery Worker 和 Celery Beat | |||||
# #fastapi_process = start_fastapi() | |||||
# celery_worker_process = start_celery_worker() | |||||
# celery_beat_process = start_celery_beat() | |||||
# # 让主进程保持运行,但不阻塞在特定子进程上 | |||||
# try: | |||||
# while True: | |||||
# time.sleep(1) # 简单地循环等待,让信号处理程序有机会工作 | |||||
# except KeyboardInterrupt: | |||||
# pass # 如果收到 KeyboardInterrupt,信号处理程序会处理 |
@@ -113,6 +113,7 @@ class BizService(): | |||||
self.wxchat.forward_video_aeskey = '' | self.wxchat.forward_video_aeskey = '' | ||||
self.wxchat.forward_video_cdnvideourl = '' | self.wxchat.forward_video_cdnvideourl = '' | ||||
self.wxchat.forward_video_length = 0 | self.wxchat.forward_video_length = 0 | ||||
self.wxchat.video_duration = 0 | |||||
for intersection_wxid in intersection_wxids: | for intersection_wxid in intersection_wxids: | ||||
for wx_content in wx_content_list: | for wx_content in wx_content_list: | ||||
@@ -243,8 +244,9 @@ class BizService(): | |||||
self.wxchat.forward_video_aeskey = res["aesKey"] | self.wxchat.forward_video_aeskey = res["aesKey"] | ||||
self.wxchat.forward_video_cdnvideourl = res["cdnThumbUrl"] | self.wxchat.forward_video_cdnvideourl = res["cdnThumbUrl"] | ||||
self.wxchat.forward_video_length = res["length"] | self.wxchat.forward_video_length = res["length"] | ||||
self.wxchat.video_duration=video_duration | |||||
else: | else: | ||||
ret,ret_msg,res = await self.wxchat.forward_video_async(token_id, app_id, t, self.wxchat.forward_video_aeskey, self.wxchat.forward_video_cdnvideourl, self.wxchat.forward_video_length) | |||||
ret,ret_msg,res = await self.wxchat.forward_video_async(token_id, app_id, t, self.wxchat.forward_video_aeskey, self.wxchat.forward_video_cdnvideourl, self.wxchat.forward_video_length,self.wxchat.video_duration) | |||||
print('转发视频') | print('转发视频') | ||||
if ret==200: | if ret==200: | ||||
logger.info(f'{agent_wxid} 向 {t} 发送视频【{file_url}】{ret_msg}') | logger.info(f'{agent_wxid} 向 {t} 发送视频【{file_url}】{ret_msg}') | ||||
@@ -377,7 +377,7 @@ class GeWeService: | |||||
response_object = await response.json() | response_object = await response.json() | ||||
return response_object.get('data', None), response_object.get('ret', None), response_object.get('msg', None) | return response_object.get('data', None), response_object.get('ret', None), response_object.get('msg', None) | ||||
async def forward_video_async(self, token_id, app_id, to_wxid, aeskey, cdnvideourl, length): | |||||
async def forward_video_async(self, token_id, app_id, to_wxid, aeskey, cdnvideourl, length,video_duration): | |||||
api_url = f"{self.base_url}/v2/api/message/forwardVideo" | api_url = f"{self.base_url}/v2/api/message/forwardVideo" | ||||
headers = { | headers = { | ||||
'X-GEWE-TOKEN': token_id, | 'X-GEWE-TOKEN': token_id, | ||||
@@ -386,7 +386,7 @@ class GeWeService: | |||||
data = { | data = { | ||||
"appId": app_id, | "appId": app_id, | ||||
"toWxid": to_wxid, | "toWxid": to_wxid, | ||||
"xml": f"<?xml version=\"1.0\"?>\n<msg>\n\t<videomsg aeskey=\"{aeskey}\" cdnvideourl=\"{cdnvideourl}\" cdnthumbaeskey=\"{aeskey}\" cdnthumburl=\"{cdnvideourl}\" length=\"{length}\" playlength=\"7\" cdnthumblength=\"8192\" cdnthumbwidth=\"135\" cdnthumbheight=\"240\" fromusername=\"zhangchuan2288\" md5=\"8804c121e9db91dd844f7a34035beb88\" newmd5=\"\" isplaceholder=\"0\" rawmd5=\"\" rawlength=\"0\" cdnrawvideourl=\"\" cdnrawvideoaeskey=\"\" overwritenewmsgid=\"0\" originsourcemd5=\"\" isad=\"0\" />\n</msg>" | |||||
"xml": f"<?xml version=\"1.0\"?>\n<msg>\n\t<videomsg aeskey=\"{aeskey}\" cdnvideourl=\"{cdnvideourl}\" cdnthumbaeskey=\"{aeskey}\" cdnthumburl=\"{cdnvideourl}\" length=\"{length}\" playlength=\"{video_duration}\" cdnthumblength=\"8192\" cdnthumbwidth=\"135\" cdnthumbheight=\"240\" fromusername=\"zhangchuan2288\" md5=\"8804c121e9db91dd844f7a34035beb88\" newmd5=\"\" isplaceholder=\"0\" rawmd5=\"\" rawlength=\"0\" cdnrawvideourl=\"\" cdnrawvideoaeskey=\"\" overwritenewmsgid=\"0\" originsourcemd5=\"\" isad=\"0\" />\n</msg>" | |||||
} | } | ||||
async with aiohttp.ClientSession() as session: | async with aiohttp.ClientSession() as session: | ||||
async with session.post(api_url, headers=headers, json=data) as response: | async with session.post(api_url, headers=headers, json=data) as response: | ||||
@@ -193,6 +193,38 @@ class RedisService: | |||||
await self.client.delete(queue_name) | await self.client.delete(queue_name) | ||||
print(f"Cleared queue {queue_name}") | print(f"Cleared queue {queue_name}") | ||||
async def increment_hash_field(self, hash_key, field, amount=1): | |||||
""" | |||||
对哈希表中的指定字段进行递增操作(原子性)。 | |||||
:param hash_key: 哈希表的 key | |||||
:param field: 要递增的字段 | |||||
:param amount: 递增的数值(默认为 1) | |||||
:return: 递增后的值 | |||||
""" | |||||
return await self.client.hincrby(hash_key, field, amount) | |||||
async def expire(self, key, timeout): | |||||
""" | |||||
设置 Redis 键的过期时间(单位:秒) | |||||
:param key: Redis 键 | |||||
:param timeout: 过期时间(秒) | |||||
""" | |||||
await self.client.expire(key, timeout) | |||||
async def expire_field(self, hash_key, field, timeout): | |||||
""" | |||||
通过辅助键方式设置哈希表中某个字段的过期时间 | |||||
:param hash_key: 哈希表的 key | |||||
:param field: 要设置过期的字段 | |||||
:param timeout: 过期时间(秒) | |||||
""" | |||||
expire_key = f"{hash_key}:{field}:expire" | |||||
await self.client.set(expire_key, "1") | |||||
await self.client.expire(expire_key, timeout) | |||||
# Dependency injection helper function | # Dependency injection helper function | ||||
async def get_redis_service(request: Request) -> RedisService: | async def get_redis_service(request: Request) -> RedisService: | ||||
return request.app.state.redis_serive | return request.app.state.redis_serive |
@@ -1,12 +1,16 @@ | |||||
from celery_app import celery_app | from celery_app import celery_app | ||||
from fastapi import Request,FastAPI | from fastapi import Request,FastAPI | ||||
import time | |||||
import time,datetime | |||||
from celery import Celery | |||||
import celery.schedules | |||||
from redbeat import RedBeatSchedulerEntry | |||||
from datetime import timedelta | |||||
from services.redis_service import RedisService | from services.redis_service import RedisService | ||||
from services.kafka_service import KafkaService | from services.kafka_service import KafkaService | ||||
from services.gewe_service import GeWeService | from services.gewe_service import GeWeService | ||||
from common.log import logger | from common.log import logger | ||||
import asyncio | |||||
import asyncio,random | |||||
@celery_app.task(name='tasks.add_task', bind=True, acks_late=True) | @celery_app.task(name='tasks.add_task', bind=True, acks_late=True) | ||||
@@ -190,3 +194,108 @@ def scheduled_task_sync_wx_info(self, redis_config, kafka_config, gewe_config): | |||||
asyncio.set_event_loop(loop) | asyncio.set_event_loop(loop) | ||||
loop.run_until_complete(task()) # 在现有事件循环中运行任务 | loop.run_until_complete(task()) # 在现有事件循环中运行任务 | ||||
REDIS_KEY_PATTERN = "friend_add_limit:{date}" | |||||
REDIS_LAST_RUN_KEY = "last_run_time:add_friends_task" | |||||
@celery_app.task(name='tasks.add_friends_task', bind=True, acks_late=True) | |||||
def add_friends_task(self,redis_config): | |||||
""" | |||||
限制每天最多 15 个,每 2 小时最多 8 个 | |||||
""" | |||||
async def task(): | |||||
redis_service = RedisService() | |||||
await redis_service.init(**redis_config) | |||||
today_str = datetime.datetime.now().strftime("%Y%m%d") | |||||
redis_key = REDIS_KEY_PATTERN.format(date=today_str) | |||||
# 获取当前总添加数量 | |||||
total_added = await redis_service.get_hash_field(redis_key, "total") or 0 | |||||
last_2h_added =await redis_service.get_hash_field(redis_key, "last_2h") or 0 | |||||
total_added = int(total_added) | |||||
last_2h_added = int(last_2h_added) | |||||
logger.info(f"当前添加好友总数: {total_added}, 过去2小时添加: {last_2h_added}") | |||||
# 判断是否超过限制 | |||||
if total_added >= 15: | |||||
logger.warning("今日好友添加已达上限!") | |||||
return | |||||
if last_2h_added >= 8: | |||||
logger.warning("过去2小时添加已达上限!") | |||||
return | |||||
# 计算本次要添加的好友数量 (控制每天 5-15 个) | |||||
max_add = min(15 - total_added, 8 - last_2h_added) | |||||
if max_add <= 0: | |||||
return | |||||
num_to_add = min(max_add, 1) # 每次最多加 1 个 | |||||
logger.info(f"本次添加 {num_to_add} 位好友") | |||||
# TODO: 调用好友添加逻辑 (接口 or 业务逻辑) | |||||
# success = add_friends(num_to_add) | |||||
success = num_to_add # 假设成功添加 num_to_add 个 | |||||
# 更新 Redis 计数 | |||||
if success > 0: | |||||
await redis_service.increment_hash_field(redis_key, "total", success) | |||||
await redis_service.increment_hash_field(redis_key, "last_2h", success) | |||||
# 设置 Redis 过期时间 (每日记录存 1 天, 2 小时记录存 2 小时) | |||||
await redis_service.expire(redis_key, 86400) # 24小时 | |||||
await redis_service.expire_field(redis_key, "last_2h", 7200) # 2小时 | |||||
logger.info(f"成功添加 {success} 位好友, 今日总数 {total_added + success}") | |||||
# 生成一个新的随机时间(5-15 分钟之间) | |||||
# next_interval = random.randint(10, 20) | |||||
# # 计算新的执行时间 | |||||
# next_run_time = datetime.datetime.now() + timedelta(seconds=next_interval) | |||||
# # 重新注册 RedBeat 任务,确保下次执行时间不同 | |||||
# redbeat_entry = RedBeatSchedulerEntry( | |||||
# name="redbeat:add_friends_task", | |||||
# task="tasks.add_friends_task", | |||||
# schedule=celery.schedules.schedule(timedelta(seconds=next_interval)), | |||||
# args=[redis_config], | |||||
# app=celery_app | |||||
# ) | |||||
# # 设置任务的下次执行时间 | |||||
# redbeat_entry.last_run_at = next_run_time | |||||
# redbeat_entry.save() | |||||
# logger.info(f"下次任务将在 {next_run_time} 执行(间隔 {next_interval} 秒)") | |||||
loop = asyncio.get_event_loop() | |||||
if loop.is_closed(): | |||||
loop = asyncio.new_event_loop() | |||||
asyncio.set_event_loop(loop) | |||||
loop.run_until_complete(task()) # 在现有事件循环中运行任务 | |||||
@celery_app.task(name='tasks.random_scheduled_task', bind=True, acks_late=True) | |||||
def random_scheduled_task(self,): | |||||
print(f"Task executed at {datetime.datetime.now()}") | |||||
# 随机生成下次执行时间(例如:10-60秒内的随机时间) | |||||
next_run_in = random.randint(10, 60) | |||||
print(f"Next execution will be in {next_run_in} seconds") | |||||
# 设置下次执行时间 | |||||
entry = RedBeatSchedulerEntry( | |||||
name='random-task', | |||||
task='tasks.random_scheduled_task', | |||||
schedule=timedelta(seconds=next_run_in), | |||||
app=celery_app | |||||
) | |||||
entry.save() | |||||
return f"Scheduled next run in {next_run_in} seconds" |