Kafka最佳使用实践-Kafka分区散列到新扩容节点
1.基本信息
当集群规模不足以应对业务数据量的压力时,扩容集群节点是必要的手段。Kafka集群在扩容后通常会选择两种方案来均衡集群中的数据。
方案I 执行rebalance均衡操作,将整个集群的分区均衡的分配到每个节点。
方案II 扩容分区,重新创建分区,并且将分区分布到新扩容的节点上。
a) 根据以上两种方案分别对应于三种不同的场景:
场景I 如果扩容前每个节点的总分区数量超过了5000,则优先使用rebalance操作操作均衡整个集群的分区。
场景II 如果扩容前每个节点的总分区数量没有超过3000,则优先使用扩容分区的方式,将新的分区扩容到新节点。
场景III 如果扩容前每个节点的 3000<总分区数<5000。可以先对数据量最大的topic先进行分区扩容并移动到新节点,然后对剩余的topic分区进行rebalance。
Rebalance的方法详见对应版本的《容量调整指导书》。以下介绍两种分区扩容的方式。
2.需求描述
Kafka分区扩容操作是集群扩容后的常规操作之一,由于kafka自身机制原理,无法做到自动将扩容后的分区散列到已经扩容的集群中。
Kafka的分区散列原理为:先随机挑选一个节点,然后从这个节点开始顺序的分配每一个分区。如下图:
因此,如果kafka集群扩容以后,增加的topic分区是无法保证自动散列到新扩容的节点的。本文提供了两种方案实现将新增加的分区散列到新扩容的节点上。
3.处理方案
3.1 处理方案简述
本文提供的两种操作方案:
方案一:使用扩容命令行直接指定新增加分区副本到指定的节点。
方案二:先执行命令增加分区,然后使用迁移命令将新增加的分区迁移到指定的节点上。
两种方案的比较:
方案一:扩容命令需要手动指定分区所在的节点,如果要扩容的分区数量较多,整个工作量会比较繁琐,该方法适用于新扩分区数很少的场景。
方案二:该方案较方案一可以处理大量的分区扩容,但适合对增加的分区数<=当前总分区数的场景,(即,如果当前的分区数为100,那么一次扩容的分区数不能大于100,即扩容最终结果的分区总数不能大于200),该方案不会移动旧的分区,新的分区由于刚创建仅有极少量数据,因此在移动过程中会非常快,不会对整个集群的数据io造成负担。
3.1.1 方案一:使用扩容命令行直接指定新增加分区副本到指定的节点:
1)使用命令查出kafka的brokerID和集群业务IP之间的对应关系,命令如下:
kafka-broker-info.sh --zookeeper ZKIP:24002/kafka
例如如下查询结果中,假设5,6是新扩容的节点
2)查询test、test1的topic状态:
kafka-topics.sh --describe --zookeeper ZKIP:24002/kafka --topic test
3)如上图test已经有5个分区,现在给test分别再扩5个分区,将分区数增加到10,且要求新扩的分区指定到brokerid为4,5的节点。
命令一:(以test为例)
./kafka-topics.sh --topic test --partitions 10 --alter --replica-assignment 6:7,7:8,8:6,8:7,7:6,4:5,4:5,4:5,4:5,4:5 --zookeeper ZKIP:24002/kafka
执行完后查询topic状态,可以发现分区已经扩容成功:
操作注意事项:
1. --partitions 10 指的是扩容后最终的分区总个数,本次由5扩到10因此写10。
2. --replica-assignment 6:7,7:8,8:6,8:7,7:6,4:5,4:5,4:5,4:5,4:5
是指定了每个分区的每个副本所在的broker节点,与图中的Replicas中的内容对应。
例如,第一组数字 6:7,对应的是partition:0 中的Replicas:6,7。Topic总共有多少个分区就有多少组数字,每组数字以逗号分隔,例如:上图中最终的分区数为10,就有10组数字以逗号分隔6:7,7:8,8:6,8:7,7:6,4:5,4:5,4:5,4:5,4:5。注意,这里的组数必须是10,如果大于或者小于10组会报错。
3. 每次扩容过程中,首先要把旧的分区副本写好。即需要将原有的分区按照其replicas的分布方式替换成冒号分隔的格式即可,例如上图中,前5个分区是旧的分区,按照replicas的分布方式替换为,6:7,7:8,8:6,8:7,7:6 。 后5个分区是要扩容的分区,是需要手动指定新扩容节点的分区。即,4:5,4:5,4:5,4:5,4:5。即,每次执行时,--replica-assignment后面跟的内容必须是:旧分区的副本内容+新分区的副本内容。
4. 由于是手动指定新扩容的分区所在的节点,因此,只要保证每个副本在每个broker节点均衡散列即可。建议使用kafka的分配策略,如下:假设新增加的broker节点为5,6,7,8,9。同时需要某个topic新增加10个副本,分配策略可以如下:
5:6,6:7,7:8,8:9,9:5,5:6,6:7,7:8,8:9,9:5
命令二:(以test1为例子)
./kafka-topics.sh --topic test1 --partitions 10 --alter --replica-assignment 1,1,1,1,1,4:5,4:5,4:5,4:5,4:5 --zookeeper ZKIP:24002/kafka
执行完后状态如下:
操作注意事项:
1. 使用命令二时,--partitions 10 指的是扩容后分区总个数,本次由5扩到10因此写10
2. --replica-assignment 1,1,1,1,1,4:5,4:5,4:5,4:5,4:5是指定了每个分区的每个副本所在的broker节点,与图中的Replicas中的内容对应,由于旧的分区已经指定了节点,因此这里可以以任意数字代替。每个数字之间以逗号分隔,有几个旧的分区就写几个。例如:将前5个分区组以1,1,1,1,1代替,后面要手动新增加的分区与命令一中的内容一致。
3.1.2 方案二:先执行命令增加分区,然后使用迁移命令将新增加的分区迁移到指定的节点上。
例:test已经有11个分区,现在给test再扩11个分区,将分区数增加到22,且要求新扩的分区指定到brokerid为6,7,8的节点
1) 查询topic状态:kafka-topics.sh --describe --zookeeper ZKIP:24002/kafka --topic test
2) 查询原topic分布情况,创建生成策略json文件generate.json。内容如下:
{"topics":[{"topic": "test"}],"version":1}
3) 对要扩分区副本所在位置进行预设置(作用:提前进行预设置,为扩分区后尽快完成副本迁移做准备)
kafka-reassign-partitions.sh --zookeeper ZKIP:24002/kafka --topics-to-move-json-file generate.json --broker-list 6,7,8 --generate
说明:因为新扩的分区想要落在6,7,8节点,所以仅指定--broker-list 6,7,8,这样kafka会根据这三个节点给出预方案,将红框中的与方案粘贴保存至move.json,并做相应修改:
将上一步查询结果红框中的内容粘贴出来写入move.json文件中如下所示:
因为新扩分区是分区编号是顺序递增的,因此新扩容的11个的分区编号肯定是11~21,因此只移动将要增加的分区即可,所以需要将上面文件中move.json中的分区号由原来的0-10修改为11-21,修改后如下:
内容格式如下:
{"version":1,
"partitions":[
{"topic":"test","partition":11,"replicas":[8,6],"log_dirs":["any","any"]},
{"topic":"test","partition":12,"replicas":[8,7],"log_dirs":["any","any"]},
{"topic":"test","partition":13,"replicas":[7,6],"log_dirs":["any","any"]},
{"topic":"test","partition":14,"replicas":[8,6],"log_dirs":["any","any"]},
{"topic":"test","partition":15,"replicas":[7,8],"log_dirs":["any","any"]},
{"topic":"test","partition":16,"replicas":[7,6],"log_dirs":["any","any"]},
{"topic":"test","partition":17,"replicas":[7,8],"log_dirs":["any","any"]},
{"topic":"test","partition":18,"replicas":[6,8],"log_dirs":["any","any"]},
{"topic":"test","partition":19,"replicas":[6,7],"log_dirs":["any","any"]},
{"topic":"test","partition":20,"replicas":[6,7],"log_dirs":["any","any"]},
{"topic":"test","partition":21,"replicas":[8,7],"log_dirs":["any","any"]}]}
修改完成后保存
注意事项:
1.这一步一定要检查好move.json的内容,不要包含原始的分区,否则会将原有的分区也进行迁移。
2.将其中any更换成对应分区的要移动的目录,如果不考虑目录的位置则可以不用写。例如:
{"version":1,
"partitions":[
{"topic":"test","partition":11,"replicas":[8,6] },
{"topic":"test","partition":12,"replicas":[8,7] },
{"topic":"test","partition":13,"replicas":[7,6] },
{"topic":"test","partition":14,"replicas":[8,6] },
{"topic":"test","partition":15,"replicas":[7,8] },
{"topic":"test","partition":16,"replicas":[7,6] },
{"topic":"test","partition":17,"replicas":[7,8] },
{"topic":"test","partition":18,"replicas":[6,8] },
{"topic":"test","partition":19,"replicas":[6,7] },
{"topic":"test","partition":20,"replicas":[6,7] },
{"topic":"test","partition":21,"replicas":[8,7]]}
4) 将topic:test分区扩至22
kafka-topics.sh --zookeeper ZKIP:24002/kafka --alter --topic test --partitions 22
使用如下查询状态,可以看到新扩的分区在所有节点是自由分布的。
kafka-topics.sh --describe --zookeeper ZKIP:24002/kafka --topic test
5) 执行预分配脚本对新扩分区进行分配
kafka-reassign-partitions.sh --zookeeper ZKIP:24002/kafka --reassignment-json-file move.json --execute
6)查询topic状态,可以看到新扩的分区已经按照我们预设置的脚本进行分配
kafka-topics.sh --describe --zookeeper ZKIP:24002/kafka --topic test
操作注意事项:
1, 整个移动过程中,为了保证移动迅速完成,在执行完第4步后迅速执行第5步可以保证执行过程很快完成,并且不会对原始数据造成影响。
- 点赞
- 收藏
- 关注作者
评论(0)