消息系统之 Kafka (02)

举报
幼儿园老大* 发表于 2025/01/31 22:10:22 2025/01/31
144 0 0
【摘要】 JAVA 生产信息导入依赖<!-- 导入 0.10.2 版本 Kafka --><dependency> <groupId>org.apache.Kafka</groupId> <artifactId>Kafka-clients</artifactId> <version>0.10.2.0</version></dependency>Copy to clipboardErr...

JAVA 生产信息

导入依赖

<!-- 导入 0.10.2 版本 Kafka -->
<dependency>
    <groupId>org.apache.Kafka</groupId>
    <artifactId>Kafka-clients</artifactId>
    <version>0.10.2.0</version>
</dependency>Copy to clipboardErrorCopied

配置生产者

在创建 Producer 对象前,必须配置以下属性:

属性 含义 备注
bootstrap.servers Kafka broker 地址 如果有多个地址用逗号分割
key.serializer key 的序列化类 必须实现 Kafka 的 Serializer 接口
value.serializer value 的序列化类 必须实现 Kafka 的 Serializer 接口

开发者还可以选择配置如下属性:

属性 含义 备注
request.required.acks 指定消息系统何时向生产者返回 ACK : 0 不需要、 1 主服务器收到后、 -1 所有服务器收到后。 选择不接收 ACK 时生产者能以最大速度发送消息,但如果 broker 没有收到消息,生产者将无感知。
producer.type 同步发送消息 sync 或异步发送消息 async  异步发送消息会被服务器暂存在一个阻塞队列中,被消费者拉取时再由线程取出并组装。

通过读取配置,即可生成 Producer 对象。

Properties KafkaProps = new Properties();
KafkaProps.put("bootstrap.servers", "broker1:port1, broker2:port2");
KafkaProps.put("key.serializer", "org.apache.Kafka.common.StringSerializer");
KafkaProps.put("value.serializer", "org.apache.Kafka.common.StringSerializer");
producer = new KafkaProducer<String, String>(KafkaProps);Copy to clipboardErrorCopied

构造消息

实例化 ProducerRecord 类得到消息对象。

创建时必须指定消息所属 Topic 和消息值 Value 。消息发往哪个 Partition 通常由负载均衡机制随机选择。若指定了 Partition 则发送到指定的 Partition,如果没有指定 Partition 但指定了 Key,则由 hasy(key) 决定。

由于 Kafka 只能保证 Partition 内消息的有序性,如果需要保证消息有序到达,Producer 必须指定消息到达的 Partition ,这些消息最终只能被 ConsumeGroup 内的一个 Consumer 消费。

// 三种构造方法
ProducerRecord<String, String> record = new ProducerRecord<>(topic, value);
ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value);
ProducerRecord<String, String> record = new ProducerRecord<>(topic, partition, key, value);

// 发送给消息系统
producer.send(record);Copy to clipboardErrorCopied

接收 ACK

发送消息后,生产者有两种方式接收消息系统返回的 ACK :

  1. 通过返回的 Future 判断已经发送成功,get 方法会阻塞线程。实现同步等待。
try {
    Future future = producer.send(record); 
    future.get(10000);
} catch (TimeoutException e) {
    e.printStackTrace();
}Copy to clipboardErrorCopied
  1. 发送消息时传递一个回调对象,实现 Kafka 的 Callback 接口,通过回调判断是否发送成功。实现异步等待。
producer.send(record, new ProducerCallback());

private class ProducerCallback implements Callback {
    @Override
    public void onCompletion(RecordMetadata recordMetadata, Exception e) {
        if (e != null)  e.printStackTrace();
    }
} Copy to clipboardErrorCopied

生产示例

import java.util.Properties;
import org.apache.Kafka.clients.producer.KafkaProducer;
import org.apache.Kafka.clients.producer.ProducerConfig;
import org.apache.Kafka.clients.producer.ProducerRecord;
import org.apache.Kafka.common.serialization.StringSerializer;

public class Producer {
    public static String topic = "test"; 

    public static void main(String[] args) throws InterruptedException {

        Properties p = new Properties();
        p.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.23.76:9092,192.168.23.77:9092");          
        p.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);       
        p.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);    
        p.put("request.required.acks", "-1");                        
        p.put("producer.type", "async");         

        KafkaProducer<String, String> KafkaProducer = new KafkaProducer<>(p);

        try {
            for(int i = 0; i < 100; i++) {
                String msg = "Hello," + i;
                ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic, msg);   
                KafkaProducer.send(record);                                  
                Thread.sleep(500);
            }
        } finally {
            KafkaProducer.close();
        }

    }
}Copy to clipboardErrorCopied
【声明】本内容来自华为云开发者社区博主,不代表华为云及华为云开发者社区的观点和立场。转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息,否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

作者其他文章

评论(0

抱歉,系统识别当前为高风险访问,暂不支持该操作

    全部回复

    上滑加载中

    设置昵称

    在此一键设置昵称,即可参与社区互动!

    *长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

    *长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。