Kafka的Topic级别调优参数

举报
边塞诗人 发表于 2022/07/26 19:45:42 2022/07/26
【摘要】         与Topic相关的配置既有服务器默认值,也可以单独设置覆盖默认值。如果没有给出每个Topic的配置,则使用服务器默认值。可以在创建主题时通过提供一个或多个--config选项来设置覆盖。示例为创建一个名为my-topic 的主题,具有自定义的broker批量接收消息大小和刷新率:bin/kafka-topics.sh --zookeeper <ZK_IP1:ZK_PORT,Z...

        与Topic相关的配置既有服务器默认值,也可以单独设置覆盖默认值。如果没有给出每个Topic的配置,则使用服务器默认值。可以在创建主题时通过提供一个或多个--config选项来设置覆盖。示例为创建一个名为my-topic 的主题,具有自定义的broker批量接收消息大小和刷新率:

bin/kafka-topics.sh --zookeeper <ZK_IP1:ZK_PORT,ZK_IP2:ZK_PORT,.../kafka>  --create --topic my-topic --partitions 1 --replication-factor 1 --config max.message.bytes=64000 --config flush.messages=1

修改Topic配置:

bin/kafka-configs.sh --zookeeper <ZK_IP1:ZK_PORT,ZK_IP2:ZK_PORT,.../kafka> --entity-type topics --entity-name my-topic --alter --config max.message.bytes=128000

查看Topic配置:

bin/kafka-configs.sh --zookeeper <ZK_IP1:ZK_PORT,ZK_IP2:ZK_PORT,.../kafka> --entity-type topics --entity-name my-topic --describe

删除Topic配置:

bin/kafka-configs.sh --zookeeper <ZK_IP1:ZK_PORT,ZK_IP2:ZK_PORT,.../kafka>  --entity-type topics --entity-name my-topic --alter --delete-config max.message.bytes

配置项 缺省值 调优场景 备注
cleanup.policy delete 当消息过期时(超过了要保存的时间),采用的清除策略。delete策略:过期消息直接删除;compact策略:过期消息将会根据消息Key值被压缩,压缩之后相同Key值的消息仅保留一条最新的记录。
compression.type producer 指定topic最终的数据压缩方式,如果这是为producer,那么将保留Producer的压缩方式。
delete.retention.ms 86400000 为日志压缩主题保留删除墓碑标记的时间量。
file.delete.delay.ms 60000 从文件系统中删除文件之前的等待时间。
flush.messages 9223372036854775807 此设置允许指定我们将强制将数据写入日志的时间间隔。
flush.ms 9223372036854775807 此设置允许指定一个时间间隔,在该时间间隔内我们将强制将数据写入日志。
follower.replication.throttled.replicas "" 应在follower侧限制其日志复制的副本列表。
index.interval.bytes 4096 指定Topic每隔多少字节创建一个消息索引。
leader.replication.throttled.replicas "" 应在leader侧限制其日志复制的副本列表。
log.partition.strategy

org.apache.kafka.clients.producer.internals.DefaultPartitioner

当partition日志文件创建时,采用的创建策略,文件个数或者磁盘容量。



max.compaction.lag.ms

9223372036854775807

消息在日志中保持不符合压缩条件的最长时间。仅适用于正在压缩的日志。 Version2.3+
max.message.bytes 1000012 Kafka 允许的最大记录批量大小。如果增加这个值并且有超过 0.10.2 的消费者,消费者的获取大小也必须增加,以便他们可以获取这么大的记录批次。对于未进行压缩的消息来说该值为单条消息记录的大小。
message.format.version
指定Topic的生成日志的格式版本;版本号和kafka API版本号相同。
message.timestamp.difference.max.ms 9223372036854775807 Topic消息时间戳差别最大值,仅当message.timestamp.type设置为CreateTime时有效,若收到的消息中携带的时间戳与当前时间的差值大于该值时,该消息会被拒绝。
message.timestamp.type CreateTime 设置Topic消息时间戳记录的内容,可记录消息创建的时间,或者消息数据写入时间。
min.cleanable.dirty.ratio 0.5 此配置控制日志压缩器尝试清理日志的频率
min.compaction.lag.ms 0 指定消息被清理之前的最小保留时间,此配置仅适用于使用compact策略清理的Topic。
min.insync.replicas 1 当Producer设置acks为-1时,指定需要写入成功的副本的最小数目。
preallocate false 预分配:文件段在磁盘上的预分配。
retention.bytes -1 指定Topic每个Partition上的日志数据所能达到的最大字节。
retention.ms 604800000 指定Topic每个Partition上的日志数据保留时间。
segment.bytes 1073741824 指定Topic的日志数据中分段文件的最大字节数。
segment.index.bytes 10485760 此配置控制将偏移量映射到文件位置的索引的大小。
segment.jitter.ms 0 从计划的段滚动时间中减去的最大随机抖动。
segment.ms 604800000 指定Topic的日志数据中分段文件的保留时间。
unclean.leader.election.enable false 是否允许不在ISR中的副本被选举为Leader,若设置为true,且Producer中配置acks=1或者acks=0,在副本没有同步完成的情况下重启Leader所在的Broker,可能会造成数据丢失。
message.downconversion.enable true 此配置控制是否启用消息格式向下转换来满足消费者的请求 Version2.0+

备注:在kafka客户端执行kafka-topics.sh后,在下方--config后面的参数即为对应版本kafka可以通过上面的命令的方式进行修改以动态生效的Topic参数。

【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。