如何在Spark上运行apache beam

举报
breakDawn 发表于 2020/12/15 17:00:50 2020/12/15
【摘要】 如何用spark提交apachebeam

为了方便,以下面这个名字替换的程序做简单例子:
```java
/**
 * The ReplaceMyName
 * 把Create数组里的myName替换成xxx
 * @since 2019/12/17
 */
public class OutpuMyName {
    public static void main(String[] args) {
        MyTestOption myTestOption = PipelineOptionsFactory
                // 读入args数组
                .fromArgs(args)
                // withValidation指需要对args里的所有参数做校验,如果有不存在Option里的参数键值就会抛异常
                .withValidation()
                // 通过as进行最后的生成操作
                .as(MyTestOption.class);

        Pipeline pipeline = Pipeline.create(myTestOption);
        pipeline.apply(Create.of("lsx ah", "lsx sf", "lsx is me"))
                .apply(ParDo.of(new ReplaceMyNameFn()))
                .apply(ParDo.of(new PrintStrFn()));

        pipeline.run().waitUntilFinish();
    }

    /**
     * 替换名字的转换过程
     */
    static class ReplaceMyNameFn extends DoFn<String, String> {
        @ProcessElement
        public void processElement(ProcessContext context) {
            String inputElement = context.element();

            // 从MytestOption中获取需要的元素
            MyTestOption myTestOption = context.getPipelineOptions().as(MyTestOption.class);
            String myName = myTestOption.getMyName();

            // 替换字符串
            String outputElement = inputElement.replace(myName, "xxx");
            context.output(outputElement);
        }
    }

```

该例子会将Create.of输入的字符串里 进行替换, 把输入参数里的名字替换成xxx。

### 打包
在spark上提交时需要打成jar包,  打包的话pom文件可以参考如下,注意spark-runner包必须添加:
```xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>lsx.beam</groupId>
    <artifactId>beam-demo</artifactId>
    <version>0.0.1</version>

    <properties>
        <beam.version>2.13.0</beam.version>
    </properties>

    <packaging>jar</packaging>

    <dependencies>

        <dependency>
            <groupId>org.apache.beam</groupId>
            <artifactId>beam-runners-direct-java</artifactId>
            <version>${beam.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.beam</groupId>
            <artifactId>beam-runners-spark</artifactId>
            <version>${beam.version}</version>
        </dependency>

    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.6.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.2.1</version>
                <configuration>
                    <transformers>
                        <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                        <!--<transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer"/>-->
                    </transformers>
                </configuration>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
```

打成jar包后, 就可以用spark-launcher或者spark-submit客户端进行提交了。

## 提交
这里我用spark-submit客户部脚本进行提交,命令行如下:
```shell
/opt/All_Client/Spark2x/spark/bin/spark-submit --class lsx.beam.demo.OutpuMyName --master local  /opt/application_test/beam-demo-0.0.1.jar --runner=SparkRunner --myName=lsx
```
--class 后面跟的是你需要执行的main主类
master我这里选择local模式, 否则无法在控制台看到system.out的输出(如果不用local模式,system.out可能输出在任何节点,以至于无法在client端看到)。
在jar包路径之后, 跟的就是beam的args输入参数了。  注意如果args要提供给option进行初始化,必须为--key=value的形式。

执行后可看到 如下结果(只有local模式才能看到控制台打印):

## 如何定义存储级别
beam-sparkRunner中, 默认存储级别为MEMORY_ONLY(即始终用内存存数据), 如果需要修改存储级别,需要让你的pipelineOption继承自SparkContextOptions
```java
public interface MyTestOption extends SparkContextOptions
```
接着就可以通过setStorageLevel进行存储级别设置
```java
myTestOption.setStorageLevel("MEMORY_AND_DISK_SER");
```

## 如何设置提交应用名
以yarn模式提交,并且当执行pipeline.run()时, 就会往yarn上提交任务。 
如果希望修改提交时的应用名,则可以让option继承SparkContextOptions或者ApplicationNameOptions
然后执行
```java
myTestOption.setAppName("beam_test_application");
```
可看到yarnUI或者sparkUI上看到的appName已被改变
image.png

## 其他问题总结

一、加入了beam-sdks-java-io-hadoop-file-system 这个包,试图读写hdfs文件,但是报告The HadoopFileSystemRegistrar currently only supports at most a single Hadoop configuration

问题定位

那是因为source加载客户端环境变量时, 加载了多个hdfs配置路径, 我是因为包括了Spark2x和Yarn里关于hdfs的配置文件,导致报错

解决方法:

把Yarn客户端删了或者单独下载一份Spark2x并加载对应环境变量即可。

二、提交spark时,报告“Method not supported”错误

问题定位:

一般这类问题都是beam引入的jar包与spark客户端jar包冲突引发,spark依赖的jar包的版本,比我们使用的包的版本低, 例如spark那边连接hive用的是hive-jdbc-1.2.1, 而beam依赖的是hive-jdbc-2.1.0, beam里需要调用hive-jdbc中的一个方法,但是该方法在1.2.1中未实现,发生报错。

解决方法:

  1. maven项目可以通过maven-shade-plugin插件,将有冲突的jar包shade化, 注意relocation中的部分。  <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-shade-plugin</artifactId> <version>3.1.0</version> <executions> <execution> <phase>package</phase> <goals> <goal>shade</goal> </goals> <configuration> <relocations> <relocation> <pattern>org.apache.hive</pattern> <shadedPattern>dlg.hive</shadedPattern> </relocation> </relocations> </configuration> </execution> </executions> </plugin> </plugins> </build> </project>

  2. shade之后,重新mvn打包, 项目中引用的冲突包已被替换成dlg.hive,且import处也作了修改,所以项目引用的都是我们替换后的包,所以不会再涉及spark那边的低版本冲突包(注意: 该方法建立在高版本冲突包向下兼容的基础上)

  3. 如果替换后, 报错没有改变,需要查看冲突类是如何加载的。

例如beam中, 对于jdbc driver的加载是通过命令行参数中的dbDriver,根据对应的类名进行加载, 所以即使我们shade之后,程序仍然调用org.apache.hive.hive-jdbc。

这里只要我们修改dbDriver的全类名为dlg.hive.xxxxx即可。

三、用spark提交后,在生成文本阶段报告“Java.io.IOException: Unable to create parent directories for '/opt/.temp-xxxxxxx'”

问题定位:

因为我们是直接写入到“本地”, 这个本地并未指明节点, 而spark是分布式运行的,最终生成在哪个节点的对应路径是不确定的。如果目录不存在或者没有目录写权限,会导致失败

解决方法:

所以我们想要通过beam和spark在本地生成文本文件的话,要保证执行spark-submit命令的目录必须在3个节点都存在,同时该目录具有spark用户可写权限。执行后,去每个spark节点上找一下,看下生成在哪。

【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。