RocketMQ(十):如何保证消息严格有序?
【摘要】 通俗易懂、图文结合从消息的发送、消费、一致性角度总结出RocketMQ保证消息严格有序的最佳实践
RocketMQ(十):如何保证消息严格有序?
在某些业务场景中,MQ需要保证消息的顺序性,比如支付订单应该在创建订单之后进行
如果不使用保证顺序的手段,由于多队列、网络等因素可能会导致先处理支付订单的消息再处理创建订单的消息,这样就会导致处理失败
为了避免这样的情况发生,使用MQ时有必要保证消息的顺序性,在RocketMQ中通常使用顺序发送消息和顺序消费消息来保证消息的顺序性
本篇文章就来描述RocketMQ下如何确保可靠的消息顺序性,思维导图如下:
往期回顾:
RocketMQ(六):Consumer Rebalanc原理(运行流程、触发时机、导致的问题)
RocketMQ(三):面对高并发请求,如何高效持久化消息?(核心存储文件、持久化核心原理、源码解析)
RocketMQ(二):揭秘发送消息核心原理(源码与设计思想解析)
RocketMQ(一):消息中间件缘起,一览整体架构及核心组件
顺序发送
当队列全局只有一个时,消息全局有序,此时只需要确保为单个生产者发送(多个生产者同时发送无法预估消息到达的顺序)
或者先生产创建订单的消息再生产支付订单的消息(确保消息不丢)由于全局有序只能有一个队列,队列的压力过大,所以不经常使用
更通用的做法是使用队列有序:在发送消息时通过一定的路由算法将需要有序的消息分发到同一个队列中,使用相同的队列保证有序性
当使用队列有序时也需要确保由一个生产者进行串行发送(先创建订单再支付这种情况下多生产者也是可以的,因为创建/支付订单虽然生产者可能不同,但能确保消息到达的情况下,消息也是有序的,满足因果一致性)
在RocketMQ中提供顺序消息的API,与发送其他消息的方法类似,只是参数增加队列选择器MessageQueueSelector和分片键(通常是业务唯一ID)
public void sendOrderMsg(String topic, String msg, SelectMessageQueueByHash selectMessageQueue, String orderId) {
Message message = new Message(topic, msg.getBytes());
message.setBuyerId(orderId);
try {
producer.send(message, selectMessageQueue, orderId);
} catch (MQClientException | RemotingException | InterruptedException | MQBrokerException e) {
throw new RuntimeException(e);
}
}
通常可以选择使用SelectMessageQueueByHash,通过对业务唯一ID进行哈希来分配队列,以此来达到消息的队列有序
在之前的分析发送消息的文章中曾说过发送消息通用流程,顺序消息调用的API虽然与普通消息通用的API不同(调用 sendSelectImpl
)
但流程类似:获取topic、队列数据,根据队列选择器选择队列,期间检测超时,最终调用核心方法 sendKernelImpl
进行发送消息
private SendResult sendSelectImpl(
Message msg,
MessageQueueSelector selector,
Object arg,
final CommunicationMode communicationMode,
final SendCallback sendCallback, final long timeout
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
long beginStartTime = System.currentTimeMillis();
this.makeSureStateOK();
//参数校验
Validators.checkMessage(msg, this.defaultMQProducer);
//获取topic
TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
if (topicPublishInfo != null && topicPublishInfo.ok()) {
MessageQueue mq = null;
try {
//获取队列
List<MessageQueue> messageQueueList =
mQClientFactory.getMQAdminImpl().parsePublishMessageQueues(topicPublishInfo.getMessageQueueList());
Message userMessage = MessageAccessor.cloneMessage(msg);
String userTopic = NamespaceUtil.withoutNamespace(userMessage.getTopic(), mQClientFactory.getClientConfig().getNamespace());
userMessage.setTopic(userTopic);
//选择队列
mq = mQClientFactory.getClientConfig().queueWithNamespace(selector.select(messageQueueList, userMessage, arg));
} catch (Throwable e) {
throw new MQClientException("select message queue threw exception.", e);
}
//超时判断
long costTime = System.currentTimeMillis() - beginStartTime;
if (timeout < costTime) {
throw new RemotingTooMuchRequestException("sendSelectImpl call timeout");
}
if (mq != null) {
//发送消息
return this.sendKernelImpl(msg, mq, communicationMode, sendCallback, null, timeout - costTime);
} else {
throw new MQClientException("select message queue return null.", null);
}
}
//失败
validateNameServerSetting();
throw new MQClientException("No route info for this topic, " + msg.getTopic(), null);
}
顺序发送API可以看成没有重试机制、可自定义选择队列的同步发送消息版本
SelectMessageQueueByHash的实现也比较简单,就是根据唯一ID哈希模上队列
public class SelectMessageQueueByHash implements MessageQueueSelector {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
int value = arg.hashCode() % mqs.size();
if (value < 0) {
value = Math.abs(value);
}
return mqs.get(value);
}
}
总的来说如果要保证顺序发送消息,可以通过业务唯一ID与分片算法将消息放到相同队列,避免并发发送消息(无法预估到达队列顺序)
顺序消费
前文说过消费者消息消息时,为了全力以赴通常都是使用线程池进行并发消费的
当一批顺序消息被同时拉取到消费者时,如果由线程池并发进行消费也会导致消息的顺序性失效
因此在消费端也需要进行顺序消费,使用DefaultMQPushConsumer进行消费时,设置消息监听器为MessageListenerOrderly
在顺序消费的文章中也说过:设置消息监听器为MessageListenerOrderly时,会通过多种加锁的方式保证消费者顺序消费队列中的消息
但如果消费发生失败会阻塞队列导致消息堆积,因此需要注意特殊处理,比如重试次数超过阈值时就记录下来后续再处理
consumer.registerMessageListener((MessageListenerOrderly) (msgs, context) -> {
try {
for (MessageExt msg : msgs) {
// 获取消息的重试次数
int retryCount = msg.getReconsumeTimes();
System.out.println("Message [" + msg.getMsgId() + "] is reconsumed " + retryCount + " times");
//如果重试次数超过阈值 记录
if (retryCount >= 3) {
System.out.println("Message [" + msg.getMsgId() + "] add DB");
}
// 模拟消费失败
if (retryCount < 3) {
throw new RuntimeException("Consume failed");
}
// 消费成功
System.out.println("Message [" + msg.getMsgId() + "] consumed successfully");
}
return ConsumeOrderlyStatus.SUCCESS;
} catch (Exception e) {
// 记录日志
e.printStackTrace();
// 返回重试状态
return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
}
});
总的来说:使用MessageListenerOrderly保证顺序消费时需要在失败重试时进行特殊处理,如果一直失败会阻塞队列导致消息堆积
一致性
在架构篇中层说过,每个Master Broker都可能负责Topic下不同队列(如图中TopicA的4个队列分布在Broker A和B)
当Broker发生宕机时队列数量会发生改变,可能会导致分片算法将同一ID的消息分发到不同队列从而导致极端情况影响消息的顺序性
如果队列数量不改变则投递到宕机中Broker的消息自然会失败,从而影响系统的可用性
类似于CAP理论,当发生宕机(分区容错 Partition Tolerance)时,要么保证可用性(Availability),要么保证一致性(Consistency)
如果要确保强一致性,需要将Topic中消息类型设置为FIFO,并开启NameServer中的配置 orderMessageEnable
和 returnOrderTopicConfigToBroker
为true
如果即要满足可用性、又要能支持队列数量的水平扩容,还需要确保消息严格顺序:
-
在分片算法上选择满足单调性(在队列数量动态发生变化时,已分配的key只会被分片到原队列或新队列)的算法(一致性哈希)
-
先进行扩容队列 -> 再消费旧队列中遗留的消息 -> 最后再开始消费所有队列,以此来保证消息的严格顺序(因为单调性分片算法还是可能会导致key被分片到其他队列,为了保证严格有序,要将旧队列中遗留消息优先消费,再消费新队列)
单调性:(比如有A、B、C三个旧队列,key被分片到B队列,新增三个D、E、F队列,key只能被分配到B或D或E或F队列,而不会被分配到A或C队列中)
总结
在需要保证消息有序的业务场景下,RocketMQ由顺序发送和顺序消费来保证消息顺序性
顺序发送通常使用队列有序来保证,将唯一的业务ID根据分片算法分发到对应的队列中,要注意避免并发发送,以免无法预估消息到达队列的先后顺序
顺序消费通常使用MessageListenerOrderly消息监听器,它通过加锁的方式顺序消费队列消息,保证消息有序消费但会降低消费吞吐量,并且失败会一直重试,可能导致消息堆积,需要特殊处理失败的情况
当Broker发生宕机时会导致队列数量发生改变,极端情况下会影响消息的顺序性,如果要保证强一致性需要设置Topic的消息类型为FIFO并开启NameServer中的配置 orderMessageEnable
和 returnOrderTopicConfigToBroker
为true
如果即要满足可用性、又要能支持队列数量的水平扩容,还需要确保消息严格顺序,可以在分片算法上选择满足单调性(在队列数量动态发生变化时,已分配的key只会被分片到原队列或新队列)的算法,并进行先扩容队列 -> 再消费旧队列中遗留的消息 -> 最后再开始消费所有队列,以此来保证消息的严格顺序
最后(点赞、收藏、关注求求啦~)
😊我是菜菜,热爱技术交流、分享与写作,喜欢图文并茂、通俗易懂的输出知识
📚在我的博客中,你可以找到Java技术栈的各个专栏:Java并发编程与JVM原理、Spring和MyBatis等常用框架及Tomcat服务器的源码解析,以及MySQL、Redis数据库的进阶知识,同时还提供关于消息中间件和Netty等主题的系列文章,都以通俗易懂的方式探讨这些复杂的技术点
🏆除此之外,我还是掘金优秀创作者、腾讯云年度影响力作者、华为云年度十佳博主…
👫我对技术交流、知识分享以及写作充满热情,如果你愿意,欢迎加我一起交流(vx:CaiCaiJava666),也可以持续关注我的公众号:菜菜的后端私房菜,我会分享更多技术干货,期待与更多志同道合的朋友携手并进,一同在这条充满挑战与惊喜的技术之旅中不断前行
🤝如果觉得菜菜写的不错,可以点赞、关注、收藏支持一下~
📖本篇文章被收入专栏 消息中间件,感兴趣的朋友可以持续关注~
📝本篇文章、笔记以及案例被收入 Gitee-CaiCaiJava、 Github-CaiCaiJava,除此之外还有更多Java进阶相关知识,感兴趣的朋友可以star持续关注~
【声明】本内容来自华为云开发者社区博主,不代表华为云及华为云开发者社区的观点和立场。转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息,否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱:
cloudbbs@huaweicloud.com
- 点赞
- 收藏
- 关注作者
作者其他文章
评论(0)