From b1d7b3bf0626bf57b2ed9ef281083daf110a9836 Mon Sep 17 00:00:00 2001
From: H Vs <vson.iwork@outlook.com>
Date: Wed, 18 Dec 2024 15:19:03 +0800
Subject: [PATCH] =?UTF-8?q?=E8=B0=83=E6=95=B4pkl?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 channel/wechat/wechat_channel.py |  41 ++-------
 channel/wechat/wechat_message.py |   1 -
 common/kafka_helper.py           |   4 +-
 config.json                      |   5 +-
 plugins/healthai/healthai.py     | 143 +++++++++++++------------------
 plugins/tool/tool.py             |   2 +-
 6 files changed, 71 insertions(+), 125 deletions(-)

diff --git a/channel/wechat/wechat_channel.py b/channel/wechat/wechat_channel.py
index b3daa21..86ceb9a 100644
--- a/channel/wechat/wechat_channel.py
+++ b/channel/wechat/wechat_channel.py
@@ -132,7 +132,7 @@ class WechatChannel(ChatChannel):
             itchat.instance.receivingRetryCount = 600  # 修改断线超时时间
             # login by scan QRCode
             hotReload = conf().get("hot_reload", False)
-            status_path = os.path.join(get_appdata_dir(), "itchat.pkl")
+            status_path = os.path.join(get_appdata_dir(), "itchat","itchat.pkl")
             # with open(status_path, 'rb') as file:
             #     data = pickle.load(file)
             # logger.info(data)
@@ -147,14 +147,8 @@ class WechatChannel(ChatChannel):
             self.user_id = itchat.instance.storageClass.userName
             self.name = itchat.instance.storageClass.nickName
             logger.info("Wechat login success, user_id: {}, nickname: {}".format(self.user_id, self.name))
-            
-            
-            
 
              # 创建一个线程来运行 consume_messages
-            # kafka_thread = threading.Thread(target=consume_messages, args=('47.116.67.214:9092', 'ai-test-group', topic,self.name))
-            # kafka_client=KafkaClient()
-            # kafka_thread = threading.Thread(target=consume_wx_messages, args=('47.116.67.214:9092', 'ai-test-group', topic,self.name))
             kafka_thread = threading.Thread(target=kafka_helper.kafka_client.consume_messages, args=(wx_messages_process_callback, self.name))
 
             kafka_thread.start()
@@ -170,18 +164,12 @@ class WechatChannel(ChatChannel):
             agent_tel=agent_info.get("agent_tel",None)
             # friends=itchat.get_contact(update=True)[1:]
             friends=itchat.get_friends(update=True)[1:]
-            # logger.info("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~")
-            # logger.info(friends)
-            # logger.info("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~")
             save_friends_to_redis(agent_tel,agent_nickname, friends)
             logger.info("启动好友同步")
             # start message listener
 
             logger.info("启动itchat")
             itchat.run()
-            
-            # 运行kafka
-            # 
 
         except Exception as e:
             logger.exception(e)
@@ -230,24 +218,6 @@ class WechatChannel(ChatChannel):
             logger.debug("[WX]receive voice msg: {}".format(cmsg.content))
         elif cmsg.ctype == ContextType.IMAGE:
             logger.debug("[WX]receive image msg: {}".format(cmsg.content))
-            # print(cmsg.content)
-            file_path = cmsg.content
-            logger.info(f"on_handle_context: 获取到图片路径 {file_path}")
-            oss_access_key_id="LTAI5tRTG6pLhTpKACJYoPR5"
-            oss_access_key_secret="E7dMzeeMxq4VQvLg7Tq7uKf3XWpYfN"
-            oss_endpoint="http://oss-cn-shanghai.aliyuncs.com"
-            oss_bucket_name="cow-agent"
-            oss_image_url = upload_oss(oss_access_key_id, oss_access_key_secret, oss_endpoint, oss_bucket_name, file_path, f'cow/{os.path.basename(file_path)}')
-            print(f"oss_image_url:{oss_image_url}")
-            input_content = oss_image_url
-            input_from_user_nickname = cmsg.from_user_nickname
-            input_to_user_nickname = cmsg.to_user_nickname 
-
-            input_wx_content_dialogue_message=[{"type": "image_url", "image_url": {"url": input_content}}]
-            input_message=dialogue_message(input_from_user_nickname,input_to_user_nickname,input_wx_content_dialogue_message)
-            kafka_helper.kafka_client.produce_message(input_message)
-            logger.info("发送对话 %s",input_message)
-
         elif cmsg.ctype == ContextType.PATPAT:
             logger.debug("[WX]receive patpat msg: {}".format(cmsg.content))
         elif cmsg.ctype == ContextType.TEXT:
@@ -270,7 +240,6 @@ class WechatChannel(ChatChannel):
             kafka_helper.kafka_client.produce_message(input_message)
             logger.info("发送对话 %s",input_message)
 
-
         else:
             logger.debug("[WX]receive msg: {}, cmsg={}".format(cmsg.content, cmsg))
         context = self._compose_context(cmsg.ctype, cmsg.content, isgroup=False, msg=cmsg)
@@ -534,7 +503,7 @@ def wx_messages_process_callback(user_nickname,message):
                             kafka_helper.kafka_client.produce_message(message)
                             logger.info("发送对话 %s",message)
 
-                            time.sleep(3)
+                            time.sleep(10)
                         # 处理图片   
                         elif wx_content.get("type",None) == 'image_url':
                             print('发送图片')
@@ -566,7 +535,7 @@ def wx_messages_process_callback(user_nickname,message):
                             message=dialogue_message(agent_nickname_data,friend.get("NickName",None),wx_content_dialogue_message)
                             kafka_helper.kafka_client.produce_message(message)
                             logger.info("发送对话 %s",message)
-                            time.sleep(3)
+                            time.sleep(10)
                         #处理文件
                         elif wx_content.get("type",None) == 'file':
                             print('处理文件')
@@ -593,7 +562,7 @@ def wx_messages_process_callback(user_nickname,message):
                                 message=dialogue_message(agent_nickname_data,friend.get("NickName",None),wx_content_dialogue_message)
                                 kafka_helper.kafka_client.produce_message(message)
                                 logger.info("发送对话 %s",message)
-                                time.sleep(3)
+                                time.sleep(10)
 
                             elif ext in ['.mp4']:
 
@@ -608,7 +577,7 @@ def wx_messages_process_callback(user_nickname,message):
                                 message=dialogue_message(agent_nickname_data,friend.get("NickName",None),wx_content_dialogue_message)
                                 kafka_helper.kafka_client.produce_message(message)
                                 logger.info("发送对话 %s",message)
-                                time.sleep(3)
+                                time.sleep(10)
                                 
                             else:
                                 logger.warning(f'暂不支持 {ext} 文件的处理')
diff --git a/channel/wechat/wechat_message.py b/channel/wechat/wechat_message.py
index a037df0..e7109d6 100644
--- a/channel/wechat/wechat_message.py
+++ b/channel/wechat/wechat_message.py
@@ -10,7 +10,6 @@ from lib.itchat.content import *
 class WechatMessage(ChatMessage):
     def __init__(self, itchat_msg, is_group=False):
         super().__init__(itchat_msg)
-        # print("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")
         self.msg_id = itchat_msg["MsgId"]
         self.create_time = itchat_msg["CreateTime"]
         self.is_group = is_group
diff --git a/common/kafka_helper.py b/common/kafka_helper.py
index 855de2e..006e61d 100644
--- a/common/kafka_helper.py
+++ b/common/kafka_helper.py
@@ -9,7 +9,9 @@ class KafkaClient:
     def __init__(self):
         
         bootstrap_servers=conf().get("kafka_bootstrap_servers")
-        consumer_group='aiops-wx-group'
+        agent_tel=os.environ.get('tel', '19200137635')
+        consumer_group=f'aiops-wx_{agent_tel}'
+        print(f'kafka消费组 {consumer_group}')
         topic="topic.aiops.wx"
 
         self.bootstrap_servers = bootstrap_servers
diff --git a/config.json b/config.json
index 9f15c8f..47a20b5 100644
--- a/config.json
+++ b/config.json
@@ -20,9 +20,10 @@
   "group_welcome_msg": "", 
   "trigger_by_self": true, 
   "voice_to_text":"ali",
+  "text_to_voice":"ali",
   "speech_recognition": true,
-  "group_speech_recognition": true,
-  "voice_reply_voice": false,
+  "group_speech_recognition": false,
+  "voice_reply_voice": true,
   "conversation_max_tokens": 2500,
   "expires_in_seconds": 300,
   "character_desc": "",
diff --git a/plugins/healthai/healthai.py b/plugins/healthai/healthai.py
index d4779c1..a2d9f6d 100644
--- a/plugins/healthai/healthai.py
+++ b/plugins/healthai/healthai.py
@@ -20,6 +20,10 @@ from bot.session_manager import SessionManager
 
 from bot.chatgpt.chat_gpt_session import ChatGPTSession
 
+from common import kafka_helper
+
+import time
+
 @plugins.register(
     name="healthai",
     desire_priority=-1,
@@ -74,7 +78,7 @@ class healthai(Plugin):
         user_id = msg.from_user_id
         content = context.content
         isgroup = e_context["context"].get("isgroup", False)
-
+        
         context.get("msg").prepare()
 
         logger.info(f'当前缓存:self.params_cache:{self.params_cache}') 
@@ -152,8 +156,18 @@ class healthai(Plugin):
             logger.info('删除图片')
             os.remove(file_path)
 
+            input_content = file_content
+            input_from_user_nickname = msg.from_user_nickname
+            input_to_user_nickname = msg.to_user_nickname 
+            input_wx_content_dialogue_message=[{"type": "image_url", "image_url": {"url": input_content}}]
+            input_message=dialogue_message(input_from_user_nickname,input_to_user_nickname,input_wx_content_dialogue_message)
+            kafka_helper.kafka_client.produce_message(input_message)
+            logger.info("发送对话 %s",input_message)
+
+
+
         if context.type == ContextType.FILE:
-            logger.info('处理图片')
+            logger.info('处理文件')
             file_path = context.content
             logger.info(f"on_handle_context: 获取到文件路径 {file_path}")
             if user_id in self.params_cache:
@@ -180,6 +194,14 @@ class healthai(Plugin):
             logger.info('删除文件')
             os.remove(file_path)
 
+            # input_content = file_content
+            # input_from_user_nickname = msg.from_user_nickname
+            # input_to_user_nickname = msg.to_user_nickname 
+            # input_wx_content_dialogue_message=[{"type": "image_url", "image_url": {"url": input_content}}]
+            # input_message=dialogue_message(input_from_user_nickname,input_to_user_nickname,input_wx_content_dialogue_message)
+            # kafka_helper.kafka_client.produce_message(input_message)
+            # logger.info("发送对话 %s",input_message)
+
         # 先回应
         if 'previous_prompt' in self.params_cache[user_id] and 'last_content' in self.params_cache[user_id] and contains_keywords(self.params_cache[user_id]['previous_prompt']):
             logger.info('先回应')
@@ -227,87 +249,6 @@ class healthai(Plugin):
                 e_context.action = EventAction.BREAK
                 return
 
-
-       
-
-    def on_handle_context2(self, e_context: EventContext):
-        context = e_context["context"]
-
-        # 检查 context 类型
-        if context.type not in {ContextType.TEXT, ContextType.SHARING, ContextType.FILE, ContextType.IMAGE}:
-            return
-
-        msg: ChatMessage = context["msg"]
-        user_id = msg.from_user_id
-        content = context.content
-        is_group = context.get("isgroup", False)
-
-        # 准备消息
-        context.get("msg").prepare()
-
-        logger.info(f'当前缓存:self.params_cache:{self.params_cache}')
-
-        # 初始化用户缓存
-        user_cache = self.params_cache.setdefault(user_id, {})
-        if not user_cache:
-            logger.info(f'初始化缓存:{self.params_cache}')
-
-        previous_prompt = user_cache.get('previous_prompt')
-        last_content = user_cache.get('last_content')
-
-        # 更新 previous_prompt
-        if context.type == ContextType.TEXT and previous_prompt and contains_keywords(previous_prompt):
-            user_cache['previous_prompt'] = msg.content
-
-        # 处理 previous_prompt 和 last_content
-        if previous_prompt and last_content and contains_keywords(previous_prompt):
-            logger.info('先回应')
-            receiver = user_id
-            itchat_content = f'@{msg.actual_user_nickname}' if is_group else ''
-            itchat_content += "已经收到,立刻为您服务"
-
-            if contains_keywords(previous_prompt):
-                logger.info(f'发送消息: {itchat_content}')
-                itchat.send(itchat_content, toUserName=receiver)
-            e_context.action = EventAction.BREAK
-
-            # 清空缓存
-            self.params_cache.clear()
-            logger.info(f'清空缓存后:{self.params_cache}')
-        else:
-            if not is_group:
-                reply = Reply()
-                reply.type = ReplyType.TEXT
-                reply.content = "您刚刚上传了,请问我有什么可以帮您的呢?"
-                e_context["reply"] = reply
-            e_context.action = EventAction.BREAK
-
-        if context.type in [ContextType.FILE,ContextType.IMAGE]:
-            logger.info('处理上传')
-            file_path = context.content
-            logger.info(f"on_handle_context: 获取到图片路径 {file_path},{user_id in self.params_cache}")
-            if user_id in self.params_cache:
-                if 'previous_prompt' not in self.params_cache[user_id] and not e_context['context']['isgroup']:
-                    reply = Reply()
-                    reply.type = ReplyType.TEXT
-                    if context.type==ContextType.FILE:
-                        reply.content = f"您刚刚上传文件,请问我有什么可以帮您的呢?"
-                    else:
-                        reply.content = f"您刚刚上传图片,请问我有什么可以帮您的呢?"
-                    e_context["reply"] = reply
-                    e_context.action = EventAction.BREAK
-
-                file_content = upload_oss(self.oss_access_key_id, self.oss_access_key_secret, self.oss_endpoint, self.oss_bucket_name, file_path, f'cow/{os.path.basename(file_path)}')
-                # 确保 'urls' 键存在,并且是一个列表
-                if 'urls' not in self.params_cache[user_id]:
-                    self.params_cache[user_id]['urls'] = []
-
-                # 添加文件内容到 'urls' 列表
-                self.params_cache[user_id]['urls'].append(file_content)
-
-            logger.info('删除图片')
-            os.remove(file_path)   
-
     def generate_openai_messages_content(self, last_content,prompt):
         content = []
 
@@ -453,4 +394,38 @@ def contains_keywords_by_re(text):
 
 def contains_keywords(text):
     keywords = ["分析", "总结", "报告", "描述","说说","讲述","讲讲","讲一下","图片"]
-    return any(keyword in text for keyword in keywords)
\ No newline at end of file
+    return any(keyword in text for keyword in keywords)
+
+def dialogue_message(nickname_from,nickname_to,wx_content):
+    """
+    构造消息的 JSON 数据
+    :param contents: list,包含多个消息内容,每个内容为字典,如:
+                     [{"type": "text", "text": "AAAAAAA"},
+                      {"type": "image_url", "image_url": {"url": "https://AAAAA.jpg"}},
+                      {"type":"file","file_url":{"url":"https://AAAAA.pdf"}}
+                      ]
+    :return: JSON 字符串
+    """
+
+    # 获取当前时间戳,精确到毫秒
+    current_timestamp = int(time.time() * 1000)
+
+    # 获取当前时间,格式化为 "YYYY-MM-DD HH:MM:SS"
+    current_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
+    
+    # 构造 JSON 数据
+    data = {
+        "messageId": str(current_timestamp),
+        "topic": "topic.aiops.wx",
+        "time": current_time,
+        "data": {
+            "msg_type": "dialogue",
+            "content": {
+                "nickname_from": nickname_from,
+                "nickname_to": nickname_to,
+                "wx_content":wx_content
+            }
+        }
+    }
+
+    return json.dumps(data, separators=(',', ':'), ensure_ascii=False)
\ No newline at end of file
diff --git a/plugins/tool/tool.py b/plugins/tool/tool.py
index fe36a68..4a2b09e 100644
--- a/plugins/tool/tool.py
+++ b/plugins/tool/tool.py
@@ -218,7 +218,7 @@ class Tool(Plugin):
             "url_get_use_summary": kwargs.get("url_get_use_summary", True),  # 是否对返回结果使用tool功能
             # for wechat tool
             "wechat_hot_reload": kwargs.get("wechat_hot_reload", True),  # 是否使用热重载的方式发送wechat
-            "wechat_cpt_path": kwargs.get("wechat_cpt_path", os.path.join(get_appdata_dir(), "itchat.pkl")),  # wechat 配置文件(`itchat.pkl`)
+            "wechat_cpt_path": kwargs.get("wechat_cpt_path", os.path.join(get_appdata_dir(), "itchat", "itchat.pkl")),  # wechat 配置文件(`itchat.pkl`)
             "wechat_send_group": kwargs.get("wechat_send_group", False),  # 是否向群组发送消息
             "wechat_nickname_mapping": kwargs.get("wechat_nickname_mapping", "{}"),  # 关于人的代号映射关系。键为代号值为微信名(昵称、备注名均可)
             # for wikipedia tool