kafka 面试题
5. 消息
5.1. 请简述一下消息的顺序
- Kafka保证一个Partition内消息的有序性,但是并不保证多个Partition之间的数据有顺序。 每个Topic可以划分成多个分区( 每个Topic都至少有一个分区),同一Topic下的不同分区包含的消息是不同的。每个消息在被添加到分区时,都会被分配一个offset,它是消息在此分区中的唯一编号,Kafka 通过offset保证消息在分区内的顺序, offset 的顺序性不跨分区,即Kafka只保证在同一个分区内的消息是有序的,同一Topic的多个分区内的消息,Kafka并不保证其顺序性
- kafka消息有序。单分区可以。也可以使用key+多分区。保证同一个 Key 的所有消息都进入到相同的分区里面
- 防止乱序可以通过设置max.in.flight.requests.per.connection=1来保证
5.2. 如何保证消息的有序
- 一个生产者,发两次消息,但是网络原因,消息到达的顺序和消息发送的顺序不一致
- 设置max.in.flight.requests.per.connection=1来保证
5.3. 消息堆积可能原因
- 生产速度大于消费速度
- 可以适当增加分区,增加consumer数量,提升消费TPS;
- consumer消费性能低
- 查一下是否有很重的消费逻辑,看看是否可以优化consumer TPS;
- 确保consumer端没有因为异常而导致消费hang住;
- 如果你使用的是消费者组,确保没有频繁地发生rebalance
5.4. 有哪些情况下会出现生产消息重复
- 一个consumer正在消费一个分区的一条消息,还没有消费完,发生了rebalance(加入了一个consumer),从而导致这条消息没有消费成功,rebalance后,另一个consumer又把这条消息消费一遍
- 生产者重复发送。比如说你的业务在发送消息的时候,收到了一个超时响应,这个时候你很难确定这个消息是否真的发送出去了,那么你就会考虑重试,重试就可能导致同一个消息发送了多次。
5.5. 那些情景下会造成消息漏消费
- 自动提交 设置offset为自动定时提交,当offset被自动定时提交时,数据还在内存中未处理,此时刚好把线程kill掉,那么offset已经提交,但是数据未处理,导致这部分内存中的数据丢失。
- 生产者发送消息 发送消息设置的是fire-and-forget(发后即忘),它只管往 Kafka 中发送消息而并不关心消息是否正确到达。不过在某些时候(比如发生不可重试异常时)会造成消息的丢失。这种发送方式的性能最高,可靠性也最差。
- 消费者端 先提交位移,但是消息还没消费完就宕机了,造成了消息没有被消费。自动位移提交同理
- acks没有设置为all 如果在broker还没把消息同步到其他broker的时候宕机了,那么消息将会丢失
5.6. 有哪些情形会造成重复消费?
- Rebalance 一个consumer正在消费一个分区的一条消息,还没有消费完,发生了rebalance(加入了一个consumer),从而导致这条消息没有消费成功,rebalance后,另一个consumer又把这条消息消费一遍。
- 消费者端手动提交 如果先消费消息,再更新offset位置,导致消息重复消费。
- 消费者端自动提交 设置offset为自动提交,关闭kafka时,如果在close之前,调用 consumer.unsubscribe() 则有可能部分offset没提交,下次重启会重复消费。
- 生产者端 生产者因为业务问题导致的宕机,在重启之后可能数据会重发
5.7. Kafka中是怎么体现消息顺序性的?
- 可以通过分区策略体现消息顺序性。 分区策略有轮询策略、随机策略、按消息键保序策略。
按消息键保序策略:一旦消息被定义了 Key,那么你就可以保证同一个 Key 的所有消息都进入到相同的分区里面,由于每个分区下的消息处理都是有顺序的,故这个策略被称为按消息键保序策略
5.8. Kafka 中最基本的数据单元是消息 message(Kafka 中的消息理解成数据库里的一条行或者一条记录)
5.9. Kafka中的幂等是怎么实现的
- 在 Kafka 中,Producer 默认不是幂等性的,可以创建幂等性 Producer。它其实是 0.11.0.0 版本引入的新功能。在此之前,Kafka 向分区发送数据时,可能会出现同一条消息被发送了多次,导致消息重复的情况。在 0.11 之后,指定 Producer 幂等性的方法很简单,仅需要设置一个参数即可,即props.put(“enable.idempotence”, ture),或 props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true)。
- 底层具体的原理,就是经典的用空间去换时间的优化思路,即在 Broker 端多保存一些字段。当 Producer 发送了具有相同字段值的消息后,Broker 能够自动知晓这些消息已经重复了
5.10. 幂等性 Producer 作用范围
- 它只能保证单分区上的幂等性,即一个幂等性 Producer 能够保证某个主题的一个分区上不出现重复消息,它无法实现多个分区的幂等性。其次,它只能实现单会话上的幂等性,不能实现跨会话的幂等性。这里的会话,你可以理解为 Producer 进程的一次运行。当你重启了 Producer 进程之后,这种幂等性保证就丧失了。
5.11. 解释Kafka的用户如何消费信息?
- 在Kafka中传递消息是通过使用sendfile API完成的。它支持将字节从套接口转移到磁盘,通过内核空间保存副本,并在内核用户之间调用内核。
5.12. Kafka中怎么做消息轨迹
- 消息轨迹指的是一条消息从生产者发出,经由 broker 存储,再到消费者消费的整个过程中,各个相关节点的状态、时间、地点等数据汇聚而成的完整链路信息。生产者、broker、消费者这3个角色在处理消息的过程中都会在链路中增加相应的信息,将这些信息汇聚、处理之后就可以查询任意消息的状态,进而为生产环境中的故障排除提供强有力的数据支持。对消息轨迹而言,最常见的实现方式是封装客户端,在保证正常生产消费的同时添加相应的轨迹信息埋点逻辑。无论生产,还是消费,在执行之后都会有相应的轨迹信息,我们需要将这些信息保存起来。同样可以将轨迹信息保存到 Kafka 的某个主题中,比如下图中的主题 trace_topic。生产者在将消息正常发送到用户主题 real_topic 之后(或者消费者在拉取到消息消费之后)会将轨迹信息发送到主题 trace_topic 中。
5.13. Kafka为什么这么快
- Kafka重度依赖底层操作系统提供的PageCache功能。当上层有写操作时,操作系统只是将数据写入PageCache,同时标记Page属性为Dirty。当读操作发生时,先从PageCache中查找,如果发生缺页才进行磁盘调度,最终返回需要的数据。实际上PageCache是把尽可能多的空闲内存都当做了磁盘缓存来使用。同时如果有其他进程申请内存,回收PageCache的代价又很小,所以现代的OS都支持PageCache。 使用PageCache功能同时可以避免在JVM内部缓存数据,JVM为我们提供了强大的GC能力,同时也引入了一些问题不适用与Kafka的设计。
5.14. kafka为什么不像MySQL那样允许追随者副本对外提供读服务
- kafka的分区已经让读是从多个broker读从而负载均衡,不是MySQL的主从,压力都在主上
- kafka保存的数据和数据库的性质有实质的区别就是数据具有消费的概念,是流数据,kafka是消息队列,所以消费需要位移,而数据库是实体数据不存在这个概念,如果从kafka的follower读,消费端offset控制更复杂
- 生产者来说,kafka可以通过配置来控制是否等待follower对消息确认的,如果从上面读,也需要所有的follower都确认了才可以回复生产者,造成性能下降,如果follower出问题了也不好处理
- 首先会存在数据一致性的问题,消息从主节点同步到从节点需要时间,可能造成主从节点的数据不一致。主写从读无非就是为了减轻leader节点的压力,将读请求的负载均衡到follower节点,如果Kafka的分区相对均匀地分散到各个broker上,同样可以达到负载均衡的效果,没必要刻意实现主写从读增加代码实现的复杂程度
5.15. Producer端,网络,数据格式等因素,会不会导致Kafka只有一个分区接收到数据顺序跟Producer发送数据顺序不一致
- 如果retries>0并且max.in.flight.requests.per.connection>1有可能出现消息乱序的情况
5.16. replica的leader和follower之间如何复制数据保证消息的持久化的问题
- follower副本不断地从leader处拉取消息。
- 生产者消息发过来以后,写leader成功后即告知生产者成功,然后异步的将消息同步给其他follower,这种方式效率最高,但可能丢数据;
- 同步等待所有follower都复制成功后通知生产者消息发送成功,这样不会丢数据,但效率不高;
5.17. 在partition增加或减少消息路由重新hash的情况下,消息的顺序性不就没有办法保证了。特别是在相同key的情况下,有状态变更顺序要求的场景。不知道对于类似场景有什么好的解决方案
- 可以自己写个partitioner,让相同的key用于去到相同的分区
5.18. 如果长时间不消费,提交的位移会过期吗?或者提交的位移的数据被清理了,下次启动重新消费时从什么位移开始消费?
- 提交的位移会过期。一旦被清理,从哪里消费取决于auto.offset.reset参数值
5.19. 异步发送消息,如果retry,是否会造成消息乱序?
- 是可能的,所以有max.in.flight.requests.per.connection这个参数
5.20. 消息经常堆积起来,不能消费了。大概会有一些什么情况。如何解决
- 生产速度大于消费速度,这样可以适当增加分区,增加consumer数量,提升消费TPS
- consumer消费性能低,查一下是否有很重的消费逻辑(比如拿到消息后写HDFS或HBASE这种逻辑就挺重的),看看是否可以优化consumer TPS
- 确保consumer端没有因为异常而导致消费hang住
- 如果你使用的是消费者组,确保没有频繁地发生rebalance
5.21. 如何判定 生产者速度大于消费者
- 用kafka自带的命令行工具kafka-consumer-groups.sh。可以查看指定消费者组对其消费的所有partition的位移落后情况(也就是业务上的堆积量)。在一段时间内多次使用这个工具查看消费位移落后的情况,如果越来越大,就说明消费慢于生产。
5.22. 在规划消息磁盘的时候会考虑什么
- 新增消息数
- 消息留存时间
- 平均消息大小
- 备份数
- 是否启用压缩
5.23. Kafka 无消息丢失的配置
- 不要使用 producer.send(msg),而要使用 producer.send(msg, callback)。
- 设置 acks = all。acks 是 Producer 的一个参数,代表了你对“已提交”消息的定义。如果设置成 all,则表明所有副本 Broker 都要接收到消息,该消息才算是“已提交”。这是最高等级的“已提交”定义。
- 设置 retries 为一个较大的值。这里的 retries 同样是 Producer 的参数,对应前面提到的 Producer 自动重试。当出现网络的瞬时抖动时,消息发送可能会失败,此时配置了 retries > 0 的 Producer 能够自动重试消息发送,避免消息丢失。
- 设置 unclean.leader.election.enable = false。这是 Broker 端的参数,它控制的是哪些 Broker 有资格竞选分区的 Leader。如果一个 Broker 落后原先的 Leader 太多,那么它一旦成为新的 Leader,必然会造成消息的丢失。故一般都要将该参数设置成 false,即不允许这种情况的发生。
- 设置 replication.factor >= 3。这也是 Broker 端的参数。其实这里想表述的是,最好将消息多保存几份,毕竟目前防止消息丢失的主要机制就是冗余。
- 设置 min.insync.replicas > 1。这依然是 Broker 端参数,控制的是消息至少要被写入到多少个副本才算是“已提交”。设置成大于 1 可以提升消息持久性。在实际环境中千万不要使用默认值 1。
- 确保 replication.factor > min.insync.replicas。如果两者相等,那么只要有一个副本挂机,整个分区就无法正常工作了。我们不仅要改善消息的持久性,防止数据丢失,还要在不降低可用性的基础上完成。推荐设置成 replication.factor = min.insync.replicas + 1。
- 确保消息消费完成再提交。Consumer 端有个参数 enable.auto.commit,最好把它设置成 false,并采用手动提交位移的方式。就像前面说的,这对于单 Consumer 多线程处理的场景而言是至关重要的。
5.24. 如何保证消息的不丢
6. 消费组
- 简述消费者与消费组之间的关系
- Consumer Group 下可以有一个或多个 Consumer 实例。这里的实例可以是一个单独的进程,也可以是同一进程下的线程。在实际场景中,使用进程更为常见一些
- Group ID 是一个字符串,在一个 Kafka 集群中,它标识唯一的一个 Consumer Group
- Consumer Group 下所有实例订阅的主题的单个分区,只能分配给组内的某个 Consumer 实例消费。这个分区当然也可以被其他的 Group 消费。
7. 位移
- 消费者提交消费位移时提交的是当前消费到的最新消息的offset还是offset+1?
- 在旧消费者客户端中,消费位移是存储在 ZooKeeper 中的。而在新消费者客户端中,消费位移存储在 Kafka 内部的主题__consumer_offsets 中。 当前消费者需要提交的消费位移是offset+1
- 主题有4个分区,消费者组有2个实例,发布应用的时候,会先新启动一个服务节点,加入消费组,通过重平衡分配到到至少1个最多2个分区,消费者的偏移量是 0 还是啥
- 假设C1消费P0,P1, C2消费P2,P3。如果C1从未提交,C1挂掉,C2开始消费P0,P1,发现没有对应提交位移,那么按照C2的auto.offset.reset值决定从那里消费,如果是earliest,从P0,P1的最小位移值(可能不是0)开始消费,如果是latest,从P0, P1的最新位移值(分区高水位值)开始消费。但如果C1之前提交了位移,那么C1挂掉之后C2从C1最新一次提交的位移值开始消费。 所谓的重复消费是指,C1消费了一部分数据,还没来得及提交这部分数据的位移就挂了。C2承接过来之后会重新消费这部分数据。
- 为什么位移主题写入消息时,不直接替换掉原来的数据,像 HashMap 一样呢?而是要堆积起来,另起线程来维护位移主题
- 位移主题也是主题,也要遵循Kafka底层的日志设计思路,即append-only log
- 位移主题用来记住位移,那么这个位移主题的位移由谁来记住呢?
- 位移主题的位移由Kafka内部的Coordinator自行管理
- 消费者提交的位移消息,保存到位移主题分区是随机的吗?
- 不是随机的。通常来说,同一个group下的所有消费者提交的位移数据保存在位移主题的同一个分区下
8. rebalance
- 如何缩短rebalance时间
- 减少consumer个数
当消费者拉取消息或者提交时,便会发送心跳。如果消费者超过一定时间没有发送心跳,那么它的会话(Session)就会过期,组协调者会认为该消费者已经宕机,然后触发重平衡。
另外更高版本的 Kafka 支持配置一个消费者多长时间不拉取消息但仍然保持存活,这个配置可以避免活锁(livelock)。活锁,是指应用没有故障但是由于某些原因不能进一步消费。
1. 缩短单条消息处理的时间:这个涉及业务流程的优化或者改造,得具体问题具体分析,在 bps 这个场景,暂时没有优化的空间 2. 增加消费者端允许下游系统消费一批消息的最大时长:当消费者组完成重平衡之后,每个消费者实例都会定期地向协调者发送心跳请求,表明它还存活着。如果某个消费者实例不能及时地发送这些心跳请求,协调者就会认为该消费者已经“死”了,从而将其从组中移除,然后开启新一轮重平衡。消费者端有个参数,叫 session.timeout.ms,就是被用来表征此事的。该参数的默认值是 10 秒,即如果协调者在 10 秒之内没有收到组内某个消费者实例的心跳,它就会认为这个消费者实例已经挂了。可以这么说,session.timeout.ms 决定了消费者存活性的时间间隔 3. 控制发送心跳请求频率:消费者还提供了一个允许你控制发送心跳请求频率的参数,就是 heartbeat.interval.ms。这个值设置得越小,消费者实例发送心跳请求的频率就越高。频繁地发送心跳请求会额外消耗带宽资源,但好处是能够更加快速地知晓当前是否开启重平衡 4. 减少下游系统一次性消费的消息总数:这取决于消费者端参数 max.poll.records 的值。当前该参数的默认值是 500 条,表明调用一次 KafkaConsumer.poll 方法,最多返回 500 条消息。可以说,该参数规定了单次 poll 方法能够返回的消息总数的上限。如果前两种方法对你都不适用的话,降低此参数值是避免 CommitFailedException 异常最简单的手段 5. 调整两次调用 poll 方法的最大时间间隔:消费者端还有一个参数,用于控制消费者实际消费能力对重平衡的影响,即 max.poll.interval.ms 参数。它限定了消费者端应用程序两次调用 poll 方法的最大时间间隔。它的默认值是 5 分钟,表示你的消费者程序如果在 5 分钟之内无法消费完 poll 方法返回的消息,那么消费者会主动发起“离开组”的请求,协调者也会开启新一轮重平衡 6. 下游系统使用多线程来加速消费:具体的思路就是,让下游系统手动创建多个消费线程处理 poll 方法返回的一批消息。之前你使用 Kafka 消费者消费数据更多是单线程的,所以当消费速度无法匹及 Kafka 消费者消息返回的速度时,它就会抛出 CommitFailedException 异常。如果是多线程,你就可以灵活地控制线程数量,随时调整消费承载能力,再配以目前多核的硬件条件,该方法可谓是防止 CommitFailedException 最高档的解决之道。事实上,很多主流的大数据流处理框架使用的都是这个方法,比如 Apache Flink 在集成 Kafka 时,就是创建了多个 KafkaConsumerThread 线程,自行处理多线程间的数据消费。不过,凡事有利就有弊,这个方法实现起来并不容易,特别是在多个线程间如何处理位移提交这个问题上,更是极容易出错。
- 什么情况下会Rebalance
- 只有consumer成员数量、订阅topic分区数发生增减才会触发
9. Producer
9.1. 谈一谈 Kafka Producer 的 acks 参数的作用。(producer发个broker)
- acks=0
- Producer不会等待Broker的确认反馈,不关心Broker是否正确的将发送来的数据持久化,所以在这种模式下,很有可能会丢失数据。因为如果Broker挂了,Producer不会被通知到,所以还会不停的发送数据导致数据丢失。在对数据完整性需求不强烈的场景下,这种模式可以提高性能。
- acks=1
- 默认采用的模式,该模式下Producer会等待Leader Broker的确认反馈,当Broker确实将数据持久化到至少一个Partition中后,给予Producer确认反馈,Producer才会继续发送数据。该模式下有几点需要注意:
- 不保证Replicas也持久化了数据。
- 当Producer没有收到Broker的确认反馈时,Producer会尝试重新发送数据。
- 当Leader Broker挂了,但是Replicas又没有持久化数据时,还是会丢失数据。
- 该模式只能说是可以有效防止数据丢失。
- 默认采用的模式,该模式下Producer会等待Leader Broker的确认反馈,当Broker确实将数据持久化到至少一个Partition中后,给予Producer确认反馈,Producer才会继续发送数据。该模式下有几点需要注意:
- acks=all
- Producer同样需要等待Broker的确认,但是确认更为严格,需要所有的Partition(Leader + Replicas)都持久化数据后才返回确认信息。这种模式下,只要Replicas足够多,数据基本不会丢失。
- 在该模式下,还有一个重要的参数min.insync.replicas需要配置。该参数的意思是当acks=all时,至少有多少个Replicas需要确认已成功持久化数据,这个Replicas数量也包括Leader。
9.2. 对producer的retry理解
- 有时候Producer发送Message失败可能并不是因为Broker挂了,可能是因为网络问题,没有连接到Broker等。这种问题可能在很短暂的时间内就会自动修复,那么在这种情况下,我们希望Producer在发送失败后能重新尝试发送。这里就需要设置retries这个参数,意思就是重试的次数,默认是0次,可以根据实际业务情况设置。
但是当设置了retries参数大于0后,有可能会带来新的问题。假如我们需要相同Key的Message进入特定的Partition,并且是要严格按照Producer生产Message的顺序排序。那么此时如果第一条Message发送失败,第二条Message发送成功了,第一条通过重试发送成功了,那Message的顺序就发生了变化。
这里又会引出一个参数max.in.flight.requests.per.connection,这个参数默认是5,意思是在被Broker阻止前,未通过acks确认的发送请求最大数,也就是在Broker处排队等待acks确认的Message数量。所以刚才那个场景,第一条和第二条Message都在Broker那排队等待确认放行,这时第一条失败了,等重试的第一条Message再来排队时,第二条早都通过进去了,所以排序就乱了。
如果想在设置了retries还要严格控制Message顺序,可以把max.in.flight.requests.per.connection设置为1。让Broker处永远只有一条Message在排队,就可以严格控制顺序了。但是这样做会严重影响性能(接收Message的吞吐量)。
Kafka在0.11版本之后,就为我们提供了定义幂等Producer的能力
- retries=Integer.MAX_VALUE
- max.in.flight.requests.per.connection=1 (Kafka >= v0.11 & < v1.1)
- max.in.flight.requests.per.connection=5 (Kafka >= v1.1)
- acks=all
9.3. 对Message Batch的理解
max.in.flight.requests.per.connection参数,默认会在Broker那排队5条Message,那么如果第六条来了怎么办呢?这时候Kafka会自动开启批量处理Message的模式,将这6条Message作为一个批次进行处理。这一个批次可以看作是一次Message处理请求。
开启批量模式后,会引出两个参数:
- linger.ms:每次批量处理的间隔时间。如果设为5,那么就是每5毫秒对Message进行一次批量处理。
- batch.size:每个批次的最大字节数,默认是16KB,可以设置为32KB或者64KB,可以提高性能。
9.4. 对 Producer Buffer的理解
在大多数情况下,Consumer消费Message的速率是远不如Producer生产Message的速率的。所以Producer有一个缓存机制,将Broker还没来得及接收的Message缓存在内存中。缓存的大小可以通过buffer.memory配置,默认大小是32MB。默认存储时间为7天,这个时间可以通过设置Broker的offset.retention.minutes属性改变。
如果Producer的缓存被打满后,Producer会被阻塞,阻塞的最大时间可以通过max.block.ms配置,默认大小是60秒。
概括一下,就是当Producer生产Message的速率大于Broker接收Message(Consumer消费数据)的速率时,Producer会把Broker还没来得及接收的Message存在缓存里(内存),当存满设置的缓存大小后,Producer将不再发送Message给Broker,也就是进入阻塞状态,如果在设置的阻塞时间内,缓存还没有被释放出有用空间,那么Producer将抛出异常。
9.5. 幂等生产者
消息交付可靠性保障 Kafka 对 Producer 和 Consumer 要处理的消息提供什么样的承诺。常见承诺有以下三种:
- 最多一次(at most once):消息可能会丢失,但绝不会被重复发送。
- 至少一次(at least once):消息不会丢失,但有可能被重复发送。(kafka默认)
- 精确一次(exactly once):消息不会丢失,也不会被重复发送。
指定 Producer 幂等性的方法很简单,仅需要设置一个参数即可,即 props.put(“enable.idempotence”, ture),或 props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true)。
底层具体的原理很简单,就是经典的用空间去换时间的优化思路,即在 Broker 端多保存一些字段。当 Producer 发送了具有相同字段值的消息后,Broker 能够自动知晓这些消息已经重复了
幂等性 Producer 作用范围
- 它只能保证单分区上的幂等性,即一个幂等性 Producer 能够保证某个主题的一个分区上不出现重复消息,它无法实现多个分区的幂等性。其次,它只能实现单会话上的幂等性,不能实现跨会话的幂等性。这里的会话,你可以理解为 Producer 进程的一次运行。当你重启了 Producer 进程之后,这种幂等性保证就丧失了。
事务
事务型 Producer 能够保证将消息原子性地写入到多个分区中。这批消息要么全部写入成功,要么全部失败。另外,事务型 Producer 也不惧进程的重启。Producer 重启回来后,Kafka 依然保证它们发送消息的精确一次处理。
设置事务型 Producer 的方法也很简单,满足两个要求即可:
- 和幂等性 Producer 一样,开启 enable.idempotence = true。
- 设置 Producer 端参数 transactional. id。最好为其设置一个有意义的名字。
producer端可能发送重复消息,broker端有一套机制来去重(幂等性依赖seq number机制,事务依赖各种marker来标记)
- Producer 发送消息到 Broker 时,会根据Paritition 机制选择将其存储到哪一个Partition。如果 Partition 机制设置合理,所有消息可以均匀分布到不同的 Partition里,这样就实现了负载均衡。指明 Partition 的情况下,直接将给定的 Value 作为 Partition 的值。没有指明 Partition 但有 Key 的情况下,将 Key 的 Hash 值与分区数取余得到 Partition 值。既没有 Partition 有没有 Key 的情况下,第一次调用时随机生成一个整数(后面每次调用都在这个整数上自增),将这个值与可用的分区数取余,得到 Partition 值,也就是常说的 Round-Robin 轮询算法。
10. 调优
10.1. swap 的调优
- swap 的调优。网上很多文章都提到设置其为 0,将 swap 完全禁掉以防止 Kafka 进程使用 swap 空间。我个人反倒觉得还是不要设置成 0 比较好,我们可以设置成一个较小的值。为什么?因为一旦设置成 0,当物理内存耗尽时,操作系统会触发 OOM killer 这个组件,它会随机挑选一个进程然后 kill 掉,即根本不给用户任何的预警。但如果设置成一个比较小的值,当开始使用 swap 空间时,你至少能够观测到 Broker 性能开始出现急剧下降,从而给你进一步调优和诊断问题的时间。基于这个考虑,建议将 swappniess 配置成一个接近 0 但不为 0 的值,比如 1。
11. 其他
- kafka如何支持延迟消息队列
- 利用定时任务调度利用定时任务来实现延迟消息是最好、最简单的办法。对于一个延迟消息来说,一个延迟到 30 分钟后才可以被消费的消息,也可以认为是 30 分钟后才可以发送。也就是说,你可以设定一个定时任务,这个任务会在 30 分钟后把消息发送到消息服务器上。
每个分区独立一个文件存储,在分区数量较多时会退化成全局磁盘随机I/O,这也是Kafka在多Partition时吞吐量大幅下降的原因~
- 分区很多,并且都存在读写的场景才会触发。
- 为什么Kafka分区过多会导致顺序读写变为随机读写
- 因为数据是写入到硬盘的。 如果同时有很多个文件在同时往硬盘去读写的话。从硬盘的角度来看的话,就是同时在硬盘的不同位置去读写,此时硬盘就得去调度不同位置的读写。即使是SSD和NVME的盘,这种在频繁的在硬盘不同位置的读写就是会降低性能。从硬盘角度来看,就是在不同位置的随机读写。
当 Topic 的消息写入存在倾斜,某些分区消息堆积很多,此时选择哪种分区消费模式可以解决问题?
- 如果数据可以丢弃,那么可以通过重置消费位点到最新来解决历史堆积,让消费者可以消费新的数据。不过,这个方案有缺点,重置位点之前的数据会丢失,如果消费性能还是跟不上的话,那么后续还是会堆积。
- 如果数据不能丢弃,不用保证消费顺序,那么可以将消费模式切换到共享消费模式,则有多个消费者同时消费一个分区,可以极大地提升消费速度,还可以通过横向增加消费者,从根本上解决堆积问题。
- 如果数据不能丢弃,且需要保证消费顺序,那么就只能从发送端入手,分析为何发送端写入倾斜,然后解决写入倾斜的问题。
Kafka 从生产到消费的全过程
Kafka 的生产到消费总共经过生产者、Broker、消费者三个模块。大致的流程如下:
- 在生产端,客户端会先和 Broker 建立 TCP 连接,然后通过 Kafka 协议访问 Broker 的 Metadata 接口获取到集群的元数据信息。接着生产者会向 Topic 或分区发送数据,如果是发送到 Topic,那么在客户端会有消息分区分配的过程。因为 Kafka 协议具有批量发送语义,所以客户端会先在客户端缓存数据,然后根据一定的策略,通过异步线程将数据发送到 Broker。
- Broker 接收到数据后,会根据 Kafka 协议解析出请求内容,做好数据校验,然后重整数据结构,将数据按照分区的维度写入到底层不同的文件中。如果分区配置了副本,则消息数据会被同步到不同的 Broker 中进行保存。
- 在消费端,Kafka 提供了消费分组消费和指定分区消费两种模式。消费端也会先经过寻址拿到完整的元数据信息,然后连接上不同的 Broker。如果是消费分组模式消费,则需要经过重平衡、消费分区分配流程,然后连接上对应的分区的 Leader,接着调用 Broker 的 Fetch 接口进行消费。最后一步也是需要提交消费进度来保存消费信息。
哪些环节会存在性能瓶颈和数据可靠性风险?
影响消息队列性能和可靠性的因素很多
- 网络带宽与延迟:消息队列本质上还是一个I/O密集型系统,内部没有太多复杂的计算逻辑,因此网络无论对Producer、Broker还是Consumer来说都比较重要,网络一抖动,全链路的吞吐量可能就会受影响。
- Producer的发送模式:选择Oneway/Sync/Async不同的发送模式,会直接影响Producer的性能和可靠性。
- Broker的物理硬件:特别是磁盘和内存,会直接关系到Broker的存储和消费性能。
- Consumer的Rebalance:在Rebalance期间,整个消费会暂停,因此如何最大程度降低Rebalance的影响,对Consumer端来说比较重要。
Kafka 集群中修改配置 / 权限操作的流程?
Kafka 修改配置 / 权限的实现,是每个 Broker 直接去监听 Broker 中的节点。Broker 会直接监听 ZooKeeper 上的节点,然后根据 Hook 到的信息,做对应的操作。比如修改集群和 Topic 配置,就是 Broker 通过直接监听 ZooKeeper 的不同子节点来实现的。这种方式的好处是,Broker 直接监听 ZooKeeper,避免 Controller 转发一道,从而避免让 Controller 成为瓶颈,整体链路更短,出问题的概率也更低。
- 同一个Group中的不同Consumer实例可以订阅不同的Topic吗
可以的。虽然在实际使用中可能更多的还是同一个group的多个实例订阅相同的topic。
可能无法消费到这个 consumer 没有订阅的主题, 导致某个 consumer 挂掉之后, 有些消息无法消费
Kafka Broker 是如何持久化数据的。总的来说,Kafka 使用消息日志(Log)来保存数据,一个日志就是磁盘上一个只能追加写(Append-only)消息的物理文件。因为只能追加写入,故避免了缓慢的随机 I/O 操作,改为性能较好的顺序 I/O 写操作,这也是实现 Kafka 高吞吐量特性的一个重要手段。不过如果你不停地向一个日志写入消息,最终也会耗尽所有的磁盘空间,因此 Kafka 必然要定期地删除消息以回收磁盘。怎么删除呢?简单来说就是通过日志段(Log Segment)机制。在 Kafka 底层,一个日志又近一步细分成多个日志段,消息被追加写到当前最新的日志段中,当写满了一个日志段后,Kafka 会自动切分出一个新的日志段,并将老的日志段封存起来。Kafka 在后台还有定时任务会定期地检查老的日志段是否能够被删除,从而实现回收磁盘空间的目的。
消费组相关
开始分区1被消费者A消费,rebalance 后被消费者B消费,那么消费者B是对分区从头开始消费还是继承消费者A的位移继续消费?
- 如果A提交了位移,那么rebalance过后B从A提交的位移处继续消费。如果A没有提交过位移,那么视consumer端参数auto.offset.reset值而定
每次重启一个服务,都会产生下线一次rebalance,上线一次rebalance?
- 社区于2.4引入了静态成员变量,可以规避这个问题
rebalance时,全部实例都要参与重新分配。是否能参考 一致性哈希算法,尽量减少对全局的影响
----------------------------------分割线
- 解释Kafka的Zookeeper是什么?我们可以在没有Zookeeper的情况下使用Kafka吗?
- Zookeeper是一个开放源码的、高性能的协调服务,它用于Kafka的分布式应用。不,不可能越过Zookeeper,直接联系Kafka broker。一旦Zookeeper停止工作,它就不能服务客户端请求。·Zookeeper主要用于在集群中不同节点之间进行通信
- Consumer的水平扩展是如何实现的呢?
- Kafka支持Consumer的水平扩展能力。可以让多个Consumer加入一个Consumer Group(消费组),在一个Consumer Group中,每个分区只能分配给一个Consumer消费者,当Kafka服务端通过增加分区数量进行水平扩展后, 可以向Consumer Group中增加新的Consumer来提高整个Consumer Group的消费能力。当Consumer Group中的一个Consumer出现故障下线时,会通过Rebalance操作将下线Consumer,它负责处理的分区将分配给其他Consumer继续处理。当下线Consumer重新上线加人Consumer Group时,会再进行一次Rebalance操作,重新分配分区。
- 为了避免磁盘被占满,Katka会周期性地删除陈旧的消息,删除策略是什么呢?
- Kafka中有两种“保留策略”:一种是根据消息保留的时间,当消息在Kafka中保存的时间超过了指定时间,就可以被删除; 另一种是根据Topic存储的数据大小,当Topic所占的日志文件大小大于一个阈值,则可以开始删除最旧的消息。Kafka会启动一个后台线程,定期检查是否存在可以删除的消息。“保留策略”的配置是非常灵活的,可以有全局的配置,也可以针对Topic进行配置覆盖全局配置。
- 什么是broker?它的作用是什么?
- 一个单独的Kafka Server就是一个Broker。Broker的主要工作就是接收生产者发过来的消息,分配offset,之后保存到磁盘中。同时,接收消费者、其他Broker的请求,根据请求类型进行相应处理并返回响应。在一般的生产环境中,一个Broker独占一台物理服务器。
- 同一分区的多个副本包括的消息是否是一致的?
- 每个副本中包含的消息是一样的,但是在同一时刻,副本之间其实并不是完全一样的。
- Consumer Group中消费者的数量是不是越多越好呢?
- Consumer Group中消费者的数量并不是越多越好,当其中消费者数量超过分区的数量时,会导致有消费者分配不到分区,从而造成消费者的浪费。
- 详述一下消息在kafka中的生命周期?
- 生产者会根据业务逻辑产生消息,之后根据路由规则将消息发送到指定分区的Leader副本所在的Broker上。在Kafka服务端接收到消息后,会将消息追加到Log中保存,之后Follower副本会与Leader副本进行同步,当ISR集合中所有副本都完成了此消息的同步后,则Leader副本的HW会增加,并向生产者返回响应。 消费者加人到Consumer Group时,会触发Rebalance操作将分区分配给不同的消费者消费。随后,消费者会恢复其消费位置,并向Kafka服务端发送拉取消息的请求,Leader副本会验证请求的offset以及其他相关信息,最后返回消息。
某个分区的leader挂了,在切换选举到另外副本为leader时,这个副本还没同步之前的leader数据,这样数据就丢了 ?
对于producer而言,如果在乎数据持久性,那么应该设置acks=all,这样当出现你说的这个情况时,producer会被显式通知消息发送失败,从而可以重试。
Kafka中的ISR、AR又代表什么?ISR的伸缩又指什么
Kafka中的HW、LEO、LSO、LW等分别代表什么?
Kafka中的分区器、序列化器、拦截器是否了解?它们之间的处理顺序是什么?
KafkaConsumer是非线程安全的,那么怎么样实现多线程消费?
如果我指定了一个offset,Kafka怎么查找到对应的消息?
聊一聊你对Kafka的Log Retention的理解
聊一聊你对Kafka的Log Compaction的理解
聊一聊你对Kafka底层存储的理解(页缓存、内核层、块层、设备层)
聊一聊Kafka的延时操作的原理
聊一聊Kafka控制器的作用
消费再均衡的原理是什么?(提示:消费者协调器和消费组协调器)
Kafka中的事务是怎么实现的(这题我去面试6加被问4次,照着答案念也要念十几分钟,面试官简直凑不要脸。实在记不住的话...只要简历上不写精通Kafka一般不会问到,我简历上写的是“熟悉Kafka,了解RabbitMQ....”)
Kafka中有那些地方需要选举?这些地方的选举策略又有哪些?
怎么计算Lag?(注意read_uncommitted和read_committed状态下的不同)
Kafka的那些设计让它有如此高的性能?
在使用Kafka的过程中遇到过什么困难?怎么解决的?
怎么样才能确保Kafka极大程度上的可靠性?
kafka认为写入成功是指写入页缓存成功还是数据刷到磁盘成功算成功呢?还是上次刷盘宕机失败的问题,页缓存的数据如果刷盘失败,是不是就丢了?这个异常会不会响应给生产者让其重发呢?
写入到页缓存即认为成功。如果在flush之前机器就宕机了,的确这条数据在broker上就算丢失了。producer端表现如何取决于acks的设定。如果是acks=1而恰恰是leader broker在flush前宕机,那么的确有可能消息就丢失了,而且producer端不会重发——因为它认为是成功了。
Producer 通过 metadata.max.age.ms定期更新元数据,在连接多个broker的情况下,producer是如何决定向哪个broker发起该请求?
向它认为当前负载最少的节点发送请求,所谓负载最少就是指未完成请求数最少的broker
开始分区1被消费者A消费,rebalance 后被消费者B消费,那么消费者B是对分区从头开始消费还是继承消费者A的位移继续消费?
如果A提交了位移,那么rebalance过后B从A提交的位移处继续消费。如果A没有提交过位移,那么视consumer端参数auto.offset.reset值而定
- 为了提高效率,Kafka以批量的方式写入。一个batch就是一组消息的集合,这一组的数据都会进入同一个topic 和 partition(这个是根据 producer的配置来定的)。每一个消息都进行一次网络传输会很消耗性能,因此把消息收集到一起,再同时处理就高效的多了。当然,这样会引入更高的延迟以及吞吐量: batch 越大,同一时间处理的消息就越多。batch 通常都会进行压缩,这样在传输以及存储的时候效率都更高一些。
- 消息都是以主题 Topic 的方式组织在一起,Topic 也可以理解成传统数据库里的表,或者文件系统里的一个目录。一个主题由 broker 上的一个或者多个 Partition 分区组成。在 Kafka 中数据是以 Log 的方式存储,一个 partition 就是一个单独的 Log。消息通过追加的方式写入日志文件,读取的时候则是从头开始 按照顺序读取。注意,一个主题通常都是由多个分区组成的,每个分区内部保证消息的顺序行,分区之间是不保证顺序的。如果你想要 kafka 中的数据按照时 间的先后顺序进行存储,那么可以设置分区数为 1。如下图所示,一个主题由 4 个分区组成,数据都以追加的方式写入这四个文件。分区的方式为 Kafka 提供 了良好的扩展性,每个分区都可以放在独立的服务器上,这样就相当于主题可以在多个机器间水平扩展,相对于单独的服务器,性能更好。
- Kafka 中主要有两种使用者:Producer 和 consumer
- Producer 用来创建消息。在发布订阅系统中,他们也被叫做 Publisher 发布者或 writer 写作者。通常情况下,消息都会进入特定的主题。默认情况下,生产者 不关系消息到底进入哪个分区,它会自动在多个分区间负载均衡。也有的时候,消息会进入特定的一个分区中。一般都是通过消息的 key 使用哈希的方式确定 它进入哪一个分区。这就意味着如果所有的消息都给定相同的 key,那么他们最终会进入同一个分区。生产者也可以使用自定义的分区器,这样消息可以进入 特定的分区。
- Consumer 读取消息。在发布订阅系统中,也叫做 subscriber 订阅者或者 reader 阅读者。消费者订阅一个或者多个主题,然后按照顺序读取主题中的数据。消 费者需要记录已经读取到消息的位置,这个位置也被叫做 offset。每个消息在给定的分区中只有唯一固定的 offset。通过存储最后消费的 Offset,消费者应用 在重启或者停止之后,还可以继续从之前的位置读取。保存的机制可以是 zookeeper,或者 kafka 自己。
- 消费者是以 consumer group 消费者组的方式工作,由一个或者多个消费者组成一个组,共同消费一个 topic。每个分区在同一时间只能由 group 中的一个消费 者读取,在下图中,有一个由三个消费者组成的 grouop,有一个消费者读取主题中的两个分区,另外两个分别读取一个分区。某个消费者读取某个分区,也 可以叫做某个消费者是某个分区的拥有者。
- 在这种情况下,消费者可以通过水平扩展的方式同时读取大量的消息。另外,如果一个消费者失败了,那么其他的 group 成员会自动负载均衡读取之前失败的 消费者读取的分区。
- 单独的 kafka 服务器也叫做 broker,Broker 从生产者那里获取消息,分配 offset,然后提交存储到磁盘年。他也会提供消费者,让消费者读取分区上的消息, 并把存储的消息传给消费者。依赖于一些精简资源,单独的 broker 也可以轻松的支持每秒数千个分区和百万级的消息。
- Kafka 的 broker 支持集群模式,在 Broker 组成的集群中,有一个节点也被叫做控制器(是在活跃的节点中自动选择的)。这个 controller 控制器负责管理整个 集群的操作,包括分区的分配、失败节点的检测等。一个 partition 只能出现在一个 broker 节点上,并且这个 Broker 也被叫做分区的 leader。一个分区可以分 配多个 Broker,这样可以做到多个机器之间备份的效果。这种多机备份在其中一个 broker 失败的时候,可以自动选举出其他的 broker 提供服务。然而, producer 和 consumer 都必须连接 leader 才能正常工作。
- Kafka 的一个重要特性就是支持数据的过期删除,数据可以在 Broker 上保留一段时间。Kafka 的 broker 支持针对 topic 设置保存的机制,可以按照大小配置也 可以按照时间配置。一旦达到其中的一个限制,可能是时间过期也可能是大小超过配置的数值,那么这部分的数据都会被清除掉。每个 topic 都可以配置它自 己的过期配置,因此消息可以按照业务的需要进行持久化保留。比如,一个数据追踪分析的 topic 可以保留几天时间,一些应用的指标信息则只需要保存几个 小时。topic 支持日志数据的压缩,这样 kafka 仅仅会保留最后一条日志生成的 key。这在修改日志类型的时候会非常有用。
假设有个 Kafka 集群由 2 台 Broker 组成,有个主题有 5 个分区,当一个消费该主题的消费者程序启动时,你认为该程序会创建多少个 Socket 连接?为什么?
- 整个生命周期里会建立4个连接,进入稳定的消费过程后,同时保持3个连接,以下是详细。 第一类连接:确定协调者和获取集群元数据。 一个,初期的时候建立,当第三类连接建立起来之后,这个连接会被关闭。 第二类连接:连接协调者,令其执行组成员管理操作。 一个 第三类连接:执行实际的消息获取。 两个分别会跟两台broker机器建立一个连接,总共两个TCP连接,同一个broker机器的不同分区可以复用一个socket。只有2个 Broker,5个分区的领导者副本,由zookeeper分配Leader,所以默认是均匀的,故有4个TCP连接。
你是如何解决有序消息这个问题的?用的是哪种方案?
- 只需要确保同一个业务的消息发送到同一个分区就可以保证同一个业务的消息是有序的。
怎么保证同一个业务的消息必然发送到同一个分区呢?
- 只需要生产者在发送消息的时候,根据业务特征,比如说业务 ID 计算出目标分区,在发送的时候显式地指定分区就可以了。
如果你用的是单分区解决方案,那么有没有消息积压问题?如果有,你是怎么解决的?
如果你用的是多分区解决方案,那么有没有分区负载不均衡的问题?如果有,你是怎么解决的?
增加分区会引起消息失序
- 它还有另外一个缺点,就是如果中间有增加新的分区,那么就可能引起消息失序。比如说最开始 id 为 3 的订单消息 msg1 发到分区 0 上,但是这时候很不幸分区 0 上积攒了很多消息,所以 msg1 迟迟得不到消费。
- 紧接着我们扩容,增加了一个新的分区。如果这时候来了一个消息 msg2,那么它会被转发到分区 3 上。分区 3 上面没有积攒什么数据,所以消费者 3 直接就消费了这个消息。
- 这时候我们发现,本来 msg1 应该先于 msg2 被消费。而增加分区之后 msg2 反而被先消费了。这就是一个典型的消息失序场景。
针对这个缺点我们也可以进一步提出解决方案。这个消息失序的场景解决起来倒也很简单,就是新增加了分区之后,这些新分区的消费者先等一段时间,比如说三分钟,确保同一个业务在其他分区上的消息已经被消费了。
要解决这个问题也很容易。对于新加入的分区,可以暂停消费一段时间。比如说在前面的例子中,如果我们估算 msg1 会在一分钟内被消费,那么新加入的分区的消费者可以在三分钟后再开始消费。那么大概率 msg1 就会先于 msg2 消费。不过这种等待的解决方式并不能解决根本问题,只能说是很大程度上缓解了问题。但是本身增加分区也是一个很不常见的操作,再叠加消息失序的概率也很低,所以我们也可以通过监控发现这种失序场景,然后再手工修复一下就可以了。
- 点赞
- 收藏
- 关注作者
评论(0)