手搭手RocketMQ重复消费问题

举报
QGS 发表于 2024/03/16 19:49:05 2024/03/16
【摘要】 手搭手RocketMQ重复消费问题

环境介绍

技术栈

springboot+mybatis-plus+mysql+rocketmq

软件

版本

mysql

8

IDEA

IntelliJ IDEA 2022.2.1

JDK

17

Spring Boot

3.1.7

dynamic-datasource

3.6.1

mybatis-plus

3.5.3.2

rocketmq

4.9.4


加入依赖

<dependencies>
    <dependency>
        <groupId>
org.springframework.boot</groupId>
        <artifactId>
spring-boot-starter-web</artifactId>
        <exclusions>
<!-- 排除logback依赖 -->
           
<exclusion>
                <groupId>
org.springframework.boot</groupId>
                <artifactId>
spring-boot-starter-logging</artifactId>
            </exclusion>
        </exclusions>
    </dependency>

   
<!--Log4j2场景启动器 -->
   
<dependency>
        <groupId>
org.springframework.boot</groupId>
        <artifactId>
spring-boot-starter-log4j2</artifactId>
    </dependency>

    <dependency>
        <groupId>
com.baomidou</groupId>
        <artifactId>
mybatis-plus-boot-starter</artifactId>
        <version>
3.5.3</version>
        <exclusions>
            <exclusion>
                <groupId>
com.baomidou</groupId>
                <artifactId>
mybatis-plus-generator</artifactId>
            </exclusion>
        </exclusions>
    </dependency>

    <dependency>
        <groupId>
com.mysql</groupId>
        <artifactId>
mysql-connector-j</artifactId>
        <scope>
runtime</scope>
    </dependency>
    <dependency>
        <groupId>
org.projectlombok</groupId>
        <artifactId>
lombok</artifactId>
        <optional>
true</optional>
    </dependency>
    <dependency>
        <groupId>
org.springframework.boot</groupId>
        <artifactId>
spring-boot-starter-test</artifactId>
        <scope>
test</scope>
    </dependency>

    <dependency>
        <groupId>
com.alibaba</groupId>
        <artifactId>
druid-spring-boot-starter</artifactId>
        <version>
1.1.14</version>
    </dependency>
    <dependency>
        <groupId>
com.baomidou</groupId>
        <artifactId>
dynamic-datasource-spring-boot-starter</artifactId>
        <version>
3.6.1</version>
    </dependency>
    <dependency>
        <groupId>
p6spy</groupId>
        <artifactId>
p6spy</artifactId>
        <version>
3.9.1</version>
    </dependency>

    <dependency>
        <groupId>
org.apache.rocketmq</groupId>
        <artifactId>
rocketmq-client</artifactId>
        <version>
4.9.2</version>
    </dependency>

</dependencies>



消息中间件的对比


消息中间件: activeMQ:java(jms协议),性能一般,吞吐量低。rabbitMQ:erlang(amqp协议),性能好,功能丰富,吞吐量一般。rocketMQ:java,性能好,吞吐量丰富,功能丰富。Kafka: scala,吞吐量最大,功能单一,大数据领域


RocketMQ 是阿里开源的分布式消息中间件,跟其它中间件相比,RocketMQ 的特点是纯JAVA实现,是一套提供了消息生产,存储,消费全过程API的软件系统。


RocketMQ的作用:数据收集、限流削峰、异步解耦

数据收集

分布式系统会产生海量级数据流,如:业务日志、监控数据、用户行为等。针对这些数据流进行实时或批量采集汇总,然后对这些数据流进行大数据分析,这是当前互联网平台的必备技术。通过MQ完成此类数据收集是最好的选择。


限流削峰

MQ可以将系统的超量请求暂存其中,以便系统后期可以慢慢进行处理,从而避免了请求的丢失或系统被压垮。


异步解耦

上游系统对下游系统的调用若为同步调用,则会大大降低系统的吞吐量与并发度,且系统耦合度太高、而异步调用则会解决这些问题。所以两层之间若要实现由同步到异步的转化,一般性做法就是,在这两层间添加一个MQ层。


rocketmq.apache.org

Broker:经纪人(经理人)

Topic主题:消息区分,分类,虚拟结构

Queue:消息队列


Apache RocketMQ 是一款低延迟、高并发、高可用、高可靠的分布式消息中间件。消息队列 RocketMQ 可为分布式应用系统提供异步解耦和削峰填谷的能力,同时也具备互联网应用所需的海量消息堆积、高吞吐、可靠重试等特性。


消费重复问题

1、 生产者多次投递

2、负载均衡模式消费者扩容时重试


解决办法

RocketMQ无法避免消息重复(Exactly-Once),所以如果业务对消费重复非常敏感,务必要在业务层面进行去重处理。可以借助关系数据库进行去重。首先需要确定消息的唯一键,可以是msgId,也可以是消息内容中的唯一标识字段,例如订单Id等。在消费之前判断唯一键是否在关系数据库中存在。如果不存在则插入,并消费,否则跳过。(实际过程要考虑原子性问题,判断是否存在可以尝试插入,如果报主键冲突,则插入失败,直接跳过)


msgId一定是全局唯一标识符,但是实际使用中,可能会存在相同的消息有两个不同msgId的情况(消费者主动重发、因客户端重投机制导致的重复等,需要控制消息的幂等性),这种情况就需要使业务字段进行重复消费。


幂等性:多次操作产生的结果和第一次操作产生的结果一致。


存储key可用mysql,oracle,redis等数据库做验证


本次解决方案为:将key插入Mysql数据库,创建唯一索引,插入成功执行业务逻辑,插入失败为重复消息


使用Myabtis-plus逆向工程

生产者

@Test

void repeatProducerTest()throws Exception{

    //创建生产者

    DefaultMQProducer producer = new DefaultMQProducer("repeatGroup");

    //连接namesrv

    producer.setNamesrvAddr("192.168.68.133:9876");

    //启动

    producer.start();

    //自身业务key唯一

    String Key = UUID.randomUUID().toString();

    System.out.println(Key);

    //创建消息

    Message message = new Message("repeatTopic", null,Key, "重复内存内容".getBytes());

    Message message2 = new Message("repeatTopic", null,Key, "重复内存内容".getBytes());

    //发送消息

    producer.send(message);

    producer.send(message2);

    System.out.println("发送成功");

    //关闭生产者

    producer.shutdown();

}

消费者

@Autowired

private OrderLogMapper orderMapper;



@Test

void repeatConsumerTest() throws Exception {

    //创建消费者

    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("repeatConsumerTest");

    //连接namesrv

    consumer.setNamesrvAddr("192.168.68.133:9876");

    //订阅主题   *表示该主题的所有消息

    consumer.subscribe("repeatTopic","*");



    //设置监听器(一直,异步回调方式) MessageListenerConcurrently并发模式

    consumer.registerMessageListener(new MessageListenerConcurrently() {

        //消费方法

        @Override

        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {

            //业务处理

            //获取key

            for (MessageExt messageExt : msgs) {

                String key = messageExt.getKeys();

                //数据库中OrderID创建了唯一索引

                 OrderLog orderLog = new OrderLog();

                 orderLog.setType(1);

                 orderLog.setOrderid(key);

                 orderLog.setUsername("测试");



                 try {

                     orderMapper.insert(orderLog);

                 }catch (Exception e){

                     if(e instanceof SQLIntegrityConstraintViolationException){

                         e.printStackTrace();

                         System.out.println("重复消费");

                         //签收该消息

                         return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;

                     }



                 }

            }

            //CONSUME_SUCCESS成功  RECONSUME_LATER失败

            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;

        }

    });

    //启动

    consumer.start();

    //挂起当前jvm

    System.in.read();

    //关闭 consumer.shutdown();

}



可能遇到的问题

问题

***************************

APPLICATION FAILED TO START

***************************

 

Description:

 

Failed to configure a DataSource: 'url' attribute is not specified and no embedded datasource could be configured.

Reason: Failed to determine a suitable driver class

解决办法

1、方法一

<dependency>

<groupId>com.baomidou</groupId>

<artifactId>mybatis-plus-boot-starter</artifactId>

<version>3.5.4.1</version>

<exclusions>

<exclusion>

<groupId>com.baomidou</groupId>

<artifactId>mybatis-plus-generator</artifactId>

</exclusion>

</exclusions>

</dependency>

2、方法二

<dependency>
    <groupId>com.baomidou</groupId>
    <artifactId>mybatis-plus-boot-starter</artifactId>
    <version>3.5.3</version>
    <exclusions>
        <exclusion>
            <artifactId>mybatis-spring</artifactId>
            <groupId>org.mybatis</groupId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.mybatis</groupId>
    <artifactId>mybatis-spring</artifactId>
    <version>3.0.3</version>
</dependency>

3、方法三

升级dynamic-datasource-spring-boot-starter版本至3.6.1或最新版本




【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。