Source Code Notes on the Open-Source Flink Kafka Connector
Introduction
Flink version: 1.15
Component: Kafka Connector
This article follows the call chain of the Kafka Connector source code and analyzes partition assignment, offset committing, the threading model, and related mechanics.
Source Code Walkthrough
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase#open
--> subscribedPartitionsToStartOffsets = new HashMap<>() // the key field: which partitions this subtask reads, and from which start offsets
>> org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer#discoverPartitions
--> List<KafkaTopicPartition> newDiscoveredPartitions = getAllPartitionsForTopics(topics) // fetch all partitions of the subscribed topics
--> kafkaConsumer.partitionsFor(topic)
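To make the discovery step concrete, here is a minimal sketch of how getAllPartitionsForTopics boils down to KafkaConsumer#partitionsFor (the class and method names in the sketch are illustrative; only the Kafka client API is real):

import java.util.ArrayList;
import java.util.List;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;

public class PartitionDiscoverySketch {
    // gathers the metadata of every partition of every subscribed topic
    static List<PartitionInfo> discoverAll(
            KafkaConsumer<byte[], byte[]> kafkaConsumer, List<String> topics) {
        List<PartitionInfo> newDiscoveredPartitions = new ArrayList<>();
        for (String topic : topics) {
            List<PartitionInfo> infos = kafkaConsumer.partitionsFor(topic);
            if (infos != null) { // partitionsFor may return null for an unknown topic
                newDiscoveredPartitions.addAll(infos);
            }
        }
        return newDiscoveredPartitions;
    }
}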
>> org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer#setAndCheckDiscoveredPartition // filters out partitions that do not belong to this subtask, based on the parallelism and the subtask index
--> KafkaTopicPartitionAssigner.assign(partition, numParallelSubtasks) == indexOfThisSubtask
int startIndex = ((partition.getTopic().hashCode() * 31) & 0x7FFFFFFF) % numParallelSubtasks;
return (startIndex + partition.getPartition()) % numParallelSubtasks;
>> returns the partitions that remain after filtering
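The assignment formula above spreads one topic's partitions round-robin over the subtasks, starting at an index derived from the topic name (so different topics start at different subtasks). A self-contained demo:

public class AssignDemo {
    // mirrors the formula quoted above from KafkaTopicPartitionAssigner.assign
    static int assign(String topic, int partition, int numParallelSubtasks) {
        int startIndex = ((topic.hashCode() * 31) & 0x7FFFFFFF) % numParallelSubtasks;
        return (startIndex + partition) % numParallelSubtasks;
    }

    public static void main(String[] args) {
        int parallelism = 3;
        for (int p = 0; p < 6; p++) {
            // a subtask keeps a partition only when assign(...) == indexOfThisSubtask
            System.out.printf("partition %d -> subtask %d%n", p, assign("orders", p, parallelism));
        }
    }
}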
>> if restoredState != null
--> iterate over the discovered partitions; any partition missing from restoredState is added to it with its offset set to KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET
>> iterate over restoredState; if KafkaTopicPartitionAssigner.assign(partition, numParallelSubtasks) == indexOfThisSubtask, put the partition and its offset into subscribedPartitionsToStartOffsets
>> if filterRestoredPartitionsWithCurrentTopicsDescriptor == true, remove from subscribedPartitionsToStartOffsets any partition whose topic no longer matches this job's topic configuration
>> if restoredState == null
--> if startupMode is SPECIFIC_OFFSETS or TIMESTAMP, the corresponding offsets/timestamp configuration is applied; for the other modes each partition's offset is set to startupMode.getStateSentinel()
--> for SPECIFIC_OFFSETS, iterate over the partitions in subscribedPartitionsToStartOffsets: if a partition has an entry in the configured specificStartupOffsets, use that offset; otherwise fall back to KafkaTopicPartitionStateSentinel.GROUP_OFFSET
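A simplified sketch of the SPECIFIC_OFFSETS branch described above (not the actual Flink code: partitions are keyed by plain strings here, and GROUP_OFFSET_SENTINEL is an illustrative stand-in for KafkaTopicPartitionStateSentinel.GROUP_OFFSET):

import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class StartupOffsetsSketch {
    static final long GROUP_OFFSET_SENTINEL = Long.MIN_VALUE; // illustrative value

    static Map<String, Long> seedStartOffsets(
            List<String> myPartitions, Map<String, Long> specificStartupOffsets) {
        Map<String, Long> subscribedPartitionsToStartOffsets = new HashMap<>();
        for (String partition : myPartitions) {
            Long offset = specificStartupOffsets.get(partition);
            // a configured offset wins; otherwise defer to the consumer group offset
            subscribedPartitionsToStartOffsets.put(
                    partition, offset != null ? offset : GROUP_OFFSET_SENTINEL);
        }
        return subscribedPartitionsToStartOffsets;
    }
}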
>> org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema#open
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase#run
--> this.offsetCommitCallback = new KafkaCommitCallback
>> if subscribedPartitionsToStartOffsets.isEmpty(), mark this source subtask as temporarily idle
>> this.kafkaFetcher = createFetcher
--> adjustAutoCommitConfig
// if the offset commit mode is ON_CHECKPOINTS, Kafka's auto commit is switched off
--> properties.setProperty("enable.auto.commit", "false")
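The point of this override is that offsets are committed back to Kafka only when a checkpoint completes, so Kafka's own periodic auto commit must not run in parallel. A minimal sketch of the idea (the enum is a stand-in for Flink's OffsetCommitMode):

import java.util.Properties;

public class AutoCommitSketch {
    enum OffsetCommitMode { ON_CHECKPOINTS, KAFKA_PERIODIC, DISABLED } // stand-in

    static void adjustAutoCommitConfig(Properties properties, OffsetCommitMode mode) {
        if (mode == OffsetCommitMode.ON_CHECKPOINTS) {
            // checkpoints drive the commits, so Kafka auto commit is forced off
            properties.setProperty("enable.auto.commit", "false");
        }
    }
}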
>> new KafkaFetcher
--> org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher#AbstractFetcher
--> this.checkpointLock = sourceContext.getCheckpointLock();
>> this.subscribedPartitionStates =
        createPartitionStateHolders(
                seedPartitionsWithInitialOffsets,
                timestampWatermarkMode,
                watermarkStrategy,
                userCodeClassLoader) // creates a state holder per seed partition
--> partitionStates = new CopyOnWriteArrayList<>()
>> this.unassignedPartitionsQueue = new ClosableBlockingQueue<>();
>> for (KafkaTopicPartitionState<T, KPH> partition : subscribedPartitionStates) {
       unassignedPartitionsQueue.add(partition);
   }
>> this.handover = new Handover() // a single-slot buffer that hands the batches consumed from Kafka over to the fetcher
>> this.consumerThread = new KafkaConsumerThread // the thread that consumes data from Kafka
--> this.nextOffsetsToCommit = new AtomicReference<>()
>> this.kafkaCollector = new KafkaCollector()
>> kafkaFetcher.runFetchLoop
--> consumerThread.start() // start the Kafka consumer thread
--> consumerThread.run()
--> this.consumer = getConsumer(kafkaProperties) // create the KafkaConsumer
>> while (running): loop pulling data from Kafka
>> if !commitInProgress
>> commitOffsetsAndCallback = nextOffsetsToCommit.getAndSet(null); // null on the first pass; it is only set by org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase#notifyCheckpointComplete when a checkpoint completes
>> if commitOffsetsAndCallback != null
>> commitInProgress = true; // flags that a commit is in flight
>> consumer.commitAsync( // issue the async commit
commitOffsetsAndCallback.f0,
new CommitCallback(commitOffsetsAndCallback.f1)
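The commit handshake between the checkpointing thread and the consumer thread is worth isolating. A condensed sketch, assuming the same AtomicReference/flag pattern described above (class and method names are illustrative; only the Kafka client API is real):

import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class CommitGateSketch {
    // written by notifyCheckpointComplete, drained by the consumer thread
    final AtomicReference<Map<TopicPartition, OffsetAndMetadata>> nextOffsetsToCommit =
            new AtomicReference<>();
    volatile boolean commitInProgress;

    void maybeCommit(KafkaConsumer<byte[], byte[]> consumer) {
        if (!commitInProgress) {
            // getAndSet(null) atomically claims the pending offsets, if any
            Map<TopicPartition, OffsetAndMetadata> toCommit = nextOffsetsToCommit.getAndSet(null);
            if (toCommit != null) {
                commitInProgress = true; // at most one async commit in flight
                consumer.commitAsync(toCommit, (offsets, ex) -> commitInProgress = false);
            }
        }
    }
}

Because getAndSet(null) is atomic, a checkpoint that completes while a commit is still in flight simply leaves its offsets for a later loop iteration, and a newer checkpoint overwriting unclaimed older offsets is harmless since the newer offsets subsume them.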
>> if hasAssignedPartitions // false on the first pass
>> newPartitions = unassignedPartitionsQueue.pollBatch();
else
>> newPartitions = unassignedPartitionsQueue.getBatchBlocking() // taken on the first pass; blocks until partitions arrive
>> if newPartitions != null
>> org.apache.flink.streaming.connectors.kafka.internals.KafkaConsumerThread#reassignPartitions // the key step
--> hasAssignedPartitions = true;
>> re-assign the consumer with oldPartitions + newPartitions
>> for each partition in newPartitions
>> if the partition's offset is KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET
>> org.apache.kafka.clients.consumer.KafkaConsumer#seekToBeginning
>> if the partition's offset is KafkaTopicPartitionStateSentinel.LATEST_OFFSET
>> org.apache.kafka.clients.consumer.KafkaConsumer#seekToEnd
>> if the partition's offset is KafkaTopicPartitionStateSentinel.GROUP_OFFSET
>> no seek is issued; newPartitionState.setOffset(...) is called directly
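A simplified sketch of this seek logic (the sentinel constants are illustrative stand-ins; Flink's KafkaTopicPartitionStateSentinel uses different magic values, and the +1 on restored offsets reflects that Flink state stores the last consumed offset):

import java.util.Collections;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class SeekSketch {
    static final long EARLIEST_OFFSET = -1L; // illustrative sentinel
    static final long LATEST_OFFSET = -2L;   // illustrative sentinel
    static final long GROUP_OFFSET = -3L;    // illustrative sentinel

    static void seekFor(KafkaConsumer<byte[], byte[]> consumer, TopicPartition tp, long offset) {
        if (offset == EARLIEST_OFFSET) {
            consumer.seekToBeginning(Collections.singletonList(tp));
        } else if (offset == LATEST_OFFSET) {
            consumer.seekToEnd(Collections.singletonList(tp));
        } else if (offset == GROUP_OFFSET) {
            // no seek: the consumer resumes from the committed group offset
        } else {
            // a concrete restored offset: resume right after the last consumed record
            consumer.seek(tp, offset + 1);
        }
    }
}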
>> if records == null
>> records = consumer.poll(pollTimeout); // pull a batch of records from Kafka
>> handover.produce(records);
org.apache.flink.streaming.connectors.kafka.internals.Handover#produce
--> synchronized (lock)
>> next = element;
>> while (running) // main fetcher loop: processes the batches that consumerThread consumed from Kafka
>> records = handover.pollNext()
--> synchronized (lock)
>> n = next; next = null; return n;
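The Handover is essentially a single-slot blocking exchange between the consumer thread (produce) and the fetcher thread (pollNext). A minimal re-implementation of that idea, omitting the wakeup/close handling of the real class:

import org.apache.kafka.clients.consumer.ConsumerRecords;

public class HandoverSketch {
    private final Object lock = new Object();
    private ConsumerRecords<byte[], byte[]> next;

    // called by the consumer thread; blocks while the slot is still occupied
    public void produce(ConsumerRecords<byte[], byte[]> element) throws InterruptedException {
        synchronized (lock) {
            while (next != null) {
                lock.wait();
            }
            next = element;
            lock.notifyAll();
        }
    }

    // called by the fetcher thread; blocks until a batch has been handed over
    public ConsumerRecords<byte[], byte[]> pollNext() throws InterruptedException {
        synchronized (lock) {
            while (next == null) {
                lock.wait();
            }
            ConsumerRecords<byte[], byte[]> n = next;
            next = null;
            lock.notifyAll();
            return n;
        }
    }
}

A one-element slot (rather than a deep queue) deliberately applies back-pressure: the consumer thread cannot poll a new batch from Kafka until the fetcher has emitted the previous one.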
// iterate over the subscribed partition states; for each partition call org.apache.kafka.clients.consumer.ConsumerRecords#records(org.apache.kafka.common.TopicPartition) to get that partition's records
// then org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher#partitionConsumerRecordsHandler(partitionRecords, partition) processes the partition's records
--> iterate over partitionRecords
>> deserializer.deserialize(record, kafkaCollector) // deserialize the record and collect the result into kafkaCollector
// org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema#deserialize(org.apache.kafka.clients.consumer.ConsumerRecord<byte[],byte[]>, org.apache.flink.util.Collector<T>)
>> emitRecordsWithTimestamps(kafkaCollector.getRecords(), partition, record.offset(), record.timestamp()) // emit downstream
// org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher#emitRecordsWithTimestamps
--> synchronized (checkpointLock)
>> loop, draining elements via records.poll()
>> org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionState#extractTimestamp // extract the timestamp from the Kafka record, or apply a user-defined timestamp assigner
>> org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext#collectWithTimestamp // emit to downstream operators
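The essential property of emitRecordsWithTimestamps is that emitting records and advancing the partition offset happen atomically with respect to checkpoints, because both sit under the checkpoint lock. A condensed sketch (both nested interfaces are stand-ins for Flink's SourceFunction.SourceContext and KafkaTopicPartitionState):

import java.util.Queue;

public class EmitSketch<T> {
    interface SourceContext<T> { // stand-in for SourceFunction.SourceContext
        void collectWithTimestamp(T element, long timestamp);
        Object getCheckpointLock();
    }

    interface PartitionState { // stand-in for KafkaTopicPartitionState
        void setOffset(long offset);
    }

    void emitRecordsWithTimestamps(
            Queue<T> records,
            SourceContext<T> ctx,
            PartitionState partitionState,
            long offset,
            long timestamp) {
        // a checkpoint snapshotting the offset can never interleave with a half-emitted batch
        synchronized (ctx.getCheckpointLock()) {
            T record;
            while ((record = records.poll()) != null) {
                ctx.collectWithTimestamp(record, timestamp);
            }
            partitionState.setOffset(offset);
        }
    }
}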