张小白FlinkX踩坑记(一)
FlinkX跟DataX的功能类似,它是由袋鼠云开发的一套开源产品,代码仓地址为:https://github.com/DTStack/flinkx
浪子闲话少说,我们来实战吧!
在CentOS7的系统内,建一个flinkx的用户,然后git clone这个代码仓:
git clone https://github.com/DTStack/flinkx
根据 https://github.com/DTStack/flinkx/blob/master/docs/quickstart.md ,我们开始编译:
mvn clean package -DskipTests
到github.com的连接时会等好久:
等了好一会儿(大约几个小时吧。。。)
突然告知,oracle驱动有问题。
好在前面看了readme,需要先在外面执行下驱动的安装:
耐心等待驱动安装完毕:
再切换回flinkx的目录继续编译:
mvn package -DskipTests
此处我们去掉clean参数,希望不删除已有的东西,大概是希望编译速度能快一点吧。。。
又等了好一会儿:
在编译hbase相关内容的时候,报github连不上的错。
众所周知,这种情况,只有重试。。。
mvn package -DskipTests
这是一个非常漫长的等待过程,张小白这个时候做了好多其他的事情。。。
。。。
终于接近了尾声。。
当然了,大家可能会以为编译只花了18分钟,其实张小白在编译过程中看到卡住了,然后CTRL-C中断,再mvn package -DskipTests 继续编译。
直到最后一次编译花了 将近16分钟。
我们来准备一个json文件:myflink.json
{
"job": {
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"column": [
{
"name": "user_id",
"type": "string"
},
{
"name": "user_name",
"type": "string"
}
],
"username": "root",
"password": "******",
"connection": [
{
"jdbcUrl": [
"jdbc:mysql://xxx.xxx.xxx.xxx:3306/zhanghui?useSSL=false"
],
"table": [
"my_user"
]
}
]
}
},
"writer": {
"name": "mysqlwriter",
"parameter": {
"username": "root",
"password": "******",
"connection": [
{
"jdbcUrl": "jdbc:mysql://xxx.xxx.xxx.xxx:3306/zhanghui?useSSL=false",
"table": [
"my_user_flinkx"
]
}
],
"writeMode": "insert",
"column": [
{
"name": "user_id",
"type": "string"
},
{
"name": "user_name",
"type": "string"
}
]
}
}
}
],
"setting": {
"speed": {
"channel": 1,
"bytes": 0
}
}
}
}
这个json文件跟DataX的配置文件极其相似。
具体可以在 https://github.com/DTStack/flinkx/blob/master/flinkx-examples/json/mysql/mysql_mysql_realtime.json 看到。
使用local模式运行一下试试:
bin/flinkx -mode local -jobType sync -job myflinkx.json -flinkxDistDir flinkx-dist
日志在nohup.out中:
报目的表 my_user_flinkx不存在。
原来flinkx跟datax的差异在于,如果目的表不存在,datax会自动建表,而flinkx会直接报错。
于是,使用mysql的SQL命令行建立my_user_flinkx表:
CREATE TABLE
my_user_flink
(
user_id VARCHAR(250) COLLATE utf8mb4_bin NOT NULL COMMENT '用户ID',
user_name VARCHAR(250) COLLATE utf8mb4_bin NOT NULL COMMENT '用户姓名',
PRIMARY KEY (user_id),
CONSTRAINT pf_user_name UNIQUE (user_name)
)
ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='目的用户表';
执行这段SQL:
表已建好。
然后再执行flinkx:
查看nohup.out:
看起来是写入成功了。
查看目的表my_user_flinkx看看:
可见目的表已有该记录。
这说明flinkx数据同步执行成功了。
(未完待续)
- 点赞
- 收藏
- 关注作者
评论(0)