Pyflink 1.14简介
简介
Flink版本1.14
PyFlink功能还不是特别完善,本文使用PyFlink的版本为1.14,相较于1.14之前的版本有了很大的更新:
1、支持State TTL
2、Python Profile
3、print打印日志
4、Local Debug
State TTL
State TTL是flink很早就支持的版本,而pyflink直到1.14才支持
// From pyflink/examples/state_access.py
from pyflink.common import Time
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.functions import KeyedProcessFunction, RuntimeContext
from pyflink.datastream.state import ValueStateDescriptor, StateTtlConfig
class Sum(KeyedProcessFunction):
def __init__(self):
self.state = None
def open(self, runtime_context: RuntimeContext):
# 定义state ttl的方式和java/scala的方式是一致的
state_descriptor = ValueStateDescriptor("state", Types.FLOAT())
state_ttl_config = StateTtlConfig \
.new_builder(Time.seconds(1)) \
.set_update_type(StateTtlConfig.UpdateType.OnReadAndWrite) \
.disable_cleanup_in_background() \
.build()
state_descriptor.enable_time_to_live(state_ttl_config)
self.state = runtime_context.get_state(state_descriptor)
def process_element(self, value, ctx: 'KeyedProcessFunction.Context'):
# retrieve the current count
current = self.state.value()
if current is None:
current = 0
# update the state's count
current += value[1]
self.state.update(current)
yield value[0], current
Python Profile
Flink 1.14通过使用cProfile + pstats的方式来统计定位Python的性能,Task manager会定时在log文件中打印python worker的profile信息
# From pyflink/fn_execution/profiler.py
import cProfile
import pstats
class Profiler(object):
def __init__(self):
self._pr = cProfile.Profile()
def start(self):
self._pr.enable()
def close(self):
self._pr.disable()
ps = pstats.Stats(self._pr).sort_stats('cumulative')
ps.print_stats()
下面是开启profile的参数,python.profile.enable设为true即开启profile功能,profiler会对每个bundle数据处理完后打印profile信息,因此bundle.size和bundle.time决定了profile的打印频率。
python.profile.enabled: true // 开启python profile功能
python.fn-execution.bundle.size: 100000 // Python UDF一批次处理的最大数据量
python.fn-execution.bundle.time: 1000 // 每批次的等待时间(单位毫秒)
print打印日志
1.14之前pyflink打印自定义的 log 信息必须使用 Python 自定义的 logging 模块,1.14支持print打印日志
Local Debug
在 1.14 以前,用户如果使用 Python 自定义的函数在本地开发 PyFlink 作业,必须使用 remote debug 方式调试自定义逻辑,但它使用起来相对比较繁琐,而且使用门槛较高。在 1.14 改变了这种模式,如果在本地编写一个 PyFlink 作业使用了 Python 自定义函数,可以自动切到 local debug 模式,可以在 ide 里面直接 debug 自定义 Python 函数。
- 点赞
- 收藏
- 关注作者
评论(0)