beam入门宝典之初次使用
咱们不多废话,先直接来如何简单使用beam框架。  
beam入门宝典之初次使用
这里我不使用常见的wordCount做例子,而是一个大写转小写的例子,语言选用java语言
这个例子里我们会初步学到:
1. 如何建立管道
2. 如何手动生成数据
3. 如何转换
4. 如何查看输出
首先我们要新建1个maven工程,然后在pom.xml中加入如下依赖:
        <dependency>
            <groupId>org.apache.beam</groupId>
            <artifactId>beam-runners-direct-java</artifactId>
            <version>${beam.version}</version>
        </dependency>
beam.version版本选择beam官网上最新,笔者编写此文时使用的版本是2.13.0
接着我们新建1个HowToCreateAndShowData类,然后开始例子
建立管道
任何beam程序,都需要先建立1个管道选项option,再建立1个初始管道
// 建立选项 PipelineOptions pipelineOptions = PipelineOptionsFactory.create(); // 建立管道 Pipeline pipeline = Pipeline.create(pipelineOptions);
关于选项option和pipeline的更多用法,后面的章节会继续介绍
手动生成数据
我们有了pipeline之后,就要往里面塞入数据
beam里提供了手动输入数据的方式,如下:
// 生成初始的输入数据 // 相当于往管道里塞入了3个自己写的字符串元素 PCollection<String> pcStart = pipeline.apply( Create.of( "HELLO!", "THIS IS BEAM DEMO!", "HAPPY STUDY!"));
我们调用pipeline的apply方法来输入1个Create对象,里面的元素就是我们的输入元素
并且返回1个PCollection<String>的对象,我们称之为数据集。
\<String\>指的是数据集中数据的类型
如何转换
要实现转换,需要先编写1个DoFn的子类,并实现processElement方法,代码和讲解如下:
    // 把字符串转成小写的转换方法类
    // DoFn<String,String>中的第一个String是输入的类型,第二个String是输出的类型
    static class StrToLowerCaseFn extends DoFn<String, String> {
        /**
         * processElement,过程元素处理方法,类似于spark、mr中的map操作
         * 必须加上@ProcessElement注解,并实现processElement方法
         * @param context
         */
        @ProcessElement
        public void processElement(ProcessContext context) {
            // 从管道中取出的1个元素
            String inputStr = context.element();
            // 转成大写
            String outputStr = inputStr.toLowerCase();
            // 输出结果
            context.output(outputStr);
        }
    }
接着将这个计算方法,用数据集.apply(ParDo.of(计算类))的方式组装到刚才的pcStart中
// 组装小写转换 PCollection<String> pcMid = pcStart.apply(ParDo.of(new StrToLowerCaseFn()));
#### 如何输出
输出的话,我们可以按照上面的方法再编写1个DoFn子类,用于将数据集中输入的元素打印到控制台
    // 打印结果方法类
    // 因为不需要再往下输出,所以
    static class PrintStrFn extends DoFn<String, Void> {
        /**
         * processElement,过程元素处理方法,类似于spark、mr中的map操作
         * 必须加上@ProcessElement注解,并实现processElement方法
         * @param context
         */
        @ProcessElement
        public void processElement(ProcessContext context) {
            // 从管道中取出的1个元素
            String inputStr = context.element();
            // 输出
            System.out.println(inputStr);
        }
    }
然后组装
// 组装输出操作 pcMid.apply(ParDo.of(new PrintStrFn()));
#### 运行
刚才的3次apply结束后,其实转换都还没有开始,仅仅只是组装计算拓扑的1个流程。
真正开始计算需要调用下面的代码:
// 运行结果 pipeline.run().waitUntilFinish();
执行main方法,输出如下结果:

#### 完整代码
  /**
 * The howToCreateAndShowData
 *
 * @since 2019/12/3
 */
public class HowToCreateAndShowData {
    public static void main(String[] args) {
        PipelineOptions pipelineOptions = PipelineOptionsFactory.create();
        Pipeline pipeline = Pipeline.create(pipelineOptions);
        // 生成初始的输入数据
        // 相当于往管道里塞入了3个自己写的字符串元素
        PCollection<String> pcStart = pipeline.apply(
                Create.of(
                        "HELLO!",
                        "THIS IS BEAM DEMO!",
                        "HAPPY STUDY!"));
        // 组装小写转换
        PCollection<String> pcMid = pcStart.apply(ParDo.of(new StrToLowerCaseFn()));
        // 组装输出操作
        pcMid.apply(ParDo.of(new PrintStrFn()));
        // 运行结果
        pipeline.run().waitUntilFinish();
    }
    // 把字符串转成小写的转换方法类
    // DoFn<String,String>中的第一个String是输入的类型,第二个String是输出的类型
    static class StrToLowerCaseFn extends DoFn<String, String> {
        /**
         * processElement,过程元素处理方法,类似于spark、mr中的map操作
         * 必须加上@ProcessElement注解,并实现processElement方法
         * @param context
         */
        @ProcessElement
        public void processElement(ProcessContext context) {
            // 从管道中取出的1个元素
            String inputStr = context.element();
            // 转成大写
            String outputStr = inputStr.toLowerCase();
            // 输出结果
            context.output(outputStr);
        }
    }
    // 打印结果方法类
    // 因为不需要再往下输出,所以
    static class PrintStrFn extends DoFn<String, Void> {
        /**
         * processElement,过程元素处理方法,类似于spark、mr中的map操作
         * 必须加上@ProcessElement注解,并实现processElement方法
         * @param context
         */
        @ProcessElement
        public void processElement(ProcessContext context) {
            // 从管道中取出的1个元素
            String inputStr = context.element();
            // 输出
            System.out.println(inputStr);
        }
    }
}
- 点赞
- 收藏
- 关注作者
 
             
           
评论(0)