【Kafka笔记】Kafka 多线程消费消息
Kafka多线程消费理解
Kafka Java Consumer设计
Kafka Java Consumer采用的是单线程的设计。其入口类KafkaConsumer是一个双线程的设计,即用户主线程和心跳线程。
用户主线程,指的是启动Consumer应用程序main方法的线程,心跳线程(Heartbeat Thread)只负责定期给对应的Broker机器发送心跳请求,以表示消费者应用的存活性。
官网文档对于consumer多线程的处理方式 :
-
每个线程一个消费者
每个线程自己的消费者实例。这里是这种方法的优点和缺点:
- PRO: 这是最容易实现的
- PRO: 因为它不需要在线程之间协调,所以通常它是最快的。
- CON: 更多的消费者意味着更多的TCP连接到集群(每个线程一个)。一般kafka处理连接非常的快,所以这是一个小成本。
- CON: 更多的消费者意味着更多的请求被发送到服务器,但稍微较少的数据批次可能导致I/O吞吐量的一些下降。
- CON: 所有进程中的线程总数受到分区总数的限制。
-
解耦消费和处理
另一个替代方式是一个或多个消费者线程,它来消费所有数据,其消费所有数据并将ConsumerRecords实例切换到由实际处理记录处理的处理器线程池来消费的阻塞队列。这个选项同样有利弊:
- 可扩展消费者和处理进程的数量。这样单个消费者的数据可分给多个处理器线程来执行,避免对分区的任何限制。
- CON: 跨多个处理器的顺序保证需要特别注意,因为线程是独立的执行,后来的消息可能比遭到的消息先处理,这仅仅是因为线程执行的运气。如果对排序没有问题,这就不是个问题。
- CON: 手动提交变得更困难,因为它需要协调所有的线程以确保处理对该分区的处理完成。
这是两种不同的处理方式。
Kafka多线程消费实例
1. 消费者程序拥有多个消费者并拥有自己的主题
解释: 消费者程序启动多个线程,每个线程维护专属的KafkaConsumer,负责完整的消息获取、消息处理流程。 (其实就是一个消费者客户端开启多个线程,每个线程都有各自的Consumer对同一个topic或者多个topic进行消费,这些消费者(线程)组成了一个消费者组)
借用网上的图:
topic数据实例:
代码:
public class KafkaConsumerThread implements Runnable{
private KafkaConsumer<String,String> consumer;
private AtomicBoolean closed = new AtomicBoolean(false);
public KafkaConsumerThread(){
}
// 构造方法 生成自己的consumer
public KafkaConsumerThread(Properties props) {
this.consumer = new KafkaConsumer<>(props);
}
@Override
public void run() {
try {
// 消费同一主题
consumer.subscribe(Collections.singletonList("six-topic"));
// 线程名称
String threadName = Thread.currentThread().getName();
while (!closed.get()){
ConsumerRecords<String, String> records = consumer.poll(3000);
for (ConsumerRecord<String, String> record : records) {
System.out.printf("Context: Thread-name= %s, topic= %s partition= %s, offset= %d, key= %s,value= %s\n",threadName,record.topic(),record.partition(),record.offset(),record.key(),record.value());
}
}
}catch (WakeupException e){
e.printStackTrace();
}finally {
consumer.close();
}
}
/**
* 关闭消费
*/
public void shutdown(){
closed.set(true);
// wakeup 可以安全地从外部线程来中断活动操作
consumer.wakeup();
}
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "XXXXXXX:9093");
props.put("group.id", "thread-1");//消费者组,只要group.id相同,就属于同一个消费者组
props.put("enable.auto.commit", "true");//自动提交offset
props.put("auto.offset.reset", "earliest");
props.put("auto.commit.interval.ms", "1000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("session.timeout.ms", "30000");
props.put("max.poll.records",6);
// 运行三个线程,消费同一个topic 这个topic的分区必须大于等于3 否则会有消费者消费不到数据
for (int i = 0; i < 3 ; i++) {
new Thread(new KafkaConsumerThread(props),"Thread"+i).start();
}
}
}
日志:
Context: Thread-name= Thread0, topic= six-topic partition= 0, offset= 0, key= ImKey-0-one,value= ImValue-0-one
Context: Thread-name= Thread2, topic= six-topic partition= 2, offset= 0, key= ImKey-1-one,value= ImValue-1-one
Context: Thread-name= Thread2, topic= six-topic partition= 2, offset= 1, key= ImKey-5-one,value= ImValue-5-one
Context: Thread-name= Thread2, topic= six-topic partition= 2, offset= 2, key= ImKey-8-one,value= ImValue-8-one
Context: Thread-name= Thread2, topic= six-topic partition= 2, offset= 3, key= ImKey-10-one,value= ImValue-10-one
Context: Thread-name= Thread2, topic= six-topic partition= 2, offset= 4, key= ImKey-13-one,value= ImValue-13-one
Context: Thread-name= Thread2, topic= six-topic partition= 2, offset= 5, key= ImKey-14-one,value= ImValue-14-one
Context: Thread-name= Thread1, topic= six-topic partition= 1, offset= 0, key= ImKey-4-one,value= ImValue-4-one
Context: Thread-name= Thread1, topic= six-topic partition= 1, offset= 1, key= ImKey-6-one,value= ImValue-6-one
Context: Thread-name= Thread1, topic= six-topic partition= 1, offset= 2, key= ImKey-7-one,value= ImValue-7-one
Context: Thread-name= Thread1, topic= six-topic partition= 1, offset= 3, key= ImKey-11-one,value= ImValue-11-one
Context: Thread-name= Thread1, topic= six-topic partition= 1, offset= 4, key= ImKey-15-one,value= ImValue-15-one
Context: Thread-name= Thread1, topic= six-topic partition= 1, offset= 5, key= ImKey-21-one,value= ImValue-21-one
Context: Thread-name= Thread1, topic= six-topic partition= 1, offset= 6, key= ImKey-25-one,value= ImValue-25-one
Context: Thread-name= Thread1, topic= six-topic partition= 1, offset= 7, key= ImKey-27-one,value= ImValue-27-one
Context: Thread-name= Thread1, topic= six-topic partition= 1, offset= 8, key= ImKey-29-one,value= ImValue-29-one
Context: Thread-name= Thread0, topic= six-topic partition= 0, offset= 1, key= ImKey-2-one,value= ImValue-2-one
Context: Thread-name= Thread0, topic= six-topic partition= 0, offset= 2, key= ImKey-3-one,value= ImValue-3-one
Context: Thread-name= Thread0, topic= six-topic partition= 0, offset= 3, key= ImKey-9-one,value= ImValue-9-one
Context: Thread-name= Thread0, topic= six-topic partition= 0, offset= 4, key= ImKey-12-one,value= ImValue-12-one
Context: Thread-name= Thread0, topic= six-topic partition= 0, offset= 5, key= ImKey-16-one,value= ImValue-16-one
Context: Thread-name= Thread2, topic= six-topic partition= 2, offset= 6, key= ImKey-17-one,value= ImValue-17-one
Context: Thread-name= Thread2, topic= six-topic partition= 2, offset= 7, key= ImKey-24-one,value= ImValue-24-one
Context: Thread-name= Thread2, topic= six-topic partition= 2, offset= 8, key= ImKey-32-one,value= ImValue-32-one
Context: Thread-name= Thread0, topic= six-topic partition= 0, offset= 6, key= ImKey-18-one,value= ImValue-18-one
Context: Thread-name= Thread0, topic= six-topic partition= 0, offset= 7, key= ImKey-19-one,value= ImValue-19-one
Context: Thread-name= Thread0, topic= six-topic partition= 0, offset= 8, key= ImKey-20-one,value= ImValue-20-one
Context: Thread-name= Thread0, topic= six-topic partition= 0, offset= 9, key= ImKey-22-one,value= ImValue-22-one
Context: Thread-name= Thread0, topic= six-topic partition= 0, offset= 10, key= ImKey-23-one,value= ImValue-23-one
Context: Thread-name= Thread0, topic= six-topic partition= 0, offset= 11, key= ImKey-26-one,value= ImValue-26-one
Context: Thread-name= Thread0, topic= six-topic partition= 0, offset= 12, key= ImKey-28-one,value= ImValue-28-one
Context: Thread-name= Thread0, topic= six-topic partition= 0, offset= 13, key= ImKey-30-one,value= ImValue-30-one
Context: Thread-name= Thread0, topic= six-topic partition= 0, offset= 14, key= ImKey-31-one,value= ImValue-31-one
Context: Thread-name= Thread0, topic= six-topic partition= 0, offset= 15, key= ImKey-33-one,value= ImValue-33-one
Context: Thread-name= Thread0, topic= six-topic partition= 0, offset= 16, key= ImKey-34-one,value= ImValue-34-one
可以看到三个线程,一个消费者组,每个线程消费者得到一个topic的分区去消费消息。
2. 单个消费者,多个线程处理消息
解释: 消费者程序使用单或多线程获取消息,同时创建多个消费线程执行消息处理逻辑。获取消息的线程可以是一个或多个,每个维护专属KafkaConsumer实例,处理消息交由特定线程池来做,从而实现消息获取与消息处理的真正解耦。
这里的多线程处理消息逻辑可以有多种方法,这里就列出来几种:
使用队列存储消息,多线程处理队列:
-
使用独立锁(FIFO)队列LinkedBlockingQueue
该队列是线程安全的先进先出队列
public class KafkaConsumerThread2 implements Runnable { // 存储消息 先进先出队列 private LinkedBlockingQueue<ConsumerRecords<String,String>> list; private AtomicBoolean closed = new AtomicBoolean(false); public KafkaConsumerThread2() { } public KafkaConsumerThread2(LinkedBlockingQueue<ConsumerRecords<String, String>> list) { this.list = list; } @Override public void run() { // 线程名称 String threadName = Thread.currentThread().getName(); // 处理消息 while (!closed.get()){ try { ConsumerRecords<String, String> records = list.take(); System.out.println("消息数量"+records.count()); if (records.isEmpty()){ System.out.printf("队列为空,不消费数据,Thread-name= %s\n",threadName); }else { for (ConsumerRecord<String, String> record : records) { Thread.sleep(3000); System.out.printf("Context: Thread-name= %s, topic= %s partition= %s, offset= %d, key= %s,value= %s\n",threadName,record.topic(),record.partition(),record.offset(),record.key(),record.value()); } } }catch (InterruptedException e){ e.printStackTrace(); } } } public static void main(String[] args) { LinkedBlockingQueue<ConsumerRecords<String, String>> list = new LinkedBlockingQueue<>(); Properties props = new Properties(); props.put("bootstrap.servers", "10.33.68.68:9093"); props.put("group.id", "thread-5");//消费者组,只要group.id相同,就属于同一个消费者组 props.put("enable.auto.commit", "true");//自动提交offset props.put("auto.offset.reset", "earliest"); props.put("auto.commit.interval.ms", "1000"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("session.timeout.ms", "30000"); props.put("max.poll.records",5); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); // 消费同一主题 consumer.subscribe(Collections.singletonList("six-topic")); // 开启三个线程处理队列中的消息 for (int i = 0; i <3 ; i++) { new Thread(new KafkaConsumerThread2(list),"thread-"+i).start(); } while (true){ ConsumerRecords<String, String> records = consumer.poll(1000); try { list.put(records); //Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } } } }
-
创建线程池,使用线程池处理消息逻辑
逻辑处理类ConsumerDealThread:
public class ConsumerDealThread implements Runnable{ private ConsumerRecord record; public ConsumerDealThread(ConsumerRecord record) { this.record = record; } public void run() { try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.printf("Context: Thread-name= %s, topic= %s partition= %s, offset= %d, key= %s,value= %s\n",Thread.currentThread().getName(),record.topic(),record.partition(),record.offset(),record.key(),record.value()); } }
运行类KafkaConsumerThread3:
public class KafkaConsumerThread3 { public static void main(String[] args) { LinkedBlockingQueue<ConsumerRecords<String, String>> list = new LinkedBlockingQueue<>(); Properties props = new Properties(); props.put("bootstrap.servers", "10.33.68.68:9093"); props.put("group.id", "thread-18");//消费者组,只要group.id相同,就属于同一个消费者组 props.put("enable.auto.commit", "true");//自动提交offset props.put("auto.offset.reset", "earliest"); props.put("auto.commit.interval.ms", "1000"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("session.timeout.ms", "30000"); props.put("max.poll.records",5); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); // 消费同一主题 consumer.subscribe(Collections.singletonList("six-topic")); ExecutorService executor = new ThreadPoolExecutor(3, 3, 0, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(1000), new ThreadPoolExecutor.CallerRunsPolicy()); while (true){ ConsumerRecords<String, String> records = consumer.poll(1000); try { for (ConsumerRecord<String, String> record : records) { executor.submit(new ConsumerDealThread(record)); } } catch (Exception e) { e.printStackTrace(); consumer.wakeup(); executor.shutdown(); try { if (!executor.awaitTermination(5000, TimeUnit.MILLISECONDS)) { System.out.println("超时,未关闭线程池"); } } catch (InterruptedException e2) { e.printStackTrace(); } } BlockingQueue<Runnable> queue = ((ThreadPoolExecutor) executor).getQueue(); System.out.println("队列数量:"+queue.size()); } } }
- 点赞
- 收藏
- 关注作者
评论(0)