可修改对接地址的kafka客户端
【摘要】 原生的kafka客户端并不支持修改客户端连接的服务器地址,我们有需求需要支持动态地更新kafka客户端的地址,最简单的一个做法就是通过组合的方式,对原生的kafka客户端做一层wrap,来支持修改kafka客户端地址。这层wrap尽量做到无锁化,不影响性能 核心代码 生产者import lombok.extern.slf4j.Slf4j;import org.apache.kafka.cli...
原生的kafka
客户端并不支持修改客户端连接的服务器地址,我们有需求需要支持动态地更新kafka
客户端的地址,最简单的一个做法就是通过组合的方式,对原生的kafka
客户端做一层wrap
,来支持修改kafka
客户端地址。这层wrap尽量做到无锁化,不影响性能
核心代码
生产者
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.Serializer;
import java.util.Properties;
import java.util.concurrent.Future;
/**
* @author hezhangjian
*/
@Slf4j
public class EnhanceKafkaProducer<K, V> {
private volatile KafkaProducer<K, V> producer;
public EnhanceKafkaProducer(Properties properties, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
this.producer = new KafkaProducer<K, V>(properties, keySerializer, valueSerializer);
}
/**
* the caller should ensure thread safe call to this obj
*
* @param properties
* @param keySerializer
* @param valueSerializer
*/
public void changeParam(Properties properties, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
KafkaProducer<K, V> oldProducer = this.producer;
this.producer = null;
try {
oldProducer.close();
} catch (Exception e) {
log.error("ignore the old client close error");
}
this.producer = new KafkaProducer<K, V>(properties, keySerializer, valueSerializer);
}
public Future<RecordMetadata> send(ProducerRecord<K, V> record) {
if (producer == null) {
throw new IllegalStateException("kafka producer is switching");
}
return producer.send(record);
}
}
消费者
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.Deserializer;
import java.time.Duration;
import java.util.Properties;
/**
* @author hezhangjian
*/
@Slf4j
public class EnhanceKafkaConsumer<K, V> {
private volatile KafkaConsumer<K, V> consumer;
public EnhanceKafkaConsumer(Properties properties, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer) {
this.consumer = new KafkaConsumer<K, V>(properties, keyDeserializer, valueDeserializer);
}
/**
* the caller should ensure thread safe call to this obj
*
* @param properties
* @param keyDeserializer
* @param valueDeserializer
*/
public void changeParam(Properties properties, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer) {
KafkaConsumer<K, V> oldConsumer = this.consumer;
this.consumer = null;
try {
oldConsumer.close();
} catch (Exception e) {
log.error("ignore the old client close error");
}
this.consumer = new KafkaConsumer<>(properties, keyDeserializer, valueDeserializer);
}
public ConsumerRecords<K, V> poll(final Duration timeout) {
if (consumer == null) {
throw new IllegalStateException("kafka producer is switching");
}
return consumer.poll(timeout);
}
}
【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱:
cloudbbs@huaweicloud.com
- 点赞
- 收藏
- 关注作者
评论(0)