Nevar pievienot vairāk kā 25 tēmas Tēmai ir jāsākas ar burtu vai ciparu, tā var saturēt domu zīmes ('-') un var būt līdz 35 simboliem gara.

301 rinda
12KB

import asyncio
import datetime
import random
import time
from datetime import timedelta

import celery.schedules
from celery import Celery
from fastapi import Request, FastAPI
from redbeat import RedBeatSchedulerEntry

from celery_app import celery_app
from common.log import logger
from services.gewe_service import GeWeService
from services.kafka_service import KafkaService
from services.redis_service import RedisService
  13. @celery_app.task(name='tasks.add_task', bind=True, acks_late=True)
  14. def add_task(self, x, y):
  15. time.sleep(5) # 模拟长时间计算
  16. logger.info('add')
  17. return x + y
  18. @celery_app.task(name='tasks.mul_task', bind=True, acks_late=True)
  19. def mul_task(self, x, y):
  20. time.sleep(5) # 模拟长时间计算
  21. return x * y
  22. # @celery.task(name='app.tasks.sync_contacts', bind=True, acks_late=True)
  23. # async def sync_contacts_task(self,app):
  24. # login_keys = list(await app.state.redis_service.client.scan_iter(match='__AI_OPS_WX__:LOGININFO:*'))
  25. # return login_keys
  26. # # for k in login_keys:
  27. # # print(k)
  28. @celery_app.task(name='tasks.sync_contacts', bind=True, acks_late=True)
  29. async def sync_contacts_task(self, redis_service):
  30. # Use the redis_service passed as an argument
  31. login_keys = list(await redis_service.client.scan_iter(match='__AI_OPS_WX__:LOGININFO:*'))
  32. return login_keys
  33. @celery_app.task(name='tasks.background_worker_task', bind=True, acks_late=True)
  34. def background_worker_task(self, redis_config, kafka_config, gewe_config):
  35. async def task():
  36. redis_service = RedisService()
  37. await redis_service.init(**redis_config)
  38. login_keys = []
  39. async for key in redis_service.client.scan_iter(match='__AI_OPS_WX__:LOGININFO:*'):
  40. login_keys.append(key)
  41. print(login_keys)
  42. asyncio.run(task())
  43. # @celery.task(name='tasks.background_worker_task', bind=True, acks_late=True)
  44. # async def background_worker_task(self, redis_config, kafka_config, gewe_config):
  45. # # Initialize services inside the task
  46. # redis_service = RedisService()
  47. # await redis_service.init(**redis_config)
  48. # login_keys = []
  49. # async for key in redis_service.client.scan_iter(match='__AI_OPS_WX__:LOGININFO:*'): # 使用 async for 遍历异步生成器
  50. # login_keys.append(key)
  51. # print(login_keys)
  52. # kafka_service = KafkaService(**kafka_config)
  53. # await kafka_service.start()
  54. # gewe_service = await GeWeService.get_instance(None, gewe_config['api_url'])
  55. # # Task logic
  56. # lock_name = "background_wxchat_thread_lock"
  57. # lock_identifier = str(time.time())
  58. # while True:
  59. # if await redis_service.acquire_lock(lock_name, timeout=10):
  60. # try:
  61. # logger.info("分布式锁已成功获取")
  62. # # Perform task logic
  63. # finally:
  64. # await redis_service.release_lock(lock_name, lock_identifier)
  65. # break
  66. # else:
  67. # logger.info("获取分布式锁失败,等待10秒后重试...")
  68. # await asyncio.sleep(10)
  69. # @celery_app.task(name='tasks.scheduled_task', bind=True, acks_late=True)
  70. # def scheduled_task(self):
  71. # print("定时任务执行成功!~~~~~~~~~~~~~~~~~")
  72. # return "Hello from Celery Beat + RedBeat!"
  73. # @celery_app.task(name='tasks.scheduled_task_sync_wx', bind=True, acks_late=True)
  74. # def scheduled_task_sync_wx(self,redis_service,kafka_service,gewe_service):
  75. # print("scheduled_task_sync_wx 定时任务执行成功!")
  76. # return "Hello from Celery Beat + RedBeat!"
  77. # @celery_app.task(name='tasks.scheduled_task_sync_wx_info_1', bind=True, acks_late=True)
  78. # def scheduled_task_sync_wx_info_1(self,redis_config, kafka_config, gewe_config):
  79. # '''
  80. # 定时获取微信号资料
  81. # '''
  82. # loop = asyncio.new_event_loop()
  83. # asyncio.set_event_loop(loop)
  84. # async def task():
  85. # try:
  86. # redis_service = RedisService()
  87. # await redis_service.init(**redis_config)
  88. # # gewe_service = await GeWeService.get_instance(None, gewe_config['api_url'])
  89. # login_keys = []
  90. # async for key in redis_service.client.scan_iter(match='__AI_OPS_WX__:LOGININFO:*'):
  91. # login_keys.append(key)
  92. # print(login_keys)
  93. # # for k in login_keys:
  94. # # r = await redis_service.get_hash(k)
  95. # # app_id = r.get("appId")
  96. # # token_id = r.get("tokenId")
  97. # # wxid = r.get("wxid")
  98. # # status = r.get('status')
  99. # # if status == '0':
  100. # # continue
  101. # # ret, msg, profile = await gewe_service.get_profile_async(token_id, app_id)
  102. # # if ret != 200:
  103. # # logger.warning(f"同步微信号 {wxid} 资料失败: {ret}-{msg}")
  104. # # continue
  105. # # nickname=profile.get("nickName")
  106. # # head_img_url=profile.get("smallHeadImgUrl")
  107. # # r.update({"nickName":nickname,"headImgUrl":head_img_url,"modify_at":int(time.time())})
  108. # # cleaned_login_info = {k: (v if v is not None else '') for k, v in r.items()}
  109. # # await redis_service.set_hash(k, cleaned_login_info)
  110. # # logger.info(f"同步微信号 {wxid} 资料 成功")
  111. # # redis_service.update_hash_field(k,"nickName",nickname)
  112. # # redis_service.update_hash_field(k,"headImgUrl",head_img_url)
  113. # # redis_service.update_hash_field(k,"modify_at",int(time.time()))
  114. # except Exception as e:
  115. # logger.error(f"任务执行过程中发生异常: {e}")
  116. # print("scheduled_task_sync_wx_info 定时任务执行成功!")
  117. # return "Hello from Celery Beat + RedBeat!"
  118. # loop.run_until_complete(task())
  119. # loop.close()
  120. @celery_app.task(name='tasks.scheduled_task_sync_wx_info', bind=True, acks_late=True)
  121. def scheduled_task_sync_wx_info(self, redis_config, kafka_config, gewe_config):
  122. '''
  123. 定时获取微信号资料
  124. '''
  125. async def task():
  126. try:
  127. redis_service = RedisService()
  128. await redis_service.init(**redis_config)
  129. gewe_service = await GeWeService.get_instance(redis_service,gewe_config['api_url'])
  130. login_keys = []
  131. async for key in redis_service.client.scan_iter(match='__AI_OPS_WX__:LOGININFO:*'):
  132. login_keys.append(key)
  133. print(login_keys)
  134. for k in login_keys:
  135. r = await redis_service.get_hash(k)
  136. app_id = r.get("appId")
  137. token_id = r.get("tokenId")
  138. wxid = r.get("wxid")
  139. status = r.get('status')
  140. if status == '0':
  141. logger.warning(f"微信号 {wxid} 已经离线: {ret}-{msg}")
  142. continue
  143. ret, msg, profile = await gewe_service.get_profile_async(token_id, app_id)
  144. if ret != 200:
  145. logger.warning(f"同步微信号 {wxid} 资料失败: {ret}-{msg}")
  146. continue
  147. nickname=profile.get("nickName")
  148. head_img_url=profile.get("smallHeadImgUrl")
  149. # print(nickname)
  150. nickname=profile.get("nickName")
  151. head_img_url=profile.get("smallHeadImgUrl")
  152. r.update({"nickName":nickname,"headImgUrl":head_img_url,"modify_at":int(time.time())})
  153. cleaned_login_info = {k: (v if v is not None else '') for k, v in r.items()}
  154. await redis_service.set_hash(k, cleaned_login_info)
  155. logger.info(f"定时同步微信号{wxid}-昵称{nickname} 资料成功")
  156. except Exception as e:
  157. logger.error(f"任务执行过程中发生异常: {e}")
  158. loop = asyncio.get_event_loop()
  159. if loop.is_closed():
  160. loop = asyncio.new_event_loop()
  161. asyncio.set_event_loop(loop)
  162. loop.run_until_complete(task()) # 在现有事件循环中运行任务
# Per-day counter hash for friend-add rate limiting; {date} is YYYYMMDD.
REDIS_KEY_PATTERN = "friend_add_limit:{date}"
# NOTE(review): not referenced anywhere in this file — presumably read by
# another module or reserved for future use; confirm before removing.
REDIS_LAST_RUN_KEY = "last_run_time:add_friends_task"
  165. @celery_app.task(name='tasks.add_friends_task', bind=True, acks_late=True)
  166. def add_friends_task(self,redis_config):
  167. """
  168. 限制每天最多 15 个,每 2 小时最多 8 个
  169. """
  170. async def task():
  171. redis_service = RedisService()
  172. await redis_service.init(**redis_config)
  173. today_str = datetime.datetime.now().strftime("%Y%m%d")
  174. redis_key = REDIS_KEY_PATTERN.format(date=today_str)
  175. # 获取当前总添加数量
  176. total_added = await redis_service.get_hash_field(redis_key, "total") or 0
  177. last_2h_added =await redis_service.get_hash_field(redis_key, "last_2h") or 0
  178. total_added = int(total_added)
  179. last_2h_added = int(last_2h_added)
  180. logger.info(f"当前添加好友总数: {total_added}, 过去2小时添加: {last_2h_added}")
  181. # 判断是否超过限制
  182. if total_added >= 15:
  183. logger.warning("今日好友添加已达上限!")
  184. return
  185. if last_2h_added >= 8:
  186. logger.warning("过去2小时添加已达上限!")
  187. return
  188. # 计算本次要添加的好友数量 (控制每天 5-15 个)
  189. max_add = min(15 - total_added, 8 - last_2h_added)
  190. if max_add <= 0:
  191. return
  192. num_to_add = min(max_add, 1) # 每次最多加 1 个
  193. logger.info(f"本次添加 {num_to_add} 位好友")
  194. # TODO: 调用好友添加逻辑 (接口 or 业务逻辑)
  195. # success = add_friends(num_to_add)
  196. success = num_to_add # 假设成功添加 num_to_add 个
  197. # 更新 Redis 计数
  198. if success > 0:
  199. await redis_service.increment_hash_field(redis_key, "total", success)
  200. await redis_service.increment_hash_field(redis_key, "last_2h", success)
  201. # 设置 Redis 过期时间 (每日记录存 1 天, 2 小时记录存 2 小时)
  202. await redis_service.expire(redis_key, 86400) # 24小时
  203. await redis_service.expire_field(redis_key, "last_2h", 7200) # 2小时
  204. logger.info(f"成功添加 {success} 位好友, 今日总数 {total_added + success}")
  205. # 生成一个新的随机时间(5-15 分钟之间)
  206. # next_interval = random.randint(10, 20)
  207. # # 计算新的执行时间
  208. # next_run_time = datetime.datetime.now() + timedelta(seconds=next_interval)
  209. # # 重新注册 RedBeat 任务,确保下次执行时间不同
  210. # redbeat_entry = RedBeatSchedulerEntry(
  211. # name="redbeat:add_friends_task",
  212. # task="tasks.add_friends_task",
  213. # schedule=celery.schedules.schedule(timedelta(seconds=next_interval)),
  214. # args=[redis_config],
  215. # app=celery_app
  216. # )
  217. # # 设置任务的下次执行时间
  218. # redbeat_entry.last_run_at = next_run_time
  219. # redbeat_entry.save()
  220. # logger.info(f"下次任务将在 {next_run_time} 执行(间隔 {next_interval} 秒)")
  221. loop = asyncio.get_event_loop()
  222. if loop.is_closed():
  223. loop = asyncio.new_event_loop()
  224. asyncio.set_event_loop(loop)
  225. loop.run_until_complete(task()) # 在现有事件循环中运行任务
  226. @celery_app.task(name='tasks.random_scheduled_task', bind=True, acks_late=True)
  227. def random_scheduled_task(self,):
  228. print(f"Task executed at {datetime.datetime.now()}")
  229. # 随机生成下次执行时间(例如:10-60秒内的随机时间)
  230. next_run_in = random.randint(10, 60)
  231. print(f"Next execution will be in {next_run_in} seconds")
  232. # 设置下次执行时间
  233. entry = RedBeatSchedulerEntry(
  234. name='random-task',
  235. task='tasks.random_scheduled_task',
  236. schedule=timedelta(seconds=next_run_in),
  237. app=celery_app
  238. )
  239. entry.save()
  240. return f"Scheduled next run in {next_run_in} seconds"