你知道Kafka创建Topic这个过程做了哪些事情吗?(附视频) (下)

举报
石臻臻的杂货铺 发表于 2021/08/09 19:57:08 2021/08/09
【摘要】 配套视频:Topic创建流程分析+常见问题日常运维 、问题排查 怎么能够少了滴滴开源的滴滴开源LogiKM一站式Kafka监控与管控平台 脚本参数 源码总结 Q&A 创建Topic的时候 在Zk上创建了哪些节点 创建Topic的时候 什么时候在Broker磁盘上创建的日志文件 如果我没有指定分区数或者副本数,那么会如何创建 如果我手动删除了/brokers/topics/下的某个节点会怎么样...

配套视频:
Topic创建流程分析+常见问题

日常运维问题排查 怎么能够少了滴滴开源的
滴滴开源LogiKM一站式Kafka监控与管控平台

脚本参数

请看 上一篇文章

源码总结

如果上面的源码分析,你不想看,那么你可以直接看这里的简洁叙述

  1. 根据是否有传入参数--zookeeper 来判断创建哪一种 对象topicService
    如果传入了--zookeeper 则创建 类 ZookeeperTopicService的对象
    否则创建类AdminClientTopicService的对象(我们主要分析这个对象)
  2. 如果有入参--command-config ,则将这个文件里面的参数都放到mapl类型 commandConfig里面, 并且也加入bootstrap.servers的参数;假如配置文件里面已经有了bootstrap.servers配置,那么会将其覆盖
  3. 将上面的commandConfig作为入参调用Admin.create(commandConfig)创建 Admin; 这个时候调用的Client模块的代码了, 从这里我们就可以猜测,我们调用kafka-topic.sh脚本实际上是kafka模拟了一个客户端Client来创建Topic的过程;
  4. 一些异常检查
    ①.如果配置了副本副本数–replication-factor 一定要大于0
    ②.如果配置了–partitions 分区数 必须大于0
    ③.去zk查询是否已经存在该Topic
  5. 判断是否配置了参数--replica-assignment ; 如果配置了,那么Topic就会按照指定的方式来配置副本情况
  6. 解析配置--config 配置放到configsMap中; configsMap给到NewTopic对象
  7. 将上面所有的参数包装成一个请求参数CreateTopicsRequest ;然后找到是Controller的节点发起请求(ControllerNodeProvider)
  8. 服务端收到请求之后,开始根据CreateTopicsRequest来调用创建Topic的方法; 不过首先要判断一下自己这个时候是不是Controller; 有可能这个时候Controller重新选举了; 这个时候要抛出异常
  9. 服务端进行一下请求参数检查
    ①.检查Topic是否存在
    ②.检查 --replica-assignment参数和 (--partitions || --replication-factor ) 不能同时使用
  10. 如果(--partitions || --replication-factor ) 没有设置,则使用 Broker的默认配置(这个Broker肯定是Controller)
  11. 计算分区副本分配方式;如果是传入了 --replica-assignment;则会安装自定义参数进行组装;否则的话系统会自动计算分配方式; 具体详情请看 【kafka源码】创建Topic的时候是如何分区和副本的分配规则
  12. createTopicPolicy根据Broker是否配置了创建Topic的自定义校验策略; 使用方式是自定义实现org.apache.kafka.server.policy.CreateTopicPolicy接口;并 在服务器配置 create.topic.policy.class.name=自定义类; 比如我就想所有创建Topic的请求分区数都要大于10; 那么这里就可以实现你的需求了
  13. zk中写入Topic配置信息 发起CreateRequest请求,这里写入的数据,是我们入参时候传的topic配置--config; 这里的配置会覆盖默认配置;并且节点类型是持久节点;path = /config/topics/Topic名称
  14. zk中写入Topic分区副本信息 发起CreateRequest请求 ,将已经分配好的副本分配策略 写入到 /brokers/topics/Topic名称中; 节点类型 持久节点
  15. Controller监听zk上面的topic信息; 根据zk上变更的topic信息;计算出新增/删除了哪些Topic; 然后拿到新增Topic的 副本分配信息; 并做一些状态流转
  16. 向新增Topic所在Broker发送leaderAndIsrRequest请求,
  17. Broker收到发送leaderAndIsrRequest请求; 创建副本Log文件;

在这里插入图片描述

Q&A

创建Topic的时候 在Zk上创建了哪些节点

接受客户端请求阶段:

  1. topic的配置信息 /config/topics/Topic名称 持久节点
  2. topic的分区信息/brokers/topics/Topic名称 持久节点

Controller监听zk节点/brokers/topics变更阶段

  1. /brokers/topics/{topicName}/partitions/持久节点; 无数据
  2. 向zk中写入/brokers/topics/{topicName}/partitions/{分区号} 持久节点; 无数据
  3. 向zk中写入/brokers/topics/{topicName}/partitions/{分区号}/state 持久节点;

创建Topic的时候 什么时候在Broker磁盘上创建的日志文件

当Controller监听zk节点/brokers/topics变更之后,将新增的Topic 解析好的分区状态流转
NonExistentPartition->NewPartition->OnlinePartition 当流转到OnlinePartition的时候会像分区分配到的Broker发送一个leaderAndIsrRequest请求,当Broker们收到这个请求之后,根据请求参数做一些处理,其中就包括检查自身有没有这个分区副本的本地Log;如果没有的话就重新创建;

如果我没有指定分区数或者副本数,那么会如何创建

我们都知道,如果我们没有指定分区数或者副本数, 则默认使用Broker的配置, 那么这么多Broker,假如不小心默认值配置不一样,那究竟使用哪一个呢? 那肯定是哪台机器执行创建topic的过程,就是使用谁的配置;
所以是谁执行的? 那肯定是Controller啊! 上面的源码我们分析到了,创建的过程,会指定Controller这台机器去进行;

如果我手动删除了/brokers/topics/下的某个节点会怎么样?

详情请看 【kafka实战】一不小心删除了/brokers/topics/下的某个Topic

如果我手动在zk中添加/brokers/topics/{TopicName}节点会怎么样

先说结论: 根据上面分析过的源码画出的时序图可以指定; 客户端发起创建Topic的请求,本质上是去zk里面写两个数据

  1. topic的配置信息 /config/topics/Topic名称 持久节点
  2. topic的分区信息/brokers/topics/Topic名称 持久节点
    所以我们绕过这一步骤直接去写入数据,可以达到一样的效果;不过我们的数据需要保证准确
    因为在这一步已经没有了一些基本的校验了; 假如这一步我们写入的副本Brokerid不存在会怎样,从时序图中可以看到,leaderAndIsrRequest请求; 就不会正确的发送的不存在的BrokerId上,那么那台机器就不会创建Log文件;

下面不妨让我们来验证一下;
创建一个节点/brokers/topics/create_topic_byhand_zk 节点数据为下面数据;

{"version":2,"partitions":{"2":[3],"1":[3],"0":[3]},"adding_replicas":{},"removing_replicas":{}}

在这里插入图片描述
这里我用的工具PRETTYZOO手动创建的,你也可以用命令行创建;
创建完成之后我们再看看本地有没有生成一个Log文件
在这里插入图片描述
可以看到我们指定的Broker,已经生成了对应的分区副本Log文件;
而且zk中也写入了其他的数据在这里插入图片描述
在我们写入zk数据的时候,就已经确定好了哪个每个分区的Leader是谁了,那就是第一个副本默认为Leader

如果写入/brokers/topics/{TopicName}节点之后Controller挂掉了会怎么样

先说结论:Controller 重新选举的时候,会有一些初始化的操作; 会把创建过程继续下去

然后我们来模拟这么一个过程,先停止集群,然后再zk中写入/brokers/topics/{TopicName}节点数据; 然后再启动一台Broker;
源码分析: 我们之前分析过Controller的启动过程与选举 有提到过,这里再提一下Controller当选之后有一个地方处理这个事情

replicaStateMachine.startup()
partitionStateMachine.startup()

启动状态机的过程是不是跟上面的6.1 onNewPartitionCreation 状态流转 的过程很像; 最终都把状态流转到了OnlinePartition; 伴随着是不发起了leaderAndIsrRequest请求; 是不是Broker收到请求之后,创建本地Log文件了

附件

–config 可生效参数

请以sh bin/kafka-topic -help 为准

configurations:                      
                  cleanup.policy                        
                  compression.type                      
                  delete.retention.ms                   
                  file.delete.delay.ms                  
                  flush.messages                        
                  flush.ms                              
                  follower.replication.throttled.       
replicas                             
                index.interval.bytes                  
                leader.replication.throttled.replicas 
                max.compaction.lag.ms                 
                max.message.bytes                     
                message.downconversion.enable         
                message.format.version                
                message.timestamp.difference.max.ms   
                message.timestamp.type                
                min.cleanable.dirty.ratio             
                min.compaction.lag.ms                 
                min.insync.replicas                   
                preallocate                           
                retention.bytes                       
                retention.ms                          
                segment.bytes                         
                segment.index.bytes                   
                segment.jitter.ms                     
                segment.ms                            
                unclean.leader.election.enable

Tips:如果关于本篇文章你有疑问,可以公众号留言

PS: 文章阅读的源码版本是kafka-2.5

图片替换文本
【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。