五分钟带你玩转rocketMQ(七)吐血总结延时队列,批处理,条件过滤与日志配置
【摘要】
一.延时队列
定时队列 -它们要在规定的时间之后才能传递
1.修改调用类即可
@RestController@RequestMapping("/test")public class TestControllor { private static final Logger logger = LoggerFactory.getL...
一.延时队列
定时队列 -它们要在规定的时间之后才能传递
1.修改调用类即可
-
@RestController
-
@RequestMapping("/test")
-
public class TestControllor {
-
private static final Logger logger = LoggerFactory.getLogger(TestControllor.class);
-
-
/**
-
* 使用RocketMq的生产者
-
*/
-
@Resource(name = "customRocketMQProducer")
-
private DefaultMQProducer defaultMQProducer;
-
-
@RequestMapping("/send")
-
public void send() throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
-
for (int i = 0; i < 100; i++) {
-
final int index = i;
-
String msg = "demo msg test";
-
logger.info("开始发送消息:" + msg);
-
Message sendMsg = new Message("DemoTopic", "TagA", String.valueOf(i), msg.getBytes());
-
//预设值的延迟时间间隔为:1s、 5s、 10s、 30s、 1m、 2m、 3m、 4m、 5m、 6m、 7m、 8m、 9m、 10m、 20m、 30m、 1h、 2h
-
//设置延时时间 3即为10s
-
sendMsg.setDelayTimeLevel(3);
-
//默认3秒超时
-
SendResult sendResult = defaultMQProducer.send(sendMsg);
-
logger.info("消息发送响应信息:" + sendResult.toString() + "当前为第" + i + "次");
-
}
-
}
-
}
二.批处理
成批发送消息提高了传递小消息的性能。
使用限制
同一批的消息应该有:相同的主题,相同的waitStoreMsgOK,并且不支持调度。
此外,一批消息的总大小不应超过1MiB。
如何使用批处理
如果一次只发送不超过1MiB的消息,则很容易使用批处理:
-
@RestController
-
@RequestMapping("/test")
-
public class TestControllor {
-
private static final Logger logger = LoggerFactory.getLogger(TestControllor.class);
-
-
/**
-
* 使用RocketMq的生产者
-
*/
-
@Resource(name = "customRocketMQProducer")
-
private DefaultMQProducer defaultMQProducer;
-
-
@RequestMapping("/send")
-
public void send() throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
-
//for (int i = 0; i < 100; i++) {
-
//final int index = i;
-
String msg = "demo msg test";
-
logger.info("开始发送消息:" + msg);
-
List<Message> messages = new ArrayList<>();
-
messages.add(new Message("DemoTopic", "TagA", "OrderID001", "Hello world 0".getBytes()));
-
messages.add(new Message("DemoTopic", "TagA", "OrderID002", "Hello world 1".getBytes()));
-
messages.add(new Message("DemoTopic", "TagA", "OrderID003", "Hello world 2".getBytes()));
-
//默认3秒超时
-
SendResult sendResult = defaultMQProducer.send(messages);
-
}
-
}
条件过滤
在使用时需要开启条件过滤启动配置 否则会报错
The broker does not support consumer to filter message by SQL92
修改broker.conf文件 加入
enablePropertyFilter = true
同时启动broker 命令为
The broker does not support consumer to filter message by SQL92
筛选固定条件消息
-
数值比较:>, >=, <, <=, BETWEEN, =;
-
字符比较 =, <>, IN;
-
IS NULL or IS NOT NULL;
-
逻辑:AND, OR, NOT;
使用限制:
只有push consumer可以通过SQL92选择消息。接口是:
public void subscribe(final String topic, final MessageSelector messageSelector)
代码示例
调用方法
-
@RestController
-
@RequestMapping("/test")
-
public class TestControllor {
-
private static final Logger logger = LoggerFactory.getLogger(TestControllor.class);
-
-
/**
-
* 使用RocketMq的生产者
-
*/
-
@Resource(name = "customRocketMQProducer")
-
private DefaultMQProducer defaultMQProducer;
-
-
@RequestMapping("/send")
-
public void send() throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
-
for (int i = 0; i < 100; i++) {
-
final int index = i;
-
String msg = "demo msg test";
-
logger.info("开始发送消息:" + msg);
-
Message sendMsg = new Message("DemoTopic",
-
"DemoTag",
-
("Hello RocketMQ " + i).getBytes()
-
);
-
//消费者根据a进行过滤
-
sendMsg.putUserProperty("a", String.valueOf(i));
-
SendResult sendResult = defaultMQProducer.send(sendMsg);
-
logger.info("消息发送响应信息:" + sendResult.toString() + "当前为第" + i + "次");
-
}
-
}
-
}
消费者
-
@SpringBootConfiguration
-
public class MQConsumerConfiguration {
-
-
public static final Logger LOGGER = LoggerFactory.getLogger(MQConsumerConfiguration.class);
-
@Value("${rocketmq.consumer.namesrvAddr}")
-
private String namesrvAddr;
-
@Value("${rocketmq.consumer.groupName}")
-
private String groupName;
-
@Value("${rocketmq.consumer.consumeThreadMin}")
-
private int consumeThreadMin;
-
@Value("${rocketmq.consumer.consumeThreadMax}")
-
private int consumeThreadMax;
-
@Value("${rocketmq.consumer.topics}")
-
private String topics;
-
@Value("${rocketmq.consumer.consumeMessageBatchMaxSize}")
-
private int consumeMessageBatchMaxSize;
-
@Autowired
-
private MQConsumeMsgListenerProcessor mqMessageListenerProcessor;
-
-
@Bean
-
public DefaultMQPushConsumer testRocketMQConsumer() throws Exception {
-
if (StringUtils.isEmpty(groupName)){
-
throw new Exception("groupName is null !!!");
-
}
-
if (StringUtils.isEmpty(namesrvAddr)){
-
throw new Exception("namesrvAddr is null !!!");
-
}
-
if(StringUtils.isEmpty(topics)){
-
throw new Exception("topics is null !!!");
-
}
-
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(groupName);
-
consumer.setNamesrvAddr(namesrvAddr);
-
consumer.setConsumeThreadMin(consumeThreadMin);
-
consumer.setConsumeThreadMax(consumeThreadMax);
-
consumer.registerMessageListener(mqMessageListenerProcessor);
-
-
/**
-
* 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费
-
* 如果非第一次启动,那么按照上次消费的位置继续消费
-
*/
-
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
-
/**
-
* 设置消费模型,集群还是广播,默认为集群
-
*/
-
consumer.setMessageModel(MessageModel.CLUSTERING);
-
/**
-
* 设置一次消费消息的条数,默认为1条
-
*/
-
consumer.setConsumeMessageBatchMaxSize(consumeMessageBatchMaxSize);
-
-
try {
-
//根据条件过滤消息
-
consumer.subscribe("DemoTopic", MessageSelector.bySql("a between 0 and 3"));
-
consumer.start();
-
LOGGER.info("consumer is start !!! groupName:{},topics:{},namesrvAddr:{}",groupName,topics,namesrvAddr);
-
}catch (MQClientException e){
-
LOGGER.error("consumer is start !!! groupName:{},topics:{},namesrvAddr:{}",groupName,topics,namesrvAddr,e);
-
throw new Exception(e);
-
}
-
return consumer;
-
}
-
}
日志配置
-
-Drocketmq.client.logRoot=E:\logs -Drocketmq.client.logLevel=ALL
-
-
-Drocketmq.client.logRoot=E:\logs -Drocketmq.client.logLevel=ERROR
文章来源: baocl.blog.csdn.net,作者:小黄鸡1992,版权归原作者所有,如需转载,请联系作者。
原文链接:baocl.blog.csdn.net/article/details/103426858
【版权声明】本文为华为云社区用户转载文章,如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱:
cloudbbs@huaweicloud.com
- 点赞
- 收藏
- 关注作者
评论(0)