Flink 操作 kafka (二)
theme: condensed-night-purple
携手创作,共同成长!这是我参与「掘金日新计划 · 8 月更文挑战」的第30天,点击查看活动详情
Sink Connector
Flink 提供为 Kafka 0.8 版本后所有 Kafka 版本的 sink connectors。
// 设定数据流
DataStream[String] stream = environment.fromElements("1", "2", "3", "4", "5", "6");
// 设置生产者并添加到 sink
FlinkKafkaProducer<String> myProducer = new FlinkKafkaProducer<String>(
"localhost:9092",
"topic",
new SimpleStringSchema)
stream.addSink(myProducer)Copy to clipboardErrorCopied
初始化 FlinkKafkaProducer 参数
- broker 列表,要发往的 brokers , 用逗号分割。
- topic 名字,用来指定生产一个或者多个 topic 的数据,也可以是正则表达式。
- 序列化器(schema) ,对消费数据进行序列化,将目标类型转换成字节数组。
序列化器类比于反序列化器实现:
- 转化为 String 类型
SimpleStringSchema
- 转化为其它类型
TypeInformationSerializationSchema<T>
- 转化为键值对类型
TypeInformationKeyValueSerializationSchema<K, V>
- 转化为 JSON 类型
JSONKeyValueDeserializationSchema
Kakfa 容错机制
在 Kafka 0.9 之前不提供任何机制去保证 at-least-once 或 exactly-once 的语义。 但后续版本的 Kafka 可以通过以下方式来实现出错后恢复且不丢失数据:
- 启用 Checkpoint
在默认启用 Checkpoint 的状况下, FlinkKafkaConsumer 将消费来自 Topic 的记录,并以一致的方式周期性地 Checkpoint 其所有 Kafka Offset 以及其它操作的状态。万一作业失败,Flink 将把流失程序恢复到最新 Checkpoint的状态,并且重新消费 Kafka 中的数据。这确保了在 Kafka Broker 中所提交的 Offset 和 Checkpointed State 中的 Offset 是一致的。此时 FlinkKafkaProducer 可以提供 exactly-once 的投递语义。
如果 Checkpointing 没有启用,KafkaFlinkConsumer 将会周期性的提交 Offset 到 Zookeeper 中去。
- 配置 Semantic 参数
除了启用 Flink 的 Checkpointing,还可以通过传递恰当的 semantic 参数给 FlinkKafkaProducer 选择 3 种不同的操作模式:
emantic.None
: Flink 什么也不会保证,所产生的记录可能会被丢失或者重复。Semantic.AT_LEASET_ONCE
(默认): Flink 保证 at-least-once ,没有记录会被丢失,但可能会重复。Semantic.EXACTLY_ONCE
: 使用 Kafka 的事务机制来保证 exactly-once。
Semantic.EXACTLY_ONCE 模式依赖于提交事务的能力,这些事务是在 taking a checkpoint 之前,从该 Checkpoint 恢复之后启动的。如果 Flink 应用崩溃且完成重启的时间比 Kafka 事务超时的时间大,则数据将会丢失(Kafka 将自动的终止超过超时时间的事务)。请务必根据预期的故障时间来配置你的事务超时。
kafka 分区发现
FlinkKafkaConsumer 支持发现动态创建的 Kafka Partition,并且以 exactly-once 语义保证来消费其中的数据。默认情况下分区发现是禁用的,要启用该特性在提供的属性配置中为参数 flink.partition-discovery.interval-millis 设置一个非负数的值,表示发现间隔(以毫秒为单位)。
- 点赞
- 收藏
- 关注作者
评论(0)