Flink 操作 kafka (一)
theme: condensed-night-purple
携手创作,共同成长!这是我参与「掘金日新计划 · 8 月更文挑战」的第30天,点击查看活动详情
Flink 操作 kafka
https://zhuanlan.zhihu.com/p/92289771
flink 提供了一个特有的 kafka connector 去读写 kafka topic 的数据。这样在 flink 消费 kafka 数据时,就可以通过 flink 内部去跟踪 offset 和设定 checkpoint 去实现 exactly-once 的语义。
在 Flink 中,我们作为 Consumer 时需要用 Source Connectors 代表连接数据源的连接器,作为 Producer 时需要用 Sink Connector 代表连接数据输出的连接器。
Source Connector
Flink Kafka connector 以并行的方式读入事件流,每个并行的 source task 都可以从一个或多个 partition 读入数据。Task 对于每个它当前正在读的 partition 都会追踪当前的 offset ,并将这些 offset 数据存储到它的 checkpoint 中。当发生故障进行恢复时,offset 被取出并重置,使得数据可以在上次检查点时的 offset 继续读数据。Flink Kafka connector 并不依赖于 Kafka 本身的 offset-tracking 机制(也就是consumer groups机制)。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment()
// 设定配置
val properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "test");
// 设置消费者并添加源
FlinkKafkaConsumer<String> myConsumer = new FlinkKafkaConsumer<String>(
"topic",
new SimpleStringSchema(),
properties))
);
DataStream[String] stream = env.addSource(myConsumer); Copy to clipboardErrorCopied
初始化 FlinkKafkaConsumer 参数
- topic 名字,用来指定消费一个或者多个topic的数据,也可以是正则表达式。
- 反序列化器(schema) ,对消费数据进行反序列化,转换成自定义的数据结构。
- kafka 配置信息:如 zk 地址端口,kafka 地址端口等。此对象至少要包含两个条目
bootstrap.servers
与group.id
。
反序列化器主要通过实现 KeyedDeserializationSchema 或者 DeserializationSchema 接口来完成,flink 内置,也可以自定义。
- 转化为 String 类型
SimpleStringSchema
- 转化为其它类型
TypeInformationSerializationSchema<T>
- 转化为键值对类型
TypeInformationKeyValueSerializationSchema<K, V>
- 转化为 JSON 类型
JSONKeyValueDeserializationSchema
消费起始位置
Flink Kafka Consumer 可以配置指定的 Kafka Partition 的起始位置。
myConsumer.setStartFromEarliest() // start from the earliest record possible(默认)
myConsumer.setStartFromLatest() // start from the latest record
myConsumer.setStartFromTimestamp(...) // start from specified epoch timestamp (milliseconds)
myConsumer.setStartFromGroupOffsets() // the default behaviour
- 点赞
- 收藏
- 关注作者
评论(0)