解读Spark Datasource

举报
萌小奇 发表于 2021/09/23 09:09:27 2021/09/23
【摘要】 在日常的工作中,我们会接触到各种各样的数据库,他们存储的数据也是各式各样。当我们使用Spark去处理数据的时候,我们常常会遇到数据来源于不同数据源的情况。如果重新加载和保存数据的话,会非常的麻烦,还浪费空间,而且很多时候还需要考虑数据格式转换的问题。本文主要为讲解Spark Datasource这个连接外部数据源的框架。

在日常的工作中,我们会接触到各种各样的数据库,他们存储的数据也是各式各样。当我们使用Spark去处理数据的时候,我们常常会遇到数据来源于不同数据源的情况。如果重新加载和保存数据的话,会非常的麻烦,还浪费空间,而且很多时候还需要考虑数据格式转换的问题。

为了解决这样的问题,Spark提供了一个框架,叫Datasource。我目前就在学习这个框架的内容,并尝试着用Spark Datesource这个框架实现一个与自建数据源连接的功能。

下面就是我通过学习记录的内容。

1.概念介绍

1.1 什么是Spark Datasource

Spark Datasource是连接外部数据源和Spark引擎的框架,是一个连接器。可以利用这个连接器来对外部的数据源进行一个读写的操作。华为云的DLI支持原生Spark的Datasource能力,并在其基础上进行了扩展。

DLI利用Spark Datasource构建的功能为跨源连接。当前跨源连接分为以下两种:

  • 经典型跨源连接

DLI 经典型跨源连接可用于访问CloudTable的Hbase和OpenTSDB,MRS的OpenTSDB,DWS,RDS,CSS数据源。

  • 增强型跨源连接

DLI 增强型跨源连接底层采用对等连接,直接打通DLI集群与目的数据源的vpc网络,通过点对点的方式实现数据互通,能够提供比经典型跨源更加灵活的使用场景与更加强劲的性能。增强型跨源支持所有DLI服务已实现的跨源业务,包括CloudTable的Hbase和OpenTSDB,MRS的OpenTSDB,DWS,RDS,CSS,DCS,DDS等数据源。并且通过Spark作业方式能够实现与自建数据源之间的访问。

1.2 Spark Datasource如何使用?

当你使用华为云DLI的跨源连接服务时,首先需要按照官网上面的知识将跨源连接创建好,将队列绑定好。然后就可以创建作业,然后用绑定好的这个队列来运行你的作业。本文主要讲述的是Spark Datasource,所以具体使用讲的是spark作业相关,以scala语言为例。

利用spark作业读取外部数据源的模式总共有两种:

  • DataFrame模式

利用DataFrame的API读取外部数据源的方式如下:

val jdbcDF = sparkSession
  .read //表示是读取数据(write就是写)
  .format("jdbc") //驱动类,这里连接的是jdbc数据源
  .option("url", "jdbc:postgresql:dbserver") //option表示的是填入的参数。
  .load() 
  • SparkSql模式

利用Spark SQL的方式读取外部数据源的方式如下:

sparkSession.sql("
CREATE TABLE IF NOT EXISTS dli_to_rds //创建的spark sql表名
USING JDBC OPTIONS  //驱动类,表明连接的是jdbc数据源
(    'url'='jdbc:mysql://to-rds-1174404209-cA37siB6.datasource.com:3306',
     'driver'='com.mysql.jdbc.Driver' //上述是数据源的一些具体参数。
)")

sparkSession.sql("select * from dli_to_rds")

因为sql的简单易用以及易上手性,所以我们推荐大家使用Spark SQL模式。

2.运行原理

2.1 读流程

 def read: DataFrameReader = new DataFrameReader(self)

read对应的是DataFrameReader,这是Spark Datasource读取外部数据源的入口。当我们使用sparkSession.read的时候,就会构造出一个DataFrameReader,这是后面参数设置的基础,其中内置了读取多种数据源的方法,还包括参数配置的接口。

def format(source: String): DataFrameReader = {this.source = source}

format需要填入的是数据源的名称,表明连接的是什么连接源。Spark原生支持的数据源有csv、orc、parquet、jdbc等,通过在format这里填写来表示连接的数据源具体是什么。除此之外,format这里也是匹配自建数据源的一个重要窗口。当我们需要连接自己的数据源时,需要设置源的名字是什么,也就是source是什么,设置好以后通过format这里的填写,Spark Datasource框架会利用Spi机制连接创建我们所需要的连接。

def option(key: String, value: String): DataFrameReader = {
    this.extraOptions = this.extraOptions + (key -> value)}

option的作用就是具体的参数的填写。以连接一个jdbc的数据源为例,需要填入的参数有:

  • url 这是jdbc数据源的host地址。
  • dbtable 这是具体要读取的表的表名。
  • user 这是数据源的用户名。
  • password 这是数据源的密码。
  • driver 这是数据源的驱动器,jdbc的驱动器就是com.mysql.jdbc.Driver。
.load()

最后通过这个load的方法真正建立Spark与数据源之间的连接,并且得到一个DataFrame。

2.2 写流程

 def write: DataFrameWriter[T] = {new DataFrameWriter[T](this)}

写的流程是与读流程类似的,区别的是write返回的是一个DataFrameWrite类型对象,这是Spark Datasource写数据的入口。

2.3 连接是如何创建的?

这里主要以读取华为云JDBC数据源流程为例来讲解。

如果不关注Spark Datasource内部是如何运行的话,那么对用使用者来说,load完就是成功建立起连接并读取到外部数据源中的数据了。但是,如果关注Spark Datasource内部是如何运作的话,那么你就会知道,load对于这个流程来说,仅仅是个开始。

def load(paths: String*): DataFrame = {
    DataSource.lookupDataSourceV2(source, ....).map {......}.getOrElse(loadV1Source(paths: _*))
  }

与load相关的函数主要有两个:

  • lookupDataSourceV2

这个函数的主要作用为将source name匹配为RelationProvider的名字。如果source name为jdbc,那么匹配到的名字就是“org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider”。

  • loadV1Source

这个函数的主要作用为利用上面函数得到的RelationProvider建立与外部数据源之间的连接,并读取option中设置的数据,最终返回一个包含了我们希望读取到的数据的DataFrame。

private def loadV1Source(paths: String*) = {
    ...
    // Code path for data source v1.
    sparkSession.baseRelationToDataFrame((...).resolveRelation())
  }

loadV1Source中调用的函数主要为二:

  • resolveRelation

这个函数就是通过上面匹配后的source name来返回一个JdbcRelationProvider。在这个函数的内部,通过匹配来判断获取的RelationProvider类型,如果是外部的自建数据源,则会利用java的Spi机制来获取外部的RelationProvider。每个RelationProvider对象中都有着createRelation的功能,在这个函数内部,通过调用这些createRelation方法建立起了与外部数据源的连接,并利用之前的option获取了相关的数据信息。

  • baseRelationToDataFrame
    这个函数则是将RelationProvider转换为DataFrame。

到这里,一个与外部数据源建立连接并获取相关数据的过程就完成了。

3.DLI是如何使用Spark Datasource的

DLI支持原生Spark的DataSource能力,并在其基础上进行了相应的扩展,能够利用spark作业去访问其他华为云的数据源并导入、查询和分析处理其中的数据。

目前支持DLI跨源访问的服务有:

  • 云搜索服务CSS

  • 分布式缓存服务DCS

  • 文档数据库服务DDS

  • 文档数据库服务DDS

  • 云数据库RDS

  • MapReduce服务MRS

  • 云数据库RDS

上述服务中,CSS集群存在着安全和非安全两种情况、MRS集群存在着是否开启Kerberos认证的情况,DLI均支持。只需要按照相关的指南配置连接文件和参数,即可利用DLI对CSS、MRS集群进行数据的操作。

【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。