You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

863 lines
42KB

  1. from celery_app import celery_app
  2. from fastapi import Request,FastAPI
  3. import time,datetime
  4. from celery import Celery
  5. import celery.schedules
  6. from redbeat import RedBeatSchedulerEntry
  7. from datetime import timedelta
  8. from services.redis_service import RedisService
  9. from services.kafka_service import KafkaService
  10. from services.gewe_service import GeWeService
  11. # from common.log import logger
  12. from common.utils import *
  13. import asyncio,random
  14. from model.models import AddGroupContactsHistory
  15. import logging
  16. import logging
  17. logger = logging.getLogger('redbeat')
  18. @celery_app.task(name='tasks.add_task', bind=True, acks_late=True)
  19. def add_task(self, x, y):
  20. time.sleep(5) # 模拟长时间计算
  21. logger.info('add')
  22. return x + y
  23. @celery_app.task(name='tasks.mul_task', bind=True, acks_late=True)
  24. def mul_task(self, x, y):
  25. time.sleep(5) # 模拟长时间计算
  26. return x * y
  27. # @celery.task(name='app.tasks.sync_contacts', bind=True, acks_late=True)
  28. # async def sync_contacts_task(self,app):
  29. # login_keys = list(await app.state.redis_service.client.scan_iter(match='__AI_OPS_WX__:LOGININFO:*'))
  30. # return login_keys
  31. # # for k in login_keys:
  32. # # print(k)
  33. @celery_app.task(name='tasks.sync_contacts', bind=True, acks_late=True)
  34. async def sync_contacts_task(self, redis_service):
  35. # Use the redis_service passed as an argument
  36. login_keys = list(await redis_service.client.scan_iter(match='__AI_OPS_WX__:LOGININFO:*'))
  37. return login_keys
  38. @celery_app.task(name='tasks.background_worker_task', bind=True, acks_late=True)
  39. def background_worker_task(self, redis_config, kafka_config, gewe_config):
  40. async def task():
  41. redis_service = RedisService()
  42. await redis_service.init(**redis_config)
  43. login_keys = []
  44. async for key in redis_service.client.scan_iter(match='__AI_OPS_WX__:LOGININFO:*'):
  45. login_keys.append(key)
  46. print(login_keys)
  47. asyncio.run(task())
  48. # @celery.task(name='tasks.background_worker_task', bind=True, acks_late=True)
  49. # async def background_worker_task(self, redis_config, kafka_config, gewe_config):
  50. # # Initialize services inside the task
  51. # redis_service = RedisService()
  52. # await redis_service.init(**redis_config)
  53. # login_keys = []
  54. # async for key in redis_service.client.scan_iter(match='__AI_OPS_WX__:LOGININFO:*'): # 使用 async for 遍历异步生成器
  55. # login_keys.append(key)
  56. # print(login_keys)
  57. # kafka_service = KafkaService(**kafka_config)
  58. # await kafka_service.start()
  59. # gewe_service = await GeWeService.get_instance(None, gewe_config['api_url'])
  60. # # Task logic
  61. # lock_name = "background_wxchat_thread_lock"
  62. # lock_identifier = str(time.time())
  63. # while True:
  64. # if await redis_service.acquire_lock(lock_name, timeout=10):
  65. # try:
  66. # logger.info("分布式锁已成功获取")
  67. # # Perform task logic
  68. # finally:
  69. # await redis_service.release_lock(lock_name, lock_identifier)
  70. # break
  71. # else:
  72. # logger.info("获取分布式锁失败,等待10秒后重试...")
  73. # await asyncio.sleep(10)
  74. # @celery_app.task(name='tasks.scheduled_task', bind=True, acks_late=True)
  75. # def scheduled_task(self):
  76. # print("定时任务执行成功!~~~~~~~~~~~~~~~~~")
  77. # return "Hello from Celery Beat + RedBeat!"
  78. # @celery_app.task(name='tasks.scheduled_task_sync_wx', bind=True, acks_late=True)
  79. # def scheduled_task_sync_wx(self,redis_service,kafka_service,gewe_service):
  80. # print("scheduled_task_sync_wx 定时任务执行成功!")
  81. # return "Hello from Celery Beat + RedBeat!"
  82. # @celery_app.task(name='tasks.scheduled_task_sync_wx_info_1', bind=True, acks_late=True)
  83. # def scheduled_task_sync_wx_info_1(self,redis_config, kafka_config, gewe_config):
  84. # '''
  85. # 定时获取微信号资料
  86. # '''
  87. # loop = asyncio.new_event_loop()
  88. # asyncio.set_event_loop(loop)
  89. # async def task():
  90. # try:
  91. # redis_service = RedisService()
  92. # await redis_service.init(**redis_config)
  93. # # gewe_service = await GeWeService.get_instance(None, gewe_config['api_url'])
  94. # login_keys = []
  95. # async for key in redis_service.client.scan_iter(match='__AI_OPS_WX__:LOGININFO:*'):
  96. # login_keys.append(key)
  97. # print(login_keys)
  98. # # for k in login_keys:
  99. # # r = await redis_service.get_hash(k)
  100. # # app_id = r.get("appId")
  101. # # token_id = r.get("tokenId")
  102. # # wxid = r.get("wxid")
  103. # # status = r.get('status')
  104. # # if status == '0':
  105. # # continue
  106. # # ret, msg, profile = await gewe_service.get_profile_async(token_id, app_id)
  107. # # if ret != 200:
  108. # # logger.warning(f"同步微信号 {wxid} 资料失败: {ret}-{msg}")
  109. # # continue
  110. # # nickname=profile.get("nickName")
  111. # # head_img_url=profile.get("smallHeadImgUrl")
  112. # # r.update({"nickName":nickname,"headImgUrl":head_img_url,"modify_at":int(time.time())})
  113. # # cleaned_login_info = {k: (v if v is not None else '') for k, v in r.items()}
  114. # # await redis_service.set_hash(k, cleaned_login_info)
  115. # # logger.info(f"同步微信号 {wxid} 资料 成功")
  116. # # redis_service.update_hash_field(k,"nickName",nickname)
  117. # # redis_service.update_hash_field(k,"headImgUrl",head_img_url)
  118. # # redis_service.update_hash_field(k,"modify_at",int(time.time()))
  119. # except Exception as e:
  120. # logger.error(f"任务执行过程中发生异常: {e}")
  121. # print("scheduled_task_sync_wx_info 定时任务执行成功!")
  122. # return "Hello from Celery Beat + RedBeat!"
  123. # loop.run_until_complete(task())
  124. # loop.close()
  125. @celery_app.task(name='tasks.scheduled_task_sync_wx_info', bind=True, acks_late=True)
  126. def scheduled_task_sync_wx_info(self, redis_config, kafka_config, gewe_config):
  127. '''
  128. 定时获取微信号资料
  129. '''
  130. async def task():
  131. try:
  132. redis_service = RedisService()
  133. await redis_service.init(**redis_config)
  134. gewe_service = await GeWeService.get_instance(redis_service,gewe_config['api_url'])
  135. login_keys = []
  136. async for key in redis_service.client.scan_iter(match='__AI_OPS_WX__:LOGININFO:*'):
  137. login_keys.append(key)
  138. print(login_keys)
  139. for k in login_keys:
  140. r = await redis_service.get_hash(k)
  141. app_id = r.get("appId")
  142. token_id = r.get("tokenId")
  143. wxid = r.get("wxid")
  144. status = r.get('status')
  145. if status == '0':
  146. logger.warning(f"微信号 {wxid} 已经离线")
  147. continue
  148. ret, msg, profile = await gewe_service.get_profile_async(token_id, app_id)
  149. if ret != 200:
  150. logger.warning(f"同步微信号 {wxid} 资料失败: {ret}-{msg}")
  151. continue
  152. nickname=profile.get("nickName")
  153. head_img_url=profile.get("smallHeadImgUrl")
  154. # print(nickname)
  155. nickname=profile.get("nickName")
  156. head_img_url=profile.get("smallHeadImgUrl")
  157. r.update({"nickName":nickname,"headImgUrl":head_img_url,"modify_at":int(time.time())})
  158. cleaned_login_info = {k: (v if v is not None else '') for k, v in r.items()}
  159. await redis_service.set_hash(k, cleaned_login_info)
  160. logger.info(f"定时同步微信号{wxid}-昵称{nickname} 资料成功")
  161. except Exception as e:
  162. logger.error(f"任务执行过程中发生异常: {e}")
  163. loop = asyncio.get_event_loop()
  164. if loop.is_closed():
  165. loop = asyncio.new_event_loop()
  166. asyncio.set_event_loop(loop)
  167. loop.run_until_complete(task()) # 在现有事件循环中运行任务
  168. #@celery_app.task(name='tasks.scheduled_task_add_contacts_from_chatrooms_p', bind=True, acks_late=True)
  169. def scheduled_task_add_contacts_from_chatrooms_p(self, redis_config, kafka_config, gewe_config):
  170. async def task():
  171. try:
  172. now = datetime.now()
  173. if now.hour < 8:
  174. logger.info(f"定时群成员定时添好友任务不启动,当前时间为 {now.strftime('%Y-%m-%d %H:%M:%S')},早于8点")
  175. return
  176. logger.info('定时群成员定时添好友任务开始')
  177. redis_service = RedisService()
  178. await redis_service.init(**redis_config)
  179. gewe_service = await GeWeService.get_instance(redis_service,gewe_config['api_url'])
  180. login_keys = []
  181. async for key in redis_service.client.scan_iter(match='__AI_OPS_WX__:LOGININFO:*'):
  182. login_keys.append(key)
  183. #print(login_keys)
  184. for k in login_keys:
  185. r = await redis_service.get_hash(k)
  186. app_id = r.get("appId")
  187. token_id = r.get("tokenId")
  188. wxid = r.get("wxid")
  189. status = r.get('status')
  190. if status == '0':
  191. logger.warning(f"微信号 {wxid} 已经离线,群成员不能定时添加")
  192. continue
  193. c = await gewe_service.get_wxchat_config_from_cache_async(wxid)
  194. contacts = await gewe_service.get_contacts_brief_from_cache_async(wxid)
  195. contact_wxids = [c.get('userName') for c in contacts]
  196. chatrooms = c.get('addContactsFromChatroomIdWhiteList', [])
  197. logger.info(f'{wxid} 定时群成员定时添好友任务开始')
  198. for chatroom_id in chatrooms:
  199. chatroom = await gewe_service.get_group_info_from_cache_async(wxid, chatroom_id)
  200. chatroom_member=await gewe_service.get_group_members_from_cache_async(wxid, chatroom_id)
  201. chatroom_nickname = chatroom.get('nickName')
  202. chatroom_owner_wxid = chatroom_member.get('chatroomOwner', None)
  203. admin_wxids = chatroom_member.get('adminWxid', [])
  204. admin_wxids = chatroom_member.get('adminWxid')
  205. if admin_wxids is None:
  206. admin_wxids = [] # 如果 admin_wxids 是 None,将其初始化为空列表
  207. # 判断当天群成员是否已经加了200个好友
  208. is_add_group_200_times = await gewe_service.is_group_add_contacts_history_one_day_200_async(wxid, chatroom_id)
  209. if is_add_group_200_times:
  210. logger.info(f"{wxid}在 {chatroom_nickname} 群成员已经加了200个好友,不再添加,群id:{chatroom_id}")
  211. continue
  212. logger.info(f'{chatroom_nickname} 的群主是 {chatroom_owner_wxid},管理员是{admin_wxids}')
  213. contact_wxids_set = set(contact_wxids)
  214. # for admin_wxid in admin_wxids:
  215. # contact_wxids_set.add(admin_wxid)
  216. if admin_wxids:
  217. contact_wxids_set.update(set(admin_wxids))
  218. contact_wxids_set.update(set(admin_wxids))
  219. if chatroom_owner_wxid is not None:
  220. contact_wxids_set.add(chatroom_owner_wxid)
  221. contact_wxids_set.add(wxid)
  222. # unavailable_wixds=await gewe_service.check_wixd_group_add_contacts_history_async(wxid,chatroom_id)
  223. # contact_wxids_set.update(unavailable_wixds)
  224. chatroot_member_list = chatroom.get('memberList', [])
  225. remaining_chatroot_members = [x for x in chatroot_member_list if x.get('wxid') not in contact_wxids_set]
  226. nickname = next((member['nickName'] for member in chatroot_member_list if member['wxid'] == wxid), None)
  227. logger.info(f'{nickname}-{wxid} 在 {chatroom_nickname} 群里还可以邀请的好友有:{[x.get("nickName") for x in remaining_chatroot_members]}')
  228. for m in remaining_chatroot_members:
  229. contact_wxid= m.get('wxid')
  230. member_nickname=m.get("nickName")
  231. group_add_contacts_history = await gewe_service.get_group_add_contacts_history_async(wxid,chatroom_id,contact_wxid)
  232. sorted_history = sorted(group_add_contacts_history, key=lambda x: x.addTime, reverse=True)
  233. # 已经邀请过两次,不再邀请
  234. if len(sorted_history)==2:
  235. logger.info(f'{nickname}-{wxid}在{chatroom_nickname}-{chatroom_id} 群的 {member_nickname}-{contact_wxid} 已经邀请过2次,不再邀请')
  236. continue
  237. # 当天邀请过,不再邀请
  238. if len(sorted_history) > 0:
  239. last_add_time = sorted_history[0].addTime
  240. def is_add_time_more_than_one_day(addTime: int) -> bool:
  241. """
  242. 判断 addTime 是否与当前时间相隔大于 3600 × 24 秒
  243. :param addTime: Unix 时间戳
  244. :return: 如果 addTime 与当前时间相隔大于 3600 × 24 秒,返回 True;否则返回 False
  245. """
  246. # 获取当前时间的时间戳
  247. current_time = time.time()
  248. # 计算时间戳差值
  249. time_difference = abs(current_time - addTime)
  250. # 检查是否大于 3600 × 24 秒
  251. return time_difference > 3600 * 24
  252. is_more_than_one_day= is_add_time_more_than_one_day(last_add_time)
  253. if not is_more_than_one_day:
  254. logger.info(f'{nickname}-{wxid}在{chatroom_nickname}-{chatroom_id} 群的{member_nickname}-{contact_wxid}已经当天邀请,不再邀请')
  255. continue
  256. ret, msg, data = await gewe_service.add_group_member_as_friend_async(token_id, app_id, chatroom_id, m.get('wxid'), f'我是群聊"{chatroom_nickname}"群的{nickname}')
  257. # if ret==200:
  258. # history=AddGroupContactsHistory.model_validate({
  259. # "chatroomId":chatroom_id,
  260. # "wxid":wxid,
  261. # "contactWixd":contact_wxid,
  262. # "addTime":int(time.time())
  263. # })
  264. # await gewe_service.save_group_add_contacts_history_async(wxid,chatroom_id,contact_wxid,history)
  265. # else:
  266. # logger.info(f'群好友邀请失败原因:{data}')
  267. if ret!=200:
  268. logger.warning(f'群好友邀请失败原因:{ret} {data}')
  269. history=AddGroupContactsHistory.model_validate({
  270. "chatroomId":chatroom_id,
  271. "wxid":wxid,
  272. "contactWixd":contact_wxid,
  273. "addTime":int(time.time())
  274. })
  275. await gewe_service.save_group_add_contacts_history_async(wxid,chatroom_id,contact_wxid,history)
  276. logger.info(f'{nickname} 向 {chatroom_nickname}-{chatroom_id} 群的 {m.get("nickName")}-{m.get("wxid")} 发送好友邀请 {msg}')
  277. await asyncio.sleep(random.uniform(1.5, 3))
  278. await asyncio.sleep(random.uniform(1.5, 3))
  279. except Exception as e:
  280. logger.error(f"任务执行过程中发生异常: {e}")
  281. loop = asyncio.get_event_loop()
  282. if loop.is_closed():
  283. loop = asyncio.new_event_loop()
  284. asyncio.set_event_loop(loop)
  285. loop.run_until_complete(task()) # 在现有事件循环中运行任务
  286. #@celery_app.task(name='tasks.scheduled_task_add_contacts_from_chatrooms_p2', bind=True, acks_late=True)
  287. def scheduled_task_add_contacts_from_chatrooms_p2(self, redis_config, kafka_config, gewe_config):
  288. '''
  289. 关于群加好友的请求规则:一次30人,间隔2小时做1次,一天做3次,即最多90人/天。
  290. '''
  291. async def task():
  292. try:
  293. now = datetime.now()
  294. if now.hour < 8:
  295. logger.info(f"定时群成员定时添好友任务不启动,当前时间为 {now.strftime('%Y-%m-%d %H:%M:%S')},早于8点")
  296. return
  297. logger.info('定时群成员定时添好友任务开始')
  298. redis_service = RedisService()
  299. await redis_service.init(**redis_config)
  300. gewe_service = await GeWeService.get_instance(redis_service,gewe_config['api_url'])
  301. KAFKA_BOOTSTRAP_SERVERS=kafka_config['bootstrap_servers']
  302. KAFKA_TOPIC=kafka_config['topic']
  303. KAFKA_GROUP_ID=kafka_config['group_id']
  304. kafka_service= KafkaService(KAFKA_BOOTSTRAP_SERVERS, KAFKA_TOPIC, KAFKA_TOPIC,KAFKA_GROUP_ID)
  305. await kafka_service.start()
  306. login_keys = []
  307. async for key in redis_service.client.scan_iter(match='__AI_OPS_WX__:LOGININFO:*'):
  308. login_keys.append(key)
  309. wixd_add_contacts_from_chatrooms_times = {}
  310. #print(login_keys)
  311. for k in login_keys:
  312. r = await redis_service.get_hash(k)
  313. app_id = r.get("appId")
  314. token_id = r.get("tokenId")
  315. wxid = r.get("wxid")
  316. status = r.get('status')
  317. if status == '0':
  318. logger.warning(f"微信号 {wxid} 已经离线,群成员不能定时添加")
  319. continue
  320. c = await gewe_service.get_wxchat_config_from_cache_async(wxid)
  321. contacts = await gewe_service.get_contacts_brief_from_cache_async(wxid)
  322. contact_wxids = [c.get('userName') for c in contacts]
  323. chatrooms = c.get('addContactsFromChatroomIdWhiteList', [])
  324. logger.info(f'{wxid} 定时群成员定时添好友任务开始')
  325. wixd_add_contacts_from_chatrooms_times[wxid] = 0
  326. for chatroom_id in chatrooms:
  327. chatroom = await gewe_service.get_group_info_from_cache_async(wxid, chatroom_id)
  328. chatroom_member=await gewe_service.get_group_members_from_cache_async(wxid, chatroom_id)
  329. chatroom_nickname = chatroom.get('nickName')
  330. chatroom_owner_wxid = chatroom_member.get('chatroomOwner', None)
  331. admin_wxids = chatroom_member.get('adminWxid', [])
  332. admin_wxids = chatroom_member.get('adminWxid')
  333. if admin_wxids is None:
  334. admin_wxids = [] # 如果 admin_wxids 是 None,将其初始化为空列表
  335. logger.info(f'{chatroom_nickname} 的群主是 {chatroom_owner_wxid},管理员是{admin_wxids}')
  336. contact_wxids_set = set(contact_wxids)
  337. # for admin_wxid in admin_wxids:
  338. # contact_wxids_set.add(admin_wxid)
  339. if admin_wxids:
  340. contact_wxids_set.update(set(admin_wxids))
  341. if chatroom_owner_wxid is not None:
  342. contact_wxids_set.add(chatroom_owner_wxid)
  343. contact_wxids_set.add(wxid)
  344. unavailable_wixds=await gewe_service.check_wixd_group_add_contacts_history_async(wxid,chatroom_id)
  345. if unavailable_wixds:
  346. contact_wxids_set.update(set(unavailable_wixds))
  347. chatroom_member_list = chatroom.get('memberList', [])
  348. if chatroom_member_list is None:
  349. chatroom_member_list = [] # 如果 memberList 是 None,将其初始化为空列表
  350. elif not isinstance(chatroom_member_list, list):
  351. chatroom_member_list = list(chatroom_member_list) # 如果 memberList 不是列表,将其转换为列表
  352. remaining_chatroot_members = [x for x in chatroom_member_list if x.get('wxid') not in contact_wxids_set]
  353. nickname = next((member['nickName'] for member in chatroom_member_list if member['wxid'] == wxid), None)
  354. logger.info(f'{nickname}-{wxid} 在 {chatroom_nickname} 群里还可以邀请的好友有:{[x.get("nickName") for x in remaining_chatroot_members]}')
  355. for m in remaining_chatroot_members:
  356. # 判断本次任务是否已经邀请了30个好友
  357. if wixd_add_contacts_from_chatrooms_times[wxid] == 30:
  358. logger.info(f"{wxid} 本次任务已经邀请了30人,不再邀请")
  359. return
  360. # 判断当天群成员是否已经加了90个好友
  361. is_add_group_90_times = await gewe_service.is_group_add_contacts_history_one_day_90_async(wxid)
  362. if is_add_group_90_times:
  363. logger.info(f"当天 {wxid} 所有群的成员已经加了90个好友,不再添加")
  364. return
  365. contact_wxid= m.get('wxid')
  366. member_nickname=m.get("nickName")
  367. group_add_contacts_history = await gewe_service.get_group_add_contacts_history_async(wxid,chatroom_id,contact_wxid)
  368. sorted_history = sorted(group_add_contacts_history, key=lambda x: x.addTime, reverse=True)
  369. # 已经邀请过两次,不再邀请
  370. if len(sorted_history)==2:
  371. logger.info(f'{nickname}-{wxid}在{chatroom_nickname}-{chatroom_id} 群的 {member_nickname}-{contact_wxid} 已经邀请过2次,不再邀请')
  372. continue
  373. # 当天邀请过,不再邀请
  374. if len(sorted_history) > 0:
  375. last_add_time = sorted_history[0].addTime
  376. def is_add_time_more_than_one_day(addTime: int) -> bool:
  377. """
  378. 判断 addTime 是否与当前时间相隔大于 3600 × 24 秒
  379. :param addTime: Unix 时间戳
  380. :return: 如果 addTime 与当前时间相隔大于 3600 × 24 秒,返回 True;否则返回 False
  381. """
  382. # 获取当前时间的时间戳
  383. current_time = time.time()
  384. # 计算时间戳差值
  385. time_difference = abs(current_time - addTime)
  386. # 检查是否大于 3600 × 24 秒
  387. return time_difference > 3600 * 24
  388. is_more_than_one_day= is_add_time_more_than_one_day(last_add_time)
  389. if not is_more_than_one_day:
  390. logger.info(f'{nickname}-{wxid}在{chatroom_nickname}-{chatroom_id} 群的{member_nickname}-{contact_wxid}已经当天邀请,不再邀请')
  391. continue
  392. ret, msg, data = await gewe_service.add_group_member_as_friend_async(token_id, app_id, chatroom_id, m.get('wxid'), f'我是群聊"{chatroom_nickname}"群的{nickname}')
  393. if ret!=200:
  394. logger.warning(f'群好友邀请失败原因:{ret} {data}')
  395. history=AddGroupContactsHistory.model_validate({
  396. "chatroomId":chatroom_id,
  397. "wxid":wxid,
  398. "contactWixd":contact_wxid,
  399. "addTime":int(time.time())
  400. })
  401. await gewe_service.save_group_add_contacts_history_async(wxid,chatroom_id,contact_wxid,history)
  402. wixd_add_contacts_from_chatrooms_times[wxid]+=1
  403. logger.info(f'{nickname} 向 {chatroom_nickname}-{chatroom_id} 群的 {m.get("nickName")}-{m.get("wxid")} 发送好友邀请 {msg}')
  404. # 推送到kafka
  405. k_message = wx_add_contacts_from_chatroom_message(history.wxid,history.chatroomId,history.contactWixd,history.addTime)
  406. await kafka_service.send_message_async(k_message)
  407. await asyncio.sleep(random.uniform(1.5, 3))
  408. await asyncio.sleep(random.uniform(1.5, 3))
  409. except Exception as e:
  410. logger.error(f"任务执行过程中发生异常: {e}")
  411. loop = asyncio.get_event_loop()
  412. if loop.is_closed():
  413. loop = asyncio.new_event_loop()
  414. asyncio.set_event_loop(loop)
  415. loop.run_until_complete(task()) # 在现有事件循环中运行任务
  416. @celery_app.task(name='tasks.scheduled_task_add_contacts_from_chatrooms', bind=True, acks_late=True)
  417. def scheduled_task_add_contacts_from_chatrooms(self, redis_config, kafka_config, gewe_config):
  418. '''
  419. 关于群加好友的请求规则:一次30人,间隔2小时做1次,一天做3次,即最多90人/天。
  420. 加好友规则:每天处理次数、间隔时间(分钟)、每次加好友人数这3个参数都可以设置。目前默认只是上面的设置。
  421. '''
  422. async def task():
  423. try:
  424. now = datetime.now()
  425. if now.hour < 8:
  426. logger.info(f"定时群成员定时添好友任务不启动,当前时间为 {now.strftime('%Y-%m-%d %H:%M:%S')},早于8点")
  427. return
  428. logger.info('定时群成员定时添好友任务开始')
  429. redis_service = RedisService()
  430. await redis_service.init(**redis_config)
  431. gewe_service = await GeWeService.get_instance(redis_service,gewe_config['api_url'])
  432. KAFKA_BOOTSTRAP_SERVERS=kafka_config['bootstrap_servers']
  433. KAFKA_TOPIC=kafka_config['topic']
  434. KAFKA_GROUP_ID=kafka_config['group_id']
  435. kafka_service= KafkaService(KAFKA_BOOTSTRAP_SERVERS, KAFKA_TOPIC, KAFKA_TOPIC,KAFKA_GROUP_ID)
  436. await kafka_service.start_producer()
  437. global_config=await gewe_service.get_global_config_from_cache_async()
  438. scheduled_task_add_contacts_from_chatrooms_config=global_config.get('scheduledTaskAddContactsFromChatrooms',{})
  439. oneday_add_contacts_total=90
  440. once_add_contacts_total=30
  441. oneday_times=3
  442. if scheduled_task_add_contacts_from_chatrooms_config:
  443. oneday_add_contacts_total=scheduled_task_add_contacts_from_chatrooms_config.get('oneDayAddContactsTotal',90)
  444. once_add_contacts_total=scheduled_task_add_contacts_from_chatrooms_config.get('onceAddContactsTotal',30)
  445. oneday_times=scheduled_task_add_contacts_from_chatrooms_config.get('oneDayTimes',3)
  446. cache_task_run_time_logs= await gewe_service.get_task_run_time_async('scheduled_task_add_contacts_from_chatrooms')
  447. if len(cache_task_run_time_logs) == oneday_times:
  448. logger.info(f"今日定时群成员定时添好友任务已达上限 {oneday_times} 次!")
  449. return
  450. if cache_task_run_time_logs:
  451. sorted_tasks = sorted(cache_task_run_time_logs, key=lambda x: x.get("runTime"), reverse=True)
  452. last_run_time=sorted_tasks[0].get("runTime")
  453. if last_run_time > 1e12: # 毫秒级时间戳
  454. last_run_time = last_run_time / 1000 # 转换为秒
  455. # 将时间戳转换为 datetime 对象
  456. last_run_time = datetime.fromtimestamp(last_run_time)
  457. # 获取当前时间
  458. current_time = datetime.now()
  459. # 计算时间差
  460. time_difference = current_time - last_run_time
  461. # 判断是否相差2小时
  462. if time_difference < timedelta(hours=2):
  463. logger.info(f"上次定时群成员定时添好友任务在2小时内,不再执行")
  464. return
  465. # 获取当前时间
  466. current_time = datetime.now()
  467. # 计算当天的结束时间(23:59:59)
  468. end_of_day = datetime(current_time.year, current_time.month, current_time.day, 23, 59, 59)
  469. # 计算时间差
  470. time_difference = end_of_day - current_time
  471. # 将时间差转换为秒数
  472. time_difference_seconds = int(time_difference.total_seconds())
  473. cache_task_run_time_logs.append({"runTime":int(time.time())})
  474. await gewe_service.save_task_run_time_async('scheduled_task_add_contacts_from_chatrooms',cache_task_run_time_logs,time_difference_seconds)
  475. login_keys = []
  476. async for key in redis_service.client.scan_iter(match='__AI_OPS_WX__:LOGININFO:*'):
  477. login_keys.append(key)
  478. wixd_add_contacts_from_chatrooms_times = {}
  479. for k in login_keys:
  480. r = await redis_service.get_hash(k)
  481. app_id = r.get("appId")
  482. token_id = r.get("tokenId")
  483. wxid = r.get("wxid")
  484. status = r.get('status')
  485. if status == '0':
  486. logger.warning(f"微信号 {wxid} 已经离线,群成员不能定时添加")
  487. continue
  488. c = await gewe_service.get_wxchat_config_from_cache_async(wxid)
  489. contacts = await gewe_service.get_contacts_brief_from_cache_async(wxid)
  490. contact_wxids = [c.get('userName') for c in contacts]
  491. chatrooms = c.get('addContactsFromChatroomIdWhiteList', [])
  492. logger.info(f'{wxid} 定时群成员定时添好友任务开始')
  493. wixd_add_contacts_from_chatrooms_times[wxid] = 0
  494. for chatroom_id in chatrooms:
  495. chatroom = await gewe_service.get_group_info_from_cache_async(wxid, chatroom_id)
  496. chatroom_member=await gewe_service.get_group_members_from_cache_async(wxid, chatroom_id)
  497. chatroom_nickname = chatroom.get('nickName')
  498. chatroom_owner_wxid = chatroom_member.get('chatroomOwner', None)
  499. admin_wxids = chatroom_member.get('adminWxid', [])
  500. admin_wxids = chatroom_member.get('adminWxid')
  501. if admin_wxids is None:
  502. admin_wxids = [] # 如果 admin_wxids 是 None,将其初始化为空列表
  503. logger.info(f'{chatroom_nickname} 的群主是 {chatroom_owner_wxid},管理员是{admin_wxids}')
  504. contact_wxids_set = set(contact_wxids)
  505. # for admin_wxid in admin_wxids:
  506. # contact_wxids_set.add(admin_wxid)
  507. if admin_wxids:
  508. contact_wxids_set.update(set(admin_wxids))
  509. if chatroom_owner_wxid is not None:
  510. contact_wxids_set.add(chatroom_owner_wxid)
  511. contact_wxids_set.add(wxid)
  512. # unavailable_wixds=await gewe_service.check_wixd_group_add_contacts_history_async(wxid,chatroom_id)
  513. # contact_wxids_set.update(set(unavailable_wixds))
  514. # chatroom_member_list = chatroom.get('memberList', [])
  515. unavailable_wixds=await gewe_service.check_wixd_group_add_contacts_history_async(wxid,chatroom_id)
  516. if unavailable_wixds:
  517. contact_wxids_set.update(set(unavailable_wixds))
  518. chatroom_member_list = chatroom.get('memberList', [])
  519. if chatroom_member_list is None:
  520. chatroom_member_list = [] # 如果 memberList 是 None,将其初始化为空列表
  521. elif not isinstance(chatroom_member_list, list):
  522. chatroom_member_list = list(chatroom_member_list) # 如果 memberList 不是列表,将其转换为列表
  523. remaining_chatroot_members = [x for x in chatroom_member_list if x.get('wxid') not in contact_wxids_set]
  524. nickname = next((member['nickName'] for member in chatroom_member_list if member['wxid'] == wxid), None)
  525. logger.info(f'{nickname}-{wxid} 在 {chatroom_nickname} 群里还可以邀请的好友有:{[x.get("nickName") for x in remaining_chatroot_members]}')
  526. for m in remaining_chatroot_members:
  527. # 判断本次任务是否已经邀请了30个好友
  528. if wixd_add_contacts_from_chatrooms_times[wxid] == once_add_contacts_total:
  529. logger.info(f"{wxid} 本次任务已经邀请了{once_add_contacts_total}人,不再邀请")
  530. return
  531. # 判断当天群成员是否已经加了90个好友
  532. is_add_group_times = await gewe_service.is_group_add_contacts_history_one_day_async(wxid,oneday_add_contacts_total)
  533. if is_add_group_times:
  534. logger.info(f"当天 {wxid} 所有群的成员已经加了{oneday_add_contacts_total}个好友,不再添加")
  535. return
  536. contact_wxid= m.get('wxid')
  537. member_nickname=m.get("nickName")
  538. group_add_contacts_history = await gewe_service.get_group_add_contacts_history_async(wxid,chatroom_id,contact_wxid)
  539. sorted_history = sorted(group_add_contacts_history, key=lambda x: x.addTime, reverse=True)
  540. # 已经邀请过两次,不再邀请
  541. if len(sorted_history)==2:
  542. logger.info(f'{nickname}-{wxid}在{chatroom_nickname}-{chatroom_id} 群的 {member_nickname}-{contact_wxid} 已经邀请过2次,不再邀请')
  543. continue
  544. # 当天邀请过,不再邀请
  545. if len(sorted_history) > 0:
  546. last_add_time = sorted_history[0].addTime
  547. def is_add_time_more_than_one_day(addTime: int) -> bool:
  548. """
  549. 判断 addTime 是否与当前时间相隔大于 3600 × 24 秒
  550. :param addTime: Unix 时间戳
  551. :return: 如果 addTime 与当前时间相隔大于 3600 × 24 秒,返回 True;否则返回 False
  552. """
  553. # 获取当前时间的时间戳
  554. current_time = time.time()
  555. # 计算时间戳差值
  556. time_difference = abs(current_time - addTime)
  557. # 检查是否大于 3600 × 24 秒
  558. return time_difference > 3600 * 24
  559. is_more_than_one_day= is_add_time_more_than_one_day(last_add_time)
  560. if not is_more_than_one_day:
  561. logger.info(f'{nickname}-{wxid}在{chatroom_nickname}-{chatroom_id} 群的{member_nickname}-{contact_wxid}已经当天邀请,不再邀请')
  562. continue
  563. ret, msg, data = await gewe_service.add_group_member_as_friend_async(token_id, app_id, chatroom_id, m.get('wxid'), f'我是群聊"{chatroom_nickname}"群的{nickname}')
  564. if ret!=200:
  565. logger.warning(f'群好友邀请失败原因:{ret} {data}')
  566. history=AddGroupContactsHistory.model_validate({
  567. "chatroomId":chatroom_id,
  568. "wxid":wxid,
  569. "contactWixd":contact_wxid,
  570. "addTime":int(time.time())
  571. })
  572. await gewe_service.save_group_add_contacts_history_async(wxid,chatroom_id,contact_wxid,history)
  573. wixd_add_contacts_from_chatrooms_times[wxid]+=1
  574. logger.info(f'{nickname} 向 {chatroom_nickname}-{chatroom_id} 群的 {m.get("nickName")}-{m.get("wxid")} 发送好友邀请 {msg}')
  575. # 推送到kafka
  576. k_message = wx_add_contacts_from_chatroom_message(history.wxid,history.chatroomId,history.contactWixd,history.addTime)
  577. await kafka_service.send_message_async(k_message)
  578. await asyncio.sleep(random.uniform(1.5, 3))
  579. await asyncio.sleep(random.uniform(1.5, 3))
  580. except Exception as e:
  581. logger.error(f"任务执行过程中发生异常: {e}")
  582. finally:
  583. await kafka_service.stop_producer()
  584. loop = asyncio.get_event_loop()
  585. if loop.is_closed():
  586. loop = asyncio.new_event_loop()
  587. asyncio.set_event_loop(loop)
  588. loop.run_until_complete(task()) # 在现有事件循环中运行任务
  589. REDIS_KEY_PATTERN = "friend_add_limit:{date}"
  590. REDIS_LAST_RUN_KEY = "last_run_time:add_friends_task"
  591. @celery_app.task(name='tasks.add_friends_task', bind=True, acks_late=True)
  592. def add_friends_task(self,redis_config):
  593. """
  594. 限制每天最多 15 个,每 2 小时最多 8 个
  595. """
  596. async def task():
  597. redis_service = RedisService()
  598. await redis_service.init(**redis_config)
  599. today_str = datetime.now().strftime("%Y%m%d")
  600. redis_key = REDIS_KEY_PATTERN.format(date=today_str)
  601. # 获取当前总添加数量
  602. total_added = await redis_service.get_hash_field(redis_key, "total") or 0
  603. last_2h_added =await redis_service.get_hash_field(redis_key, "last_2h") or 0
  604. total_added = int(total_added)
  605. last_2h_added = int(last_2h_added)
  606. logger.info(f"当前添加好友总数: {total_added}, 过去2小时添加: {last_2h_added}")
  607. # 判断是否超过限制
  608. if total_added >= 15:
  609. logger.warning("今日好友添加已达上限!")
  610. return
  611. if last_2h_added >= 8:
  612. logger.warning("过去2小时添加已达上限!")
  613. return
  614. # 计算本次要添加的好友数量 (控制每天 5-15 个)
  615. max_add = min(15 - total_added, 8 - last_2h_added)
  616. if max_add <= 0:
  617. return
  618. num_to_add = min(max_add, 1) # 每次最多加 1 个
  619. logger.info(f"本次添加 {num_to_add} 位好友")
  620. # TODO: 调用好友添加逻辑 (接口 or 业务逻辑)
  621. # success = add_friends(num_to_add)
  622. success = num_to_add # 假设成功添加 num_to_add 个
  623. # 更新 Redis 计数
  624. if success > 0:
  625. await redis_service.increment_hash_field(redis_key, "total", success)
  626. await redis_service.increment_hash_field(redis_key, "last_2h", success)
  627. # 设置 Redis 过期时间 (每日记录存 1 天, 2 小时记录存 2 小时)
  628. await redis_service.expire(redis_key, 86400) # 24小时
  629. await redis_service.expire_field(redis_key, "last_2h", 7200) # 2小时
  630. logger.info(f"成功添加 {success} 位好友, 今日总数 {total_added + success}")
  631. # 生成一个新的随机时间(5-15 分钟之间)
  632. # next_interval = random.randint(10, 20)
  633. # # 计算新的执行时间
  634. # next_run_time = datetime.datetime.now() + timedelta(seconds=next_interval)
  635. # # 重新注册 RedBeat 任务,确保下次执行时间不同
  636. # redbeat_entry = RedBeatSchedulerEntry(
  637. # name="redbeat:add_friends_task",
  638. # task="tasks.add_friends_task",
  639. # schedule=celery.schedules.schedule(timedelta(seconds=next_interval)),
  640. # args=[redis_config],
  641. # app=celery_app
  642. # )
  643. # # 设置任务的下次执行时间
  644. # redbeat_entry.last_run_at = next_run_time
  645. # redbeat_entry.save()
  646. # logger.info(f"下次任务将在 {next_run_time} 执行(间隔 {next_interval} 秒)")
  647. loop = asyncio.get_event_loop()
  648. if loop.is_closed():
  649. loop = asyncio.new_event_loop()
  650. asyncio.set_event_loop(loop)
  651. loop.run_until_complete(task()) # 在现有事件循环中运行任务
  652. @celery_app.task(name='tasks.random_scheduled_task', bind=True, acks_late=True)
  653. def random_scheduled_task(self,):
  654. print(f"Task executed at {datetime.now()}")
  655. # 随机生成下次执行时间(例如:10-60秒内的随机时间)
  656. next_run_in = random.randint(10, 60)
  657. print(f"Next execution will be in {next_run_in} seconds")
  658. # 设置下次执行时间
  659. entry = RedBeatSchedulerEntry(
  660. name='random-task',
  661. task='tasks.random_scheduled_task',
  662. schedule=timedelta(seconds=next_run_in),
  663. app=celery_app
  664. )
  665. entry.save()
  666. return f"Scheduled next run in {next_run_in} seconds"