《Flink原理、实战与性能优化》 —3.4 Flink数据类型
3.4 Flink数据类型
3.4.1 数据类型支持
Flink支持非常完善的数据类型,数据类型的描述信息都是由TypeInformation定义,比较常用的TypeInformation有BasicTypeInfo、TupleTypeInfo、CaseClassTypeInfo、PojoTypeInfo类等。TypeInformation主要作用是为了在Flink系统内有效地对数据结构类型进行管理,能够在分布式计算过程中对数据的类型进行管理和推断。同时基于对数据的类型信息管理,Flink内部对数据存储也进行了相应的性能优化。Flink能够支持任意的Java或Scala的数据类型,不用像Hadoop中的org.apache.hadoop.io.Writable而实现特定的序列化和反序列化接口,从而让用户能够更加容易使用已有的数据结构类型。另外使用TypeInformation管理数据类型信息,能够在数据处理之前将数据类型推断出来,而不是真正在触发计算后才识别出,这样能够及时有效地避免用户在使用Flink编写应用的过程中的数据类型问题。
1. 原生数据类型
Flink通过实现BasicTypeInfo数据类型,能够支持任意Java 原生基本类型(装箱)或 String 类型,例如Integer、String、Double等,如以下代码所示,通过从给定的元素集中创建DataStream数据集。
//创建Int类型的数据集
val intStream:DataStream[Int] = env.fromElements(3, 1, 2, 1, 5)
//创建String类型的数据集
val dataStream: DataStream[String] = env.fromElements("hello", "flink")
Flink实现另外一种TypeInfomation是BasicArrayTypeInfo,对应的是Java基本类型数组(装箱)或 String对象的数组,如下代码通过使用Array数组和List集合创建DataStream数据集。
//通过从数组中创建数据集
val dataStream: DataStream[Int] = env.fromCollection(Array(3, 1, 2, 1, 5))
//通过List集合创建数据集
val dataStream: DataStream[Int] = env.fromCollection(List(3, 1, 2, 1, 5))
2. Java Tuples类型
通过定义TupleTypeInfo来描述Tuple类型数据,Flink在Java接口中定义了元祖类(Tuple)供用户使用。Flink Tuples是固定长度固定类型的Java Tuple实现,不支持空值存储。目前支持任意的Flink Java Tuple类型字段数量上限为25,如果字段数量超过上限,可以通过继承Tuple类的方式进行拓展。如下代码所示,创建Tuple数据类型数据集。
//通过实例化Tuple2创建具有两个元素的数据集
val tupleStream2: DataStream[Tuple2[String, Int]] = env.fromElements(new Tuple2("a",1), new Tuple2("c", 2))
3. Scala Case Class类型
Flink通过实现CaseClassTypeInfo支持任意的Scala Case Class,包括Scala tuples类型,支持的字段数量上限为22,支持通过字段名称和位置索引获取指标,不支持存储空值。如下代码实例所示,定义WordCount Case Class数据类型,然后通过fromElements方法创建input数据集,调用keyBy()方法对数据集根据word字段重新分区。
//定义WordCount Case Class数据结构
case class WordCount(word: String, count: Int)
//通过fromElements方法创建数据集
val input = env.fromElements(WordCount("hello", 1), WordCount("world", 2))
val keyStream1 = input.keyBy("word") // 根据word字段为分区字段,
val keyStream2 = input.keyBy(0) //也可以通过指定position分区
通过使用Scala Tuple创建DataStream数据集,其他的使用方式和Case Class相似。需要注意的是,如果根据名称获取字段,可以使用Tuple中的默认字段名称。
//通过scala Tuple创建具有两个元素的数据集
val tupleStream: DataStream[Tuple2[String, Int]] = env.fromElements(("a", 1),
("c", 2))
//使用默认字段名称获取字段,其中_1表示tuple这种第一个字段
tupleStream.keyBy("_1")
4. POJOs类型
POJOs类可以完成复杂数据结构的定义,Flink通过实现PojoTypeInfo来描述任意的POJOs,包括Java 和Scala类。在Flink中使用POJOs类可以通过字段名称获取字段,例如dataStream.join(otherStream).where("name").equalTo("personName"),对于用户做数据处理则非常透明和简单,如代码清单3-2所示。如果在Flink中使用POJOs数据类型,需要遵循以下要求:
POJOs 类必须是Public修饰且必须独立定义,不能是内部类;
POJOs类中必须含有默认空构造器;
POJOs类中所有的Fields必须是Public或者具有Public修饰的getter和setter方法;
POJOs类中的字段类型必须是Flink支持的。
代码清单3-2 Java POJOs数据类型定义
定义好POJOs Class后,就可以在Flink环境中使用了,如下代码所示,使用fromElements接口构建Person类的数据集。POJOs类仅支持字段名称指定字段,如代码中通过Person name来指定Keyby字段。
val persionStream = env.fromElements(new Person("Peter",14),new
Person("Linda",25))
//通过Person.name来指定Keyby字段
persionStream.keyBy("name")
Scala POJOs数据结构定义如下,使用方式与Java POJOs相同。
class Person(var name: String, var age: Int) {
//默认空构造器
def this() {
this(null, -1)
}
}
5. Flink Value类型
Value数据类型实现了org.apache.flink.types.Value,其中包括read()和write()两个方法完成序列化和反序列化操作,相对于通用的序列化工具会有着比较高效的性能。目前Flink提供了內建的Value类型有IntValue、DoubleValue以及StringValue等,用户可以结合原生数据类型和Value类型使用。
6. 特殊数据类型
在Flink中也支持一些比较特殊的数据数据类型,例如Scala中的List、Map、Either、Option、Try数据类型,以及Java中Either数据类型,还有Hadoop的Writable数据类型。如下代码所示,创建Map和List类型数据集。这种数据类型使用场景不是特别广泛,主要原因是数据中的操作相对不像POJOs类那样方便和透明,用户无法根据字段位置或者名称获取字段信息,同时要借助Types Hint帮助Flink推断数据类型信息,关于Tyeps Hmt介绍可以参考下一小节。
//创建Map类型数据集
val mapStream =
env.fromElements(Map("name"->"Peter","age"->18),Map("name"->"Linda",
"age"->25))
//创建List类型数据集
val listStream = env.fromElements(List(1,2,3,5),List(2,4,3,2))
- 点赞
- 收藏
- 关注作者
评论(0)