H Vs pirms 1 mēnesi
vecāks
revīzija
5c31d2994f
3 mainītis faili ar 288 papildinājumiem un 15 dzēšanām
  1. +268
    -14
      app.py
  2. +19
    -0
      common/interceptors.py
  3. +1
    -1
      wechat/gewe_chat.py

+ 268
- 14
app.py Parādīt failu

@@ -7,9 +7,11 @@ from resources.groups_resources import GetGroupsInfoResource
from resources.login_resources import GetLoginInfoResource,GetLoginWxQRCodeResource,LoginWxCaptchCodeResource
from resources.sns_resources import SendSNSTextResource,SendSNSImageResource, SendSNSVideoResource
from common.log import logger, log_exception
from common.interceptors import before_request, after_request, handle_exception
from common.interceptors import before_request, after_request, handle_exception,check_login_status
import threading
from common import kafka_helper, redis_helper,utils
import random



from model import Models
@@ -164,24 +166,38 @@ def process_add_contacts_from_chatrooms(wxchat:gewe_chat.GeWeChatCom,status, wxi
else:
logger.info(f'微信ID {wxid} 未登录 {app_id} ,群成员不能定时定时')


def auto_add_contacts_from_to_add_contacts_queue():
'''
从待添加好友队列中自动添加好友
'''
logger.info('自动添加好友')
wxchat=gewe_chat.wxchat

while True:
login_keys = list(redis_helper.redis_helper.client.scan_iter(match='__AI_OPS_WX__:LOGININFO:*'))

# 遍历每一个获取到的登录键

# def background_wxchat_thread():
# scan_wx_login_info()
# # 启动同步联系人线程
# threading.Thread(target=fetch_and_save_contacts).start()
# threading.Thread(target=auto_add_contacts_from_chatrooms).start()

for k in login_keys:
r= redis_helper.redis_helper.get_hash(k)
token_id = r.get('tokenId')
app_id = r.get('appId')
nickname=r.get('nickName')
wxid = r.get('wxid')
status=r.get('status')
if status == '1':
c = wxchat.get_wxchat_config_from_cache(wxid)
contacts = wxchat.get_contacts_brief_from_cache(wxid)
contact_wxids = [c.get('userName') for c in contacts]
add_contacts_queue = c.get('addContactsQueue', [])
for contact_wxid in add_contacts_queue:
if contact_wxid not in contact_wxids:
ret, msg, data = wxchat.add_contacts(token_id, app_id, contact_wxid)
if ret==200:
contact_wxids.append(contact_wxid)
wxchat.save_contacts_brief_to_cache(token_id, app_id, wxid, contact_wxids)
history=Models.AddContactsHistory.model_validate({
"wxid":wxid,
"contactWixd":contact_wxid,
"addTime":int(time.time())
})

def background_wxchat_thread():
lock_name = "background_wxchat_thread_lock"
@@ -193,8 +209,10 @@ def background_wxchat_thread():
logger.info("分布式锁已成功获取")
# 启动任务
scan_wx_login_info()
threading.Thread(target=fetch_and_save_contacts).start()
threading.Thread(target=auto_add_contacts_from_chatrooms).start()
#threading.Thread(target=fetch_and_save_contacts).start()
#threading.Thread(target=auto_add_contacts_from_chatrooms).start()
threading.Thread(target=wx_thread_manager.check_and_manage_threads()).start()

# 保持锁的续期
while True:
@@ -246,8 +264,243 @@ def scan_wx_login_info():
if cursor == 0:
break
class WechatThreadManager:
def __init__(self):
# 同步好友服务线程
self.sync_contacts_worker_threads = {}
self.sync_contacts_task_threads = {}
# 群加好友服务线程
self.add_contacts_from_group_worker_threads = {}
# 群加好友任务线程
self.add_group_member_as_friend_task_threads = {}

# 自动加好友线程
self.add_contacts_threads = {}


self.wxchat=gewe_chat.wxchat

def check_and_manage_threads(self):
while True:
login_keys = list(redis_helper.redis_helper.client.scan_iter(match='__AI_OPS_WX__:LOGININFO:*'))
for k in login_keys:
r= redis_helper.redis_helper.get_hash(k)
token_id = r.get('tokenId')
app_id = r.get('appId')
nickname=r.get('nickName')
wxid = r.get('wxid')
status=r.get('status')
#print(status)
if status == '1':
#print(wxid not in self.add_contacts_from_group_worker_threads)
if wxid not in self.add_contacts_from_group_worker_threads:
print(f"微信 {wxid} 在线,创建群好友添加线程...")
self.add_contacts_from_group_worker_threads[wxid] = threading.Thread(
target=self.auto_add_contacts_from_group_worker,
kwargs={"wxid":wxid, "token_id": token_id, "app_id": app_id} # wxid 作为关键字参数
)

self.add_contacts_from_group_worker_threads[wxid].start()

if wxid not in self.sync_contacts_worker_threads:
print(f"微信 {wxid} 在线,创建同步联系人任务...")
self.sync_contacts_worker_threads[wxid] = threading.Thread(
target=self.sync_contacts_processer,
kwargs={"wxid":wxid, "token_id": token_id, "app_id": app_id}
)
self.sync_contacts_worker_threads[wxid].start()

else:
if wxid in self.add_contacts_from_group_worker_threads:
print(f"微信 {wxid} 离线,关闭群好友添加线程...")
self.add_contacts_from_group_worker_threads[wxid].join()
del self.add_contacts_from_group_worker_threads[wxid]
if wxid in self.sync_contacts_worker_threads:
print(f"微信 {wxid} 离线,关闭同步联系线程线程...")
self.sync_contacts_worker_threads[wxid].join()
del self.sync_contacts_worker_threads[wxid]
print('关闭线程')
time.sleep(0)
def auto_add_contacts_worker(self,wxid):
while True:
try:
to_add_contact= self.wxchat.dequeue_to_add_contacts(wxid)
if not to_add_contact:
continue
except Exception as e:
logger.error(f'{wxid} 自动添加好友出错')
def add_friends2(self,wxid):
'''
24 小时只能加 5-15 位好友,每 2 小时不要超过 8 人,每个好友添加间隔要做随机间隔
'''
total_friends = 0
start_time = time.time()
last_friend_time = start_time
while time.time() - start_time < 24 * 60 * 60: # 24 小时内
# 每 2 小时内不超过 8 个好友
if time.time() - last_friend_time < 2 * 60 * 60:
if len([t for t in [last_friend_time] if t > time.time() - 2 * 60 * 60]) >= 8:
time.sleep(1)
continue

# 添加好友
print(f"添加好友 {total_friends + 1}")
total_friends += 1

# 随机间隔
interval = random.randint(1, 10) * 60 # 1到10分钟的随机间隔
time.sleep(interval)

# 更新最后添加好友的时间
last_friend_time = time.time()

# 如果已经添加了 15 个好友,提前退出
if total_friends >= 15:
break

def add_friends():
total_friends = 0
start_time = time.time()
friend_times = [] # 用于记录每次添加好友的时间

while time.time() - start_time < 24 * 60 * 60: # 24 小时内
# 检查最近 2 小时内是否已经添加了 8 个好友
current_time = time.time()
recent_friends = [t for t in friend_times if t > current_time - 2 * 60 * 60]
if len(recent_friends) >= 8:
# 如果已经添加了 8 个好友,等待一段时间再检查
time.sleep(1)
continue

# 添加好友
print(f"添加好友 {total_friends + 1}")
total_friends += 1

# 记录添加好友的时间
friend_times.append(current_time)

# 随机间隔
interval = random.randint(1, 10) * 60 # 1到10分钟的随机间隔
time.sleep(interval)

# 如果已经添加了 15 个好友,提前退出
if total_friends >= 15:
break
def auto_add_contacts_from_group_worker(self,wxid, token_id, app_id):
'''
从群添加好友任务
'''
while True:
k, login_info = utils.get_login_info_by_wxid(wxid)
if login_info.get('status') == "0":
return
c = self.wxchat.get_wxchat_config_from_cache(wxid)
contacts = self.wxchat.get_contacts_brief_from_cache(wxid)
contact_wxids = [c.get('userName') for c in contacts]
chatrooms = c.get('addContactsFromChatroomIdWhiteList', [])
for chatroom_id in chatrooms:
chatroom = self.wxchat.get_group_info_from_cache(wxid, chatroom_id)
chatroom_member=self.wxchat.get_group_members_from_cache(wxid, chatroom_id)
chatroom_nickname = chatroom.get('nickName')
chatroom_owner_wxid = chatroom_member.get('chatroomOwner', None)
admin_wxid = chatroom_member.get('adminWxid', None)
#logger.info(f'{chatroom_nickname} 的群主是 {chatroom_owner_wxid},管理员是{admin_wxid}')
contact_wxids_set = set(contact_wxids)
if admin_wxid is not None:
contact_wxids_set.add(admin_wxid)
if chatroom_owner_wxid is not None:
contact_wxids_set.add(chatroom_owner_wxid)
contact_wxids_set.add(wxid)

unavailable_wixds=self.wxchat.check_wixd_group_add_contacts_history(wxid,chatroom_id)
contact_wxids_set.update(unavailable_wixds)

chatroot_member_list = chatroom.get('memberList', [])
remaining_chatroot_members = [x for x in chatroot_member_list if x.get('wxid') not in contact_wxids_set]
nickname = next((member['nickName'] for member in chatroot_member_list if member['wxid'] == wxid), None)

#logger.info(f'{nickname}-{wxid} 在 {chatroom_nickname} 群里还可以邀请的好友有:{[x.get("nickName") for x in remaining_chatroot_members]}')
for m in remaining_chatroot_members:
tasks_id=f'{wxid}-{chatroom_id}-{m.get("wxid")}'
if tasks_id not in self.add_group_member_as_friend_task_threads:
self.add_group_member_as_friend_task_threads[tasks_id] =threading.Thread(target=self.add_group_member_as_friend_thread,
args=(token_id, app_id, wxid,chatroom_id,m,nickname,chatroom_nickname,)
)
#self.add_group_member_as_friend_task_threads[tasks_id].daemon = True # 设置为守护线程
self.add_group_member_as_friend_task_threads[tasks_id].start()
time.sleep(0)

def sync_contacts_processer(self,wxid, token_id, app_id):
while True:
k, login_info = utils.get_login_info_by_wxid(wxid)
if login_info.get('status') == "0":
print('sync_contacts_processer exit')
#self.sync_contacts_task_threads[wxid].join()
del self.sync_contacts_task_threads[wxid]
return
if wxid not in self.sync_contacts_task_threads:
self.sync_contacts_task_threads[wxid]=threading.Thread(target=self.sync_contacts_threads_processer, args=(wxid,token_id,app_id))
#self.sync_contacts_task_threads[wxid].daemon = True
self.sync_contacts_task_threads[wxid].start()
time.sleep(0)

def sync_contacts_threads_processer(self,wxid, token_id, app_id):
ret,msg,contacts_list = self.wxchat.fetch_contacts_list(token_id, app_id)
if ret==200:
friend_wxids = [c for c in contacts_list['friends'] if c not in ['fmessage', 'medianote','weixin','weixingongzhong']] # 可以调整截取范围
print(f'{wxid}的好友数量 {len(friend_wxids)}')
self.wxchat.save_contacts_brief_to_cache(token_id, app_id, wxid, friend_wxids)
logger.info(f'微信ID {wxid} 登录APPID {app_id} 成功,联系人已定时保存')
chatrooms=contacts_list['chatrooms']
self.wxchat.save_groups_info_to_cache(token_id, app_id, wxid, chatrooms)
self.wxchat.save_groups_members_to_cache(token_id, app_id, wxid, chatrooms)
logger.info(f'微信ID {wxid} 登录APPID {app_id} 成功,群信息已定时保存')
else:
logger.warning(f'{msg}-微信ID {wxid} 登录APPID {app_id} 不能获取好友和群资料')
time.sleep(random.uniform(3000, 3600))

def add_group_member_as_friend_thread(self,token_id, app_id, wxid,chatroom_id,chatroot_member,nickname,chatroom_nickname):
'''
从群添加好友线程,
'''
tasks_id=f'{wxid}-{chatroom_id}-{chatroot_member.get("wxid")}'
k, login_info = utils.get_login_info_by_wxid(wxid)
if login_info.get('status') == "0":
del self.add_group_member_as_friend_task_threads[tasks_id]
return
contact_history=self.wxchat.get_group_add_contacts_history(wxid,chatroom_id,chatroot_member.get('wxid'))
if len(contact_history)==1:
last_contact_history=contact_history[0]
diff_time=int(time.time())-last_contact_history.addTime
oneday_seconds=60 * 60 * 24
if diff_time<oneday_seconds:
logging.info(f'{nickname}-{nickname} 向 {chatroom_nickname}-{chatroom_id} 群的 {chatroot_member.get("nickName")}-{ chatroot_member.get("wxid")} 第二次邀请好友,等待{oneday_seconds-diff_time}秒再执行')
time.sleep(oneday_seconds-diff_time)
ret, msg, data = self.wxchat.add_group_member_as_friend(token_id, app_id, chatroom_id, chatroot_member.get('wxid'), f'我是群聊"{chatroom_nickname}"群的{nickname}')
if ret!=200:
logger.info(f'群好友邀请失败原因:{data}')
contact_wxids= chatroot_member.get('wxid')
history=Models.AddGroupContactsHistory.model_validate({
"chatroomId":chatroom_id,
"wxid":wxid,
"contactWixd":contact_wxids,
"addTime":int(time.time())
})
self.wxchat.save_group_add_contacts_history(wxid,chatroom_id,contact_wxids,history)
logger.info(f'{nickname} 向 {chatroom_nickname}-{chatroom_id} 群的 {chatroot_member.get("nickName")}-{chatroot_member.get("wxid")} 发送好友邀请 {msg}')
time.sleep(random.uniform(5, 10))
del self.add_group_member_as_friend_task_threads[tasks_id]


app = Flask(__name__)

@@ -289,7 +542,8 @@ load_config()
kafka_helper.start()
redis_helper.start()
gewe_chat.start()
wx_thread_manager=WechatThreadManager()
#wx_thread_manager.check_and_manage_threads()
start_kafka_consumer_thread()
# background_wxchat_thread()
threading.Thread(target=background_wxchat_thread).start()


+ 19
- 0
common/interceptors.py Parādīt failu

@@ -85,6 +85,25 @@ def handle_exception(error):



def check_login_status(func):
@wraps(func)
def wrapper(*args, **kwargs):

wxid = kwargs.get('wxid') # 从关键字参数中获取 wxid
if not wxid:
if len(args) > 0:
wxid = args[0]
else:
raise ValueError("wxid is required")

k, login_info = utils.get_login_info_by_wxid(wxid)
if login_info.get('status') == "0":
return # 如果 status 为 "0",直接返回,不执行原函数
return func(*args, **kwargs)
return wrapper


def auth_required_time(f):
@wraps(f)
def decorated_function(*args, **kwargs):


+ 1
- 1
wechat/gewe_chat.py Parādīt failu

@@ -1016,7 +1016,7 @@ class GeWeChatCom:
data.append(history.model_dump())
redis_helper.redis_helper.update_hash_field(hash_key, contact_wxid, json.dumps(data, ensure_ascii=False))

def get_group_add_contacts_history(self,wxid,chatroom_id,contact_wxid):
def get_group_add_contacts_history(self,wxid,chatroom_id,contact_wxid)->list[Models.AddGroupContactsHistory]:
'''
获取群加好友历史
'''


Notiek ielāde…
Atcelt
Saglabāt