Kafka基本概念和相关参数
Kafka基本概念
1.1.1 什么是Kafka
Kafka是一个分布式流数据系统,使用Zookeeper进行集群的管理。与其他消息系统类似,整个系统由生产者、Broker Server和消费者三部分组成,生产者和消费者由开发人员编写,通过API连接到Broker Server进行数据操作。具有快速、可扩展、高吞吐率、可容错的特征
1.1.2 Topic
是Kafka下消息的类别,类似于RabbitMQ中的Exchange的概念。这是逻辑上的概念,用来区分、隔离不同的消息数据,屏蔽了底层复杂的存储方式。对于大多数人来说,在开发的时候只需要关注数据写入到了哪个topic、从哪个topic取出数据。
1.1.3 Partition
是Kafka下数据存储的基本单元,这个是物理上的概念。对应到机器上就是一个或者多个点目录,同一个topic的数据,会被分散的存储到多个partition中,这些partition可以在同一台机器上,也可以是在多台机器上,比如下图所示的topic就有4个partition,分散在两台机器上。这种方式在大多数分布式存储中都可以见到,比如MongoDB、Elasticsearch的分片技术,其优势在于:有利于水平扩展,避免单台机器在磁盘空间和性能上的限制,同时可以通过复制来增加数据冗余性,提高容灾能力。为了做到均匀分布,通常partition的数量通常是Broker Server数量的整数倍。每个分区都是一个有序的、不可变的消息序列,后续新来的消息会源源不断地、持续追加到分区的后面,这相当于一种结构化的提交日志(类似于Git的提交日志),分区中的每一条消息都会被分配一个连续的id值(即offset),该值用于唯一标识分区中的每一条消息。
- 分区中的消息数据是存储在日志文件中的,而且同一分区中的消息数据是按照发送顺序严格有序的。分区在逻辑上对应一个日志,当生产者将消息写入分区中时,实际上是写到了分区所对应的日志当中。而日志可以看作是一种逻辑上的概念,它对应于磁盘上的一个目录。一个日志文件由多个Segment(段)来构成,每个Segment对应于一个索引文件与一个日志文件。
- 借助于分区,我们可以实现Kafka Server的水平扩展。对于一台机器来说,无论是物理机还是虚拟机,其运行能力总归是有上限的。当一台机器到达其能力上限时就无法再扩展了,即垂直扩展能力总是受到硬件制约的。通过使用分区,我们可以将一个主题中的消息分散到不同的Kafka Server上(这里需要使用Kafka集群),这样当机器的能力不足时,我们只需要添加机器就可以了,在新的机器上创建新的分区,这样理论上就可以实现无限的水平扩展能力。
- 分区还可以实现并行处理能力,向一个主题所发送的消息会发送给该主题所拥有的不同的分区中,这样消息就可以实现并行发送与处理,由多个分区来接收所发送的消息。
一个partition在物理上是由一个或者多个segment所构成的。每个segment中则保存了真实的消息数据。
1.1.4 Segment
每个partition都相当于一个大型文件被分配到多个大小相等的segment数据文件中,每个segment中的消息数量未必相等(这与消息大小有着紧密的关系,不同的消息所占据的磁盘空间显然是不一样的),这个特点使得老的segment文件可以很容易就被删除掉,有助于提升磁盘的利用效率。每个partition只需要支持顺序读写即可,segment文件的生命周期是由Kafka Server的配置参数所决定的。比如说,server.properties文件中的参数项log.retention.hours=168就表示7天后删除老的消息文件,一个segment文件的默认大小为1G(可以通过配置文件修改)
00000000000000000000.index:它是segment文件的索引文件,它与接下来我们要介绍的00000000000000000000.log数据文件是成对出现的。后缀.index就表示这是个索引文件。
00000000000000000000.log:它是segment文件的数据文件,用于存储实际的消息。该文件是二进制格式的。segment文件的命名规则是partition全局的第一个segment从0开始,后续每个segment文件名为上一个segment文件最后一条消息的offset值。没有数字则用0填充。由于这里的主题的消息数量较少,因此只有一个数据文件。
offset:表示的是相对于该分区的记录偏移量,指的是第几条记录,比如0代表第一条记录。
position:表示该记录相对于当前片段文件的偏移量。
CreateTime:记录创建的时间。
isvalid:记录是否有效。
keysize:表示key的长度。
valuesize:表示value的长度
magic:表示本次发布kafka服务程序协议版本号。
compresscodec:压缩工具。
producerId:生产者ID(用于幂等机制)。
sequence:消息的序列号(用于幂等机制)。
payload:表示具体的消息
00000000000000000000.timeindex:该文件是一个基于消息日期的索引文件,主要用途是在一些根据日期或是时间来寻找消息的场景下使用,此外在基于时间的日志rolling或是基于时间的日志保留策略等情况下也会使用。实际上,该文件是在Kafka较新的版本中才增加的,老版本Kafka是没有该文件的。它是对*.index文件的一个有益补充。*.index文件是基于偏移量的索引文件,而*.timeindex则是基于时间戳的索引文件。
leader-epoch-checkpoint:是leader的一个缓存文件。实际上,它是与Kafka的HW(High Watermark)与LEO(Log End Offset)相关的一个重要文件
Kafka里面每一条消息,都有自己的offset(相对偏移量),存在物理磁盘上面,在position Position:物理位置(磁盘上面哪个地方)也就是说一条消息就有两个位置:offset:相对偏移量(相对位置)position:磁盘物理位置 稀疏索引: Kafka中采用了稀疏索引的方式读取索引,kafka每当写入了4k大小的日志(.log),就往index里写入一个记录索引。其中会采用二分查找。
1.1.5 Consumer Group
同样是逻辑上的概念,是Kafka实现单播和广播两种消息模型的手段。同一个topic的数据,会广播给不同的group;同一个group中的worker,只有一个worker能拿到这个数据。换句话说,对于同一个topic,每个group都可以拿到同样的所有数据,但是数据进入group后只能被其中的一个worker消费。group内的worker可以使用多线程或多进程来实现,也可以将进程分散在多台机器上,worker的数量通常不超过partition的数量,且二者最好保持整数倍关系,因为Kafka在设计时假定了一个partition只能被一个worker消费(同一group内)。
组内consumer与partition的关系是 1:n
Partition与组内consumer的关系是 1:1
消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费;消费者组之间互不影响,所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。
消费者组初始化流程:
消费者组消费流程:
1.1.7 coordinator
Coordinator一般指的是运行在每个broker上的group coordinator进程,用于管理consumer group中的各个成员,主要用于Offset位移管理和Rebalance, 一个coordinator可以同时管理多个consumer group
1.1.8 kafka版本火车
2. Kafka参数
2.1 broker参数
- dirs: 这是非常重要的参数,指定了Broker需要使用的若干个文件目录路径。要知道这个参数是没有默认值的,这说明什么?这说明它必须由你亲自指定, 在线上生产环境中一定要为log.dirs配置多个路径,具体格式是一个CSV格式,也就是用逗号分隔的多个路径,比如/home/kafka1,/home/kafka2,/home/kafka3这样。如果有条件的话你最好保证这些目录挂载到不同的物理磁盘上。这样做有两个好处:
- 提升读写性能:比起单块磁盘,多块物理磁盘同时读写数据有更高的吞吐量。
- 能够实现故障转移:即Failover。这是Kafka 1.1版本新引入的强大功能。要知道在以前,只要Kafka Broker使用的任何一块磁盘挂掉了,整个Broker进程都会关闭。但是自1开始,这种情况被修正了,坏掉的磁盘上的数据会自动地转移到其他正常的磁盘上,而且Broker还能正常工作。还记得上一期我们关于Kafka是否需要使用RAID的讨论吗?这个改进正是我们舍弃RAID方案的基础:没有这种Failover的话,我们只能依靠RAID来提供保障。
- connect: 这也是一个CSV格式的参数,比如我可以指定它的值为zk1:2181,zk2:2181,zk3:2181。2181是ZooKeeper的默认端口。现在问题来了,如果我让多个Kafka集群使用同一套ZooKeeper集群,那么这个参数应该怎么设置呢?这时候chroot就派上用场了。这个chroot是ZooKeeper的概念,类似于别名。如果你有两套Kafka集群,假设分别叫它们kafka1和kafka2,那么两套集群的zookeeper.connect参数可以这样指定:zk1:2181,zk2:2181,zk3:2181/kafka1和zk1:2181,zk2:2181,zk3:2181/kafka2。切记chroot只需要写一次,而且是加到最后的。我经常碰到有人这样指定:zk1:2181/kafka1,zk2:2181/kafka2,zk3:2181/kafka3,这样的格式是不对的。
- create.topics.enable:是否允许自动创建Topic。(最好设置成false,生产容易造成topic管理混乱
- leader.election.enable:是否允许Unclean Leader选举,是否允许保存数据少的副本参与Leader选举(建议设置成false)
- leader.rebalance.enable:是否允许定期进行Leader选举。(最好设置成false)
- retention.{hours|minutes|ms}:这是个“三兄弟”,都是控制一条消息数据被保存多长时间。从优先级上来说ms设置最高、minutes次之、hours最低。
- retention.bytes:这是指定Broker为消息保存的总磁盘容量大小。(默认为-1)不进行限制
- max.bytes:控制Broker能够接收的最大消息大小。默认的1000012太少了,还不到1MB,建议设置大一些
- topic.num.partitions
- topic.replication.factor
2.2 Topic级别参数
- ms:规定了该Topic消息被保存的时长。默认是7天,即该Topic只保存最近7天的消息。一旦设置了这个值,它会覆盖掉Broker端的全局参数值
- bytes:规定了要为该Topic预留多大的磁盘空间。和全局参数作用相似,这个值通常在多租户的Kafka集群中会有用武之地。当前默认值是-1,表示可以无限使用磁盘空间。
- bytes:规定了要为该Topic预留多大的磁盘空间。和全局参数作用相似,这个值通常在多租户的Kafka集群中会有用武之地。当前默认值是-1,表示可以无限使用磁盘空间。
2.3 JVM参数
Kafka服务器端代码是用Scala语言编写的,但终归还是编译成Class文件在JVM上运行,因此JVM参数设置对于Kafka集群的重要性不言而喻。
首先我先说说Java版本,我个人极其不推荐将Kafka运行在Java 6或7的环境上。Java 6实在是太过陈旧了,没有理由不升级到更新版本。另外Kafka自2.0.0版本开始,已经正式摒弃对Java 7的支持了,所以有条件的话至少使用Java 8吧。
说到JVM端设置,堆大小这个参数至关重要。虽然在后面我们还会讨论如何调优Kafka性能的问题,但现在我想无脑给出一个通用的建议:将你的JVM堆大小设置成6GB吧,这是目前业界比较公认的一个合理值。我见过很多人就是使用默认的Heap Size来跑Kafka,说实话默认的1GB有点小,毕竟Kafka Broker在与客户端进行交互时会在JVM堆上创建大量的ByteBuffer实例,Heap Size不能太小。
JVM端配置的另一个重要参数就是垃圾回收器的设置,也就是平时常说的GC设置。如果你依然在使用Java 7,那么可以根据以下法则选择合适的垃圾回收器:如果Broker所在机器的CPU资源非常充裕,建议使用CMS收集器。启用方法是指定-XX:+UseCurrentMarkSweepGC。否则,使用吞吐量收集器。开启方法是指定-XX:+UseParallelGC。
当然了,如果你在使用Java 8,那么可以手动设置使用G1收集器。在没有任何调优的情况下,G1表现得要比CMS出色,主要体现在更少的Full GC,需要调整的参数更少等,G1比CMS垃圾回收处理器的优势:
- G1是一种适用于服务器端的垃圾回收器,很好的平衡了吞吐量和响应能力
- 对于内存的划分方法不同,Eden、 Survivor、Old区域不再固定,使用内存更高效,G1通过对内存进行Region的划分,有效的避免了内存碎片问题
- G1可以指定GC时用于暂停线程时间(不保证严格遵守), 而CMS并不提供可控选项
- CMS只有在FullGC之后重新合并压缩内存,而G1把回收和合并集合在一起
- CMS只能使用在Old区,在清理yong时一般是配合parNew,而G1可以统一两类分区的回收算法
G1的使用场景:(1)JVM占用内存较大(至少4G)(2)应用本身频繁申请、释放内存,进而产生大量内存碎片时(3)对于GC时间较为敏感的应用
所以使用G1就好了。现在我们确定好了要设置的JVM参数,我们该如何为Kafka进行设置呢?有些奇怪的是,这个问题居然在Kafka官网没有被提及。其实设置的方法也很简单,你只需要设置下面这两个环境变量即可:
KAFKA_HEAP_OPTS:指定堆大小。
KAFKA_JVM_PERFORMANCE_OPTS:指定GC参数。
比如你可以这样启动Kafka Broker,即在启动Kafka Broker之前,先设置上这两个环境变量:
$> export KAFKA_HEAP_OPTS=--Xms6g --Xmx6g
$> export KAFKA_JVM_PERFORMANCE_OPTS= -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -Djava.awt.headless=true
$> bin/kafka-server-start.sh config/server.properties
2.4 操作系统参数
最后我们来聊聊Kafka集群通常都需要设置哪些操作系统参数。通常情况下,Kafka并不需要设置太多的OS参数,但有些因素最好还是关注一下,比如下面这几个:
文件描述符限制
文件系统类型
Swappiness
提交时间
首先是ulimit -n。我觉得任何一个Java项目最好都调整下这个值。实际上,文件描述符系统资源并不像我们想象的那样昂贵,你不用太担心调大此值会有什么不利的影响。通常情况下将它设置成一个超大的值是合理的做法,比如ulimit -n 1000000。还记得电影《让子弹飞》里的对话吗:“你和钱,谁对我更重要?都不重要,没有你对我很重要!”。这个参数也有点这么个意思。其实设置这个参数一点都不重要,但不设置的话后果很严重,比如你会经常看到“Too many open files”的错误。
其次是文件系统类型的选择。这里所说的文件系统指的是如ext3、ext4或XFS这样的日志型文件系统。根据官网的测试报告,XFS的性能要强于ext4,所以生产环境最好还是使用XFS。对了,最近有个Kafka使用ZFS的数据报告,貌似性能更加强劲,有条件的话不妨一试。
第三是swap的调优。网上很多文章都提到设置其为0,将swap完全禁掉以防止Kafka进程使用swap空间。我个人反倒觉得还是不要设置成0比较好,我们可以设置成一个较小的值。为什么呢?因为一旦设置成0,当物理内存耗尽时,操作系统会触发OOM killer这个组件,它会随机挑选一个进程然后kill掉,即根本不给用户任何的预警。但如果设置成一个比较小的值,当开始使用swap空间时,你至少能够观测到Broker性能开始出现急剧下降,从而给你进一步调优和诊断问题的时间。基于这个考虑,我个人建议将swappniess配置成一个接近0但不为0的值,比如1。
最后是提交时间或者说是Flush落盘时间。向Kafka发送数据并不是真要等数据被写入磁盘才会认为成功,而是只要数据被写入到操作系统的页缓存(Page Cache)上就可以了,随后操作系统根据LRU算法会定期将页缓存上的“脏”数据落盘到物理磁盘上。这个定期就是由提交时间来确定的,默认是5秒。一般情况下我们会认为这个时间太频繁了,可以适当地增加提交间隔来降低物理磁盘的写操作。当然你可能会有这样的疑问:如果在页缓存中的数据在写入到磁盘前机器宕机了,那岂不是数据就丢失了。的确,这种情况数据确实就丢失了,但鉴于Kafka在软件层面已经提供了多副本的冗余机制,因此这里稍微拉大提交间隔去换取性能还是一个合理的做法。
2.5 producer参数
- acks(默认值为1)
在消息被认为是“已提交”之前,producer需要leader确认请求的应答数。该参数用于控制消息的持久性,目前提供了3个取值:
acks = 0: 表示producer请求立即返回,不需要等待leader的任何确认。这种方案有最高的吞吐率,但是不保证消息是否真的发送成功。
acks = -1:表示分区leader必须等待消息被成功写入到所有的ISR副本(同步副本)中才认为请求成功。这种方案提供最高的消息持久性保证,但是理论上吞吐率也是最差的。
acks = 1: 表示leader必须应答此请求并写入消息到本地日志则请求被认为成功。如果此时leader应答请求之后挂掉了,消息会丢失。折中的方案提供了不错的持久性保证和吞吐。
- memory(默认值为33554432)
该参数用于指定producer端用于缓存消息的缓冲区大小,单位为字节,默认值为:33554432,合计为32M。kafka采用的是异步发送的消息架构,producer启动时会首先创建一块内存缓冲区用于保存待发送的消息,然后由一个专属线程负责从缓冲区读取消息进行真正的发送。消息持续发送过程中,当缓冲区被填满后,producer立即进入阻塞状态直到空闲内存被释放出来,这段时间不能超过max.blocks.ms设置的值,一旦超过,producer则会抛出TimeoutException 异常,因为Producer是线程安全的,若一直报TimeoutException,需要考虑调高buffer.memory了。用户在使用多个线程共享kafka producer时,很容易把 buffer.memory 打满。
- block.ms(默认值为60000)
KafkaProducer.send() and KafkaProducer.partitionsFor() 方法最大的阻塞时间,当buffer满了或者集群的Metadata不可用时,这两个方法会被阻塞。
- size(默认值为16384)
Producer都是按照batch进行发送的,因此batch大小的选择对于producer性能至关重要。producer会把发往同一分区的多条消息封装进一个batch中,当batch满了后,producer才会把消息发送出去。但是也不一定等到满了,这和另外一个参数linger.ms有关。batch.size的默认值为16384,合计为16K。如果producer发送的目标topic有10个分区,则需要16K*10=160K的内存来缓存这些数据,这个值不能大于buffer.memory指定的值。
- ms(默认值为0)
Producer是按照batch进行发送的,但是还要看linger.ms的值,默认是0,表示不做停留。这种情况下,可能有的batch中没有包含足够多的produce请求就被发送出去了,造成了大量的小batch,给网络IO带来的极大的压力。如果数据的产生速度在时间T内能达到batch.size大小,则linger.ms设置的值不应该比T小,否则batch.size将失去意义。
- type(默认值为none)
Kafka是端到端压缩,producer端开启了压缩,则在数据发往Broker的过程中是压缩的,Broker端的存储也是压缩的,Consumer从Broker端拉去的数据也是压缩的。这样,不仅减小了带宽,也减少了Broker端的磁盘占用。目前支持none(不压缩),gzip,snappy、ztsd和lz4。建议使用gzip或者lz4。
- 在吞吐量方面:LZ4 > Snappy > zstd > GZIP;
- 而在压缩比方面,zstd > LZ4 > GZIP > Snappy。
- 具体到物理资源,使用 Snappy 算法占用的网络带宽最多,zstd 最少,这是合理的,毕竟 zstd 就是要提供超高的压缩比;
- 在 CPU 使用率方面,各个算法表现得差不多,只是在压缩时 Snappy 算法使用的 CPU 较多一些,而在解压缩时 GZIP 算法则可能使用更多的 CPU。
- retries(默认值为2147483647)
Producer重试的次数设置。重试时,producer会重新发送之前由于瞬时原因出现失败的消息。瞬时失败的原因可能包括:元数据信息失效、副本数量不足、超时、位移越界或未知分区等。倘若设置了retries > 0,那么这些情况下producer会尝试重试。producer还有个参数:max.in.flight.requests.per.connection。如果设置该参数大于1,那么设置retries就有可能造成发送消息的乱序。版本为0.11.1.0的kafka已经支持"精确到一次的语义”,因此消息的重试不会造成消息的重复发送。
- backoff.ms(默认值为100)
两次retry的时间间隔,可以根据是网络情况和服务器情况调整,正常情况下没必要调整。
- in.flight.requests.per.connection(默认值为5)
Producer的IO线程在单个Socket连接上能够发送未应答请求的最大数量。即客户端到服务端的网络上最多允许的请求数量。增加此值应该可以增加IO线程的吞吐量,从而整体上提升producer的性能。不过就像之前说的如果。开启了重试机制,那么设置该参数大于1的话有可能造成消息的乱序。默认值5是一个比较好的起始点,如果发现producer的瓶颈在IO线程,同时各个broker端负载不高,那么可以尝试适当增加该值.过大增加该参数会造成producer的整体内存负担,同时还可能造成不必要的锁竞争反而会降低TPS。
- request.size(默认值为1048576)
Producer单次发往某个Borker的请求最大值。Sender线程将属于某个Broker的多个ProducerBatch封装成一个ClientRequest,多个ProducerBatch大小之和不能超过max.request.size设置的值。max.request.size设置的值不应该比Broker端设置的message.max.bytes大。
- idempotence(默认值为false)
是否使用幂等性。如果设置为true,表示producer将确保每一条消息都恰好有一份备份;如果设置为false,则表示producer因发送数据到broker失败重试使,可能往数据流中写入多分重试的消息。如果enable.idempotence为true,那么要求配置项max.in.flight.requests.per.connection的值必须小于或等于5;配置项retries的值必须大于0;acks配置项必须设置为all。如果这些值没有被用户明确地设置,那么系统将自动选择合适的值。如果设置的值不合适,那么会抛出ConfigException异常。
- timeout.ms(默认值为30000)
Producer向Broker发送请求以后,等待响应的最长时间。
2.6 Consumer参数
- poll.interval.ms:可以根据消费速度的快慢来设置,因为如果两次poll的时间超出了30s的时间间隔,kafka会认为消费能力过弱,将其踢出消费组,将分区分配给其他消费者。会触发rebalance机制,rebalance机制会造成性能开销,可以通过设置max.poll.records参数,让一次poll的消息条数少一点,如果consumer两次poll操作间隔超过了这个时间,broker就会认为这个consumer处理能力太弱,会将其踢出消费组,将分区分配给别的consumer消费 ,触发rebalance 。比下游处理时间稍大,减少rebanlance,实际应用中,消费到的数据处理时长不宜超过max.poll.interval.ms,否则会触发rebalance如果处理消费到的数据耗时,可以尝试通过减小max.poll.records的方式减小单次拉取的记录数(默认是500条)
- auto.commi: 指定了消费者是否自动提交消费位移,默认为true。如果需要减少重复消费或者数据丢失,你可以设置为false。如果为true,需要关注自动提交的时间间隔,该间隔由auto.commit.interval.ms设置。默认为5S
- interval.ms:每个consumer 都会根据 heartbeat.interval.ms 参数指定的时间周期性地向group coordinator发送 hearbeat,group coordinator会给各个consumer响应,若发生了 rebalance,各个consumer收到的响应中会包含 REBALANCE_IN_PROGRESS 标识,这样各个consumer就知道已经发生了rebalance,同时 group coordinator也知道了各个consumer的存活情况。
- timeout.ms 组管理协议中用来检测消费者是否失效的超时时间, 消费者在被认为死亡之前可以与服务器断开连接的时间,默认是 10s。session.timeout.ms >= 3 * heartbeat.interval.ms。减少rebalance
- offset.reset 有效值为“ earliest ”" latest ” “ none”,默认值latest
- earliest :当各分区下存在已提交的 offset 时,从提交的 offset 开始消费;无提交的 offset 时,从头开始消费。
- latest :当各分区下存在已提交的 offset 时,从提交的 offset 开始消费;无提交的 offset 时,消费该分区下新产生的数据。
- none :topic 各分区都存在已提交的 offset 时,从 offset 后开始消费;只要有一个分区不存在已提交的offset,则抛出异常。
- assignment.strategy 无默认值
- min.bytes Comsumer在一次拉取中从Kafka中拉取的最小数据量,默认1(B)
- max.bytes Comsumer在一次拉取中从Kafka中拉取的最大数据量,默认50MB
- poll.records:Consumer在一次拉取请求中拉取的最大消息数
- max.idle.ms用来指定在多久之后关闭限制的连接,默认值9分钟
- level事务隔离级别。字符串类型,有效值为“ read_uncommitted ,和“ read committed ",表示消费者所消费到的位置,可以消费到 HW (High Watermark )处的位置,默认值:read_ uncommitted
- buffer.bytes & send.buffer.bytes 这两个参数分别指定 TCP socket 接收和发送数据包缓冲区的大小,-1 代表使用操作系统的默认值。
- max.wait.ms broker 返回给消费者数据的等待时间,默认是 500ms。
- 点赞
- 收藏
- 关注作者
评论(0)