Flink 操作 kafka (二)

举报
武师叔 发表于 2022/09/29 16:36:13 2022/09/29
【摘要】 theme: condensed-night-purple携手创作,共同成长!这是我参与「掘金日新计划 · 8 月更文挑战」的第30天,点击查看活动详情 Sink ConnectorFlink 提供为 Kafka 0.8 版本后所有 Kafka 版本的 sink connectors。// 设定数据流DataStream[String] stream = environment.fromE...

theme: condensed-night-purple

携手创作,共同成长!这是我参与「掘金日新计划 · 8 月更文挑战」的第30天,点击查看活动详情

Sink Connector

Flink 提供为 Kafka 0.8 版本后所有 Kafka 版本的 sink connectors。

// 设定数据流
DataStream[String] stream = environment.fromElements("1", "2", "3", "4", "5", "6");

// 设置生产者并添加到 sink
FlinkKafkaProducer<String> myProducer = new FlinkKafkaProducer<String>(
  "localhost:9092",                            
  "topic",                                     
  new SimpleStringSchema)                    

stream.addSink(myProducer)Copy to clipboardErrorCopied

初始化 FlinkKafkaProducer 参数

  1. broker 列表,要发往的 brokers , 用逗号分割。
  2. topic 名字,用来指定生产一个或者多个 topic 的数据,也可以是正则表达式。
  3. 序列化器(schema) ,对消费数据进行序列化,将目标类型转换成字节数组。

序列化器类比于反序列化器实现:

  • 转化为 String 类型 SimpleStringSchema
  • 转化为其它类型 TypeInformationSerializationSchema<T>
  • 转化为键值对类型 TypeInformationKeyValueSerializationSchema<K, V>
  • 转化为 JSON 类型 JSONKeyValueDeserializationSchema

Kakfa 容错机制

在 Kafka 0.9 之前不提供任何机制去保证 at-least-once 或 exactly-once 的语义。 但后续版本的 Kafka 可以通过以下方式来实现出错后恢复且不丢失数据:

  1. 启用 Checkpoint

在默认启用 Checkpoint 的状况下, FlinkKafkaConsumer 将消费来自 Topic 的记录,并以一致的方式周期性地 Checkpoint 其所有 Kafka Offset 以及其它操作的状态。万一作业失败,Flink 将把流失程序恢复到最新 Checkpoint的状态,并且重新消费 Kafka 中的数据。这确保了在 Kafka Broker 中所提交的 Offset 和 Checkpointed State 中的 Offset 是一致的。此时 FlinkKafkaProducer 可以提供 exactly-once 的投递语义。

如果 Checkpointing 没有启用,KafkaFlinkConsumer 将会周期性的提交 Offset 到 Zookeeper 中去。

  • 配置 Semantic 参数

除了启用 Flink 的 Checkpointing,还可以通过传递恰当的 semantic 参数给 FlinkKafkaProducer 选择 3 种不同的操作模式:

  • emantic.None : Flink 什么也不会保证,所产生的记录可能会被丢失或者重复。
  • Semantic.AT_LEASET_ONCE(默认): Flink 保证 at-least-once ,没有记录会被丢失,但可能会重复。
  • Semantic.EXACTLY_ONCE : 使用 Kafka 的事务机制来保证 exactly-once。

Semantic.EXACTLY_ONCE 模式依赖于提交事务的能力,这些事务是在 taking a checkpoint 之前,从该 Checkpoint 恢复之后启动的。如果 Flink 应用崩溃且完成重启的时间比 Kafka 事务超时的时间大,则数据将会丢失(Kafka 将自动的终止超过超时时间的事务)。请务必根据预期的故障时间来配置你的事务超时。

kafka 分区发现

FlinkKafkaConsumer 支持发现动态创建的 Kafka Partition,并且以 exactly-once 语义保证来消费其中的数据。默认情况下分区发现是禁用的,要启用该特性在提供的属性配置中为参数 flink.partition-discovery.interval-millis 设置一个非负数的值,表示发现间隔(以毫秒为单位)。

【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。