Pyflink 1.14简介

举报
想要一只猫 发表于 2022/05/29 00:07:00 2022/05/29
【摘要】 本文主要介绍pyflink 1.14的一些特性

简介

Flink版本1.14

PyFlink功能还不是特别完善,本文使用PyFlink的版本为1.14,相较于1.14之前的版本有了很大的更新:

1、支持State TTL

2、Python Profile

3、print打印日志

4、Local Debug

State TTL

State TTL是flink很早就支持的版本,而pyflink直到1.14才支持

// From pyflink/examples/state_access.py

from pyflink.common import Time
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.functions import KeyedProcessFunction, RuntimeContext
from pyflink.datastream.state import ValueStateDescriptor, StateTtlConfig

class Sum(KeyedProcessFunction):

    def __init__(self):
        self.state = None

    def open(self, runtime_context: RuntimeContext):
        # 定义state ttl的方式和java/scala的方式是一致的
        state_descriptor = ValueStateDescriptor("state", Types.FLOAT())
        state_ttl_config = StateTtlConfig \
            .new_builder(Time.seconds(1)) \
            .set_update_type(StateTtlConfig.UpdateType.OnReadAndWrite) \
            .disable_cleanup_in_background() \
            .build()
        state_descriptor.enable_time_to_live(state_ttl_config)
        self.state = runtime_context.get_state(state_descriptor)

    def process_element(self, value, ctx: 'KeyedProcessFunction.Context'):
        # retrieve the current count
        current = self.state.value()
        if current is None:
            current = 0

        # update the state's count
        current += value[1]
        self.state.update(current)

        yield value[0], current

Python Profile

Flink 1.14通过使用cProfile + pstats的方式来统计定位Python的性能,Task manager会定时在log文件中打印python worker的profile信息

# From pyflink/fn_execution/profiler.py
import cProfile
import pstats

class Profiler(object):
    def __init__(self):
        self._pr = cProfile.Profile()

    def start(self):
        self._pr.enable()

    def close(self):
        self._pr.disable()
        ps = pstats.Stats(self._pr).sort_stats('cumulative')
        ps.print_stats()

下面是开启profile的参数,python.profile.enable设为true即开启profile功能,profiler会对每个bundle数据处理完后打印profile信息,因此bundle.size和bundle.time决定了profile的打印频率。

python.profile.enabled: true              // 开启python profile功能
python.fn-execution.bundle.size: 100000   // Python UDF一批次处理的最大数据量
python.fn-execution.bundle.time: 1000     // 每批次的等待时间(单位毫秒) 

print打印日志

1.14之前pyflink打印自定义的 log 信息必须使用 Python 自定义的 logging 模块,1.14支持print打印日志

Local Debug

在 1.14 以前,用户如果使用 Python 自定义的函数在本地开发 PyFlink 作业,必须使用 remote debug 方式调试自定义逻辑,但它使用起来相对比较繁琐,而且使用门槛较高。在 1.14 改变了这种模式,如果在本地编写一个 PyFlink 作业使用了 Python 自定义函数,可以自动切到 local debug 模式,可以在 ide 里面直接 debug 自定义 Python 函数。

【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。