from flask import Flask, send_from_directory, request,jsonify from flask_restful import Api,got_request_exception from resources.messages_resource import MessagesResource from resources.contacts_resources import DeleteFriendResource,GetFriendsInfoResource from resources.config_reources import GetWxchatConfigResource ,SaveWxchatConfigResource from resources.groups_resources import GetGroupsInfoResource,GetGroupMemberList from resources.login_resources import GetLoginInfoResource,GetLoginWxQRCodeResource,LoginWxCaptchCodeResource from resources.sns_resources import SendSNSTextResource,SendSNSImageResource, SendSNSVideoResource from common.log import logger, log_exception from common.interceptors import before_request, after_request, handle_exception,check_login_status import threading from common import kafka_helper, redis_helper,utils import random from model import Models import logging from config import load_config from wechat.biz import start_kafka_consumer_thread from wechat import gewe_chat import os,time,json,time from voice.ali.ali_voice import AliVoice # 自定义错误消息 errors = { 'UserAlreadyExistsError': { 'message': "A user with that username already exists.", 'status': 409, }, 'ResourceDoesNotExist': { 'message': "A resource with that ID no longer exists.", 'status': 410, 'extra': "Any extra information you want.", }, } def fetch_and_save_contacts(): """ 获取联系人列表并保存到缓存 """ wxchat=gewe_chat.wxchat while True: try: login_keys = list(redis_helper.redis_helper.client.scan_iter(match='__AI_OPS_WX__:LOGININFO:*')) logger.info(f"Fetching login keys: {login_keys}") # 遍历每一个获取到的登录键 for k in login_keys: try: r= redis_helper.redis_helper.get_hash(k) # print(r) token_id = r.get('tokenId') app_id = r.get('appId') wxid = r.get('wxid') status=r.get('status') if status=='1': ret,msg,contacts_list = wxchat.fetch_contacts_list(token_id, app_id) if ret==200: friend_wxids = [c for c in contacts_list['friends'] if c not in ['fmessage', 'medianote','weixin','weixingongzhong']] # 可以调整截取范围 print(f'{wxid}的好友数量 {len(friend_wxids)}') wxchat.save_contacts_brief_to_cache(token_id, app_id, wxid, friend_wxids) logger.info(f'微信ID {wxid} 登录APPID {app_id} 成功,联系人已定时保存') chatrooms=contacts_list['chatrooms'] wxchat.save_groups_info_to_cache(token_id, app_id, wxid, chatrooms) wxchat.save_groups_members_to_cache(token_id, app_id, wxid, chatrooms) logger.info(f'微信ID {wxid} 登录APPID {app_id} 成功,群信息已定时保存') else: logger.warning(f'{msg}-微信ID {wxid} 登录APPID {app_id} 不能获取好友和群资料') else: logger.info(f'微信ID {wxid} 未登录 {app_id} ,联系人不能定时保存') time.sleep(3) except Exception as e: logger.error(f'处理好友和群资料出错 login key {k}: {str(e)}', exc_info=True) except Exception as e: logger.error(f'发送错误 {str(e)}', exc_info=True) time.sleep(3600*1) def auto_add_contacts_from_chatrooms(): ''' 从群中自动添加好友 ''' logger.info('自动从群添加好友') wxchat=gewe_chat.wxchat while True: login_keys = list(redis_helper.redis_helper.client.scan_iter(match='__AI_OPS_WX__:LOGININFO:*')) #logger.info(f"Fetching login keys: {login_keys}") # 遍历每一个获取到的登录键 for k in login_keys: r= redis_helper.redis_helper.get_hash(k) # print(r) token_id = r.get('tokenId') app_id = r.get('appId') nickname=r.get('nickName') wxid = r.get('wxid') status=r.get('status') # 启动线程处理登录信息 thread = threading.Thread(target=process_add_contacts_from_chatrooms, args=(wxchat,status, wxid, token_id, app_id)) thread.start() time.sleep(3) #time.sleep(60*10) time.sleep(3600*24) def process_add_contacts_from_chatrooms(wxchat:gewe_chat.GeWeChatCom,status, wxid, token_id, app_id): if status == '1': c = wxchat.get_wxchat_config_from_cache(wxid) contacts = wxchat.get_contacts_brief_from_cache(wxid) contact_wxids = [c.get('userName') for c in contacts] chatrooms = c.get('addContactsFromChatroomIdWhiteList', []) for chatroom_id in chatrooms: chatroom = wxchat.get_group_info_from_cache(wxid, chatroom_id) chatroom_member=wxchat.get_group_members_from_cache(wxid, chatroom_id) chatroom_nickname = chatroom.get('nickName') chatroom_owner_wxid = chatroom_member.get('chatroomOwner', None) admin_wxid = chatroom_member.get('adminWxid', None) logger.info(f'{chatroom_nickname} 的群主是 {chatroom_owner_wxid},管理员是{admin_wxid}') contact_wxids_set = set(contact_wxids) if admin_wxid is not None: contact_wxids_set.add(admin_wxid) if chatroom_owner_wxid is not None: contact_wxids_set.add(chatroom_owner_wxid) contact_wxids_set.add(wxid) unavailable_wixds=wxchat.check_wixd_group_add_contacts_history(wxid,chatroom_id) contact_wxids_set.update(unavailable_wixds) chatroot_member_list = chatroom.get('memberList', []) remaining_chatroot_members = [x for x in chatroot_member_list if x.get('wxid') not in contact_wxids_set] nickname = next((member['nickName'] for member in chatroot_member_list if member['wxid'] == wxid), None) logger.info(f'{nickname}-{wxid} 在 {chatroom_nickname} 群里还可以邀请的好友有:{[x.get("nickName") for x in remaining_chatroot_members]}') for m in remaining_chatroot_members: ret, msg, data = wxchat.add_group_member_as_friend(token_id, app_id, chatroom_id, m.get('wxid'), f'我是群聊"{chatroom_nickname}"群的{nickname}') if ret==200: contact_wxids= m.get('wxid') history=Models.AddGroupContactsHistory.model_validate({ "chatroomId":chatroom_id, "wxid":wxid, "contactWixd":contact_wxids, "addTime":int(time.time()) }) wxchat.save_group_add_contacts_history(wxid,chatroom_id,contact_wxids,history) else: logger.info(f'群好友邀请失败原因:{data}') logger.info(f'{nickname} 向 {chatroom_nickname}-{chatroom_id} 群的 {m.get("nickName")}-{m.get("wxid")} 发送好友邀请 {msg}') time.sleep(10) time.sleep(20) else: logger.info(f'微信ID {wxid} 未登录 {app_id} ,群成员不能定时定时') def auto_add_contacts_from_to_add_contacts_queue(): ''' 从待添加好友队列中自动添加好友 ''' logger.info('自动添加好友') wxchat=gewe_chat.wxchat while True: login_keys = list(redis_helper.redis_helper.client.scan_iter(match='__AI_OPS_WX__:LOGININFO:*')) for k in login_keys: r= redis_helper.redis_helper.get_hash(k) token_id = r.get('tokenId') app_id = r.get('appId') nickname=r.get('nickName') wxid = r.get('wxid') status=r.get('status') if status == '1': c = wxchat.get_wxchat_config_from_cache(wxid) contacts = wxchat.get_contacts_brief_from_cache(wxid) contact_wxids = [c.get('userName') for c in contacts] add_contacts_queue = c.get('addContactsQueue', []) for contact_wxid in add_contacts_queue: if contact_wxid not in contact_wxids: ret, msg, data = wxchat.add_contacts(token_id, app_id, contact_wxid) if ret==200: contact_wxids.append(contact_wxid) wxchat.save_contacts_brief_to_cache(token_id, app_id, wxid, contact_wxids) history=Models.AddContactsHistory.model_validate({ "wxid":wxid, "contactWixd":contact_wxid, "addTime":int(time.time()) }) def background_wxchat_thread(): lock_name = "background_wxchat_thread_lock" lock_identifier = str(time.time()) # 使用时间戳作为唯一标识 # 尝试获取分布式锁 if redis_helper.redis_helper.acquire_lock(lock_name, timeout=60): try: logger.info("分布式锁已成功获取") # 启动任务 scan_wx_login_info() #threading.Thread(target=fetch_and_save_contacts).start() #threading.Thread(target=auto_add_contacts_from_chatrooms).start() threading.Thread(target=wx_thread_manager.check_and_manage_threads()).start() # 保持锁的续期 while True: time.sleep(30) # 每30秒检查一次锁的状态 if not redis_helper.redis_helper.renew_lock(lock_name, lock_identifier, timeout=60): break # 如果无法续期锁,退出循环 finally: # 释放锁 redis_helper.redis_helper.release_lock(lock_name, lock_identifier) else: # 如果获取锁失败,等待一段时间后重试 time.sleep(10) background_wxchat_thread() def scan_wx_login_info(): wxchat = gewe_chat.wxchat cursor = 0 while True: cursor, login_keys = redis_helper.redis_helper.client.scan(cursor, match='__AI_OPS_WX__:LOGININFO:*') for k in login_keys: r = redis_helper.redis_helper.get_hash(k) app_id=r.get("appId") #tel=r.get("mobile") token_id=r.get("tokenId") wxid=r.get("wxid") is_online = wxchat.check_online(token_id, app_id) if is_online: logger.info(f'微信ID {wxid} 在APPID {app_id} 已经在线') else: # 尝试重连 res = wxchat.reconnection(token_id, app_id) if res.get('ret') == 200: logger.info(f'微信ID {wxid} 在APPID {app_id} 重连成功') # 同步联系人 #fetch_and_save_contacts(wxchat, token_id, app_id, k) else: print("重连失败,重新登录...") print("发送离线消息到kafka") redis_helper.redis_helper.update_hash_field(k,'status',0) time.sleep(3) # 如果游标为 0,则表示扫描完成 if cursor == 0: break class WechatThreadManager: def __init__(self): # 同步好友服务线程 self.sync_contacts_worker_threads = {} self.sync_contacts_task_threads = {} # 群加好友服务线程 self.add_contacts_from_group_worker_threads = {} # 群加好友任务线程 self.add_group_member_as_friend_task_threads = {} # 自动加好友线程 self.add_contacts_threads = {} self.wxchat=gewe_chat.wxchat def check_and_manage_threads(self): while True: login_keys = list(redis_helper.redis_helper.client.scan_iter(match='__AI_OPS_WX__:LOGININFO:*')) for k in login_keys: r= redis_helper.redis_helper.get_hash(k) token_id = r.get('tokenId') app_id = r.get('appId') nickname=r.get('nickName') wxid = r.get('wxid') status=r.get('status') #print(status) if status == '1': #print(wxid not in self.add_contacts_from_group_worker_threads) if wxid not in self.add_contacts_from_group_worker_threads: print(f"微信 {wxid} 在线,创建群好友添加线程...") self.add_contacts_from_group_worker_threads[wxid] = threading.Thread( target=self.auto_add_contacts_from_group_worker, kwargs={"wxid":wxid, "token_id": token_id, "app_id": app_id} # wxid 作为关键字参数 ) self.add_contacts_from_group_worker_threads[wxid].start() if wxid not in self.sync_contacts_worker_threads: print(f"微信 {wxid} 在线,创建同步联系人任务...") self.sync_contacts_worker_threads[wxid] = threading.Thread( target=self.sync_contacts_processer, kwargs={"wxid":wxid, "token_id": token_id, "app_id": app_id} ) self.sync_contacts_worker_threads[wxid].start() else: if wxid in self.add_contacts_from_group_worker_threads: print(f"微信 {wxid} 离线,关闭群好友添加线程...") self.add_contacts_from_group_worker_threads[wxid].join() del self.add_contacts_from_group_worker_threads[wxid] if wxid in self.sync_contacts_worker_threads: print(f"微信 {wxid} 离线,关闭同步联系线程线程...") self.sync_contacts_worker_threads[wxid].join() del self.sync_contacts_worker_threads[wxid] print('关闭线程') time.sleep(0) def auto_add_contacts_worker(self,wxid): while True: try: to_add_contact= self.wxchat.dequeue_to_add_contacts(wxid) if not to_add_contact: continue except Exception as e: logger.error(f'{wxid} 自动添加好友出错') def add_friends2(self,wxid): ''' 24 小时只能加 5-15 位好友,每 2 小时不要超过 8 人,每个好友添加间隔要做随机间隔 ''' total_friends = 0 start_time = time.time() last_friend_time = start_time while time.time() - start_time < 24 * 60 * 60: # 24 小时内 # 每 2 小时内不超过 8 个好友 if time.time() - last_friend_time < 2 * 60 * 60: if len([t for t in [last_friend_time] if t > time.time() - 2 * 60 * 60]) >= 8: time.sleep(1) continue # 添加好友 print(f"添加好友 {total_friends + 1}") total_friends += 1 # 随机间隔 interval = random.randint(1, 10) * 60 # 1到10分钟的随机间隔 time.sleep(interval) # 更新最后添加好友的时间 last_friend_time = time.time() # 如果已经添加了 15 个好友,提前退出 if total_friends >= 15: break def add_friends(): total_friends = 0 start_time = time.time() friend_times = [] # 用于记录每次添加好友的时间 while time.time() - start_time < 24 * 60 * 60: # 24 小时内 # 检查最近 2 小时内是否已经添加了 8 个好友 current_time = time.time() recent_friends = [t for t in friend_times if t > current_time - 2 * 60 * 60] if len(recent_friends) >= 8: # 如果已经添加了 8 个好友,等待一段时间再检查 time.sleep(1) continue # 添加好友 print(f"添加好友 {total_friends + 1}") total_friends += 1 # 记录添加好友的时间 friend_times.append(current_time) # 随机间隔 interval = random.randint(1, 10) * 60 # 1到10分钟的随机间隔 time.sleep(interval) # 如果已经添加了 15 个好友,提前退出 if total_friends >= 15: break def auto_add_contacts_from_group_worker(self,wxid, token_id, app_id): ''' 从群添加好友任务 ''' while True: k, login_info = utils.get_login_info_by_wxid(wxid) if login_info.get('status') == "0": return c = self.wxchat.get_wxchat_config_from_cache(wxid) contacts = self.wxchat.get_contacts_brief_from_cache(wxid) contact_wxids = [c.get('userName') for c in contacts] chatrooms = c.get('addContactsFromChatroomIdWhiteList', []) for chatroom_id in chatrooms: chatroom = self.wxchat.get_group_info_from_cache(wxid, chatroom_id) chatroom_member=self.wxchat.get_group_members_from_cache(wxid, chatroom_id) chatroom_nickname = chatroom.get('nickName') chatroom_owner_wxid = chatroom_member.get('chatroomOwner', None) admin_wxid = chatroom_member.get('adminWxid', None) #logger.info(f'{chatroom_nickname} 的群主是 {chatroom_owner_wxid},管理员是{admin_wxid}') contact_wxids_set = set(contact_wxids) if admin_wxid is not None: contact_wxids_set.add(admin_wxid) if chatroom_owner_wxid is not None: contact_wxids_set.add(chatroom_owner_wxid) contact_wxids_set.add(wxid) unavailable_wixds=self.wxchat.check_wixd_group_add_contacts_history(wxid,chatroom_id) contact_wxids_set.update(unavailable_wixds) chatroot_member_list = chatroom.get('memberList', []) remaining_chatroot_members = [x for x in chatroot_member_list if x.get('wxid') not in contact_wxids_set] nickname = next((member['nickName'] for member in chatroot_member_list if member['wxid'] == wxid), None) #logger.info(f'{nickname}-{wxid} 在 {chatroom_nickname} 群里还可以邀请的好友有:{[x.get("nickName") for x in remaining_chatroot_members]}') for m in remaining_chatroot_members: tasks_id=f'{wxid}-{chatroom_id}-{m.get("wxid")}' if tasks_id not in self.add_group_member_as_friend_task_threads: self.add_group_member_as_friend_task_threads[tasks_id] =threading.Thread(target=self.add_group_member_as_friend_thread, args=(token_id, app_id, wxid,chatroom_id,m,nickname,chatroom_nickname,) ) #self.add_group_member_as_friend_task_threads[tasks_id].daemon = True # 设置为守护线程 self.add_group_member_as_friend_task_threads[tasks_id].start() time.sleep(0) def sync_contacts_processer(self,wxid, token_id, app_id): while True: k, login_info = utils.get_login_info_by_wxid(wxid) if login_info.get('status') == "0": print('sync_contacts_processer exit') #self.sync_contacts_task_threads[wxid].join() del self.sync_contacts_task_threads[wxid] return if wxid not in self.sync_contacts_task_threads: self.sync_contacts_task_threads[wxid]=threading.Thread(target=self.sync_contacts_threads_processer, args=(wxid,token_id,app_id)) #self.sync_contacts_task_threads[wxid].daemon = True self.sync_contacts_task_threads[wxid].start() time.sleep(0) def sync_contacts_threads_processer(self,wxid, token_id, app_id): ret,msg,contacts_list = self.wxchat.fetch_contacts_list(token_id, app_id) if ret==200: friend_wxids = [c for c in contacts_list['friends'] if c not in ['fmessage', 'medianote','weixin','weixingongzhong']] # 可以调整截取范围 print(f'{wxid}的好友数量 {len(friend_wxids)}') self.wxchat.save_contacts_brief_to_cache(token_id, app_id, wxid, friend_wxids) logger.info(f'微信ID {wxid} 登录APPID {app_id} 成功,联系人已定时保存') chatrooms=contacts_list['chatrooms'] self.wxchat.save_groups_info_to_cache(token_id, app_id, wxid, chatrooms) self.wxchat.save_groups_members_to_cache(token_id, app_id, wxid, chatrooms) logger.info(f'微信ID {wxid} 登录APPID {app_id} 成功,群信息已定时保存') else: logger.warning(f'{msg}-微信ID {wxid} 登录APPID {app_id} 不能获取好友和群资料') time.sleep(random.uniform(3000, 3600)) def add_group_member_as_friend_thread(self,token_id, app_id, wxid,chatroom_id,chatroot_member,nickname,chatroom_nickname): ''' 从群添加好友线程, ''' tasks_id=f'{wxid}-{chatroom_id}-{chatroot_member.get("wxid")}' k, login_info = utils.get_login_info_by_wxid(wxid) if login_info.get('status') == "0": del self.add_group_member_as_friend_task_threads[tasks_id] return contact_history=self.wxchat.get_group_add_contacts_history(wxid,chatroom_id,chatroot_member.get('wxid')) if len(contact_history)==1: last_contact_history=contact_history[0] diff_time=int(time.time())-last_contact_history.addTime oneday_seconds=60 * 60 * 24 if diff_time