消息中间件之三Kafa分析使用
Kafka 是个分布式的、持分区的(partition)、多副本的 (replica),基于 zookeeper 协调的分布式消息系统,它最大的特性就是可以实时处理大量数据以满足各类需求场景:
-
日志收集:使用 Kafka 收集各种服务的日志,并通过 kafka 以统一接口服务的方式开放给各种 consumer,例如 hadoop、Hbase、Solr 等
-
消息系统:解耦和生产者和消费者、缓存消息等
-
用户活动跟踪:Kafka 经常被用来记录 web 用户或者 app 用户的各种活动,如浏览网页、搜索、点击等活动,这些活动信息被各个服务器发布到 kafka 的 topic 中,然后订阅者通过订阅这些 topic 来做实时的监控分析,或者装载到 hadoop、数据仓库中做离线分析和挖掘
-
运营指标:Kafka 也经常用来记录运营监控数据,包括收集各种分布式应用的数据,生产各种操作的集中反馈,比如报警和报告
日志是kafka服务端代码的重要组件之一,很多其他的核心组件是以日志为基础的,后续会讲解状态机和副本管理等。日志文件中一串的0是该日志段的起始位移值,即base offset。Kafka日志在磁盘上的组织目录结构如下图所示。
日志中包含多个日志段,而每个日志段又包含:消息日志文件、位移索引文件、时间戳索引文件、已终止的事务索引文件。后续将对kafa 日志模块的源码进行详细分析。
目录名称 | 说明 |
---|---|
bin | Kafka的所有执行脚本都在这里。例如:启动Kafka服务器、创建Topic、生产者、消费者程序等等 |
config | Kafka的所有配置文件 |
libs | 运行Kafka所需要的所有JAR包 |
logs | Kafka的所有日志文件,如果Kafka出现一些问题,需要到该目录中去查看异常信息 |
site-docs | Kafka的网站帮助文件 |
二、Kafka一键启动/关闭脚本
为了方便将来进行一键启动、关闭Kafka,我们可以编写一个shell脚本来操作。将来只要执行一次该脚本就可以快速启动/关闭Kafka。
启动:
vim start-kafka.sh
#!/bin/sh
for host in node1 node2 node3
do
ssh $host "source /etc/profile;nohup kafka-server-start.sh /export/servers/kafka/config/server.properties >/dev/null 2>&1 &"
echo "$host kafka is running"
done
# 写好后保存,加上执行权限
chmod u+x start-kafka.sh
关闭:
-
-
kafka 的config目录下 有些问题,需要先修改一下官方提供的stop脚本,集群中的每一台机器都要改(注意 kafka 里面的k是小写)
PIDS=$(ps ax | grep -i 'kafka\.Kafka' | grep java | grep -v grep | awk '{print $1}') 改为 PIDS=$(ps ax | grep -i 'kafka' | grep java | grep -v grep | awk '{print $1}')
-
-
-
新建stop-kafka.sh
vim stop-kafka.sh #! /bin/sh for host in node-1 node-2 node-3 do ssh $host "source /etc/profile; /export/servers/kafka/bin/kafka-server-stop.sh" echo "$host kafka is stopping" done # 写好后保存,加上执行权限 chmod u+x stop-kafka.sh
-
运行:
./start-kafka.sh
./stop-kafka.sh
三、性能优化
Kafka在提高效率方面做了很大努力。Kafka的一个主要使用场景是处理网站活动日志,吞吐量是非常大的,每个页面都会产生好多次写操作。读方面,假设每个消息只被消费一次,读的量的也是很大的,Kafka也尽量使读的操作更轻量化。
我们之前讨论了磁盘的性能问题,线性读写的情况下影响磁盘性能问题大约有两个方面:太多的琐碎的I/O操作和太多的字节拷贝。I/O问题发生在客户端和服务端之间,也发生在服务端内部的持久化的操作中。
消息集(message set)
为了避免这些问题,Kafka建立了“消息集(message set)”的概念,将消息组织到一起,作为处理的单位。以消息集为单位处理消息,比以单个的消息为单位处理,会提升不少性能。Producer把消息集一块发送给服务端,而不是一条条的发送;服务端把消息集一次性的追加到日志文件中,这样减少了琐碎的I/O操作。consumer也可以一次性的请求一个消息集。
另外一个性能优化是在字节拷贝方面。在低负载的情况下这不是问题,但是在高负载的情况下它的影响还是很大的。为了避免这个问题,Kafka使用了标准的二进制消息格式,这个格式可以在producer,broker和producer之间共享而无需做任何改动。
zero copy
Broker维护的消息日志仅仅是一些目录文件,消息集以固定队的格式写入到日志文件中,这个格式producer和consumer是共享的,这使得Kafka可以一个很重要的点进行优化:消息在网络上的传递。现代的unix操作系统提供了高性能的将数据从页面缓存发送到socket的系统函数,在linux中,这个函数是sendfile.
为了更好的理解sendfile的好处,我们先来看下一般将数据从文件发送到socket的数据流向:
-
操作系统把数据从文件拷贝内核中的页缓存中
-
应用程序从页缓存从把数据拷贝自己的内存缓存中
-
应用程序将数据写入到内核中socket缓存中
-
操作系统把数据从socket缓存中拷贝到网卡接口缓存,从这里发送到网络上。
这显然是低效率的,有4次拷贝和2次系统调用。Sendfile通过直接将数据从页面缓存发送网卡接口缓存,避免了重复拷贝,大大的优化了性能。
在一个多consumers的场景里,数据仅仅被拷贝到页面缓存一次而不是每次消费消息的时候都重复的进行拷贝。这使得消息以近乎网络带宽的速率发送出去。这样在磁盘层面你几乎看不到任何的读操作,因为数据都是从页面缓存中直接发送到网络上去了。
数据压缩
很多时候,性能的瓶颈并非CPU或者硬盘而是网络带宽,对于需要在数据中心之间传送大量数据的应用更是如此。当然用户可以在没有Kafka支持的情况下各自压缩自己的消息,但是这将导致较低的压缩率,因为相比于将消息单独压缩,将大量文件压缩在一起才能起到最好的压缩效果。
Kafka采用了端到端的压缩:因为有“消息集”的概念,客户端的消息可以一起被压缩后送到服务端,并以压缩后的格式写入日志文件,以压缩的格式发送到consumer,消息从producer发出到consumer拿到都被是压缩的,只有在consumer使用的时候才被解压缩,所以叫做“端到端的压缩”。
总结
Kafka作为分布式消息中间件,其核心架构基于Broker集群与ZooKeeper协调,通过Producer-Consumer模式实现高吞吐消息传递。部署时需构建多Broker集群,配置broker.id、log.dirs及zookeeper.connect等参数,优先采用SSD存储日志并启用分区副本机制。启动顺序需确保ZooKeeper集群先行,再逐台启动Broker以完成元数据注册。
使用层面,Producer通过key-value分区策略实现负载均衡,Consumer以Consumer Group形式订阅主题,利用Offset管理消费进度。为提升性能,需调整batch.size(批次大小)、linger.ms(发送延迟)及启用compression.type(如LZ4/Snappy)。硬件层面建议配置RAID卡或NVMe磁盘,调大JVM堆内存并优化GC策略。操作系统需调高file-max文件句柄数,启用TCP nodelay减少网络延迟。
监控方面,通过JMX暴露Broker吞吐量、Replication延迟等指标,结合Grafana可视化异常波动。针对数据倾斜问题,可动态扩缩分区或调整Key哈希算法。生产环境推荐开启SASL认证与SSL加密,并通过min.insync.replicas保障消息可靠性。
- 点赞
- 收藏
- 关注作者
评论(0)