Apache Kafka-生产消费基础篇
【摘要】
文章目录
POM 依赖生产者消费者测试
POM 依赖
版本请同使用的kafka服务端的版本保持一致
<dependency>
<groupId>o...
POM 依赖
版本请同使用的kafka服务端的版本保持一致
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.4.1</version>
</dependency>
生产者
请小伙伴注意一下注释,这里就不做多余的解释啦
package com.artisan.kafka.first;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
/**
* @author 小工匠
* @version 1.0
* @description: TODO
* @date 2021/2/17 19:45
* @mark: show me the code , change the world
*/
public class ProduceClient {
private static final String TOPIC = "artisanTopic";
public static void main(String[] args) throws ExecutionException, InterruptedException {
// 属性设置
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.126.140:9092");
properties.put(ProducerConfig.ACKS_CONFIG,"1");
properties.put(ProducerConfig.RETRIES_CONFIG,3);
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
// 根据属性实例化KafkaProducer
Producer<String,String> producer = new KafkaProducer<String, String>(properties);
// 创建消息 三个参数,分别是 Topic ,消息的 key ,消息的 message 。
String message = "mockValue";
ProducerRecord<String ,String> producerRecord = new ProducerRecord<>(TOPIC, "mockKey", message);
// 发送消息 (同步)
Future<RecordMetadata> result = producer.send(producerRecord);
// 获取同步发送的结果
RecordMetadata recordMetadata = result.get();
System.out.println(String.format("Message[ %s ] sent to Topic: %s || Partition: %s || Offset: %s",message, recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset()));
}
}
消费者
请小伙伴注意一下注释,这里就不做多余的解释啦
package com.artisan.kafka.first;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
/**
* @author 小工匠
* @version 1.0
* @description: TODO
* @date 2021/2/17 20:09
* @mark: show me the code , change the world
*/
public class ConsumerClient {
private static final String TOPIC = "artisanTopic";
public static void main(String[] args) {
// 属性设置
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG , "192.168.126.140:9092"); // Broker 的地址
properties.put(ConsumerConfig.GROUP_ID_CONFIG,"artisan-consumer-group");// 消费者分组
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest"); // 设置消费者分组最初的消费进度为 earliest
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"true"); // 是否自动提交消费进度
properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,1000); // 自动提交消费进度频率
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); // 消息的 key 的反序列化方式
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());// 消息的 value 的反序列化方式
// 根据设置实例化KafkaConsumer
Consumer<String,String> consumer = new KafkaConsumer<>(properties);
// 订阅消息
consumer.subscribe(Collections.singleton(TOPIC));
// 循环拉取消息
while (true){
// 拉取消息
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(10));
// 遍历处理消息
records.forEach(record -> System.out.println(String.format("接收到消息:Key %s || 内容 %s" , record.key(),record.value())));
}
}
}
属性的话,需要结合kafka的特性来讲解,后面的单独介绍
测试
运行Produce
运行消费端
文章来源: artisan.blog.csdn.net,作者:小小工匠,版权归原作者所有,如需转载,请联系作者。
原文链接:artisan.blog.csdn.net/article/details/113837579
【版权声明】本文为华为云社区用户转载文章,如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱:
cloudbbs@huaweicloud.com
- 点赞
- 收藏
- 关注作者
评论(0)