StringBoot集成Rabbit,根据业务返回ACK

举报
西魏陶渊明 发表于 2022/09/25 05:50:50 2022/09/25
【摘要】 为了维护消息的有效性,当消费消息时候处理失败时候,不进行消费,需要我们根据业务区返回ACK,本项目我使用Redis和ack机制双重保险,保障消息一定能够正确的消费 首先,接着上部分内容,使用Topic,机制(不明白的,可以回顾上部分内容)上部分内容,我们使用SpringBoot注解,去实现,但是控制权不完全账务,当进行大规模项目时候,...
为了维护消息的有效性,当消费消息时候处理失败时候,不进行消费,需要我们根据业务区返回ACK,本项目我使用Redis和ack机制双重保险,保障消息一定能够正确的消费
  • 首先,接着上部分内容,使用Topic,机制(不明白的,可以回顾上部分内容)

  • 上部分内容,我们使用SpringBoot注解,去实现,但是控制权不完全账务,当进行大规模项目时候,不太建议使用


  
  1. @RabbitListener(queues = TopicRabbitConfig.USER_QUEUE)
  2. @RabbitHandler
  3. public void processUser(String message) {
  4. threadPool.execute(new Runnable() {
  5. @Override
  6. public void run() {
  7. logger.info("用户侧流水:{}",message);
  8. }
  9. });
  10. }
  • 根据源码分析,当然这里不分析源码,有兴趣的可以多失败几次就ok明白了

  • 在配置类中定义监听器,监听这个序列(AcknowledgeMode.MANUAL是必须的哦)


  
  1. /**
  2. * 接受消息的监听,这个监听客户交易流水的消息
  3. * 针对消费者配置
  4. * @return
  5. */
  6. @Bean
  7. public SimpleMessageListenerContainer messageContainer1(ConnectionFactory connectionFactory, TransactionConsumeImpl transactionConsume) {
  8. SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
  9. container.setQueues(queueMessage());
  10. container.setExposeListenerChannel(true);
  11. container.setMaxConcurrentConsumers(1);
  12. container.setConcurrentConsumers(1);
  13. container.setAcknowledgeMode(AcknowledgeMode.MANUAL); //设置确认模式手工确认
  14. container.setMessageListener(transactionConsume);
  15. return container;
  16. }

这个 TransactionConsumeImpl 要继承ChannelAwareMessageListener,主要说的手动返回ACK就是channel。调用


  
  1. @Component
  2. public class TransactionConsumeImpl implements ChannelAwareMessageListener {
  3. private static final Logger logger = LoggerFactory.getLogger(TransactionConsumeImpl.class);
  4. private static final Gson gson = new Gson();
  5. @Autowired
  6. JedisShardInfo jedisShardInfo;
  7. @Autowired
  8. ExecutorService threadPool;
  9. @Autowired
  10. BoluomeFlowService boluomeFlowService;
  11. @Override
  12. public void onMessage(Message message, Channel channel) throws Exception {
  13. String boby = new String(message.getBody(), "utf-8");//转换消息,我们是使用json数据格式
  14. threadPool.execute(new Runnable() { //多线程处理
  15. @Override
  16. public void run() {
  17. Jedis jedis = jedisShardInfo.createResource();
  18. jedis.sadd(TopicRabbitConfig.TRANSACTION_QUEUE, boby);//添加到key为当前消息类型的集合里面,防止丢失消息
  19. BoluomeFlow flow = gson.fromJson(boby, BoluomeFlow.class);
  20. String json = gson.toJson(flow);
  21. if (boluomeFlowService.insert(flow)) { //当添加成功时候返回成功
  22. logger.info("客户交易流水添加1条记录:{}", json);
  23. jedis.srem(TopicRabbitConfig.TRANSACTION_QUEUE, boby);//从当前消息类型集合中移除已经消费过的消息
  24. try {
  25. channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);//手工返回ACK,通知此消息已经争取消费
  26. } catch (IOException ie) {
  27. logger.error("消费成功回调成功,io操作异常");
  28. }
  29. } else {
  30. logger.info("客户交易流水添加失败记录:{}", json);
  31. }
  32. }
  33. });
  34. }
  35. }

文章来源: springlearn.blog.csdn.net,作者:西魏陶渊明,版权归原作者所有,如需转载,请联系作者。

原文链接:springlearn.blog.csdn.net/article/details/102425287

【版权声明】本文为华为云社区用户转载文章,如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。