kafka位移

举报
Rolle 发表于 2023/11/30 17:29:49 2023/11/30
【摘要】 诞生背景老版本的Kafka会把位移信息保存在Zk中,当Consumer重启后,自动从Zk中读取位移信息。这种设计使Kafka Broker不需要保存位移数据,可减少Broker端需要持有的状态空间,有利于实现高伸缩性。但zk不适用于高频的写操作,这令zk集群性能严重下降,在新版本中将消费者的位移数据作为一条条普通的Kafka消息,提交至内部主题(_consumer_offsets)中保存。...

诞生背景

  • 老版本的Kafka会把位移信息保存在Zk中,当Consumer重启后,自动从Zk中读取位移信息。这种设计使Kafka Broker不需要保存位移数据,可减少Broker端需要持有的状态空间,有利于实现高伸缩性。
  • 但zk不适用于高频的写操作,这令zk集群性能严重下降,在新版本中将消费者的位移数据作为一条条普通的Kafka消息,提交至内部主题(_consumer_offsets)中保存。实现高持久性和高频写操作。特点:
  • 位移主题是一个普通主题,同样可以被手动创建,修改,删除。
  • 位移主题的消息格式是kafka定义的,不可以被手动修改,若修改格式不正确,kafka将会崩溃。
  • 位移主题保存了三部分内容:Group ID,主题名,分区号。 创建:
  • 当Kafka集群中的第一个Consumer程序启动时,Kafka会自动创建位移主题。也可以手动创建
  • 分区数依赖于Broker端的offsets.topic.num.partitions的取值,默认为50
  • 副本数依赖于Broker端的offsets.topic.replication.factor的取值,默认为3 使用:
  • 当Kafka提交位移消息时会使用这个主题
  • 位移提交得分方式有两种:手动和自动提交位移。
  • 推荐使用手动提交位移,自动提交位移会存在问题:只有consumer一直启动设置,他就会无限期地向主题写入消息。清理:
  • Kafka使用Compact策略来删除位移主题中的过期消息,避免位移主题无限膨胀。
  • kafka提供专门的后台线程定期巡检待compcat的主题,查看是否存在满足条件的可删除数据。

注意事项:

  • 建议不要修改默认分区数,在kafka中有些许功能写死的是50个分区
  • 建议不要使用自动提交模式,采用手动提交,避免消费者无限制的写入消息。
  • 后台定期巡检线程叫Log Cleaner,若线上遇到位移主题无限膨胀占用过多磁盘,应该检查此线程的工作状态。

消费者提了异步 commit 实际还没更新完offset,消费者再不断地poll,其实会有重复消费的情况?

  • 只要consumer没有重启,不会发生重复消费。因为在运行过程中consumer会记录已获取的消息位移

Topic是由Partition构成的。构成Partition的Segment文件里有以下两个文件

  • *.index:这个文件记录了Message Offset,可以让Kafka通过Message Offset快速定位到Message。
  • *.timeindex:这个文件记录了Message的时间戳,可以让Kafka通过绝对时间定位到Message。

评论再过一遍

消费端可以通过设置参数 enable.auto.commit 来控制是自动提交还是手动,如果值为 true 则表示自动提交,在消费端的后台会定时的提交消费位移信息,时间间隔由 auto.commit.interval.ms(默认为5秒)。可能存在重复的位移数据提交到消费位移主题中,因为每隔5秒会往主题中写入一条消息,不管是否有新的消费记录,这样就会产生大量的同 key 消息,其实只需要一条,因此需要依赖前面提到日志压缩策略来清理数据。重复消费,假设位移提交的时间间隔为5秒,那么在5秒内如果发生了 rebalance,则所有的消费者会从上一次提交的位移处开始消费,那么期间消费的数据则会再次被消费。我们来看看集中 Delivery Guarantee:读完消息先 commit 再处理消息。这种模式下,如果 Consumer 在 commit 后还没来得及处理消息就 crash 了,下次重新开始工作后就无法读到刚刚已提交而未处理的消息,这就对应于 At most once。读完消息先处理再 commit。这种模式下,如果在处理完消息之后 commit 之前 Consumer crash 了,下次重新开始工作时还会处理刚刚未 commit 的消息,实际上该消息已经被处理过了。这就对应于At least once。

1 概念区分 A :Consumer端的位移概念和消息分区的位移概念不是一回事。 B :Consumer的消费位移,记录的是Consumer要消费的下一条消息的位移。 2 提交位移 A :Consumer 要向Kafka汇报自己的位移数据,这个汇报过程被称为提交位移(Committing Offsets)。 B :Consumer需要为分配给它的每个分区提交各自的位移数据。 3提交位移的作用 A :提交位移主要是为了表征Consumer的消费进度,这样当Consumer发生故障重启后,能够从kafka中读取之前提交的位移值,从相应的位置继续消费,避免从头在消费一遍。 4 位移提交的特点 A :位移提交的语义保障是由你来负责的,Kafka只会“无脑”地接受你提交的位移。位移提交错误,就会消息消费错误。 5 位移提交方式 A :从用户的角度讲,位移提交分为自动提交和手动提交;从Consumer端的角度而言,位移提交分为同步提交和异步提交。 B :自动提交:由Kafka consumer在后台默默的执行提交位移,用户不用管。开启简单,使用方便,但可能会出现重复消费。 C :手动提交:好处在更加灵活,完全能够把控位移提交的时机和频率。 (1)同步提交:在调用commitSync()时,Consumer程序会处于阻塞状态,直到远端Broker返回提交结果,这个状态才会结束。对TPS影响显著 (2)异步提交:在调用commitAsync()时,会立即给响应,但是出问题了它不会自动重试。 (3)手动提交最好是同步和异步结合使用,正常用异步提交,如果异步提交失败,用同步提交方式补偿提交。 D :批次提交:对于一次要处理很多消费的Consumer而言,将一个大事务分割成若干个小事务分别提交。这可以有效减少错误恢复的时间,避免大批量的消息重新消费。 (1)使用commitSync(Map<TopicPartition,Offset>)和commitAsync(Map<TopicPartition,OffsetAndMetadata>)。

Consumer Exactly Once

Flink 提供的 checkpoint 机制,结合 Source/Sink 端配合支持 Exactly Once 语义,以 Hive 为例:从 Kafka 消费数据,写入到临时目录ck snapshot 阶段,将 Offset 存储到 State 中,Sink 端关闭写入的文件句柄,以及保存 ckid 到 State 中ck complete 阶段,commit kafka offset,将临时目录中的数据移到正式目录ck recover 阶段,恢复 state 信息,reset kafka offset;恢复 last ckid,将临时目录的数据移动到正式目录

CommitFailedException异常

本次提交位移失败了,原因是消费者组已经开启了 Rebalance 过程,并且将要提交位移的分区分配给了另一个消费者实例。出现这个情况的原因是,你的消费者实例连续两次调用 poll 方法的时间间隔超过了期望的 max.poll.interval.ms 参数值。这通常表明,你的消费者实例花费了太长的时间进行消息处理,耽误了调用 poll 方法。

社区给出了两个相应的解决办法

  • 增加期望的时间间隔 max.poll.interval.ms 参数值。
  • 减少 poll 方法一次性返回的消息数量,即减少 max.poll.records 参数值。

场景1

当消息处理的总时间超过预设的 max.poll.interval.ms 参数值时,Kafka Consumer 端会抛出 CommitFailedException 异常。

办法

  1. 缩短单条消息处理的时间。比如,之前下游系统消费一条消息的时间是 100 毫秒,优化之后成功地下降到 50 毫秒,那么此时 Consumer 端的 TPS 就提升了一倍。
  2. 增加 Consumer 端允许下游系统消费一批消息的最大时长。
  3. 减少下游系统一次性消费的消息总数。这取决于 Consumer 端参数 max.poll.records 的值。当前该参数的默认值是 500 条,表明调用一次 KafkaConsumer.poll 方法,最多返回 500 条消息。可以说,该参数规定了单次 poll 方法能够返回的消息总数的上限。如果前两种方法对你都不适用的话,降低此参数值是避免 CommitFailedException 异常最简单的手段。
  4. 下游系统使用多线程来加速消费。这应该算是“最高级”同时也是最难实现的解决办法了。具体的思路就是,让下游系统手动创建多个消费线程处理 poll 方法返回的一批消息。之前你使用 Kafka Consumer 消费数据更多是单线程的,所以当消费速度无法匹及 Kafka Consumer 消息返回的速度时,它就会抛出 CommitFailedException 异常。如果是多线程,你就可以灵活地控制线程数量,随时调整消费承载能力,再配以目前多核的硬件条件,该方法可谓是防止 CommitFailedException 最高档的解决之道。事实上,很多主流的大数据流处理框架使用的都是这个方法,比如 Apache Flink 在集成 Kafka 时,就是创建了多个 KafkaConsu

除了调整 max.poll.interval.ms 之外,你还可以选择调整 max.poll.records 值,减少每次 poll 方法返回的消息数。

重置位移策略

image.png

【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。