【Kafka笔记】Kafka 多线程消费消息

举报
Copy工程师 发表于 2022/01/17 20:05:23 2022/01/17
【摘要】 Kafka多线程消费理解 Kafka Java Consumer设计Kafka Java Consumer采用的是单线程的设计。其入口类KafkaConsumer是一个双线程的设计,即用户主线程和心跳线程。用户主线程,指的是启动Consumer应用程序main方法的线程,心跳线程(Heartbeat Thread)只负责定期给对应的Broker机器发送心跳请求,以表示消费者应用的存活性。官...

Kafka多线程消费理解

Kafka Java Consumer设计

Kafka Java Consumer采用的是单线程的设计。其入口类KafkaConsumer是一个双线程的设计,即用户主线程和心跳线程。
用户主线程,指的是启动Consumer应用程序main方法的线程,心跳线程(Heartbeat Thread)只负责定期给对应的Broker机器发送心跳请求,以表示消费者应用的存活性。

官网文档对于consumer多线程的处理方式 :

  1. 每个线程一个消费者

    每个线程自己的消费者实例。这里是这种方法的优点和缺点:

    • PRO: 这是最容易实现的
    • PRO: 因为它不需要在线程之间协调,所以通常它是最快的。
    • CON: 更多的消费者意味着更多的TCP连接到集群(每个线程一个)。一般kafka处理连接非常的快,所以这是一个小成本。
    • CON: 更多的消费者意味着更多的请求被发送到服务器,但稍微较少的数据批次可能导致I/O吞吐量的一些下降。
    • CON: 所有进程中的线程总数受到分区总数的限制。
  2. 解耦消费和处理

    另一个替代方式是一个或多个消费者线程,它来消费所有数据,其消费所有数据并将ConsumerRecords实例切换到由实际处理记录处理的处理器线程池来消费的阻塞队列。这个选项同样有利弊:

    • 可扩展消费者和处理进程的数量。这样单个消费者的数据可分给多个处理器线程来执行,避免对分区的任何限制。
    • CON: 跨多个处理器的顺序保证需要特别注意,因为线程是独立的执行,后来的消息可能比遭到的消息先处理,这仅仅是因为线程执行的运气。如果对排序没有问题,这就不是个问题。
    • CON: 手动提交变得更困难,因为它需要协调所有的线程以确保处理对该分区的处理完成。

这是两种不同的处理方式。

Kafka多线程消费实例

1. 消费者程序拥有多个消费者并拥有自己的主题

解释: 消费者程序启动多个线程,每个线程维护专属的KafkaConsumer,负责完整的消息获取、消息处理流程。 (其实就是一个消费者客户端开启多个线程,每个线程都有各自的Consumer对同一个topic或者多个topic进行消费,这些消费者(线程)组成了一个消费者组)

借用网上的图:

image.png

topic数据实例:

image.png

代码:

public class KafkaConsumerThread  implements Runnable{

    private KafkaConsumer<String,String> consumer;
    private AtomicBoolean closed = new AtomicBoolean(false);
    public KafkaConsumerThread(){

    }
    // 构造方法 生成自己的consumer
    public KafkaConsumerThread(Properties props) {
        this.consumer = new KafkaConsumer<>(props);
    }

    @Override
    public void run() {
        try {
            // 消费同一主题
            consumer.subscribe(Collections.singletonList("six-topic"));
            // 线程名称
            String threadName = Thread.currentThread().getName();
            while (!closed.get()){
                ConsumerRecords<String, String> records = consumer.poll(3000);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("Context: Thread-name= %s, topic= %s partition= %s, offset= %d, key= %s,value= %s\n",threadName,record.topic(),record.partition(),record.offset(),record.key(),record.value());
                }
            }
        }catch (WakeupException e){
            e.printStackTrace();
        }finally {
            consumer.close();
        }
    }

    /**
     * 关闭消费
     */
    public void shutdown(){
        closed.set(true);
        // wakeup 可以安全地从外部线程来中断活动操作
        consumer.wakeup();
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "XXXXXXX:9093");
        props.put("group.id", "thread-1");//消费者组,只要group.id相同,就属于同一个消费者组
        props.put("enable.auto.commit", "true");//自动提交offset
        props.put("auto.offset.reset", "earliest");
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("session.timeout.ms", "30000");
        props.put("max.poll.records",6);
        // 运行三个线程,消费同一个topic 这个topic的分区必须大于等于3 否则会有消费者消费不到数据
        for (int i = 0; i < 3 ; i++) {
            new Thread(new KafkaConsumerThread(props),"Thread"+i).start();
        }
    }
}

日志:

Context: Thread-name= Thread0, topic= six-topic partition= 0, offset= 0, key= ImKey-0-one,value= ImValue-0-one
Context: Thread-name= Thread2, topic= six-topic partition= 2, offset= 0, key= ImKey-1-one,value= ImValue-1-one
Context: Thread-name= Thread2, topic= six-topic partition= 2, offset= 1, key= ImKey-5-one,value= ImValue-5-one
Context: Thread-name= Thread2, topic= six-topic partition= 2, offset= 2, key= ImKey-8-one,value= ImValue-8-one
Context: Thread-name= Thread2, topic= six-topic partition= 2, offset= 3, key= ImKey-10-one,value= ImValue-10-one
Context: Thread-name= Thread2, topic= six-topic partition= 2, offset= 4, key= ImKey-13-one,value= ImValue-13-one
Context: Thread-name= Thread2, topic= six-topic partition= 2, offset= 5, key= ImKey-14-one,value= ImValue-14-one
Context: Thread-name= Thread1, topic= six-topic partition= 1, offset= 0, key= ImKey-4-one,value= ImValue-4-one
Context: Thread-name= Thread1, topic= six-topic partition= 1, offset= 1, key= ImKey-6-one,value= ImValue-6-one
Context: Thread-name= Thread1, topic= six-topic partition= 1, offset= 2, key= ImKey-7-one,value= ImValue-7-one
Context: Thread-name= Thread1, topic= six-topic partition= 1, offset= 3, key= ImKey-11-one,value= ImValue-11-one
Context: Thread-name= Thread1, topic= six-topic partition= 1, offset= 4, key= ImKey-15-one,value= ImValue-15-one
Context: Thread-name= Thread1, topic= six-topic partition= 1, offset= 5, key= ImKey-21-one,value= ImValue-21-one
Context: Thread-name= Thread1, topic= six-topic partition= 1, offset= 6, key= ImKey-25-one,value= ImValue-25-one
Context: Thread-name= Thread1, topic= six-topic partition= 1, offset= 7, key= ImKey-27-one,value= ImValue-27-one
Context: Thread-name= Thread1, topic= six-topic partition= 1, offset= 8, key= ImKey-29-one,value= ImValue-29-one
Context: Thread-name= Thread0, topic= six-topic partition= 0, offset= 1, key= ImKey-2-one,value= ImValue-2-one
Context: Thread-name= Thread0, topic= six-topic partition= 0, offset= 2, key= ImKey-3-one,value= ImValue-3-one
Context: Thread-name= Thread0, topic= six-topic partition= 0, offset= 3, key= ImKey-9-one,value= ImValue-9-one
Context: Thread-name= Thread0, topic= six-topic partition= 0, offset= 4, key= ImKey-12-one,value= ImValue-12-one
Context: Thread-name= Thread0, topic= six-topic partition= 0, offset= 5, key= ImKey-16-one,value= ImValue-16-one
Context: Thread-name= Thread2, topic= six-topic partition= 2, offset= 6, key= ImKey-17-one,value= ImValue-17-one
Context: Thread-name= Thread2, topic= six-topic partition= 2, offset= 7, key= ImKey-24-one,value= ImValue-24-one
Context: Thread-name= Thread2, topic= six-topic partition= 2, offset= 8, key= ImKey-32-one,value= ImValue-32-one
Context: Thread-name= Thread0, topic= six-topic partition= 0, offset= 6, key= ImKey-18-one,value= ImValue-18-one
Context: Thread-name= Thread0, topic= six-topic partition= 0, offset= 7, key= ImKey-19-one,value= ImValue-19-one
Context: Thread-name= Thread0, topic= six-topic partition= 0, offset= 8, key= ImKey-20-one,value= ImValue-20-one
Context: Thread-name= Thread0, topic= six-topic partition= 0, offset= 9, key= ImKey-22-one,value= ImValue-22-one
Context: Thread-name= Thread0, topic= six-topic partition= 0, offset= 10, key= ImKey-23-one,value= ImValue-23-one
Context: Thread-name= Thread0, topic= six-topic partition= 0, offset= 11, key= ImKey-26-one,value= ImValue-26-one
Context: Thread-name= Thread0, topic= six-topic partition= 0, offset= 12, key= ImKey-28-one,value= ImValue-28-one
Context: Thread-name= Thread0, topic= six-topic partition= 0, offset= 13, key= ImKey-30-one,value= ImValue-30-one
Context: Thread-name= Thread0, topic= six-topic partition= 0, offset= 14, key= ImKey-31-one,value= ImValue-31-one
Context: Thread-name= Thread0, topic= six-topic partition= 0, offset= 15, key= ImKey-33-one,value= ImValue-33-one
Context: Thread-name= Thread0, topic= six-topic partition= 0, offset= 16, key= ImKey-34-one,value= ImValue-34-one

可以看到三个线程,一个消费者组,每个线程消费者得到一个topic的分区去消费消息。

2. 单个消费者,多个线程处理消息

解释: 消费者程序使用单或多线程获取消息,同时创建多个消费线程执行消息处理逻辑。获取消息的线程可以是一个或多个,每个维护专属KafkaConsumer实例,处理消息交由特定线程池来做,从而实现消息获取与消息处理的真正解耦。

image.png

这里的多线程处理消息逻辑可以有多种方法,这里就列出来几种:

使用队列存储消息,多线程处理队列:

  1. 使用独立锁(FIFO)队列LinkedBlockingQueue

    该队列是线程安全的先进先出队列

    public class KafkaConsumerThread2 implements Runnable {
        // 存储消息 先进先出队列
        private LinkedBlockingQueue<ConsumerRecords<String,String>> list;
        private AtomicBoolean closed = new AtomicBoolean(false);
    
        public KafkaConsumerThread2() {
        }
    
        public KafkaConsumerThread2(LinkedBlockingQueue<ConsumerRecords<String, String>> list) {
            this.list = list;
        }
    
        @Override
        public void run() {
            // 线程名称
            String threadName = Thread.currentThread().getName();
            // 处理消息
            while (!closed.get()){
                try {
                    ConsumerRecords<String, String> records = list.take();
                    System.out.println("消息数量"+records.count());
                    if (records.isEmpty()){
                        System.out.printf("队列为空,不消费数据,Thread-name= %s\n",threadName);
                    }else {
                        for (ConsumerRecord<String, String> record : records) {
                            Thread.sleep(3000);
                            System.out.printf("Context: Thread-name= %s, topic= %s partition= %s, offset= %d, key= %s,value= %s\n",threadName,record.topic(),record.partition(),record.offset(),record.key(),record.value());
                        }
                    }
                }catch (InterruptedException e){
                    e.printStackTrace();
                }
            }
        }
    
        public static void main(String[] args) {
            LinkedBlockingQueue<ConsumerRecords<String, String>> list = new LinkedBlockingQueue<>();
            Properties props = new Properties();
            props.put("bootstrap.servers", "10.33.68.68:9093");
            props.put("group.id", "thread-5");//消费者组,只要group.id相同,就属于同一个消费者组
            props.put("enable.auto.commit", "true");//自动提交offset
            props.put("auto.offset.reset", "earliest");
            props.put("auto.commit.interval.ms", "1000");
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("session.timeout.ms", "30000");
            props.put("max.poll.records",5);
            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
            // 消费同一主题
            consumer.subscribe(Collections.singletonList("six-topic"));
            // 开启三个线程处理队列中的消息
            for (int i = 0; i <3 ; i++) {
                new Thread(new KafkaConsumerThread2(list),"thread-"+i).start();
            }
            while (true){
                ConsumerRecords<String, String> records = consumer.poll(1000);
                try {
                    list.put(records);
                    //Thread.sleep(3000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
    
  2. 创建线程池,使用线程池处理消息逻辑

    逻辑处理类ConsumerDealThread:

    public class ConsumerDealThread implements Runnable{
        private ConsumerRecord record;
    
        public ConsumerDealThread(ConsumerRecord record) {
            this.record = record;
        }
    
        public void run() {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.printf("Context: Thread-name= %s, topic= %s partition= %s, offset= %d, key= %s,value= %s\n",Thread.currentThread().getName(),record.topic(),record.partition(),record.offset(),record.key(),record.value());
        }
    }
    

    运行类KafkaConsumerThread3:

    public class KafkaConsumerThread3 {
        public static void main(String[] args) {
            LinkedBlockingQueue<ConsumerRecords<String, String>> list = new LinkedBlockingQueue<>();
            Properties props = new Properties();
            props.put("bootstrap.servers", "10.33.68.68:9093");
            props.put("group.id", "thread-18");//消费者组,只要group.id相同,就属于同一个消费者组
            props.put("enable.auto.commit", "true");//自动提交offset
            props.put("auto.offset.reset", "earliest");
            props.put("auto.commit.interval.ms", "1000");
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("session.timeout.ms", "30000");
            props.put("max.poll.records",5);
            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
            // 消费同一主题
            consumer.subscribe(Collections.singletonList("six-topic"));
            ExecutorService executor = new ThreadPoolExecutor(3, 3, 0, TimeUnit.MILLISECONDS,
                    new ArrayBlockingQueue<>(1000), new ThreadPoolExecutor.CallerRunsPolicy());
            while (true){
                ConsumerRecords<String, String> records = consumer.poll(1000);
                try {
                    for (ConsumerRecord<String, String> record : records) {
                        executor.submit(new ConsumerDealThread(record));
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                    consumer.wakeup();
                    executor.shutdown();
                    try {
                        if (!executor.awaitTermination(5000, TimeUnit.MILLISECONDS)) {
                            System.out.println("超时,未关闭线程池");
                        }
                    } catch (InterruptedException e2) {
                        e.printStackTrace();
                    }
                }
                BlockingQueue<Runnable> queue = ((ThreadPoolExecutor) executor).getQueue();
                System.out.println("队列数量:"+queue.size());
            }
        }
    }
    
【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。