《Flink原理、实战与性能优化》 —3.4.2 TypeInformation信息获取
3.4.2 TypeInformation信息获取
通常情况下Flink都能正常进行数据类型推断,并选择合适的serializers以及comparators。但在某些情况下却无法直接做到,例如定义函数时如果使用到了泛型,JVM就会出现类型擦除的问题,使得Flink并不能很容易地获取到数据集中的数据类型信息。同时在Scala API和Java API中,Flink分别使用了不同的方式重构了数据类型信息。
1. Scala API类型信息
Scala API通过使用Manifest和类标签,在编译器运行时获取类型信息,即使是在函数定义中使用了泛型,也不会像Java API出现类型擦除的问题,这使得Scala API具有非常精密的类型管理机制。同时在Flink中使用到Scala Macros框架,在编译代码的过程中推断函数输入参数和返回值的类型信息,同时在Flink中注册成TypeInformation以支持上层计算算子使用。
当使用Scala API开发Flink应用,如果使用到Flink已经通过TypeInformation定义的数据类型,TypeInformation类不会自动创建,而是使用隐式参数的方式引入,代码不会直接抛出编码异常,但是当启动Flink应用程序时就会报”could not find implicit value for evidence parameter of type TypeInformation”的错误。这时需要将TypeInformation类隐式参数引入到当前程序环境中,代码实例如下:
import org.apache.flink.api.scala._
2. Java API类型信息
由于Java的泛型会出现类型擦除问题,Flink通过Java反射机制尽可能重构类型信息,例如使用函数签名以及子类的信息等。同时类型推断在当输出类型依赖于输入参数类型时相对比较容易做到,但是如果函数的输出类型不依赖于输入参数的类型信息,这个时候就需要借助于类型提示(Ctype Himts)来告诉系统函数中传入的参数类型信息和输出参数信息。如代码清单3-3通过在returns方法中传入TypeHint<Integer>实例指定输出参数类型,帮助Flink系统对输出类型进行数据类型参数的推断和收集。
代码清单3-3 定义Type Hint输出类型参数
在使用Java API定义POJOs类型数据时,PojoTypeInformation为POJOs类中的所有字段创建序列化器,对于标准的类型,例如Integer、String、Long等类型是通过Flink自带的序列化器进行数据序列化,对于其他类型数据都是直接调用Kryo序列化工具来进行序列化。
通常情况下,如果Kryo序列化工具无法对POJOs类序列化时,可以使用Avro对POJOs类进行序列化,如下代码通过在ExecutionConfig中调用enableForceAvro()来开启Avro序列化。
ExecutionEnvironment env =
ExecutionEnvironment.getExecutionEnvironment();
//开启Avro序列化方式
env.getConfig().enableForceAvro();
如果用户想使用Kryo序列化工具来序列化POJOs所有字段,则在ExecutionConfig中调用enableForceKryo()来开启Kryo序列化。
final ExecutionEnvironment env =
ExecutionEnvironment.getExecutionEnvironment();
env.getConfig().enableForceKryo();
如果默认的Kryo序列化类不能序列化POJOs对象,通过调用ExecutionConfig的addDefault-KryoSerializer()方法向Kryo中添加自定义的序列化器。
env.getConfig().addDefaultKryoSerializer(Class<?> type, Class<? extends
Serializer<?>> serializerClass)
3. 自定义TypeInformation
除了使用已有的TypeInformation所定义的数据格式类型之外,用户也可以自定义实现TypeInformation,来满足的不同的数据类型定义需求。Flink提供了可插拔的Type Information Factory让用户将自定义的TypeInformation注册到Flink类型系统中。如下代码所示只需要通过实现org.apache.flink.api.common.typeinfo.TypeInfoFactory接口,返回相应的类型信息。
通过@TypeInfo注解创建数据类型,定义CustomTuple数据类型。
@TypeInfo(CustomTypeInfoFactory.class)
public class CustomTuple<T0, T1> {
public T0 field0;
public T1 field1;
}
然后定义CustomTypeInfoFactory类继承于TypeInfoFactory,参数类型指定CustomTuple。最后重写createTypeInfo方法,创建的CustomTupleTypeInfo就是CustomTuple数据类型TypeInformation。
public class CustomTypeInfoFactory extends TypeInfoFactory<CustomTuple> {
@Override
public TypeInformation<CustomTuple> createTypeInfo(Type t, Map<String, TypeInformation<?>> genericParameters) {
return new CustomTupleTypeInfo(genericParameters.get("T0"), genericParameters.get("T1"));
}
}
- 点赞
- 收藏
- 关注作者
评论(0)