StringBoot集成Rabbit,根据业务返回ACK
        【摘要】 
                    为了维护消息的有效性,当消费消息时候处理失败时候,不进行消费,需要我们根据业务区返回ACK,本项目我使用Redis和ack机制双重保险,保障消息一定能够正确的消费 
首先,接着上部分内容,使用Topic,机制(不明白的,可以回顾上部分内容)上部分内容,我们使用SpringBoot注解,去实现,但是控制权不完全账务,当进行大规模项目时候,...
    
    
    
    为了维护消息的有效性,当消费消息时候处理失败时候,不进行消费,需要我们根据业务区返回ACK,本项目我使用Redis和ack机制双重保险,保障消息一定能够正确的消费
首先,接着上部分内容,使用Topic,机制(不明白的,可以回顾上部分内容)
上部分内容,我们使用SpringBoot注解,去实现,但是控制权不完全账务,当进行大规模项目时候,不太建议使用
  
   - 
    
     
    
    
      
     
    
 
   - 
    
     
    
    
      @RabbitListener(queues = TopicRabbitConfig.USER_QUEUE)
     
    
 
   - 
    
     
    
    
         @RabbitHandler
     
    
 
   - 
    
     
    
    
     
          public void processUser(String message) {
     
    
 
   - 
    
     
    
    
             threadPool.execute(new Runnable() {
     
    
 
   - 
    
     
    
    
                 @Override
     
    
 
   - 
    
     
    
    
     
                  public void run() {
     
    
 
   - 
    
     
    
    
                     logger.info("用户侧流水:{}",message);
     
    
 
   - 
    
     
    
    
     
                  }
     
    
 
   - 
    
     
    
    
     
              });
     
    
 
   - 
    
     
    
    
     
          }
     
    
 
  
 
根据源码分析,当然这里不分析源码,有兴趣的可以多失败几次就ok明白了
在配置类中定义监听器,监听这个序列(
AcknowledgeMode.MANUAL是必须的哦)
  
   - 
    
     
    
    
      
     
    
 
   - 
    
     
    
    
         /**
     
    
 
   - 
    
     
    
    
     
       * 接受消息的监听,这个监听客户交易流水的消息
     
    
 
   - 
    
     
    
    
     
       * 针对消费者配置
     
    
 
   - 
    
     
    
    
     
       * @return
     
    
 
   - 
    
     
    
    
     
       */
     
    
 
   - 
    
     
    
    
         @Bean
     
    
 
   - 
    
     
    
    
         public SimpleMessageListenerContainer messageContainer1(ConnectionFactory connectionFactory, TransactionConsumeImpl transactionConsume) {
     
    
 
   - 
    
     
    
    
             SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
     
    
 
   - 
    
     
    
    
     
              container.setQueues(queueMessage());
     
    
 
   - 
    
     
    
    
     
              container.setExposeListenerChannel(true);
     
    
 
   - 
    
     
    
    
     
              container.setMaxConcurrentConsumers(1);
     
    
 
   - 
    
     
    
    
     
              container.setConcurrentConsumers(1);
     
    
 
   - 
    
     
    
    
     
              container.setAcknowledgeMode(AcknowledgeMode.MANUAL); //设置确认模式手工确认
     
    
 
   - 
    
     
    
    
     
              container.setMessageListener(transactionConsume);
     
    
 
   - 
    
     
    
    
             return container;
     
    
 
   - 
    
     
    
    
     
          }
     
    
 
   - 
    
     
    
    
      
     
    
 
  
 
 
这个 TransactionConsumeImpl 要继承ChannelAwareMessageListener,主要说的手动返回ACK就是channel。调用
  
   - 
    
     
    
    
      
     
    
 
   - 
    
     
    
    
     
      @Component
     
    
 
   - 
    
     
    
    
     
      public class TransactionConsumeImpl implements ChannelAwareMessageListener {
     
    
 
   - 
    
     
    
    
         private static final Logger logger = LoggerFactory.getLogger(TransactionConsumeImpl.class);
     
    
 
   - 
    
     
    
    
         private static final Gson gson = new Gson();
     
    
 
   - 
    
     
    
    
      
     
    
 
   - 
    
     
    
    
         @Autowired
     
    
 
   - 
    
     
    
    
     
          JedisShardInfo jedisShardInfo;
     
    
 
   - 
    
     
    
    
         @Autowired
     
    
 
   - 
    
     
    
    
     
          ExecutorService threadPool;
     
    
 
   - 
    
     
    
    
         @Autowired
     
    
 
   - 
    
     
    
    
     
          BoluomeFlowService boluomeFlowService;
     
    
 
   - 
    
     
    
    
      
     
    
 
   - 
    
     
    
    
         @Override
     
    
 
   - 
    
     
    
    
         public void onMessage(Message message, Channel channel) throws Exception {
     
    
 
   - 
    
     
    
    
             String boby = new String(message.getBody(), "utf-8");//转换消息,我们是使用json数据格式
     
    
 
   - 
    
     
    
    
     
              threadPool.execute(new Runnable() {   //多线程处理
     
    
 
   - 
    
     
    
    
                 @Override
     
    
 
   - 
    
     
    
    
                 public void run() {
     
    
 
   - 
    
     
    
    
                     Jedis jedis = jedisShardInfo.createResource();
     
    
 
   - 
    
     
    
    
     
                      jedis.sadd(TopicRabbitConfig.TRANSACTION_QUEUE, boby);//添加到key为当前消息类型的集合里面,防止丢失消息
     
    
 
   - 
    
     
    
    
                     BoluomeFlow flow = gson.fromJson(boby, BoluomeFlow.class);
     
    
 
   - 
    
     
    
    
                     String json = gson.toJson(flow);
     
    
 
   - 
    
     
    
    
                     if (boluomeFlowService.insert(flow)) {  //当添加成功时候返回成功
     
    
 
   - 
    
     
    
    
     
                          logger.info("客户交易流水添加1条记录:{}", json);
     
    
 
   - 
    
     
    
    
     
                          jedis.srem(TopicRabbitConfig.TRANSACTION_QUEUE, boby);//从当前消息类型集合中移除已经消费过的消息
     
    
 
   - 
    
     
    
    
                         try {
     
    
 
   - 
    
     
    
    
     
                              channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);//手工返回ACK,通知此消息已经争取消费
     
    
 
   - 
    
     
    
    
     
                          } catch (IOException ie) {
     
    
 
   - 
    
     
    
    
     
                              logger.error("消费成功回调成功,io操作异常");
     
    
 
   - 
    
     
    
    
     
                          }
     
    
 
   - 
    
     
    
    
     
                      } else {
     
    
 
   - 
    
     
    
    
     
                          logger.info("客户交易流水添加失败记录:{}", json);
     
    
 
   - 
    
     
    
    
     
                      }
     
    
 
   - 
    
     
    
    
     
                  }
     
    
 
   - 
    
     
    
    
     
              });
     
    
 
   - 
    
     
    
    
     
          }
     
    
 
   - 
    
     
    
    
     
      }
     
    
 
   - 
    
     
    
    
      
     
    
 
  
 
 文章来源: springlearn.blog.csdn.net,作者:西魏陶渊明,版权归原作者所有,如需转载,请联系作者。
原文链接:springlearn.blog.csdn.net/article/details/102425287
        【版权声明】本文为华为云社区用户转载文章,如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱:
            cloudbbs@huaweicloud.com
        
        
        
        
        
        
        - 点赞
 - 收藏
 - 关注作者
 
            
           
评论(0)