|
- import threading
-
- from common import kafka_helper,redis_helper,utils
- from urllib.parse import urlparse, unquote
- import json,time,re,random,os
- from common.log import logger, log_exception
- from datetime import datetime
-
-
- from wechat import gewe_chat
-
- from model import Models
-
-
-
def wx_messages_process_callback(agent_tel, message):
    """Parse one Kafka message and run a group send if it asks for one.

    The message is cleaned with ``clean_json_string`` and decoded as JSON.
    Only ``data.msg_type == 'group-sending'`` is acted on; every other type
    is ignored.

    Args:
        agent_tel: Fallback phone number of the sending agent, used only
            when ``data.content.agent_tel`` is absent or empty.
        message: Raw JSON text of the Kafka message.

    Returns:
        None. Decoding and processing errors are logged, not raised.
    """
    try:
        wxchat = gewe_chat.wxchat
        content = json.loads(clean_json_string(message))
        data = content.get("data", {})

        if data.get("msg_type", None) != 'group-sending':
            return

        content_data = data.get("content", {})
        # The payload value wins; previously the argument was always
        # overwritten, turning into None when the payload had no agent_tel.
        agent_tel = content_data.get("agent_tel") or agent_tel
        if not agent_tel:
            logger.warning(f"群发消息缺少 agent_tel,已忽略: {message}")
            return

        process_group_sending(wxchat, content_data, agent_tel)

    except json.JSONDecodeError as e:
        logger.error(f"JSON解码错误: {e}, 消息内容: {message}")
    except Exception as e:
        # Runs as a thread target: log instead of letting the thread die
        # with an unreported traceback.
        logger.error(f"处理消息时发生错误: {e}, 消息内容: {message}")
-
-
-
def process_group_sending_v0(wxchat:gewe_chat.GeWeChatCom, content_data, agent_tel:str):
    """Legacy group send: deliver each content item to all matching friends.

    Looks up the agent's cached login by phone number, intersects the
    requested ``contact_list`` wxids with the cached friend list, then hands
    the whole recipient list to the matching ``send_*`` helper once per
    content item.
    """
    login_key = f"__AI_OPS_WX__:LOGININFO:{agent_tel}"
    login_record = redis_helper.redis_helper.get_hash(login_key)
    if not login_record:
        logger.warning(f"未找到 {agent_tel} 的登录信息")
        return

    token_id = login_record.get('tokenId')
    app_id = login_record.get('appId')
    agent_wxid = login_record.get('wxid')

    # Cached friends of this account, stored as a JSON list under "data".
    contacts_key = f"__AI_OPS_WX__:CONTACTS_BRIEF:{agent_wxid}"
    raw_contacts = redis_helper.redis_helper.get_hash_field(contacts_key, "data")
    cached_contacts = json.loads(raw_contacts) if raw_contacts else []
    known_friend_wxids = [entry["userName"] for entry in cached_contacts]

    # Only contacts that are both requested and known friends are targeted.
    requested_wxids = [contact['wxid'] for contact in content_data.get("contact_list", [])]
    recipients = list(set(known_friend_wxids) & set(requested_wxids))

    for item in content_data.get("wx_content", []):
        if item["type"] == "text":
            send_text_message(wxchat, token_id, app_id, agent_wxid, recipients, item["text"])
            continue
        if item["type"] == "image_url":
            image_url = item.get("image_url", {}).get("url")
            send_image_message(wxchat, token_id, app_id, agent_wxid, recipients, image_url)
            continue
        if item["type"] == "tts":
            send_tts_message(wxchat, token_id, app_id, agent_wxid, recipients, item["text"])
-
-
-
def process_group_sending(wxchat:gewe_chat.GeWeChatCom, content_data, agent_tel:str):
    """Send every ``wx_content`` item to each requested contact the agent knows.

    Recipients are the ``contact_list`` wxids found either in the cached
    friend list or among the cached chatroom ids of the agent's account.
    They are processed in request order with duplicates removed, and each
    recipient receives all content items before the next recipient starts.

    Args:
        wxchat: Client object passed through to the ``send_*`` helpers.
        content_data: Request payload; reads ``contact_list`` (items with a
            ``wxid`` key) and ``wx_content`` (items with a ``type`` key).
        agent_tel: Phone number used to look up the agent's cached login.

    Returns:
        None. Returns early when no login info is cached for ``agent_tel``
        or when no requested contact is known.
    """
    # Login info of the sending account.
    hash_key = f"__AI_OPS_WX__:LOGININFO:{agent_tel}"
    logininfo = redis_helper.redis_helper.get_hash(hash_key)

    if not logininfo:
        logger.warning(f"未找到 {agent_tel} 的登录信息")
        return

    token_id = logininfo.get('tokenId')
    app_id = logininfo.get('appId')
    agent_wxid = logininfo.get('wxid')

    # Cached friends, stored as a JSON list under the "data" field.
    hash_key = f"__AI_OPS_WX__:CONTACTS_BRIEF:{agent_wxid}"
    cache_friend_wxids_str = redis_helper.redis_helper.get_hash_field(hash_key, "data")
    cache_friend_wxids_list = json.loads(cache_friend_wxids_str) if cache_friend_wxids_str else []
    known_ids = {f.get("userName") for f in cache_friend_wxids_list}

    # Cached chatrooms: the hash keys are the chatroom ids.
    hash_key = f"__AI_OPS_WX__:GROUPS_INFO:{agent_wxid}"
    cache_chatrooms = redis_helper.redis_helper.get_hash(hash_key) or {}
    known_ids.update(cache_chatrooms.keys())
    known_ids.discard(None)

    # Keep the caller's order and drop duplicates; a set intersection would
    # yield an arbitrary order.
    intersection_wxids = []
    seen = set()
    for contact in content_data.get("contact_list", []):
        wxid = contact.get("wxid")
        if wxid in known_ids and wxid not in seen:
            seen.add(wxid)
            intersection_wxids.append(wxid)

    if not intersection_wxids:
        logger.warning(f"{agent_wxid} 群发没有匹配的好友或群")
        return

    wx_content_list = content_data.get("wx_content", [])

    # Reset the fields send_video_message reads and fills, so this batch
    # starts with a fresh upload before forwarding.
    # NOTE(review): these live on the wxchat object passed in; if that
    # object is shared, concurrent batches could interfere — confirm.
    wxchat.forward_video_aeskey = ''
    wxchat.forward_video_cdnvideourl = ''
    wxchat.forward_video_length = 0

    for intersection_wxid in intersection_wxids:
        target = [intersection_wxid]
        for wx_content in wx_content_list:
            content_type = wx_content.get("type")
            if content_type == "text":
                send_text_message(wxchat, token_id, app_id, agent_wxid, target, wx_content["text"])
            elif content_type == "image_url":
                image_url = wx_content.get("image_url", {}).get("url")
                if not image_url:
                    logger.warning(f"{agent_wxid} 群发图片缺少地址,已跳过: {wx_content}")
                    continue
                send_image_message(wxchat, token_id, app_id, agent_wxid, target, image_url)
            elif content_type == "tts":
                send_tts_message(wxchat, token_id, app_id, agent_wxid, target, wx_content["text"])
            elif content_type == "file":
                file_url = wx_content.get("file_url", {}).get("url")
                if not file_url:
                    logger.warning(f"{agent_wxid} 群发文件缺少地址,已跳过: {wx_content}")
                    continue
                send_file_message(wxchat, token_id, app_id, agent_wxid, target, file_url)
            else:
                logger.warning(f"{agent_wxid} 群发内容类型未处理: {content_type}")
-
-
-
def send_text_message(wxchat:gewe_chat.GeWeChatCom, token_id, app_id, agent_wxid, intersection_wxids, text):
    """Send ``text`` to every wxid in ``intersection_wxids``.

    A dialogue record is produced to Kafka only when the send returned 200,
    matching the voice and video helpers; a failed send is logged as a
    warning instead of being reported as delivered.

    Args:
        wxchat: Client whose ``post_text`` performs the send.
        token_id: Token id of the sending account.
        app_id: App id of the sending account.
        agent_wxid: wxid of the sender, used for logs and dialogue records.
        intersection_wxids: Recipient wxids.
        text: Message text.
    """
    for t in intersection_wxids:
        ret, ret_msg, _ = wxchat.post_text(token_id, app_id, t, text)
        if ret == 200:
            logger.info(f'{agent_wxid} 向 {t} 发送文字【{text}】')

            # Record the sent message as a dialogue entry on Kafka.
            input_wx_content_dialogue_message = [{"type": "text", "text": text}]
            input_message = utils.dialogue_message(agent_wxid, t, input_wx_content_dialogue_message)
            kafka_helper.kafka_client.produce_message(input_message)
            logger.info("发送对话 %s", input_message)
        else:
            logger.warning(f'{agent_wxid} 向 {t} 发送文字【{text}】{ret_msg}')

        # Random pause between recipients.
        time.sleep(random.uniform(5, 15))
-
def send_image_message(wxchat:gewe_chat.GeWeChatCom, token_id, app_id, agent_wxid, intersection_wxids, image_url):
    """Send the image at ``image_url`` to every wxid in ``intersection_wxids``.

    The image is uploaded with ``post_image`` until one upload succeeds; the
    fields returned by that upload are then reused with ``forward_image``
    for the remaining recipients. A dialogue record is produced to Kafka
    only for sends that returned 200.

    Args:
        wxchat: Client providing ``post_image`` and ``forward_image``.
        token_id: Token id of the sending account.
        app_id: App id of the sending account.
        agent_wxid: wxid of the sender, used for logs and dialogue records.
        intersection_wxids: Recipient wxids.
        image_url: URL of the image to send.
    """
    aeskey = ""
    forward_fields = ()  # (cdnthumburl, cdnthumblength, height, width, length, md5)

    for t in intersection_wxids:
        if aeskey:
            # NOTE(review): unpacked as (ret, ret_msg, res) like every other
            # wxchat call in this file; the original used (res, ret, ret_msg)
            # here — confirm forward_image's return order.
            ret, ret_msg, res = wxchat.forward_image(token_id, app_id, t, aeskey, *forward_fields)
            action = '转发'
        else:
            ret, ret_msg, res = wxchat.post_image(token_id, app_id, t, image_url)
            action = '发送'
            if ret == 200:
                # Remember the uploaded image so later recipients can be
                # served by forwarding instead of uploading again.
                aeskey = res["aesKey"]
                forward_fields = (
                    res["fileId"],
                    res["cdnThumbLength"],
                    res["height"],
                    res["width"],
                    res["length"],
                    res["md5"],
                )

        if ret == 200:
            logger.info(f'{agent_wxid} 向 {t} {action}图片【{image_url}】{ret_msg}')

            # Record the sent message as a dialogue entry on Kafka.
            wx_content_dialogue_message = [{"type": "image_url", "image_url": {"url": image_url}}]
            input_message = utils.dialogue_message(agent_wxid, t, wx_content_dialogue_message)
            kafka_helper.kafka_client.produce_message(input_message)
            logger.info("发送对话 %s", input_message)
        else:
            logger.warning(f'{agent_wxid} 向 {t} {action}图片【{image_url}】{ret_msg}')

        # Random pause between recipients.
        time.sleep(random.uniform(5, 15))
-
def send_tts_message(wxchat:gewe_chat.GeWeChatCom, token_id, app_id, agent_wxid, intersection_wxids, text):
    """Synthesize ``text`` once and send it as a voice message to each wxid.

    When ``utils.wx_voice`` yields no voice URL, a warning is logged for
    every recipient. A successful send (return code 200) is also recorded as
    a text dialogue entry on Kafka. The random pause runs after every
    recipient regardless of outcome.
    """
    voice_seconds, voice_link = utils.wx_voice(text)

    for t in intersection_wxids:
        if not voice_link:
            logger.warning((f'{agent_wxid} 向 {t} 发送语音文本【{text}】出错'))
        else:
            ret, ret_msg, _ = wxchat.post_voice(token_id, app_id, t, voice_link, voice_seconds)
            if ret != 200:
                logger.warning((f'{agent_wxid} 向 {t} 发送语音文本【{text}】{ret_msg}'))
            else:
                logger.info(f'{agent_wxid} 向 {t} 发送语音文本【{text}】{ret_msg}')

                # The dialogue record carries the spoken text, not the audio.
                dialogue_content = [{"type": "text", "text": text}]
                input_message = utils.dialogue_message(agent_wxid, t, dialogue_content)
                kafka_helper.kafka_client.produce_message(input_message)
                logger.info("发送对话 %s", input_message)

        # Random pause between recipients.
        time.sleep(random.uniform(5, 15))
-
def send_file_message(wxchat:gewe_chat.GeWeChatCom, token_id, app_id, agent_wxid, intersection_wxids, file_url):
    """Route a file send by the extension found in the URL path.

    URLs whose path ends in ``.mp4`` go to ``send_video_message``; anything
    else goes to ``send_other_file_message``. A missing URL is logged and
    skipped instead of failing inside URL parsing.

    Args:
        wxchat: Client passed through to the chosen helper.
        token_id: Token id of the sending account.
        app_id: App id of the sending account.
        agent_wxid: wxid of the sender.
        intersection_wxids: Recipient wxids.
        file_url: URL of the file to send.
    """
    if not file_url:
        logger.warning(f'{agent_wxid} 发送文件失败: 缺少文件地址')
        return

    # Only the URL path is inspected, so query strings do not affect the
    # detected extension.
    filename = urlparse(file_url).path.rsplit('/', 1)[-1]
    _, ext = os.path.splitext(filename)

    # NOTE(review): the comparison is case-sensitive, so ".MP4" is routed to
    # the other-file handler — confirm whether that is intended.
    if ext == '.mp4':
        send_video_message(wxchat, token_id, app_id, agent_wxid, intersection_wxids, file_url)
    else:
        send_other_file_message(wxchat, token_id, app_id, agent_wxid, intersection_wxids, file_url)
-
-
def send_video_message(wxchat:gewe_chat.GeWeChatCom, token_id, app_id, agent_wxid, intersection_wxids, file_url):
    """Send the video at ``file_url`` to every wxid in ``intersection_wxids``.

    While ``wxchat.forward_video_aeskey`` is empty the video is uploaded
    with ``post_video`` and the returned fields are stored on ``wxchat``;
    afterwards recipients are served with ``forward_video``. The thumbnail
    and duration are computed at most once per call, and only when an
    upload is needed. A dialogue record is produced to Kafka for each send
    that returned 200.

    Args:
        wxchat: Client providing ``post_video`` / ``forward_video`` and
            holding the ``forward_video_*`` fields.
        token_id: Token id of the sending account.
        app_id: App id of the sending account.
        agent_wxid: wxid of the sender, used for logs and dialogue records.
        intersection_wxids: Recipient wxids.
        file_url: URL of the video file.
    """
    thumbnail_ready = False
    video_thumb_url = None
    video_duration = None

    for t in intersection_wxids:
        if getattr(wxchat, 'forward_video_aeskey', '') == '':
            if not thumbnail_ready:
                # Thumbnail and duration depend only on file_url, so they are
                # built once and skipped entirely when forwarding suffices.
                filename = os.path.basename(urlparse(file_url).path)
                tmp_file_path = os.path.join(os.getcwd(), 'tmp', filename)
                # Swap only the extension; str.replace would also rewrite a
                # ".mp4" occurring elsewhere in the path.
                thumbnail_path = os.path.splitext(tmp_file_path)[0] + '.jpg'
                video_thumb_url, video_duration = utils.download_video_and_get_thumbnail(file_url, thumbnail_path)
                thumbnail_ready = True
                logger.info(f'视频缩略图 {video_thumb_url} 时长 {video_duration}')

            ret, ret_msg, res = wxchat.post_video(token_id, app_id, t, file_url, video_thumb_url, video_duration)
            if ret == 200:
                wxchat.forward_video_aeskey = res["aesKey"]
                wxchat.forward_video_cdnvideourl = res["cdnVideoUrl"]
                wxchat.forward_video_length = res["length"]
        else:
            ret, ret_msg, res = wxchat.forward_video(
                token_id,
                app_id,
                t,
                wxchat.forward_video_aeskey,
                wxchat.forward_video_cdnvideourl,
                wxchat.forward_video_length,
            )
            logger.info('转发视频')

        if ret == 200:
            logger.info(f'{agent_wxid} 向 {t} 发送视频【{file_url}】{ret_msg}')
            # Record the sent message as a dialogue entry on Kafka.
            input_wx_content_dialogue_message = [{"type": "file", "file_url": {"url": file_url}}]
            input_message = utils.dialogue_message(agent_wxid, t, input_wx_content_dialogue_message)
            kafka_helper.kafka_client.produce_message(input_message)
            logger.info("发送对话 %s", input_message)
        else:
            logger.warning(f'{agent_wxid} 向 {t} 发送视频【{file_url}】{ret_msg}')

        # Random pause between recipients.
        time.sleep(random.uniform(5, 15))
-
def send_other_file_message(wxchat:gewe_chat.GeWeChatCom, token_id, app_id, agent_wxid, intersection_wxids, file_url):
    """Placeholder for non-video attachments; nothing is sent.

    Each skipped recipient is logged as a warning so a dropped attachment is
    visible in the logs rather than only as a fixed debug line on stdout.

    Args:
        wxchat: Unused until sending is implemented.
        token_id: Unused until sending is implemented.
        app_id: Unused until sending is implemented.
        agent_wxid: wxid of the sender, used in the log line.
        intersection_wxids: Recipient wxids that will not receive the file.
        file_url: URL of the file that was not sent.
    """
    for t in intersection_wxids:
        logger.warning(f'{agent_wxid} 向 {t} 发送文件【{file_url}】未执行: 暂不支持该文件类型')
-
def clean_json_string(json_str):
    """Remove ASCII control characters from a JSON payload.

    Every character in ``\\x00``-``\\x1f`` plus ``\\x7f`` is deleted,
    including newlines, carriage returns and tabs.

    Args:
        json_str: The payload as ``str``. ``bytes`` / ``bytearray`` input is
            also accepted and decoded as UTF-8 first.

    Returns:
        The payload as ``str`` with the control characters removed.

    Raises:
        UnicodeDecodeError: If a bytes payload is not valid UTF-8.
    """
    if isinstance(json_str, (bytes, bytearray)):
        # A str pattern cannot be applied to bytes; decode before cleaning.
        json_str = bytes(json_str).decode('utf-8')
    return re.sub(r'[\x00-\x1f\x7f]', '', json_str)
-
def start_kafka_consumer_thread():
    """Start the Kafka consumer in a background daemon thread.

    The thread runs ``kafka_helper.kafka_client.consume_messages`` with
    ``ops_messages_process`` as its argument. Being a daemon thread, it does
    not keep the interpreter alive at exit.

    Returns:
        The started ``threading.Thread``.
    """
    consumer_thread = threading.Thread(
        target=kafka_helper.kafka_client.consume_messages,
        args=(ops_messages_process,),
        daemon=True,
    )
    consumer_thread.start()
    return consumer_thread
-
-
def login_or_reconnect(wxchat:gewe_chat.GeWeChatCom, token_id, app_id, region_id,agent_token_id,hash_key, is_reconnect=False, max_retries=5):
    """Run the QR-code login flow and persist the result.

    Up to ``max_retries`` QR codes are requested. Each one is published to
    Kafka and then polled with ``check_login`` every 5 seconds for at most
    150 seconds. On success (return code 200 and status 2) the login info is
    written to the Redis hash ``hash_key`` and a default agent config is
    saved. The login lock for ``token_id`` is released on every exit path,
    including exceptions.

    Args:
        wxchat: Client providing the QR, login-check, lock and config calls.
        token_id: Token id of the account logging in.
        app_id: Existing app id, or an empty value to take the one returned
            with the QR code.
        region_id: Region id passed to the QR request and stored with the
            login info.
        agent_token_id: Stored as ``agentTokenId`` in the default config.
        hash_key: Redis hash key for the login info; its last
            ``:``-separated part is used as the agent phone number.
        is_reconnect: Only selects which progress message is logged.
        max_retries: Maximum number of QR codes to request.

    Returns:
        The login info dict on success, otherwise ``None``.
    """
    qr_valid_seconds = 150      # a QR code is treated as expired after this long
    poll_interval_seconds = 5   # pause between two login checks

    agent_tel = hash_key.split(":")[-1]
    try:
        for _ in range(max_retries):
            if is_reconnect:
                logger.info("尝试重连...")
            else:
                logger.info("获取二维码进行登录...")

            qr_code = wxchat.get_login_qr_code(token_id, app_id, region_id)
            base64_string = qr_code.get('qrImgBase64')
            uuid = qr_code.get('uuid')
            if not uuid:
                # Give up at once; falling through would also log the
                # unrelated "retries exhausted" error.
                logger.error(f"uuid获取二维码失败: {qr_code}")
                return None

            app_id = app_id or qr_code.get('appId')
            start_time = time.time()
            qr_code_urls = wxchat.qrCallback(uuid, base64_string)

            # Publish the QR code on Kafka.
            k_message = utils.login_qrcode_message(token_id, agent_tel, base64_string, qr_code_urls)
            kafka_helper.kafka_client.produce_message(k_message)

            while True:
                now = time.time()
                if now - start_time > qr_valid_seconds:
                    # Expired: go back and request a fresh QR code.
                    break
                logger.info(f"{token_id} 使用 {app_id},等待扫码登录,二维码有效时间 {qr_valid_seconds - int(now - start_time)} 秒")

                captch_code = wxchat.get_login_wx_captch_code_from_cache(token_id) or ''
                ret, msg, res = wxchat.check_login(token_id, app_id, uuid, captch_code)

                if ret == 200 and res.get('status') == 2:
                    logger.info(f"登录成功: {res}")
                    head_img_url = res.get('headImgUrl', '')
                    login_info = res.get('loginInfo') or {}
                    login_info.update({
                        'appId': app_id,
                        'uuid': uuid,
                        'tokenId': token_id,
                        'status': 1,
                        'headImgUrl': head_img_url,
                        'regionId': region_id,
                    })

                    # create_at is stamped only when no earlier login (no
                    # cached appId) exists; modify_at is always refreshed.
                    timestamp = int(time.time())
                    cache_login_info = redis_helper.redis_helper.get_hash(hash_key) or {}
                    if 'appId' not in cache_login_info:
                        login_info.update({"create_at": timestamp, "modify_at": timestamp})
                    else:
                        login_info.update({"modify_at": timestamp})

                    # None values are replaced by '' before the hash is written.
                    cleaned_login_info = {k: (v if v is not None else '') for k, v in login_info.items()}
                    redis_helper.redis_helper.set_hash(hash_key, cleaned_login_info)

                    # Default agent configuration saved after a login.
                    config = Models.AgentConfig.model_validate({
                        "chatroomIdWhiteList": [],
                        "agentTokenId": agent_token_id,
                        "agentEnabled": False,
                        "addContactsFromChatroomIdWhiteList": [],
                        "chatWaitingMsgEnabled": True
                    })
                    wxid = cleaned_login_info.get('wxid', agent_tel)
                    wxchat.save_wxchat_config(wxid, config.model_dump())
                    return login_info

                logger.info(f"登录检查中: {ret}-{msg}-{res}")
                time.sleep(poll_interval_seconds)

        logger.error(f"登录失败,二维码生成 {max_retries} 次")
        return None
    finally:
        # Single release point, so an exception cannot leave the lock held.
        wxchat.release_login_lock(token_id)
-
def fetch_and_save_contacts(wxchat:gewe_chat.GeWeChatCom, token_id, app_id, hash_key):
    """Fetch the account's contact list and cache the friends in brief form.

    Args:
        wxchat: Client providing ``fetch_contacts_list`` and
            ``save_contacts_brief_to_cache``.
        token_id: Token id of the logged-in account.
        app_id: App id of the logged-in account.
        hash_key: Redis hash key of the login info; its ``wxid`` field
            identifies the account whose contacts are cached.

    Returns:
        None. Nothing is cached when the response has no ``friends`` list.
    """
    ret, msg, contacts_list = wxchat.fetch_contacts_list(token_id, app_id)

    friends = contacts_list.get('friends') if isinstance(contacts_list, dict) else None
    if friends is None:
        logger.warning(f'APPID {app_id} 获取联系人列表失败: {ret}-{msg}')
        return

    # The first three entries are skipped (the original comment calls the
    # slice range adjustable).
    # NOTE(review): presumably built-in/system accounts — confirm.
    friend_wxids = friends[3:]
    wxid = redis_helper.redis_helper.get_hash_field(hash_key, 'wxid')

    wxchat.save_contacts_brief_to_cache(token_id, app_id, wxid, friend_wxids)
    logger.info(f'微信ID {wxid} 登录APPID {app_id} 成功,联系人已保存')
-
-
def wx_login(wxchat:gewe_chat.GeWeChatCom,tel,token_id,region_id,agent_token_id):
    """Bring the account for ``tel`` online and refresh its cached contacts.

    With no cached login info a full QR login is started. Otherwise the
    cached session is checked; if it is offline a reconnect is tried, and if
    that fails the QR login is run again. Whenever login info is available
    afterwards, the contact list is fetched and cached.

    Args:
        wxchat: Client providing the online check, reconnect and lock calls.
        tel: Agent phone number; selects the Redis login-info hash.
        token_id: Token id from the login request. Cached login info
            overrides it for the session calls.
        region_id: Region id passed to the QR login.
        agent_token_id: Agent token id passed to the QR login.
    """
    hash_key = f"__AI_OPS_WX__:LOGININFO:{tel}"
    login_info = redis_helper.redis_helper.get_hash(hash_key)
    # token_id may be replaced by the cached value below; keep the one the
    # login request came with for the final lock release.
    lock_token_id = token_id

    try:
        if not login_info:
            login_info = login_or_reconnect(wxchat, token_id, '', region_id, agent_token_id, hash_key)
        else:
            app_id = login_info.get('appId')
            token_id = login_info.get('tokenId')
            wxid = login_info.get('wxid')

            if wxchat.check_online(token_id, app_id):
                logger.info(f'微信ID {wxid} 在APPID {app_id} 已经在线')
            else:
                res = wxchat.reconnection(token_id, app_id) or {}
                if res.get('ret') == 200:
                    logger.info(f'微信ID {wxid} 在APPID {app_id} 重连成功')
                else:
                    logger.warning("重连失败,重新登录...")
                    login_info = login_or_reconnect(wxchat, token_id, app_id, region_id, agent_token_id, hash_key, is_reconnect=True)

        if login_info:
            fetch_and_save_contacts(wxchat, token_id, login_info.get('appId'), hash_key)
    finally:
        # login_or_reconnect releases the lock itself, but the already-online
        # and reconnect-succeeded paths never call it, so the lock would stay
        # held.
        # NOTE(review): assumes wx_login is started only after
        # acquire_login_lock (as ops_messages_process does) and that a second
        # release is harmless — confirm.
        wxchat.release_login_lock(lock_token_id)
-
def login_wx_captch_code_process(wxchat:gewe_chat.GeWeChatCom,message):
    """Store the login captcha code carried by a Kafka message.

    Reads ``data.content.token_id`` and ``data.content.captch_code`` from
    the cleaned JSON message and saves the code in the cache through
    ``wxchat``.

    Args:
        wxchat: Client providing ``save_login_wx_captch_code_to_cache``.
        message: Raw JSON text of the Kafka message.

    Raises:
        json.JSONDecodeError: If the cleaned message is not valid JSON.
    """
    content = json.loads(clean_json_string(message))
    content_data = content.get("data", {}).get("content", {})

    token_id = content_data.get("token_id", None)
    captch_code = content_data.get("captch_code", None)
    if not token_id or captch_code is None:
        # Without both values there is nothing meaningful to cache.
        logger.warning(f"验证码消息缺少 token_id 或 captch_code,已忽略: {content_data}")
        return

    wxchat.save_login_wx_captch_code_to_cache(token_id, captch_code)
-
-
def ops_messages_process(message):
    """Dispatch one Kafka ops message by its ``data.msg_type``.

    Handled types:
      * ``login`` - start ``wx_login`` in a daemon thread, unless the
        account is already marked as logged in or a login for the same
        token is still in progress.
      * ``group-sending`` - run ``wx_messages_process_callback`` in a daemon
        thread.
      * ``login_wx_captch_code`` - run ``login_wx_captch_code_process`` in a
        daemon thread.

    Any other type is logged and ignored.

    Args:
        message: Raw JSON text of the Kafka message.

    Returns:
        None. Errors are logged, not raised.
    """
    try:
        wxchat = gewe_chat.wxchat
        # Clean before the first parse; parsing the raw text for the log line
        # would fail on the control characters the cleaning step removes.
        content = json.loads(clean_json_string(message))
        logger.info(f"接收到kafka消息: {json.dumps(content, ensure_ascii=False)}")

        data = content.get("data", {})
        msg_type_data = data.get("msg_type", None)
        content_data = data.get("content", {})

        if msg_type_data == "login":
            # No built-in fallback account or credential: an incomplete
            # request is rejected instead of logging in a hard-coded one.
            tel = content_data.get('tel')
            token_id = content_data.get('token_id')
            region_id = content_data.get('region_id', '440000')
            agent_token_id = content_data.get('agent_token_id', '')
            if not tel or not token_id:
                logger.warning(f'登录消息缺少 tel 或 token_id,已忽略: {content_data}')
                return

            loginfo = wxchat.get_login_info_from_cache(tel) or {}
            status = loginfo.get('status', '0')

            # The login flow stores status as the integer 1; compare as text
            # so both 1 and '1' count as logged in.
            if str(status) == '1':
                logger.info(f'手机号{tel},wx_token{token_id} 已经微信登录,终止登录流程')
                return

            if wxchat.acquire_login_lock(token_id, 800):
                thread = threading.Thread(target=wx_login, args=(wxchat, tel, token_id, region_id, agent_token_id))
                thread.daemon = True
                thread.start()
            else:
                logger.info(f'手机号{tel}, wx_token{token_id} 登录进行中,稍后再试')

        elif msg_type_data == 'group-sending':
            agent_tel = content_data.get('agent_tel')
            # Sending sleeps between recipients, so it runs in its own thread.
            thread = threading.Thread(target=wx_messages_process_callback, args=(agent_tel, message,))
            thread.daemon = True
            thread.start()

        elif msg_type_data == 'login_wx_captch_code':
            thread = threading.Thread(target=login_wx_captch_code_process, args=(wxchat, message,))
            thread.daemon = True
            thread.start()

        else:
            logger.warning(f'未处理消息类型 {msg_type_data}')

    except Exception as e:
        # Top-level consumer callback: log and carry on with later messages.
        logger.error(f"处理消息时发生错误: {e}, 消息内容: {message}")
|