RocketMQ第四章:手把手教老婆实现-顺序消息生产者和顺序消息消费者

举报
小虚竹 发表于 2021/10/19 01:17:14 2021/10/19
8.7k+ 0 0
【摘要】 技术活,该赏关注+一键三连(点赞,评论,收藏)再看,养成好习惯 RocketMQ使用教程相关系列 目录 目录 第一节:介绍 顺序消息含义介绍 原理解析 第二节:顺序消息-生产者和消息者步骤说明 顺序消息生产者代码实现步骤 顺序消息消费者代码实现步骤 第三节:顺序消息生产者 效果: 第四节:顺序消息消费者 效果:...

      技术活,该赏
      关注+一键三连(点赞,评论,收藏)再看,养成好习惯
  
 

RocketMQ使用教程相关系列 目录


目录

第一节:介绍

顺序消息含义介绍

原理解析

第二节:顺序消息-生产者和消息者步骤说明

顺序消息生产者代码实现步骤

顺序消息消费者代码实现步骤

第三节:顺序消息生产者

效果:

第四节:顺序消息消费者

效果:


第一节:介绍

顺序消息含义介绍

顺序消息指的是可以按照消息的发送顺序来消费(FIFO)。RocketMQ可以严格的保证消息有序,可以分为分区有序或者全局有序。

原理解析

在默认的情况下消息发送会采取Round Robin轮询方式把消息发送到不同的queue(分区队列);

而消费消息的时候从多个queue上拉取消息,这种情况发送和消费是不能保证顺序。

但是如果控制发送的顺序消息只依次发送到同一个queue中,消费的时候只从这个queue上依次拉取,则就保证了顺序。如下图所示:

当发送和消费参与的queue只有一个,则是全局有序;如果多个queue参与,则为分区有序,即相对每个queue,消息都是有序的。

第二节:顺序消息-生产者和消息者步骤说明

顺序消息生产者代码实现步骤

1.创建消息生产者producer并制定生产者组名

2.指定Nameserver地址

3.启动producer

4.创建消息对象,指定主题Topic、Tag和消息体

5.发送消息,选择的send方法有三个参数:

  • * 参数一:消息对象
  • * 参数二:消息队列的选择器
  • * 参数三:选择队列的业务标识 6.关闭生产者producer

顺序消息消费者代码实现步骤

1.创建消费者Consumer,制定消费者组名

2.指定Nameserver地址

3.订阅主题Topic和Tag

4.设置回调函数,处理消息:与普通消息的差别,这里用的是MessageListenerOrderly

5.启动消费者consumer

注意:消费者的 Topic 和 Tag 需要和生产者保持一致

第三节:顺序消息生产者


      public class Producer {
        public static void main(String[] args) throws Exception {
           // 1.创建消息生产者producer,并制定生产者组名
           DefaultMQProducer producer = new DefaultMQProducer("demo_producer_order_group");
           // 2.指定Nameserver地址
            producer.setNamesrvAddr("192.168.88.131:9876");
           // 3.启动producer
            producer.start();
            System.out.println("生产者启动");
           for (int i = 0; i < 20; i++) {
              // 4.创建消息对象,指定主题Topic、Tag和消息体
              Message msg = new Message("Topic_order_demo", "Tag_order_demo",
                     ("Hello 虚竹,这是顺序消息" + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
              // 5.发送消息
              /**
       * 参数一:消息对象
       * 参数二:消息队列的选择器
       * 参数三:选择队列的业务标识
       */
              SendResult result = producer.send(msg, new MessageQueueSelector() {
                 /**
       *
       * @param mqs:队列集合
       * @param msg:消息对象
       * @param arg:业务标识的参数
       * @return
       */
                 @Override
                 public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                    Integer index = (Integer) arg;
                    return mqs.get(index);
                  }
               }, 1);
               System.out.println("发送结果:" + msg.toString());
            }
           // 6.关闭生产者producer
            producer.shutdown();
            System.out.println("生产者关闭");
         }
      }
  
 

效果:

0

第四节:顺序消息消费者


      public class Consumer {
         public static void main(String[] args) throws Exception {
             //1.创建消费者Consumer,制定消费者组名
             DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("demo_consumer_order_group");
             //2.指定Nameserver地址
              consumer.setNamesrvAddr("192.168.88.131:9876");
             //消息拉取最大条数
              consumer.setConsumeMessageBatchMaxSize(2);
             //3.订阅主题Topic和Tag
              consumer.subscribe("Topic_order_demo", "*");
             //4.设置回调函数,处理消息
              consumer.registerMessageListener(new MessageListenerOrderly() {
                 //接受消息内容
                 @Override
                 public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext consumeOrderlyContext) {
                     for (MessageExt msg : msgs) {
                         try {
                             //获取主题
                             String topic = msg.getTopic();
                             //获取标签
                             String tags = msg.getTags();
                             //获取信息
                             byte[] body =  msg.getBody();
                             String result = new String(body, RemotingHelper.DEFAULT_CHARSET);
                              System.out.println("Consumer消费信息:topic:" + topic + ",tags:"+tags+
                                     ",result"+ result);
                          } catch (UnsupportedEncodingException e) {
                              e.printStackTrace();
                             //重试
                             return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
                          }
                      }
                     return ConsumeOrderlyStatus.SUCCESS;
                  }
              });
             //5.启动消费者consumer
              consumer.start();
          }
      }
  
 

效果:

0

文章来源: xiaoxuzhu.blog.csdn.net,作者:小虚竹,版权归原作者所有,如需转载,请联系作者。

原文链接:xiaoxuzhu.blog.csdn.net/article/details/115596634

【版权声明】本文为华为云社区用户转载文章,如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

作者其他文章

评论(0

抱歉,系统识别当前为高风险访问,暂不支持该操作

    全部回复

    上滑加载中

    设置昵称

    在此一键设置昵称,即可参与社区互动!

    *长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

    *长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。