Apache Kafka-生产消费基础篇

举报
小工匠 发表于 2021/09/11 00:50:53 2021/09/11
【摘要】 文章目录 POM 依赖生产者消费者测试 POM 依赖 版本请同使用的kafka服务端的版本保持一致 <dependency> <groupId>o...


在这里插入图片描述

POM 依赖

版本请同使用的kafka服务端的版本保持一致

		<dependency>
			<groupId>org.apache.kafka</groupId>
			<artifactId>kafka-clients</artifactId>
			<version>2.4.1</version>
		</dependency>

  
 

生产者

请小伙伴注意一下注释,这里就不做多余的解释啦

package com.artisan.kafka.first;

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

/**
 * @author 小工匠
 * @version 1.0
 * @description: TODO
 * @date 2021/2/17 19:45
 * @mark: show me the code , change the world
 */
public class ProduceClient {

    private static final String TOPIC = "artisanTopic";

    public static void main(String[] args) throws ExecutionException, InterruptedException {

        // 属性设置 
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.126.140:9092");
        properties.put(ProducerConfig.ACKS_CONFIG,"1");
        properties.put(ProducerConfig.RETRIES_CONFIG,3);

        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());

        // 根据属性实例化KafkaProducer
        Producer<String,String>  producer = new KafkaProducer<String, String>(properties);

        // 创建消息  三个参数,分别是 Topic ,消息的 key ,消息的 message 。
        String message = "mockValue";
        ProducerRecord<String ,String> producerRecord = new ProducerRecord<>(TOPIC, "mockKey", message);

        // 发送消息  (同步)
        Future<RecordMetadata> result = producer.send(producerRecord);

        // 获取同步发送的结果
        RecordMetadata recordMetadata = result.get();

        System.out.println(String.format("Message[ %s ] sent to Topic: %s  || Partition: %s || Offset: %s",message, recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset()));
    }

}
  
 

消费者

请小伙伴注意一下注释,这里就不做多余的解释啦

package com.artisan.kafka.first;

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

/**
 * @author 小工匠
 * @version 1.0
 * @description: TODO
 * @date 2021/2/17 20:09
 * @mark: show me the code , change the world
 */
public class ConsumerClient {

    private static final String TOPIC = "artisanTopic";
    public static void main(String[] args) {

        // 属性设置
        Properties properties = new Properties();

        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG , "192.168.126.140:9092");  // Broker 的地址
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"artisan-consumer-group");// 消费者分组
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");  // 设置消费者分组最初的消费进度为 earliest
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"true"); // 是否自动提交消费进度
        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,1000); // 自动提交消费进度频率
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); // 消息的 key 的反序列化方式
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());// 消息的 value 的反序列化方式

        // 根据设置实例化KafkaConsumer
        Consumer<String,String>  consumer = new KafkaConsumer<>(properties);

        // 订阅消息
        consumer.subscribe(Collections.singleton(TOPIC));
        // 循环拉取消息
        while (true){
            // 拉取消息
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(10));
            // 遍历处理消息
            records.forEach(record -> System.out.println(String.format("接收到消息:Key %s  || 内容 %s" , record.key(),record.value())));
        }
    }
}
  
 

属性的话,需要结合kafka的特性来讲解,后面的单独介绍


测试

运行Produce

在这里插入图片描述

运行消费端

在这里插入图片描述

文章来源: artisan.blog.csdn.net,作者:小小工匠,版权归原作者所有,如需转载,请联系作者。

原文链接:artisan.blog.csdn.net/article/details/113837579

【版权声明】本文为华为云社区用户转载文章,如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。