GaussDB(DWS)数据融合系列第十一期:第三方调度工具Kettle介绍

举报
chinawjb 发表于 2021/03/29 19:31:03 2021/03/29
【摘要】 Kettle是一款国外开元的ETL工具,可以完成数据的抽取、转换、装入和加载。

概述

kettle是什么?

ETL是什么

Kettle是一款国外开元的ETL工具,可以完成数据的抽取、转换、装入和加载。

ETLExtract-Transform-Load),即数据抽取、转换、装载的过程。它是一种思想,主要是说,从不同的数据源获取数据,并通过对数据进行处理(格式,协议等转换),最后将处理后的数据提供给其他系统使用。当然这个过程,就是软件研发,尤其是后端研发最核心的工作。

ETL过程

目前,ETL工具的典型代表有:

商业软件:InformaticaIBM DatastageOracle ODIMicrosoft SSIS…

开源软件:KettleTalendCloverETLKetlOctopus …

kettle概念

kettle,翻译为中文叫做水壶,显而易见,水壶不管壶里面装的是什么液体,最终都会从壶嘴平滑的流出来。就好比,不管水壶装的是什么类型的数据,最后都会通过壶嘴以特定的格式流出来。其实就是对ETL思想的一种实现,它是通过java语言编写,秉承ETL思想的工具。既然是此采用java实现的,那就肯定具有跨平台的特性。kettle官网地址

初始kettle​

kettle组成

kettle既然是一种工具,要处理不同数据源的异构数据,那就要求它能够进行图形化操作,在UI界面上对数据源进行数据治理,最终所有的图形化处理都要能够保存为kettle可识别的文件。

kettle主要生成两种类型的文件,一种转换文件,一种是任务文件,即:transformationjobtransformation完成针对数据的基础转换,job则完成整个工作流的控制。

这两种文件之间是可以相互调用的,已达到最终的数据清洗目的。

 kettle开发工具

 kettle转换

 kettle任务

 

环境安装

1jdk1.8jdk的版本很重要!!!);

27.1.0.0kettle安装包;

3mysql driver驱动放在安装包的lib目录下(用于连接mysql数据库);

4、下载后点击spoon.bat即可运行kettle

优缺点

优点:

1  页面简洁,操作简单,技术门槛较低,不需要创建dataset和写sql语句;

2  支持多种数据源的组合读取与写入,包括数据库的连接、文件的读取等;

3  功能齐全,可处理的应用场景宽泛。

4  调试功能强大,可直接预览中间结果查看报错等;

缺点:

1  入门成本较高,组件数目较多,需要提前了解各个组件的作用才能配合应用;

2  环境安装成本较高,包括包的大小,环境的准备等;

3  不支持sql语句,细节较多;

kettle使用场景(demo

  • 场景一:获取rest接口数据并保存为文本文件

rest接口作为数据源,将物联网数据上传到kettle文件中,供kettle进行转换处理,并将结果保存到txt文件中;从下图可以看出,有一个节点叫做获取参数,这个节点就是用来接收rest接口传递来的数据。还有一个节点叫做响应结果,这个节点是用来将遥感数据保存到txt文件中。

 处理rest接口数据

  • 场景二:任务调度(周期性执行场景一)

如果想要周期性的执行转换,就需要采用任务调度的方式进行处理。针对场景一,对其进行任务调度。从下图可以看出,有一个节点叫做转换,转换节点配置的就是场景一的转换文件;还有一个节点叫做START,就是用来启动任务调度过程的。

 任务调度过程

 

通过Kettle高效导入数据至DWS

准备工作:在使用Kettle对接DWS前,请先安装号Kettle,并确保网络和DWS可以连通。

一、  配置KettleDWS数据库连接

可选:用户可以使用PG原生驱动,或使用DWS提供的驱动替换PG驱动。在Kettle\data-integration\libext\JDBC目录中有jdbc驱动,需要把pg原有的jdbc驱动替换成mppdb的驱动,命名仍按照pg原有的命名。
选择PostgreSQL连接,填写connection namehost namedatabase nameport numberusernamepassword等,点击OK

二、 使用Kettle加载数据至DWS

1) 从源端库导出成文件到OBS或本地服务器,然后用bulk load导入方式(OBS数据加载、本地数据加载),通过DWS创建外表,在转换中添加执行SQL脚本,在执行SQL语句中执行bulk load语句。(推荐,传输速率最快可至集群每个计算节点100MB/s以上)

2) 通过自定义Java代码,走Copy的方式,流式数据入库(时效性高,入库效率百万条/s


示例代码:

import org.postgresql.copy.CopyManager;

import org.postgresql.core.BaseConnection;

import java.sql.Connection;

import java.sql.DriverManager;

import java.util.Properties;

import java.io.StringReader;

 

private Connection con;

 

private StringBuffer tuples;

 

private int count;

 

 

private String sql = "copy dwi_alarm_info(alarm_definition,alarm_level,ext_similarity,last_update_date,occurring_time,source_channel,source_device,space,status,alarm_number) from STDIN WITH(format 'CSV', QUOTE '|')";

 

public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException {

 

Object[] r = getRow();

 

if (r == null) {

if (count > 0) {

try {

//logError("2***2019***" + String.valueOf(count));

//logError("3----------------------------");

//logError(tuples.toString());

//logError("4----------------------------");

CopyManager cm = new CopyManager((BaseConnection) con);

 

StringReader sr = new StringReader(tuples.toString());

long starttime = System.currentTimeMillis();

long rows = cm.copyIn(sql, sr);

con.commit();

long endtime = System.currentTimeMillis();

long costs = endtime - starttime;

//logError(String.valueOf(rows) + " rows, costs " + String.valueOf(costs) + " ms");

count = 0;

tuples.delete(0, tuples.length());

} catch (Exception se) {

//logError("5----------------------------" + se.getMessage());

se.printStackTrace();

return false;

}

}

setOutputDone();

return false;

}

 

if (first) {

try {

Class.forName("org.postgresql.Driver");

} catch (ClassNotFoundException e) {

// logError("1----------------------------" + e.getMessage());

System.out.println("can not find Driver");

e.printStackTrace();

return false;

}

first = false;

count = 0;

tuples = new StringBuffer();

 

}

 

String alarm_definition = get(Fields.In, "alarm_definition").getString(r);

String alarm_level = get(Fields.In, "alarm_level").getString(r);

 

 

String ext_blacklistpicurl = get(Fields.In, "ext_blacklistpicurl").getString(r);

String ext_similarity = get(Fields.In, "ext_similarity").getString(r);

//String ext_suspectid = get(Fields.In, "ext_suspectid").getString(r);

String last_update_date = get(Fields.In, "last_update_date").getString(r);

String occurring_time = get(Fields.In, "occurring_time").getString(r);

String source_channel = get(Fields.In, "source_channel").getString(r);

String source_device = get(Fields.In, "source_device").getString(r);

String space = get(Fields.In, "space").getString(r);

String status = get(Fields.In, "status").getString(r);

String alarm_number = get(Fields.In, "alarm_number").getString(r);

 

tuples.append(alarm_definition+",").append(alarm_level+",").append(ext_similarity+",").append(last_update_date+",").append(occurring_time+",").append(source_channel+",").append(source_device+",").append(space+",").append(status+",").append(alarm_number).append("\n");

 

long incrementLinesOutput = incrementLinesOutput();

 

count++;

//logError("6***2019***" + tuples.toString());

//logError("7***2019***" + String.valueOf(count));

if (count >= 100) {

try {

//logError("8***2019***");

CopyManager cm = new CopyManager((BaseConnection) con);

 

StringReader sr = new StringReader(tuples.toString());

long starttime = System.currentTimeMillis();

long rows = cm.copyIn(sql, sr);

con.commit();

long endtime = System.currentTimeMillis();

long costs = endtime - starttime;

//logError(String.valueOf(rows) + " rows, costs " + String.valueOf(costs) + " ms");

count = 0;

tuples.delete(0, tuples.length());

} catch (Exception se) {

se.printStackTrace();

//logError("9******"+se.getMessage());

return false;

}

}

return true;

}

 

public boolean init(StepMetaInterface stepMetaInterface, StepDataInterface stepDataInterface) {

if (!parent.initImpl(stepMetaInterface, stepDataInterface)) {

return false;

}

String url = getParameter("URL");

String userName = getParameter("USERNAME");

String passWd = getParameter("PASSWORD");

 

try {

Properties props = new Properties();

props.setProperty("user", userName);

props.setProperty("password", passWd);

con = DriverManager.getConnection(url, props);

con.setAutoCommit(false);

} catch (Exception e) {

//logError("Connecting to database " + url + " failed.", e);

setErrors(1);

return false;

}

 

return true;

}

 

public void dispose(StepMetaInterface smi, StepDataInterface sdi) {

 

try {

if (con != null) {

con.close();

}

} catch (Exception e) {

//logError("close conn failed.", e);

setErrors(1);

return;

}

 

parent.disposeImpl(smi, sdi);

}

 

三、使用Kettle同步数据至DWS

3.1 首次做表数据全量同步(保存为test1.ktr

说明:(增量同步的表的一列需是时间戳或是自增的id列,增量数据同步时基于时间或者是唯一自增的id列),首次数据同步是数据的一次全部同步,只执行一次,以后每次只运行增量数据同步。

DWS中创建存储时间戳货者是自增ID列的最大值如下所示:

create table t_test_time(shijian date);

在转换中分别插入类似如下的表输入和输出:


Oracle表:指的是需要同步的oracle


表输入和插入数据的最大时间:是将本次同步数据中的最大时间插入到DWS中的记录t_test_time


表输入的配置如上图所示:


插入当前数据最大时间如上所示:

首次同步数据:是DWS中的表,接收oracle表同步过来的数据


完成后点击启动数据同步,待数据同步完成后,进行数据增量同步

3.2 数据增量同步,如下表输入和表输出(保存为test.ktr

增量数据同步,原理过滤上次同步后新增的数据(通过时间戳的方式或者是ID自增的方式进行数据过滤,然后进行增量数据的同步),具体增量结构图和配置如下所示:


取上次数据同步时,最大的时间过滤已经同步的数据(配置如下)


需要增量同步的oracle表配置


同步到DWS中表的配置如下:


取本次同步数据中的最大时间或者最大ID方便下一次做数据的增量同步


记录本次增量同步的时间,以便下一次做增量数据同步


启动数据增量同步

四、执行Kettle数据抽取作业调度

创建kettle数据抽取作业调度(参考数据迁移指导书)

点击通用添加START、转换和成功三个选项control S保存为test


转换任务取增量保存转换工作,如下所示:


4.1 配置为重复执行,一次作业完成后接着继续下一次数据增量同步任务

点击START对作业调度进行配置


4.2 配置为定时执行,如下(每小时执行一次增量数据的同步)

 

参考链接:

https://bbs.huaweicloud.com/forum/thread-26690-1-1.html

http://3ms.huawei.com/km/blogs/details/5673795

 

GaussDB(DWS)博文后缀.png

【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。