[SparkSQL Notes] High-Concurrency Database Reads and Writing Data to a Database with Spark SQL

Posted by Copy工程师 on 2022/01/17 09:25:03

1. High-Concurrency Database Reads with Spark SQL

Spark SQL provides three APIs for connecting to a database and reading data over JDBC:

// Construct a DataFrame representing the database table accessible via JDBC URL url named table and connection properties.
Dataset<Row> jdbc(String url, String table, java.util.Properties properties)
// Construct a DataFrame representing the database table accessible via JDBC URL url named table using connection properties. Each element of predicates is a WHERE-clause expression that defines one partition of the DataFrame.
Dataset<Row> jdbc(String url, String table, String[] predicates, java.util.Properties connectionProperties)
// Construct a DataFrame representing the database table accessible via JDBC URL url named table. Partitions of the table are retrieved in parallel based on columnName, lowerBound, upperBound, and numPartitions.
Dataset<Row> jdbc(String url, String table, String columnName, long lowerBound, long upperBound, int numPartitions, java.util.Properties connectionProperties)

What each of the three APIs does:

  1. Single partition, single task, no parallelism

    For tables with a large amount of data, extraction is slow.

    Example:

    import java.util.Properties;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;

    SparkSession sparkSession = SparkSession.builder().appName("SPARK_FENGDING_TASK1").master("local").config("spark.testing.memory", 471859200).getOrCreate();
    // Configure the connection properties
    Properties dbProps = new Properties();
    dbProps.put("user", "user");
    dbProps.put("password", "pwd");
    dbProps.put("driver", "oracle.jdbc.driver.OracleDriver");
    // Connect to the database and load the table (replace the JDBC URL with your own)
    Dataset<Row> tableDf = sparkSession.read().jdbc("jdbc:oracle:thin:@IP:1521:DEMO", "TABLE_DEMO", dbProps);
    // The table is read as a single partition: getPartitions() returns an array of length 1
    int numPartitions = tableDf.rdd().getPartitions().length;
    

    This API has a parallelism of 1: the table is read as a single partition, so no matter how many resources you give the job, only one task executes the read (see the sketch below).
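
    As a minimal sketch (reusing the sparkSession and dbProps from the example above), note that repartitioning after the read only helps downstream stages; the JDBC extraction itself is still performed by one task:

    // The single-partition read: one task pulls the whole table over JDBC.
    Dataset<Row> singleDf = sparkSession.read().jdbc("jdbc:oracle:thin:@IP:1521:DEMO", "TABLE_DEMO", dbProps);
    // repartition() spreads the rows across executors for the stages that follow,
    // but it does NOT parallelize the JDBC read itself.
    Dataset<Row> wider = singleDf.repartition(10);
    System.out.println(wider.rdd().getNumPartitions()); // 10 downstream partitions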

  2. Partitioning by arbitrary predicates

    This is the second API above; the degree of parallelism is determined by the partitioning predicates you supply:

    def jdbc(
        url: String,
        table: String,
        predicates: Array[String], // the partitioning predicates: one array element per partition
        connectionProperties: Properties): DataFrame = {
        val parts: Array[Partition] = predicates.zipWithIndex.map { case (part, i) =>
            JDBCPartition(part, i) : Partition
        }
        jdbc(url, table, parts, connectionProperties)
    }
    

    Example:

    // Partition predicates: split the October and November data into two partitions by ingestion time (rksj)
    String[] partitions = {"rksj >= '1569859200' and rksj < '1572537600'", "rksj >= '1572537600' and rksj < '1575129600'"};
    // Two predicates -> two partitions -> two tasks extract the data
    Dataset<Row> tableDf3 = sparkSession.read().jdbc("jdbc:oracle:thin:@IP:1521:DEMO", "TABLE_DEMO", partitions, dbProps);
    // Returns 2 partitions
    int numPartitions3 = tableDf3.rdd().getPartitions().length;
    

    This API is the most flexible of the three; the only inconvenience is that you have to construct the partition predicates yourself (a sketch of doing that in code follows below).
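
    For illustration, the predicate array is usually generated in code rather than written by hand. A hypothetical sketch, assuming the same rksj epoch-second column and month boundaries as the example above:

    import java.util.ArrayList;
    import java.util.List;

    // Build one predicate per time range; each predicate becomes one partition (and one task).
    long[] monthStarts = {1569859200L, 1572537600L, 1575129600L}; // Oct 1, Nov 1, Dec 1 (epoch seconds)
    List<String> predicateList = new ArrayList<>();
    for (int i = 0; i < monthStarts.length - 1; i++) {
        predicateList.add("rksj >= '" + monthStarts[i] + "' and rksj < '" + monthStarts[i + 1] + "'");
    }
    String[] predicates = predicateList.toArray(new String[0]);
    // Each predicate is appended to the WHERE clause of its partition's query.
    Dataset<Row> rangedDf = sparkSession.read().jdbc("jdbc:oracle:thin:@IP:1521:DEMO", "TABLE_DEMO", predicates, dbProps);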

  3. Partitioning by a numeric (Long) column
    This is the third API above; data is extracted in parallel according to the configured number of partitions:

    def jdbc(
        url: String,
        table: String,
        columnName: String,    // the partition column; must be an integral column, e.g. an id
        lowerBound: Long,      // lower bound of the partition range
        upperBound: Long,      // upper bound of the partition range
        numPartitions: Int,    // number of partitions
        connectionProperties: Properties): DataFrame = {
        val partitioning = JDBCPartitioningInfo(columnName, lowerBound, upperBound, numPartitions)
        val parts = JDBCRelation.columnPartition(partitioning)
        jdbc(url, table, parts, connectionProperties)
    }
    

    Example:

    // Partition by studentId into 15 partitions, so 15 tasks will extract the data
    Dataset<Row> tableDf2 = sparkSession.read().jdbc("jdbc:oracle:thin:@IP:1521:DEMO", "TABLE_DEMO", "studentId", 0, 1500, 15, dbProps);
    // Returns 15 partitions (lowerBound 0, upperBound 1500, numPartitions 15)
    int numPartitions2 = tableDf2.rdd().getPartitions().length;
    

    This approach sets the degree of parallelism with the partition count; its drawback is that the partition column must be a numeric (Long) field. The sketch below shows roughly how the bounds are turned into per-partition WHERE clauses.
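
    For reference, lowerBound and upperBound only determine the partition stride; they do not filter rows, so values outside the range fall into the first or last partition. The sketch below approximates how the per-partition WHERE clauses are derived; it is a simplification of JDBCRelation.columnPartition, not Spark's exact code, and details vary between versions:

    // Roughly how (columnName, lowerBound, upperBound, numPartitions) becomes one WHERE clause per partition.
    static void printPartitionPredicates(String column, long lowerBound, long upperBound, int numPartitions) {
        long stride = upperBound / numPartitions - lowerBound / numPartitions;
        long current = lowerBound;
        for (int i = 0; i < numPartitions; i++) {
            String lower = (i == 0) ? null : column + " >= " + current;
            current += stride;
            String upper = (i == numPartitions - 1) ? null : column + " < " + current;
            String where;
            if (upper == null) {
                where = lower;                                   // last partition is open-ended upwards
            } else if (lower == null) {
                where = upper + " or " + column + " is null";    // first partition also catches NULLs
            } else {
                where = lower + " AND " + upper;
            }
            System.out.println("partition " + i + ": " + where);
        }
    }
    // printPartitionPredicates("studentId", 0, 1500, 15) prints 15 clauses with a stride of 100,
    // e.g. "studentId < 100 or studentId is null", "studentId >= 100 AND studentId < 200", ..., "studentId >= 1400"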

2. Writing Data to a Database

The write APIs are provided by the DataFrameWriter<T> class, which can write to text files, Hive, and databases. Only the database API is covered here. As an aside, saving in text format supports only a single column, which is rather awkward.

Example:

There are three ways to write to a database:

// First style: specify the format as "jdbc" and call save()
jdbcDF.write()
  .format("jdbc")
  .option("url", "jdbc:postgresql:dbserver")
  .option("dbtable", "schema.tablename")
  .option("user", "username")
  .option("password", "password")
  .save();
// Second style: call jdbc() directly to write to the database
jdbcDF2.write()
  .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties);

// Third style: also jdbc(), but with createTableColumnTypes so the listed column types are used when the table is created
jdbcDF.write()
  .option("createTableColumnTypes", "name CHAR(64), comments VARCHAR(1024)")
  .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties);

If the target table already exists, the statements above will fail with a "table already exists" error. That is because no save mode was specified, and the default is ErrorIfExists.

Save modes:

SaveMode        Constant                  Meaning
Append          SaveMode.Append           If the table already exists, the DataFrame's rows are appended to it.
Overwrite       SaveMode.Overwrite        If the table already exists, its data is overwritten; often combined with the truncate option.
ErrorIfExists   SaveMode.ErrorIfExists    If the table already exists, the write fails with a "table already exists" error.
Ignore          SaveMode.Ignore           If the table already exists, the write does nothing.

So in practice it is usually written like this:

tableDf3.write().mode(SaveMode.Append).jdbc("jdbc:oracle:thin:@IP:1521:DEMO", "TABLE_DEMO", connectionProperties);

Beyond user, password, and driver, connectionProperties supports many other options:

Property Name Meaning
url The JDBC URL to connect to. The source-specific connection properties may be specified in the URL. e.g., jdbc:postgresql://localhost/test?user=fred&password=secret
dbtable The JDBC table that should be read from or written into. Note that when using it in the read path anything that is valid in a FROM clause of a SQL query can be used. For example, instead of a full table you could also use a subquery in parentheses. It is not allowed to specify dbtable and query options at the same time.
query A query that will be used to read data into Spark. The specified query will be parenthesized and used as a subquery in the FROM clause. Spark will also assign an alias to the subquery clause. As an example, spark will issue a query of the following form to the JDBC Source. SELECT <columns> FROM (<user_specified_query>) spark_gen_alias Below are a couple of restrictions while using this option. It is not allowed to specify dbtable and query options at the same time. It is not allowed to specify query and partitionColumn options at the same time. When specifying partitionColumn option is required, the subquery can be specified using dbtable option instead and partition columns can be qualified using the subquery alias provided as part of dbtable. Example: spark.read.format("jdbc") .option("url", jdbcUrl) .option("query", "select c1, c2 from t1") .load()
driver The class name of the JDBC driver to use to connect to this URL.
partitionColumn, lowerBound, upperBound These options must all be specified if any of them is specified. In addition, numPartitions must be specified. They describe how to partition the table when reading in parallel from multiple workers. partitionColumn must be a numeric, date, or timestamp column from the table in question. Notice that lowerBound and upperBound are just used to decide the partition stride, not for filtering the rows in table. So all rows in the table will be partitioned and returned. This option applies only to reading.
numPartitions The maximum number of partitions that can be used for parallelism in table reading and writing. This also determines the maximum number of concurrent JDBC connections. If the number of partitions to write exceeds this limit, we decrease it to this limit by calling coalesce(numPartitions) before writing.
queryTimeout The number of seconds the driver will wait for a Statement object to execute to the given number of seconds. Zero means there is no limit. In the write path, this option depends on how JDBC drivers implement the API setQueryTimeout, e.g., the h2 JDBC driver checks the timeout of each query instead of an entire JDBC batch. It defaults to 0.
fetchsize The JDBC fetch size, which determines how many rows to fetch per round trip. This can help performance on JDBC drivers which default to low fetch size (eg. Oracle with 10 rows). This option applies only to reading.
batchsize The JDBC batch size, which determines how many rows to insert per round trip. This can help performance on JDBC drivers. This option applies only to writing. It defaults to 1000.
isolationLevel The transaction isolation level, which applies to current connection. It can be one of NONE, READ_COMMITTED, READ_UNCOMMITTED, REPEATABLE_READ, or SERIALIZABLE, corresponding to standard transaction isolation levels defined by JDBC’s Connection object, with default of READ_UNCOMMITTED. This option applies only to writing. Please refer the documentation in java.sql.Connection.
sessionInitStatement After each database session is opened to the remote DB and before starting to read data, this option executes a custom SQL statement (or a PL/SQL block). Use this to implement session initialization code. Example: option("sessionInitStatement", """BEGIN execute immediate 'alter session set "_serial_direct_read"=true'; END;""")
truncate This is a JDBC writer related option. When SaveMode.Overwrite is enabled, this option causes Spark to truncate an existing table instead of dropping and recreating it. This can be more efficient, and prevents the table metadata (e.g., indices) from being removed. However, it will not work in some cases, such as when the new data has a different schema. It defaults to false. This option applies only to writing.
cascadeTruncate This is a JDBC writer related option. If enabled and supported by the JDBC database (PostgreSQL and Oracle at the moment), this options allows execution of a TRUNCATE TABLE t CASCADE (in the case of PostgreSQL a TRUNCATE TABLE ONLY t CASCADE is executed to prevent inadvertently truncating descendant tables). This will affect other tables, and thus should be used with care. This option applies only to writing. It defaults to the default cascading truncate behaviour of the JDBC database in question, specified in the isCascadeTruncate in each JDBCDialect.
createTableOptions This is a JDBC writer related option. If specified, this option allows setting of database-specific table and partition options when creating a table (e.g., CREATE TABLE t (name string) ENGINE=InnoDB.). This option applies only to writing.
createTableColumnTypes The database column data types to use instead of the defaults, when creating the table. Data type information should be specified in the same format as CREATE TABLE columns syntax (e.g: "name CHAR(64), comments VARCHAR(1024)"). The specified types should be valid spark sql data types. This option applies only to writing.
customSchema The custom schema to use for reading data from JDBC connectors. For example, "id DECIMAL(38, 0), name STRING". You can also specify partial fields, and the others use the default type mapping. For example, "id DECIMAL(38, 0)". The column names should be identical to the corresponding column names of JDBC table. Users can specify the corresponding data types of Spark SQL instead of using the defaults. This option applies only to reading.
pushDownPredicate The option to enable or disable predicate push-down into the JDBC data source. The default value is true, in which case Spark will push down filters to the JDBC data source as much as possible. Otherwise, if set to false, no filter will be pushed down to the JDBC data source and thus all filters will be handled by Spark. Predicate push-down is usually turned off when the predicate filtering is performed faster by Spark than by the JDBC data source.
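
Most of these options can be set either in the Properties object or through .option() on the reader/writer. A hedged sketch combining a few of the read- and write-side options from the table above; the connection details and the target table TABLE_DEMO_COPY are placeholders, not values from the original article:

// Parallel read configured entirely through options instead of the jdbc(...) overloads.
Dataset<Row> optDf = sparkSession.read()
    .format("jdbc")
    .option("url", "jdbc:oracle:thin:@IP:1521:DEMO")
    .option("dbtable", "TABLE_DEMO")
    .option("user", "user")
    .option("password", "pwd")
    .option("driver", "oracle.jdbc.driver.OracleDriver")
    .option("partitionColumn", "studentId")   // must be a numeric, date, or timestamp column
    .option("lowerBound", "0")
    .option("upperBound", "1500")
    .option("numPartitions", "15")
    .option("fetchsize", "1000")              // rows fetched per round trip when reading
    .load();

// On the write side, batchsize controls how many rows are inserted per round trip.
optDf.write()
    .mode(SaveMode.Append)
    .option("batchsize", "5000")
    .jdbc("jdbc:oracle:thin:@IP:1521:DEMO", "TABLE_DEMO_COPY", dbProps);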

The truncate option here means that when SaveMode.Overwrite is used and truncate is set to true, Spark clears the table with a TRUNCATE statement instead of dropping and recreating it.
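
A minimal sketch of that combination, reusing the connection settings from the earlier examples:

// With truncate=true, Overwrite issues TRUNCATE TABLE instead of DROP + CREATE,
// so existing table metadata such as indexes is preserved.
tableDf3.write()
    .mode(SaveMode.Overwrite)
    .option("truncate", "true")
    .jdbc("jdbc:oracle:thin:@IP:1521:DEMO", "TABLE_DEMO", dbProps);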
