CloudTable Doris Tutorial: Mastering Data Import in Doris (Part 1)
Today let's talk about importing external data into CloudTable Doris. How do you load external files into Doris? How do you choose between Stream Load and Broker Load? Can a relational database such as MySQL feed CloudTable Doris in real time? How do you hook up common big-data components such as Spark, Flink, and Kafka to import data into Doris? You will find the answers in this article.
In this post, let's start with the two import methods that are most commonly used with CloudTable Doris and officially recommended: Stream Load and Broker Load.
Part 1: Common Scenarios for Importing Data into Doris
CloudTable Doris offers several import options; pick the one that best fits your needs:
- Stream Load: a synchronous import method. Users send an HTTP request to load a local file or a data stream into Doris. Stream Load executes the import synchronously and returns the result, so the response body directly tells you whether the load succeeded. It is mainly suited to importing local files, or importing streaming data from a program.
- Broker Load: an asynchronous import method whose supported data sources depend on what the Broker process can access. Users create a Broker Load job over the MySQL protocol and check the result with SHOW LOAD.
- Spark Connector and Flink Connector: the Spark Doris Connector lets Spark read data stored in Doris and also write data into Doris. The Flink Doris Connector lets Flink read, insert, update, and delete data stored in Doris.
- Flink CDC real-time import: business databases usually use an id column as a table's primary key (for example, a Student table keyed by id), but as the business evolves the id associated with a record may change. In that scenario, syncing data with Flink CDC plus the Doris Connector automatically keeps the data in Doris's key columns up to date.
- JDBC inserts: users can import data with INSERT statements over the MySQL protocol, as sketched below.
And so on.
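As a quick taste of that last option, here is a minimal sketch of a JDBC-style insert from Python, assuming the pymysql package; the host, port (FE query port, default 9030), credentials, and the test_csv table (created in Part 2 below) are placeholders rather than a prescribed setup:
import pymysql
conn = pymysql.connect(host="{fe_ip}", port=9030, user="{user}",
                       password="{password}", database="test_stream_load")
try:
    with conn.cursor() as cur:
        # Batch rows into one multi-value INSERT; row-by-row INSERTs perform poorly in Doris.
        cur.executemany(
            "INSERT INTO test_csv (user_id, city, age, sex, cost, max_dwell_time, min_dwell_time) "
            "VALUES (%s, %s, %s, %s, %s, %s, %s)",
            [(1, "cityA", 20, 0, 100, 10, 5),
             (2, "cityB", 30, 1, 200, 20, 8)])
    conn.commit()
finally:
    conn.close()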
Part 2: Importing into Doris with Stream Load
Stream Load is a synchronous import method: users send an HTTP request to load a local file or a data stream into Doris. The import runs synchronously and the result comes back in the response body, so you can tell directly from the response whether the load succeeded.
Stream Load is mainly suited to importing local files, or importing streaming data from a program.
(1) How Stream Load Works
The main flow of a Stream Load is:
                         ^      +
                         |      |
                         |      | 1A. User submit load to FE
                         |      |
                         |   +--v-----------+
                         |   | FE           |
5. Return result to user |   +--+-----------+
                         |      |
                         |      | 2. Redirect to BE
                         |      |
                         |   +--v-----------+
                         +---+Coordinator BE| 1B. User submit load to BE
                             +-+-----+----+-+
                               |     |    |
                         +-----+     |    +-----+
                         |           |          |
                         |           |          | 3. Distribute data
                         |           |          |
                       +-v-+       +-v-+      +-v-+
                       |BE |       |BE |      |BE |
                       +---+       +---+      +---+
To summarize:
- The user submits a request to an FE node;
- The FE redirects the request to one of the BE nodes, which acts as the "Coordinator BE" (think of it as a relay station);
- The client then submits the load to that Coordinator BE;
- The Coordinator BE distributes the data to the other BE nodes for storage.
In Stream Load, Doris selects one node as the Coordinator. That node is responsible for receiving the data and distributing it to the other data nodes. The user submits the import command over HTTP. If it is submitted to an FE, the FE forwards the request to a BE via an HTTP redirect; the user can also submit the import command directly to a specific BE. The final import result is returned to the user by the Coordinator BE.
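To make that flow concrete, here is a hedged Python sketch of submitting a Stream Load with the requests package and checking the synchronous result; the FE address, credentials, and label are placeholders. Note that requests drops credentials when a redirect crosses hosts, so the FE's 307 redirect to the Coordinator BE is followed by hand here (curl's --location-trusted does this for you):
import requests
FE = "http://{fe_ip}:8030"  # FE HTTP port, default 8030
AUTH = ("{user}", "{password}")
def stream_load(path, db, table, label):
    url = f"{FE}/api/{db}/{table}/_stream_load"
    headers = {"label": label, "column_separator": ",",
               "Expect": "100-continue"}  # lets the FE redirect before the body is sent
    with open(path, "rb") as f:
        # 1A. Submit to the FE; it answers with a 307 redirect to a Coordinator BE.
        resp = requests.put(url, headers=headers, data=f, auth=AUTH, allow_redirects=False)
        if resp.status_code == 307:
            # 1B/2. Re-send to the Coordinator BE with credentials attached.
            f.seek(0)
            resp = requests.put(resp.headers["Location"], headers=headers, data=f, auth=AUTH)
    body = resp.json()
    # 5. Stream Load is synchronous: the response body is the import result.
    if body.get("Status") != "Success":
        raise RuntimeError(f"Stream Load failed: {body}")
    return body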
(2) Stream Load in Practice
CloudTable Doris Stream Load currently supports the following data formats: CSV (text), JSON, Parquet, and ORC.
- Importing CSV data with Stream Load:
create database IF NOT EXISTS test_stream_load;
use test_stream_load;
CREATE TABLE IF NOT EXISTS test_csv
(
`user_id` LARGEINT NOT NULL COMMENT "user id",
`city` VARCHAR(20) COMMENT "user's city",
`age` SMALLINT COMMENT "user age",
`sex` TINYINT COMMENT "user gender",
`cost` BIGINT SUM DEFAULT "0" COMMENT "total user spend",
`max_dwell_time` INT MAX DEFAULT "0" COMMENT "maximum user dwell time",
`min_dwell_time` INT MIN DEFAULT "99999" COMMENT "minimum user dwell time"
)
AGGREGATE KEY(`user_id`, `city`, `age`, `sex`)
DISTRIBUTED BY HASH(`user_id`) BUCKETS 100
PROPERTIES (
"replication_allocation" = "tag.location.default: 3"
);
From an ECS in the same VPC as the Doris nodes, run a Stream Load command to import test_data.csv into the test_csv table.
If you are unsure how to produce such a file, this pandas one-liner works: df.to_csv("./test_data.csv", index=None, header=None, encoding='utf-8'); a fuller generation sketch follows.
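For completeness, here is a hedged sketch of how a comparable test file could be generated; the column names match the table above, while the row count, random seed, and value ranges are illustrative assumptions of mine:
import numpy as np
import pandas as pd
rng = np.random.default_rng(42)
n = 1_000_000
alphabet = list("0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ")
df = pd.DataFrame({
    "user_id": rng.integers(0, 1000, n),
    # 15-character random strings, similar in shape to the city values shown below (slow but simple)
    "city": ["".join(rng.choice(alphabet, 15)) for _ in range(n)],
    "age": rng.integers(0, 100, n),
    "sex": rng.integers(0, 2, n),
    "cost": rng.integers(0, 1_000_000, n),
    "max_dwell_time": rng.integers(0, 1_000_000, n),
    "min_dwell_time": rng.integers(0, 1_000_000, n),
})
df.to_csv("./test_data.csv", index=None, header=None, encoding="utf-8")  # no index, no header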
root@ecs-lizuxian-mysql8-notdelete:~/python_test# curl --location-trusted -u {user}:{password} -H "label:test_csv_20231214-001" -H "column_separator:," -T ./test_data.csv http://{fe_ip}:{http_port,default 8030}/api/test_stream_load/test_csv/_stream_load
{
"TxnId": 30194,
"Label": "test_csv_20231214-001",
"TwoPhaseCommit": "false",
"Status": "Success",
"Message": "OK",
"NumberTotalRows": 999999,
"NumberLoadedRows": 999999,
"NumberFilteredRows": 0,
"NumberUnselectedRows": 0,
"LoadBytes": 47281446,
"LoadTimeMs": 1333,
"BeginTxnTimeMs": 4,
"StreamLoadPutTimeMs": 9,
"ReadDataTimeMs": 386,
"WriteDataTimeMs": 1274,
"CommitAndPublishTimeMs": 39
}
Query the result from the Doris MySQL client:
mysql> select * from test_csv limit 10;
+---------+-----------------+------+------+---------+----------------+----------------+
| user_id | city | age | sex | cost | max_dwell_time | min_dwell_time |
+---------+-----------------+------+------+---------+----------------+----------------+
| 16 | 1yi3tl5MCFGArml | 59 | 0 | 317844 | 844343 | 154118 |
| 16 | 24EwBA3JUNINyNL | 15 | 1 | 1923780 | 142085 | 770301 |
| 16 | 25SMqfLbtB6qfFz | 64 | 1 | 439054 | 631792 | 823842 |
| 16 | 2yi6oxrHAFY87GG | 4 | 1 | 1723916 | 299258 | 234990 |
| 16 | 3AdVrmWSQc4avEC | 17 | 0 | 203914 | 182567 | 631297 |
| 16 | 4V4TyQae9lr6fJ7 | 11 | 0 | 931320 | 94632 | 205505 |
| 16 | 4dDtDAaHLyyxxQn | 91 | 1 | 289964 | 942567 | 572919 |
| 16 | 6OE1iN4p8oruGbC | 47 | 1 | 505150 | 743670 | 753172 |
| 16 | 6yt1armiAhc2Odz | 72 | 1 | 323502 | 715525 | 68762 |
| 16 | 7IuDVuhqkxmr0Gj | 60 | 1 | 130878 | 800067 | 186116 |
+---------+-----------------+------+------+---------+----------------+----------------+
10 rows in set (0.09 sec)
- Importing JSON data with Stream Load:
create database IF NOT EXISTS test_stream_load;
use test_stream_load;
CREATE TABLE IF NOT EXISTS test_json
(
`user_id` LARGEINT NOT NULL COMMENT "user id",
`city` VARCHAR(20) COMMENT "user's city",
`age` SMALLINT COMMENT "user age",
`sex` TINYINT COMMENT "user gender",
`cost` BIGINT SUM DEFAULT "0" COMMENT "total user spend",
`max_dwell_time` INT MAX DEFAULT "0" COMMENT "maximum user dwell time",
`min_dwell_time` INT MIN DEFAULT "99999" COMMENT "minimum user dwell time"
)
AGGREGATE KEY(`user_id`, `city`, `age`, `sex`)
DISTRIBUTED BY HASH(`user_id`) BUCKETS 100
PROPERTIES (
"replication_allocation" = "tag.location.default: 3"
);
From an ECS in the same VPC as the Doris nodes, run a Stream Load command to import test_data.json into the test_json table.
To produce such a file with pandas: df.to_json('./test_data.json', orient='records', lines=True) (one JSON object per line, which is what the read_json_by_line header below expects).
root@ecs-lizuxian-mysql8-notdelete:~/python_test# curl --location-trusted -u {user}:{password} -H "label:test_json_20231214-001" -H "format:json" -H "fuzzy_parse" -H "exec_mem_limit: 5368709120" -H "read_json_by_line:true" -H "streaming_load_json_max_mb:5000" -T ./test_data.json http://{fe_ip}:{http_port,default 8030}/api/test_stream_load/test_json/_stream_load
{
"TxnId": 47620,
"Label": "test_json_20231214-001",
"TwoPhaseCommit": "false",
"Status": "Success",
"Message": "OK",
"NumberTotalRows": 1000000,
"NumberLoadedRows": 1000000,
"NumberFilteredRows": 0,
"NumberUnselectedRows": 0,
"LoadBytes": 123282164,
"LoadTimeMs": 9112,
"BeginTxnTimeMs": 6,
"StreamLoadPutTimeMs": 6,
"ReadDataTimeMs": 8391,
"WriteDataTimeMs": 9067,
"CommitAndPublishTimeMs": 26
}
Query the result from the Doris MySQL client:
mysql> select * from test_json limit 10;
+---------+-----------------+------+------+--------+----------------+----------------+
| user_id | city | age | sex | cost | max_dwell_time | min_dwell_time |
+---------+-----------------+------+------+--------+----------------+----------------+
| 33 | 1WUvNHUiBZmmklJ | 77 | 1 | 713862 | 409226 | 794179 |
| 33 | 1xzAJw2CSUS5qOE | 94 | 0 | 267592 | 915030 | 867483 |
| 33 | 2aQMjzsy6sufoEy | 78 | 1 | 934307 | 79007 | 128144 |
| 33 | 2t1fUldzVlylkn2 | 86 | 1 | 923127 | 155184 | 548695 |
| 33 | 352rMu9XIFELIB1 | 41 | 1 | 782404 | 551276 | 166169 |
| 33 | 3ULQsZfBhYqSoQ2 | 30 | 1 | 446071 | 101973 | 726908 |
| 33 | 3axPsLLYHnLBl94 | 50 | 1 | 59905 | 767343 | 872882 |
| 33 | 3f4QuUCu4w5Bdlt | 76 | 0 | 520908 | 192915 | 986585 |
| 33 | 3sUgvdcWgDlasET | 69 | 1 | 19857 | 372873 | 398638 |
| 33 | 4F0ljq1AtXaVxCI | 12 | 0 | 22095 | 94146 | 928596 |
+---------+-----------------+------+------+--------+----------------+----------------+
10 rows in set (0.38 sec)
- Importing Parquet data with Stream Load:
create database IF NOT EXISTS test_stream_load;
use test_stream_load;
CREATE TABLE IF NOT EXISTS test_parquet
(
`user_id` LARGEINT NOT NULL COMMENT "user id",
`city` VARCHAR(20) COMMENT "user's city",
`age` SMALLINT COMMENT "user age",
`sex` TINYINT COMMENT "user gender",
`cost` BIGINT SUM DEFAULT "0" COMMENT "total user spend",
`max_dwell_time` INT MAX DEFAULT "0" COMMENT "maximum user dwell time",
`min_dwell_time` INT MIN DEFAULT "99999" COMMENT "minimum user dwell time"
)
AGGREGATE KEY(`user_id`, `city`, `age`, `sex`)
DISTRIBUTED BY HASH(`user_id`) BUCKETS 100
PROPERTIES (
"replication_allocation" = "tag.location.default: 3"
);
From an ECS in the same VPC as the Doris nodes, run a Stream Load command to import test_data.parquet into the test_parquet table.
To produce such a file with pandas: df.to_parquet("./test_data.parquet", index=False)
root@ecs-lizuxian-mysql8-notdelete:~/python_test# curl --location-trusted -u {user}:{password} -H "label:test_parquet_20231214-001" -H "format:parquet" -T ./test_data.parquet http://{fe_ip}:{http_port,default 8030}/api/test_stream_load/test_parquet/_stream_load
{
"TxnId": 62100,
"Label": "test_parquet_20231214-001",
"TwoPhaseCommit": "false",
"Status": "Success",
"Message": "OK",
"NumberTotalRows": 1000000,
"NumberLoadedRows": 1000000,
"NumberFilteredRows": 0,
"NumberUnselectedRows": 0,
"LoadBytes": 33965271,
"LoadTimeMs": 1483,
"BeginTxnTimeMs": 6,
"StreamLoadPutTimeMs": 9,
"ReadDataTimeMs": 22,
"WriteDataTimeMs": 1400,
"CommitAndPublishTimeMs": 22
}
Query the result from the Doris MySQL client:
mysql> select * from test_parquet limit 10;
+---------+-----------------+------+------+--------+----------------+----------------+
| user_id | city | age | sex | cost | max_dwell_time | min_dwell_time |
+---------+-----------------+------+------+--------+----------------+----------------+
| 18 | 14ZqoJvsFdO2MS5 | 55 | 0 | 584461 | 109225 | 339355 |
| 18 | 1PDzT3RMvQXfV70 | 28 | 1 | 738864 | 437902 | 617665 |
| 18 | 24KrQ3YBtynUSQo | 73 | 1 | 692823 | 280239 | 354826 |
| 18 | 4xvuG8GFlvkC7h8 | 97 | 0 | 891892 | 581610 | 167029 |
| 18 | 57QYS5HgqAwxk1h | 69 | 1 | 263653 | 544356 | 457608 |
| 18 | 5lAa1RTRNpywmc3 | 99 | 1 | 143878 | 362780 | 575735 |
| 18 | 6LuVmpV7Ayi55MH | 40 | 0 | 151588 | 839104 | 874164 |
| 18 | 6xGoa125bcjL8ds | 10 | 0 | 504910 | 349479 | 762094 |
| 18 | 7l0nvNOgkRDQNfF | 80 | 0 | 23934 | 134383 | 994837 |
| 18 | 8bDEyz7bCb3ysWa | 50 | 1 | 412142 | 573170 | 91236 |
+---------+-----------------+------+------+--------+----------------+----------------+
10 rows in set (0.09 sec)
- Importing ORC data with Stream Load:
create database IF NOT EXISTS test_stream_load;
use test_stream_load;
CREATE TABLE IF NOT EXISTS test_orc
(
`user_id` LARGEINT NOT NULL COMMENT "user id",
`city` VARCHAR(20) COMMENT "user's city",
`age` SMALLINT COMMENT "user age",
`sex` TINYINT COMMENT "user gender",
`cost` BIGINT SUM DEFAULT "0" COMMENT "total user spend",
`max_dwell_time` INT MAX DEFAULT "0" COMMENT "maximum user dwell time",
`min_dwell_time` INT MIN DEFAULT "99999" COMMENT "minimum user dwell time"
)
AGGREGATE KEY(`user_id`, `city`, `age`, `sex`)
DISTRIBUTED BY HASH(`user_id`) BUCKETS 100
PROPERTIES (
"replication_allocation" = "tag.location.default: 3"
);
From an ECS in the same VPC as the Doris nodes, run a Stream Load command to import test_data.orc into the test_orc table.
To produce such a file with pyarrow:
import pyarrow.orc as orc
import pyarrow as pa
import pandas as pd
# df is a pandas DataFrame (for example, built as in the generation sketch above)
table = pa.Table.from_pandas(df, preserve_index=False)
orc.write_table(table, './test_data.orc')
Stream Load command:
root@ecs-lizuxian-mysql8-notdelete:~/python_test# curl --location-trusted -u {user}:{password} -H "label:test_orc_20231214-001" -H "format:orc" -T ./test_data.orc http://{fe_ip}:{http_port,default 8030}/api/test_stream_load/test_orc/_stream_load
{
"TxnId": 63248,
"Label": "test_orc_20231214-001",
"TwoPhaseCommit": "false",
"Status": "Success",
"Message": "OK",
"NumberTotalRows": 1000000,
"NumberLoadedRows": 1000000,
"NumberFilteredRows": 0,
"NumberUnselectedRows": 0,
"LoadBytes": 30278802,
"LoadTimeMs": 1456,
"BeginTxnTimeMs": 0,
"StreamLoadPutTimeMs": 1,
"ReadDataTimeMs": 20,
"WriteDataTimeMs": 1390,
"CommitAndPublishTimeMs": 29
}
Query the result from the Doris MySQL client:
mysql> select * from test_orc limit 10;
+---------+-----------------+------+------+--------+----------------+----------------+
| user_id | city | age | sex | cost | max_dwell_time | min_dwell_time |
+---------+-----------------+------+------+--------+----------------+----------------+
| 98 | 0FIHtO29vCvQcr9 | 53 | 0 | 166775 | 502429 | 1367 |
| 98 | 0NSo0CRgdjnjRGR | 79 | 1 | 660935 | 670440 | 24804 |
| 98 | 0P17O93kGctDDS1 | 26 | 1 | 890154 | 110877 | 308874 |
| 98 | 1K64DK52EJFzN4L | 38 | 1 | 263784 | 474463 | 369040 |
| 98 | 1b5YEj5rOSfaSiB | 10 | 1 | 968116 | 43576 | 75587 |
| 98 | 25kNCa8rTR2iIzy | 56 | 0 | 251517 | 305376 | 2191 |
| 98 | 2HqItzolr0QCbd0 | 31 | 1 | 161678 | 784517 | 135597 |
| 98 | 3GaOot54BiJ4gVV | 25 | 1 | 553679 | 812828 | 317705 |
| 98 | 3HHEKlB00ZBIfVP | 58 | 0 | 987960 | 656147 | 752256 |
| 98 | 3hvRYWPgcGj4gOf | 67 | 1 | 118310 | 976024 | 382797 |
+---------+-----------------+------+------+--------+----------------+----------------+
10 rows in set (0.05 sec)
Part 3: Importing into Doris with Broker Load
Broker Load is an asynchronous import method; the supported data sources depend on what the Broker process can access. Users create a Broker Load job over the MySQL protocol and check the result with SHOW LOAD.
Broker Load is a good fit when:
- The source data sits in a storage system the Broker can reach, such as HDFS.
- The data volume is on the order of tens to hundreds of GB.
(1) How Broker Load Works
After a user submits an import job, the FE generates an execution plan and, based on the current number of BEs and the size of the files, splits the plan across multiple BEs, each importing a share of the data. While executing, each BE pulls data from the Broker, transforms it, and writes it into the system. Once all BEs finish, the FE makes the final decision on whether the import succeeded.
                 +
                 | 1. user create broker load
                 v
            +----+----+
            |         |
            |   FE    |
            |         |
            +----+----+
                 |
                 | 2. BE etl and load the data
    +--------------------------+
    |            |             |
+---v---+     +--v----+    +---v---+
|       |     |       |    |       |
|  BE   |     |  BE   |    |  BE   |
|       |     |       |    |       |
+---+-^-+     +---+-^-+    +--+-^--+
    | |           | |         | |
    | |           | |         | | 3. pull data from broker
+---v-+-+     +---v-+-+    +--v-+--+
|       |     |       |    |       |
|Broker |     |Broker |    |Broker |
|       |     |       |    |       |
+---+-^-+     +---+-^-+    +---+-^-+
    | |           | |          | |
+---v-+-----------v-+----------v-+-+
|      HDFS/BOS/AFS cluster        |
|                                  |
+----------------------------------+
To sum up, the key points of Broker Load are:
- BEs pull the data from the Broker and transform it;
- The Broker communicates with the external data source (HDFS, OBS, and so on);
- Broker Load currently supports only the CSV, JSON, ORC, and Parquet formats;
- Broker Load suits larger imports, for example 10 GB to 100 GB per job. Since the job is asynchronous, you poll for completion; a sketch follows this list.
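Here is a hedged sketch, assuming the pymysql package and placeholder connection details, of the SHOW LOAD check loop used throughout the examples below; the label, poll interval, and deadline are illustrative:
import time
import pymysql
conn = pymysql.connect(host="{fe_ip}", port=9030, user="{user}",
                       password="{password}", database="test_broker_load")
def wait_for_broker_load(label, poll_seconds=5, timeout_seconds=14400):
    # Returns the terminal state (FINISHED or CANCELLED) of the job with this label.
    deadline = time.time() + timeout_seconds
    with conn.cursor(pymysql.cursors.DictCursor) as cur:
        while time.time() < deadline:
            cur.execute("SHOW LOAD WHERE LABEL = %s ORDER BY CreateTime DESC LIMIT 1", (label,))
            row = cur.fetchone()
            if row and row["State"] in ("FINISHED", "CANCELLED"):
                return row["State"]
            time.sleep(poll_seconds)
    raise TimeoutError(f"load {label} still running after {timeout_seconds}s")
# Example: wait_for_broker_load("brokerload_test_csv_label001")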
(2) Broker Load in Practice
- Importing data on OBS with Broker Load:
Make sure the test data files are present in OBS.
(1) In the Doris MySQL client, run the following to create a test table and import the CSV file on OBS with Broker Load:
create database IF NOT EXISTS test_broker_load;
use test_broker_load;
CREATE TABLE IF NOT EXISTS test_csv
(
`user_id` LARGEINT NOT NULL COMMENT "user id",
`city` VARCHAR(20) COMMENT "user's city",
`age` SMALLINT COMMENT "user age",
`sex` TINYINT COMMENT "user gender",
`cost` BIGINT SUM DEFAULT "0" COMMENT "total user spend",
`max_dwell_time` INT MAX DEFAULT "0" COMMENT "maximum user dwell time",
`min_dwell_time` INT MIN DEFAULT "99999" COMMENT "minimum user dwell time"
)
AGGREGATE KEY(`user_id`, `city`, `age`, `sex`)
DISTRIBUTED BY HASH(`user_id`) BUCKETS 100
PROPERTIES (
"replication_allocation" = "tag.location.default: 3"
);
LOAD LABEL brokerload_test_csv_label001
(
DATA INFILE("obs://****/brokerload_test/test_data.csv")
INTO TABLE `test_csv`
COLUMNS TERMINATED BY ','
)
WITH BROKER "broker1"
(
"fs.obs.access.key" = "xxxx",
"fs.obs.secret.key" = "xxxx",
"fs.obs.endpoint" = "xxxx"
);
Check the progress from the Doris MySQL client; the import is complete once the State field reads FINISHED:
mysql> show load order by createtime desc limit 1\G;
*************************** 1. row ***************************
JobId: 296805
Label: brokerload_test_csv_label001
State: LOADING
Progress: ETL:100%; LOAD:0%
Type: BROKER
EtlInfo: NULL
TaskInfo: cluster:N/A; timeout(s):14400; max_filter_ratio:0.0
ErrorMsg: NULL
CreateTime: 2023-12-15 11:25:41
EtlStartTime: 2023-12-15 11:25:44
EtlFinishTime: 2023-12-15 11:25:44
LoadStartTime: 2023-12-15 11:25:44
LoadFinishTime: NULL
URL: NULL
JobDetails: {"Unfinished backends":{"5a5bb0bbf6374e17-bf4db3bc0fb72636":[10068]},"ScannedRows":0,"TaskNumber":1,"LoadBytes":0,"All backends":{"5a5bb0bbf6374e17-bf4db3bc0fb72636":[10068]},"FileNumber":1,"FileSize":47185011}
TransactionId: 220717
ErrorTablets: {}
1 row in set (0.03 sec)
mysql> show load order by createtime desc limit 1\G;
*************************** 1. row ***************************
JobId: 296805
Label: brokerload_test_csv_label001
State: FINISHED
Progress: ETL:100%; LOAD:100%
Type: BROKER
EtlInfo: unselected.rows=0; dpp.abnorm.ALL=0; dpp.norm.ALL=1000000
TaskInfo: cluster:N/A; timeout(s):14400; max_filter_ratio:0.0
ErrorMsg: NULL
CreateTime: 2023-12-15 11:25:41
EtlStartTime: 2023-12-15 11:25:44
EtlFinishTime: 2023-12-15 11:25:44
LoadStartTime: 2023-12-15 11:25:44
LoadFinishTime: 2023-12-15 11:25:49
URL: NULL
JobDetails: {"Unfinished backends":{"5a5bb0bbf6374e17-bf4db3bc0fb72636":[]},"ScannedRows":1000000,"TaskNumber":1,"LoadBytes":60000000,"All backends":{"5a5bb0bbf6374e17-bf4db3bc0fb72636":[10068]},"FileNumber":1,"FileSize":47185011}
TransactionId: 220717
ErrorTablets: {}
1 row in set (0.01 sec)
mysql> select * from test_csv limit 10;
+---------+-----------------+------+------+--------+----------------+----------------+
| user_id | city | age | sex | cost | max_dwell_time | min_dwell_time |
+---------+-----------------+------+------+--------+----------------+----------------+
| 18 | 14ZqoJvsFdO2MS5 | 55 | 0 | 584461 | 109225 | 339355 |
| 18 | 1PDzT3RMvQXfV70 | 28 | 1 | 738864 | 437902 | 617665 |
| 18 | 24KrQ3YBtynUSQo | 73 | 1 | 692823 | 280239 | 354826 |
| 18 | 4xvuG8GFlvkC7h8 | 97 | 0 | 891892 | 581610 | 167029 |
| 18 | 57QYS5HgqAwxk1h | 69 | 1 | 263653 | 544356 | 457608 |
| 18 | 5lAa1RTRNpywmc3 | 99 | 1 | 143878 | 362780 | 575735 |
| 18 | 6LuVmpV7Ayi55MH | 40 | 0 | 151588 | 839104 | 874164 |
| 18 | 6xGoa125bcjL8ds | 10 | 0 | 504910 | 349479 | 762094 |
| 18 | 7l0nvNOgkRDQNfF | 80 | 0 | 23934 | 134383 | 994837 |
| 18 | 8bDEyz7bCb3ysWa | 50 | 1 | 412142 | 573170 | 91236 |
+---------+-----------------+------+------+--------+----------------+----------------+
10 rows in set (0.06 sec)
(2) In the Doris MySQL client, run the following to create a test table and import the JSON file on OBS with Broker Load:
create database IF NOT EXISTS test_broker_load;
use test_broker_load;
CREATE TABLE IF NOT EXISTS test_json
(
`user_id` LARGEINT NOT NULL COMMENT "user id",
`city` VARCHAR(20) COMMENT "user's city",
`age` SMALLINT COMMENT "user age",
`sex` TINYINT COMMENT "user gender",
`cost` BIGINT SUM DEFAULT "0" COMMENT "total user spend",
`max_dwell_time` INT MAX DEFAULT "0" COMMENT "maximum user dwell time",
`min_dwell_time` INT MIN DEFAULT "99999" COMMENT "minimum user dwell time"
)
AGGREGATE KEY(`user_id`, `city`, `age`, `sex`)
DISTRIBUTED BY HASH(`user_id`) BUCKETS 100
PROPERTIES (
"replication_allocation" = "tag.location.default: 3"
);
LOAD LABEL brokerload_test_json_label001
(
DATA INFILE("obs://****/brokerload_test/test_data.json")
INTO TABLE `test_json`
FORMAT AS "json"
)
WITH BROKER "broker1"
(
"fs.obs.access.key" = "xxx",
"fs.obs.secret.key" = "xxxx",
"fs.obs.endpoint" = "xxxx"
);
Check the progress from the Doris MySQL client; the import is complete once the State field reads FINISHED:
mysql> show load order by createtime desc limit 1\G;
*************************** 1. row ***************************
JobId: 298969
Label: brokerload_test_json_label001
State: LOADING
Progress: ETL:100%; LOAD:30%
Type: BROKER
EtlInfo: NULL
TaskInfo: cluster:N/A; timeout(s):14400; max_filter_ratio:0.0
ErrorMsg: NULL
CreateTime: 2023-12-15 11:37:23
EtlStartTime: 2023-12-15 11:37:24
EtlFinishTime: 2023-12-15 11:37:24
LoadStartTime: 2023-12-15 11:37:24
LoadFinishTime: NULL
URL: NULL
JobDetails: {"Unfinished backends":{"c3d584d9322448c9-a20d378c230f968f":[10068]},"ScannedRows":625856,"TaskNumber":1,"LoadBytes":37551360,"All backends":{"c3d584d9322448c9-a20d378c230f968f":[10068]},"FileNumber":1,"FileSize":123185011}
TransactionId: 222061
ErrorTablets: {}
1 row in set (0.00 sec)
mysql> show load order by createtime desc limit 1\G;
*************************** 1. row ***************************
JobId: 298969
Label: brokerload_test_json_label001
State: FINISHED
Progress: ETL:100%; LOAD:100%
Type: BROKER
EtlInfo: unselected.rows=0; dpp.abnorm.ALL=0; dpp.norm.ALL=1000000
TaskInfo: cluster:N/A; timeout(s):14400; max_filter_ratio:0.0
ErrorMsg: NULL
CreateTime: 2023-12-15 11:37:23
EtlStartTime: 2023-12-15 11:37:24
EtlFinishTime: 2023-12-15 11:37:24
LoadStartTime: 2023-12-15 11:37:24
LoadFinishTime: 2023-12-15 11:37:35
URL: NULL
JobDetails: {"Unfinished backends":{"c3d584d9322448c9-a20d378c230f968f":[]},"ScannedRows":1000000,"TaskNumber":1,"LoadBytes":60000000,"All backends":{"c3d584d9322448c9-a20d378c230f968f":[10068]},"FileNumber":1,"FileSize":123185011}
TransactionId: 222061
ErrorTablets: {}
1 row in set (0.00 sec)
mysql> select * from test_json limit 10;
+---------+-----------------+------+------+--------+----------------+----------------+
| user_id | city | age | sex | cost | max_dwell_time | min_dwell_time |
+---------+-----------------+------+------+--------+----------------+----------------+
| 30 | 0KdoCD5iOjMERrm | 36 | 1 | 886058 | 628531 | 792428 |
| 30 | 0zssOdHjBb3iTO9 | 14 | 1 | 806385 | 488571 | 327795 |
| 30 | 1MyKEVsBFFqqrle | 25 | 1 | 555584 | 64347 | 94759 |
| 30 | 1XKruA3bIbhRiWW | 51 | 1 | 421141 | 582582 | 608166 |
| 30 | 2PnqYMYfTr2Y6ZK | 96 | 0 | 171461 | 786868 | 741986 |
| 30 | 2ZfcWyyAS0it99q | 19 | 0 | 272791 | 646823 | 640081 |
| 30 | 2apAJLcJrJhejQw | 65 | 1 | 789715 | 612145 | 277791 |
| 30 | 36tQg24IKueA1Wj | 88 | 0 | 881280 | 920618 | 397938 |
| 30 | 3S0KsfOlV8WwRPa | 49 | 0 | 608975 | 880464 | 28009 |
| 30 | 3YJ59TzYisHxgWj | 65 | 0 | 1649 | 633309 | 121486 |
+---------+-----------------+------+------+--------+----------------+----------------+
10 rows in set (0.04 sec)
(3) In the Doris MySQL client, run the following to create a test table and import the Parquet file on OBS with Broker Load:
create database IF NOT EXISTS test_broker_load;
use test_broker_load;
CREATE TABLE IF NOT EXISTS test_parquet
(
`user_id` LARGEINT NOT NULL COMMENT "user id",
`city` VARCHAR(20) COMMENT "user's city",
`age` SMALLINT COMMENT "user age",
`sex` TINYINT COMMENT "user gender",
`cost` BIGINT SUM DEFAULT "0" COMMENT "total user spend",
`max_dwell_time` INT MAX DEFAULT "0" COMMENT "maximum user dwell time",
`min_dwell_time` INT MIN DEFAULT "99999" COMMENT "minimum user dwell time"
)
AGGREGATE KEY(`user_id`, `city`, `age`, `sex`)
DISTRIBUTED BY HASH(`user_id`) BUCKETS 100
PROPERTIES (
"replication_allocation" = "tag.location.default: 3"
);
LOAD LABEL brokerload_test_parquet_label001
(
DATA INFILE("obs://****/brokerload_test/test_data.parquet")
INTO TABLE `test_parquet`
FORMAT AS "parquet"
)
WITH BROKER "broker1"
(
"fs.obs.access.key" = "xxxx",
"fs.obs.secret.key" = "xxxx",
"fs.obs.endpoint" = "xxxx"
);
Check the progress from the Doris MySQL client; the import is complete once the State field reads FINISHED:
mysql> show load order by createtime desc limit 1\G;
*************************** 1. row ***************************
JobId: 299384
Label: brokerload_test_parquet_label001
State: FINISHED
Progress: ETL:100%; LOAD:100%
Type: BROKER
EtlInfo: unselected.rows=0; dpp.abnorm.ALL=0; dpp.norm.ALL=1000000
TaskInfo: cluster:N/A; timeout(s):14400; max_filter_ratio:0.0
ErrorMsg: NULL
CreateTime: 2023-12-15 11:42:07
EtlStartTime: 2023-12-15 11:42:09
EtlFinishTime: 2023-12-15 11:42:09
LoadStartTime: 2023-12-15 11:42:09
LoadFinishTime: 2023-12-15 11:42:11
URL: NULL
JobDetails: {"Unfinished backends":{"41522d96b34e44ab-8818f255ad23def4":[]},"ScannedRows":1000000,"TaskNumber":1,"LoadBytes":60000000,"All backends":{"41522d96b34e44ab-8818f255ad23def4":[10066]},"FileNumber":1,"FileSize":33965271}
TransactionId: 222067
ErrorTablets: {}
1 row in set (0.02 sec)
mysql> select * from test_parquet limit 10;
+---------+-----------------+------+------+--------+----------------+----------------+
| user_id | city | age | sex | cost | max_dwell_time | min_dwell_time |
+---------+-----------------+------+------+--------+----------------+----------------+
| 18 | 14ZqoJvsFdO2MS5 | 55 | 0 | 584461 | 109225 | 339355 |
| 18 | 1PDzT3RMvQXfV70 | 28 | 1 | 738864 | 437902 | 617665 |
| 18 | 24KrQ3YBtynUSQo | 73 | 1 | 692823 | 280239 | 354826 |
| 18 | 4xvuG8GFlvkC7h8 | 97 | 0 | 891892 | 581610 | 167029 |
| 18 | 57QYS5HgqAwxk1h | 69 | 1 | 263653 | 544356 | 457608 |
| 18 | 5lAa1RTRNpywmc3 | 99 | 1 | 143878 | 362780 | 575735 |
| 18 | 6LuVmpV7Ayi55MH | 40 | 0 | 151588 | 839104 | 874164 |
| 18 | 6xGoa125bcjL8ds | 10 | 0 | 504910 | 349479 | 762094 |
| 18 | 7l0nvNOgkRDQNfF | 80 | 0 | 23934 | 134383 | 994837 |
| 18 | 8bDEyz7bCb3ysWa | 50 | 1 | 412142 | 573170 | 91236 |
+---------+-----------------+------+------+--------+----------------+----------------+
10 rows in set (0.04 sec)
(4) In the Doris MySQL client, run the following to create a test table and import the ORC file on OBS with Broker Load:
create database IF NOT EXISTS test_broker_load;
use test_broker_load;
CREATE TABLE IF NOT EXISTS test_orc
(
`user_id` LARGEINT NOT NULL COMMENT "user id",
`city` VARCHAR(20) COMMENT "user's city",
`age` SMALLINT COMMENT "user age",
`sex` TINYINT COMMENT "user gender",
`cost` BIGINT SUM DEFAULT "0" COMMENT "total user spend",
`max_dwell_time` INT MAX DEFAULT "0" COMMENT "maximum user dwell time",
`min_dwell_time` INT MIN DEFAULT "99999" COMMENT "minimum user dwell time"
)
AGGREGATE KEY(`user_id`, `city`, `age`, `sex`)
DISTRIBUTED BY HASH(`user_id`) BUCKETS 100
PROPERTIES (
"replication_allocation" = "tag.location.default: 3"
);
LOAD LABEL brokerload_test_orc_label001
(
DATA INFILE("obs://****/brokerload_test/test_data.orc")
INTO TABLE `test_orc`
FORMAT AS "orc"
)
WITH BROKER "broker1"
(
"fs.obs.access.key" = "xxxx",
"fs.obs.secret.key" = "xxxx",
"fs.obs.endpoint" = "xxxx"
);
Check the progress from the Doris MySQL client; the import is complete once the State field reads FINISHED:
mysql> show load order by createtime desc limit 1\G;
*************************** 1. row ***************************
JobId: 299795
Label: brokerload_test_orc_label001
State: FINISHED
Progress: ETL:100%; LOAD:100%
Type: BROKER
EtlInfo: unselected.rows=0; dpp.abnorm.ALL=0; dpp.norm.ALL=1000000
TaskInfo: cluster:N/A; timeout(s):14400; max_filter_ratio:0.0
ErrorMsg: NULL
CreateTime: 2023-12-15 11:44:06
EtlStartTime: 2023-12-15 11:44:09
EtlFinishTime: 2023-12-15 11:44:09
LoadStartTime: 2023-12-15 11:44:09
LoadFinishTime: 2023-12-15 11:44:12
URL: NULL
JobDetails: {"Unfinished backends":{"5105dbe03efb4348-8ecd47b926d2c9e5":[]},"ScannedRows":1000000,"TaskNumber":1,"LoadBytes":60000000,"All backends":{"5105dbe03efb4348-8ecd47b926d2c9e5":[10066]},"FileNumber":1,"FileSize":30278802}
TransactionId: 222071
ErrorTablets: {}
1 row in set (0.00 sec)
mysql> select * from test_orc limit 10;
+---------+-----------------+------+------+--------+----------------+----------------+
| user_id | city | age | sex | cost | max_dwell_time | min_dwell_time |
+---------+-----------------+------+------+--------+----------------+----------------+
| 52 | 0MIuWEHK3W4bI8b | 25 | 0 | 724049 | 145616 | 820867 |
| 52 | 0itV1cYkZPtUUus | 45 | 1 | 923031 | 106160 | 703906 |
| 52 | 0z2v63jZ0FjkkZ6 | 24 | 0 | 602503 | 898953 | 792456 |
| 52 | 1CaHXTIt6MDFyfN | 23 | 0 | 696754 | 490957 | 67682 |
| 52 | 1cn5mqqvuXzCLvf | 94 | 0 | 269539 | 137037 | 383264 |
| 52 | 1obMBQjopbvDjUY | 60 | 1 | 837550 | 828951 | 370053 |
| 52 | 1rNpaeJPLiFAuiG | 18 | 1 | 652469 | 830079 | 892756 |
| 52 | 2Ln8FdrYPDKN0eK | 80 | 1 | 884577 | 211411 | 42759 |
| 52 | 2mvODsnLA2XOQxT | 69 | 0 | 829031 | 345402 | 769167 |
| 52 | 2nLhQBksWpe7QNk | 87 | 1 | 60280 | 554871 | 817379 |
+---------+-----------------+------+------+--------+----------------+----------------+
10 rows in set (0.05 sec)
- Importing data on HDFS with Broker Load (the examples below all run against a Kerberos-secured HDFS cluster; if you need this, contact CloudTable staff to upload the relevant keytab and krb files):
First, upload the test data to HDFS:
[root@qh-node-master1eGtP client]# hdfs dfs -put /tmp/test_data.csv /broker_data
[root@qh-node-master1eGtP client]# hdfs dfs -put /tmp/test_data.json /broker_data
[root@qh-node-master1eGtP client]# hdfs dfs -put /tmp/test_data.parquet /broker_data
[root@qh-node-master1eGtP client]# hdfs dfs -put /tmp/test_data.orc /broker_data
[root@qh-node-master1eGtP client]# hdfs dfs -ls -h /broker_data
Found 4 items
-rw-r--r-- 3 lizuxian_test supergroup 45.0 M 2023-12-16 15:26 /broker_data/test_data.csv
-rw-r--r-- 3 lizuxian_test supergroup 117.5 M 2023-12-16 15:27 /broker_data/test_data.json
-rw-r--r-- 3 lizuxian_test supergroup 28.9 M 2023-12-16 15:27 /broker_data/test_data.orc
-rw-r--r-- 3 lizuxian_test supergroup 32.4 M 2023-12-16 15:27 /broker_data/test_data.parquet
(1) In the Doris MySQL client, run the following to create a test table and import the CSV file on HDFS with Broker Load:
create database IF NOT EXISTS test_broker_load;
use test_broker_load;
CREATE TABLE IF NOT EXISTS test_csv_hdfs
(
`user_id` LARGEINT NOT NULL COMMENT "user id",
`city` VARCHAR(20) COMMENT "user's city",
`age` SMALLINT COMMENT "user age",
`sex` TINYINT COMMENT "user gender",
`cost` BIGINT SUM DEFAULT "0" COMMENT "total user spend",
`max_dwell_time` INT MAX DEFAULT "0" COMMENT "maximum user dwell time",
`min_dwell_time` INT MIN DEFAULT "99999" COMMENT "minimum user dwell time"
)
AGGREGATE KEY(`user_id`, `city`, `age`, `sex`)
DISTRIBUTED BY HASH(`user_id`) BUCKETS 100
PROPERTIES (
"replication_allocation" = "tag.location.default: 3"
);
LOAD LABEL brokerload_test_hdfs_csv_label001
(
DATA INFILE("hdfs://{name_node_ip}:{dfs.namenode.rpc.port}/broker_data/test_data.csv")
INTO TABLE `test_csv_hdfs`
COLUMNS TERMINATED BY ','
FORMAT AS 'csv'
)
WITH BROKER 'broker1' (
'hadoop.security.authentication'='kerberos',
'kerberos_keytab'='{keytab_path}',
'kerberos_principal'='lizuxian_test@C4F9C********D274F518680.COM' -- can be obtained by running `kinit {username}`
)
PROPERTIES
(
'timeout'='1200',
'max_filter_ratio'='0.1'
);
Check the progress from the Doris MySQL client; the import is complete once the State field reads FINISHED:
mysql> show load order by createtime desc limit 1\G;
*************************** 1. row ***************************
JobId: 341574
Label: brokerload_test_hdfs_csv_label001
State: FINISHED
Progress: ETL:100%; LOAD:100%
Type: BROKER
EtlInfo: unselected.rows=0; dpp.abnorm.ALL=0; dpp.norm.ALL=1000000
TaskInfo: cluster:N/A; timeout(s):1200; max_filter_ratio:0.1
ErrorMsg: NULL
CreateTime: 2023-12-16 15:38:41
EtlStartTime: 2023-12-16 15:38:44
EtlFinishTime: 2023-12-16 15:38:44
LoadStartTime: 2023-12-16 15:38:44
LoadFinishTime: 2023-12-16 15:38:50
URL: NULL
JobDetails: {"Unfinished backends":{"695c5e8100e249cb-
904526826e9b5b52":
[]},"ScannedRows":1000000,"TaskNumber":1,"LoadBytes":60000000,"All
backends":{"695c5e8100e249cb-904526826e9b5b52":
[10068]},"FileNumber":1,"FileSize":47185011}
TransactionId: 248175
ErrorTablets: {}
1 row in set (0.00 sec)
mysql> select * from test_csv_hdfs limit 10;
+---------+-----------------+------+------+--------+----------------+----------------+
| user_id | city | age | sex | cost | max_dwell_time | min_dwell_time |
+---------+-----------------+------+------+--------+----------------+----------------+
| 21 | 0Ebx7zo8fRAIG71 | 77 | 1 | 4438 | 752439 | 748616 |
| 21 | 0cwU5nA9sXkQjAN | 94 | 1 | 137373 | 10643 | 610344 |
| 21 | 0otCSIhjkuvetsS | 75 | 1 | 116712 | 624894 | 146138 |
| 21 | 1Lh46tLqlLs31YC | 50 | 0 | 718119 | 904591 | 85446 |
| 21 | 2054ikYBDzq1COa | 53 | 0 | 954765 | 683905 | 310742 |
| 21 | 2U7pZ2ACH3gQOMu | 67 | 1 | 962595 | 168301 | 483849 |
| 21 | 3pwyeP3nwtZyRg0 | 58 | 0 | 606347 | 193629 | 114854 |
| 21 | 3vCXiQEUTTRi1aK | 34 | 0 | 220115 | 437874 | 732262 |
| 21 | 3wz1dlZySof9PGA | 57 | 1 | 562944 | 666807 | 588531 |
| 21 | 4Byce2YLt3PQNxH | 53 | 1 | 528431 | 723889 | 532362 |
+---------+-----------------+------+------+--------+----------------+----------------+
10 rows in set (0.07 sec)
(2) In the Doris MySQL client, run the following to create a test table and import the JSON file on HDFS with Broker Load:
create database IF NOT EXISTS test_broker_load;
use test_broker_load;
CREATE TABLE IF NOT EXISTS test_json_hdfs
(
`user_id` LARGEINT NOT NULL COMMENT "user id",
`city` VARCHAR(20) COMMENT "user's city",
`age` SMALLINT COMMENT "user age",
`sex` TINYINT COMMENT "user gender",
`cost` BIGINT SUM DEFAULT "0" COMMENT "total user spend",
`max_dwell_time` INT MAX DEFAULT "0" COMMENT "maximum user dwell time",
`min_dwell_time` INT MIN DEFAULT "99999" COMMENT "minimum user dwell time"
)
AGGREGATE KEY(`user_id`, `city`, `age`, `sex`)
DISTRIBUTED BY HASH(`user_id`) BUCKETS 100
PROPERTIES (
"replication_allocation" = "tag.location.default: 3"
);
LOAD LABEL brokerload_test_hdfs_json_label001
(
DATA INFILE("hdfs://{name_node_ip}:{dfs.namenode.rpc.port}/broker_data/test_data.json")
INTO TABLE `test_json_hdfs`
FORMAT AS 'json'
)
WITH BROKER 'broker1' (
'hadoop.security.authentication'='kerberos',
'kerberos_keytab'='{keytab_path}',
'kerberos_principal'='lizuxian_test@C4F9*********_0D274F518680.COM' -- can be obtained by running `kinit {username}`
)
PROPERTIES
(
'timeout'='1200',
'max_filter_ratio'='0.1'
);
Check the progress from the Doris MySQL client; the import is complete once the State field reads FINISHED:
mysql> show load order by createtime desc limit 1\G;
*************************** 1. row ***************************
JobId: 341987
Label: brokerload_test_hdfs_json_label001
State: FINISHED
Progress: ETL:100%; LOAD:100%
Type: BROKER
EtlInfo: unselected.rows=0; dpp.abnorm.ALL=0; dpp.norm.ALL=1000000
TaskInfo: cluster:N/A; timeout(s):1200; max_filter_ratio:0.1
ErrorMsg: NULL
CreateTime: 2023-12-16 15:43:13
EtlStartTime: 2023-12-16 15:43:21
EtlFinishTime: 2023-12-16 15:43:21
LoadStartTime: 2023-12-16 15:43:21
LoadFinishTime: 2023-12-16 15:43:29
URL: NULL
JobDetails: {"Unfinished backends":{"9a5b289be4604780-
80bba11c1a64fb7a":
[]},"ScannedRows":1000000,"TaskNumber":1,"LoadBytes":60000000,"All
backends":{"9a5b289be4604780-80bba11c1a64fb7a":
[10066]},"FileNumber":1,"FileSize":123185011}
TransactionId: 248180
ErrorTablets: {}
1 row in set (0.01 sec)
mysql> select * from test_json_hdfs limit 10;
+---------+-----------------+------+------+--------+----------------+----------------+
| user_id | city | age | sex | cost | max_dwell_time | min_dwell_time |
+---------+-----------------+------+------+--------+----------------+----------------+
| 333 | 1ChKZMB7BcaBII1 | 14 | 0 | 521905 | 121745 | 69743 |
| 333 | 1hk2mmhGMid6XzF | 46 | 0 | 7750 | 850394 | 127748 |
| 333 | 1j80IZqT0emeX6P | 40 | 1 | 686404 | 733847 | 909025 |
| 333 | 2Op7dMBUTfSBmSt | 63 | 0 | 801874 | 820379 | 395929 |
| 333 | 2mKmqt0NzBgqfon | 91 | 1 | 124587 | 711769 | 450717 |
| 333 | 2tgnUosR4E9BJHC | 98 | 1 | 545540 | 578186 | 259229 |
| 333 | 44QZHyforBIuff5 | 0 | 0 | 779132 | 648994 | 370163 |
| 333 | 4CDd7uKUmGbtx3M | 42 | 1 | 830187 | 83749 | 703733 |
| 333 | 4LOyGiVSHM6Z0SR | 72 | 1 | 614207 | 106296 | 947380 |
| 333 | 4MBJnFU0UsYDcVv | 20 | 1 | 515572 | 540372 | 251512 |
+---------+-----------------+------+------+--------+----------------+----------------+
10 rows in set (0.06 sec)
(3) In the Doris MySQL client, run the following to create a test table and import the Parquet file on HDFS with Broker Load:
create database IF NOT EXISTS test_broker_load;
use test_broker_load;
CREATE TABLE IF NOT EXISTS test_parquet_hdfs
(
`user_id` LARGEINT NOT NULL COMMENT "user id",
`city` VARCHAR(20) COMMENT "user's city",
`age` SMALLINT COMMENT "user age",
`sex` TINYINT COMMENT "user gender",
`cost` BIGINT SUM DEFAULT "0" COMMENT "total user spend",
`max_dwell_time` INT MAX DEFAULT "0" COMMENT "maximum user dwell time",
`min_dwell_time` INT MIN DEFAULT "99999" COMMENT "minimum user dwell time"
)
AGGREGATE KEY(`user_id`, `city`, `age`, `sex`)
DISTRIBUTED BY HASH(`user_id`) BUCKETS 100
PROPERTIES (
"replication_allocation" = "tag.location.default: 3"
);
LOAD LABEL brokerload_test_hdfs_parquet_label001
(
DATA INFILE("hdfs://{name_node_ip}:{dfs.namenode.rpc.port}/broker_data/test_data.parquet")
INTO TABLE `test_parquet_hdfs`
FORMAT AS 'parquet'
)
WITH BROKER 'broker1' (
'hadoop.security.authentication'='kerberos',
'kerberos_keytab'='{keytab_path}',
'kerberos_principal'='lizuxian_test@C4F9C8*******0D274F518680.COM' -- can be obtained by running `kinit {username}`
)
PROPERTIES
(
'timeout'='1200',
'max_filter_ratio'='0.1'
);
Check the progress from the Doris MySQL client; the import is complete once the State field reads FINISHED:
mysql> show load order by createtime desc limit 1\G;
*************************** 1. row ***************************
JobId: 342398
Label: brokerload_test_hdfs_parquet_label001
State: FINISHED
Progress: ETL:100%; LOAD:100%
Type: BROKER
EtlInfo: unselected.rows=0; dpp.abnorm.ALL=0; dpp.norm.ALL=1000000
TaskInfo: cluster:N/A; timeout(s):1200; max_filter_ratio:0.1
ErrorMsg: NULL
CreateTime: 2023-12-16 15:45:41
EtlStartTime: 2023-12-16 15:45:44
EtlFinishTime: 2023-12-16 15:45:44
LoadStartTime: 2023-12-16 15:45:44
LoadFinishTime: 2023-12-16 15:45:46
URL: NULL
JobDetails: {"Unfinished backends":{"35eb3e403898466dab36749e60f9c7f9":
[]},"ScannedRows":1000000,"TaskNumber":1,"LoadBytes":60000000,"All
backends":{"35eb3e403898466d-ab36749e60f9c7f9":
[10067]},"FileNumber":1,"FileSize":33965271}
TransactionId: 248184
ErrorTablets: {}
1 row in set (0.00 sec)
mysql> select * from test_parquet_hdfs limit 10;
+---------+-----------------+------+------+--------+----------------+----------------+
| user_id | city | age | sex | cost | max_dwell_time | min_dwell_time |
+---------+-----------------+------+------+--------+----------------+----------------+
| 30 | 0KdoCD5iOjMERrm | 36 | 1 | 886058 | 628531 | 792428 |
| 30 | 0zssOdHjBb3iTO9 | 14 | 1 | 806385 | 488571 | 327795 |
| 30 | 1MyKEVsBFFqqrle | 25 | 1 | 555584 | 64347 | 94759 |
| 30 | 1XKruA3bIbhRiWW | 51 | 1 | 421141 | 582582 | 608166 |
| 30 | 2PnqYMYfTr2Y6ZK | 96 | 0 | 171461 | 786868 | 741986 |
| 30 | 2ZfcWyyAS0it99q | 19 | 0 | 272791 | 646823 | 640081 |
| 30 | 2apAJLcJrJhejQw | 65 | 1 | 789715 | 612145 | 277791 |
| 30 | 36tQg24IKueA1Wj | 88 | 0 | 881280 | 920618 | 397938 |
| 30 | 3S0KsfOlV8WwRPa | 49 | 0 | 608975 | 880464 | 28009 |
| 30 | 3YJ59TzYisHxgWj | 65 | 0 | 1649 | 633309 | 121486 |
+---------+-----------------+------+------+--------+----------------+----------------+
10 rows in set (0.05 sec)
(4) In the Doris MySQL client, run the following to create a test table and import the ORC file on HDFS with Broker Load:
create database IF NOT EXISTS test_broker_load;
use test_broker_load;
CREATE TABLE IF NOT EXISTS test_orc_hdfs
(
`user_id` LARGEINT NOT NULL COMMENT "user id",
`city` VARCHAR(20) COMMENT "user's city",
`age` SMALLINT COMMENT "user age",
`sex` TINYINT COMMENT "user gender",
`cost` BIGINT SUM DEFAULT "0" COMMENT "total user spend",
`max_dwell_time` INT MAX DEFAULT "0" COMMENT "maximum user dwell time",
`min_dwell_time` INT MIN DEFAULT "99999" COMMENT "minimum user dwell time"
)
AGGREGATE KEY(`user_id`, `city`, `age`, `sex`)
DISTRIBUTED BY HASH(`user_id`) BUCKETS 100
PROPERTIES (
"replication_allocation" = "tag.location.default: 3"
);
LOAD LABEL brokerload_test_hdfs_orc_label001
(
DATA INFILE("hdfs://{name_node_ip}:{dfs.namenode.rpc.port}/broker_data/test_data.orc")
INTO TABLE `test_orc_hdfs`
FORMAT AS 'orc'
)
WITH BROKER 'broker1' (
'hadoop.security.authentication'='kerberos',
'kerberos_keytab'='{keytab_path}',
'kerberos_principal'='lizuxian_test@C4F9C837_896********74F518680.COM' -- can be obtained by running `kinit {username}`
)
PROPERTIES
(
'timeout'='1200',
'max_filter_ratio'='0.1'
);
Check the progress from the Doris MySQL client; the import is complete once the State field reads FINISHED:
mysql> show load order by createtime desc limit 1\G;
*************************** 1. row ***************************
JobId: 342809
Label: brokerload_test_hdfs_orc_label001
State: FINISHED
Progress: ETL:100%; LOAD:100%
Type: BROKER
EtlInfo: unselected.rows=0; dpp.abnorm.ALL=0; dpp.norm.ALL=1000000
TaskInfo: cluster:N/A; timeout(s):1200; max_filter_ratio:0.1
ErrorMsg: NULL
CreateTime: 2023-12-16 15:47:48
EtlStartTime: 2023-12-16 15:47:54
EtlFinishTime: 2023-12-16 15:47:54
LoadStartTime: 2023-12-16 15:47:54
LoadFinishTime: 2023-12-16 15:47:56
URL: NULL
JobDetails: {"Unfinished backends":{"a8855386fee040b3-
bf496de3aeed9b35":
[]},"ScannedRows":1000000,"TaskNumber":1,"LoadBytes":60000000,"All
backends":{"a8855386fee040b3-bf496de3aeed9b35":
[10066]},"FileNumber":1,"FileSize":30278802}
TransactionId: 248188
ErrorTablets: {}
1 row in set (0.00 sec)
mysql> select * from test_orc_hdfs limit 10;
+---------+-----------------+------+------+--------+----------------+----------------+
| user_id | city | age | sex | cost | max_dwell_time | min_dwell_time |
+---------+-----------------+------+------+--------+----------------+----------------+
| 304 | 08fJBA8PC4JVvq9 | 78 | 1 | 524971 | 916140 | 86993 |
| 304 | 0GgTHWckbRA8UFg | 36 | 0 | 828414 | 306785 | 133672 |
| 304 | 0mKUznbdEGWhVCX | 54 | 1 | 624874 | 794295 | 31527 |
| 304 | 1HBHO4opOemHs7F | 72 | 0 | 906372 | 717418 | 163735 |
| 304 | 1YU3rngHmeOa0SL | 10 | 1 | 968173 | 35689 | 33720 |
| 304 | 28FZpfgyijMSTT7 | 3 | 0 | 176383 | 491327 | 699844 |
| 304 | 2NAQaUkC1MIxmi5 | 57 | 0 | 932530 | 614064 | 524431 |
| 304 | 2tu5EhavPVzg0Be | 20 | 1 | 524956 | 31996 | 836995 |
| 304 | 38y7GD7vmHUqJkv | 59 | 1 | 928067 | 849794 | 88436 |
| 304 | 3JIzbSVY85AsC0h | 17 | 0 | 586507 | 309949 | 701555 |
+---------+-----------------+------+------+--------+----------------+----------------+
10 rows in set (0.05 sec)
(3) Broker Load: Points to Note
- Labels, import transactions, multi-table atomicity: every import job in Doris takes effect atomically, and atomicity also holds when one job writes to multiple tables. On top of that, the Label mechanism guarantees imports are neither lost nor duplicated.
- Timeout: Broker Load's default timeout is 4 hours, counted from job submission; a job that does not finish in time fails. To change it, add "timeout" = "3600" (a 3600-second timeout) to PROPERTIES.
- Data volume and job-count limits: Broker Load works best with no more than about 100 GB per job. There is no theoretical ceiling on a single job, but oversized jobs run long and are costly to retry on failure (and put extra pressure on the cluster). Constrained by cluster size, CloudTable Doris additionally caps each import at (number of ComputeNodes) * 3 GB to keep resource usage reasonable; split larger datasets into several jobs. Doris also limits how many import jobs can run concurrently in a cluster, usually 3 to 10; jobs submitted beyond that wait in a queue capped at 100 entries, and further submissions are rejected outright. Queueing time counts toward a job's total time, so a job that waits past its timeout is cancelled; monitor job states and pace your submissions accordingly.
A closer look at data volume:
The discussion below assumes a single BE; if your cluster has multiple BEs, multiply the thresholds by the BE count. For example, with 3 BEs the "3 GB or less" threshold becomes 9 GB or less.
- 3 GB or less (with 3 BEs: 3 * 3 GB = 9 GB): simply submit a Broker Load job.
- More than 3 GB (with 3 BEs: more than 9 GB): tune the Broker Load parameters to handle the large file.
i. Based on the current number of BEs and the size of the source file, adjust the per-BE maximum scan volume and maximum concurrency in fe.conf (contact the CloudTable administrator to change these):
max_broker_concurrency = number of BEs
data handled per BE in this job = source file size / max_broker_concurrency
max_bytes_per_broker_scanner >= data handled per BE in this job
For example, for a 100 GB file on a cluster with 10 BEs: max_broker_concurrency = 10 and max_bytes_per_broker_scanner >= 10 GB = 100 GB / 10.
ii. Set a custom timeout when creating the import:
data per BE / your cluster's slowest import speed (MB/s) >= timeout >= data per BE / 10 MB/s
For example, for a 100 GB file on a cluster with 10 BEs: timeout >= 1000 s = 10 GB / 10 MB/s.
iii. If the timeout computed in step ii exceeds the default 4-hour maximum, do not simply raise the maximum import timeout. When a single import would take more than 4 hours, it is better to split the source file and run several imports, mainly because a failed 4-hour-plus import is very expensive to retry.
Expected maximum file size per import = 14400 s * 10 MB/s * number of BEs. For example, with 10 BEs: 14400 s * 10 MB/s * 10 = 1,440,000 MB ≈ 1440 GB. Note that most environments will not reach 10 MB/s, so it is advisable to split any file over 500 GB before importing. A small helper that reproduces these formulas follows.
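As a sanity check on the arithmetic above, here is a hedged helper that reproduces these sizing formulas; the function name and the decimal convention (1 GB = 1000 MB, matching the worked numbers in the text) are my own choices:
def broker_load_sizing(file_size_gb, num_be, speed_mb_s=10.0):
    per_be_gb = file_size_gb / num_be  # data handled by each BE (step i)
    return {
        "max_broker_concurrency": num_be,                        # step i
        "max_bytes_per_broker_scanner_gb_at_least": per_be_gb,   # step i
        "timeout_s_at_least": per_be_gb * 1000 / speed_mb_s,     # step ii
        "max_gb_within_4h_default": 14400 * speed_mb_s * num_be / 1000,  # step iii
    }
# Worked example from the text: a 100 GB file on a 10-BE cluster
print(broker_load_sizing(100, 10))
# -> concurrency 10, per-BE scan >= 10 GB, timeout >= 1000 s, ceiling 1440 GB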
That wraps up the hands-on walkthrough of Stream Load and Broker Load. If you would like the test data used in this post, say so in the comments; if there is enough interest, I can follow up with a post on quickly generating test data.