Kafka中的offset总结

举报
Summer_ 发表于 2020/09/22 17:43:24 2020/09/22
【摘要】 kafka 生产和消费的offset

1.       生产端offset

Kafka接收到生产者发送的消息实际上是以日志文件的形式保存在对应分区的磁盘上。每条消息都有一个offset值来表示它在分区中的位置。每次写入都是追加到文件的末尾,如下图虚线框表示。

image.png

image.png

如上图所示,它代表一个日志文件,这个日志文件中有 9 条消息,第一条消息的 offset( logStartOffset) 0,最后一条消息的 offset 8LEO(Log End Offset) 9 ,代表下一条待写入的消息。日志文件的 HW(Low Watermark) 6,表示消费者只能拉取到 offset 0 5 之间的消息, offset 6 的消息对消费者而言是不可见的。

每个分区副本都会维护自身的LEO,而ISR集合中最小的LEO即为分区的HW

2.      消费端offset

消费者在消费时,也维护一个offset,表示消费到分区中的某个消息所在的位置。

image.png

如上图所示,ConsumerAoffset=9,表示ConsumerA已经消费完offset8的那条数据,提交的offset值为9,下次消费从offset9的数据开始消费。

消费者提交的offset值维护在__consumer_offsets这个Topic中,具体维护在哪个分区中,是由消费者所在的消费者组groupid决定,计算方式是:groupidhashCode值对50取余。当kafka环境正常而消费者不能消费时,有可能是对应的__consumer_offsets分区leadernone-1,或者分区中的日志文件损坏导致。

消费者提交offset方式可以是手动提交也可以是自动提交,相关的参数设置是enable.auto.commit,参数默认为true,表示每5秒拉取分区中最大的消息位移进行提交。参数设置为false时,需要手动提交offset,提交方式有同步提交(commitSync)和异步提交(commitAsync)两种方式。同步提交会根据poll方法拉取最新位移进行提交,只要没有发生不可恢复的错误,它就会阻塞消费线程直至提交完成。异步提交执行时不会阻塞消费线程,但有可能出现先提交的位移失败了而后提交的位移成功了,如果重试,就会发生重复消费。对此,可设置递增的序号来维护异步提交顺序,也可以在退出或者rebalance前使用同步提交。

消费者消费时,如果没有对应的offset记录会按auto.offset.reset的配置来消费,默认值为latest,表示从分区末尾开始消费。如果配置为earliest表示从分区起始处开始消费。在代码中也可以通过seek()方法指定分区具体的offset处开始消费。另外,我们也可以重置消费者组的offset,具体方式见4重置Consumer offset

 

消费者消费提交的offset也会被定期清理,对应的参数是:

offsets.retention.check.interval.msoffset定期检查数据过期周期

offsets.retention.minutesoffset保留时长

超过offsets.retention.minutes时间且offset没有改变时,消费者提交的offset会被清理掉,再次消费时会按auto.offset.reset配置去消费。此时,会有数据丢失或者重复,可通过重置offset来解决。

3.      数据目录中checkpoint维护的offset

image.png

Kafka每一个数据根目录都会包含最基本的N个检查点文件(xxx- checkpoint,之所以是N个,是因为随着版本的更新在不断新增checkpoint文件)和meta.properties文件,在创建topic的时候,如果当前broker中不止配置了一个data目录,那么会挑选分区数量最少的那个data目录来完成本次创建任务。

 

各个checkpoint作用如下:

recovery-point-offset-checkpoint:表示已经刷写到磁盘的offset信息,对应LEO信息。kafka中会有一个定时任务负责将所有分区的LEO刷写到恢复点文件recovery-point-offset-checkpoint中,定时周期由broker端参数log.flush.offset.checkpoint.interval.ms配置,默认值60000,即60sKafka在启动时会检查文件的完整性,如果没有.kafka_cleanshutdown这个文件,就会进入一个recover逻辑,recover就是从此文件中的offset开始。

replication-offset-checkpoint:用来存储每个replicaHW,表示已经被commitedoffset信息。失败的follower开始恢复时,会首先将自己的日志截断到上次的checkpointed时刻的HW,然后向leader拉取消息。kafka有一个定时任务负责将所有分区的HW刷写到复制点文件replication-offset-checkpoint中,定时周期由broker端参数replica.high.watermark.checkpoint.interval.ms配置,默认值5000,即5s

log-start-offset-checkpoint:对应logStartOffset,用来标识日志的起始偏移量。kafka中有一个定时任务负责将所有分区的logStartOffset刷写到起始点文件log-start-offset-checkpoint中,定时周期有broker端参数log.flush.start.offset.checkpoint.interval.ms配置,默认值60000,即60s

cleaner-offset-checkpoint:存了每个log的最后清理offset

4. 重置Consumer offset

更新Offset由三个维度决定:Topic的作用域、重置策略、执行方案。

Topic的作用域

Ø  --all-topics:为consumer group下所有topic的所有分区调整位移)

Ø  --topic t1 --topic t2:为指定的若干个topic的所有分区调整位移

Ø  --topic t1:0,1,2:为指定的topic分区调整位移

重置策略

Ø  --to-earliest:把位移调整到分区当前最小位移

Ø  --to-latest:把位移调整到分区当前最新位移

Ø  --to-current:把位移调整到分区当前位移

Ø  --to-offset <offset> 把位移调整到指定位移处

Ø  --shift-by N 把位移调整到当前位移 + N处,注意N可以是负数,表示向前移动

Ø  --to-datetime <datetime>:把位移调整到大于给定时间的最早位移处,datetime格式是yyyy-MM-ddTHH:mm:ss.xxx,比如2017-08-04T00:00:00.000

Ø  --by-duration <duration>:把位移调整到距离当前时间指定间隔的位移处,duration格式是PnDTnHnMnS,比如PT0H5M0S

Ø  --from-file <file>:从CSV文件中读取调整策略

确定执行方案

Ø  什么参数都不加:只是打印出位移调整方案,不具体执行

Ø  --execute:执行真正的位移调整

Ø  --export:把位移调整方案按照CSV格式打印,方便用户成csv文件,供后续直接使用

常用示例

Ø  更新到当前group最初的offset位置

bin/kafka-consumer-groups.sh --bootstrap-server IP:Port --group test-group --reset-offsets --all-topics --to-earliest --execute

Ø  更新到指定的offset位置

bin/kafka-consumer-groups.sh --bootstrap-server IP:Port --group test-group --reset-offsets --all-topics --to-offset 500000 --execute

Ø  更新到当前offset位置(解决offset的异常)

bin/kafka-consumer-groups.sh --bootstrap-server IP:Port --group test-group --reset-offsets --all-topics --to-current --execute

Ø  offset位置按设置的值进行位移

bin/kafka-consumer-groups.sh --bootstrap-server IP:Port --group test-group --reset-offsets --all-topics --shift-by -100000 --execute

Ø  offset设置到指定时刻开始

bin/kafka-consumer-groups.sh --bootstrap-server IP:Port --group test-group --reset-offsets --all-topics --to-datetime 2017-08-04T14:30:00.000





参考文档:

  1. 深入理解Kafka核心设计与实践原理   朱忠华著

  2. https://cloud.tencent.com/developer/article/1436988

【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。