Flink 操作 kafka (一)

举报
武师叔 发表于 2022/09/29 16:36:57 2022/09/29
【摘要】 theme: condensed-night-purple携手创作,共同成长!这是我参与「掘金日新计划 · 8 月更文挑战」的第30天,点击查看活动详情 Flink 操作 kafkahttps://zhuanlan.zhihu.com/p/92289771flink 提供了一个特有的 kafka connector 去读写 kafka topic 的数据。这样在 flink 消费 kafk...

theme: condensed-night-purple

携手创作,共同成长!这是我参与「掘金日新计划 · 8 月更文挑战」的第30天,点击查看活动详情

Flink 操作 kafka

https://zhuanlan.zhihu.com/p/92289771

flink 提供了一个特有的 kafka connector 去读写 kafka topic 的数据。这样在 flink 消费 kafka 数据时,就可以通过 flink 内部去跟踪 offset 和设定 checkpoint 去实现 exactly-once 的语义。

在 Flink 中,我们作为 Consumer 时需要用 Source Connectors 代表连接数据源的连接器,作为 Producer 时需要用 Sink Connector 代表连接数据输出的连接器。

Source Connector

Flink Kafka connector 以并行的方式读入事件流,每个并行的 source task 都可以从一个或多个 partition 读入数据。Task 对于每个它当前正在读的 partition 都会追踪当前的 offset ,并将这些 offset 数据存储到它的 checkpoint 中。当发生故障进行恢复时,offset 被取出并重置,使得数据可以在上次检查点时的 offset 继续读数据。Flink Kafka connector 并不依赖于 Kafka 本身的 offset-tracking 机制(也就是consumer groups机制)。

source

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment()

// 设定配置
val properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "test");

// 设置消费者并添加源
FlinkKafkaConsumer<String> myConsumer = new FlinkKafkaConsumer<String>(
    "topic",                                 
    new SimpleStringSchema(),                
    properties))   
);
DataStream[String] stream = env.addSource(myConsumer);                    Copy to clipboardErrorCopied

初始化 FlinkKafkaConsumer 参数

  1. topic 名字,用来指定消费一个或者多个topic的数据,也可以是正则表达式。
  2. 反序列化器(schema) ,对消费数据进行反序列化,转换成自定义的数据结构。
  3. kafka 配置信息:如 zk 地址端口,kafka 地址端口等。此对象至少要包含两个条目 bootstrap.servers 与 group.id

反序列化器主要通过实现 KeyedDeserializationSchema 或者 DeserializationSchema 接口来完成,flink 内置,也可以自定义。

  • 转化为 String 类型 SimpleStringSchema
  • 转化为其它类型 TypeInformationSerializationSchema<T>
  • 转化为键值对类型 TypeInformationKeyValueSerializationSchema<K, V>
  • 转化为 JSON 类型 JSONKeyValueDeserializationSchema

消费起始位置

Flink Kafka Consumer 可以配置指定的 Kafka Partition 的起始位置。

myConsumer.setStartFromEarliest()             // start from the earliest record possible(默认)
myConsumer.setStartFromLatest()               // start from the latest record
myConsumer.setStartFromTimestamp(...)         // start from specified epoch timestamp (milliseconds)
myConsumer.setStartFromGroupOffsets()         // the default behaviour
【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。