Kafka 运维部署
【摘要】 环境Centos 7.9JDK 11Kafka 版本:kafka_2.12-3.0.1.tgzkafka 配置知识(https://doc.knowstreaming.com/study-kafka/6-operation#611-%E4%BC%98%E9%9B%85%E5%85%B3%E6%9C%BA) 安装步骤 准备 服务器三台做集群部署192.168.3.100 master192....
环境
- Centos 7.9
- JDK 11
- Kafka 版本:kafka_2.12-3.0.1.tgz
- kafka 配置知识(https://doc.knowstreaming.com/study-kafka/6-operation#611-%E4%BC%98%E9%9B%85%E5%85%B3%E6%9C%BA)
安装步骤
准备
服务器三台做集群部署
192.168.3.100 master
192.168.3.101 nameNode01
192.168.3.102 nameNode02
关闭防火墙
systemctl stop firewalld.service
systemctl disable firewalld.service
配置hostname
cat << EOF >> /etc/hosts
192.168.3.100 master
192.168.3.101 nameNode01
192.168.3.102 nameNode02
EOF
安装JDK
yum install -y java-1.8.0-openjdk-devel.x86_64
安装和配置Kafka
解压文件
#解压文件到当前目录
tar -xvzf kafka_2.12-3.3.1.tgz -C /data
初始化相关路径
mkdir /data/kafka
mkdir /data/kafka/zk
mkdir /data/kafka/zk/data
mkdir /data/kafka/zk/logs
mkdir /data/kafka/data
mkdir /data/kafka/log/
mkdir /data/kafka/log/kafka
Zookeeper配置&启动(使用的是Kafka 自带的zk)
zookeeper id编号配置
注意每台机器都myid的值不同,master 1, nameNode01 2,nameNode02 3(后续和zooker中的配置需要匹配上)
touch /data/kafka/zk/data/myid
echo "1" > /data/kafka/zk/data/myid
zookeeper配置(conf/zookeeper.properties)
#计时周期时间,zookeeper中使用的基本时间单位, 毫秒值.
tickTime=2000
#同步限制,该参数配置leader和follower之间发送消息, 请求和应答的最大时间长度. 此时该参数设置为5, 说明时间限制为5倍tickTime, 即10000ms.
syncLimit=5
#初始化限制,zookeeper集群中的包含多台server, 其中一台为leader, 集群中其余的server为follower。 initLimit参数配置初始化连接时, follower和leader之间的最长心跳时间. 此时该参数设置为10, 说明时间限制为5倍tickTime, 即10*2000=20000ms=20s.
initLimit=10
#最大客户端连接数
maxClientCnxns=1000
#最小会话超时
minSessionTimeout=4000
#最大会话超时
maxSessionTimeout=60000
dataDir=/data/kafka/zk/data
dataLogDir=/data/kafka/zk/logs
# the port at which the clients will connect
clientPort=2181
# Disable the adminserver by default to avoid port conflicts.
# Set the port to something non-conflicting if choosing to enable this
admin.enableServer=false
# admin.serverPort=8080
server.1=master:2888:3888
server.2=nameNode01:2888:3888
server.3=nameNode02:2888:3888
zookeeper 启动&验证
注意要进入kafka解压目录,/data/kafka_2.13-3.0.1
#启动命令
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
#关闭命令
bin/zookeeper-server-stop.sh -daemon config/zookeeper.properties
查看zookeeper 启动是否成功
bin/kafka-run-class.sh org.apache.zookeeper.client.FourLetterWordMain localhost 2181 srvr
部署成功可以看到(可以看到Mode为leader,另外两个机器的状态flower,这种情况下为集群部署成功)
Zookeeper version: 3.6.3--6401e4ad2087061bc6b9f80dec2d69f2e3c8660a, built on 04/08/2021 16:35 GMT
Latency min/avg/max: 0/0.7383/9
Received: 1471
Sent: 1470
Connections: 1
Outstanding: 0
Zxid: 0x20000015a
Mode: leader
Node count: 151
Proposal sizes last/min/max: 48/36/866
日志存储目录在kafka_2.13-3.0.1/logs,查看启动日志是否正常
tail -f logs/zookeeper.out
kafka配置&启动
kafka配置(conf/server.properties)
注意每台机器都broker.id各不相同,master 1, nameNode01 2,nameNode02 3(需要跟zookeeper中的myid匹配上),listeners=PLAINTEXT://xxx:9092 是外网的访问地址,通过指定IP方式,这里每台机器都需要根据实际的IP地址进行修改
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# see kafka.server.KafkaConfig for additional details and defaults
############################# Server Basics #############################
#在执行第一次再平衡之前,group协调员将等待更多消费者加入group的时间。 延迟时间越长意味着重新平衡的可能性越小,但是等待处理开始的时间增加
group.initial.rebalance.delay.ms=0
#broker.id是kafka broker的编号,集群里每个broker的id需不同,只要保证不同,可以为任意数字
broker.id=2
#listeners是监听地址,需要提供外网服务的话,要设置本地的IP地址
listeners=PLAINTEXT://192.168.3.100:9092
#kafka数据存储目录
log.dirs=/data/kafka/data
#设置Zookeeper集群地址
zookeeper.connect=master:2181,nameNode01:2181,nameNode02:2181
#为新建Topic的默认Partition数量
num.partitions=1
#follow从leader拉取消息进行同步数据的拉取线程数
num.replica.fetchers=1
#kafka日志存放路径
kafka.log4j.dir=/data/log/kafka
#是否自动创建topic,当读写数据时,如果topic没有则会自动创建
auto.create.topics.enable=true
#启用自动均衡
auto.leader.rebalance.enable=true
#broker在关闭时会向controller报告自己关闭,这样controller可以对broker的下线及时做一些操作,比如partition的重新选举、分区副本的关闭、通知其他的broker元数据变动等
controlled.shutdown.enable=true
#broker关闭时向contoller发送报告的重试次数
controlled.shutdown.max.retries=3
#ZooKeeper的最大超时时间,就是心跳的间隔,若是没有反映,那么认为已经死了,不易过大
zookeeper.session.timeout.ms=6000
#ZooKeeper的连接超时时间
zookeeper.connection.timeout.ms =18000
#topic默认分区的replication个数 ,不能大于集群中broker的个数。
default.replication.factor=2
#broker处理消息的最大线程数,一般情况下不需要去修改,默认3
num.network.threads=3
#broker处理磁盘IO的线程数,数值应该大于你的硬盘数
num.io.threads=8
#socket的发送缓冲区,socket的调优参数SO_SNDBUFF
socket.send.buffer.bytes=102400
#socket的接受缓冲区,socket的调优参数SO_RCVBUFF
socket.receive.buffer.bytes=102400
#socket请求的最大数值,防止serverOOM,message.max.bytes必然要小于socket.request.max.bytes,会被topic创建时的指定参数覆盖
socket.request.max.bytes=104857600
#broker启动的过程中会加载此节点上所有topic的log文件,如果数据量非常大会导致加载时间过长,通过修改处理线程数可以加快log的恢复速度。默认1
num.recovery.threads.per.data.dir=1
# topic的offset的备份份数(消费者offset)。建议设置更高的数字保证更高的可用性
offsets.topic.replication.factor=3
#事务日志Topic副本数
transaction.state.log.replication.factor=3
#指定事务日志Topic的ISR中的最小副本数是多少,用于服务min.insync.replicas,默认值为1
transaction.state.log.min.isr=2
#启用删除主题。 如果此配置已关闭,则通过管理工具删除主题将不起作用。删除topic是影响注册在/admin/delete_topics的监听
delete.topic.enable=true
#日志达到删除大小的阈值,-1为不限制
log.retention.bytes=-1
#检查日志段文件的间隔时间,以确定是否文件属性是否到达删除要求
log.retention.check.interval.ms=300000
#每个日志文件删除之前保存的时间,单位小时
log.retention.hours=12
#topic 分区的日志存放在某个目录下诸多文件中,这些文件将partition的日志切分成一段一段的,这就是段文件(segment file);一个topic的一个分区对应的所有segment文件称为log。这个设置控制着一个segment文件的最大的大小,如果超过了此大小,就会生成一个新的segment文件
log.segment.bytes=1073741824
#这个设置会强制Kafka去新建一个新的log segment文件,即使当前使用的segment文件的大小还没有超过log.segment.bytes
log.roll.hours=168
#分区rebalance检查的频率,由控制器触发,默认300
leader.imbalance.check.interval.seconds=300
#每个broker允许的不平衡的leader的百分比。如果每个broker超过了这个百分比,复制控制器会对分区进行重新的平衡。该值以百分比形式指定,默认10
leader.imbalance.per.broker.percentage=10
#日志压缩去重时候的缓存空间,在空间允许的情况下,越大越好,默认134217728
log.cleaner.dedupe.buffer.size=134217728
#对于压缩的日志保留的最长时间,也是客户端消费消息的最长时间,同log.retention.minutes的区别在于一个控制未压缩数据,一个控制压缩后的数据,默认86400000
log.cleaner.delete.retention.ms=86400000
#启用日志清理器进程在服务器上运行。使用了cleanup.policy = compact的主题,包括内部offsets主题,都应该启动该选项。如果被禁用的话,这些话题将不会被压缩,并且会不断增长,默认true
log.cleaner.enable=true
#控制了log compactor进行clean操作的频率。默认情况下,当log的50%以上已被clean时,就不用继续clean了。此配置可以被覆盖。默认0.5
log.cleaner.min.cleanable.ratio=0.5
#用于日志清理的后台线程的数量,默认1
log.cleaner.threads=1
#kafka允许的最大的一个批次的消息大小。 如果这个数字增加,并且有0.10.2版本以下的消费者,那么消费者的提取大小也必须增加,以便他们可以取得这么大的记录批次。在最新的消息格式版本中,记录总是被组合到一个批次以提高效率。 在以前的消息格式版本中,未压缩的记录不会分组到批次中,并且此限制仅适用于该情况下的单个记录。可以使用主题级别max.message.bytes来设置每个主题,默认1000012
message.max.bytes=1000000
#当生产者将ack设置为“全部”(或“-1”)时,min.insync.replicas指定必须确认写入被认为成功的最小副本数(必须确认每一个repica的写数据都是成功的)。 如果这个最小值不能满足,那么生产者将会引发一个异常(NotEnoughReplicas或NotEnoughReplicasAfterAppend)。当一起使用时,min.insync.replicas和acks允许您强制更大的耐久性保证。 一个典型的情况是创建一个复制因子为3的主题,将min.insync.replicas设置为2,并且生产者使用“all”选项。 这将确保如果大多数副本没有写入生产者则抛出异常。默认1
min.insync.replicas=1
#erver端处理请求时的I/O线程的数量,不要小于磁盘的数量。默认8
num.io.threads=8
#Offsets topic的分区数量(部署后不应更改),默认50
offsets.topic.num.partitions=50
#仅在未设置“listeners”时使用。 使用listeners来代替。 这个端口来监听和接受连接
#port=9092
#为每个分区设置获取的消息的字节数。 这不是绝对最大值,如果第一个非空分区中的第一个record batch大于此值,那么record batch仍将被返回以确保可以进行。 代理接受的最大记录批量大小通过message.max.bytes(broker config)或max.message.bytes(topic config)进行定义,默认1048576
replica.fetch.max.bytes=1048576
#如果一个follower在这个时间内没有发送fetch请求,leader将从ISR重移除这个follower,并认为这个follower已经挂了,默认10000
replica.lag.time.max.ms=10000
#指明了是否能够使不在ISR中replicas follower设置用来作为leader,默认aflse
unclean.leader.election.enable=false
#用于经纪人之间沟通的监听协议
security.inter.broker.protocol=PLAINTEXT
#服务器是否允许自动生成broker.id;如果允许则产生的值会交由reserved.broker.max.id审核,默认false
broker.id.generation.enable=false
kafka启动&验证
KAFKA_HEAP_OPTS 参数设置内存JVM内存
export JMX_PORT="8849"
if [ $# -lt 1 ];
then
echo "USAGE: $0 [-daemon] server.properties [--override property=value]*"
exit 1
fi
base_dir=$(dirname $0)
if [ "x$KAFKA_LOG4J_OPTS" = "x" ]; then
export KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:$base_dir/../config/log4j.properties"
fi
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
export KAFKA_HEAP_OPTS="-Xmx2G -Xms2G"
fi
EXTRA_ARGS=${EXTRA_ARGS-'-name kafkaServer -loggc'}
COMMAND=$1
case $COMMAND in
-daemon)
EXTRA_ARGS="-daemon "$EXTRA_ARGS
shift
;;
*)
;;
esac
exec $base_dir/kafka-run-class.sh $EXTRA_ARGS kafka.Kafka "$@"
bin/kafka-server-start.sh -daemon config/server.properties
bin/kafka-server-stop.sh -daemon config/server.properties
cat logs/kafkaServer.out
#创建topic
bin/kafka-topics.sh --bootstrap-server master:9092 --create --topic my-topic --replication-factor 3 --partitions 3
#推送数据测试
bin/kafka-verifiable-producer.sh --topic test --bootstrap-server master:9092 --max-messages 1000
#消费指令测试
sh bin/kafka-consumer-perf-test.sh -topic test --bootstrap-server master:9092 --messages 100
Kafka 部署完成
【版权声明】本文为华为云社区用户原创内容,未经允许不得转载,如需转载请自行联系原作者进行授权。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱:
cloudbbs@huaweicloud.com
- 点赞
- 收藏
- 关注作者
评论(0)