Flink SQL UDF重复调用问题解决方案
【摘要】 UDF重复调用的问题在某些情况下可能会对Flink SQL用户造成困扰
UDF重复调用问题
UDF重复调用的问题在某些情况下可能会对Flink SQL用户造成困扰,例如下面的SQL语句:
SELECT my_map['key1'] as key1, my_map['key2'] as key2, my_map['key3'] as key3
FROM (
SELECT dump_json_to_map(col1) as my_map
FROM T
)
dump_json_to_map会被执行3次。分析对应的graph日志开看,Flink会把我们的代码反向’优化’成类似如下sql:
SELECT dump_json_to_map(col1)['key1'] as key1, dump_json_to_map(col1)['key2'] as key2, dump_json_to_map(col1)['key3'] as key3 FROM T
会造成性能和正确性的问题:
- UDF包含计算密集型的逻辑,整个作业的性能就会受到很大影响
- UDF是有状态的UDF(如链接Redis等外部存储),则会导致重复计算,中间状态可能因为无法幂等的操作而被破坏,最终导致正确性出现问题
这个Flink社区已有对应的讨论,但是已知没有具体的后续,详见:FLINK-21573
解决方案
解决方案一
修改Flink内核源码,需要团队成员具备维护Flink内核的能力和权力。
参考这篇文章。
思路摘要:
- 复写udf的
isDeterministic()
方法 - 在
CodeGeneratorContext
中添加可重用的UDF表达式及其result term的容器 - 从
ExprCodeGenerator
入手(函数调用都属于RexCall
),找到UDF代码生成的方法,即BridgingFunctionGenUtil#generateScalarFunctionCall()
,if (isDeterministic)块内的代码实现了UDF表达式重用,即重用生成的第一个result term。
解决方案二(推荐)
来自好友kyle大佬的实战经验:增加一层透传专用的UDTF。
实现参考:
public class PassThroughUdtf extends TableFunction<String> {
private static final long serialVersionUID = 1093578798410129502L;
// 仅为示例,需要根据自己的场景修改入参和输出的数据类型
public void eval(String param){
collect(param);
}
}
然后改造下原有SQL
SELECT my_map['key1'] as key1, my_map['key2'] as key2, my_map['key3'] as key3
FROM (
SELECT my_map
FROM T ,lateral table(passThrough(dump_json_to_map(col1))) as T(my_map)
)
增加PassThroughUdtf后对整体性能影响不大,就可以相对简单地解决UDF重复调用的问题。
【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱:
cloudbbs@huaweicloud.com
- 点赞
- 收藏
- 关注作者
评论(0)