五分钟带你玩转rabbitmq(四)延时重试队列

举报
小鲍侃java 发表于 2021/09/11 01:35:53 2021/09/11
【摘要】 如果只是网络抖动 出现异常那么直接进入死信队列 那么是不合理的 这就可以使用延时重试队列 原理: 1.发送到业务队里 如果正常收到 正常运行 2.如果处理失败 重试  并投入延时队列 如果超过延时时间 重新投入业务队列 3.如果重试次数大于3 那么进入死信队列 1.业务队列 @Configurationpub...

如果只是网络抖动 出现异常那么直接进入死信队列 那么是不合理的

这就可以使用延时重试队列

原理:

1.发送到业务队里 如果正常收到 正常运行

2.如果处理失败 重试  并投入延时队列 如果超过延时时间 重新投入业务队列

3.如果重试次数大于3 那么进入死信队列

1.业务队列


  
  1. @Configuration
  2. public class BusinessConfig {
  3. /**
  4. * yewu1模块direct交换机的名字
  5. */
  6. public static final String YEWU1_EXCHANGE = "yewu1_direct_exchange";
  7. /**
  8. * demo业务的队列名称
  9. */
  10. public static final String YEWU1_DEMO_QUEUE = "yewu1_demo_queue";
  11. /**
  12. * demo业务的routekey
  13. */
  14. public static final String YEWU1_DEMO_ROUTINGKEY = "yewu1_demo_key";
  15. /**
  16. * 业务交换机交换机(一个项目一个业务交换机即可)
  17. * 1.定义direct exchange,绑定queueTest
  18. * 2.durable="true" rabbitmq重启的时候不需要创建新的交换机
  19. * 3.direct交换器相对来说比较简单,匹配规则为:如果路由键匹配,消息就被投送到相关的队列
  20. * fanout交换器中没有路由键的概念,他会把消息发送到所有绑定在此交换器上面的队列中。
  21. * topic交换器你采用模糊匹配路由键的原则进行转发消息到队列中
  22. */
  23. @Bean
  24. public DirectExchange yewu1Exchange() {
  25. DirectExchange directExchange = new DirectExchange(YEWU1_EXCHANGE, true, false);
  26. return directExchange;
  27. }
  28. /**
  29. * 新建队列(一个业务需要一个队列一个routekey 命名格式 项目名-业务名)
  30. * 1.队列名称
  31. * 2.durable="true" 持久化 rabbitmq重启的时候不需要创建新的队列
  32. * 3.exclusive 表示该消息队列是否只在当前connection生效,默认是false
  33. * 4.auto-delete 表示消息队列没有在使用时将被自动删除 默认是false
  34. * 5.对nack或者发送超时的 发送给死信队列 args是绑定死信队列
  35. *
  36. */
  37. @Bean
  38. public Queue yewu1DemoQueue() {
  39. return new Queue(YEWU1_DEMO_QUEUE, true, false, false);
  40. }
  41. /**
  42. * 交换机与routekey绑定
  43. *
  44. * @return
  45. */
  46. @Bean
  47. public Binding yewu1DemoBinding() {
  48. return BindingBuilder.bind(yewu1DemoQueue()).to(yewu1Exchange())
  49. .with(YEWU1_DEMO_ROUTINGKEY);
  50. }
  51. }

2.延时队列


  
  1. @Configuration
  2. public class RetryConfig {
  3. /**
  4. * 延时队列 交换机配置标识符(固定)
  5. */
  6. public static final String RETRY_LETTER_QUEUE_KEY = "x-dead-letter-exchange";
  7. /**
  8. * 延时队列交换机绑定配置键标识符(固定)
  9. */
  10. public static final String RETRY_LETTER_ROUTING_KEY = "x-dead-letter-routing-key";
  11. /**
  12. * 延时队列消息的配置超时时间枚举(固定)
  13. */
  14. public static final String RETRY_MESSAGE_TTL = "x-message-ttl";
  15. /**
  16. * yewu1模块延时队列交换机
  17. */
  18. public final static String YEWU1_RETRY_EXCHANGE_NAME = "yewu1_retry_exchange";
  19. /**
  20. * yewu1模块DEMO业务延时队列
  21. */
  22. public final static String YEWU1_DEMO_RETRY_QUEUE_NAME = "yewu1_demo_retry_queue";
  23. /**
  24. * yewu1模块DEMO延时队列routekey
  25. */
  26. public final static String YEWU1_DEMO_RETRY_ROUTING_KEY = "yewu1_demo_retry_key";
  27. /**
  28. * 延时队列交换机
  29. *
  30. * @return
  31. */
  32. @Bean
  33. public DirectExchange yewu1RetryExchange() {
  34. DirectExchange directExchange = new DirectExchange(YEWU1_RETRY_EXCHANGE_NAME, true, false);
  35. return directExchange;
  36. }
  37. /**
  38. * 新建延时队列 一个业务队列需要一个延时队列
  39. *
  40. * @return
  41. */
  42. @Bean
  43. public Queue yewu1DemoRetryQueue() {
  44. Map<String, Object> args = new ConcurrentHashMap<>(3);
  45. // 将消息重新投递到业务交换机Exchange中
  46. args.put(RETRY_LETTER_QUEUE_KEY, BusinessConfig.YEWU1_EXCHANGE);
  47. args.put(RETRY_LETTER_ROUTING_KEY, BusinessConfig.YEWU1_DEMO_ROUTINGKEY);
  48. // 消息在队列中延迟3s后超时,消息会重新投递到x-dead-letter-exchage对应的队列中,routingkey为自己指定
  49. args.put(RETRY_MESSAGE_TTL, 3 * 1000);
  50. return new Queue(YEWU1_DEMO_RETRY_QUEUE_NAME, true, false, false, args);
  51. }
  52. /**
  53. * 绑定以上定义关系
  54. *
  55. * @return
  56. */
  57. @Bean
  58. public Binding retryDirectBinding() {
  59. return BindingBuilder.bind(yewu1DemoRetryQueue()).to(yewu1RetryExchange())
  60. .with(YEWU1_DEMO_RETRY_ROUTING_KEY);
  61. }
  62. }

3.死信队列


  
  1. @Configuration
  2. public class DeadConfig {
  3. /**
  4. * 死信队列
  5. */
  6. public final static String FAIL_QUEUE_NAME = "fail_queue";
  7. /**
  8. * 死信交换机
  9. */
  10. public final static String FAIL_EXCHANGE_NAME = "fail_exchange";
  11. /**
  12. * 死信routing
  13. */
  14. public final static String FAIL_ROUTING_KEY = "fail_routing";
  15. /**
  16. * 创建配置死信队列
  17. *
  18. */
  19. @Bean
  20. public Queue deadQueue() {
  21. return new Queue(FAIL_QUEUE_NAME, true, false, false);
  22. }
  23. /**
  24. * 死信交换机
  25. *
  26. * @return
  27. */
  28. @Bean
  29. public DirectExchange deadExchange() {
  30. DirectExchange directExchange = new DirectExchange(FAIL_EXCHANGE_NAME, true, false);
  31. return directExchange;
  32. }
  33. /**
  34. * 绑定关系
  35. *
  36. * @return
  37. */
  38. @Bean
  39. public Binding failBinding() {
  40. return BindingBuilder.bind(deadQueue()).to(deadExchange()).with(FAIL_ROUTING_KEY);
  41. }
  42. }

4.生产者


  
  1. @RestController
  2. @RequestMapping("/TestRabbit")
  3. public class ProducerDemo {
  4. @Resource
  5. private RabbitTemplate rabbitTemplate;
  6. //@RequestMapping("/sendDirect")
  7. String sendDirect(@RequestBody String message) throws Exception {
  8. System.out.println("开始生产");
  9. CorrelationData data = new CorrelationData(UUID.randomUUID().toString());
  10. rabbitTemplate.convertAndSend(BusinessConfig.YEWU1_EXCHANGE, BusinessConfig.YEWU1_DEMO_ROUTINGKEY,
  11. message, data);
  12. System.out.println("结束生产");
  13. System.out.println("发送id:" + data);
  14. return "OK,sendDirect:" + message;
  15. }
  16. }

5.消费者


  
  1. public enum RabbitEnum {
  2. /**
  3. * 处理成功
  4. */
  5. ACCEPT,
  6. /**
  7. * 可以重试的错误
  8. */
  9. RETRY,
  10. /**
  11. * 无需重试的错误
  12. */
  13. REJECT

  
  1. @Component
  2. public class ConsumerDemo {
  3. private final static Logger logger = LoggerFactory.getLogger(ConsumerDemo.class);
  4. @Resource
  5. private RabbitTemplate rabbitTemplate;
  6. // @RabbitListener(queues = "yewu1_demo_queue")
  7. protected void consumer(Message message, Channel channel) throws Exception {
  8. RabbitEnum ackSign = RabbitEnum.RETRY;
  9. System.out.println(message.getMessageProperties().getCorrelationId());
  10. try {
  11. // 可以加入重复消费判断
  12. int i = 1 / 0;
  13. } catch (Exception e) {
  14. ackSign = RabbitEnum.RETRY;
  15. throw e;
  16. } finally {
  17. // 通过finally块来保证Ack/Nack会且只会执行一次
  18. if (ackSign == RabbitEnum.ACCEPT) {
  19. channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
  20. } else if (ackSign == RabbitEnum.RETRY) {
  21. String correlationData =
  22. (String)message.getMessageProperties().getHeaders().get("spring_returned_message_correlation");
  23. System.out.println(message.getMessageProperties().getCorrelationId());
  24. long retryCount = getRetryCount(message.getMessageProperties());
  25. if (retryCount >= 3) {
  26. // 重试次数超过3次,则将消息发送到失败队列等待特定消费者处理或者人工处理
  27. try {
  28. channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
  29. rabbitTemplate.convertAndSend(DeadConfig.FAIL_EXCHANGE_NAME, DeadConfig.FAIL_ROUTING_KEY,
  30. message, new CorrelationData(correlationData));
  31. logger.info("连续失败三次,将消息发送到死信队列,发送消息:" + new String(message.getBody()));
  32. } catch (Exception e1) {
  33. channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
  34. logger.error("发送死信队列报错:" + e1.getMessage() + ",原始消息:" + new String(message.getBody()));
  35. }
  36. } else {
  37. try {
  38. channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
  39. // 重试次数不超过3次,则将消息发送到重试队列等待重新被消费
  40. rabbitTemplate.convertAndSend(RetryConfig.YEWU1_RETRY_EXCHANGE_NAME,
  41. RetryConfig.YEWU1_DEMO_RETRY_ROUTING_KEY, message,
  42. new CorrelationData(correlationData));
  43. logger.info("消费失败,消息发送到重试队列;" + "原始消息:" + new String(message.getBody()) + ";第"
  44. + (retryCount + 1) + "次重试");
  45. } catch (Exception e1) {
  46. channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
  47. logger.error("消息发送到重试队列的时候,异常了:" + e1.getMessage() + ",重新发送消息");
  48. }
  49. }
  50. }
  51. }
  52. }
  53. /**
  54. * 获取消息被重试的次数
  55. */
  56. public long getRetryCount(MessageProperties messageProperties) {
  57. Long retryCount = 0L;
  58. if (null != messageProperties) {
  59. List<Map<String, ?>> deaths = messageProperties.getXDeathHeader();
  60. if (deaths != null && deaths.size() > 0) {
  61. Map<String, Object> death = (Map<String, Object>)deaths.get(0);
  62. retryCount = (Long)death.get("count");
  63. }
  64. }
  65. return retryCount;
  66. }
  67. }

参考:https://www.cnblogs.com/mfrank/p/11260355.html

          https://d.wps.cn/v/8niSQ

文章来源: baocl.blog.csdn.net,作者:小黄鸡1992,版权归原作者所有,如需转载,请联系作者。

原文链接:baocl.blog.csdn.net/article/details/106214436

【版权声明】本文为华为云社区用户转载文章,如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。