张小白FlinkX踩坑记(一)

举报
张辉 发表于 2022/02/21 12:33:11 2022/02/21
【摘要】 使用Flinkx实现MySQL数据库表迁移。

FlinkX跟DataX的功能类似,它是由袋鼠云开发的一套开源产品,代码仓地址为:https://github.com/DTStack/flinkx

浪子闲话少说,我们来实战吧!

在CentOS7的系统内,建一个flinkx的用户,然后git clone这个代码仓:

git clone https://github.com/DTStack/flinkx

根据 https://github.com/DTStack/flinkx/blob/master/docs/quickstart.md ,我们开始编译:

mvn clean package -DskipTests 

到github.com的连接时会等好久:

等了好一会儿(大约几个小时吧。。。)

突然告知,oracle驱动有问题。

好在前面看了readme,需要先在外面执行下驱动的安装:

耐心等待驱动安装完毕:

再切换回flinkx的目录继续编译:

mvn  package -DskipTests 

此处我们去掉clean参数,希望不删除已有的东西,大概是希望编译速度能快一点吧。。。

又等了好一会儿:

在编译hbase相关内容的时候,报github连不上的错。

众所周知,这种情况,只有重试。。。

mvn  package -DskipTests 

这是一个非常漫长的等待过程,张小白这个时候做了好多其他的事情。。。

。。。

终于接近了尾声。。

当然了,大家可能会以为编译只花了18分钟,其实张小白在编译过程中看到卡住了,然后CTRL-C中断,再mvn  package -DskipTests 继续编译。

直到最后一次编译花了 将近16分钟。

我们来准备一个json文件:myflink.json

{
    "job": {
      "content": [
        {
          "reader": {
            "name": "mysqlreader",
            "parameter": {
              "column": [
                {
                  "name": "user_id",
                  "type": "string"
                },
                {
                  "name": "user_name",
                  "type": "string"
                }
              ],
              "username": "root",
              "password": "******",
              "connection": [
                {
                  "jdbcUrl": [
                    "jdbc:mysql://xxx.xxx.xxx.xxx:3306/zhanghui?useSSL=false"
                  ],
                  "table": [
                    "my_user"
                  ]
                }
              ]
            }
          },
         "writer": {
            "name": "mysqlwriter",
            "parameter": {
              "username": "root",
              "password": "******",
              "connection": [
                {
                  "jdbcUrl": "jdbc:mysql://xxx.xxx.xxx.xxx:3306/zhanghui?useSSL=false",
                  "table": [
                    "my_user_flinkx"
                  ]
                }
              ],
              "writeMode": "insert",
              "column": [
                {
                  "name": "user_id",
                  "type": "string"
                },
                {
                  "name": "user_name",
                  "type": "string"
                }
              ]
            }
          }
        }
      ],
      "setting": {
        "speed": {
          "channel": 1,
          "bytes": 0
        }
      }
    }
  }


这个json文件跟DataX的配置文件极其相似。

具体可以在 https://github.com/DTStack/flinkx/blob/master/flinkx-examples/json/mysql/mysql_mysql_realtime.json 看到。

使用local模式运行一下试试:

bin/flinkx -mode local -jobType sync -job myflinkx.json -flinkxDistDir flinkx-dist

日志在nohup.out中:

报目的表 my_user_flinkx不存在。

原来flinkx跟datax的差异在于,如果目的表不存在,datax会自动建表,而flinkx会直接报错。

于是,使用mysql的SQL命令行建立my_user_flinkx表:

CREATE TABLE
    my_user_flink
    (
        user_id VARCHAR(250) COLLATE utf8mb4_bin NOT NULL COMMENT '用户ID',
        user_name VARCHAR(250) COLLATE utf8mb4_bin NOT NULL COMMENT '用户姓名',
    
        PRIMARY KEY (user_id),
        CONSTRAINT pf_user_name UNIQUE (user_name)
       
    )
    ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='目的用户表';

执行这段SQL:

表已建好。

然后再执行flinkx:

查看nohup.out:

看起来是写入成功了。

查看目的表my_user_flinkx看看:

可见目的表已有该记录。

这说明flinkx数据同步执行成功了。

(未完待续)

【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。