你知道Kafka创建Topic这个过程做了哪些事情吗?(附视频) (下)
配套视频:
Topic创建流程分析+常见问题
日常运维 、问题排查 怎么能够少了滴滴开源的
滴滴开源LogiKM一站式Kafka监控与管控平台
脚本参数
请看 上一篇文章
源码总结
如果上面的源码分析,你不想看,那么你可以直接看这里的简洁叙述
- 根据是否有传入参数
--zookeeper
来判断创建哪一种 对象topicService
如果传入了--zookeeper
则创建 类ZookeeperTopicService
的对象
否则创建类AdminClientTopicService
的对象(我们主要分析这个对象) - 如果有入参
--command-config
,则将这个文件里面的参数都放到mapl类型commandConfig
里面, 并且也加入bootstrap.servers
的参数;假如配置文件里面已经有了bootstrap.servers
配置,那么会将其覆盖 - 将上面的
commandConfig
作为入参调用Admin.create(commandConfig)
创建 Admin; 这个时候调用的Client模块的代码了, 从这里我们就可以猜测,我们调用kafka-topic.sh
脚本实际上是kafka模拟了一个客户端Client来创建Topic的过程; - 一些异常检查
①.如果配置了副本副本数–replication-factor 一定要大于0
②.如果配置了–partitions 分区数 必须大于0
③.去zk查询是否已经存在该Topic - 判断是否配置了参数
--replica-assignment
; 如果配置了,那么Topic就会按照指定的方式来配置副本情况 - 解析配置
--config
配置放到configsMap
中; configsMap给到NewTopic对象 - 将上面所有的参数包装成一个请求参数
CreateTopicsRequest
;然后找到是Controller
的节点发起请求(ControllerNodeProvider
) - 服务端收到请求之后,开始根据
CreateTopicsRequest
来调用创建Topic的方法; 不过首先要判断一下自己这个时候是不是Controller
; 有可能这个时候Controller重新选举了; 这个时候要抛出异常 - 服务端进行一下请求参数检查
①.检查Topic是否存在
②.检查--replica-assignment
参数和 (--partitions
||--replication-factor
) 不能同时使用 - 如果(
--partitions
||--replication-factor
) 没有设置,则使用 Broker的默认配置(这个Broker肯定是Controller) - 计算分区副本分配方式;如果是传入了
--replica-assignment
;则会安装自定义参数进行组装;否则的话系统会自动计算分配方式; 具体详情请看 【kafka源码】创建Topic的时候是如何分区和副本的分配规则 createTopicPolicy
根据Broker是否配置了创建Topic的自定义校验策略; 使用方式是自定义实现org.apache.kafka.server.policy.CreateTopicPolicy
接口;并 在服务器配置create.topic.policy.class.name
=自定义类; 比如我就想所有创建Topic的请求分区数都要大于10; 那么这里就可以实现你的需求了- zk中写入Topic配置信息 发起
CreateRequest
请求,这里写入的数据,是我们入参时候传的topic配置--config
; 这里的配置会覆盖默认配置;并且节点类型是持久节点;path =/config/topics/Topic名称
- zk中写入Topic分区副本信息 发起
CreateRequest
请求 ,将已经分配好的副本分配策略 写入到/brokers/topics/Topic名称
中; 节点类型 持久节点 Controller
监听zk上面的topic信息; 根据zk上变更的topic信息;计算出新增/删除了哪些Topic; 然后拿到新增Topic的 副本分配信息; 并做一些状态流转- 向新增Topic所在Broker发送
leaderAndIsrRequest
请求, - Broker收到
发送leaderAndIsrRequest请求
; 创建副本Log文件;
Q&A
创建Topic的时候 在Zk上创建了哪些节点
接受客户端请求阶段:
- topic的配置信息
/config/topics/Topic名称
持久节点- topic的分区信息
/brokers/topics/Topic名称
持久节点Controller监听zk节点
/brokers/topics
变更阶段
/brokers/topics/{topicName}/partitions/
持久节点; 无数据- 向zk中写入
/brokers/topics/{topicName}/partitions/{分区号}
持久节点; 无数据- 向zk中写入
/brokers/topics/{topicName}/partitions/{分区号}/state
持久节点;
创建Topic的时候 什么时候在Broker磁盘上创建的日志文件
当Controller监听zk节点
/brokers/topics
变更之后,将新增的Topic 解析好的分区状态流转
NonExistentPartition
->NewPartition
->OnlinePartition
当流转到OnlinePartition
的时候会像分区分配到的Broker发送一个leaderAndIsrRequest
请求,当Broker们收到这个请求之后,根据请求参数做一些处理,其中就包括检查自身有没有这个分区副本的本地Log;如果没有的话就重新创建;
如果我没有指定分区数或者副本数,那么会如何创建
我们都知道,如果我们没有指定分区数或者副本数, 则默认使用Broker的配置, 那么这么多Broker,假如不小心默认值配置不一样,那究竟使用哪一个呢? 那肯定是哪台机器执行创建topic的过程,就是使用谁的配置;
所以是谁执行的? 那肯定是Controller啊! 上面的源码我们分析到了,创建的过程,会指定Controller这台机器去进行;
如果我手动删除了/brokers/topics/
下的某个节点会怎么样?
如果我手动在zk中添加/brokers/topics/{TopicName}
节点会怎么样
先说结论: 根据上面分析过的源码画出的时序图可以指定; 客户端发起创建Topic的请求,本质上是去zk里面写两个数据
- topic的配置信息
/config/topics/Topic名称
持久节点- topic的分区信息
/brokers/topics/Topic名称
持久节点
所以我们绕过这一步骤直接去写入数据,可以达到一样的效果;不过我们的数据需要保证准确
因为在这一步已经没有了一些基本的校验了; 假如这一步我们写入的副本Brokerid不存在会怎样,从时序图中可以看到,leaderAndIsrRequest请求
; 就不会正确的发送的不存在的BrokerId上,那么那台机器就不会创建Log文件;下面不妨让我们来验证一下;
创建一个节点/brokers/topics/create_topic_byhand_zk
节点数据为下面数据;{"version":2,"partitions":{"2":[3],"1":[3],"0":[3]},"adding_replicas":{},"removing_replicas":{}}
这里我用的工具PRETTYZOO
手动创建的,你也可以用命令行创建;
创建完成之后我们再看看本地有没有生成一个Log文件
可以看到我们指定的Broker,已经生成了对应的分区副本Log文件;
而且zk中也写入了其他的数据
在我们写入zk数据的时候,就已经确定好了哪个每个分区的Leader是谁了,那就是第一个副本默认为Leader
如果写入/brokers/topics/{TopicName}
节点之后Controller挂掉了会怎么样
先说结论:Controller 重新选举的时候,会有一些初始化的操作; 会把创建过程继续下去
然后我们来模拟这么一个过程,先停止集群,然后再zk中写入
/brokers/topics/{TopicName}
节点数据; 然后再启动一台Broker;
源码分析: 我们之前分析过Controller的启动过程与选举 有提到过,这里再提一下Controller当选之后有一个地方处理这个事情replicaStateMachine.startup() partitionStateMachine.startup()
启动状态机的过程是不是跟上面的6.1 onNewPartitionCreation 状态流转 的过程很像; 最终都把状态流转到了
OnlinePartition
; 伴随着是不发起了leaderAndIsrRequest
请求; 是不是Broker收到请求之后,创建本地Log文件了
附件
–config 可生效参数
请以sh bin/kafka-topic -help
为准
configurations:
cleanup.policy
compression.type
delete.retention.ms
file.delete.delay.ms
flush.messages
flush.ms
follower.replication.throttled.
replicas
index.interval.bytes
leader.replication.throttled.replicas
max.compaction.lag.ms
max.message.bytes
message.downconversion.enable
message.format.version
message.timestamp.difference.max.ms
message.timestamp.type
min.cleanable.dirty.ratio
min.compaction.lag.ms
min.insync.replicas
preallocate
retention.bytes
retention.ms
segment.bytes
segment.index.bytes
segment.jitter.ms
segment.ms
unclean.leader.election.enable
Tips:如果关于本篇文章你有疑问,可以公众号留言
PS: 文章阅读的源码版本是kafka-2.5
- 点赞
- 收藏
- 关注作者
评论(0)