diff --git a/app.py b/app.py index ff2a6c7..761a98d 100644 --- a/app.py +++ b/app.py @@ -37,6 +37,26 @@ def start_channel(channel_name: str): threading.Thread(target=linkai_client.start, args=(channel,)).start() except Exception as e: pass + + # try: + # if channel_name in ['wx']: + # from common import platform_client + # threading.Thread(target=platform_client.start, args=(channel,)).start() + # except Exception as e: + # pass + + try: + from common import redis_helper + threading.Thread(target=redis_helper.start).start() + except Exception as e: + pass + + try: + from common import kafka_helper + threading.Thread(target=kafka_helper.start).start() + except Exception as e: + pass + channel.startup() @@ -60,6 +80,8 @@ def run(): start_channel(channel_name) + + while True: time.sleep(1) except Exception as e: diff --git a/bot/chatgpt/chat_gpt_bot.py b/bot/chatgpt/chat_gpt_bot.py index 12a4878..a7913a8 100644 --- a/bot/chatgpt/chat_gpt_bot.py +++ b/bot/chatgpt/chat_gpt_bot.py @@ -133,10 +133,11 @@ class ChatGPTBot(Bot, OpenAIImage): # logger.info("[CHATGPT] 响应={}".format(response)) logger.info("[CHATGPT] 响应={}".format(json.dumps(response, separators=(',', ':'),ensure_ascii=False))) # logger.info("[ChatGPT] reply={}, total_tokens={}".format(response.choices[0]['message']['content'], response["usage"]["total_tokens"])) + content=response.choices[0]["message"]["content"] return { "total_tokens": response["usage"]["total_tokens"], "completion_tokens": response["usage"]["completion_tokens"], - "content": response.choices[0]["message"]["content"], + "content": content.lstrip("\n"), } except Exception as e: need_retry = retry_count < 2 diff --git a/channel/wechat/wechat_channel.py b/channel/wechat/wechat_channel.py index 8b44554..b3daa21 100644 --- a/channel/wechat/wechat_channel.py +++ b/channel/wechat/wechat_channel.py @@ -24,6 +24,21 @@ from common.utils import convert_webp_to_png from config import conf, get_appdata_dir from lib import itchat from lib.itchat.content import * +from urllib.parse import urlparse + + +import threading + +from common import kafka_helper, redis_helper + +from confluent_kafka import Consumer, KafkaException +import json,time,re +import pickle +from datetime import datetime +import oss2 + + +# from common.kafka_client import KafkaClient @itchat.msg_register([TEXT, VOICE, PICTURE, NOTE, ATTACHMENT, SHARING]) @@ -118,6 +133,9 @@ class WechatChannel(ChatChannel): # login by scan QRCode hotReload = conf().get("hot_reload", False) status_path = os.path.join(get_appdata_dir(), "itchat.pkl") + # with open(status_path, 'rb') as file: + # data = pickle.load(file) + # logger.info(data) itchat.auto_login( enableCmdQR=2, hotReload=hotReload, @@ -129,15 +147,51 @@ class WechatChannel(ChatChannel): self.user_id = itchat.instance.storageClass.userName self.name = itchat.instance.storageClass.nickName logger.info("Wechat login success, user_id: {}, nickname: {}".format(self.user_id, self.name)) + + + + + # 创建一个线程来运行 consume_messages + # kafka_thread = threading.Thread(target=consume_messages, args=('47.116.67.214:9092', 'ai-test-group', topic,self.name)) + # kafka_client=KafkaClient() + # kafka_thread = threading.Thread(target=consume_wx_messages, args=('47.116.67.214:9092', 'ai-test-group', topic,self.name)) + kafka_thread = threading.Thread(target=kafka_helper.kafka_client.consume_messages, args=(wx_messages_process_callback, self.name)) + + kafka_thread.start() + logger.info("启动kafka") + + # 好友定时同步 + agent_nickname=self.name + friend_thread =threading.Thread(target=hourly_change_save_friends, args=(agent_nickname,)) + friend_thread.start() + + # 立刻同步 + agent_info=fetch_agent_info(agent_nickname) + agent_tel=agent_info.get("agent_tel",None) + # friends=itchat.get_contact(update=True)[1:] + friends=itchat.get_friends(update=True)[1:] + # logger.info("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~") + # logger.info(friends) + # logger.info("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~") + save_friends_to_redis(agent_tel,agent_nickname, friends) + logger.info("启动好友同步") # start message listener + + logger.info("启动itchat") itchat.run() + + # 运行kafka + # + except Exception as e: logger.exception(e) def exitCallback(self): + print('主动退出') try: from common.linkai_client import chat_client if chat_client.client_id and conf().get("use_linkai"): + print('退出') _send_logout() time.sleep(2) self.auto_login_times += 1 @@ -149,6 +203,8 @@ class WechatChannel(ChatChannel): def loginCallback(self): logger.debug("Login success") + print('登录成功') + # 同步 _send_login_success() # handle_* 系列函数处理收到的消息后构造Context,然后传入produce函数中处理Context和发送回复 @@ -174,10 +230,47 @@ class WechatChannel(ChatChannel): logger.debug("[WX]receive voice msg: {}".format(cmsg.content)) elif cmsg.ctype == ContextType.IMAGE: logger.debug("[WX]receive image msg: {}".format(cmsg.content)) + # print(cmsg.content) + file_path = cmsg.content + logger.info(f"on_handle_context: 获取到图片路径 {file_path}") + oss_access_key_id="LTAI5tRTG6pLhTpKACJYoPR5" + oss_access_key_secret="E7dMzeeMxq4VQvLg7Tq7uKf3XWpYfN" + oss_endpoint="http://oss-cn-shanghai.aliyuncs.com" + oss_bucket_name="cow-agent" + oss_image_url = upload_oss(oss_access_key_id, oss_access_key_secret, oss_endpoint, oss_bucket_name, file_path, f'cow/{os.path.basename(file_path)}') + print(f"oss_image_url:{oss_image_url}") + input_content = oss_image_url + input_from_user_nickname = cmsg.from_user_nickname + input_to_user_nickname = cmsg.to_user_nickname + + input_wx_content_dialogue_message=[{"type": "image_url", "image_url": {"url": input_content}}] + input_message=dialogue_message(input_from_user_nickname,input_to_user_nickname,input_wx_content_dialogue_message) + kafka_helper.kafka_client.produce_message(input_message) + logger.info("发送对话 %s",input_message) + elif cmsg.ctype == ContextType.PATPAT: logger.debug("[WX]receive patpat msg: {}".format(cmsg.content)) elif cmsg.ctype == ContextType.TEXT: logger.debug("[WX]receive text msg: {}, cmsg={}".format(json.dumps(cmsg._rawmsg, ensure_ascii=False), cmsg)) + # content = cmsg.content # 消息内容 + # from_user_nickname = cmsg.from_user_nickname # 发送方昵称 + # to_user_nickname = cmsg.to_user_nickname # 接收方昵称 + + # wx_content_dialogue_message=[{"type": "text", "text": content}] + # message=dialogue_message(from_user_nickname,to_user_nickname,wx_content_dialogue_message) + # kafka_helper.kafka_client.produce_message(message) + # logger.info("发送对话 %s", json.dumps(message, ensure_ascii=False)) + + input_content = cmsg.content + input_from_user_nickname = cmsg.from_user_nickname + input_to_user_nickname = cmsg.to_user_nickname + + input_wx_content_dialogue_message=[{"type": "text", "text": input_content}] + input_message=dialogue_message(input_from_user_nickname,input_to_user_nickname,input_wx_content_dialogue_message) + kafka_helper.kafka_client.produce_message(input_message) + logger.info("发送对话 %s",input_message) + + else: logger.debug("[WX]receive msg: {}, cmsg={}".format(cmsg.content, cmsg)) context = self._compose_context(cmsg.ctype, cmsg.content, isgroup=False, msg=cmsg) @@ -212,6 +305,36 @@ class WechatChannel(ChatChannel): if reply.type == ReplyType.TEXT: itchat.send(reply.content, toUserName=receiver) logger.info("[WX] sendMsg={}, receiver={}".format(reply, receiver)) + # logger.info(context) + # logger.info(context["msg"]) + # // 发送kafka + # msg=context["msg"] + msg: ChatMessage = context["msg"] + # content=msg["content"] + + + is_group=msg.is_group + if not is_group: + # print(f'响应:{content}') + # 用户输入 + # input_content=msg.content + # input_from_user_nickname=msg.from_user_nickname + # input_to_user_nickname=msg.to_user_nickname + + # input_wx_content_dialogue_message=[{"type": "text", "text": input_content}] + # input_message=dialogue_message(input_from_user_nickname,input_to_user_nickname,input_wx_content_dialogue_message) + # kafka_helper.kafka_client.produce_message(input_message) + # logger.info("发送对话 %s", json.dumps(input_message, separators=(',', ':'), ensure_ascii=False)) + + # 响应用户 + output_content=reply.content + output_from_user_nickname=msg.to_user_nickname # 回复翻转 + output_to_user_nickname=msg.from_user_nickname # 回复翻转 + + output_wx_content_dialogue_message=[{"type": "text", "text": output_content}] + output_message=dialogue_message(output_from_user_nickname,output_to_user_nickname,output_wx_content_dialogue_message) + kafka_helper.kafka_client.produce_message(output_message) + logger.info("发送对话 %s", output_message) elif reply.type == ReplyType.ERROR or reply.type == ReplyType.INFO: itchat.send(reply.content, toUserName=receiver) logger.info("[WX] sendMsg={}, receiver={}".format(reply, receiver)) @@ -246,6 +369,36 @@ class WechatChannel(ChatChannel): file_storage = reply.content itchat.send_file(file_storage, toUserName=receiver) logger.info("[WX] sendFile, receiver={}".format(receiver)) + + + # msg: ChatMessage = context["msg"] + # # content=msg["content"] + + + # is_group=msg.is_group + # if not is_group: + # # print(f'响应:{content}') + # # 用户输入 + # # input_content=msg.content + # # input_from_user_nickname=msg.from_user_nickname + # # input_to_user_nickname=msg.to_user_nickname + + # # input_wx_content_dialogue_message=[{"type": "text", "text": input_content}] + # # input_message=dialogue_message(input_from_user_nickname,input_to_user_nickname,input_wx_content_dialogue_message) + # # kafka_helper.kafka_client.produce_message(input_message) + # # logger.info("发送对话 %s", json.dumps(input_message, separators=(',', ':'), ensure_ascii=False)) + + # # 响应用户 + # output_content=reply.content + # output_from_user_nickname=msg.to_user_nickname # 回复翻转 + # output_to_user_nickname=msg.from_user_nickname # 回复翻转 + + # output_wx_content_dialogue_message=[{"type": "file", "text": output_content}] + # output_message=dialogue_message(output_from_user_nickname,output_to_user_nickname,output_wx_content_dialogue_message) + # kafka_helper.kafka_client.produce_message(output_message) + # logger.info("发送对话 %s", output_message) + + elif reply.type == ReplyType.VIDEO: # 新增视频回复类型 video_storage = reply.content itchat.send_video(video_storage, toUserName=receiver) @@ -269,6 +422,7 @@ def _send_login_success(): from common.linkai_client import chat_client if chat_client.client_id: chat_client.send_login_success() + except Exception as e: pass @@ -290,3 +444,324 @@ def _send_qr_code(qrcode_list: list): except Exception as e: pass + +def clean_json_string(json_str): + + # 删除所有控制字符(非打印字符),包括换行符、回车符等 + return re.sub(r'[\x00-\x1f\x7f]', '', json_str) + +def save_friends_to_redis(agent_tel,agent_nickname, friends): + contact_list = [] + for friend in friends: + friend_data = { + "UserName": friend.UserName, + "NickName": friend.NickName, + "Signature": friend.Signature, + "Province": friend.Province, + "City": friend.City, + "Sex": str(friend.Sex), # 性别可转换为字符串存储 + "Alias": friend.Alias + } + contact_list.append(friend_data) # 将每个朋友的信息加入到列表中 + + agent_contact_list = { + "AgentTel":agent_tel, + "agent_nick_name": agent_nickname, + "contact_list": contact_list # 将朋友列表添加到字典中 + } + + # 将联系人信息保存到 Redis,使用一个合适的 key + hash_key = f"__AI_OPS_WX__:CONTACTLIST" + redis_helper.redis_helper.update_hash_field(hash_key, agent_tel, json.dumps(agent_contact_list, ensure_ascii=False)) # 设置有效期为 600 秒 + +def hourly_change_save_friends(agent_nickname): + last_hour = datetime.now().hour # 获取当前小时 + while True: + current_hour = datetime.now().hour + if current_hour != last_hour: # 检测小时是否变化 + friends=itchat.get_friends(update=True)[1:] + + agent_info=fetch_agent_info(agent_nickname) + agent_tel=agent_info.get("agent_tel",None) + save_friends_to_redis(agent_tel,agent_nickname, friends) + last_hour = current_hour + time.sleep(1) # 每秒检查一次 + +def wx_messages_process_callback(user_nickname,message): + """ + 处理消费到的 Kafka 消息(基础示例) + :param message: Kafka 消费到的消息内容 + """ + # print(user_nickname) + # print(f"Processing message: {message}") + # return True + + msg_content= message + cleaned_content = clean_json_string(msg_content) + content=json.loads(cleaned_content) + data = content.get("data", {}) + msg_type_data=data.get("msg_type",None) + content_data = data.get("content",{}) + agent_nickname_data=content_data.get("agent_nickname",None) + agent_tel=content_data.get("agent_tel",None) + + if user_nickname == agent_nickname_data and msg_type_data=='group-sending': + friends=itchat.get_friends(update=True)[1:] + contact_list_content_data=content_data.get("contact_list",None) + + # 更新好友缓存 + save_friends_to_redis(agent_tel,agent_nickname_data,friends) + + # 遍历要群发的好友 + for contact in contact_list_content_data: + nickname = contact.get("nickname",None) + if(nickname not in [friend['NickName'] for friend in friends]): + logger.warning(f'微信中没有 {nickname} 的昵称,将不会发送消息') + + for friend in friends: + if friend.get("NickName",None) == nickname: + wx_content_list=content_data.get("wx_content",[]) + for wx_content in wx_content_list: + # 处理文件 + if wx_content.get("type",None) == 'text': + wx_content_text=wx_content.get("text",None) + itchat.send(wx_content_text, toUserName=friend.get("UserName",None)) + logger.info(f"{user_nickname} 向 {nickname} 发送文字【 {wx_content_text} 】") + + # // 发送kafka + wx_content_dialogue_message=[{"type": "text", "text": wx_content_text}] + message=dialogue_message(agent_nickname_data,friend.get("NickName",None),wx_content_dialogue_message) + kafka_helper.kafka_client.produce_message(message) + logger.info("发送对话 %s",message) + + time.sleep(3) + # 处理图片 + elif wx_content.get("type",None) == 'image_url': + print('发送图片') + image_url= wx_content.get("image_url",{}) + url=image_url.get("url",None) + + # 网络图片 + logger.debug(f"[WX] start download image, img_url={url}") + pic_res = requests.get(url, stream=True) + image_storage = io.BytesIO() + size = 0 + for block in pic_res.iter_content(1024): + size += len(block) + image_storage.write(block) + logger.info(f"[WX] download image success, size={size}, img_url={url}") + image_storage.seek(0) + if ".webp" in url: + try: + image_storage = convert_webp_to_png(image_storage) + except Exception as e: + logger.error(f"Failed to convert image: {e}") + return + + itchat.send_image(image_storage, toUserName=friend.get("UserName",None)) + logger.info(f"{user_nickname} 向 {nickname} 发送图片【 {url} 】") + + # // 发送kafka + wx_content_dialogue_message=[{"type": "image_url", "image_url": {"url": url}}] + message=dialogue_message(agent_nickname_data,friend.get("NickName",None),wx_content_dialogue_message) + kafka_helper.kafka_client.produce_message(message) + logger.info("发送对话 %s",message) + time.sleep(3) + #处理文件 + elif wx_content.get("type",None) == 'file': + print('处理文件') + file_url= wx_content.get("file_url",{}) + url=file_url.get("url",None) + + # 提取路径部分 + parsed_url = urlparse(url).path + + # 获取文件名和扩展名 + filename = os.path.basename(parsed_url) # 文件名(包含扩展名) + name, ext = os.path.splitext(filename) # 分离文件名和扩展名 + if ext in ['.pdf']: + print('处理PDF文件') + + tmp_file_path=save_to_local_from_url(url) + + itchat.send_file(tmp_file_path, toUserName=friend.get("UserName",None)) + logger.info(f'删除本地{ext}文件: {tmp_file_path}') + os.remove(tmp_file_path) + logger.info(f"{user_nickname} 向 {nickname} 发送 {ext} 文件【 {url} 】") + # // 发送kafka + wx_content_dialogue_message=[{"type": "file", "file_url": {"url": url}}] + message=dialogue_message(agent_nickname_data,friend.get("NickName",None),wx_content_dialogue_message) + kafka_helper.kafka_client.produce_message(message) + logger.info("发送对话 %s",message) + time.sleep(3) + + elif ext in ['.mp4']: + + print('处理MP4文件') + tmp_file_path=save_to_local_from_url(url) + itchat.send_file(tmp_file_path, toUserName=friend.get("UserName",None)) + logger.info(f'删除本地{ext}文件: {tmp_file_path}') + os.remove(tmp_file_path) + logger.info(f"{user_nickname} 向 {nickname} 发送 {ext} 文件【 {url} 】") + # // 发送kafka + wx_content_dialogue_message=[{"type": "file", "file_url": {"url": url}}] + message=dialogue_message(agent_nickname_data,friend.get("NickName",None),wx_content_dialogue_message) + kafka_helper.kafka_client.produce_message(message) + logger.info("发送对话 %s",message) + time.sleep(3) + + else: + logger.warning(f'暂不支持 {ext} 文件的处理') + + return True + else: + return False + +def dialogue_message(nickname_from,nickname_to,wx_content): + """ + 构造消息的 JSON 数据 + :param contents: list,包含多个消息内容,每个内容为字典,如: + [{"type": "text", "text": "AAAAAAA"}, + {"type": "image_url", "image_url": {"url": "https://AAAAA.jpg"}}, + {"type":"file","file_url":{"url":"https://AAAAA.pdf"}} + ] + :return: JSON 字符串 + """ + + # 获取当前时间戳,精确到毫秒 + current_timestamp = int(time.time() * 1000) + + # 获取当前时间,格式化为 "YYYY-MM-DD HH:MM:SS" + current_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) + + # 构造 JSON 数据 + data = { + "messageId": str(current_timestamp), + "topic": "topic.aiops.wx", + "time": current_time, + "data": { + "msg_type": "dialogue", + "content": { + "nickname_from": nickname_from, + "nickname_to": nickname_to, + "wx_content":wx_content + } + } + } + + return json.dumps(data, separators=(',', ':'), ensure_ascii=False) + +def fetch_agent_info(agent_nickname): + + if os.environ.get('environment', 'default')=='default': + return { + "agent_nickname": agent_nickname, + "agent_tel": "19200137635" + } + + aiops_api=conf().get("aiops_api") + # 定义请求URL + url = f"{aiops_api}/business/Agent/smartinfobyname" + + # 定义请求头 + headers = { + "accept": "*/*", + "Content-Type": "application/json" + } + + # 定义请求数据 + data = { + "smartName": agent_nickname + } + + try: + # 发送POST请求 + response = requests.post(url, headers=headers, data=json.dumps(data)) + + # 确认响应状态码 + if response.status_code == 200: + response_data = response.json() + if response_data.get("code") == 200: + # 提取 smartName 和 smartPhone + data = response_data.get("data", {}) + return { + "agent_nickname": data.get("smartName"), + "agent_tel": data.get("smartPhone") + } + else: + logger.error(f"API 返回错误信息: {response_data.get('msg')}") + return None + else: + logger.error(f"请求失败,状态码:{response.status_code}") + return None + except Exception as e: + logger.error(f"请求出错: {e}") + return None + + +def save_to_local_from_url(url): + ''' + 从url保存到本地tmp目录 + ''' + + parsed_url = urlparse(url) + # 从 URL 提取文件名 + filename = os.path.basename(parsed_url.path) + # tmp_dir = os.path(__file__) # 获取系统临时目录 + # print(tmp_dir) + tmp_file_path = os.path.join(os.getcwd(),'tmp', filename) # 拼接完整路径 + + # 检查是否存在同名文件 + if os.path.exists(tmp_file_path): + logger.info(f"文件已存在,将覆盖:{tmp_file_path}") + + # 下载文件并保存到临时目录 + response = requests.get(url, stream=True) + with open(tmp_file_path, 'wb') as f: + for chunk in response.iter_content(chunk_size=1024): + if chunk: # 检查是否有内容 + f.write(chunk) + + return tmp_file_path + +def upload_oss(access_key_id, access_key_secret, endpoint, bucket_name, local_file_path, oss_file_name, expiration_days=7): + """ + 上传文件到阿里云OSS并设置生命周期规则,同时返回文件的公共访问地址。 + + :param access_key_id: 阿里云AccessKey ID + :param access_key_secret: 阿里云AccessKey Secret + :param endpoint: OSS区域对应的Endpoint + :param bucket_name: OSS中的Bucket名称 + :param local_file_path: 本地文件路径 + :param oss_file_name: OSS中的文件存储路径 + :param expiration_days: 文件保存天数,默认7天后删除 + :return: 文件的公共访问地址 + """ + + # 创建Bucket实例 + auth = oss2.Auth(access_key_id, access_key_secret) + bucket = oss2.Bucket(auth, endpoint, bucket_name) + + ### 1. 设置生命周期规则 ### + rule_id = f'delete_after_{expiration_days}_days' # 规则ID + prefix = oss_file_name.split('/')[0] + '/' # 设置规则应用的前缀为文件所在目录 + + # 定义生命周期规则 + rule = oss2.models.LifecycleRule(rule_id, prefix, status=oss2.models.LifecycleRule.ENABLED, + expiration=oss2.models.LifecycleExpiration(days=expiration_days)) + + # 设置Bucket的生命周期 + lifecycle = oss2.models.BucketLifecycle([rule]) + bucket.put_bucket_lifecycle(lifecycle) + + print(f"已设置生命周期规则:文件将在{expiration_days}天后自动删除") + + ### 2. 上传文件到OSS ### + bucket.put_object_from_file(oss_file_name, local_file_path) + + ### 3. 构建公共访问URL ### + file_url = f"http://{bucket_name}.{endpoint.replace('http://', '')}/{oss_file_name}" + + print(f"文件上传成功,公共访问地址:{file_url}") + + return file_url diff --git a/channel/wechat/wechat_message.py b/channel/wechat/wechat_message.py index e7109d6..a037df0 100644 --- a/channel/wechat/wechat_message.py +++ b/channel/wechat/wechat_message.py @@ -10,6 +10,7 @@ from lib.itchat.content import * class WechatMessage(ChatMessage): def __init__(self, itchat_msg, is_group=False): super().__init__(itchat_msg) + # print("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!") self.msg_id = itchat_msg["MsgId"] self.create_time = itchat_msg["CreateTime"] self.is_group = is_group diff --git a/common/kafka_helper.py b/common/kafka_helper.py new file mode 100644 index 0000000..855de2e --- /dev/null +++ b/common/kafka_helper.py @@ -0,0 +1,109 @@ +from confluent_kafka import Producer, Consumer, KafkaException, KafkaError +import os +from common.singleton import singleton +from config import conf +# 定义全局 redis_helper +kafka_client = None + +class KafkaClient: + def __init__(self): + + bootstrap_servers=conf().get("kafka_bootstrap_servers") + consumer_group='aiops-wx-group' + topic="topic.aiops.wx" + + self.bootstrap_servers = bootstrap_servers + self.consumer_group = consumer_group + self.topic = topic + + self.producer = Producer({'bootstrap.servers': self.bootstrap_servers}) + self.consumer = Consumer({ + 'bootstrap.servers': self.bootstrap_servers, + 'group.id': self.consumer_group, + 'auto.offset.reset': 'earliest', + 'enable.auto.commit': False # 禁用自动提交,使用手动提交 + }) + + def delivery_report(self, err, msg): + """ + 回调函数,用于确认消息是否成功发送 + """ + if err is not None: + print(f"Message delivery failed: {err}") + else: + print(f"Message delivered to {msg.topic()} [{msg.partition()}] @ {msg.offset()}") + + def produce_messages(self, messages): + """ + 发送消息 + """ + try: + for message in messages: + self.producer.produce(self.topic, value=message, callback=self.delivery_report) + print(f"Produced: {message}") + self.producer.poll(0) + except Exception as e: + print(f"An error occurred: {e}") + finally: + self.producer.flush() + + def produce_message(self, message): + """ + 发送消息 + """ + try: + self.producer.produce(self.topic, value=message, callback=self.delivery_report) + # print(f"Produced: {message}") + self.producer.poll(0) + except Exception as e: + print(f"An error occurred: {e}") + finally: + self.producer.flush() + + def consume_messages(self,process_callback, user_nickname): + """ + 消费消息并调用回调处理业务逻辑,只有当回调返回 True 时才提交偏移量 + :param process_callback: 业务逻辑回调函数,返回布尔值 + :param user_nickname: 用户昵称 + """ + self.consumer.subscribe([self.topic]) + + try: + while True: + msg = self.consumer.poll(0.3) + if msg is None: + continue + if msg.error(): + if msg.error().code() == KafkaError._PARTITION_EOF: + print(f"End of partition {msg.partition}, offset {msg.offset()}") + else: + raise KafkaException(msg.error()) + else: + # 调用业务处理逻辑 + # process_callback(msg.value().decode('utf-8')) + # 调用业务处理逻辑,传递 user_nickname 和消息 + if process_callback(user_nickname, msg.value().decode('utf-8')): + # 如果返回 True,表示处理成功,可以提交偏移量 + try: + self.consumer.commit(msg) + print(f"Manually committed offset: {msg.offset()}") + except KafkaException as e: + print(f"Error committing offset: {e}") + except KeyboardInterrupt: + print("消费中断") + finally: + self.consumer.close() + +# if __name__ == '__main__': +# kafka_client = KafkaClient(bootstrap_servers='localhost:9092', consumer_group='my-consumer-group', topic='my_topic') + +# # 生产消息 +# messages_to_produce = [f"Message {i}" for i in range(10)] +# kafka_client.produce_messages(messages_to_produce) + +# # 消费消息 +# kafka_client.consume_messages() + +def start(): + global kafka_client + kafka_client = KafkaClient() \ No newline at end of file diff --git a/common/mq_client.py b/common/mq_client.py new file mode 100644 index 0000000..1b9119b --- /dev/null +++ b/common/mq_client.py @@ -0,0 +1,97 @@ +""" +kafka客户端 +""" +import threading +from lib import itchat +from lib.itchat.content import * +from common.log import logger + +from bridge.context import Context, ContextType +from bridge.reply import Reply, ReplyType + +from confluent_kafka import Consumer, KafkaException +import json,time,re + + +class MessageQueueClient(): + def __init__(self, channel): + self.channel = channel + self.client_type = channel.channel_type + + def consume_messages(self, broker, group_id, topic,user_id,user_nickname): + # 配置消费者 + conf = { + 'bootstrap.servers': broker, + 'group.id': group_id, + 'auto.offset.reset': 'earliest' + } + + consumer = Consumer(conf) + + try: + # 订阅主题 + consumer.subscribe([topic]) + + print(f"开始消费主题 {topic} 的消息...") + while True: + # 拉取消息 + msg = consumer.poll(timeout=0.3) # 超时时间 1 秒 + if msg is None: + continue + if msg.error(): + # 处理 Kafka 异常 + if msg.error().code() == KafkaException._PARTITION_EOF: + print(f"分区末尾: {msg.topic()} [{msg.partition()}] {msg.offset()}") + else: + print(f"消费错误: {msg.error()}") + else: + # 打印消息 + # print(f"收到消息: {msg.value().decode('utf-8')} (主题: {msg.topic()}, 分区: {msg.partition()}, 偏移: {msg.offset()})") + msg_content= msg.value().decode('utf-8') + # content=json.loads(msg_content) + + cleaned_content = clean_json_string(msg_content) + content=json.loads(cleaned_content) + print(content["messageId"]) + print(content["data"]) + print(content["data"]["content"]) + + friends=itchat.get_friends(update=True)[1:] + # logger.info(friends) + # logger.info(f'好友列表{friends}') + # 提取所有好友的 NickName + friend_info = [{'NickName': friend['NickName'], 'UserName': friend['UserName']} for friend in friends] + content_text=content["data"]["content"] + # 打印好友信息 + for info in friend_info: + print(f"NickName: {info['NickName']}, UserName: {info['UserName']}") + # if info['NickName'] in ['王韦(: )~','何潮华','laih']: + if info['NickName'] in ['爱扣美丽顾问@乐华']: + itchat.send(content["data"]["content"], toUserName=info['UserName']) + logger.info(f"{user_nickname} 向 {info['NickName']} 发送【 {content_text} 】") + time.sleep(3) + # itchat.send(content["data"]["content"], toUserName=info['UserName']) + # logger.info(f"{user_nickname} 向 {info['NickName']} 发送 {content_text}") + # time.sleep(3) + + # itchat.send(content["data"]["content"], toUserName=receiver) + # 打印所有 NickName + # for nickname in nicknames: + # print(nickname) + except KeyboardInterrupt: + print("终止消费") + finally: + # 关闭消费者 + consumer.close() + + +def start(channel): + global mq_client + mq_client = MessageQueueClient(channel=channel) + user_id = itchat.instance.storageClass.userName + name = itchat.instance.storageClass.nickName + mq_client.consume_messages('47.116.67.214:9092', 'ai-test-group', 'topic.ai.test',user_id,name) + +def clean_json_string(json_str): + # 删除所有控制字符(非打印字符),包括换行符、回车符等 + return re.sub(r'[\x00-\x1f\x7f]', '', json_str) \ No newline at end of file diff --git a/common/platform_client.py b/common/platform_client.py new file mode 100644 index 0000000..eb1d065 --- /dev/null +++ b/common/platform_client.py @@ -0,0 +1,20 @@ +from bridge.context import Context, ContextType +from bridge.reply import Reply, ReplyType +from common.log import logger +from config import conf, pconf, plugin_config, available_setting +from plugins import PluginManager +import time + + +class PlatformClient(): + def __init__(self, channel): + self.channel = channel + self.client_type = channel.channel_type + + def on_message(self): + print('监听消息') + +def start(channel): + global platform_client + platform_client = PlatformClient(channel=channel) + print('平台客户端开始') \ No newline at end of file diff --git a/common/redis_helper.py b/common/redis_helper.py new file mode 100644 index 0000000..a9080aa --- /dev/null +++ b/common/redis_helper.py @@ -0,0 +1,51 @@ +import redis +import os +from config import conf +# 定义全局 redis_helper +redis_helper = None + +class RedisHelper: + def __init__(self, host='localhost', port=6379, password=None ,db=0): + # 初始化 Redis 连接 + self.client = redis.Redis(host=host, port=port,db=db,password=password) + + def set_hash(self, hash_key, data, timeout=None): + """添加或更新哈希,并设置有效期""" + self.client.hset(hash_key, mapping=data) + if timeout: + # 设置有效期(单位:秒) + self.client.expire(hash_key, timeout) + + def get_hash(self, hash_key): + """获取整个哈希表数据""" + result = self.client.hgetall(hash_key) + # 将字节数据解码成字符串格式返回 + return {k.decode('utf-8'): v.decode('utf-8') for k, v in result.items()} + + def get_hash_field(self, hash_key, field): + """获取哈希表中的单个字段值""" + result = self.client.hget(hash_key, field) + return result.decode('utf-8') if result else None + + def delete_hash(self, hash_key): + """删除整个哈希表""" + self.client.delete(hash_key) + + def delete_hash_field(self, hash_key, field): + """删除哈希表中的某个字段""" + self.client.hdel(hash_key, field) + + def update_hash_field(self, hash_key, field, value): + """更新哈希表中的某个字段""" + self.client.hset(hash_key, field, value) + +def start(): + global redis_helper + + host=conf().get("redis_host") + port=conf().get("redis_port") + password=conf().get("redis_password") + db=conf().get("redis_db") + redis_helper = RedisHelper(host=host,port=port,password=password,db=db) + + diff --git a/config-dev.json b/config-dev.json index 648181c..16f2d74 100644 --- a/config-dev.json +++ b/config-dev.json @@ -1,36 +1,32 @@ { "channel_type": "wx", "model": "7374349217580056592", - "open_ai_api_key": "sk-tPyolP9giSyBLhG6soygVf3rpFUFqdtqXobW1NkVnVps3QxGz2kWlhFuhZ7Zm4TQ", + "open_ai_api_key": "sk-tdi7u0zuLsR0JpPMGBeFZxymOpL0zoFVafX8EEEvEakIDAGQ22NyQ6w", "open_ai_api_base": "http://106.15.182.218:3000/api/v1", "claude_api_key": "YOUR API KEY", "proxy": "", "hot_reload": false, "debug": true, - "single_chat_reply_prefix": "[小蕴]", + "single_chat_reply_prefix": "", "group_chat_prefix": [ - "小蕴", - "@AI好蕴" + "zhushou" ], "group_name_white_list": [ - "AI好蕴测试群", - "AI好蕴测试群2", - "AI好蕴测试群3", - "AI好蕴孕妈服务测试群" ], "image_create_prefix": [ "画","识别","看" ], - "group_welcome_msg": "您好,我是AI好蕴健康顾问,非常高兴能够陪伴您一起踏上这一段美妙而重要的旅程。怀孕是生命中的一个特殊阶段,不仅承载着新生命的期许,也意味着您和家庭将迎来新的挑战和变化。在这个过程中,了解怀孕的意义、注意事项和如何进行科学的健康管理,对您和宝宝的健康至关重要。\n怀孕期的每一天都是新生命成长的过程,每一次胎动都让人感受到生命的奇迹。对于准妈妈来说,这是一段与宝宝建立深厚联系的时期。与此同时,这段时间也会让您对生活、家庭和未来有更深层次的认识和规划。\n怀孕不仅是生理上的变化,更是心理和情感上的一次洗礼。一个健康乐观的妈妈才能诞生一个阳光天使宝宝!\n通过科学的健康管理和正确的生活方式,您可以为自己和宝宝创造一个健康、安全的环境。我们专门给您设立了独立的服务支撑保证体系,包括各个方面的专家将为您提供贴身呵护和陪伴,为您提供专业的指导和支持,愿您度过一个平安、健康且愉快的孕期。\n如果你有任何健康问题咨询,可@我或输入“小蕴”,呼唤我为你服务!\n 【如果你有任何健康问题咨询,可@我、或语音“小蕴”呼唤我为你服务!】 \n祝您怀孕顺利,宝宝健康成长!", + "group_welcome_msg": "", "trigger_by_self": true, + "voice_to_text":"ali", "speech_recognition": true, - "group_speech_recognition": false, + "group_speech_recognition": true, "voice_reply_voice": false, "conversation_max_tokens": 2500, "expires_in_seconds": 300, - "character_desc": "you are professional doctor", + "character_desc": "", "temperature": 0.9, - "subscribe_msg": "感谢您的关注!\n这里是AI智能助手,可以自由对话。\n支持语音对话。\n支持图片输入。\n支持图片输出,画字开头的消息将按要求创作图片。\n支持tool、角色扮演和文字冒险等丰富的插件。\n输入{trigger_prefix}#help 查看详细指令。", + "subscribe_msg": "", "use_linkai": false, "linkai_api_key": "", "linkai_app_code": "" diff --git a/config-test.json b/config-test.json index 13c9310..5f872ae 100644 --- a/config-test.json +++ b/config-test.json @@ -1,30 +1,22 @@ { "channel_type": "wx", "model": "7374349217580056592", - "open_ai_api_key": "sk-flDQg2UT0fYZZsKtmIXrRUhLySpSOgWedQof6Vw2iYB0la2iF44AD", + "open_ai_api_key": "sk-jr69ONIehfGKe9JFphuNk4DU5Y5wooHKHhQv7oSnFzVbwCnW65fXO9kvH", "open_ai_api_base": "http://106.15.182.218:3000/api/v1", "claude_api_key": "YOUR API KEY", "proxy": "", - "hot_reload": false, + "hot_reload": true, "debug": false, - "single_chat_reply_prefix": "[小蕴]", + "single_chat_reply_prefix": "", "group_chat_prefix": [ - "小蕴", - "@AI好蕴", - "xiaoyun", - "xiaoyin", - "xiaoying", - "xiaoyue", - "xiaoyuan", - "xiaoxin" + "zhushou" ], "group_name_white_list": [ - "AI好蕴测试群3" ], "image_create_prefix": [ "画","识别","看" ], - "group_welcome_msg": "您好,我是AI好蕴健康顾问,非常高兴能够陪伴您一起踏上这一段美妙而重要的旅程。怀孕是生命中的一个特殊阶段,不仅承载着新生命的期许,也意味着您和家庭将迎来新的挑战和变化。在这个过程中,了解怀孕的意义、注意事项和如何进行科学的健康管理,对您和宝宝的健康至关重要。\n怀孕期的每一天都是新生命成长的过程,每一次胎动都让人感受到生命的奇迹。对于准妈妈来说,这是一段与宝宝建立深厚联系的时期。与此同时,这段时间也会让您对生活、家庭和未来有更深层次的认识和规划。\n怀孕不仅是生理上的变化,更是心理和情感上的一次洗礼。一个健康乐观的妈妈才能诞生一个阳光天使宝宝!\n通过科学的健康管理和正确的生活方式,您可以为自己和宝宝创造一个健康、安全的环境。我们专门给您设立了独立的服务支撑保证体系,包括各个方面的专家将为您提供贴身呵护和陪伴,为您提供专业的指导和支持,愿您度过一个平安、健康且愉快的孕期。\n如果你有任何健康问题咨询,可@我或输入“小蕴”,呼唤我为你服务!\n 【如果你有任何健康问题咨询,可@我、或语音“小蕴”呼唤我为你服务!】 \n祝您怀孕顺利,宝宝健康成长!", + "group_welcome_msg": "", "trigger_by_self": true, "voice_to_text":"ali", "speech_recognition": true, @@ -32,10 +24,19 @@ "voice_reply_voice": false, "conversation_max_tokens": 2500, "expires_in_seconds": 300, - "character_desc": "you are professional doctor", + "character_desc": "", "temperature": 0.9, - "subscribe_msg": "感谢您的关注!\n这里是AI智能助手,可以自由对话。\n支持语音对话。\n支持图片输入。\n支持图片输出,画字开头的消息将按要求创作图片。\n支持tool、角色扮演和文字冒险等丰富的插件。\n输入{trigger_prefix}#help 查看详细指令。", + "subscribe_msg": "", "use_linkai": false, "linkai_api_key": "", - "linkai_app_code": "" + "linkai_app_code": "", + + "redis_host":"47.116.142.20", + "redis_port":8090, + "redis_password":"telpo#1234", + "redis_db":3, + + "kafka_bootstrap_servers":"172.19.42.53:9092", + + "aiops_api":"https://id.ssjlai.com/aiopsadmin" } diff --git a/config.json b/config.json index 4e7d9f2..9f15c8f 100644 --- a/config.json +++ b/config.json @@ -1,16 +1,15 @@ { "channel_type": "wx", "model": "7374349217580056592", - "open_ai_api_key": "sk-tPyolP9giSyBLhG6soygVf3rpFUFqdtqXobW1NkVnVps3QxGz2kWlhFuhZ7Zm4TQ", + "open_ai_api_key": "sk-tdi7u0zuLsR0JpPMGBeFZxymOpL0zoFVafX8EEEvEakIDAGQ22NyQ6w", "open_ai_api_base": "http://106.15.182.218:3000/api/v1", "claude_api_key": "YOUR API KEY", - "proxy": "", + "proxy": "", "hot_reload": false, "debug": false, - "single_chat_reply_prefix": "[小蕴]", + "single_chat_reply_prefix": "", "group_chat_prefix": [ - "小蕴", - "@AI好蕴" + "zhushou" ], "group_name_white_list": [ "AI好蕴测试群3" @@ -18,7 +17,7 @@ "image_create_prefix": [ "画","识别","看" ], - "group_welcome_msg": "您好,我是AI好蕴健康顾问,非常高兴能够陪伴您一起踏上这一段美妙而重要的旅程。怀孕是生命中的一个特殊阶段,不仅承载着新生命的期许,也意味着您和家庭将迎来新的挑战和变化。在这个过程中,了解怀孕的意义、注意事项和如何进行科学的健康管理,对您和宝宝的健康至关重要。\n怀孕期的每一天都是新生命成长的过程,每一次胎动都让人感受到生命的奇迹。对于准妈妈来说,这是一段与宝宝建立深厚联系的时期。与此同时,这段时间也会让您对生活、家庭和未来有更深层次的认识和规划。\n怀孕不仅是生理上的变化,更是心理和情感上的一次洗礼。一个健康乐观的妈妈才能诞生一个阳光天使宝宝!\n通过科学的健康管理和正确的生活方式,您可以为自己和宝宝创造一个健康、安全的环境。我们专门给您设立了独立的服务支撑保证体系,包括各个方面的专家将为您提供贴身呵护和陪伴,为您提供专业的指导和支持,愿您度过一个平安、健康且愉快的孕期。\n如果你有任何健康问题咨询,可@我或输入“小蕴”,呼唤我为你服务!\n 【如果你有任何健康问题咨询,可@我、或语音“小蕴”呼唤我为你服务!】 \n祝您怀孕顺利,宝宝健康成长!", + "group_welcome_msg": "", "trigger_by_self": true, "voice_to_text":"ali", "speech_recognition": true, @@ -26,10 +25,21 @@ "voice_reply_voice": false, "conversation_max_tokens": 2500, "expires_in_seconds": 300, - "character_desc": "you are professional doctor", + "character_desc": "", "temperature": 0.9, - "subscribe_msg": "感谢您的关注!\n这里是AI智能助手,可以自由对话。\n支持语音对话。\n支持图片输入。\n支持图片输出,画字开头的消息将按要求创作图片。\n支持tool、角色扮演和文字冒险等丰富的插件。\n输入{trigger_prefix}#help 查看详细指令。", + "subscribe_msg": "", "use_linkai": false, "linkai_api_key": "", - "linkai_app_code": "" + "linkai_app_code": "", + + + "redis_host":"192.168.2.121", + "redis_port":8090, + "redis_password":"telpo#1234", + "redis_db":3, + + "kafka_bootstrap_servers":"192.168.2.121:9092", + + "aiops_api":"https://id.ssjlai.com/aiopsadmin" + } diff --git a/config.py b/config.py index b09482d..5cf215c 100644 --- a/config.py +++ b/config.py @@ -179,6 +179,15 @@ available_setting = { "Minimax_api_key": "", "Minimax_group_id": "", "Minimax_base_url": "", + #redis 配置 + "redis_host":"", + "redis_port":0, + "redis_password":"", + "redis_db":0, + # kafka配置 + "kafka_bootstrap_servers":"", + # aiops平台 + "aiops_api":"" } diff --git a/plugins/healthai/healthai.py b/plugins/healthai/healthai.py index b5506c6..d4779c1 100644 --- a/plugins/healthai/healthai.py +++ b/plugins/healthai/healthai.py @@ -188,7 +188,7 @@ class healthai(Plugin): text=self.params_cache[user_id]['previous_prompt'] logger.info(f'{text},{contains_keywords(text)}') - itchat_content= f'@{msg.actual_user_nickname}' if e_context['context']['isgroup'] else '[小蕴]' + itchat_content= f'@{msg.actual_user_nickname}' if e_context['context']['isgroup'] else '' itchat_content+="已经收到,立刻为您服务" flag=contains_keywords(text) if flag==True: @@ -263,7 +263,7 @@ class healthai(Plugin): if previous_prompt and last_content and contains_keywords(previous_prompt): logger.info('先回应') receiver = user_id - itchat_content = f'@{msg.actual_user_nickname}' if is_group else '[小蕴]' + itchat_content = f'@{msg.actual_user_nickname}' if is_group else '' itchat_content += "已经收到,立刻为您服务" if contains_keywords(previous_prompt): diff --git a/plugins/healthai/requirements.txt b/plugins/healthai/requirements.txt index 999d9c0..98454d5 100644 --- a/plugins/healthai/requirements.txt +++ b/plugins/healthai/requirements.txt @@ -7,3 +7,5 @@ python-pptx Pillow oss2 pypinyin +confluent-kafka +kafka-python \ No newline at end of file diff --git a/requirements.txt b/requirements.txt index 323adc1..bc82725 100644 --- a/requirements.txt +++ b/requirements.txt @@ -9,4 +9,5 @@ pre-commit web.py linkai>=0.0.6.0 pypng -pypinyin \ No newline at end of file +pypinyin +redis \ No newline at end of file