消息中间件之Kafka原理

举报
tea_year 发表于 2025/04/30 08:50:22 2025/04/30
【摘要】 一 分区的leader与follower1.1 Leader和Follower在Kafka中,每个topic都可以配置多个分区以及多个副本。每个分区都有一个leader以及0个或者多个follower,在创建topic时,Kafka会将每个分区的leader均匀地分配在每个broker上。我们正常使用kafka是感觉不到leader、follower的存在的。但其实,所有的读写操作都是由le...

Kafka是最初由Linkedin公司开发,是一个分布式、支持分区的(partition)、多副本的(replica),基于zookeeper协调的分布式消息系统,它的最大的特性就是可以实时的处理大量数据以满足各种需求场景:比如基于hadoop的批处理系统、低延迟的实时系统、storm/Spark流式处理引擎,web/nginx日志、访问日志,消息服务等等,用scala语言编写,Linkedin于2010年贡献给了Apache基金会并成为顶级开源 项目。

Kafka的设计目标是能够处理大量的数据流,支持数百万的消息/秒,具有高可用性和可伸缩性。Kafka提供了许多功能,包括持久化消息、分布式消息发布和订阅、多副本备份、数据压缩、流数据处理等。

一 分区的leader与follower

1.1 Leader和Follower

在Kafka中,每个topic都可以配置多个分区以及多个副本。每个分区都有一个leader以及0个或者多个follower,在创建topic时,Kafka会将每个分区的leader均匀地分配在每个broker上。我们正常使用kafka是感觉不到leader、follower的存在的。但其实,所有的读写操作都是由leader处理,而所有的follower都复制leader的日志数据文件,如果leader出现故障时,follower就会被选举为leader。所以,可以这样说:

  • Kafka中的leader负责处理读写操作,而follower只负责副本数据的同步

  • 如果leader出现故障,其他follower会被重新选举为leader

  • follower像一个consumer一样,拉取leader对应分区的数据,并保存到日志数据文件中

leader 与 follower的职责:


1.2 查看某个partition的leader

使用Kafka-eagle查看某个topic的partition的leader在哪个服务器中。为了方便观察,我们创建一个名为test的3个分区、3个副本的topic。


  1. 点击「Topic」菜单下的「List」

image-20201229003705391.png

  1. 任意点击选择一个Topicimage-20201229003626096.png

1.3 AR、ISR、OSR

在实际环境中,leader有可能会出现一些故障,所以Kafka一定会选举出新的leader。在讲解leader选举之前,我们先要明确几个概念。Kafka中,把follower可以按照不同状态分为三类——AR、ISR、OSR。

  • 分区的所有副本称为 「AR」(Assigned Replicas——已分配的副本)

  • 所有与leader副本保持一定程度同步的副本(包括 leader 副本在内)组成 「ISR」(In-Sync Replicas——在同步中的副本)

  • 由follower副本同步滞后过多的副本(不包括 leader 副本)组成 「OSR」(Out-of-Sync Replias)

  • AR = ISR + OSR

  • 正常情况下,所有的follower副本都应该与leader副本保持同步,即AR = ISR,OSR集合为空。

1.4 查看分区的ISR

  1. 使用Kafka Eagle查看某个Topic的partition的ISR有哪几个节点。

  1. 尝试关闭id为0的broker(杀掉该broker的进程),参看topic的ISR情况。

1.5 Leader选举

leader对于消息的写入以及读取是非常关键的,此时有两个疑问:

  1. Kafka如何确定某个partition是leader、哪个partition是follower呢?

  2. 某个leader崩溃了,如何快速确定另外一个leader呢?因为Kafka的吞吐量很高、延迟很低,所以选举leader必须非常快

1.5.1 如果leader崩溃,Kafka会如何?

使用Kafka Eagle找到某个partition的leader,再找到leader所在的broker。在Linux中强制杀掉该Kafka的进程,然后观察leader的情况。

通过观察,我们发现,leader在崩溃后,Kafka又从其他的follower中快速选举出来了leader。

1.5.2 Controller介绍
  • Kafka启动时,会在所有的broker中选择一个controller

  • 前面leader和follower是针对partition,而controller是针对broker的

  • 创建topic、或者添加分区、修改副本数量之类的管理任务都是由controller完成的

  • Kafka分区leader的选举,也是由controller决定的

1.5.3 Controller的选举
  • 在Kafka集群启动的时候,每个broker都会尝试去ZooKeeper上注册成为Controller(ZK临时节点)

  • 但只有一个竞争成功,其他的broker会注册该节点的监视器

  • 一点该临时节点状态发生变化,就可以进行相应的处理

  • Controller也是高可用的,一旦某个broker崩溃,其他的broker会重新注册为Controller

1.5.4 找到当前Kafka集群的controller
  1. 点击Kafka Tools的「Tools」菜单,找到「ZooKeeper Brower...」

  2. 点击左侧树形结构的controller节点,就可以查看到哪个broker是controller了。

image-20201229010220091.png

1.5.5 测试controller选举

通过kafka tools找到controller所在的broker对应的kafka进程,杀掉该进程,重新打开ZooKeeper brower,观察kafka是否能够选举出来新的Controller。

image-20201229010307875.png

1.5.6 Controller选举partition leader
  • 所有Partition的leader选举都由controller决定

  • controller会将leader的改变直接通过RPC的方式通知需为此作出响应的Broker

  • controller读取到当前分区的ISR,只要有一个Replica还幸存,就选择其中一个作为leader, 否则,则任意选这个一个Replica作为leader

  • 如果该partition的所有Replica都已经宕机,则新的leader为-1

为什么不能通过ZK的方式来选举partition的leader?

  • Kafka集群如果业务很多的情况下,会有很多的partition

  • 假设某个broker宕机,就会出现很多的partiton都需要重新选举leader

  • 如果使用zookeeper选举leader,会给zookeeper带来巨大的压力。所以,kafka中leader的选举不能使用ZK来实现

1.6 leader负载均衡

1.6.1 Preferred Replica
  • Kafka中引入了一个叫做「preferred-replica」的概念,意思就是:优先的Replica

  • 在ISR列表中,第一个replica就是preferred-replica

  • 第一个分区存放的broker,肯定就是preferred-replica

  • 执行以下脚本可以将preferred-replica设置为leader,均匀分配每个分区的leader。

./kafka-leader-election.sh --bootstrap-server node1:9092 --topic 主题 --partition=1 --election-type preferred
1.6.2 确保leader在broker中负载均衡

杀掉test主题的某个broker,这样kafka会重新分配leader。等到Kafka重新分配leader之后,再次启动kafka进程。此时:观察test主题各个分区leader的分配情况。

image-20201229010737242.png

此时,会造成leader分配是不均匀的,所以可以执行以下脚本来重新分配leader:

bin/kafka-leader-election.sh --bootstrap-server node1:9092 --topic test --partition=2 --election-type preferred

--partition:指定需要重新分配leader的partition编号

wps49DE.tmp.jpg

二、Kafka生产、消费数据工作流程

2.1 Kafka数据写入流程

image-20201229011010253.png

  • 生产者先从 zookeeper 的 "/brokers/topics/主题名/partitions/分区名/state"节点找到该 partition 的leader

image-20201229011056888.png

  • 生产者在ZK中找到该ID找到对应的broker

图片3.png

  • broker进程上的leader将消息写入到本地log中

  • follower从leader上拉取消息,写入到本地log,并向leader发送ACK

  • leader接收到所有的ISR中的Replica的ACK后,并向生产者返回ACK。

2.2 Kafka数据消费流程

2.2.1 两种消费模式

  • kafka采用拉取模型,由消费者自己记录消费状态,每个消费者互相独立地顺序拉取每个分区的消息

  • 消费者可以按照任意的顺序消费消息。比如,消费者可以重置到旧的偏移量,重新处理之前已经消费过的消息;或者直接跳到最近的位置,从当前的时刻开始消费。

2.2.2 Kafka消费数据流程

image-20201229011742187.png

  • 每个consumer都可以根据分配策略(默认RangeAssignor),获得要消费的分区

  • 获取到consumer对应的offset(默认从ZK中获取上一次消费的offset)

  • 找到该分区的leader,拉取数据

  • 消费者提交offset

总结

分布式架构:Kafka采用分布式架构,数据分布在多个服务器(称为broker)上。这种架构使得Kafka能够处理大量数据,并通过复制机制提高数据的可靠性和可用性。
高吞吐量:Kafka通过批量处理和零拷贝机制(Zero-Copy Mechanism)优化I/O操作,实现了高吞吐量。这意味着Kafka能够以非常高的速度处理和传输数据。
持久化存储:Kafka使用磁盘进行持久化存储,这意味着即使在系统故障的情况下,数据也不会丢失。它支持数据的持久化,并通过日志(Log)结构来存储数据,确保数据的顺序性和完整性。
多副本机制:为了增强数据的可靠性和容错能力,Kafka允许为每个分区创建多个副本(replica)。这些副本分布在不同的broker上,其中一个副本是leader,负责处理读写请求,而其他副本则是follower,用于数据的备份和故障恢复。
分区与消费者组:Kafka通过将数据流分成多个分区(Partition),使得并行处理成为可能。每个分区可以独立地被多个消费者实例消费。通过消费者组(Consumer Group),Kafka允许多个消费者实例协同工作,共同处理来自一个或多个分区的消息。
生产者与消费者API:Kafka提供了丰富的API,包括生产者API和消费者API。生产者API允许应用程序向Kafka主题(Topic)发送消息,而消费者API允许应用程序订阅并消费主题中的消息。
高可用性与容错性:通过多副本机制和自动的leader选举,Kafka能够保证即使在部分服务器故障的情况下,系统仍然能够保持高可用性和数据的完整性。
总之,Kafka通过其分布式架构、高吞吐量设计、持久化存储、多副本机制、分区与消费者组、以及灵活的API支持,构建了一个强大而高效的流处理平台,广泛应用于日志收集、消息传递、流处理等多种场景。

【声明】本内容来自华为云开发者社区博主,不代表华为云及华为云开发者社区的观点和立场。转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息,否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。