RocketMQ第四章:手把手教老婆实现-顺序消息生产者和顺序消息消费者

举报
小虚竹 发表于 2021/10/19 01:17:14 2021/10/19
【摘要】 技术活,该赏关注+一键三连(点赞,评论,收藏)再看,养成好习惯 RocketMQ使用教程相关系列 目录 目录 第一节:介绍 顺序消息含义介绍 原理解析 第二节:顺序消息-生产者和消息者步骤说明 顺序消息生产者代码实现步骤 顺序消息消费者代码实现步骤 第三节:顺序消息生产者 效果: 第四节:顺序消息消费者 效果:...

  
  1. 技术活,该赏
  2. 关注+一键三连(点赞,评论,收藏)再看,养成好习惯

RocketMQ使用教程相关系列 目录


目录

第一节:介绍

顺序消息含义介绍

原理解析

第二节:顺序消息-生产者和消息者步骤说明

顺序消息生产者代码实现步骤

顺序消息消费者代码实现步骤

第三节:顺序消息生产者

效果:

第四节:顺序消息消费者

效果:


第一节:介绍

顺序消息含义介绍

顺序消息指的是可以按照消息的发送顺序来消费(FIFO)。RocketMQ可以严格的保证消息有序,可以分为分区有序或者全局有序。

原理解析

在默认的情况下消息发送会采取Round Robin轮询方式把消息发送到不同的queue(分区队列);

而消费消息的时候从多个queue上拉取消息,这种情况发送和消费是不能保证顺序。

但是如果控制发送的顺序消息只依次发送到同一个queue中,消费的时候只从这个queue上依次拉取,则就保证了顺序。如下图所示:

当发送和消费参与的queue只有一个,则是全局有序;如果多个queue参与,则为分区有序,即相对每个queue,消息都是有序的。

第二节:顺序消息-生产者和消息者步骤说明

顺序消息生产者代码实现步骤

1.创建消息生产者producer并制定生产者组名

2.指定Nameserver地址

3.启动producer

4.创建消息对象,指定主题Topic、Tag和消息体

5.发送消息,选择的send方法有三个参数:

  • * 参数一:消息对象
  • * 参数二:消息队列的选择器
  • * 参数三:选择队列的业务标识 6.关闭生产者producer

顺序消息消费者代码实现步骤

1.创建消费者Consumer,制定消费者组名

2.指定Nameserver地址

3.订阅主题Topic和Tag

4.设置回调函数,处理消息:与普通消息的差别,这里用的是MessageListenerOrderly

5.启动消费者consumer

注意:消费者的 Topic 和 Tag 需要和生产者保持一致

第三节:顺序消息生产者


  
  1. public class Producer {
  2. public static void main(String[] args) throws Exception {
  3. // 1.创建消息生产者producer,并制定生产者组名
  4. DefaultMQProducer producer = new DefaultMQProducer("demo_producer_order_group");
  5. // 2.指定Nameserver地址
  6. producer.setNamesrvAddr("192.168.88.131:9876");
  7. // 3.启动producer
  8. producer.start();
  9. System.out.println("生产者启动");
  10. for (int i = 0; i < 20; i++) {
  11. // 4.创建消息对象,指定主题Topic、Tag和消息体
  12. Message msg = new Message("Topic_order_demo", "Tag_order_demo",
  13. ("Hello 虚竹,这是顺序消息" + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
  14. // 5.发送消息
  15. /**
  16. * 参数一:消息对象
  17. * 参数二:消息队列的选择器
  18. * 参数三:选择队列的业务标识
  19. */
  20. SendResult result = producer.send(msg, new MessageQueueSelector() {
  21. /**
  22. *
  23. * @param mqs:队列集合
  24. * @param msg:消息对象
  25. * @param arg:业务标识的参数
  26. * @return
  27. */
  28. @Override
  29. public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
  30. Integer index = (Integer) arg;
  31. return mqs.get(index);
  32. }
  33. }, 1);
  34. System.out.println("发送结果:" + msg.toString());
  35. }
  36. // 6.关闭生产者producer
  37. producer.shutdown();
  38. System.out.println("生产者关闭");
  39. }
  40. }

效果:

0

第四节:顺序消息消费者


  
  1. public class Consumer {
  2. public static void main(String[] args) throws Exception {
  3. //1.创建消费者Consumer,制定消费者组名
  4. DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("demo_consumer_order_group");
  5. //2.指定Nameserver地址
  6. consumer.setNamesrvAddr("192.168.88.131:9876");
  7. //消息拉取最大条数
  8. consumer.setConsumeMessageBatchMaxSize(2);
  9. //3.订阅主题Topic和Tag
  10. consumer.subscribe("Topic_order_demo", "*");
  11. //4.设置回调函数,处理消息
  12. consumer.registerMessageListener(new MessageListenerOrderly() {
  13. //接受消息内容
  14. @Override
  15. public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext consumeOrderlyContext) {
  16. for (MessageExt msg : msgs) {
  17. try {
  18. //获取主题
  19. String topic = msg.getTopic();
  20. //获取标签
  21. String tags = msg.getTags();
  22. //获取信息
  23. byte[] body = msg.getBody();
  24. String result = new String(body, RemotingHelper.DEFAULT_CHARSET);
  25. System.out.println("Consumer消费信息:topic:" + topic + ",tags:"+tags+
  26. ",result"+ result);
  27. } catch (UnsupportedEncodingException e) {
  28. e.printStackTrace();
  29. //重试
  30. return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
  31. }
  32. }
  33. return ConsumeOrderlyStatus.SUCCESS;
  34. }
  35. });
  36. //5.启动消费者consumer
  37. consumer.start();
  38. }
  39. }

效果:

0

文章来源: xiaoxuzhu.blog.csdn.net,作者:小虚竹,版权归原作者所有,如需转载,请联系作者。

原文链接:xiaoxuzhu.blog.csdn.net/article/details/115596634

【版权声明】本文为华为云社区用户转载文章,如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。