RocketMQ第四章:手把手教老婆实现-顺序消息生产者和顺序消息消费者
【摘要】
技术活,该赏关注+一键三连(点赞,评论,收藏)再看,养成好习惯
RocketMQ使用教程相关系列 目录
目录
第一节:介绍
顺序消息含义介绍
原理解析
第二节:顺序消息-生产者和消息者步骤说明
顺序消息生产者代码实现步骤
顺序消息消费者代码实现步骤
第三节:顺序消息生产者
效果:
第四节:顺序消息消费者
效果:...
技术活,该赏
关注+一键三连(点赞,评论,收藏)再看,养成好习惯
目录
第一节:介绍
顺序消息含义介绍
顺序消息指的是可以按照消息的发送顺序来消费(FIFO)。RocketMQ可以严格的保证消息有序,可以分为分区有序或者全局有序。
原理解析
在默认的情况下消息发送会采取Round Robin轮询方式把消息发送到不同的queue(分区队列);
而消费消息的时候从多个queue上拉取消息,这种情况发送和消费是不能保证顺序。
但是如果控制发送的顺序消息只依次发送到同一个queue中,消费的时候只从这个queue上依次拉取,则就保证了顺序。如下图所示:
当发送和消费参与的queue只有一个,则是全局有序;如果多个queue参与,则为分区有序,即相对每个queue,消息都是有序的。
第二节:顺序消息-生产者和消息者步骤说明
顺序消息生产者代码实现步骤
1.创建消息生产者producer并制定生产者组名
2.指定Nameserver地址
3.启动producer
4.创建消息对象,指定主题Topic、Tag和消息体
5.发送消息,选择的send方法有三个参数:
- * 参数一:消息对象
- * 参数二:消息队列的选择器
- * 参数三:选择队列的业务标识 6.关闭生产者producer
顺序消息消费者代码实现步骤
1.创建消费者Consumer,制定消费者组名
2.指定Nameserver地址
3.订阅主题Topic和Tag
4.设置回调函数,处理消息:与普通消息的差别,这里用的是MessageListenerOrderly
5.启动消费者consumer
注意:消费者的 Topic 和 Tag 需要和生产者保持一致
第三节:顺序消息生产者
public class Producer {
public static void main(String[] args) throws Exception {
// 1.创建消息生产者producer,并制定生产者组名
DefaultMQProducer producer = new DefaultMQProducer("demo_producer_order_group");
// 2.指定Nameserver地址
producer.setNamesrvAddr("192.168.88.131:9876");
// 3.启动producer
producer.start();
System.out.println("生产者启动");
for (int i = 0; i < 20; i++) {
// 4.创建消息对象,指定主题Topic、Tag和消息体
Message msg = new Message("Topic_order_demo", "Tag_order_demo",
("Hello 虚竹,这是顺序消息" + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
// 5.发送消息
/**
* 参数一:消息对象
* 参数二:消息队列的选择器
* 参数三:选择队列的业务标识
*/
SendResult result = producer.send(msg, new MessageQueueSelector() {
/**
*
* @param mqs:队列集合
* @param msg:消息对象
* @param arg:业务标识的参数
* @return
*/
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
Integer index = (Integer) arg;
return mqs.get(index);
}
}, 1);
System.out.println("发送结果:" + msg.toString());
}
// 6.关闭生产者producer
producer.shutdown();
System.out.println("生产者关闭");
}
}
效果:
第四节:顺序消息消费者
public class Consumer {
public static void main(String[] args) throws Exception {
//1.创建消费者Consumer,制定消费者组名
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("demo_consumer_order_group");
//2.指定Nameserver地址
consumer.setNamesrvAddr("192.168.88.131:9876");
//消息拉取最大条数
consumer.setConsumeMessageBatchMaxSize(2);
//3.订阅主题Topic和Tag
consumer.subscribe("Topic_order_demo", "*");
//4.设置回调函数,处理消息
consumer.registerMessageListener(new MessageListenerOrderly() {
//接受消息内容
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext consumeOrderlyContext) {
for (MessageExt msg : msgs) {
try {
//获取主题
String topic = msg.getTopic();
//获取标签
String tags = msg.getTags();
//获取信息
byte[] body = msg.getBody();
String result = new String(body, RemotingHelper.DEFAULT_CHARSET);
System.out.println("Consumer消费信息:topic:" + topic + ",tags:"+tags+
",result"+ result);
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
//重试
return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
}
}
return ConsumeOrderlyStatus.SUCCESS;
}
});
//5.启动消费者consumer
consumer.start();
}
}
效果:
文章来源: xiaoxuzhu.blog.csdn.net,作者:小虚竹,版权归原作者所有,如需转载,请联系作者。
原文链接:xiaoxuzhu.blog.csdn.net/article/details/115596634
【版权声明】本文为华为云社区用户转载文章,如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱:
cloudbbs@huaweicloud.com
- 点赞
- 收藏
- 关注作者
作者其他文章
评论(0)