消息中间件技术二Kafka的集群部署
在我们大量使用分布式数据库、分布式计算集群的时候,是否会遇到这样的一些问题:
-
我们想分析下用户行为(pageviews),以便我们设计出更好的广告位
-
我想对用户的搜索关键词进行统计,分析出当前的流行趋势
-
有些数据,存储数据库浪费,直接存储硬盘效率又低
这些场景都有一个共同点:
数据是由上游模块产生,上游模块,使用上游模块的数据计算、统计、分析,这个时候就可以使用消息系统,尤其是分布式消息系统!
Kafka的定义
What is Kafka:它是一个分布式消息系统,由linkedin使用scala编写,用作LinkedIn的活动流(Activity Stream)和运营数据处理管道(Pipeline)的基础。具有高水平扩展和高吞吐量。
Zookeeper是一种在分布式系统中被广泛用来作为:分布式状态管理、分布式协调管理、分布式配置管理、和分布式锁服务的集群。kafka增加和减少服务器都会在Zookeeper节点上触发相应的事件kafka系统会捕获这些事件,进行新一轮的负载均衡,客户端也会捕获这些事件来进行新一轮的处理。
二、Kafka相关概念
1、 AMQP协议
Advanced Message Queuing Protocol (高级消息队列协议)
The Advanced Message Queuing Protocol (AMQP):是一个标准开放的应用层的消息中间件(Message Oriented Middleware)协议。AMQP定义了通过网络发送的字节流的数据格式。因此兼容性非常好,任何实现AMQP协议的程序都可以和与AMQP协议兼容的其他程序交互,可以很容易做到跨语言,跨平台。
上面说的3种比较流行的消息队列协议,要么支持AMQP协议,要么借鉴了AMQP协议的思想进行了开发、实现、设计。
2、 一些基本的概念
1、消费者:(Consumer):从消息队列中请求消息的客户端应用程序
2、生产者:(Producer) :向broker发布消息的应用程序
3、AMQP服务端(broker):用来接收生产者发送的消息并将这些消息路由给服务器中的队列,便于fafka将生产者发送的消息,动态的添加到磁盘并给每一条消息一个偏移量,所以对于kafka一个broker就是一个应用程序的实例
kafka支持的客户端语言:Kafka客户端支持当前大部分主流语言,包括:C、C++、Erlang、Java、.net、perl、PHP、Python、Ruby、Go、Javascript
可以使用以上任何一种语言和kafka服务器进行通信(即辨析自己的consumer从kafka集群订阅消息也可以自己写producer程序)
3、Kafka架构
生产者生产消息、kafka集群、消费者获取消息这样一种架构,如下图:
kafka集群中的消息,是通过Topic(主题)来进行组织的,如下图:
一些基本的概念:
1、主题(Topic):一个主题类似新闻中的体育、娱乐、教育等分类概念,在实际工程中通常一个业务一个主题。
2、分区(Partition):一个Topic中的消息数据按照多个分区组织,分区是kafka消息队列组织的最小单位,一个分区可以看作是一个FIFO( First Input First Output的缩写,先入先出队列)的队列。
kafka分区是提高kafka性能的关键所在,当你发现你的集群性能不高时,常用手段就是增加Topic的分区,分区里面的消息是按照从新到老的顺序进行组织,消费者从队列头订阅消息,生产者从队列尾添加消息。
三、Zookeeper集群搭建
Kafka集群是把状态保存在Zookeeper中的,首先要搭建Zookeeper集群。
1、软件环境
(3台服务器-我的测试)
192.168.7.100 server1
192.168.7.101 server2
192.168.7.107 server3
1、Linux服务器一台、三台、五台、(2*n+1),Zookeeper集群的工作是超过半数才能对外提供服务,3台中超过两台超过半数,允许1台挂掉 ,是否可以用偶数,其实没必要。
如果有四台那么挂掉一台还剩下三台服务器,如果在挂掉一个就不行了,这里记住是超过半数。
2、Java jdk1.7 zookeeper是用java写的所以他的需要JAVA环境,java是运行在java虚拟机上的
3、Zookeeper的稳定版本Zookeeper 3.4.6版本
2、配置&安装Zookeeper
下面的操作是:3台服务器统一操作
1、安装Java
yum list java*
yum -y install java-1.7.0-openjdk*
2、下载Zookeeper
首先要注意在生产环境中目录结构要定义好,防止在项目过多的时候找不到所需的项目
#我的目录统一放在/opt下面
#首先创建Zookeeper项目目录
mkdir zookeeper #项目目录
mkdir zkdata #存放快照日志
mkdir zkdatalog#存放事物日志
下载Zookeeper
#下载软件
cd /opt/zookeeper/
wget http://mirrors.cnnic.cn/apache/zookeeper/zookeeper-3.4.6/zookeeper-3.4.6.tar.gz
#解压软件
tar -zxvf zookeeper-3.4.6.tar.gz
3、修改配置文件
进入到解压好的目录里面的conf目录中,查看
#进入conf目录
/opt/zookeeper/zookeeper-3.4.6/conf
#查看
[root@192.168.7.107]$ ll
-rw-rw-r--. 1 1000 1000 535 Feb 20 2014 configuration.xsl
-rw-rw-r--. 1 1000 1000 2161 Feb 20 2014 log4j.properties
-rw-rw-r--. 1 1000 1000 922 Feb 20 2014 zoo_sample.cfg
#zoo_sample.cfg 这个文件是官方给我们的zookeeper的样板文件,给他复制一份命名为zoo.cfg,zoo.cfg是官方指定的文件命名规则。
3台服务器的配置文件
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/opt/zookeeper/zkdata
dataLogDir=/opt/zookeeper/zkdatalog
clientPort=12181
server.1=192.168.7.100:12888:13888
server.2=192.168.7.101:12888:13888
server.3=192.168.7.107:12888:13888
#server.1 这个1是服务器的标识也可以是其他的数字, 表示这个是第几号服务器,用来标识服务器,这个标识要写到快照目录下面myid文件里
#192.168.7.107为集群里的IP地址,第一个端口是master和slave之间的通信端口,默认是2888,第二个端口是leader选举的端口,集群刚启动的时候选举或者leader挂掉之后进行新的选举的端口默认是3888
配置文件解释:
#tickTime:
这个时间是作为 Zookeeper 服务器之间或客户端与服务器之间维持心跳的时间间隔,也就是每个 tickTime 时间就会发送一个心跳。
#initLimit:
这个配置项是用来配置 Zookeeper 接受客户端(这里所说的客户端不是用户连接 Zookeeper 服务器的客户端,而是 Zookeeper 服务器集群中连接到 Leader 的 Follower 服务器)初始化连接时最长能忍受多少个心跳时间间隔数。当已经超过 5个心跳的时间(也就是 tickTime)长度后 Zookeeper 服务器还没有收到客户端的返回信息,那么表明这个客户端连接失败。总的时间长度就是 5*2000=10 秒
#syncLimit:
这个配置项标识 Leader 与Follower 之间发送消息,请求和应答时间长度,最长不能超过多少个 tickTime 的时间长度,总的时间长度就是5*2000=10秒
#dataDir:
快照日志的存储路径
#dataLogDir:
事物日志的存储路径,如果不配置这个那么事物日志会默认存储到dataDir制定的目录,这样会严重影响zk的性能,当zk吞吐量较大的时候,产生的事物日志、快照日志太多
#clientPort:
这个端口就是客户端连接 Zookeeper 服务器的端口,Zookeeper 会监听这个端口,接受客户端的访问请求。修改他的端口改大点
创建myid文件
#server1
echo "1" > /opt/zookeeper/zkdata/myid
#server2
echo "2" > /opt/zookeeper/zkdata/myid
#server3
echo "3" > /opt/zookeeper/zkdata/myid
4、重要配置说明
1、myid文件和server.myid 在快照目录下存放的标识本台服务器的文件,他是整个zk集群用来发现彼此的一个重要标识。
2、zoo.cfg 文件是zookeeper配置文件 在conf目录里。
3、log4j.properties文件是zk的日志输出文件 在conf目录里用java写的程序基本上有个共同点日志都用log4j,来进行管理。
configuration for log4j
4、zkEnv.sh和zkServer.sh文件
zkServer.sh 主的管理程序文件
zkEnv.sh 是主要配置,zookeeper集群启动时配置环境变量的文件
5、还有一个需要注意
ZooKeeper server will not remove old snapshots and log files when using the default configuration (see autopurge below), this is the responsibility of the operator
zookeeper不会主动的清除旧的快照和日志文件,这个是操作者的责任。
但是可以通过命令去定期的清理。
#!/bin/bash
#snapshot file dir
dataDir=/opt/zookeeper/zkdata/version-2
#tran log dir
dataLogDir=/opt/zookeeper/zkdatalog/version-2
#Leave 66 files
count=66
count=$[$count+1]
ls -t $dataLogDir/log.* | tail -n +$count | xargs rm -f
ls -t $dataDir/snapshot.* | tail -n +$count | xargs rm -f
#以上这个脚本定义了删除对应两个目录中的文件,保留最新的66个文件,可以将他写到crontab中,设置为每天凌晨2点执行一次就可以了。
#zk log dir del the zookeeper log
#logDir=
#ls -t $logDir/zookeeper.log.* | tail -n +$count | xargs rm -f
其他方法:
第二种:使用ZK的工具类PurgeTxnLog,它的实现了一种简单的历史文件清理策略,可以在这里看一下他的使用方法 http://zookeeper.apache.org/doc/r3.4.6/zookeeperAdmin.html
第三种:对于上面这个执行,ZK自己已经写好了脚本,在bin/zkCleanup.sh中,所以直接使用这个脚本也是可以执行清理工作的。
第四种:从3.4.0开始,zookeeper提供了自动清理snapshot和事务日志的功能,通过配置 autopurge.snapRetainCount 和 autopurge.purgeInterval 这两个参数能够实现定时清理了。这两个参数都是在zoo.cfg中配置的:
autopurge.purgeInterval 这个参数指定了清理频率,单位是小时,需要填写一个1或更大的整数,默认是0,表示不开启自己清理功能。
autopurge.snapRetainCount 这个参数和上面的参数搭配使用,这个参数指定了需要保留的文件数目。默认是保留3个。
推荐使用第一种方法,对于运维人员来说,将日志清理工作独立出来,便于统一管理也更可控。毕竟zk自带的一些工具并不怎么给力。
5、启动服务并查看
1、启动服务
#进入到Zookeeper的bin目录下
cd /opt/zookeeper/zookeeper-3.4.6/bin
#启动服务(3台都需要操作)
./zkServer.sh start
2、检查服务状态
#检查服务器状态
./zkServer.sh status
通过status就能看到状态:
./zkServer.sh status
JMX enabled by default
Using config: /opt/zookeeper/zookeeper-3.4.6/bin/../conf/zoo.cfg #配置文件
Mode: follower #他是否为领导
zk集群一般只有一个leader,多个follower,主一般是相应客户端的读写请求,而从主同步数据,当主挂掉之后就会从follower里投票选举一个leader出来。
可以用“jps”查看zk的进程,这个是zk的整个工程的main
#执行命令jps
20348 Jps
4233 QuorumPeerMain
四、搭建Kafka集群
-
将Kafka的安装包上传到虚拟机,并解压
cd /export/software/
tar -xvzf kafka_2.12-2.4.1.tgz -C ../server/
cd /export/server/kafka_2.12-2.4.1/
2.修改 server.properties
cd /export/server/kafka_2.12-2.4.1/config
vim server.properties
# 指定broker的id
broker.id=0
# 指定Kafka数据的位置
log.dirs=/export/server/kafka_2.12-2.4.1/data
# 配置zk的三个节点
zookeeper.connect=node1:2181,node2:2181,node3:2181
3.将安装好的kafka复制到另外两台服务器
cd /export/server
scp -r kafka_2.12-2.4.1/ node2:$PWD
scp -r kafka_2.12-2.4.1/ node3:$PWD
修改另外两个节点的broker.id分别为1和2
---------node2--------------
cd /export/server/kafka_2.12-2.4.1/config
vim erver.properties
broker.id=1
--------node3--------------
cd /export/server/kafka_2.12-2.4.1/config
vim server.properties
broker.id=2
4.配置KAFKA_HOME环境变量
vim /etc/profile
export KAFKA_HOME=/export/server/kafka_2.12-2.4.1
export PATH=:$PATH:${KAFKA_HOME}
分发到各个节点
scp /etc/profile node2:$PWD
scp /etc/profile node3:$PWD
每个节点加载环境变量
source /etc/profile
5.启动服务器
# 启动ZooKeeper
nohup bin/zookeeper-server-start.sh config/zookeeper.properties &
# 启动Kafka
cd /export/server/kafka_2.12-2.4.1
nohup bin/kafka-server-start.sh config/server.properties &
# 测试Kafka集群是否启动成功
bin/kafka-topics.sh --bootstrap-server node1:9092 --list
总结
Kafka集群部署需遵循分布式架构设计原则,核心环节包括环境准备、配置优化与服务验证。首先基于CentOS/Linux环境搭建多节点集群,通过SSH密钥实现节点间免密通信,并同步系统时间(NTP)。安装阶段需下载对应版本Kafka二进制包及ZooKeeper组件,优先采用独立部署模式以降低耦合风险。
配置层面,server.properties文件需重点设置broker.id唯一标识、log.dirs日志存储路径及zookeeper.connect连接地址,启用auto.create.topics.enable自动主题创建功能。生产环境建议开启SASL/SCRAM认证与SSL传输加密,通过advertised.listeners配置外部访问地址。ZooKeeper集群需满足奇数节点原则(如3/5节点),调整tickTime与session超时参数匹配业务需求。
- 点赞
- 收藏
- 关注作者
评论(0)