Flink Data Source

举报
tea_year 发表于 2025/04/30 17:11:42 2025/04/30
【摘要】 Apache Flink 作为一个分布式流批一体处理框架,其数据处理流程始于数据源(Data Source)。Data Source 是Flink程序中负责读取原始数据的组件,决定了数据如何进入Flink系统。Flink提供了丰富的数据源连接器,支持从各种存储系统和消息队列中读取数据。一、内置 Data SourceFlink Data Source 用于定义 Flink 程序的数据来源,F...

Apache Flink 作为一个分布式流批一体处理框架,其数据处理流程始于数据源(Data Source)。Data Source 是Flink程序中负责读取原始数据的组件,决定了数据如何进入Flink系统。Flink提供了丰富的数据源连接器,支持从各种存储系统和消息队列中读取数据。

一、内置 Data Source

Flink Data Source 用于定义 Flink 程序的数据来源,Flink 官方提供了多种数据获取方法,用于帮助开发者简单快速地构建输入流,具体如下:

1.1 基于文件构建

1. readTextFile(path):按照 TextInputFormat 格式读取文本文件,并将其内容以字符串的形式返回。示例如下:

env.readTextFile(filePath).print();

2. readFile(fileInputFormat, path) :按照指定格式读取文件。

3. readFile(inputFormat, filePath, watchType, interval, typeInformation):按照指定格式周期性的读取文件。其中各个参数的含义如下:

  • inputFormat:数据流的输入格式。

  • filePath:文件路径,可以是本地文件系统上的路径,也可以是 HDFS 上的文件路径。

  • watchType:读取方式,它有两个可选值,分别是 FileProcessingMode.PROCESS_ONCEFileProcessingMode.PROCESS_CONTINUOUSLY:前者表示对指定路径上的数据只读取一次,然后退出;后者表示对路径进行定期地扫描和读取。需要注意的是如果 watchType 被设置为 PROCESS_CONTINUOUSLY,那么当文件被修改时,其所有的内容 (包含原有的内容和新增的内容) 都将被重新处理,因此这会打破 Flink 的 exactly-once 语义。

  • interval:定期扫描的时间间隔。

  • typeInformation:输入流中元素的类型。

使用示例如下:

final String filePath = "D:\\log4j.properties";
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.readFile(new TextInputFormat(new Path(filePath)),
             filePath,
             FileProcessingMode.PROCESS_ONCE,
             1,
             BasicTypeInfo.STRING_TYPE_INFO).print();
env.execute();

1.2 基于集合构建

1. fromCollection(Collection):基于集合构建,集合中的所有元素必须是同一类型。示例如下:

env.fromCollection(Arrays.asList(1,2,3,4,5)).print();

2. fromElements(T ...): 基于元素构建,所有元素必须是同一类型。示例如下:

env.fromElements(1,2,3,4,5).print();

3. generateSequence(from, to):基于给定的序列区间进行构建。示例如下:

env.generateSequence(0,100);

4. fromCollection(Iterator, Class):基于迭代器进行构建。第一个参数用于定义迭代器,第二个参数用于定义输出元素的类型。使用示例如下:

env.fromCollection(new CustomIterator(), BasicTypeInfo.INT_TYPE_INFO).print();

其中 CustomIterator 为自定义的迭代器,这里以产生 1 到 100 区间内的数据为例,源码如下。需要注意的是自定义迭代器除了要实现 Iterator 接口外,还必须要实现序列化接口 Serializable ,否则会抛出序列化失败的异常:

import java.io.Serializable;
import java.util.Iterator;public class CustomIterator implements Iterator<Integer>, Serializable {
    private Integer i = 0;
​
    @Override
    public boolean hasNext() {
        return i < 100;
    }
​
    @Override
    public Integer next() {
        i++;
        return i;
    }
}

5. fromParallelCollection(SplittableIterator, Class):方法接收两个参数,第二个参数用于定义输出元素的类型,第一个参数 SplittableIterator 是迭代器的抽象基类,它用于将原始迭代器的值拆分到多个不相交的迭代器中。

1.3 基于 Socket 构建

Flink 提供了 socketTextStream 方法用于构建基于 Socket 的数据流,socketTextStream 方法有以下四个主要参数:

  • hostname:主机名;

  • port:端口号,设置为 0 时,表示端口号自动分配;

  • delimiter:用于分隔每条记录的分隔符;

  • maxRetry:当 Socket 临时关闭时,程序的最大重试间隔,单位为秒。设置为 0 时表示不进行重试;设置为负值则表示一直重试。示例如下:

 env.socketTextStream("192.168.8.120", 9999, "\n", 3).print();

二、自定义 Data Source

2.1 SourceFunction

除了内置的数据源外,用户还可以使用 addSource 方法来添加自定义的数据源。自定义的数据源必须要实现 SourceFunction 接口,这里以产生 [0 , 1000) 区间内的数据为例,代码如下:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
​
env.addSource(new SourceFunction[Long] {
      var count = 0L
      var isRunning = true
​
      override def cancel(): Unit = isRunning = false
​
      override def run(ctx: SourceFunction.SourceContext[Long]): Unit = {
        while (isRunning && count < 1000) {
          // 通过collect将输入发送出去
          ctx.collect(count)
          count += 1
        }
      }
    }).print()
env.execute()

2.2 ParallelSourceFunction 和 RichParallelSourceFunction

上面通过 SourceFunction 实现的数据源是不具有并行度的,即不支持在得到的 DataStream 上调用 setParallelism(n) 方法,此时会抛出如下的异常:

Exception in thread "main" java.lang.IllegalArgumentException: Source: 1 is not a parallel source

如果你想要实现具有并行度的输入流,则需要实现 ParallelSourceFunction 或 RichParallelSourceFunction 接口,其与 SourceFunction 的关系如下图:

flink-RichParallelSourceFunction.png

ParallelSourceFunction 直接继承自 ParallelSourceFunction,具有并行度的功能。RichParallelSourceFunction 则继承自 AbstractRichFunction,同时实现了 ParallelSourceFunction 接口,所以其除了具有并行度的功能外,还提供了额外的与生命周期相关的方法,如 open() ,closen() 。

三、Streaming Connectors

3.1 内置连接器

除了自定义数据源外, Flink 还内置了多种连接器,用于满足大多数的数据收集场景。当前内置连接器的支持情况如下:

  • Apache Kafka (支持 source 和 sink)

  • Apache Cassandra (sink)

  • Amazon Kinesis Streams (source/sink)

  • Elasticsearch (sink)

  • Hadoop FileSystem (sink)

  • RabbitMQ (source/sink)

  • Apache NiFi (source/sink)

  • Twitter Streaming API (source)

  • Google PubSub (source/sink)

除了上述的连接器外,你还可以通过 Apache Bahir 的连接器扩展 Flink。Apache Bahir 旨在为分布式数据分析系统 (如 Spark,Flink) 等提供功能上的扩展,当前其支持的与 Flink 相关的连接器如下:

  • Apache ActiveMQ (source/sink)

  • Apache Flume (sink)

  • Redis (sink)

  • Akka (sink)

  • Netty (source)

随着 Flink 的不断发展,可以预见到其会支持越来越多类型的连接器,关于连接器的后续发展情况,可以查看其官方文档:Streaming Connectors 。在所有 DataSource 连接器中,使用的广泛的就是 Kafka,所以这里我们以其为例,来介绍 Connectors 的整合步骤。

3.2 整合 Kakfa

1. 导入依赖

整合 Kafka 时,一定要注意所使用的 Kafka 的版本,不同版本间所需的 Maven 依赖和开发时所调用的类均不相同,具体如下:

Maven 依赖 Flink 版本 Consumer and Producer 类的名称 Kafka 版本
flink-connector-kafka-0.8_2.11 1.0.0 + FlinkKafkaConsumer08 FlinkKafkaProducer08 0.8.x
flink-connector-kafka-0.9_2.11 1.0.0 + FlinkKafkaConsumer09 FlinkKafkaProducer09 0.9.x
flink-connector-kafka-0.10_2.11 1.2.0 + FlinkKafkaConsumer010 FlinkKafkaProducer010 0.10.x
flink-connector-kafka-0.11_2.11 1.4.0 + FlinkKafkaConsumer011 FlinkKafkaProducer011 0.11.x
flink-connector-kafka_2.11 1.7.0 + FlinkKafkaConsumer FlinkKafkaProducer >= 1.0.0

这里我使用的 Kafka 版本为 kafka_2.12-2.2.0,添加的依赖如下:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.11</artifactId>
    <version>1.9.1</version>
</dependency>

2. 代码开发

这里以最简单的场景为例,接收 Kafka 上的数据并打印,代码如下:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties properties = new Properties();
// 指定Kafka的连接位置
properties.setProperty("bootstrap.servers", "hadoop001:9092");
// 指定监听的主题,并定义Kafka字节消息到Flink对象之间的转换规则
DataStream<String> stream = env
    .addSource(new FlinkKafkaConsumer<>("flink-stream-in-topic", new SimpleStringSchema(), properties));
stream.print();
env.execute("Flink Streaming");

3.3 整合测试

1. 启动 Kakfa

Kafka 的运行依赖于 zookeeper,需要预先启动,可以启动 Kafka 内置的 zookeeper,也可以启动自己安装的:

# zookeeper启动命令
bin/zkServer.sh start
​
# 内置zookeeper启动命令
bin/zookeeper-server-start.sh config/zookeeper.properties

启动单节点 kafka 用于测试:

# bin/kafka-server-start.sh config/server.properties

2. 创建 Topic

# 创建用于测试主题
bin/kafka-topics.sh --create \
                    --bootstrap-server hadoop001:9092 \
                    --replication-factor 1 \
                    --partitions 1  \
                    --topic flink-stream-in-topic
​
# 查看所有主题
 bin/kafka-topics.sh --list --bootstrap-server hadoop001:9092

3. 启动 Producer

这里 启动一个 Kafka 生产者,用于发送测试数据:

bin/kafka-console-producer.sh --broker-list hadoop001:9092 --topic flink-stream-in-topic

4. 测试结果

在 Producer 上输入任意测试数据,之后观察程序控制台的输出:

flink-kafka-datasource-producer.png

程序控制台的输出如下:

flink-kafka-datasource-console.png

可以看到已经成功接收并打印出相关的数据。

Flink Transformation

一、Transformations 分类

Flink 的 Transformations 操作主要用于将一个和多个 DataStream 按需转换成新的 DataStream。它主要分为以下三类:

  • DataStream Transformations:进行数据流相关转换操作;

  • Physical partitioning:物理分区。Flink 提供的底层 API ,允许用户定义数据的分区规则;

  • Task chaining and resource groups:任务链和资源组。允许用户进行任务链和资源组的细粒度的控制。

以下分别对其主要 API 进行介绍:

二、DataStream Transformations

2.1 Map [DataStream → DataStream]

对一个 DataStream 中的每个元素都执行特定的转换操作:

DataStream<Integer> integerDataStream = env.fromElements(1, 2, 3, 4, 5);
integerDataStream.map((MapFunction<Integer, Object>) value -> value * 2).print();
// 输出 2,4,6,8,10

2.2 FlatMap [DataStream → DataStream]

FlatMap 与 Map 类似,但是 FlatMap 中的一个输入元素可以被映射成一个或者多个输出元素,示例如下:

String string01 = "one one one two two";
String string02 = "third third third four";
DataStream<String> stringDataStream = env.fromElements(string01, string02);
stringDataStream.flatMap(new FlatMapFunction<String, String>() {
    @Override
    public void flatMap(String value, Collector<String> out) throws Exception {
        for (String s : value.split(" ")) {
            out.collect(s);
        }
    }
}).print();
// 输出每一个独立的单词,为节省排版,这里去掉换行,后文亦同
one one one two two third third third four

2.3 Filter [DataStream → DataStream]

用于过滤符合条件的数据:

env.fromElements(1, 2, 3, 4, 5).filter(x -> x > 3).print();

2.4 KeyBy 和 Reduce

  • KeyBy [DataStream → KeyedStream] :用于将相同 Key 值的数据分到相同的分区中;

  • Reduce [KeyedStream → DataStream] :用于对数据执行归约计算。

如下例子将数据按照 key 值分区后,滚动进行求和计算:

DataStream<Tuple2<String, Integer>> tuple2DataStream = env.fromElements(new Tuple2<>("a", 1),
                                                                        new Tuple2<>("a", 2), 
                                                                        new Tuple2<>("b", 3), 
                                                                        new Tuple2<>("b", 5));
KeyedStream<Tuple2<String, Integer>, Tuple> keyedStream = tuple2DataStream.keyBy(0);
keyedStream.reduce((ReduceFunction<Tuple2<String, Integer>>) (value1, value2) ->
                   new Tuple2<>(value1.f0, value1.f1 + value2.f1)).print();// 持续进行求和计算,输出:
(a,1)
(a,3)
(b,3)
(b,8)

KeyBy 操作存在以下两个限制:

  • KeyBy 操作用于用户自定义的 POJOs 类型时,该自定义类型必须重写 hashCode 方法;

  • KeyBy 操作不能用于数组类型。

2.5 Aggregations [KeyedStream → DataStream]

Aggregations 是官方提供的聚合算子,封装了常用的聚合操作,如上利用 Reduce 进行求和的操作也可以利用 Aggregations 中的 sum 算子重写为下面的形式:

tuple2DataStream.keyBy(0).sum(1).print();

除了 sum 外,Flink 还提供了 min , max , minBy,maxBy 等常用聚合算子:

// 滚动计算指定key的最小值,可以通过index或者fieldName来指定key
keyedStream.min(0);
keyedStream.min("key");
// 滚动计算指定key的最大值
keyedStream.max(0);
keyedStream.max("key");
// 滚动计算指定key的最小值,并返回其对应的元素
keyedStream.minBy(0);
keyedStream.minBy("key");
// 滚动计算指定key的最大值,并返回其对应的元素
keyedStream.maxBy(0);
keyedStream.maxBy("key");

2.6 Union [DataStream* → DataStream]

用于连接两个或者多个元素类型相同的 DataStream 。当然一个 DataStream 也可以与其本生进行连接,此时该 DataStream 中的每个元素都会被获取两次:

DataStreamSource<Tuple2<String, Integer>> streamSource01 = env.fromElements(new Tuple2<>("a", 1), 
                                                                            new Tuple2<>("a", 2));
DataStreamSource<Tuple2<String, Integer>> streamSource02 = env.fromElements(new Tuple2<>("b", 1), 
                                                                            new Tuple2<>("b", 2));
streamSource01.union(streamSource02);
streamSource01.union(streamSource01,streamSource02);

2.7 Connect [DataStream,DataStream → ConnectedStreams]

Connect 操作用于连接两个或者多个类型不同的 DataStream ,其返回的类型是 ConnectedStreams ,此时被连接的多个 DataStreams 可以共享彼此之间的数据状态。但是需要注意的是由于不同 DataStream 之间的数据类型是不同的,如果想要进行后续的计算操作,还需要通过 CoMap 或 CoFlatMap 将 ConnectedStreams 转换回 DataStream:

DataStreamSource<Tuple2<String, Integer>> streamSource01 = env.fromElements(new Tuple2<>("a", 3), 
                                                                            new Tuple2<>("b", 5));
DataStreamSource<Integer> streamSource02 = env.fromElements(2, 3, 9);
// 使用connect进行连接
ConnectedStreams<Tuple2<String, Integer>, Integer> connect = streamSource01.connect(streamSource02);
connect.map(new CoMapFunction<Tuple2<String, Integer>, Integer, Integer>() {
    @Override
    public Integer map1(Tuple2<String, Integer> value) throws Exception {
        return value.f1;
    }
​
    @Override
    public Integer map2(Integer value) throws Exception {
        return value;
    }
}).map(x -> x * 100).print();// 输出:
300 500 200 900 300

2.8 Split 和 Select

  • Split [DataStream → SplitStream]:用于将一个 DataStream 按照指定规则进行拆分为多个 DataStream,需要注意的是这里进行的是逻辑拆分,即 Split 只是将数据贴上不同的类型标签,但最终返回的仍然只是一个 SplitStream;

  • Select [SplitStream → DataStream]:想要从逻辑拆分的 SplitStream 中获取真实的不同类型的 DataStream,需要使用 Select 算子,示例如下:

DataStreamSource<Integer> streamSource = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8);
// 标记
SplitStream<Integer> split = streamSource.split(new OutputSelector<Integer>() {
    @Override
    public Iterable<String> select(Integer value) {
        List<String> output = new ArrayList<String>();
        output.add(value % 2 == 0 ? "even" : "odd");
        return output;
    }
});
// 获取偶数数据集
split.select("even").print();
// 输出 2,4,6,8

2.9 project [DataStream → DataStream]

project 主要用于获取 tuples 中的指定字段集,示例如下:

DataStreamSource<Tuple3<String, Integer, String>> streamSource = env.fromElements(
                                                                         new Tuple3<>("li", 22, "2018-09-23"),
                                                                         new Tuple3<>("ming", 33, "2020-09-23"));
streamSource.project(0,2).print();// 输出
(li,2018-09-23)
(ming,2020-09-23)

总结

  1. 统一Source/Sink API:FLIP-27标准的全面采用

  2. 更多内置连接器:支持更多新兴数据系统

  3. 自适应Source:根据负载动态调整读取速率

  4. 更细粒度监控:提供更详细的Source级别指标

  5. 云原生集成:与各云平台数据服务的深度集成

通过合理选择和配置Data Source,可以充分发挥Flink的流批一体处理能力,构建高效可靠的数据处理管道。在实际项目中,需要根据数据特征、业务需求和系统环境选择最适合的数据源实现方式。

【声明】本内容来自华为云开发者社区博主,不代表华为云及华为云开发者社区的观点和立场。转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息,否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。