Kafka元数据异常导致业务异常应急恢复方案
用版本
6.5.x。
问题背景与现象
执行扩容操作,进行数据迁移在数据迁移过程中由于Kafka节点故障,导致Kafka Topic元数据处于紊乱状态,导致无法进行读写操作。
例如:

Topic:O_PS_SEQ_DATA_BUSI_CRD,其中Partition:289, ISR列表为26,29;Leader为26;Replicas列表为10,6。当需要访问Partition 289时,获取Leader为26,则客户端连接26进行读写请求。
通过Kafka自身携带的console-consumer进行数据消费,可以发现元数据紊乱,导致consumer无法正常消费,提示fetch offsets for partition failed due to obsolete leadership infomation。

影响范围、程度
- Kafka服务异常。
 - 生产者无法发送数据到Kafka。
 - 消费者无法从Kafka中消费数据。
 
预期恢复时长
恢复时长与数据量相关。
处理前置条件
操作影响较大,需要提前研发进行沟通确认,与各方达成一致后方可实施。
执行修复之前,必须先准备好如下信息:
|   序号  |  
        项目  |  
        操作方式  |  
     
|---|---|---|
|   1  |  
        集群账号信息  |  
        申请集群admin账户的密码。  |  
     
|   2  |  
        节点账号信息  |  
        申请集群内节点的omm、root用户密码。  |  
     
|   3  |  
        SSH远程登录工具  |  
        准备PuTTY或SecureCRT等工具。  |  
     
|   4  |  
        客户端  |  
        已提前安装好客户端  |  
     
处理思路简述
针对这种情况,采用对异常Partition重新reassign的方式来进行处理。
根据partition的分布和leader,然后去leader节点上确认leader上的源数据存在,再找一个磁盘空间相对充裕的节点,把有问题的partition重分配到原来的leader节点和一个新的节点上。
操作方法
- 执行命令,进入客户端安装目录,例如“/opt/client”。 
cd /opt/client
 - 执行命令,配置环境变量。 
source bigdata_env
 - 如果集群采用了安全版本,要进行安全认证。 
kinit admin
根据提示输入正确的用户密码。
[root@mgtdat-sh-3-01-3 client]#source bigdata_env #配置环境变量 [root@mgtdat-sh-3-01-3 client]#kinit admin #设置kinit认证 Password for admin@HADOOP.COM: - 执行Kafka相关命令,获取Topic分布信息和副本同步信息,观察返回结果。 
kafka-topics.sh
--describe
--zookeeper <zk_host:port/chroot>
例如:
[root@mgtdat-sh-3-01-3 client]#kafka-topics.sh --describe --zookeeper 10.149.0.90:24002/kafka Topic:topic1 PartitionCount:2 ReplicationFactor:2 Configs: Topic: topic1 Partition: 0 Leader: 26 Replicas: 23,25 Isr: 26 Topic: topic1 Partition: 1 Leader: 24 Replicas: 24,23 Isr: 24,23其中,Replicas对应副本分布信息,Isr对应副本同步信息。
 - 如果Topic Partition对应的Leader中Broker的id不在Replicas(副本)列表中。 
该Partition为异常Partition,需要通过重新reassign进行修复。
 - 手工创建迁移Json文件,内容为需要移动的topic和信息,文件保存为expand-cluster-reassignment.json。 
生成的expand-cluster-reassignment.json放置在客户端安装目录。
根据partition的分布和leader,然后去leader节点上确认leader上的源数据存在,再找一个磁盘空间相对充裕的节点,把有问题的partition重分配到原来的leader节点和一个新的节点上(非Replicas中节点)。
 - 根据上述规则,生成迁移Json文件。 
例如:
{"version":1, "partitions":[{"topic":"topic1","partition":0,"replicas":[26,30]}]}其中26为partition的原来的leader, 30为磁盘相对充裕的节点,且不是Replicas中的节点。
 - 根据手工生成的Partitions分布情况执行重分布。 
kafka-reassign-partitions.sh
--zookeeper <zk_host:port/chroot>
--reassignment-json-file <manual assignment json file path>
--execute
例如:
[root@mgtdat-sh-3-01-3 client]#kafka-reassign-partitions.sh --zookeeper 10.149.0.90:24002/kafka --reassignment-json-file /opt/client/expand-cluster-reassignment.json --execute Current partition replica assignment {"version":1,"partitions":[{"topic":"topic1","partition":1,"replicas":[23,25]},{"topic":"topic1","partition":0,"replicas":[26,30]}]} Save this to use as the --reassignment-json-file option during rollback Successfully started reassignment of partitions {"version":1,"partitions":[{"topic":"topic1","partition":0,"replicas":[26,30]}]}命令会打印Topic Partitions原始分布情况和新的分布情况。
 - 确认Partitions重分布执行结果。 
kafka-reassign-partitions.sh
--zookeeper <zk_host:port/chroot>
--reassignment-json-file <manual assignment json file path>
--verify
例如:
[root@mgtdat-sh-3-01-3 client]#kafka-reassign-partitions.sh --zookeeper 10.149.0.90:24002/kafka --reassignment-json-file /opt/client/expand-cluster-reassignment.json --verify Status of partition reassignment: Reassignment of partition [topic1,0] completed successfully 
修复验证
- 执行命令,进入客户端安装目录,例如“/opt/client”。 
cd /opt/client
 - 执行命令,配置环境变量。 
source bigdata_env
 - 如果集群采用了安全版本,要进行安全认证。 
kinit admin
根据提示输入正确的用户密码。
[root@mgtdat-sh-3-01-3 client]#source bigdata_env #配置环境变量 [root@mgtdat-sh-3-01-3 client]#kinit admin #设置kinit认证 Password for admin@HADOOP.COM: - 执行Kafka相关命令,获取Topic分布信息和副本同步信息,观察返回结果。 
kafka-topics.sh
--describe
--zookeeper <zk_host:port/chroot>
例如:
[root@mgtdat-sh-3-01-3 client]#kafka-topics.sh --describe --zookeeper 10.149.0.90:24002/kafka Topic:topic1 PartitionCount:2 ReplicationFactor:2 Configs: Topic: topic1 Partition: 0 Leader: 26 Replicas: 26,30 Isr: 26,30 Topic: topic1 Partition: 1 Leader: 24 Replicas: 24,23 Isr: 24,23其中,Replicas对应副本分布信息,Isr对应副本同步信息。
 - 根据迁移Partition,找到Partition信息,其中Replicas信息和迁移方案一致,则说明迁移成功。
 - 待某Topic的所有异常Partition均已完成修复,可以通过消费者进行数据消费,从而进行Topic状态测试。 
    
不同版本消费者命令不同,请根据版本确定执行命令。
- FusionInsight HD V100R002C50SPC20*  
--topic <topicName>
--zookeeper <zk_host:port/chroot>
--from-beginning[root@10-10-10-11 client]#kafka-console-consumer.sh --topic topic1 --zookeeper 10.149.0.90:24002/kafka --from-beginning 1 2 3其中,1 ,2 ,3为Topic中消息信息。
 - FusionInsight HD 6.5.0版本  
--topic <topicName>
-- bootstrap-server <brokerlist >
--new-consumer < consumer.properties >
--from-beginning
例如:
[root@10-10-10-11 client]#kafka-console-consumer.sh --topic topic1 --bootstrap-server 10.149.0.90:21007 --new-consumer --consumer.config /opt/client/Kafka/kafka/config/consumer.properties --from-beginning 1 2 3其中,1 ,2 ,3为Topic中消息信息
 
 - FusionInsight HD V100R002C50SPC20*  
 
- 点赞
 - 收藏
 - 关注作者
 
            
           
评论(0)