解读Spark Datasource
在日常的工作中,我们会接触到各种各样的数据库,他们存储的数据也是各式各样。当我们使用Spark去处理数据的时候,我们常常会遇到数据来源于不同数据源的情况。如果重新加载和保存数据的话,会非常的麻烦,还浪费空间,而且很多时候还需要考虑数据格式转换的问题。
为了解决这样的问题,Spark提供了一个框架,叫Datasource。我目前就在学习这个框架的内容,并尝试着用Spark Datesource这个框架实现一个与自建数据源连接的功能。
下面就是我通过学习记录的内容。
1.概念介绍
1.1 什么是Spark Datasource
Spark Datasource是连接外部数据源和Spark引擎的框架,是一个连接器。可以利用这个连接器来对外部的数据源进行一个读写的操作。华为云的DLI支持原生Spark的Datasource能力,并在其基础上进行了扩展。
DLI利用Spark Datasource构建的功能为跨源连接。当前跨源连接分为以下两种:
- 经典型跨源连接
DLI 经典型跨源连接可用于访问CloudTable的Hbase和OpenTSDB,MRS的OpenTSDB,DWS,RDS,CSS数据源。
- 增强型跨源连接
DLI 增强型跨源连接底层采用对等连接,直接打通DLI集群与目的数据源的vpc网络,通过点对点的方式实现数据互通,能够提供比经典型跨源更加灵活的使用场景与更加强劲的性能。增强型跨源支持所有DLI服务已实现的跨源业务,包括CloudTable的Hbase和OpenTSDB,MRS的OpenTSDB,DWS,RDS,CSS,DCS,DDS等数据源。并且通过Spark作业方式能够实现与自建数据源之间的访问。
1.2 Spark Datasource如何使用?
当你使用华为云DLI的跨源连接服务时,首先需要按照官网上面的知识将跨源连接创建好,将队列绑定好。然后就可以创建作业,然后用绑定好的这个队列来运行你的作业。本文主要讲述的是Spark Datasource,所以具体使用讲的是spark作业相关,以scala语言为例。
利用spark作业读取外部数据源的模式总共有两种:
- DataFrame模式
利用DataFrame的API读取外部数据源的方式如下:
val jdbcDF = sparkSession
.read //表示是读取数据(write就是写)
.format("jdbc") //驱动类,这里连接的是jdbc数据源
.option("url", "jdbc:postgresql:dbserver") //option表示的是填入的参数。
.load()
- SparkSql模式
利用Spark SQL的方式读取外部数据源的方式如下:
sparkSession.sql("
CREATE TABLE IF NOT EXISTS dli_to_rds //创建的spark sql表名
USING JDBC OPTIONS //驱动类,表明连接的是jdbc数据源
( 'url'='jdbc:mysql://to-rds-1174404209-cA37siB6.datasource.com:3306',
'driver'='com.mysql.jdbc.Driver' //上述是数据源的一些具体参数。
)")
sparkSession.sql("select * from dli_to_rds")
因为sql的简单易用以及易上手性,所以我们推荐大家使用Spark SQL模式。
2.运行原理
2.1 读流程
def read: DataFrameReader = new DataFrameReader(self)
read对应的是DataFrameReader,这是Spark Datasource读取外部数据源的入口。当我们使用sparkSession.read的时候,就会构造出一个DataFrameReader,这是后面参数设置的基础,其中内置了读取多种数据源的方法,还包括参数配置的接口。
def format(source: String): DataFrameReader = {this.source = source}
format需要填入的是数据源的名称,表明连接的是什么连接源。Spark原生支持的数据源有csv、orc、parquet、jdbc等,通过在format这里填写来表示连接的数据源具体是什么。除此之外,format这里也是匹配自建数据源的一个重要窗口。当我们需要连接自己的数据源时,需要设置源的名字是什么,也就是source是什么,设置好以后通过format这里的填写,Spark Datasource框架会利用Spi机制连接创建我们所需要的连接。
def option(key: String, value: String): DataFrameReader = {
this.extraOptions = this.extraOptions + (key -> value)}
option的作用就是具体的参数的填写。以连接一个jdbc的数据源为例,需要填入的参数有:
- url 这是jdbc数据源的host地址。
- dbtable 这是具体要读取的表的表名。
- user 这是数据源的用户名。
- password 这是数据源的密码。
- driver 这是数据源的驱动器,jdbc的驱动器就是com.mysql.jdbc.Driver。
.load()
最后通过这个load的方法真正建立Spark与数据源之间的连接,并且得到一个DataFrame。
2.2 写流程
def write: DataFrameWriter[T] = {new DataFrameWriter[T](this)}
写的流程是与读流程类似的,区别的是write返回的是一个DataFrameWrite类型对象,这是Spark Datasource写数据的入口。
2.3 连接是如何创建的?
这里主要以读取华为云JDBC数据源流程为例来讲解。
如果不关注Spark Datasource内部是如何运行的话,那么对用使用者来说,load完就是成功建立起连接并读取到外部数据源中的数据了。但是,如果关注Spark Datasource内部是如何运作的话,那么你就会知道,load对于这个流程来说,仅仅是个开始。
def load(paths: String*): DataFrame = {
DataSource.lookupDataSourceV2(source, ....).map {......}.getOrElse(loadV1Source(paths: _*))
}
与load相关的函数主要有两个:
- lookupDataSourceV2
这个函数的主要作用为将source name匹配为RelationProvider的名字。如果source name为jdbc,那么匹配到的名字就是“org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider”。
- loadV1Source
这个函数的主要作用为利用上面函数得到的RelationProvider建立与外部数据源之间的连接,并读取option中设置的数据,最终返回一个包含了我们希望读取到的数据的DataFrame。
private def loadV1Source(paths: String*) = {
...
// Code path for data source v1.
sparkSession.baseRelationToDataFrame((...).resolveRelation())
}
loadV1Source中调用的函数主要为二:
- resolveRelation
这个函数就是通过上面匹配后的source name来返回一个JdbcRelationProvider。在这个函数的内部,通过匹配来判断获取的RelationProvider类型,如果是外部的自建数据源,则会利用java的Spi机制来获取外部的RelationProvider。每个RelationProvider对象中都有着createRelation的功能,在这个函数内部,通过调用这些createRelation方法建立起了与外部数据源的连接,并利用之前的option获取了相关的数据信息。
- baseRelationToDataFrame
这个函数则是将RelationProvider转换为DataFrame。
到这里,一个与外部数据源建立连接并获取相关数据的过程就完成了。
3.DLI是如何使用Spark Datasource的
DLI支持原生Spark的DataSource能力,并在其基础上进行了相应的扩展,能够利用spark作业去访问其他华为云的数据源并导入、查询和分析处理其中的数据。
目前支持DLI跨源访问的服务有:
-
云搜索服务CSS
-
分布式缓存服务DCS
-
文档数据库服务DDS
-
文档数据库服务DDS
-
云数据库RDS
-
MapReduce服务MRS
-
云数据库RDS
-
等
上述服务中,CSS集群存在着安全和非安全两种情况、MRS集群存在着是否开启Kerberos认证的情况,DLI均支持。只需要按照相关的指南配置连接文件和参数,即可利用DLI对CSS、MRS集群进行数据的操作。
- 点赞
- 收藏
- 关注作者
评论(0)