diff --git a/channel/wechat/wechat_channel.py b/channel/wechat/wechat_channel.py index b3daa21..86ceb9a 100644 --- a/channel/wechat/wechat_channel.py +++ b/channel/wechat/wechat_channel.py @@ -132,7 +132,7 @@ class WechatChannel(ChatChannel): itchat.instance.receivingRetryCount = 600 # 修改断线超时时间 # login by scan QRCode hotReload = conf().get("hot_reload", False) - status_path = os.path.join(get_appdata_dir(), "itchat.pkl") + status_path = os.path.join(get_appdata_dir(), "itchat","itchat.pkl") # with open(status_path, 'rb') as file: # data = pickle.load(file) # logger.info(data) @@ -147,14 +147,8 @@ class WechatChannel(ChatChannel): self.user_id = itchat.instance.storageClass.userName self.name = itchat.instance.storageClass.nickName logger.info("Wechat login success, user_id: {}, nickname: {}".format(self.user_id, self.name)) - - - # 创建一个线程来运行 consume_messages - # kafka_thread = threading.Thread(target=consume_messages, args=('47.116.67.214:9092', 'ai-test-group', topic,self.name)) - # kafka_client=KafkaClient() - # kafka_thread = threading.Thread(target=consume_wx_messages, args=('47.116.67.214:9092', 'ai-test-group', topic,self.name)) kafka_thread = threading.Thread(target=kafka_helper.kafka_client.consume_messages, args=(wx_messages_process_callback, self.name)) kafka_thread.start() @@ -170,18 +164,12 @@ class WechatChannel(ChatChannel): agent_tel=agent_info.get("agent_tel",None) # friends=itchat.get_contact(update=True)[1:] friends=itchat.get_friends(update=True)[1:] - # logger.info("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~") - # logger.info(friends) - # logger.info("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~") save_friends_to_redis(agent_tel,agent_nickname, friends) logger.info("启动好友同步") # start message listener logger.info("启动itchat") itchat.run() - - # 运行kafka - # except Exception as e: logger.exception(e) @@ -230,24 +218,6 @@ class WechatChannel(ChatChannel): logger.debug("[WX]receive voice msg: {}".format(cmsg.content)) elif cmsg.ctype == ContextType.IMAGE: logger.debug("[WX]receive image msg: {}".format(cmsg.content)) - # print(cmsg.content) - file_path = cmsg.content - logger.info(f"on_handle_context: 获取到图片路径 {file_path}") - oss_access_key_id="LTAI5tRTG6pLhTpKACJYoPR5" - oss_access_key_secret="E7dMzeeMxq4VQvLg7Tq7uKf3XWpYfN" - oss_endpoint="http://oss-cn-shanghai.aliyuncs.com" - oss_bucket_name="cow-agent" - oss_image_url = upload_oss(oss_access_key_id, oss_access_key_secret, oss_endpoint, oss_bucket_name, file_path, f'cow/{os.path.basename(file_path)}') - print(f"oss_image_url:{oss_image_url}") - input_content = oss_image_url - input_from_user_nickname = cmsg.from_user_nickname - input_to_user_nickname = cmsg.to_user_nickname - - input_wx_content_dialogue_message=[{"type": "image_url", "image_url": {"url": input_content}}] - input_message=dialogue_message(input_from_user_nickname,input_to_user_nickname,input_wx_content_dialogue_message) - kafka_helper.kafka_client.produce_message(input_message) - logger.info("发送对话 %s",input_message) - elif cmsg.ctype == ContextType.PATPAT: logger.debug("[WX]receive patpat msg: {}".format(cmsg.content)) elif cmsg.ctype == ContextType.TEXT: @@ -270,7 +240,6 @@ class WechatChannel(ChatChannel): kafka_helper.kafka_client.produce_message(input_message) logger.info("发送对话 %s",input_message) - else: logger.debug("[WX]receive msg: {}, cmsg={}".format(cmsg.content, cmsg)) context = self._compose_context(cmsg.ctype, cmsg.content, isgroup=False, msg=cmsg) @@ -534,7 +503,7 @@ def wx_messages_process_callback(user_nickname,message): kafka_helper.kafka_client.produce_message(message) logger.info("发送对话 %s",message) - time.sleep(3) + time.sleep(10) # 处理图片 elif wx_content.get("type",None) == 'image_url': print('发送图片') @@ -566,7 +535,7 @@ def wx_messages_process_callback(user_nickname,message): message=dialogue_message(agent_nickname_data,friend.get("NickName",None),wx_content_dialogue_message) kafka_helper.kafka_client.produce_message(message) logger.info("发送对话 %s",message) - time.sleep(3) + time.sleep(10) #处理文件 elif wx_content.get("type",None) == 'file': print('处理文件') @@ -593,7 +562,7 @@ def wx_messages_process_callback(user_nickname,message): message=dialogue_message(agent_nickname_data,friend.get("NickName",None),wx_content_dialogue_message) kafka_helper.kafka_client.produce_message(message) logger.info("发送对话 %s",message) - time.sleep(3) + time.sleep(10) elif ext in ['.mp4']: @@ -608,7 +577,7 @@ def wx_messages_process_callback(user_nickname,message): message=dialogue_message(agent_nickname_data,friend.get("NickName",None),wx_content_dialogue_message) kafka_helper.kafka_client.produce_message(message) logger.info("发送对话 %s",message) - time.sleep(3) + time.sleep(10) else: logger.warning(f'暂不支持 {ext} 文件的处理') diff --git a/channel/wechat/wechat_message.py b/channel/wechat/wechat_message.py index a037df0..e7109d6 100644 --- a/channel/wechat/wechat_message.py +++ b/channel/wechat/wechat_message.py @@ -10,7 +10,6 @@ from lib.itchat.content import * class WechatMessage(ChatMessage): def __init__(self, itchat_msg, is_group=False): super().__init__(itchat_msg) - # print("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!") self.msg_id = itchat_msg["MsgId"] self.create_time = itchat_msg["CreateTime"] self.is_group = is_group diff --git a/common/kafka_helper.py b/common/kafka_helper.py index 855de2e..006e61d 100644 --- a/common/kafka_helper.py +++ b/common/kafka_helper.py @@ -9,7 +9,9 @@ class KafkaClient: def __init__(self): bootstrap_servers=conf().get("kafka_bootstrap_servers") - consumer_group='aiops-wx-group' + agent_tel=os.environ.get('tel', '19200137635') + consumer_group=f'aiops-wx_{agent_tel}' + print(f'kafka消费组 {consumer_group}') topic="topic.aiops.wx" self.bootstrap_servers = bootstrap_servers diff --git a/config.json b/config.json index 9f15c8f..47a20b5 100644 --- a/config.json +++ b/config.json @@ -20,9 +20,10 @@ "group_welcome_msg": "", "trigger_by_self": true, "voice_to_text":"ali", + "text_to_voice":"ali", "speech_recognition": true, - "group_speech_recognition": true, - "voice_reply_voice": false, + "group_speech_recognition": false, + "voice_reply_voice": true, "conversation_max_tokens": 2500, "expires_in_seconds": 300, "character_desc": "", diff --git a/plugins/healthai/healthai.py b/plugins/healthai/healthai.py index d4779c1..a2d9f6d 100644 --- a/plugins/healthai/healthai.py +++ b/plugins/healthai/healthai.py @@ -20,6 +20,10 @@ from bot.session_manager import SessionManager from bot.chatgpt.chat_gpt_session import ChatGPTSession +from common import kafka_helper + +import time + @plugins.register( name="healthai", desire_priority=-1, @@ -74,7 +78,7 @@ class healthai(Plugin): user_id = msg.from_user_id content = context.content isgroup = e_context["context"].get("isgroup", False) - + context.get("msg").prepare() logger.info(f'当前缓存:self.params_cache:{self.params_cache}') @@ -152,8 +156,18 @@ class healthai(Plugin): logger.info('删除图片') os.remove(file_path) + input_content = file_content + input_from_user_nickname = msg.from_user_nickname + input_to_user_nickname = msg.to_user_nickname + input_wx_content_dialogue_message=[{"type": "image_url", "image_url": {"url": input_content}}] + input_message=dialogue_message(input_from_user_nickname,input_to_user_nickname,input_wx_content_dialogue_message) + kafka_helper.kafka_client.produce_message(input_message) + logger.info("发送对话 %s",input_message) + + + if context.type == ContextType.FILE: - logger.info('处理图片') + logger.info('处理文件') file_path = context.content logger.info(f"on_handle_context: 获取到文件路径 {file_path}") if user_id in self.params_cache: @@ -180,6 +194,14 @@ class healthai(Plugin): logger.info('删除文件') os.remove(file_path) + # input_content = file_content + # input_from_user_nickname = msg.from_user_nickname + # input_to_user_nickname = msg.to_user_nickname + # input_wx_content_dialogue_message=[{"type": "image_url", "image_url": {"url": input_content}}] + # input_message=dialogue_message(input_from_user_nickname,input_to_user_nickname,input_wx_content_dialogue_message) + # kafka_helper.kafka_client.produce_message(input_message) + # logger.info("发送对话 %s",input_message) + # 先回应 if 'previous_prompt' in self.params_cache[user_id] and 'last_content' in self.params_cache[user_id] and contains_keywords(self.params_cache[user_id]['previous_prompt']): logger.info('先回应') @@ -227,87 +249,6 @@ class healthai(Plugin): e_context.action = EventAction.BREAK return - - - - def on_handle_context2(self, e_context: EventContext): - context = e_context["context"] - - # 检查 context 类型 - if context.type not in {ContextType.TEXT, ContextType.SHARING, ContextType.FILE, ContextType.IMAGE}: - return - - msg: ChatMessage = context["msg"] - user_id = msg.from_user_id - content = context.content - is_group = context.get("isgroup", False) - - # 准备消息 - context.get("msg").prepare() - - logger.info(f'当前缓存:self.params_cache:{self.params_cache}') - - # 初始化用户缓存 - user_cache = self.params_cache.setdefault(user_id, {}) - if not user_cache: - logger.info(f'初始化缓存:{self.params_cache}') - - previous_prompt = user_cache.get('previous_prompt') - last_content = user_cache.get('last_content') - - # 更新 previous_prompt - if context.type == ContextType.TEXT and previous_prompt and contains_keywords(previous_prompt): - user_cache['previous_prompt'] = msg.content - - # 处理 previous_prompt 和 last_content - if previous_prompt and last_content and contains_keywords(previous_prompt): - logger.info('先回应') - receiver = user_id - itchat_content = f'@{msg.actual_user_nickname}' if is_group else '' - itchat_content += "已经收到,立刻为您服务" - - if contains_keywords(previous_prompt): - logger.info(f'发送消息: {itchat_content}') - itchat.send(itchat_content, toUserName=receiver) - e_context.action = EventAction.BREAK - - # 清空缓存 - self.params_cache.clear() - logger.info(f'清空缓存后:{self.params_cache}') - else: - if not is_group: - reply = Reply() - reply.type = ReplyType.TEXT - reply.content = "您刚刚上传了,请问我有什么可以帮您的呢?" - e_context["reply"] = reply - e_context.action = EventAction.BREAK - - if context.type in [ContextType.FILE,ContextType.IMAGE]: - logger.info('处理上传') - file_path = context.content - logger.info(f"on_handle_context: 获取到图片路径 {file_path},{user_id in self.params_cache}") - if user_id in self.params_cache: - if 'previous_prompt' not in self.params_cache[user_id] and not e_context['context']['isgroup']: - reply = Reply() - reply.type = ReplyType.TEXT - if context.type==ContextType.FILE: - reply.content = f"您刚刚上传文件,请问我有什么可以帮您的呢?" - else: - reply.content = f"您刚刚上传图片,请问我有什么可以帮您的呢?" - e_context["reply"] = reply - e_context.action = EventAction.BREAK - - file_content = upload_oss(self.oss_access_key_id, self.oss_access_key_secret, self.oss_endpoint, self.oss_bucket_name, file_path, f'cow/{os.path.basename(file_path)}') - # 确保 'urls' 键存在,并且是一个列表 - if 'urls' not in self.params_cache[user_id]: - self.params_cache[user_id]['urls'] = [] - - # 添加文件内容到 'urls' 列表 - self.params_cache[user_id]['urls'].append(file_content) - - logger.info('删除图片') - os.remove(file_path) - def generate_openai_messages_content(self, last_content,prompt): content = [] @@ -453,4 +394,38 @@ def contains_keywords_by_re(text): def contains_keywords(text): keywords = ["分析", "总结", "报告", "描述","说说","讲述","讲讲","讲一下","图片"] - return any(keyword in text for keyword in keywords) \ No newline at end of file + return any(keyword in text for keyword in keywords) + +def dialogue_message(nickname_from,nickname_to,wx_content): + """ + 构造消息的 JSON 数据 + :param contents: list,包含多个消息内容,每个内容为字典,如: + [{"type": "text", "text": "AAAAAAA"}, + {"type": "image_url", "image_url": {"url": "https://AAAAA.jpg"}}, + {"type":"file","file_url":{"url":"https://AAAAA.pdf"}} + ] + :return: JSON 字符串 + """ + + # 获取当前时间戳,精确到毫秒 + current_timestamp = int(time.time() * 1000) + + # 获取当前时间,格式化为 "YYYY-MM-DD HH:MM:SS" + current_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) + + # 构造 JSON 数据 + data = { + "messageId": str(current_timestamp), + "topic": "topic.aiops.wx", + "time": current_time, + "data": { + "msg_type": "dialogue", + "content": { + "nickname_from": nickname_from, + "nickname_to": nickname_to, + "wx_content":wx_content + } + } + } + + return json.dumps(data, separators=(',', ':'), ensure_ascii=False) \ No newline at end of file diff --git a/plugins/tool/tool.py b/plugins/tool/tool.py index fe36a68..4a2b09e 100644 --- a/plugins/tool/tool.py +++ b/plugins/tool/tool.py @@ -218,7 +218,7 @@ class Tool(Plugin): "url_get_use_summary": kwargs.get("url_get_use_summary", True), # 是否对返回结果使用tool功能 # for wechat tool "wechat_hot_reload": kwargs.get("wechat_hot_reload", True), # 是否使用热重载的方式发送wechat - "wechat_cpt_path": kwargs.get("wechat_cpt_path", os.path.join(get_appdata_dir(), "itchat.pkl")), # wechat 配置文件(`itchat.pkl`) + "wechat_cpt_path": kwargs.get("wechat_cpt_path", os.path.join(get_appdata_dir(), "itchat", "itchat.pkl")), # wechat 配置文件(`itchat.pkl`) "wechat_send_group": kwargs.get("wechat_send_group", False), # 是否向群组发送消息 "wechat_nickname_mapping": kwargs.get("wechat_nickname_mapping", "{}"), # 关于人的代号映射关系。键为代号值为微信名(昵称、备注名均可) # for wikipedia tool