beam入门宝典之初次使用

举报
breakDawn 发表于 2020/05/30 11:59:26 2020/05/30
【摘要】 咱们不多废话,先直接来如何简单使用beam框架。 beam入门宝典之初次使用这里我不使用常见的wordCount做例子,而是一个大写转小写的例子,语言选用java语言 这个例子里我们会初步学到: 1. 如何建立管道2. 如何手动生成数据3. 如何转换4. 如何查看输出首先我们要新建1个maven工程,然后在pom.xml中加入如下依赖: <dependency> ...

咱们不多废话,先直接来如何简单使用beam框架。  

beam入门宝典之初次使用

这里我不使用常见的wordCount做例子,而是一个大写转小写的例子,语言选用java语言  

这个例子里我们会初步学到:  

1. 如何建立管道

2. 如何手动生成数据

3. 如何转换

4. 如何查看输出


首先我们要新建1个maven工程,然后在pom.xml中加入如下依赖:

        <dependency>
            <groupId>org.apache.beam</groupId>
            <artifactId>beam-runners-direct-java</artifactId>
            <version>${beam.version}</version>
        </dependency>


beam.version版本选择beam官网上最新,笔者编写此文时使用的版本是2.13.0


接着我们新建1个HowToCreateAndShowData类,然后开始例子


建立管道


任何beam程序,都需要先建立1个管道选项option,再建立1个初始管道

    // 建立选项
    PipelineOptions pipelineOptions = PipelineOptionsFactory.create();
    // 建立管道
    Pipeline pipeline = Pipeline.create(pipelineOptions);


关于选项option和pipeline的更多用法,后面的章节会继续介绍


手动生成数据

我们有了pipeline之后,就要往里面塞入数据  

beam里提供了手动输入数据的方式,如下:

        // 生成初始的输入数据
        // 相当于往管道里塞入了3个自己写的字符串元素
        PCollection<String> pcStart = pipeline.apply(
                Create.of(
                        "HELLO!",
                        "THIS IS BEAM DEMO!",
                        "HAPPY STUDY!"));



我们调用pipeline的apply方法来输入1个Create对象,里面的元素就是我们的输入元素

并且返回1个PCollection<String>的对象,我们称之为数据集。  

\<String\>指的是数据集中数据的类型


如何转换

要实现转换,需要先编写1个DoFn的子类,并实现processElement方法,代码和讲解如下:


    // 把字符串转成小写的转换方法类
    // DoFn<String,String>中的第一个String是输入的类型,第二个String是输出的类型
    static class StrToLowerCaseFn extends DoFn<String, String> {
        /**
         * processElement,过程元素处理方法,类似于spark、mr中的map操作
         * 必须加上@ProcessElement注解,并实现processElement方法
         * @param context
         */
        @ProcessElement
        public void processElement(ProcessContext context) {
            // 从管道中取出的1个元素
            String inputStr = context.element();
            // 转成大写
            String outputStr = inputStr.toLowerCase();
            // 输出结果
            context.output(outputStr);
        }
    }

接着将这个计算方法,用数据集.apply(ParDo.of(计算类))的方式组装到刚才的pcStart中


    // 组装小写转换
    PCollection<String> pcMid = pcStart.apply(ParDo.of(new StrToLowerCaseFn()));



#### 如何输出

输出的话,我们可以按照上面的方法再编写1个DoFn子类,用于将数据集中输入的元素打印到控制台  


    // 打印结果方法类
    // 因为不需要再往下输出,所以
    static class PrintStrFn extends DoFn<String, Void> {
        /**
         * processElement,过程元素处理方法,类似于spark、mr中的map操作
         * 必须加上@ProcessElement注解,并实现processElement方法
         * @param context
         */
        @ProcessElement
        public void processElement(ProcessContext context) {
            // 从管道中取出的1个元素
            String inputStr = context.element();

            // 输出
            System.out.println(inputStr);
        }
    }




然后组装


   // 组装输出操作
   pcMid.apply(ParDo.of(new PrintStrFn()));


#### 运行

刚才的3次apply结束后,其实转换都还没有开始,仅仅只是组装计算拓扑的1个流程。

真正开始计算需要调用下面的代码:

    // 运行结果
    pipeline.run().waitUntilFinish();


执行main方法,输出如下结果:

image.png

  

#### 完整代码

  /**
 * The howToCreateAndShowData
 *
 * @since 2019/12/3
 */
public class HowToCreateAndShowData {
    public static void main(String[] args) {
        PipelineOptions pipelineOptions = PipelineOptionsFactory.create();
        Pipeline pipeline = Pipeline.create(pipelineOptions);

        // 生成初始的输入数据
        // 相当于往管道里塞入了3个自己写的字符串元素
        PCollection<String> pcStart = pipeline.apply(
                Create.of(
                        "HELLO!",
                        "THIS IS BEAM DEMO!",
                        "HAPPY STUDY!"));

        // 组装小写转换
        PCollection<String> pcMid = pcStart.apply(ParDo.of(new StrToLowerCaseFn()));

        // 组装输出操作
        pcMid.apply(ParDo.of(new PrintStrFn()));

        // 运行结果
        pipeline.run().waitUntilFinish();
    }

    // 把字符串转成小写的转换方法类
    // DoFn<String,String>中的第一个String是输入的类型,第二个String是输出的类型
    static class StrToLowerCaseFn extends DoFn<String, String> {
        /**
         * processElement,过程元素处理方法,类似于spark、mr中的map操作
         * 必须加上@ProcessElement注解,并实现processElement方法
         * @param context
         */
        @ProcessElement
        public void processElement(ProcessContext context) {
            // 从管道中取出的1个元素
            String inputStr = context.element();
            // 转成大写
            String outputStr = inputStr.toLowerCase();
            // 输出结果
            context.output(outputStr);
        }
    }

    // 打印结果方法类
    // 因为不需要再往下输出,所以
    static class PrintStrFn extends DoFn<String, Void> {
        /**
         * processElement,过程元素处理方法,类似于spark、mr中的map操作
         * 必须加上@ProcessElement注解,并实现processElement方法
         * @param context
         */
        @ProcessElement
        public void processElement(ProcessContext context) {
            // 从管道中取出的1个元素
            String inputStr = context.element();

            // 输出
            System.out.println(inputStr);
        }
    }
}



【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。