RocketMQ第三章:手把手教老婆实现:普通消息(同步,异步和单向)的生产者和消费者

举报
小虚竹 发表于 2021/10/19 01:15:50 2021/10/19
6k+ 0 0
【摘要】 技术活,该赏关注+一键三连(点赞,评论,收藏)再看,养成好习惯 RocketMQ使用教程相关系列 目录 目录 第一节:项目准备 第二节:普通消息-生产者和消息者步骤说明 普通消息生产者代码实现步骤 普通消息消费者代码实现步骤 第三节:普通消息-同步消息 生产者 效果:  消费者  效果: 第...

      技术活,该赏
      关注+一键三连(点赞,评论,收藏)再看,养成好习惯
  
 

RocketMQ使用教程相关系列 目录


目录

第一节:项目准备

第二节:普通消息-生产者和消息者步骤说明

普通消息生产者代码实现步骤

普通消息消费者代码实现步骤

第三节:普通消息-同步消息

生产者

效果: 

消费者 

效果:

第四节:普通消息-异步消息

生产者

效果:

消费者: 

效果:

分析:

第五节:普通消息-单向消息

生产者

效果:

消费者 

效果:

分析:

第六节:总结


第一节:项目准备

rocketmq-client依赖


      <groupId>org.apache.rocketmq</groupId>
      <artifactId>rocketmq-client</artifactId>
      <version>4.4.0</version>
  
 

第二节:普通消息-生产者和消息者步骤说明

普通消息生产者代码实现步骤

  • 1.创建消息生产者producer,并制定生产者组名
  • 2.指定Nameserver地址
  • 3.启动producer
  • 4.创建消息对象 Message,指定主题Topic、Tag和消息体
  • 5.发送消息
  • 6.关闭生产者producer

普通消息消费者代码实现步骤

  • 1.创建消费者Consumer,制定消费者组名
  • 2.指定Nameserver地址
  • 3.订阅主题Topic和Tag
  • 4.设置回调函数,处理消息
  • 5.启动消费者consumer

注意:消费者的 Topic 和 Tag 需要和生产者保持一致

第三节:普通消息-同步消息

同步消息这种可靠性同步地发送方式使用的比较广泛,比如:重要的消息通知,短信通知。

生产者


      public class ProducerSync {
        public static void main(String[] args) throws Exception {
           // 1.创建消息生产者producer,并制定生产者组名
            DefaultMQProducer producer = new DefaultMQProducer("demo_producer_group");
           // 2.指定Nameserver地址
            producer.setNamesrvAddr("192.168.88.131:9876");
           // 3.启动producer
            System.out.println("生产者启动");
            producer.start();
           for (int i = 0; i < 3; i++) {
              // 4.创建消息对象,指定主题Topic、Tag和消息体
              /**
       * 参数一:消息主题Topic
       * 参数二:消息Tag
       * 参数三:消息内容
       */
               Message msg = new Message("Topic_demo_sync", "Tag_demo_sync",
                     ("Hello 虚竹," + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
              // 5.发送同步消息
               SendResult sendResult = producer.send(msg);
               System.out.println("发送结果:" + sendResult);
            }
           // 6.关闭生产者producer
            producer.shutdown();
            System.out.println("生产者关闭");
         }
      }
  
 

效果: 

0

消费者 


      public class ConsumerSync {
        public static void main(String[] args) throws Exception {
           // 1.创建消费者Consumer,制定消费者组名
           DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("demo_consumer_group");
           // 2.指定Nameserver地址
            consumer.setNamesrvAddr("192.168.88.131:9876");
           // 消息拉取最大条数
            consumer.setConsumeMessageBatchMaxSize(2);
           // 3.订阅主题Topic和Tag
            consumer.subscribe("Topic_demo_sync", "*");
           // 4.设置回调函数,处理消息
            consumer.registerMessageListener(new MessageListenerConcurrently() {
              // 接受消息内容
              @Override
              public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                 for (MessageExt msg : msgs) {
                    try {
                       // 获取主题
                       String topic = msg.getTopic();
                       // 获取标签
                       String tags = msg.getTags();
                       // 获取信息
                       byte[] body = msg.getBody();
                       String result = new String(body, RemotingHelper.DEFAULT_CHARSET);
                        System.out.println("Consumer消费信息:topic:" + topic + ",tags:" + tags + ",result:" + result);
                     } catch (UnsupportedEncodingException e) {
                        e.printStackTrace();
                     }
                  }
                 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
               }
            });
           // 5.启动消费者consumer
            consumer.start();
         }
      }
  
 

效果:

0

第四节:普通消息-异步消息

异步消息通常用在对响应时间敏感的业务场景,即发送端不能容忍长时间地等待Broker的响应。

生产者


      public class ProducerASync {
        public static void main(String[] args) throws Exception {
           // 1.创建消息生产者producer,并制定生产者组名
            DefaultMQProducer producer = new DefaultMQProducer("demo_producer_group");
           // 2.指定Nameserver地址
            producer.setNamesrvAddr("192.168.88.131:9876");
           // 3.启动producer
            System.out.println("生产者启动");
            producer.start();
           for (int i = 0; i < 3; i++) {
              // 4.创建消息对象,指定主题Topic、Tag和消息体
              /**
       * 参数一:消息主题Topic
       * 参数二:消息Tag
       * 参数三:消息内容
       */
               Message msg = new Message("Topic_demo_async", "Tag_demo_async",
                     ("Hello 虚竹," + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
              // 发送消息到一个Broker,异步发送没有返回值,需要使用 SendCallback 接收异步返回结果的回调
               producer.send(msg, new SendCallback() {
                  @Override
                 public void onSuccess(SendResult sendResult) {
                     System.out.println("发送成功:" + sendResult);
                  }
                  @Override
                 public void onException(Throwable throwable) {
                     System.out.println("发送异常:" + throwable.getMessage());
                  }
               });
            }
            Thread.sleep(2000);
           // 6.关闭生产者producer
            producer.shutdown();
            System.out.println("生产者关闭");
         }
      }
  
 

效果:

0

消费者: 


      public class ConsumerASync {
        public static void main(String[] args) throws Exception {
           // 1.创建消费者Consumer,制定消费者组名
           DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("demo_consumer_group");
           // 2.指定Nameserver地址
            consumer.setNamesrvAddr("192.168.88.131:9876");
           // 消息拉取最大条数
            consumer.setConsumeMessageBatchMaxSize(2);
           // 3.订阅主题Topic和Tag
            consumer.subscribe("Topic_demo_async", "*");
           // 4.设置回调函数,处理消息
            consumer.registerMessageListener(new MessageListenerConcurrently() {
              // 接受消息内容
              @Override
              public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                 for (MessageExt msg : msgs) {
                    try {
                       // 获取主题
                       String topic = msg.getTopic();
                       // 获取标签
                       String tags = msg.getTags();
                       // 获取信息
                       byte[] body = msg.getBody();
                       String result = new String(body, RemotingHelper.DEFAULT_CHARSET);
                        System.out.println("Consumer消费信息:topic:" + topic + ",tags:" + tags + ",result:" + result);
                     } catch (UnsupportedEncodingException e) {
                        e.printStackTrace();
                     }
                  }
                 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
               }
            });
           // 5.启动消费者consumer
            consumer.start();
         }
      }
  
 

效果:

0

分析:

从消费者代码中可以看出:异步消费者的实现方式和同步消费者实现方式并无区别。

从消费者的结果来看:可以看出异步消费者接收的结果是无序的。

第五节:普通消息-单向消息

这种方式主要用在不特别关心发送结果的场景,例如日志发送。

生产者


      public class Producer {
        public static void main(String[] args) throws Exception {
           // 1.创建消息生产者producer,并制定生产者组名
            DefaultMQProducer producer = new DefaultMQProducer("demo_producer_group");
           // 2.指定Nameserver地址
            producer.setNamesrvAddr("192.168.88.131:9876");
           // 3.启动producer
            producer.start();
            System.out.println("生产者启动");
           for (int i = 0; i < 20; i++) {
              // 4.创建消息对象,指定主题Topic、Tag和消息体
              /**
       * 参数一:消息主题Topic
       * 参数二:消息Tag
       * 参数三:消息内容
       */
               Message msg = new Message("Topic_demo", "Tag_demo",
                     ("Hello 虚竹," + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
              // 5.发送单向消息
               producer.sendOneway(msg);
               System.out.println("发送结果:" + msg);
              // 线程睡1秒
            }
           // 6.关闭生产者producer
            producer.shutdown();
            System.out.println("生产者关闭");
         }
      }
  
 

效果:

0

消费者 


      public class Consumer {
        public static void main(String[] args) throws Exception {
           // 1.创建消费者Consumer,制定消费者组名
           DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("demo_consumer_group");
           // 2.指定Nameserver地址
            consumer.setNamesrvAddr("192.168.88.131:9876");
           // 消息拉取最大条数
            consumer.setConsumeMessageBatchMaxSize(2);
           // 3.订阅主题Topic和Tag
            consumer.subscribe("Topic_demo", "*");
           // 4.设置回调函数,处理消息
            consumer.registerMessageListener(new MessageListenerConcurrently() {
              // 接受消息内容
              @Override
              public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                 for (MessageExt msg : msgs) {
                    try {
                       // 获取主题
                       String topic = msg.getTopic();
                       // 获取标签
                       String tags = msg.getTags();
                       // 获取信息
                       byte[] body = msg.getBody();
                       String result = new String(body, RemotingHelper.DEFAULT_CHARSET);
                        System.out.println("Consumer消费信息:topic:" + topic + ",tags:" + tags + ",result:" + result);
                     } catch (UnsupportedEncodingException e) {
                        e.printStackTrace();
                     }
                  }
                 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
               }
            });
           // 5.启动消费者consumer
            consumer.start();
         }
      }
  
 

效果:

0

分析:

从消费结果来看,消费的顺序是无序的

第六节:总结

同步消息、异步消息和单向消息的消费者实现方式是一样的。

同步消息、异步消息和单向消息的区别在于消息的发送方。

异步消息生产者没有返回值,需要使用 SendCallback 接收异步返回结果的回调。

异步消息生产者,在关闭实例之前,建议进行休眠。

单向消息也是没有返回值的,并且它的消费者也是无序消费。

单向消息和异步消息的区别是单向消息不需要 SendCallback 来接收异步返回结果的回调。

文章来源: xiaoxuzhu.blog.csdn.net,作者:小虚竹,版权归原作者所有,如需转载,请联系作者。

原文链接:xiaoxuzhu.blog.csdn.net/article/details/115584781

【版权声明】本文为华为云社区用户转载文章,如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

作者其他文章

评论(0

抱歉,系统识别当前为高风险访问,暂不支持该操作

    全部回复

    上滑加载中

    设置昵称

    在此一键设置昵称,即可参与社区互动!

    *长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

    *长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。