kafka生产者消息分区机制原理剖析

举报
Rolle 发表于 2023/11/30 17:32:46 2023/11/30
【摘要】 分区的作用就是提供负载均衡的能力,或者说对数据进行分区的主要原因,就是为了实现系统的高伸缩性(Scalability)。不同的分区能够被放置到不同节点的机器上,而数据的读写操作也都是针对分区这个粒度而进行的,这样每个节点的机器都能独立地执行各自分区的读写请求处理。并且,还可以通过添加新的节点机器来增加整体系统的吞吐量。​ 分区策略分区策略是决定生产者将消息发送到哪个分区的算法 轮询策略轮询策...

分区的作用就是提供负载均衡的能力,或者说对数据进行分区的主要原因,就是为了实现系统的高伸缩性(Scalability)。不同的分区能够被放置到不同节点的机器上,而数据的读写操作也都是针对分区这个粒度而进行的,这样每个节点的机器都能独立地执行各自分区的读写请求处理。并且,还可以通过添加新的节点机器来增加整体系统的吞吐量。

分区策略

分区策略是决定生产者将消息发送到哪个分区的算法

轮询策略

轮询策略 是生产者 API 默认提供的分区策略(一个主题下有 3 个分区,那么第一条消息被发送到分区 0,第二条被发送到分区 1,第三条被发送到分区 2)

轮询策略有非常优秀的负载均衡表现,它总是能保证消息最大限度地被平均分配到所有分区上,故默认情况下它是最合理的分区策略,也是我们最常用的分区策略之一。

随机策略

指定key 策略

Kafka 允许为每条消息定义消息键,简称为 Key 一旦消息被定义了 Key,那么你就可以保证同一个 Key 的所有消息都进入到相同的分区里面

Producer发送消息的时候可以直接指定key,比如producer.send(new ProducerRecord("my-topic", "key", "value"));

一个生产者,发两次消息,但是网络原因,消息到达的顺序和消息发送的顺序不一致

  • 防止乱序可以通过设置max.in.flight.requests.per.connection=1来保证

默认分区规则:

  1. 如果指定了分区编号,用它
  2. 如果没有指定分区号,但指定了key,按照hash计算分区号
  3. 既没有分区号,也没有key,用 round-robin (轮询)

默认分区存在问题:

  1. 通过key的hash计算分区号,存在hash冲突的可能
  2. 如果后期增加分区,散列计算分区号,相同key将会落到和之前不一样的分区。

Producer在发送生产出的数据给Broker时,可以选择三种模式,称为acks,它是Acknowledgment的缩写。意思是Broker对Producer即将发送来的数据采用何种确认方式。

  • acks=0
    • 在该模式下,Producer不会等待Broker的确认反馈,即不关心Broker是否正确的将发送来的数据持久化,所以在这种模式下,很有可能会丢失数据。因为如果Broker挂了,Producer不会被通知到,所以还会不停的发送数据导致数据丢失。在对数据完整性需求不强烈的场景下,这种模式可以提高性能。
  • acks=1
    • 默认采用的模式,该模式下Producer会等待Leader Broker的确认反馈,当Broker确实将数据持久化到至少一个Partition中后,给予Producer确认反馈,Producer才会继续发送数据。该模式下有几点需要注意:
      • 不保证Replicas也持久化了数据。
      • 当Producer没有收到Broker的确认反馈时,Producer会尝试重新发送数据。
      • 当Leader Broker挂了,但是Replicas又没有持久化数据时,还是会丢失数据。
      • 该模式只能说是可以有效防止数据丢失。
  • acks=all
    • 该模式下,Producer同样需要等待Broker的确认,但是确认更为严格,需要所有的Partition(Leader + Replicas)都持久化数据后才返回确认信息。这种模式下,只要Replicas足够多,数据基本不会丢失。
    • 在该模式下,还有一个重要的参数min.insync.replicas需要配置。该参数的意思是当acks=all时,至少有多少个Replicas需要确认已成功持久化数据,这个Replicas数量也包括Leader。

Retry

有时候Producer发送Message失败可能并不是因为Broker挂了,可能是因为网络问题,没有连接到Broker等等。这种问题可能在很短暂的时间内就会自动修复,那么在这种情况下,我们希望Producer在发送失败后能重新尝试发送。这里就需要设置retries这个参数,意思就是重试的次数,默认是0次,可以根据实际业务情况设置。

但是当设置了retries参数大于0后,有可能会带来新的问题。假如我们需要相同Key的Message进入特定的Partition,并且是要严格按照Producer生产Message的顺序排序。那么此时如果第一条Message发送失败,第二条Message发送成功了,第一条通过重试发送成功了,那Message的顺序就发生了变化。

这里又会引出一个参数max.in.flight.requests.per.connection,这个参数默认是5,意思是在被Broker阻止前,未通过acks确认的发送请求最大数,也就是在Broker处排队等待acks确认的Message数量。所以刚才那个场景,第一条和第二条Message都在Broker那排队等待确认放行,这时第一条失败了,等重试的第一条Message再来排队时,第二条早都通过进去了,所以排序就乱了。

如果想在设置了retries还要严格控制Message顺序,可以把max.in.flight.requests.per.connection设置为1。让Broker处永远只有一条Message在排队,就可以严格控制顺序了。但是这样做会严重影响性能(接收Message的吞吐量)。

Kafka在0.11版本之后,就为我们提供了定义幂等Producer的能力

  • retries=Integer.MAX_VALUE
  • max.in.flight.requests.per.connection=1 (Kafka >= v0.11 & < v1.1)
  • max.in.flight.requests.per.connection=5 (Kafka >= v1.1)
  • acks=all

Message Batch

max.in.flight.requests.per.connection参数,默认会在Broker那排队5条Message,那么如果第六条来了怎么办呢?这时候Kafka会自动开启批量处理Message的模式,将这6条Message作为一个批次进行处理。这一个批次可以看作是一次Message处理请求。

开启批量模式后,会引出两个参数:

  • linger.ms:每次批量处理的间隔时间。如果设为5,那么就是每5毫秒对Message进行一次批量处理。
  • batch.size:每个批次的最大字节数,默认是16KB,可以设置为32KB或者64KB,可以提高性能。

Producer Buffer

在大多数情况下,Consumer消费Message的速率是远不如Producer生产Message的速率的。所以Producer有一个缓存机制,将Broker还没来得及接收的Message缓存在内存中。缓存的大小可以通过buffer.memory配置,默认大小是32MB。默认存储时间为7天,这个时间可以通过设置Broker的offset.retention.minutes属性改变。

如果Producer的缓存被打满后,Producer会被阻塞,阻塞的最大时间可以通过max.block.ms配置,默认大小是60秒。

概括一下,就是当Producer生产Message的速率大于Broker接收Message(Consumer消费数据)的速率时,Producer会把Broker还没来得及接收的Message存在缓存里(内存),当存满设置的缓存大小后,Producer将不再发送Message给Broker,也就是进入阻塞状态,如果在设置的阻塞时间内,缓存还没有被释放出有用空间,那么Producer将抛出异常。

【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。