"""Kafka-driven WeChat operations: login flow and group-sending (broadcast).

Messages arrive from Kafka as JSON strings, are dispatched by
``ops_messages_process`` according to ``data.msg_type``, and are handled in
daemon threads so the consumer loop is never blocked.
"""
import json
import os
import random
import re
import threading
import time
from datetime import datetime
from urllib.parse import urlparse, unquote

from common import kafka_helper, redis_helper, utils
from common.log import logger, log_exception
from wechat import gewe_chat

# ASCII control characters (0x00-0x1f and 0x7f) stripped before JSON parsing.
_CONTROL_CHARS_RE = re.compile(r'[\x00-\x1f\x7f]')
# Bounds (seconds) of the random pause inserted after each outgoing message.
_SEND_DELAY_SECONDS = (5, 15)
# The login loop stops polling a QR code after this many seconds.
_QR_CODE_TTL_SECONDS = 150
# Pause (seconds) between two login-status polls.
_LOGIN_POLL_INTERVAL_SECONDS = 5


def _pause_between_sends():
    """Sleep for a random duration within ``_SEND_DELAY_SECONDS``."""
    time.sleep(random.uniform(*_SEND_DELAY_SECONDS))


def _publish_dialogue(agent_wxid, to_wxid, wx_content):
    """Build a dialogue message for ``wx_content`` and produce it to Kafka."""
    input_message = utils.dialogue_message(agent_wxid, to_wxid, wx_content)
    kafka_helper.kafka_client.produce_message(input_message)
    logger.info("发送对话 %s", input_message)


def _load_login_info(agent_tel):
    """Return the cached login hash for ``agent_tel``; log a warning if empty."""
    hash_key = f"__AI_OPS_WX__:LOGININFO:{agent_tel}"
    logininfo = redis_helper.redis_helper.get_hash(hash_key)
    if not logininfo:
        logger.warning(f"未找到 {agent_tel} 的登录信息")
    return logininfo


def _cached_friend_wxids(agent_wxid):
    """Return the ``userName`` of every friend in the cached contacts brief."""
    hash_key = f"__AI_OPS_WX__:CONTACTS_BRIEF:{agent_wxid}"
    raw = redis_helper.redis_helper.get_hash_field(hash_key, "data")
    friends = json.loads(raw) if raw else []
    return [f["userName"] for f in friends]


def wx_messages_process_callback(agent_tel, message):
    """Handle one Kafka message that may request a group-sending.

    Args:
        agent_tel: Fallback agent phone number, used when the payload does
            not carry its own ``agent_tel``.
        message: Raw JSON string received from Kafka.

    Errors are logged (with traceback) and never propagated, because this
    function runs as a thread target.
    """
    try:
        content = json.loads(clean_json_string(message))
        data = content.get("data", {})
        content_data = data.get("content", {})
        # Prefer the payload value, but keep the caller-supplied number
        # instead of replacing it with None when the payload omits it.
        agent_tel = content_data.get("agent_tel") or agent_tel
        if data.get("msg_type", None) == 'group-sending':
            process_group_sending(gewe_chat.wxchat, content_data, agent_tel)
    except json.JSONDecodeError as e:
        logger.exception(f"JSON解码错误: {e}, 消息内容: {message}")
    except Exception as e:
        logger.exception(f"处理消息时发生错误: {e}, 消息内容: {message}")


def process_group_sending_v0(wxchat: gewe_chat.GeWeChatCom, content_data, agent_tel: str):
    """Legacy broadcast: send every content item to the matching friends.

    Only cached friends are targeted (no chatrooms) and only the ``text``,
    ``image_url`` and ``tts`` content types are handled. Each content item is
    sent to all recipients before the next item is processed.
    """
    logininfo = _load_login_info(agent_tel)
    if not logininfo:
        return

    token_id = logininfo.get('tokenId')
    app_id = logininfo.get('appId')
    agent_wxid = logininfo.get('wxid')

    # Recipients = requested contacts that are cached friends of the agent.
    requested = {c['wxid'] for c in content_data.get("contact_list", [])}
    intersection_wxids = list(set(_cached_friend_wxids(agent_wxid)) & requested)

    for wx_content in content_data.get("wx_content", []):
        content_type = wx_content.get("type")
        if content_type == "text":
            send_text_message(wxchat, token_id, app_id, agent_wxid, intersection_wxids, wx_content["text"])
        elif content_type == "image_url":
            send_image_message(wxchat, token_id, app_id, agent_wxid, intersection_wxids,
                               wx_content.get("image_url", {}).get("url"))
        elif content_type == "tts":
            send_tts_message(wxchat, token_id, app_id, agent_wxid, intersection_wxids, wx_content["text"])


def _send_wx_content(wxchat, token_id, app_id, agent_wxid, targets, wx_content):
    """Route one content item to the sender matching its ``type``.

    Unknown types are ignored.
    """
    content_type = wx_content.get("type")
    if content_type == "text":
        send_text_message(wxchat, token_id, app_id, agent_wxid, targets, wx_content["text"])
    elif content_type == "image_url":
        send_image_message(wxchat, token_id, app_id, agent_wxid, targets,
                           wx_content.get("image_url", {}).get("url"))
    elif content_type == "tts":
        send_tts_message(wxchat, token_id, app_id, agent_wxid, targets, wx_content["text"])
    elif content_type == "file":
        send_file_message(wxchat, token_id, app_id, agent_wxid, targets,
                          wx_content.get("file_url", {}).get("url"))


def process_group_sending(wxchat: gewe_chat.GeWeChatCom, content_data, agent_tel: str):
    """Broadcast the content items to the matching friends and chatrooms.

    Recipients are the requested ``contact_list`` wxids that are either
    cached friends or cached chatrooms of the agent. Recipients form the
    outer loop, so each one receives all content items in order before the
    next recipient is served.

    Args:
        wxchat: WeChat client used to send the messages.
        content_data: ``data.content`` of the Kafka payload; reads
            ``contact_list`` and ``wx_content``.
        agent_tel: Phone number identifying the agent's cached login info.
    """
    logininfo = _load_login_info(agent_tel)
    if not logininfo:
        return

    token_id = logininfo.get('tokenId')
    app_id = logininfo.get('appId')
    agent_wxid = logininfo.get('wxid')

    cache_friend_wxids = _cached_friend_wxids(agent_wxid)

    groups_key = f"__AI_OPS_WX__:GROUPS_INFO:{agent_wxid}"
    # Guard against a missing hash so the set intersection cannot crash.
    cache_chatrooms = redis_helper.redis_helper.get_hash(groups_key) or {}

    requested = {c['wxid'] for c in content_data.get("contact_list", [])}
    intersection_friend_wxids = list(set(cache_friend_wxids) & requested)
    intersection_chatroom_ids = list(set(cache_chatrooms) & requested)
    intersection_wxids = intersection_friend_wxids + intersection_chatroom_ids

    wx_content_list = content_data.get("wx_content", [])

    # Reset the forward cache so the first video of this broadcast is
    # uploaded and later recipients get a forward of it.
    # NOTE(review): this state lives on the shared wxchat object, so two
    # broadcasts running in parallel threads could interfere - confirm.
    wxchat.forward_video_aeskey = ''
    wxchat.forward_video_cdnvideourl = ''
    wxchat.forward_video_length = 0

    for intersection_wxid in intersection_wxids:
        for wx_content in wx_content_list:
            _send_wx_content(wxchat, token_id, app_id, agent_wxid, [intersection_wxid], wx_content)


def send_text_message(wxchat: gewe_chat.GeWeChatCom, token_id, app_id, agent_wxid, intersection_wxids, text):
    """Send ``text`` to every wxid, publishing a dialogue record for each."""
    for t in intersection_wxids:
        # NOTE(review): the send result is not checked here, unlike the tts
        # and video senders; the dialogue is published even on failure.
        wxchat.post_text(token_id, app_id, t, text)
        logger.info(f'{agent_wxid} 向 {t} 发送文字【{text}】')

        _publish_dialogue(agent_wxid, t, [{"type": "text", "text": text}])
        _pause_between_sends()


def send_image_message(wxchat: gewe_chat.GeWeChatCom, token_id, app_id, agent_wxid, intersection_wxids, image_url):
    """Send the image at ``image_url`` to every wxid.

    The image is uploaded with ``post_image`` until one upload succeeds; the
    metadata returned by that upload is then reused to forward the image to
    the remaining recipients. A dialogue record is published per recipient.
    """
    aeskey, cdnthumburl, md5 = "", "", ""
    cdnthumblength = cdnthumbheight = cdnthumbwidth = length = 0

    for t in intersection_wxids:
        if aeskey:
            # Assumes forward_image returns (ret, ret_msg, res) like the
            # other wxchat calls in this file - TODO confirm.
            ret, ret_msg, res = wxchat.forward_image(token_id, app_id, t, aeskey, cdnthumburl,
                                                     cdnthumblength, cdnthumbheight, cdnthumbwidth,
                                                     length, md5)
            logger.info(f'{agent_wxid} 向 {t} 转发图片【{image_url}】{ret_msg}')
        else:
            ret, ret_msg, res = wxchat.post_image(token_id, app_id, t, image_url)
            if ret == 200:
                # Keep the upload metadata for forwarding to later recipients.
                aeskey = res["aesKey"]
                cdnthumburl = res["fileId"]
                cdnthumblength = res["cdnThumbLength"]
                cdnthumbheight = res["height"]
                cdnthumbwidth = res["width"]
                length = res["length"]
                md5 = res["md5"]
                logger.info(f'{agent_wxid} 向 {t} 发送图片【{image_url}】{ret_msg}')
            else:
                logger.warning(f'{agent_wxid} 向 {t} 发送图片【{image_url}】{ret_msg}')

        _publish_dialogue(agent_wxid, t, [{"type": "image_url", "image_url": {"url": image_url}}])
        _pause_between_sends()


def send_tts_message(wxchat: gewe_chat.GeWeChatCom, token_id, app_id, agent_wxid, intersection_wxids, text):
    """Convert ``text`` to a voice clip and send it to every wxid.

    The dialogue record (stored as text) is published only for recipients
    whose send returned 200.
    """
    voice_during, voice_url = utils.wx_voice(text)
    for t in intersection_wxids:
        if voice_url:
            ret, ret_msg, res = wxchat.post_voice(token_id, app_id, t, voice_url, voice_during)
            if ret == 200:
                logger.info(f'{agent_wxid} 向 {t} 发送语音文本【{text}】{ret_msg}')
                _publish_dialogue(agent_wxid, t, [{"type": "text", "text": text}])
            else:
                logger.warning((f'{agent_wxid} 向 {t} 发送语音文本【{text}】{ret_msg}'))
        else:
            # Voice synthesis produced no URL, so nothing can be sent.
            logger.warning((f'{agent_wxid} 向 {t} 发送语音文本【{text}】出错'))
        _pause_between_sends()


def send_file_message(wxchat: gewe_chat.GeWeChatCom, token_id, app_id, agent_wxid, intersection_wxids, file_url):
    """Send a file, choosing the sender from the URL's file extension.

    ``.mp4`` (any letter case) goes to ``send_video_message``; everything
    else goes to ``send_other_file_message``. A missing URL is logged and
    skipped.
    """
    if not file_url:
        logger.warning(f'{agent_wxid} 发送文件失败:缺少文件地址')
        return

    filename = urlparse(file_url).path.split('/')[-1]
    _, ext = os.path.splitext(filename)

    if ext.lower() == '.mp4':
        send_video_message(wxchat, token_id, app_id, agent_wxid, intersection_wxids, file_url)
    else:
        send_other_file_message(wxchat, token_id, app_id, agent_wxid, intersection_wxids, file_url)


def _prepare_video_thumbnail(file_url):
    """Download the video and return ``(thumbnail_url, duration)``.

    The thumbnail is written under ``<cwd>/tmp`` using the video's file name
    with a ``.jpg`` extension.
    """
    filename = os.path.basename(urlparse(file_url).path)
    tmp_file_path = os.path.join(os.getcwd(), 'tmp', filename)
    # Swap only the real extension; str.replace would also rewrite a '.mp4'
    # appearing elsewhere in the path and would miss an upper-case '.MP4'.
    thumbnail_path = os.path.splitext(tmp_file_path)[0] + '.jpg'
    video_thumb_url, video_duration = utils.download_video_and_get_thumbnail(file_url, thumbnail_path)
    logger.info(f'视频缩略图 {video_thumb_url} 时长 {video_duration}')
    return video_thumb_url, video_duration


def send_video_message(wxchat: gewe_chat.GeWeChatCom, token_id, app_id, agent_wxid, intersection_wxids, file_url):
    """Send the video at ``file_url`` to every wxid.

    While ``wxchat.forward_video_aeskey`` is empty the video is uploaded with
    ``post_video`` and, on success, its metadata is stored on ``wxchat``.
    Afterwards the stored metadata is used to forward the video, so the
    download and thumbnail step only runs when an upload is needed. The
    dialogue record is published only when the send returned 200.
    """
    for t in intersection_wxids:
        if wxchat.forward_video_aeskey == '':
            video_thumb_url, video_duration = _prepare_video_thumbnail(file_url)
            ret, ret_msg, res = wxchat.post_video(token_id, app_id, t, file_url, video_thumb_url, video_duration)
            if ret == 200:
                wxchat.forward_video_aeskey = res["aesKey"]
                wxchat.forward_video_cdnvideourl = res["cdnVideoUrl"]
                wxchat.forward_video_length = res["length"]
        else:
            ret, ret_msg, res = wxchat.forward_video(token_id, app_id, t, wxchat.forward_video_aeskey,
                                                     wxchat.forward_video_cdnvideourl,
                                                     wxchat.forward_video_length)
            logger.info('转发视频')

        if ret == 200:
            logger.info(f'{agent_wxid} 向 {t} 发送视频【{file_url}】{ret_msg}')
            _publish_dialogue(agent_wxid, t, [{"type": "file", "file_url": {"url": file_url}}])
        else:
            logger.warning((f'{agent_wxid} 向 {t} 发送视频【{file_url}】{ret_msg}'))
        _pause_between_sends()


def send_other_file_message(wxchat: gewe_chat.GeWeChatCom, token_id, app_id, agent_wxid, intersection_wxids, file_url):
    """Placeholder for non-video files: nothing is sent, the call is logged."""
    logger.warning('send_otherfile_message')


def clean_json_string(json_str):
    """Return ``json_str`` with all ASCII control characters removed.

    This strips 0x00-0x1f and 0x7f, which includes newlines, carriage
    returns and tabs.
    """
    return _CONTROL_CHARS_RE.sub('', json_str)


def start_kafka_consumer_thread():
    """Start the Kafka consumer in a daemon thread.

    Every consumed message is handed to ``ops_messages_process``. The thread
    is a daemon so it does not keep the application alive on exit.
    """
    consumer_thread = threading.Thread(
        target=kafka_helper.kafka_client.consume_messages,
        args=(ops_messages_process,),
    )
    consumer_thread.daemon = True
    consumer_thread.start()


def login_or_reconnect(wxchat: gewe_chat.GeWeChatCom, token_id, app_id, region_id, hash_key,
                       is_reconnect=False, max_retries=5):
    """Run the QR-code login flow and cache the resulting login info.

    Up to ``max_retries`` QR codes are requested. Each one is published to
    Kafka and then polled with ``check_login`` every
    ``_LOGIN_POLL_INTERVAL_SECONDS`` until login succeeds or
    ``_QR_CODE_TTL_SECONDS`` have elapsed.

    Args:
        wxchat: WeChat client.
        token_id: Token used for every client call.
        app_id: Existing app id, or an empty value to adopt the one returned
            with the QR code.
        region_id: Region passed to the QR-code request and stored in the cache.
        hash_key: Redis key of the login hash; its last ``:`` segment is
            used as the agent phone number.
        is_reconnect: Only changes the log message; a QR code is requested
            either way.
        max_retries: Maximum number of QR codes to request.

    Returns:
        The login info dict on success, otherwise ``None``.
    """
    agent_tel = hash_key.split(":")[-1]

    for _ in range(max_retries):
        if is_reconnect:
            logger.info("尝试重连...")
        else:
            logger.info("获取二维码进行登录...")

        qr_code = wxchat.get_login_qr_code(token_id, app_id, region_id)
        base64_string = qr_code.get('qrImgBase64')
        uuid = qr_code.get('uuid')
        if not uuid:
            logger.error(f"uuid获取二维码失败: {qr_code}")
            break

        app_id = app_id or qr_code.get('appId')
        start_time = time.time()
        qr_code_urls = wxchat.qrCallback(uuid, base64_string)

        # Publish the QR code to Kafka.
        k_message = utils.login_qrcode_message(token_id, agent_tel, base64_string, qr_code_urls)
        kafka_helper.kafka_client.produce_message(k_message)

        while True:
            now = time.time()
            if now - start_time > _QR_CODE_TTL_SECONDS:
                # QR code expired: request a new one.
                break

            logger.info(f"{token_id} 使用 {app_id},等待扫码登录,二维码有效时间 {_QR_CODE_TTL_SECONDS - int(now - start_time)} 秒")
            captch_code = wxchat.get_login_wx_captch_code_to_cache(token_id) or ''
            ret, msg, res = wxchat.check_login(token_id, app_id, uuid, captch_code)

            # status == 2 is the value this loop treats as "logged in".
            if ret == 200 and res.get('status') == 2:
                logger.info(f"登录成功: {res}")
                head_img_url = res.get('headImgUrl', '')
                login_info = res.get('loginInfo', {}) or {}
                login_info.update({'appId': app_id, 'uuid': uuid, 'tokenId': token_id, 'status': 1,
                                   'headImgUrl': head_img_url, 'regionId': region_id})

                # create_at is stamped only when the cache has no appId yet.
                cache_login_info = redis_helper.redis_helper.get_hash(hash_key) or {}
                timestamp = int(time.time())
                if 'appId' not in cache_login_info:
                    login_info.update({"create_at": timestamp, "modify_at": timestamp})
                else:
                    login_info.update({"modify_at": timestamp})

                # None values are replaced with '' before writing to Redis.
                cleaned_login_info = {k: (v if v is not None else '') for k, v in login_info.items()}
                redis_helper.redis_helper.set_hash(hash_key, cleaned_login_info)
                return login_info

            time.sleep(_LOGIN_POLL_INTERVAL_SECONDS)

    logger.error(f"登录失败,二维码生成 {max_retries} 次")
    return None


def fetch_and_save_contacts(wxchat: gewe_chat.GeWeChatCom, token_id, app_id, hash_key):
    """Fetch the contact list and cache a brief of the friends.

    The first three entries of ``friends`` are skipped.
    NOTE(review): presumably these are built-in system accounts - confirm.
    """
    ret, msg, contacts_list = wxchat.fetch_contacts_list(token_id, app_id)
    if ret != 200 or not contacts_list:
        logger.warning(f'获取联系人列表失败 {ret} {msg}')
        return

    friend_wxids = contacts_list.get('friends', [])[3:]
    wxid = redis_helper.redis_helper.get_hash_field(hash_key, 'wxid')
    wxchat.save_contacts_brief_to_cache(token_id, app_id, wxid, friend_wxids)
    logger.info(f'微信ID {wxid} 登录APPID {app_id} 成功,联系人已保存')


def wx_login(wxchat: gewe_chat.GeWeChatCom, tel, token_id, region_id):
    """Ensure the account for ``tel`` is logged in, then refresh its contacts.

    With no cached login info a fresh QR login is run. Otherwise the cached
    session is checked, reconnected if offline, and a QR login is run only
    when the reconnection fails. Contacts are refreshed whenever login info
    is available at the end.
    """
    hash_key = f"__AI_OPS_WX__:LOGININFO:{tel}"
    login_info = redis_helper.redis_helper.get_hash(hash_key)

    if not login_info:
        login_info = login_or_reconnect(wxchat, token_id, '', region_id, hash_key)
    else:
        app_id = login_info.get('appId')
        token_id = login_info.get('tokenId')
        wxid = login_info.get('wxid')

        if wxchat.check_online(token_id, app_id):
            logger.info(f'微信ID {wxid} 在APPID {app_id} 已经在线')
        else:
            res = wxchat.reconnection(token_id, app_id)
            if res.get('ret') == 200:
                logger.info(f'微信ID {wxid} 在APPID {app_id} 重连成功')
            else:
                logger.warning("重连失败,重新登录...")
                login_info = login_or_reconnect(wxchat, token_id, app_id, region_id, hash_key,
                                                is_reconnect=True)

    if login_info:
        fetch_and_save_contacts(wxchat, token_id, login_info.get('appId'), hash_key)


def login_wx_captch_code_process(wxchat: gewe_chat.GeWeChatCom, message):
    """Store the login captcha code carried by ``message`` in the cache.

    Reads ``token_id`` and ``captch_code`` from ``data.content`` of the JSON
    payload.
    """
    content = json.loads(clean_json_string(message))
    content_data = content.get("data", {}).get("content", {})
    token_id = content_data.get("token_id", None)
    captch_code = content_data.get("captch_code", None)
    wxchat.save_login_wx_captch_code_to_cache(token_id, captch_code)


def _run_in_daemon_thread(target, args):
    """Start ``target(*args)`` in a daemon thread."""
    thread = threading.Thread(target=target, args=args)
    thread.daemon = True
    thread.start()


def ops_messages_process(message):
    """Dispatch one Kafka message to its handler according to ``msg_type``.

    Handled types are ``login``, ``group-sending`` and
    ``login_wx_captch_code``; each handler runs in its own daemon thread.
    Errors are logged and never propagated, so one bad message cannot stop
    the consumer.
    """
    try:
        wxchat = gewe_chat.wxchat
        # Clean before parsing: parsing the raw payload first would raise on
        # embedded control characters and drop the message.
        content = json.loads(clean_json_string(message))
        logger.info(f"接收到kafka消息: {json.dumps(content, ensure_ascii=False)}")

        data = content.get("data", {})
        msg_type_data = data.get("msg_type", None)
        content_data = data.get("content", {})

        if msg_type_data == "login":
            tel = content_data.get('tel', '18029274615')
            token_id = content_data.get('token_id', 'f828cb3c-1039-489f-b9ae-7494d1778a15')
            region_id = content_data.get('region_id', '440000')
            _run_in_daemon_thread(wx_login, (wxchat, tel, token_id, region_id))
        elif msg_type_data == 'group-sending':
            agent_tel = content_data.get('agent_tel', '18029274615')
            _run_in_daemon_thread(wx_messages_process_callback, (agent_tel, message))
        elif msg_type_data == 'login_wx_captch_code':
            _run_in_daemon_thread(login_wx_captch_code_process, (wxchat, message))
        else:
            logger.warning(f'未处理息类型 {msg_type_data}')
    except Exception as e:
        logger.exception(f"处理消息时发生错误: {e}, 消息内容: {message}")