You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

97 lines
4.0KB

  1. """
  2. kafka客户端
  3. """
  4. import threading
  5. from lib import itchat
  6. from lib.itchat.content import *
  7. from common.log import logger
  8. from bridge.context import Context, ContextType
  9. from bridge.reply import Reply, ReplyType
  10. from confluent_kafka import Consumer, KafkaException
  11. import json,time,re
  12. class MessageQueueClient():
  13. def __init__(self, channel):
  14. self.channel = channel
  15. self.client_type = channel.channel_type
  16. def consume_messages(self, broker, group_id, topic,user_id,user_nickname):
  17. # 配置消费者
  18. conf = {
  19. 'bootstrap.servers': broker,
  20. 'group.id': group_id,
  21. 'auto.offset.reset': 'earliest'
  22. }
  23. consumer = Consumer(conf)
  24. try:
  25. # 订阅主题
  26. consumer.subscribe([topic])
  27. print(f"开始消费主题 {topic} 的消息...")
  28. while True:
  29. # 拉取消息
  30. msg = consumer.poll(timeout=0.3) # 超时时间 1 秒
  31. if msg is None:
  32. continue
  33. if msg.error():
  34. # 处理 Kafka 异常
  35. if msg.error().code() == KafkaException._PARTITION_EOF:
  36. print(f"分区末尾: {msg.topic()} [{msg.partition()}] {msg.offset()}")
  37. else:
  38. print(f"消费错误: {msg.error()}")
  39. else:
  40. # 打印消息
  41. # print(f"收到消息: {msg.value().decode('utf-8')} (主题: {msg.topic()}, 分区: {msg.partition()}, 偏移: {msg.offset()})")
  42. msg_content= msg.value().decode('utf-8')
  43. # content=json.loads(msg_content)
  44. cleaned_content = clean_json_string(msg_content)
  45. content=json.loads(cleaned_content)
  46. print(content["messageId"])
  47. print(content["data"])
  48. print(content["data"]["content"])
  49. friends=itchat.get_friends(update=True)[1:]
  50. # logger.info(friends)
  51. # logger.info(f'好友列表{friends}')
  52. # 提取所有好友的 NickName
  53. friend_info = [{'NickName': friend['NickName'], 'UserName': friend['UserName']} for friend in friends]
  54. content_text=content["data"]["content"]
  55. # 打印好友信息
  56. for info in friend_info:
  57. print(f"NickName: {info['NickName']}, UserName: {info['UserName']}")
  58. # if info['NickName'] in ['王韦(: )~','何潮华','laih']:
  59. if info['NickName'] in ['爱扣美丽顾问@乐华']:
  60. itchat.send(content["data"]["content"], toUserName=info['UserName'])
  61. logger.info(f"{user_nickname} 向 {info['NickName']} 发送【 {content_text} 】")
  62. time.sleep(3)
  63. # itchat.send(content["data"]["content"], toUserName=info['UserName'])
  64. # logger.info(f"{user_nickname} 向 {info['NickName']} 发送 {content_text}")
  65. # time.sleep(3)
  66. # itchat.send(content["data"]["content"], toUserName=receiver)
  67. # 打印所有 NickName
  68. # for nickname in nicknames:
  69. # print(nickname)
  70. except KeyboardInterrupt:
  71. print("终止消费")
  72. finally:
  73. # 关闭消费者
  74. consumer.close()
  75. def start(channel):
  76. global mq_client
  77. mq_client = MessageQueueClient(channel=channel)
  78. user_id = itchat.instance.storageClass.userName
  79. name = itchat.instance.storageClass.nickName
  80. mq_client.consume_messages('47.116.67.214:9092', 'ai-test-group', 'topic.ai.test',user_id,name)
  81. def clean_json_string(json_str):
  82. # 删除所有控制字符(非打印字符),包括换行符、回车符等
  83. return re.sub(r'[\x00-\x1f\x7f]', '', json_str)