浏览代码

Improve RocketMQ integration example (#1757)

- The demo was unable to run and stop because of missing namesrv configuration, and now fixed.
master
Peine GitHub 3 年前
父节点
当前提交
fe59485fa9
找不到此签名对应的密钥 GPG 密钥 ID: 4AEE18F83AFDEB23
共有 3 个文件被更改,包括 20 次插入7 次删除
  1. +1
    -0
      sentinel-demo/sentinel-demo-rocketmq/src/main/java/com/alibaba/csp/sentinel/demo/rocketmq/Constants.java
  2. +9
    -2
      sentinel-demo/sentinel-demo-rocketmq/src/main/java/com/alibaba/csp/sentinel/demo/rocketmq/PullConsumerDemo.java
  3. +10
    -5
      sentinel-demo/sentinel-demo-rocketmq/src/main/java/com/alibaba/csp/sentinel/demo/rocketmq/SyncProducer.java

+ 1
- 0
sentinel-demo/sentinel-demo-rocketmq/src/main/java/com/alibaba/csp/sentinel/demo/rocketmq/Constants.java 查看文件

@@ -19,6 +19,7 @@ public final class Constants {

public static final String TEST_GROUP_NAME = "sentinel-group";
public static final String TEST_TOPIC_NAME = "SentinelTopicTest";
public static final String TEST_NAMESRV_ADDR = "127.0.0.1:9876";

private Constants() {}
}

+ 9
- 2
sentinel-demo/sentinel-demo-rocketmq/src/main/java/com/alibaba/csp/sentinel/demo/rocketmq/PullConsumerDemo.java 查看文件

@@ -17,6 +17,7 @@ package com.alibaba.csp.sentinel.demo.rocketmq;

import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
@@ -55,10 +56,16 @@ public class PullConsumerDemo {
initFlowControlRule();

DefaultMQPullConsumer consumer = new DefaultMQPullConsumer(Constants.TEST_GROUP_NAME);
consumer.setNamesrvAddr(Constants.TEST_NAMESRV_ADDR);
consumer.start();

Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues(Constants.TEST_TOPIC_NAME);
Set<MessageQueue> mqs = new HashSet<>();
try {
mqs = consumer.fetchSubscribeMessageQueues(Constants.TEST_TOPIC_NAME);
} catch (Exception e) {
e.printStackTrace();
}

for (MessageQueue mq : mqs) {
System.out.printf("Consuming messages from the queue: %s%n", mq);
SINGLE_MQ:


+ 10
- 5
sentinel-demo/sentinel-demo-rocketmq/src/main/java/com/alibaba/csp/sentinel/demo/rocketmq/SyncProducer.java 查看文件

@@ -24,8 +24,8 @@ public class SyncProducer {

public static void main(String[] args) throws Exception {
// Instantiate with a producer group name.
DefaultMQProducer producer = new
DefaultMQProducer(Constants.TEST_GROUP_NAME);
DefaultMQProducer producer = new DefaultMQProducer(Constants.TEST_GROUP_NAME);
producer.setNamesrvAddr(Constants.TEST_NAMESRV_ADDR);
// Launch the instance.
producer.start();
for (int i = 0; i < 1000; i++) {
@@ -33,9 +33,14 @@ public class SyncProducer {
Message msg = new Message(Constants.TEST_TOPIC_NAME, "TagA",
("Hello RocketMQ From Sentinel " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
);
// Call send message to deliver message to one of brokers.
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);

try {
// Call send message to deliver message to one of brokers.
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
} catch (Exception e) {
e.printStackTrace();
}
}
// Shut down once the producer instance is not longer in use.
producer.shutdown();


正在加载...
取消
保存