|
- """
- kafka客户端
- """
- import threading
- from lib import itchat
- from lib.itchat.content import *
- from common.log import logger
-
- from bridge.context import Context, ContextType
- from bridge.reply import Reply, ReplyType
-
- from confluent_kafka import Consumer, KafkaException
- import json,time,re
-
-
- class MessageQueueClient():
- def __init__(self, channel):
- self.channel = channel
- self.client_type = channel.channel_type
-
- def consume_messages(self, broker, group_id, topic,user_id,user_nickname):
- # 配置消费者
- conf = {
- 'bootstrap.servers': broker,
- 'group.id': group_id,
- 'auto.offset.reset': 'earliest'
- }
-
- consumer = Consumer(conf)
-
- try:
- # 订阅主题
- consumer.subscribe([topic])
-
- print(f"开始消费主题 {topic} 的消息...")
- while True:
- # 拉取消息
- msg = consumer.poll(timeout=0.3) # 超时时间 1 秒
- if msg is None:
- continue
- if msg.error():
- # 处理 Kafka 异常
- if msg.error().code() == KafkaException._PARTITION_EOF:
- print(f"分区末尾: {msg.topic()} [{msg.partition()}] {msg.offset()}")
- else:
- print(f"消费错误: {msg.error()}")
- else:
- # 打印消息
- # print(f"收到消息: {msg.value().decode('utf-8')} (主题: {msg.topic()}, 分区: {msg.partition()}, 偏移: {msg.offset()})")
- msg_content= msg.value().decode('utf-8')
- # content=json.loads(msg_content)
-
- cleaned_content = clean_json_string(msg_content)
- content=json.loads(cleaned_content)
- print(content["messageId"])
- print(content["data"])
- print(content["data"]["content"])
-
- friends=itchat.get_friends(update=True)[1:]
- # logger.info(friends)
- # logger.info(f'好友列表{friends}')
- # 提取所有好友的 NickName
- friend_info = [{'NickName': friend['NickName'], 'UserName': friend['UserName']} for friend in friends]
- content_text=content["data"]["content"]
- # 打印好友信息
- for info in friend_info:
- print(f"NickName: {info['NickName']}, UserName: {info['UserName']}")
- # if info['NickName'] in ['王韦(: )~','何潮华','laih']:
- if info['NickName'] in ['爱扣美丽顾问@乐华']:
- itchat.send(content["data"]["content"], toUserName=info['UserName'])
- logger.info(f"{user_nickname} 向 {info['NickName']} 发送【 {content_text} 】")
- time.sleep(3)
- # itchat.send(content["data"]["content"], toUserName=info['UserName'])
- # logger.info(f"{user_nickname} 向 {info['NickName']} 发送 {content_text}")
- # time.sleep(3)
-
- # itchat.send(content["data"]["content"], toUserName=receiver)
- # 打印所有 NickName
- # for nickname in nicknames:
- # print(nickname)
- except KeyboardInterrupt:
- print("终止消费")
- finally:
- # 关闭消费者
- consumer.close()
-
-
- def start(channel):
- global mq_client
- mq_client = MessageQueueClient(channel=channel)
- user_id = itchat.instance.storageClass.userName
- name = itchat.instance.storageClass.nickName
- mq_client.consume_messages('47.116.67.214:9092', 'ai-test-group', 'topic.ai.test',user_id,name)
-
- def clean_json_string(json_str):
- # 删除所有控制字符(非打印字符),包括换行符、回车符等
- return re.sub(r'[\x00-\x1f\x7f]', '', json_str)
|