PyFlink实现自定义SourceFunction

举报
想要一只猫 发表于 2022/06/13 00:39:39 2022/06/13
【摘要】 本文主要介绍pyflink实现自定义SourceFunction的两种方法

简介

pyflink当前是无法像Map、FlatMap一样定义python UDF而实现Source UDF的,而是需要先实现Java SourceFunction,然后在python作业中引入。

// pyflink中SourceFunction的定义

class SourceFunction(JavaFunctionWrapper):
    """
    Base class for all stream data source in Flink.
    """

    def __init__(self, source_func: Union[str, JavaObject]):
        """
        Constructor of SinkFunction.

        :param source_func: The java SourceFunction object.
        """
        super(SourceFunction, self).__init__(source_func)

方法一

首先先用java实现简单的SourceFunction。

public class MyCustomSourceFunction implements SourceFunction<Row> {

    private static final String[] NAMES = {"Bob", "Marry", "Henry", "Mike", "Ted", "Jack"};

    public void run(SourceContext sourceContext) {
        Random random = new Random();
        for (int i = 0; i < NAMES.length; i++) {
            Row row = Row.of(i, NAMES[i]);
            sourceContext.collect(row);
        }
    }

    @Override
    public void cancel() {}
}

之后再python作业中引入SourceFunction

# 直接填写类路径,实现对java source的引用
custom_source = SourceFunction("org.apache.flink.python.util.MyCustomSourceFunction") 

# add_source
ds = env.add_source(custom_source, type_info=Types.ROW([Types.INT(), Types.STRING()]))

方法二

方法一已经能够实现简单的Source了,但是存在一个问题,就是方法一无法传递参数,比如要实现一个RedisSource,希望通过python作业传递参数,如redis的host和port,此时方法一是无法做到的,因此才有了方法二。

package com.test;

public class MyRedisSourceFunction implements SourceFunction<String> {
    private string host;
    private int port;

    public MyRedisSourceFunction(string host, int port) {
        this.host = host;
        this.port = port;
    }    

    public void run(SourceContext sourceContext) {
         Jedis jedis = xxxx; // 初始化jedis
         while(true) {
              // 连接redis,获取数据
         }
    }

    @Override
    public void cancel() {}
}

此时python该怎么写呢?首先需要定义一个Python class,在内部初始化Java Redis Source。

# 实现Python sourcefunction,内部初始化java sourcefunction
class PyRedisSourceFunction(SourceFunction):

    def __init__(self, host, port):
        # 获取Java RedisSource类
        JMyRedisSourceFunction = get_gateway().jvm.com.test.MyRedisSourceFunction
        
        # 初始化Java redis source
        j_redis_source = JMyRedisSourceFunction(host, port)
        
        # 调用父类__init__
        super(PyRedisSourceFunction, self).__init__(sink_func=j_redis_source)

然后在main函数中直接初始化Python class

host = 127.0.0.1
port = 12345
# 初始化Python class
redis_source = PyRedisSourceFunction(host, port) 

# add_source
ds = env.add_source(redis_source, type_info=Types.STRING())


【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。