消息系统之 Kafka (03)

举报
幼儿园老大* 发表于 2025/01/31 22:10:53 2025/01/31
140 0 0
【摘要】 JAVA 消费消息导入依赖<!-- 导入 0.10.2 版本 Kafka --><dependency> <groupId>org.apache.Kafka</groupId> <artifactId>Kafka-clients</artifactId> <version>0.10.2.0</version></dependency>Copy to clipboardErr...

JAVA 消费消息

导入依赖

<!-- 导入 0.10.2 版本 Kafka -->
<dependency>
    <groupId>org.apache.Kafka</groupId>
    <artifactId>Kafka-clients</artifactId>
    <version>0.10.2.0</version>
</dependency>Copy to clipboardErrorCopied

配置消费者

在创建 Consumer 对象前,必须配置以下属性:

属性 含义 备注
bootstrap.servers Kafka broker 地址 如果有多个地址用逗号分割
group.id 所属消费组
key.deserializer key 的反序列化类 必须实现 Kafka 的 Serializer 接口
value.deserializer value 的反序列化类 必须实现 Kafka 的 Serializer 接口

开发者还可以选择配置如下属性:

属性 含义 备注
fetch.max.bytes consumer 端一次拉取数据的最大字节数
fetch.min.bytes consumer 端一次拉取数据的最大字节数,默认为 1B。
max.poll.records consumer 端一次拉取数据的最大条数,默认为 500。
fetch.max.wait.ms 服务器最大等待时间,默认为 500ms。超过时间后返回所有可用数据。

通过读取配置,即可生成 Consumer 对象。

Properties kafkaProps = new Properties();
kafkaProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.23.76:9092");                           
kafkaProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);                 
kafkaProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);                     
kafkaProps.put(ConsumerConfig.GROUP_ID_CONFIG, "duanjt_test");                                           
KafkaConsumer<String, String> KafkaConsumer = new KafkaConsumer<String, String>(kafkaProps);Copy to clipboardErrorCopied

订阅消息

消费者可以通过以下两种方式订阅 Topic:

  1. subscribe 方法:动态调整组内各个消费者与分区的关系,实现负载均衡。
  2. assign 方法:订阅确定的主题和分区。
// 订阅
consumer.subscribe(Collections.singletonList(Producer.topic));
consumer.assign(Collections.singletonList(new TopicPartition(partitionInfo.topic(), partitionInfo.partition())));
// 解除订阅
consumer.unsubscribe();Copy to clipboardErrorCopied

拉取消息

Kafka Consumer 采用主动拉取消息系统数据 poll 的方式进行消费,可以对服务器的数据进行延迟处理。以防止消息系统向 Consumer 推送数据过多,导致 Consumer 积压而不堪重负的情况。为避免在服务器无数据的时候一直轮询, Kafka 在 poll 方法有参数允许消费者请求在长轮询中阻塞,等待数据到达。

获取到消息组 ConsumerRecords 后,内部包含多个 ConsumerRecord 对象,记录消息的 topic/partition/offset/key/value 信息。

// 每隔 1s 拉取一次数据
ConsumerRecords<String, String> records = KafkaConsumer.poll(1000);
// 打印数据
records.foreach(record -> {
    System.out.println(String.format("topic:%s,offset:%d,消息:%s", record.topic(), record.offset(), record.value()));
});Copy to clipboardErrorCopied

提交 Offset

对于消费者而言,异步模式下 committed offset 是落后于 current position 的。如果 consumer 挂掉,那么下一次消费数据又只会从 committed offset 的位置拉取数据,就会导致数据被重复消费。

消费者 offset 更新有以下两种方式:

  1. 自动提交 at-most-once

设置 enable.auto.commit=true(默认),更新的频率根据参数 auto.commit.interval.ms 来定,定时系统会根据当时 Consumer 收到的消息数量自动更新 offset 。

这可能导致两个问题:

  1. Consumer 程序崩溃,而 Offset 尚未更新。会重复消费部分数据。
  2. Consumer 程序崩溃,但 Offset 已被更新。已收到但未消费的数据永久丢失。
  1. 手动提交 at-least-once

设置 enable.auto.commit=false,Consumer 收到消息并消费后,再调用方法 consumer.commitSync() 手动更新 offset 。

如果消费失败,则 offset 也不会更新,此条消息会被重复消费。

消费示例

import java.util.Collections;
import java.util.Properties;
import org.apache.Kafka.clients.consumer.ConsumerConfig;
import org.apache.Kafka.clients.consumer.ConsumerRecord;
import org.apache.Kafka.clients.consumer.ConsumerRecords;
import org.apache.Kafka.clients.consumer.KafkaConsumer;
import org.apache.Kafka.common.serialization.StringDeserializer;

public class Consumer {

    public static void main(String[] args) {
        Properties p = new Properties();
        p.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.23.76:9092");                           
        p.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);                 
        p.put(ConsumerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringDeserializer.class);                     
        p.put(ConsumerConfig.GROUP_ID_CONFIG, "duanjt_test");                                           

        KafkaConsumer<String, String> KafkaConsumer = new KafkaConsumer<String, String>(p);
        KafkaConsumer.subscribe(Collections.singletonList("test"));

        while (true) {
            ConsumerRecords<String, String> records = KafkaConsumer.poll(100);
            records.foreach(record -> {
                System.out.println(String.format("topic:%s,offset:%d,消息:%s", record.topic(), record.offset(), record.value()));
            });
        }
    }
}
【声明】本内容来自华为云开发者社区博主,不代表华为云及华为云开发者社区的观点和立场。转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息,否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

作者其他文章

评论(0

抱歉,系统识别当前为高风险访问,暂不支持该操作

    全部回复

    上滑加载中

    设置昵称

    在此一键设置昵称,即可参与社区互动!

    *长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

    *长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。