【云小课】应用平台第6课 命令行管理Kafka的Topic,方便又实用
Kafka是一款非常热门的消息中间件,它的消息生产与消费都围绕消息主题(Topic)进行:生产者将消息发送给Topic,消息存储在Topic的某个分区,消费者不断从Topic拉取未读消息进行处理。
华为云Kafka服务在控制台提供Topic的部分管理功能,包括创建Topic、删除Topic,以及Topic的老化时间、持久化与消息同步复制等功能设置。
在这里,小编向新手同学介绍Kafka提供的命令行工具,命令行工具相对于云服务控制台,还提供了Topic的参数自定义、分区与副本数的增加等控制台没有提供的功能。
命令行工具准备
JDK环境配置
命令行工具依赖Java环境,需要先安装JDK。以下介绍均以Linux环境为例。
JDK推荐使用Java Development Kit 1.8及以上版本。小编在华为云对象存储(OBS)桶里存了一份JDK,供大家直接下载。Linux系统下可使用如下命令获取:
wget https://407154.obs.cn-south-1.myhuaweicloud.com/jdk-8u231-linux-x64.tar.gz
JDK下载并解压后可直接使用,但建议配置环境变量。小编的配置方式如下:
步骤 1 在环境变量参数中增加JAVA信息:vim ~/.bash_profile
补充如下内容:
export JAVA_HOME=/root/jdk1.8.0_231 export PATH=$JAVA_HOME/bin:$PATH
步骤 2 使环境信息生效:source ~/.bash_profile
环境变量配置完成后,使用java -version验证环境变量是否生效,版本号应与自定义的JDK一致。
下载Kafka命令行工具
Kafka安装包自带了命令行工具,推荐使用2.3.0以上版本,因为低版本的Kafka,Topic管理工具仅支持连接Kafka的Zookeeper组件,不支持直连Broker节点。
客户端下载并解压后即可使用。
wget https://archive.apache.org/dist/kafka/2.6.0/kafka_2.13-2.6.0.tgz
Kafka提供了服务管理、Topic管理、消息管理、消息迁移等30多个小工具,可以通过man手册或者使用 --help选项了解工具的具体使用方法。以下我们重点介绍Topic管理相关的工具kafka-topics.sh(Windows下对应kafka-topics.bat)。
Kafka实例准备
命令行工具客户端与环境准备了,还差Kafka服务:)如果您是华为云Kafka新用户,华为云Kafka在华南-广州区域为您提供了Kafka免费试用版本,省去环境搭建的繁琐。
如下图所示,登录华为云控制台后,找到分布式消息服务Kafka,右上角有一个“免费领取Kafka实例”的快捷入口。注意区域切换到“华南-广州”。
为了方便本地客户端访问,在试用版的Kafka实例创建完成后,我为其开启了公网访问。在实例基本信息中,可查看访问地址。
Topic管理
到现在为止,Kafka实例、命令行工具都已搭建好。以下介绍如何使用命令行工具kafka-topic.sh管理Kafka的Topic。kafka-topics.sh支持创建、查询、删除Topic,以及Topic的各类参数管理。
1.创建Topic
kafka-topics.sh --create --topic {topic_name} --bootstrap-server {broker_ip}:{port} --partitions {partition_num} --replication-factor {replication_num} --config conf_name1=conf_value1 --config conf_name2=conf_value2
create:创建Topic
bootstrap-server:Kafka的broker节点,Kafka集群有多broker节点,这里填1个即可。
partitions:Topic分区数量
replication-factor: 副本数,副本数量不能超过broker节点数量
config:Topic级别参数配置,以“名称=值”的形式定义每一个参数
从华为云控制台创建,部分Topic的配置参数不能自定义,如果您需要自定义这些配置参数,则可以尝试通过命令行工具创建。
以下列出部分常用的配置参数:
参数名称 | 默认值 | 说明 |
cleanup.policy |
delete | 消息过期或达到日志文件的大小上限的清理策略。delete:删除;compact:压缩 |
retention.bytes | -1 | 如果使用delete的策略,则此参数可指定日志文件的大小上限。-1表示无限制。 |
retention.ms | 7天 | 如果使用delete的策略,则此参数可指定日志文件的保留期限。 |
segment.bytes | 1GB | 单个日志文件的大小上限。 |
delete.retentions.ms | 86400000(24小时) | 压缩日志保留的最长时间,也是客户端消费消息的最长时间。 |
file.delete.delay.ms | 600000 | 从文件系统中删除文件之前的等待时间。 |
flush.messages | None |
消息强制持久化(fsync)的分区最大消息数量,比如此选项设置为10,则内存中某个分区达到10条消息则强制持久化到磁盘。 此参数设置需谨慎,过大则每次持久化的时间较长,且遇到故障后可能丢失的消息更多,过小则持久化频繁则容易导致整体的客户端请求有一定延迟。 |
flush.ms | None | 消息强制持久化的时间间隔,比如此选项设置为60000,则内存中的消息分区每60000毫秒持久化1次。 |
max.messages.bytes | 1000000 | Kafka追加消息的最大长度。如果生产者批量发送消息,则单个批次的消息最大长度也受此限制。 |
2.查询Topic列表
kafka-topics.sh --list --bootstrap-server {broker_ip}:{port}
list:查询Topic列表,返回所有topic名称
3.查看Topic的详细信息
kafka-topics.sh --describe --topic {topic_name} --bootstrap-server {broker_ip}:{port}
describe:查看Topic详情。
返回信息包括Topic名称、分区数、副本数、Topic级别的配置信息,以及每一个分区的Leader副本所在broker节点等信息。
4.为Topic增加分区数、副本数等。
Kafka服务的控制台界面目前没有开放Topic的分区和副本数的修改功能,那么您可以通过命令行工具自助完成。
kafka-topics.sh --alter --partitions {partition_num} --bootstrap-server {broker_ip}:{port} --topic {topic_name}
alter:对Topic进行配置变更。
变更命令需同时带上Topic的某个参数,如分区数或者副本数。注意,修改分区数或副本数时,只能增加,不能减少。
5.删除Topic
如果确定不再使用某个主题,最好的方式是删除,这样可以释放一些资源,比如磁盘空间、文件句柄等。
kafka-topics.sh --delete --topic {topic_name} --bootstrap-server {broker_ip}:{port}
delete:删除Topic。
注意,删除主题操作不可逆,一旦删除,与该主题相关的所有消息数据会被全部删除。
6.修改Topic的配置
命令行工具使用broker节点地址修改Topic的配置时,存在bug,需要连接zookeeper地址才可以完成。由于华为云Kafka考虑稳定性等原因,没有开放zookeeper,因此,小编建议您通过登录Kafka-manager修改Topic的配置。将在下一期云小课中进行介绍。
- 点赞
- 收藏
- 关注作者
评论(0)