Kafka中的offset总结
1. 生产端offset
Kafka接收到生产者发送的消息实际上是以日志文件的形式保存在对应分区的磁盘上。每条消息都有一个offset值来表示它在分区中的位置。每次写入都是追加到文件的末尾,如下图虚线框表示。
如上图所示,它代表一个日志文件,这个日志文件中有 9 条消息,第一条消息的 offset( logStartOffset)为 0,最后一条消息的 offset 为 8,LEO(Log End Offset)为 9 ,代表下一条待写入的消息。日志文件的 HW(Low Watermark)为 6,表示消费者只能拉取到 offset 在 0 至 5 之间的消息, 而 offset 为 6 的消息对消费者而言是不可见的。
每个分区副本都会维护自身的LEO,而ISR集合中最小的LEO即为分区的HW。
2. 消费端offset
消费者在消费时,也维护一个offset,表示消费到分区中的某个消息所在的位置。
如上图所示,ConsumerA的offset=9,表示ConsumerA已经消费完offset为8的那条数据,提交的offset值为9,下次消费从offset为9的数据开始消费。
消费者提交的offset值维护在__consumer_offsets这个Topic中,具体维护在哪个分区中,是由消费者所在的消费者组groupid决定,计算方式是:groupid的hashCode值对50取余。当kafka环境正常而消费者不能消费时,有可能是对应的__consumer_offsets分区leader为none或-1,或者分区中的日志文件损坏导致。
消费者提交offset方式可以是手动提交也可以是自动提交,相关的参数设置是enable.auto.commit,参数默认为true,表示每5秒拉取分区中最大的消息位移进行提交。参数设置为false时,需要手动提交offset,提交方式有同步提交(commitSync)和异步提交(commitAsync)两种方式。同步提交会根据poll方法拉取最新位移进行提交,只要没有发生不可恢复的错误,它就会阻塞消费线程直至提交完成。异步提交执行时不会阻塞消费线程,但有可能出现先提交的位移失败了而后提交的位移成功了,如果重试,就会发生重复消费。对此,可设置递增的序号来维护异步提交顺序,也可以在退出或者rebalance前使用同步提交。
消费者消费时,如果没有对应的offset记录会按auto.offset.reset的配置来消费,默认值为latest,表示从分区末尾开始消费。如果配置为earliest表示从分区起始处开始消费。在代码中也可以通过seek()方法指定分区具体的offset处开始消费。另外,我们也可以重置消费者组的offset,具体方式见4重置Consumer offset。
消费者消费提交的offset也会被定期清理,对应的参数是:
offsets.retention.check.interval.ms:offset定期检查数据过期周期
offsets.retention.minutes:offset保留时长
超过offsets.retention.minutes时间且offset没有改变时,消费者提交的offset会被清理掉,再次消费时会按auto.offset.reset配置去消费。此时,会有数据丢失或者重复,可通过重置offset来解决。
3. 数据目录中checkpoint维护的offset
Kafka每一个数据根目录都会包含最基本的N个检查点文件(xxx- checkpoint,之所以是N个,是因为随着版本的更新在不断新增checkpoint文件)和meta.properties文件,在创建topic的时候,如果当前broker中不止配置了一个data目录,那么会挑选分区数量最少的那个data目录来完成本次创建任务。
各个checkpoint作用如下:
recovery-point-offset-checkpoint:表示已经刷写到磁盘的offset信息,对应LEO信息。kafka中会有一个定时任务负责将所有分区的LEO刷写到恢复点文件recovery-point-offset-checkpoint中,定时周期由broker端参数log.flush.offset.checkpoint.interval.ms配置,默认值60000,即60s。Kafka在启动时会检查文件的完整性,如果没有.kafka_cleanshutdown这个文件,就会进入一个recover逻辑,recover就是从此文件中的offset开始。
replication-offset-checkpoint:用来存储每个replica的HW,表示已经被commited的offset信息。失败的follower开始恢复时,会首先将自己的日志截断到上次的checkpointed时刻的HW,然后向leader拉取消息。kafka有一个定时任务负责将所有分区的HW刷写到复制点文件replication-offset-checkpoint中,定时周期由broker端参数replica.high.watermark.checkpoint.interval.ms配置,默认值5000,即5s。
log-start-offset-checkpoint:对应logStartOffset,用来标识日志的起始偏移量。kafka中有一个定时任务负责将所有分区的logStartOffset刷写到起始点文件log-start-offset-checkpoint中,定时周期有broker端参数log.flush.start.offset.checkpoint.interval.ms配置,默认值60000,即60s。
cleaner-offset-checkpoint:存了每个log的最后清理offset。
4. 重置Consumer offset
更新Offset由三个维度决定:Topic的作用域、重置策略、执行方案。
Topic的作用域
Ø --all-topics:为consumer group下所有topic的所有分区调整位移)
Ø --topic t1 --topic t2:为指定的若干个topic的所有分区调整位移
Ø --topic t1:0,1,2:为指定的topic分区调整位移
重置策略
Ø --to-earliest:把位移调整到分区当前最小位移
Ø --to-latest:把位移调整到分区当前最新位移
Ø --to-current:把位移调整到分区当前位移
Ø --to-offset <offset>: 把位移调整到指定位移处
Ø --shift-by N: 把位移调整到当前位移 + N处,注意N可以是负数,表示向前移动
Ø --to-datetime <datetime>:把位移调整到大于给定时间的最早位移处,datetime格式是yyyy-MM-ddTHH:mm:ss.xxx,比如2017-08-04T00:00:00.000
Ø --by-duration <duration>:把位移调整到距离当前时间指定间隔的位移处,duration格式是PnDTnHnMnS,比如PT0H5M0S
Ø --from-file <file>:从CSV文件中读取调整策略
确定执行方案
Ø 什么参数都不加:只是打印出位移调整方案,不具体执行
Ø --execute:执行真正的位移调整
Ø --export:把位移调整方案按照CSV格式打印,方便用户成csv文件,供后续直接使用
常用示例
Ø 更新到当前group最初的offset位置
bin/kafka-consumer-groups.sh --bootstrap-server IP:Port --group test-group --reset-offsets --all-topics --to-earliest --execute
Ø 更新到指定的offset位置
bin/kafka-consumer-groups.sh --bootstrap-server IP:Port --group test-group --reset-offsets --all-topics --to-offset 500000 --execute
Ø 更新到当前offset位置(解决offset的异常)
bin/kafka-consumer-groups.sh --bootstrap-server IP:Port --group test-group --reset-offsets --all-topics --to-current --execute
Ø offset位置按设置的值进行位移
bin/kafka-consumer-groups.sh --bootstrap-server IP:Port --group test-group --reset-offsets --all-topics --shift-by -100000 --execute
Ø offset设置到指定时刻开始
bin/kafka-consumer-groups.sh --bootstrap-server IP:Port --group test-group --reset-offsets --all-topics --to-datetime 2017-08-04T14:30:00.000
参考文档:
深入理解Kafka核心设计与实践原理 朱忠华著
- 点赞
- 收藏
- 关注作者
评论(0)