|
|
@@ -1,5 +1,5 @@ |
|
|
|
from confluent_kafka import Producer, Consumer, KafkaException, KafkaError |
|
|
|
import os,time |
|
|
|
import os,time,threading |
|
|
|
from config import conf |
|
|
|
# 定义全局 redis_helper |
|
|
|
kafka_client = None |
|
|
@@ -9,7 +9,8 @@ class KafkaClient: |
|
|
|
|
|
|
|
bootstrap_servers=conf().get("kafka_bootstrap_servers") |
|
|
|
agent_tel=os.environ.get('tel', '18029274615') |
|
|
|
consumer_group=f'aiops-wx_{agent_tel}' |
|
|
|
#consumer_group=f'aiops-wx_{agent_tel}' |
|
|
|
consumer_group='ai-ops-wx' |
|
|
|
print(f'kafka消费组 {consumer_group}') |
|
|
|
topic="topic.ai.ops.wx" |
|
|
|
|
|
|
@@ -22,8 +23,9 @@ class KafkaClient: |
|
|
|
'bootstrap.servers': self.bootstrap_servers, |
|
|
|
'group.id': self.consumer_group, |
|
|
|
'auto.offset.reset': 'earliest', |
|
|
|
# 'enable.auto.commit': False # 禁用自动提交,使用手动提交 |
|
|
|
'enable.auto.commit': True |
|
|
|
#'enable.auto.commit': False # 禁用自动提交,使用手动提交 |
|
|
|
'enable.auto.commit': True, |
|
|
|
'max.poll.interval.ms': 600000 |
|
|
|
}) |
|
|
|
|
|
|
|
def delivery_report(self, err, msg): |
|
|
@@ -72,7 +74,7 @@ class KafkaClient: |
|
|
|
|
|
|
|
try: |
|
|
|
while True: |
|
|
|
msg = self.consumer.poll(0.3) |
|
|
|
msg = self.consumer.poll(0.1) |
|
|
|
if msg is None: |
|
|
|
continue |
|
|
|
if msg.error(): |
|
|
@@ -84,7 +86,9 @@ class KafkaClient: |
|
|
|
# 调用业务处理逻辑 |
|
|
|
# process_callback(msg.value().decode('utf-8')) |
|
|
|
# 调用业务处理逻辑,传递 user_nickname 和消息 |
|
|
|
process_callback(msg.value().decode('utf-8')) |
|
|
|
|
|
|
|
# process_callback(msg.value().decode('utf-8')) |
|
|
|
threading.Thread(target=process_callback, args=(msg.value().decode('utf-8'),)).start() |
|
|
|
# if process_callback(user_nickname, msg.value().decode('utf-8')): |
|
|
|
# # 如果返回 True,表示处理成功,可以提交偏移量 |
|
|
|
# try: |
|
|
|