【云小课】应用平台第6课 命令行管理Kafka的Topic,方便又实用

举报
应用万花筒 发表于 2020/08/23 21:06:08 2020/08/23
【摘要】 Kafka是一款非常热门的消息中间件,它的消息生产与消费都围绕消息主题(Topic)进行,生产者将消息发送给Topic,消息存储在Topic的某个分区,消费者不断从Topic拉取未读消息进行处理。华为云Kafka服务在控制台提供Topic的部分管理功能,包括创建Topic、删除Topic,以及Topic的老化时间、持久化与消息同步复制等功能设置。在这里,小编向新手们介绍Kafka提供的命令行...

Kafka是一款非常热门的消息中间件,它的消息生产与消费都围绕消息主题(Topic)进行:生产者将消息发送给Topic,消息存储在Topic的某个分区,消费者不断从Topic拉取未读消息进行处理。

华为云Kafka服务在控制台提供Topic的部分管理功能,包括创建Topic、删除Topic,以及Topic的老化时间、持久化与消息同步复制等功能设置。

在这里,小编向新手同学介绍Kafka提供的命令行工具,命令行工具相对于云服务控制台,还提供了Topic的参数自定义、分区与副本数的增加等控制台没有提供的功能。

命令行工具准备

JDK环境配置

命令行工具依赖Java环境,需要先安装JDK。以下介绍均以Linux环境为例。

JDK推荐使用Java Development Kit 1.8及以上版本。小编在华为云对象存储(OBS)桶里存了一份JDK,供大家直接下载。Linux系统下可使用如下命令获取:

wget https://407154.obs.cn-south-1.myhuaweicloud.com/jdk-8u231-linux-x64.tar.gz

JDK下载并解压后可直接使用,但建议配置环境变量。小编的配置方式如下:

步骤 1 在环境变量参数中增加JAVA信息:vim ~/.bash_profile

    补充如下内容:

    export JAVA_HOME=/root/jdk1.8.0_231   
    export PATH=$JAVA_HOME/bin:$PATH

    

步骤 2 使环境信息生效:source ~/.bash_profile

    环境变量配置完成后,使用java -version验证环境变量是否生效,版本号应与自定义的JDK一致。

    

下载Kafka命令行工具

Kafka安装包自带了命令行工具,推荐使用2.3.0以上版本,因为低版本的Kafka,Topic管理工具仅支持连接Kafka的Zookeeper组件,不支持直连Broker节点。

客户端下载并解压后即可使用。

    wget https://archive.apache.org/dist/kafka/2.6.0/kafka_2.13-2.6.0.tgz

    

Kafka提供了服务管理、Topic管理、消息管理、消息迁移等30多个小工具,可以通过man手册或者使用 --help选项了解工具的具体使用方法。以下我们重点介绍Topic管理相关的工具kafka-topics.sh(Windows下对应kafka-topics.bat)。

Kafka实例准备

命令行工具客户端与环境准备了,还差Kafka服务:)如果您是华为云Kafka新用户,华为云Kafka在华南-广州区域为您提供了Kafka免费试用版本,省去环境搭建的繁琐。

如下图所示,登录华为云控制台后,找到分布式消息服务Kafka,右上角有一个“免费领取Kafka实例”的快捷入口。注意区域切换到“华南-广州”。

 为了方便本地客户端访问,在试用版的Kafka实例创建完成后,我为其开启了公网访问。在实例基本信息中,可查看访问地址。

Topic管理

到现在为止,Kafka实例、命令行工具都已搭建好。以下介绍如何使用命令行工具kafka-topic.sh管理Kafka的Topic。kafka-topics.sh支持创建、查询、删除Topic,以及Topic的各类参数管理。

1.创建Topic

kafka-topics.sh --create --topic {topic_name} --bootstrap-server {broker_ip}:{port} --partitions {partition_num} --replication-factor {replication_num} --config conf_name1=conf_value1 --config conf_name2=conf_value2

create:创建Topic

bootstrap-server:Kafka的broker节点,Kafka集群有多broker节点,这里填1个即可。

partitions:Topic分区数量

replication-factor: 副本数,副本数量不能超过broker节点数量

config:Topic级别参数配置,以“名称=值”的形式定义每一个参数

从华为云控制台创建,部分Topic的配置参数不能自定义,如果您需要自定义这些配置参数,则可以尝试通过命令行工具创建。

以下列出部分常用的配置参数:

参数名称 默认值 说明

cleanup.policy

delete 消息过期或达到日志文件的大小上限的清理策略。delete:删除;compact:压缩
retention.bytes -1 如果使用delete的策略,则此参数可指定日志文件的大小上限。-1表示无限制。
retention.ms 7天 如果使用delete的策略,则此参数可指定日志文件的保留期限。
segment.bytes 1GB 单个日志文件的大小上限。
delete.retentions.ms 86400000(24小时) 压缩日志保留的最长时间,也是客户端消费消息的最长时间。
file.delete.delay.ms 600000 从文件系统中删除文件之前的等待时间。
flush.messages None

消息强制持久化(fsync)的分区最大消息数量,比如此选项设置为10,则内存中某个分区达到10条消息则强制持久化到磁盘。

此参数设置需谨慎,过大则每次持久化的时间较长,且遇到故障后可能丢失的消息更多,过小则持久化频繁则容易导致整体的客户端请求有一定延迟。

flush.ms None 消息强制持久化的时间间隔,比如此选项设置为60000,则内存中的消息分区每60000毫秒持久化1次。
max.messages.bytes 1000000 Kafka追加消息的最大长度。如果生产者批量发送消息,则单个批次的消息最大长度也受此限制。

2.查询Topic列表

kafka-topics.sh --list --bootstrap-server {broker_ip}:{port}

list:查询Topic列表,返回所有topic名称

3.查看Topic的详细信息

kafka-topics.sh --describe --topic {topic_name} --bootstrap-server {broker_ip}:{port}

describe:查看Topic详情。

返回信息包括Topic名称、分区数、副本数、Topic级别的配置信息,以及每一个分区的Leader副本所在broker节点等信息。

4.为Topic增加分区数、副本数等。

Kafka服务的控制台界面目前没有开放Topic的分区和副本数的修改功能,那么您可以通过命令行工具自助完成。

kafka-topics.sh --alter --partitions {partition_num} --bootstrap-server {broker_ip}:{port}  --topic {topic_name}

alter:对Topic进行配置变更。

变更命令需同时带上Topic的某个参数,如分区数或者副本数。注意,修改分区数或副本数时,只能增加,不能减少。

5.删除Topic

如果确定不再使用某个主题,最好的方式是删除,这样可以释放一些资源,比如磁盘空间、文件句柄等。

kafka-topics.sh --delete --topic {topic_name} --bootstrap-server {broker_ip}:{port}

delete:删除Topic。

注意,删除主题操作不可逆,一旦删除,与该主题相关的所有消息数据会被全部删除。

6.修改Topic的配置

命令行工具使用broker节点地址修改Topic的配置时,存在bug,需要连接zookeeper地址才可以完成。由于华为云Kafka考虑稳定性等原因,没有开放zookeeper,因此,小编建议您通过登录Kafka-manager修改Topic的配置。将在下一期云小课进行介绍


【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。