Migrating Hive data to ClickHouse with Waterdrop
Based on a customer requirement, this article tests using Waterdrop to migrate data from a cloud EI Hive/Spark cluster to a ClickHouse cluster.
Waterdrop
For a detailed introduction to Waterdrop and its usage, see InterestingLab/waterdrop.
Download Waterdrop from the official site; for Spark >= 2.3 the matching package is waterdrop-1.5.1.zip.
Extract it to any directory on the cluster, here /root/waterdrop-1.5.1.
Hive/Spark cluster
Create a Hadoop analysis cluster from the MRS service (Waterdrop depends on Hive and Spark). Use the custom purchase flow when creating the cluster and leave Kerberos authentication disabled, otherwise beeline login runs into problems.
Log in to the cluster backend through its public IP, create a test table in Hive, and insert one test row (migration performance is not considered yet; this only verifies that the pipeline works):
[root@node-master2HnlP waterdrop-1.5.1]# beeline
0: jdbc:hive2://192.168.8.58:2181,192.168.8.1> CREATE TABLE `nginx_msg_detail`( `hostname` string, `domain` string, `remote_addr` string, `request_time` float, `datetime` string, `url` string, `status` int, `data_size` int, `referer` string, `cookie_info` string, `user_agent` string, `minute` string) PARTITIONED BY ( `date` string, `hour` string) row format delimited fields terminated by ',';
0: jdbc:hive2://192.168.8.58:2181,192.168.8.1> insert into nginx_msg_detail values('ClickHousegpKs0001', 'one', '127.0.0.1:9000', 32.33, '2020-01-01 11:10:25','http://001', 1, 1024, 'no_way', 'UID', 'proxy','30s', '2020-3-23', '11');
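An aside: nginx_msg_detail is partitioned, and the INSERT above supplies the date/hour values inline rather than through a static PARTITION clause, so depending on the cluster's defaults Hive may first require dynamic partitioning to be enabled. A minimal sketch (standard Hive settings; whether they are needed here is an assumption about this cluster's configuration):
-- enable dynamic partitioning for the session if the INSERT is rejected
set hive.exec.dynamic.partition=true;
set hive.exec.dynamic.partition.mode=nonstrict;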
ClickHouse cluster
Whether the target is a cluster or a single node does not affect this test. Because of network constraints, a single-node ClickHouse was deployed via RPM directly on the Hive cluster host; a minimal install sketch follows the table definition below. The ClickHouse table is created as follows:
-- corresponding ClickHouse table
CREATE TABLE cms_msg
(
date Date,
datetime DateTime,
url String,
request_time Float32,
status Int32,
hostname String,
domain String,
remote_addr String,
data_size Int32
) ENGINE = MergeTree PARTITION BY date ORDER BY (date, hostname) SETTINGS index_granularity = 16384
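For reference, a minimal sketch of the RPM-based single-node deployment, assuming the clickhouse-common-static, clickhouse-server and clickhouse-client packages have already been downloaded to the node (file names and the systemd unit are the standard ones shipped with the packages):
# install the downloaded packages, then start and smoke-test the server
rpm -ivh clickhouse-common-static-*.rpm clickhouse-server-*.rpm clickhouse-client-*.rpm
systemctl start clickhouse-server
clickhouse-client --query "SELECT version()"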
batch.conf
Waterdrop is straightforward to use: fill in a configuration file and run the start script to perform the migration. The configuration file is explained below:
spark {
spark.app.name = "Waterdrop" // no need to change; any name works
spark.executor.instances = 2
spark.executor.cores = 1
spark.executor.memory = "1g"
}
input {
hive {
pre_sql = "select * from nginx_msg_detail" // query on the Hive side; a plain select * is usually enough
table_name = "nginx_msg_detail"
}
}
filter {
remove {
source_field = ["referer", "cookie_info", "user_agent", "minute", "hour"] // drop the fields that are not needed
}
}
output {
clickhouse {
host = "127.0.0.1:8123" // ClickHouse listen address and HTTP port
database = "default" // database that holds the target table in ClickHouse
table = "cms_msg"
fields = ["date", "datetime", "url", "request_time", "status", "hostname", "domain", "remote_addr", "data_size"]
username = "default"
password = ""
}
}
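Note: newer Waterdrop 1.x releases also document a bulk_size option on the clickhouse output plugin for tuning how many rows go into each INSERT batch; treat the fragment below as an assumption to verify against the plugin documentation for your version:
output {
clickhouse {
// ... same connection settings as above ...
bulk_size = 20000 // rows per batch INSERT (assumed option; not verified on 1.5.1)
}
}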
Data migration
Place batch.conf in the /root/waterdrop-1.5.1/config directory and run the following commands:
cd /root/waterdrop-1.5.1
./bin/start-waterdrop.sh --master local[2] --deploy-mode client --config ./config/batch.conf
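start-waterdrop.sh passes --master and --deploy-mode through to spark-submit, so for larger datasets the same job can run on the cluster's YARN instead of local mode; a sketch (executor resources are still taken from the spark block of batch.conf):
cd /root/waterdrop-1.5.1
./bin/start-waterdrop.sh --master yarn --deploy-mode client --config ./config/batch.conf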
Migration result
[root@node-master2HnlP waterdrop-1.5.1]# clickhouse-client
ClickHouse client version 20.8.7.15 (official build).
Connecting to localhost:9000 as user default.
Connected to ClickHouse server version 20.8.7 revision 54438.
node-master2HnlP :) select * from cms_msg
SELECT *
FROM cms_msg
┌───────date─┬────────────datetime─┬─url────────┬─request_time─┬─status─┬─hostname───────────┬─domain─┬─remote_addr────┬─data_size─┐
│ 2020-03-23 │ 2020-01-01 11:10:25 │ http://001 │ 32.33 │ 1 │ ClickHousegpKs0001 │ one │ 127.0.0.1:9000 │ 1024 │
└────────────┴─────────────────────┴────────────┴──────────────┴────────┴────────────────────┴────────┴────────────────┴───────────┘
1 rows in set. Elapsed: 0.009 sec.
Waterdrop performance test
Generate Hive test data
#!/bin/bash
# generate 1 million rows; the resulting file is about 140 MB
for (( i=1; i<= 1000000; i++ )); do
hostname="ClickHousegpKs000"$i
domain="domain"$i
remote_addr="127.0.0.$i:9000"
request_time="32.$i"
datetime="2020-01-24 11:10:25"
url="http://001$i"
status="1"
data_size="$i"
referer="no_way"
cookie_info="UID"
user_agent="proxy"
minute="30s"
date="2020-3-23" # date 和hour是分区值,hive里默认设置了100个分区的限制,不能设置分区值太多
hour="24"
echo "$hostname,$domain,$remote_addr,$request_time,$datetime,$url,$status,$data_size,$referer,$cookie_info,$user_agent,$minute,$date,$hour" >> hive_data.txt
done
# upload to HDFS; loading the local file directly with LOAD DATA LOCAL INPATH in Hive kept failing with a path-not-found error
hdfs dfs -mkdir /input
hdfs dfs -put hive_data.txt /input
hdfs dfs -ls /input
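An aside on the generator above: appending to hive_data.txt with >> inside the loop reopens the file on every iteration, which is noticeably slow for a million rows; redirecting the whole loop once is a simple speed-up. A sketch with the same row layout and only the redirection changed:
#!/bin/bash
# same fields as above, but the output file is opened once for the whole loop
for (( i=1; i<=1000000; i++ )); do
echo "ClickHousegpKs000$i,domain$i,127.0.0.$i:9000,32.$i,2020-01-24 11:10:25,http://001$i,1,$i,no_way,UID,proxy,30s,2020-3-23,24"
done > hive_data.txt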
Log in to Hive via beeline and load the data:
LOAD DATA INPATH '/input/hive_data.txt' OVERWRITE INTO TABLE nginx_msg_detail;
After loading the file, querying the table may show something like this:
INFO : OK
INFO : Concurrency mode is disabled, not creating a lock manager
+----------------------------------------------------+--------------------------+-------------------------------+---------------
| nginx_msg_detail.hostname | nginx_msg_detail.domain | nginx_msg_detail.remote_addr | nginx_msg_detail.request_time | nginx_msg_detail.datetime | nginx_msg_detail.url | nginx_msg_detail.status | nginx_msg_detail.data_size | nginx_msg_detail.referer | nginx_msg_detail.cookie_info | nginx_msg_detail.user_agent | nginx_msg_detail.minute | nginx_msg_detail.date | nginx_msg_detail.hour |
+----------------------------------------------------+--------------------------+-------------------------------+---------------
| ClickHousegpKs0002 one 127.0.0.1:9000 32.33 '2020-01-01 11:10:25' http://001 1 1024 no_way UID proxy 30s 2020-3-23 11 | NULL | NULL | NULL | NULL | NULL | NULL | NULL | NULL | NULL | NULL | NULL | __HIVE_DEFAULT_PARTITION__ | __HIVE_DEFAULT_PARTITION__ |
2 rows selected (0.742 seconds)
The remaining columns are NULL because the delimiter in the txt file does not match the field delimiter of the Hive table definition; appending **row format delimited fields terminated by ','** to the end of the CREATE TABLE statement fixes it.
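To confirm which delimiter a table is actually using, the SerDe parameters can be inspected from beeline; field.delim appears under Storage Desc Params:
-- inspect the table's storage/SerDe settings, including field.delim
DESCRIBE FORMATTED nginx_msg_detail;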
Run the Waterdrop script to migrate the data
[root@node-master2HnlP waterdrop-1.5.1]# date && ./bin/start-waterdrop.sh --master local[2] --deploy-mode client --config ./config/batch.conf && date
Wed Mar 24 16:16:16 CST 2021
Wed Mar 24 16:16:47 CST 2021
Migrating 1 million rows took about 30 seconds, roughly 33,000 rows/s.
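After a run it is worth cross-checking row counts on both sides; a sketch (the exact totals depend on what was inserted earlier):
-- Hive side (beeline)
select count(*) from nginx_msg_detail;
-- ClickHouse side (clickhouse-client)
SELECT count() FROM cms_msg;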
Authentication issues on the Hive cluster
If Kerberos authentication was enabled when the Hive cluster was created, beeline and hdfs commands run on the cluster backend may fail with authentication errors, because the cluster enforces security controls. The steps are covered in the documentation pages "Accessing Hive after authentication is enabled", "Configuring Hive table permissions" and "Configuring Hive roles"; only my own workflow is summarized here.
Create a Hive role (on the MRS Manager console)
For this test the role was granted full Hive, HDFS, and Yarn permissions.
Create a user
Log in to the cluster backend and run kinit chenfangye; after entering the password and authenticating, beeline and hdfs commands work normally:
[root@node-master1ylnR ~]# kinit chenfangye
Password for chenfangye@AC01A26E_D00B_405E_BFDF_E53A22D82AAD.COM:
[root@node-master1ylnR ~]# beeline
Connecting to jdbc:hive2://192.168.8.30:2181/;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=hiveserver2;sasl.qop=auth-conf;auth=KERBEROS;principal=hive/hadoop.ac01a26e_d00b_405e_bfdf_e53a22d82aad.com@AC01A26E_D00B_405E_BFDF_E53A22D82AAD.COM
log4j:ERROR A "org.apache.hadoop.hive.ql.log.HiveEventCounter" object is not assignable to a "org.apache.log4j.Appender" variable.
log4j:ERROR The class "org.apache.log4j.Appender" was loaded by
log4j:ERROR [sun.misc.Launcher$AppClassLoader@511d50c0] whereas object of type
log4j:ERROR "org.apache.hadoop.hive.ql.log.HiveEventCounter" was loaded by [sun.misc.Launcher$AppClassLoader@511d50c0].
log4j:ERROR Could not instantiate appender named "EventCounter".
Debug is true storeKey false useTicketCache true useKeyTab false doNotPrompt false ticketCache is null isInitiator true KeyTab is null refreshKrb5Config is false principal is null tryFirstPass is false useFirstPass is false storePass is false clearPass is false
Acquire TGT from Cache
Principal is chenfangye@AC01A26E_D00B_405E_BFDF_E53A22D82AAD.COM
Commit Succeeded # login succeeded
Connected to: Apache Hive (version 3.1.0-mrs-2.1)
Driver: Hive JDBC (version 3.1.0-mrs-2.1)
Transaction isolation: TRANSACTION_REPEATABLE_READ
Beeline version 3.1.0-mrs-2.1 by Apache Hive
0: jdbc:hive2://192.168.8.30:2181/>
However, this user has no permission to create directories under the HDFS root, so change the /input path above to /tmp/hive/input:
hdfs dfs -mkdir /tmp/hive/input
hdfs dfs -put hive_data.txt /tmp/hive/input
LOAD DATA INPATH '/tmp/hive/input/hive_data.txt' OVERWRITE INTO TABLE nginx_msg_detail;
# the error reported:
Error: Error while compiling statement: FAILED: HiveAuthzPluginException Error getting object from metastore for Object [type=TABLE_OR_VIEW, name=default.nginx_msg_detail__temp_table_for_load_data__] (state=42000,code=40000)
# this issue occurs on MRS 2.1.1 and is fixed in later releases; it no longer appears on MRS 3.1.0.T4
# on 2.1.1 it can be worked around by adding a partition clause: load data inpath 'hdfs://hacluster/tmp/hive_data.txt' into table nginx_msg_detail partition (`date`='2020-3-23',`hour`='24');
# Reference: https://issues.apache.org/jira/browse/HIVE-10625
# newer versions default hive.default.fileformat=RCFile, so if the earlier CREATE TABLE did not specify a storage format, the following error may appear
0: jdbc:hive2://192.168.8.140:10000/> LOAD DATA INPATH 'hdfs://hacluster/tmp/hive_data.txt' OVERWRITE INTO TABLE nginx_msg_detail;
INFO : State: Compiling.
ERROR : Ended Job = job_1616672391711_0006 with errors
ERROR : FAILED: Execution Error, return code 2 from org.apache.hadoop.hive.ql.exec.mr.MapRedTask
INFO : MapReduce Jobs Launched:
INFO : Stage-Stage-1: Map: 2 HDFS Read: 0 HDFS Write: 0 FAIL
INFO : Total MapReduce CPU Time Spent: 0 msec
INFO : Completed executing command(queryId=omm_20210325203318_e2295e46-a1ea-4c33-bf7b-4a6b4d96dea7); Time taken: 32.369 seconds
INFO : Concurrency mode is disabled, not creating a lock manager
Error: Error while processing statement: FAILED: Execution Error, return code 2 from org.apache.hadoop.hive.ql.exec.mr.MapRedTask (state=08S01,code=2)
# check the Hive log: vi /var/log/Bigdata/hive/hiveserver/hive.log
Error: java.io.IOException: java.lang.reflect.InvocationTargetException
at org.apache.hadoop.hive.io.HiveIOExceptionHandlerChain.handleRecordReaderCreationException(HiveIOExceptionHandlerChain.java:97)
at org.apache.hadoop.hive.io.HiveIOExceptionHandlerUtil.handleRecordReaderCreationException(HiveIOExceptionHandlerUtil.java:57)
at org.apache.hadoop.hive.shims.HadoopShimsSecure$CombineFileRecordReader.initNextRecordReader(HadoopShimsSecure.java:271)
at org.apache.hadoop.hive.shims.HadoopShimsSecure$CombineFileRecordReader.<init>(HadoopShimsSecure.java:217)
at org.apache.hadoop.hive.shims.HadoopShimsSecure$CombineFileInputFormatShim.getRecordReader(HadoopShimsSecure.java:345)
at org.apache.hadoop.hive.ql.io.CombineHiveInputFormat.getRecordReader(CombineHiveInputFormat.java:886)
at org.apache.hadoop.mapred.MapTask$TrackedRecordReader.<init>(MapTask.java:175)
at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:444)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:349)
at org.apache.hadoop.mapred.YarnChild$1.run(YarnChild.java:183)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1761)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:177)
Caused by: java.lang.reflect.InvocationTargetException
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at org.apache.hadoop.hive.shims.HadoopShimsSecure$CombineFileRecordReader.initNextRecordReader(HadoopShimsSecure.java:257)
... 11 more
Caused by: java.io.IOException: hdfs://hacluster/tmp/hive_data.txt not a RCFile and has magic of Cli
at org.apache.hadoop.hive.ql.io.RCFile$Reader.init(RCFile.java:1507)
at org.apache.hadoop.hive.ql.io.RCFile$Reader.<init>(RCFile.java:1397)
at org.apache.hadoop.hive.ql.io.RCFile$Reader.<init>(RCFile.java:1377)
at org.apache.hadoop.hive.ql.io.RCFileRecordReader.<init>(RCFileRecordReader.java:100)
at org.apache.hadoop.hive.ql.io.RCFileInputFormat.getRecordReader(RCFileInputFormat.java:58)
at org.apache.hadoop.hive.ql.io.CombineHiveRecordReader.<init>(CombineHiveRecordReader.java:102)
... 16 more
| org.apache.hadoop.hive.ql.session.SessionState$LogHelper.printError(SessionState.java:1345)
The message "not a RCFile and has magic of Cli" in hive.log, together with the default storage format shown in the beeline client below, points to the cause: hive.default.fileformat is RCFile, so the table was created as RCFile, which conflicts with the txt file being loaded.
0: jdbc:hive2://192.168.8.140:10000/> set hive.default.fileformat;
+---------------------------------+
| set |
+---------------------------------+
| hive.default.fileformat=RCFile |
+---------------------------------+
There are three ways to fix this:
- Drop and recreate the table, specifying the storage format at the end of the CREATE TABLE statement:
CREATE TABLE `nginx_msg_detail`( `hostname` string, `domain` string, `remote_addr` string, `request_time` float, `datetime` string, `url` string, `status` int, `data_size` int, `referer` string, `cookie_info` string, `user_agent` string, `minute` string) PARTITIONED BY ( `date` string, `hour` string) row format delimited fields terminated by ',' STORED AS TEXTFILE;
- Change hive.default.fileformat on the Manager console and restart the affected instances; the table still has to be recreated, otherwise the error persists (a session-level variant is sketched after this list).
- Or load a file that is already in RCFile format.
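For reference, the session-level variant of the second option: the default format can also be set in the current beeline session before recreating the table (it does not change tables that already exist):
-- applies only to this session; the table still has to be recreated afterwards
set hive.default.fileformat=TextFile;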