RocketMQ第三章:手把手教老婆实现:普通消息(同步,异步和单向)的生产者和消费者

举报
小虚竹 发表于 2021/10/19 01:15:50 2021/10/19
【摘要】 技术活,该赏关注+一键三连(点赞,评论,收藏)再看,养成好习惯 RocketMQ使用教程相关系列 目录 目录 第一节:项目准备 第二节:普通消息-生产者和消息者步骤说明 普通消息生产者代码实现步骤 普通消息消费者代码实现步骤 第三节:普通消息-同步消息 生产者 效果:  消费者  效果: 第...

  
  1. 技术活,该赏
  2. 关注+一键三连(点赞,评论,收藏)再看,养成好习惯

RocketMQ使用教程相关系列 目录


目录

第一节:项目准备

第二节:普通消息-生产者和消息者步骤说明

普通消息生产者代码实现步骤

普通消息消费者代码实现步骤

第三节:普通消息-同步消息

生产者

效果: 

消费者 

效果:

第四节:普通消息-异步消息

生产者

效果:

消费者: 

效果:

分析:

第五节:普通消息-单向消息

生产者

效果:

消费者 

效果:

分析:

第六节:总结


第一节:项目准备

rocketmq-client依赖


  
  1. <groupId>org.apache.rocketmq</groupId>
  2. <artifactId>rocketmq-client</artifactId>
  3. <version>4.4.0</version>

第二节:普通消息-生产者和消息者步骤说明

普通消息生产者代码实现步骤

  • 1.创建消息生产者producer,并制定生产者组名
  • 2.指定Nameserver地址
  • 3.启动producer
  • 4.创建消息对象 Message,指定主题Topic、Tag和消息体
  • 5.发送消息
  • 6.关闭生产者producer

普通消息消费者代码实现步骤

  • 1.创建消费者Consumer,制定消费者组名
  • 2.指定Nameserver地址
  • 3.订阅主题Topic和Tag
  • 4.设置回调函数,处理消息
  • 5.启动消费者consumer

注意:消费者的 Topic 和 Tag 需要和生产者保持一致

第三节:普通消息-同步消息

同步消息这种可靠性同步地发送方式使用的比较广泛,比如:重要的消息通知,短信通知。

生产者


  
  1. public class ProducerSync {
  2. public static void main(String[] args) throws Exception {
  3. // 1.创建消息生产者producer,并制定生产者组名
  4. DefaultMQProducer producer = new DefaultMQProducer("demo_producer_group");
  5. // 2.指定Nameserver地址
  6. producer.setNamesrvAddr("192.168.88.131:9876");
  7. // 3.启动producer
  8. System.out.println("生产者启动");
  9. producer.start();
  10. for (int i = 0; i < 3; i++) {
  11. // 4.创建消息对象,指定主题Topic、Tag和消息体
  12. /**
  13. * 参数一:消息主题Topic
  14. * 参数二:消息Tag
  15. * 参数三:消息内容
  16. */
  17. Message msg = new Message("Topic_demo_sync", "Tag_demo_sync",
  18. ("Hello 虚竹," + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
  19. // 5.发送同步消息
  20. SendResult sendResult = producer.send(msg);
  21. System.out.println("发送结果:" + sendResult);
  22. }
  23. // 6.关闭生产者producer
  24. producer.shutdown();
  25. System.out.println("生产者关闭");
  26. }
  27. }

效果: 

0

消费者 


  
  1. public class ConsumerSync {
  2. public static void main(String[] args) throws Exception {
  3. // 1.创建消费者Consumer,制定消费者组名
  4. DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("demo_consumer_group");
  5. // 2.指定Nameserver地址
  6. consumer.setNamesrvAddr("192.168.88.131:9876");
  7. // 消息拉取最大条数
  8. consumer.setConsumeMessageBatchMaxSize(2);
  9. // 3.订阅主题Topic和Tag
  10. consumer.subscribe("Topic_demo_sync", "*");
  11. // 4.设置回调函数,处理消息
  12. consumer.registerMessageListener(new MessageListenerConcurrently() {
  13. // 接受消息内容
  14. @Override
  15. public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
  16. for (MessageExt msg : msgs) {
  17. try {
  18. // 获取主题
  19. String topic = msg.getTopic();
  20. // 获取标签
  21. String tags = msg.getTags();
  22. // 获取信息
  23. byte[] body = msg.getBody();
  24. String result = new String(body, RemotingHelper.DEFAULT_CHARSET);
  25. System.out.println("Consumer消费信息:topic:" + topic + ",tags:" + tags + ",result:" + result);
  26. } catch (UnsupportedEncodingException e) {
  27. e.printStackTrace();
  28. }
  29. }
  30. return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
  31. }
  32. });
  33. // 5.启动消费者consumer
  34. consumer.start();
  35. }
  36. }

效果:

0

第四节:普通消息-异步消息

异步消息通常用在对响应时间敏感的业务场景,即发送端不能容忍长时间地等待Broker的响应。

生产者


  
  1. public class ProducerASync {
  2. public static void main(String[] args) throws Exception {
  3. // 1.创建消息生产者producer,并制定生产者组名
  4. DefaultMQProducer producer = new DefaultMQProducer("demo_producer_group");
  5. // 2.指定Nameserver地址
  6. producer.setNamesrvAddr("192.168.88.131:9876");
  7. // 3.启动producer
  8. System.out.println("生产者启动");
  9. producer.start();
  10. for (int i = 0; i < 3; i++) {
  11. // 4.创建消息对象,指定主题Topic、Tag和消息体
  12. /**
  13. * 参数一:消息主题Topic
  14. * 参数二:消息Tag
  15. * 参数三:消息内容
  16. */
  17. Message msg = new Message("Topic_demo_async", "Tag_demo_async",
  18. ("Hello 虚竹," + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
  19. // 发送消息到一个Broker,异步发送没有返回值,需要使用 SendCallback 接收异步返回结果的回调
  20. producer.send(msg, new SendCallback() {
  21. @Override
  22. public void onSuccess(SendResult sendResult) {
  23. System.out.println("发送成功:" + sendResult);
  24. }
  25. @Override
  26. public void onException(Throwable throwable) {
  27. System.out.println("发送异常:" + throwable.getMessage());
  28. }
  29. });
  30. }
  31. Thread.sleep(2000);
  32. // 6.关闭生产者producer
  33. producer.shutdown();
  34. System.out.println("生产者关闭");
  35. }
  36. }

效果:

0

消费者: 


  
  1. public class ConsumerASync {
  2. public static void main(String[] args) throws Exception {
  3. // 1.创建消费者Consumer,制定消费者组名
  4. DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("demo_consumer_group");
  5. // 2.指定Nameserver地址
  6. consumer.setNamesrvAddr("192.168.88.131:9876");
  7. // 消息拉取最大条数
  8. consumer.setConsumeMessageBatchMaxSize(2);
  9. // 3.订阅主题Topic和Tag
  10. consumer.subscribe("Topic_demo_async", "*");
  11. // 4.设置回调函数,处理消息
  12. consumer.registerMessageListener(new MessageListenerConcurrently() {
  13. // 接受消息内容
  14. @Override
  15. public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
  16. for (MessageExt msg : msgs) {
  17. try {
  18. // 获取主题
  19. String topic = msg.getTopic();
  20. // 获取标签
  21. String tags = msg.getTags();
  22. // 获取信息
  23. byte[] body = msg.getBody();
  24. String result = new String(body, RemotingHelper.DEFAULT_CHARSET);
  25. System.out.println("Consumer消费信息:topic:" + topic + ",tags:" + tags + ",result:" + result);
  26. } catch (UnsupportedEncodingException e) {
  27. e.printStackTrace();
  28. }
  29. }
  30. return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
  31. }
  32. });
  33. // 5.启动消费者consumer
  34. consumer.start();
  35. }
  36. }

效果:

0

分析:

从消费者代码中可以看出:异步消费者的实现方式和同步消费者实现方式并无区别。

从消费者的结果来看:可以看出异步消费者接收的结果是无序的。

第五节:普通消息-单向消息

这种方式主要用在不特别关心发送结果的场景,例如日志发送。

生产者


  
  1. public class Producer {
  2. public static void main(String[] args) throws Exception {
  3. // 1.创建消息生产者producer,并制定生产者组名
  4. DefaultMQProducer producer = new DefaultMQProducer("demo_producer_group");
  5. // 2.指定Nameserver地址
  6. producer.setNamesrvAddr("192.168.88.131:9876");
  7. // 3.启动producer
  8. producer.start();
  9. System.out.println("生产者启动");
  10. for (int i = 0; i < 20; i++) {
  11. // 4.创建消息对象,指定主题Topic、Tag和消息体
  12. /**
  13. * 参数一:消息主题Topic
  14. * 参数二:消息Tag
  15. * 参数三:消息内容
  16. */
  17. Message msg = new Message("Topic_demo", "Tag_demo",
  18. ("Hello 虚竹," + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
  19. // 5.发送单向消息
  20. producer.sendOneway(msg);
  21. System.out.println("发送结果:" + msg);
  22. // 线程睡1秒
  23. }
  24. // 6.关闭生产者producer
  25. producer.shutdown();
  26. System.out.println("生产者关闭");
  27. }
  28. }

效果:

0

消费者 


  
  1. public class Consumer {
  2. public static void main(String[] args) throws Exception {
  3. // 1.创建消费者Consumer,制定消费者组名
  4. DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("demo_consumer_group");
  5. // 2.指定Nameserver地址
  6. consumer.setNamesrvAddr("192.168.88.131:9876");
  7. // 消息拉取最大条数
  8. consumer.setConsumeMessageBatchMaxSize(2);
  9. // 3.订阅主题Topic和Tag
  10. consumer.subscribe("Topic_demo", "*");
  11. // 4.设置回调函数,处理消息
  12. consumer.registerMessageListener(new MessageListenerConcurrently() {
  13. // 接受消息内容
  14. @Override
  15. public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
  16. for (MessageExt msg : msgs) {
  17. try {
  18. // 获取主题
  19. String topic = msg.getTopic();
  20. // 获取标签
  21. String tags = msg.getTags();
  22. // 获取信息
  23. byte[] body = msg.getBody();
  24. String result = new String(body, RemotingHelper.DEFAULT_CHARSET);
  25. System.out.println("Consumer消费信息:topic:" + topic + ",tags:" + tags + ",result:" + result);
  26. } catch (UnsupportedEncodingException e) {
  27. e.printStackTrace();
  28. }
  29. }
  30. return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
  31. }
  32. });
  33. // 5.启动消费者consumer
  34. consumer.start();
  35. }
  36. }

效果:

0

分析:

从消费结果来看,消费的顺序是无序的

第六节:总结

同步消息、异步消息和单向消息的消费者实现方式是一样的。

同步消息、异步消息和单向消息的区别在于消息的发送方。

异步消息生产者没有返回值,需要使用 SendCallback 接收异步返回结果的回调。

异步消息生产者,在关闭实例之前,建议进行休眠。

单向消息也是没有返回值的,并且它的消费者也是无序消费。

单向消息和异步消息的区别是单向消息不需要 SendCallback 来接收异步返回结果的回调。

文章来源: xiaoxuzhu.blog.csdn.net,作者:小虚竹,版权归原作者所有,如需转载,请联系作者。

原文链接:xiaoxuzhu.blog.csdn.net/article/details/115584781

【版权声明】本文为华为云社区用户转载文章,如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。