From ba158dfeb912fe0b1eace8725921b844933e07f5 Mon Sep 17 00:00:00 2001 From: H Vs Date: Wed, 19 Feb 2025 15:07:12 +0800 Subject: [PATCH] =?UTF-8?q?=E7=99=BB=E5=BD=95=E9=AA=8C=E8=AF=81=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app.py | 3 ++ common/memory.py | 3 +- resources/login_resources.py | 16 +++++- resources/messages_resource.py | 2 +- wechat/biz.py | 92 ++++++++++++++++++++-------------- wechat/gewe_chat.py | 49 +++++++++++++----- 6 files changed, 111 insertions(+), 54 deletions(-) diff --git a/app.py b/app.py index 6a8986f..b56ad0c 100644 --- a/app.py +++ b/app.py @@ -87,6 +87,9 @@ def fetch_and_save_contacts2(): time.sleep(3600*24) +def auto_contacts_from_chatroom_id(): + print('自动从群添加好友') + def start_wxchat_thread(): # gewe_chat.start() scan_wx_login_info() diff --git a/common/memory.py b/common/memory.py index fc7915b..8cdb194 100644 --- a/common/memory.py +++ b/common/memory.py @@ -1,4 +1,5 @@ from common.expired_dict import ExpiredDict USER_IMAGE_CACHE = ExpiredDict(60 * 3) -USER_INTERACTIVE_CACHE=ExpiredDict(60 * 1) \ No newline at end of file +USER_INTERACTIVE_CACHE=ExpiredDict(60 * 1) +USER_LOGIN_QRCODE=ExpiredDict(80) \ No newline at end of file diff --git a/resources/login_resources.py b/resources/login_resources.py index 6851c03..32a0988 100644 --- a/resources/login_resources.py +++ b/resources/login_resources.py @@ -13,4 +13,18 @@ class GetLoginInfoResource(Resource): req = request.get_json() tel = req.get("tel") config=self.wxchat.get_login_info_from_cache(tel) - return jsonify(config) \ No newline at end of file + return jsonify(config) + + +class LoginWxCaptchCodeResource(Resource): + def __init__(self): + self.parser = reqparse.RequestParser() + self.wxchat = gewe_chat.wxchat + + def post(self): + req = request.get_json() + token_id = req.get("token_id") + captch_code= req.get("captch_code") + res=self.wxchat.save_login_wx_captch_code_to_cache(token_id,captch_code) + return jsonify(res) + diff --git a/resources/messages_resource.py b/resources/messages_resource.py index ad2e07f..b55459a 100644 --- a/resources/messages_resource.py +++ b/resources/messages_resource.py @@ -14,7 +14,7 @@ import os from voice import audio_convert -timeout_duration = 4.0 +timeout_duration = 8.0 class MessagesResource(Resource): def __init__(self): diff --git a/wechat/biz.py b/wechat/biz.py index b55e514..de10924 100644 --- a/wechat/biz.py +++ b/wechat/biz.py @@ -282,7 +282,7 @@ def start_kafka_consumer_thread(): consumer_thread.start() -def login_or_reconnect(wxchat:gewe_chat.GeWeChatCom, token_id, app_id, hash_key, is_reconnect=False, max_retries=5): +def login_or_reconnect(wxchat:gewe_chat.GeWeChatCom, token_id, app_id, region_id,hash_key, is_reconnect=False, max_retries=5): """ 封装微信登录或重连的逻辑 """ @@ -295,7 +295,7 @@ def login_or_reconnect(wxchat:gewe_chat.GeWeChatCom, token_id, app_id, hash_key, else: logger.info("获取二维码进行登录...") - qr_code = wxchat.get_login_qr_code(token_id, app_id) + qr_code = wxchat.get_login_qr_code(token_id, app_id,region_id) base64_string = qr_code.get('qrImgBase64') uuid = qr_code.get('uuid') @@ -306,31 +306,37 @@ def login_or_reconnect(wxchat:gewe_chat.GeWeChatCom, token_id, app_id, hash_key, k_message=utils.login_qrcode_message(token_id,agent_tel,base64_string,qr_code_urls) kafka_helper.kafka_client.produce_message(k_message) while True: + now = time.time() # 如果登录超时,重新获取二维码 - if time.time() - start_time > 150: #150 秒 二维码失效 + if now- start_time > 150: #150 秒 二维码失效 break - res = wxchat.check_login(token_id, app_id, uuid) - flag = res.get('status') - # 构造 Kafka 消息发送登录状态 - # todo - if flag == 2: - logger.info(f"登录成功: {res}") - head_img_url=res.get('headImgUrl','') - login_info = res.get('loginInfo', {}) - - login_info.update({'appId': app_id, 'uuid': uuid, 'tokenId': token_id,'status': 1,'headImgUrl':head_img_url}) - cache_login_info=redis_helper.redis_helper.get_hash(hash_key) - if 'appId' not in cache_login_info: - login_info.update({"create_at":int(time.time()),"modify_at":int(time.time())}) - else: - login_info.update({"modify_at":int(time.time())}) - # if 'appId' in cache_login_info: - # login_info.update({"reg_time":datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3],"login_time":datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]}) - # else: - # login_info.update({"login_time":datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]}) - cleaned_login_info = {k: (v if v is not None else '') for k, v in login_info.items()} - redis_helper.redis_helper.set_hash(hash_key, cleaned_login_info) - return login_info + logger.info(f"{token_id} 使用 {app_id},等待扫码登录,二维码有效时间 {150 - int(now - start_time)} 秒") + captch_code = wxchat.get_login_wx_captch_code_to_cache(token_id) + captch_code= captch_code if captch_code else '' + ret,msg,res = wxchat.check_login(token_id, app_id, uuid,captch_code) + if ret == 200: + flag = res.get('status') + # 构造 Kafka 消息发送登录状态 + # todo + if flag == 2: + logger.info(f"登录成功: {res}") + head_img_url=res.get('headImgUrl','') + login_info = res.get('loginInfo', {}) + + login_info.update({'appId': app_id, 'uuid': uuid, 'tokenId': token_id,'status': 1,'headImgUrl':head_img_url,'regionId':region_id}) + cache_login_info=redis_helper.redis_helper.get_hash(hash_key) + if 'appId' not in cache_login_info: + login_info.update({"create_at":int(time.time()),"modify_at":int(time.time())}) + else: + login_info.update({"modify_at":int(time.time())}) + # if 'appId' in cache_login_info: + # login_info.update({"reg_time":datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3],"login_time":datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]}) + # else: + # login_info.update({"login_time":datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]}) + cleaned_login_info = {k: (v if v is not None else '') for k, v in login_info.items()} + redis_helper.redis_helper.set_hash(hash_key, cleaned_login_info) + return login_info + time.sleep(5) logger.error(f"登录失败,二维码生成 {max_retries} 次") @@ -346,12 +352,12 @@ def fetch_and_save_contacts(wxchat:gewe_chat.GeWeChatCom, token_id, app_id, hash print(f'微信ID {wxid} 登录APPID {app_id} 成功,联系人已保存') -def wx_login(wxchat:gewe_chat.GeWeChatCom,tel,token_id): +def wx_login(wxchat:gewe_chat.GeWeChatCom,tel,token_id,region_id): hash_key = f"__AI_OPS_WX__:LOGININFO:{tel}" login_info = redis_helper.redis_helper.get_hash(hash_key) if not login_info: - login_info = login_or_reconnect(wxchat, token_id, '', hash_key) + login_info = login_or_reconnect(wxchat, token_id, '', region_id,hash_key) else: app_id = login_info.get('appId') token_id = login_info.get('tokenId') @@ -367,10 +373,23 @@ def wx_login(wxchat:gewe_chat.GeWeChatCom,tel,token_id): logger.info(f'微信ID {wxid} 在APPID {app_id} 重连成功') else: print("重连失败,重新登录...") - login_info = login_or_reconnect(wxchat, token_id, app_id, hash_key, is_reconnect=True) + login_info = login_or_reconnect(wxchat, token_id, app_id, region_id,hash_key, is_reconnect=True) if login_info: fetch_and_save_contacts(wxchat, token_id, login_info.get('appId'), hash_key) +def login_wx_captch_code_process(wxchat:gewe_chat.GeWeChatCom,message): + msg_content = message + cleaned_content = clean_json_string(msg_content) + content = json.loads(cleaned_content) + data = content.get("data", {}) + + msg_type_data = data.get("msg_type", None) + content_data = data.get("content", {}) + token_id = content_data.get("token_id", None) + captch_code = content_data.get("captch_code", None) + wxchat.save_login_wx_captch_code_to_cache(token_id,captch_code) + + def ops_messages_process(message): try: wxchat = gewe_chat.wxchat @@ -387,9 +406,9 @@ def ops_messages_process(message): tel=content_data.get('tel', '18029274615') token_id=content_data.get('token_id', 'f828cb3c-1039-489f-b9ae-7494d1778a15') - + region_id=content_data.get('region_id', '440000') #wx_login(wxchat,tel,token_id) - thread = threading.Thread(target=wx_login, args=(wxchat,tel,token_id,)) + thread = threading.Thread(target=wx_login, args=(wxchat,tel,token_id,region_id)) thread.daemon = True thread.start() elif msg_type_data == 'group-sending': @@ -400,17 +419,12 @@ def ops_messages_process(message): thread = threading.Thread(target=wx_messages_process_callback, args=(agent_tel,message,)) thread.daemon = True thread.start() + elif msg_type_data == 'login_wx_captch_code': + thread = threading.Thread(target=login_wx_captch_code_process, args=(wxchat,message,)) + thread.daemon = True + thread.start() else: print(f'未处理息类型 {msg_type_data}') except Exception as e: print(f"处理消息时发生错误: {e}, 消息内容: {message}") - - -# 启动 -# def consumer_start_up_thread(): -# print('启动') -# try: -# print('启动') -# except Exception as e: -# print(f"处理消息时发生错误: {e}, 消息内容: {message}") \ No newline at end of file diff --git a/wechat/gewe_chat.py b/wechat/gewe_chat.py index aa73acb..c61e583 100644 --- a/wechat/gewe_chat.py +++ b/wechat/gewe_chat.py @@ -11,6 +11,7 @@ import requests from io import BytesIO from PIL import Image from common import redis_helper +from common.log import logger wxchat=None @@ -20,7 +21,7 @@ class GeWeChatCom: self.base_url = base_url ############################### 登录模块 ############################### - def check_login(self, token_id, app_id, uuid): + def check_login(self, token_id, app_id, uuid,captch_code=""): ''' 执行登录(步骤3) @@ -38,14 +39,23 @@ class GeWeChatCom: } data = { "appId": app_id, - "uuid": uuid + "uuid": uuid, + "captchCode":captch_code } + if captch_code=="": + data = { + "appId": app_id, + "uuid": uuid + } + response = requests.post(url=api_url, headers=headers, data=json.dumps(data)) response_data = response.json() - print(response_data) - return response_data.get('data') + # print(response_data) + # return response_data.get('data') + response_object = response.json() + return response_object.get('ret',None),response_object.get('msg',None),response_object.get('data',None) - def get_login_qr_code(self, token_id,app_id=""): + def get_login_qr_code(self, token_id,app_id="",region_id="440000"): ''' 获取登录二维码(步骤2) @@ -62,18 +72,25 @@ class GeWeChatCom: 'X-GEWE-TOKEN': token_id, 'Content-Type': 'application/json' } - if app_id=="": + # if app_id=="": - data = { - "appId": app_id - } - else: - data = { + # data = { + # "appId": app_id + # } + # else: + # data = { + # "appId": app_id, + # "regionId":region_id + # } + + data = { "appId": app_id, - "regionId":"440000" + "regionId":region_id } response = requests.post(url=api_url, headers=headers, data=json.dumps(data)) response_data = response.json() + data=json.dumps(response_data, separators=(',', ':'),ensure_ascii=False) + logger.info(f'{token_id} 的登录APP信息:{data}') return response_data.get('data') def qrCallback(self,uuid, base64_string): @@ -863,6 +880,14 @@ class GeWeChatCom: cache = redis_helper.redis_helper.get_hash(hash_key) return cache + def save_login_wx_captch_code_to_cache(self,token_id,captch_code): + hash_key = f"__AI_OPS_WX__:WXCAPTCHCODE:{token_id}" + redis_helper.redis_helper.set_hash(hash_key,{"data":captch_code},10) + + def get_login_wx_captch_code_to_cache(self,token_id)->str: + hash_key = f"__AI_OPS_WX__:WXCAPTCHCODE:{token_id}" + r=redis_helper.redis_helper.get_hash_field(hash_key,"data") + return r def start(): global wxchat