您最多选择25个主题 主题必须以字母或数字开头,可以包含连字符 (-),并且长度不得超过35个字符

1周前
1周前
1周前
1周前
1周前
1周前
1周前
1周前
1周前
1周前
1周前
1周前
1周前
1周前
1周前
1周前
1周前
1周前
1周前
1周前
1周前
1周前
1周前
1周前
1周前
1周前
1周前
1周前
1周前
1周前
1周前
1周前
1周前
1周前
1周前
1周前
1周前
1周前
1周前
1周前
1周前
1周前
1周前
1周前
1周前
1周前
1周前
1周前
1周前
1周前
1周前
1周前
1周前
1周前
1周前
1周前
6 天前
1周前
1周前
1周前
1周前
1周前
1周前
1周前
6 天前
1周前
6 天前
1周前
1周前
6 天前
1周前
6 天前
1周前
1周前
6 天前
1周前
6 天前
1周前
1周前
1周前
1周前
1周前
1周前
1周前
1周前
1周前
1周前
1周前
1周前
1周前
1周前
1周前
1周前
1周前
12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153
  1. from celery_app import celery_app
  2. from fastapi import Request,FastAPI
  3. import time,datetime
  4. from celery import Celery
  5. import celery.schedules
  6. from redbeat import RedBeatSchedulerEntry
  7. from datetime import timedelta
  8. from services.redis_service import RedisService
  9. from services.kafka_service import KafkaService
  10. from services.gewe_service import GeWeService
  11. # from common.log import logger
  12. from common.utils import *
  13. import asyncio,random
  14. from model.models import AddGroupContactsHistory
  15. import logging
  16. from model.models import AgentConfig
  17. import logging
  18. import sys,traceback
  19. logger = logging.getLogger('redbeat')
  20. @celery_app.task(name='tasks.add_task', bind=True, acks_late=True)
  21. def add_task(self, x, y):
  22. time.sleep(5) # 模拟长时间计算
  23. logger.info('add')
  24. return x + y
  25. @celery_app.task(name='tasks.mul_task', bind=True, acks_late=True)
  26. def mul_task(self, x, y):
  27. time.sleep(5) # 模拟长时间计算
  28. return x * y
  29. # @celery.task(name='app.tasks.sync_contacts', bind=True, acks_late=True)
  30. # async def sync_contacts_task(self,app):
  31. # login_keys = list(await app.state.redis_service.client.scan_iter(match='__AI_OPS_WX__:LOGININFO:*'))
  32. # return login_keys
  33. # # for k in login_keys:
  34. # # print(k)
  35. @celery_app.task(name='tasks.sync_contacts', bind=True, acks_late=True)
  36. async def sync_contacts_task(self, redis_service):
  37. # Use the redis_service passed as an argument
  38. login_keys = list(await redis_service.client.scan_iter(match='__AI_OPS_WX__:LOGININFO:*'))
  39. return login_keys
  40. @celery_app.task(name='tasks.background_worker_task', bind=True, acks_late=True)
  41. def background_worker_task(self, redis_config, kafka_config, gewe_config):
  42. async def task():
  43. redis_service = RedisService()
  44. await redis_service.init(**redis_config)
  45. login_keys = []
  46. async for key in redis_service.client.scan_iter(match='__AI_OPS_WX__:LOGININFO:*'):
  47. login_keys.append(key)
  48. print(login_keys)
  49. asyncio.run(task())
  50. # @celery.task(name='tasks.background_worker_task', bind=True, acks_late=True)
  51. # async def background_worker_task(self, redis_config, kafka_config, gewe_config):
  52. # # Initialize services inside the task
  53. # redis_service = RedisService()
  54. # await redis_service.init(**redis_config)
  55. # login_keys = []
  56. # async for key in redis_service.client.scan_iter(match='__AI_OPS_WX__:LOGININFO:*'): # 使用 async for 遍历异步生成器
  57. # login_keys.append(key)
  58. # print(login_keys)
  59. # kafka_service = KafkaService(**kafka_config)
  60. # await kafka_service.start()
  61. # gewe_service = await GeWeService.get_instance(None, gewe_config['api_url'])
  62. # # Task logic
  63. # lock_name = "background_wxchat_thread_lock"
  64. # lock_identifier = str(time.time())
  65. # while True:
  66. # if await redis_service.acquire_lock(lock_name, timeout=10):
  67. # try:
  68. # logger.info("分布式锁已成功获取")
  69. # # Perform task logic
  70. # finally:
  71. # await redis_service.release_lock(lock_name, lock_identifier)
  72. # break
  73. # else:
  74. # logger.info("获取分布式锁失败,等待10秒后重试...")
  75. # await asyncio.sleep(10)
  76. # @celery_app.task(name='tasks.scheduled_task', bind=True, acks_late=True)
  77. # def scheduled_task(self):
  78. # print("定时任务执行成功!~~~~~~~~~~~~~~~~~")
  79. # return "Hello from Celery Beat + RedBeat!"
  80. # @celery_app.task(name='tasks.scheduled_task_sync_wx', bind=True, acks_late=True)
  81. # def scheduled_task_sync_wx(self,redis_service,kafka_service,gewe_service):
  82. # print("scheduled_task_sync_wx 定时任务执行成功!")
  83. # return "Hello from Celery Beat + RedBeat!"
  84. # @celery_app.task(name='tasks.scheduled_task_sync_wx_info_1', bind=True, acks_late=True)
  85. # def scheduled_task_sync_wx_info_1(self,redis_config, kafka_config, gewe_config):
  86. # '''
  87. # 定时获取微信号资料
  88. # '''
  89. # loop = asyncio.new_event_loop()
  90. # asyncio.set_event_loop(loop)
  91. # async def task():
  92. # try:
  93. # redis_service = RedisService()
  94. # await redis_service.init(**redis_config)
  95. # # gewe_service = await GeWeService.get_instance(None, gewe_config['api_url'])
  96. # login_keys = []
  97. # async for key in redis_service.client.scan_iter(match='__AI_OPS_WX__:LOGININFO:*'):
  98. # login_keys.append(key)
  99. # print(login_keys)
  100. # # for k in login_keys:
  101. # # r = await redis_service.get_hash(k)
  102. # # app_id = r.get("appId")
  103. # # token_id = r.get("tokenId")
  104. # # wxid = r.get("wxid")
  105. # # status = r.get('status')
  106. # # if status == '0':
  107. # # continue
  108. # # ret, msg, profile = await gewe_service.get_profile_async(token_id, app_id)
  109. # # if ret != 200:
  110. # # logger.warning(f"同步微信号 {wxid} 资料失败: {ret}-{msg}")
  111. # # continue
  112. # # nickname=profile.get("nickName")
  113. # # head_img_url=profile.get("smallHeadImgUrl")
  114. # # r.update({"nickName":nickname,"headImgUrl":head_img_url,"modify_at":int(time.time())})
  115. # # cleaned_login_info = {k: (v if v is not None else '') for k, v in r.items()}
  116. # # await redis_service.set_hash(k, cleaned_login_info)
  117. # # logger.info(f"同步微信号 {wxid} 资料 成功")
  118. # # redis_service.update_hash_field(k,"nickName",nickname)
  119. # # redis_service.update_hash_field(k,"headImgUrl",head_img_url)
  120. # # redis_service.update_hash_field(k,"modify_at",int(time.time()))
  121. # except Exception as e:
  122. # logger.error(f"任务执行过程中发生异常: {e}")
  123. # print("scheduled_task_sync_wx_info 定时任务执行成功!")
  124. # return "Hello from Celery Beat + RedBeat!"
  125. # loop.run_until_complete(task())
  126. # loop.close()
  127. # @celery_app.task(name='tasks.scheduled_task_sync_wx_info', bind=True, acks_late=True)
  128. # def scheduled_task_sync_wx_info(self, redis_config, kafka_config, gewe_config):
  129. # '''
  130. # 定时获取微信号资料
  131. # '''
  132. # async def task():
  133. # try:
  134. # redis_service = RedisService()
  135. # await redis_service.init(**redis_config)
  136. # gewe_service = await GeWeService.get_instance(redis_service,gewe_config['api_url'])
  137. # login_keys = []
  138. # async for key in redis_service.client.scan_iter(match='__AI_OPS_WX__:LOGININFO:*'):
  139. # login_keys.append(key)
  140. # #print(login_keys)
  141. # for k in login_keys:
  142. # r = await redis_service.get_hash(k)
  143. # app_id = r.get("appId")
  144. # token_id = r.get("tokenId")
  145. # wxid = r.get("wxid")
  146. # status = r.get('status')
  147. # if status == '0':
  148. # logger.warning(f"微信号 {wxid} 已经离线")
  149. # continue
  150. # ret, msg, profile = await gewe_service.get_profile_async(token_id, app_id)
  151. # if ret != 200:
  152. # logger.warning(f"同步微信号 {wxid} 资料失败: {ret}-{msg}")
  153. # continue
  154. # nickname=profile.get("nickName")
  155. # head_img_url=profile.get("smallHeadImgUrl")
  156. # # print(nickname)
  157. # nickname=profile.get("nickName")
  158. # head_img_url=profile.get("smallHeadImgUrl")
  159. # r.update({"nickName":nickname,"headImgUrl":head_img_url,"modify_at":int(time.time())})
  160. # cleaned_login_info = {k: (v if v is not None else '') for k, v in r.items()}
  161. # await redis_service.set_hash(k, cleaned_login_info)
  162. # logger.info(f"定时同步微信号{wxid}-昵称{nickname} 资料成功")
  163. # except Exception as e:
  164. # logger.error(f"任务执行过程中发生异常: {e}")
  165. # loop = asyncio.get_event_loop()
  166. # if loop.is_closed():
  167. # loop = asyncio.new_event_loop()
  168. # asyncio.set_event_loop(loop)
  169. # loop.run_until_complete(task()) # 在现有事件循环中运行任务
  170. @celery_app.task(name='tasks.scheduled_task_sync_wx_info', bind=True, acks_late=True)
  171. def scheduled_task_sync_wx_info(self, redis_config, kafka_config, gewe_config):
  172. '''
  173. 定时获取微信号资料
  174. '''
  175. async def process_key(redis_service, gewe_service, semaphore, key):
  176. async with semaphore: # 使用 Semaphore 控制并发
  177. try:
  178. r = await redis_service.get_hash(key)
  179. app_id = r.get("appId")
  180. token_id = r.get("tokenId")
  181. wxid = r.get("wxid")
  182. status = r.get('status')
  183. if status == '0':
  184. logger.warning(f"微信号 {wxid} 已经离线")
  185. return
  186. ret, msg, profile = await gewe_service.get_profile_async(token_id, app_id)
  187. if ret != 200:
  188. logger.warning(f"同步微信号 {wxid} 资料失败: {ret}-{msg}")
  189. return
  190. nickname = profile.get("nickName")
  191. head_img_url = profile.get("smallHeadImgUrl")
  192. r.update({"nickName": nickname, "headImgUrl": head_img_url, "modify_at": int(time.time())})
  193. cleaned_login_info = {k: (v if v is not None else '') for k, v in r.items()}
  194. await redis_service.set_hash(key, cleaned_login_info)
  195. logger.info(f"定时同步微信号{wxid}-昵称{nickname} 资料成功")
  196. except Exception as e:
  197. logger.error(f"处理键 {key} 时发生异常: {e}")
  198. async def task():
  199. try:
  200. redis_service = RedisService()
  201. await redis_service.init(**redis_config)
  202. gewe_service = await GeWeService.get_instance(redis_service, gewe_config['api_url'])
  203. login_keys = []
  204. async for key in redis_service.client.scan_iter(match='__AI_OPS_WX__:LOGININFO:*'):
  205. login_keys.append(key)
  206. # 设置 Semaphore,限制并发数为 10
  207. semaphore = asyncio.Semaphore(10)
  208. # 使用 asyncio.gather 并发处理所有键
  209. await asyncio.gather(*[process_key(redis_service, gewe_service, semaphore, key) for key in login_keys])
  210. except Exception as e:
  211. logger.error(f"任务执行过程中发生异常: {e}")
  212. loop = asyncio.get_event_loop()
  213. if loop.is_closed():
  214. loop = asyncio.new_event_loop()
  215. asyncio.set_event_loop(loop)
  216. loop.run_until_complete(task()) # 在现有事件循环中运行任务
  217. # @celery_app.task(name='tasks.scheduled_task_add_contacts_from_chatrooms', bind=True, acks_late=True)
  218. # def scheduled_task_add_contacts_from_chatrooms(self, redis_config, kafka_config, gewe_config):
  219. # '''
  220. # 关于群加好友的请求规则: 每个智能体一次最多30人,间隔2小时做1次,即最多90人/天。
  221. # 加好友规则:每天处理次数、间隔时间(分钟)、每次加好友人数这3个参数都可以设置。目前默认只是上面的设置。
  222. # '''
  223. # async def task():
  224. # try:
  225. # now = datetime.now()
  226. # if now.hour < 8:
  227. # logger.info(f"定时群成员定时添好友任务不启动,当前时间为 {now.strftime('%Y-%m-%d %H:%M:%S')},早于8点")
  228. # return
  229. # logger.info('定时群成员定时添好友任务开始')
  230. # redis_service = RedisService()
  231. # await redis_service.init(**redis_config)
  232. # gewe_service = await GeWeService.get_instance(redis_service,gewe_config['api_url'])
  233. # KAFKA_BOOTSTRAP_SERVERS=kafka_config['bootstrap_servers']
  234. # KAFKA_TOPIC=kafka_config['topic']
  235. # KAFKA_GROUP_ID=kafka_config['group_id']
  236. # kafka_service= KafkaService(KAFKA_BOOTSTRAP_SERVERS, KAFKA_TOPIC, KAFKA_TOPIC,KAFKA_GROUP_ID)
  237. # await kafka_service.start_producer()
  238. # global_config=await gewe_service.get_global_config_from_cache_async()
  239. # scheduled_task_add_contacts_from_chatrooms_config=global_config.get('scheduledTaskAddContactsFromChatrooms',{})
  240. # oneday_add_contacts_total=90
  241. # once_add_contacts_total=30
  242. # #oneday_times=3
  243. # if scheduled_task_add_contacts_from_chatrooms_config:
  244. # oneday_add_contacts_total=scheduled_task_add_contacts_from_chatrooms_config.get('oneDayAddContactsTotal',90)
  245. # once_add_contacts_total=scheduled_task_add_contacts_from_chatrooms_config.get('onceAddContactsTotal',30)
  246. # #oneday_times=scheduled_task_add_contacts_from_chatrooms_config.get('oneDayTimes',3)
  247. # # cache_task_run_time_logs= await gewe_service.get_task_run_time_async('scheduled_task_add_contacts_from_chatrooms')
  248. # # if cache_task_run_time_logs:
  249. # # sorted_tasks = sorted(cache_task_run_time_logs, key=lambda x: x.get("runTime"), reverse=True)
  250. # # last_run_time=sorted_tasks[0].get("runTime")
  251. # # if last_run_time > 1e12: # 毫秒级时间戳
  252. # # last_run_time = last_run_time / 1000 # 转换为秒
  253. # # # 将时间戳转换为 datetime 对象
  254. # # last_run_time = datetime.fromtimestamp(last_run_time)
  255. # # # 获取当前时间
  256. # # current_time = datetime.now()
  257. # # # 计算时间差
  258. # # time_difference = current_time - last_run_time
  259. # # # 判断是否相差2小时
  260. # # if time_difference < timedelta(hours=2):
  261. # # logger.info(f"上次定时群成员定时添好友任务在2小时内,不再执行")
  262. # # return
  263. # # time_difference_seconds = today_seconds_remaining()
  264. # # cache_task_run_time_logs.append({"runTime":int(time.time())})
  265. # # await gewe_service.save_task_run_time_async('scheduled_task_add_contacts_from_chatrooms',cache_task_run_time_logs,time_difference_seconds)
  266. # login_keys = []
  267. # async for key in redis_service.client.scan_iter(match='__AI_OPS_WX__:LOGININFO:*'):
  268. # login_keys.append(key)
  269. # wixd_add_contacts_from_chatrooms_times = {}
  270. # for k in login_keys:
  271. # r = await redis_service.get_hash(k)
  272. # app_id = r.get("appId")
  273. # token_id = r.get("tokenId")
  274. # wxid = r.get("wxid")
  275. # status = r.get('status')
  276. # if status == '0':
  277. # logger.warning(f"微信号 {wxid} 已经离线,群成员不能定时添加")
  278. # continue
  279. # config=await gewe_service.get_wxchat_config_from_cache_async(wxid)
  280. # validated_config = AgentConfig.model_validate(config)
  281. # if not validated_config.agentEnabled:
  282. # logger.warning(f"微信号 {wxid} 取消了托管,群成员不能定时添加")
  283. # continue
  284. # # 判断是否过于频繁
  285. # is_wx_expection = await gewe_service.get_wx_expection_async(wxid,"addGroupMemberAsFriend")
  286. # if is_wx_expection:
  287. # logger.info(f"{wxid} 本次任务接口addGroupMemberAsFriend异常, {is_wx_expection},本次群好友邀请任务未开始,跳过任务。")
  288. # continue
  289. # cache_task_run_time_wxid_logs= await gewe_service.get_task_run_time_by_wxid_async(wxid,'scheduled_task_add_contacts_from_chatrooms')
  290. # if cache_task_run_time_wxid_logs:
  291. # sorted_tasks = sorted(cache_task_run_time_wxid_logs, key=lambda x: x.get("runTime"), reverse=True)
  292. # last_run_time=sorted_tasks[0].get("runTime")
  293. # if last_run_time > 1e12: # 毫秒级时间戳
  294. # last_run_time = last_run_time / 1000 # 转换为秒
  295. # # 将时间戳转换为 datetime 对象
  296. # last_run_time = datetime.fromtimestamp(last_run_time)
  297. # # 获取当前时间
  298. # current_time = datetime.now()
  299. # # 计算时间差
  300. # time_difference = current_time - last_run_time
  301. # # 判断是否相差2小时
  302. # if time_difference < timedelta(hours=2):
  303. # logger.info(f"{wxid}上次定时群成员定时添好友任务在2小时内,不再执行")
  304. # continue
  305. # cache_task_run_time_wxid_logs.append({"runTime":int(time.time())})
  306. # await gewe_service.save_task_run_time_by_wxid_async(wxid,'scheduled_task_add_contacts_from_chatrooms',cache_task_run_time_wxid_logs,3600*2)
  307. # c = await gewe_service.get_wxchat_config_from_cache_async(wxid)
  308. # contacts = await gewe_service.get_contacts_brief_from_cache_async(wxid)
  309. # contact_wxids = [c.get('userName') for c in contacts]
  310. # chatrooms = c.get('addContactsFromChatroomIdWhiteList', [])
  311. # logger.info(f'{wxid} 定时群成员定时添好友任务开始')
  312. # wixd_add_contacts_from_chatrooms_times[wxid] = 0
  313. # for chatroom_id in chatrooms:
  314. # chatroom = await gewe_service.get_group_info_from_cache_async(wxid, chatroom_id)
  315. # chatroom_member=await gewe_service.get_group_members_from_cache_async(wxid, chatroom_id)
  316. # chatroom_nickname = chatroom.get('nickName')
  317. # chatroom_owner_wxid = chatroom_member.get('chatroomOwner', None)
  318. # admin_wxids = chatroom_member.get('adminWxid', [])
  319. # admin_wxids = chatroom_member.get('adminWxid')
  320. # if admin_wxids is None:
  321. # admin_wxids = [] # 如果 admin_wxids 是 None,将其初始化为空列表
  322. # logger.info(f'{chatroom_nickname} 的群主是 {chatroom_owner_wxid},管理员是{admin_wxids}')
  323. # contact_wxids_set = set(contact_wxids)
  324. # # for admin_wxid in admin_wxids:
  325. # # contact_wxids_set.add(admin_wxid)
  326. # if admin_wxids:
  327. # contact_wxids_set.update(set(admin_wxids))
  328. # if chatroom_owner_wxid is not None:
  329. # contact_wxids_set.add(chatroom_owner_wxid)
  330. # contact_wxids_set.add(wxid)
  331. # # unavailable_wixds=await gewe_service.check_wixd_group_add_contacts_history_async(wxid,chatroom_id)
  332. # # contact_wxids_set.update(set(unavailable_wixds))
  333. # # chatroom_member_list = chatroom.get('memberList', [])
  334. # unavailable_wixds=await gewe_service.check_wixd_group_add_contacts_history_async(wxid,chatroom_id)
  335. # if unavailable_wixds:
  336. # contact_wxids_set.update(set(unavailable_wixds))
  337. # chatroom_member_list = chatroom.get('memberList', [])
  338. # if chatroom_member_list is None:
  339. # chatroom_member_list = [] # 如果 memberList 是 None,将其初始化为空列表
  340. # elif not isinstance(chatroom_member_list, list):
  341. # chatroom_member_list = list(chatroom_member_list) # 如果 memberList 不是列表,将其转换为列表
  342. # remaining_chatroot_members = [x for x in chatroom_member_list if x.get('wxid') not in contact_wxids_set]
  343. # nickname = next((member['nickName'] for member in chatroom_member_list if member['wxid'] == wxid), None)
  344. # if not remaining_chatroot_members:
  345. # logger.info(f'{nickname}-{wxid} 在 {chatroom_nickname} 群里没有好友可以邀请')
  346. # # 任务状态推送到kafka
  347. # k_message=wx_add_contacts_from_chatroom_task_status_message(wxid,chatroom_id,2)
  348. # await kafka_service.send_message_async(k_message)
  349. # continue
  350. # logger.info(f'{nickname}-{wxid} 在 {chatroom_nickname} 群里还可以邀请的好友有:{[x.get("nickName") for x in remaining_chatroot_members]}')
  351. # for m in remaining_chatroot_members:
  352. # # 判断本次任务是否已经邀请了30个好友
  353. # if wixd_add_contacts_from_chatrooms_times[wxid] == once_add_contacts_total:
  354. # logger.info(f"{wxid} 本次任务已经邀请了{once_add_contacts_total}人,不再邀请")
  355. # continue
  356. # # 判断当天群成员是否已经加了90个好友
  357. # is_add_group_times = await gewe_service.is_group_add_contacts_history_one_day_async(wxid,oneday_add_contacts_total)
  358. # if is_add_group_times:
  359. # logger.info(f"当天 {wxid} 所有群的成员已经加了{oneday_add_contacts_total}个好友,不再添加")
  360. # continue
  361. # # 判断是否过于频繁
  362. # is_wx_expection = await gewe_service.get_wx_expection_async(wxid,"addGroupMemberAsFriend")
  363. # if is_wx_expection:
  364. # logger.info(f"{wxid} 本次任务接口addGroupMemberAsFriend异常,不再邀请,{is_wx_expection}")
  365. # continue
  366. # contact_wxid= m.get('wxid')
  367. # member_nickname=m.get("nickName")
  368. # group_add_contacts_history = await gewe_service.get_group_add_contacts_history_async(wxid,chatroom_id,contact_wxid)
  369. # if group_add_contacts_history:
  370. # sorted_history = sorted(group_add_contacts_history, key=lambda x: x.addTime, reverse=True)
  371. # # 已经邀请过两次,不再邀请
  372. # if len(sorted_history)==2:
  373. # logger.info(f'{nickname}-{wxid}在{chatroom_nickname}-{chatroom_id} 群的 {member_nickname}-{contact_wxid} 已经邀请过2次,不再邀请')
  374. # continue
  375. # # 当天邀请过,不再邀请
  376. # if len(sorted_history) > 0:
  377. # last_add_time = sorted_history[0].addTime
  378. # def is_add_time_more_than_one_day(addTime: int) -> bool:
  379. # """
  380. # 判断 addTime 是否与当前时间相隔大于 3600 × 24 秒
  381. # :param addTime: Unix 时间戳
  382. # :return: 如果 addTime 与当前时间相隔大于 3600 × 24 秒,返回 True;否则返回 False
  383. # """
  384. # # 获取当前时间的时间戳
  385. # current_time = time.time()
  386. # # 计算时间戳差值
  387. # time_difference = abs(current_time - addTime)
  388. # # 检查是否大于 3600 × 24 秒
  389. # return time_difference > 3600 * 24
  390. # is_more_than_one_day= is_add_time_more_than_one_day(last_add_time)
  391. # if not is_more_than_one_day:
  392. # logger.info(f'{nickname}-{wxid}在{chatroom_nickname}-{chatroom_id} 群的 {member_nickname}-{contact_wxid} 已经当天邀请,不再邀请')
  393. # continue
  394. # ret, msg, data = await gewe_service.add_group_member_as_friend_async(token_id, app_id, chatroom_id, m.get('wxid'), f'我是群聊"{chatroom_nickname}"群的{nickname}')
  395. # if ret!=200:
  396. # logger.warning(f'群好友邀请失败原因:{ret} {data}')
  397. # if msg in '操作过于频繁,请稍后再试。':
  398. # await gewe_service.save_wx_expection_async(wxid,"addGroupMemberAsFriend",msg,today_seconds_remaining())
  399. # logger.warning(f'{nickname}-{wxid} 操作过于频繁,本次群好友邀请任务未完成跳过。')
  400. # continue
  401. # history=AddGroupContactsHistory.model_validate({
  402. # "chatroomId":chatroom_id,
  403. # "wxid":wxid,
  404. # "contactWixd":contact_wxid,
  405. # "addTime":int(time.time())
  406. # })
  407. # await gewe_service.save_group_add_contacts_history_async(wxid,chatroom_id,contact_wxid,history)
  408. # wixd_add_contacts_from_chatrooms_times[wxid]+=1
  409. # logger.info(f'{nickname} 向 {chatroom_nickname}-{chatroom_id} 群的 {m.get("nickName")}-{m.get("wxid")} 发送好友邀请 {msg}')
  410. # # 推送到kafka
  411. # k_message = wx_add_contacts_from_chatroom_message(history.wxid,history.chatroomId,history.contactWixd,history.addTime)
  412. # await kafka_service.send_message_async(k_message)
  413. # #await asyncio.sleep(random.uniform(1.5, 3))
  414. # await asyncio.sleep(random.uniform(30,60))
  415. # # 任务状态推送到kafka
  416. # task_status=await gewe_service.wx_add_contacts_from_chatroom_task_status_async(wxid,chatroom_id)
  417. # k_message=wx_add_contacts_from_chatroom_task_status_message(wxid,chatroom_id,task_status)
  418. # await kafka_service.send_message_async(k_message)
  419. # # 下一个群
  420. # await asyncio.sleep(random.uniform(1.5, 3))
  421. # except Exception as e:
  422. # # 获取当前的堆栈跟踪
  423. # tb = sys.exc_info()[2]
  424. # # 为异常附加堆栈跟踪
  425. # e = e.with_traceback(tb)
  426. # # 输出详细的错误信息
  427. # logger.error(f"任务执行过程中发生异常: {e}\n异常类型: {type(e).__name__}\n异常信息: {str(e)}\n堆栈跟踪: {traceback.format_exc()}")
  428. # finally:
  429. # await kafka_service.stop_producer()
  430. # loop = asyncio.get_event_loop()
  431. # if loop.is_closed():
  432. # loop = asyncio.new_event_loop()
  433. # asyncio.set_event_loop(loop)
  434. # loop.run_until_complete(task()) # 在现有事件循环中运行任务
  435. @celery_app.task(name='tasks.scheduled_task_add_contacts_from_chatrooms', bind=True, acks_late=True)
  436. def scheduled_task_add_contacts_from_chatrooms(self, redis_config, kafka_config, gewe_config):
  437. '''
  438. 关于群加好友的请求规则: 每个智能体一次最多30人,间隔2小时做1次,即最多90人/天。
  439. 加好友规则:每天处理次数、间隔时间(分钟)、每次加好友人数这3个参数都可以设置。目前默认只是上面的设置。
  440. '''
  441. async def process_login_key(redis_service:RedisService, gewe_service: GeWeService, kafka_service:KafkaService, k, gewe_config, oneday_add_contacts_total, once_add_contacts_total, semaphore):
  442. async with semaphore: # 使用 Semaphore 控制并发
  443. r = await redis_service.get_hash(k)
  444. app_id = r.get("appId")
  445. token_id = r.get("tokenId")
  446. wxid = r.get("wxid")
  447. status = r.get('status')
  448. if status == '0':
  449. logger.warning(f"微信号 {wxid} 已经离线,群成员不能定时添加")
  450. return
  451. config = await gewe_service.get_wxchat_config_from_cache_async(wxid)
  452. validated_config = AgentConfig.model_validate(config)
  453. if not validated_config.agentEnabled:
  454. logger.warning(f"微信号 {wxid} 取消了托管,群成员不能定时添加")
  455. return
  456. # 判断是否过于频繁
  457. is_wx_expection = await gewe_service.get_wx_expection_async(wxid, "addGroupMemberAsFriend")
  458. if is_wx_expection:
  459. logger.info(f"{wxid} 本次任务接口addGroupMemberAsFriend异常, {is_wx_expection},本次群好友邀请任务未开始,跳过任务。")
  460. return
  461. cache_task_run_time_wxid_logs = await gewe_service.get_task_run_time_by_wxid_async(wxid, 'scheduled_task_add_contacts_from_chatrooms')
  462. if cache_task_run_time_wxid_logs:
  463. sorted_tasks = sorted(cache_task_run_time_wxid_logs, key=lambda x: x.get("runTime"), reverse=True)
  464. last_run_time = sorted_tasks[0].get("runTime")
  465. if last_run_time > 1e12: # 毫秒级时间戳
  466. last_run_time = last_run_time / 1000 # 转换为秒
  467. # 将时间戳转换为 datetime 对象
  468. last_run_time = datetime.fromtimestamp(last_run_time)
  469. # 获取当前时间
  470. current_time = datetime.now()
  471. # 计算时间差
  472. time_difference = current_time - last_run_time
  473. # 判断是否相差2小时
  474. if time_difference < timedelta(hours=2):
  475. logger.info(f"{wxid}上次定时群成员定时添好友任务在2小时内,不再执行")
  476. return
  477. cache_task_run_time_wxid_logs.append({"runTime": int(time.time())})
  478. await gewe_service.save_task_run_time_by_wxid_async(wxid, 'scheduled_task_add_contacts_from_chatrooms', cache_task_run_time_wxid_logs, 3600 * 2)
  479. c:dict = await gewe_service.get_wxchat_config_from_cache_async(wxid)
  480. contacts = await gewe_service.get_contacts_brief_from_cache_async(wxid)
  481. contact_wxids = [c.get('userName') for c in contacts]
  482. chatrooms = c.get('addContactsFromChatroomIdWhiteList', [])
  483. logger.info(f'{wxid} 定时群成员定时添好友任务开始')
  484. wixd_add_contacts_from_chatrooms_times = {wxid: 0}
  485. for chatroom_id in chatrooms:
  486. chatroom = await gewe_service.get_group_info_from_cache_async(wxid, chatroom_id)
  487. chatroom_member = await gewe_service.get_group_members_from_cache_async(wxid, chatroom_id)
  488. chatroom_nickname = chatroom.get('nickName')
  489. chatroom_owner_wxid = chatroom_member.get('chatroomOwner', None)
  490. admin_wxids = chatroom_member.get('adminWxid', [])
  491. admin_wxids = chatroom_member.get('adminWxid')
  492. if admin_wxids is None:
  493. admin_wxids = [] # 如果 admin_wxids 是 None,将其初始化为空列表
  494. logger.info(f'{chatroom_nickname} 的群主是 {chatroom_owner_wxid},管理员是{admin_wxids}')
  495. contact_wxids_set = set(contact_wxids)
  496. if admin_wxids:
  497. contact_wxids_set.update(set(admin_wxids))
  498. if chatroom_owner_wxid is not None:
  499. contact_wxids_set.add(chatroom_owner_wxid)
  500. contact_wxids_set.add(wxid)
  501. unavailable_wixds = await gewe_service.check_wixd_group_add_contacts_history_async(wxid, chatroom_id)
  502. if unavailable_wixds:
  503. contact_wxids_set.update(set(unavailable_wixds))
  504. chatroom_member_list = chatroom.get('memberList', [])
  505. if chatroom_member_list is None:
  506. chatroom_member_list = [] # 如果 memberList 是 None,将其初始化为空列表
  507. elif not isinstance(chatroom_member_list, list):
  508. chatroom_member_list = list(chatroom_member_list) # 如果 memberList 不是列表,将其转换为列表
  509. remaining_chatroot_members = [x for x in chatroom_member_list if x.get('wxid') not in contact_wxids_set]
  510. nickname = next((member['nickName'] for member in chatroom_member_list if member['wxid'] == wxid), None)
  511. if not remaining_chatroot_members:
  512. logger.info(f'{nickname}-{wxid} 在 {chatroom_nickname}-{chatroom_id} 群里没有好友可以邀请')
  513. # 任务状态推送到kafka
  514. k_message = wx_add_contacts_from_chatroom_task_status_message(wxid, chatroom_id, 2)
  515. await kafka_service.send_message_async(k_message)
  516. continue
  517. logger.info(f'{nickname}-{wxid} 在 {chatroom_nickname} 群里还可以邀请的好友有:{[x.get("nickName") for x in remaining_chatroot_members]}')
  518. for m in remaining_chatroot_members:
  519. # 判断本次任务是否已经邀请了30个好友
  520. if wixd_add_contacts_from_chatrooms_times[wxid] == once_add_contacts_total:
  521. logger.info(f"{wxid} 本次任务已经邀请了{once_add_contacts_total}人,不再邀请")
  522. return
  523. # 判断当天群成员是否已经加了90个好友
  524. is_add_group_times = await gewe_service.is_group_add_contacts_history_one_day_async(wxid, oneday_add_contacts_total)
  525. if is_add_group_times:
  526. logger.info(f"当天 {wxid} 所有群的成员已经加了{oneday_add_contacts_total}个好友,不再添加")
  527. return
  528. # 判断是否过于频繁
  529. is_wx_expection = await gewe_service.get_wx_expection_async(wxid, "addGroupMemberAsFriend")
  530. if is_wx_expection:
  531. logger.info(f"{wxid} 本次任务接口addGroupMemberAsFriend异常,不再邀请,{is_wx_expection}")
  532. return
  533. contact_wxid = m.get('wxid')
  534. member_nickname = m.get("nickName")
  535. group_add_contacts_history = await gewe_service.get_group_add_contacts_history_async(wxid, chatroom_id, contact_wxid)
  536. if group_add_contacts_history:
  537. sorted_history = sorted(group_add_contacts_history, key=lambda x: x.addTime, reverse=True)
  538. # 已经邀请过两次,不再邀请
  539. if len(sorted_history) == 2:
  540. logger.info(f'{nickname}-{wxid}在{chatroom_nickname}-{chatroom_id} 群的 {member_nickname}-{contact_wxid} 已经邀请过2次,不再邀请')
  541. continue
  542. # 24小时邀请过,不再邀请
  543. if len(sorted_history) > 0:
  544. last_add_time = sorted_history[0].addTime
  545. def is_add_time_more_than_one_day(addTime: int) -> bool:
  546. """
  547. 判断 addTime 是否与当前时间相隔大于 3600 × 24 秒
  548. :param addTime: Unix 时间戳
  549. :return: 如果 addTime 与当前时间相隔大于 3600 × 24 秒,返回 True;否则返回 False
  550. """
  551. # 获取当前时间的时间戳
  552. current_time = time.time()
  553. # 计算时间戳差值
  554. time_difference = abs(current_time - addTime)
  555. # 检查是否大于 3600 × 24 秒
  556. return time_difference > 3600 * 24
  557. is_more_than_one_day = is_add_time_more_than_one_day(last_add_time)
  558. if not is_more_than_one_day:
  559. logger.info(f'{nickname}-{wxid}在{chatroom_nickname}-{chatroom_id} 群的 {member_nickname}-{contact_wxid} 已经24小时邀请,不再邀请')
  560. continue
  561. ret, msg, data = await gewe_service.add_group_member_as_friend_async(token_id, app_id, chatroom_id, m.get('wxid'),
  562. f'我是群聊"{chatroom_nickname}"群的{nickname}')
  563. history = AddGroupContactsHistory.model_validate({
  564. "chatroomId": chatroom_id,
  565. "wxid": wxid,
  566. "contactWixd": contact_wxid,
  567. "addTime": int(time.time())
  568. })
  569. if ret != 200:
  570. logger.warning(f'{nickname}-{wxid} 在 {chatroom_nickname}-{chatroom_id} 群好友 {member_nickname}-{contact_wxid} 邀请失败原因:{ret} {msg} {data}')
  571. if '操作过于频繁' in data.get('msg'):
  572. await gewe_service.save_wx_expection_async(wxid, "addGroupMemberAsFriend", msg, today_seconds_remaining())
  573. await gewe_service.save_group_add_contacts_history_async(wxid, chatroom_id, contact_wxid, history)
  574. logger.warning(f'{nickname}-{wxid} 在 {chatroom_nickname}-{chatroom_id} 群的 {member_nickname}-{contact_wxid} 操作过于频繁,本次群好友邀请任务未完成跳过。当天不再处理该号群好友邀请任务')
  575. return
  576. await gewe_service.save_group_add_contacts_history_async(wxid, chatroom_id, contact_wxid, history)
  577. wixd_add_contacts_from_chatrooms_times[wxid] += 1
  578. logger.info(f'{nickname} 向 {chatroom_nickname}-{chatroom_id} 群的 {m.get("nickName")}-{m.get("wxid")} 发送好友邀请 {msg}')
  579. # 推送到kafka
  580. k_message = wx_add_contacts_from_chatroom_message(history.wxid, history.chatroomId, history.contactWixd, history.addTime)
  581. await kafka_service.send_message_async(k_message)
  582. # await asyncio.sleep(random.uniform(1.5, 3))
  583. # await asyncio.sleep(random.uniform(30, 60))
  584. await asyncio.sleep(random.uniform(270,300))
  585. # 任务状态推送到kafka
  586. task_status = await gewe_service.wx_add_contacts_from_chatroom_task_status_async(wxid, chatroom_id)
  587. k_message=wx_add_contacts_from_chatroom_task_status_message(wxid, chatroom_id, task_status)
  588. await kafka_service.send_message_async(k_message)
  589. # 下一个群
  590. await asyncio.sleep(random.uniform(1.5, 3))
  async def task():
      """Run one pass of the "add chatroom members as friends" job.

      Does nothing outside the 08:00-22:59 window. Otherwise it connects to
      Redis, obtains the GeWe service, starts a Kafka producer, reads the
      per-day / per-run invitation caps from the global config, and runs
      ``process_login_key`` concurrently for every
      ``__AI_OPS_WX__:LOGININFO:*`` key found in Redis.

      Exceptions are logged and swallowed, so the coroutine always returns
      ``None``. The Kafka producer is stopped on the way out only if it was
      actually created.
      """
      # Must be bound before the ``try``: the ``finally`` clause also runs on
      # the early ``return`` below and on failures that happen before the
      # Kafka client exists. Without this, cleanup raised UnboundLocalError.
      kafka_service = None
      try:
          now = datetime.now()
          if now.hour < 8 or now.hour > 22:
              logger.info(f"定时群成员定时添好友任务不启动, 当前时间为 {now.strftime('%Y-%m-%d %H:%M:%S')},不在8点到22点之间")
              return
          logger.info('定时群成员定时添好友任务开始')

          redis_service = RedisService()
          await redis_service.init(**redis_config)
          gewe_service = await GeWeService.get_instance(redis_service, gewe_config['api_url'])

          # The topic is passed twice, exactly as the original call did.
          kafka_service = KafkaService(
              kafka_config['bootstrap_servers'],
              kafka_config['topic'],
              kafka_config['topic'],
              kafka_config['group_id'],
          )
          await kafka_service.start_producer()

          # Caps default to 90 per day and 30 per run when the global config
          # (or its section for this task) is missing or empty.
          global_config = await gewe_service.get_global_config_from_cache_async() or {}
          task_config = global_config.get('scheduledTaskAddContactsFromChatrooms') or {}
          oneday_add_contacts_total = task_config.get('oneDayAddContactsTotal', 90)
          once_add_contacts_total = task_config.get('onceAddContactsTotal', 30)

          login_keys = [
              key
              async for key in redis_service.client.scan_iter(match='__AI_OPS_WX__:LOGININFO:*')
          ]

          # Shared semaphore handed to every worker; created with 10 permits.
          semaphore = asyncio.Semaphore(10)
          await asyncio.gather(*[
              process_login_key(
                  redis_service,
                  gewe_service,
                  kafka_service,
                  k,
                  gewe_config,
                  oneday_add_contacts_total,
                  once_add_contacts_total,
                  semaphore,
              )
              for k in login_keys
          ])
      except Exception as e:
          # Top-level boundary of the job: log with the full traceback and
          # let the Celery task finish normally.
          logger.error(f"任务执行过程中发生异常: {e}\n异常类型: {type(e).__name__}\n异常信息: {str(e)}\n堆栈跟踪: {traceback.format_exc()}")
      finally:
          if kafka_service is not None:
              await kafka_service.stop_producer()
  634. loop = asyncio.get_event_loop()
  635. if loop.is_closed():
  636. loop = asyncio.new_event_loop()
  637. asyncio.set_event_loop(loop)
  638. loop.run_until_complete(task()) # 在现有事件循环中运行任务
  639. # @celery_app.task(name='tasks.scheduled_task_add_contacts_from_chatrooms', bind=True, acks_late=True)
  640. # def scheduled_task_add_contacts_from_chatrooms(self, redis_config, kafka_config, gewe_config):
  641. # '''
  642. # 关于群加好友的请求规则: 每个智能体一次最多30人,间隔2小时做1次,即最多90人/天。
  643. # 加好友规则:每天处理次数、间隔时间(分钟)、每次加好友人数这3个参数都可以设置。目前默认只是上面的设置。
  644. # '''
  645. # async def process_login_key(redis_service:RedisService, gewe_service: GeWeService, kafka_service:KafkaService, k, gewe_config, oneday_add_contacts_total, once_add_contacts_total):
  646. # r = await redis_service.get_hash(k)
  647. # app_id = r.get("appId")
  648. # token_id = r.get("tokenId")
  649. # wxid = r.get("wxid")
  650. # status = r.get('status')
  651. # if status == '0':
  652. # logger.warning(f"微信号 {wxid} 已经离线,群成员不能定时添加")
  653. # return
  654. # config = await gewe_service.get_wxchat_config_from_cache_async(wxid)
  655. # validated_config = AgentConfig.model_validate(config)
  656. # if not validated_config.agentEnabled:
  657. # logger.warning(f"微信号 {wxid} 取消了托管,群成员不能定时添加")
  658. # return
  659. # # 判断是否过于频繁
  660. # is_wx_expection = await gewe_service.get_wx_expection_async(wxid, "addGroupMemberAsFriend")
  661. # if is_wx_expection:
  662. # logger.info(f"{wxid} 本次任务接口addGroupMemberAsFriend异常, {is_wx_expection},本次群好友邀请任务未开始,跳过任务。")
  663. # return
  664. # cache_task_run_time_wxid_logs = await gewe_service.get_task_run_time_by_wxid_async(wxid, 'scheduled_task_add_contacts_from_chatrooms')
  665. # if cache_task_run_time_wxid_logs:
  666. # sorted_tasks = sorted(cache_task_run_time_wxid_logs, key=lambda x: x.get("runTime"), reverse=True)
  667. # last_run_time = sorted_tasks[0].get("runTime")
  668. # if last_run_time > 1e12: # 毫秒级时间戳
  669. # last_run_time = last_run_time / 1000 # 转换为秒
  670. # # 将时间戳转换为 datetime 对象
  671. # last_run_time = datetime.fromtimestamp(last_run_time)
  672. # # 获取当前时间
  673. # current_time = datetime.now()
  674. # # 计算时间差
  675. # time_difference = current_time - last_run_time
  676. # # 判断是否相差2小时
  677. # if time_difference < timedelta(hours=2):
  678. # logger.info(f"{wxid}上次定时群成员定时添好友任务在2小时内,不再执行")
  679. # return
  680. # cache_task_run_time_wxid_logs.append({"runTime": int(time.time())})
  681. # await gewe_service.save_task_run_time_by_wxid_async(wxid, 'scheduled_task_add_contacts_from_chatrooms', cache_task_run_time_wxid_logs, 3600 * 2)
  682. # c:dict = await gewe_service.get_wxchat_config_from_cache_async(wxid)
  683. # contacts = await gewe_service.get_contacts_brief_from_cache_async(wxid)
  684. # contact_wxids = [c.get('userName') for c in contacts]
  685. # chatrooms = c.get('addContactsFromChatroomIdWhiteList', [])
  686. # logger.info(f'{wxid} 定时群成员定时添好友任务开始')
  687. # wixd_add_contacts_from_chatrooms_times = {wxid: 0}
  688. # for chatroom_id in chatrooms:
  689. # chatroom = await gewe_service.get_group_info_from_cache_async(wxid, chatroom_id)
  690. # chatroom_member = await gewe_service.get_group_members_from_cache_async(wxid, chatroom_id)
  691. # chatroom_nickname = chatroom.get('nickName')
  692. # chatroom_owner_wxid = chatroom_member.get('chatroomOwner', None)
  693. # admin_wxids = chatroom_member.get('adminWxid', [])
  694. # admin_wxids = chatroom_member.get('adminWxid')
  695. # if admin_wxids is None:
  696. # admin_wxids = [] # 如果 admin_wxids 是 None,将其初始化为空列表
  697. # logger.info(f'{chatroom_nickname} 的群主是 {chatroom_owner_wxid},管理员是{admin_wxids}')
  698. # contact_wxids_set = set(contact_wxids)
  699. # if admin_wxids:
  700. # contact_wxids_set.update(set(admin_wxids))
  701. # if chatroom_owner_wxid is not None:
  702. # contact_wxids_set.add(chatroom_owner_wxid)
  703. # contact_wxids_set.add(wxid)
  704. # unavailable_wixds = await gewe_service.check_wixd_group_add_contacts_history_async(wxid, chatroom_id)
  705. # if unavailable_wixds:
  706. # contact_wxids_set.update(set(unavailable_wixds))
  707. # chatroom_member_list = chatroom.get('memberList', [])
  708. # if chatroom_member_list is None:
  709. # chatroom_member_list = [] # 如果 memberList 是 None,将其初始化为空列表
  710. # elif not isinstance(chatroom_member_list, list):
  711. # chatroom_member_list = list(chatroom_member_list) # 如果 memberList 不是列表,将其转换为列表
  712. # remaining_chatroot_members = [x for x in chatroom_member_list if x.get('wxid') not in contact_wxids_set]
  713. # nickname = next((member['nickName'] for member in chatroom_member_list if member['wxid'] == wxid), None)
  714. # if not remaining_chatroot_members:
  715. # logger.info(f'{nickname}-{wxid} 在 {chatroom_nickname} 群里没有好友可以邀请')
  716. # # 任务状态推送到kafka
  717. # k_message = wx_add_contacts_from_chatroom_task_status_message(wxid, chatroom_id, 2)
  718. # await kafka_service.send_message_async(k_message)
  719. # return
  720. # logger.info(f'{nickname}-{wxid} 在 {chatroom_nickname} 群里还可以邀请的好友有:{[x.get("nickName") for x in remaining_chatroot_members]}')
  721. # for m in remaining_chatroot_members:
  722. # # 判断本次任务是否已经邀请了30个好友
  723. # if wixd_add_contacts_from_chatrooms_times[wxid] == once_add_contacts_total:
  724. # logger.info(f"{wxid} 本次任务已经邀请了{once_add_contacts_total}人,不再邀请")
  725. # return
  726. # # 判断当天群成员是否已经加了90个好友
  727. # is_add_group_times = await gewe_service.is_group_add_contacts_history_one_day_async(wxid, oneday_add_contacts_total)
  728. # if is_add_group_times:
  729. # logger.info(f"当天 {wxid} 所有群的成员已经加了{oneday_add_contacts_total}个好友,不再添加")
  730. # return
  731. # # 判断是否过于频繁
  732. # is_wx_expection = await gewe_service.get_wx_expection_async(wxid, "addGroupMemberAsFriend")
  733. # if is_wx_expection:
  734. # logger.info(f"{wxid} 本次任务接口addGroupMemberAsFriend异常,不再邀请,{is_wx_expection}")
  735. # return
  736. # contact_wxid = m.get('wxid')
  737. # member_nickname = m.get("nickName")
  738. # group_add_contacts_history = await gewe_service.get_group_add_contacts_history_async(wxid, chatroom_id, contact_wxid)
  739. # if group_add_contacts_history:
  740. # sorted_history = sorted(group_add_contacts_history, key=lambda x: x.addTime, reverse=True)
  741. # # 已经邀请过两次,不再邀请
  742. # if len(sorted_history) == 2:
  743. # logger.info(f'{nickname}-{wxid}在{chatroom_nickname}-{chatroom_id} 群的 {member_nickname}-{contact_wxid} 已经邀请过2次,不再邀请')
  744. # return
  745. # # 当天邀请过,不再邀请
  746. # if len(sorted_history) > 0:
  747. # last_add_time = sorted_history[0].addTime
  748. # def is_add_time_more_than_one_day(addTime: int) -> bool:
  749. # """
  750. # 判断 addTime 是否与当前时间相隔大于 3600 × 24 秒
  751. # :param addTime: Unix 时间戳
  752. # :return: 如果 addTime 与当前时间相隔大于 3600 × 24 秒,返回 True;否则返回 False
  753. # """
  754. # # 获取当前时间的时间戳
  755. # current_time = time.time()
  756. # # 计算时间戳差值
  757. # time_difference = abs(current_time - addTime)
  758. # # 检查是否大于 3600 × 24 秒
  759. # return time_difference > 3600 * 24
  760. # is_more_than_one_day = is_add_time_more_than_one_day(last_add_time)
  761. # if not is_more_than_one_day:
  762. # logger.info(f'{nickname}-{wxid}在{chatroom_nickname}-{chatroom_id} 群的 {member_nickname}-{contact_wxid} 已经当天邀请,不再邀请')
  763. # return
  764. # ret, msg, data = await gewe_service.add_group_member_as_friend_async(token_id, app_id, chatroom_id, m.get('wxid'),
  765. # f'我是群聊"{chatroom_nickname}"群的{nickname}')
  766. # if ret != 200:
  767. # logger.warning(f'群好友邀请失败原因:{ret} {msg} {data}')
  768. # if '操作过于频繁' in data.get('msg'):
  769. # await gewe_service.save_wx_expection_async(wxid, "addGroupMemberAsFriend", msg, today_seconds_remaining())
  770. # logger.warning(f'{nickname}-{wxid} 操作过于频繁,本次群好友邀请任务未完成跳过。')
  771. # return
  772. # history = AddGroupContactsHistory.model_validate({
  773. # "chatroomId": chatroom_id,
  774. # "wxid": wxid,
  775. # "contactWixd": contact_wxid,
  776. # "addTime": int(time.time())
  777. # })
  778. # await gewe_service.save_group_add_contacts_history_async(wxid, chatroom_id, contact_wxid, history)
  779. # wixd_add_contacts_from_chatrooms_times[wxid] += 1
  780. # logger.info(f'{nickname} 向 {chatroom_nickname}-{chatroom_id} 群的 {m.get("nickName")}-{m.get("wxid")} 发送好友邀请 {msg}')
  781. # # 推送到kafka
  782. # k_message = wx_add_contacts_from_chatroom_message(history.wxid, history.chatroomId, history.contactWixd, history.addTime)
  783. # await kafka_service.send_message_async(k_message)
  784. # # await asyncio.sleep(random.uniform(1.5, 3))
  785. # await asyncio.sleep(random.uniform(30, 60))
  786. # # 任务状态推送到kafka
  787. # task_status = await gewe_service.wx_add_contacts_from_chatroom_task_status_async(wxid, chatroom_id)
  788. # wx_add_contacts_from_chatroom_task_status_message(wxid, chatroom_id, task_status)
  789. # await kafka_service.send_message_async(k_message)
  790. # # 下一个群
  791. # await asyncio.sleep(random.uniform(1.5, 3))
  792. # async def task():
  793. # try:
  794. # now = datetime.now()
  795. # if now.hour < 8:
  796. # logger.info(f"定时群成员定时添好友任务不启动,当前时间为 {now.strftime('%Y-%m-%d %H:%M:%S')},早于8点")
  797. # return
  798. # logger.info('定时群成员定时添好友任务开始')
  799. # redis_service = RedisService()
  800. # await redis_service.init(**redis_config)
  801. # gewe_service = await GeWeService.get_instance(redis_service, gewe_config['api_url'])
  802. # KAFKA_BOOTSTRAP_SERVERS = kafka_config['bootstrap_servers']
  803. # KAFKA_TOPIC = kafka_config['topic']
  804. # KAFKA_GROUP_ID = kafka_config['group_id']
  805. # kafka_service = KafkaService(KAFKA_BOOTSTRAP_SERVERS, KAFKA_TOPIC, KAFKA_TOPIC, KAFKA_GROUP_ID)
  806. # await kafka_service.start_producer()
  807. # global_config = await gewe_service.get_global_config_from_cache_async()
  808. # scheduled_task_add_contacts_from_chatrooms_config = global_config.get('scheduledTaskAddContactsFromChatrooms', {})
  809. # oneday_add_contacts_total = 90
  810. # once_add_contacts_total = 30
  811. # # oneday_times=3
  812. # if scheduled_task_add_contacts_from_chatrooms_config:
  813. # oneday_add_contacts_total = scheduled_task_add_contacts_from_chatrooms_config.get('oneDayAddContactsTotal', 90)
  814. # once_add_contacts_total = scheduled_task_add_contacts_from_chatrooms_config.get('onceAddContactsTotal', 30)
  815. # # oneday_times=scheduled_task_add_contacts_from_chatrooms_config.get('oneDayTimes',3)
  816. # login_keys = []
  817. # async for key in redis_service.client.scan_iter(match='__AI_OPS_WX__:LOGININFO:*'):
  818. # login_keys.append(key)
  819. # # 使用 asyncio.gather 并发处理每个 login_key
  820. # await asyncio.gather(*[process_login_key(redis_service, gewe_service, kafka_service, k, gewe_config, oneday_add_contacts_total, once_add_contacts_total) for k in login_keys])
  821. # except Exception as e:
  822. # # 获取当前的堆栈跟踪
  823. # tb = sys.exc_info()[2]
  824. # # 为异常附加堆栈跟踪
  825. # e = e.with_traceback(tb)
  826. # # 输出详细的错误信息
  827. # logger.error(f"任务执行过程中发生异常: {e}\n异常类型: {type(e).__name__}\n异常信息: {str(e)}\n堆栈跟踪: {traceback.format_exc()}")
  828. # finally:
  829. # await kafka_service.stop_producer()
  830. # loop = asyncio.get_event_loop()
  831. # if loop.is_closed():
  832. # loop = asyncio.new_event_loop()
  833. # asyncio.set_event_loop(loop)
  834. # loop.run_until_complete(task()) # 在现有事件循环中运行任务
  # Template for the per-day counter hash; ``date`` is filled with YYYYMMDD.
  REDIS_KEY_PATTERN = "friend_add_limit:{date}"
  # Not referenced in the visible part of this file.
  REDIS_LAST_RUN_KEY = "last_run_time:add_friends_task"


  @celery_app.task(name='tasks.add_friends_task', bind=True, acks_late=True)
  def add_friends_task(self, redis_config):
      """Add friends while respecting two counters kept in a Redis hash.

      At most 15 additions are allowed per day (hash field ``total``) and at
      most 8 while the ``last_2h`` field is alive. Each run adds at most one
      friend. The actual "add friend" call is still a TODO, so a run
      currently only updates the counters.

      :param redis_config: keyword arguments forwarded to ``RedisService.init``.
      """

      async def task():
          redis_service = RedisService()
          await redis_service.init(**redis_config)

          redis_key = REDIS_KEY_PATTERN.format(date=datetime.now().strftime("%Y%m%d"))

          # Read both counters first; a missing field counts as zero.
          raw_total = await redis_service.get_hash_field(redis_key, "total")
          raw_recent = await redis_service.get_hash_field(redis_key, "last_2h")
          total_added = int(raw_total or 0)
          last_2h_added = int(raw_recent or 0)
          logger.info(f"当前添加好友总数: {total_added}, 过去2小时添加: {last_2h_added}")

          # Stop as soon as either cap is reached.
          if total_added >= 15:
              logger.warning("今日好友添加已达上限!")
              return
          if last_2h_added >= 8:
              logger.warning("过去2小时添加已达上限!")
              return

          # Room left under both caps; a single run never adds more than one.
          headroom = min(15 - total_added, 8 - last_2h_added)
          if headroom <= 0:
              return
          num_to_add = min(headroom, 1)
          logger.info(f"本次添加 {num_to_add} 位好友")

          # TODO: call the real friend-adding logic; for now every requested
          # addition is assumed to succeed.
          success = num_to_add
          if success <= 0:
              return

          # Bump both counters, then refresh their lifetimes: 24 hours for the
          # hash and 2 hours for ``last_2h`` (presumably a per-field TTL -
          # confirm against RedisService.expire_field).
          await redis_service.increment_hash_field(redis_key, "total", success)
          await redis_service.increment_hash_field(redis_key, "last_2h", success)
          await redis_service.expire(redis_key, 86400)
          await redis_service.expire_field(redis_key, "last_2h", 7200)
          logger.info(f"成功添加 {success} 位好友, 今日总数 {total_added + success}")
          # NOTE: a disabled draft used to follow here that re-registered a
          # RedBeat entry ("redbeat:add_friends_task") with a random interval
          # so that the next run time varied.

      # Reuse the current event loop, replacing it only if it was closed.
      loop = asyncio.get_event_loop()
      if loop.is_closed():
          loop = asyncio.new_event_loop()
          asyncio.set_event_loop(loop)
      loop.run_until_complete(task())
  def today_seconds_remaining(now=None) -> int:
      """Return the whole seconds left until the next local midnight.

      The result is always between 1 and 86400. The previous version measured
      up to 23:59:59, so it returned 0 during the last second of the day; a
      caller that uses the value as an expiry would then get a zero lifetime.
      Results are also one second larger than before on whole-second inputs.

      :param now: moment to measure from; defaults to ``datetime.now()``.
      :return: seconds until midnight, truncated, never less than 1.
      """
      current_time = datetime.now() if now is None else now
      # Seconds already elapsed today, including the fractional part.
      elapsed = (
          current_time.hour * 3600
          + current_time.minute * 60
          + current_time.second
          + current_time.microsecond / 1_000_000
      )
      remaining = int(24 * 3600 - elapsed)
      # Inside the final second the truncated value is 0; keep it positive.
      return max(remaining, 1)
  @celery_app.task(name='tasks.random_scheduled_task', bind=True, acks_late=True)
  def random_scheduled_task(self):
      """Print a timestamp and save a RedBeat entry for this task.

      Picks a random delay of 10-60 seconds, builds a ``RedBeatSchedulerEntry``
      named ``random-task`` that points back at this task with that delay as
      its schedule, and saves it.

      :return: a message stating the chosen delay in seconds.
      """
      print(f"Task executed at {datetime.now()}")

      # Random delay in seconds before the next run.
      next_run_in = random.randint(10, 60)
      print(f"Next execution will be in {next_run_in} seconds")

      # Build the entry and save it in one step.
      RedBeatSchedulerEntry(
          name='random-task',
          task='tasks.random_scheduled_task',
          schedule=timedelta(seconds=next_run_in),
          app=celery_app,
      ).save()

      return f"Scheduled next run in {next_run_in} seconds"