rabbitmq整个消息投递的路径

举报
酸菜鱼. 发表于 2022/11/30 23:29:45 2022/11/30
【摘要】 rabbitmq整个消息投递的路径是producer—>rabbitmq broker—>exchange—>queue—>consumer。生产者将消息投递到Broker时产生confirm状态,会出现二种情况,ack:表示已经被Broker签收。nack:表示表示已经被Broker拒收,原因可能有队列满了,限流,IO异常等。生产者将消息投递到Broker,被Broker签收,但是没有对应...

rabbitmq整个消息投递的路径是producer—>rabbitmq broker—>exchange—>queue—>consumer。
生产者将消息投递到Broker时产生confirm状态,会出现二种情况,ack:表示已经被Broker签收。nack:表示表示已经被Broker拒收,原因可能有队列满了,限流,IO异常等。生产者将消息投递到Broker,被Broker签收,但是没有对应的队列进行投递,将消息回退给生产者会产生return状态。这二种状态是rabbitmq提供的消息可靠投递机制,生产者开启确认模式和退回模式。使用rabbitTemplate.setConfirmCallback设置回调函数。当消息发送到exchange后回调confirm方法。在方法中判断ack,如果为true,则发送成功,如果为false,则发送失败,需要处理。使用rabbitTemplate.setReturnCallback设置退回函数,当消息从exchange路由到queue失败后,如果设置了rabbitTemplate.setMandatory(true)参数,则会将消息退回给producer。消费者在rabbit:listener-container标签中设置acknowledge属性,设置ack方式 none:自动确认,manual:手动确认。none自动确认模式很危险,当生产者发送多条消息,消费者接收到一条信息时,会自动认为当前发送的消息已经签收了,这个时候消费者进行业务处理时出现了异常情况,也会认为消息已经正常签收处理了,而队列里面显示都被消费掉了。所以真实开发都会改为手动签收,可以防止消息丢失。消费者如果在消费端没有出现异常,则调用channel.basicAck方法确认签收消息。消费者如果出现异常,则在catch中调用 basicNack或 basicReject,拒绝消息,让MQ重新发送消息。通过一系列的操作,可以保证消息的可靠投递以及防止消息丢失的情况。

然后说一下rocketmq,生产者使用事务消息机制保证消息零丢失,第一步就是确保Producer发送消息到了Broker这个过程不会丢消息。发送half消息给rocketmq,这个half消息是在生产者操作前发送的,对下游服务的消费者是不可见的。这个消息主要是确认RocketMQ的服务是否正常,通知RocketMQ,马上要发一个消息了,做好准备。half消息如果写入失败就认为MQ的服务是有问题的,这个时候就不能通知下游服务了,给生产者的操作加上一个状态标记,然后等待MQ服务正常后再进行补偿操作,等MQ服务正常后重新下单通知下游服务。然后执行本地事务,比如说下了个订单,把下单数据写入到mysql,返回本地事务状态给rocketmq,在这个过程中,如果写入数据库失败,可能是数据库崩了,需要等一段时间才能恢复,这个时候把订单一直标记为"新下单"的状态,订单的消息先缓存起来,比如Redis、文本或者其他方式,然后给RocketMQ返回一个未知状态,未知状态的事务状态回查是由RocketMQ的Broker主动发起的,RocketMQ过一段时间来回查事务状态,在回查事务状态的时候,再尝试把数据写入数据库,如果数据库这时候已经恢复了,继续后面的业务。而且即便这个时候half消息写入成功后RocketMQ挂了,只要存储的消息没有丢失,等RocketMQ恢复后,RocketMQ就会再次继续状态回查的流程。第二步就是确保Broker接收到的消息不会丢失,因为RocketMQ为了减少磁盘的IO,会先将消息写入到os缓存中,不是直接写入到磁盘里面,消费者从os缓存中获取消息,类似于从内存中获取消息,速度更快,过一段时间会由os线程异步的将消息刷入磁盘中,此时才算真正完成了消息的持久化。在这个过程中,如果消息还没有完成异步刷盘,RocketMQ中的Broker宕机的话,就会导致消息丢失。所以第二步,消息支持持久化到Commitlog里面,即使宕机后重启,未消费的消息也是可以加载出来的。把RocketMQ的刷盘方式 flushDiskType配置成同步刷盘,一旦同步刷盘返回成功,可以保证接收到的消息一定存储在本地的内存中。采用主从机构,集群部署,Leader中的数据在多个Follower中都存有备份,防止单点故障,同步复制可以保证即使Master 磁盘崩溃,消息仍然不会丢失。但是这里还会有一个问题,主从结构是只做数据备份,没有容灾功能的。也就是说当一个master节点挂了后,slave节点是无法切换成master节点继续提供服务的。所以在RocketMQ4.5以后的版本支持Dledge,DLedger是基于Raft协议选举Leader Broker的,当master节点挂了后,Dledger会接管Broker的CommitLog消息存储 ,在Raft协议中进行多台机器的Leader选举,发起一轮一轮的投票,通过多台机器互相投票选出来一个Leader,完成master节点往slave节点的消息同步。数据同步会通过两个阶段,一个是uncommitted阶段,一个是commited阶段。Leader Broker上的Dledger收到一条数据后,会标记为uncommitted状态,然后他通过自己的DledgerServer组件把这个uncommitted数据发给Follower Broker的DledgerServer组件。接着Follower Broker的DledgerServer收到uncommitted消息之后,必须返回一个ack给Leader Broker的Dledger。然后如果Leader Broker收到超过半数的Follower Broker返回的ack之后,就会把消息标记为committed状态。再接下来, Leader Broker上的DledgerServer就会发送committed消息给Follower Broker上的DledgerServer,让他们把消息也标记为committed状态。这样,就基于Raft协议完成了两阶段的数据同步。第三步,Cunmser确保拉取到的消息被成功消费,就需要消费者不要使用异步消费,有可能造成消息状态返回后消费者本地业务逻辑处理失败造成消息丢失的可能。用同步消费方式,消费者端先处理本地事务,然后再给MQ一个ACK响应,这时MQ就会修改Offset,将消息标记为已消费,不再往其他消费者推送消息,在Broker的这种重新推送机制下,消息是不会在传输过程中丢失的。

【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。