Kafka的指定副本作为Leader的三种实现方式

举报
石臻臻的杂货铺 发表于 2022/03/02 10:12:43 2022/03/02
【摘要】 哈喽~大家好啊,我是彦祖😋之前,我写过一篇文章叫做 Kafka如何修改分区Leader就是因为在我们实际的运维过程中,需要指定某个副本为ISR,但是呢 Kafka中的Leader选举策略并不支持这个功能,所以需要我们自己来实现它。关于Leader选举策略,你可以看这篇文章Leader选举流程和4种选举策略但是我们在之前的文章中,是留下了一个小尾巴-优化与改进。我们先简单的回顾一下之前的2...
哈喽~大家好啊,我是彦祖😋

之前,我写过一篇文章叫做 Kafka如何修改分区Leader

就是因为在我们实际的运维过程中,需要指定某个副本为ISR,但是呢 Kafka中的Leader选举策略并不支持这个功能,所以需要我们自己来实现它。
关于Leader选举策略,你可以看这篇文章
Leader选举流程和4种选举策略

但是我们在之前的文章中,是留下了一个小尾巴-优化与改进

我们先简单的回顾一下之前的2种方案

方案一: 分区副本重分配 (低成本方案)

之前关于分区副本重分配 我已经写过很多文章了, 这里我就简单说一下;

一般分区副本重分配主要有三个流程

  1. 生成推荐的迁移Json文件
  2. 执行迁移Json文件
  3. 验证迁移流程是否完成

这里我们主要看第2步骤, 来看看迁移文件一般是什么样子的

{
	"version": 1,
	"partitions": [{
		"topic": "topic1",
		"partition": 0,
		"replicas": [0,1,2]
	}]
}

这个迁移Json意思是, 把topic1的「0」号分区的副本分配成[0,1,2] ,也就是说 topic1-0号分区最终有3个副本分别在 {brokerId-0,brokerId-1,brokerId-2} ;

又根据Leader的选举策略得知,不管是什么策略的选择,都是按照AR的顺序来选的

修改AR顺序

AR: 副本的分配顺序

那么我们想要实现我们的需求
是不是把这个Json文件 中的 “replicas”: [0,1,2] 改一下就行了
比如改成 “replicas”: [2,1,0]
改完Json后执行,执行execute, 正式开始重分配流程!
迁移完成之后, 就会发现,Leader已经变成上面的第一个位置的副本「2」

执行Leader选举

修改完AR顺序就结束了吗?

可以说是结束了,也可以说没有结束。

上面只是修改了AR的顺序, 但是没有执行Leader选举呀,这个时候Leader还是原来的,所以我们需要主动触发一下Leader选举

## 石臻臻的杂货铺
## 微信: szzdzhp001

sh bin/kafka-leader-election.sh --bootstrap-server xxxx:9090 --topic Topic1 --election-type PREFERRED --partition 0

这样就会立马切换成我们想要的Leader了。

也可以不主动触发,等Controller自动均衡。

如果你觉得主动触发这个很麻烦,那么没有关系,那就不执行,如果你开启了自动均衡策略的话,默认是开启的。

延伸: 自动均机制

当一个broker停止或崩溃时,这个broker中所有分区的leader将转移给其他副本。这意味着在默认情况下,当这个broker重新启动之后,它的所有分区都将仅作为follower,不再用于客户端的读写操作。

为了避免这种不平衡,Kafka有一个优先副本的概念。如果一个分区的副本列表是1,5,9,节点1将优先作为其他两个副本5和9的leader。

Controller会有一个定时任务,定期执行优先副本选举,这样就不会导致负载不均衡和资源浪费,这就是leader的自动均衡机制

属性 释义 默认
auto.leader.rebalance.enable 是否开启自动均衡 true
leader.imbalance.check.interval.seconds 自动均衡的周期时间,单位秒 300
leader.imbalance.per.broker.percentage 标识每个 Broker 失去平衡的比率,如果超过改比率,则执行重新选举 Broker 的 leader;默认比例是10%; 10

优缺点

优点: 实现了需求, 不需要改源码,也没有额外的开发工作。

缺点: 操作比较复杂容易出错,需要先获取原先的分区分配数据,然后手动修改Json文件,这里比较容易出错,影响会比较大,当然这些都可以通过校验接口来做好限制, 最重要的一点是 副本重分配当前只能有一个任务 !
假如你当前有一个「副本重分配」的任务在,那么这里就不能够执行了。

方案二: 手动修改AR顺序(高成本方案)

  1. 从zk中获取/brokers/topics/{topic名称}节点数据。
  2. 手动调整一下里面的顺序
  3. 将调整后的数据,重新覆盖掉之前的节点。
  4. 删除zk中的/Controller节点,让它触发重新加载,并且同时触发Leader选举。

例如:

在这里插入图片描述

修改的时候请先用get获取数据,在那个基础上改,因为不同版本,里面的数据结构是不一样的,我们只需要改分区AR顺序就行了 “partitions”:{“0”:[0,1,2]}


## get zk 节点数据。

get /szz1/brokers/topics/Topic2

## zk中的修改命令
set /szz1/brokers/topics/Topic2  {"version":2,"partitions":{"0":[0,1,2]},"adding_replicas":{},"removing_replicas":{}}


为什么要删除Controller的zk节点?

之所以删除Controller节点,是因为我们手动修改了zk节点数据之后,因为没有副本的新增,是不会触发Controller去更新AR内存的,就算你主动触发Leader选举,AR还是以前的,并不会达到想要的效果。

删除zk中的/Controller节点,会触发Controller重新选举,重新选举会重新加载所有元数据,所以我们刚刚加载的数据就会生效, 同时Controller重新加载也会触发Leader选举

简单代码
当然上面功能,手动改起来麻烦,那么饿肯定是要集成到LogiKM 3.0中的咯;

优缺点

优点: 实现了目标需求, 简单, 操作方便

缺点: 频繁的Controller重选举对生产环境来说会有一些影响;

方案三:修改源码(高级方案推荐)

我们方案二中的问题就是需要删除/Controller节点发送重新选举,我们能不能不重新选举Controller也能生效呢?

如何让修改后的AR立即生效 ?

Controller会监听每一个topic的节点/brokers/topics/{topic名称}

KafkaController#processPartitionModifications


/**
* 石臻臻的杂货铺
* 微信:szzdzhp001
* 省略部分代码
**/
 private def processPartitionModifications(topic: String): Unit = {
    def restorePartitionReplicaAssignment(
      topic: String,
      newPartitionReplicaAssignment: Map[TopicPartition, ReplicaAssignment]
    ): Unit = {
    
         val partitionReplicaAssignment = zkClient.getFullReplicaAssignmentForTopics(immutable.Set(topic))
    val partitionsToBeAdded = partitionReplicaAssignment.filter { case (topicPartition, _) =>
      controllerContext.partitionReplicaAssignment(topicPartition).isEmpty
    }

    if (topicDeletionManager.isTopicQueuedUpForDeletion(topic)) {
     
    } else if (partitionsToBeAdded.nonEmpty) {
      info(s"New partitions to be added $partitionsToBeAdded")
      partitionsToBeAdded.foreach { case (topicPartition, assignedReplicas) =>
        controllerContext.updatePartitionFullReplicaAssignment(topicPartition, assignedReplicas)
      }
      onNewPartitionCreation(partitionsToBeAdded.keySet)
    }
    }
  }


这段代码省略了很多,我想让你看到的是

只有新增了副本,才会执行更新Controller的内存操作。

那么我们在这里面新增一段逻辑

新增逻辑:如果只是变更了AR的顺序,那么我们也更新一下内存。

来我们改一下源码

    // 1. 找到 AR 顺序有变更的 所有TopicPartition
    val partitionsOrderChange = partitionReplicaAssignment.filter { case (topicPartition, _) =>
      //这里自己写下过滤逻辑 把只是顺序变更的分区找出
      true
    }
    
    if (topicDeletionManager.isTopicQueuedUpForDeletion(topic)) {
      if (partitionsToBeAdded.nonEmpty) {
       
      } else {
       
      }
    } else if (partitionsToBeAdded.nonEmpty) {
      info(s"New partitions to be added $partitionsToBeAdded")
      partitionsToBeAdded.foreach { case (topicPartition, assignedReplicas) =>
        controllerContext.updatePartitionFullReplicaAssignment(topicPartition, assignedReplicas)
      }
      onNewPartitionCreation(partitionsToBeAdded.keySet)
    }else if (partitionsOrderChange.nonEmpty) {
      // ② .在这里加个逻辑
      info(s"OrderChange partitions to be updatecache $partitionsToBeAdded")
      partitionsOrderChange.foreach { case (topicPartition, assignedReplicas) =>
        controllerContext.updatePartitionFullReplicaAssignment(topicPartition, assignedReplicas)
      }
    }

改成这样之后,上面的流程就变成了

  1. 从zk中获取/brokers/topics/{topic名称}节点数据。
  2. 手动调整一下里面的顺序
  3. 将调整后的数据,重新覆盖掉之前的节点。
  4. 手动执行一次,优先副本选举。

完美解决!

思考

方案三 改了之后会对其他的流程有影响吗?

上面更改的方法,一般是在分区副本重分配或者新增分区的时候会触发。

上面新增的逻辑并不会对现有流程有影响,因为假设都是上面的场景的情况下,他们都是会主动更新内存的。

在我看来,这里的改动,完全可以向kafka社区提一个Pr. 来“修复”这个问题。

因为提了这个PR,对我们有收益,没有额外的开销!

欢迎留下你的看法,一起讨论!

【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。