Java编程操作Kafka消息中间件
Apache Kafka,以其高性能、高吞吐量和可扩展性,成为大数据处理和实时数据流处理领域的首选消息队列。不同于传统消息中间件,Kafka以发布/订阅模式为核心,设计为分布式系统,特别适合处理大规模的数据流。本文将快速概览Kafka的基础概念、常见的陷阱与应对策略,并通过Java代码示例加深理解。
Kafka基础
Kafka由生产者、消费者、主题(Topics)和代理(Brokers)组成。生产者向特定主题发布消息,而消费者订阅这些主题来消费消息。Kafka的存储基于分区(Partitions),每个主题可分割成多个分区,这不仅提高了并发处理能力,也使得消息具有顺序性。
常见问题与易错点
1. 分区选择不当
分区数量不合理或分区策略不合适,会影响消息的分布均衡和消费速率。
避免方法:根据预期的吞吐量和消费者数量合理设置分区数。对于需保证消息顺序的应用,确保同类消息发送至同一分区。
2. 偏移量管理混乱
消费者偏移量管理不当,可能导致消息丢失或重复消费。
避免方法:利用Kafka自动提交偏移量的特性,或手动控制偏移量提交时机,确保消费进度的准确记录。
3. 资源与性能监控不足
忽视监控,可能导致资源耗尽或性能瓶颈未及时发现。
避免方法:利用Kafka自带的监控工具如Kafka Monitor,或集成外部监控系统,持续跟踪broker、topic和消费者的状态。
一、 同步生产消息到Kafka中
1.1.1 需求
接下来,我们将编写Java程序,将1-100的数字消息写入到Kafka中。
1.1.2 准备工作
1.1.2.1 导入Maven Kafka POM依赖
<repositories><!-- 代码库 -->
<repository>
<id>central</id>
<url>http://maven.aliyun.com/nexus/content/groups/public//</url>
<releases>
<enabled>true</enabled>
</releases>
<snapshots>
<enabled>true</enabled>
<updatePolicy>always</updatePolicy>
<checksumPolicy>fail</checksumPolicy>
</snapshots>
</repository>
</repositories>
<dependencies>
<!-- kafka客户端工具 -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.4.1</version>
</dependency>
<!-- 工具类 -->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-io</artifactId>
<version>1.3.2</version>
</dependency>
<!-- SLF桥接LOG4J日志 -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.6</version>
</dependency>
<!-- SLOG4J日志 -->
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.16</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1.0</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>
1.1.2.2 导入log4j.properties
将log4j.properties配置文件放入到resources文件夹中
log4j.rootLogger=INFO,stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%5p - %m%n
1.1.2.3 创建包和类
创建包cn.yh.kafka,并创建KafkaProducerTest类。
1.1.3 代码开发
可以参考以下方式来编写第一个Kafka示例程序
参考以下文档:http://kafka.apache.org/24/javadoc/index.html?org/apache/kafka/clients/producer/KafkaProducer.html
-
创建用于连接Kafka的Properties配置
Properties props = new Properties();
props.put("bootstrap.servers", "192.168.88.100:9092");
props.put("acks", "all");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
-
创建一个生产者对象KafkaProducer
-
调用send发送1-100消息到指定Topic test,并获取返回值Future,该对象封装了返回值
-
再调用一个Future.get()方法等待响应
-
关闭生产者
参考代码:
public class KafkaProducerTest {
public static void main(String[] args) {
// 1. 创建用于连接Kafka的Properties配置
Properties props = new Properties();
props.put("bootstrap.servers", "192.168.88.100:9092");
props.put("acks", "all");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// 2. 创建一个生产者对象KafkaProducer
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
// 3. 调用send发送1-100消息到指定Topic test
for(int i = 0; i < 100; ++i) {
try {
// 获取返回值Future,该对象封装了返回值
Future<RecordMetadata> future = producer.send(new ProducerRecord<String, String>("test", null, i + ""));
// 调用一个Future.get()方法等待响应
future.get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
// 5. 关闭生产者
producer.close();
}
}
二、从Kafka的topic中消费消息
2.2.1 需求
从 test topic中,将消息都消费,并将记录的offset、key、value打印出来
2.2.2 准备工作
在cn.yh.kafka包下创建KafkaConsumerTest类
2.2.3 开发步骤
-
创建Kafka消费者配置
Properties props = new Properties();
props.setProperty("bootstrap.servers", "node1:9092");
props.setProperty("group.id", "test");
props.setProperty("enable.auto.commit", "true");
props.setProperty("auto.commit.interval.ms", "1000");
props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
-
创建Kafka消费者
-
订阅要消费的主题
-
使用一个while循环,不断从Kafka的topic中拉取消息
-
将记录(record)的offset、key、value都打印出来
5.2.4 参考代码
public class KafkaConsumerTest {
public static void main(String[] args) throws InterruptedException {
// 1.创建Kafka消费者配置
Properties props = new Properties();
props.setProperty("bootstrap.servers", "node1:9092,node2:9092,node3:9092");
// 消费者组(可以使用消费者组将若干个消费者组织到一起),共同消费Kafka中topic的数据
// 每一个消费者需要指定一个消费者组,如果消费者的组名是一样的,表示这几个消费者是一个组中的
props.setProperty("group.id", "test");
// 自动提交offset
props.setProperty("enable.auto.commit", "true");
// 自动提交offset的时间间隔
props.setProperty("auto.commit.interval.ms", "1000");
// 拉取的key、value数据的
props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
// 2.创建Kafka消费者
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props);
// 3. 订阅要消费的主题
// 指定消费者从哪个topic中拉取数据
kafkaConsumer.subscribe(Arrays.asList("test"));
// 4.使用一个while循环,不断从Kafka的topic中拉取消息
while(true) {
// Kafka的消费者一次拉取一批的数据
ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(5));
// 5.将将记录(record)的offset、key、value都打印出来
for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
// 主题
String topic = consumerRecord.topic();
// offset:这条消息处于Kafka分区中的哪个位置
long offset = consumerRecord.offset();
// key\value
String key = consumerRecord.key();
String value = consumerRecord.value();
System.out.println("topic: " + topic + " offset:" + offset + " key:" + key + " value:" + value);
}
// Thread.sleep(1000);
}
}
}
参考官网API文档:
http://kafka.apache.org/24/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html
三、 异步使用带有回调函数方法生产消息
如果我们想获取生产者消息是否成功,或者成功生产消息到Kafka中后,执行一些其他动作。此时,可以很方便地使用带有回调函数来发送消息。
需求:
-
在发送消息出现异常时,能够及时打印出异常信息
-
在发送消息成功时,打印Kafka的topic名字、分区id、offset
public class KafkaProducerTest {
public static void main(String[] args) {
// 1. 创建用于连接Kafka的Properties配置
Properties props = new Properties();
props.put("bootstrap.servers", "node1:9092");
props.put("acks", "all");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// 2. 创建一个生产者对象KafkaProducer
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
// 3. 调用send发送1-100消息到指定Topic test
for(int i = 0; i < 100; ++i) {
// 一、同步方式
// 获取返回值Future,该对象封装了返回值
// Future<RecordMetadata> future = producer.send(new ProducerRecord<String, String>("test", null, i + ""));
// 调用一个Future.get()方法等待响应
// future.get();
// 二、带回调函数异步方式
producer.send(new ProducerRecord<String, String>("test", null, i + ""), new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if(exception != null) {
System.out.println("发送消息出现异常");
}
else {
String topic = metadata.topic();
int partition = metadata.partition();
long offset = metadata.offset();
System.out.println("发送消息到Kafka中的名字为" + topic + "的主题,第" + partition + "分区,第" + offset + "条数据成功!");
}
}
});
}
// 5. 关闭生产者
producer.close();
}
}
总结
本文详细介绍了如何在Java中使用Kafka,包括创建生产者和消费者的基本步骤,以及一些高级配置与优化建议。通过本文的学习,相信大家能够掌握基本的Kafka使用方法,并能在实际项目中应用。
- 点赞
- 收藏
- 关注作者
评论(0)