Kafka生产者幂等性

举报
tea_year 发表于 2025/04/28 15:58:27 2025/04/28
【摘要】 Kafka通过幂等生产者(Idempotent Producer)机制来实现消息的幂等性,确保每条消息在Kafka中只被处理一次,即使在生产者重试发送的情况下也不会导致重复消息。以下是Kafka实现幂等性的详细说明:1. 幂等生产者的基本概念 幂等性(Idempotence):指操作的结果不会因为多次执行而改变。在Kafka中,幂等性确保每条消息在Topic的Partition中只被写入一次。

幂等性

1.1 简介

拿http举例来说,一次或多次请求,得到的响应是一致的(网络超时等问题除外),换句话说,就是执行多次操作与执行一次操作的影响是一样的。

image-20201228115544346.png

如果,某个系统是不具备幂等性的,如果用户重复提交了某个表格,就可能会造成不良影响。例如:用户在浏览器上点击了多次提交订单按钮,会在后台生成多个一模一样的订单。

1.2 Kafka生产者幂等性

image-20201228115938744.png

在生产者生产消息时,如果出现retry时,有可能会一条消息被发送了多次,如果Kafka不具备幂等性的,就有可能会在partition中保存多条一模一样的消息。

1.3 配置幂等性

props.put("enable.idempotence",true);

1.4 幂等性原理

为了实现生产者的幂等性,Kafka引入了 Producer ID(PID)和 Sequence Number的概念。

  • PID:每个Producer在初始化时,都会分配一个唯一的PID,这个PID对用户来说,是透明的。

  • Sequence Number:针对每个生产者(对应PID)发送到指定主题分区的消息都对应一个从0开始递增的Sequence Number。

image-20201228120129383.png

  1. 生产者消息重复的问题:

kafka生产者生产消息到partition,如果直接发送消息,kafka会将消息保存到分区,会返回一个ack给生产者,表示当前操作是否成功,是否已经保存了这条消息。如果ack响应的过程失败了,此时生产者会重试,继续发送没有发送成功的消息,kafka又会保存一条一模一样的消息。

  1. 在kafka中开启幂等性:

  • 当kafka的生产者生产消息时,会增加一个pid(生产者的唯一标识)和 sequence number(针对消息的一个递增序列)

  • 发送消息会带着pid和sequence number一块发送

  • kafka接收消息,会将pid和sequence number一块保存下来

  • 如果ack响应失败,生产者重试,再次发送消息时,kafka会根据pid和sequence number来决定是否需要保存消息。

  • 判断条件:生产者发送过来的sequence number是否小于等于partition中pid对应的sequence number.

    接下来剖析Kafka通过幂等生产者机制来实现消息的幂等性,确保每条消息在Kafka中只被处理一次,即使在生产者重试发送的情况下也不会导致重复消息。以下是Kafka实现幂等性的详细说明:

1. 幂等生产者的基本概念
幂等性(Idempotence):指操作的结果不会因为多次执行而改变。在Kafka中,幂等性确保每条消息在Topic的Partition中只被写入一次。

2. 幂等生产者的启用
要启用幂等生产者,需要在生产者配置中设置以下参数:

enable.idempotence: 设置为 true 以启用幂等生产者。
transactional.id: 可选参数,用于事务性生产者。如果需要事务支持,必须设置此参数。
示例配置:

bootstrap.servers=localhost:9092
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=org.apache.kafka.common.serialization.StringSerializer
enable.idempotence=true

3. 幂等生产者的内部机制
3.1. 生产者ID(Producer ID)
唯一标识:每个幂等生产者在启动时会从Kafka集群获取一个唯一的 Producer ID。
持久化:Producer ID 由Kafka集群持久化存储,确保在生产者重启后仍然有效。
3.2. 序列号(Sequence Number)
递增序列:每个Partition维护一个递增的序列号。
唯一性:每条消息在发送时会携带一个唯一的序列号,确保消息在Partition中的唯一性。
3.3. 请求重试
自动重试:幂等生产者会自动重试发送失败的消息。
幂等性保证:即使消息被重试多次,Kafka也会确保每条消息只被写入一次。
4. 幂等生产者的使用示例
4.1. 配置生产者

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class IdempotentProducerExample {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);

        try {
            for (int i = 0; i < 10; i++) {
                ProducerRecord<String, String> record = new ProducerRecord<>("my-topic", "key-" + i, "value-" + i);
                producer.send(record, (metadata, exception) -> {
                    if (exception != null) {
                        exception.printStackTrace();
                    } else {
                        System.out.printf("Sent message to topic %s partition %d with offset %d%n",
                                metadata.topic(), metadata.partition(), metadata.offset());
                    }
                });
            }
        } finally {
            producer.close();
        }
    }
}


4.2. 验证幂等性
发送重复消息:即使生产者发送了重复的消息,Kafka也会确保每条消息只被写入一次。
验证结果:可以通过Kafka的消费者来验证Topic中的消息是否重复。
5. 幂等生产者的限制
单分区幂等性:幂等性保证的是单个Partition内的消息唯一性,而不是整个Topic。
顺序保证:幂等生产者保证消息在Partition内的顺序,但不保证消息在Topic内的全局顺序。
性能影响:启用幂等性可能会带来一定的性能开销,尤其是在高吞吐量的场景下。
6. 幂等生产者与事务性生产者的关系
幂等生产者:确保单条消息的幂等性。
事务性生产者:提供更高级的事务支持,确保多条消息的原子性。
6.1. 启用事务性生产者
transactional.id:必须设置此参数。
enable.idempotence:默认为 true,不需要显式设置。
6.2. 示例配置

bootstrap.servers=localhost:9092
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=org.apache.kafka.common.serialization.StringSerializer
transactional.id=my-transactional-id


6.3. 示例代码

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class TransactionalProducerExample {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-transactional-id");

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);

        producer.initTransactions();

        try {
            producer.beginTransaction();
            producer.send(new ProducerRecord<>("my-topic", "key1", "value1"));
            producer.send(new ProducerRecord<>("my-topic", "key2", "value2"));
            producer.commitTransaction();
        } catch (Exception e) {
            producer.abortTransaction();
            e.printStackTrace();
        } finally {
            producer.close();
        }
    }
}

总结


Kafka通过幂等生产者机制确保每条消息在Partition中只被写入一次,即使在生产者重试发送的情况下也不会导致重复消息。启用幂等生产者需要设置 enable.idempotence=true,并且Kafka会自动处理消息的唯一性和顺序性。对于更高级的事务支持,可以使用事务性生产者,设置 transactional.id 参数。

通过合理配置和使用幂等生产者,可以有效避免因Rebalance或其他原因引起的重复消费问题,确保消息的可靠性和一致性。

【声明】本内容来自华为云开发者社区博主,不代表华为云及华为云开发者社区的观点和立场。转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息,否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。