开源Flink Kafka Connector源码学习

举报
想要一只猫 发表于 2023/02/14 11:25:08 2023/02/14
【摘要】 本文通过阅读Kafka Connector代码的调用,分析了Kafka partition分配、Commit、线程模型等

简介

Flink版本:1.15

组件:Kafka Connector

本文通过阅读Kafka Connector代码的调用,分析了Kafka partition分配、Commit、线程模型等

源码阅读

org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase#open

        --> subscribedPartitionsToStartOffsets = new HashMap<>() // 重要的参数

        >> org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer#discoverPartitions 

                --> List<KafkaTopicPartition> newDiscoveredPartitions = getAllPartitionsForTopics(topics) // 获取topic的所有partition

                        --> kafkaConsumer.partitionsFor(topic)

                        >> org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer#setAndCheckDiscoveredPartition // 根据并发度、subTaskIndex过滤掉partition

                                --> KafkaTopicPartitionAssigner.assign(partition, numParallelSubtasks) == indexOfThisSubtask

                                int startIndex = ((partition.getTopic().hashCode() * 31) & 0x7FFFFFFF) % numParallelSubtasks;

                                return (startIndex + partition.getPartition()) % numParallelSubtasks

 

                        >> 返回剩下的partitions

        >> if RestoredState != null

                --> 遍历partitions,如果partition不在restoredState中,则在restoredState中添加该partition,offset设置为KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET

                >>  遍历restoredState,如果partition符合KafkaTopicPartitionAssigner.assign(partition, numParallelSubtasks) == indexOfThisSubtask,则subscribedPartitionsToStartOffsets设置该partition和offset

                >>  如果filterRestoredPartitionsWithCurrentTopicsDescriptor=true,则从过滤掉subscribedPartitionsToStartOffsets中与此次作业topic设置不匹配的topic对应的partition

 

        || >> if RestoredState == null

                --> 如果startupMode 是SPECIFIC_OFFSETS或TIMESTAMP,则设置;其他partition的offset设置为startupMode.getStateSentinel()

                        --> 如果是SPECIFIC_OFFSETS,则遍历subscribedPartitionsToStartOffsets的partition的offset,如果partition和配置的specificStartupOffsets能匹配,则设置该offset,否则offset设置为KafkaTopicPartitionStateSentinel.GROUP_OFFSET

 

                >>  org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema#open

 

org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase#run

        -->  this.offsetCommitCallback = new KafkaCommitCallback

        >>  如果subscribedPartitionsToStartOffsets.isEmpty(),则将此source设为IDLE状态

        >>  this.kafkaFetcher = createFetcher

                --> adjustAutoCommitConfig

                     // 如果offset commit mode为ON_CHECKPOINTS,则设置关闭kafka auto commit

                     --> properties.setProperty("enable.auto.commit", "false")

                >> new KafkaFetcher

                        --> org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher#AbstractFetcher

                                --> this.checkpointLock = sourceContext.getCheckpointLock();

                                >> this.unassignedPartitionsQueue = new ClosableBlockingQueue<>();

                                >> 

                                for (KafkaTopicPartitionState<T, KPH> partition : subscribedPartitionStates) {

                                            unassignedPartitionsQueue.add(partition);

                                        }

                                >> this.subscribedPartitionStates =

                                                createPartitionStateHolders(

                                                        seedPartitionsWithInitialOffsets,

                                                        timestampWatermarkMode,

                                                        watermarkStrategy,

                                                        userCodeClassLoader)

                                        --> partitionStates = new CopyOnWriteArrayList<>()

                        >> this.handover = new Handover() // 用于存储从kafka消费的数据,

                        >> this.consumerThread = new KafkaConsumerThread  // kafka数据消费线程

                                --> this.nextOffsetsToCommit = new AtomicReference<>()

                                >> this.kafkaCollector = new KafkaCollector()

        >> kafkaFetcher.runFetchLoop

                --> consumerThread.start() // 启动kafka消费线程

                        --> consuemrThread.run()

                                --> this.consumer = getConsumer(kafkaProperties) // 初始化KafkaConsumer

                                >>  while (running) 循环从kafka拉数据

                                >> if !commitInProgress 

                                        >> commitOffsetsAndCallback = nextOffsetsToCommit.getAndSet(null); // 第一次时返回null,只有在checkpoint的时候org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase#notifyCheckpointComplete设置

                                        >> if commitOffsetsAndCallback != null

                                        >> commitInProgress = true; // 表示正在commit中

                                                >> consumer.commitAsync(  // 运行commit

                                                                                                commitOffsetsAndCallback.f0,

                                                                                                new CommitCallback(commitOffsetsAndCallback.f1)

                                >> if hasAssignedPartitions // 第一次为false

                                        >> newPartitions = unassignedPartitionsQueue.pollBatch();

                                else

                                        >> newPartitions = unassignedPartitionsQueue.getBatchBlocking() // 第一次执行这里

 

                                >> if newPartitions != null  

                                        >> org.apache.flink.streaming.connectors.kafka.internals.KafkaConsumerThread#reassignPartitions // 关键操作

                                                --> hasAssignedPartitions = true;

                                                >> oldPartition + newPartitions再reassign一下

                                                >> for newPartitions // 遍历newPartitons

                                                >> 如果partition offset为KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET

                                                        >> org.apache.kafka.clients.consumer.KafkaConsumer#seekToBeginning

                                                >> 如果partition offset为KafkaTopicPartitionStateSentinel.LATEST_OFFSET

                                                        >> org.apache.kafka.clients.consumer.KafkaConsumer#seekToEnd

                                                >> 如果partition offset为KafkaTopicPartitionStateSentinel.GROUP_OFFSET

                                                        >> 直接newPartitionState.setOffset(

                                >> if records == null

                                        >> records = consumer.poll(pollTimeout); // 从kafka拉取数据

                                        >> handover.produce(records);

                                                org.apache.flink.streaming.connectors.kafka.internals.Handover#produce

                                                --> synchronized (lock)

                                                >> next = element;

 

                >>  while(running) // 循环处理consumerThread从kafka消费的数据

                        >> records = handover.pollNext()

                                --> synchronized (lock)

                                >> n = next; next = null; return n;

                        // 遍历subscribedPartitionsToStartOffsets针对指定的partition,调用org.apache.kafka.clients.consumer.ConsumerRecords#records(org.apache.kafka.common.TopicPartition)获取指定partitoin的records

                        // 然后org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher#partitionConsumerRecordsHandler(partitionRecords, partition)处理partition数据

                                --> 遍历partitionRecords

                                >> deserializer.deserialize(record, kafkaCollector) // 反序列化record,并设置到kafkaCollector

                                 // org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema#deserialize(org.apache.kafka.clients.consumer.ConsumerRecord<byte[],byte[]>, org.apache.flink.util.Collector<T>) 

                                >> emitRecordsWithTimestamps(kafkaCollector.getRecords(), partition, record.offset(), record.timestamp()) // 发送到下游

                                        // org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher#emitRecordsWithTimestamps

                                        --> synchronized (checkpointLock)

                                        >>  循环从records.poll()取出数据

                                        >>  org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionState#extractTimestamp 抽取kafka event或者自定一个timeassigner

                                        >>  org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext#collectWithTimestamp // 发送到下游

 

【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。