Kafka详解
Kafka介绍
Kafka是由LinkedIn开发的一个分布式的消息系统,使用Scala编写,它以可水平扩展和高吞吐率而被广泛使用。目前越来越多的开源分布式处理系统如Cloudera、Apache Storm、Spark都支持与Kafka集成。Kafka是一种高吞吐量的分布式发布订阅消息系统,有如下特性:
1. 通过O(1)的磁盘数据结构提供消息的持久化,这种结构对于即使数以TB的消息存储也能够保持长时间的稳定性能。
2. 高吞吐量:即使是非常普通的硬件Kafka也可以支持每秒数百万的消息。
3. 支持通过Kafka服务器和消费机集群来分区消息。
4. 支持Hadoop并行数据加载。
Kafka架构图
Broker: Kafka集群包含一个或多个服务器,这种服务器被称为broker Topic: 每条发布到Kafka集群的消息都有一个类别,这个类别被称为Topic。(物理上不同Topic的消息分开存储,逻辑上一个Topic的消息虽然保存于一个或多个broker上但用户只需指定消息的Topic即可生产或消费数据而不必关心数据存于何处)
Partition: Parition是物理上的概念,每个Topic包含一个或多个Partition。
Producer: 负责发布消息到Kafka broker
Consumer: 消息消费者,向Kafka broker读取消息的客户端。
Consumer Group:每个Consumer属于一个特定的Consumer Group(可为每个Consumer指定group name,若不指定group name则属于默认的group)。
一个典型的Kafka集群中包含若干Producer(可以是web前端产生的Page View,或者是服务器日志,系统CPU、Memory等),若干broker(Kafka支持水平扩展,一般broker数量越多,集群吞吐率越高),若干Consumer Group,以及一个Zookeeper集群。Kafka通过Zookeeper管理集群配置,选举leader,以及在Consumer Group发生变化时进行rebalance。Producer使用push模式将消息发布到broker,Consumer使用pull模式从broker订阅并消费消息。
Kafka的消息持久化和顺序读写
Kafka没有使用内存作为缓存,选用了硬盘直接读写,直接将数据顺序地持久化到磁盘上; 直接写到磁盘,防止数据丢失。当然这里也有策略,一个67200rpm STAT RAID5的阵列, 线性读写速度是300MB/sec,如果是随机读写,速度则是50K/sec。差距很明显,所以Kafka选的策略就是利用线性存储,通过顺序读写保证了性能.
Kafka的应用
1. 监控:主机通过Kafka发送与系统和应用程序健康相关的指标,然后这些信息会被收集和处理从而创建监控仪表盘并发送警告。除此之外,LinkedIn还利用Apache Samza实现了一个能够实时处理事件的富调用图分析系统。
2. 传统的消息: 应用程度使用Kafka作为传统的消息系统实现标准的队列和消息的发布—订阅,例如搜索和内容提要(Content Feed)。
3. 分析: 为了更好地理解用户行为,改善用户体验,LinkedIn会将用户查看了哪个页面、点击了哪些内容等信息发送到每个数据中心的Kafka集群上,并通过Hadoop进行分析、生成日常报告。
4. 作为分布式应用程序或平台的构件(日志):大数据仓库解决方案Pinot等产品将Kafka作为核心构件(分布式日志),分布式数据库Espresso将其作为内部副本并改变传播层。
Kafka的客户端
支持多种语言,C/C++、Java、Python、PHP等,参考:https://cwiki.apache.org/confluence/display/KAFKA/Clients Kafka API简单易用,日志存储之利器。
Storm和Kafka的对接:KafkaSpout
Strom从Kafka中读取数据,本质:实现一个Storm中的Spout,来读取Kafka中的数据;这个Spout,可以称为Kafka Spout。支持两种类型的Spout:
1. Core storm spout;
2. Trident spout;
部署
1. 在nimbus、slave1、slave2机器的主目录下分别下载Kafka安装包 wget http://mirror.bit.edu.cn/apache/kafka/0.8.2.2/kafka_2.11-0.8.2.2.tgz
2. 创建kafka目录,并将安装包移到kafka目录下
3. 进入kafka目录,并解压该安装包
4. Kafka配置文件修改
进入kafka配置目录下,修改配置文件zookeeper.properties、consumer.properties、producer.properties、server.properties
a. 修改zookeeper.properties
b. 修改consumer.properties
c. 修改producer.properties
d. 修改server.properties
e. 分别修改nimbus、slave1、slave2上的server.properties(broker.id)
nimbus:
slave1:
slave2:
5. 启动kafka集群
分别在nimbus、slave1、slave2上执行如下命令:
bin/kafka-server-start.sh config/server.properties &
6. 测试集群
a. 在slave2上创建一个生产者(topic会自动生成)
b. 在nimbus上创建一个消费者(topic和生产者的相同)
c. 在生产者上面输入Hello,World
d. 可以看到消费者上面会输出Hello,World
说明Kafka集群搭建成功,后面就可以正常使用Kafka
- 点赞
- 收藏
- 关注作者
评论(0)