五分钟带你玩转rabbitmq(七)怎么保证消息不丢失
        【摘要】 
                    先来说明一个概念,什么是可靠投递呢?在RabbitMQ中,一个消息从生产者发送到RabbitMQ服务器,需要经历这么几个步骤: 
生产者准备好需要投递的消息。生产者与RabbitMQ服务器建立连接。生产者发送消息。RabbitMQ服务器接收到消息,并将其路由到指定队列。RabbitMQ服务器发起回调,告知生产者消息发送成功。
所谓可靠投...
    
    
    
    先来说明一个概念,什么是可靠投递呢?在RabbitMQ中,一个消息从生产者发送到RabbitMQ服务器,需要经历这么几个步骤:
- 生产者准备好需要投递的消息。
- 生产者与RabbitMQ服务器建立连接。
- 生产者发送消息。
- RabbitMQ服务器接收到消息,并将其路由到指定队列。
- RabbitMQ服务器发起回调,告知生产者消息发送成功。
所谓可靠投递,就是确保消息能够百分百从生产者发送到服务器。
队列存在的以下问题:消息丢失问题 重复消费问题 以下为解决点
1:队列持久化硬盘
丢失的过程就只有在内存发送到磁盘时会丢失消息 如果保存到磁盘后 重启服务消息不会丢失 但是会影效率
new Queue("demo_queue", true, false, false, args); 第二个参数为true
 2:手动ack
告知生产者消息成功/失败,否则,如果失败此队列会保持挂起状态,他们消息会等待。所以在消费完成之后通知生产者消费是否成功/失败,ack/nack
配置文件
  
   - 
    
     
    
    
       rabbitmq:
     
    
- 
    
     
    
    
         host: 192.168.xx.xx
     
    
- 
    
     
    
    
         port: 5672
     
    
- 
    
     
    
    
         username: root
     
    
- 
    
     
    
    
         password: root
     
    
- 
    
     
    
    
         virtual-host: /
     
    
- 
    
     
    
    
         listener:
     
    
- 
    
     
    
    
           simple:
     
    
- 
    
     
    
    
             acknowledge-mode: manual  #手动应答
     
    
- 
    
     
    
    
             prefetch: 1 # 每次只处理一个信息
     
    
- 
    
     
    
    
         publisher-confirms: true #开启消息确认机制
     
    
- 
    
     
    
    
         publisher-returns: true #支持消息发送失败返回队列
     
    
 
  
   - 
    
     
    
    
     
      @RabbitListener(queues = "demo_queue")
     
    
- 
    
     
    
    
         protected void consumerDead(Message message, Channel channel) throws Exception {
     
    
- 
    
     
    
    
     
              RabbitEnum ackSign = RabbitEnum.ACCEPT;
     
    
- 
    
     
    
    
             try {
     
    
- 
    
     
    
    
                 int i = 10 / 0;
     
    
- 
    
     
    
    
     
              } catch (Exception e) {
     
    
- 
    
     
    
    
     
                  ackSign = RabbitEnum.RETRY;
     
    
- 
    
     
    
    
                 throw e;
     
    
- 
    
     
    
    
     
              } finally {
     
    
- 
    
     
    
    
                 // 通过finally块来保证Ack/Nack会且只会执行一次
     
    
- 
    
     
    
    
                 if (ackSign == RabbitEnum.ACCEPT) {
     
    
- 
    
     
    
    
     
                      channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
     
    
- 
    
     
    
    
     
                  } else if (ackSign == RabbitEnum.RETRY) {
     
    
- 
    
     
    
    
     
                      channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
     
    
- 
    
     
    
    
     
                  }
     
    
- 
    
     
    
    
     
              }
     
    
- 
    
     
    
    
     
          }
     
    
 3:确认是否发送成功
判断消息是否发送到交换机
  
   - 
    
     
    
    
         @Bean
     
    
- 
    
     
    
    
         public RabbitTemplate rabbitTemplate() {
     
    
- 
    
     
    
    
     
              Logger logger = LoggerFactory.getLogger(RabbitTemplate.class);
     
    
- 
    
     
    
    
     
              RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
     
    
- 
    
     
    
    
     
              // 消息发送失败返回到队列中, yml需要配置 publisher-returns: true
     
    
- 
    
     
    
    
     
              rabbitTemplate.setMandatory(true);
     
    
- 
    
     
    
    
             // 发送消息确认, yml需要配置 publisher-confirms: true 消息是否发送的核心配置
     
    
- 
    
     
    
    
     
              rabbitTemplate.setConfirmCallback(msgSendConfirmCallBack());
     
    
- 
    
     
    
    
             // 消息返回, yml需要配置 publisher-returns: true 消息返回的配置
     
    
- 
    
     
    
    
     
              rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
     
    
- 
    
     
    
    
     
                  String correlationId = message.getMessageProperties().getCorrelationId().toString();
     
    
- 
    
     
    
    
     
                  logger.debug("消息:{} 发送失败, 应答码:{} 原因:{} 交换机: {} 路由键: {}", correlationId, replyCode, replyText, exchange,
     
    
- 
    
     
    
    
     
                      routingKey);
     
    
- 
    
     
    
    
     
              });
     
    
- 
    
     
    
    
             return rabbitTemplate;
     
    
- 
    
     
    
    
     
          }
     
    
- 
    
     
    
    
      
     
    
- 
    
     
    
    
         /**
     
    
- 
    
     
    
    
     
       * 确认发送消息是否成功
     
    
- 
    
     
    
    
     
       *
     
    
- 
    
     
    
    
     
       * @return
     
    
- 
    
     
    
    
     
       */
     
    
- 
    
     
    
    
         @Bean
     
    
- 
    
     
    
    
         public MsgSendConfirmCallBack msgSendConfirmCallBack() {
     
    
- 
    
     
    
    
             return new MsgSendConfirmCallBack();
     
    
- 
    
     
    
    
     
          }
     
    
 
  
   - 
    
     
    
    
     
      public class MsgSendConfirmCallBack implements RabbitTemplate.ConfirmCallback {
     
    
- 
    
     
    
    
      
     
    
- 
    
     
    
    
         /**
     
    
- 
    
     
    
    
     
       * 回调方法
     
    
- 
    
     
    
    
     
       * @param correlationData
     
    
- 
    
     
    
    
     
       * @param ack
     
    
- 
    
     
    
    
     
       * @param cause
     
    
- 
    
     
    
    
     
       */
     
    
- 
    
     
    
    
     
          @Override
     
    
- 
    
     
    
    
         public void confirm(CorrelationData correlationData, boolean ack, String cause) {
     
    
- 
    
     
    
    
     
              System.out.println("MsgSendConfirmCallBack , 回调id:" + correlationData);
     
    
- 
    
     
    
    
             if (ack) {
     
    
- 
    
     
    
    
     
                  System.out.println("消息发送成功");
     
    
- 
    
     
    
    
     
              } else {
     
    
- 
    
     
    
    
                 //可以将消息写入本地,使用定时任务重新发送
     
    
- 
    
     
    
    
     
                  System.out.println("消息发送失败:" + cause + "\n重新发送");
     
    
- 
    
     
    
    
     
              }
     
    
- 
    
     
    
    
     
          }
     
    
- 
    
     
    
    
      
     
    
- 
    
     
    
    
     
      }
     
    
 4:集群化处理
5:异地容灾
6:发送消息持久化到db中 进行消息的重新发送
7:消费者消息固话到db中 通过消息id判断是否重复消费
参考:https://www.freesion.com/article/1880596463/
文章来源: baocl.blog.csdn.net,作者:小黄鸡1992,版权归原作者所有,如需转载,请联系作者。
原文链接:baocl.blog.csdn.net/article/details/113875988
        【版权声明】本文为华为云社区用户转载文章,如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱:
            cloudbbs@huaweicloud.com
        
        
        
        
        
        
        - 点赞
- 收藏
- 关注作者
 
            
 
           
评论(0)