RocketMQ 第六章:手把手教老婆实现-批量消息生产者和消费者

举报
小虚竹 发表于 2021/10/19 00:53:50 2021/10/19
【摘要】 技术活,该赏关注+一键三连(点赞,评论,收藏)再看,养成好习惯 RocketMQ使用教程相关系列 目录 目录 第一节:介绍 限制: 第二节:批量消息-生产者和消息者步骤说明 批量消息生产者代码实现步骤 批量消息消费者代码实现步骤 第三节:批量消息生产者-小于4MB 效果: 第四节:批量消息消费者  效果...

  
  1. 技术活,该赏
  2. 关注+一键三连(点赞,评论,收藏)再看,养成好习惯

RocketMQ使用教程相关系列 目录


目录

第一节:介绍

限制:

第二节:批量消息-生产者和消息者步骤说明

批量消息生产者代码实现步骤

批量消息消费者代码实现步骤

第三节:批量消息生产者-小于4MB

效果:

第四节:批量消息消费者 

效果:

第五节:批量消息生产者-大于4MB

消息拆分工具类

生产者

效果:

第六节:批量消息消费者-大于4MB

效果:


第一节:介绍

批量发送消息能显著提高传递小消息的性能。

限制:

  • 应该有相同的topic,相同的waitStoreMsgOK
  • 不能是延时消息
  • 这一批消息的总大小不应超过4MB(默认配置:DefaultMQProducer的maxMessageSize参数,可在broker*.properties配置文件中修改)。

第二节:批量消息-生产者和消息者步骤说明

批量消息生产者代码实现步骤

1.创建消息生产者producer,并制定生产者组名

2.指定Nameserver地址

3.启动producer

4.创建消息对象集合,指定主题Topic、Tag和消息体

5.发送集合消息

6.关闭生产者producer

批量消息消费者代码实现步骤

1.创建消费者Consumer,制定消费者组名

2.指定Nameserver地址

3.订阅主题Topic和Tag

4.设置回调函数,处理消息

5.启动消费者consumer

注意:消费者的 Topic 和 Tag 需要和生产者保持一致

第三节:批量消息生产者-小于4MB


  
  1. public class Producer {
  2. public static void main(String[] args) throws Exception {
  3. // 1.创建消息生产者producer,并制定生产者组名
  4. DefaultMQProducer producer = new DefaultMQProducer("demo_producer_batch_group");
  5. // 2.指定Nameserver地址
  6. producer.setNamesrvAddr("192.168.88.131:9876");
  7. // 3.启动producer
  8. producer.start();
  9. System.out.println("生产者启动");
  10. List<Message> msgs = new ArrayList<Message>();
  11. // 4.创建消息对象,指定主题Topic、Tag和消息体
  12. /**
  13. * 参数一:消息主题Topic
  14. * 参数二:消息Tag
  15. * 参数三:消息内容
  16. */
  17. for (int i = 0; i < 20; i++) {
  18. Message msg = new Message("Topic_batch_demo", "Tag_batch_demo", ("Hello 虚竹,这是批量消息" + i).getBytes());
  19. msgs.add(msg);
  20. }
  21. // 5.发送消息
  22. SendResult result = producer.send(msgs);
  23. // 发送状态
  24. SendStatus status = result.getSendStatus();
  25. System.out.println("发送结果:" + result);
  26. // 线程睡1秒
  27. TimeUnit.SECONDS.sleep(1);
  28. // 6.关闭生产者producer
  29. producer.shutdown();
  30. System.out.println("生产者关闭");
  31. }
  32. }

效果:

0

第四节:批量消息消费者 


  
  1. public class Consumer {
  2. public static void main(String[] args) throws Exception {
  3. //1.创建消费者Consumer,制定消费者组名
  4. DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("demo_consumer_order_group");
  5. //2.指定Nameserver地址
  6. consumer.setNamesrvAddr("192.168.88.131:9876");
  7. //消息拉取最大条数
  8. consumer.setConsumeMessageBatchMaxSize(2);
  9. //3.订阅主题Topic和Tag
  10. consumer.subscribe("Topic_batch_demo", "*");
  11. //4.设置回调函数,处理消息
  12. consumer.registerMessageListener(new MessageListenerOrderly() {
  13. //接受消息内容
  14. @Override
  15. public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext consumeOrderlyContext) {
  16. for (MessageExt msg : msgs) {
  17. try {
  18. //获取主题
  19. String topic = msg.getTopic();
  20. //获取标签
  21. String tags = msg.getTags();
  22. //获取信息
  23. byte[] body = msg.getBody();
  24. String result = new String(body, RemotingHelper.DEFAULT_CHARSET);
  25. System.out.println("Consumer消费信息:topic:" + topic + ",tags:"+tags+
  26. ",result"+ result);
  27. } catch (UnsupportedEncodingException e) {
  28. e.printStackTrace();
  29. //重试
  30. return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
  31. }
  32. }
  33. return ConsumeOrderlyStatus.SUCCESS;
  34. }
  35. });
  36. //5.启动消费者consumer
  37. consumer.start();
  38. }
  39. }

效果:

0

第五节:批量消息生产者-大于4MB

消息拆分工具类

要发送大于4MB的消息,我们只需要把大的消息分裂成若干个小的消息。首先创建一个拆分消息的工具类


  
  1. public class ListSplitter implements Iterator<List<Message>> {
  2. private int SIZE_LIMIT = 1024 * 1024 * 4;
  3. private final List<Message> messages;
  4. private int currIndex;
  5. public ListSplitter(List<Message> messages) {
  6. this.messages = messages;
  7. }
  8. public ListSplitter(List<Message> messages, DefaultMQProducer mqProducer) {
  9. this.messages = messages;
  10. this.SIZE_LIMIT = mqProducer.getMaxMessageSize();
  11. }
  12. @Override
  13. public boolean hasNext() {
  14. return currIndex < messages.size();
  15. }
  16. @Override
  17. public List<Message> next() {
  18. int nextIndex = currIndex;
  19. int totalSize = 0;
  20. for (; nextIndex < messages.size(); nextIndex++) {
  21. Message message = messages.get(nextIndex);
  22. int tmpSize = message.getTopic().length() + message.getBody().length;
  23. Map<String, String> properties = message.getProperties();
  24. for (Map.Entry<String, String> entry : properties.entrySet()) {
  25. tmpSize += entry.getKey().length() + entry.getValue().length();
  26. }
  27. // 增加日志的开销20字节
  28. tmpSize = tmpSize + 20;
  29. if (tmpSize > SIZE_LIMIT) {
  30. // 单个消息超过了最大的限制
  31. // 忽略,否则会阻塞分裂的进程
  32. if (nextIndex - currIndex == 0) {
  33. // 假如下一个子列表没有元素,则添加这个子列表然后退出循环,否则只是退出循环
  34. nextIndex++;
  35. }
  36. break;
  37. }
  38. if (tmpSize + totalSize > SIZE_LIMIT) {
  39. break;
  40. } else {
  41. totalSize += tmpSize;
  42. }
  43. }
  44. List<Message> subList = messages.subList(currIndex, nextIndex);
  45. currIndex = nextIndex;
  46. return subList;
  47. }
  48. @Override
  49. public void remove() {
  50. }
  51. }

生产者


  
  1. public class ProducerMoreThan4M {
  2. public static void main(String[] args) throws Exception {
  3. // 1.创建消息生产者producer,并制定生产者组名
  4. DefaultMQProducer producer = new DefaultMQProducer("demo_producer_batch_group");
  5. // 2.指定Nameserver地址
  6. producer.setNamesrvAddr("192.168.88.131:9876");
  7. // 3.启动producer
  8. producer.start();
  9. System.out.println("生产者启动");
  10. List<Message> msgs = new ArrayList<Message>();
  11. // 4.创建消息对象,指定主题Topic、Tag和消息体
  12. /**
  13. * 参数一:消息主题Topic
  14. * 参数二:消息Tag
  15. * 参数三:消息内容
  16. */
  17. for (int i = 0; i < 20; i++) {
  18. Message msg = new Message("Topic_batch_demo", "Tag_batch_demo", ("Hello 虚竹,这是批量消息" + i).getBytes());
  19. msgs.add(msg);
  20. }
  21. // 5.发送消息
  22. // 发送批量消息:把大的消息分裂成若干个小的消息
  23. ListSplitter splitter = new ListSplitter(msgs, producer);
  24. while (splitter.hasNext()) {
  25. try {
  26. List<Message> listItem = splitter.next();
  27. SendResult result = producer.send(listItem);
  28. System.out.println("发送结果:" + result);
  29. } catch (Exception e) {
  30. e.printStackTrace();
  31. // 处理error
  32. }
  33. }
  34. // 线程睡1秒
  35. TimeUnit.SECONDS.sleep(1);
  36. // 6.关闭生产者producer
  37. producer.shutdown();
  38. System.out.println("生产者关闭");
  39. }
  40. }

效果:

0

第六节:批量消息消费者-大于4MB

跟小于4MB的消费者代码一模一样,就不水字数了。

效果:

0

文章来源: xiaoxuzhu.blog.csdn.net,作者:小虚竹,版权归原作者所有,如需转载,请联系作者。

原文链接:xiaoxuzhu.blog.csdn.net/article/details/115600192

【版权声明】本文为华为云社区用户转载文章,如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。