Browse Source

文件处理

develop
H Vs 5 days ago
parent
commit
f1cd44bca1
15 changed files with 836 additions and 41 deletions
  1. +22
    -0
      app.py
  2. +2
    -1
      bot/chatgpt/chat_gpt_bot.py
  3. +475
    -0
      channel/wechat/wechat_channel.py
  4. +1
    -0
      channel/wechat/wechat_message.py
  5. +109
    -0
      common/kafka_helper.py
  6. +97
    -0
      common/mq_client.py
  7. +20
    -0
      common/platform_client.py
  8. +51
    -0
      common/redis_helper.py
  9. +8
    -12
      config-dev.json
  10. +17
    -16
      config-test.json
  11. +19
    -9
      config.json
  12. +9
    -0
      config.py
  13. +2
    -2
      plugins/healthai/healthai.py
  14. +2
    -0
      plugins/healthai/requirements.txt
  15. +2
    -1
      requirements.txt

+ 22
- 0
app.py View File

@@ -37,6 +37,26 @@ def start_channel(channel_name: str):
threading.Thread(target=linkai_client.start, args=(channel,)).start()
except Exception as e:
pass

# try:
# if channel_name in ['wx']:
# from common import platform_client
# threading.Thread(target=platform_client.start, args=(channel,)).start()
# except Exception as e:
# pass

try:
from common import redis_helper
threading.Thread(target=redis_helper.start).start()
except Exception as e:
pass

try:
from common import kafka_helper
threading.Thread(target=kafka_helper.start).start()
except Exception as e:
pass

channel.startup()


@@ -60,6 +80,8 @@ def run():

start_channel(channel_name)

while True:
time.sleep(1)
except Exception as e:


+ 2
- 1
bot/chatgpt/chat_gpt_bot.py View File

@@ -133,10 +133,11 @@ class ChatGPTBot(Bot, OpenAIImage):
# logger.info("[CHATGPT] 响应={}".format(response))
logger.info("[CHATGPT] 响应={}".format(json.dumps(response, separators=(',', ':'),ensure_ascii=False)))
# logger.info("[ChatGPT] reply={}, total_tokens={}".format(response.choices[0]['message']['content'], response["usage"]["total_tokens"]))
content=response.choices[0]["message"]["content"]
return {
"total_tokens": response["usage"]["total_tokens"],
"completion_tokens": response["usage"]["completion_tokens"],
"content": response.choices[0]["message"]["content"],
"content": content.lstrip("\n"),
}
except Exception as e:
need_retry = retry_count < 2


+ 475
- 0
channel/wechat/wechat_channel.py View File

@@ -24,6 +24,21 @@ from common.utils import convert_webp_to_png
from config import conf, get_appdata_dir
from lib import itchat
from lib.itchat.content import *
from urllib.parse import urlparse


import threading

from common import kafka_helper, redis_helper

from confluent_kafka import Consumer, KafkaException
import json,time,re
import pickle
from datetime import datetime
import oss2


# from common.kafka_client import KafkaClient


@itchat.msg_register([TEXT, VOICE, PICTURE, NOTE, ATTACHMENT, SHARING])
@@ -118,6 +133,9 @@ class WechatChannel(ChatChannel):
# login by scan QRCode
hotReload = conf().get("hot_reload", False)
status_path = os.path.join(get_appdata_dir(), "itchat.pkl")
# with open(status_path, 'rb') as file:
# data = pickle.load(file)
# logger.info(data)
itchat.auto_login(
enableCmdQR=2,
hotReload=hotReload,
@@ -129,15 +147,51 @@ class WechatChannel(ChatChannel):
self.user_id = itchat.instance.storageClass.userName
self.name = itchat.instance.storageClass.nickName
logger.info("Wechat login success, user_id: {}, nickname: {}".format(self.user_id, self.name))

# 创建一个线程来运行 consume_messages
# kafka_thread = threading.Thread(target=consume_messages, args=('47.116.67.214:9092', 'ai-test-group', topic,self.name))
# kafka_client=KafkaClient()
# kafka_thread = threading.Thread(target=consume_wx_messages, args=('47.116.67.214:9092', 'ai-test-group', topic,self.name))
kafka_thread = threading.Thread(target=kafka_helper.kafka_client.consume_messages, args=(wx_messages_process_callback, self.name))

kafka_thread.start()
logger.info("启动kafka")

# 好友定时同步
agent_nickname=self.name
friend_thread =threading.Thread(target=hourly_change_save_friends, args=(agent_nickname,))
friend_thread.start()

# 立刻同步
agent_info=fetch_agent_info(agent_nickname)
agent_tel=agent_info.get("agent_tel",None)
# friends=itchat.get_contact(update=True)[1:]
friends=itchat.get_friends(update=True)[1:]
# logger.info("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~")
# logger.info(friends)
# logger.info("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~")
save_friends_to_redis(agent_tel,agent_nickname, friends)
logger.info("启动好友同步")
# start message listener

logger.info("启动itchat")
itchat.run()
# 运行kafka
#

except Exception as e:
logger.exception(e)

def exitCallback(self):
print('主动退出')
try:
from common.linkai_client import chat_client
if chat_client.client_id and conf().get("use_linkai"):
print('退出')
_send_logout()
time.sleep(2)
self.auto_login_times += 1
@@ -149,6 +203,8 @@ class WechatChannel(ChatChannel):

def loginCallback(self):
logger.debug("Login success")
print('登录成功')
# 同步
_send_login_success()

# handle_* 系列函数处理收到的消息后构造Context,然后传入produce函数中处理Context和发送回复
@@ -174,10 +230,47 @@ class WechatChannel(ChatChannel):
logger.debug("[WX]receive voice msg: {}".format(cmsg.content))
elif cmsg.ctype == ContextType.IMAGE:
logger.debug("[WX]receive image msg: {}".format(cmsg.content))
# print(cmsg.content)
file_path = cmsg.content
logger.info(f"on_handle_context: 获取到图片路径 {file_path}")
oss_access_key_id="LTAI5tRTG6pLhTpKACJYoPR5"
oss_access_key_secret="E7dMzeeMxq4VQvLg7Tq7uKf3XWpYfN"
oss_endpoint="http://oss-cn-shanghai.aliyuncs.com"
oss_bucket_name="cow-agent"
oss_image_url = upload_oss(oss_access_key_id, oss_access_key_secret, oss_endpoint, oss_bucket_name, file_path, f'cow/{os.path.basename(file_path)}')
print(f"oss_image_url:{oss_image_url}")
input_content = oss_image_url
input_from_user_nickname = cmsg.from_user_nickname
input_to_user_nickname = cmsg.to_user_nickname

input_wx_content_dialogue_message=[{"type": "image_url", "image_url": {"url": input_content}}]
input_message=dialogue_message(input_from_user_nickname,input_to_user_nickname,input_wx_content_dialogue_message)
kafka_helper.kafka_client.produce_message(input_message)
logger.info("发送对话 %s",input_message)

elif cmsg.ctype == ContextType.PATPAT:
logger.debug("[WX]receive patpat msg: {}".format(cmsg.content))
elif cmsg.ctype == ContextType.TEXT:
logger.debug("[WX]receive text msg: {}, cmsg={}".format(json.dumps(cmsg._rawmsg, ensure_ascii=False), cmsg))
# content = cmsg.content # 消息内容
# from_user_nickname = cmsg.from_user_nickname # 发送方昵称
# to_user_nickname = cmsg.to_user_nickname # 接收方昵称

# wx_content_dialogue_message=[{"type": "text", "text": content}]
# message=dialogue_message(from_user_nickname,to_user_nickname,wx_content_dialogue_message)
# kafka_helper.kafka_client.produce_message(message)
# logger.info("发送对话 %s", json.dumps(message, ensure_ascii=False))

input_content = cmsg.content
input_from_user_nickname = cmsg.from_user_nickname
input_to_user_nickname = cmsg.to_user_nickname

input_wx_content_dialogue_message=[{"type": "text", "text": input_content}]
input_message=dialogue_message(input_from_user_nickname,input_to_user_nickname,input_wx_content_dialogue_message)
kafka_helper.kafka_client.produce_message(input_message)
logger.info("发送对话 %s",input_message)


else:
logger.debug("[WX]receive msg: {}, cmsg={}".format(cmsg.content, cmsg))
context = self._compose_context(cmsg.ctype, cmsg.content, isgroup=False, msg=cmsg)
@@ -212,6 +305,36 @@ class WechatChannel(ChatChannel):
if reply.type == ReplyType.TEXT:
itchat.send(reply.content, toUserName=receiver)
logger.info("[WX] sendMsg={}, receiver={}".format(reply, receiver))
# logger.info(context)
# logger.info(context["msg"])
# // 发送kafka
# msg=context["msg"]
msg: ChatMessage = context["msg"]
# content=msg["content"]
is_group=msg.is_group
if not is_group:
# print(f'响应:{content}')
# 用户输入
# input_content=msg.content
# input_from_user_nickname=msg.from_user_nickname
# input_to_user_nickname=msg.to_user_nickname

# input_wx_content_dialogue_message=[{"type": "text", "text": input_content}]
# input_message=dialogue_message(input_from_user_nickname,input_to_user_nickname,input_wx_content_dialogue_message)
# kafka_helper.kafka_client.produce_message(input_message)
# logger.info("发送对话 %s", json.dumps(input_message, separators=(',', ':'), ensure_ascii=False))

# 响应用户
output_content=reply.content
output_from_user_nickname=msg.to_user_nickname # 回复翻转
output_to_user_nickname=msg.from_user_nickname # 回复翻转

output_wx_content_dialogue_message=[{"type": "text", "text": output_content}]
output_message=dialogue_message(output_from_user_nickname,output_to_user_nickname,output_wx_content_dialogue_message)
kafka_helper.kafka_client.produce_message(output_message)
logger.info("发送对话 %s", output_message)
elif reply.type == ReplyType.ERROR or reply.type == ReplyType.INFO:
itchat.send(reply.content, toUserName=receiver)
logger.info("[WX] sendMsg={}, receiver={}".format(reply, receiver))
@@ -246,6 +369,36 @@ class WechatChannel(ChatChannel):
file_storage = reply.content
itchat.send_file(file_storage, toUserName=receiver)
logger.info("[WX] sendFile, receiver={}".format(receiver))


# msg: ChatMessage = context["msg"]
# # content=msg["content"]
# is_group=msg.is_group
# if not is_group:
# # print(f'响应:{content}')
# # 用户输入
# # input_content=msg.content
# # input_from_user_nickname=msg.from_user_nickname
# # input_to_user_nickname=msg.to_user_nickname

# # input_wx_content_dialogue_message=[{"type": "text", "text": input_content}]
# # input_message=dialogue_message(input_from_user_nickname,input_to_user_nickname,input_wx_content_dialogue_message)
# # kafka_helper.kafka_client.produce_message(input_message)
# # logger.info("发送对话 %s", json.dumps(input_message, separators=(',', ':'), ensure_ascii=False))

# # 响应用户
# output_content=reply.content
# output_from_user_nickname=msg.to_user_nickname # 回复翻转
# output_to_user_nickname=msg.from_user_nickname # 回复翻转

# output_wx_content_dialogue_message=[{"type": "file", "text": output_content}]
# output_message=dialogue_message(output_from_user_nickname,output_to_user_nickname,output_wx_content_dialogue_message)
# kafka_helper.kafka_client.produce_message(output_message)
# logger.info("发送对话 %s", output_message)

elif reply.type == ReplyType.VIDEO: # 新增视频回复类型
video_storage = reply.content
itchat.send_video(video_storage, toUserName=receiver)
@@ -269,6 +422,7 @@ def _send_login_success():
from common.linkai_client import chat_client
if chat_client.client_id:
chat_client.send_login_success()

except Exception as e:
pass

@@ -290,3 +444,324 @@ def _send_qr_code(qrcode_list: list):
except Exception as e:
pass


def clean_json_string(json_str):

# 删除所有控制字符(非打印字符),包括换行符、回车符等
return re.sub(r'[\x00-\x1f\x7f]', '', json_str)

def save_friends_to_redis(agent_tel,agent_nickname, friends):
contact_list = []
for friend in friends:
friend_data = {
"UserName": friend.UserName,
"NickName": friend.NickName,
"Signature": friend.Signature,
"Province": friend.Province,
"City": friend.City,
"Sex": str(friend.Sex), # 性别可转换为字符串存储
"Alias": friend.Alias
}
contact_list.append(friend_data) # 将每个朋友的信息加入到列表中

agent_contact_list = {
"AgentTel":agent_tel,
"agent_nick_name": agent_nickname,
"contact_list": contact_list # 将朋友列表添加到字典中
}
# 将联系人信息保存到 Redis,使用一个合适的 key
hash_key = f"__AI_OPS_WX__:CONTACTLIST"
redis_helper.redis_helper.update_hash_field(hash_key, agent_tel, json.dumps(agent_contact_list, ensure_ascii=False)) # 设置有效期为 600 秒

def hourly_change_save_friends(agent_nickname):
last_hour = datetime.now().hour # 获取当前小时
while True:
current_hour = datetime.now().hour
if current_hour != last_hour: # 检测小时是否变化
friends=itchat.get_friends(update=True)[1:]

agent_info=fetch_agent_info(agent_nickname)
agent_tel=agent_info.get("agent_tel",None)
save_friends_to_redis(agent_tel,agent_nickname, friends)
last_hour = current_hour
time.sleep(1) # 每秒检查一次

def wx_messages_process_callback(user_nickname,message):
"""
处理消费到的 Kafka 消息(基础示例)
:param message: Kafka 消费到的消息内容
"""
# print(user_nickname)
# print(f"Processing message: {message}")
# return True

msg_content= message
cleaned_content = clean_json_string(msg_content)
content=json.loads(cleaned_content)
data = content.get("data", {})
msg_type_data=data.get("msg_type",None)
content_data = data.get("content",{})
agent_nickname_data=content_data.get("agent_nickname",None)
agent_tel=content_data.get("agent_tel",None)

if user_nickname == agent_nickname_data and msg_type_data=='group-sending':
friends=itchat.get_friends(update=True)[1:]
contact_list_content_data=content_data.get("contact_list",None)

# 更新好友缓存
save_friends_to_redis(agent_tel,agent_nickname_data,friends)

# 遍历要群发的好友
for contact in contact_list_content_data:
nickname = contact.get("nickname",None)
if(nickname not in [friend['NickName'] for friend in friends]):
logger.warning(f'微信中没有 {nickname} 的昵称,将不会发送消息')

for friend in friends:
if friend.get("NickName",None) == nickname:
wx_content_list=content_data.get("wx_content",[])
for wx_content in wx_content_list:
# 处理文件
if wx_content.get("type",None) == 'text':
wx_content_text=wx_content.get("text",None)
itchat.send(wx_content_text, toUserName=friend.get("UserName",None))
logger.info(f"{user_nickname} 向 {nickname} 发送文字【 {wx_content_text} 】")

# // 发送kafka
wx_content_dialogue_message=[{"type": "text", "text": wx_content_text}]
message=dialogue_message(agent_nickname_data,friend.get("NickName",None),wx_content_dialogue_message)
kafka_helper.kafka_client.produce_message(message)
logger.info("发送对话 %s",message)

time.sleep(3)
# 处理图片
elif wx_content.get("type",None) == 'image_url':
print('发送图片')
image_url= wx_content.get("image_url",{})
url=image_url.get("url",None)
# 网络图片
logger.debug(f"[WX] start download image, img_url={url}")
pic_res = requests.get(url, stream=True)
image_storage = io.BytesIO()
size = 0
for block in pic_res.iter_content(1024):
size += len(block)
image_storage.write(block)
logger.info(f"[WX] download image success, size={size}, img_url={url}")
image_storage.seek(0)
if ".webp" in url:
try:
image_storage = convert_webp_to_png(image_storage)
except Exception as e:
logger.error(f"Failed to convert image: {e}")
return

itchat.send_image(image_storage, toUserName=friend.get("UserName",None))
logger.info(f"{user_nickname} 向 {nickname} 发送图片【 {url} 】")
# // 发送kafka
wx_content_dialogue_message=[{"type": "image_url", "image_url": {"url": url}}]
message=dialogue_message(agent_nickname_data,friend.get("NickName",None),wx_content_dialogue_message)
kafka_helper.kafka_client.produce_message(message)
logger.info("发送对话 %s",message)
time.sleep(3)
#处理文件
elif wx_content.get("type",None) == 'file':
print('处理文件')
file_url= wx_content.get("file_url",{})
url=file_url.get("url",None)

# 提取路径部分
parsed_url = urlparse(url).path

# 获取文件名和扩展名
filename = os.path.basename(parsed_url) # 文件名(包含扩展名)
name, ext = os.path.splitext(filename) # 分离文件名和扩展名
if ext in ['.pdf']:
print('处理PDF文件')

tmp_file_path=save_to_local_from_url(url)

itchat.send_file(tmp_file_path, toUserName=friend.get("UserName",None))
logger.info(f'删除本地{ext}文件: {tmp_file_path}')
os.remove(tmp_file_path)
logger.info(f"{user_nickname} 向 {nickname} 发送 {ext} 文件【 {url} 】")
# // 发送kafka
wx_content_dialogue_message=[{"type": "file", "file_url": {"url": url}}]
message=dialogue_message(agent_nickname_data,friend.get("NickName",None),wx_content_dialogue_message)
kafka_helper.kafka_client.produce_message(message)
logger.info("发送对话 %s",message)
time.sleep(3)

elif ext in ['.mp4']:

print('处理MP4文件')
tmp_file_path=save_to_local_from_url(url)
itchat.send_file(tmp_file_path, toUserName=friend.get("UserName",None))
logger.info(f'删除本地{ext}文件: {tmp_file_path}')
os.remove(tmp_file_path)
logger.info(f"{user_nickname} 向 {nickname} 发送 {ext} 文件【 {url} 】")
# // 发送kafka
wx_content_dialogue_message=[{"type": "file", "file_url": {"url": url}}]
message=dialogue_message(agent_nickname_data,friend.get("NickName",None),wx_content_dialogue_message)
kafka_helper.kafka_client.produce_message(message)
logger.info("发送对话 %s",message)
time.sleep(3)
else:
logger.warning(f'暂不支持 {ext} 文件的处理')
return True
else:
return False

def dialogue_message(nickname_from,nickname_to,wx_content):
"""
构造消息的 JSON 数据
:param contents: list,包含多个消息内容,每个内容为字典,如:
[{"type": "text", "text": "AAAAAAA"},
{"type": "image_url", "image_url": {"url": "https://AAAAA.jpg"}},
{"type":"file","file_url":{"url":"https://AAAAA.pdf"}}
]
:return: JSON 字符串
"""

# 获取当前时间戳,精确到毫秒
current_timestamp = int(time.time() * 1000)

# 获取当前时间,格式化为 "YYYY-MM-DD HH:MM:SS"
current_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
# 构造 JSON 数据
data = {
"messageId": str(current_timestamp),
"topic": "topic.aiops.wx",
"time": current_time,
"data": {
"msg_type": "dialogue",
"content": {
"nickname_from": nickname_from,
"nickname_to": nickname_to,
"wx_content":wx_content
}
}
}

return json.dumps(data, separators=(',', ':'), ensure_ascii=False)

def fetch_agent_info(agent_nickname):

if os.environ.get('environment', 'default')=='default':
return {
"agent_nickname": agent_nickname,
"agent_tel": "19200137635"
}

aiops_api=conf().get("aiops_api")
# 定义请求URL
url = f"{aiops_api}/business/Agent/smartinfobyname"
# 定义请求头
headers = {
"accept": "*/*",
"Content-Type": "application/json"
}
# 定义请求数据
data = {
"smartName": agent_nickname
}
try:
# 发送POST请求
response = requests.post(url, headers=headers, data=json.dumps(data))
# 确认响应状态码
if response.status_code == 200:
response_data = response.json()
if response_data.get("code") == 200:
# 提取 smartName 和 smartPhone
data = response_data.get("data", {})
return {
"agent_nickname": data.get("smartName"),
"agent_tel": data.get("smartPhone")
}
else:
logger.error(f"API 返回错误信息: {response_data.get('msg')}")
return None
else:
logger.error(f"请求失败,状态码:{response.status_code}")
return None
except Exception as e:
logger.error(f"请求出错: {e}")
return None


def save_to_local_from_url(url):
'''
从url保存到本地tmp目录
'''

parsed_url = urlparse(url)
# 从 URL 提取文件名
filename = os.path.basename(parsed_url.path)
# tmp_dir = os.path(__file__) # 获取系统临时目录
# print(tmp_dir)
tmp_file_path = os.path.join(os.getcwd(),'tmp', filename) # 拼接完整路径

# 检查是否存在同名文件
if os.path.exists(tmp_file_path):
logger.info(f"文件已存在,将覆盖:{tmp_file_path}")

# 下载文件并保存到临时目录
response = requests.get(url, stream=True)
with open(tmp_file_path, 'wb') as f:
for chunk in response.iter_content(chunk_size=1024):
if chunk: # 检查是否有内容
f.write(chunk)

return tmp_file_path

def upload_oss(access_key_id, access_key_secret, endpoint, bucket_name, local_file_path, oss_file_name, expiration_days=7):
"""
上传文件到阿里云OSS并设置生命周期规则,同时返回文件的公共访问地址。
:param access_key_id: 阿里云AccessKey ID
:param access_key_secret: 阿里云AccessKey Secret
:param endpoint: OSS区域对应的Endpoint
:param bucket_name: OSS中的Bucket名称
:param local_file_path: 本地文件路径
:param oss_file_name: OSS中的文件存储路径
:param expiration_days: 文件保存天数,默认7天后删除
:return: 文件的公共访问地址
"""
# 创建Bucket实例
auth = oss2.Auth(access_key_id, access_key_secret)
bucket = oss2.Bucket(auth, endpoint, bucket_name)

### 1. 设置生命周期规则 ###
rule_id = f'delete_after_{expiration_days}_days' # 规则ID
prefix = oss_file_name.split('/')[0] + '/' # 设置规则应用的前缀为文件所在目录

# 定义生命周期规则
rule = oss2.models.LifecycleRule(rule_id, prefix, status=oss2.models.LifecycleRule.ENABLED,
expiration=oss2.models.LifecycleExpiration(days=expiration_days))

# 设置Bucket的生命周期
lifecycle = oss2.models.BucketLifecycle([rule])
bucket.put_bucket_lifecycle(lifecycle)

print(f"已设置生命周期规则:文件将在{expiration_days}天后自动删除")

### 2. 上传文件到OSS ###
bucket.put_object_from_file(oss_file_name, local_file_path)

### 3. 构建公共访问URL ###
file_url = f"http://{bucket_name}.{endpoint.replace('http://', '')}/{oss_file_name}"

print(f"文件上传成功,公共访问地址:{file_url}")
return file_url

+ 1
- 0
channel/wechat/wechat_message.py View File

@@ -10,6 +10,7 @@ from lib.itchat.content import *
class WechatMessage(ChatMessage):
def __init__(self, itchat_msg, is_group=False):
super().__init__(itchat_msg)
# print("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")
self.msg_id = itchat_msg["MsgId"]
self.create_time = itchat_msg["CreateTime"]
self.is_group = is_group


+ 109
- 0
common/kafka_helper.py View File

@@ -0,0 +1,109 @@
from confluent_kafka import Producer, Consumer, KafkaException, KafkaError
import os
from common.singleton import singleton
from config import conf
# 定义全局 redis_helper
kafka_client = None

class KafkaClient:
def __init__(self):
bootstrap_servers=conf().get("kafka_bootstrap_servers")
consumer_group='aiops-wx-group'
topic="topic.aiops.wx"

self.bootstrap_servers = bootstrap_servers
self.consumer_group = consumer_group
self.topic = topic
self.producer = Producer({'bootstrap.servers': self.bootstrap_servers})
self.consumer = Consumer({
'bootstrap.servers': self.bootstrap_servers,
'group.id': self.consumer_group,
'auto.offset.reset': 'earliest',
'enable.auto.commit': False # 禁用自动提交,使用手动提交
})

def delivery_report(self, err, msg):
"""
回调函数,用于确认消息是否成功发送
"""
if err is not None:
print(f"Message delivery failed: {err}")
else:
print(f"Message delivered to {msg.topic()} [{msg.partition()}] @ {msg.offset()}")

def produce_messages(self, messages):
"""
发送消息
"""
try:
for message in messages:
self.producer.produce(self.topic, value=message, callback=self.delivery_report)
print(f"Produced: {message}")
self.producer.poll(0)
except Exception as e:
print(f"An error occurred: {e}")
finally:
self.producer.flush()

def produce_message(self, message):
"""
发送消息
"""
try:
self.producer.produce(self.topic, value=message, callback=self.delivery_report)
# print(f"Produced: {message}")
self.producer.poll(0)
except Exception as e:
print(f"An error occurred: {e}")
finally:
self.producer.flush()

def consume_messages(self,process_callback, user_nickname):
"""
消费消息并调用回调处理业务逻辑,只有当回调返回 True 时才提交偏移量
:param process_callback: 业务逻辑回调函数,返回布尔值
:param user_nickname: 用户昵称
"""
self.consumer.subscribe([self.topic])
try:
while True:
msg = self.consumer.poll(0.3)
if msg is None:
continue
if msg.error():
if msg.error().code() == KafkaError._PARTITION_EOF:
print(f"End of partition {msg.partition}, offset {msg.offset()}")
else:
raise KafkaException(msg.error())
else:
# 调用业务处理逻辑
# process_callback(msg.value().decode('utf-8'))
# 调用业务处理逻辑,传递 user_nickname 和消息
if process_callback(user_nickname, msg.value().decode('utf-8')):
# 如果返回 True,表示处理成功,可以提交偏移量
try:
self.consumer.commit(msg)
print(f"Manually committed offset: {msg.offset()}")
except KafkaException as e:
print(f"Error committing offset: {e}")
except KeyboardInterrupt:
print("消费中断")
finally:
self.consumer.close()

# if __name__ == '__main__':
# kafka_client = KafkaClient(bootstrap_servers='localhost:9092', consumer_group='my-consumer-group', topic='my_topic')
# # 生产消息
# messages_to_produce = [f"Message {i}" for i in range(10)]
# kafka_client.produce_messages(messages_to_produce)
# # 消费消息
# kafka_client.consume_messages()

def start():
global kafka_client
kafka_client = KafkaClient()

+ 97
- 0
common/mq_client.py View File

@@ -0,0 +1,97 @@
"""
kafka客户端
"""
import threading
from lib import itchat
from lib.itchat.content import *
from common.log import logger

from bridge.context import Context, ContextType
from bridge.reply import Reply, ReplyType

from confluent_kafka import Consumer, KafkaException
import json,time,re


class MessageQueueClient():
def __init__(self, channel):
self.channel = channel
self.client_type = channel.channel_type

def consume_messages(self, broker, group_id, topic,user_id,user_nickname):
# 配置消费者
conf = {
'bootstrap.servers': broker,
'group.id': group_id,
'auto.offset.reset': 'earliest'
}

consumer = Consumer(conf)

try:
# 订阅主题
consumer.subscribe([topic])

print(f"开始消费主题 {topic} 的消息...")
while True:
# 拉取消息
msg = consumer.poll(timeout=0.3) # 超时时间 1 秒
if msg is None:
continue
if msg.error():
# 处理 Kafka 异常
if msg.error().code() == KafkaException._PARTITION_EOF:
print(f"分区末尾: {msg.topic()} [{msg.partition()}] {msg.offset()}")
else:
print(f"消费错误: {msg.error()}")
else:
# 打印消息
# print(f"收到消息: {msg.value().decode('utf-8')} (主题: {msg.topic()}, 分区: {msg.partition()}, 偏移: {msg.offset()})")
msg_content= msg.value().decode('utf-8')
# content=json.loads(msg_content)

cleaned_content = clean_json_string(msg_content)
content=json.loads(cleaned_content)
print(content["messageId"])
print(content["data"])
print(content["data"]["content"])

friends=itchat.get_friends(update=True)[1:]
# logger.info(friends)
# logger.info(f'好友列表{friends}')
# 提取所有好友的 NickName
friend_info = [{'NickName': friend['NickName'], 'UserName': friend['UserName']} for friend in friends]
content_text=content["data"]["content"]
# 打印好友信息
for info in friend_info:
print(f"NickName: {info['NickName']}, UserName: {info['UserName']}")
# if info['NickName'] in ['王韦(: )~','何潮华','laih']:
if info['NickName'] in ['爱扣美丽顾问@乐华']:
itchat.send(content["data"]["content"], toUserName=info['UserName'])
logger.info(f"{user_nickname} 向 {info['NickName']} 发送【 {content_text} 】")
time.sleep(3)
# itchat.send(content["data"]["content"], toUserName=info['UserName'])
# logger.info(f"{user_nickname} 向 {info['NickName']} 发送 {content_text}")
# time.sleep(3)
# itchat.send(content["data"]["content"], toUserName=receiver)
# 打印所有 NickName
# for nickname in nicknames:
# print(nickname)
except KeyboardInterrupt:
print("终止消费")
finally:
# 关闭消费者
consumer.close()

def start(channel):
global mq_client
mq_client = MessageQueueClient(channel=channel)
user_id = itchat.instance.storageClass.userName
name = itchat.instance.storageClass.nickName
mq_client.consume_messages('47.116.67.214:9092', 'ai-test-group', 'topic.ai.test',user_id,name)
def clean_json_string(json_str):
# 删除所有控制字符(非打印字符),包括换行符、回车符等
return re.sub(r'[\x00-\x1f\x7f]', '', json_str)

+ 20
- 0
common/platform_client.py View File

@@ -0,0 +1,20 @@
from bridge.context import Context, ContextType
from bridge.reply import Reply, ReplyType
from common.log import logger
from config import conf, pconf, plugin_config, available_setting
from plugins import PluginManager
import time


class PlatformClient():
def __init__(self, channel):
self.channel = channel
self.client_type = channel.channel_type

def on_message(self):
print('监听消息')

def start(channel):
global platform_client
platform_client = PlatformClient(channel=channel)
print('平台客户端开始')

+ 51
- 0
common/redis_helper.py View File

@@ -0,0 +1,51 @@
import redis
import os
from config import conf
# 定义全局 redis_helper
redis_helper = None

class RedisHelper:
def __init__(self, host='localhost', port=6379, password=None ,db=0):
# 初始化 Redis 连接
self.client = redis.Redis(host=host, port=port,db=db,password=password)

def set_hash(self, hash_key, data, timeout=None):
"""添加或更新哈希,并设置有效期"""
self.client.hset(hash_key, mapping=data)
if timeout:
# 设置有效期(单位:秒)
self.client.expire(hash_key, timeout)

def get_hash(self, hash_key):
"""获取整个哈希表数据"""
result = self.client.hgetall(hash_key)
# 将字节数据解码成字符串格式返回
return {k.decode('utf-8'): v.decode('utf-8') for k, v in result.items()}

def get_hash_field(self, hash_key, field):
"""获取哈希表中的单个字段值"""
result = self.client.hget(hash_key, field)
return result.decode('utf-8') if result else None

def delete_hash(self, hash_key):
"""删除整个哈希表"""
self.client.delete(hash_key)
def delete_hash_field(self, hash_key, field):
"""删除哈希表中的某个字段"""
self.client.hdel(hash_key, field)

def update_hash_field(self, hash_key, field, value):
"""更新哈希表中的某个字段"""
self.client.hset(hash_key, field, value)

def start():
global redis_helper
host=conf().get("redis_host")
port=conf().get("redis_port")
password=conf().get("redis_password")
db=conf().get("redis_db")
redis_helper = RedisHelper(host=host,port=port,password=password,db=db)


+ 8
- 12
config-dev.json View File

@@ -1,36 +1,32 @@
{
"channel_type": "wx",
"model": "7374349217580056592",
"open_ai_api_key": "sk-tPyolP9giSyBLhG6soygVf3rpFUFqdtqXobW1NkVnVps3QxGz2kWlhFuhZ7Zm4TQ",
"open_ai_api_key": "sk-tdi7u0zuLsR0JpPMGBeFZxymOpL0zoFVafX8EEEvEakIDAGQ22NyQ6w",
"open_ai_api_base": "http://106.15.182.218:3000/api/v1",
"claude_api_key": "YOUR API KEY",
"proxy": "",
"hot_reload": false,
"debug": true,
"single_chat_reply_prefix": "[小蕴]",
"single_chat_reply_prefix": "",
"group_chat_prefix": [
"小蕴",
"@AI好蕴"
"zhushou"
],
"group_name_white_list": [
"AI好蕴测试群",
"AI好蕴测试群2",
"AI好蕴测试群3",
"AI好蕴孕妈服务测试群"
],
"image_create_prefix": [
"画","识别","看"
],
"group_welcome_msg": "您好,我是AI好蕴健康顾问,非常高兴能够陪伴您一起踏上这一段美妙而重要的旅程。怀孕是生命中的一个特殊阶段,不仅承载着新生命的期许,也意味着您和家庭将迎来新的挑战和变化。在这个过程中,了解怀孕的意义、注意事项和如何进行科学的健康管理,对您和宝宝的健康至关重要。\n怀孕期的每一天都是新生命成长的过程,每一次胎动都让人感受到生命的奇迹。对于准妈妈来说,这是一段与宝宝建立深厚联系的时期。与此同时,这段时间也会让您对生活、家庭和未来有更深层次的认识和规划。\n怀孕不仅是生理上的变化,更是心理和情感上的一次洗礼。一个健康乐观的妈妈才能诞生一个阳光天使宝宝!\n通过科学的健康管理和正确的生活方式,您可以为自己和宝宝创造一个健康、安全的环境。我们专门给您设立了独立的服务支撑保证体系,包括各个方面的专家将为您提供贴身呵护和陪伴,为您提供专业的指导和支持,愿您度过一个平安、健康且愉快的孕期。\n如果你有任何健康问题咨询,可@我或输入“小蕴”,呼唤我为你服务!\n 【如果你有任何健康问题咨询,可@我、或语音“小蕴”呼唤我为你服务!】 \n祝您怀孕顺利,宝宝健康成长!",
"group_welcome_msg": "",
"trigger_by_self": true,
"voice_to_text":"ali",
"speech_recognition": true,
"group_speech_recognition": false,
"group_speech_recognition": true,
"voice_reply_voice": false,
"conversation_max_tokens": 2500,
"expires_in_seconds": 300,
"character_desc": "you are professional doctor",
"character_desc": "",
"temperature": 0.9,
"subscribe_msg": "感谢您的关注!\n这里是AI智能助手,可以自由对话。\n支持语音对话。\n支持图片输入。\n支持图片输出,画字开头的消息将按要求创作图片。\n支持tool、角色扮演和文字冒险等丰富的插件。\n输入{trigger_prefix}#help 查看详细指令。",
"subscribe_msg": "",
"use_linkai": false,
"linkai_api_key": "",
"linkai_app_code": ""


+ 17
- 16
config-test.json View File

@@ -1,30 +1,22 @@
{
"channel_type": "wx",
"model": "7374349217580056592",
"open_ai_api_key": "sk-flDQg2UT0fYZZsKtmIXrRUhLySpSOgWedQof6Vw2iYB0la2iF44AD",
"open_ai_api_key": "sk-jr69ONIehfGKe9JFphuNk4DU5Y5wooHKHhQv7oSnFzVbwCnW65fXO9kvH",
"open_ai_api_base": "http://106.15.182.218:3000/api/v1",
"claude_api_key": "YOUR API KEY",
"proxy": "",
"hot_reload": false,
"hot_reload": true,
"debug": false,
"single_chat_reply_prefix": "[小蕴]",
"single_chat_reply_prefix": "",
"group_chat_prefix": [
"小蕴",
"@AI好蕴",
"xiaoyun",
"xiaoyin",
"xiaoying",
"xiaoyue",
"xiaoyuan",
"xiaoxin"
"zhushou"
],
"group_name_white_list": [
"AI好蕴测试群3"
],
"image_create_prefix": [
"画","识别","看"
],
"group_welcome_msg": "您好,我是AI好蕴健康顾问,非常高兴能够陪伴您一起踏上这一段美妙而重要的旅程。怀孕是生命中的一个特殊阶段,不仅承载着新生命的期许,也意味着您和家庭将迎来新的挑战和变化。在这个过程中,了解怀孕的意义、注意事项和如何进行科学的健康管理,对您和宝宝的健康至关重要。\n怀孕期的每一天都是新生命成长的过程,每一次胎动都让人感受到生命的奇迹。对于准妈妈来说,这是一段与宝宝建立深厚联系的时期。与此同时,这段时间也会让您对生活、家庭和未来有更深层次的认识和规划。\n怀孕不仅是生理上的变化,更是心理和情感上的一次洗礼。一个健康乐观的妈妈才能诞生一个阳光天使宝宝!\n通过科学的健康管理和正确的生活方式,您可以为自己和宝宝创造一个健康、安全的环境。我们专门给您设立了独立的服务支撑保证体系,包括各个方面的专家将为您提供贴身呵护和陪伴,为您提供专业的指导和支持,愿您度过一个平安、健康且愉快的孕期。\n如果你有任何健康问题咨询,可@我或输入“小蕴”,呼唤我为你服务!\n 【如果你有任何健康问题咨询,可@我、或语音“小蕴”呼唤我为你服务!】 \n祝您怀孕顺利,宝宝健康成长!",
"group_welcome_msg": "",
"trigger_by_self": true,
"voice_to_text":"ali",
"speech_recognition": true,
@@ -32,10 +24,19 @@
"voice_reply_voice": false,
"conversation_max_tokens": 2500,
"expires_in_seconds": 300,
"character_desc": "you are professional doctor",
"character_desc": "",
"temperature": 0.9,
"subscribe_msg": "感谢您的关注!\n这里是AI智能助手,可以自由对话。\n支持语音对话。\n支持图片输入。\n支持图片输出,画字开头的消息将按要求创作图片。\n支持tool、角色扮演和文字冒险等丰富的插件。\n输入{trigger_prefix}#help 查看详细指令。",
"subscribe_msg": "",
"use_linkai": false,
"linkai_api_key": "",
"linkai_app_code": ""
"linkai_app_code": "",

"redis_host":"47.116.142.20",
"redis_port":8090,
"redis_password":"telpo#1234",
"redis_db":3,
"kafka_bootstrap_servers":"172.19.42.53:9092",

"aiops_api":"https://id.ssjlai.com/aiopsadmin"
}

+ 19
- 9
config.json View File

@@ -1,16 +1,15 @@
{
"channel_type": "wx",
"model": "7374349217580056592",
"open_ai_api_key": "sk-tPyolP9giSyBLhG6soygVf3rpFUFqdtqXobW1NkVnVps3QxGz2kWlhFuhZ7Zm4TQ",
"open_ai_api_key": "sk-tdi7u0zuLsR0JpPMGBeFZxymOpL0zoFVafX8EEEvEakIDAGQ22NyQ6w",
"open_ai_api_base": "http://106.15.182.218:3000/api/v1",
"claude_api_key": "YOUR API KEY",
"proxy": "",
"proxy": "",
"hot_reload": false,
"debug": false,
"single_chat_reply_prefix": "[小蕴]",
"single_chat_reply_prefix": "",
"group_chat_prefix": [
"小蕴",
"@AI好蕴"
"zhushou"
],
"group_name_white_list": [
"AI好蕴测试群3"
@@ -18,7 +17,7 @@
"image_create_prefix": [
"画","识别","看"
],
"group_welcome_msg": "您好,我是AI好蕴健康顾问,非常高兴能够陪伴您一起踏上这一段美妙而重要的旅程。怀孕是生命中的一个特殊阶段,不仅承载着新生命的期许,也意味着您和家庭将迎来新的挑战和变化。在这个过程中,了解怀孕的意义、注意事项和如何进行科学的健康管理,对您和宝宝的健康至关重要。\n怀孕期的每一天都是新生命成长的过程,每一次胎动都让人感受到生命的奇迹。对于准妈妈来说,这是一段与宝宝建立深厚联系的时期。与此同时,这段时间也会让您对生活、家庭和未来有更深层次的认识和规划。\n怀孕不仅是生理上的变化,更是心理和情感上的一次洗礼。一个健康乐观的妈妈才能诞生一个阳光天使宝宝!\n通过科学的健康管理和正确的生活方式,您可以为自己和宝宝创造一个健康、安全的环境。我们专门给您设立了独立的服务支撑保证体系,包括各个方面的专家将为您提供贴身呵护和陪伴,为您提供专业的指导和支持,愿您度过一个平安、健康且愉快的孕期。\n如果你有任何健康问题咨询,可@我或输入“小蕴”,呼唤我为你服务!\n 【如果你有任何健康问题咨询,可@我、或语音“小蕴”呼唤我为你服务!】 \n祝您怀孕顺利,宝宝健康成长!",
"group_welcome_msg": "",
"trigger_by_self": true,
"voice_to_text":"ali",
"speech_recognition": true,
@@ -26,10 +25,21 @@
"voice_reply_voice": false,
"conversation_max_tokens": 2500,
"expires_in_seconds": 300,
"character_desc": "you are professional doctor",
"character_desc": "",
"temperature": 0.9,
"subscribe_msg": "感谢您的关注!\n这里是AI智能助手,可以自由对话。\n支持语音对话。\n支持图片输入。\n支持图片输出,画字开头的消息将按要求创作图片。\n支持tool、角色扮演和文字冒险等丰富的插件。\n输入{trigger_prefix}#help 查看详细指令。",
"subscribe_msg": "",
"use_linkai": false,
"linkai_api_key": "",
"linkai_app_code": ""
"linkai_app_code": "",

"redis_host":"192.168.2.121",
"redis_port":8090,
"redis_password":"telpo#1234",
"redis_db":3,
"kafka_bootstrap_servers":"192.168.2.121:9092",

"aiops_api":"https://id.ssjlai.com/aiopsadmin"

}

+ 9
- 0
config.py View File

@@ -179,6 +179,15 @@ available_setting = {
"Minimax_api_key": "",
"Minimax_group_id": "",
"Minimax_base_url": "",
#redis 配置
"redis_host":"",
"redis_port":0,
"redis_password":"",
"redis_db":0,
# kafka配置
"kafka_bootstrap_servers":"",
# aiops平台
"aiops_api":""
}




+ 2
- 2
plugins/healthai/healthai.py View File

@@ -188,7 +188,7 @@ class healthai(Plugin):
text=self.params_cache[user_id]['previous_prompt']
logger.info(f'{text},{contains_keywords(text)}')

itchat_content= f'@{msg.actual_user_nickname}' if e_context['context']['isgroup'] else '[小蕴]'
itchat_content= f'@{msg.actual_user_nickname}' if e_context['context']['isgroup'] else ''
itchat_content+="已经收到,立刻为您服务"
flag=contains_keywords(text)
if flag==True:
@@ -263,7 +263,7 @@ class healthai(Plugin):
if previous_prompt and last_content and contains_keywords(previous_prompt):
logger.info('先回应')
receiver = user_id
itchat_content = f'@{msg.actual_user_nickname}' if is_group else '[小蕴]'
itchat_content = f'@{msg.actual_user_nickname}' if is_group else ''
itchat_content += "已经收到,立刻为您服务"

if contains_keywords(previous_prompt):


+ 2
- 0
plugins/healthai/requirements.txt View File

@@ -7,3 +7,5 @@ python-pptx
Pillow
oss2
pypinyin
confluent-kafka
kafka-python

+ 2
- 1
requirements.txt View File

@@ -9,4 +9,5 @@ pre-commit
web.py
linkai>=0.0.6.0
pypng
pypinyin
pypinyin
redis

Loading…
Cancel
Save