From fe59485fa90eca9b94c9dd9eefb28e970b3c64f0 Mon Sep 17 00:00:00 2001 From: Peine Date: Wed, 3 Feb 2021 09:46:56 +0800 Subject: [PATCH] Improve RocketMQ integration example (#1757) - The demo was unable to run and stop because of missing namesrv configuration, and now fixed. --- .../csp/sentinel/demo/rocketmq/Constants.java | 1 + .../sentinel/demo/rocketmq/PullConsumerDemo.java | 11 +++++++++-- .../csp/sentinel/demo/rocketmq/SyncProducer.java | 15 ++++++++++----- 3 files changed, 20 insertions(+), 7 deletions(-) diff --git a/sentinel-demo/sentinel-demo-rocketmq/src/main/java/com/alibaba/csp/sentinel/demo/rocketmq/Constants.java b/sentinel-demo/sentinel-demo-rocketmq/src/main/java/com/alibaba/csp/sentinel/demo/rocketmq/Constants.java index 2f67dc6c..d16f7cc2 100755 --- a/sentinel-demo/sentinel-demo-rocketmq/src/main/java/com/alibaba/csp/sentinel/demo/rocketmq/Constants.java +++ b/sentinel-demo/sentinel-demo-rocketmq/src/main/java/com/alibaba/csp/sentinel/demo/rocketmq/Constants.java @@ -19,6 +19,7 @@ public final class Constants { public static final String TEST_GROUP_NAME = "sentinel-group"; public static final String TEST_TOPIC_NAME = "SentinelTopicTest"; + public static final String TEST_NAMESRV_ADDR = "127.0.0.1:9876"; private Constants() {} } diff --git a/sentinel-demo/sentinel-demo-rocketmq/src/main/java/com/alibaba/csp/sentinel/demo/rocketmq/PullConsumerDemo.java b/sentinel-demo/sentinel-demo-rocketmq/src/main/java/com/alibaba/csp/sentinel/demo/rocketmq/PullConsumerDemo.java index 522c707f..301cca6a 100755 --- a/sentinel-demo/sentinel-demo-rocketmq/src/main/java/com/alibaba/csp/sentinel/demo/rocketmq/PullConsumerDemo.java +++ b/sentinel-demo/sentinel-demo-rocketmq/src/main/java/com/alibaba/csp/sentinel/demo/rocketmq/PullConsumerDemo.java @@ -17,6 +17,7 @@ package com.alibaba.csp.sentinel.demo.rocketmq; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.Map; import java.util.Set; import java.util.concurrent.ExecutorService; @@ -55,10 +56,16 @@ public class PullConsumerDemo { initFlowControlRule(); DefaultMQPullConsumer consumer = new DefaultMQPullConsumer(Constants.TEST_GROUP_NAME); - + consumer.setNamesrvAddr(Constants.TEST_NAMESRV_ADDR); consumer.start(); - Set mqs = consumer.fetchSubscribeMessageQueues(Constants.TEST_TOPIC_NAME); + Set mqs = new HashSet<>(); + try { + mqs = consumer.fetchSubscribeMessageQueues(Constants.TEST_TOPIC_NAME); + } catch (Exception e) { + e.printStackTrace(); + } + for (MessageQueue mq : mqs) { System.out.printf("Consuming messages from the queue: %s%n", mq); SINGLE_MQ: diff --git a/sentinel-demo/sentinel-demo-rocketmq/src/main/java/com/alibaba/csp/sentinel/demo/rocketmq/SyncProducer.java b/sentinel-demo/sentinel-demo-rocketmq/src/main/java/com/alibaba/csp/sentinel/demo/rocketmq/SyncProducer.java index 8cae72a1..c55bb506 100755 --- a/sentinel-demo/sentinel-demo-rocketmq/src/main/java/com/alibaba/csp/sentinel/demo/rocketmq/SyncProducer.java +++ b/sentinel-demo/sentinel-demo-rocketmq/src/main/java/com/alibaba/csp/sentinel/demo/rocketmq/SyncProducer.java @@ -24,8 +24,8 @@ public class SyncProducer { public static void main(String[] args) throws Exception { // Instantiate with a producer group name. - DefaultMQProducer producer = new - DefaultMQProducer(Constants.TEST_GROUP_NAME); + DefaultMQProducer producer = new DefaultMQProducer(Constants.TEST_GROUP_NAME); + producer.setNamesrvAddr(Constants.TEST_NAMESRV_ADDR); // Launch the instance. producer.start(); for (int i = 0; i < 1000; i++) { @@ -33,9 +33,14 @@ public class SyncProducer { Message msg = new Message(Constants.TEST_TOPIC_NAME, "TagA", ("Hello RocketMQ From Sentinel " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) ); - // Call send message to deliver message to one of brokers. - SendResult sendResult = producer.send(msg); - System.out.printf("%s%n", sendResult); + + try { + // Call send message to deliver message to one of brokers. + SendResult sendResult = producer.send(msg); + System.out.printf("%s%n", sendResult); + } catch (Exception e) { + e.printStackTrace(); + } } // Shut down once the producer instance is not longer in use. producer.shutdown();