Vous ne pouvez pas sélectionner plus de 25 sujets Les noms de sujets doivent commencer par une lettre ou un nombre, peuvent contenir des tirets ('-') et peuvent comporter jusqu'à 35 caractères.

943 lignes
46KB

  1. from celery_app import celery_app
  2. from fastapi import Request,FastAPI
  3. import time,datetime
  4. from celery import Celery
  5. import celery.schedules
  6. from redbeat import RedBeatSchedulerEntry
  7. from datetime import timedelta
  8. from services.redis_service import RedisService
  9. from services.kafka_service import KafkaService
  10. from services.gewe_service import GeWeService
  11. # from common.log import logger
  12. from common.utils import *
  13. import asyncio,random
  14. from model.models import AddGroupContactsHistory
  15. import logging
  16. from model.models import AgentConfig
  17. import logging
  18. import sys,traceback
  19. logger = logging.getLogger('redbeat')
  20. @celery_app.task(name='tasks.add_task', bind=True, acks_late=True)
  21. def add_task(self, x, y):
  22. time.sleep(5) # 模拟长时间计算
  23. logger.info('add')
  24. return x + y
  25. @celery_app.task(name='tasks.mul_task', bind=True, acks_late=True)
  26. def mul_task(self, x, y):
  27. time.sleep(5) # 模拟长时间计算
  28. return x * y
  29. # @celery.task(name='app.tasks.sync_contacts', bind=True, acks_late=True)
  30. # async def sync_contacts_task(self,app):
  31. # login_keys = list(await app.state.redis_service.client.scan_iter(match='__AI_OPS_WX__:LOGININFO:*'))
  32. # return login_keys
  33. # # for k in login_keys:
  34. # # print(k)
  35. @celery_app.task(name='tasks.sync_contacts', bind=True, acks_late=True)
  36. async def sync_contacts_task(self, redis_service):
  37. # Use the redis_service passed as an argument
  38. login_keys = list(await redis_service.client.scan_iter(match='__AI_OPS_WX__:LOGININFO:*'))
  39. return login_keys
  40. @celery_app.task(name='tasks.background_worker_task', bind=True, acks_late=True)
  41. def background_worker_task(self, redis_config, kafka_config, gewe_config):
  42. async def task():
  43. redis_service = RedisService()
  44. await redis_service.init(**redis_config)
  45. login_keys = []
  46. async for key in redis_service.client.scan_iter(match='__AI_OPS_WX__:LOGININFO:*'):
  47. login_keys.append(key)
  48. print(login_keys)
  49. asyncio.run(task())
  50. # @celery.task(name='tasks.background_worker_task', bind=True, acks_late=True)
  51. # async def background_worker_task(self, redis_config, kafka_config, gewe_config):
  52. # # Initialize services inside the task
  53. # redis_service = RedisService()
  54. # await redis_service.init(**redis_config)
  55. # login_keys = []
  56. # async for key in redis_service.client.scan_iter(match='__AI_OPS_WX__:LOGININFO:*'): # 使用 async for 遍历异步生成器
  57. # login_keys.append(key)
  58. # print(login_keys)
  59. # kafka_service = KafkaService(**kafka_config)
  60. # await kafka_service.start()
  61. # gewe_service = await GeWeService.get_instance(None, gewe_config['api_url'])
  62. # # Task logic
  63. # lock_name = "background_wxchat_thread_lock"
  64. # lock_identifier = str(time.time())
  65. # while True:
  66. # if await redis_service.acquire_lock(lock_name, timeout=10):
  67. # try:
  68. # logger.info("分布式锁已成功获取")
  69. # # Perform task logic
  70. # finally:
  71. # await redis_service.release_lock(lock_name, lock_identifier)
  72. # break
  73. # else:
  74. # logger.info("获取分布式锁失败,等待10秒后重试...")
  75. # await asyncio.sleep(10)
  76. # @celery_app.task(name='tasks.scheduled_task', bind=True, acks_late=True)
  77. # def scheduled_task(self):
  78. # print("定时任务执行成功!~~~~~~~~~~~~~~~~~")
  79. # return "Hello from Celery Beat + RedBeat!"
  80. # @celery_app.task(name='tasks.scheduled_task_sync_wx', bind=True, acks_late=True)
  81. # def scheduled_task_sync_wx(self,redis_service,kafka_service,gewe_service):
  82. # print("scheduled_task_sync_wx 定时任务执行成功!")
  83. # return "Hello from Celery Beat + RedBeat!"
  84. # @celery_app.task(name='tasks.scheduled_task_sync_wx_info_1', bind=True, acks_late=True)
  85. # def scheduled_task_sync_wx_info_1(self,redis_config, kafka_config, gewe_config):
  86. # '''
  87. # 定时获取微信号资料
  88. # '''
  89. # loop = asyncio.new_event_loop()
  90. # asyncio.set_event_loop(loop)
  91. # async def task():
  92. # try:
  93. # redis_service = RedisService()
  94. # await redis_service.init(**redis_config)
  95. # # gewe_service = await GeWeService.get_instance(None, gewe_config['api_url'])
  96. # login_keys = []
  97. # async for key in redis_service.client.scan_iter(match='__AI_OPS_WX__:LOGININFO:*'):
  98. # login_keys.append(key)
  99. # print(login_keys)
  100. # # for k in login_keys:
  101. # # r = await redis_service.get_hash(k)
  102. # # app_id = r.get("appId")
  103. # # token_id = r.get("tokenId")
  104. # # wxid = r.get("wxid")
  105. # # status = r.get('status')
  106. # # if status == '0':
  107. # # continue
  108. # # ret, msg, profile = await gewe_service.get_profile_async(token_id, app_id)
  109. # # if ret != 200:
  110. # # logger.warning(f"同步微信号 {wxid} 资料失败: {ret}-{msg}")
  111. # # continue
  112. # # nickname=profile.get("nickName")
  113. # # head_img_url=profile.get("smallHeadImgUrl")
  114. # # r.update({"nickName":nickname,"headImgUrl":head_img_url,"modify_at":int(time.time())})
  115. # # cleaned_login_info = {k: (v if v is not None else '') for k, v in r.items()}
  116. # # await redis_service.set_hash(k, cleaned_login_info)
  117. # # logger.info(f"同步微信号 {wxid} 资料 成功")
  118. # # redis_service.update_hash_field(k,"nickName",nickname)
  119. # # redis_service.update_hash_field(k,"headImgUrl",head_img_url)
  120. # # redis_service.update_hash_field(k,"modify_at",int(time.time()))
  121. # except Exception as e:
  122. # logger.error(f"任务执行过程中发生异常: {e}")
  123. # print("scheduled_task_sync_wx_info 定时任务执行成功!")
  124. # return "Hello from Celery Beat + RedBeat!"
  125. # loop.run_until_complete(task())
  126. # loop.close()
  127. @celery_app.task(name='tasks.scheduled_task_sync_wx_info', bind=True, acks_late=True)
  128. def scheduled_task_sync_wx_info(self, redis_config, kafka_config, gewe_config):
  129. '''
  130. 定时获取微信号资料
  131. '''
  132. async def task():
  133. try:
  134. redis_service = RedisService()
  135. await redis_service.init(**redis_config)
  136. gewe_service = await GeWeService.get_instance(redis_service,gewe_config['api_url'])
  137. login_keys = []
  138. async for key in redis_service.client.scan_iter(match='__AI_OPS_WX__:LOGININFO:*'):
  139. login_keys.append(key)
  140. print(login_keys)
  141. for k in login_keys:
  142. r = await redis_service.get_hash(k)
  143. app_id = r.get("appId")
  144. token_id = r.get("tokenId")
  145. wxid = r.get("wxid")
  146. status = r.get('status')
  147. if status == '0':
  148. logger.warning(f"微信号 {wxid} 已经离线")
  149. continue
  150. ret, msg, profile = await gewe_service.get_profile_async(token_id, app_id)
  151. if ret != 200:
  152. logger.warning(f"同步微信号 {wxid} 资料失败: {ret}-{msg}")
  153. continue
  154. nickname=profile.get("nickName")
  155. head_img_url=profile.get("smallHeadImgUrl")
  156. # print(nickname)
  157. nickname=profile.get("nickName")
  158. head_img_url=profile.get("smallHeadImgUrl")
  159. r.update({"nickName":nickname,"headImgUrl":head_img_url,"modify_at":int(time.time())})
  160. cleaned_login_info = {k: (v if v is not None else '') for k, v in r.items()}
  161. await redis_service.set_hash(k, cleaned_login_info)
  162. logger.info(f"定时同步微信号{wxid}-昵称{nickname} 资料成功")
  163. except Exception as e:
  164. logger.error(f"任务执行过程中发生异常: {e}")
  165. loop = asyncio.get_event_loop()
  166. if loop.is_closed():
  167. loop = asyncio.new_event_loop()
  168. asyncio.set_event_loop(loop)
  169. loop.run_until_complete(task()) # 在现有事件循环中运行任务
  170. #@celery_app.task(name='tasks.scheduled_task_add_contacts_from_chatrooms_p', bind=True, acks_late=True)
  171. def scheduled_task_add_contacts_from_chatrooms_p(self, redis_config, kafka_config, gewe_config):
  172. async def task():
  173. try:
  174. now = datetime.now()
  175. if now.hour < 8:
  176. logger.info(f"定时群成员定时添好友任务不启动,当前时间为 {now.strftime('%Y-%m-%d %H:%M:%S')},早于8点")
  177. return
  178. logger.info('定时群成员定时添好友任务开始')
  179. redis_service = RedisService()
  180. await redis_service.init(**redis_config)
  181. gewe_service = await GeWeService.get_instance(redis_service,gewe_config['api_url'])
  182. login_keys = []
  183. async for key in redis_service.client.scan_iter(match='__AI_OPS_WX__:LOGININFO:*'):
  184. login_keys.append(key)
  185. #print(login_keys)
  186. for k in login_keys:
  187. r = await redis_service.get_hash(k)
  188. app_id = r.get("appId")
  189. token_id = r.get("tokenId")
  190. wxid = r.get("wxid")
  191. status = r.get('status')
  192. if status == '0':
  193. logger.warning(f"微信号 {wxid} 已经离线,群成员不能定时添加")
  194. continue
  195. c = await gewe_service.get_wxchat_config_from_cache_async(wxid)
  196. contacts = await gewe_service.get_contacts_brief_from_cache_async(wxid)
  197. contact_wxids = [c.get('userName') for c in contacts]
  198. chatrooms = c.get('addContactsFromChatroomIdWhiteList', [])
  199. logger.info(f'{wxid} 定时群成员定时添好友任务开始')
  200. for chatroom_id in chatrooms:
  201. chatroom = await gewe_service.get_group_info_from_cache_async(wxid, chatroom_id)
  202. chatroom_member=await gewe_service.get_group_members_from_cache_async(wxid, chatroom_id)
  203. chatroom_nickname = chatroom.get('nickName')
  204. chatroom_owner_wxid = chatroom_member.get('chatroomOwner', None)
  205. admin_wxids = chatroom_member.get('adminWxid', [])
  206. admin_wxids = chatroom_member.get('adminWxid')
  207. if admin_wxids is None:
  208. admin_wxids = [] # 如果 admin_wxids 是 None,将其初始化为空列表
  209. # 判断当天群成员是否已经加了200个好友
  210. is_add_group_200_times = await gewe_service.is_group_add_contacts_history_one_day_200_async(wxid, chatroom_id)
  211. if is_add_group_200_times:
  212. logger.info(f"{wxid}在 {chatroom_nickname} 群成员已经加了200个好友,不再添加,群id:{chatroom_id}")
  213. continue
  214. logger.info(f'{chatroom_nickname} 的群主是 {chatroom_owner_wxid},管理员是{admin_wxids}')
  215. contact_wxids_set = set(contact_wxids)
  216. # for admin_wxid in admin_wxids:
  217. # contact_wxids_set.add(admin_wxid)
  218. if admin_wxids:
  219. contact_wxids_set.update(set(admin_wxids))
  220. contact_wxids_set.update(set(admin_wxids))
  221. if chatroom_owner_wxid is not None:
  222. contact_wxids_set.add(chatroom_owner_wxid)
  223. contact_wxids_set.add(wxid)
  224. # unavailable_wixds=await gewe_service.check_wixd_group_add_contacts_history_async(wxid,chatroom_id)
  225. # contact_wxids_set.update(unavailable_wixds)
  226. chatroot_member_list = chatroom.get('memberList', [])
  227. remaining_chatroot_members = [x for x in chatroot_member_list if x.get('wxid') not in contact_wxids_set]
  228. nickname = next((member['nickName'] for member in chatroot_member_list if member['wxid'] == wxid), None)
  229. logger.info(f'{nickname}-{wxid} 在 {chatroom_nickname} 群里还可以邀请的好友有:{[x.get("nickName") for x in remaining_chatroot_members]}')
  230. for m in remaining_chatroot_members:
  231. contact_wxid= m.get('wxid')
  232. member_nickname=m.get("nickName")
  233. group_add_contacts_history = await gewe_service.get_group_add_contacts_history_async(wxid,chatroom_id,contact_wxid)
  234. sorted_history = sorted(group_add_contacts_history, key=lambda x: x.addTime, reverse=True)
  235. # 已经邀请过两次,不再邀请
  236. if len(sorted_history)==2:
  237. logger.info(f'{nickname}-{wxid}在{chatroom_nickname}-{chatroom_id} 群的 {member_nickname}-{contact_wxid} 已经邀请过2次,不再邀请')
  238. continue
  239. # 当天邀请过,不再邀请
  240. if len(sorted_history) > 0:
  241. last_add_time = sorted_history[0].addTime
  242. def is_add_time_more_than_one_day(addTime: int) -> bool:
  243. """
  244. 判断 addTime 是否与当前时间相隔大于 3600 × 24 秒
  245. :param addTime: Unix 时间戳
  246. :return: 如果 addTime 与当前时间相隔大于 3600 × 24 秒,返回 True;否则返回 False
  247. """
  248. # 获取当前时间的时间戳
  249. current_time = time.time()
  250. # 计算时间戳差值
  251. time_difference = abs(current_time - addTime)
  252. # 检查是否大于 3600 × 24 秒
  253. return time_difference > 3600 * 24
  254. is_more_than_one_day= is_add_time_more_than_one_day(last_add_time)
  255. if not is_more_than_one_day:
  256. logger.info(f'{nickname}-{wxid}在{chatroom_nickname}-{chatroom_id} 群的{member_nickname}-{contact_wxid}已经当天邀请,不再邀请')
  257. continue
  258. ret, msg, data = await gewe_service.add_group_member_as_friend_async(token_id, app_id, chatroom_id, m.get('wxid'), f'我是群聊"{chatroom_nickname}"群的{nickname}')
  259. # if ret==200:
  260. # history=AddGroupContactsHistory.model_validate({
  261. # "chatroomId":chatroom_id,
  262. # "wxid":wxid,
  263. # "contactWixd":contact_wxid,
  264. # "addTime":int(time.time())
  265. # })
  266. # await gewe_service.save_group_add_contacts_history_async(wxid,chatroom_id,contact_wxid,history)
  267. # else:
  268. # logger.info(f'群好友邀请失败原因:{data}')
  269. if ret!=200:
  270. logger.warning(f'群好友邀请失败原因:{ret} {data}')
  271. history=AddGroupContactsHistory.model_validate({
  272. "chatroomId":chatroom_id,
  273. "wxid":wxid,
  274. "contactWixd":contact_wxid,
  275. "addTime":int(time.time())
  276. })
  277. await gewe_service.save_group_add_contacts_history_async(wxid,chatroom_id,contact_wxid,history)
  278. logger.info(f'{nickname} 向 {chatroom_nickname}-{chatroom_id} 群的 {m.get("nickName")}-{m.get("wxid")} 发送好友邀请 {msg}')
  279. await asyncio.sleep(random.uniform(1.5, 3))
  280. await asyncio.sleep(random.uniform(1.5, 3))
  281. except Exception as e:
  282. logger.error(f"任务执行过程中发生异常: {e}")
  283. loop = asyncio.get_event_loop()
  284. if loop.is_closed():
  285. loop = asyncio.new_event_loop()
  286. asyncio.set_event_loop(loop)
  287. loop.run_until_complete(task()) # 在现有事件循环中运行任务
  288. #@celery_app.task(name='tasks.scheduled_task_add_contacts_from_chatrooms_p2', bind=True, acks_late=True)
  289. def scheduled_task_add_contacts_from_chatrooms_p2(self, redis_config, kafka_config, gewe_config):
  290. '''
  291. 关于群加好友的请求规则:一次30人,间隔2小时做1次,一天做3次,即最多90人/天。
  292. '''
  293. async def task():
  294. try:
  295. now = datetime.now()
  296. if now.hour < 8:
  297. logger.info(f"定时群成员定时添好友任务不启动,当前时间为 {now.strftime('%Y-%m-%d %H:%M:%S')},早于8点")
  298. return
  299. logger.info('定时群成员定时添好友任务开始')
  300. redis_service = RedisService()
  301. await redis_service.init(**redis_config)
  302. gewe_service = await GeWeService.get_instance(redis_service,gewe_config['api_url'])
  303. KAFKA_BOOTSTRAP_SERVERS=kafka_config['bootstrap_servers']
  304. KAFKA_TOPIC=kafka_config['topic']
  305. KAFKA_GROUP_ID=kafka_config['group_id']
  306. kafka_service= KafkaService(KAFKA_BOOTSTRAP_SERVERS, KAFKA_TOPIC, KAFKA_TOPIC,KAFKA_GROUP_ID)
  307. await kafka_service.start()
  308. login_keys = []
  309. async for key in redis_service.client.scan_iter(match='__AI_OPS_WX__:LOGININFO:*'):
  310. login_keys.append(key)
  311. wixd_add_contacts_from_chatrooms_times = {}
  312. #print(login_keys)
  313. for k in login_keys:
  314. r = await redis_service.get_hash(k)
  315. app_id = r.get("appId")
  316. token_id = r.get("tokenId")
  317. wxid = r.get("wxid")
  318. status = r.get('status')
  319. if status == '0':
  320. logger.warning(f"微信号 {wxid} 已经离线,群成员不能定时添加")
  321. continue
  322. c = await gewe_service.get_wxchat_config_from_cache_async(wxid)
  323. contacts = await gewe_service.get_contacts_brief_from_cache_async(wxid)
  324. contact_wxids = [c.get('userName') for c in contacts]
  325. chatrooms = c.get('addContactsFromChatroomIdWhiteList', [])
  326. logger.info(f'{wxid} 定时群成员定时添好友任务开始')
  327. wixd_add_contacts_from_chatrooms_times[wxid] = 0
  328. for chatroom_id in chatrooms:
  329. chatroom = await gewe_service.get_group_info_from_cache_async(wxid, chatroom_id)
  330. chatroom_member=await gewe_service.get_group_members_from_cache_async(wxid, chatroom_id)
  331. chatroom_nickname = chatroom.get('nickName')
  332. chatroom_owner_wxid = chatroom_member.get('chatroomOwner', None)
  333. admin_wxids = chatroom_member.get('adminWxid', [])
  334. admin_wxids = chatroom_member.get('adminWxid')
  335. if admin_wxids is None:
  336. admin_wxids = [] # 如果 admin_wxids 是 None,将其初始化为空列表
  337. logger.info(f'{chatroom_nickname} 的群主是 {chatroom_owner_wxid},管理员是{admin_wxids}')
  338. contact_wxids_set = set(contact_wxids)
  339. # for admin_wxid in admin_wxids:
  340. # contact_wxids_set.add(admin_wxid)
  341. if admin_wxids:
  342. contact_wxids_set.update(set(admin_wxids))
  343. if chatroom_owner_wxid is not None:
  344. contact_wxids_set.add(chatroom_owner_wxid)
  345. contact_wxids_set.add(wxid)
  346. unavailable_wixds=await gewe_service.check_wixd_group_add_contacts_history_async(wxid,chatroom_id)
  347. if unavailable_wixds:
  348. contact_wxids_set.update(set(unavailable_wixds))
  349. chatroom_member_list = chatroom.get('memberList', [])
  350. if chatroom_member_list is None:
  351. chatroom_member_list = [] # 如果 memberList 是 None,将其初始化为空列表
  352. elif not isinstance(chatroom_member_list, list):
  353. chatroom_member_list = list(chatroom_member_list) # 如果 memberList 不是列表,将其转换为列表
  354. remaining_chatroot_members = [x for x in chatroom_member_list if x.get('wxid') not in contact_wxids_set]
  355. nickname = next((member['nickName'] for member in chatroom_member_list if member['wxid'] == wxid), None)
  356. logger.info(f'{nickname}-{wxid} 在 {chatroom_nickname} 群里还可以邀请的好友有:{[x.get("nickName") for x in remaining_chatroot_members]}')
  357. for m in remaining_chatroot_members:
  358. # 判断本次任务是否已经邀请了30个好友
  359. if wixd_add_contacts_from_chatrooms_times[wxid] == 30:
  360. logger.info(f"{wxid} 本次任务已经邀请了30人,不再邀请")
  361. return
  362. # 判断当天群成员是否已经加了90个好友
  363. is_add_group_90_times = await gewe_service.is_group_add_contacts_history_one_day_90_async(wxid)
  364. if is_add_group_90_times:
  365. logger.info(f"当天 {wxid} 所有群的成员已经加了90个好友,不再添加")
  366. return
  367. contact_wxid= m.get('wxid')
  368. member_nickname=m.get("nickName")
  369. group_add_contacts_history = await gewe_service.get_group_add_contacts_history_async(wxid,chatroom_id,contact_wxid)
  370. sorted_history = sorted(group_add_contacts_history, key=lambda x: x.addTime, reverse=True)
  371. # 已经邀请过两次,不再邀请
  372. if len(sorted_history)==2:
  373. logger.info(f'{nickname}-{wxid}在{chatroom_nickname}-{chatroom_id} 群的 {member_nickname}-{contact_wxid} 已经邀请过2次,不再邀请')
  374. continue
  375. # 当天邀请过,不再邀请
  376. if len(sorted_history) > 0:
  377. last_add_time = sorted_history[0].addTime
  378. def is_add_time_more_than_one_day(addTime: int) -> bool:
  379. """
  380. 判断 addTime 是否与当前时间相隔大于 3600 × 24 秒
  381. :param addTime: Unix 时间戳
  382. :return: 如果 addTime 与当前时间相隔大于 3600 × 24 秒,返回 True;否则返回 False
  383. """
  384. # 获取当前时间的时间戳
  385. current_time = time.time()
  386. # 计算时间戳差值
  387. time_difference = abs(current_time - addTime)
  388. # 检查是否大于 3600 × 24 秒
  389. return time_difference > 3600 * 24
  390. is_more_than_one_day= is_add_time_more_than_one_day(last_add_time)
  391. if not is_more_than_one_day:
  392. logger.info(f'{nickname}-{wxid}在{chatroom_nickname}-{chatroom_id} 群的{member_nickname}-{contact_wxid}已经当天邀请,不再邀请')
  393. continue
  394. ret, msg, data = await gewe_service.add_group_member_as_friend_async(token_id, app_id, chatroom_id, m.get('wxid'), f'我是群聊"{chatroom_nickname}"群的{nickname}')
  395. if ret!=200:
  396. logger.warning(f'群好友邀请失败原因:{ret} {data}')
  397. history=AddGroupContactsHistory.model_validate({
  398. "chatroomId":chatroom_id,
  399. "wxid":wxid,
  400. "contactWixd":contact_wxid,
  401. "addTime":int(time.time())
  402. })
  403. await gewe_service.save_group_add_contacts_history_async(wxid,chatroom_id,contact_wxid,history)
  404. wixd_add_contacts_from_chatrooms_times[wxid]+=1
  405. logger.info(f'{nickname} 向 {chatroom_nickname}-{chatroom_id} 群的 {m.get("nickName")}-{m.get("wxid")} 发送好友邀请 {msg}')
  406. # 推送到kafka
  407. k_message = wx_add_contacts_from_chatroom_message(history.wxid,history.chatroomId,history.contactWixd,history.addTime)
  408. await kafka_service.send_message_async(k_message)
  409. await asyncio.sleep(random.uniform(1.5, 3))
  410. await asyncio.sleep(random.uniform(1.5, 3))
  411. except Exception as e:
  412. logger.error(f"任务执行过程中发生异常: {e}")
  413. loop = asyncio.get_event_loop()
  414. if loop.is_closed():
  415. loop = asyncio.new_event_loop()
  416. asyncio.set_event_loop(loop)
  417. loop.run_until_complete(task()) # 在现有事件循环中运行任务
  418. @celery_app.task(name='tasks.scheduled_task_add_contacts_from_chatrooms', bind=True, acks_late=True)
  419. def scheduled_task_add_contacts_from_chatrooms(self, redis_config, kafka_config, gewe_config):
  420. '''
  421. 关于群加好友的请求规则: 每个智能体一次最多30人,间隔2小时做1次,即最多90人/天。
  422. 加好友规则:每天处理次数、间隔时间(分钟)、每次加好友人数这3个参数都可以设置。目前默认只是上面的设置。
  423. '''
  424. async def task():
  425. try:
  426. now = datetime.now()
  427. if now.hour < 8:
  428. logger.info(f"定时群成员定时添好友任务不启动,当前时间为 {now.strftime('%Y-%m-%d %H:%M:%S')},早于8点")
  429. return
  430. logger.info('定时群成员定时添好友任务开始')
  431. redis_service = RedisService()
  432. await redis_service.init(**redis_config)
  433. gewe_service = await GeWeService.get_instance(redis_service,gewe_config['api_url'])
  434. KAFKA_BOOTSTRAP_SERVERS=kafka_config['bootstrap_servers']
  435. KAFKA_TOPIC=kafka_config['topic']
  436. KAFKA_GROUP_ID=kafka_config['group_id']
  437. kafka_service= KafkaService(KAFKA_BOOTSTRAP_SERVERS, KAFKA_TOPIC, KAFKA_TOPIC,KAFKA_GROUP_ID)
  438. await kafka_service.start_producer()
  439. global_config=await gewe_service.get_global_config_from_cache_async()
  440. scheduled_task_add_contacts_from_chatrooms_config=global_config.get('scheduledTaskAddContactsFromChatrooms',{})
  441. oneday_add_contacts_total=90
  442. once_add_contacts_total=30
  443. #oneday_times=3
  444. if scheduled_task_add_contacts_from_chatrooms_config:
  445. oneday_add_contacts_total=scheduled_task_add_contacts_from_chatrooms_config.get('oneDayAddContactsTotal',90)
  446. once_add_contacts_total=scheduled_task_add_contacts_from_chatrooms_config.get('onceAddContactsTotal',30)
  447. #oneday_times=scheduled_task_add_contacts_from_chatrooms_config.get('oneDayTimes',3)
  448. # cache_task_run_time_logs= await gewe_service.get_task_run_time_async('scheduled_task_add_contacts_from_chatrooms')
  449. # if cache_task_run_time_logs:
  450. # sorted_tasks = sorted(cache_task_run_time_logs, key=lambda x: x.get("runTime"), reverse=True)
  451. # last_run_time=sorted_tasks[0].get("runTime")
  452. # if last_run_time > 1e12: # 毫秒级时间戳
  453. # last_run_time = last_run_time / 1000 # 转换为秒
  454. # # 将时间戳转换为 datetime 对象
  455. # last_run_time = datetime.fromtimestamp(last_run_time)
  456. # # 获取当前时间
  457. # current_time = datetime.now()
  458. # # 计算时间差
  459. # time_difference = current_time - last_run_time
  460. # # 判断是否相差2小时
  461. # if time_difference < timedelta(hours=2):
  462. # logger.info(f"上次定时群成员定时添好友任务在2小时内,不再执行")
  463. # return
  464. # time_difference_seconds = today_seconds_remaining()
  465. # cache_task_run_time_logs.append({"runTime":int(time.time())})
  466. # await gewe_service.save_task_run_time_async('scheduled_task_add_contacts_from_chatrooms',cache_task_run_time_logs,time_difference_seconds)
  467. login_keys = []
  468. async for key in redis_service.client.scan_iter(match='__AI_OPS_WX__:LOGININFO:*'):
  469. login_keys.append(key)
  470. wixd_add_contacts_from_chatrooms_times = {}
  471. for k in login_keys:
  472. r = await redis_service.get_hash(k)
  473. app_id = r.get("appId")
  474. token_id = r.get("tokenId")
  475. wxid = r.get("wxid")
  476. status = r.get('status')
  477. if status == '0':
  478. logger.warning(f"微信号 {wxid} 已经离线,群成员不能定时添加")
  479. continue
  480. config=await gewe_service.get_wxchat_config_from_cache_async(wxid)
  481. validated_config = AgentConfig.model_validate(config)
  482. if not validated_config.agentEnabled:
  483. logger.warning(f"微信号 {wxid} 取消了托管,群成员不能定时添加")
  484. continue
  485. # 判断是否过于频繁
  486. is_wx_expection = await gewe_service.get_wx_expection_async(wxid,"addGroupMemberAsFriend")
  487. if is_wx_expection:
  488. logger.info(f"{wxid} 本次任务接口addGroupMemberAsFriend异常, {is_wx_expection},本次群好友邀请任务未开始,跳过任务。")
  489. return
  490. cache_task_run_time_wxid_logs= await gewe_service.get_task_run_time_by_wxid_async(wxid,'scheduled_task_add_contacts_from_chatrooms')
  491. if cache_task_run_time_wxid_logs:
  492. sorted_tasks = sorted(cache_task_run_time_wxid_logs, key=lambda x: x.get("runTime"), reverse=True)
  493. last_run_time=sorted_tasks[0].get("runTime")
  494. if last_run_time > 1e12: # 毫秒级时间戳
  495. last_run_time = last_run_time / 1000 # 转换为秒
  496. # 将时间戳转换为 datetime 对象
  497. last_run_time = datetime.fromtimestamp(last_run_time)
  498. # 获取当前时间
  499. current_time = datetime.now()
  500. # 计算时间差
  501. time_difference = current_time - last_run_time
  502. # 判断是否相差2小时
  503. if time_difference < timedelta(hours=2):
  504. logger.info(f"{wxid}上次定时群成员定时添好友任务在2小时内,不再执行")
  505. return
  506. cache_task_run_time_wxid_logs.append({"runTime":int(time.time())})
  507. await gewe_service.save_task_run_time_by_wxid_async(wxid,'scheduled_task_add_contacts_from_chatrooms',cache_task_run_time_wxid_logs,3600*2)
  508. c = await gewe_service.get_wxchat_config_from_cache_async(wxid)
  509. contacts = await gewe_service.get_contacts_brief_from_cache_async(wxid)
  510. contact_wxids = [c.get('userName') for c in contacts]
  511. chatrooms = c.get('addContactsFromChatroomIdWhiteList', [])
  512. logger.info(f'{wxid} 定时群成员定时添好友任务开始')
  513. wixd_add_contacts_from_chatrooms_times[wxid] = 0
  514. for chatroom_id in chatrooms:
  515. chatroom = await gewe_service.get_group_info_from_cache_async(wxid, chatroom_id)
  516. chatroom_member=await gewe_service.get_group_members_from_cache_async(wxid, chatroom_id)
  517. chatroom_nickname = chatroom.get('nickName')
  518. chatroom_owner_wxid = chatroom_member.get('chatroomOwner', None)
  519. admin_wxids = chatroom_member.get('adminWxid', [])
  520. admin_wxids = chatroom_member.get('adminWxid')
  521. if admin_wxids is None:
  522. admin_wxids = [] # 如果 admin_wxids 是 None,将其初始化为空列表
  523. logger.info(f'{chatroom_nickname} 的群主是 {chatroom_owner_wxid},管理员是{admin_wxids}')
  524. contact_wxids_set = set(contact_wxids)
  525. # for admin_wxid in admin_wxids:
  526. # contact_wxids_set.add(admin_wxid)
  527. if admin_wxids:
  528. contact_wxids_set.update(set(admin_wxids))
  529. if chatroom_owner_wxid is not None:
  530. contact_wxids_set.add(chatroom_owner_wxid)
  531. contact_wxids_set.add(wxid)
  532. # unavailable_wixds=await gewe_service.check_wixd_group_add_contacts_history_async(wxid,chatroom_id)
  533. # contact_wxids_set.update(set(unavailable_wixds))
  534. # chatroom_member_list = chatroom.get('memberList', [])
  535. unavailable_wixds=await gewe_service.check_wixd_group_add_contacts_history_async(wxid,chatroom_id)
  536. if unavailable_wixds:
  537. contact_wxids_set.update(set(unavailable_wixds))
  538. chatroom_member_list = chatroom.get('memberList', [])
  539. if chatroom_member_list is None:
  540. chatroom_member_list = [] # 如果 memberList 是 None,将其初始化为空列表
  541. elif not isinstance(chatroom_member_list, list):
  542. chatroom_member_list = list(chatroom_member_list) # 如果 memberList 不是列表,将其转换为列表
  543. remaining_chatroot_members = [x for x in chatroom_member_list if x.get('wxid') not in contact_wxids_set]
  544. nickname = next((member['nickName'] for member in chatroom_member_list if member['wxid'] == wxid), None)
  545. if not remaining_chatroot_members:
  546. logger.info(f'{nickname}-{wxid} 在 {chatroom_nickname} 群里没有好友可以邀请')
  547. # 任务状态推送到kafka
  548. k_message=wx_add_contacts_from_chatroom_task_status(wxid,chatroom_id,2)
  549. await kafka_service.send_message_async(k_message)
  550. continue
  551. logger.info(f'{nickname}-{wxid} 在 {chatroom_nickname} 群里还可以邀请的好友有:{[x.get("nickName") for x in remaining_chatroot_members]}')
  552. for m in remaining_chatroot_members:
  553. # 判断本次任务是否已经邀请了30个好友
  554. if wixd_add_contacts_from_chatrooms_times[wxid] == once_add_contacts_total:
  555. logger.info(f"{wxid} 本次任务已经邀请了{once_add_contacts_total}人,不再邀请")
  556. return
  557. # 判断当天群成员是否已经加了90个好友
  558. is_add_group_times = await gewe_service.is_group_add_contacts_history_one_day_async(wxid,oneday_add_contacts_total)
  559. if is_add_group_times:
  560. logger.info(f"当天 {wxid} 所有群的成员已经加了{oneday_add_contacts_total}个好友,不再添加")
  561. return
  562. # 判断是否过于频繁
  563. is_wx_expection = await gewe_service.get_wx_expection_async(wxid,"addGroupMemberAsFriend")
  564. if is_wx_expection:
  565. logger.info(f"{wxid} 本次任务接口addGroupMemberAsFriend异常,不再邀请,{is_wx_expection}")
  566. return
  567. contact_wxid= m.get('wxid')
  568. member_nickname=m.get("nickName")
  569. group_add_contacts_history = await gewe_service.get_group_add_contacts_history_async(wxid,chatroom_id,contact_wxid)
  570. sorted_history = sorted(group_add_contacts_history, key=lambda x: x.addTime, reverse=True)
  571. # 已经邀请过两次,不再邀请
  572. if len(sorted_history)==2:
  573. logger.info(f'{nickname}-{wxid}在{chatroom_nickname}-{chatroom_id} 群的 {member_nickname}-{contact_wxid} 已经邀请过2次,不再邀请')
  574. continue
  575. # 当天邀请过,不再邀请
  576. if len(sorted_history) > 0:
  577. last_add_time = sorted_history[0].addTime
  578. def is_add_time_more_than_one_day(addTime: int) -> bool:
  579. """
  580. 判断 addTime 是否与当前时间相隔大于 3600 × 24 秒
  581. :param addTime: Unix 时间戳
  582. :return: 如果 addTime 与当前时间相隔大于 3600 × 24 秒,返回 True;否则返回 False
  583. """
  584. # 获取当前时间的时间戳
  585. current_time = time.time()
  586. # 计算时间戳差值
  587. time_difference = abs(current_time - addTime)
  588. # 检查是否大于 3600 × 24 秒
  589. return time_difference > 3600 * 24
  590. is_more_than_one_day= is_add_time_more_than_one_day(last_add_time)
  591. if not is_more_than_one_day:
  592. logger.info(f'{nickname}-{wxid}在{chatroom_nickname}-{chatroom_id} 群的 {member_nickname}-{contact_wxid} 已经当天邀请,不再邀请')
  593. continue
  594. ret, msg, data = await gewe_service.add_group_member_as_friend_async(token_id, app_id, chatroom_id, m.get('wxid'), f'我是群聊"{chatroom_nickname}"群的{nickname}')
  595. if ret!=200:
  596. logger.warning(f'群好友邀请失败原因:{ret} {data}')
  597. if msg in '操作过于频繁,请稍后再试。':
  598. await gewe_service.save_wx_expection_async(wxid,"addGroupMemberAsFriend",msg,today_seconds_remaining())
  599. logger.warning(f'{nickname}-{wxid} 操作过于频繁,本次群好友邀请任务未完成跳过。')
  600. continue
  601. history=AddGroupContactsHistory.model_validate({
  602. "chatroomId":chatroom_id,
  603. "wxid":wxid,
  604. "contactWixd":contact_wxid,
  605. "addTime":int(time.time())
  606. })
  607. await gewe_service.save_group_add_contacts_history_async(wxid,chatroom_id,contact_wxid,history)
  608. wixd_add_contacts_from_chatrooms_times[wxid]+=1
  609. logger.info(f'{nickname} 向 {chatroom_nickname}-{chatroom_id} 群的 {m.get("nickName")}-{m.get("wxid")} 发送好友邀请 {msg}')
  610. # 推送到kafka
  611. k_message = wx_add_contacts_from_chatroom_message(history.wxid,history.chatroomId,history.contactWixd,history.addTime)
  612. await kafka_service.send_message_async(k_message)
  613. #await asyncio.sleep(random.uniform(1.5, 3))
  614. await asyncio.sleep(random.uniform(30,60))
  615. # 任务状态推送到kafka
  616. task_status=await gewe_service.wx_add_contacts_from_chatroom_task_status_async(wxid,chatroom_id)
  617. wx_add_contacts_from_chatroom_task_status(wxid,chatroom_id,task_status)
  618. await kafka_service.send_message_async(k_message)
  619. # 下一个群
  620. await asyncio.sleep(random.uniform(1.5, 3))
  621. # except Exception as e:
  622. # logger.error(f"任务执行过程中发生异常: {e}")
  623. except Exception as e:
  624. # 获取当前的堆栈跟踪
  625. tb = sys.exc_info()[2]
  626. # 为异常附加堆栈跟踪
  627. e = e.with_traceback(tb)
  628. # 输出详细的错误信息
  629. logger.error(f"任务执行过程中发生异常: {e}\n异常类型: {type(e).__name__}\n异常信息: {str(e)}\n堆栈跟踪: {traceback.format_exc()}")
  630. # logger.error(f"异常类型: {type(e).__name__}")
  631. # logger.error(f"异常信息: {str(e)}")
  632. # logger.error(f"堆栈跟踪: {traceback.format_exc()}")
  633. finally:
  634. await kafka_service.stop_producer()
  635. loop = asyncio.get_event_loop()
  636. if loop.is_closed():
  637. loop = asyncio.new_event_loop()
  638. asyncio.set_event_loop(loop)
  639. loop.run_until_complete(task()) # 在现有事件循环中运行任务
  640. REDIS_KEY_PATTERN = "friend_add_limit:{date}"
  641. REDIS_LAST_RUN_KEY = "last_run_time:add_friends_task"
  642. @celery_app.task(name='tasks.add_friends_task', bind=True, acks_late=True)
  643. def add_friends_task(self,redis_config):
  644. """
  645. 限制每天最多 15 个,每 2 小时最多 8 个
  646. """
  647. async def task():
  648. redis_service = RedisService()
  649. await redis_service.init(**redis_config)
  650. today_str = datetime.now().strftime("%Y%m%d")
  651. redis_key = REDIS_KEY_PATTERN.format(date=today_str)
  652. # 获取当前总添加数量
  653. total_added = await redis_service.get_hash_field(redis_key, "total") or 0
  654. last_2h_added =await redis_service.get_hash_field(redis_key, "last_2h") or 0
  655. total_added = int(total_added)
  656. last_2h_added = int(last_2h_added)
  657. logger.info(f"当前添加好友总数: {total_added}, 过去2小时添加: {last_2h_added}")
  658. # 判断是否超过限制
  659. if total_added >= 15:
  660. logger.warning("今日好友添加已达上限!")
  661. return
  662. if last_2h_added >= 8:
  663. logger.warning("过去2小时添加已达上限!")
  664. return
  665. # 计算本次要添加的好友数量 (控制每天 5-15 个)
  666. max_add = min(15 - total_added, 8 - last_2h_added)
  667. if max_add <= 0:
  668. return
  669. num_to_add = min(max_add, 1) # 每次最多加 1 个
  670. logger.info(f"本次添加 {num_to_add} 位好友")
  671. # TODO: 调用好友添加逻辑 (接口 or 业务逻辑)
  672. # success = add_friends(num_to_add)
  673. success = num_to_add # 假设成功添加 num_to_add 个
  674. # 更新 Redis 计数
  675. if success > 0:
  676. await redis_service.increment_hash_field(redis_key, "total", success)
  677. await redis_service.increment_hash_field(redis_key, "last_2h", success)
  678. # 设置 Redis 过期时间 (每日记录存 1 天, 2 小时记录存 2 小时)
  679. await redis_service.expire(redis_key, 86400) # 24小时
  680. await redis_service.expire_field(redis_key, "last_2h", 7200) # 2小时
  681. logger.info(f"成功添加 {success} 位好友, 今日总数 {total_added + success}")
  682. # 生成一个新的随机时间(5-15 分钟之间)
  683. # next_interval = random.randint(10, 20)
  684. # # 计算新的执行时间
  685. # next_run_time = datetime.datetime.now() + timedelta(seconds=next_interval)
  686. # # 重新注册 RedBeat 任务,确保下次执行时间不同
  687. # redbeat_entry = RedBeatSchedulerEntry(
  688. # name="redbeat:add_friends_task",
  689. # task="tasks.add_friends_task",
  690. # schedule=celery.schedules.schedule(timedelta(seconds=next_interval)),
  691. # args=[redis_config],
  692. # app=celery_app
  693. # )
  694. # # 设置任务的下次执行时间
  695. # redbeat_entry.last_run_at = next_run_time
  696. # redbeat_entry.save()
  697. # logger.info(f"下次任务将在 {next_run_time} 执行(间隔 {next_interval} 秒)")
  698. loop = asyncio.get_event_loop()
  699. if loop.is_closed():
  700. loop = asyncio.new_event_loop()
  701. asyncio.set_event_loop(loop)
  702. loop.run_until_complete(task()) # 在现有事件循环中运行任务
  703. def today_seconds_remaining()->int:
  704. current_time = datetime.now()
  705. # 计算当天的结束时间(23:59:59)
  706. end_of_day = datetime(current_time.year, current_time.month, current_time.day, 23, 59, 59)
  707. # 计算时间差
  708. time_difference = end_of_day - current_time
  709. # 将时间差转换为秒数
  710. time_difference_seconds = int(time_difference.total_seconds())
  711. return time_difference_seconds
  712. @celery_app.task(name='tasks.random_scheduled_task', bind=True, acks_late=True)
  713. def random_scheduled_task(self,):
  714. print(f"Task executed at {datetime.now()}")
  715. # 随机生成下次执行时间(例如:10-60秒内的随机时间)
  716. next_run_in = random.randint(10, 60)
  717. print(f"Next execution will be in {next_run_in} seconds")
  718. # 设置下次执行时间
  719. entry = RedBeatSchedulerEntry(
  720. name='random-task',
  721. task='tasks.random_scheduled_task',
  722. schedule=timedelta(seconds=next_run_in),
  723. app=celery_app
  724. )
  725. entry.save()
  726. return f"Scheduled next run in {next_run_in} seconds"