No puede seleccionar más de 25 temas Los temas deben comenzar con una letra o número, pueden incluir guiones ('-') y pueden tener hasta 35 caracteres de largo.

884 líneas
43KB

  1. from celery_app import celery_app
  2. from fastapi import Request,FastAPI
  3. import time,datetime
  4. from celery import Celery
  5. import celery.schedules
  6. from redbeat import RedBeatSchedulerEntry
  7. from datetime import timedelta
  8. from services.redis_service import RedisService
  9. from services.kafka_service import KafkaService
  10. from services.gewe_service import GeWeService
  11. # from common.log import logger
  12. from common.utils import *
  13. import asyncio,random
  14. from model.models import AddGroupContactsHistory
  15. import logging
  16. from model.models import AgentConfig
  17. import logging
  18. logger = logging.getLogger('redbeat')
  19. @celery_app.task(name='tasks.add_task', bind=True, acks_late=True)
  20. def add_task(self, x, y):
  21. time.sleep(5) # 模拟长时间计算
  22. logger.info('add')
  23. return x + y
  24. @celery_app.task(name='tasks.mul_task', bind=True, acks_late=True)
  25. def mul_task(self, x, y):
  26. time.sleep(5) # 模拟长时间计算
  27. return x * y
  28. # @celery.task(name='app.tasks.sync_contacts', bind=True, acks_late=True)
  29. # async def sync_contacts_task(self,app):
  30. # login_keys = list(await app.state.redis_service.client.scan_iter(match='__AI_OPS_WX__:LOGININFO:*'))
  31. # return login_keys
  32. # # for k in login_keys:
  33. # # print(k)
  34. @celery_app.task(name='tasks.sync_contacts', bind=True, acks_late=True)
  35. async def sync_contacts_task(self, redis_service):
  36. # Use the redis_service passed as an argument
  37. login_keys = list(await redis_service.client.scan_iter(match='__AI_OPS_WX__:LOGININFO:*'))
  38. return login_keys
  39. @celery_app.task(name='tasks.background_worker_task', bind=True, acks_late=True)
  40. def background_worker_task(self, redis_config, kafka_config, gewe_config):
  41. async def task():
  42. redis_service = RedisService()
  43. await redis_service.init(**redis_config)
  44. login_keys = []
  45. async for key in redis_service.client.scan_iter(match='__AI_OPS_WX__:LOGININFO:*'):
  46. login_keys.append(key)
  47. print(login_keys)
  48. asyncio.run(task())
  49. # @celery.task(name='tasks.background_worker_task', bind=True, acks_late=True)
  50. # async def background_worker_task(self, redis_config, kafka_config, gewe_config):
  51. # # Initialize services inside the task
  52. # redis_service = RedisService()
  53. # await redis_service.init(**redis_config)
  54. # login_keys = []
  55. # async for key in redis_service.client.scan_iter(match='__AI_OPS_WX__:LOGININFO:*'): # 使用 async for 遍历异步生成器
  56. # login_keys.append(key)
  57. # print(login_keys)
  58. # kafka_service = KafkaService(**kafka_config)
  59. # await kafka_service.start()
  60. # gewe_service = await GeWeService.get_instance(None, gewe_config['api_url'])
  61. # # Task logic
  62. # lock_name = "background_wxchat_thread_lock"
  63. # lock_identifier = str(time.time())
  64. # while True:
  65. # if await redis_service.acquire_lock(lock_name, timeout=10):
  66. # try:
  67. # logger.info("分布式锁已成功获取")
  68. # # Perform task logic
  69. # finally:
  70. # await redis_service.release_lock(lock_name, lock_identifier)
  71. # break
  72. # else:
  73. # logger.info("获取分布式锁失败,等待10秒后重试...")
  74. # await asyncio.sleep(10)
  75. # @celery_app.task(name='tasks.scheduled_task', bind=True, acks_late=True)
  76. # def scheduled_task(self):
  77. # print("定时任务执行成功!~~~~~~~~~~~~~~~~~")
  78. # return "Hello from Celery Beat + RedBeat!"
  79. # @celery_app.task(name='tasks.scheduled_task_sync_wx', bind=True, acks_late=True)
  80. # def scheduled_task_sync_wx(self,redis_service,kafka_service,gewe_service):
  81. # print("scheduled_task_sync_wx 定时任务执行成功!")
  82. # return "Hello from Celery Beat + RedBeat!"
  83. # @celery_app.task(name='tasks.scheduled_task_sync_wx_info_1', bind=True, acks_late=True)
  84. # def scheduled_task_sync_wx_info_1(self,redis_config, kafka_config, gewe_config):
  85. # '''
  86. # 定时获取微信号资料
  87. # '''
  88. # loop = asyncio.new_event_loop()
  89. # asyncio.set_event_loop(loop)
  90. # async def task():
  91. # try:
  92. # redis_service = RedisService()
  93. # await redis_service.init(**redis_config)
  94. # # gewe_service = await GeWeService.get_instance(None, gewe_config['api_url'])
  95. # login_keys = []
  96. # async for key in redis_service.client.scan_iter(match='__AI_OPS_WX__:LOGININFO:*'):
  97. # login_keys.append(key)
  98. # print(login_keys)
  99. # # for k in login_keys:
  100. # # r = await redis_service.get_hash(k)
  101. # # app_id = r.get("appId")
  102. # # token_id = r.get("tokenId")
  103. # # wxid = r.get("wxid")
  104. # # status = r.get('status')
  105. # # if status == '0':
  106. # # continue
  107. # # ret, msg, profile = await gewe_service.get_profile_async(token_id, app_id)
  108. # # if ret != 200:
  109. # # logger.warning(f"同步微信号 {wxid} 资料失败: {ret}-{msg}")
  110. # # continue
  111. # # nickname=profile.get("nickName")
  112. # # head_img_url=profile.get("smallHeadImgUrl")
  113. # # r.update({"nickName":nickname,"headImgUrl":head_img_url,"modify_at":int(time.time())})
  114. # # cleaned_login_info = {k: (v if v is not None else '') for k, v in r.items()}
  115. # # await redis_service.set_hash(k, cleaned_login_info)
  116. # # logger.info(f"同步微信号 {wxid} 资料 成功")
  117. # # redis_service.update_hash_field(k,"nickName",nickname)
  118. # # redis_service.update_hash_field(k,"headImgUrl",head_img_url)
  119. # # redis_service.update_hash_field(k,"modify_at",int(time.time()))
  120. # except Exception as e:
  121. # logger.error(f"任务执行过程中发生异常: {e}")
  122. # print("scheduled_task_sync_wx_info 定时任务执行成功!")
  123. # return "Hello from Celery Beat + RedBeat!"
  124. # loop.run_until_complete(task())
  125. # loop.close()
  126. @celery_app.task(name='tasks.scheduled_task_sync_wx_info', bind=True, acks_late=True)
  127. def scheduled_task_sync_wx_info(self, redis_config, kafka_config, gewe_config):
  128. '''
  129. 定时获取微信号资料
  130. '''
  131. async def task():
  132. try:
  133. redis_service = RedisService()
  134. await redis_service.init(**redis_config)
  135. gewe_service = await GeWeService.get_instance(redis_service,gewe_config['api_url'])
  136. login_keys = []
  137. async for key in redis_service.client.scan_iter(match='__AI_OPS_WX__:LOGININFO:*'):
  138. login_keys.append(key)
  139. print(login_keys)
  140. for k in login_keys:
  141. r = await redis_service.get_hash(k)
  142. app_id = r.get("appId")
  143. token_id = r.get("tokenId")
  144. wxid = r.get("wxid")
  145. status = r.get('status')
  146. if status == '0':
  147. logger.warning(f"微信号 {wxid} 已经离线")
  148. continue
  149. ret, msg, profile = await gewe_service.get_profile_async(token_id, app_id)
  150. if ret != 200:
  151. logger.warning(f"同步微信号 {wxid} 资料失败: {ret}-{msg}")
  152. continue
  153. nickname=profile.get("nickName")
  154. head_img_url=profile.get("smallHeadImgUrl")
  155. # print(nickname)
  156. nickname=profile.get("nickName")
  157. head_img_url=profile.get("smallHeadImgUrl")
  158. r.update({"nickName":nickname,"headImgUrl":head_img_url,"modify_at":int(time.time())})
  159. cleaned_login_info = {k: (v if v is not None else '') for k, v in r.items()}
  160. await redis_service.set_hash(k, cleaned_login_info)
  161. logger.info(f"定时同步微信号{wxid}-昵称{nickname} 资料成功")
  162. except Exception as e:
  163. logger.error(f"任务执行过程中发生异常: {e}")
  164. loop = asyncio.get_event_loop()
  165. if loop.is_closed():
  166. loop = asyncio.new_event_loop()
  167. asyncio.set_event_loop(loop)
  168. loop.run_until_complete(task()) # 在现有事件循环中运行任务
  169. #@celery_app.task(name='tasks.scheduled_task_add_contacts_from_chatrooms_p', bind=True, acks_late=True)
  170. def scheduled_task_add_contacts_from_chatrooms_p(self, redis_config, kafka_config, gewe_config):
  171. async def task():
  172. try:
  173. now = datetime.now()
  174. if now.hour < 8:
  175. logger.info(f"定时群成员定时添好友任务不启动,当前时间为 {now.strftime('%Y-%m-%d %H:%M:%S')},早于8点")
  176. return
  177. logger.info('定时群成员定时添好友任务开始')
  178. redis_service = RedisService()
  179. await redis_service.init(**redis_config)
  180. gewe_service = await GeWeService.get_instance(redis_service,gewe_config['api_url'])
  181. login_keys = []
  182. async for key in redis_service.client.scan_iter(match='__AI_OPS_WX__:LOGININFO:*'):
  183. login_keys.append(key)
  184. #print(login_keys)
  185. for k in login_keys:
  186. r = await redis_service.get_hash(k)
  187. app_id = r.get("appId")
  188. token_id = r.get("tokenId")
  189. wxid = r.get("wxid")
  190. status = r.get('status')
  191. if status == '0':
  192. logger.warning(f"微信号 {wxid} 已经离线,群成员不能定时添加")
  193. continue
  194. c = await gewe_service.get_wxchat_config_from_cache_async(wxid)
  195. contacts = await gewe_service.get_contacts_brief_from_cache_async(wxid)
  196. contact_wxids = [c.get('userName') for c in contacts]
  197. chatrooms = c.get('addContactsFromChatroomIdWhiteList', [])
  198. logger.info(f'{wxid} 定时群成员定时添好友任务开始')
  199. for chatroom_id in chatrooms:
  200. chatroom = await gewe_service.get_group_info_from_cache_async(wxid, chatroom_id)
  201. chatroom_member=await gewe_service.get_group_members_from_cache_async(wxid, chatroom_id)
  202. chatroom_nickname = chatroom.get('nickName')
  203. chatroom_owner_wxid = chatroom_member.get('chatroomOwner', None)
  204. admin_wxids = chatroom_member.get('adminWxid', [])
  205. admin_wxids = chatroom_member.get('adminWxid')
  206. if admin_wxids is None:
  207. admin_wxids = [] # 如果 admin_wxids 是 None,将其初始化为空列表
  208. # 判断当天群成员是否已经加了200个好友
  209. is_add_group_200_times = await gewe_service.is_group_add_contacts_history_one_day_200_async(wxid, chatroom_id)
  210. if is_add_group_200_times:
  211. logger.info(f"{wxid}在 {chatroom_nickname} 群成员已经加了200个好友,不再添加,群id:{chatroom_id}")
  212. continue
  213. logger.info(f'{chatroom_nickname} 的群主是 {chatroom_owner_wxid},管理员是{admin_wxids}')
  214. contact_wxids_set = set(contact_wxids)
  215. # for admin_wxid in admin_wxids:
  216. # contact_wxids_set.add(admin_wxid)
  217. if admin_wxids:
  218. contact_wxids_set.update(set(admin_wxids))
  219. contact_wxids_set.update(set(admin_wxids))
  220. if chatroom_owner_wxid is not None:
  221. contact_wxids_set.add(chatroom_owner_wxid)
  222. contact_wxids_set.add(wxid)
  223. # unavailable_wixds=await gewe_service.check_wixd_group_add_contacts_history_async(wxid,chatroom_id)
  224. # contact_wxids_set.update(unavailable_wixds)
  225. chatroot_member_list = chatroom.get('memberList', [])
  226. remaining_chatroot_members = [x for x in chatroot_member_list if x.get('wxid') not in contact_wxids_set]
  227. nickname = next((member['nickName'] for member in chatroot_member_list if member['wxid'] == wxid), None)
  228. logger.info(f'{nickname}-{wxid} 在 {chatroom_nickname} 群里还可以邀请的好友有:{[x.get("nickName") for x in remaining_chatroot_members]}')
  229. for m in remaining_chatroot_members:
  230. contact_wxid= m.get('wxid')
  231. member_nickname=m.get("nickName")
  232. group_add_contacts_history = await gewe_service.get_group_add_contacts_history_async(wxid,chatroom_id,contact_wxid)
  233. sorted_history = sorted(group_add_contacts_history, key=lambda x: x.addTime, reverse=True)
  234. # 已经邀请过两次,不再邀请
  235. if len(sorted_history)==2:
  236. logger.info(f'{nickname}-{wxid}在{chatroom_nickname}-{chatroom_id} 群的 {member_nickname}-{contact_wxid} 已经邀请过2次,不再邀请')
  237. continue
  238. # 当天邀请过,不再邀请
  239. if len(sorted_history) > 0:
  240. last_add_time = sorted_history[0].addTime
  241. def is_add_time_more_than_one_day(addTime: int) -> bool:
  242. """
  243. 判断 addTime 是否与当前时间相隔大于 3600 × 24 秒
  244. :param addTime: Unix 时间戳
  245. :return: 如果 addTime 与当前时间相隔大于 3600 × 24 秒,返回 True;否则返回 False
  246. """
  247. # 获取当前时间的时间戳
  248. current_time = time.time()
  249. # 计算时间戳差值
  250. time_difference = abs(current_time - addTime)
  251. # 检查是否大于 3600 × 24 秒
  252. return time_difference > 3600 * 24
  253. is_more_than_one_day= is_add_time_more_than_one_day(last_add_time)
  254. if not is_more_than_one_day:
  255. logger.info(f'{nickname}-{wxid}在{chatroom_nickname}-{chatroom_id} 群的{member_nickname}-{contact_wxid}已经当天邀请,不再邀请')
  256. continue
  257. ret, msg, data = await gewe_service.add_group_member_as_friend_async(token_id, app_id, chatroom_id, m.get('wxid'), f'我是群聊"{chatroom_nickname}"群的{nickname}')
  258. # if ret==200:
  259. # history=AddGroupContactsHistory.model_validate({
  260. # "chatroomId":chatroom_id,
  261. # "wxid":wxid,
  262. # "contactWixd":contact_wxid,
  263. # "addTime":int(time.time())
  264. # })
  265. # await gewe_service.save_group_add_contacts_history_async(wxid,chatroom_id,contact_wxid,history)
  266. # else:
  267. # logger.info(f'群好友邀请失败原因:{data}')
  268. if ret!=200:
  269. logger.warning(f'群好友邀请失败原因:{ret} {data}')
  270. history=AddGroupContactsHistory.model_validate({
  271. "chatroomId":chatroom_id,
  272. "wxid":wxid,
  273. "contactWixd":contact_wxid,
  274. "addTime":int(time.time())
  275. })
  276. await gewe_service.save_group_add_contacts_history_async(wxid,chatroom_id,contact_wxid,history)
  277. logger.info(f'{nickname} 向 {chatroom_nickname}-{chatroom_id} 群的 {m.get("nickName")}-{m.get("wxid")} 发送好友邀请 {msg}')
  278. await asyncio.sleep(random.uniform(1.5, 3))
  279. await asyncio.sleep(random.uniform(1.5, 3))
  280. except Exception as e:
  281. logger.error(f"任务执行过程中发生异常: {e}")
  282. loop = asyncio.get_event_loop()
  283. if loop.is_closed():
  284. loop = asyncio.new_event_loop()
  285. asyncio.set_event_loop(loop)
  286. loop.run_until_complete(task()) # 在现有事件循环中运行任务
  287. #@celery_app.task(name='tasks.scheduled_task_add_contacts_from_chatrooms_p2', bind=True, acks_late=True)
  288. def scheduled_task_add_contacts_from_chatrooms_p2(self, redis_config, kafka_config, gewe_config):
  289. '''
  290. 关于群加好友的请求规则:一次30人,间隔2小时做1次,一天做3次,即最多90人/天。
  291. '''
  292. async def task():
  293. try:
  294. now = datetime.now()
  295. if now.hour < 8:
  296. logger.info(f"定时群成员定时添好友任务不启动,当前时间为 {now.strftime('%Y-%m-%d %H:%M:%S')},早于8点")
  297. return
  298. logger.info('定时群成员定时添好友任务开始')
  299. redis_service = RedisService()
  300. await redis_service.init(**redis_config)
  301. gewe_service = await GeWeService.get_instance(redis_service,gewe_config['api_url'])
  302. KAFKA_BOOTSTRAP_SERVERS=kafka_config['bootstrap_servers']
  303. KAFKA_TOPIC=kafka_config['topic']
  304. KAFKA_GROUP_ID=kafka_config['group_id']
  305. kafka_service= KafkaService(KAFKA_BOOTSTRAP_SERVERS, KAFKA_TOPIC, KAFKA_TOPIC,KAFKA_GROUP_ID)
  306. await kafka_service.start()
  307. login_keys = []
  308. async for key in redis_service.client.scan_iter(match='__AI_OPS_WX__:LOGININFO:*'):
  309. login_keys.append(key)
  310. wixd_add_contacts_from_chatrooms_times = {}
  311. #print(login_keys)
  312. for k in login_keys:
  313. r = await redis_service.get_hash(k)
  314. app_id = r.get("appId")
  315. token_id = r.get("tokenId")
  316. wxid = r.get("wxid")
  317. status = r.get('status')
  318. if status == '0':
  319. logger.warning(f"微信号 {wxid} 已经离线,群成员不能定时添加")
  320. continue
  321. c = await gewe_service.get_wxchat_config_from_cache_async(wxid)
  322. contacts = await gewe_service.get_contacts_brief_from_cache_async(wxid)
  323. contact_wxids = [c.get('userName') for c in contacts]
  324. chatrooms = c.get('addContactsFromChatroomIdWhiteList', [])
  325. logger.info(f'{wxid} 定时群成员定时添好友任务开始')
  326. wixd_add_contacts_from_chatrooms_times[wxid] = 0
  327. for chatroom_id in chatrooms:
  328. chatroom = await gewe_service.get_group_info_from_cache_async(wxid, chatroom_id)
  329. chatroom_member=await gewe_service.get_group_members_from_cache_async(wxid, chatroom_id)
  330. chatroom_nickname = chatroom.get('nickName')
  331. chatroom_owner_wxid = chatroom_member.get('chatroomOwner', None)
  332. admin_wxids = chatroom_member.get('adminWxid', [])
  333. admin_wxids = chatroom_member.get('adminWxid')
  334. if admin_wxids is None:
  335. admin_wxids = [] # 如果 admin_wxids 是 None,将其初始化为空列表
  336. logger.info(f'{chatroom_nickname} 的群主是 {chatroom_owner_wxid},管理员是{admin_wxids}')
  337. contact_wxids_set = set(contact_wxids)
  338. # for admin_wxid in admin_wxids:
  339. # contact_wxids_set.add(admin_wxid)
  340. if admin_wxids:
  341. contact_wxids_set.update(set(admin_wxids))
  342. if chatroom_owner_wxid is not None:
  343. contact_wxids_set.add(chatroom_owner_wxid)
  344. contact_wxids_set.add(wxid)
  345. unavailable_wixds=await gewe_service.check_wixd_group_add_contacts_history_async(wxid,chatroom_id)
  346. if unavailable_wixds:
  347. contact_wxids_set.update(set(unavailable_wixds))
  348. chatroom_member_list = chatroom.get('memberList', [])
  349. if chatroom_member_list is None:
  350. chatroom_member_list = [] # 如果 memberList 是 None,将其初始化为空列表
  351. elif not isinstance(chatroom_member_list, list):
  352. chatroom_member_list = list(chatroom_member_list) # 如果 memberList 不是列表,将其转换为列表
  353. remaining_chatroot_members = [x for x in chatroom_member_list if x.get('wxid') not in contact_wxids_set]
  354. nickname = next((member['nickName'] for member in chatroom_member_list if member['wxid'] == wxid), None)
  355. logger.info(f'{nickname}-{wxid} 在 {chatroom_nickname} 群里还可以邀请的好友有:{[x.get("nickName") for x in remaining_chatroot_members]}')
  356. for m in remaining_chatroot_members:
  357. # 判断本次任务是否已经邀请了30个好友
  358. if wixd_add_contacts_from_chatrooms_times[wxid] == 30:
  359. logger.info(f"{wxid} 本次任务已经邀请了30人,不再邀请")
  360. return
  361. # 判断当天群成员是否已经加了90个好友
  362. is_add_group_90_times = await gewe_service.is_group_add_contacts_history_one_day_90_async(wxid)
  363. if is_add_group_90_times:
  364. logger.info(f"当天 {wxid} 所有群的成员已经加了90个好友,不再添加")
  365. return
  366. contact_wxid= m.get('wxid')
  367. member_nickname=m.get("nickName")
  368. group_add_contacts_history = await gewe_service.get_group_add_contacts_history_async(wxid,chatroom_id,contact_wxid)
  369. sorted_history = sorted(group_add_contacts_history, key=lambda x: x.addTime, reverse=True)
  370. # 已经邀请过两次,不再邀请
  371. if len(sorted_history)==2:
  372. logger.info(f'{nickname}-{wxid}在{chatroom_nickname}-{chatroom_id} 群的 {member_nickname}-{contact_wxid} 已经邀请过2次,不再邀请')
  373. continue
  374. # 当天邀请过,不再邀请
  375. if len(sorted_history) > 0:
  376. last_add_time = sorted_history[0].addTime
  377. def is_add_time_more_than_one_day(addTime: int) -> bool:
  378. """
  379. 判断 addTime 是否与当前时间相隔大于 3600 × 24 秒
  380. :param addTime: Unix 时间戳
  381. :return: 如果 addTime 与当前时间相隔大于 3600 × 24 秒,返回 True;否则返回 False
  382. """
  383. # 获取当前时间的时间戳
  384. current_time = time.time()
  385. # 计算时间戳差值
  386. time_difference = abs(current_time - addTime)
  387. # 检查是否大于 3600 × 24 秒
  388. return time_difference > 3600 * 24
  389. is_more_than_one_day= is_add_time_more_than_one_day(last_add_time)
  390. if not is_more_than_one_day:
  391. logger.info(f'{nickname}-{wxid}在{chatroom_nickname}-{chatroom_id} 群的{member_nickname}-{contact_wxid}已经当天邀请,不再邀请')
  392. continue
  393. ret, msg, data = await gewe_service.add_group_member_as_friend_async(token_id, app_id, chatroom_id, m.get('wxid'), f'我是群聊"{chatroom_nickname}"群的{nickname}')
  394. if ret!=200:
  395. logger.warning(f'群好友邀请失败原因:{ret} {data}')
  396. history=AddGroupContactsHistory.model_validate({
  397. "chatroomId":chatroom_id,
  398. "wxid":wxid,
  399. "contactWixd":contact_wxid,
  400. "addTime":int(time.time())
  401. })
  402. await gewe_service.save_group_add_contacts_history_async(wxid,chatroom_id,contact_wxid,history)
  403. wixd_add_contacts_from_chatrooms_times[wxid]+=1
  404. logger.info(f'{nickname} 向 {chatroom_nickname}-{chatroom_id} 群的 {m.get("nickName")}-{m.get("wxid")} 发送好友邀请 {msg}')
  405. # 推送到kafka
  406. k_message = wx_add_contacts_from_chatroom_message(history.wxid,history.chatroomId,history.contactWixd,history.addTime)
  407. await kafka_service.send_message_async(k_message)
  408. await asyncio.sleep(random.uniform(1.5, 3))
  409. await asyncio.sleep(random.uniform(1.5, 3))
  410. except Exception as e:
  411. logger.error(f"任务执行过程中发生异常: {e}")
  412. loop = asyncio.get_event_loop()
  413. if loop.is_closed():
  414. loop = asyncio.new_event_loop()
  415. asyncio.set_event_loop(loop)
  416. loop.run_until_complete(task()) # 在现有事件循环中运行任务
  417. @celery_app.task(name='tasks.scheduled_task_add_contacts_from_chatrooms', bind=True, acks_late=True)
  418. def scheduled_task_add_contacts_from_chatrooms(self, redis_config, kafka_config, gewe_config):
  419. '''
  420. 关于群加好友的请求规则: 每个智能体一次最多30人,间隔2小时做1次,即最多90人/天。
  421. 加好友规则:每天处理次数、间隔时间(分钟)、每次加好友人数这3个参数都可以设置。目前默认只是上面的设置。
  422. '''
  423. async def task():
  424. try:
  425. now = datetime.now()
  426. if now.hour < 8:
  427. logger.info(f"定时群成员定时添好友任务不启动,当前时间为 {now.strftime('%Y-%m-%d %H:%M:%S')},早于8点")
  428. return
  429. logger.info('定时群成员定时添好友任务开始')
  430. redis_service = RedisService()
  431. await redis_service.init(**redis_config)
  432. gewe_service = await GeWeService.get_instance(redis_service,gewe_config['api_url'])
  433. KAFKA_BOOTSTRAP_SERVERS=kafka_config['bootstrap_servers']
  434. KAFKA_TOPIC=kafka_config['topic']
  435. KAFKA_GROUP_ID=kafka_config['group_id']
  436. kafka_service= KafkaService(KAFKA_BOOTSTRAP_SERVERS, KAFKA_TOPIC, KAFKA_TOPIC,KAFKA_GROUP_ID)
  437. await kafka_service.start_producer()
  438. global_config=await gewe_service.get_global_config_from_cache_async()
  439. scheduled_task_add_contacts_from_chatrooms_config=global_config.get('scheduledTaskAddContactsFromChatrooms',{})
  440. oneday_add_contacts_total=90
  441. once_add_contacts_total=30
  442. #oneday_times=3
  443. if scheduled_task_add_contacts_from_chatrooms_config:
  444. oneday_add_contacts_total=scheduled_task_add_contacts_from_chatrooms_config.get('oneDayAddContactsTotal',90)
  445. once_add_contacts_total=scheduled_task_add_contacts_from_chatrooms_config.get('onceAddContactsTotal',30)
  446. #oneday_times=scheduled_task_add_contacts_from_chatrooms_config.get('oneDayTimes',3)
  447. cache_task_run_time_logs= await gewe_service.get_task_run_time_async('scheduled_task_add_contacts_from_chatrooms')
  448. # if len(cache_task_run_time_logs) == oneday_times:
  449. # logger.info(f"今日定时群成员定时添好友任务已达上限 {oneday_times} 次!")
  450. # return
  451. if cache_task_run_time_logs:
  452. sorted_tasks = sorted(cache_task_run_time_logs, key=lambda x: x.get("runTime"), reverse=True)
  453. last_run_time=sorted_tasks[0].get("runTime")
  454. if last_run_time > 1e12: # 毫秒级时间戳
  455. last_run_time = last_run_time / 1000 # 转换为秒
  456. # 将时间戳转换为 datetime 对象
  457. last_run_time = datetime.fromtimestamp(last_run_time)
  458. # 获取当前时间
  459. current_time = datetime.now()
  460. # 计算时间差
  461. time_difference = current_time - last_run_time
  462. # 判断是否相差2小时
  463. if time_difference < timedelta(hours=2):
  464. logger.info(f"上次定时群成员定时添好友任务在2小时内,不再执行")
  465. return
  466. # 获取当前时间
  467. current_time = datetime.now()
  468. # 计算当天的结束时间(23:59:59)
  469. end_of_day = datetime(current_time.year, current_time.month, current_time.day, 23, 59, 59)
  470. # 计算时间差
  471. time_difference = end_of_day - current_time
  472. # 将时间差转换为秒数
  473. time_difference_seconds = int(time_difference.total_seconds())
  474. cache_task_run_time_logs.append({"runTime":int(time.time())})
  475. await gewe_service.save_task_run_time_async('scheduled_task_add_contacts_from_chatrooms',cache_task_run_time_logs,time_difference_seconds)
  476. login_keys = []
  477. async for key in redis_service.client.scan_iter(match='__AI_OPS_WX__:LOGININFO:*'):
  478. login_keys.append(key)
  479. wixd_add_contacts_from_chatrooms_times = {}
  480. for k in login_keys:
  481. r = await redis_service.get_hash(k)
  482. app_id = r.get("appId")
  483. token_id = r.get("tokenId")
  484. wxid = r.get("wxid")
  485. status = r.get('status')
  486. if status == '0':
  487. logger.warning(f"微信号 {wxid} 已经离线,群成员不能定时添加")
  488. continue
  489. config=await gewe_service.get_wxchat_config_from_cache_async(wxid)
  490. validated_config = AgentConfig.model_validate(config)
  491. if not validated_config.agentEnabled:
  492. logger.warning(f"微信号 {wxid} 取消了托管,群成员不能定时添加")
  493. continue
  494. c = await gewe_service.get_wxchat_config_from_cache_async(wxid)
  495. contacts = await gewe_service.get_contacts_brief_from_cache_async(wxid)
  496. contact_wxids = [c.get('userName') for c in contacts]
  497. chatrooms = c.get('addContactsFromChatroomIdWhiteList', [])
  498. logger.info(f'{wxid} 定时群成员定时添好友任务开始')
  499. wixd_add_contacts_from_chatrooms_times[wxid] = 0
  500. for chatroom_id in chatrooms:
  501. chatroom = await gewe_service.get_group_info_from_cache_async(wxid, chatroom_id)
  502. chatroom_member=await gewe_service.get_group_members_from_cache_async(wxid, chatroom_id)
  503. chatroom_nickname = chatroom.get('nickName')
  504. chatroom_owner_wxid = chatroom_member.get('chatroomOwner', None)
  505. admin_wxids = chatroom_member.get('adminWxid', [])
  506. admin_wxids = chatroom_member.get('adminWxid')
  507. if admin_wxids is None:
  508. admin_wxids = [] # 如果 admin_wxids 是 None,将其初始化为空列表
  509. logger.info(f'{chatroom_nickname} 的群主是 {chatroom_owner_wxid},管理员是{admin_wxids}')
  510. contact_wxids_set = set(contact_wxids)
  511. # for admin_wxid in admin_wxids:
  512. # contact_wxids_set.add(admin_wxid)
  513. if admin_wxids:
  514. contact_wxids_set.update(set(admin_wxids))
  515. if chatroom_owner_wxid is not None:
  516. contact_wxids_set.add(chatroom_owner_wxid)
  517. contact_wxids_set.add(wxid)
  518. # unavailable_wixds=await gewe_service.check_wixd_group_add_contacts_history_async(wxid,chatroom_id)
  519. # contact_wxids_set.update(set(unavailable_wixds))
  520. # chatroom_member_list = chatroom.get('memberList', [])
  521. unavailable_wixds=await gewe_service.check_wixd_group_add_contacts_history_async(wxid,chatroom_id)
  522. if unavailable_wixds:
  523. contact_wxids_set.update(set(unavailable_wixds))
  524. chatroom_member_list = chatroom.get('memberList', [])
  525. if chatroom_member_list is None:
  526. chatroom_member_list = [] # 如果 memberList 是 None,将其初始化为空列表
  527. elif not isinstance(chatroom_member_list, list):
  528. chatroom_member_list = list(chatroom_member_list) # 如果 memberList 不是列表,将其转换为列表
  529. remaining_chatroot_members = [x for x in chatroom_member_list if x.get('wxid') not in contact_wxids_set]
  530. nickname = next((member['nickName'] for member in chatroom_member_list if member['wxid'] == wxid), None)
  531. if not remaining_chatroot_members:
  532. logger.info(f'{nickname}-{wxid} 在 {chatroom_nickname} 群里没有好友可以邀请')
  533. # 任务状态推送到kafka
  534. k_message=wx_add_contacts_from_chatroom_task_status(wxid,chatroom_id,2)
  535. await kafka_service.send_message_async(k_message)
  536. continue
  537. logger.info(f'{nickname}-{wxid} 在 {chatroom_nickname} 群里还可以邀请的好友有:{[x.get("nickName") for x in remaining_chatroot_members]}')
  538. for m in remaining_chatroot_members:
  539. # 判断本次任务是否已经邀请了30个好友
  540. if wixd_add_contacts_from_chatrooms_times[wxid] == once_add_contacts_total:
  541. logger.info(f"{wxid} 本次任务已经邀请了{once_add_contacts_total}人,不再邀请")
  542. return
  543. # 判断当天群成员是否已经加了90个好友
  544. is_add_group_times = await gewe_service.is_group_add_contacts_history_one_day_async(wxid,oneday_add_contacts_total)
  545. if is_add_group_times:
  546. logger.info(f"当天 {wxid} 所有群的成员已经加了{oneday_add_contacts_total}个好友,不再添加")
  547. return
  548. contact_wxid= m.get('wxid')
  549. member_nickname=m.get("nickName")
  550. group_add_contacts_history = await gewe_service.get_group_add_contacts_history_async(wxid,chatroom_id,contact_wxid)
  551. sorted_history = sorted(group_add_contacts_history, key=lambda x: x.addTime, reverse=True)
  552. # 已经邀请过两次,不再邀请
  553. if len(sorted_history)==2:
  554. logger.info(f'{nickname}-{wxid}在{chatroom_nickname}-{chatroom_id} 群的 {member_nickname}-{contact_wxid} 已经邀请过2次,不再邀请')
  555. continue
  556. # 当天邀请过,不再邀请
  557. if len(sorted_history) > 0:
  558. last_add_time = sorted_history[0].addTime
  559. def is_add_time_more_than_one_day(addTime: int) -> bool:
  560. """
  561. 判断 addTime 是否与当前时间相隔大于 3600 × 24 秒
  562. :param addTime: Unix 时间戳
  563. :return: 如果 addTime 与当前时间相隔大于 3600 × 24 秒,返回 True;否则返回 False
  564. """
  565. # 获取当前时间的时间戳
  566. current_time = time.time()
  567. # 计算时间戳差值
  568. time_difference = abs(current_time - addTime)
  569. # 检查是否大于 3600 × 24 秒
  570. return time_difference > 3600 * 24
  571. is_more_than_one_day= is_add_time_more_than_one_day(last_add_time)
  572. if not is_more_than_one_day:
  573. logger.info(f'{nickname}-{wxid}在{chatroom_nickname}-{chatroom_id} 群的{member_nickname}-{contact_wxid}已经当天邀请,不再邀请')
  574. continue
  575. ret, msg, data = await gewe_service.add_group_member_as_friend_async(token_id, app_id, chatroom_id, m.get('wxid'), f'我是群聊"{chatroom_nickname}"群的{nickname}')
  576. if ret!=200:
  577. logger.warning(f'群好友邀请失败原因:{ret} {data}')
  578. history=AddGroupContactsHistory.model_validate({
  579. "chatroomId":chatroom_id,
  580. "wxid":wxid,
  581. "contactWixd":contact_wxid,
  582. "addTime":int(time.time())
  583. })
  584. await gewe_service.save_group_add_contacts_history_async(wxid,chatroom_id,contact_wxid,history)
  585. wixd_add_contacts_from_chatrooms_times[wxid]+=1
  586. logger.info(f'{nickname} 向 {chatroom_nickname}-{chatroom_id} 群的 {m.get("nickName")}-{m.get("wxid")} 发送好友邀请 {msg}')
  587. # 推送到kafka
  588. k_message = wx_add_contacts_from_chatroom_message(history.wxid,history.chatroomId,history.contactWixd,history.addTime)
  589. await kafka_service.send_message_async(k_message)
  590. await asyncio.sleep(random.uniform(1.5, 3))
  591. # 任务状态推送到kafka
  592. task_status=await gewe_service.wx_add_contacts_from_chatroom_task_status_async(wxid,chatroom_id)
  593. wx_add_contacts_from_chatroom_task_status(wxid,chatroom_id,task_status)
  594. await kafka_service.send_message_async(k_message)
  595. # 下一个群
  596. await asyncio.sleep(random.uniform(1.5, 3))
  597. except Exception as e:
  598. logger.error(f"任务执行过程中发生异常: {e}")
  599. finally:
  600. await kafka_service.stop_producer()
  601. loop = asyncio.get_event_loop()
  602. if loop.is_closed():
  603. loop = asyncio.new_event_loop()
  604. asyncio.set_event_loop(loop)
  605. loop.run_until_complete(task()) # 在现有事件循环中运行任务
  606. REDIS_KEY_PATTERN = "friend_add_limit:{date}"
  607. REDIS_LAST_RUN_KEY = "last_run_time:add_friends_task"
  608. @celery_app.task(name='tasks.add_friends_task', bind=True, acks_late=True)
  609. def add_friends_task(self,redis_config):
  610. """
  611. 限制每天最多 15 个,每 2 小时最多 8 个
  612. """
  613. async def task():
  614. redis_service = RedisService()
  615. await redis_service.init(**redis_config)
  616. today_str = datetime.now().strftime("%Y%m%d")
  617. redis_key = REDIS_KEY_PATTERN.format(date=today_str)
  618. # 获取当前总添加数量
  619. total_added = await redis_service.get_hash_field(redis_key, "total") or 0
  620. last_2h_added =await redis_service.get_hash_field(redis_key, "last_2h") or 0
  621. total_added = int(total_added)
  622. last_2h_added = int(last_2h_added)
  623. logger.info(f"当前添加好友总数: {total_added}, 过去2小时添加: {last_2h_added}")
  624. # 判断是否超过限制
  625. if total_added >= 15:
  626. logger.warning("今日好友添加已达上限!")
  627. return
  628. if last_2h_added >= 8:
  629. logger.warning("过去2小时添加已达上限!")
  630. return
  631. # 计算本次要添加的好友数量 (控制每天 5-15 个)
  632. max_add = min(15 - total_added, 8 - last_2h_added)
  633. if max_add <= 0:
  634. return
  635. num_to_add = min(max_add, 1) # 每次最多加 1 个
  636. logger.info(f"本次添加 {num_to_add} 位好友")
  637. # TODO: 调用好友添加逻辑 (接口 or 业务逻辑)
  638. # success = add_friends(num_to_add)
  639. success = num_to_add # 假设成功添加 num_to_add 个
  640. # 更新 Redis 计数
  641. if success > 0:
  642. await redis_service.increment_hash_field(redis_key, "total", success)
  643. await redis_service.increment_hash_field(redis_key, "last_2h", success)
  644. # 设置 Redis 过期时间 (每日记录存 1 天, 2 小时记录存 2 小时)
  645. await redis_service.expire(redis_key, 86400) # 24小时
  646. await redis_service.expire_field(redis_key, "last_2h", 7200) # 2小时
  647. logger.info(f"成功添加 {success} 位好友, 今日总数 {total_added + success}")
  648. # 生成一个新的随机时间(5-15 分钟之间)
  649. # next_interval = random.randint(10, 20)
  650. # # 计算新的执行时间
  651. # next_run_time = datetime.datetime.now() + timedelta(seconds=next_interval)
  652. # # 重新注册 RedBeat 任务,确保下次执行时间不同
  653. # redbeat_entry = RedBeatSchedulerEntry(
  654. # name="redbeat:add_friends_task",
  655. # task="tasks.add_friends_task",
  656. # schedule=celery.schedules.schedule(timedelta(seconds=next_interval)),
  657. # args=[redis_config],
  658. # app=celery_app
  659. # )
  660. # # 设置任务的下次执行时间
  661. # redbeat_entry.last_run_at = next_run_time
  662. # redbeat_entry.save()
  663. # logger.info(f"下次任务将在 {next_run_time} 执行(间隔 {next_interval} 秒)")
  664. loop = asyncio.get_event_loop()
  665. if loop.is_closed():
  666. loop = asyncio.new_event_loop()
  667. asyncio.set_event_loop(loop)
  668. loop.run_until_complete(task()) # 在现有事件循环中运行任务
  669. @celery_app.task(name='tasks.random_scheduled_task', bind=True, acks_late=True)
  670. def random_scheduled_task(self,):
  671. print(f"Task executed at {datetime.now()}")
  672. # 随机生成下次执行时间(例如:10-60秒内的随机时间)
  673. next_run_in = random.randint(10, 60)
  674. print(f"Next execution will be in {next_run_in} seconds")
  675. # 设置下次执行时间
  676. entry = RedBeatSchedulerEntry(
  677. name='random-task',
  678. task='tasks.random_scheduled_task',
  679. schedule=timedelta(seconds=next_run_in),
  680. app=celery_app
  681. )
  682. entry.save()
  683. return f"Scheduled next run in {next_run_in} seconds"