五分钟带你玩转rocketMQ(七)吐血总结延时队列,批处理,条件过滤与日志配置

举报
小鲍侃java 发表于 2021/09/10 22:12:27 2021/09/10
【摘要】 一.延时队列 定时队列 -它们要在规定的时间之后才能传递 1.修改调用类即可 @RestController@RequestMapping("/test")public class TestControllor { private static final Logger logger = LoggerFactory.getL...

一.延时队列

定时队列 -它们要在规定的时间之后才能传递

1.修改调用类即可


  
  1. @RestController
  2. @RequestMapping("/test")
  3. public class TestControllor {
  4. private static final Logger logger = LoggerFactory.getLogger(TestControllor.class);
  5. /**
  6. * 使用RocketMq的生产者
  7. */
  8. @Resource(name = "customRocketMQProducer")
  9. private DefaultMQProducer defaultMQProducer;
  10. @RequestMapping("/send")
  11. public void send() throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
  12. for (int i = 0; i < 100; i++) {
  13. final int index = i;
  14. String msg = "demo msg test";
  15. logger.info("开始发送消息:" + msg);
  16. Message sendMsg = new Message("DemoTopic", "TagA", String.valueOf(i), msg.getBytes());
  17. //预设值的延迟时间间隔为:1s、 5s、 10s、 30s、 1m、 2m、 3m、 4m、 5m、 6m、 7m、 8m、 9m、 10m、 20m、 30m、 1h、 2h
  18. //设置延时时间 3即为10s
  19. sendMsg.setDelayTimeLevel(3);
  20. //默认3秒超时
  21. SendResult sendResult = defaultMQProducer.send(sendMsg);
  22. logger.info("消息发送响应信息:" + sendResult.toString() + "当前为第" + i + "次");
  23. }
  24. }
  25. }

二.批处理

成批发送消息提高了传递小消息的性能。

使用限制

同一批的消息应该有:相同的主题,相同的waitStoreMsgOK,并且不支持调度。

此外,一批消息的总大小不应超过1MiB。

如何使用批处理

如果一次只发送不超过1MiB的消息,则很容易使用批处理:


  
  1. @RestController
  2. @RequestMapping("/test")
  3. public class TestControllor {
  4. private static final Logger logger = LoggerFactory.getLogger(TestControllor.class);
  5. /**
  6. * 使用RocketMq的生产者
  7. */
  8. @Resource(name = "customRocketMQProducer")
  9. private DefaultMQProducer defaultMQProducer;
  10. @RequestMapping("/send")
  11. public void send() throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
  12. //for (int i = 0; i < 100; i++) {
  13. //final int index = i;
  14. String msg = "demo msg test";
  15. logger.info("开始发送消息:" + msg);
  16. List<Message> messages = new ArrayList<>();
  17. messages.add(new Message("DemoTopic", "TagA", "OrderID001", "Hello world 0".getBytes()));
  18. messages.add(new Message("DemoTopic", "TagA", "OrderID002", "Hello world 1".getBytes()));
  19. messages.add(new Message("DemoTopic", "TagA", "OrderID003", "Hello world 2".getBytes()));
  20. //默认3秒超时
  21. SendResult sendResult = defaultMQProducer.send(messages);
  22. }
  23. }

条件过滤

在使用时需要开启条件过滤启动配置 否则会报错

The broker does not support consumer to filter message by SQL92

修改broker.conf文件 加入

enablePropertyFilter = true

 

同时启动broker 命令为

The broker does not support consumer to filter message by SQL92

 

筛选固定条件消息


  
  1. 数值比较:>, >=, <, <=, BETWEEN, =;
  2. 字符比较 =, <>, IN;
  3. IS NULL or IS NOT NULL;
  4. 逻辑:AND, OR, NOT;

使用限制:

只有push consumer可以通过SQL92选择消息。接口是:

public void subscribe(final String topic, final MessageSelector messageSelector)

 

代码示例

调用方法


  
  1. @RestController
  2. @RequestMapping("/test")
  3. public class TestControllor {
  4. private static final Logger logger = LoggerFactory.getLogger(TestControllor.class);
  5. /**
  6. * 使用RocketMq的生产者
  7. */
  8. @Resource(name = "customRocketMQProducer")
  9. private DefaultMQProducer defaultMQProducer;
  10. @RequestMapping("/send")
  11. public void send() throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
  12. for (int i = 0; i < 100; i++) {
  13. final int index = i;
  14. String msg = "demo msg test";
  15. logger.info("开始发送消息:" + msg);
  16. Message sendMsg = new Message("DemoTopic",
  17. "DemoTag",
  18. ("Hello RocketMQ " + i).getBytes()
  19. );
  20. //消费者根据a进行过滤
  21. sendMsg.putUserProperty("a", String.valueOf(i));
  22. SendResult sendResult = defaultMQProducer.send(sendMsg);
  23. logger.info("消息发送响应信息:" + sendResult.toString() + "当前为第" + i + "次");
  24. }
  25. }
  26. }

消费者


  
  1. @SpringBootConfiguration
  2. public class MQConsumerConfiguration {
  3. public static final Logger LOGGER = LoggerFactory.getLogger(MQConsumerConfiguration.class);
  4. @Value("${rocketmq.consumer.namesrvAddr}")
  5. private String namesrvAddr;
  6. @Value("${rocketmq.consumer.groupName}")
  7. private String groupName;
  8. @Value("${rocketmq.consumer.consumeThreadMin}")
  9. private int consumeThreadMin;
  10. @Value("${rocketmq.consumer.consumeThreadMax}")
  11. private int consumeThreadMax;
  12. @Value("${rocketmq.consumer.topics}")
  13. private String topics;
  14. @Value("${rocketmq.consumer.consumeMessageBatchMaxSize}")
  15. private int consumeMessageBatchMaxSize;
  16. @Autowired
  17. private MQConsumeMsgListenerProcessor mqMessageListenerProcessor;
  18. @Bean
  19. public DefaultMQPushConsumer testRocketMQConsumer() throws Exception {
  20. if (StringUtils.isEmpty(groupName)){
  21. throw new Exception("groupName is null !!!");
  22. }
  23. if (StringUtils.isEmpty(namesrvAddr)){
  24. throw new Exception("namesrvAddr is null !!!");
  25. }
  26. if(StringUtils.isEmpty(topics)){
  27. throw new Exception("topics is null !!!");
  28. }
  29. DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(groupName);
  30. consumer.setNamesrvAddr(namesrvAddr);
  31. consumer.setConsumeThreadMin(consumeThreadMin);
  32. consumer.setConsumeThreadMax(consumeThreadMax);
  33. consumer.registerMessageListener(mqMessageListenerProcessor);
  34. /**
  35. * 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费
  36. * 如果非第一次启动,那么按照上次消费的位置继续消费
  37. */
  38. consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
  39. /**
  40. * 设置消费模型,集群还是广播,默认为集群
  41. */
  42. consumer.setMessageModel(MessageModel.CLUSTERING);
  43. /**
  44. * 设置一次消费消息的条数,默认为1条
  45. */
  46. consumer.setConsumeMessageBatchMaxSize(consumeMessageBatchMaxSize);
  47. try {
  48. //根据条件过滤消息
  49. consumer.subscribe("DemoTopic", MessageSelector.bySql("a between 0 and 3"));
  50. consumer.start();
  51. LOGGER.info("consumer is start !!! groupName:{},topics:{},namesrvAddr:{}",groupName,topics,namesrvAddr);
  52. }catch (MQClientException e){
  53. LOGGER.error("consumer is start !!! groupName:{},topics:{},namesrvAddr:{}",groupName,topics,namesrvAddr,e);
  54. throw new Exception(e);
  55. }
  56. return consumer;
  57. }
  58. }

日志配置


  
  1. -Drocketmq.client.logRoot=E:\logs -Drocketmq.client.logLevel=ALL
  2. -Drocketmq.client.logRoot=E:\logs -Drocketmq.client.logLevel=ERROR

文章来源: baocl.blog.csdn.net,作者:小黄鸡1992,版权归原作者所有,如需转载,请联系作者。

原文链接:baocl.blog.csdn.net/article/details/103426858

【版权声明】本文为华为云社区用户转载文章,如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。