Kafka入门之旅
(续)
5 消费者读取消息
我们已经知道了生产者的工作原理,现在是时候来看看Kafka的消费者。假设你正在构建一个原型应用程序用于展示ZMart最近的销售统计数据。对于这个示例,将消费先前生产者示例中发送的消息。因为这个原型处于早期阶段,所以此时要做的就是消费消息并将消息打印到控制台。
{注意}
因为本书所探讨的Kafka Streams的版本要求Kafka的版本为0.10.2或者更高版本,所以我们仅讨论新的消费者,它是在Kafka 0.9版本中发布的。
KafkaConsumer
是用来从Kafka消费消息的客户端。KafkaConsumer
类很容易使用,但是有一些操作事项需要重视。图2-15展示了ZMart的体系架构,突出了消费者在数据流中所起的作用。
图2-15 这些是从Kafka读取消息的消费者,正如生产者不知道消费者一样,消费者从Kafka读取消息时也不知道是谁生产的消息
5.1 管理偏移量
KafkaProducer
基本上是无状态的,然而KafkaConsumer
需要周期性地提交从Kafka消费的消息的偏移量来管理一些状态。偏移量唯一标识消息,并表示消息在日志中的起始位置。消费者需要周期性地提交它们已接收到的消息的偏移量。
对一个消费者来说,提交一个偏移量有两个含义。
提交意味着消费者已经完全处理了消息。
提交也表示在发生故障或者重启时该消费者消费的起始位置。
如果创建了一个新消费者实例或者发生了某些故障,并且最后提交的偏移量不可用,那么消费者从何处开始消费取决于具体的配置。
auto.offset.reset="earliest"
——将从最早可用的偏移量开始检索消息,任何尚未被日志管理进程移除的消息都会被检索到。auto.offset.reset="latest"
——将从最新的偏移量开始检索消息,本质上仅从消费者加入集群的时间点开始消费消息。auto.offset.reset="none"
——不指定重置策略,代理将会向消费者抛出异常。
从图2-16可以看到选择不同的auto.offset.reset
设置的影响。如果设置为earliest
,那么收到消息的起始偏移量是0
;如果设置为latest
,那么取得消息的起始偏移量为11。
图2-16 将auto.offset.reset
设置为earliest
与latest的
图形对比表示。设置为earliest
,消费者将会得到所有未被删除的消息;设置为latest
意味着消费者需要等待下一条可用消息到达
接下来,我们需要讨论偏移量提交的选项,你可以自动提交也可以手动提交。
5.2 自动提交偏移量
默认情况下,消费者使用的是自动提交偏移量,通过enable.auto.commit
属性进行设置。还有一个与enable.auto.commit
配合使用的配置项auto.commit.interval.ms
,用来指定消费者提交偏移量的频率(默认值是5秒)。调整这个频率值要谨慎,如果设置太小,将会增加网络流量;如果设置太大,可能会导致在发生故障或重启时消费者收到大量重复数据。
5.3 手动提交偏移量
手动提交偏移量有两种方式——同步和异步。同步提交方式的代码如下:
无参的commitSync()
方法在上一次检索(轮询)成功返回所有的偏移量之前会一直阻塞,此方法适用于所有订阅的主题和分区。另一个方法需要一个Map<TopicPartiton
,OffsetAndMetadata>
类型的参数,它只会提交Map
中指定的偏移量、分区和主题。
异步提交也有与同步提交类似的方法,consumer.commitAsync()
方法是完全异步的,提交后立即返回。其中一个重载方法是无参的,两个consumer.commitAsync
方法都可选择地提供一个OffsetCommitCallback
回调对象,它在提交成功或者失败时被调用。通过提供回调实例可以实现异步处理或者异常处理。使用手工提交的好处是可以直接控制记录何时被视为已处理。
5.4 创建消费者
创建一个消费者与创建一个生产者类似,提供一个以java.util.Properties
形式的Java对象的配置,然后返回一个KafkaConsumer
实例。该实例订阅由主题名称列表提供或者由正则表达式指定的主题。通常,会在一个循环中以指定毫秒级的间隔周期性地运行消费者轮询。
轮询的结果是一个ConsumerRecords<K
,V>
对象,ConsumerRecords
实现了Iterable
接口,每次调用next()
方法返回一个包括消息的元数据以及实际的键和值的ConsumerRecord
对象。
在处理完上一次轮询调用返回的所有ConsumerRecord
对象之后,又会返回到循环的顶部,再次轮询指定的同期。实际上,期望消费者以这种轮询方式无限期地运行,除非发生错误或者应用程序需要关闭和重启(这就是提交的偏移量要发挥作用的地方——在重启时,消费者从停止的地方继续消费)。
5.5 消费者和分区
通常需要多个消费者实例——主题的每个分区都有一个消费者实例。可以让一个消费者从多个分区中读取数据,但是通常的做法是使用一个线程数与分区数相等的线程池,每个线程运行一个消费者,每个消费者被分配到一个分区。
这种每个分区一个消费者的模式最大限度地提高了吞吐量,但如果将消费者分散在多个应用程序或者服务器上时,那么所有实例的线程总数不要超过主题的分区总数。任何超过分区总数的线程都将是空闲的。如果一个消费者发生故障,领导者代理将会把分配给该故障消费者的分区重新分配给另一个活跃的消费者。
{注意}
这个例子展示了一个消费者订阅一个主题的情况,但是这种情况仅是为了阐述的目的。大家可以让一个消费者订阅任意数量的主题。
领导者代理将主题的分区分配给具有相同group.id
的所有可用的消费者,group.id
是一个配置项,用来标示消费者属于哪一个消费者组——这样一来,消费者就不需要位于同一台机器上。事实上,最好让消费者分散在几台机器上。这样,当一台服务器发生故障时,领导者代理可以将主题分区重新分配给一台正常运行的机器上的消费者。
5.6 再平衡
在2.5.5节中描述的向消费者添加和移除主题分区(topic-partition)分配的过程被称为再平衡。分配给消费者的主题分区不是静态的,而是动态变化的。当添加一些具有相同消费者组ID的消费者时,将会从活跃的消费者中获取一些当前的主题分区,并将它们分配给新的消费者。这个重新分配的过程持续进行,直到将每个分区都分配给一个正在读取数据的消费者。
在达到这个平衡点之后[17],任何额外的消费者都将处于空闲状态。当消费者不管由于什么原因离开消费者组时,分配给它们的主题分区被重新分配给其他消费者。
5.7 更细粒度的消费者分配
在2.5.5节中,我们描述了使用线程池及多个消费者(在同一个消费者组)订阅同一个主题。尽管Kafka会平衡所有消费者的主题分区负载,但是主题和分区的分配并不是确定性的,你并不知道每个消费者将收到哪个主题分区对。
KafkaConsumer
有一个允许订阅特定主题和分区的方法,代码如下:
在手动进行主题分区分配时,需要权衡以下两点。
故障不会导致重新分配主题分区,即使对于使用相同消费者组ID的消费者。任何分配上的变化都需要调用一次
consumer.assign
方法。消费者指定消费者组是用于提交偏移量,但是由于每个消费者是独立运行的,因此给每个消费者指定一个唯一的消费者组ID是一个很好的想法。
5.8 消费者示例
代码清单2-5给出的是ZMart原型消费者的代码,该消费者消费交易数据并打印到控制台。完整代码可以在源代码src/main/java/bbejeck.chapter_2/consumer/ThreadedConsumerExample.java类中找到。
代码清单2-5 ThreadedConsumerExample
示例
这个例子省略了类的其他代码——它不会独立存在。可以在本章的源代码中找到完整的示例。
6 安装和运行Kafka
当我写本书时,Kafka的最新版本是1.0.0。因为Kafka是一个Scala项目,所以每次发布有两个版本:一个用于Scala 2.11;另一个用于Scala 2.12。本书使用Scala 2.12版本的Kafka。尽管大家可以下载发行版,本书源代码中也包括Kafka的二进制发行版,它将与本书阐述和描述的Kafka Streams一起工作。要安装Kafka,从本书repo管理的源代码中提取.tgz文件,放到自己机器上的某个目录中。
{注意}
Kafka的二进制版本包括Apache ZooKeeper,因此不需要额外的安装工作。
6.1 Kafka本地配置
如果接受Kafka的默认配置,那么本地运行Kafka需要配置的地方就很少。默认情况下,Kafka使用9092端口,ZooKeeper使用2181端口。假设本地没有应用程序使用这些端口,那么一切就绪了。
Kafka将日志写入/tmp/kafka-logs目录下,ZooKeeper使用/tmp/zookeeper目录存储日志。根据自身服务器情况,可能需要更改这些目录的权限或所有权,抑或是需要修改写日志的位置。
为了修改Kafka日志目录,cd
命令进入Kafka安装路径的config目录,打开server. properties文件,找到log.dirs
配置项,修改该配置项的值为任何你想使用的路径。在同一个目录下,打开zookeeper.properties文件,可以修改dataDir
配置项的值。
稍后我们将会在本书中详细介绍Kafka的配置,但现在所需要做的配置仅此而已。需要注意的是,这些说的“日志”是Kafka和ZooKeeper的真实数据,并不是用于跟踪应用行为的应用层面的日志。应用日志位于Kafka安装目录的logs目录下。
6.2 运行Kafka
Kafka启动很简单,由于ZooKeeper对于Kafka集群正确运行(ZooKeeper决定领导者代理、保存主题信息、对集群中各成员执行健康检查等)是必不可少的,因此在启动Kafka之前需要先启动ZooKeeper。
{注意}
从现在开始,所有对目录的引用均假设当前工作在Kafka安装目录下。如果使用的是Windows机器,目录是Kafka安装目录下的/bin/windows。
1.运行ZooKeeper
要启动ZooKeeper,打开命令提示符,输入以下命令:
该命令执行后,在屏幕上会看到很多信息,但结尾会看到与图2-17所示类似的信息。
图2-17 当ZooKeeper启动时,在控制台可以看到的输出信息
2.启动Kafka
打开另一个命令提示符,输入以下命令,启动Kafka:
同样,会在屏幕上看到滚动的文本。当Kafka完全启动时,会看到与图2-18所示类似的信息。
图2-18 Kafka启动时的输出信息
{提示}
ZooKeeper对Kafka运行必不可少,因此在关闭时要调换顺序:先关闭Kafka,再关闭ZooKeeper。要关闭Kafka,可以在Kafka运行终端按下Ctrl+C,或在另一个终端执行kafka-server-stop.sh
脚本。除了关闭脚本是zookeeper-server-stop.sh
,关闭ZooKeeper的操作与关闭Kafka的操作相同。
6.3 发送第一条消息
既然Kafka已启动并开始运行了,现在是时候使用Kafka来发送消息和接收消息了。但是,在发送消息前,需要先为生产者定义一个发送消息的主题。
1.第一个主题
在Kafka中创建一个主题很简单,仅需要运行一个带有一些配置参数的脚本。配置很简单,但是这些配置的设置有广泛的性能影响。
默认情况下,Kafka被配置为自动创建主题,这意味着如果尝试向一个不存在的主题发送或读取消息,那么Kafka代理就会创建一个主题(使用server.properties文件中的默认配置)。即使在开发中,依靠代理创建主题也不是一个好的做法,因为第一次尝试生产或消费会失败,这是由于需要时间来传播关于主题存在的元数据信息。需要确保总是主动地创建主题。
2.创建一个主题
要创建主题,需要运行kafka-topics.sh脚本。打开一个终端窗口,运行以下命令:
当脚本执行后,在终端控制台应该会看到类似如图2-19所示的信息。
图2-19 这是创建主题的结果,事先创建主题很重要,可以提供特定主题的配置。否则,自动创建主题将使用默认配置或者server.properties文件中的配置
前面命令中的大多数配置标记的含义都显而易见,但还是让我们快速了解一下其中的两个配置。
replication-factor
——此标记确定领导者代理在集群中分发消息的副本数。在这种情况下,如果副本因子为1
,那么就不会复制,Kafka中保存的仅是原始消息。副本因子为1
对于快速演示或者原型是可以的,但在实践中,几乎总是希望副本因子为2
或3,
以便在服务器发生故障时保证数据可用性。partitions
——此标记用于指定主题将用到的分区数。同样,这里只有一个分区是可以的,但是如果想要更高的负载,当然就需要更多的分区。确定合适的分区数不是一门精确的科学[18]。
3.发送一条消息
在Kafka中发送消息通常需要编写一个生产者客户端,但Kafka也自带了一个名为kafka- console-producer
的方便脚本,允许从终端窗口发送消息。在这个例子中我们将使用控制台生产者,但是在2.4.1节中,我们已经介绍了如何使用KafkaProducer
。
运行以下命令(图2-20中展示的也是)发送第一条消息:
配置控制台生产者有几个选项,但这里我们仅使用必需的配置:消息送达的主题以及连接到Kafka的一个Kafka代理列表(对于本例,只是本地一台机器)。
启动控制台生产者是一个“阻塞脚本”,因此在执行前面的命令之后,输入一些文本并按回车键。可以发送你想要发送的任何数量的消息。但本例为了演示,可以输入一条消息“the quick brown fox jumped over the lazy dog.”,并按回车键,然后按Ctrl+C让生产者退出。
图2-20 控制台生产者是用来快速测试配置和确保端到端功能的一个很好工具
4.读取一条消息
Kafka也提供了一个控制台消费者用来从命令行读取消息。控制台消费者类似于控制台生产者:一旦启动,将持续从主题中读取消息直到脚本被终止(通过Ctrl+C)。
运行以下命令,启动控制台消费者:
在启动控制台消费者之后,在终端控制台可以看到与图2-21所示类似的信息。
图2-21 控制台消费者是一个方便的工具,可以快速地感知数据
是否正在流动以及消息是否包含预期的信息
--from-beginning
参数指定将会收到来自那个主题的任何未被删除的消息。控制台消费者还没有提交偏移量,因此若没有设置--from-beginning
,那么只会收到控制台消费者启动之后所发送的消息。
我们已完成了Kafka的旋风之旅,并生产和消费了第一条消息。如果你还没有阅读本章第一部分,现在是时候回到本章起始处去学习Kafka工作原理的细节。
7 小结
Kafka是一个消息代理,它接收消息并以一种简单快速的方式存储它们,以响应消费者的请求。消息从不会推送到消费者,Kafka中的消息保留完全独立于消息被消费的时间和频率。
Kafka使用分区来实现高吞吐量,并提供一种按键分组并保证相同键的消息有序的方法。
生产者用来向Kafka发送消息。
空键意味着以轮询的方式分配分区,否则,生产者使用键的散列值与分区总数取模的方法分配分区。
消费者用来从Kafka读取消息。
尝试均匀地将主题分区分配给一个消费者组中的消费者。
[1] Jay Kreps, “The Log: What Every Software Engineer Should Know About Real-time Data’s Unifying Abstraction”(日志:每个软件工程师都应该知道实时数据的统一抽象)。
[2] 这里说的不同的分区器,是指不使用默认分区器,这里指定自定义分区器来覆盖默认分区器。 ——译者注
[3] 代理1是指代理服务器对应的broker.id为1,分区0表示分区编号为0。 ——译者注
[4] 这里的级别是指分区是领导者分区还是追随者分区。——译者注
[5] 代理是动态的是指根据代理的存活情况动态地将代理从ISR集合中移除或将代理加入ISR集合中。——译者注
[6] Kafka官方文档“Replicated Logs: Quorums, ISRs, and State Machines (Oh my!)”。
[7] 复制级别也就是我们通常说的副本数。——译者注
[8] Kafka官方文档“Replication”。
[9] 本节的一些信息来自Gwen Shapira在Qurora上的回答:“What is the actual role of ZooKeeper in Kafka? What benefits will I miss out on if I don’t use ZooKeeper and Kafka together?”。(ZooKeeper在Kafka中的实际角色是什么?如果我们不将ZooKeeper和Kafka一起使用会错失哪些好处?)
[10] Kafka总是将消息追加到活跃日志段的末尾。——译者注
[11] Kafka官方文档“Broker Configs”。
[12] 独立的记录是指若消息有键时,各消息的键都不相同。——译者注
[13] Kafka官方文档“Log Compaction”。
[14] 由于采用删除策略,位于被删除日志段中的数据被删除了,因此在重启后这些数据就丢失了,所以说在启动后就得不到所有的记录。——译者注
[15] 数据库中存在该键对应的记录时就做更新,否则就在数据库中插入一条记录。——译者注
[16] 主题级别的配置将会覆盖代理级别的配置。 ——译者注
[17] 这个平衡点是指同一个消费组下的消费者已将主题分区分配完毕。——译者注
[18] 我们并不能给出一个确切的分区数,这要根据实际应用场景。 ——译者注
本文摘自即将上架的 《Kafka Streams实战》
作者:[美] 小威廉 · P. 贝杰克(William P. Bejeck Jr.)
译者:牟大恩本书教读者在Kafka平台上实现流式处理。在这本易于理解的书中,读者将通过实际的例子来收集、转换和聚合数据、使用多个处理器处理实时事件,甚至可以使用KSQL深入研究流式SQL。本书最后以Kafka Streams应用程序的测试和运维方面的内容(如监控和调试)结束
本文转载自异步社区
原文链接:https://www.epubit.com/articleDetails?id=N2b4ed981-113d-433e-ada6-f64982cb84f0
- 点赞
- 收藏
- 关注作者
评论(0)