from celery_app import celery_app
from fastapi import Request, FastAPI
import time, datetime
from celery import Celery
import celery.schedules
from redbeat import RedBeatSchedulerEntry
from datetime import timedelta
from services.redis_service import RedisService
from services.kafka_service import KafkaService
from services.gewe_service import GeWeService
from common.log import logger
import asyncio, random
from model.models import AddGroupContactsHistory


@celery_app.task(name='tasks.add_task', bind=True, acks_late=True)
def add_task(self, x, y):
    time.sleep(5)  # simulate a long-running computation
    logger.info('add')
    return x + y


@celery_app.task(name='tasks.mul_task', bind=True, acks_late=True)
def mul_task(self, x, y):
    time.sleep(5)  # simulate a long-running computation
    return x * y
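

def _example_enqueue_demo_tasks():
    """Usage sketch (illustrative, not part of the original file): enqueue the two
    demo tasks above and wait for their results; arguments and timeout are examples."""
    result = add_task.delay(2, 3)                 # returns an AsyncResult immediately
    print(result.get(timeout=30))                 # blocks until the worker finishes -> 5
    print(mul_task.delay(4, 5).get(timeout=30))   # -> 20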


# @celery.task(name='app.tasks.sync_contacts', bind=True, acks_late=True)
# async def sync_contacts_task(self, app):
#     login_keys = list(await app.state.redis_service.client.scan_iter(match='__AI_OPS_WX__:LOGININFO:*'))
#     return login_keys
#     # for k in login_keys:
#     #     print(k)


@celery_app.task(name='tasks.sync_contacts', bind=True, acks_late=True)
async def sync_contacts_task(self, redis_service):
    # Use the redis_service passed as an argument.
    # scan_iter() is an async generator, so collect it with an async comprehension.
    login_keys = [key async for key in redis_service.client.scan_iter(match='__AI_OPS_WX__:LOGININFO:*')]
    return login_keys


@celery_app.task(name='tasks.background_worker_task', bind=True, acks_late=True)
def background_worker_task(self, redis_config, kafka_config, gewe_config):
    async def task():
        redis_service = RedisService()
        await redis_service.init(**redis_config)
        login_keys = []
        async for key in redis_service.client.scan_iter(match='__AI_OPS_WX__:LOGININFO:*'):
            login_keys.append(key)
        print(login_keys)

    asyncio.run(task())
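

def _run_async(coro):
    """Minimal sketch (assumption, not part of the original file): the tasks below all
    repeat the same 'run an async body inside a sync Celery task' pattern; a helper
    like this could centralize the event-loop handling."""
    try:
        loop = asyncio.get_event_loop()
        if loop.is_closed():
            raise RuntimeError("event loop is closed")
    except RuntimeError:
        # No usable loop in this worker thread: create a fresh one
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
    return loop.run_until_complete(coro)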


# @celery.task(name='tasks.background_worker_task', bind=True, acks_late=True)
# async def background_worker_task(self, redis_config, kafka_config, gewe_config):
#     # Initialize services inside the task
#     redis_service = RedisService()
#     await redis_service.init(**redis_config)
#     login_keys = []
#     async for key in redis_service.client.scan_iter(match='__AI_OPS_WX__:LOGININFO:*'):  # iterate the async generator with async for
#         login_keys.append(key)
#     print(login_keys)
#     kafka_service = KafkaService(**kafka_config)
#     await kafka_service.start()
#     gewe_service = await GeWeService.get_instance(None, gewe_config['api_url'])
#
#     # Task logic
#     lock_name = "background_wxchat_thread_lock"
#     lock_identifier = str(time.time())
#     while True:
#         if await redis_service.acquire_lock(lock_name, timeout=10):
#             try:
#                 logger.info("Distributed lock acquired successfully")
#                 # Perform task logic
#             finally:
#                 await redis_service.release_lock(lock_name, lock_identifier)
#             break
#         else:
#             logger.info("Failed to acquire the distributed lock, retrying in 10 seconds...")
#             await asyncio.sleep(10)

# @celery_app.task(name='tasks.scheduled_task', bind=True, acks_late=True)
# def scheduled_task(self):
#     print("Scheduled task executed successfully!~~~~~~~~~~~~~~~~~")
#     return "Hello from Celery Beat + RedBeat!"

# @celery_app.task(name='tasks.scheduled_task_sync_wx', bind=True, acks_late=True)
# def scheduled_task_sync_wx(self, redis_service, kafka_service, gewe_service):
#     print("scheduled_task_sync_wx scheduled task executed successfully!")
#     return "Hello from Celery Beat + RedBeat!"

# @celery_app.task(name='tasks.scheduled_task_sync_wx_info_1', bind=True, acks_late=True)
# def scheduled_task_sync_wx_info_1(self, redis_config, kafka_config, gewe_config):
#     '''
#     Periodically fetch WeChat account profiles
#     '''
#     loop = asyncio.new_event_loop()
#     asyncio.set_event_loop(loop)
#
#     async def task():
#         try:
#             redis_service = RedisService()
#             await redis_service.init(**redis_config)
#             # gewe_service = await GeWeService.get_instance(None, gewe_config['api_url'])
#             login_keys = []
#             async for key in redis_service.client.scan_iter(match='__AI_OPS_WX__:LOGININFO:*'):
#                 login_keys.append(key)
#             print(login_keys)
#             # for k in login_keys:
#             #     r = await redis_service.get_hash(k)
#             #     app_id = r.get("appId")
#             #     token_id = r.get("tokenId")
#             #     wxid = r.get("wxid")
#             #     status = r.get('status')
#             #     if status == '0':
#             #         continue
#             #     ret, msg, profile = await gewe_service.get_profile_async(token_id, app_id)
#             #     if ret != 200:
#             #         logger.warning(f"Failed to sync profile for WeChat account {wxid}: {ret}-{msg}")
#             #         continue
#             #     nickname = profile.get("nickName")
#             #     head_img_url = profile.get("smallHeadImgUrl")
#             #     r.update({"nickName": nickname, "headImgUrl": head_img_url, "modify_at": int(time.time())})
#             #     cleaned_login_info = {k: (v if v is not None else '') for k, v in r.items()}
#             #     await redis_service.set_hash(k, cleaned_login_info)
#             #     logger.info(f"Synced profile for WeChat account {wxid} successfully")
#             #     redis_service.update_hash_field(k, "nickName", nickname)
#             #     redis_service.update_hash_field(k, "headImgUrl", head_img_url)
#             #     redis_service.update_hash_field(k, "modify_at", int(time.time()))
#         except Exception as e:
#             logger.error(f"Exception raised while running the task: {e}")
#         print("scheduled_task_sync_wx_info scheduled task executed successfully!")
#         return "Hello from Celery Beat + RedBeat!"
#
#     loop.run_until_complete(task())
#     loop.close()


@celery_app.task(name='tasks.scheduled_task_sync_wx_info', bind=True, acks_late=True)
def scheduled_task_sync_wx_info(self, redis_config, kafka_config, gewe_config):
    '''
    Periodically fetch WeChat account profiles
    '''
    async def task():
        try:
            redis_service = RedisService()
            await redis_service.init(**redis_config)
            gewe_service = await GeWeService.get_instance(redis_service, gewe_config['api_url'])
            login_keys = []
            async for key in redis_service.client.scan_iter(match='__AI_OPS_WX__:LOGININFO:*'):
                login_keys.append(key)
            print(login_keys)
            for k in login_keys:
                r = await redis_service.get_hash(k)
                app_id = r.get("appId")
                token_id = r.get("tokenId")
                wxid = r.get("wxid")
                status = r.get('status')
                if status == '0':
                    logger.warning(f"WeChat account {wxid} is offline")
                    continue
                ret, msg, profile = await gewe_service.get_profile_async(token_id, app_id)
                if ret != 200:
                    logger.warning(f"Failed to sync profile for WeChat account {wxid}: {ret}-{msg}")
                    continue
                nickname = profile.get("nickName")
                head_img_url = profile.get("smallHeadImgUrl")
                r.update({"nickName": nickname, "headImgUrl": head_img_url, "modify_at": int(time.time())})
                # Replace None values so the hash can be written back safely
                cleaned_login_info = {field: (value if value is not None else '') for field, value in r.items()}
                await redis_service.set_hash(k, cleaned_login_info)
                logger.info(f"Periodic profile sync for WeChat account {wxid} (nickname {nickname}) succeeded")
        except Exception as e:
            logger.error(f"Exception raised while running the task: {e}")

    loop = asyncio.get_event_loop()
    if loop.is_closed():
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
    loop.run_until_complete(task())  # run the task on the existing event loop
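

def _example_register_sync_wx_info(redis_config, kafka_config, gewe_config):
    """Scheduling sketch (assumption, not part of the original file): register the
    profile-sync task above as a periodic RedBeat entry. The entry name and the
    10-minute interval are placeholders; the config dicts come from the caller."""
    entry = RedBeatSchedulerEntry(
        name='redbeat:scheduled_task_sync_wx_info',
        task='tasks.scheduled_task_sync_wx_info',
        schedule=celery.schedules.schedule(timedelta(minutes=10)),
        args=[redis_config, kafka_config, gewe_config],
        app=celery_app
    )
    entry.save()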


@celery_app.task(name='tasks.scheduled_task_add_contacts_from_chatrooms', bind=True, acks_late=True)
def scheduled_task_add_contacts_from_chatrooms(self, redis_config, kafka_config, gewe_config):
    async def task():
        try:
            logger.info('Scheduled task started: add group members as friends')
            redis_service = RedisService()
            await redis_service.init(**redis_config)
            gewe_service = await GeWeService.get_instance(redis_service, gewe_config['api_url'])
            login_keys = []
            async for key in redis_service.client.scan_iter(match='__AI_OPS_WX__:LOGININFO:*'):
                login_keys.append(key)
            print(login_keys)
            for k in login_keys:
                r = await redis_service.get_hash(k)
                app_id = r.get("appId")
                token_id = r.get("tokenId")
                wxid = r.get("wxid")
                status = r.get('status')
                if status == '0':
                    logger.warning(f"WeChat account {wxid} is offline; group members cannot be added on schedule")
                    continue
                c = await gewe_service.get_wxchat_config_from_cache_async(wxid)
                contacts = await gewe_service.get_contacts_brief_from_cache_async(wxid)
                contact_wxids = [c.get('userName') for c in contacts]
                chatrooms = c.get('addContactsFromChatroomIdWhiteList', [])
                for chatroom_id in chatrooms:
                    chatroom = await gewe_service.get_group_info_from_cache_async(wxid, chatroom_id)
                    chatroom_member = await gewe_service.get_group_members_from_cache_async(wxid, chatroom_id)
                    chatroom_nickname = chatroom.get('nickName')
                    chatroom_owner_wxid = chatroom_member.get('chatroomOwner', None)
                    admin_wxids = chatroom_member.get('adminWxid')
                    if admin_wxids is None:
                        admin_wxids = []  # normalize a missing adminWxid field to an empty list
                    logger.info(f'Owner of {chatroom_nickname} is {chatroom_owner_wxid}, admins are {admin_wxids}')

                    # Build the set of members that must NOT be invited: existing contacts,
                    # admins, the owner, the account itself, and members already invited
                    # before (a small worked example follows this task).
                    contact_wxids_set = set(contact_wxids)
                    contact_wxids_set.update(admin_wxids)
                    if chatroom_owner_wxid is not None:
                        contact_wxids_set.add(chatroom_owner_wxid)
                    contact_wxids_set.add(wxid)
                    unavailable_wixds = await gewe_service.check_wixd_group_add_contacts_history_async(wxid, chatroom_id)
                    contact_wxids_set.update(unavailable_wixds)

                    chatroot_member_list = chatroom.get('memberList', [])
                    remaining_chatroot_members = [x for x in chatroot_member_list if x.get('wxid') not in contact_wxids_set]
                    nickname = next((member['nickName'] for member in chatroot_member_list if member['wxid'] == wxid), None)
                    logger.info(f'{nickname}-{wxid} can still invite these members of group {chatroom_nickname}: {[x.get("nickName") for x in remaining_chatroot_members]}')

                    for m in remaining_chatroot_members:
                        ret, msg, data = await gewe_service.add_group_member_as_friend_async(token_id, app_id, chatroom_id, m.get('wxid'), f'I am {nickname} from the group chat "{chatroom_nickname}"')
                        if ret == 200:
                            contact_wxid = m.get('wxid')
                            history = AddGroupContactsHistory.model_validate({
                                "chatroomId": chatroom_id,
                                "wxid": wxid,
                                "contactWixd": contact_wxid,
                                "addTime": int(time.time())
                            })
                            await gewe_service.save_group_add_contacts_history_async(wxid, chatroom_id, contact_wxid, history)
                        else:
                            logger.info(f'Group friend invitation failed: {data}')
                        logger.info(f'{nickname} sent a friend invitation to {m.get("nickName")}-{m.get("wxid")} in group {chatroom_nickname}-{chatroom_id}: {msg}')
                        await asyncio.sleep(random.uniform(1.5, 3))
                    await asyncio.sleep(random.uniform(1.5, 3))
        except Exception as e:
            logger.error(f"Exception raised while running the task: {e}")

    loop = asyncio.get_event_loop()
    if loop.is_closed():
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
    loop.run_until_complete(task())  # run the task on the existing event loop
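

def _example_remaining_members():
    """Worked example (made-up values, not from the original file) of the exclusion logic
    above: contacts, the owner/admins, the account itself and previously invited members
    are removed from the group member list before invitations are sent."""
    member_wxids = {'wx_a', 'wx_b', 'wx_c', 'wx_d', 'wx_e'}
    excluded = {'wx_a'} | {'wx_b'} | {'wx_c'}   # contacts | owner/admins/self | history
    return sorted(member_wxids - excluded)      # -> ['wx_d', 'wx_e'] would be invited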


REDIS_KEY_PATTERN = "friend_add_limit:{date}"
REDIS_LAST_RUN_KEY = "last_run_time:add_friends_task"


@celery_app.task(name='tasks.add_friends_task', bind=True, acks_late=True)
def add_friends_task(self, redis_config):
    """
    Limit friend additions to at most 15 per day and at most 8 per 2 hours
    (a worked example of the math follows this task).
    """
    async def task():
        redis_service = RedisService()
        await redis_service.init(**redis_config)

        today_str = datetime.datetime.now().strftime("%Y%m%d")
        redis_key = REDIS_KEY_PATTERN.format(date=today_str)

        # Read the current counters
        total_added = await redis_service.get_hash_field(redis_key, "total") or 0
        last_2h_added = await redis_service.get_hash_field(redis_key, "last_2h") or 0
        total_added = int(total_added)
        last_2h_added = int(last_2h_added)
        logger.info(f"Friends added today: {total_added}, added in the last 2 hours: {last_2h_added}")

        # Enforce the limits
        if total_added >= 15:
            logger.warning("Daily friend-add limit reached!")
            return
        if last_2h_added >= 8:
            logger.warning("2-hour friend-add limit reached!")
            return

        # Work out how many friends to add this run (keeps the daily total at 5-15)
        max_add = min(15 - total_added, 8 - last_2h_added)
        if max_add <= 0:
            return
        num_to_add = min(max_add, 1)  # add at most 1 per run
        logger.info(f"Adding {num_to_add} friend(s) this run")

        # TODO: call the actual friend-adding logic (API or business logic)
        # success = add_friends(num_to_add)
        success = num_to_add  # assume num_to_add friends were added successfully

        # Update the Redis counters
        if success > 0:
            await redis_service.increment_hash_field(redis_key, "total", success)
            await redis_service.increment_hash_field(redis_key, "last_2h", success)
            # Set expirations (daily counter lives 1 day, 2-hour counter lives 2 hours)
            await redis_service.expire(redis_key, 86400)  # 24 hours
            await redis_service.expire_field(redis_key, "last_2h", 7200)  # 2 hours
        logger.info(f"Added {success} friend(s); total today {total_added + success}")

        # Generate a new random interval (5-15 minutes)
        # next_interval = random.randint(10, 20)
        # # Compute the next run time
        # next_run_time = datetime.datetime.now() + timedelta(seconds=next_interval)
        # # Re-register the RedBeat entry so the next run happens at a different time
        # redbeat_entry = RedBeatSchedulerEntry(
        #     name="redbeat:add_friends_task",
        #     task="tasks.add_friends_task",
        #     schedule=celery.schedules.schedule(timedelta(seconds=next_interval)),
        #     args=[redis_config],
        #     app=celery_app
        # )
        # # Set the entry's next execution time
        # redbeat_entry.last_run_at = next_run_time
        # redbeat_entry.save()
        # logger.info(f"Next run at {next_run_time} (interval {next_interval} seconds)")

    loop = asyncio.get_event_loop()
    if loop.is_closed():
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
    loop.run_until_complete(task())  # run the task on the existing event loop
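
# Worked example of the limit math in add_friends_task (illustrative values):
#   total_added = 12, last_2h_added = 7
#   max_add     = min(15 - 12, 8 - 7) = 1
#   num_to_add  = min(max_add, 1)     = 1   -> one friend request is attempted
#   total_added = 15 -> the task returns before adding anyone (daily cap reached)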


@celery_app.task(name='tasks.random_scheduled_task', bind=True, acks_late=True)
def random_scheduled_task(self):
    print(f"Task executed at {datetime.datetime.now()}")

    # Randomly pick the delay until the next run (e.g. within the next 10-60 seconds)
    next_run_in = random.randint(10, 60)
    print(f"Next execution will be in {next_run_in} seconds")

    # Re-register the RedBeat entry with the new interval
    entry = RedBeatSchedulerEntry(
        name='random-task',
        task='tasks.random_scheduled_task',
        schedule=timedelta(seconds=next_run_in),
        app=celery_app
    )
    entry.save()
    return f"Scheduled next run in {next_run_in} seconds"
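

def _example_bootstrap_random_task():
    """Bootstrap sketch (assumption, not part of the original file): random_scheduled_task
    re-registers itself after every run, but the very first RedBeat entry has to be
    created once, e.g. at application start-up. The 30-second first interval is a
    placeholder."""
    entry = RedBeatSchedulerEntry(
        name='random-task',
        task='tasks.random_scheduled_task',
        schedule=timedelta(seconds=30),
        app=celery_app
    )
    entry.save()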