Kafka的消息生产和消费

举报
yd_56919505 发表于 2022/12/30 11:40:39 2022/12/30
【摘要】 1、消息发送的可靠性机制生产者向kafka发送消息时,可以选择需要的可靠性级别。通过acks参数的值进行设置。(1)0值异步发送 生产者向kafka发送消息而不需要kafka反馈成功ack。该方式效率最高,但可靠性最低。其可能会存在消息丢失的情况。 l 在发送之前丢失:生产者首先是将消息写入到buffer中的。当buffer满足消息发送条件(buffer容量、消息数量、时间)时,在buffe...

1、消息发送的可靠性机制

生产者向kafka发送消息时,可以选择需要的可靠性级别。通过acks参数的值进行设置。

(1)0值

异步发送 生产者向kafka发送消息而不需要kafka反馈成功ack。该方式效率最高,但可靠性最低。其可能会存在消息丢失的情况。

l 在发送之前丢失:生产者首先是将消息写入到buffer中的。当buffer满足消息发送条件(buffer容量、消息数量、时间)时,在buffer正要发送但还没有发送时又有消息来了,此时该消息就可能会丢失。

l 在传输过程中丢失:生产者只管发送,根本就不知道broker是否接收到了消息。

21

同步发送,默认值。生产者发送消息给kafka,brokerpartition leader在收到消息后马上发送成功ack(无需等待ISR中的follower同步完成),生产者收到后知道消息发送成功,然后会再发送消息。如果一直未收到kafka的ack,则生产者会认为消息发送失败,会重发消息。

该方式能否保证消息发送成功?不能。

该方式能否使生产者确定消息发送失败?能。

该方式不能保证消息不丢失。若leader收到消息后,在ISR中的follower还没有同步时,leader宕机,此时由于leader在收到消息后其会立即向producer发送ACK,所以对于producer来说,消息已经发送成功了。所以producer不会重新发送该消息。那么该消息就丢失了。

3-1

同步发送。其值等同于all。生产者发送消息给kafka,kafka收到消息后要等到ISR列表中的所有副本都同步消息完成后,才向生产者发送成功ack。如果一直未收到kafka的ack,则认为消息发送失败,会自动重发消息。 该方式的可靠性最高,很少会出现消息丢失。但其效率是最低的。该方式存在一个问题:可能会出现部分follower重复接收消息。producer发送消息给leader,然后follower要同步消息。当follower同步还未完成时leader挂了。此时broker不会发送ack给producer。由于producer没有收到ack,所以其认为上次发送失败了,其会重新再发送一次。那此时新的leader(原来已经同步了一部分的follower)会再次接收这些消息。而这些消息中就存在重复接收的内容。

2、消息写入算法

消息生产者将消息发送给broker,并形成最终的可供消费者消费的log,是一个比较复杂的过程。

  1. producerbroker集群提交连接请求,其所连接上的任意broker都会向其发送broker controller的通信URL,即broker controller主机配置文件中的listeners地址
  2. producer指定了要生产消息的topic后,其会向broker controller发送请求,请求当前topic中所有partitionleader列表地址
  3. broker controller在接收到请求后,会从zk中查找到指定topic的所有partitionleader,并返回给producer
  4. producer在接收到leader列表地址后,根据消息路由策略找到当前要发送消息所要发送的partition leader,然后将消息发送给该leader
  5. leader将消息写入本地log,并通知ISR中的followers
  6. ISR中的followersleader中同步消息后向leader发送ACK
  7. leader收到所有ISR中的followersACK后,增加HW,表示消费者已经可以消费到该位置了
  8. Leader在等待的followerACK超时了,发现还有follower没有发送ACK,则会将没有发送ACKfollowerISR列表中清除(放到了OSR),然后再增加HW

3、 消息消费算法

生产者将消息发送到topic中,消费者即可对其进行消费,其消费过程如下:

1、consumer向broker集群提交连接请求,其所连接上的任意broker都会向其发送broker controller的通信URL,即broker controller主机配置文件中的listeners地址

2、当consumer指定了要消费的topic后,其会向broker controller发送poll请求

3、broker controller会为consumer分配一个或几个partition leader,并将该partitioin的当前offset发送给consumer

4、er会按照broker controller分配的partition对其中的消息进行消费

5、消费者者消费完该条消息后,消费者会向broker发送一个该消息已被消费的反馈,即该消息的offset

6、当broker接到消费者的offset后,会更新到相应的__consumer_offset中

7、以上过程一直重复,直到消费者停止请求消息

8、消费者可以重置offset,从而可以灵活消费存储在broker上的消息

【版权声明】本文为华为云社区用户原创内容,未经允许不得转载,如需转载请自行联系原作者进行授权。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。