可修改对接地址的kafka客户端

举报
张俭 发表于 2023/12/30 10:41:54 2023/12/30
【摘要】 原生的kafka客户端并不支持修改客户端连接的服务器地址,我们有需求需要支持动态地更新kafka客户端的地址,最简单的一个做法就是通过组合的方式,对原生的kafka客户端做一层wrap,来支持修改kafka客户端地址。这层wrap尽量做到无锁化,不影响性能 核心代码 生产者import lombok.extern.slf4j.Slf4j;import org.apache.kafka.cli...

原生的kafka客户端并不支持修改客户端连接的服务器地址,我们有需求需要支持动态地更新kafka客户端的地址,最简单的一个做法就是通过组合的方式,对原生的kafka客户端做一层wrap,来支持修改kafka客户端地址。这层wrap尽量做到无锁化,不影响性能

核心代码

生产者

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.Serializer;

import java.util.Properties;
import java.util.concurrent.Future;

/**
 * @author hezhangjian
 */
@Slf4j
public class EnhanceKafkaProducer<K, V> {

    private volatile KafkaProducer<K, V> producer;

    public EnhanceKafkaProducer(Properties properties, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
        this.producer = new KafkaProducer<K, V>(properties, keySerializer, valueSerializer);
    }

    /**
     * the caller should ensure thread safe call to this obj
     *
     * @param properties
     * @param keySerializer
     * @param valueSerializer
     */
    public void changeParam(Properties properties, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
        KafkaProducer<K, V> oldProducer = this.producer;
        this.producer = null;
        try {
            oldProducer.close();
        } catch (Exception e) {
            log.error("ignore the old client close error");
        }
        this.producer = new KafkaProducer<K, V>(properties, keySerializer, valueSerializer);
    }

    public Future<RecordMetadata> send(ProducerRecord<K, V> record) {
        if (producer == null) {
            throw new IllegalStateException("kafka producer is switching");
        }
        return producer.send(record);
    }

}

消费者

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.Deserializer;

import java.time.Duration;
import java.util.Properties;

/**
 * @author hezhangjian
 */
@Slf4j
public class EnhanceKafkaConsumer<K, V> {

    private volatile KafkaConsumer<K, V> consumer;

    public EnhanceKafkaConsumer(Properties properties, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer) {
        this.consumer = new KafkaConsumer<K, V>(properties, keyDeserializer, valueDeserializer);
    }

    /**
     * the caller should ensure thread safe call to this obj
     *
     * @param properties
     * @param keyDeserializer
     * @param valueDeserializer
     */
    public void changeParam(Properties properties, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer) {
        KafkaConsumer<K, V> oldConsumer = this.consumer;
        this.consumer = null;
        try {
            oldConsumer.close();
        } catch (Exception e) {
            log.error("ignore the old client close error");
        }
        this.consumer = new KafkaConsumer<>(properties, keyDeserializer, valueDeserializer);
    }

    public ConsumerRecords<K, V> poll(final Duration timeout) {
        if (consumer == null) {
            throw new IllegalStateException("kafka producer is switching");
        }
        return consumer.poll(timeout);
    }

}

【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。