You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

860 satır
42KB

  1. from celery_app import celery_app
  2. from fastapi import Request,FastAPI
  3. import time,datetime
  4. from celery import Celery
  5. import celery.schedules
  6. from redbeat import RedBeatSchedulerEntry
  7. from datetime import timedelta
  8. from services.redis_service import RedisService
  9. from services.kafka_service import KafkaService
  10. from services.gewe_service import GeWeService
  11. from common.log import logger
  12. from common.utils import *
  13. import asyncio,random
  14. from model.models import AddGroupContactsHistory
  @celery_app.task(name='tasks.add_task', bind=True, acks_late=True)
  def add_task(self, x, y):
      """Demo task: pause for five seconds, log ``'add'``, then return ``x + y``."""
      pause_seconds = 5  # stands in for a long-running computation
      time.sleep(pause_seconds)
      logger.info('add')
      total = x + y
      return total
  @celery_app.task(name='tasks.mul_task', bind=True, acks_late=True)
  def mul_task(self, x, y):
      """Demo task: pause for five seconds, then return ``x * y``."""
      pause_seconds = 5  # stands in for a long-running computation
      time.sleep(pause_seconds)
      product = x * y
      return product
  24. # @celery.task(name='app.tasks.sync_contacts', bind=True, acks_late=True)
  25. # async def sync_contacts_task(self,app):
  26. # login_keys = list(await app.state.redis_service.client.scan_iter(match='__AI_OPS_WX__:LOGININFO:*'))
  27. # return login_keys
  28. # # for k in login_keys:
  29. # # print(k)
  @celery_app.task(name='tasks.sync_contacts', bind=True, acks_late=True)
  async def sync_contacts_task(self, redis_service):
      """Collect every Redis key that matches the login-info pattern.

      Args:
          redis_service: Object whose ``client.scan_iter(match=...)`` yields
              keys asynchronously; the other tasks in this module consume the
              same call with ``async for``.

      Returns:
          list: The matching keys, in the order the scan yields them.
      """
      # NOTE(review): this is a coroutine function registered as a Celery
      # task -- confirm that whatever invokes it actually awaits the result.
      #
      # The scan is an async iterator, so it must be drained with
      # ``async for``; awaiting it directly raises TypeError.
      return [
          key
          async for key in redis_service.client.scan_iter(
              match='__AI_OPS_WX__:LOGININFO:*'
          )
      ]
  @celery_app.task(name='tasks.background_worker_task', bind=True, acks_late=True)
  def background_worker_task(self, redis_config, kafka_config, gewe_config):
      """Print the list of login-info keys currently stored in Redis.

      Only ``redis_config`` is read (it is expanded into ``RedisService.init``);
      ``kafka_config`` and ``gewe_config`` are accepted but unused.
      """
      async def collect_and_print():
          service = RedisService()
          await service.init(**redis_config)
          scanner = service.client.scan_iter(match='__AI_OPS_WX__:LOGININFO:*')
          found_keys = [key async for key in scanner]
          print(found_keys)

      asyncio.run(collect_and_print())
  45. # @celery.task(name='tasks.background_worker_task', bind=True, acks_late=True)
  46. # async def background_worker_task(self, redis_config, kafka_config, gewe_config):
  47. # # Initialize services inside the task
  48. # redis_service = RedisService()
  49. # await redis_service.init(**redis_config)
  50. # login_keys = []
  51. # async for key in redis_service.client.scan_iter(match='__AI_OPS_WX__:LOGININFO:*'): # 使用 async for 遍历异步生成器
  52. # login_keys.append(key)
  53. # print(login_keys)
  54. # kafka_service = KafkaService(**kafka_config)
  55. # await kafka_service.start()
  56. # gewe_service = await GeWeService.get_instance(None, gewe_config['api_url'])
  57. # # Task logic
  58. # lock_name = "background_wxchat_thread_lock"
  59. # lock_identifier = str(time.time())
  60. # while True:
  61. # if await redis_service.acquire_lock(lock_name, timeout=10):
  62. # try:
  63. # logger.info("分布式锁已成功获取")
  64. # # Perform task logic
  65. # finally:
  66. # await redis_service.release_lock(lock_name, lock_identifier)
  67. # break
  68. # else:
  69. # logger.info("获取分布式锁失败,等待10秒后重试...")
  70. # await asyncio.sleep(10)
  71. # @celery_app.task(name='tasks.scheduled_task', bind=True, acks_late=True)
  72. # def scheduled_task(self):
  73. # print("定时任务执行成功!~~~~~~~~~~~~~~~~~")
  74. # return "Hello from Celery Beat + RedBeat!"
  75. # @celery_app.task(name='tasks.scheduled_task_sync_wx', bind=True, acks_late=True)
  76. # def scheduled_task_sync_wx(self,redis_service,kafka_service,gewe_service):
  77. # print("scheduled_task_sync_wx 定时任务执行成功!")
  78. # return "Hello from Celery Beat + RedBeat!"
  79. # @celery_app.task(name='tasks.scheduled_task_sync_wx_info_1', bind=True, acks_late=True)
  80. # def scheduled_task_sync_wx_info_1(self,redis_config, kafka_config, gewe_config):
  81. # '''
  82. # 定时获取微信号资料
  83. # '''
  84. # loop = asyncio.new_event_loop()
  85. # asyncio.set_event_loop(loop)
  86. # async def task():
  87. # try:
  88. # redis_service = RedisService()
  89. # await redis_service.init(**redis_config)
  90. # # gewe_service = await GeWeService.get_instance(None, gewe_config['api_url'])
  91. # login_keys = []
  92. # async for key in redis_service.client.scan_iter(match='__AI_OPS_WX__:LOGININFO:*'):
  93. # login_keys.append(key)
  94. # print(login_keys)
  95. # # for k in login_keys:
  96. # # r = await redis_service.get_hash(k)
  97. # # app_id = r.get("appId")
  98. # # token_id = r.get("tokenId")
  99. # # wxid = r.get("wxid")
  100. # # status = r.get('status')
  101. # # if status == '0':
  102. # # continue
  103. # # ret, msg, profile = await gewe_service.get_profile_async(token_id, app_id)
  104. # # if ret != 200:
  105. # # logger.warning(f"同步微信号 {wxid} 资料失败: {ret}-{msg}")
  106. # # continue
  107. # # nickname=profile.get("nickName")
  108. # # head_img_url=profile.get("smallHeadImgUrl")
  109. # # r.update({"nickName":nickname,"headImgUrl":head_img_url,"modify_at":int(time.time())})
  110. # # cleaned_login_info = {k: (v if v is not None else '') for k, v in r.items()}
  111. # # await redis_service.set_hash(k, cleaned_login_info)
  112. # # logger.info(f"同步微信号 {wxid} 资料 成功")
  113. # # redis_service.update_hash_field(k,"nickName",nickname)
  114. # # redis_service.update_hash_field(k,"headImgUrl",head_img_url)
  115. # # redis_service.update_hash_field(k,"modify_at",int(time.time()))
  116. # except Exception as e:
  117. # logger.error(f"任务执行过程中发生异常: {e}")
  118. # print("scheduled_task_sync_wx_info 定时任务执行成功!")
  119. # return "Hello from Celery Beat + RedBeat!"
  120. # loop.run_until_complete(task())
  121. # loop.close()
  @celery_app.task(name='tasks.scheduled_task_sync_wx_info', bind=True, acks_late=True)
  def scheduled_task_sync_wx_info(self, redis_config, kafka_config, gewe_config):
      """Refresh the cached nickname and avatar of every logged-in WeChat account.

      Each ``__AI_OPS_WX__:LOGININFO:*`` hash is read; accounts whose ``status``
      is ``'0'`` are skipped, the profile is fetched through ``GeWeService`` and
      ``nickName``, ``headImgUrl`` and ``modify_at`` are written back.

      Args:
          redis_config: Keyword arguments expanded into ``RedisService.init``.
          kafka_config: Accepted for signature parity with sibling tasks; unused.
          gewe_config: Mapping that provides ``'api_url'``.
      """
      async def sync_one(redis_service, gewe_service, login_key):
          """Refresh a single login hash; offline or failed accounts are skipped."""
          login_info = await redis_service.get_hash(login_key)
          wxid = login_info.get("wxid")
          if login_info.get('status') == '0':
              logger.warning(f"微信号 {wxid} 已经离线")
              return

          ret, msg, profile = await gewe_service.get_profile_async(
              login_info.get("tokenId"), login_info.get("appId")
          )
          if ret != 200:
              logger.warning(f"同步微信号 {wxid} 资料失败: {ret}-{msg}")
              return

          nickname = profile.get("nickName")
          login_info.update({
              "nickName": nickname,
              "headImgUrl": profile.get("smallHeadImgUrl"),
              "modify_at": int(time.time()),
          })
          # None values are written back as empty strings
          # (presumably because the hash cannot store None -- confirm).
          cleaned_login_info = {
              field: ('' if value is None else value)
              for field, value in login_info.items()
          }
          await redis_service.set_hash(login_key, cleaned_login_info)
          logger.info(f"定时同步微信号{wxid}-昵称{nickname} 资料成功")

      async def task():
          try:
              redis_service = RedisService()
              await redis_service.init(**redis_config)
              gewe_service = await GeWeService.get_instance(redis_service, gewe_config['api_url'])
              login_keys = [
                  key
                  async for key in redis_service.client.scan_iter(
                      match='__AI_OPS_WX__:LOGININFO:*'
                  )
              ]
              print(login_keys)
              for login_key in login_keys:
                  # Isolate failures so one broken account does not stop the
                  # refresh of the accounts that follow it.
                  try:
                      await sync_one(redis_service, gewe_service, login_key)
                  except Exception as e:
                      logger.error(f"同步 {login_key} 资料时发生异常: {e}")
          except Exception as e:
              logger.error(f"任务执行过程中发生异常: {e}")

      # Reuse the current event loop, replacing it only when it was closed.
      loop = asyncio.get_event_loop()
      if loop.is_closed():
          loop = asyncio.new_event_loop()
          asyncio.set_event_loop(loop)
      loop.run_until_complete(task())
  165. #@celery_app.task(name='tasks.scheduled_task_add_contacts_from_chatrooms_p', bind=True, acks_late=True)
  def scheduled_task_add_contacts_from_chatrooms_p(self, redis_config, kafka_config, gewe_config):
      """Send friend requests to members of each account's whitelisted chatrooms.

      Earlier variant of the chatroom task (its Celery decorator is commented
      out). Nothing is sent before 08:00 local time. A chatroom is skipped once
      ``is_group_add_contacts_history_one_day_200_async`` reports true for it.
      A member is skipped when already a contact, an admin, the owner, the
      account itself, invited twice or more, or invited within the last 24 hours.

      Args:
          redis_config: Keyword arguments expanded into ``RedisService.init``.
          kafka_config: Unused in this variant.
          gewe_config: Mapping that provides ``'api_url'``.
      """
      # Bind the class locally: at file level ``datetime`` is imported as the
      # module, so ``datetime.now()`` would raise AttributeError (swallowed by
      # the broad ``except`` below) unless a star import rebinds the name.
      from datetime import datetime as datetime_cls

      def invited_within_last_day(add_time):
          """Return True when ``add_time`` (Unix seconds) is at most 24 hours from now."""
          return abs(time.time() - add_time) <= 3600 * 24

      async def task():
          try:
              now = datetime_cls.now()
              if now.hour < 8:
                  logger.info(f"定时群成员定时添好友任务不启动,当前时间为 {now.strftime('%Y-%m-%d %H:%M:%S')},早于8点")
                  return
              logger.info('定时群成员定时添好友任务开始')
              redis_service = RedisService()
              await redis_service.init(**redis_config)
              gewe_service = await GeWeService.get_instance(redis_service, gewe_config['api_url'])
              login_keys = [
                  key
                  async for key in redis_service.client.scan_iter(
                      match='__AI_OPS_WX__:LOGININFO:*'
                  )
              ]
              for login_key in login_keys:
                  login_info = await redis_service.get_hash(login_key)
                  app_id = login_info.get("appId")
                  token_id = login_info.get("tokenId")
                  wxid = login_info.get("wxid")
                  if login_info.get('status') == '0':
                      logger.warning(f"微信号 {wxid} 已经离线,群成员不能定时添加")
                      continue

                  wx_config = await gewe_service.get_wxchat_config_from_cache_async(wxid)
                  contacts = await gewe_service.get_contacts_brief_from_cache_async(wxid)
                  contact_wxids = [contact.get('userName') for contact in contacts]
                  chatrooms = wx_config.get('addContactsFromChatroomIdWhiteList', [])
                  logger.info(f'{wxid} 定时群成员定时添好友任务开始')

                  for chatroom_id in chatrooms:
                      chatroom = await gewe_service.get_group_info_from_cache_async(wxid, chatroom_id)
                      chatroom_member = await gewe_service.get_group_members_from_cache_async(wxid, chatroom_id)
                      chatroom_nickname = chatroom.get('nickName')
                      chatroom_owner_wxid = chatroom_member.get('chatroomOwner', None)
                      admin_wxids = chatroom_member.get('adminWxid')
                      if admin_wxids is None:
                          admin_wxids = []

                      # Per-chatroom daily cap.
                      if await gewe_service.is_group_add_contacts_history_one_day_200_async(wxid, chatroom_id):
                          logger.info(f"{wxid}在 {chatroom_nickname} 群成员已经加了200个好友,不再添加,群id:{chatroom_id}")
                          continue
                      logger.info(f'{chatroom_nickname} 的群主是 {chatroom_owner_wxid},管理员是{admin_wxids}')

                      # Everyone who must not receive a request.
                      excluded_wxids = set(contact_wxids)
                      excluded_wxids.update(admin_wxids)
                      if chatroom_owner_wxid is not None:
                          excluded_wxids.add(chatroom_owner_wxid)
                      excluded_wxids.add(wxid)

                      # Same None guard the later variants of this task apply.
                      member_list = chatroom.get('memberList', [])
                      if member_list is None:
                          member_list = []
                      invitable_members = [
                          member for member in member_list
                          if member.get('wxid') not in excluded_wxids
                      ]
                      nickname = next(
                          (member.get('nickName') for member in member_list if member.get('wxid') == wxid),
                          None,
                      )
                      logger.info(f'{nickname}-{wxid} 在 {chatroom_nickname} 群里还可以邀请的好友有:{[x.get("nickName") for x in invitable_members]}')

                      for member in invitable_members:
                          contact_wxid = member.get('wxid')
                          member_nickname = member.get("nickName")
                          past_invites = list(
                              await gewe_service.get_group_add_contacts_history_async(wxid, chatroom_id, contact_wxid)
                          )
                          # ``>=`` so a member with more than two records is
                          # not invited again either.
                          if len(past_invites) >= 2:
                              logger.info(f'{nickname}-{wxid}在{chatroom_nickname}-{chatroom_id} 群的 {member_nickname}-{contact_wxid} 已经邀请过2次,不再邀请')
                              continue
                          if past_invites and invited_within_last_day(max(entry.addTime for entry in past_invites)):
                              logger.info(f'{nickname}-{wxid}在{chatroom_nickname}-{chatroom_id} 群的{member_nickname}-{contact_wxid}已经当天邀请,不再邀请')
                              continue

                          ret, msg, data = await gewe_service.add_group_member_as_friend_async(
                              token_id, app_id, chatroom_id, contact_wxid,
                              f'我是群聊"{chatroom_nickname}"群的{nickname}',
                          )
                          if ret != 200:
                              logger.warning(f'群好友邀请失败原因:{ret} {data}')
                          # The attempt is recorded whether or not it succeeded.
                          history = AddGroupContactsHistory.model_validate({
                              "chatroomId": chatroom_id,
                              "wxid": wxid,
                              "contactWixd": contact_wxid,
                              "addTime": int(time.time()),
                          })
                          await gewe_service.save_group_add_contacts_history_async(wxid, chatroom_id, contact_wxid, history)
                          logger.info(f'{nickname} 向 {chatroom_nickname}-{chatroom_id} 群的 {member_nickname}-{contact_wxid} 发送好友邀请 {msg}')
                          await asyncio.sleep(random.uniform(1.5, 3))
                      await asyncio.sleep(random.uniform(1.5, 3))
          except Exception as e:
              logger.error(f"任务执行过程中发生异常: {e}")

      # Reuse the current event loop, replacing it only when it was closed.
      loop = asyncio.get_event_loop()
      if loop.is_closed():
          loop = asyncio.new_event_loop()
          asyncio.set_event_loop(loop)
      loop.run_until_complete(task())
  283. #@celery_app.task(name='tasks.scheduled_task_add_contacts_from_chatrooms_p2', bind=True, acks_late=True)
  def scheduled_task_add_contacts_from_chatrooms_p2(self, redis_config, kafka_config, gewe_config):
      """Send friend requests to chatroom members, at most 30 per account per run.

      Earlier variant of the chatroom task (its Celery decorator is commented
      out). Intended policy: 30 people per run, one run every two hours, three
      runs a day, i.e. at most 90 people per day. This function enforces the
      30-per-run limit and stops an account once
      ``is_group_add_contacts_history_one_day_90_async`` reports true for it;
      run spacing is not checked here. Nothing is sent before 08:00 local time.
      Every request attempt is also published through ``KafkaService``.

      Args:
          redis_config: Keyword arguments expanded into ``RedisService.init``.
          kafka_config: Mapping with ``'bootstrap_servers'``, ``'topic'`` and
              ``'group_id'``.
          gewe_config: Mapping that provides ``'api_url'``.
      """
      # Bind the class locally: at file level ``datetime`` is imported as the
      # module, so ``datetime.now()`` would raise AttributeError (swallowed by
      # the broad ``except`` below) unless a star import rebinds the name.
      from datetime import datetime as datetime_cls

      once_limit = 30

      def invited_within_last_day(add_time):
          """Return True when ``add_time`` (Unix seconds) is at most 24 hours from now."""
          return abs(time.time() - add_time) <= 3600 * 24

      async def invite_for_account(redis_service, gewe_service, kafka_service, login_key):
          """Process one account; returning here only ends work for that account."""
          login_info = await redis_service.get_hash(login_key)
          app_id = login_info.get("appId")
          token_id = login_info.get("tokenId")
          wxid = login_info.get("wxid")
          if login_info.get('status') == '0':
              logger.warning(f"微信号 {wxid} 已经离线,群成员不能定时添加")
              return

          wx_config = await gewe_service.get_wxchat_config_from_cache_async(wxid)
          contacts = await gewe_service.get_contacts_brief_from_cache_async(wxid)
          contact_wxids = [contact.get('userName') for contact in contacts]
          chatrooms = wx_config.get('addContactsFromChatroomIdWhiteList', [])
          logger.info(f'{wxid} 定时群成员定时添好友任务开始')
          invited_this_run = 0

          for chatroom_id in chatrooms:
              chatroom = await gewe_service.get_group_info_from_cache_async(wxid, chatroom_id)
              chatroom_member = await gewe_service.get_group_members_from_cache_async(wxid, chatroom_id)
              chatroom_nickname = chatroom.get('nickName')
              chatroom_owner_wxid = chatroom_member.get('chatroomOwner', None)
              admin_wxids = chatroom_member.get('adminWxid')
              if admin_wxids is None:
                  admin_wxids = []
              logger.info(f'{chatroom_nickname} 的群主是 {chatroom_owner_wxid},管理员是{admin_wxids}')

              # Everyone who must not receive a request.
              excluded_wxids = set(contact_wxids)
              excluded_wxids.update(admin_wxids)
              if chatroom_owner_wxid is not None:
                  excluded_wxids.add(chatroom_owner_wxid)
              excluded_wxids.add(wxid)
              unavailable_wixds = await gewe_service.check_wixd_group_add_contacts_history_async(wxid, chatroom_id)
              if unavailable_wixds:
                  excluded_wxids.update(unavailable_wixds)

              member_list = chatroom.get('memberList', [])
              if member_list is None:
                  member_list = []
              elif not isinstance(member_list, list):
                  member_list = list(member_list)
              invitable_members = [
                  member for member in member_list
                  if member.get('wxid') not in excluded_wxids
              ]
              nickname = next(
                  (member.get('nickName') for member in member_list if member.get('wxid') == wxid),
                  None,
              )
              logger.info(f'{nickname}-{wxid} 在 {chatroom_nickname} 群里还可以邀请的好友有:{[x.get("nickName") for x in invitable_members]}')

              for member in invitable_members:
                  # Per-run limit for this account.
                  if invited_this_run >= once_limit:
                      logger.info(f"{wxid} 本次任务已经邀请了30人,不再邀请")
                      return
                  # Per-day limit for this account.
                  if await gewe_service.is_group_add_contacts_history_one_day_90_async(wxid):
                      logger.info(f"当天 {wxid} 所有群的成员已经加了90个好友,不再添加")
                      return

                  contact_wxid = member.get('wxid')
                  member_nickname = member.get("nickName")
                  past_invites = list(
                      await gewe_service.get_group_add_contacts_history_async(wxid, chatroom_id, contact_wxid)
                  )
                  # ``>=`` so a member with more than two records is not
                  # invited again either.
                  if len(past_invites) >= 2:
                      logger.info(f'{nickname}-{wxid}在{chatroom_nickname}-{chatroom_id} 群的 {member_nickname}-{contact_wxid} 已经邀请过2次,不再邀请')
                      continue
                  if past_invites and invited_within_last_day(max(entry.addTime for entry in past_invites)):
                      logger.info(f'{nickname}-{wxid}在{chatroom_nickname}-{chatroom_id} 群的{member_nickname}-{contact_wxid}已经当天邀请,不再邀请')
                      continue

                  ret, msg, data = await gewe_service.add_group_member_as_friend_async(
                      token_id, app_id, chatroom_id, contact_wxid,
                      f'我是群聊"{chatroom_nickname}"群的{nickname}',
                  )
                  if ret != 200:
                      logger.warning(f'群好友邀请失败原因:{ret} {data}')
                  # The attempt is recorded whether or not it succeeded.
                  history = AddGroupContactsHistory.model_validate({
                      "chatroomId": chatroom_id,
                      "wxid": wxid,
                      "contactWixd": contact_wxid,
                      "addTime": int(time.time()),
                  })
                  await gewe_service.save_group_add_contacts_history_async(wxid, chatroom_id, contact_wxid, history)
                  invited_this_run += 1
                  logger.info(f'{nickname} 向 {chatroom_nickname}-{chatroom_id} 群的 {member_nickname}-{contact_wxid} 发送好友邀请 {msg}')
                  # Publish the attempt to Kafka.
                  k_message = wx_add_contacts_from_chatroom_message(
                      history.wxid, history.chatroomId, history.contactWixd, history.addTime
                  )
                  await kafka_service.send_message_async(k_message)
                  await asyncio.sleep(random.uniform(1.5, 3))
              await asyncio.sleep(random.uniform(1.5, 3))

      async def task():
          try:
              now = datetime_cls.now()
              if now.hour < 8:
                  logger.info(f"定时群成员定时添好友任务不启动,当前时间为 {now.strftime('%Y-%m-%d %H:%M:%S')},早于8点")
                  return
              logger.info('定时群成员定时添好友任务开始')
              redis_service = RedisService()
              await redis_service.init(**redis_config)
              gewe_service = await GeWeService.get_instance(redis_service, gewe_config['api_url'])
              # The topic is passed twice, exactly as in the sibling task.
              kafka_service = KafkaService(
                  kafka_config['bootstrap_servers'],
                  kafka_config['topic'],
                  kafka_config['topic'],
                  kafka_config['group_id'],
              )
              # NOTE(review): started but never stopped in this variant; the
              # registered sibling task pairs start_producer/stop_producer.
              await kafka_service.start()
              login_keys = [
                  key
                  async for key in redis_service.client.scan_iter(
                      match='__AI_OPS_WX__:LOGININFO:*'
                  )
              ]
              for login_key in login_keys:
                  # Limits are per account, so reaching one moves on to the
                  # next account instead of ending the whole task.
                  await invite_for_account(redis_service, gewe_service, kafka_service, login_key)
          except Exception as e:
              logger.error(f"任务执行过程中发生异常: {e}")

      # Reuse the current event loop, replacing it only when it was closed.
      loop = asyncio.get_event_loop()
      if loop.is_closed():
          loop = asyncio.new_event_loop()
          asyncio.set_event_loop(loop)
      loop.run_until_complete(task())
  @celery_app.task(name='tasks.scheduled_task_add_contacts_from_chatrooms', bind=True, acks_late=True)
  def scheduled_task_add_contacts_from_chatrooms(self, redis_config, kafka_config, gewe_config):
      """Send friend requests to chatroom members under configurable rate limits.

      Default policy: 30 people per run, 3 runs a day, i.e. at most 90 people per
      day. The global config entry ``scheduledTaskAddContactsFromChatrooms`` may
      override these through ``onceAddContactsTotal``, ``oneDayTimes`` and
      ``oneDayAddContactsTotal``. Runs are kept at least two hours apart (fixed
      in code) and nothing is sent before 08:00 local time. Each run is
      recorded via ``save_task_run_time_async`` together with the number of
      seconds left until 23:59:59. Every request attempt is also published
      through ``KafkaService``.

      Args:
          redis_config: Keyword arguments expanded into ``RedisService.init``.
          kafka_config: Mapping with ``'bootstrap_servers'``, ``'topic'`` and
              ``'group_id'``.
          gewe_config: Mapping that provides ``'api_url'``.
      """
      # Bind the class locally: at file level ``datetime`` is imported as the
      # module, so ``datetime.now()`` would raise AttributeError (swallowed by
      # the broad ``except`` below) unless a star import rebinds the name.
      from datetime import datetime as datetime_cls

      task_name = 'scheduled_task_add_contacts_from_chatrooms'

      def invited_within_last_day(add_time):
          """Return True when ``add_time`` (Unix seconds) is at most 24 hours from now."""
          return abs(time.time() - add_time) <= 3600 * 24

      async def invite_for_account(redis_service, gewe_service, kafka_service,
                                   login_key, once_add_contacts_total, oneday_add_contacts_total):
          """Process one account; returning here only ends work for that account."""
          login_info = await redis_service.get_hash(login_key)
          app_id = login_info.get("appId")
          token_id = login_info.get("tokenId")
          wxid = login_info.get("wxid")
          if login_info.get('status') == '0':
              logger.warning(f"微信号 {wxid} 已经离线,群成员不能定时添加")
              return

          wx_config = await gewe_service.get_wxchat_config_from_cache_async(wxid)
          contacts = await gewe_service.get_contacts_brief_from_cache_async(wxid)
          contact_wxids = [contact.get('userName') for contact in contacts]
          chatrooms = wx_config.get('addContactsFromChatroomIdWhiteList', [])
          logger.info(f'{wxid} 定时群成员定时添好友任务开始')
          invited_this_run = 0

          for chatroom_id in chatrooms:
              chatroom = await gewe_service.get_group_info_from_cache_async(wxid, chatroom_id)
              chatroom_member = await gewe_service.get_group_members_from_cache_async(wxid, chatroom_id)
              chatroom_nickname = chatroom.get('nickName')
              chatroom_owner_wxid = chatroom_member.get('chatroomOwner', None)
              admin_wxids = chatroom_member.get('adminWxid')
              if admin_wxids is None:
                  admin_wxids = []
              logger.info(f'{chatroom_nickname} 的群主是 {chatroom_owner_wxid},管理员是{admin_wxids}')

              # Everyone who must not receive a request.
              excluded_wxids = set(contact_wxids)
              excluded_wxids.update(admin_wxids)
              if chatroom_owner_wxid is not None:
                  excluded_wxids.add(chatroom_owner_wxid)
              excluded_wxids.add(wxid)
              unavailable_wixds = await gewe_service.check_wixd_group_add_contacts_history_async(wxid, chatroom_id)
              if unavailable_wixds:
                  excluded_wxids.update(unavailable_wixds)

              member_list = chatroom.get('memberList', [])
              if member_list is None:
                  member_list = []
              elif not isinstance(member_list, list):
                  member_list = list(member_list)
              invitable_members = [
                  member for member in member_list
                  if member.get('wxid') not in excluded_wxids
              ]
              nickname = next(
                  (member.get('nickName') for member in member_list if member.get('wxid') == wxid),
                  None,
              )
              logger.info(f'{nickname}-{wxid} 在 {chatroom_nickname} 群里还可以邀请的好友有:{[x.get("nickName") for x in invitable_members]}')

              for member in invitable_members:
                  # Per-run limit for this account.
                  if invited_this_run >= once_add_contacts_total:
                      logger.info(f"{wxid} 本次任务已经邀请了{once_add_contacts_total}人,不再邀请")
                      return
                  # Per-day limit for this account.
                  if await gewe_service.is_group_add_contacts_history_one_day_async(wxid, oneday_add_contacts_total):
                      logger.info(f"当天 {wxid} 所有群的成员已经加了{oneday_add_contacts_total}个好友,不再添加")
                      return

                  contact_wxid = member.get('wxid')
                  member_nickname = member.get("nickName")
                  past_invites = list(
                      await gewe_service.get_group_add_contacts_history_async(wxid, chatroom_id, contact_wxid)
                  )
                  # ``>=`` so a member with more than two records is not
                  # invited again either.
                  if len(past_invites) >= 2:
                      logger.info(f'{nickname}-{wxid}在{chatroom_nickname}-{chatroom_id} 群的 {member_nickname}-{contact_wxid} 已经邀请过2次,不再邀请')
                      continue
                  if past_invites and invited_within_last_day(max(entry.addTime for entry in past_invites)):
                      logger.info(f'{nickname}-{wxid}在{chatroom_nickname}-{chatroom_id} 群的{member_nickname}-{contact_wxid}已经当天邀请,不再邀请')
                      continue

                  ret, msg, data = await gewe_service.add_group_member_as_friend_async(
                      token_id, app_id, chatroom_id, contact_wxid,
                      f'我是群聊"{chatroom_nickname}"群的{nickname}',
                  )
                  if ret != 200:
                      logger.warning(f'群好友邀请失败原因:{ret} {data}')
                  # The attempt is recorded whether or not it succeeded.
                  history = AddGroupContactsHistory.model_validate({
                      "chatroomId": chatroom_id,
                      "wxid": wxid,
                      "contactWixd": contact_wxid,
                      "addTime": int(time.time()),
                  })
                  await gewe_service.save_group_add_contacts_history_async(wxid, chatroom_id, contact_wxid, history)
                  invited_this_run += 1
                  logger.info(f'{nickname} 向 {chatroom_nickname}-{chatroom_id} 群的 {member_nickname}-{contact_wxid} 发送好友邀请 {msg}')
                  # Publish the attempt to Kafka.
                  k_message = wx_add_contacts_from_chatroom_message(
                      history.wxid, history.chatroomId, history.contactWixd, history.addTime
                  )
                  await kafka_service.send_message_async(k_message)
                  await asyncio.sleep(random.uniform(1.5, 3))
              await asyncio.sleep(random.uniform(1.5, 3))

      async def task():
          # Pre-bound so ``finally`` is safe on every early exit (for example
          # the before-08:00 return happens before the producer exists).
          kafka_service = None
          try:
              now = datetime_cls.now()
              if now.hour < 8:
                  logger.info(f"定时群成员定时添好友任务不启动,当前时间为 {now.strftime('%Y-%m-%d %H:%M:%S')},早于8点")
                  return
              logger.info('定时群成员定时添好友任务开始')
              redis_service = RedisService()
              await redis_service.init(**redis_config)
              gewe_service = await GeWeService.get_instance(redis_service, gewe_config['api_url'])
              # The topic is passed twice, exactly as the original call did.
              kafka_service = KafkaService(
                  kafka_config['bootstrap_servers'],
                  kafka_config['topic'],
                  kafka_config['topic'],
                  kafka_config['group_id'],
              )
              await kafka_service.start_producer()

              # Limits: defaults, optionally overridden by the global config.
              global_config = await gewe_service.get_global_config_from_cache_async()
              limits = global_config.get('scheduledTaskAddContactsFromChatrooms', {}) or {}
              oneday_add_contacts_total = limits.get('oneDayAddContactsTotal', 90)
              once_add_contacts_total = limits.get('onceAddContactsTotal', 30)
              oneday_times = limits.get('oneDayTimes', 3)

              run_logs = await gewe_service.get_task_run_time_async(task_name)
              # ``>=`` so an over-full log cannot re-enable the task.
              if len(run_logs) >= oneday_times:
                  logger.info(f"今日定时群成员定时添好友任务已达上限 {oneday_times} 次!")
                  return
              if run_logs:
                  last_run_ts = max(entry.get("runTime") for entry in run_logs)
                  if last_run_ts > 1e12:  # millisecond timestamp -> seconds
                      last_run_ts = last_run_ts / 1000
                  if datetime_cls.now() - datetime_cls.fromtimestamp(last_run_ts) < timedelta(hours=2):
                      logger.info(f"上次定时群成员定时添好友任务在2小时内,不再执行")
                      return

              # Record this run together with the seconds left until 23:59:59
              # (presumably used as the record's expiry -- confirm).
              current_time = datetime_cls.now()
              end_of_day = datetime_cls(current_time.year, current_time.month, current_time.day, 23, 59, 59)
              seconds_until_end_of_day = int((end_of_day - current_time).total_seconds())
              run_logs.append({"runTime": int(time.time())})
              await gewe_service.save_task_run_time_async(task_name, run_logs, seconds_until_end_of_day)

              login_keys = [
                  key
                  async for key in redis_service.client.scan_iter(
                      match='__AI_OPS_WX__:LOGININFO:*'
                  )
              ]
              for login_key in login_keys:
                  # Limits are per account, so reaching one moves on to the
                  # next account instead of ending the whole task.
                  await invite_for_account(
                      redis_service, gewe_service, kafka_service,
                      login_key, once_add_contacts_total, oneday_add_contacts_total,
                  )
          except Exception as e:
              logger.error(f"任务执行过程中发生异常: {e}")
          finally:
              if kafka_service is not None:
                  await kafka_service.stop_producer()

      # Reuse the current event loop, replacing it only when it was closed.
      loop = asyncio.get_event_loop()
      if loop.is_closed():
          loop = asyncio.new_event_loop()
          asyncio.set_event_loop(loop)
      loop.run_until_complete(task())
  # Redis hash that holds the friend-add counters for one calendar day.
  REDIS_KEY_PATTERN = "friend_add_limit:{date}"
  # Defined for the add-friends task; not read by the code below.
  REDIS_LAST_RUN_KEY = "last_run_time:add_friends_task"


  @celery_app.task(name='tasks.add_friends_task', bind=True, acks_late=True)
  def add_friends_task(self,redis_config):
      """Count friend additions in Redis: at most 15 per day and 8 per two hours.

      The real "add friend" call is still a TODO, so each run that is under both
      caps simply counts one successful addition.

      Args:
          redis_config: Keyword arguments expanded into ``RedisService.init``.
      """
      async def task():
          service = RedisService()
          await service.init(**redis_config)

          day_stamp = datetime.datetime.now().strftime("%Y%m%d")
          counter_key = REDIS_KEY_PATTERN.format(date=day_stamp)

          # Missing fields count as zero.
          raw_total = await service.get_hash_field(counter_key, "total") or 0
          raw_recent = await service.get_hash_field(counter_key, "last_2h") or 0
          total_added = int(raw_total)
          last_2h_added = int(raw_recent)
          logger.info(f"当前添加好友总数: {total_added}, 过去2小时添加: {last_2h_added}")

          # Stop when either cap is already reached.
          if total_added >= 15:
              logger.warning("今日好友添加已达上限!")
              return
          if last_2h_added >= 8:
              logger.warning("过去2小时添加已达上限!")
              return

          remaining_quota = min(15 - total_added, 8 - last_2h_added)
          if remaining_quota <= 0:
              return

          num_to_add = min(remaining_quota, 1)  # never more than one per run
          logger.info(f"本次添加 {num_to_add} 位好友")

          # TODO: call the real friend-adding logic (API or business code);
          # for now every requested addition is assumed to succeed.
          success = num_to_add
          if success <= 0:
              return

          # Bump both counters, then refresh the expiries: 24 hours for the
          # whole key, 2 hours for the ``last_2h`` field.
          await service.increment_hash_field(counter_key, "total", success)
          await service.increment_hash_field(counter_key, "last_2h", success)
          await service.expire(counter_key, 86400)
          await service.expire_field(counter_key, "last_2h", 7200)
          logger.info(f"成功添加 {success} 位好友, 今日总数 {total_added + success}")
          # A RedBeat re-registration with a random next interval used to be
          # sketched here in commented-out code; it is not active.

      # Reuse the current event loop, replacing it only when it was closed.
      event_loop = asyncio.get_event_loop()
      if event_loop.is_closed():
          event_loop = asyncio.new_event_loop()
          asyncio.set_event_loop(event_loop)
      event_loop.run_until_complete(task())
  @celery_app.task(name='tasks.random_scheduled_task', bind=True, acks_late=True)
  def random_scheduled_task(self):
      """Print the run time and save a RedBeat entry with a random 10-60 s interval.

      Returns:
          str: A message stating the interval chosen for the next run.
      """
      print(f"Task executed at {datetime.datetime.now()}")

      # Pick the delay before the next run.
      next_run_in = random.randint(10, 60)
      print(f"Next execution will be in {next_run_in} seconds")

      # Save the 'random-task' entry with the freshly chosen interval.
      RedBeatSchedulerEntry(
          name='random-task',
          task='tasks.random_scheduled_task',
          schedule=timedelta(seconds=next_run_in),
          app=celery_app,
      ).save()
      return f"Scheduled next run in {next_run_in} seconds"