|
|
@@ -282,7 +282,7 @@ def start_kafka_consumer_thread(): |
|
|
|
consumer_thread.start() |
|
|
|
|
|
|
|
|
|
|
|
def login_or_reconnect(wxchat:gewe_chat.GeWeChatCom, token_id, app_id, hash_key, is_reconnect=False, max_retries=5): |
|
|
|
def login_or_reconnect(wxchat:gewe_chat.GeWeChatCom, token_id, app_id, region_id,hash_key, is_reconnect=False, max_retries=5): |
|
|
|
""" |
|
|
|
封装微信登录或重连的逻辑 |
|
|
|
""" |
|
|
@@ -295,7 +295,7 @@ def login_or_reconnect(wxchat:gewe_chat.GeWeChatCom, token_id, app_id, hash_key, |
|
|
|
else: |
|
|
|
logger.info("获取二维码进行登录...") |
|
|
|
|
|
|
|
qr_code = wxchat.get_login_qr_code(token_id, app_id) |
|
|
|
qr_code = wxchat.get_login_qr_code(token_id, app_id,region_id) |
|
|
|
base64_string = qr_code.get('qrImgBase64') |
|
|
|
uuid = qr_code.get('uuid') |
|
|
|
|
|
|
@@ -306,31 +306,37 @@ def login_or_reconnect(wxchat:gewe_chat.GeWeChatCom, token_id, app_id, hash_key, |
|
|
|
k_message=utils.login_qrcode_message(token_id,agent_tel,base64_string,qr_code_urls) |
|
|
|
kafka_helper.kafka_client.produce_message(k_message) |
|
|
|
while True: |
|
|
|
now = time.time() |
|
|
|
# 如果登录超时,重新获取二维码 |
|
|
|
if time.time() - start_time > 150: #150 秒 二维码失效 |
|
|
|
if now- start_time > 150: #150 秒 二维码失效 |
|
|
|
break |
|
|
|
res = wxchat.check_login(token_id, app_id, uuid) |
|
|
|
flag = res.get('status') |
|
|
|
# 构造 Kafka 消息发送登录状态 |
|
|
|
# todo |
|
|
|
if flag == 2: |
|
|
|
logger.info(f"登录成功: {res}") |
|
|
|
head_img_url=res.get('headImgUrl','') |
|
|
|
login_info = res.get('loginInfo', {}) |
|
|
|
|
|
|
|
login_info.update({'appId': app_id, 'uuid': uuid, 'tokenId': token_id,'status': 1,'headImgUrl':head_img_url}) |
|
|
|
cache_login_info=redis_helper.redis_helper.get_hash(hash_key) |
|
|
|
if 'appId' not in cache_login_info: |
|
|
|
login_info.update({"create_at":int(time.time()),"modify_at":int(time.time())}) |
|
|
|
else: |
|
|
|
login_info.update({"modify_at":int(time.time())}) |
|
|
|
# if 'appId' in cache_login_info: |
|
|
|
# login_info.update({"reg_time":datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3],"login_time":datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]}) |
|
|
|
# else: |
|
|
|
# login_info.update({"login_time":datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]}) |
|
|
|
cleaned_login_info = {k: (v if v is not None else '') for k, v in login_info.items()} |
|
|
|
redis_helper.redis_helper.set_hash(hash_key, cleaned_login_info) |
|
|
|
return login_info |
|
|
|
logger.info(f"{token_id} 使用 {app_id},等待扫码登录,二维码有效时间 {150 - int(now - start_time)} 秒") |
|
|
|
captch_code = wxchat.get_login_wx_captch_code_to_cache(token_id) |
|
|
|
captch_code= captch_code if captch_code else '' |
|
|
|
ret,msg,res = wxchat.check_login(token_id, app_id, uuid,captch_code) |
|
|
|
if ret == 200: |
|
|
|
flag = res.get('status') |
|
|
|
# 构造 Kafka 消息发送登录状态 |
|
|
|
# todo |
|
|
|
if flag == 2: |
|
|
|
logger.info(f"登录成功: {res}") |
|
|
|
head_img_url=res.get('headImgUrl','') |
|
|
|
login_info = res.get('loginInfo', {}) |
|
|
|
|
|
|
|
login_info.update({'appId': app_id, 'uuid': uuid, 'tokenId': token_id,'status': 1,'headImgUrl':head_img_url,'regionId':region_id}) |
|
|
|
cache_login_info=redis_helper.redis_helper.get_hash(hash_key) |
|
|
|
if 'appId' not in cache_login_info: |
|
|
|
login_info.update({"create_at":int(time.time()),"modify_at":int(time.time())}) |
|
|
|
else: |
|
|
|
login_info.update({"modify_at":int(time.time())}) |
|
|
|
# if 'appId' in cache_login_info: |
|
|
|
# login_info.update({"reg_time":datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3],"login_time":datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]}) |
|
|
|
# else: |
|
|
|
# login_info.update({"login_time":datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]}) |
|
|
|
cleaned_login_info = {k: (v if v is not None else '') for k, v in login_info.items()} |
|
|
|
redis_helper.redis_helper.set_hash(hash_key, cleaned_login_info) |
|
|
|
return login_info |
|
|
|
|
|
|
|
time.sleep(5) |
|
|
|
logger.error(f"登录失败,二维码生成 {max_retries} 次") |
|
|
|
|
|
|
@@ -346,12 +352,12 @@ def fetch_and_save_contacts(wxchat:gewe_chat.GeWeChatCom, token_id, app_id, hash |
|
|
|
print(f'微信ID {wxid} 登录APPID {app_id} 成功,联系人已保存') |
|
|
|
|
|
|
|
|
|
|
|
def wx_login(wxchat:gewe_chat.GeWeChatCom,tel,token_id): |
|
|
|
def wx_login(wxchat:gewe_chat.GeWeChatCom,tel,token_id,region_id): |
|
|
|
hash_key = f"__AI_OPS_WX__:LOGININFO:{tel}" |
|
|
|
login_info = redis_helper.redis_helper.get_hash(hash_key) |
|
|
|
|
|
|
|
if not login_info: |
|
|
|
login_info = login_or_reconnect(wxchat, token_id, '', hash_key) |
|
|
|
login_info = login_or_reconnect(wxchat, token_id, '', region_id,hash_key) |
|
|
|
else: |
|
|
|
app_id = login_info.get('appId') |
|
|
|
token_id = login_info.get('tokenId') |
|
|
@@ -367,10 +373,23 @@ def wx_login(wxchat:gewe_chat.GeWeChatCom,tel,token_id): |
|
|
|
logger.info(f'微信ID {wxid} 在APPID {app_id} 重连成功') |
|
|
|
else: |
|
|
|
print("重连失败,重新登录...") |
|
|
|
login_info = login_or_reconnect(wxchat, token_id, app_id, hash_key, is_reconnect=True) |
|
|
|
login_info = login_or_reconnect(wxchat, token_id, app_id, region_id,hash_key, is_reconnect=True) |
|
|
|
if login_info: |
|
|
|
fetch_and_save_contacts(wxchat, token_id, login_info.get('appId'), hash_key) |
|
|
|
|
|
|
|
def login_wx_captch_code_process(wxchat:gewe_chat.GeWeChatCom,message): |
|
|
|
msg_content = message |
|
|
|
cleaned_content = clean_json_string(msg_content) |
|
|
|
content = json.loads(cleaned_content) |
|
|
|
data = content.get("data", {}) |
|
|
|
|
|
|
|
msg_type_data = data.get("msg_type", None) |
|
|
|
content_data = data.get("content", {}) |
|
|
|
token_id = content_data.get("token_id", None) |
|
|
|
captch_code = content_data.get("captch_code", None) |
|
|
|
wxchat.save_login_wx_captch_code_to_cache(token_id,captch_code) |
|
|
|
|
|
|
|
|
|
|
|
def ops_messages_process(message): |
|
|
|
try: |
|
|
|
wxchat = gewe_chat.wxchat |
|
|
@@ -387,9 +406,9 @@ def ops_messages_process(message): |
|
|
|
|
|
|
|
tel=content_data.get('tel', '18029274615') |
|
|
|
token_id=content_data.get('token_id', 'f828cb3c-1039-489f-b9ae-7494d1778a15') |
|
|
|
|
|
|
|
region_id=content_data.get('region_id', '440000') |
|
|
|
#wx_login(wxchat,tel,token_id) |
|
|
|
thread = threading.Thread(target=wx_login, args=(wxchat,tel,token_id,)) |
|
|
|
thread = threading.Thread(target=wx_login, args=(wxchat,tel,token_id,region_id)) |
|
|
|
thread.daemon = True |
|
|
|
thread.start() |
|
|
|
elif msg_type_data == 'group-sending': |
|
|
@@ -400,17 +419,12 @@ def ops_messages_process(message): |
|
|
|
thread = threading.Thread(target=wx_messages_process_callback, args=(agent_tel,message,)) |
|
|
|
thread.daemon = True |
|
|
|
thread.start() |
|
|
|
elif msg_type_data == 'login_wx_captch_code': |
|
|
|
thread = threading.Thread(target=login_wx_captch_code_process, args=(wxchat,message,)) |
|
|
|
thread.daemon = True |
|
|
|
thread.start() |
|
|
|
else: |
|
|
|
print(f'未处理息类型 {msg_type_data}') |
|
|
|
|
|
|
|
except Exception as e: |
|
|
|
print(f"处理消息时发生错误: {e}, 消息内容: {message}") |
|
|
|
|
|
|
|
|
|
|
|
# 启动 |
|
|
|
# def consumer_start_up_thread(): |
|
|
|
# print('启动') |
|
|
|
# try: |
|
|
|
# print('启动') |
|
|
|
# except Exception as e: |
|
|
|
# print(f"处理消息时发生错误: {e}, 消息内容: {message}") |