【云驻共创】教你学习 Kafka分布式消息订阅系统

天阳 发表于 2021/12/21 16:59:11 2021/12/21
【摘要】 主要讲述Kafka基本概念、架构及功能。重在了解Kafka是如何保证数据存储、传输的可靠性,以及对于旧数据的处理方式。

前言

本章主要讲述Kafka基本概念、架构及功能。重在了解Kafka是如何保证数据存储、传输的可靠性,以及对于旧数据的处理方式。

目标

学完本章后,您将能够:

掌握消息系统的基本概念掌握Kafka系统架构

一、kafka简介

Kafka是最初由Linkedin公司开发,是一个分布式、分区的、多副本的、多订阅者,基于zookeeper协调的分布式日志系统。

主要应用场景是:目志收集系统和消息系统。

分布式消息传递基于可靠的消息队列,在客户端应用和消息系统之间异步传递消息。有两种主要的消息传递模式:点对点传递模式、发布-订阅模式。大部分的消息系统选用发布-订阅模式。Kafka就是一种发布-订阅模式。

点对点消息传递模式

在点对点消息系统中,消息持久化到一个队列中。此时,将有一个或多个消费者消费队列中的数据』但是一条消息只能被消费一次。当-个消费者消费了队列中的某条数据之后,该条数据则从消息队列中删除。该模式即使有多个消费者同时消费数据,也能保证数据处理的顺序。生产者发送一条消息到queue,只有一个消费者能收到

  • 点对点的特点:
    1.每个消息只有一个接收者(Consumer)(即一旦被消费,消息就不再在消息队列中);
    2.发送者和接收者间没有依赖性,发送者发送消息之后,不管有没有接收者在运行,都不会影响到发送者下次发送消息;
    3.接收者在成功接收消息之后需向队列应答成功,以便消息队列删除当前接收的消息;

发布-订阅消息传递模式

在发布-订阅消息系统中,消息被持久化到一个topic中。与点对点消息系统不同的是,消费者可以订阅一个或多个topic,消费者可以消费该topic中所有的数据,同一条数据可以被多个消费者消费,数据被消费后不会立马删除。在发布-订阅消息系统中,消息的生产者称为发布者,消费者称为订阅者。发布者发送到topic的消息,只有订阅了topic的订阅者才会收到消息

  • 发布/订阅模式特点:
    1.每个消息可以有多个订阅者;
    2.发布者和订阅者之间有时间上的依赖性。针对某个主题(Topic)的订阅者,它必须创建一个订阅者之后,才能消费发布者的消息。
    3.为了消费消息,订阅者需要提前订阅该角色主题,并保持在线运行;

Kafka特点

  • 以时间复杂度为O(1)的方式提供消息持久化能力,即使对TB级以上数据也能保证常数时间的访问性能。

  • 高吞吐率。即使在非常廉价的商用机器上也能做到单机支持每秒100K条消息的传输。

  • 支持Kafka Server间的消息分区,及分布式消费,同时保证每个partition内的消息顺序传输。

  • 同时支持离线数据处理和实时数据处理。

  • Scale out:支持在线水平扩展

高吞吐量:Kafka 每秒可以生产约 25 万消息(50 MB),每秒处理 55 万消息(110 MB)
持久化数据存储:可进行持久化操作。将消息持久化到磁盘,因此可用于批量消费,例如 ETL,以及实时应用程序。通过将数据持久化到硬盘以及replication 防止数据丢失。
分布式系统易于扩展:所有的 producer、broker 和 consumer 都会有多个,均为分布式的。无需停机即可扩展机器。
客户端状态维护:消息被处理的状态是在 consumer 端维护,而不是由 server 端维护。当失败时能自动平衡。
kafka支持的在线水平扩展,当kafka的性能不能够满足目前业务计算能力的需求,或者是存储能力的需求,我们可以给kafka去增加一些节点,也就是说热插拔节点能够去提升它的水平的扩展能力、水平的服务能力。


二、kafka架构与功能

kafka拓扑结构图

kafka可以包含多个Push,Push就是生产者可以包含多个Broker。Broker实际上是kafka的实例,可以认为是一个软件运行起来的进程。kafka它可以包含了多个消费者,并且kafka还需要使用的Zookeeper来进行通信一致性协调服务。

Broker: 最基本的组件,Kafka集群包含一个或多个服务实例,这些服务实例被称为Broker。

Topic:每条发布到Kafka集群的消息都有一个类别,这个类别被称为Topic.

比如说我们针对了日志系统,我们可以去划分不同的日志的类型。比如说有一些日志属于是数据库的日志,有些日志是网站日志,我们都可以在kafka上面进行存储,

去命名不同的Topic,但Topic的名字可以是自身去命名的。

Partition: Kafka将Topic分成一个或者多个Partition,每个Partition在物理上对应一个文件夹,该文件夹下存储这个Partition的所有消息。

通常一个topic包含了大量的消息,这些消息我们存储的时候,我们可以对它进行分区,也就是Partition。Partition需要进行落盘存储,每个Partition都会在kafka的某个相关目录创建文件夹,这个文件夹存储Partition的所有消息。

Producer:负责发布消息到Kafka Broker。

Consumer:消息消费者,从Kafka Broker读取消息的客户端。

Consumer Group:每个Consumer属于一个特定的Consumer Group(可为每个Consumer指定group name )。

Kafka Topics

每条发布到kafka的消息都有一个类别,这个类别被称为Topic,也可以理解为一个存储消息的队列。所有的消息都已topic作为单位进行归类。例如:天气作为一个Topic,每天的温度消息就可以存储在“天气”这个队列里。

Kafka Partition

为了提高Kafka的吞吐量,物理上把Topic分成一个或多个Partition,每个Partition都是有序且不可变的消息队列。每个Partition在物理上对应一个文件夹,该文件夹下存储这个Partition的所有消息和索引文件。

在多分区的情况下,需要有自己的一套机制来如何来把消息写入到相应的分区?

一般情况下有三种情况,一种情况是指定了分区的编号,可以直接写入到相应的分区。如果说没有指定分区的编号。这个消息有k值、有键值,就会根据哈希算法,把相同键值的消息放到同一个分区,当然键值不同,可能会放到跟一些键值相同的消息在同一个分区,也可能放到不同分区,是由哈希算法来决定的。如果说不满足前两种情况啊,默认情况,它可以去采用随机的方式,随机的放入到一个分区中。

Kafka Partition offset

每条消息在文件中的位置称为offset(偏移量),offset是一个long型数字,它唯一标记一条消息。消费者通过(offset、partition、topic)跟踪记录。

如图所示,消费消息的时候它是有顺序的,它实际上是根据这个offset决定消费哪一条消息。一般情况下它会如果是新的一个连接读取里面数据,它会消费就是offset的下一条数据。

offset存储机制

Consumer在从broker读取消息后,可以选择commit,该操作会在Kakfa中保存该Consumer在该Partition中读取的消息的offset。该Consumer下一次再读该Partition时会从下一条开始读取。

通过这一特性可以保证同一消费者从Kafka中不会重复消费数据。

消费者group位移保存在_consumer_offsets的目录上:

计算公式: Math.abs(grouplD.hashCode()) % 50

 kafka-logs目录,里面有多个目录,因为kafka默认会生成50个__consumer_offsets-n目录

Consumer group

每个consumer都属于一个consumer group,每条消息只能被consumer group中的一个Consumer消费,但可以被多个consumer group消费。即组间数据是共享的,,组内数据是竞争的。consumer group是kafka提供的可扩展且具有容错性的消费者机制。既然是一个组,那么组内必然可以有多个消费者或消费者实例(consumer instance),它们共享一个公共的ID,即group ID。组内的所有消费者协调在一起来消费订阅主题(subscribed topics)的所有分区(partition)。当然,每个分区只能由同一个消费组内的一个consumer来消费。

Kafka其他重要概念

replica:partition的副本,保障partition的高可用。

leader:replica中的一个角色,producer和consumer只跟leader交互。

follower:replica中的一个角色,从leader中复制数据。

controler:kafka集群中的其中一个服务器,用来进行leader election以及各种failover 


三、kafka数据管理

Kafka Partition Replica

kafka是一种高可靠性以及高效率的发布订阅消息和管理信息系统。kafka它如何去做到数据存储的可靠性,以及我们数据消费的高吞吐量呢?这是kafka的数据备份机制。如图所示,这里面有Broker1、Broker2、Broker3、Broker4,4个Broker,每个Broker里面又分别存储了不同的分区。假如说我们说目前一个topic有了3个分区,大家看一看这3个分区是怎么存储的。我们说在Broker1里面有Partition-0,备份存储在了Broker2中,Partition-1存储在Broker2中,备份存储在Broker3中。

每一个分区它的备份的数量,我们可以在kafka的配置文件中进行设置。我们这个图来演示的是每个分区是两个备份,那采用分区它有什么好处呢?采用分区的备份机制有什么好处?它的好处就是当一个分区失效的时候,它可以用其他的分区提供了服务,保证了数据的可靠性。另外我们提供服务的分区称为了leader,出于了备用的分区呢称为follower。如果说当提供服务的分区发生了故障,我们会选举处于备用的分区做为的leader继续提供服务。同时为了提高了kafka集群的吞吐量,我们会把不同的分区放在了不同的Broker上,也就是节点上,并且不同的节点都可以去充当了某个分区的leader。因此我们去读取数据的时候、消费数据的时候,我们就不会只访问某一个节点,这样可以去提高了kafka的高吞吐量。

我们看一看kafka的分区是如何来进行备份的,如上图所示,生产者处于leader的分区写入了数据,commit后,我们的follower Partition就会从Leader分区中拉取相应的分析数据。那Leader和Follower是怎么样去拉取这个分区数据呢?它会产生了一个线程,这个线程叫Replica Fetcher Thread这个线程,然后从里面去拉取。采用这种方式可以去提高了kafka的高吞吐量。

Kafka HA

  • 同一个partition可能会有多个replica (对应server.properties配置中的default.replication.factor=N )。
  • 没有replica的情况下,一旦broker宕机,其上所有patition的数据都不可被消费,同时producer也不能再将数据存于其上的 patition。
  • 引入replication之后,同一个partition可能会有多个replica,而这时需要在这些replica之间选出一个leader,producer和consumer只与这个leader交互,其它replica作为follower从leader中复制数据。

Leader Failover 

当partition对应的leader宕机时,需要从follower中选举出新leader。在选举新leader时,一个基本原则是,新的leader必须拥有旧leader commit过的所有消息。由写入流程可知ISR里面的所有replica都跟上了leader,只有ISR里面的成员才能选为leader。

对于f+1个replica,partition可以在容忍f个replica失效的情况下保证消息不丢失。

当所有replica都不工作时,有两种可行的方案:

  • 等待ISR中的任一个replica活过来,并选它作为leader。可保障数据不丢失,但时间可能相对较长。
  • 选择第一个活过来的replica(不一定是ISR成员)作为leader。无法保障数据不丢失,但相对不可用时间较短。

Kafka数据可靠性

Kafka所有消息都会被持久化到硬盘中,同时Kafka通过对Topic Partition设置Replication来保障数据可靠。

那么,在消息传输过程中有没有可靠性保证呢?

消息传输语义是消息传输保证的依据

消息传输保障通常有以下三种:

  • 最多一次(At Most Once):消息可能丢失。消息不会重复发送和处理。
  • 最少一次(At Lease Once):消息不会丢失。消息可能会重复发送和处理。
  • 仅有一次(Exactly Once):消息不会丢失。消息仅被处理一次。

可靠性保证–幂等性

一个幂等性的操作就是一种被执行多次造成的影响和只执行一次造成的影响一样。

原理:

  • 每发送到Kafka的消息都将包含一个序列号,broker将使用这个序列号来删除重复数据
  • 这个序列号被持久化到副本日志,所以,即使分区的leader挂了,其他的broker接管了leader,新leader仍可以判断重新发送的是否重复了。
  • 这种机制的开销非常低:每批消息只有几个额外的字段。

可靠性保证– acks机制

producer需要server接收到数据之后发出的确认接收的信号,此项配置就是指procuder需要多少个这样的确认信号。此配置实际上代表了数据备份的可用性。以下设置为常用选项:

acks=0:设置为0表示producer不需要等待任何确认收到的信息。副本将立即加到socket buffer并认为已经发送。没有任何保障可以保证此种情况下server已经成功接收数据,同时重试配置不会发生作用(因为客户端不知道是否失败)回馈的offset会总是设置为-1;

acks=1:这意味着至少要等待leader已经成功将数据写入本地log,但是并没有等待所有follower是否成功写入。这种情况下,如果follower没有成功备份数据,而此时leader又挂掉,则消息会丢失。

acks=all:这意味着leader需要等待所有备份都成功写入日志,这种策略会保证只要有一个备份存活就不会丢失数据。这是最强的保证。

旧数据处理方式

Kafka把Topic中一个Parition大文件分成多个小文件段,通过多个小文件段,就容易定期清除或删除已经消费完文件,减少磁盘占用。

1640076771753016572.png

对于传统的message queue而言,一般会删除已经被消费的消息,而Kafka集群会保留所有的消息,无论其被消费与否。当然,因为磁盘限制,不可能永久保留所有数据(实际上也没必要),因此Kafka需要处理旧数据。

配置位置:$KAFKA_HOME/config/server.properties

Kafka Log Cleanupess

日志的清理方式有两种: delete和compact。

删除的阈值有两种:过期的时间和分区内总日志大小。

Kafka中,存在数据过期的机制,称为data expire。如何处理过期数据是根据指定的policy(策略)决定的,而处理过期数据的行为,即为log cleanup

Kafka Log Compact

压缩前的原日志文件,原日志文件它存储了偏移量,同时也存储了它的键以及值,我们进行压缩的时候,我们就把具有相同键的数据,也就是消息只保留一条,而多余的都给删除掉。那它具体选择哪一条,它肯定是要选择最新的一条。那怎么样知道是最新的呢?我们根据它的偏移量就可以知道了哪一条数据、哪条信息是最新的。对于每一个kafka partition的日志,以segment为单位,都会被分为两部分,已清理未清理的部分。同时,未清理的那部分又分为可以清理的不可清理的


总结

本章主要介绍了消息系统的基本概念和Kafka的应用场景,以及Kafka的系统架构和数据管理的内容。

本文整理自华为云社区【内容共创系列】活动。
查看活动详情:https://bbs.huaweicloud.com/blogs/314887
相关任务详情:Kafka分布式消息订阅系统

【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区),文章链接,文章作者等基本信息,否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件至:cloudbbs@huaweicloud.com进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容。
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。