消息系统之 Kafka (01)
【摘要】 什么是消息系统消息系统是专用的中间件,负责将数据从一个应用传递到另外一个应用。使应用只需关注于数据,无需关注数据在两个或多个应用间是如何传递的。消息系统一般基于可靠的消息队列来实现,使用点对点模式或发布订阅模式。数据实时在消息系统中传递,被看作流。为什么使用消息系统使用消息系统具有以下优势:解耦:发送方和接收方统一使用消息系统提供的接口进行通信,易修改易扩展。持久化:传递过程中消息存储到本地...
什么是消息系统
消息系统是专用的中间件,负责将数据从一个应用传递到另外一个应用。使应用只需关注于数据,无需关注数据在两个或多个应用间是如何传递的。
消息系统一般基于可靠的消息队列来实现,使用点对点模式或发布订阅模式。数据实时在消息系统中传递,被看作流。
为什么使用消息系统
使用消息系统具有以下优势:
- 解耦:发送方和接收方统一使用消息系统提供的接口进行通信,易修改易扩展。
- 持久化:传递过程中消息存储到本地磁盘,防止处理数据失败导致数据丢失。
- 均衡负载:分布式系统能根据负载灵活调整机器数量,能够处理高吞吐量和流量突增的情况。
除此之外,消息系统还可以保障:
- 保障有序:数据处理的顺序不被打乱。
- 传递加速:通过缓冲层控制和优化数据流经过系统的速度。
- 延时处理:提供了异步处理机制,允许用户把消息放入队列,但并不立即处理它。
什么是 Kafka
Kafka 作为当前最常用的消息系统之一,一般用于日志收集的离线系统。采用发布订阅模式,由通过高性能 TCP 网络协议进行通信的服务器和客户端组成。
Kafka 使用 scala 开发,由 LinkedIn 开源,目前已捐献给 Apache 基金会。
Kafka 官网 http://Kafka.apache.org/
Kafka 的优劣势
优势
- 快速持久化,可以在O(1)的系统开销下进行消息持久化;
- IO 吞吐量高,使用 partition 把队列流量均匀分散在多台机器上,单台服务器可以达到 10W/s 的吞吐速率。
劣势
- 不进行消息重复性检查,可能导致消费重复数据或者异常情况下的数据丢失。
- 实时性方面也存在少量延迟。
生产者/消费者模式
Kafka 是一个分布式系统,由服务器和客户端组成,之间通过高性能 TCP 网络协议进行通信。
-
服务器以
Cluster
为单位向外提供服务,由多个Broker
组成。Broker 作为 Kafka 的服务节点,接收外部生产的数据,在本地磁盘对数据进行备份,并提供数据给指定的接收者。 -
客户端分为以下两种类型:
Producer
: 数据生产者,向 Kafka 集群生产数据。Consumer
:数据消费者,读取 Kafka 集群生产者生产的消息。
-
组件之间通过
Zookeeper
进行协调。ZooKeeper 会保存 Broker 和 Consumer 的元数据信息,并进行数据变更的监控。并负责选举出 Cluster 内的 Controller (其中一个 Broker),管理 Zookeeper 上的元数据信息。
数据分片模型
Kafka 消息按照 Topic
进行数据的组织和隔离,Producer/Consumer 会向指定的 Topic 收发数据。
在服务器端,Topic 则按 Patition
进行分区,同一个 Topic 的 Partition 会散落在多个 Broker 上,存储为一个阻塞队列,从而达到了数据分布式存储的目的。Producer 可以指定发送的 Partition 以保证消息有序到达。
每个 Consumer Group
都会消费一个 Topic 全量的数据,彼此之间互不干扰。同一个 Consumer Group 下的 Consumer 只能消费到其中一部分 Partition ,通过多个 Consumer 可以达到并行消费的目的。Partition 数量推荐设为 Consumer 数量的整数倍,便于均分。
多副本模型
为了提高可用性,避免 Broker 损坏导致的 Partition 不可用或者丢失问题,Kafka 会对每个 Partition 提供多个副本(默认为 3 个),其中有且仅有一个作为 Leader
,负责数据的读写。其他副本 Follower
将存放在不同的 Broker 上,通过接收 Leader 广播将数据同步到本地。
每个 Leader Partition 维护一个独立的 ISR
列表,记录当前同步的 Follower 集合:
- 如果 Follower 不能及时同步(延迟时间高或延迟条数超过阈值)就会被暂时踢出 ISR 。
- 如果 Leader 不可用将从 ISR 中选出一个 Follower 担任 Leader 。
消息定位
定位方式
kafka 用 Offset
表示 Message 在 Partition 中的偏移量,通过 Offset 可以唯一确定 Partition 中的一条 Message 。
- 生产者 Offset (current position)
每个 Partition 只有一个,表示当前消息生产到的位置。
- 消费者 Offset (committed offset)
每个 Partition 可以有多个,取决于消费的 ConsumeGroup 数量。消费者 Offset 会记录到 Kafka 自带 Topic(__consumer_offsets) 内,表示当前消费到的位置。
参数 | 含义 |
---|---|
Group | 消费者组 |
Topic | topic 名称 |
Pid | partition ID |
Offset | 消费者在对应分区上已消费消息数 |
logSize | 已经写到该分区的消息数 |
Lag | 还有多少消息未读取(Lag = logSize - Offset) |
Owner | 分区所属 broker |
搭建 Broker
在服务器搭建 Broker ,需要通过指令来完成。本文所有的操作都是在MacOS系统上使用。如果是在Linux操作系统下进行实验,使用的命令是相同的;如果是在windows操作系统下进行实验,则需要使用对应的bin/windows目录下的bat文件。
# 最大offset
bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic test_topic --time -1
# 最小offset
bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic test_topic --time -2
# offset
bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic test_topicCopy to clipboardErrorCopied
# 列出当前 kafka 所有的 topic
bin/kafka-topics.sh --zookeeper localhost:2181 --list
# 创建 topic
bin/kafka-topics.sh --create --zookeeper localhost:2181 --topic test_topic --replication-factor 1 --partitions 1
bin/kafka-topics.sh --create --zookeeper localhost:2181 --topic test_topic --replication-factor 3 --partitions 10 --config cleanup.policy=compact
bin/kafka-topics.sh --create --zookeeper localhost:2181 --topic test_topic --partitions 1 --replication-factor 1 --config max.message.bytes=64000 --config flush.messages=1
# 查看某 topic 具体情况
bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic test_topic
# 修改 topic (分区数、特殊配置如compact属性、数据保留时间等)
bin/kafka-topics.sh --zookeeper localhost:2181 --alter --partitions 3 --config cleanup.policy=compact --topic test_topic
# 修改 topic (也可以用这种)
bin/kafka-configs.sh --alter --zookeeper localhost:2181 --entity-name test_topic --entity-type topics --add-config cleanup.policy=compact
bin/kafka-configs.sh --alter --zookeeper localhost:2181 --entity-name test_topic --entity-type topics --delete-config cleanup.policyCopy to clipboardErrorCopied
【声明】本内容来自华为云开发者社区博主,不代表华为云及华为云开发者社区的观点和立场。转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息,否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱:
cloudbbs@huaweicloud.com
- 点赞
- 收藏
- 关注作者
评论(0)