如何在Spark上运行apache beam
为了方便,以下面这个名字替换的程序做简单例子:
```java
/**
* The ReplaceMyName
* 把Create数组里的myName替换成xxx
* @since 2019/12/17
*/
public class OutpuMyName {
public static void main(String[] args) {
MyTestOption myTestOption = PipelineOptionsFactory
// 读入args数组
.fromArgs(args)
// withValidation指需要对args里的所有参数做校验,如果有不存在Option里的参数键值就会抛异常
.withValidation()
// 通过as进行最后的生成操作
.as(MyTestOption.class);
Pipeline pipeline = Pipeline.create(myTestOption);
pipeline.apply(Create.of("lsx ah", "lsx sf", "lsx is me"))
.apply(ParDo.of(new ReplaceMyNameFn()))
.apply(ParDo.of(new PrintStrFn()));
pipeline.run().waitUntilFinish();
}
/**
* 替换名字的转换过程
*/
static class ReplaceMyNameFn extends DoFn<String, String> {
@ProcessElement
public void processElement(ProcessContext context) {
String inputElement = context.element();
// 从MytestOption中获取需要的元素
MyTestOption myTestOption = context.getPipelineOptions().as(MyTestOption.class);
String myName = myTestOption.getMyName();
// 替换字符串
String outputElement = inputElement.replace(myName, "xxx");
context.output(outputElement);
}
}
```
该例子会将Create.of输入的字符串里 进行替换, 把输入参数里的名字替换成xxx。
### 打包
在spark上提交时需要打成jar包, 打包的话pom文件可以参考如下,注意spark-runner包必须添加:
```xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>lsx.beam</groupId>
<artifactId>beam-demo</artifactId>
<version>0.0.1</version>
<properties>
<beam.version>2.13.0</beam.version>
</properties>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-direct-java</artifactId>
<version>${beam.version}</version>
</dependency>
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-spark</artifactId>
<version>${beam.version}</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.6.0</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.2.1</version>
<configuration>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
<!--<transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer"/>-->
</transformers>
</configuration>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
```
打成jar包后, 就可以用spark-launcher或者spark-submit客户端进行提交了。
## 提交
这里我用spark-submit客户部脚本进行提交,命令行如下:
```shell
/opt/All_Client/Spark2x/spark/bin/spark-submit --class lsx.beam.demo.OutpuMyName --master local /opt/application_test/beam-demo-0.0.1.jar --runner=SparkRunner --myName=lsx
```
--class 后面跟的是你需要执行的main主类
master我这里选择local模式, 否则无法在控制台看到system.out的输出(如果不用local模式,system.out可能输出在任何节点,以至于无法在client端看到)。
在jar包路径之后, 跟的就是beam的args输入参数了。 注意如果args要提供给option进行初始化,必须为--key=value的形式。
执行后可看到 如下结果(只有local模式才能看到控制台打印):
## 如何定义存储级别
beam-sparkRunner中, 默认存储级别为MEMORY_ONLY(即始终用内存存数据), 如果需要修改存储级别,需要让你的pipelineOption继承自SparkContextOptions
```java
public interface MyTestOption extends SparkContextOptions
```
接着就可以通过setStorageLevel进行存储级别设置
```java
myTestOption.setStorageLevel("MEMORY_AND_DISK_SER");
```
## 如何设置提交应用名
以yarn模式提交,并且当执行pipeline.run()时, 就会往yarn上提交任务。
如果希望修改提交时的应用名,则可以让option继承SparkContextOptions或者ApplicationNameOptions
然后执行
```java
myTestOption.setAppName("beam_test_application");
```
可看到yarnUI或者sparkUI上看到的appName已被改变
## 其他问题总结
一、加入了beam-sdks-java-io-hadoop-file-system 这个包,试图读写hdfs文件,但是报告The HadoopFileSystemRegistrar currently only supports at most a single Hadoop configuration
问题定位
那是因为source加载客户端环境变量时, 加载了多个hdfs配置路径, 我是因为包括了Spark2x和Yarn里关于hdfs的配置文件,导致报错
解决方法:
把Yarn客户端删了或者单独下载一份Spark2x并加载对应环境变量即可。
二、提交spark时,报告“Method not supported”错误
问题定位:
一般这类问题都是beam引入的jar包与spark客户端jar包冲突引发,spark依赖的jar包的版本,比我们使用的包的版本低, 例如spark那边连接hive用的是hive-jdbc-1.2.1, 而beam依赖的是hive-jdbc-2.1.0, beam里需要调用hive-jdbc中的一个方法,但是该方法在1.2.1中未实现,发生报错。
解决方法:
-
maven项目可以通过maven-shade-plugin插件,将有冲突的jar包shade化, 注意relocation中的部分。
<build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-shade-plugin</artifactId> <version>3.1.0</version> <executions> <execution> <phase>package</phase> <goals> <goal>shade</goal> </goals> <configuration> <relocations> <relocation> <pattern>org.apache.hive</pattern> <shadedPattern>dlg.hive</shadedPattern> </relocation> </relocations> </configuration> </execution> </executions> </plugin> </plugins> </build> </project>
-
shade之后,重新mvn打包, 项目中引用的冲突包已被替换成dlg.hive,且import处也作了修改,所以项目引用的都是我们替换后的包,所以不会再涉及spark那边的低版本冲突包(注意: 该方法建立在高版本冲突包向下兼容的基础上)
-
如果替换后, 报错没有改变,需要查看冲突类是如何加载的。
例如beam中, 对于jdbc driver的加载是通过命令行参数中的dbDriver,根据对应的类名进行加载, 所以即使我们shade之后,程序仍然调用org.apache.hive.hive-jdbc。
这里只要我们修改dbDriver的全类名为dlg.hive.xxxxx即可。
三、用spark提交后,在生成文本阶段报告“Java.io.IOException: Unable to create parent directories for '/opt/.temp-xxxxxxx'”
问题定位:
因为我们是直接写入到“本地”, 这个本地并未指明节点, 而spark是分布式运行的,最终生成在哪个节点的对应路径是不确定的。如果目录不存在或者没有目录写权限,会导致失败
解决方法:
所以我们想要通过beam和spark在本地生成文本文件的话,要保证执行spark-submit命令的目录必须在3个节点都存在,同时该目录具有spark用户可写权限。执行后,去每个spark节点上找一下,看下生成在哪。
- 点赞
- 收藏
- 关注作者
评论(0)