消息系统之 Kafka (03)
【摘要】 JAVA 消费消息导入依赖<!-- 导入 0.10.2 版本 Kafka --><dependency> <groupId>org.apache.Kafka</groupId> <artifactId>Kafka-clients</artifactId> <version>0.10.2.0</version></dependency>Copy to clipboardErr...
JAVA 消费消息
导入依赖
<!-- 导入 0.10.2 版本 Kafka -->
<dependency>
<groupId>org.apache.Kafka</groupId>
<artifactId>Kafka-clients</artifactId>
<version>0.10.2.0</version>
</dependency>Copy to clipboardErrorCopied
配置消费者
在创建 Consumer 对象前,必须配置以下属性:
属性 | 含义 | 备注 |
---|---|---|
bootstrap.servers |
Kafka broker 地址 | 如果有多个地址用逗号分割 |
group.id |
所属消费组 | |
key.deserializer |
key 的反序列化类 | 必须实现 Kafka 的 Serializer 接口 |
value.deserializer |
value 的反序列化类 | 必须实现 Kafka 的 Serializer 接口 |
开发者还可以选择配置如下属性:
属性 | 含义 | 备注 |
---|---|---|
fetch.max.bytes |
consumer 端一次拉取数据的最大字节数 | |
fetch.min.bytes |
consumer 端一次拉取数据的最大字节数,默认为 1B。 | |
max.poll.records |
consumer 端一次拉取数据的最大条数,默认为 500。 | |
fetch.max.wait.ms |
服务器最大等待时间,默认为 500ms。超过时间后返回所有可用数据。 |
通过读取配置,即可生成 Consumer 对象。
Properties kafkaProps = new Properties();
kafkaProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.23.76:9092");
kafkaProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
kafkaProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
kafkaProps.put(ConsumerConfig.GROUP_ID_CONFIG, "duanjt_test");
KafkaConsumer<String, String> KafkaConsumer = new KafkaConsumer<String, String>(kafkaProps);Copy to clipboardErrorCopied
订阅消息
消费者可以通过以下两种方式订阅 Topic:
- subscribe 方法:动态调整组内各个消费者与分区的关系,实现负载均衡。
- assign 方法:订阅确定的主题和分区。
// 订阅
consumer.subscribe(Collections.singletonList(Producer.topic));
consumer.assign(Collections.singletonList(new TopicPartition(partitionInfo.topic(), partitionInfo.partition())));
// 解除订阅
consumer.unsubscribe();Copy to clipboardErrorCopied
拉取消息
Kafka Consumer 采用主动拉取消息系统数据 poll 的方式进行消费,可以对服务器的数据进行延迟处理。以防止消息系统向 Consumer 推送数据过多,导致 Consumer 积压而不堪重负的情况。为避免在服务器无数据的时候一直轮询, Kafka 在 poll 方法有参数允许消费者请求在长轮询中阻塞,等待数据到达。
获取到消息组 ConsumerRecords 后,内部包含多个 ConsumerRecord 对象,记录消息的 topic/partition/offset/key/value 信息。
// 每隔 1s 拉取一次数据
ConsumerRecords<String, String> records = KafkaConsumer.poll(1000);
// 打印数据
records.foreach(record -> {
System.out.println(String.format("topic:%s,offset:%d,消息:%s", record.topic(), record.offset(), record.value()));
});Copy to clipboardErrorCopied
提交 Offset
对于消费者而言,异步模式下 committed offset 是落后于 current position 的。如果 consumer 挂掉,那么下一次消费数据又只会从 committed offset 的位置拉取数据,就会导致数据被重复消费。
消费者 offset 更新有以下两种方式:
- 自动提交 at-most-once
设置 enable.auto.commit=true(默认),更新的频率根据参数 auto.commit.interval.ms 来定,定时系统会根据当时 Consumer 收到的消息数量自动更新 offset 。
这可能导致两个问题:
- Consumer 程序崩溃,而 Offset 尚未更新。会重复消费部分数据。
- Consumer 程序崩溃,但 Offset 已被更新。已收到但未消费的数据永久丢失。
- 手动提交 at-least-once
设置 enable.auto.commit=false,Consumer 收到消息并消费后,再调用方法 consumer.commitSync() 手动更新 offset 。
如果消费失败,则 offset 也不会更新,此条消息会被重复消费。
消费示例
import java.util.Collections;
import java.util.Properties;
import org.apache.Kafka.clients.consumer.ConsumerConfig;
import org.apache.Kafka.clients.consumer.ConsumerRecord;
import org.apache.Kafka.clients.consumer.ConsumerRecords;
import org.apache.Kafka.clients.consumer.KafkaConsumer;
import org.apache.Kafka.common.serialization.StringDeserializer;
public class Consumer {
public static void main(String[] args) {
Properties p = new Properties();
p.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.23.76:9092");
p.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
p.put(ConsumerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringDeserializer.class);
p.put(ConsumerConfig.GROUP_ID_CONFIG, "duanjt_test");
KafkaConsumer<String, String> KafkaConsumer = new KafkaConsumer<String, String>(p);
KafkaConsumer.subscribe(Collections.singletonList("test"));
while (true) {
ConsumerRecords<String, String> records = KafkaConsumer.poll(100);
records.foreach(record -> {
System.out.println(String.format("topic:%s,offset:%d,消息:%s", record.topic(), record.offset(), record.value()));
});
}
}
}
【声明】本内容来自华为云开发者社区博主,不代表华为云及华为云开发者社区的观点和立场。转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息,否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱:
cloudbbs@huaweicloud.com
- 点赞
- 收藏
- 关注作者
评论(0)