GaussDB(DWS)《通过JDBC,实现远程COPY导入数据到DWS》
通过JDBC,实现远程COPY导入数据到DWS
1 前言
DWS云数仓的用户,因为业务需要或者进行如TPC-DS等模型性能摸底时。存在从文件导入大量数据到DWS,或从DWS导出数据到ECS上的数据文件的使用需求。DWS云数仓支持JDBC的远程连接,并且管理界面提供DWS数仓对应JDBC驱动的下载。用户仅需下载对应DWS版本的驱动,和java源码同时上传到ECS,进行编译打包后,即可通过JDBC实现COPY导入或者导出。如果用shell对JDBC调用进行二次封装,可以更进一步简化使用步骤,降低维护成本。
当然,JDBC接口不限于COPY的执行,任何DWS支持的sql语法都可执行。本文以最常用的COPY作为实例,详细描述JDBC使用每一步的实现和注意事项。
长篇大论前,先给出整体思路的流程图,让大家拥有共同的逻辑基础,方便对后续环节的理解。
2 必要文件上传ECS,及ECS上JAVA JDK检查
2.1 上传必要文件到ECS目录
- root用户在ECS创建目录dws_copy, mkdir /data1/dws_copy
- root用户上传java源码和JDBC驱动jar包到ECS的/data1/dws_copy目录
Copy.java源码如下,
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import org.postgresql.copy.CopyManager;
import org.postgresql.core.BaseConnection;
public class Copy {
public Copy() {
}
public static void main(String[] args) {
if (args.length != 9) {
System.out.println("arguments number is not right");
System.out.println("we need hostname port user password in|out database table file delimiter");
} else {
String hostname = args[0];
String port = args[1];
String username = args[2];
String password = args[3];
String action = args[4];
String dataBase = args[5];
String tableName = args[6];
String file = args[7];
String delimiter = args[8];
String urls = new String("jdbc:postgresql://" + hostname + ":" + port + "/" + dataBase);
String driver = "org.postgresql.Driver";
Connection conn = null;
try {
Class.forName(driver);
conn = DriverManager.getConnection(urls, username, password);
} catch (ClassNotFoundException var18) {
var18.printStackTrace(System.out);
} catch (SQLException var19) {
var19.getSQLState();
var19.printStackTrace(System.out);
}
if (action.equals("out")) {
try {
copyToFile(conn, file, tableName, delimiter);
} catch (SQLException var16) {
var16.printStackTrace();
} catch (IOException var17) {
var17.printStackTrace();
}
} else if (action.equals("in")) {
try {
copyFromFile(conn, file, tableName, delimiter);
} catch (SQLException var14) {
var14.printStackTrace();
} catch (IOException var15) {
var15.printStackTrace();
}
} else {
System.out.println("please input the corrent action ,in or out");
}
}
}
public static void copyFromFile(Connection connection, String filePath, String tableName, String delimiter) throws SQLException, IOException {
FileInputStream fileInputStream = null;
try {
CopyManager copyManager = new CopyManager((BaseConnection)connection);
fileInputStream = new FileInputStream(filePath);
copyManager.copyIn("COPY " + tableName + " FROM STDIN DELIMITER '" + delimiter + "'", fileInputStream);
} finally {
if (fileInputStream != null) {
try {
fileInputStream.close();
} catch (IOException var11) {
var11.printStackTrace();
}
}
}
}
public static void copyToFile(Connection connection, String filePath, String tableOrQuery, String delimiter) throws SQLException, IOException {
FileOutputStream fileOutputStream = null;
try {
CopyManager copyManager = new CopyManager((BaseConnection)connection);
fileOutputStream = new FileOutputStream(filePath);
copyManager.copyOut("COPY " + tableOrQuery + " TO STDOUT DELIMITER '" + delimiter + "'", fileOutputStream);
} finally {
if (fileOutputStream != null) {
try {
fileOutputStream.close();
} catch (IOException var11) {
var11.printStackTrace();
}
}
}
}
}
注1:代码中,String urls = new String("jdbc:postgresql://" + hostname + ":" + port + "/" + dataBase) 代码段
是DWS集群的JDBC连接字符串,可以从管理面的集群连接页获取具体对应的字符串;
注2:以上代码主要包括参数校验和copy功能实现两部分,更多开发信息,请参考华为云官网文档JDBC开发章节,https://support.huaweicloud.com/devg2-dws/dws_0402_0065.html;
- 创建目录/data1/data_file,上传或生成用于copy的数据文件
2.2 检查部署ECS java jdk
- root用户,在ECS上执行java –version;javac –version;jar –version;
检查java JDK
- 若有缺少的JDK组件,请自行下载部署
3 制作可执行jar包并进行shell二次封装
- 进入/data1/dws_copy目录,编译java源码,javac Copy.class –cp driver.jar
- 进入/data1/dws_copy目录,打包java源码为jar包,
2.1,编写保存manifest文件:
Manifest-Version: 1.0
Main-Class: Copy
Class-Path: [JDBC驱动jar包名称]
注:manifest文件中, ’:’后有一个空格
- 生成jar包jar, jar cvfm Copy.jar manifest ./Copy.class ./manifest
- java –jar执行时,需要大量的入参,命令长且不易维护。固使用shell对java –jar命令执行进行二次封装,达到简化执行和维护的目的(入参写到到配置文件读取执行)。
示例shell脚本run_jdbc_copy.sh代码如下,主要用于单表或多表并发COPY ECS上本地文件的数据到DWS集群内的表中,用户可以根据自己的需求,自行设计
#!/bin/bash
script_path=$(cd `dirname $0`;pwd)
source ${script_path}/jdbc_copy.conf
function usage()
{
echo "This srcipt is used to copy tpch/tpcds data to remote database.Before running the script,u must configured the file: jdbc_copy.conf."
echo -e " "
echo "u can run the script as following:"
echo -e " "
echo " 1.copy data in batch mode: sh run_jdbc_copy.sh batch"
echo -e " "
echo " 2.copy data in single mode: sh run_jdbc_copy.sh single [tablename]"
}
function batch_copy()
{
echo "[INFO]: Batch mode copying data to ${remote_ip} database ${db_name} by JDBC started."
for t in `cat ${table_list}`
do
table_name=`echo "${t}"|awk -F '.' {'print$1'}`
echo "[INFO]: Copying data to ${table_name}."
java -jar ${script_path}/Copy.jar ${remote_ip} ${db_port} ${db_user} ${user_passwd} in ${db_name} ${table_name} ${data_abs_path}/${t} "|" > /dev/null 2>&1
if [ $? -eq 0 ]
then echo "[INFO]: Successfully copied data to ${table_name}."
else
echo "[WARNIG]: Something unexpected happened.plz using single mode to check."
fi
done
echo "[INFO]: Batch mode copyed data to ${remote_ip} database ${db_name} by JDBC done."
}
function single_copy()
{
echo "[INFO]: Single mode copying data to ${remote_ip} database ${db_name} $1 by JDBC started."
java -jar ${script_path}/Copy.jar ${remote_ip} ${db_port} ${db_user} ${user_passwd} in ${db_name} $1 ${data_abs_path}/$1.* "|"
echo "[INFO]: Single mode copyed data to $1 done."
}
case $1 in
batch)batch_copy;;
single)single_copy $2;;
*)echo "[ERROR]: Invalid para.";usage;;
esac
注:其中jdbc_copy.conf配置文件,内容如下:
remote_ip=192.168.xxx.xxx (dws集群连接域名或内网IP)
db_port=8000 (C连接端口号)
db_user=user (数据库用户)
user_passwd=passwd (数据库用户密码)
db_name=gaussdb (连接的目标数据库名称)
data_abs_path=/data1/data_file/ (Copy from数据文件的本地路径)
delimiter="|" (读取数据文件的分隔符)
table_list="/data1/dws_copy/table.lst" (要执行copy的目标表清单,格式为每行schema.tablename)
4 执行shell,通过JDCB实现COPY功能
4.1 执行shell,实现从ECS对DWS集群的Copy操作
root用户,
cd /data1/dws_copy;
copy数据到单张表:
sh run_jdbc_copy.sh single_copy ss.col_t
copy数据到多张表:
添加如下到/data1/dws_copy/table.lst,
ss.t1
ss.t2
ss.t3
保存后执行,
sh run_jdbc_copy.sh batch_copy
- 点赞
- 收藏
- 关注作者
评论(0)