流数据处理框架2
Flink 操作 kafka
flink 提供了一个特有的 kafka connector 去读写 kafka topic 的数据。这样在 flink 消费 kafka 数据时,就可以通过 flink 内部去跟踪 offset 和设定 checkpoint 去实现 exactly-once 的语义。
在 Flink 中,我们作为 Consumer 时需要用 Source Connectors 代表连接数据源的连接器,作为 Producer 时需要用 Sink Connector 代表连接数据输出的连接器。
Source Connector
Flink Kafka connector 以并行的方式读入事件流,每个并行的 source task 都可以从一个或多个 partition 读入数据。Task 对于每个它当前正在读的 partition 都会追踪当前的 offset ,并将这些 offset 数据存储到它的 checkpoint 中。当发生故障进行恢复时,offset 被取出并重置,使得数据可以在上次检查点时的 offset 继续读数据。Flink Kafka connector 并不依赖于 Kafka 本身的 offset-tracking 机制(也就是consumer groups机制)。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment()
// 设定配置
val properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "test");
// 设置消费者并添加源
FlinkKafkaConsumer<String> myConsumer = new FlinkKafkaConsumer<String>(
"topic",
new SimpleStringSchema(),
properties))
);
DataStream[String] stream = env.addSource(myConsumer); Copy to clipboardErrorCopied
初始化 FlinkKafkaConsumer 参数
- topic 名字,用来指定消费一个或者多个topic的数据,也可以是正则表达式。
- 反序列化器(schema),对消费数据进行反序列化,转换成自定义的数据结构。
- kafka 配置信息:如 zk 地址端口,kafka 地址端口等。此对象至少要包含两个条目
bootstrap.servers
与group.id
。
反序列化器主要通过实现 KeyedDeserializationSchema 或者 DeserializationSchema 接口来完成,flink 内置,也可以自定义。
- 转化为 String 类型
SimpleStringSchema
- 转化为其它类型
TypeInformationSerializationSchema<T>
- 转化为键值对类型
TypeInformationKeyValueSerializationSchema<K, V>
- 转化为 JSON 类型
JSONKeyValueDeserializationSchema
消费起始位置
Flink Kafka Consumer 可以配置指定的 Kafka Partition 的起始位置。
myConsumer.setStartFromEarliest() // start from the earliest record possible(默认)
myConsumer.setStartFromLatest() // start from the latest record
myConsumer.setStartFromTimestamp(...) // start from specified epoch timestamp (milliseconds)
myConsumer.setStartFromGroupOffsets() // the default behaviourCopy to clipboardErrorCopied
Sink Connector
Flink 提供为 Kafka 0.8 版本后所有 Kafka 版本的 sink connectors。
// 设定数据流
DataStream[String] stream = environment.fromElements("1", "2", "3", "4", "5", "6");
// 设置生产者并添加到 sink
FlinkKafkaProducer<String> myProducer = new FlinkKafkaProducer<String>(
"localhost:9092",
"topic",
new SimpleStringSchema)
stream.addSink(myProducer)Copy to clipboardErrorCopied
初始化 FlinkKafkaProducer 参数
- broker 列表,要发往的 brokers , 用逗号分割。
- topic 名字,用来指定生产一个或者多个 topic 的数据,也可以是正则表达式。
- 序列化器(schema),对消费数据进行序列化,将目标类型转换成字节数组。
序列化器类比于反序列化器实现:
- 转化为 String 类型
SimpleStringSchema
- 转化为其它类型
TypeInformationSerializationSchema<T>
- 转化为键值对类型
TypeInformationKeyValueSerializationSchema<K, V>
- 转化为 JSON 类型
JSONKeyValueDeserializationSchema
Kakfa 容错机制
在 Kafka 0.9 之前不提供任何机制去保证 at-least-once 或 exactly-once 的语义。 但后续版本的 Kafka 可以通过以下方式来实现出错后恢复且不丢失数据:
- 启用 Checkpoint
在默认启用 Checkpoint 的状况下, FlinkKafkaConsumer 将消费来自 Topic 的记录,并以一致的方式周期性地 Checkpoint 其所有 Kafka Offset 以及其它操作的状态。万一作业失败,Flink 将把流失程序恢复到最新 Checkpoint的状态,并且重新消费 Kafka 中的数据。这确保了在 Kafka Broker 中所提交的 Offset 和 Checkpointed State 中的 Offset 是一致的。此时 FlinkKafkaProducer 可以提供 exactly-once 的投递语义。
如果 Checkpointing 没有启用,KafkaFlinkConsumer 将会周期性的提交 Offset 到 Zookeeper 中去。
- 配置 Semantic 参数
除了启用 Flink 的 Checkpointing,还可以通过传递恰当的 semantic 参数给 FlinkKafkaProducer 选择 3 种不同的操作模式:
emantic.None
: Flink 什么也不会保证,所产生的记录可能会被丢失或者重复。Semantic.AT_LEASET_ONCE
(默认): Flink 保证 at-least-once ,没有记录会被丢失,但可能会重复。Semantic.EXACTLY_ONCE
: 使用 Kafka 的事务机制来保证 exactly-once。
Semantic.EXACTLY_ONCE 模式依赖于提交事务的能力,这些事务是在 taking a checkpoint 之前,从该 Checkpoint 恢复之后启动的。如果 Flink 应用崩溃且完成重启的时间比 Kafka 事务超时的时间大,则数据将会丢失(Kafka 将自动的终止超过超时时间的事务)。请务必根据预期的故障时间来配置你的事务超时。
kafka 分区发现
FlinkKafkaConsumer 支持发现动态创建的 Kafka Partition,并且以 exactly-once 语义保证来消费其中的数据。默认情况下分区发现是禁用的,要启用该特性在提供的属性配置中为参数 flink.partition-discovery.interval-millis 设置一个非负数的值,表示发现间隔(以毫秒为单位)。
- 点赞
- 收藏
- 关注作者
评论(0)