Kafka元数据异常导致业务异常应急恢复方案

举报
FI小粉丝 发表于 2021/12/27 10:10:17 2021/12/27
【摘要】 用版本6.5.x。问题背景与现象执行扩容操作,进行数据迁移在数据迁移过程中由于Kafka节点故障,导致Kafka Topic元数据处于紊乱状态,导致无法进行读写操作。例如:Topic:O_PS_SEQ_DATA_BUSI_CRD,其中Partition:289, ISR列表为26,29;Leader为26;Replicas列表为10,6。当需要访问Partition 289时,获取Leade...

用版本

6.5.x。

问题背景与现象

执行扩容操作,进行数据迁移在数据迁移过程中由于Kafka节点故障,导致Kafka Topic元数据处于紊乱状态,导致无法进行读写操作。

例如:

Topic:O_PS_SEQ_DATA_BUSI_CRD,其中Partition:289, ISR列表为26,29;Leader为26;Replicas列表为10,6。当需要访问Partition 289时,获取Leader为26,则客户端连接26进行读写请求。

通过Kafka自身携带的console-consumer进行数据消费,可以发现元数据紊乱,导致consumer无法正常消费,提示fetch offsets for partition failed due to obsolete leadership infomation。

影响范围、程度

  1. Kafka服务异常。
  2. 生产者无法发送数据到Kafka。
  3. 消费者无法从Kafka中消费数据。

预期恢复时长

恢复时长与数据量相关。

处理前置条件

操作影响较大,需要提前研发进行沟通确认,与各方达成一致后方可实施。

执行修复之前,必须先准备好如下信息:

表1 修复前准备事项

序号

项目

操作方式

1

集群账号信息

申请集群admin账户的密码。

2

节点账号信息

申请集群内节点的omm、root用户密码。

3

SSH远程登录工具

准备PuTTY或SecureCRT等工具。

4

客户端

已提前安装好客户端

处理思路简述

针对这种情况,采用对异常Partition重新reassign的方式来进行处理。

根据partition的分布和leader,然后去leader节点上确认leader上的源数据存在,再找一个磁盘空间相对充裕的节点,把有问题的partition重分配到原来的leader节点和一个新的节点上。

操作方法

  1. 执行命令,进入客户端安装目录,例如“/opt/client”。

    cd /opt/client

  2. 执行命令,配置环境变量。

    source bigdata_env

  3. 如果集群采用了安全版本,要进行安全认证。

    kinit admin

    根据提示输入正确的用户密码。

    [root@mgtdat-sh-3-01-3 client]#source bigdata_env  #配置环境变量 
    [root@mgtdat-sh-3-01-3 client]#kinit admin         #设置kinit认证 
    Password for admin@HADOOP.COM: 
  4. 执行Kafka相关命令,获取Topic分布信息和副本同步信息,观察返回结果。

    kafka-topics.sh

    --describe

    --zookeeper <zk_host:port/chroot>

    例如:

    [root@mgtdat-sh-3-01-3 client]#kafka-topics.sh --describe --zookeeper 10.149.0.90:24002/kafka
    Topic:topic1   PartitionCount:2  ReplicationFactor:2     Configs:
    Topic: topic1  Partition: 0 Leader: 26 Replicas: 23,25 Isr: 26
    Topic: topic1  Partition: 1 Leader: 24 Replicas: 24,23 Isr: 24,23

    其中,Replicas对应副本分布信息,Isr对应副本同步信息。

  5. 如果Topic Partition对应的Leader中Broker的id不在Replicas(副本)列表中。

    该Partition为异常Partition,需要通过重新reassign进行修复。

  6. 手工创建迁移Json文件,内容为需要移动的topic和信息,文件保存为expand-cluster-reassignment.json

    生成的expand-cluster-reassignment.json放置在客户端安装目录。

    根据partition的分布和leader,然后去leader节点上确认leader上的源数据存在,再找一个磁盘空间相对充裕的节点,把有问题的partition重分配到原来的leader节点和一个新的节点上(非Replicas中节点)。

  7. 根据上述规则,生成迁移Json文件。

    例如:

    {"version":1,
    "partitions":[{"topic":"topic1","partition":0,"replicas":[26,30]}]}

    其中26为partition的原来的leader, 30为磁盘相对充裕的节点,且不是Replicas中的节点。

  8. 根据手工生成的Partitions分布情况执行重分布。

    kafka-reassign-partitions.sh

    --zookeeper <zk_host:port/chroot>

    --reassignment-json-file <manual assignment json file path>

    --execute

    例如:

    [root@mgtdat-sh-3-01-3 client]#kafka-reassign-partitions.sh --zookeeper 10.149.0.90:24002/kafka --reassignment-json-file /opt/client/expand-cluster-reassignment.json --execute
    
    Current partition replica assignment
    
    {"version":1,"partitions":[{"topic":"topic1","partition":1,"replicas":[23,25]},{"topic":"topic1","partition":0,"replicas":[26,30]}]}
    
    Save this to use as the --reassignment-json-file option during rollback
    Successfully started reassignment of partitions {"version":1,"partitions":[{"topic":"topic1","partition":0,"replicas":[26,30]}]}

    命令会打印Topic Partitions原始分布情况和新的分布情况。

  9. 确认Partitions重分布执行结果。

    kafka-reassign-partitions.sh

    --zookeeper <zk_host:port/chroot>

    --reassignment-json-file <manual assignment json file path>

    --verify

    例如:

    [root@mgtdat-sh-3-01-3 client]#kafka-reassign-partitions.sh --zookeeper 10.149.0.90:24002/kafka --reassignment-json-file /opt/client/expand-cluster-reassignment.json --verify
    
    Status of partition reassignment:
    Reassignment of partition [topic1,0] completed successfully

修复验证

  1. 执行命令,进入客户端安装目录,例如“/opt/client”。

    cd /opt/client

  2. 执行命令,配置环境变量。

    source bigdata_env

  3. 如果集群采用了安全版本,要进行安全认证。

    kinit admin

    根据提示输入正确的用户密码。

    [root@mgtdat-sh-3-01-3 client]#source bigdata_env  #配置环境变量 
    [root@mgtdat-sh-3-01-3 client]#kinit admin         #设置kinit认证 
    Password for admin@HADOOP.COM: 
  4. 执行Kafka相关命令,获取Topic分布信息和副本同步信息,观察返回结果。

    kafka-topics.sh

    --describe

    --zookeeper <zk_host:port/chroot>

    例如:

    [root@mgtdat-sh-3-01-3 client]#kafka-topics.sh --describe --zookeeper 10.149.0.90:24002/kafka
    
    Topic:topic1 PartitionCount:2  ReplicationFactor:2     Configs:
    Topic: topic1 Partition: 0  Leader: 26  Replicas: 26,30 Isr: 26,30
    Topic: topic1 Partition: 1  Leader: 24  Replicas: 24,23 Isr: 24,23

    其中,Replicas对应副本分布信息,Isr对应副本同步信息。

  5. 根据迁移Partition,找到Partition信息,其中Replicas信息和迁移方案一致,则说明迁移成功。
  6. 待某Topic的所有异常Partition均已完成修复,可以通过消费者进行数据消费,从而进行Topic状态测试。

    不同版本消费者命令不同,请根据版本确定执行命令。

    • FusionInsight HD V100R002C50SPC20*

      kafka-console-consumer.sh

      --topic <topicName>

      --zookeeper <zk_host:port/chroot>

      --from-beginning
      [root@10-10-10-11 client]#kafka-console-consumer.sh --topic topic1 --zookeeper 10.149.0.90:24002/kafka  --from-beginning
      
      1
      2
      3

      其中,1 ,2 ,3为Topic中消息信息。

    • FusionInsight HD 6.5.0版本

      kafka-console-consumer.sh

      --topic <topicName>

      -- bootstrap-server <brokerlist >

      --new-consumer < consumer.properties >

      --from-beginning

      例如:

      [root@10-10-10-11 client]#kafka-console-consumer.sh --topic topic1 --bootstrap-server 10.149.0.90:21007 --new-consumer --consumer.config /opt/client/Kafka/kafka/config/consumer.properties --from-beginning
      
      1
      2
      3
      

      其中,1 ,2 ,3为Topic中消息信息

【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。