您最多选择25个主题 主题必须以字母或数字开头,可以包含连字符 (-),并且长度不得超过35个字符

1307 行
72KB

  1. from celery_app import celery_app
  2. from fastapi import Request,FastAPI
  3. import time,datetime
  4. from celery import Celery
  5. import celery.schedules
  6. from redbeat import RedBeatSchedulerEntry
  7. from datetime import timedelta
  8. from services.redis_service import RedisService
  9. from services.kafka_service import KafkaService
  10. from services.gewe_service import GeWeService
  11. # from common.log import logger
  12. from common.utils import *
  13. import asyncio,random
  14. from model.models import AddGroupContactsHistory
  15. import logging
  16. from model.models import AgentConfig
  17. import logging
  18. import sys,traceback
  19. logger = logging.getLogger('redbeat')
  20. @celery_app.task(name='tasks.add_task', bind=True, acks_late=True)
  21. def add_task(self, x, y):
  22. time.sleep(5) # 模拟长时间计算
  23. logger.info('add')
  24. return x + y
  25. @celery_app.task(name='tasks.mul_task', bind=True, acks_late=True)
  26. def mul_task(self, x, y):
  27. time.sleep(5) # 模拟长时间计算
  28. return x * y
  29. # @celery.task(name='app.tasks.sync_contacts', bind=True, acks_late=True)
  30. # async def sync_contacts_task(self,app):
  31. # login_keys = list(await app.state.redis_service.client.scan_iter(match='__AI_OPS_WX__:LOGININFO:*'))
  32. # return login_keys
  33. # # for k in login_keys:
  34. # # print(k)
  35. @celery_app.task(name='tasks.sync_contacts', bind=True, acks_late=True)
  36. async def sync_contacts_task(self, redis_service):
  37. # Use the redis_service passed as an argument
  38. login_keys = list(await redis_service.client.scan_iter(match='__AI_OPS_WX__:LOGININFO:*'))
  39. return login_keys
  40. @celery_app.task(name='tasks.background_worker_task', bind=True, acks_late=True)
  41. def background_worker_task(self, redis_config, kafka_config, gewe_config):
  42. async def task():
  43. redis_service = RedisService()
  44. await redis_service.init(**redis_config)
  45. login_keys = []
  46. async for key in redis_service.client.scan_iter(match='__AI_OPS_WX__:LOGININFO:*'):
  47. login_keys.append(key)
  48. print(login_keys)
  49. asyncio.run(task())
  50. # @celery.task(name='tasks.background_worker_task', bind=True, acks_late=True)
  51. # async def background_worker_task(self, redis_config, kafka_config, gewe_config):
  52. # # Initialize services inside the task
  53. # redis_service = RedisService()
  54. # await redis_service.init(**redis_config)
  55. # login_keys = []
  56. # async for key in redis_service.client.scan_iter(match='__AI_OPS_WX__:LOGININFO:*'): # 使用 async for 遍历异步生成器
  57. # login_keys.append(key)
  58. # print(login_keys)
  59. # kafka_service = KafkaService(**kafka_config)
  60. # await kafka_service.start()
  61. # gewe_service = await GeWeService.get_instance(None, gewe_config['api_url'])
  62. # # Task logic
  63. # lock_name = "background_wxchat_thread_lock"
  64. # lock_identifier = str(time.time())
  65. # while True:
  66. # if await redis_service.acquire_lock(lock_name, timeout=10):
  67. # try:
  68. # logger.info("分布式锁已成功获取")
  69. # # Perform task logic
  70. # finally:
  71. # await redis_service.release_lock(lock_name, lock_identifier)
  72. # break
  73. # else:
  74. # logger.info("获取分布式锁失败,等待10秒后重试...")
  75. # await asyncio.sleep(10)
  76. # @celery_app.task(name='tasks.scheduled_task', bind=True, acks_late=True)
  77. # def scheduled_task(self):
  78. # print("定时任务执行成功!~~~~~~~~~~~~~~~~~")
  79. # return "Hello from Celery Beat + RedBeat!"
  80. # @celery_app.task(name='tasks.scheduled_task_sync_wx', bind=True, acks_late=True)
  81. # def scheduled_task_sync_wx(self,redis_service,kafka_service,gewe_service):
  82. # print("scheduled_task_sync_wx 定时任务执行成功!")
  83. # return "Hello from Celery Beat + RedBeat!"
  84. # @celery_app.task(name='tasks.scheduled_task_sync_wx_info_1', bind=True, acks_late=True)
  85. # def scheduled_task_sync_wx_info_1(self,redis_config, kafka_config, gewe_config):
  86. # '''
  87. # 定时获取微信号资料
  88. # '''
  89. # loop = asyncio.new_event_loop()
  90. # asyncio.set_event_loop(loop)
  91. # async def task():
  92. # try:
  93. # redis_service = RedisService()
  94. # await redis_service.init(**redis_config)
  95. # # gewe_service = await GeWeService.get_instance(None, gewe_config['api_url'])
  96. # login_keys = []
  97. # async for key in redis_service.client.scan_iter(match='__AI_OPS_WX__:LOGININFO:*'):
  98. # login_keys.append(key)
  99. # print(login_keys)
  100. # # for k in login_keys:
  101. # # r = await redis_service.get_hash(k)
  102. # # app_id = r.get("appId")
  103. # # token_id = r.get("tokenId")
  104. # # wxid = r.get("wxid")
  105. # # status = r.get('status')
  106. # # if status == '0':
  107. # # continue
  108. # # ret, msg, profile = await gewe_service.get_profile_async(token_id, app_id)
  109. # # if ret != 200:
  110. # # logger.warning(f"同步微信号 {wxid} 资料失败: {ret}-{msg}")
  111. # # continue
  112. # # nickname=profile.get("nickName")
  113. # # head_img_url=profile.get("smallHeadImgUrl")
  114. # # r.update({"nickName":nickname,"headImgUrl":head_img_url,"modify_at":int(time.time())})
  115. # # cleaned_login_info = {k: (v if v is not None else '') for k, v in r.items()}
  116. # # await redis_service.set_hash(k, cleaned_login_info)
  117. # # logger.info(f"同步微信号 {wxid} 资料 成功")
  118. # # redis_service.update_hash_field(k,"nickName",nickname)
  119. # # redis_service.update_hash_field(k,"headImgUrl",head_img_url)
  120. # # redis_service.update_hash_field(k,"modify_at",int(time.time()))
  121. # except Exception as e:
  122. # logger.error(f"任务执行过程中发生异常: {e}")
  123. # print("scheduled_task_sync_wx_info 定时任务执行成功!")
  124. # return "Hello from Celery Beat + RedBeat!"
  125. # loop.run_until_complete(task())
  126. # loop.close()
  127. # @celery_app.task(name='tasks.scheduled_task_sync_wx_info', bind=True, acks_late=True)
  128. # def scheduled_task_sync_wx_info(self, redis_config, kafka_config, gewe_config):
  129. # '''
  130. # 定时获取微信号资料
  131. # '''
  132. # async def task():
  133. # try:
  134. # redis_service = RedisService()
  135. # await redis_service.init(**redis_config)
  136. # gewe_service = await GeWeService.get_instance(redis_service,gewe_config['api_url'])
  137. # login_keys = []
  138. # async for key in redis_service.client.scan_iter(match='__AI_OPS_WX__:LOGININFO:*'):
  139. # login_keys.append(key)
  140. # #print(login_keys)
  141. # for k in login_keys:
  142. # r = await redis_service.get_hash(k)
  143. # app_id = r.get("appId")
  144. # token_id = r.get("tokenId")
  145. # wxid = r.get("wxid")
  146. # status = r.get('status')
  147. # if status == '0':
  148. # logger.warning(f"微信号 {wxid} 已经离线")
  149. # continue
  150. # ret, msg, profile = await gewe_service.get_profile_async(token_id, app_id)
  151. # if ret != 200:
  152. # logger.warning(f"同步微信号 {wxid} 资料失败: {ret}-{msg}")
  153. # continue
  154. # nickname=profile.get("nickName")
  155. # head_img_url=profile.get("smallHeadImgUrl")
  156. # # print(nickname)
  157. # nickname=profile.get("nickName")
  158. # head_img_url=profile.get("smallHeadImgUrl")
  159. # r.update({"nickName":nickname,"headImgUrl":head_img_url,"modify_at":int(time.time())})
  160. # cleaned_login_info = {k: (v if v is not None else '') for k, v in r.items()}
  161. # await redis_service.set_hash(k, cleaned_login_info)
  162. # logger.info(f"定时同步微信号{wxid}-昵称{nickname} 资料成功")
  163. # except Exception as e:
  164. # logger.error(f"任务执行过程中发生异常: {e}")
  165. # loop = asyncio.get_event_loop()
  166. # if loop.is_closed():
  167. # loop = asyncio.new_event_loop()
  168. # asyncio.set_event_loop(loop)
  169. # loop.run_until_complete(task()) # 在现有事件循环中运行任务
  170. @celery_app.task(name='tasks.scheduled_task_sync_wx_info', bind=True, acks_late=True)
  171. def scheduled_task_sync_wx_info(self, redis_config, kafka_config, gewe_config):
  172. '''
  173. 定时获取微信号资料
  174. '''
  175. async def process_key(redis_service, gewe_service, semaphore, key):
  176. async with semaphore: # 使用 Semaphore 控制并发
  177. try:
  178. r = await redis_service.get_hash(key)
  179. app_id = r.get("appId")
  180. token_id = r.get("tokenId")
  181. wxid = r.get("wxid")
  182. status = r.get('status')
  183. if status == '0':
  184. logger.warning(f"微信号 {wxid} 已经离线")
  185. return
  186. ret, msg, profile = await gewe_service.get_profile_async(token_id, app_id)
  187. if ret != 200:
  188. logger.warning(f"同步微信号 {wxid} 资料失败: {ret}-{msg}")
  189. return
  190. nickname = profile.get("nickName")
  191. head_img_url = profile.get("smallHeadImgUrl")
  192. r.update({"nickName": nickname, "headImgUrl": head_img_url, "modify_at": int(time.time())})
  193. cleaned_login_info = {k: (v if v is not None else '') for k, v in r.items()}
  194. await redis_service.set_hash(key, cleaned_login_info)
  195. logger.info(f"定时同步微信号{wxid}-昵称{nickname} 资料成功")
  196. except Exception as e:
  197. logger.error(f"处理键 {key} 时发生异常: {e}")
  198. async def task():
  199. try:
  200. redis_service = RedisService()
  201. await redis_service.init(**redis_config)
  202. gewe_service = await GeWeService.get_instance(redis_service, gewe_config['api_url'])
  203. login_keys = []
  204. async for key in redis_service.client.scan_iter(match='__AI_OPS_WX__:LOGININFO:*'):
  205. login_keys.append(key)
  206. # 设置 Semaphore,限制并发数为 10
  207. semaphore = asyncio.Semaphore(10)
  208. # 使用 asyncio.gather 并发处理所有键
  209. await asyncio.gather(*[process_key(redis_service, gewe_service, semaphore, key) for key in login_keys])
  210. except Exception as e:
  211. logger.error(f"任务执行过程中发生异常: {e}")
  212. loop = asyncio.get_event_loop()
  213. if loop.is_closed():
  214. loop = asyncio.new_event_loop()
  215. asyncio.set_event_loop(loop)
  216. loop.run_until_complete(task()) # 在现有事件循环中运行任务
  217. # @celery_app.task(name='tasks.scheduled_task_add_contacts_from_chatrooms_new', bind=True, acks_late=True)
  218. # def scheduled_task_add_contacts_from_chatrooms_new(self, redis_config, kafka_config, gewe_config):
  219. # '''
  220. # 关于群加好友的请求规则: 每个智能体一次最多30人,间隔2小时做1次,即最多90人/天。
  221. # 加好友规则:每天处理次数、间隔时间(分钟)、每次加好友人数这3个参数都可以设置。目前默认只是上面的设置。
  222. # '''
  223. # async def process_login_key(redis_service:RedisService, gewe_service: GeWeService, kafka_service:KafkaService, k, gewe_config, oneday_add_contacts_total, once_add_contacts_total, semaphore):
  224. # async with semaphore: # 使用 Semaphore 控制并发
  225. # try:
  226. # r = await redis_service.get_hash(k)
  227. # app_id = r.get("appId")
  228. # token_id = r.get("tokenId")
  229. # wxid = r.get("wxid")
  230. # status = r.get('status')
  231. # if status == '0':
  232. # logger.warning(f"微信号 {wxid} 已经离线,群成员不能定时添加")
  233. # return
  234. # config = await gewe_service.get_wxchat_config_from_cache_async(wxid)
  235. # validated_config = AgentConfig.model_validate(config)
  236. # if not validated_config.agentEnabled:
  237. # logger.warning(f"微信号 {wxid} 取消了托管,群成员不能定时添加")
  238. # return
  239. # # 判断是否过于频繁
  240. # is_wx_expection = await gewe_service.get_wx_expection_async(wxid, "addGroupMemberAsFriend")
  241. # if is_wx_expection:
  242. # logger.info(f"{wxid} 本次任务接口addGroupMemberAsFriend异常, {is_wx_expection},本次群好友邀请任务未开始,跳过任务。")
  243. # return
  244. # cache_task_run_time_wxid_logs = await gewe_service.get_task_run_time_by_wxid_async(wxid, 'scheduled_task_add_contacts_from_chatrooms')
  245. # if cache_task_run_time_wxid_logs:
  246. # sorted_tasks = sorted(cache_task_run_time_wxid_logs, key=lambda x: x.get("runTime"), reverse=True)
  247. # last_run_time = sorted_tasks[0].get("runTime")
  248. # if last_run_time > 1e12: # 毫秒级时间戳
  249. # last_run_time = last_run_time / 1000 # 转换为秒
  250. # # 将时间戳转换为 datetime 对象
  251. # last_run_time = datetime.fromtimestamp(last_run_time)
  252. # # 获取当前时间
  253. # current_time = datetime.now()
  254. # # 计算时间差
  255. # time_difference = current_time - last_run_time
  256. # # 判断是否相差2小时
  257. # if time_difference < timedelta(hours=2):
  258. # logger.info(f"{wxid}上次定时群成员定时添好友任务在2小时内,不再执行")
  259. # return
  260. # logger.info(f'{wxid} 定时群成员定时添好友任务开始')
  261. # c:dict = await gewe_service.get_wxchat_config_from_cache_async(wxid)
  262. # contacts = await gewe_service.get_contacts_brief_from_cache_async(wxid)
  263. # contact_wxids = [c.get('userName') for c in contacts]
  264. # chatrooms = c.get('addContactsFromChatroomIdWhiteList', [])
  265. # wixd_add_contacts_from_chatrooms_times = {wxid: 0}
  266. # for chatroom_id in chatrooms:
  267. # logger.info(f'{wxid} 进行{chatroom_id} 群成员定时添好友')
  268. # chatroom = await gewe_service.get_group_info_from_cache_async(wxid, chatroom_id)
  269. # chatroom_member = await gewe_service.get_group_members_from_cache_async(wxid, chatroom_id)
  270. # chatroom_nickname = chatroom.get('nickName')
  271. # chatroom_owner_wxid = chatroom_member.get('chatroomOwner', None)
  272. # admin_wxids = chatroom_member.get('adminWxid', [])
  273. # admin_wxids = chatroom_member.get('adminWxid')
  274. # if admin_wxids is None:
  275. # admin_wxids = [] # 如果 admin_wxids 是 None,将其初始化为空列表
  276. # logger.info(f'{chatroom_nickname} 的群主是 {chatroom_owner_wxid},管理员是{admin_wxids}')
  277. # contact_wxids_set = set(contact_wxids)
  278. # if admin_wxids:
  279. # contact_wxids_set.update(set(admin_wxids))
  280. # if chatroom_owner_wxid is not None:
  281. # contact_wxids_set.add(chatroom_owner_wxid)
  282. # contact_wxids_set.add(wxid)
  283. # unavailable_wixds = await gewe_service.check_wixd_group_add_contacts_history_async(wxid, chatroom_id)
  284. # if unavailable_wixds:
  285. # contact_wxids_set.update(set(unavailable_wixds))
  286. # chatroom_member_list = chatroom.get('memberList', [])
  287. # if chatroom_member_list is None:
  288. # chatroom_member_list = [] # 如果 memberList 是 None,将其初始化为空列表
  289. # elif not isinstance(chatroom_member_list, list):
  290. # chatroom_member_list = list(chatroom_member_list) # 如果 memberList 不是列表,将其转换为列表
  291. # remaining_chatroom_members = [x for x in chatroom_member_list if x.get('wxid') not in contact_wxids_set]
  292. # nickname = next((member['nickName'] for member in chatroom_member_list if member['wxid'] == wxid), None)
  293. # if not remaining_chatroom_members:
  294. # logger.info(f'{nickname}-{wxid} 在 {chatroom_nickname}-{chatroom_id} 群里没有好友可以邀请')
  295. # # 任务状态推送到kafka
  296. # k_message = wx_add_contacts_from_chatroom_task_status_message(wxid, chatroom_id, 2)
  297. # await kafka_service.send_message_async(k_message)
  298. # continue
  299. # logger.info(f'{nickname}-{wxid} 在 {chatroom_nickname} 群里还可以邀请的好友有:{[x.get("nickName") for x in remaining_chatroom_members]}')
  300. # for m in remaining_chatroom_members:
  301. # # 判断群已经退出白名单
  302. # wx_config=await gewe_service.get_wxchat_config_from_cache_async(wxid)
  303. # add_contacts_from_chatroomid_list = wx_config.get('addContactsFromChatroomIdWhiteList', [])
  304. # if chatroom_id not in add_contacts_from_chatroomid_list:
  305. # logger.info(f"{wxid} 群{chatroom_id} 已经退出白名单,不再邀请")
  306. # return
  307. # # 判断本次任务是否已经邀请了30个好友
  308. # if wixd_add_contacts_from_chatrooms_times[wxid] == once_add_contacts_total:
  309. # logger.info(f"{wxid} 本次任务已经邀请了{once_add_contacts_total}人,不再邀请")
  310. # # 更新任务运行时间
  311. # cache_task_run_time_wxid_logs.append({"runTime": int(time.time())})
  312. # await gewe_service.save_task_run_time_by_wxid_async(wxid, 'scheduled_task_add_contacts_from_chatrooms', cache_task_run_time_wxid_logs, 3600 * 2)
  313. # logger.info(f'{wxid} 定时群成员定时添好友任务,在本次邀请了{once_add_contacts_total}个情况完成,退出,本次共邀请好友 {wixd_add_contacts_from_chatrooms_times[wxid]} 个')
  314. # return
  315. # # 判断当天群成员是否已经加了90个好友
  316. # is_add_group_times = await gewe_service.is_group_add_contacts_history_one_day_async(wxid, oneday_add_contacts_total)
  317. # if is_add_group_times:
  318. # logger.info(f"当天 {wxid} 所有群的成员已经加了{oneday_add_contacts_total}个好友,不再添加")
  319. # # 更新任务运行时间
  320. # cache_task_run_time_wxid_logs.append({"runTime": int(time.time())})
  321. # await gewe_service.save_task_run_time_by_wxid_async(wxid, 'scheduled_task_add_contacts_from_chatrooms', cache_task_run_time_wxid_logs, 3600 * 2)
  322. # logger.info(f'{wxid} 定时群成员定时添好友任务,在当天邀请了{once_add_contacts_total}个情况完成,退出,本次共邀请好友 {wixd_add_contacts_from_chatrooms_times[wxid]} 个')
  323. # return
  324. # # 判断是否过于频繁
  325. # is_wx_expection = await gewe_service.get_wx_expection_async(wxid, "addGroupMemberAsFriend")
  326. # if is_wx_expection:
  327. # logger.info(f"{wxid} 本次任务接口addGroupMemberAsFriend异常,不再邀请,{is_wx_expection},本次共邀请好友 {wixd_add_contacts_from_chatrooms_times[wxid]} 个")
  328. # return
  329. # contact_wxid = m.get('wxid')
  330. # member_nickname = m.get("nickName")
  331. # group_add_contacts_history = await gewe_service.get_group_add_contacts_history_async(wxid, chatroom_id, contact_wxid)
  332. # if group_add_contacts_history:
  333. # sorted_history = sorted(group_add_contacts_history, key=lambda x: x.addTime, reverse=True)
  334. # # 已经邀请过两次,不再邀请
  335. # if len(sorted_history) == 2:
  336. # logger.info(f'{nickname}-{wxid}在{chatroom_nickname}-{chatroom_id} 群的 {member_nickname}-{contact_wxid} 已经邀请过2次,不再邀请')
  337. # continue
  338. # # 同一个群24小时邀请过,不再邀请
  339. # if len(sorted_history) > 0:
  340. # last_add_time = sorted_history[0].addTime
  341. # def is_add_time_more_than_one_day(addTime: int) -> bool:
  342. # """
  343. # 判断 addTime 是否与当前时间相隔大于 3600 × 24 秒
  344. # :param addTime: Unix 时间戳
  345. # :return: 如果 addTime 与当前时间相隔大于 3600 × 24 秒,返回 True;否则返回 False
  346. # """
  347. # # 获取当前时间的时间戳
  348. # current_time = time.time()
  349. # # 计算时间戳差值
  350. # time_difference = abs(current_time - addTime)
  351. # # 检查是否大于 3600 × 24 秒
  352. # return time_difference > 3600 * 24
  353. # is_more_than_one_day = is_add_time_more_than_one_day(last_add_time)
  354. # if not is_more_than_one_day:
  355. # logger.info(f'{nickname}-{wxid}在{chatroom_nickname}-{chatroom_id} 群的 {member_nickname}-{contact_wxid} 已经24小时邀请,不再邀请')
  356. # continue
  357. # ret, msg, data = await gewe_service.add_group_member_as_friend_async(token_id, app_id, chatroom_id, m.get('wxid'),
  358. # f'我是群聊"{chatroom_nickname}"群的{nickname}')
  359. # history = AddGroupContactsHistory.model_validate({
  360. # "chatroomId": chatroom_id,
  361. # "wxid": wxid,
  362. # "contactWixd": contact_wxid,
  363. # "addTime": int(time.time())
  364. # })
  365. # if ret != 200:
  366. # logger.warning(f'{nickname}-{wxid} 在 {chatroom_nickname}-{chatroom_id} 群好友 {member_nickname}-{contact_wxid} 邀请失败原因:{ret} {msg} {data}')
  367. # if data and '操作过于频繁' in data.get('msg', ''):
  368. # await gewe_service.save_wx_expection_async(wxid, "addGroupMemberAsFriend", msg, today_seconds_remaining())
  369. # await gewe_service.save_group_add_contacts_history_async(wxid, chatroom_id, contact_wxid, history)
  370. # wixd_add_contacts_from_chatrooms_times[wxid] += 1
  371. # logger.warning(f'{nickname}-{wxid} 在 {chatroom_nickname}-{chatroom_id} 群的 {member_nickname}-{contact_wxid} 操作过于频繁,本次群好友邀请任务未完成跳过,本次共邀请好友 {wixd_add_contacts_from_chatrooms_times[wxid]} 个。当天不再处理该号群好友邀请任务')
  372. # return
  373. # await gewe_service.save_group_add_contacts_history_async(wxid, chatroom_id, contact_wxid, history)
  374. # wixd_add_contacts_from_chatrooms_times[wxid] += 1
  375. # logger.info(f'{nickname} 向 {chatroom_nickname}-{chatroom_id} 群的 {m.get("nickName")}-{m.get("wxid")} 发送好友邀请 {msg} \n {data} \n 当前已邀请好友数:{wixd_add_contacts_from_chatrooms_times[wxid]}')
  376. # # 推送到kafka
  377. # k_message = wx_add_contacts_from_chatroom_message(history.wxid, history.chatroomId, history.contactWixd, history.addTime)
  378. # await kafka_service.send_message_async(k_message)
  379. # # await asyncio.sleep(random.uniform(1.5, 3))
  380. # # await asyncio.sleep(random.uniform(30, 60))
  381. # await asyncio.sleep(random.uniform(270,300))
  382. # # 任务状态推送到kafka
  383. # task_status = await gewe_service.wx_add_contacts_from_chatroom_task_status_async(wxid, chatroom_id)
  384. # k_message=wx_add_contacts_from_chatroom_task_status_message(wxid, chatroom_id, task_status)
  385. # await kafka_service.send_message_async(k_message)
  386. # # 下一个群
  387. # await asyncio.sleep(random.uniform(1.5, 3))
  388. # # 更新任务运行时间
  389. # cache_task_run_time_wxid_logs.append({"runTime": int(time.time())})
  390. # await gewe_service.save_task_run_time_by_wxid_async(wxid, 'scheduled_task_add_contacts_from_chatrooms', cache_task_run_time_wxid_logs, 3600 * 2)
  391. # logger.info(f'{wxid} 定时群成员定时添好友任务完成,本次共邀请好友{wixd_add_contacts_from_chatrooms_times[wxid]}个')
  392. # except Exception as e:
  393. # # 获取当前的堆栈跟踪
  394. # tb = sys.exc_info()[2]
  395. # # 为异常附加堆栈跟踪
  396. # e = e.with_traceback(tb)
  397. # # 输出详细的错误信息
  398. # logger.error(f"任务执行过程中发生异常: {e}\n异常类型: {type(e).__name__}\n异常信息: {str(e)}\n堆栈跟踪: {traceback.format_exc()}")
  399. # async def task():
  400. # try:
  401. # now = datetime.now()
  402. # # if 10> now.hour < 8:
  403. # # logger.info(f"定时群成员定时添好友任务不启动,当前时间为 {now.strftime('%Y-%m-%d %H:%M:%S')},早于8点")
  404. # # return
  405. # if now.hour < 8 or now.hour > 22:
  406. # logger.info(f"定时群成员定时添好友任务不启动, 当前时间为 {now.strftime('%Y-%m-%d %H:%M:%S')},不在8点到22点之间")
  407. # return
  408. # logger.info('定时群成员定时添好友任务开始')
  409. # redis_service = RedisService()
  410. # await redis_service.init(**redis_config)
  411. # gewe_service = await GeWeService.get_instance(redis_service, gewe_config['api_url'])
  412. # KAFKA_BOOTSTRAP_SERVERS = kafka_config['bootstrap_servers']
  413. # KAFKA_TOPIC = kafka_config['topic']
  414. # KAFKA_GROUP_ID = kafka_config['group_id']
  415. # kafka_service = KafkaService(KAFKA_BOOTSTRAP_SERVERS, KAFKA_TOPIC, KAFKA_TOPIC, KAFKA_GROUP_ID)
  416. # await kafka_service.start_producer()
  417. # global_config = await gewe_service.get_global_config_from_cache_async()
  418. # scheduled_task_add_contacts_from_chatrooms_config = global_config.get('scheduledTaskAddContactsFromChatrooms', {})
  419. # oneday_add_contacts_total = 90
  420. # once_add_contacts_total = 30
  421. # # oneday_times=3
  422. # if scheduled_task_add_contacts_from_chatrooms_config:
  423. # oneday_add_contacts_total = scheduled_task_add_contacts_from_chatrooms_config.get('oneDayAddContactsTotal', 90)
  424. # once_add_contacts_total = scheduled_task_add_contacts_from_chatrooms_config.get('onceAddContactsTotal', 30)
  425. # # oneday_times=scheduled_task_add_contacts_from_chatrooms_config.get('oneDayTimes',3)
  426. # login_keys = []
  427. # async for key in redis_service.client.scan_iter(match='__AI_OPS_WX__:LOGININFO:*'):
  428. # login_keys.append(key)
  429. # # 设置 Semaphore,限制并发任务数量
  430. # semaphore = asyncio.Semaphore(10) # 例如,限制同时运行的任务数量为 10
  431. # # 使用 asyncio.gather 并发处理每个 login_key
  432. # await asyncio.gather(*[process_login_key(redis_service, gewe_service, kafka_service, k, gewe_config, oneday_add_contacts_total, once_add_contacts_total, semaphore) for k in login_keys])
  433. # except Exception as e:
  434. # # 获取当前的堆栈跟踪
  435. # tb = sys.exc_info()[2]
  436. # # 为异常附加堆栈跟踪
  437. # e = e.with_traceback(tb)
  438. # # 输出详细的错误信息
  439. # logger.error(f"任务执行过程中发生异常: {e}\n异常类型: {type(e).__name__}\n异常信息: {str(e)}\n堆栈跟踪: {traceback.format_exc()}")
  440. # finally:
  441. # await kafka_service.stop_producer()
  442. # loop = asyncio.get_event_loop()
  443. # if loop.is_closed():
  444. # loop = asyncio.new_event_loop()
  445. # asyncio.set_event_loop(loop)
  446. # loop.run_until_complete(task()) # 在现有事件循环中运行任务
  447. # 📦 获取群资料与成员信息
  448. async def get_chatroom_info(gewe_service: GeWeService, wxid, chatroom_id):
  449. chatroom = await gewe_service.get_group_info_from_cache_async(wxid, chatroom_id)
  450. chatroom_member = await gewe_service.get_group_members_from_cache_async(wxid, chatroom_id)
  451. chatroom_nickname = chatroom.get('nickName')
  452. chatroom_owner_wxid = chatroom_member.get('chatroomOwner', None)
  453. admin_wxids = chatroom_member.get('adminWxid') or []
  454. chatroom_member_list = chatroom.get('memberList', [])
  455. if chatroom_member_list is None:
  456. chatroom_member_list = [] # 如果 memberList 是 None,将其初始化为空列表
  457. elif not isinstance(chatroom_member_list, list):
  458. chatroom_member_list = list(chatroom_member_list) # 如果 memberList 不是列表,将其转换为列表
  459. return chatroom, chatroom_member, chatroom_nickname, chatroom_owner_wxid, admin_wxids,chatroom_member_list
  460. async def todo_add_contacts_wxids_async(gewe_service: GeWeService, chatrooms, wxid, contact_wxids) -> list:
  461. remaining_wxids = set()
  462. for chatroom_id in chatrooms:
  463. # 🎯 获取群信息
  464. chatroom, chatroom_member, chatroom_nickname, chatroom_owner_wxid, admin_wxids, chatroom_member_list = await get_chatroom_info(
  465. gewe_service, wxid, chatroom_id
  466. )
  467. contact_wxids_set = set(contact_wxids)
  468. if admin_wxids:
  469. contact_wxids_set.update(admin_wxids)
  470. if chatroom_owner_wxid:
  471. contact_wxids_set.add(chatroom_owner_wxid)
  472. contact_wxids_set.add(wxid)
  473. unavailable_wixds = await gewe_service.check_wixd_group_add_contacts_history_async(wxid, chatroom_id)
  474. if unavailable_wixds:
  475. contact_wxids_set.update(unavailable_wixds)
  476. for member in chatroom_member_list:
  477. member_wxid = member.get('wxid')
  478. if member_wxid and member_wxid not in contact_wxids_set:
  479. remaining_wxids.add(member_wxid)
  480. return list(remaining_wxids)
  481. def is_add_time_more_than_one_day(addTime: int) -> bool:
  482. """
  483. 判断 addTime 是否与当前时间相隔大于 3600 × 24 秒
  484. :param addTime: Unix 时间戳
  485. :return: 如果 addTime 与当前时间相隔大于 3600 × 24 秒,返回 True;否则返回 False
  486. """
  487. # 获取当前时间的时间戳
  488. current_time = time.time()
  489. # 计算时间戳差值
  490. time_difference = abs(current_time - addTime)
  491. # 检查是否大于 3600 × 24 秒
  492. return time_difference > 3600 * 24
  493. @celery_app.task(name='tasks.scheduled_task_add_contacts_from_chatrooms', bind=True, acks_late=True)
  494. def scheduled_task_add_contacts_from_chatrooms(self, redis_config, kafka_config, gewe_config):
  495. '''
  496. 关于群加好友的请求规则: 每个智能体一次最多30人,间隔2小时做1次,即最多90人/天。
  497. 加好友规则:每天处理次数、间隔时间(分钟)、每次加好友人数这3个参数都可以设置。目前默认只是上面的设置。
  498. '''
  499. async def process_login_key(redis_service:RedisService, gewe_service: GeWeService, kafka_service:KafkaService, k, gewe_config, oneday_add_contacts_total, once_add_contacts_total, semaphore):
  500. async with semaphore: # 使用 Semaphore 控制并发
  501. try:
  502. r = await redis_service.get_hash(k)
  503. app_id = r.get("appId")
  504. token_id = r.get("tokenId")
  505. wxid = r.get("wxid")
  506. status = r.get('status')
  507. if status == '0':
  508. logger.warning(f"微信号 {wxid} 已经离线,群成员不能定时添加")
  509. return
  510. config = await gewe_service.get_wxchat_config_from_cache_async(wxid)
  511. validated_config = AgentConfig.model_validate(config)
  512. if not validated_config.agentEnabled:
  513. logger.warning(f"微信号 {wxid} 取消了托管,群成员不能定时添加")
  514. return
  515. # 判断是否过于频繁
  516. is_wx_expection = await gewe_service.get_wx_expection_async(wxid, "addGroupMemberAsFriend")
  517. if is_wx_expection:
  518. logger.info(f"{wxid} 本次任务接口addGroupMemberAsFriend异常, {is_wx_expection},本次群好友邀请任务未开始,跳过任务。")
  519. return
  520. cache_task_run_time_wxid_logs = await gewe_service.get_task_run_time_by_wxid_async(wxid, 'scheduled_task_add_contacts_from_chatrooms')
  521. if cache_task_run_time_wxid_logs:
  522. sorted_tasks = sorted(cache_task_run_time_wxid_logs, key=lambda x: x.get("runTime"), reverse=True)
  523. last_run_time = sorted_tasks[0].get("runTime")
  524. if last_run_time > 1e12: # 毫秒级时间戳
  525. last_run_time = last_run_time / 1000 # 转换为秒
  526. # 将时间戳转换为 datetime 对象
  527. last_run_time = datetime.fromtimestamp(last_run_time)
  528. # 获取当前时间
  529. current_time = datetime.now()
  530. # 计算时间差
  531. time_difference = current_time - last_run_time
  532. # 判断是否相差2小时
  533. if time_difference < timedelta(hours=2):
  534. logger.info(f"{wxid}上次定时群成员定时添好友任务在2小时内,不再执行")
  535. return
  536. # cache_task_run_time_wxid_logs.append({"runTime": int(time.time())})
  537. # await gewe_service.save_task_run_time_by_wxid_async(wxid, 'scheduled_task_add_contacts_from_chatrooms', cache_task_run_time_wxid_logs, 3600 * 2)
  538. c:dict = await gewe_service.get_wxchat_config_from_cache_async(wxid)
  539. contacts = await gewe_service.get_contacts_brief_from_cache_async(wxid)
  540. contact_wxids = [c.get('userName') for c in contacts]
  541. chatrooms = c.get('addContactsFromChatroomIdWhiteList', [])
  542. # async def todo_add_contacts_wxids_async(chatrooms)->list:
  543. # remaining_chatroom_members=[]
  544. # for chatroom_id in chatrooms:
  545. # chatroom = await gewe_service.get_group_info_from_cache_async(wxid, chatroom_id)
  546. # chatroom_member = await gewe_service.get_group_members_from_cache_async(wxid, chatroom_id)
  547. # chatroom_nickname = chatroom.get('nickName')
  548. # chatroom_owner_wxid = chatroom_member.get('chatroomOwner', None)
  549. # admin_wxids = chatroom_member.get('adminWxid', [])
  550. # admin_wxids = chatroom_member.get('adminWxid')
  551. # if admin_wxids is None:
  552. # admin_wxids = [] # 如果 admin_wxids 是 None,将其初始化为空列表
  553. # chatroom_member_list = chatroom.get('memberList', [])
  554. # if chatroom_member_list is None:
  555. # chatroom_member_list = [] # 如果 memberList 是 None,将其初始化为空列表
  556. # elif not isinstance(chatroom_member_list, list):
  557. # chatroom_member_list = list(chatroom_member_list) # 如果 memberList 不是列表,将其转换为列表
  558. # logger.info(f'{chatroom_nickname} 的群主是 {chatroom_owner_wxid},管理员是{admin_wxids}')
  559. # contact_wxids_set = set(contact_wxids)
  560. # if admin_wxids:
  561. # contact_wxids_set.update(set(admin_wxids))
  562. # if chatroom_owner_wxid is not None:
  563. # contact_wxids_set.add(chatroom_owner_wxid)
  564. # contact_wxids_set.add(wxid)
  565. # unavailable_wixds = await gewe_service.check_wixd_group_add_contacts_history_async(wxid, chatroom_id)
  566. # if unavailable_wixds:
  567. # contact_wxids_set.update(set(unavailable_wixds))
  568. # remaining_chatroom_members.append([x for x in chatroom_member_list if x.get('wxid') not in contact_wxids_set])
  569. # return list(set(remaining_chatroom_members))
  570. # todo_add_contacts =await todo_add_contacts_wxids_async(chatrooms)
  571. todo_add_contacts =await todo_add_contacts_wxids_async(gewe_service,chatrooms, wxid,contact_wxids)
  572. if not todo_add_contacts:
  573. logger.info(f'{wxid} 没有符合条件的群待加好友')
  574. return
  575. cache_task_run_time_wxid_logs.append({"runTime": int(time.time())})
  576. await gewe_service.save_task_run_time_by_wxid_async(wxid, 'scheduled_task_add_contacts_from_chatrooms', cache_task_run_time_wxid_logs, 3600 * 2)
  577. logger.info(f'{wxid} 定时群成员定时添好友任务开始')
  578. wixd_add_contacts_from_chatrooms_times = {wxid: 0}
  579. for chatroom_id in chatrooms:
  580. # chatroom = await gewe_service.get_group_info_from_cache_async(wxid, chatroom_id)
  581. # chatroom_member = await gewe_service.get_group_members_from_cache_async(wxid, chatroom_id)
  582. # chatroom_nickname = chatroom.get('nickName')
  583. # chatroom_owner_wxid = chatroom_member.get('chatroomOwner', None)
  584. # admin_wxids = chatroom_member.get('adminWxid', [])
  585. # admin_wxids = chatroom_member.get('adminWxid')
  586. # if admin_wxids is None:
  587. # admin_wxids = [] # 如果 admin_wxids 是 None,将其初始化为空列表
  588. # chatroom_member_list = chatroom.get('memberList', [])
  589. # if chatroom_member_list is None:
  590. # chatroom_member_list = [] # 如果 memberList 是 None,将其初始化为空列表
  591. # elif not isinstance(chatroom_member_list, list):
  592. # chatroom_member_list = list(chatroom_member_list) # 如果 memberList 不是列表,将其转换为列表
  593. chatroom, chatroom_member, chatroom_nickname, chatroom_owner_wxid, admin_wxids,chatroom_member_list = await get_chatroom_info(
  594. gewe_service, wxid, chatroom_id
  595. )
  596. logger.info(f'{chatroom_nickname} 的群主是 {chatroom_owner_wxid},管理员是{admin_wxids}')
  597. contact_wxids_set = set(contact_wxids)
  598. if admin_wxids:
  599. contact_wxids_set.update(set(admin_wxids))
  600. if chatroom_owner_wxid is not None:
  601. contact_wxids_set.add(chatroom_owner_wxid)
  602. contact_wxids_set.add(wxid)
  603. unavailable_wixds = await gewe_service.check_wixd_group_add_contacts_history_async(wxid, chatroom_id)
  604. if unavailable_wixds:
  605. contact_wxids_set.update(set(unavailable_wixds))
  606. #remaining_chatroom_members = [x for x in chatroom_member_list if x.get('wxid') not in contact_wxids_set]
  607. # 倒序
  608. remaining_chatroom_members = [x for x in reversed(chatroom_member_list) if x.get('wxid') not in contact_wxids_set]
  609. nickname = next((member['nickName'] for member in chatroom_member_list if member['wxid'] == wxid), None)
  610. if not remaining_chatroom_members:
  611. logger.info(f'{nickname}-{wxid} 在 {chatroom_nickname}-{chatroom_id} 群里没有好友可以邀请')
  612. # 任务状态推送到kafka
  613. k_message = wx_add_contacts_from_chatroom_task_status_message(wxid, chatroom_id, 2)
  614. await kafka_service.send_message_async(k_message)
  615. continue
  616. logger.info(f'{nickname}-{wxid} 在 {chatroom_nickname} 群里还可以邀请的好友有:{[x.get("nickName") for x in remaining_chatroom_members]}')
  617. for m in remaining_chatroom_members:
  618. # 判断本次任务是否已经邀请了30个好友
  619. if wixd_add_contacts_from_chatrooms_times[wxid] == once_add_contacts_total:
  620. logger.info(f"{wxid} 本次任务已经邀请了{once_add_contacts_total}人,不再邀请")
  621. return
  622. # 判断当天群成员是否已经加了90个好友
  623. is_add_group_times = await gewe_service.is_group_add_contacts_history_one_day_async(wxid, oneday_add_contacts_total)
  624. if is_add_group_times:
  625. logger.info(f"当天 {wxid} 所有群的成员已经加了{oneday_add_contacts_total}个好友,不再添加")
  626. return
  627. # 判断是否过于频繁
  628. is_wx_expection = await gewe_service.get_wx_expection_async(wxid, "addGroupMemberAsFriend")
  629. if is_wx_expection:
  630. logger.info(f"{wxid} 本次任务接口addGroupMemberAsFriend异常,不再邀请,{is_wx_expection}")
  631. return
  632. contact_wxid = m.get('wxid')
  633. member_nickname = m.get("nickName")
  634. group_add_contacts_history = await gewe_service.get_group_add_contacts_history_async(wxid, chatroom_id, contact_wxid)
  635. if group_add_contacts_history:
  636. sorted_history = sorted(group_add_contacts_history, key=lambda x: x.addTime, reverse=True)
  637. # 已经邀请过两次,不再邀请
  638. if len(sorted_history) == 2:
  639. logger.info(f'{nickname}-{wxid}在{chatroom_nickname}-{chatroom_id} 群的 {member_nickname}-{contact_wxid} 已经邀请过2次,不再邀请')
  640. continue
  641. # 24小时邀请过,不再邀请
  642. if len(sorted_history) > 0:
  643. last_add_time = sorted_history[0].addTime
  644. # def is_add_time_more_than_one_day(addTime: int) -> bool:
  645. # """
  646. # 判断 addTime 是否与当前时间相隔大于 3600 × 24 秒
  647. # :param addTime: Unix 时间戳
  648. # :return: 如果 addTime 与当前时间相隔大于 3600 × 24 秒,返回 True;否则返回 False
  649. # """
  650. # # 获取当前时间的时间戳
  651. # current_time = time.time()
  652. # # 计算时间戳差值
  653. # time_difference = abs(current_time - addTime)
  654. # # 检查是否大于 3600 × 24 秒
  655. # return time_difference > 3600 * 24
  656. is_more_than_one_day = is_add_time_more_than_one_day(last_add_time)
  657. if not is_more_than_one_day:
  658. logger.info(f'{nickname}-{wxid}在{chatroom_nickname}-{chatroom_id} 群的 {member_nickname}-{contact_wxid} 已经24小时邀请,不再邀请')
  659. continue
  660. ret, msg, data = await gewe_service.add_group_member_as_friend_async(token_id, app_id, chatroom_id, m.get('wxid'),
  661. f'我是群聊"{chatroom_nickname}"群的{nickname}')
  662. history = AddGroupContactsHistory.model_validate({
  663. "chatroomId": chatroom_id,
  664. "wxid": wxid,
  665. "contactWixd": contact_wxid,
  666. "addTime": int(time.time())
  667. })
  668. if ret != 200:
  669. logger.warning(f'{nickname}-{wxid} 在 {chatroom_nickname}-{chatroom_id} 群好友 {member_nickname}-{contact_wxid} 邀请失败原因:{ret} {msg} {data}')
  670. if data and '操作过于频繁' in data.get('msg', ''):
  671. await gewe_service.save_wx_expection_async(wxid, "addGroupMemberAsFriend", msg, today_seconds_remaining())
  672. await gewe_service.save_group_add_contacts_history_async(wxid, chatroom_id, contact_wxid, history)
  673. logger.warning(f'{nickname}-{wxid} 在 {chatroom_nickname}-{chatroom_id} 群的 {member_nickname}-{contact_wxid} 操作过于频繁,本次群好友邀请任务未完成跳过。当天不再处理该号群好友邀请任务')
  674. return
  675. await gewe_service.save_group_add_contacts_history_async(wxid, chatroom_id, contact_wxid, history)
  676. wixd_add_contacts_from_chatrooms_times[wxid] += 1
  677. logger.info(f'{nickname} 向 {chatroom_nickname}-{chatroom_id} 群的 {m.get("nickName")}-{m.get("wxid")} 发送好友邀请 {msg} \n {data} \n 当前已邀请好友数:{wixd_add_contacts_from_chatrooms_times[wxid]}')
  678. # 推送到kafka
  679. k_message = wx_add_contacts_from_chatroom_message(history.wxid, history.chatroomId, history.contactWixd, history.addTime)
  680. await kafka_service.send_message_async(k_message)
  681. # await asyncio.sleep(random.uniform(1.5, 3))
  682. # await asyncio.sleep(random.uniform(30, 60))
  683. delay=random.uniform(270,300)
  684. await asyncio.sleep(delay)
  685. # 任务状态推送到kafka
  686. task_status = await gewe_service.wx_add_contacts_from_chatroom_task_status_async(wxid, chatroom_id)
  687. k_message=wx_add_contacts_from_chatroom_task_status_message(wxid, chatroom_id, task_status)
  688. await kafka_service.send_message_async(k_message)
  689. # 下一个群
  690. await asyncio.sleep(random.uniform(1.5, 3))
  691. logger.info(f'{wxid} 定时群成员定时添好友任务完成,本次共邀请好友{wixd_add_contacts_from_chatrooms_times[wxid]}个')
  692. except Exception as e:
  693. # 获取当前的堆栈跟踪
  694. tb = sys.exc_info()[2]
  695. # 为异常附加堆栈跟踪
  696. e = e.with_traceback(tb)
  697. # 输出详细的错误信息
  698. logger.error(f"任务执行过程中发生异常: {e}\n异常类型: {type(e).__name__}\n异常信息: {str(e)}\n堆栈跟踪: {traceback.format_exc()}")
  699. async def task():
  700. try:
  701. now = datetime.now()
  702. # if 10> now.hour < 8:
  703. # logger.info(f"定时群成员定时添好友任务不启动,当前时间为 {now.strftime('%Y-%m-%d %H:%M:%S')},早于8点")
  704. # return
  705. if now.hour < 8 or now.hour > 22:
  706. logger.info(f"定时群成员定时添好友任务不启动, 当前时间为 {now.strftime('%Y-%m-%d %H:%M:%S')},不在8点到22点之间")
  707. return
  708. logger.info('定时群成员定时添好友任务开始')
  709. redis_service = RedisService()
  710. await redis_service.init(**redis_config)
  711. gewe_service = await GeWeService.get_instance(redis_service, gewe_config['api_url'])
  712. KAFKA_BOOTSTRAP_SERVERS = kafka_config['bootstrap_servers']
  713. KAFKA_TOPIC = kafka_config['topic']
  714. KAFKA_GROUP_ID = kafka_config['group_id']
  715. kafka_service = KafkaService(KAFKA_BOOTSTRAP_SERVERS, KAFKA_TOPIC, KAFKA_TOPIC, KAFKA_GROUP_ID)
  716. await kafka_service.start_producer()
  717. global_config = await gewe_service.get_global_config_from_cache_async()
  718. scheduled_task_add_contacts_from_chatrooms_config = global_config.get('scheduledTaskAddContactsFromChatrooms', {})
  719. oneday_add_contacts_total = 90
  720. once_add_contacts_total = 30
  721. # oneday_times=3
  722. if scheduled_task_add_contacts_from_chatrooms_config:
  723. oneday_add_contacts_total = scheduled_task_add_contacts_from_chatrooms_config.get('oneDayAddContactsTotal', 90)
  724. once_add_contacts_total = scheduled_task_add_contacts_from_chatrooms_config.get('onceAddContactsTotal', 30)
  725. # oneday_times=scheduled_task_add_contacts_from_chatrooms_config.get('oneDayTimes',3)
  726. login_keys = []
  727. async for key in redis_service.client.scan_iter(match='__AI_OPS_WX__:LOGININFO:*'):
  728. login_keys.append(key)
  729. # 设置 Semaphore,限制并发任务数量
  730. semaphore = asyncio.Semaphore(5) # 限制同时运行的任务数量
  731. # 使用 asyncio.gather 并发处理每个 login_key
  732. await asyncio.gather(*[process_login_key(redis_service, gewe_service, kafka_service, k, gewe_config, oneday_add_contacts_total, once_add_contacts_total, semaphore) for k in login_keys])
  733. except Exception as e:
  734. # 获取当前的堆栈跟踪
  735. tb = sys.exc_info()[2]
  736. # 为异常附加堆栈跟踪
  737. e = e.with_traceback(tb)
  738. # 输出详细的错误信息
  739. logger.error(f"任务执行过程中发生异常: {e}\n异常类型: {type(e).__name__}\n异常信息: {str(e)}\n堆栈跟踪: {traceback.format_exc()}")
  740. finally:
  741. await kafka_service.stop_producer()
  742. loop = asyncio.get_event_loop()
  743. if loop.is_closed():
  744. loop = asyncio.new_event_loop()
  745. asyncio.set_event_loop(loop)
  746. loop.run_until_complete(task()) # 在现有事件循环中运行任务
  747. # @celery_app.task(name='tasks.scheduled_task_add_contacts_from_chatrooms_bk', bind=True, acks_late=True)
  748. # def scheduled_task_add_contacts_from_chatrooms_bk(self, redis_config, kafka_config, gewe_config):
  749. # '''
  750. # 关于群加好友的请求规则: 每个智能体一次最多30人,间隔2小时做1次,即最多90人/天。
  751. # 加好友规则:每天处理次数、间隔时间(分钟)、每次加好友人数这3个参数都可以设置。目前默认只是上面的设置。
  752. # '''
  753. # async def process_login_key(redis_service:RedisService, gewe_service: GeWeService, kafka_service:KafkaService, k, gewe_config, oneday_add_contacts_total, once_add_contacts_total, semaphore):
  754. # async with semaphore: # 使用 Semaphore 控制并发
  755. # try:
  756. # r = await redis_service.get_hash(k)
  757. # app_id = r.get("appId")
  758. # token_id = r.get("tokenId")
  759. # wxid = r.get("wxid")
  760. # status = r.get('status')
  761. # if status == '0':
  762. # logger.warning(f"微信号 {wxid} 已经离线,群成员不能定时添加")
  763. # return
  764. # config = await gewe_service.get_wxchat_config_from_cache_async(wxid)
  765. # validated_config = AgentConfig.model_validate(config)
  766. # if not validated_config.agentEnabled:
  767. # logger.warning(f"微信号 {wxid} 取消了托管,群成员不能定时添加")
  768. # return
  769. # # 判断是否过于频繁
  770. # is_wx_expection = await gewe_service.get_wx_expection_async(wxid, "addGroupMemberAsFriend")
  771. # if is_wx_expection:
  772. # logger.info(f"{wxid} 本次任务接口addGroupMemberAsFriend异常, {is_wx_expection},本次群好友邀请任务未开始,跳过任务。")
  773. # return
  774. # cache_task_run_time_wxid_logs = await gewe_service.get_task_run_time_by_wxid_async(wxid, 'scheduled_task_add_contacts_from_chatrooms')
  775. # if cache_task_run_time_wxid_logs:
  776. # sorted_tasks = sorted(cache_task_run_time_wxid_logs, key=lambda x: x.get("runTime"), reverse=True)
  777. # last_run_time = sorted_tasks[0].get("runTime")
  778. # if last_run_time > 1e12: # 毫秒级时间戳
  779. # last_run_time = last_run_time / 1000 # 转换为秒
  780. # # 将时间戳转换为 datetime 对象
  781. # last_run_time = datetime.fromtimestamp(last_run_time)
  782. # # 获取当前时间
  783. # current_time = datetime.now()
  784. # # 计算时间差
  785. # time_difference = current_time - last_run_time
  786. # # 判断是否相差2小时
  787. # if time_difference < timedelta(hours=2):
  788. # logger.info(f"{wxid}上次定时群成员定时添好友任务在2小时内,不再执行")
  789. # return
  790. # cache_task_run_time_wxid_logs.append({"runTime": int(time.time())})
  791. # await gewe_service.save_task_run_time_by_wxid_async(wxid, 'scheduled_task_add_contacts_from_chatrooms', cache_task_run_time_wxid_logs, 3600 * 2)
  792. # c:dict = await gewe_service.get_wxchat_config_from_cache_async(wxid)
  793. # contacts = await gewe_service.get_contacts_brief_from_cache_async(wxid)
  794. # contact_wxids = [c.get('userName') for c in contacts]
  795. # chatrooms = c.get('addContactsFromChatroomIdWhiteList', [])
  796. # logger.info(f'{wxid} 定时群成员定时添好友任务开始')
  797. # wixd_add_contacts_from_chatrooms_times = {wxid: 0}
  798. # for chatroom_id in chatrooms:
  799. # chatroom = await gewe_service.get_group_info_from_cache_async(wxid, chatroom_id)
  800. # chatroom_member = await gewe_service.get_group_members_from_cache_async(wxid, chatroom_id)
  801. # chatroom_nickname = chatroom.get('nickName')
  802. # chatroom_owner_wxid = chatroom_member.get('chatroomOwner', None)
  803. # admin_wxids = chatroom_member.get('adminWxid', [])
  804. # admin_wxids = chatroom_member.get('adminWxid')
  805. # if admin_wxids is None:
  806. # admin_wxids = [] # 如果 admin_wxids 是 None,将其初始化为空列表
  807. # logger.info(f'{chatroom_nickname} 的群主是 {chatroom_owner_wxid},管理员是{admin_wxids}')
  808. # contact_wxids_set = set(contact_wxids)
  809. # if admin_wxids:
  810. # contact_wxids_set.update(set(admin_wxids))
  811. # if chatroom_owner_wxid is not None:
  812. # contact_wxids_set.add(chatroom_owner_wxid)
  813. # contact_wxids_set.add(wxid)
  814. # unavailable_wixds = await gewe_service.check_wixd_group_add_contacts_history_async(wxid, chatroom_id)
  815. # if unavailable_wixds:
  816. # contact_wxids_set.update(set(unavailable_wixds))
  817. # chatroom_member_list = chatroom.get('memberList', [])
  818. # if chatroom_member_list is None:
  819. # chatroom_member_list = [] # 如果 memberList 是 None,将其初始化为空列表
  820. # elif not isinstance(chatroom_member_list, list):
  821. # chatroom_member_list = list(chatroom_member_list) # 如果 memberList 不是列表,将其转换为列表
  822. # remaining_chatroom_members = [x for x in chatroom_member_list if x.get('wxid') not in contact_wxids_set]
  823. # nickname = next((member['nickName'] for member in chatroom_member_list if member['wxid'] == wxid), None)
  824. # if not remaining_chatroom_members:
  825. # logger.info(f'{nickname}-{wxid} 在 {chatroom_nickname}-{chatroom_id} 群里没有好友可以邀请')
  826. # # 任务状态推送到kafka
  827. # k_message = wx_add_contacts_from_chatroom_task_status_message(wxid, chatroom_id, 2)
  828. # await kafka_service.send_message_async(k_message)
  829. # continue
  830. # logger.info(f'{nickname}-{wxid} 在 {chatroom_nickname} 群里还可以邀请的好友有:{[x.get("nickName") for x in remaining_chatroom_members]}')
  831. # for m in remaining_chatroom_members:
  832. # # 判断本次任务是否已经邀请了30个好友
  833. # if wixd_add_contacts_from_chatrooms_times[wxid] == once_add_contacts_total:
  834. # logger.info(f"{wxid} 本次任务已经邀请了{once_add_contacts_total}人,不再邀请")
  835. # return
  836. # # 判断当天群成员是否已经加了90个好友
  837. # is_add_group_times = await gewe_service.is_group_add_contacts_history_one_day_async(wxid, oneday_add_contacts_total)
  838. # if is_add_group_times:
  839. # logger.info(f"当天 {wxid} 所有群的成员已经加了{oneday_add_contacts_total}个好友,不再添加")
  840. # return
  841. # # 判断是否过于频繁
  842. # is_wx_expection = await gewe_service.get_wx_expection_async(wxid, "addGroupMemberAsFriend")
  843. # if is_wx_expection:
  844. # logger.info(f"{wxid} 本次任务接口addGroupMemberAsFriend异常,不再邀请,{is_wx_expection}")
  845. # return
  846. # contact_wxid = m.get('wxid')
  847. # member_nickname = m.get("nickName")
  848. # group_add_contacts_history = await gewe_service.get_group_add_contacts_history_async(wxid, chatroom_id, contact_wxid)
  849. # if group_add_contacts_history:
  850. # sorted_history = sorted(group_add_contacts_history, key=lambda x: x.addTime, reverse=True)
  851. # # 已经邀请过两次,不再邀请
  852. # if len(sorted_history) == 2:
  853. # logger.info(f'{nickname}-{wxid}在{chatroom_nickname}-{chatroom_id} 群的 {member_nickname}-{contact_wxid} 已经邀请过2次,不再邀请')
  854. # continue
  855. # # 24小时邀请过,不再邀请
  856. # if len(sorted_history) > 0:
  857. # last_add_time = sorted_history[0].addTime
  858. # def is_add_time_more_than_one_day(addTime: int) -> bool:
  859. # """
  860. # 判断 addTime 是否与当前时间相隔大于 3600 × 24 秒
  861. # :param addTime: Unix 时间戳
  862. # :return: 如果 addTime 与当前时间相隔大于 3600 × 24 秒,返回 True;否则返回 False
  863. # """
  864. # # 获取当前时间的时间戳
  865. # current_time = time.time()
  866. # # 计算时间戳差值
  867. # time_difference = abs(current_time - addTime)
  868. # # 检查是否大于 3600 × 24 秒
  869. # return time_difference > 3600 * 24
  870. # is_more_than_one_day = is_add_time_more_than_one_day(last_add_time)
  871. # if not is_more_than_one_day:
  872. # logger.info(f'{nickname}-{wxid}在{chatroom_nickname}-{chatroom_id} 群的 {member_nickname}-{contact_wxid} 已经24小时邀请,不再邀请')
  873. # continue
  874. # ret, msg, data = await gewe_service.add_group_member_as_friend_async(token_id, app_id, chatroom_id, m.get('wxid'),
  875. # f'我是群聊"{chatroom_nickname}"群的{nickname}')
  876. # history = AddGroupContactsHistory.model_validate({
  877. # "chatroomId": chatroom_id,
  878. # "wxid": wxid,
  879. # "contactWixd": contact_wxid,
  880. # "addTime": int(time.time())
  881. # })
  882. # if ret != 200:
  883. # logger.warning(f'{nickname}-{wxid} 在 {chatroom_nickname}-{chatroom_id} 群好友 {member_nickname}-{contact_wxid} 邀请失败原因:{ret} {msg} {data}')
  884. # if data and '操作过于频繁' in data.get('msg', ''):
  885. # await gewe_service.save_wx_expection_async(wxid, "addGroupMemberAsFriend", msg, today_seconds_remaining())
  886. # await gewe_service.save_group_add_contacts_history_async(wxid, chatroom_id, contact_wxid, history)
  887. # logger.warning(f'{nickname}-{wxid} 在 {chatroom_nickname}-{chatroom_id} 群的 {member_nickname}-{contact_wxid} 操作过于频繁,本次群好友邀请任务未完成跳过。当天不再处理该号群好友邀请任务')
  888. # return
  889. # await gewe_service.save_group_add_contacts_history_async(wxid, chatroom_id, contact_wxid, history)
  890. # wixd_add_contacts_from_chatrooms_times[wxid] += 1
  891. # logger.info(f'{nickname} 向 {chatroom_nickname}-{chatroom_id} 群的 {m.get("nickName")}-{m.get("wxid")} 发送好友邀请 {msg} \n {data} \n 当前已邀请好友数:{wixd_add_contacts_from_chatrooms_times[wxid]}')
  892. # # 推送到kafka
  893. # k_message = wx_add_contacts_from_chatroom_message(history.wxid, history.chatroomId, history.contactWixd, history.addTime)
  894. # await kafka_service.send_message_async(k_message)
  895. # # await asyncio.sleep(random.uniform(1.5, 3))
  896. # # await asyncio.sleep(random.uniform(30, 60))
  897. # await asyncio.sleep(random.uniform(270,300))
  898. # # 任务状态推送到kafka
  899. # task_status = await gewe_service.wx_add_contacts_from_chatroom_task_status_async(wxid, chatroom_id)
  900. # k_message=wx_add_contacts_from_chatroom_task_status_message(wxid, chatroom_id, task_status)
  901. # await kafka_service.send_message_async(k_message)
  902. # # 下一个群
  903. # await asyncio.sleep(random.uniform(1.5, 3))
  904. # #logger.info(f'{wxid} 定时群成员定时添好友任务完成,本次共邀请好友{wixd_add_contacts_from_chatrooms_times[wxid]}个')
  905. # except Exception as e:
  906. # # 获取当前的堆栈跟踪
  907. # tb = sys.exc_info()[2]
  908. # # 为异常附加堆栈跟踪
  909. # e = e.with_traceback(tb)
  910. # # 输出详细的错误信息
  911. # logger.error(f"任务执行过程中发生异常: {e}\n异常类型: {type(e).__name__}\n异常信息: {str(e)}\n堆栈跟踪: {traceback.format_exc()}")
  912. # async def task():
  913. # try:
  914. # now = datetime.now()
  915. # # if 10> now.hour < 8:
  916. # # logger.info(f"定时群成员定时添好友任务不启动,当前时间为 {now.strftime('%Y-%m-%d %H:%M:%S')},早于8点")
  917. # # return
  918. # if now.hour < 8 or now.hour > 22:
  919. # logger.info(f"定时群成员定时添好友任务不启动, 当前时间为 {now.strftime('%Y-%m-%d %H:%M:%S')},不在8点到22点之间")
  920. # return
  921. # logger.info('定时群成员定时添好友任务开始')
  922. # redis_service = RedisService()
  923. # await redis_service.init(**redis_config)
  924. # gewe_service = await GeWeService.get_instance(redis_service, gewe_config['api_url'])
  925. # KAFKA_BOOTSTRAP_SERVERS = kafka_config['bootstrap_servers']
  926. # KAFKA_TOPIC = kafka_config['topic']
  927. # KAFKA_GROUP_ID = kafka_config['group_id']
  928. # kafka_service = KafkaService(KAFKA_BOOTSTRAP_SERVERS, KAFKA_TOPIC, KAFKA_TOPIC, KAFKA_GROUP_ID)
  929. # await kafka_service.start_producer()
  930. # global_config = await gewe_service.get_global_config_from_cache_async()
  931. # scheduled_task_add_contacts_from_chatrooms_config = global_config.get('scheduledTaskAddContactsFromChatrooms', {})
  932. # oneday_add_contacts_total = 90
  933. # once_add_contacts_total = 30
  934. # # oneday_times=3
  935. # if scheduled_task_add_contacts_from_chatrooms_config:
  936. # oneday_add_contacts_total = scheduled_task_add_contacts_from_chatrooms_config.get('oneDayAddContactsTotal', 90)
  937. # once_add_contacts_total = scheduled_task_add_contacts_from_chatrooms_config.get('onceAddContactsTotal', 30)
  938. # # oneday_times=scheduled_task_add_contacts_from_chatrooms_config.get('oneDayTimes',3)
  939. # login_keys = []
  940. # async for key in redis_service.client.scan_iter(match='__AI_OPS_WX__:LOGININFO:*'):
  941. # login_keys.append(key)
  942. # # 设置 Semaphore,限制并发任务数量
  943. # semaphore = asyncio.Semaphore(10) # 例如,限制同时运行的任务数量为 10
  944. # # 使用 asyncio.gather 并发处理每个 login_key
  945. # await asyncio.gather(*[process_login_key(redis_service, gewe_service, kafka_service, k, gewe_config, oneday_add_contacts_total, once_add_contacts_total, semaphore) for k in login_keys])
  946. # except Exception as e:
  947. # # 获取当前的堆栈跟踪
  948. # tb = sys.exc_info()[2]
  949. # # 为异常附加堆栈跟踪
  950. # e = e.with_traceback(tb)
  951. # # 输出详细的错误信息
  952. # logger.error(f"任务执行过程中发生异常: {e}\n异常类型: {type(e).__name__}\n异常信息: {str(e)}\n堆栈跟踪: {traceback.format_exc()}")
  953. # finally:
  954. # await kafka_service.stop_producer()
  955. # loop = asyncio.get_event_loop()
  956. # if loop.is_closed():
  957. # loop = asyncio.new_event_loop()
  958. # asyncio.set_event_loop(loop)
  959. # loop.run_until_complete(task()) # 在现有事件循环中运行任务
  960. REDIS_KEY_PATTERN = "friend_add_limit:{date}"
  961. REDIS_LAST_RUN_KEY = "last_run_time:add_friends_task"
  962. @celery_app.task(name='tasks.add_friends_task', bind=True, acks_late=True)
  963. def add_friends_task(self,redis_config):
  964. """
  965. 限制每天最多 15 个,每 2 小时最多 8 个
  966. """
  967. async def task():
  968. redis_service = RedisService()
  969. await redis_service.init(**redis_config)
  970. today_str = datetime.now().strftime("%Y%m%d")
  971. redis_key = REDIS_KEY_PATTERN.format(date=today_str)
  972. # 获取当前总添加数量
  973. total_added = await redis_service.get_hash_field(redis_key, "total") or 0
  974. last_2h_added =await redis_service.get_hash_field(redis_key, "last_2h") or 0
  975. total_added = int(total_added)
  976. last_2h_added = int(last_2h_added)
  977. logger.info(f"当前添加好友总数: {total_added}, 过去2小时添加: {last_2h_added}")
  978. # 判断是否超过限制
  979. if total_added >= 15:
  980. logger.warning("今日好友添加已达上限!")
  981. return
  982. if last_2h_added >= 8:
  983. logger.warning("过去2小时添加已达上限!")
  984. return
  985. # 计算本次要添加的好友数量 (控制每天 5-15 个)
  986. max_add = min(15 - total_added, 8 - last_2h_added)
  987. if max_add <= 0:
  988. return
  989. num_to_add = min(max_add, 1) # 每次最多加 1 个
  990. logger.info(f"本次添加 {num_to_add} 位好友")
  991. # TODO: 调用好友添加逻辑 (接口 or 业务逻辑)
  992. # success = add_friends(num_to_add)
  993. success = num_to_add # 假设成功添加 num_to_add 个
  994. # 更新 Redis 计数
  995. if success > 0:
  996. await redis_service.increment_hash_field(redis_key, "total", success)
  997. await redis_service.increment_hash_field(redis_key, "last_2h", success)
  998. # 设置 Redis 过期时间 (每日记录存 1 天, 2 小时记录存 2 小时)
  999. await redis_service.expire(redis_key, 86400) # 24小时
  1000. await redis_service.expire_field(redis_key, "last_2h", 7200) # 2小时
  1001. logger.info(f"成功添加 {success} 位好友, 今日总数 {total_added + success}")
  1002. # 生成一个新的随机时间(5-15 分钟之间)
  1003. # next_interval = random.randint(10, 20)
  1004. # # 计算新的执行时间
  1005. # next_run_time = datetime.datetime.now() + timedelta(seconds=next_interval)
  1006. # # 重新注册 RedBeat 任务,确保下次执行时间不同
  1007. # redbeat_entry = RedBeatSchedulerEntry(
  1008. # name="redbeat:add_friends_task",
  1009. # task="tasks.add_friends_task",
  1010. # schedule=celery.schedules.schedule(timedelta(seconds=next_interval)),
  1011. # args=[redis_config],
  1012. # app=celery_app
  1013. # )
  1014. # # 设置任务的下次执行时间
  1015. # redbeat_entry.last_run_at = next_run_time
  1016. # redbeat_entry.save()
  1017. # logger.info(f"下次任务将在 {next_run_time} 执行(间隔 {next_interval} 秒)")
  1018. loop = asyncio.get_event_loop()
  1019. if loop.is_closed():
  1020. loop = asyncio.new_event_loop()
  1021. asyncio.set_event_loop(loop)
  1022. loop.run_until_complete(task()) # 在现有事件循环中运行任务
  1023. def today_seconds_remaining()->int:
  1024. current_time = datetime.now()
  1025. # 计算当天的结束时间(23:59:59)
  1026. end_of_day = datetime(current_time.year, current_time.month, current_time.day, 23, 59, 59)
  1027. # 计算时间差
  1028. time_difference = end_of_day - current_time
  1029. # 将时间差转换为秒数
  1030. time_difference_seconds = int(time_difference.total_seconds())
  1031. return time_difference_seconds
  1032. @celery_app.task(name='tasks.random_scheduled_task', bind=True, acks_late=True)
  1033. def random_scheduled_task(self,):
  1034. print(f"Task executed at {datetime.now()}")
  1035. # 随机生成下次执行时间(例如:10-60秒内的随机时间)
  1036. next_run_in = random.randint(10, 60)
  1037. print(f"Next execution will be in {next_run_in} seconds")
  1038. # 设置下次执行时间
  1039. entry = RedBeatSchedulerEntry(
  1040. name='random-task',
  1041. task='tasks.random_scheduled_task',
  1042. schedule=timedelta(seconds=next_run_in),
  1043. app=celery_app
  1044. )
  1045. entry.save()
  1046. return f"Scheduled next run in {next_run_in} seconds"