SpringBoot RocketMQ发布订阅
【摘要】
SpringBoot RocketMQ发布订阅
pom
<!-- rocketMQ-->
<dependency>
&...
SpringBoot RocketMQ发布订阅
pom
<!-- rocketMQ-->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.1.0-incubating</version>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-common</artifactId>
<version>4.1.0-incubating</version>
</dependency>
<!-- =======end======-->
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
application.properties
#RocketMQ
# 消费者的组名
apache.rocketmq.consumer.PushConsumer=orderConsumer
# 生产者的组名
apache.rocketmq.producer.producerGroup=Producer
apache.rocketmq.namesrvAddr=192.168.2.137:9876
- 1
- 2
- 3
- 4
- 5
- 6
发布订阅
@Component
public class MsgProdeucer {
private Logger log = LoggerFactory.getLogger(this.getClass());
@Value("${apache.rocketmq.producer.producerGroup}")
private String producerGroup;
@Value("${apache.rocketmq.namesrvAddr}")
private String namesrvAddr;
private DefaultMQProducer producer ;
public DefaultMQProducer getProducer(){
return this.producer;
}
public boolean sendDetectionInfoMsg(String msg){
try {
Message messagea = new Message("Detection","info",msg.getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult resulta = producer.send(messagea);
return true;
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
} catch (MQClientException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (RemotingException e) {
e.printStackTrace();
} catch (MQBrokerException e) {
e.printStackTrace();
}
return false;
}
@PostConstruct
public void init() {
log.info("RocketMQ 开始初始化");
producer = new DefaultMQProducer(producerGroup);
producer.setNamesrvAddr(namesrvAddr);
producer.setVipChannelEnabled(false);
try {
producer.start();
log.info("RocketMQ 初始化成功!");
} catch (Exception e) {
e.printStackTrace();
log.info("RocketMQ 初始化失败!"+e.toString());
}
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
Controller
@RestController
@RequestMapping("/RocketMq")
public class RocketTestController {
@Autowired
MsgProdeucer prodeucer;
@GetMapping("/test")
public String test(){
boolean result = prodeucer.sendDetectionInfoMsg("你好");
if (!result){
return "err";
}
return "success";
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
监听订阅
@Component
public class MsgConsumer {
@Value("${apache.rocketmq.consumer.PushConsumer}")
private String consumerGroup;
@Value("${apache.rocketmq.namesrvAddr}")
private String namesrvAddr;
@PostConstruct
public void defaultMQPushConsumer() {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup);
//多个地址以 ; 隔开
consumer.setNamesrvAddr(namesrvAddr);
try {
consumer.subscribe("Detection", "*");
//CONSUME_FROM_LAST_OFFSET 默认策略,从该队列最尾开始消费,跳过历史消息
//CONSUME_FROM_FIRST_OFFSET 从队列最开始开始消费,即历史消息(还储存在broker的)全部消费一遍
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
//MessageListenerOrderly 这个是有序的
//MessageListenerConcurrently 这个是无序的,并行的方式处理,效率高很多
consumer.registerMessageListener((MessageListenerConcurrently) (list, context) -> {
try {
for (MessageExt messageExt : list) {
System.out.println("messageExt: " + messageExt);
String messageBody = new String(messageExt.getBody(), RemotingHelper.DEFAULT_CHARSET);
System.out.println("消费响应:msgId : " + messageExt.getMsgId() + ", msgBody : " + messageBody);
}
} catch (Exception e) {
e.printStackTrace();
return ConsumeConcurrentlyStatus.RECONSUME_LATER; //稍后再试
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; //消费成功
});
consumer.start();
} catch (Exception e) {
e.printStackTrace();
}
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
文章来源: blog.csdn.net,作者:小毕超,版权归原作者所有,如需转载,请联系作者。
原文链接:blog.csdn.net/qq_43692950/article/details/107443773
【版权声明】本文为华为云社区用户转载文章,如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱:
cloudbbs@huaweicloud.com
- 点赞
- 收藏
- 关注作者
评论(0)