RocketMQ第四章:手把手教老婆实现-顺序消息生产者和顺序消息消费者
-
技术活,该赏
-
关注+一键三连(点赞,评论,收藏)再看,养成好习惯
目录
第一节:介绍
顺序消息含义介绍
顺序消息指的是可以按照消息的发送顺序来消费(FIFO)。RocketMQ可以严格的保证消息有序,可以分为分区有序或者全局有序。
原理解析
在默认的情况下消息发送会采取Round Robin轮询方式把消息发送到不同的queue(分区队列);
而消费消息的时候从多个queue上拉取消息,这种情况发送和消费是不能保证顺序。
但是如果控制发送的顺序消息只依次发送到同一个queue中,消费的时候只从这个queue上依次拉取,则就保证了顺序。如下图所示:
当发送和消费参与的queue只有一个,则是全局有序;如果多个queue参与,则为分区有序,即相对每个queue,消息都是有序的。
第二节:顺序消息-生产者和消息者步骤说明
顺序消息生产者代码实现步骤
1.创建消息生产者producer并制定生产者组名
2.指定Nameserver地址
3.启动producer
4.创建消息对象,指定主题Topic、Tag和消息体
5.发送消息,选择的send方法有三个参数:
- * 参数一:消息对象
- * 参数二:消息队列的选择器
- * 参数三:选择队列的业务标识 6.关闭生产者producer
顺序消息消费者代码实现步骤
1.创建消费者Consumer,制定消费者组名
2.指定Nameserver地址
3.订阅主题Topic和Tag
4.设置回调函数,处理消息:与普通消息的差别,这里用的是MessageListenerOrderly
5.启动消费者consumer
注意:消费者的 Topic 和 Tag 需要和生产者保持一致
第三节:顺序消息生产者
-
public class Producer {
-
-
public static void main(String[] args) throws Exception {
-
// 1.创建消息生产者producer,并制定生产者组名
-
DefaultMQProducer producer = new DefaultMQProducer("demo_producer_order_group");
-
// 2.指定Nameserver地址
-
producer.setNamesrvAddr("192.168.88.131:9876");
-
// 3.启动producer
-
producer.start();
-
System.out.println("生产者启动");
-
for (int i = 0; i < 20; i++) {
-
// 4.创建消息对象,指定主题Topic、Tag和消息体
-
-
Message msg = new Message("Topic_order_demo", "Tag_order_demo",
-
("Hello 虚竹,这是顺序消息" + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
-
// 5.发送消息
-
/**
-
* 参数一:消息对象
-
* 参数二:消息队列的选择器
-
* 参数三:选择队列的业务标识
-
*/
-
SendResult result = producer.send(msg, new MessageQueueSelector() {
-
/**
-
*
-
* @param mqs:队列集合
-
* @param msg:消息对象
-
* @param arg:业务标识的参数
-
* @return
-
*/
-
@Override
-
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
-
Integer index = (Integer) arg;
-
return mqs.get(index);
-
}
-
}, 1);
-
System.out.println("发送结果:" + msg.toString());
-
}
-
-
// 6.关闭生产者producer
-
producer.shutdown();
-
System.out.println("生产者关闭");
-
}
-
}
效果:
第四节:顺序消息消费者
-
public class Consumer {
-
-
public static void main(String[] args) throws Exception {
-
//1.创建消费者Consumer,制定消费者组名
-
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("demo_consumer_order_group");
-
//2.指定Nameserver地址
-
consumer.setNamesrvAddr("192.168.88.131:9876");
-
-
//消息拉取最大条数
-
consumer.setConsumeMessageBatchMaxSize(2);
-
//3.订阅主题Topic和Tag
-
consumer.subscribe("Topic_order_demo", "*");
-
-
//4.设置回调函数,处理消息
-
consumer.registerMessageListener(new MessageListenerOrderly() {
-
//接受消息内容
-
@Override
-
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext consumeOrderlyContext) {
-
for (MessageExt msg : msgs) {
-
-
try {
-
//获取主题
-
String topic = msg.getTopic();
-
//获取标签
-
String tags = msg.getTags();
-
//获取信息
-
byte[] body = msg.getBody();
-
String result = new String(body, RemotingHelper.DEFAULT_CHARSET);
-
System.out.println("Consumer消费信息:topic:" + topic + ",tags:"+tags+
-
",result"+ result);
-
-
} catch (UnsupportedEncodingException e) {
-
e.printStackTrace();
-
//重试
-
return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
-
}
-
}
-
return ConsumeOrderlyStatus.SUCCESS;
-
}
-
});
-
//5.启动消费者consumer
-
consumer.start();
-
}
-
}
效果:
文章来源: xiaoxuzhu.blog.csdn.net,作者:小虚竹,版权归原作者所有,如需转载,请联系作者。
原文链接:xiaoxuzhu.blog.csdn.net/article/details/115596634
- 点赞
- 收藏
- 关注作者
评论(0)