【云小课】EI第37课 MRS基础原理之Kafka组件介绍

举报
Hi,EI 发表于 2021/12/08 21:21:19 2021/12/08
【摘要】 Kafka是一个分布式的、分区的、多副本的消息发布-订阅系统,它提供了类似于JMS的特性,但在设计上完全不同,它具有消息持久化、高吞吐、分布式、多客户端支持、实时等特性,适用于离线和在线的消息消费,如常规的消息收集、网站活性跟踪、聚合统计系统运营数据(监控数据)、日志收集等大量数据的互联网服务的数据收集场景。

Kafka是一个分布式的、分区的、多副本的消息发布-订阅系统,它提供了类似于JMS的特性,但在设计上完全不同,它具有消息持久化、高吞吐、分布式、多客户端支持、实时等特性,适用于离线和在线的消息消费,如常规的消息收集、网站活性跟踪、聚合统计系统运营数据(监控数据)、日志收集等大量数据的互联网服务的数据收集场景。

Kafka主要特点如下:

  • 可靠性:提供At-Least OnceAt-Most OnceExactly Once消息可靠传递。消息被处理的状态是在Consumer端维护,需要结合应用层实现Exactly Once
  • 高吞吐:同时为发布和订阅提供高吞吐量。
  • 持久化:将消息持久化到磁盘,因此可用于批量消费,以及实时应用程序。通过将数据持久化到硬盘以及replication防止数据丢失。
  • 分布式:分布式系统,易于向外扩展。所有的Producer、Broker和Consumer都支持部署多个形成分布式的集群。无需停机即可扩展系统。

image008.jpg

Kafka结构

生产者(Producer)将消息发布到Kafka主题(Topic)上,消费者(Consumer)订阅这些主题并消费这些消息。在Kafka集群上一个服务器称为一个Broker。对于每一个主题,Kafka集群保留一个用于缩放、并行化和容错性的分区(Partition)。每个分区是一个有序、不可变的消息序列,并不断追加到提交日志文件。分区的消息每个也被赋值一个称为偏移顺序(Offset)的序列化编号。

image010.png

名称

说明

Broker

Kafka集群上一个服务器称为一个Broker

Topic/主题

一个Topic就是一个类别或者一个可订阅的条目名称,也即一类消息。一个主题可以有多个分区,这些分区可以作为并行的一个单元。

Partition/分区

是一个有序的、不可变的消息序列,这个序列可以被连续地追加—个提交日志。在分区内的每条消息都有一个有序的ID号,这个ID号被称为偏移(Offset),这个偏移量可以唯一确定每条消息在分区内的位置。

Producer/生产者

Kafka的主题发布消息。

Consumer/消费者

Topic订阅,并且接收发布到这些Topic的消息。


image012.png

消费者使用一个消费者组名称来标记自己,主题的每个消息被传递给每个订阅消费者组中的一个消费者。如果所有的消费者实例都属于同样的消费组,它们就以传统队列负载均衡方式工作。如上图中,Consumer1与Consumer2之间为负载均衡方式;Consumer3、Consumer4、Consumer5与Consumer6之间为负载均衡方式。如果消费者实例都属于不同的消费组,则消息会被广播给所有消费者。如上图中,Topic1中的消息,同时会广播到Consumer Group1与Consumer Group2中。

Kafka原理

    消息可靠性

    Kafka Broker收到消息后,会持久化到磁盘,同时,Topic的每个Partition有自己的Replica(备份),每个Replica分布在不同的Broker节点上,以保证当某一节点失效时,可以自动故障转移到可用消息节点。

    高吞吐量

    Kafka通过以下方式提供系统高吞吐量:

    • 数据磁盘持久化:消息不在内存中cache,直接写入到磁盘,充分利用磁盘的顺序读写性能。
    • Zero-copy:减少IO操作步骤。
    • 数据批量发送:提高网络利用率。
    • Topic划分为多个Partition,提高并发度,可以由多个ProducerConsumer数目之间的关系并发来读、写消息。Producer根据用户指定的算法,将消息发送到指定的Partition

    消息订阅-通知机制

    消费者对感兴趣的主题进行订阅,并采取pull的方式消费数据,使得消费者可以根据其消费能力自主地控制消息拉取速度,同时,可以根据自身情况自主选择消费模式,例如批量、重复消费,从尾端开始消费等;另外,需要消费者自己负责维护其自身消息的消费记录。

    可扩展性

    当在Kafka集群中可通过增加Broker节点以提供更大容量时。新增的Broker会向ZooKeeper注册,而ProducerConsumer会及时从ZooKeeper感知到这些变化,并及时作出调整。

    使用Kafka客户端

    MRS集群的创建您可参考创建集群

    Kafka客户端的安装您可以参考安装客户端

    1. 以客户端安装用户,登录安装客户端的节点。执行以下命令,切换到客户端安装目录。
      cd /opt/Bigdata/client
      source bigdata_env
    2. 若集群开启了Kerberos认证,需提前准备具有Kafka操作权限的用户并进行认证。
      kinit 组件业务用户
    3. 创建一个Topic
      cd Kafka/kafka/bin/
      sh kafka-topics.sh --create --topic 主题名称 --partitions 主题占用的分区数 --replication-factor 主题的备份个数 --zookeeper ZooKeeper角色实例所在节点IP地址:clientPort/kafka
      例如执行:
      sh kafka-topics.sh --create --topic Test --partitions 2 --replication-factor 2 --zookeeper 192.168.67.136:24002/kafka
    4. 执行以下命令,查询集群中的Topic信息:
      sh kafka-topics.sh --list --zookeeper ZooKeeper角色实例所在节点IP地址:ZooKeeper端口/kafka
      例如执行:
      sh kafka-topics.sh --list --zookeeper 192.168.67.136:24002/kafka
      ...
      Test
      __KafkaMetricReport
      __consumer_offsets
      __default_metrics
      ...
    5. 登录集群的FusionInsight Manager,单击“集群 > 服务 > Kafka > KafkaTopic监控”可从管理界面中快速查看Topic状态。

      image014.png

    6. 继续执行以下命令,向Test Topic中写入数据。
      cat ../NOTICE | kafka-console-producer.sh --broker-list Broker实例IP地址:Broker端口 --topic Topic名称 --producer.config ../config/producer.properties
      例如执行:
      cat ../NOTICE | kafka-console-producer.sh --broker-list 192.168.42.14:21007 --topic Test --producer.config ../config/producer.properties
    7. 将Test Topic中的内容读取出来。

      kafka-console-consumer.sh --consumer.config ../config/consumer.properties --bootstrap-server Broker实例IP地址:Broker端口 --topic Topic名称 --from-beginning
      例如执行:
      kafka-console-consumer.sh --consumer.config ../config/consumer.properties --bootstrap-server 192.168.42.14:21007 --topic Test --from-beginning
      Apache Kafka
      Copyright 2019 The Apache Software Foundation.
      
      This product includes software developed at
      The Apache Software Foundation (https://www.apache.org/).
      
      This distribution has a binary dependency on jersey, which is available under the CDDL
      License. The source code of jersey can be found at https://github.com/jersey/jersey/.
      ...
    8. 退出显示窗口,执行以下命令删除创建的Topic
      sh kafka-topics.sh --delete --topic Test --zookeeper 192.168.67.136:24002/kafka


    关于Kafka应用开发及相关样例代码介绍,请参考Kafka开发指南》

    image016.png

    好了,本期云小课就介绍到这里,快去体验MapReduce(MRS)更多功能吧!猛戳这里

    【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
    • 点赞
    • 收藏
    • 关注作者

    评论(0

    0/1000
    抱歉,系统识别当前为高风险访问,暂不支持该操作

    全部回复

    上滑加载中

    设置昵称

    在此一键设置昵称,即可参与社区互动!

    *长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

    *长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。