RocketMQ第三章:手把手教老婆实现:普通消息(同步,异步和单向)的生产者和消费者
【摘要】
技术活,该赏关注+一键三连(点赞,评论,收藏)再看,养成好习惯
RocketMQ使用教程相关系列 目录
目录
第一节:项目准备
第二节:普通消息-生产者和消息者步骤说明
普通消息生产者代码实现步骤
普通消息消费者代码实现步骤
第三节:普通消息-同步消息
生产者
效果:
消费者
效果:
第...
-
技术活,该赏
-
关注+一键三连(点赞,评论,收藏)再看,养成好习惯
目录
第一节:项目准备
rocketmq-client依赖
-
<groupId>org.apache.rocketmq</groupId>
-
<artifactId>rocketmq-client</artifactId>
-
<version>4.4.0</version>
第二节:普通消息-生产者和消息者步骤说明
普通消息生产者代码实现步骤
- 1.创建消息生产者producer,并制定生产者组名
- 2.指定Nameserver地址
- 3.启动producer
- 4.创建消息对象 Message,指定主题Topic、Tag和消息体
- 5.发送消息
- 6.关闭生产者producer
普通消息消费者代码实现步骤
- 1.创建消费者Consumer,制定消费者组名
- 2.指定Nameserver地址
- 3.订阅主题Topic和Tag
- 4.设置回调函数,处理消息
- 5.启动消费者consumer
注意:消费者的 Topic 和 Tag 需要和生产者保持一致
第三节:普通消息-同步消息
同步消息这种可靠性同步地发送方式使用的比较广泛,比如:重要的消息通知,短信通知。
生产者
-
public class ProducerSync {
-
-
public static void main(String[] args) throws Exception {
-
// 1.创建消息生产者producer,并制定生产者组名
-
DefaultMQProducer producer = new DefaultMQProducer("demo_producer_group");
-
// 2.指定Nameserver地址
-
producer.setNamesrvAddr("192.168.88.131:9876");
-
// 3.启动producer
-
System.out.println("生产者启动");
-
producer.start();
-
-
for (int i = 0; i < 3; i++) {
-
// 4.创建消息对象,指定主题Topic、Tag和消息体
-
/**
-
* 参数一:消息主题Topic
-
* 参数二:消息Tag
-
* 参数三:消息内容
-
*/
-
Message msg = new Message("Topic_demo_sync", "Tag_demo_sync",
-
("Hello 虚竹," + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
-
// 5.发送同步消息
-
SendResult sendResult = producer.send(msg);
-
System.out.println("发送结果:" + sendResult);
-
}
-
-
// 6.关闭生产者producer
-
producer.shutdown();
-
System.out.println("生产者关闭");
-
}
-
}
效果:
消费者
-
public class ConsumerSync {
-
-
public static void main(String[] args) throws Exception {
-
// 1.创建消费者Consumer,制定消费者组名
-
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("demo_consumer_group");
-
// 2.指定Nameserver地址
-
consumer.setNamesrvAddr("192.168.88.131:9876");
-
-
// 消息拉取最大条数
-
consumer.setConsumeMessageBatchMaxSize(2);
-
// 3.订阅主题Topic和Tag
-
consumer.subscribe("Topic_demo_sync", "*");
-
-
// 4.设置回调函数,处理消息
-
consumer.registerMessageListener(new MessageListenerConcurrently() {
-
-
// 接受消息内容
-
@Override
-
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
-
for (MessageExt msg : msgs) {
-
-
try {
-
// 获取主题
-
String topic = msg.getTopic();
-
// 获取标签
-
String tags = msg.getTags();
-
// 获取信息
-
byte[] body = msg.getBody();
-
String result = new String(body, RemotingHelper.DEFAULT_CHARSET);
-
System.out.println("Consumer消费信息:topic:" + topic + ",tags:" + tags + ",result:" + result);
-
-
} catch (UnsupportedEncodingException e) {
-
e.printStackTrace();
-
}
-
}
-
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
-
}
-
});
-
// 5.启动消费者consumer
-
consumer.start();
-
}
-
}
效果:
第四节:普通消息-异步消息
异步消息通常用在对响应时间敏感的业务场景,即发送端不能容忍长时间地等待Broker的响应。
生产者
-
public class ProducerASync {
-
-
public static void main(String[] args) throws Exception {
-
// 1.创建消息生产者producer,并制定生产者组名
-
DefaultMQProducer producer = new DefaultMQProducer("demo_producer_group");
-
// 2.指定Nameserver地址
-
producer.setNamesrvAddr("192.168.88.131:9876");
-
// 3.启动producer
-
System.out.println("生产者启动");
-
producer.start();
-
-
for (int i = 0; i < 3; i++) {
-
// 4.创建消息对象,指定主题Topic、Tag和消息体
-
/**
-
* 参数一:消息主题Topic
-
* 参数二:消息Tag
-
* 参数三:消息内容
-
*/
-
Message msg = new Message("Topic_demo_async", "Tag_demo_async",
-
("Hello 虚竹," + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
-
// 发送消息到一个Broker,异步发送没有返回值,需要使用 SendCallback 接收异步返回结果的回调
-
producer.send(msg, new SendCallback() {
-
@Override
-
public void onSuccess(SendResult sendResult) {
-
System.out.println("发送成功:" + sendResult);
-
}
-
-
@Override
-
public void onException(Throwable throwable) {
-
System.out.println("发送异常:" + throwable.getMessage());
-
}
-
});
-
-
}
-
Thread.sleep(2000);
-
// 6.关闭生产者producer
-
producer.shutdown();
-
System.out.println("生产者关闭");
-
}
-
}
效果:
消费者:
-
public class ConsumerASync {
-
-
public static void main(String[] args) throws Exception {
-
// 1.创建消费者Consumer,制定消费者组名
-
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("demo_consumer_group");
-
// 2.指定Nameserver地址
-
consumer.setNamesrvAddr("192.168.88.131:9876");
-
-
// 消息拉取最大条数
-
consumer.setConsumeMessageBatchMaxSize(2);
-
// 3.订阅主题Topic和Tag
-
consumer.subscribe("Topic_demo_async", "*");
-
-
// 4.设置回调函数,处理消息
-
consumer.registerMessageListener(new MessageListenerConcurrently() {
-
-
// 接受消息内容
-
@Override
-
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
-
for (MessageExt msg : msgs) {
-
-
try {
-
// 获取主题
-
String topic = msg.getTopic();
-
// 获取标签
-
String tags = msg.getTags();
-
// 获取信息
-
byte[] body = msg.getBody();
-
String result = new String(body, RemotingHelper.DEFAULT_CHARSET);
-
System.out.println("Consumer消费信息:topic:" + topic + ",tags:" + tags + ",result:" + result);
-
-
} catch (UnsupportedEncodingException e) {
-
e.printStackTrace();
-
}
-
}
-
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
-
}
-
});
-
// 5.启动消费者consumer
-
consumer.start();
-
}
-
}
效果:
分析:
从消费者代码中可以看出:异步消费者的实现方式和同步消费者实现方式并无区别。
从消费者的结果来看:可以看出异步消费者接收的结果是无序的。
第五节:普通消息-单向消息
这种方式主要用在不特别关心发送结果的场景,例如日志发送。
生产者
-
public class Producer {
-
-
public static void main(String[] args) throws Exception {
-
// 1.创建消息生产者producer,并制定生产者组名
-
DefaultMQProducer producer = new DefaultMQProducer("demo_producer_group");
-
// 2.指定Nameserver地址
-
producer.setNamesrvAddr("192.168.88.131:9876");
-
// 3.启动producer
-
producer.start();
-
System.out.println("生产者启动");
-
for (int i = 0; i < 20; i++) {
-
// 4.创建消息对象,指定主题Topic、Tag和消息体
-
/**
-
* 参数一:消息主题Topic
-
* 参数二:消息Tag
-
* 参数三:消息内容
-
*/
-
Message msg = new Message("Topic_demo", "Tag_demo",
-
("Hello 虚竹," + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
-
// 5.发送单向消息
-
producer.sendOneway(msg);
-
System.out.println("发送结果:" + msg);
-
// 线程睡1秒
-
}
-
-
// 6.关闭生产者producer
-
producer.shutdown();
-
System.out.println("生产者关闭");
-
}
-
}
效果:
消费者
-
public class Consumer {
-
-
public static void main(String[] args) throws Exception {
-
// 1.创建消费者Consumer,制定消费者组名
-
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("demo_consumer_group");
-
// 2.指定Nameserver地址
-
consumer.setNamesrvAddr("192.168.88.131:9876");
-
-
// 消息拉取最大条数
-
consumer.setConsumeMessageBatchMaxSize(2);
-
// 3.订阅主题Topic和Tag
-
consumer.subscribe("Topic_demo", "*");
-
-
// 4.设置回调函数,处理消息
-
consumer.registerMessageListener(new MessageListenerConcurrently() {
-
-
// 接受消息内容
-
@Override
-
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
-
for (MessageExt msg : msgs) {
-
-
try {
-
// 获取主题
-
String topic = msg.getTopic();
-
// 获取标签
-
String tags = msg.getTags();
-
// 获取信息
-
byte[] body = msg.getBody();
-
String result = new String(body, RemotingHelper.DEFAULT_CHARSET);
-
System.out.println("Consumer消费信息:topic:" + topic + ",tags:" + tags + ",result:" + result);
-
-
} catch (UnsupportedEncodingException e) {
-
e.printStackTrace();
-
}
-
}
-
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
-
}
-
});
-
// 5.启动消费者consumer
-
consumer.start();
-
}
-
}
效果:
分析:
从消费结果来看,消费的顺序是无序的
第六节:总结
同步消息、异步消息和单向消息的消费者实现方式是一样的。
同步消息、异步消息和单向消息的区别在于消息的发送方。
异步消息生产者没有返回值,需要使用 SendCallback 接收异步返回结果的回调。
异步消息生产者,在关闭实例之前,建议进行休眠。
单向消息也是没有返回值的,并且它的消费者也是无序消费。
单向消息和异步消息的区别是单向消息不需要 SendCallback 来接收异步返回结果的回调。
文章来源: xiaoxuzhu.blog.csdn.net,作者:小虚竹,版权归原作者所有,如需转载,请联系作者。
原文链接:xiaoxuzhu.blog.csdn.net/article/details/115584781
【版权声明】本文为华为云社区用户转载文章,如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱:
cloudbbs@huaweicloud.com
- 点赞
- 收藏
- 关注作者
评论(0)