GaussDB(DWS)《通过JDBC,实现远程COPY导入数据到DWS》

举报
一剑战八荒 发表于 2022/04/29 17:31:30 2022/04/29
【摘要】 JDBC连接DWS执行COPY

通过JDBC,实现远程COPY导入数据到DWS

1       前言

    DWS云数仓的用户,因为业务需要或者进行如TPC-DS等模型性能摸底时。存在从文件导入大量数据到DWS,或从DWS导出数据到ECS上的数据文件的使用需求。DWS云数仓支持JDBC的远程连接,并且管理界面提供DWS数仓对应JDBC驱动的下载。用户仅需下载对应DWS版本的驱动,和java源码同时上传到ECS,进行编译打包后,即可通过JDBC实现COPY导入或者导出。如果用shellJDBC调用进行二次封装,可以更进一步简化使用步骤,降低维护成本。

  当然,JDBC接口不限于COPY的执行,任何DWS支持的sql语法都可执行。本文以最常用的COPY作为实例,详细描述JDBC使用每一步的实现和注意事项。

  长篇大论前,先给出整体思路的流程图,让大家拥有共同的逻辑基础,方便对后续环节的理解。

2     必要文件上传ECS,及ECSJAVA JDK检查

2.1      上传必要文件到ECS目录

  1. root用户在ECS创建目录dws_copy, mkdir /data1/dws_copy
  2. root用户上传java源码和JDBC驱动jar包到ECS/data1/dws_copy目录

Copy.java源码如下,


import java.io.FileInputStream;

import java.io.FileOutputStream;

import java.io.IOException;

import java.sql.Connection;

import java.sql.DriverManager;

import java.sql.SQLException;

import org.postgresql.copy.CopyManager;

import org.postgresql.core.BaseConnection;

 

public class Copy {

    public Copy() {

    }

 

    public static void main(String[] args) {

        if (args.length != 9) {

            System.out.println("arguments number is not right");

            System.out.println("we need hostname port user password in|out database table file delimiter");

        } else {

            String hostname = args[0];

            String port = args[1];

            String username = args[2];

            String password = args[3];

            String action = args[4];

            String dataBase = args[5];

            String tableName = args[6];

            String file = args[7];

            String delimiter = args[8];

            String urls = new String("jdbc:postgresql://" + hostname + ":" + port + "/" + dataBase);

            String driver = "org.postgresql.Driver";

            Connection conn = null;

 

            try {

                Class.forName(driver);

                conn = DriverManager.getConnection(urls, username, password);

            } catch (ClassNotFoundException var18) {

                var18.printStackTrace(System.out);

            } catch (SQLException var19) {

                var19.getSQLState();

                var19.printStackTrace(System.out);

            }

 

            if (action.equals("out")) {

                try {

                    copyToFile(conn, file, tableName, delimiter);

                } catch (SQLException var16) {

                    var16.printStackTrace();

                } catch (IOException var17) {

                    var17.printStackTrace();

                }

            } else if (action.equals("in")) {

                try {

                    copyFromFile(conn, file, tableName, delimiter);

                } catch (SQLException var14) {

                    var14.printStackTrace();

                } catch (IOException var15) {

                    var15.printStackTrace();

                }

            } else {

                System.out.println("please input the corrent action ,in or out");

            }

 

        }

    }

 

    public static void copyFromFile(Connection connection, String filePath, String tableName, String delimiter) throws SQLException, IOException {

        FileInputStream fileInputStream = null;

 

        try {

            CopyManager copyManager = new CopyManager((BaseConnection)connection);

            fileInputStream = new FileInputStream(filePath);

            copyManager.copyIn("COPY " + tableName + " FROM STDIN DELIMITER '" + delimiter + "'", fileInputStream);

        } finally {

            if (fileInputStream != null) {

                try {

                    fileInputStream.close();

                } catch (IOException var11) {

                    var11.printStackTrace();

                }

            }

 

        }

 

    }

 

    public static void copyToFile(Connection connection, String filePath, String tableOrQuery, String delimiter) throws SQLException, IOException {

        FileOutputStream fileOutputStream = null;

 

        try {

            CopyManager copyManager = new CopyManager((BaseConnection)connection);

            fileOutputStream = new FileOutputStream(filePath);

            copyManager.copyOut("COPY " + tableOrQuery + " TO STDOUT DELIMITER '" + delimiter + "'", fileOutputStream);

        } finally {

            if (fileOutputStream != null) {

                try {

                    fileOutputStream.close();

                } catch (IOException var11) {

                    var11.printStackTrace();

                }

            }

 

        }

 

    }

}

1:代码中,String urls = new String("jdbc:postgresql://" + hostname + ":" + port + "/" + dataBase) 代码段

DWS集群的JDBC连接字符串,可以从管理面的集群连接页获取具体对应的字符串;

2:以上代码主要包括参数校验和copy功能实现两部分,更多开发信息,请参考华为云官网文档JDBC开发章节,https://support.huaweicloud.com/devg2-dws/dws_0402_0065.html

  1. 创建目录/data1/data_file,上传或生成用于copy的数据文件

2.2      检查部署ECS java jdk

  1. root用户,在ECS上执行java –version;javac –version;jar –version;

检查java JDK

  1. 若有缺少的JDK组件,请自行下载部署

3     制作可执行jar包并进行shell二次封装

  1. 进入/data1/dws_copy目录,编译java源码,javac Copy.class –cp driver.jar
  2. 进入/data1/dws_copy目录,打包java源码为jar包,

2.1,编写保存manifest文件:

Manifest-Version: 1.0

Main-Class: Copy

Class-Path: [JDBC驱动jar包名称]

注:manifest文件中, ’:’后有一个空格

  1. 生成jarjarjar cvfm Copy.jar manifest ./Copy.class ./manifest
  2. java –jar执行时,需要大量的入参,命令长且不易维护。固使用shelljava –jar命令执行进行二次封装,达到简化执行和维护的目的(入参写到到配置文件读取执行)

示例shell脚本run_jdbc_copy.sh代码如下,主要用于单表或多表并发COPY ECS上本地文件的数据到DWS集群内的表中,用户可以根据自己的需求,自行设计

#!/bin/bash

script_path=$(cd `dirname $0`;pwd)

source ${script_path}/jdbc_copy.conf

 

function usage()

{

echo "This srcipt is used to copy tpch/tpcds data to remote database.Before running the script,u must configured the file: jdbc_copy.conf."

echo -e " "

echo "u can run the script as following:"

echo -e " "

echo "  1.copy data in batch mode: sh run_jdbc_copy.sh batch"

echo -e " "

echo "  2.copy data in single mode: sh run_jdbc_copy.sh single [tablename]"

}

 

function batch_copy()

{

echo "[INFO]: Batch mode copying data to ${remote_ip} database ${db_name} by JDBC started."

for t in `cat ${table_list}`

do

  table_name=`echo "${t}"|awk -F '.' {'print$1'}`

 

  echo "[INFO]: Copying data to ${table_name}."

  java -jar ${script_path}/Copy.jar ${remote_ip} ${db_port} ${db_user} ${user_passwd} in ${db_name} ${table_name} ${data_abs_path}/${t} "|" > /dev/null 2>&1

 

  if [ $? -eq 0 ]

    then  echo "[INFO]: Successfully copied data to ${table_name}."

  else

    echo "[WARNIG]: Something unexpected happened.plz using single mode to check."

  fi

done

echo "[INFO]: Batch mode copyed data to ${remote_ip} database ${db_name} by JDBC done."

}

 

function single_copy()

{

echo "[INFO]: Single mode copying data to ${remote_ip} database ${db_name} $1 by JDBC started."

java -jar ${script_path}/Copy.jar ${remote_ip} ${db_port} ${db_user} ${user_passwd} in ${db_name} $1 ${data_abs_path}/$1.* "|"

echo "[INFO]: Single mode copyed data to $1 done."

}

 

case $1 in

batch)batch_copy;;

single)single_copy $2;;

*)echo "[ERROR]: Invalid para.";usage;;

esac

注:其中jdbc_copy.conf配置文件,内容如下:

remote_ip=192.168.xxx.xxx  (dws集群连接域名或内网IP)

db_port=8000  (C连接端口号)

db_user=user  (数据库用户)

user_passwd=passwd  (数据库用户密码)

db_name=gaussdb  (连接的目标数据库名称)

data_abs_path=/data1/data_file/  (Copy from数据文件的本地路径)

delimiter="|"  (读取数据文件的分隔符)

table_list="/data1/dws_copy/table.lst"  (要执行copy的目标表清单,格式为每行schema.tablename)

4     执行shell,通过JDCB实现COPY功能

4.1      执行shell,实现从ECSDWS集群的Copy操作

root用户,

cd /data1/dws_copy;

copy数据到单张表:

sh run_jdbc_copy.sh single_copy ss.col_t

copy数据到多张表:

添加如下到/data1/dws_copy/table.lst

ss.t1

ss.t2

ss.t3

保存后执行,

sh run_jdbc_copy.sh batch_copy

【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。