Rabbitmq业务流程包含容错排查

举报
西魏陶渊明 发表于 2022/09/25 02:35:31 2022/09/25
【摘要】 流程是这样的,订阅者,发送消息到test交换机,通过route key 分发到绑定的队列,这里涉及到交换机的类型,可以看我上一篇文章。如果没有匹配到这个routeKey就默认发送到AE交换机(fanout模式),这个交换机要设置internal:true意为内部交换机 。AE...
4279695-d63d0674ef86f3bd

流程是这样的,订阅者,发送消息到test交换机,通过route key 分发到绑定的队列,这里涉及到交换机的类型,可以看我上一篇文章。如果没有匹配到这个routeKey就默认发送到AE交换机(fanout模式),这个交换机要设置internal:true意为内部交换机 。AE交换机再把错误的消息,发送到其绑定的队列中,如果test交换机,发送消息被匹配到的队里中,而处理该队列的订阅者,拒绝了或者超时了处理,test交换机就将该消息发送到就死信交换机,然后到死信队列中

一、 进入死信队列(进入死信的三种方式)

  • 1.消息被拒绝(basic.reject or basic.nack)并且requeue=false
  • 2.消息TTL过期
  • 3.队列达到最大长度

代码演示


  
  1. - channel.basicReject(message.getMessageProperties().getDeliveryTag(), true); // 拒绝消息
  2. - true 发送给下一个消费者
  3. - false 谁都不接受,从队列中删除

Rabbit设置

  • 1.设置AE交换机 设置为内部交换机,模式为fanout
    当发送到正常交换机消息,没有被匹配到route key的消息对进到改交换机

  
  1. FanoutExchange fanoutExchange=new FanoutExchange("alter");
  2. fanoutExchange.setInternal(true);//设置为内部交换机,作为处理了非法的消息,无法匹配到route key的消息
- 为AE交换机绑定队列 `alter_message`

 
4279695-70450cc0d0ac98c8.png
  • 2.设置处理正常的交换机 test

绑定参数,设置没有匹配 route key 的消息发送到AE交换机 alternate-exchange

4279695-1843a88d2830c244.png
  • 3.添加正常的队列

    • hello 测试处理正常逻辑

    • task_queue 模拟被拒绝的消息
      添加超时时间和死信交换机和rk

    x-dead-letter-exchange: dead_letter_exchange

    x-dead-letter-routing-key: task_queue.fail

    x-message-ttl: 600

  • 4.设置死信交换机 dead_letter_exchange

    • 另外创建死信队列 dead
    • 绑定 route key task_queue.fail
4279695-61678db99c5de760.png
死信交换机
4279695-6d8c28ab844a3ae4.png
模拟死信队列

代码实例 Python


  
  1. import pika
  2. #认证,生产者
  3. credentials = pika.PlainCredentials('guest', 'guest')
  4. #链接rabbit服务器(localhost是本机,如果是其他服务器请修改为ip地址)
  5. connection = pika.BlockingConnection(pika.ConnectionParameters('127.0.0.1',5672,'/',credentials))
  6. #通过tcp协议获取一个连接
  7. channel = connection.channel()
  8. #声明一个对下列和贾环加
  9. #channel.queue_declare(queue='hello')
  10. #被hello接受了
  11. channel.basic_publish(exchange='test',
  12. routing_key='hello',
  13. body='Hello World!')
  14. #发送了一个没有匹配的消息,匹配到了alter_message
  15. channel.basic_publish(exchange='test',
  16. routing_key='hello12312',
  17. body='Hello World!')
  18. #模拟一条虽然能被匹配到,但是无法消费的消息,然后被发送到死信队列消息
  19. channel.basic_publish(exchange='test',
  20. routing_key='task_queue',
  21. body='Hello World!')
  • 正常队列


    4279695-a833fdff51361b21.png
  • 没有匹配到的到


    4279695-c96013fbcb16face.png
  • 被拒绝或者超时进入私信队列的


    4279695-6cf8cf917b9b3107.png

使用代码去创建队列和交换机 Java


  
  1. @Bean
  2. public ConnectionFactory connectionFactory() throws Exception {
  3. CachingConnectionFactory connectionFactory = new CachingConnectionFactory("127.0.0.1", 5672);
  4. connectionFactory.setUsername("liuxin");
  5. connectionFactory.setPassword("930914lx");
  6. connectionFactory.setVirtualHost("az");
  7. connectionFactory.setPublisherConfirms(true); // 必须要设置回调
  8. Channel channel = connectionFactory.createConnection().createChannel(false);
  9. //String exchange, String type, boolean durable, boolean autoDelete, Map<String, Object> arguments
  10. Map<String, Object> arguments = new HashMap<>();
  11. arguments.put("internal",true);
  12. //String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
  13. //设置AE交换机
  14. channel.exchangeDeclare("alter", "fanout", false, false, false, arguments);
  15. channel.queueDeclare("alter_message", false, false, false, null);
  16. channel.queueBind("alter_message", "alter", "");
  17. //声明死信交换机并绑定
  18. channel.exchangeDeclare("dead_letter_exchange", "direct", false, false, null);
  19. channel.queueDeclare("dead", false, false, false, null);
  20. channel.queueBind("dead", "dead_letter_exchange", "task_queue.fail");
  21. arguments = new HashMap<>();
  22. arguments.put("alternate-exchange", "alter");//指定AE交换机
  23. channel.exchangeDeclare("test", "direct", false, false, arguments);
  24. //声明接受正式的队列,不需要参数
  25. channel.queueDeclare("hello", false, false, false, null);
  26. channel.queueBind("hello", "test", "hello");
  27. arguments = new HashMap<>();
  28. arguments.put("x-dead-letter-exchange", "dead_letter_exchange");
  29. arguments.put("x-dead-letter-routing-key", "task_queue.fail");
  30. arguments.put("x-message-ttl",6000);//6s没有被处理,就死了
  31. //设置测试死信队列的task_queue,推送该队列里面,被拒绝会到dead_letter_exchange,并最终到dead,routeKey,task_queue.fail 为并设置死信队列参数
  32. channel.queueDeclare("task_queue", false, false, false, arguments);
  33. channel.queueBind("task_queue", "test", "task_queue");
  34. return connectionFactory;
  35. }
  36. /**
  37. * 接受消息的监听,这个监听客户交易流水的消息
  38. * 针对消费者配置
  39. *
  40. * @return
  41. */
  42. @Bean
  43. public SimpleMessageListenerContainer messageContainer1(ConnectionFactory connectionFactory, PayMentConsumeImpl transactionConsume) {
  44. SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
  45. container.addQueueNames("hello");
  46. container.setExposeListenerChannel(true);
  47. container.setMaxConcurrentConsumers(8);
  48. container.setConcurrentConsumers(4);
  49. container.setAcknowledgeMode(AcknowledgeMode.MANUAL); //设置确认模式手工确认,当设置了此模式,必须返回ACK,否则会进入死信队列
  50. container.setMessageListener(transactionConsume);
  51. container.setPrefetchCount(1000);
  52. return container;
  53. }

文章来源: springlearn.blog.csdn.net,作者:西魏陶渊明,版权归原作者所有,如需转载,请联系作者。

原文链接:springlearn.blog.csdn.net/article/details/102425324

【版权声明】本文为华为云社区用户转载文章,如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。