Java编程操作Kafka消息中间件

举报
tea_year 发表于 2025/04/27 20:34:58 2025/04/27
【摘要】 5.1 同步生产消息到Kafka中5.1.1 需求接下来,我们将编写Java程序,将1-100的数字消息写入到Kafka中。5.1.2 准备工作5.1.2.1 导入Maven Kafka POM依赖<repositories><!-- 代码库 --> <repository> <id>central</id> <url>http://maven.aliyun...

Apache Kafka,以其高性能、高吞吐量和可扩展性,成为大数据处理和实时数据流处理领域的首选消息队列。不同于传统消息中间件,Kafka以发布/订阅模式为核心,设计为分布式系统,特别适合处理大规模的数据流。本文将快速概览Kafka的基础概念、常见的陷阱与应对策略,并通过Java代码示例加深理解。

Kafka基础
Kafka由生产者、消费者、主题(Topics)和代理(Brokers)组成。生产者向特定主题发布消息,而消费者订阅这些主题来消费消息。Kafka的存储基于分区(Partitions),每个主题可分割成多个分区,这不仅提高了并发处理能力,也使得消息具有顺序性。

常见问题与易错点
1. 分区选择不当
分区数量不合理或分区策略不合适,会影响消息的分布均衡和消费速率。

避免方法:根据预期的吞吐量和消费者数量合理设置分区数。对于需保证消息顺序的应用,确保同类消息发送至同一分区。

2. 偏移量管理混乱
消费者偏移量管理不当,可能导致消息丢失或重复消费。

避免方法:利用Kafka自动提交偏移量的特性,或手动控制偏移量提交时机,确保消费进度的准确记录。

3. 资源与性能监控不足
忽视监控,可能导致资源耗尽或性能瓶颈未及时发现。

避免方法:利用Kafka自带的监控工具如Kafka Monitor,或集成外部监控系统,持续跟踪broker、topic和消费者的状态。

一、 同步生产消息到Kafka中

1.1.1 需求

接下来,我们将编写Java程序,将1-100的数字消息写入到Kafka中。

1.1.2 准备工作

1.1.2.1 导入Maven Kafka POM依赖
<repositories><!-- 代码库 -->
    <repository>
        <id>central</id>
        <url>http://maven.aliyun.com/nexus/content/groups/public//</url>
        <releases>
            <enabled>true</enabled>
        </releases>
        <snapshots>
            <enabled>true</enabled>
            <updatePolicy>always</updatePolicy>
            <checksumPolicy>fail</checksumPolicy>
        </snapshots>
    </repository>
</repositories>
​
<dependencies>
    <!-- kafka客户端工具 -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>2.4.1</version>
    </dependency>
​
    <!-- 工具类 -->
    <dependency>
        <groupId>org.apache.commons</groupId>
        <artifactId>commons-io</artifactId>
        <version>1.3.2</version>
    </dependency>
​
    <!-- SLF桥接LOG4J日志 -->
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>1.7.6</version>
    </dependency>
​
    <!-- SLOG4J日志 -->
    <dependency>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
        <version>1.2.16</version>
    </dependency>
</dependencies>
​
<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.1.0</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
            </configuration>
        </plugin>
    </plugins>
</build>
1.1.2.2 导入log4j.properties

将log4j.properties配置文件放入到resources文件夹中

log4j.rootLogger=INFO,stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender 
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout 
log4j.appender.stdout.layout.ConversionPattern=%5p - %m%n
1.1.2.3 创建包和类

创建包cn.yh.kafka,并创建KafkaProducerTest类。

1.1.3 代码开发

可以参考以下方式来编写第一个Kafka示例程序

参考以下文档:http://kafka.apache.org/24/javadoc/index.html?org/apache/kafka/clients/producer/KafkaProducer.html

  1. 创建用于连接Kafka的Properties配置

Properties props = new Properties();
props.put("bootstrap.servers", "192.168.88.100:9092");
props.put("acks", "all");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  1. 创建一个生产者对象KafkaProducer

  2. 调用send发送1-100消息到指定Topic test,并获取返回值Future,该对象封装了返回值

  3. 再调用一个Future.get()方法等待响应

  4. 关闭生产者

参考代码:

public class KafkaProducerTest {
    public static void main(String[] args) {
        // 1. 创建用于连接Kafka的Properties配置
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.88.100:9092");
        props.put("acks", "all");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
​
        // 2. 创建一个生产者对象KafkaProducer
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
​
        // 3. 调用send发送1-100消息到指定Topic test
        for(int i = 0; i < 100; ++i) {
            try {
                // 获取返回值Future,该对象封装了返回值
                Future<RecordMetadata> future = producer.send(new ProducerRecord<String, String>("test", null, i + ""));
                // 调用一个Future.get()方法等待响应
                future.get();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }
​
        // 5. 关闭生产者
        producer.close();
    }
}

二、从Kafka的topic中消费消息

2.2.1 需求

从 test topic中,将消息都消费,并将记录的offset、key、value打印出来

2.2.2 准备工作

在cn.yh.kafka包下创建KafkaConsumerTest类

2.2.3 开发步骤

  1. 创建Kafka消费者配置

Properties props = new Properties();
props.setProperty("bootstrap.servers", "node1:9092");
props.setProperty("group.id", "test");
props.setProperty("enable.auto.commit", "true");
props.setProperty("auto.commit.interval.ms", "1000");
props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  1. 创建Kafka消费者

  2. 订阅要消费的主题

  3. 使用一个while循环,不断从Kafka的topic中拉取消息

  4. 将记录(record)的offset、key、value都打印出来

5.2.4 参考代码

public class KafkaConsumerTest {
​
    public static void main(String[] args) throws InterruptedException {
        // 1.创建Kafka消费者配置
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "node1:9092,node2:9092,node3:9092");
        // 消费者组(可以使用消费者组将若干个消费者组织到一起),共同消费Kafka中topic的数据
        // 每一个消费者需要指定一个消费者组,如果消费者的组名是一样的,表示这几个消费者是一个组中的
        props.setProperty("group.id", "test");
        // 自动提交offset
        props.setProperty("enable.auto.commit", "true");
        // 自动提交offset的时间间隔
        props.setProperty("auto.commit.interval.ms", "1000");
        // 拉取的key、value数据的
        props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
​
        // 2.创建Kafka消费者
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props);
​
        // 3. 订阅要消费的主题
        // 指定消费者从哪个topic中拉取数据
        kafkaConsumer.subscribe(Arrays.asList("test"));
​
        // 4.使用一个while循环,不断从Kafka的topic中拉取消息
        while(true) {
            // Kafka的消费者一次拉取一批的数据
            ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(5));
            // 5.将将记录(record)的offset、key、value都打印出来
            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                // 主题
                String topic = consumerRecord.topic();
                // offset:这条消息处于Kafka分区中的哪个位置
                long offset = consumerRecord.offset();
                // key\value
                String key = consumerRecord.key();
                String value = consumerRecord.value();
​
                System.out.println("topic: " + topic + " offset:" + offset + " key:" + key + " value:" + value);
            }
            // Thread.sleep(1000);
        }
    }
}

参考官网API文档:

http://kafka.apache.org/24/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html

三、 异步使用带有回调函数方法生产消息

如果我们想获取生产者消息是否成功,或者成功生产消息到Kafka中后,执行一些其他动作。此时,可以很方便地使用带有回调函数来发送消息。

需求:

  1. 在发送消息出现异常时,能够及时打印出异常信息

  2. 在发送消息成功时,打印Kafka的topic名字、分区id、offset

public class KafkaProducerTest {
    public static void main(String[] args) {
        // 1. 创建用于连接Kafka的Properties配置
        Properties props = new Properties();
        props.put("bootstrap.servers", "node1:9092");
        props.put("acks", "all");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
​
        // 2. 创建一个生产者对象KafkaProducer
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
​
        // 3. 调用send发送1-100消息到指定Topic test
        for(int i = 0; i < 100; ++i) {
            // 一、同步方式
            // 获取返回值Future,该对象封装了返回值
            // Future<RecordMetadata> future = producer.send(new ProducerRecord<String, String>("test", null, i + ""));
            // 调用一个Future.get()方法等待响应
            // future.get();
​
            // 二、带回调函数异步方式
            producer.send(new ProducerRecord<String, String>("test", null, i + ""), new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if(exception != null) {
                        System.out.println("发送消息出现异常");
                    }
                    else {
                        String topic = metadata.topic();
                        int partition = metadata.partition();
                        long offset = metadata.offset();
                        System.out.println("发送消息到Kafka中的名字为" + topic + "的主题,第" + partition + "分区,第" + offset + "条数据成功!");
                    }
                }
            });
        }
        // 5. 关闭生产者
        producer.close();
    }
}

总结

本文详细介绍了如何在Java中使用Kafka,包括创建生产者和消费者的基本步骤,以及一些高级配置与优化建议。通过本文的学习,相信大家能够掌握基本的Kafka使用方法,并能在实际项目中应用。

【声明】本内容来自华为云开发者社区博主,不代表华为云及华为云开发者社区的观点和立场。转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息,否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。