《大数据技术丛书Flink原理、实战与性能优化》—3.4 Flink数据类型

举报
华章计算机 发表于 2019/06/01 22:55:01 2019/06/01
【摘要】 本书摘自《大数据技术丛书Flink原理、实战与性能优化》一书中的第3章,第3.4.1节,编著是张利兵。

3.4 Flink数据类型

3.4.1 数据类型支持

Flink支持非常完善的数据类型,数据类型的描述信息都是由TypeInformation定义,比较常用的TypeInformation有BasicTypeInfo、TupleTypeInfo、CaseClassTypeInfo、PojoTypeInfo类等。TypeInformation主要作用是为了在Flink系统内有效地对数据结构类型进行管理,能够在分布式计算过程中对数据的类型进行管理和推断。同时基于对数据的类型信息管理,Flink内部对数据存储也进行了相应的性能优化。Flink能够支持任意的Java或Scala的数据类型,不用像Hadoop中的org.apache.hadoop.io.Writable而实现特定的序列化和反序列化接口,从而让用户能够更加容易使用已有的数据结构类型。另外使用TypeInformation管理数据类型信息,能够在数据处理之前将数据类型推断出来,而不是真正在触发计算后才识别出,这样能够及时有效地避免用户在使用Flink编写应用的过程中的数据类型问题。

1. 原生数据类型

Flink通过实现BasicTypeInfo数据类型,能够支持任意Java 原生基本类型(装箱)或 String 类型,例如Integer、String、Double等,如以下代码所示,通过从给定的元素集中创建DataStream数据集。

//创建Int类型的数据集

val intStream:DataStream[Int] = env.fromElements(3, 1, 2, 1, 5)

//创建String类型的数据集

val dataStream: DataStream[String] = env.fromElements("hello", "flink")

Flink实现另外一种TypeInfomation是BasicArrayTypeInfo,对应的是Java基本类型数组(装箱)或 String对象的数组,如下代码通过使用Array数组和List集合创建DataStream数据集。

//通过从数组中创建数据集

val dataStream: DataStream[Int] = env.fromCollection(Array(3, 1, 2, 1, 5))

//通过List集合创建数据集

val dataStream: DataStream[Int] = env.fromCollection(List(3, 1, 2, 1, 5))

2. Java Tuples类型

通过定义TupleTypeInfo来描述Tuple类型数据,Flink在Java接口中定义了元祖类(Tuple)供用户使用。Flink Tuples是固定长度固定类型的Java Tuple实现,不支持空值存储。目前支持任意的Flink Java Tuple类型字段数量上限为25,如果字段数量超过上限,可以通过继承Tuple类的方式进行拓展。如下代码所示,创建Tuple数据类型数据集。

//通过实例化Tuple2创建具有两个元素的数据集

val tupleStream2: DataStream[Tuple2[String, Int]] = env.fromElements(new Tuple2("a",1), new Tuple2("c", 2))

3. Scala Case Class类型

Flink通过实现CaseClassTypeInfo支持任意的Scala Case Class,包括Scala tuples类型,支持的字段数量上限为22,支持通过字段名称和位置索引获取指标,不支持存储空值。如下代码实例所示,定义WordCount Case Class数据类型,然后通过fromElements方法创建input数据集,调用keyBy()方法对数据集根据word字段重新分区。

//定义WordCount Case Class数据结构

case class WordCount(word: String, count: Int)

//通过fromElements方法创建数据集

val input = env.fromElements(WordCount("hello", 1), WordCount("world", 2))

val keyStream1 = input.keyBy("word") // 根据word字段为分区字段,

val keyStream2 = input.keyBy(0) //也可以通过指定position分区

通过使用Scala Tuple创建DataStream数据集,其他的使用方式和Case Class相似。需要注意的是,如果根据名称获取字段,可以使用Tuple中的默认字段名称。

//通过scala Tuple创建具有两个元素的数据集

val tupleStream: DataStream[Tuple2[String, Int]] = env.fromElements(("a", 1),

  ("c", 2))

//使用默认字段名称获取字段,其中_1表示tuple这种第一个字段

tupleStream.keyBy("_1")

4. POJOs类型

POJOs类可以完成复杂数据结构的定义,Flink通过实现PojoTypeInfo来描述任意的POJOs,包括Java 和Scala类。在Flink中使用POJOs类可以通过字段名称获取字段,例如dataStream.join(otherStream).where("name").equalTo("personName"),对于用户做数据处理则非常透明和简单,如代码清单3-2所示。如果在Flink中使用POJOs数据类型,需要遵循以下要求:

POJOs 类必须是Public修饰且必须独立定义,不能是内部类;

POJOs类中必须含有默认空构造器;

POJOs类中所有的Fields必须是Public或者具有Public修饰的getter和setter方法;

POJOs类中的字段类型必须是Flink支持的。

代码清单3-2 Java POJOs数据类型定义

image.png

定义好POJOs Class后,就可以在Flink环境中使用了,如下代码所示,使用fromElements接口构建Person类的数据集。POJOs类仅支持字段名称指定字段,如代码中通过Person name来指定Keyby字段。

val persionStream = env.fromElements(new Person("Peter",14),new 

Person("Linda",25))

//通过Person.name来指定Keyby字段

persionStream.keyBy("name")

Scala POJOs数据结构定义如下,使用方式与Java POJOs相同。

class Person(var name: String, var age: Int) {

    //默认空构造器

      def this() {

        this(null, -1)

  }

}

5. Flink Value类型

Value数据类型实现了org.apache.flink.types.Value,其中包括read()和write()两个方法完成序列化和反序列化操作,相对于通用的序列化工具会有着比较高效的性能。目前Flink提供了內建的Value类型有IntValue、DoubleValue以及StringValue等,用户可以结合原生数据类型和Value类型使用。

6. 特殊数据类型

在Flink中也支持一些比较特殊的数据数据类型,例如Scala中的List、Map、Either、Option、Try数据类型,以及Java中Either数据类型,还有Hadoop的Writable数据类型。如下代码所示,创建Map和List类型数据集。这种数据类型使用场景不是特别广泛,主要原因是数据中的操作相对不像POJOs类那样方便和透明,用户无法根据字段位置或者名称获取字段信息,同时要借助Types Hint帮助Flink推断数据类型信息,关于Tyeps Hmt介绍可以参考下一小节。

//创建Map类型数据集

val mapStream = 

env.fromElements(Map("name"->"Peter","age"->18),Map("name"->"Linda",

"age"->25))

//创建List类型数据集

val listStream = env.fromElements(List(1,2,3,5),List(2,4,3,2))


【版权声明】本文为华为云社区用户转载文章,如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。