beam入门宝典之初次使用
咱们不多废话,先直接来如何简单使用beam框架。
beam入门宝典之初次使用
这里我不使用常见的wordCount做例子,而是一个大写转小写的例子,语言选用java语言
这个例子里我们会初步学到:
1. 如何建立管道
2. 如何手动生成数据
3. 如何转换
4. 如何查看输出
首先我们要新建1个maven工程,然后在pom.xml中加入如下依赖:
<dependency> <groupId>org.apache.beam</groupId> <artifactId>beam-runners-direct-java</artifactId> <version>${beam.version}</version> </dependency>
beam.version版本选择beam官网上最新,笔者编写此文时使用的版本是2.13.0
接着我们新建1个HowToCreateAndShowData类,然后开始例子
建立管道
任何beam程序,都需要先建立1个管道选项option,再建立1个初始管道
// 建立选项 PipelineOptions pipelineOptions = PipelineOptionsFactory.create(); // 建立管道 Pipeline pipeline = Pipeline.create(pipelineOptions);
关于选项option和pipeline的更多用法,后面的章节会继续介绍
手动生成数据
我们有了pipeline之后,就要往里面塞入数据
beam里提供了手动输入数据的方式,如下:
// 生成初始的输入数据 // 相当于往管道里塞入了3个自己写的字符串元素 PCollection<String> pcStart = pipeline.apply( Create.of( "HELLO!", "THIS IS BEAM DEMO!", "HAPPY STUDY!"));
我们调用pipeline的apply方法来输入1个Create对象,里面的元素就是我们的输入元素
并且返回1个PCollection<String>的对象,我们称之为数据集。
\<String\>指的是数据集中数据的类型
如何转换
要实现转换,需要先编写1个DoFn的子类,并实现processElement方法,代码和讲解如下:
// 把字符串转成小写的转换方法类 // DoFn<String,String>中的第一个String是输入的类型,第二个String是输出的类型 static class StrToLowerCaseFn extends DoFn<String, String> { /** * processElement,过程元素处理方法,类似于spark、mr中的map操作 * 必须加上@ProcessElement注解,并实现processElement方法 * @param context */ @ProcessElement public void processElement(ProcessContext context) { // 从管道中取出的1个元素 String inputStr = context.element(); // 转成大写 String outputStr = inputStr.toLowerCase(); // 输出结果 context.output(outputStr); } }
接着将这个计算方法,用数据集.apply(ParDo.of(计算类))的方式组装到刚才的pcStart中
// 组装小写转换 PCollection<String> pcMid = pcStart.apply(ParDo.of(new StrToLowerCaseFn()));
#### 如何输出
输出的话,我们可以按照上面的方法再编写1个DoFn子类,用于将数据集中输入的元素打印到控制台
// 打印结果方法类 // 因为不需要再往下输出,所以 static class PrintStrFn extends DoFn<String, Void> { /** * processElement,过程元素处理方法,类似于spark、mr中的map操作 * 必须加上@ProcessElement注解,并实现processElement方法 * @param context */ @ProcessElement public void processElement(ProcessContext context) { // 从管道中取出的1个元素 String inputStr = context.element(); // 输出 System.out.println(inputStr); } }
然后组装
// 组装输出操作 pcMid.apply(ParDo.of(new PrintStrFn()));
#### 运行
刚才的3次apply结束后,其实转换都还没有开始,仅仅只是组装计算拓扑的1个流程。
真正开始计算需要调用下面的代码:
// 运行结果 pipeline.run().waitUntilFinish();
执行main方法,输出如下结果:
#### 完整代码
/** * The howToCreateAndShowData * * @since 2019/12/3 */ public class HowToCreateAndShowData { public static void main(String[] args) { PipelineOptions pipelineOptions = PipelineOptionsFactory.create(); Pipeline pipeline = Pipeline.create(pipelineOptions); // 生成初始的输入数据 // 相当于往管道里塞入了3个自己写的字符串元素 PCollection<String> pcStart = pipeline.apply( Create.of( "HELLO!", "THIS IS BEAM DEMO!", "HAPPY STUDY!")); // 组装小写转换 PCollection<String> pcMid = pcStart.apply(ParDo.of(new StrToLowerCaseFn())); // 组装输出操作 pcMid.apply(ParDo.of(new PrintStrFn())); // 运行结果 pipeline.run().waitUntilFinish(); } // 把字符串转成小写的转换方法类 // DoFn<String,String>中的第一个String是输入的类型,第二个String是输出的类型 static class StrToLowerCaseFn extends DoFn<String, String> { /** * processElement,过程元素处理方法,类似于spark、mr中的map操作 * 必须加上@ProcessElement注解,并实现processElement方法 * @param context */ @ProcessElement public void processElement(ProcessContext context) { // 从管道中取出的1个元素 String inputStr = context.element(); // 转成大写 String outputStr = inputStr.toLowerCase(); // 输出结果 context.output(outputStr); } } // 打印结果方法类 // 因为不需要再往下输出,所以 static class PrintStrFn extends DoFn<String, Void> { /** * processElement,过程元素处理方法,类似于spark、mr中的map操作 * 必须加上@ProcessElement注解,并实现processElement方法 * @param context */ @ProcessElement public void processElement(ProcessContext context) { // 从管道中取出的1个元素 String inputStr = context.element(); // 输出 System.out.println(inputStr); } } }
- 点赞
- 收藏
- 关注作者
评论(0)