GaussDB(DWS)数据融合系列第十一期:第三方调度工具Kettle介绍
概述
kettle是什么?
ETL是什么
Kettle是一款国外开元的ETL工具,可以完成数据的抽取、转换、装入和加载。
ETL(Extract-Transform-Load),即数据抽取、转换、装载的过程。它是一种思想,主要是说,从不同的数据源获取数据,并通过对数据进行处理(格式,协议等转换),最后将处理后的数据提供给其他系统使用。当然这个过程,就是软件研发,尤其是后端研发最核心的工作。
ETL过程
目前,ETL工具的典型代表有:
商业软件:Informatica、IBM Datastage、Oracle ODI、Microsoft SSIS…
开源软件:Kettle、Talend、CloverETL、Ketl,Octopus …
kettle概念
kettle,翻译为中文叫做水壶,显而易见,水壶不管壶里面装的是什么液体,最终都会从壶嘴平滑的流出来。就好比,不管水壶装的是什么类型的数据,最后都会通过壶嘴以特定的格式流出来。其实就是对ETL思想的一种实现,它是通过java语言编写,秉承ETL思想的工具。既然是此采用java实现的,那就肯定具有跨平台的特性。kettle官网地址
初始kettle
kettle组成
kettle既然是一种工具,要处理不同数据源的异构数据,那就要求它能够进行图形化操作,在UI界面上对数据源进行数据治理,最终所有的图形化处理都要能够保存为kettle可识别的文件。
kettle主要生成两种类型的文件,一种转换文件,一种是任务文件,即:transformation和job,transformation完成针对数据的基础转换,job则完成整个工作流的控制。
这两种文件之间是可以相互调用的,已达到最终的数据清洗目的。
kettle开发工具
kettle转换
kettle任务
环境安装
1、jdk1.8(jdk的版本很重要!!!);
2、7.1.0.0kettle安装包;
3、mysql driver驱动放在安装包的lib目录下(用于连接mysql数据库);
4、下载后点击spoon.bat即可运行kettle;
优缺点
优点:
1、 页面简洁,操作简单,技术门槛较低,不需要创建dataset和写sql语句;
2、 支持多种数据源的组合读取与写入,包括数据库的连接、文件的读取等;
3、 功能齐全,可处理的应用场景宽泛。
4、 调试功能强大,可直接预览中间结果查看报错等;
缺点:
1、 入门成本较高,组件数目较多,需要提前了解各个组件的作用才能配合应用;
2、 环境安装成本较高,包括包的大小,环境的准备等;
3、 不支持sql语句,细节较多;
kettle使用场景(demo)
- 场景一:获取rest接口数据并保存为文本文件
将rest接口作为数据源,将物联网数据上传到kettle文件中,供kettle进行转换处理,并将结果保存到txt文件中;从下图可以看出,有一个节点叫做获取参数,这个节点就是用来接收rest接口传递来的数据。还有一个节点叫做响应结果,这个节点是用来将遥感数据保存到txt文件中。
处理rest接口数据
- 场景二:任务调度(周期性执行场景一)
如果想要周期性的执行转换,就需要采用任务调度的方式进行处理。针对场景一,对其进行任务调度。从下图可以看出,有一个节点叫做转换,转换节点配置的就是场景一的转换文件;还有一个节点叫做START,就是用来启动任务调度过程的。
任务调度过程
通过Kettle高效导入数据至DWS
准备工作:在使用Kettle对接DWS前,请先安装号Kettle,并确保网络和DWS可以连通。
一、 配置Kettle的DWS数据库连接
可选:用户可以使用PG原生驱动,或使用DWS提供的驱动替换PG驱动。在Kettle\data-integration\libext\JDBC目录中有jdbc驱动,需要把pg原有的jdbc驱动替换成mppdb的驱动,命名仍按照pg原有的命名。
选择PostgreSQL连接,填写connection name、host name、database name、port number、username、password等,点击OK。
二、 使用Kettle加载数据至DWS
1) 从源端库导出成文件到OBS或本地服务器,然后用bulk load导入方式(OBS数据加载、本地数据加载),通过DWS创建外表,在转换中添加执行SQL脚本,在执行SQL语句中执行bulk load语句。(推荐,传输速率最快可至集群每个计算节点100MB/s以上)
2) 通过自定义Java代码,走Copy的方式,流式数据入库(时效性高,入库效率百万条/s)
示例代码:
import org.postgresql.copy.CopyManager;
import org.postgresql.core.BaseConnection;
import java.sql.Connection;
import java.sql.DriverManager;
import java.util.Properties;
import java.io.StringReader;
private Connection con;
private StringBuffer tuples;
private int count;
private String sql = "copy dwi_alarm_info(alarm_definition,alarm_level,ext_similarity,last_update_date,occurring_time,source_channel,source_device,space,status,alarm_number) from STDIN WITH(format 'CSV', QUOTE '|')";
public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException {
Object[] r = getRow();
if (r == null) {
if (count > 0) {
try {
//logError("2***2019***" + String.valueOf(count));
//logError("3----------------------------");
//logError(tuples.toString());
//logError("4----------------------------");
CopyManager cm = new CopyManager((BaseConnection) con);
StringReader sr = new StringReader(tuples.toString());
long starttime = System.currentTimeMillis();
long rows = cm.copyIn(sql, sr);
con.commit();
long endtime = System.currentTimeMillis();
long costs = endtime - starttime;
//logError(String.valueOf(rows) + " rows, costs " + String.valueOf(costs) + " ms");
count = 0;
tuples.delete(0, tuples.length());
} catch (Exception se) {
//logError("5----------------------------" + se.getMessage());
se.printStackTrace();
return false;
}
}
setOutputDone();
return false;
}
if (first) {
try {
Class.forName("org.postgresql.Driver");
} catch (ClassNotFoundException e) {
// logError("1----------------------------" + e.getMessage());
System.out.println("can not find Driver");
e.printStackTrace();
return false;
}
first = false;
count = 0;
tuples = new StringBuffer();
}
String alarm_definition = get(Fields.In, "alarm_definition").getString(r);
String alarm_level = get(Fields.In, "alarm_level").getString(r);
String ext_blacklistpicurl = get(Fields.In, "ext_blacklistpicurl").getString(r);
String ext_similarity = get(Fields.In, "ext_similarity").getString(r);
//String ext_suspectid = get(Fields.In, "ext_suspectid").getString(r);
String last_update_date = get(Fields.In, "last_update_date").getString(r);
String occurring_time = get(Fields.In, "occurring_time").getString(r);
String source_channel = get(Fields.In, "source_channel").getString(r);
String source_device = get(Fields.In, "source_device").getString(r);
String space = get(Fields.In, "space").getString(r);
String status = get(Fields.In, "status").getString(r);
String alarm_number = get(Fields.In, "alarm_number").getString(r);
tuples.append(alarm_definition+",").append(alarm_level+",").append(ext_similarity+",").append(last_update_date+",").append(occurring_time+",").append(source_channel+",").append(source_device+",").append(space+",").append(status+",").append(alarm_number).append("\n");
long incrementLinesOutput = incrementLinesOutput();
count++;
//logError("6***2019***" + tuples.toString());
//logError("7***2019***" + String.valueOf(count));
if (count >= 100) {
try {
//logError("8***2019***");
CopyManager cm = new CopyManager((BaseConnection) con);
StringReader sr = new StringReader(tuples.toString());
long starttime = System.currentTimeMillis();
long rows = cm.copyIn(sql, sr);
con.commit();
long endtime = System.currentTimeMillis();
long costs = endtime - starttime;
//logError(String.valueOf(rows) + " rows, costs " + String.valueOf(costs) + " ms");
count = 0;
tuples.delete(0, tuples.length());
} catch (Exception se) {
se.printStackTrace();
//logError("9******"+se.getMessage());
return false;
}
}
return true;
}
public boolean init(StepMetaInterface stepMetaInterface, StepDataInterface stepDataInterface) {
if (!parent.initImpl(stepMetaInterface, stepDataInterface)) {
return false;
}
String url = getParameter("URL");
String userName = getParameter("USERNAME");
String passWd = getParameter("PASSWORD");
try {
Properties props = new Properties();
props.setProperty("user", userName);
props.setProperty("password", passWd);
con = DriverManager.getConnection(url, props);
con.setAutoCommit(false);
} catch (Exception e) {
//logError("Connecting to database " + url + " failed.", e);
setErrors(1);
return false;
}
return true;
}
public void dispose(StepMetaInterface smi, StepDataInterface sdi) {
try {
if (con != null) {
con.close();
}
} catch (Exception e) {
//logError("close conn failed.", e);
setErrors(1);
return;
}
parent.disposeImpl(smi, sdi);
}
三、使用Kettle同步数据至DWS
3.1 首次做表数据全量同步(保存为test1.ktr)
说明:(增量同步的表的一列需是时间戳或是自增的id列,增量数据同步时基于时间或者是唯一自增的id列),首次数据同步是数据的一次全部同步,只执行一次,以后每次只运行增量数据同步。
在DWS中创建存储时间戳货者是自增ID列的最大值如下所示:
create table t_test_time(shijian date);
在转换中分别插入类似如下的表输入和输出:
Oracle表:指的是需要同步的oracle表
表输入和插入数据的最大时间:是将本次同步数据中的最大时间插入到DWS中的记录t_test_time中
表输入的配置如上图所示:
插入当前数据最大时间如上所示:
首次同步数据:是DWS中的表,接收oracle表同步过来的数据
完成后点击启动数据同步,待数据同步完成后,进行数据增量同步
3.2 数据增量同步,如下表输入和表输出(保存为test.ktr)
增量数据同步,原理过滤上次同步后新增的数据(通过时间戳的方式或者是ID自增的方式进行数据过滤,然后进行增量数据的同步),具体增量结构图和配置如下所示:
取上次数据同步时,最大的时间过滤已经同步的数据(配置如下)
需要增量同步的oracle表配置
同步到DWS中表的配置如下:
取本次同步数据中的最大时间或者最大ID方便下一次做数据的增量同步
记录本次增量同步的时间,以便下一次做增量数据同步
启动数据增量同步
四、执行Kettle数据抽取作业调度
创建kettle数据抽取作业调度(参考数据迁移指导书)
点击通用添加START、转换和成功三个选项control S保存为test
转换任务取增量保存转换工作,如下所示:
4.1 配置为重复执行,一次作业完成后接着继续下一次数据增量同步任务
点击START对作业调度进行配置
4.2 配置为定时执行,如下(每小时执行一次增量数据的同步)
参考链接:
https://bbs.huaweicloud.com/forum/thread-26690-1-1.html
http://3ms.huawei.com/km/blogs/details/5673795
- 点赞
- 收藏
- 关注作者
评论(0)