DRS数据同步到KAFKA原理

举报
DRS技术快客 发表于 2021/07/31 10:04:57 2021/07/31
【摘要】 ##一、kafka简单介绍kafka大家的第一印象是一个消息系统。但是kafka的官网的说法是: Apache Kafka® is a distributed streaming platform kafka是一个分布式流处理平台,而流处理平台主要具备以下三种能力:1.发布和订阅消息流,类似于消息队列或企业消息传递系统。2.以容错的持久化方式储存消息流。3.可以在产生消息流的时候,同时进行处...

##一、kafka简单介绍
kafka大家的第一印象是一个消息系统。但是kafka的官网的说法是:

Apache Kafka® is a distributed streaming platform

kafka是一个分布式流处理平台,而流处理平台主要具备以下三种能力:

1.发布和订阅消息流,类似于消息队列或企业消息传递系统。
2.以容错的持久化方式储存消息流。
3.可以在产生消息流的时候,同时进行处理。

而kafka具备以下几个特性:

1.kafka作为一个集群可以运行在一个或者多个服务器上,这些服务器可以跨多个数据中心。
2.kafka集群存储的消息是按照topic(主题)进行分类的。
3.每个消息(也被称为记录)是由一个key,一个value和一个时间戳构成。

kafka对外提供了四种核心API:

1.Producer API,允许应用程序发布消息到kafka集群上的1个或多个的topic。
2.Consumer API,允许应用程序订阅一个或多个topic,并处理这些topic的消息。
3.Streams API,允许应用程序充当一个流处理器,从1个或多个topic消费输入流,并产生一个输出流到1个或多个输出topic,有效地将输入流转换到输出流。
4.Connector API,允许构建运行可重复使用的生产者或消费者,将topic和现有的应用程序或数据系统连接起来。例如,一个关系型数据库的连接器可以捕获到该库下每一个表的变化。

##二、Producer发送数据到kafka的流程
DRS同步到kafka:DRS作为kafka的客户端,利用kafka的Producer API;将源端数据库产生的增量数据写入到目标kafka的topic上。

以mysql到kafka为例,大致流程入下:

1.将源端mysql的binlog日志记录的增量数据作为消息封装成一个Record。
2.经过拦截器,对消息进行过滤。
3.经过序列化器,将消息的key和value进行序列化,当然可以自行定义序列化规则或者自行编写序列化器。
4.消息经过分区器,确定这条消息需要发送到目标topic的分区号。如果在消息里面指定了partion字段,那么就是将消息发送到指定分区。
5.之后消息会封装从成一个一个批次汇总到RecordAccumulator。accumulator可以作为一个缓存,是kafka强大的写入性能原因之一。
6.之后会依赖一个后台唤醒的Sender线程,将数据有序的发送到leader partition所在的broker(kafka集群的每一个服务器都是一个broker)中。
7.在发送消息的过程中,kafka客户端可以从任意一个broker获取到kafka集群的metadata信息,metadata信息里面记录了kafka集群的每个topic的所有partition的信息: leader, fellow, isr, replicas等。
整体的流程如下图所示

三、Producer两个重要参数

1.acks决定了生产者如何在性能与数据可靠之间做取舍,官方源码中描述如下:

public static final String ACKS_CONFIG = "acks";
private static final String ACKS_DOC = "The number of acknowledgments the producer requires the leader to have received before considering a request complete. This controls the "
                                           + " durability of records that are sent. The following settings are allowed: "
                                           + " <ul>"
                                           + " <li><code>acks=0</code> If set to zero then the producer will not wait for any acknowledgment from the"
                                           + " server at all. The record will be immediately added to the socket buffer and considered sent. No guarantee can be"
                                           + " made that the server has received the record in this case, and the <code>retries</code> configuration will not"
                                           + " take effect (as the client won't generally know of any failures). The offset given back for each record will"
                                           + " always be set to <code>-1</code>."
                                           + " <li><code>acks=1</code> This will mean the leader will write the record to its local log but will respond"
                                           + " without awaiting full acknowledgement from all followers. In this case should the leader fail immediately after"
                                           + " acknowledging the record but before the followers have replicated it then the record will be lost."
                                           + " <li><code>acks=all</code> This means the leader will wait for the full set of in-sync replicas to"
                                           + " acknowledge the record. This guarantees that the record will not be lost as long as at least one in-sync replica"
                                           + " remains alive. This is the strongest available guarantee. This is equivalent to the acks=-1 setting."
                                           + "</ul>";

对于kafka来说,消息日志是按照topic分类存储的,而对于一个topic来说有partitons分区数,replication-factor副本数。
对于一个topic而言有多个分区,一个分又可以有多个副本。这些副本中,只有一个leader partition。其他都是follower partiton,仅有leader partition可以对外提供服务,follower partiton主要用于冗余备份。
而副本是存放在不同的broker上面的,因此在创建topic的时候,副本数不能大于broker的节点数的。
而acks参数呢,就是和副本有关系。

acks=0:这意味着producer发送数据后,不会等待broker确认,直接发送下一条数据,性能最好
acks=1:为1意味着producer发送数据后,需要等待leader副本确认接收后,才会发送下一条数据,性能次之
acks=-1/all:这个代表的是all,意味着发送的消息写入leader partition后,等到follower从leader拉取到消息后,才会发送下一条数据,性能最差,但可靠性最强

而DRS以可靠性优先,因此设置的acks参数值为all,确保消息写入到所有可用副本后,才进行下一条写入。

2.max.in.flight.requests.per.connection,官方源码描述如下:

public static final String MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION = "max.in.flight.requests.per.connection";
private static final String MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_DOC = "The maximum number of unacknowledged requests the client will send on a single connection before blocking."
                                                                            + " Note that if this setting is set to be greater than 1 and there are failed sends, there is a risk of"
                                                                            + " message re-ordering due to retries (i.e., if retries are enabled).";
// 在InFlightRequests.java中
/**
  * Can we send more requests to this node?
  *
  * @param node Node in question
  * @return true iff we have no requests still being sent to the given node
  */
public boolean canSendMore(String node) {
      Deque<NetworkClient.InFlightRequest> queue = requests.get(node);
      return queue == null || queue.isEmpty() ||
             (queue.peekFirst().send.completed() && queue.size() < this.maxInFlightRequestsPerConnection);
}
// 在Sender.java中
if (guaranteeMessageOrder) {
    // Mute all the partitions drained
    for (List<ProducerBatch> batchList : batches.values()) {
        for (ProducerBatch batch : batchList)
            this.accumulator.mutePartition(batch.topicPartition);
    }
}

max.in.flight.requests.per.connection表示在单个连接中,最多可以忍受多少个请求处于发送中没有没有响应。kafka源码中这个参数默认是5,可以认为,在一个连接中有5个请求发送出去了,并且Producer都没有收到broker的响应。
如果这个参数大于1,由于有重试机制,可能会存在消息顺序错乱的风险。

如下图,在一个网络连接中将batch封装成不同的request,从batch队列中取出数据,按照顺序封装成不同的request(请求1… 请求5).

如果broker在处理请求2时因为borker节点不可以等因素导致写消息到partition异常了,但是其它请求的数据都正常写入了。此时由于重试机制,Producer会将请求2重新发送。
导致broker写入到leader partition消息顺序错乱。

而DRS为了保证数据写入到kafka是有序的,max.in.flight.requests.per.connection参数设置为1,但是这样降低了kafka的吞吐量。

【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

举报
请填写举报理由
0/200