PyFlink实现自定义SourceFunction
【摘要】 本文主要介绍pyflink实现自定义SourceFunction的两种方法
简介
pyflink当前是无法像Map、FlatMap一样定义python UDF而实现Source UDF的,而是需要先实现Java SourceFunction,然后在python作业中引入。
// pyflink中SourceFunction的定义
class SourceFunction(JavaFunctionWrapper):
"""
Base class for all stream data source in Flink.
"""
def __init__(self, source_func: Union[str, JavaObject]):
"""
Constructor of SinkFunction.
:param source_func: The java SourceFunction object.
"""
super(SourceFunction, self).__init__(source_func)
方法一
首先先用java实现简单的SourceFunction。
public class MyCustomSourceFunction implements SourceFunction<Row> {
private static final String[] NAMES = {"Bob", "Marry", "Henry", "Mike", "Ted", "Jack"};
public void run(SourceContext sourceContext) {
Random random = new Random();
for (int i = 0; i < NAMES.length; i++) {
Row row = Row.of(i, NAMES[i]);
sourceContext.collect(row);
}
}
@Override
public void cancel() {}
}
之后再python作业中引入SourceFunction
# 直接填写类路径,实现对java source的引用
custom_source = SourceFunction("org.apache.flink.python.util.MyCustomSourceFunction")
# add_source
ds = env.add_source(custom_source, type_info=Types.ROW([Types.INT(), Types.STRING()]))
方法二
方法一已经能够实现简单的Source了,但是存在一个问题,就是方法一无法传递参数,比如要实现一个RedisSource,希望通过python作业传递参数,如redis的host和port,此时方法一是无法做到的,因此才有了方法二。
package com.test;
public class MyRedisSourceFunction implements SourceFunction<String> {
private string host;
private int port;
public MyRedisSourceFunction(string host, int port) {
this.host = host;
this.port = port;
}
public void run(SourceContext sourceContext) {
Jedis jedis = xxxx; // 初始化jedis
while(true) {
// 连接redis,获取数据
}
}
@Override
public void cancel() {}
}
此时python该怎么写呢?首先需要定义一个Python class,在内部初始化Java Redis Source。
# 实现Python sourcefunction,内部初始化java sourcefunction
class PyRedisSourceFunction(SourceFunction):
def __init__(self, host, port):
# 获取Java RedisSource类
JMyRedisSourceFunction = get_gateway().jvm.com.test.MyRedisSourceFunction
# 初始化Java redis source
j_redis_source = JMyRedisSourceFunction(host, port)
# 调用父类__init__
super(PyRedisSourceFunction, self).__init__(sink_func=j_redis_source)
然后在main函数中直接初始化Python class
host = 127.0.0.1
port = 12345
# 初始化Python class
redis_source = PyRedisSourceFunction(host, port)
# add_source
ds = env.add_source(redis_source, type_info=Types.STRING())
【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱:
cloudbbs@huaweicloud.com
- 点赞
- 收藏
- 关注作者
评论(0)