五分钟带你玩转rabbitmq(七)怎么保证消息不丢失

举报
小鲍侃java 发表于 2021/09/09 23:09:29 2021/09/09
【摘要】 先来说明一个概念,什么是可靠投递呢?在RabbitMQ中,一个消息从生产者发送到RabbitMQ服务器,需要经历这么几个步骤: 生产者准备好需要投递的消息。生产者与RabbitMQ服务器建立连接。生产者发送消息。RabbitMQ服务器接收到消息,并将其路由到指定队列。RabbitMQ服务器发起回调,告知生产者消息发送成功。 所谓可靠投...

先来说明一个概念,什么是可靠投递呢?在RabbitMQ中,一个消息从生产者发送到RabbitMQ服务器,需要经历这么几个步骤:

  1. 生产者准备好需要投递的消息。
  2. 生产者与RabbitMQ服务器建立连接。
  3. 生产者发送消息。
  4. RabbitMQ服务器接收到消息,并将其路由到指定队列。
  5. RabbitMQ服务器发起回调,告知生产者消息发送成功。

所谓可靠投递,就是确保消息能够百分百从生产者发送到服务器。

队列存在的以下问题:消息丢失问题 重复消费问题 以下为解决点

1:队列持久化硬盘

丢失的过程就只有在内存发送到磁盘时会丢失消息 如果保存到磁盘后 重启服务消息不会丢失 但是会影效率

new Queue("demo_queue", true, false, false, args); 第二个参数为true
 

2:手动ack

告知生产者消息成功/失败,否则,如果失败此队列会保持挂起状态,他们消息会等待。所以在消费完成之后通知生产者消费是否成功/失败,ack/nack

配置文件


  
  1. rabbitmq:
  2. host: 192.168.xx.xx
  3. port: 5672
  4. username: root
  5. password: root
  6. virtual-host: /
  7. listener:
  8. simple:
  9. acknowledge-mode: manual #手动应答
  10. prefetch: 1 # 每次只处理一个信息
  11. publisher-confirms: true #开启消息确认机制
  12. publisher-returns: true #支持消息发送失败返回队列

  
  1. @RabbitListener(queues = "demo_queue")
  2. protected void consumerDead(Message message, Channel channel) throws Exception {
  3. RabbitEnum ackSign = RabbitEnum.ACCEPT;
  4. try {
  5. int i = 10 / 0;
  6. } catch (Exception e) {
  7. ackSign = RabbitEnum.RETRY;
  8. throw e;
  9. } finally {
  10. // 通过finally块来保证Ack/Nack会且只会执行一次
  11. if (ackSign == RabbitEnum.ACCEPT) {
  12. channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
  13. } else if (ackSign == RabbitEnum.RETRY) {
  14. channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
  15. }
  16. }
  17. }

3:确认是否发送成功

判断消息是否发送到交换机 


  
  1. @Bean
  2. public RabbitTemplate rabbitTemplate() {
  3. Logger logger = LoggerFactory.getLogger(RabbitTemplate.class);
  4. RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
  5. // 消息发送失败返回到队列中, yml需要配置 publisher-returns: true
  6. rabbitTemplate.setMandatory(true);
  7. // 发送消息确认, yml需要配置 publisher-confirms: true 消息是否发送的核心配置
  8. rabbitTemplate.setConfirmCallback(msgSendConfirmCallBack());
  9. // 消息返回, yml需要配置 publisher-returns: true 消息返回的配置
  10. rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
  11. String correlationId = message.getMessageProperties().getCorrelationId().toString();
  12. logger.debug("消息:{} 发送失败, 应答码:{} 原因:{} 交换机: {} 路由键: {}", correlationId, replyCode, replyText, exchange,
  13. routingKey);
  14. });
  15. return rabbitTemplate;
  16. }
  17. /**
  18. * 确认发送消息是否成功
  19. *
  20. * @return
  21. */
  22. @Bean
  23. public MsgSendConfirmCallBack msgSendConfirmCallBack() {
  24. return new MsgSendConfirmCallBack();
  25. }

  
  1. public class MsgSendConfirmCallBack implements RabbitTemplate.ConfirmCallback {
  2. /**
  3. * 回调方法
  4. * @param correlationData
  5. * @param ack
  6. * @param cause
  7. */
  8. @Override
  9. public void confirm(CorrelationData correlationData, boolean ack, String cause) {
  10. System.out.println("MsgSendConfirmCallBack , 回调id:" + correlationData);
  11. if (ack) {
  12. System.out.println("消息发送成功");
  13. } else {
  14. //可以将消息写入本地,使用定时任务重新发送
  15. System.out.println("消息发送失败:" + cause + "\n重新发送");
  16. }
  17. }
  18. }

4:集群化处理 

5:异地容灾

6:发送消息持久化到db中 进行消息的重新发送

7:消费者消息固话到db中 通过消息id判断是否重复消费

参考:https://www.freesion.com/article/1880596463/

文章来源: baocl.blog.csdn.net,作者:小黄鸡1992,版权归原作者所有,如需转载,请联系作者。

原文链接:baocl.blog.csdn.net/article/details/113875988

【版权声明】本文为华为云社区用户转载文章,如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。