diff --git a/app.py b/app.py index 0c2f50a..9bd34c9 100644 --- a/app.py +++ b/app.py @@ -7,9 +7,11 @@ from resources.groups_resources import GetGroupsInfoResource from resources.login_resources import GetLoginInfoResource,GetLoginWxQRCodeResource,LoginWxCaptchCodeResource from resources.sns_resources import SendSNSTextResource,SendSNSImageResource, SendSNSVideoResource from common.log import logger, log_exception -from common.interceptors import before_request, after_request, handle_exception +from common.interceptors import before_request, after_request, handle_exception,check_login_status import threading from common import kafka_helper, redis_helper,utils +import random + from model import Models @@ -164,24 +166,38 @@ def process_add_contacts_from_chatrooms(wxchat:gewe_chat.GeWeChatCom,status, wxi else: logger.info(f'微信ID {wxid} 未登录 {app_id} ,群成员不能定时定时') - def auto_add_contacts_from_to_add_contacts_queue(): ''' 从待添加好友队列中自动添加好友 ''' logger.info('自动添加好友') + wxchat=gewe_chat.wxchat while True: login_keys = list(redis_helper.redis_helper.client.scan_iter(match='__AI_OPS_WX__:LOGININFO:*')) - - # 遍历每一个获取到的登录键 - -# def background_wxchat_thread(): -# scan_wx_login_info() -# # 启动同步联系人线程 -# threading.Thread(target=fetch_and_save_contacts).start() -# threading.Thread(target=auto_add_contacts_from_chatrooms).start() - + for k in login_keys: + r= redis_helper.redis_helper.get_hash(k) + token_id = r.get('tokenId') + app_id = r.get('appId') + nickname=r.get('nickName') + wxid = r.get('wxid') + status=r.get('status') + if status == '1': + c = wxchat.get_wxchat_config_from_cache(wxid) + contacts = wxchat.get_contacts_brief_from_cache(wxid) + contact_wxids = [c.get('userName') for c in contacts] + add_contacts_queue = c.get('addContactsQueue', []) + for contact_wxid in add_contacts_queue: + if contact_wxid not in contact_wxids: + ret, msg, data = wxchat.add_contacts(token_id, app_id, contact_wxid) + if ret==200: + contact_wxids.append(contact_wxid) + wxchat.save_contacts_brief_to_cache(token_id, app_id, wxid, contact_wxids) + history=Models.AddContactsHistory.model_validate({ + "wxid":wxid, + "contactWixd":contact_wxid, + "addTime":int(time.time()) + }) def background_wxchat_thread(): lock_name = "background_wxchat_thread_lock" @@ -193,8 +209,10 @@ def background_wxchat_thread(): logger.info("分布式锁已成功获取") # 启动任务 scan_wx_login_info() - threading.Thread(target=fetch_and_save_contacts).start() - threading.Thread(target=auto_add_contacts_from_chatrooms).start() + #threading.Thread(target=fetch_and_save_contacts).start() + #threading.Thread(target=auto_add_contacts_from_chatrooms).start() + + threading.Thread(target=wx_thread_manager.check_and_manage_threads()).start() # 保持锁的续期 while True: @@ -246,8 +264,243 @@ def scan_wx_login_info(): if cursor == 0: break +class WechatThreadManager: + def __init__(self): + # 同步好友服务线程 + self.sync_contacts_worker_threads = {} + self.sync_contacts_task_threads = {} + # 群加好友服务线程 + self.add_contacts_from_group_worker_threads = {} + # 群加好友任务线程 + self.add_group_member_as_friend_task_threads = {} + + # 自动加好友线程 + self.add_contacts_threads = {} + self.wxchat=gewe_chat.wxchat + + def check_and_manage_threads(self): + + while True: + login_keys = list(redis_helper.redis_helper.client.scan_iter(match='__AI_OPS_WX__:LOGININFO:*')) + for k in login_keys: + r= redis_helper.redis_helper.get_hash(k) + token_id = r.get('tokenId') + app_id = r.get('appId') + nickname=r.get('nickName') + wxid = r.get('wxid') + status=r.get('status') + #print(status) + if status == '1': + #print(wxid not in self.add_contacts_from_group_worker_threads) + if wxid not in self.add_contacts_from_group_worker_threads: + print(f"微信 {wxid} 在线,创建群好友添加线程...") + self.add_contacts_from_group_worker_threads[wxid] = threading.Thread( + target=self.auto_add_contacts_from_group_worker, + kwargs={"wxid":wxid, "token_id": token_id, "app_id": app_id} # wxid 作为关键字参数 + ) + + self.add_contacts_from_group_worker_threads[wxid].start() + + + if wxid not in self.sync_contacts_worker_threads: + print(f"微信 {wxid} 在线,创建同步联系人任务...") + self.sync_contacts_worker_threads[wxid] = threading.Thread( + target=self.sync_contacts_processer, + kwargs={"wxid":wxid, "token_id": token_id, "app_id": app_id} + ) + self.sync_contacts_worker_threads[wxid].start() + + else: + if wxid in self.add_contacts_from_group_worker_threads: + print(f"微信 {wxid} 离线,关闭群好友添加线程...") + self.add_contacts_from_group_worker_threads[wxid].join() + del self.add_contacts_from_group_worker_threads[wxid] + if wxid in self.sync_contacts_worker_threads: + print(f"微信 {wxid} 离线,关闭同步联系线程线程...") + self.sync_contacts_worker_threads[wxid].join() + del self.sync_contacts_worker_threads[wxid] + print('关闭线程') + time.sleep(0) + + def auto_add_contacts_worker(self,wxid): + while True: + try: + to_add_contact= self.wxchat.dequeue_to_add_contacts(wxid) + if not to_add_contact: + continue + + except Exception as e: + logger.error(f'{wxid} 自动添加好友出错') + + def add_friends2(self,wxid): + ''' + 24 小时只能加 5-15 位好友,每 2 小时不要超过 8 人,每个好友添加间隔要做随机间隔 + ''' + total_friends = 0 + start_time = time.time() + last_friend_time = start_time + + while time.time() - start_time < 24 * 60 * 60: # 24 小时内 + # 每 2 小时内不超过 8 个好友 + if time.time() - last_friend_time < 2 * 60 * 60: + if len([t for t in [last_friend_time] if t > time.time() - 2 * 60 * 60]) >= 8: + time.sleep(1) + continue + + # 添加好友 + print(f"添加好友 {total_friends + 1}") + total_friends += 1 + + # 随机间隔 + interval = random.randint(1, 10) * 60 # 1到10分钟的随机间隔 + time.sleep(interval) + + # 更新最后添加好友的时间 + last_friend_time = time.time() + + # 如果已经添加了 15 个好友,提前退出 + if total_friends >= 15: + break + + def add_friends(): + total_friends = 0 + start_time = time.time() + friend_times = [] # 用于记录每次添加好友的时间 + + while time.time() - start_time < 24 * 60 * 60: # 24 小时内 + # 检查最近 2 小时内是否已经添加了 8 个好友 + current_time = time.time() + recent_friends = [t for t in friend_times if t > current_time - 2 * 60 * 60] + if len(recent_friends) >= 8: + # 如果已经添加了 8 个好友,等待一段时间再检查 + time.sleep(1) + continue + + # 添加好友 + print(f"添加好友 {total_friends + 1}") + total_friends += 1 + + # 记录添加好友的时间 + friend_times.append(current_time) + + # 随机间隔 + interval = random.randint(1, 10) * 60 # 1到10分钟的随机间隔 + time.sleep(interval) + + # 如果已经添加了 15 个好友,提前退出 + if total_friends >= 15: + break + + def auto_add_contacts_from_group_worker(self,wxid, token_id, app_id): + ''' + 从群添加好友任务 + ''' + while True: + k, login_info = utils.get_login_info_by_wxid(wxid) + if login_info.get('status') == "0": + return + c = self.wxchat.get_wxchat_config_from_cache(wxid) + contacts = self.wxchat.get_contacts_brief_from_cache(wxid) + contact_wxids = [c.get('userName') for c in contacts] + chatrooms = c.get('addContactsFromChatroomIdWhiteList', []) + for chatroom_id in chatrooms: + chatroom = self.wxchat.get_group_info_from_cache(wxid, chatroom_id) + chatroom_member=self.wxchat.get_group_members_from_cache(wxid, chatroom_id) + chatroom_nickname = chatroom.get('nickName') + chatroom_owner_wxid = chatroom_member.get('chatroomOwner', None) + admin_wxid = chatroom_member.get('adminWxid', None) + #logger.info(f'{chatroom_nickname} 的群主是 {chatroom_owner_wxid},管理员是{admin_wxid}') + contact_wxids_set = set(contact_wxids) + if admin_wxid is not None: + contact_wxids_set.add(admin_wxid) + if chatroom_owner_wxid is not None: + contact_wxids_set.add(chatroom_owner_wxid) + + contact_wxids_set.add(wxid) + + unavailable_wixds=self.wxchat.check_wixd_group_add_contacts_history(wxid,chatroom_id) + contact_wxids_set.update(unavailable_wixds) + + chatroot_member_list = chatroom.get('memberList', []) + remaining_chatroot_members = [x for x in chatroot_member_list if x.get('wxid') not in contact_wxids_set] + + nickname = next((member['nickName'] for member in chatroot_member_list if member['wxid'] == wxid), None) + + #logger.info(f'{nickname}-{wxid} 在 {chatroom_nickname} 群里还可以邀请的好友有:{[x.get("nickName") for x in remaining_chatroot_members]}') + for m in remaining_chatroot_members: + tasks_id=f'{wxid}-{chatroom_id}-{m.get("wxid")}' + if tasks_id not in self.add_group_member_as_friend_task_threads: + self.add_group_member_as_friend_task_threads[tasks_id] =threading.Thread(target=self.add_group_member_as_friend_thread, + args=(token_id, app_id, wxid,chatroom_id,m,nickname,chatroom_nickname,) + ) + #self.add_group_member_as_friend_task_threads[tasks_id].daemon = True # 设置为守护线程 + self.add_group_member_as_friend_task_threads[tasks_id].start() + time.sleep(0) + + def sync_contacts_processer(self,wxid, token_id, app_id): + while True: + k, login_info = utils.get_login_info_by_wxid(wxid) + if login_info.get('status') == "0": + print('sync_contacts_processer exit') + #self.sync_contacts_task_threads[wxid].join() + del self.sync_contacts_task_threads[wxid] + return + if wxid not in self.sync_contacts_task_threads: + self.sync_contacts_task_threads[wxid]=threading.Thread(target=self.sync_contacts_threads_processer, args=(wxid,token_id,app_id)) + #self.sync_contacts_task_threads[wxid].daemon = True + self.sync_contacts_task_threads[wxid].start() + time.sleep(0) + + def sync_contacts_threads_processer(self,wxid, token_id, app_id): + ret,msg,contacts_list = self.wxchat.fetch_contacts_list(token_id, app_id) + if ret==200: + friend_wxids = [c for c in contacts_list['friends'] if c not in ['fmessage', 'medianote','weixin','weixingongzhong']] # 可以调整截取范围 + print(f'{wxid}的好友数量 {len(friend_wxids)}') + self.wxchat.save_contacts_brief_to_cache(token_id, app_id, wxid, friend_wxids) + logger.info(f'微信ID {wxid} 登录APPID {app_id} 成功,联系人已定时保存') + chatrooms=contacts_list['chatrooms'] + self.wxchat.save_groups_info_to_cache(token_id, app_id, wxid, chatrooms) + self.wxchat.save_groups_members_to_cache(token_id, app_id, wxid, chatrooms) + logger.info(f'微信ID {wxid} 登录APPID {app_id} 成功,群信息已定时保存') + else: + logger.warning(f'{msg}-微信ID {wxid} 登录APPID {app_id} 不能获取好友和群资料') + time.sleep(random.uniform(3000, 3600)) + + def add_group_member_as_friend_thread(self,token_id, app_id, wxid,chatroom_id,chatroot_member,nickname,chatroom_nickname): + ''' + 从群添加好友线程, + ''' + tasks_id=f'{wxid}-{chatroom_id}-{chatroot_member.get("wxid")}' + k, login_info = utils.get_login_info_by_wxid(wxid) + if login_info.get('status') == "0": + del self.add_group_member_as_friend_task_threads[tasks_id] + return + + contact_history=self.wxchat.get_group_add_contacts_history(wxid,chatroom_id,chatroot_member.get('wxid')) + if len(contact_history)==1: + last_contact_history=contact_history[0] + diff_time=int(time.time())-last_contact_history.addTime + oneday_seconds=60 * 60 * 24 + if diff_time 0: + wxid = args[0] + else: + raise ValueError("wxid is required") + + k, login_info = utils.get_login_info_by_wxid(wxid) + if login_info.get('status') == "0": + return # 如果 status 为 "0",直接返回,不执行原函数 + + return func(*args, **kwargs) + return wrapper + + def auth_required_time(f): @wraps(f) def decorated_function(*args, **kwargs): diff --git a/wechat/gewe_chat.py b/wechat/gewe_chat.py index 136e03b..85c9436 100644 --- a/wechat/gewe_chat.py +++ b/wechat/gewe_chat.py @@ -1016,7 +1016,7 @@ class GeWeChatCom: data.append(history.model_dump()) redis_helper.redis_helper.update_hash_field(hash_key, contact_wxid, json.dumps(data, ensure_ascii=False)) - def get_group_add_contacts_history(self,wxid,chatroom_id,contact_wxid): + def get_group_add_contacts_history(self,wxid,chatroom_id,contact_wxid)->list[Models.AddGroupContactsHistory]: ''' 获取群加好友历史 '''