RabbitMQ之幂等性问题处理

举报
yd_249383650 发表于 2023/10/30 16:16:21 2023/10/30
【摘要】 ​ 目录基本介绍RabbitMQ幂等性问题如何避免消息的重复消费问题?基本介绍消息消费时的幂等性(消息不被重复消费),同一个消息,第一次接收,正常处理业务,如果该消息第二次再接收,那就不能再处理业务,否则就处理重复了;幂等性:对于一个资源,不管你请求一次还是请求多次,对该资源本身造成的影响应该是相同的,不能因为重复的请求而对该资源重复造成影响;接口幂等性是指:一个接口用同样的参数反复调用,不...

 

目录

基本介绍

RabbitMQ幂等性问题

如何避免消息的重复消费问题?




基本介绍

消息消费时的幂等性(消息不被重复消费),同一个消息,第一次接收,正常处理业务,如果该消息第二次再接收,那就不能再处理业务,否则就处理重复了;

幂等性:对于一个资源,不管你请求一次还是请求多次,对该资源本身造成的影响应该是相同的,不能因为重复的请求而对该资源重复造成影响;

接口幂等性是指:一个接口用同样的参数反复调用,不会造成业务错误,那么这个接口就是具有幂等性的,比如:注册接口;发送短信验证码接口;还有比如同一个订单我支付两次,但是只会扣款一次,第二次支付不会扣款,这说明这个支付接口是具有幂等性的。

幂等性是分布式系统设计中的一个重要概念,是在做系统或者接口设计时要着重考虑的问题,尤其像支付宝、银行、互联网金融等涉及钱的系统,既要高效,数据也要准确,绝对不能出现多扣款,多打款等问题,幂等性的设计就显得更为重要了。

RabbitMQ幂等性问题

在RabbitMQ也回出现这个问题,RabbitMQ把消息发送给消费者消费,消费者消费成功并返回ack消息,但这时候网络中断了,RabbitMQ没有收到ack消息,这就让RabbitMQ误以为消息消费失败了,然后RabbitMQ会重新把该条消息发送给其他的消费者,或者等网络重连后再次发送给该候消费者,这时候就会造成重复消费的问题。

  • 生产者重复向RabbitMQ代理的消息队列发送消息

消息发送到代理并持久化后,由于网络断开或者客户端崩溃,代理未能回复客户端,导致生产者认为代理没有收到消息而重新发送,结果消费者收到两条具有相同内容和消息ID的消息

  • RabbitMQ代理向消费者重复传递消息

消息发送给消费者后,由于网络断开等原因,消费者客户端没有向broker返回ACK响应,代理不知道消息是否被消费,为了确保消息至少被消费一次,代理在网络恢复后再次传递消息,结果消费者就收到了两条具有相同内容和消息ID的消息。

如何避免消息的重复消费问题?

全局唯一ID + Redis

生产者在发送消息时,为每条消息设置一个全局唯一的messageId,消费者拿到消息后,使用setnx命令,将messageId作为key放到redis中:setnx(messageId, 1),若返回1,说明之前没有消费过,正常消费;若返回0,说明这条消息之前已消费过,抛弃;

@RabbitListener(queues = "durable-queue")
@RabbitHandler
public void processIde(Message message, Channel channel) throws IOException {
 
    if (stringRedisTemplate.opsForValue().setIfAbsent(message.getMessageProperties().getMessageId(),"1")){
        // 业务操作...
        System.out.println("消费消息:"+ new String(message.getBody(), "UTF-8"));
 
        // 手动确认
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }
}

 Message

在消息传递的过程中,实际上传递的对象为 org.springframework.amqp.core.Message ,它主要由两部分组成:

  1. MessageProperties // 消息属性

  2. byte[] body // 消息内容

@RabbitListener

使用 @RabbitListener 注解标记方法,当监听到队列 debug 中有消息时则会进行接收并处理

  • 消息处理方法参数是由 MessageConverter 转化,若使用自定义 MessageConverter 则需要在 RabbitListenerContainerFactory 实例中去设置(默认 Spring 使用的实现是 SimpleRabbitListenerContainerFactory)

  • 消息的 content_type 属性表示消息 body 数据以什么数据格式存储,接收消息除了使用 Message 对象接收消息(包含消息属性等信息)之外,还可直接使用对应类型接收消息 body 内容,但若方法参数类型不正确会抛异常:

    • application/octet-stream:二进制字节数组存储,使用 byte[]
    • application/x-java-serialized-object:java 对象序列化格式存储,使用 Object、相应类型(反序列化时类型应该同包同名,否者会抛出找不到类异常)
    • text/plain:文本数据类型存储,使用 String
    • application/json:JSON 格式,使用 Object、相应类型




【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。