CloudTable Doris Tutorial: Mastering Doris Data Import (Part 1)

i数据的程序猿, published 2024/01/04 10:06:45
[Abstract] Data import is the first step of any data-analysis task, and it directly affects the accuracy and reliability of the results. This series walks through CloudTable Doris's rich data-import ecosystem with six hands-on cases. This first article covers the two most common, officially recommended import methods in CloudTable Doris: Stream Load and Broker Load.

Today let's talk about importing external data into CloudTable Doris. How do you load external files into Doris? When should you choose Stream Load, and when Broker Load? Can a relational database such as MySQL feed CloudTable Doris in real time? How do you wire in common big-data components like Spark, Flink, and Kafka? You'll find the answers in this series.

In this article, let's start with the two most common, officially recommended import methods in CloudTable Doris: Stream Load and Broker Load.

Part 1: Common scenarios for Doris data import

CloudTable Doris offers several import options; pick the one that best fits your needs:

  • Stream Load: a synchronous import method. The user sends an HTTP request to load a local file or data stream into Doris. The import runs synchronously and returns its result, so the client can tell from the response body whether it succeeded. Stream Load is best suited for importing local files, or streaming data into Doris from a program.
  • Broker Load: an asynchronous import method whose supported data sources depend on what the Broker process can access. The user creates a Broker Load job over the MySQL protocol and checks the result with a SHOW LOAD command.
  • Spark Connector and Flink Connector: the Spark Doris Connector lets Spark both read data stored in Doris and write data into Doris. The Flink Doris Connector lets Flink read, insert, update, and delete data stored in Doris.
  • Flink CDC real-time import: business databases typically key tables by a numeric id (e.g. a Student table keyed by id), but as the business evolves the value behind a given id may change. In that scenario, syncing with Flink CDC + Doris Connector keeps the data in Doris's key columns automatically up to date.
  • JDBC inserts: the user can load data with INSERT statements over the MySQL protocol.

And more.
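For the last option, here is a minimal sketch of batching rows into one multi-value INSERT; the table and column names are hypothetical, and actual execution over the MySQL protocol is shown only in comments:

```python
def build_insert(table, columns, rows):
    """Build a parameterized multi-row INSERT statement and its flat params."""
    placeholder = "(" + ", ".join(["%s"] * len(columns)) + ")"
    sql = "INSERT INTO {} ({}) VALUES {}".format(
        table, ", ".join(columns), ", ".join([placeholder] * len(rows)))
    params = [v for row in rows for v in row]  # flatten rows for the driver
    return sql, params

sql, params = build_insert(
    "test_csv", ["user_id", "city", "age"],
    [(1, "Beijing", 20), (2, "Shanghai", 30)])
print(sql)
# With a MySQL-protocol client such as pymysql (hypothetical connection details):
#   conn = pymysql.connect(host=fe_ip, port=9030, user=user, password=password)
#   with conn.cursor() as cur:
#       cur.execute(sql, params)
#   conn.commit()
```

Batching many rows into one statement matters for Doris: each INSERT is a separate import transaction, so row-at-a-time inserts are slow.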

Part 2: Doris import method: Stream Load

Stream Load is a synchronous import method: the user sends an HTTP request to load a local file or data stream into Doris. The import runs synchronously and returns its result, so the client can tell from the response body whether it succeeded.

Stream Load is best suited for importing local files, or streaming data into Doris from a program.

(1) How Stream Load works

The main flow of Stream Load:

                         ^      +
                         |      |
                         |      | 1A. User submit load to FE
                         |      |
                         |   +--v-----------+
                         |   | FE           |
5. Return result to user |   +--+-----------+
                         |      |
                         |      | 2. Redirect to BE
                         |      |
                         |   +--v-----------+
                         +---+Coordinator BE| 1B. User submit load to BE
                             +-+-----+----+-+
                               |     |    |
                         +-----+     |    +-----+
                         |           |          | 3. Distribute data
                         |           |          |
                       +-v-+       +-v-+      +-v-+
                       |BE |       |BE |      |BE |
                       +---+       +---+      +---+

In summary:

  • The user submits a request to an FE node;
  • The FE redirects it to one of the BE nodes, which acts as the "Coordinator BE" (think of it as a relay);
  • The client submits the load to that Coordinator BE;
  • The Coordinator BE distributes the data to the other BE nodes for storage.

In Stream Load, Doris designates one node as the Coordinator, which receives the data and distributes it to the other data nodes. The user submits the import command over HTTP. If it is submitted to the FE, the FE forwards the request to a BE via an HTTP redirect; the user can also submit the command directly to a specific BE. The final result is returned to the user by the Coordinator BE.
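This flow maps directly onto the HTTP request: the client PUTs the file to `/api/{db}/{table}/_stream_load` on either the FE or a BE. A minimal Python sketch of assembling the endpoint and headers (host, port, and label are placeholders; the actual upload, shown in a comment, would use any HTTP client such as `requests`):

```python
def stream_load_request(fe_host, db, table, label,
                        http_port=8030, extra_headers=None):
    """Assemble the Stream Load endpoint URL and required headers.

    Doris requires a 'label' (used for dedup and safe retries); sending
    'Expect: 100-continue' lets the server redirect before the body is sent.
    """
    url = "http://{}:{}/api/{}/{}/_stream_load".format(
        fe_host, http_port, db, table)
    headers = {"label": label, "Expect": "100-continue"}
    headers.update(extra_headers or {})
    return url, headers

url, headers = stream_load_request(
    "127.0.0.1", "test_stream_load", "test_csv",
    "test_csv_20231214-001", extra_headers={"column_separator": ","})
# Then, with hypothetical credentials:
#   requests.put(url, data=open("test_data.csv", "rb"),
#                headers=headers, auth=(user, password))
```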

(2) Stream Load in practice

CloudTable Doris Stream Load currently supports the following data formats: CSV (text), JSON, Parquet, and ORC.

  • Importing CSV data with Stream Load:
create database test_stream_load;

use test_stream_load;

CREATE TABLE IF NOT EXISTS test_csv
(
    `user_id` LARGEINT NOT NULL COMMENT "user id",
    `city` VARCHAR(20) COMMENT "user's city",
    `age` SMALLINT COMMENT "user age",
    `sex` TINYINT COMMENT "user gender",
    `cost` BIGINT SUM DEFAULT "0" COMMENT "total user spend",
    `max_dwell_time` INT MAX DEFAULT "0" COMMENT "max user dwell time",
    `min_dwell_time` INT MIN DEFAULT "99999" COMMENT "min user dwell time"
)
AGGREGATE KEY(`user_id`, `city`, `age`, `sex`)
DISTRIBUTED BY HASH(`user_id`) BUCKETS 100
PROPERTIES (
"replication_allocation" = "tag.location.default: 3"
);

On an ECS instance in the same VPC as the Doris nodes, run the Stream Load command to import test_data.csv into the test_csv table:

If you are unsure how to produce such a file, you can use pandas: df.to_csv("./test_data.csv", index=None, header=None, encoding='utf-8')
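If pandas isn't available, the following stdlib-only sketch generates a sample file matching the table schema above (the value ranges are arbitrary choices for test data):

```python
import csv
import random
import string

def make_test_csv(path, n_rows=1000):
    """Write n_rows of random data matching the test_csv schema:
    user_id, city, age, sex, cost, max_dwell_time, min_dwell_time."""
    with open(path, "w", newline="") as f:
        writer = csv.writer(f)
        for _ in range(n_rows):
            writer.writerow([
                random.randint(0, 99),                     # user_id
                "".join(random.choices(
                    string.ascii_letters + string.digits, k=15)),  # city
                random.randint(0, 99),                     # age
                random.randint(0, 1),                      # sex
                random.randint(0, 2_000_000),              # cost
                random.randint(0, 1_000_000),              # max_dwell_time
                random.randint(0, 1_000_000),              # min_dwell_time
            ])

make_test_csv("./test_data.csv", n_rows=1000)
```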

root@ecs-lizuxian-mysql8-notdelete:~/python_test# curl --location-trusted -u {user}:{password} -H "label:test_csv_20231214-001" -H "column_separator:," -T ./test_data.csv http://{fe_ip}:{http_port,default 8030}/api/test_stream_load/test_csv/_stream_load
{
    "TxnId": 30194,
    "Label": "test_csv_20231214-001",
    "TwoPhaseCommit": "false",
    "Status": "Success",
    "Message": "OK",
    "NumberTotalRows": 999999,
    "NumberLoadedRows": 999999,
    "NumberFilteredRows": 0,
    "NumberUnselectedRows": 0,
    "LoadBytes": 47281446,
    "LoadTimeMs": 1333,
    "BeginTxnTimeMs": 4,
    "StreamLoadPutTimeMs": 9,
    "ReadDataTimeMs": 386,
    "WriteDataTimeMs": 1274,
    "CommitAndPublishTimeMs": 39
}
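Because Stream Load is synchronous, this JSON body is the final verdict on the job. A small sketch of checking it programmatically (fed a trimmed copy of the response above; the `max_filter_ratio` parameter is this sketch's own convention, mirroring Doris's header of the same name):

```python
import json

def check_stream_load(resp_text, max_filter_ratio=0.0):
    """Return (ok, stats): success requires Status == 'Success' and a
    filtered-row ratio not exceeding max_filter_ratio."""
    r = json.loads(resp_text)
    total = r.get("NumberTotalRows", 0)
    filtered = r.get("NumberFilteredRows", 0)
    ratio = filtered / total if total else 0.0
    ok = r.get("Status") == "Success" and ratio <= max_filter_ratio
    return ok, {"label": r.get("Label"),
                "loaded": r.get("NumberLoadedRows"),
                "ratio": ratio}

resp = ('{"Label": "test_csv_20231214-001", "Status": "Success", '
        '"NumberTotalRows": 999999, "NumberLoadedRows": 999999, '
        '"NumberFilteredRows": 0}')
ok, stats = check_stream_load(resp)
print(ok, stats["loaded"])   # True 999999
```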

Query from the Doris MySQL client:

mysql> select * from test_csv limit 10;
+---------+-----------------+------+------+---------+----------------+----------------+
| user_id | city            | age  | sex  | cost    | max_dwell_time | min_dwell_time |
+---------+-----------------+------+------+---------+----------------+----------------+
| 16      | 1yi3tl5MCFGArml |   59 |    0 |  317844 |         844343 |         154118 |
| 16      | 24EwBA3JUNINyNL |   15 |    1 | 1923780 |         142085 |         770301 |
| 16      | 25SMqfLbtB6qfFz |   64 |    1 |  439054 |         631792 |         823842 |
| 16      | 2yi6oxrHAFY87GG |    4 |    1 | 1723916 |         299258 |         234990 |
| 16      | 3AdVrmWSQc4avEC |   17 |    0 |  203914 |         182567 |         631297 |
| 16      | 4V4TyQae9lr6fJ7 |   11 |    0 |  931320 |          94632 |         205505 |
| 16      | 4dDtDAaHLyyxxQn |   91 |    1 |  289964 |         942567 |         572919 |
| 16      | 6OE1iN4p8oruGbC |   47 |    1 |  505150 |         743670 |         753172 |
| 16      | 6yt1armiAhc2Odz |   72 |    1 |  323502 |         715525 |          68762 |
| 16      | 7IuDVuhqkxmr0Gj |   60 |    1 |  130878 |         800067 |         186116 |
+---------+-----------------+------+------+---------+----------------+----------------+
10 rows in set (0.09 sec)
  • Importing JSON data with Stream Load:
create database IF NOT EXISTS test_stream_load;

use test_stream_load;

CREATE TABLE IF NOT EXISTS test_json
(
    `user_id` LARGEINT NOT NULL COMMENT "user id",
    `city` VARCHAR(20) COMMENT "user's city",
    `age` SMALLINT COMMENT "user age",
    `sex` TINYINT COMMENT "user gender",
    `cost` BIGINT SUM DEFAULT "0" COMMENT "total user spend",
    `max_dwell_time` INT MAX DEFAULT "0" COMMENT "max user dwell time",
    `min_dwell_time` INT MIN DEFAULT "99999" COMMENT "min user dwell time"
)
AGGREGATE KEY(`user_id`, `city`, `age`, `sex`)
DISTRIBUTED BY HASH(`user_id`) BUCKETS 100
PROPERTIES (
"replication_allocation" = "tag.location.default: 3"
);

On an ECS instance in the same VPC as the Doris nodes, run the Stream Load command to import test_data.json into the test_json table:

If you are unsure how to produce such a file, you can use pandas: df.to_json('./test_data.json', orient='records', lines=True)

root@ecs-lizuxian-mysql8-notdelete:~/python_test# curl --location-trusted -u {user}:{password} -H "label:test_json_20231214-001" -H "format:json" -H "fuzzy_parse" -H "exec_mem_limit: 5368709120" -H "read_json_by_line:true"  -H "streaming_load_json_max_mb:5000" -T ./test_data.json http://{fe_ip}:{http_port,default 8030}/api/test_stream_load/test_json/_stream_load
{
    "TxnId": 47620,
    "Label": "test_json_20231214-001",
    "TwoPhaseCommit": "false",
    "Status": "Success",
    "Message": "OK",
    "NumberTotalRows": 1000000,
    "NumberLoadedRows": 1000000,
    "NumberFilteredRows": 0,
    "NumberUnselectedRows": 0,
    "LoadBytes": 123282164,
    "LoadTimeMs": 9112,
    "BeginTxnTimeMs": 6,
    "StreamLoadPutTimeMs": 6,
    "ReadDataTimeMs": 8391,
    "WriteDataTimeMs": 9067,
    "CommitAndPublishTimeMs": 26
}

Query from the Doris MySQL client:

mysql> select * from test_json limit 10;
+---------+-----------------+------+------+--------+----------------+----------------+
| user_id | city            | age  | sex  | cost   | max_dwell_time | min_dwell_time |
+---------+-----------------+------+------+--------+----------------+----------------+
| 33      | 1WUvNHUiBZmmklJ |   77 |    1 | 713862 |         409226 |         794179 |
| 33      | 1xzAJw2CSUS5qOE |   94 |    0 | 267592 |         915030 |         867483 |
| 33      | 2aQMjzsy6sufoEy |   78 |    1 | 934307 |          79007 |         128144 |
| 33      | 2t1fUldzVlylkn2 |   86 |    1 | 923127 |         155184 |         548695 |
| 33      | 352rMu9XIFELIB1 |   41 |    1 | 782404 |         551276 |         166169 |
| 33      | 3ULQsZfBhYqSoQ2 |   30 |    1 | 446071 |         101973 |         726908 |
| 33      | 3axPsLLYHnLBl94 |   50 |    1 |  59905 |         767343 |         872882 |
| 33      | 3f4QuUCu4w5Bdlt |   76 |    0 | 520908 |         192915 |         986585 |
| 33      | 3sUgvdcWgDlasET |   69 |    1 |  19857 |         372873 |         398638 |
| 33      | 4F0ljq1AtXaVxCI |   12 |    0 |  22095 |          94146 |         928596 |
+---------+-----------------+------+------+--------+----------------+----------------+
10 rows in set (0.38 sec)
  • Importing Parquet data with Stream Load:
create database IF NOT EXISTS test_stream_load;

use test_stream_load;

CREATE TABLE IF NOT EXISTS test_parquet
(
    `user_id` LARGEINT NOT NULL COMMENT "user id",
    `city` VARCHAR(20) COMMENT "user's city",
    `age` SMALLINT COMMENT "user age",
    `sex` TINYINT COMMENT "user gender",
    `cost` BIGINT SUM DEFAULT "0" COMMENT "total user spend",
    `max_dwell_time` INT MAX DEFAULT "0" COMMENT "max user dwell time",
    `min_dwell_time` INT MIN DEFAULT "99999" COMMENT "min user dwell time"
)
AGGREGATE KEY(`user_id`, `city`, `age`, `sex`)
DISTRIBUTED BY HASH(`user_id`) BUCKETS 100
PROPERTIES (
"replication_allocation" = "tag.location.default: 3"
);

On an ECS instance in the same VPC as the Doris nodes, run the Stream Load command to import test_data.parquet into the test_parquet table:

If you are unsure how to produce such a file, you can use pandas: df.to_parquet("./test_data.parquet", index=False)

root@ecs-lizuxian-mysql8-notdelete:~/python_test# curl --location-trusted -u {user}:{password} -H "label:test_parquet_20231214-001" -H "format:parquet" -T ./test_data.parquet http://{fe_ip}:{http_port,default 8030}/api/test_stream_load/test_parquet/_stream_load
{
    "TxnId": 62100,
    "Label": "test_parquet_20231214-001",
    "TwoPhaseCommit": "false",
    "Status": "Success",
    "Message": "OK",
    "NumberTotalRows": 1000000,
    "NumberLoadedRows": 1000000,
    "NumberFilteredRows": 0,
    "NumberUnselectedRows": 0,
    "LoadBytes": 33965271,
    "LoadTimeMs": 1483,
    "BeginTxnTimeMs": 6,
    "StreamLoadPutTimeMs": 9,
    "ReadDataTimeMs": 22,
    "WriteDataTimeMs": 1400,
    "CommitAndPublishTimeMs": 22
}

Query from the Doris MySQL client:

mysql> select * from test_parquet limit 10;
+---------+-----------------+------+------+--------+----------------+----------------+
| user_id | city            | age  | sex  | cost   | max_dwell_time | min_dwell_time |
+---------+-----------------+------+------+--------+----------------+----------------+
| 18      | 14ZqoJvsFdO2MS5 |   55 |    0 | 584461 |         109225 |         339355 |
| 18      | 1PDzT3RMvQXfV70 |   28 |    1 | 738864 |         437902 |         617665 |
| 18      | 24KrQ3YBtynUSQo |   73 |    1 | 692823 |         280239 |         354826 |
| 18      | 4xvuG8GFlvkC7h8 |   97 |    0 | 891892 |         581610 |         167029 |
| 18      | 57QYS5HgqAwxk1h |   69 |    1 | 263653 |         544356 |         457608 |
| 18      | 5lAa1RTRNpywmc3 |   99 |    1 | 143878 |         362780 |         575735 |
| 18      | 6LuVmpV7Ayi55MH |   40 |    0 | 151588 |         839104 |         874164 |
| 18      | 6xGoa125bcjL8ds |   10 |    0 | 504910 |         349479 |         762094 |
| 18      | 7l0nvNOgkRDQNfF |   80 |    0 |  23934 |         134383 |         994837 |
| 18      | 8bDEyz7bCb3ysWa |   50 |    1 | 412142 |         573170 |          91236 |
+---------+-----------------+------+------+--------+----------------+----------------+
10 rows in set (0.09 sec)
  • Importing ORC data with Stream Load:
create database IF NOT EXISTS test_stream_load;

use test_stream_load;

CREATE TABLE IF NOT EXISTS test_orc
(
    `user_id` LARGEINT NOT NULL COMMENT "user id",
    `city` VARCHAR(20) COMMENT "user's city",
    `age` SMALLINT COMMENT "user age",
    `sex` TINYINT COMMENT "user gender",
    `cost` BIGINT SUM DEFAULT "0" COMMENT "total user spend",
    `max_dwell_time` INT MAX DEFAULT "0" COMMENT "max user dwell time",
    `min_dwell_time` INT MIN DEFAULT "99999" COMMENT "min user dwell time"
)
AGGREGATE KEY(`user_id`, `city`, `age`, `sex`)
DISTRIBUTED BY HASH(`user_id`) BUCKETS 100
PROPERTIES (
"replication_allocation" = "tag.location.default: 3"
);

On an ECS instance in the same VPC as the Doris nodes, run the Stream Load command to import test_data.orc into the test_orc table:

If you are unsure how to produce such a file, you can use pyarrow:

import pyarrow.orc as orc
import pyarrow as pa
import pandas as pd

# df is a pandas DataFrame
table = pa.Table.from_pandas(df, preserve_index=False)
orc.write_table(table, './test_data.orc')

Stream Load example:

root@ecs-lizuxian-mysql8-notdelete:~/python_test# curl --location-trusted -u {user}:{password} -H "label:test_orc_20231214-001" -H "format:orc" -T ./test_data.orc http://{fe_ip}:{http_port,default 8030}/api/test_stream_load/test_orc/_stream_load
{
    "TxnId": 63248,
    "Label": "test_orc_20231214-001",
    "TwoPhaseCommit": "false",
    "Status": "Success",
    "Message": "OK",
    "NumberTotalRows": 1000000,
    "NumberLoadedRows": 1000000,
    "NumberFilteredRows": 0,
    "NumberUnselectedRows": 0,
    "LoadBytes": 30278802,
    "LoadTimeMs": 1456,
    "BeginTxnTimeMs": 0,
    "StreamLoadPutTimeMs": 1,
    "ReadDataTimeMs": 20,
    "WriteDataTimeMs": 1390,
    "CommitAndPublishTimeMs": 29
}

Query from the Doris MySQL client:

mysql> select * from test_orc limit 10;
+---------+-----------------+------+------+--------+----------------+----------------+
| user_id | city            | age  | sex  | cost   | max_dwell_time | min_dwell_time |
+---------+-----------------+------+------+--------+----------------+----------------+
| 98      | 0FIHtO29vCvQcr9 |   53 |    0 | 166775 |         502429 |           1367 |
| 98      | 0NSo0CRgdjnjRGR |   79 |    1 | 660935 |         670440 |          24804 |
| 98      | 0P17O93kGctDDS1 |   26 |    1 | 890154 |         110877 |         308874 |
| 98      | 1K64DK52EJFzN4L |   38 |    1 | 263784 |         474463 |         369040 |
| 98      | 1b5YEj5rOSfaSiB |   10 |    1 | 968116 |          43576 |          75587 |
| 98      | 25kNCa8rTR2iIzy |   56 |    0 | 251517 |         305376 |           2191 |
| 98      | 2HqItzolr0QCbd0 |   31 |    1 | 161678 |         784517 |         135597 |
| 98      | 3GaOot54BiJ4gVV |   25 |    1 | 553679 |         812828 |         317705 |
| 98      | 3HHEKlB00ZBIfVP |   58 |    0 | 987960 |         656147 |         752256 |
| 98      | 3hvRYWPgcGj4gOf |   67 |    1 | 118310 |         976024 |         382797 |
+---------+-----------------+------+------+--------+----------------+----------------+
10 rows in set (0.05 sec)

Part 3: Doris import method: Broker Load

Broker Load is an asynchronous import method whose supported data sources depend on what the Broker process can access. The user creates a Broker Load job over the MySQL protocol and checks the result with a SHOW LOAD command.

Broker Load is suitable when:

  • The source data lives in a storage system the Broker can access, such as HDFS.
  • The data volume is in the tens to hundreds of GB.

(1) How Broker Load works

After the user submits an import job, the FE generates a plan and, based on the current number of BEs and the size of the files, splits it across multiple BEs, each importing a share of the data. During execution, each BE pulls data from the Broker, transforms it, and writes it into the system. Once all BEs finish, the FE makes the final decision on whether the import succeeded.

                 +
                 | 1. user create broker load
                 v
            +----+----+
            |         |
            |   FE    |
            |         |
            +----+----+
                 |
                 | 2. BE etl and load the data
    +--------------------------+
    |            |             |
+---v---+     +--v----+    +---v---+
|       |     |       |    |       |
|  BE   |     |  BE   |    |   BE  |
|       |     |       |    |       |
+---+-^-+     +---+-^-+    +--+-^--+
    | |           | |         | |
    | |           | |         | | 3. pull data from broker
+---v-+-+     +---v-+-+    +--v-+--+
|       |     |       |    |       |
|Broker |     |Broker |    |Broker |
|       |     |       |    |       |
+---+-^-+     +---+-^-+    +---+-^-+
    | |           | |          | |
+---v-+-----------v-+----------v-+-+
|       HDFS/BOS/AFS cluster       |
|                                  |
+----------------------------------+

To recap, Broker Load has a few key points:

  • BEs pull data from the Broker and transform it;
  • The Broker must be able to reach the external data source (HDFS, OBS, etc.);
  • Broker Load currently supports only CSV, JSON, ORC, and Parquet;
  • Broker Load fits large imports, e.g. 10 GB–100 GB per job.
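Since the whole job is one SQL statement, scripted pipelines often template it. A minimal sketch of assembling a Broker Load statement like the ones below (the bucket path and credential values are placeholders):

```python
def build_broker_load(label, path, table, broker="broker1",
                      fmt=None, column_sep=None, broker_props=None):
    """Assemble a Broker Load statement for one file into one table."""
    clauses = ['DATA INFILE("{}")'.format(path),
               "INTO TABLE `{}`".format(table)]
    if column_sep:
        clauses.append("COLUMNS TERMINATED BY '{}'".format(column_sep))
    if fmt:
        clauses.append('FORMAT AS "{}"'.format(fmt))
    props = ",\n".join('    "{}" = "{}"'.format(k, v)
                       for k, v in (broker_props or {}).items())
    return ('LOAD LABEL {}\n(\n{}\n)\nWITH BROKER "{}"\n(\n{}\n);'
            .format(label, "\n".join(clauses), broker, props))

sql = build_broker_load(
    "brokerload_test_csv_label001",
    "obs://my-bucket/brokerload_test/test_data.csv",  # hypothetical bucket
    "test_csv", column_sep=",",
    broker_props={"fs.obs.access.key": "xxxx",
                  "fs.obs.secret.key": "xxxx",
                  "fs.obs.endpoint": "xxxx"})
print(sql)
```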

(2) Broker Load in practice

  • Importing data from OBS with Broker Load:

Make sure the test data files are present on OBS.

(1) In the Doris MySQL client, create the test table and use Broker Load to import the CSV file from OBS:

create database test_broker_load;

use test_broker_load;

CREATE TABLE IF NOT EXISTS test_csv
(
    `user_id` LARGEINT NOT NULL COMMENT "user id",
    `city` VARCHAR(20) COMMENT "user's city",
    `age` SMALLINT COMMENT "user age",
    `sex` TINYINT COMMENT "user gender",
    `cost` BIGINT SUM DEFAULT "0" COMMENT "total user spend",
    `max_dwell_time` INT MAX DEFAULT "0" COMMENT "max user dwell time",
    `min_dwell_time` INT MIN DEFAULT "99999" COMMENT "min user dwell time"
)
AGGREGATE KEY(`user_id`, `city`, `age`, `sex`)
DISTRIBUTED BY HASH(`user_id`) BUCKETS 100
PROPERTIES (
"replication_allocation" = "tag.location.default: 3"
);



LOAD LABEL brokerload_test_csv_label001
(
DATA INFILE("obs://****/brokerload_test/test_data.csv")
INTO TABLE `test_csv`
COLUMNS TERMINATED BY ','
)
WITH BROKER "broker1"
(
    "fs.obs.access.key" = "xxxx",
    "fs.obs.secret.key" = "xxxx",
    "fs.obs.endpoint" = "xxxx"
);

Check progress from the Doris MySQL client; the import is complete once the State field shows FINISHED:

mysql> show load order by createtime desc limit 1\G;
*************************** 1. row ***************************
         JobId: 296805
         Label: brokerload_test_csv_label001
         State: LOADING
      Progress: ETL:100%; LOAD:0%
          Type: BROKER
       EtlInfo: NULL
      TaskInfo: cluster:N/A; timeout(s):14400; max_filter_ratio:0.0
      ErrorMsg: NULL
    CreateTime: 2023-12-15 11:25:41
  EtlStartTime: 2023-12-15 11:25:44
 EtlFinishTime: 2023-12-15 11:25:44
 LoadStartTime: 2023-12-15 11:25:44
LoadFinishTime: NULL
           URL: NULL
    JobDetails: {"Unfinished backends":{"5a5bb0bbf6374e17-bf4db3bc0fb72636":[10068]},"ScannedRows":0,"TaskNumber":1,"LoadBytes":0,"All backends":{"5a5bb0bbf6374e17-bf4db3bc0fb72636":[10068]},"FileNumber":1,"FileSize":47185011}
 TransactionId: 220717
  ErrorTablets: {}
1 row in set (0.03 sec)


mysql> show load order by createtime desc limit 1\G;
*************************** 1. row ***************************
         JobId: 296805
         Label: brokerload_test_csv_label001
         State: FINISHED
      Progress: ETL:100%; LOAD:100%
          Type: BROKER
       EtlInfo: unselected.rows=0; dpp.abnorm.ALL=0; dpp.norm.ALL=1000000
      TaskInfo: cluster:N/A; timeout(s):14400; max_filter_ratio:0.0
      ErrorMsg: NULL
    CreateTime: 2023-12-15 11:25:41
  EtlStartTime: 2023-12-15 11:25:44
 EtlFinishTime: 2023-12-15 11:25:44
 LoadStartTime: 2023-12-15 11:25:44
LoadFinishTime: 2023-12-15 11:25:49
           URL: NULL
    JobDetails: {"Unfinished backends":{"5a5bb0bbf6374e17-bf4db3bc0fb72636":[]},"ScannedRows":1000000,"TaskNumber":1,"LoadBytes":60000000,"All backends":{"5a5bb0bbf6374e17-bf4db3bc0fb72636":[10068]},"FileNumber":1,"FileSize":47185011}
 TransactionId: 220717
  ErrorTablets: {}
1 row in set (0.01 sec)

mysql> select * from test_csv limit 10;
+---------+-----------------+------+------+--------+----------------+----------------+
| user_id | city            | age  | sex  | cost   | max_dwell_time | min_dwell_time |
+---------+-----------------+------+------+--------+----------------+----------------+
| 18      | 14ZqoJvsFdO2MS5 |   55 |    0 | 584461 |         109225 |         339355 |
| 18      | 1PDzT3RMvQXfV70 |   28 |    1 | 738864 |         437902 |         617665 |
| 18      | 24KrQ3YBtynUSQo |   73 |    1 | 692823 |         280239 |         354826 |
| 18      | 4xvuG8GFlvkC7h8 |   97 |    0 | 891892 |         581610 |         167029 |
| 18      | 57QYS5HgqAwxk1h |   69 |    1 | 263653 |         544356 |         457608 |
| 18      | 5lAa1RTRNpywmc3 |   99 |    1 | 143878 |         362780 |         575735 |
| 18      | 6LuVmpV7Ayi55MH |   40 |    0 | 151588 |         839104 |         874164 |
| 18      | 6xGoa125bcjL8ds |   10 |    0 | 504910 |         349479 |         762094 |
| 18      | 7l0nvNOgkRDQNfF |   80 |    0 |  23934 |         134383 |         994837 |
| 18      | 8bDEyz7bCb3ysWa |   50 |    1 | 412142 |         573170 |          91236 |
+---------+-----------------+------+------+--------+----------------+----------------+
10 rows in set (0.06 sec)
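Because Broker Load is asynchronous, scripts typically poll SHOW LOAD until State reaches FINISHED or CANCELLED. A sketch of the decision logic, using State/Progress values like those above:

```python
def load_done(state, progress):
    """Interpret one SHOW LOAD row: True when finished, False while
    still loading; raises on cancellation.
    `progress` looks like 'ETL:100%; LOAD:30%'."""
    if state == "CANCELLED":
        raise RuntimeError("broker load was cancelled")
    pct = dict(part.strip().split(":") for part in progress.split(";"))
    return state == "FINISHED" and pct.get("LOAD") == "100%"

assert not load_done("LOADING", "ETL:100%; LOAD:0%")
assert load_done("FINISHED", "ETL:100%; LOAD:100%")
# In practice: run `SHOW LOAD ORDER BY CreateTime DESC LIMIT 1` over a
# MySQL connection in a loop, sleeping a few seconds between polls.
```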

(2) In the Doris MySQL client, create the test table and use Broker Load to import the JSON file from OBS:

create database IF NOT EXISTS test_broker_load;

use test_broker_load;

CREATE TABLE IF NOT EXISTS test_json
(
    `user_id` LARGEINT NOT NULL COMMENT "user id",
    `city` VARCHAR(20) COMMENT "user's city",
    `age` SMALLINT COMMENT "user age",
    `sex` TINYINT COMMENT "user gender",
    `cost` BIGINT SUM DEFAULT "0" COMMENT "total user spend",
    `max_dwell_time` INT MAX DEFAULT "0" COMMENT "max user dwell time",
    `min_dwell_time` INT MIN DEFAULT "99999" COMMENT "min user dwell time"
)
AGGREGATE KEY(`user_id`, `city`, `age`, `sex`)
DISTRIBUTED BY HASH(`user_id`) BUCKETS 100
PROPERTIES (
"replication_allocation" = "tag.location.default: 3"
);



LOAD LABEL brokerload_test_json_label001
(
DATA INFILE("obs://****/brokerload_test/test_data.json")
INTO TABLE `test_json`
FORMAT AS "json"

)
WITH BROKER "broker1"
(
    "fs.obs.access.key" = "xxx",
    "fs.obs.secret.key" = "xxxx",
    "fs.obs.endpoint" = "xxxx"
);

Check progress from the Doris MySQL client; the import is complete once the State field shows FINISHED:

mysql> show load order by createtime desc limit 1\G;
*************************** 1. row ***************************
         JobId: 298969
         Label: brokerload_test_json_label001
         State: LOADING
      Progress: ETL:100%; LOAD:30%
          Type: BROKER
       EtlInfo: NULL
      TaskInfo: cluster:N/A; timeout(s):14400; max_filter_ratio:0.0
      ErrorMsg: NULL
    CreateTime: 2023-12-15 11:37:23
  EtlStartTime: 2023-12-15 11:37:24
 EtlFinishTime: 2023-12-15 11:37:24
 LoadStartTime: 2023-12-15 11:37:24
LoadFinishTime: NULL
           URL: NULL
    JobDetails: {"Unfinished backends":{"c3d584d9322448c9-a20d378c230f968f":[10068]},"ScannedRows":625856,"TaskNumber":1,"LoadBytes":37551360,"All backends":{"c3d584d9322448c9-a20d378c230f968f":[10068]},"FileNumber":1,"FileSize":123185011}
 TransactionId: 222061
  ErrorTablets: {}
1 row in set (0.00 sec)


mysql> show load order by createtime desc limit 1\G;
*************************** 1. row ***************************
         JobId: 298969
         Label: brokerload_test_json_label001
         State: FINISHED
      Progress: ETL:100%; LOAD:100%
          Type: BROKER
       EtlInfo: unselected.rows=0; dpp.abnorm.ALL=0; dpp.norm.ALL=1000000
      TaskInfo: cluster:N/A; timeout(s):14400; max_filter_ratio:0.0
      ErrorMsg: NULL
    CreateTime: 2023-12-15 11:37:23
  EtlStartTime: 2023-12-15 11:37:24
 EtlFinishTime: 2023-12-15 11:37:24
 LoadStartTime: 2023-12-15 11:37:24
LoadFinishTime: 2023-12-15 11:37:35
           URL: NULL
    JobDetails: {"Unfinished backends":{"c3d584d9322448c9-a20d378c230f968f":[]},"ScannedRows":1000000,"TaskNumber":1,"LoadBytes":60000000,"All backends":{"c3d584d9322448c9-a20d378c230f968f":[10068]},"FileNumber":1,"FileSize":123185011}
 TransactionId: 222061
  ErrorTablets: {}
1 row in set (0.00 sec)


mysql> select * from test_json limit 10;
+---------+-----------------+------+------+--------+----------------+----------------+
| user_id | city            | age  | sex  | cost   | max_dwell_time | min_dwell_time |
+---------+-----------------+------+------+--------+----------------+----------------+
| 30      | 0KdoCD5iOjMERrm |   36 |    1 | 886058 |         628531 |         792428 |
| 30      | 0zssOdHjBb3iTO9 |   14 |    1 | 806385 |         488571 |         327795 |
| 30      | 1MyKEVsBFFqqrle |   25 |    1 | 555584 |          64347 |          94759 |
| 30      | 1XKruA3bIbhRiWW |   51 |    1 | 421141 |         582582 |         608166 |
| 30      | 2PnqYMYfTr2Y6ZK |   96 |    0 | 171461 |         786868 |         741986 |
| 30      | 2ZfcWyyAS0it99q |   19 |    0 | 272791 |         646823 |         640081 |
| 30      | 2apAJLcJrJhejQw |   65 |    1 | 789715 |         612145 |         277791 |
| 30      | 36tQg24IKueA1Wj |   88 |    0 | 881280 |         920618 |         397938 |
| 30      | 3S0KsfOlV8WwRPa |   49 |    0 | 608975 |         880464 |          28009 |
| 30      | 3YJ59TzYisHxgWj |   65 |    0 |   1649 |         633309 |         121486 |
+---------+-----------------+------+------+--------+----------------+----------------+
10 rows in set (0.04 sec)

(3) In the Doris MySQL client, create the test table and use Broker Load to import the Parquet file from OBS:

create database IF NOT EXISTS test_broker_load;

use test_broker_load;

CREATE TABLE IF NOT EXISTS test_parquet
(
    `user_id` LARGEINT NOT NULL COMMENT "user id",
    `city` VARCHAR(20) COMMENT "user's city",
    `age` SMALLINT COMMENT "user age",
    `sex` TINYINT COMMENT "user gender",
    `cost` BIGINT SUM DEFAULT "0" COMMENT "total user spend",
    `max_dwell_time` INT MAX DEFAULT "0" COMMENT "max user dwell time",
    `min_dwell_time` INT MIN DEFAULT "99999" COMMENT "min user dwell time"
)
AGGREGATE KEY(`user_id`, `city`, `age`, `sex`)
DISTRIBUTED BY HASH(`user_id`) BUCKETS 100
PROPERTIES (
"replication_allocation" = "tag.location.default: 3"
);



LOAD LABEL brokerload_test_parquet_label001
(
DATA INFILE("obs://****/brokerload_test/test_data.parquet")
INTO TABLE `test_parquet`
FORMAT AS "parquet"

)
WITH BROKER "broker1"
(
    "fs.obs.access.key" = "xxxx",
    "fs.obs.secret.key" = "xxxx",
    "fs.obs.endpoint" = "xxxx"
);

Check progress from the Doris MySQL client; the import is complete once the State field shows FINISHED:

mysql> show load order by createtime desc limit 1\G;
*************************** 1. row ***************************
         JobId: 299384
         Label: brokerload_test_parquet_label001
         State: FINISHED
      Progress: ETL:100%; LOAD:100%
          Type: BROKER
       EtlInfo: unselected.rows=0; dpp.abnorm.ALL=0; dpp.norm.ALL=1000000
      TaskInfo: cluster:N/A; timeout(s):14400; max_filter_ratio:0.0
      ErrorMsg: NULL
    CreateTime: 2023-12-15 11:42:07
  EtlStartTime: 2023-12-15 11:42:09
 EtlFinishTime: 2023-12-15 11:42:09
 LoadStartTime: 2023-12-15 11:42:09
LoadFinishTime: 2023-12-15 11:42:11
           URL: NULL
    JobDetails: {"Unfinished backends":{"41522d96b34e44ab-8818f255ad23def4":[]},"ScannedRows":1000000,"TaskNumber":1,"LoadBytes":60000000,"All backends":{"41522d96b34e44ab-8818f255ad23def4":[10066]},"FileNumber":1,"FileSize":33965271}
 TransactionId: 222067
  ErrorTablets: {}
1 row in set (0.02 sec)


mysql> select * from test_parquet limit 10;
+---------+-----------------+------+------+--------+----------------+----------------+
| user_id | city            | age  | sex  | cost   | max_dwell_time | min_dwell_time |
+---------+-----------------+------+------+--------+----------------+----------------+
| 18      | 14ZqoJvsFdO2MS5 |   55 |    0 | 584461 |         109225 |         339355 |
| 18      | 1PDzT3RMvQXfV70 |   28 |    1 | 738864 |         437902 |         617665 |
| 18      | 24KrQ3YBtynUSQo |   73 |    1 | 692823 |         280239 |         354826 |
| 18      | 4xvuG8GFlvkC7h8 |   97 |    0 | 891892 |         581610 |         167029 |
| 18      | 57QYS5HgqAwxk1h |   69 |    1 | 263653 |         544356 |         457608 |
| 18      | 5lAa1RTRNpywmc3 |   99 |    1 | 143878 |         362780 |         575735 |
| 18      | 6LuVmpV7Ayi55MH |   40 |    0 | 151588 |         839104 |         874164 |
| 18      | 6xGoa125bcjL8ds |   10 |    0 | 504910 |         349479 |         762094 |
| 18      | 7l0nvNOgkRDQNfF |   80 |    0 |  23934 |         134383 |         994837 |
| 18      | 8bDEyz7bCb3ysWa |   50 |    1 | 412142 |         573170 |          91236 |
+---------+-----------------+------+------+--------+----------------+----------------+
10 rows in set (0.04 sec)

(4) In the Doris MySQL client, create the test table and use Broker Load to import the ORC file from OBS:

create database IF NOT EXISTS test_broker_load;

use test_broker_load;

CREATE TABLE IF NOT EXISTS test_orc
(
    `user_id` LARGEINT NOT NULL COMMENT "user id",
    `city` VARCHAR(20) COMMENT "user's city",
    `age` SMALLINT COMMENT "user age",
    `sex` TINYINT COMMENT "user gender",
    `cost` BIGINT SUM DEFAULT "0" COMMENT "total user spend",
    `max_dwell_time` INT MAX DEFAULT "0" COMMENT "max user dwell time",
    `min_dwell_time` INT MIN DEFAULT "99999" COMMENT "min user dwell time"
)
AGGREGATE KEY(`user_id`, `city`, `age`, `sex`)
DISTRIBUTED BY HASH(`user_id`) BUCKETS 100
PROPERTIES (
"replication_allocation" = "tag.location.default: 3"
);



LOAD LABEL brokerload_test_orc_label001
(
DATA INFILE("obs://****/brokerload_test/test_data.orc")
INTO TABLE `test_orc`
FORMAT AS "orc"

)
WITH BROKER "broker1"
(
    "fs.obs.access.key" = "xxxx",
    "fs.obs.secret.key" = "xxxx",
    "fs.obs.endpoint" = "xxxx"
);

Check progress from the Doris MySQL client; the import is complete once the State field shows FINISHED:

mysql> show load order by createtime desc limit 1\G;
*************************** 1. row ***************************
         JobId: 299795
         Label: brokerload_test_orc_label001
         State: FINISHED
      Progress: ETL:100%; LOAD:100%
          Type: BROKER
       EtlInfo: unselected.rows=0; dpp.abnorm.ALL=0; dpp.norm.ALL=1000000
      TaskInfo: cluster:N/A; timeout(s):14400; max_filter_ratio:0.0
      ErrorMsg: NULL
    CreateTime: 2023-12-15 11:44:06
  EtlStartTime: 2023-12-15 11:44:09
 EtlFinishTime: 2023-12-15 11:44:09
 LoadStartTime: 2023-12-15 11:44:09
LoadFinishTime: 2023-12-15 11:44:12
           URL: NULL
    JobDetails: {"Unfinished backends":{"5105dbe03efb4348-8ecd47b926d2c9e5":[]},"ScannedRows":1000000,"TaskNumber":1,"LoadBytes":60000000,"All backends":{"5105dbe03efb4348-8ecd47b926d2c9e5":[10066]},"FileNumber":1,"FileSize":30278802}
 TransactionId: 222071
  ErrorTablets: {}
1 row in set (0.00 sec)

mysql> select * from test_orc limit 10;
+---------+-----------------+------+------+--------+----------------+----------------+
| user_id | city            | age  | sex  | cost   | max_dwell_time | min_dwell_time |
+---------+-----------------+------+------+--------+----------------+----------------+
| 52      | 0MIuWEHK3W4bI8b |   25 |    0 | 724049 |         145616 |         820867 |
| 52      | 0itV1cYkZPtUUus |   45 |    1 | 923031 |         106160 |         703906 |
| 52      | 0z2v63jZ0FjkkZ6 |   24 |    0 | 602503 |         898953 |         792456 |
| 52      | 1CaHXTIt6MDFyfN |   23 |    0 | 696754 |         490957 |          67682 |
| 52      | 1cn5mqqvuXzCLvf |   94 |    0 | 269539 |         137037 |         383264 |
| 52      | 1obMBQjopbvDjUY |   60 |    1 | 837550 |         828951 |         370053 |
| 52      | 1rNpaeJPLiFAuiG |   18 |    1 | 652469 |         830079 |         892756 |
| 52      | 2Ln8FdrYPDKN0eK |   80 |    1 | 884577 |         211411 |          42759 |
| 52      | 2mvODsnLA2XOQxT |   69 |    0 | 829031 |         345402 |         769167 |
| 52      | 2nLhQBksWpe7QNk |   87 |    1 |  60280 |         554871 |         817379 |
+---------+-----------------+------+------+--------+----------------+----------------+
10 rows in set (0.05 sec)
  • Importing data from HDFS with Broker Load (the examples below run against a Kerberos-secured HDFS cluster; if needed, contact CloudTable support to upload the relevant keytab and krb5 files):

First, upload the test data to HDFS:

[root@qh-node-master1eGtP client]# hdfs dfs -put /tmp/test_data.csv /broker_data
[root@qh-node-master1eGtP client]# hdfs dfs -put /tmp/test_data.json /broker_data
[root@qh-node-master1eGtP client]# hdfs dfs -put /tmp/test_data.parquet /broker_data
[root@qh-node-master1eGtP client]# hdfs dfs -put /tmp/test_data.orc /broker_data
[root@qh-node-master1eGtP client]# hdfs dfs -ls -h /broker_data
Found 4 items
-rw-r--r-- 3 lizuxian_test supergroup 45.0 M 2023-12-16 15:26 /broker_data/test_data.csv
-rw-r--r-- 3 lizuxian_test supergroup 117.5 M 2023-12-16 15:27 /broker_data/test_data.json
-rw-r--r-- 3 lizuxian_test supergroup 28.9 M 2023-12-16 15:27 /broker_data/test_data.orc
-rw-r--r-- 3 lizuxian_test supergroup 32.4 M 2023-12-16 15:27 /broker_data/test_data.parquet

(1) In the Doris MySQL client, create the test table and use Broker Load to import the CSV file from HDFS:

create database IF NOT EXISTS test_broker_load;
use test_broker_load;

CREATE TABLE IF NOT EXISTS test_csv_hdfs
(
    `user_id` LARGEINT NOT NULL COMMENT "user id",
    `city` VARCHAR(20) COMMENT "user's city",
    `age` SMALLINT COMMENT "user age",
    `sex` TINYINT COMMENT "user gender",
    `cost` BIGINT SUM DEFAULT "0" COMMENT "total user spend",
    `max_dwell_time` INT MAX DEFAULT "0" COMMENT "max user dwell time",
    `min_dwell_time` INT MIN DEFAULT "99999" COMMENT "min user dwell time"
)
AGGREGATE KEY(`user_id`, `city`, `age`, `sex`)
DISTRIBUTED BY HASH(`user_id`) BUCKETS 100
PROPERTIES (
"replication_allocation" = "tag.location.default: 3"
);


LOAD LABEL brokerload_test_hdfs_csv_label001
(
    DATA INFILE("hdfs://{name_node_ip}:{dfs.namenode.rpc.port}/broker_data/test_data.csv")
    INTO TABLE `test_csv_hdfs`
    COLUMNS TERMINATED BY ','
    FORMAT AS 'csv'
)
WITH BROKER 'broker1' (
    'hadoop.security.authentication'='kerberos',
    'kerberos_keytab'='{keytab_path}',
    'kerberos_principal'='lizuxian_test@C4F9C********D274F518680.COM' -- obtainable by running `kinit {username}`
)
PROPERTIES
(
    'timeout'='1200',
    'max_filter_ratio'='0.1'
);

Run the following command in the Doris MySQL client to check progress; the load is complete once the State field shows FINISHED:

mysql> show load order by createtime desc limit 1\G;
*************************** 1. row ***************************
JobId: 341574
Label: brokerload_test_hdfs_csv_label001
State: FINISHED
Progress: ETL:100%; LOAD:100%
Type: BROKER
EtlInfo: unselected.rows=0; dpp.abnorm.ALL=0; dpp.norm.ALL=1000000
TaskInfo: cluster:N/A; timeout(s):1200; max_filter_ratio:0.1
ErrorMsg: NULL
CreateTime: 2023-12-16 15:38:41
EtlStartTime: 2023-12-16 15:38:44
EtlFinishTime: 2023-12-16 15:38:44
LoadStartTime: 2023-12-16 15:38:44
LoadFinishTime: 2023-12-16 15:38:50
URL: NULL
JobDetails: {"Unfinished backends":{"695c5e8100e249cb-904526826e9b5b52":[]},"ScannedRows":1000000,"TaskNumber":1,"LoadBytes":60000000,"All backends":{"695c5e8100e249cb-904526826e9b5b52":[10068]},"FileNumber":1,"FileSize":47185011}
TransactionId: 248175
ErrorTablets: {}
1 row in set (0.00 sec)
mysql> select * from test_csv_hdfs limit 10;
+---------+-----------------+------+------+--------+----------------+----------------+
| user_id | city            | age  | sex  | cost   | max_dwell_time | min_dwell_time |
+---------+-----------------+------+------+--------+----------------+----------------+
| 21      | 0Ebx7zo8fRAIG71 |   77 |    1 |   4438 |         752439 |         748616 |
| 21      | 0cwU5nA9sXkQjAN |   94 |    1 | 137373 |          10643 |         610344 |
| 21      | 0otCSIhjkuvetsS |   75 |    1 | 116712 |         624894 |         146138 |
| 21      | 1Lh46tLqlLs31YC |   50 |    0 | 718119 |         904591 |          85446 |
| 21      | 2054ikYBDzq1COa |   53 |    0 | 954765 |         683905 |         310742 |
| 21      | 2U7pZ2ACH3gQOMu |   67 |    1 | 962595 |         168301 |         483849 |
| 21      | 3pwyeP3nwtZyRg0 |   58 |    0 | 606347 |         193629 |         114854 |
| 21      | 3vCXiQEUTTRi1aK |   34 |    0 | 220115 |         437874 |         732262 |
| 21      | 3wz1dlZySof9PGA |   57 |    1 | 562944 |         666807 |         588531 |
| 21      | 4Byce2YLt3PQNxH |   53 |    1 | 528431 |         723889 |         532362 |
+---------+-----------------+------+------+--------+----------------+----------------+
10 rows in set (0.07 sec)
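When these loads are driven from a script rather than an interactive session, the State field can be extracted from the `show load \G` output and polled until it reaches FINISHED or CANCELLED. A minimal sketch of the parsing step (the `mysql` invocation in the comment is illustrative; the function runs on a captured sample below so it is self-contained):

```shell
# Extract the State field from `show load ... \G` output.
# In a real pipeline the input would come from something like:
#   mysql -h {fe_host} -P {query_port} -u {user} -p \
#     -e 'show load order by createtime desc limit 1\G'
get_state() {
  printf '%s\n' "$1" | awk -F': ' '$1 ~ /State$/ {print $2}'
}

sample='JobId: 341574
Label: brokerload_test_hdfs_csv_label001
State: FINISHED
Progress: ETL:100%; LOAD:100%'
get_state "$sample"   # prints FINISHED
```

Wrapping `get_state` in a loop with `sleep` gives a simple progress monitor.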

(2) In the Doris MySQL client, run the following commands to create a test table and use Broker Load to import the JSON file from HDFS:

create database IF NOT EXISTS test_broker_load;
use test_broker_load;
CREATE TABLE IF NOT EXISTS test_json_hdfs
(
    `user_id` LARGEINT NOT NULL COMMENT "User ID",
    `city` VARCHAR(20) COMMENT "User city",
    `age` SMALLINT COMMENT "User age",
    `sex` TINYINT COMMENT "User gender",
    `cost` BIGINT SUM DEFAULT "0" COMMENT "Total user spend",
    `max_dwell_time` INT MAX DEFAULT "0" COMMENT "Max user dwell time",
    `min_dwell_time` INT MIN DEFAULT "99999" COMMENT "Min user dwell time"
)
AGGREGATE KEY(`user_id`, `city`, `age`, `sex`)
DISTRIBUTED BY HASH(`user_id`) BUCKETS 100
PROPERTIES (
    "replication_allocation" = "tag.location.default: 3"
);

LOAD LABEL brokerload_test_hdfs_json_label001
(
    DATA INFILE("hdfs://{name_node_ip}:{dfs.namenode.rpc.port}/broker_data/test_data.json")
    INTO TABLE `test_json_hdfs`
    FORMAT AS 'json'
)
WITH BROKER 'broker1' (
    'hadoop.security.authentication'='kerberos',
    'kerberos_keytab'='{keytab_path}',
    'kerberos_principal'='lizuxian_test@C4F9*********_0D274F518680.COM' -- obtainable by running `kinit {username}`
)
PROPERTIES
(
    'timeout'='1200',
    'max_filter_ratio'='0.1'
);

Run the following command in the Doris MySQL client to check progress; the load is complete once the State field shows FINISHED:

mysql> show load order by createtime desc limit 1\G;
*************************** 1. row ***************************
JobId: 341987
Label: brokerload_test_hdfs_json_label001
State: FINISHED
Progress: ETL:100%; LOAD:100%
Type: BROKER
EtlInfo: unselected.rows=0; dpp.abnorm.ALL=0; dpp.norm.ALL=1000000
TaskInfo: cluster:N/A; timeout(s):1200; max_filter_ratio:0.1
ErrorMsg: NULL
CreateTime: 2023-12-16 15:43:13
EtlStartTime: 2023-12-16 15:43:21
EtlFinishTime: 2023-12-16 15:43:21
LoadStartTime: 2023-12-16 15:43:21
LoadFinishTime: 2023-12-16 15:43:29
URL: NULL
JobDetails: {"Unfinished backends":{"9a5b289be4604780-80bba11c1a64fb7a":[]},"ScannedRows":1000000,"TaskNumber":1,"LoadBytes":60000000,"All backends":{"9a5b289be4604780-80bba11c1a64fb7a":[10066]},"FileNumber":1,"FileSize":123185011}
TransactionId: 248180
ErrorTablets: {}
1 row in set (0.01 sec)
mysql> select * from test_json_hdfs limit 10;
+---------+-----------------+------+------+--------+----------------+----------------+
| user_id | city            | age  | sex  | cost   | max_dwell_time | min_dwell_time |
+---------+-----------------+------+------+--------+----------------+----------------+
| 333     | 1ChKZMB7BcaBII1 |   14 |    0 | 521905 |         121745 |          69743 |
| 333     | 1hk2mmhGMid6XzF |   46 |    0 |   7750 |         850394 |         127748 |
| 333     | 1j80IZqT0emeX6P |   40 |    1 | 686404 |         733847 |         909025 |
| 333     | 2Op7dMBUTfSBmSt |   63 |    0 | 801874 |         820379 |         395929 |
| 333     | 2mKmqt0NzBgqfon |   91 |    1 | 124587 |         711769 |         450717 |
| 333     | 2tgnUosR4E9BJHC |   98 |    1 | 545540 |         578186 |         259229 |
| 333     | 44QZHyforBIuff5 |    0 |    0 | 779132 |         648994 |         370163 |
| 333     | 4CDd7uKUmGbtx3M |   42 |    1 | 830187 |          83749 |         703733 |
| 333     | 4LOyGiVSHM6Z0SR |   72 |    1 | 614207 |         106296 |         947380 |
| 333     | 4MBJnFU0UsYDcVv |   20 |    1 | 515572 |         540372 |         251512 |
+---------+-----------------+------+------+--------+----------------+----------------+
10 rows in set (0.06 sec)

(3) In the Doris MySQL client, run the following commands to create a test table and use Broker Load to import the Parquet file from HDFS:

create database IF NOT EXISTS test_broker_load;
use test_broker_load;
CREATE TABLE IF NOT EXISTS test_parquet_hdfs
(
    `user_id` LARGEINT NOT NULL COMMENT "User ID",
    `city` VARCHAR(20) COMMENT "User city",
    `age` SMALLINT COMMENT "User age",
    `sex` TINYINT COMMENT "User gender",
    `cost` BIGINT SUM DEFAULT "0" COMMENT "Total user spend",
    `max_dwell_time` INT MAX DEFAULT "0" COMMENT "Max user dwell time",
    `min_dwell_time` INT MIN DEFAULT "99999" COMMENT "Min user dwell time"
)
AGGREGATE KEY(`user_id`, `city`, `age`, `sex`)
DISTRIBUTED BY HASH(`user_id`) BUCKETS 100
PROPERTIES (
    "replication_allocation" = "tag.location.default: 3"
);
LOAD LABEL brokerload_test_hdfs_parquet_label001
(
    DATA INFILE("hdfs://{name_node_ip}:{dfs.namenode.rpc.port}/broker_data/test_data.parquet")
    INTO TABLE `test_parquet_hdfs`
    FORMAT AS 'parquet'
)
WITH BROKER 'broker1' (
    'hadoop.security.authentication'='kerberos',
    'kerberos_keytab'='{keytab_path}',
    'kerberos_principal'='lizuxian_test@C4F9C8*******0D274F518680.COM' -- obtainable by running `kinit {username}`
)
PROPERTIES
(
    'timeout'='1200',
    'max_filter_ratio'='0.1'
);

Run the following command in the Doris MySQL client to check progress; the load is complete once the State field shows FINISHED:

mysql> show load order by createtime desc limit 1\G;
*************************** 1. row ***************************
JobId: 342398
Label: brokerload_test_hdfs_parquet_label001
State: FINISHED
Progress: ETL:100%; LOAD:100%
Type: BROKER
EtlInfo: unselected.rows=0; dpp.abnorm.ALL=0; dpp.norm.ALL=1000000
TaskInfo: cluster:N/A; timeout(s):1200; max_filter_ratio:0.1
ErrorMsg: NULL
CreateTime: 2023-12-16 15:45:41
EtlStartTime: 2023-12-16 15:45:44
EtlFinishTime: 2023-12-16 15:45:44
LoadStartTime: 2023-12-16 15:45:44
LoadFinishTime: 2023-12-16 15:45:46
URL: NULL
JobDetails: {"Unfinished backends":{"35eb3e403898466d-ab36749e60f9c7f9":[]},"ScannedRows":1000000,"TaskNumber":1,"LoadBytes":60000000,"All backends":{"35eb3e403898466d-ab36749e60f9c7f9":[10067]},"FileNumber":1,"FileSize":33965271}
TransactionId: 248184
ErrorTablets: {}
1 row in set (0.00 sec)
mysql> select * from test_parquet_hdfs limit 10;
+---------+-----------------+------+------+--------+----------------+----------------+
| user_id | city            | age  | sex  | cost   | max_dwell_time | min_dwell_time |
+---------+-----------------+------+------+--------+----------------+----------------+
| 30      | 0KdoCD5iOjMERrm |   36 |    1 | 886058 |         628531 |         792428 |
| 30      | 0zssOdHjBb3iTO9 |   14 |    1 | 806385 |         488571 |         327795 |
| 30      | 1MyKEVsBFFqqrle |   25 |    1 | 555584 |          64347 |          94759 |
| 30      | 1XKruA3bIbhRiWW |   51 |    1 | 421141 |         582582 |         608166 |
| 30      | 2PnqYMYfTr2Y6ZK |   96 |    0 | 171461 |         786868 |         741986 |
| 30      | 2ZfcWyyAS0it99q |   19 |    0 | 272791 |         646823 |         640081 |
| 30      | 2apAJLcJrJhejQw |   65 |    1 | 789715 |         612145 |         277791 |
| 30      | 36tQg24IKueA1Wj |   88 |    0 | 881280 |         920618 |         397938 |
| 30      | 3S0KsfOlV8WwRPa |   49 |    0 | 608975 |         880464 |          28009 |
| 30      | 3YJ59TzYisHxgWj |   65 |    0 |   1649 |         633309 |         121486 |
+---------+-----------------+------+------+--------+----------------+----------------+
10 rows in set (0.05 sec)

(4) In the Doris MySQL client, run the following commands to create a test table and use Broker Load to import the ORC file from HDFS:

create database IF NOT EXISTS test_broker_load;
use test_broker_load;
CREATE TABLE IF NOT EXISTS test_orc_hdfs
(
    `user_id` LARGEINT NOT NULL COMMENT "User ID",
    `city` VARCHAR(20) COMMENT "User city",
    `age` SMALLINT COMMENT "User age",
    `sex` TINYINT COMMENT "User gender",
    `cost` BIGINT SUM DEFAULT "0" COMMENT "Total user spend",
    `max_dwell_time` INT MAX DEFAULT "0" COMMENT "Max user dwell time",
    `min_dwell_time` INT MIN DEFAULT "99999" COMMENT "Min user dwell time"
)
AGGREGATE KEY(`user_id`, `city`, `age`, `sex`)
DISTRIBUTED BY HASH(`user_id`) BUCKETS 100
PROPERTIES (
    "replication_allocation" = "tag.location.default: 3"
);
LOAD LABEL brokerload_test_hdfs_orc_label001
(
    DATA INFILE("hdfs://{name_node_ip}:{dfs.namenode.rpc.port}/broker_data/test_data.orc")
    INTO TABLE `test_orc_hdfs`
    FORMAT AS 'orc'
)
WITH BROKER 'broker1' (
    'hadoop.security.authentication'='kerberos',
    'kerberos_keytab'='{keytab_path}',
    'kerberos_principal'='lizuxian_test@C4F9C837_896********74F518680.COM' -- obtainable by running `kinit {username}`
)
PROPERTIES
(
    'timeout'='1200',
    'max_filter_ratio'='0.1'
);

Run the following command in the Doris MySQL client to check progress; the load is complete once the State field shows FINISHED:

mysql> show load order by createtime desc limit 1\G;
*************************** 1. row ***************************
JobId: 342809
Label: brokerload_test_hdfs_orc_label001
State: FINISHED
Progress: ETL:100%; LOAD:100%
Type: BROKER
EtlInfo: unselected.rows=0; dpp.abnorm.ALL=0; dpp.norm.ALL=1000000
TaskInfo: cluster:N/A; timeout(s):1200; max_filter_ratio:0.1
ErrorMsg: NULL
CreateTime: 2023-12-16 15:47:48
EtlStartTime: 2023-12-16 15:47:54
EtlFinishTime: 2023-12-16 15:47:54
LoadStartTime: 2023-12-16 15:47:54
LoadFinishTime: 2023-12-16 15:47:56
URL: NULL
JobDetails: {"Unfinished backends":{"a8855386fee040b3-bf496de3aeed9b35":[]},"ScannedRows":1000000,"TaskNumber":1,"LoadBytes":60000000,"All backends":{"a8855386fee040b3-bf496de3aeed9b35":[10066]},"FileNumber":1,"FileSize":30278802}
TransactionId: 248188
ErrorTablets: {}
1 row in set (0.00 sec)
mysql> select * from test_orc_hdfs limit 10;
+---------+-----------------+------+------+--------+----------------+----------------+
| user_id | city            | age  | sex  | cost   | max_dwell_time | min_dwell_time |
+---------+-----------------+------+------+--------+----------------+----------------+
| 304     | 08fJBA8PC4JVvq9 |   78 |    1 | 524971 |         916140 |          86993 |
| 304     | 0GgTHWckbRA8UFg |   36 |    0 | 828414 |         306785 |         133672 |
| 304     | 0mKUznbdEGWhVCX |   54 |    1 | 624874 |         794295 |          31527 |
| 304     | 1HBHO4opOemHs7F |   72 |    0 | 906372 |         717418 |         163735 |
| 304     | 1YU3rngHmeOa0SL |   10 |    1 | 968173 |          35689 |          33720 |
| 304     | 28FZpfgyijMSTT7 |    3 |    0 | 176383 |         491327 |         699844 |
| 304     | 2NAQaUkC1MIxmi5 |   57 |    0 | 932530 |         614064 |         524431 |
| 304     | 2tu5EhavPVzg0Be |   20 |    1 | 524956 |          31996 |         836995 |
| 304     | 38y7GD7vmHUqJkv |   59 |    1 | 928067 |         849794 |          88436 |
| 304     | 3JIzbSVY85AsC0h |   17 |    0 | 586507 |         309949 |         701555 |
+---------+-----------------+------+------+--------+----------------+----------------+
10 rows in set (0.05 sec)

(3) Notes on Broker Load

  • Label、导入事务、多表原子性:Doris 中所有导入任务都是原子生效的。并且在同一个导入任务中对多张表的导入也能够保证原子性。同时,Doris 还可以通过 Label 的机制来保证数据导入的不丢不重。
  • Timeout: the default Broker Load timeout is 4 hours, counted from job submission; a job that has not finished within the timeout fails. To change it, add e.g. "timeout" = "3600" (3600 seconds) to PROPERTIES.
  • Data volume and job count limits: Broker Load is best suited to loading at most 100 GB per job. Although there is in theory no upper bound on a single job's data volume, an overly large job runs for a long time and is costly to retry on failure (and adds cluster pressure). Constrained by cluster size, CloudTable Doris caps the volume per load job at ComputeNode count * 3 GB to keep resource usage reasonable; split larger data sets across multiple load jobs. Doris also limits the number of load jobs running concurrently per cluster, typically between 3 and 10; jobs submitted beyond that are queued, the queue holds at most 100 jobs, and further submissions are rejected outright. Note that queuing time counts toward a job's total time, so a job that queues past its timeout is cancelled. It is therefore advisable to monitor job status and pace submissions accordingly.
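The ComputeNode count * 3 GB cap above translates into a simple splitting rule. A sketch with made-up inputs (3 nodes and a 25 GB file are illustrative values, not from the text):

```shell
# Per-job load cap = ComputeNode count * 3 GB; larger files must be split.
nodes=3
file_gb=25
cap_gb=$((nodes * 3))
# Number of load jobs needed for the file, rounded up.
jobs=$(( (file_gb + cap_gb - 1) / cap_gb ))
echo "cap=${cap_gb}GB jobs=${jobs}"   # cap=9GB jobs=3
```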

A closer look at data volume:

The discussion below assumes a single BE; if your cluster has multiple BEs, multiply the volumes in the headings below by the BE count. For example, with 3 BEs, "3 GB or less" becomes 3 * 3 GB = 9 GB or less.

  • 3 GB or less (inclusive; with 3 BEs: 3 * 3 GB = 9 GB): submit the Broker Load job directly.

  • More than 3 GB (with 3 BEs: 3 * 3 GB = 9 GB): tune the Broker Load parameters to import the large file.

    i. Based on the number of BEs and the size of the source file, adjust the per-BE maximum scan volume and maximum concurrency.

    Adjust the following settings in fe.conf (contact the CloudTable administrator):
    max_broker_concurrency = number of BEs
    data handled per BE for this job = source file size / max_broker_concurrency
    max_bytes_per_broker_scanner >= data handled per BE for this job

    For example, for a 100 GB file on a cluster with 10 BEs:
    max_broker_concurrency = 10
    max_bytes_per_broker_scanner >= 10 GB = 100 GB / 10
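The same sizing, scripted with the example's numbers (100 GB file, 10 BEs):

```shell
# fe.conf sizing for a large broker load (100 GB file, 10 BEs).
be_count=10
file_gb=100
max_broker_concurrency=$be_count
per_be_gb=$((file_gb / max_broker_concurrency))       # data one BE must scan
echo "max_broker_concurrency=${max_broker_concurrency}"
echo "max_bytes_per_broker_scanner >= ${per_be_gb}GB"
```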
    

    ii. Set a custom timeout for the load job when creating it.

    data handled per BE for this job / slowest import speed of your Doris cluster (MB/s) >= job timeout >= data handled per BE for this job / 10 MB/s
    
    For example, for a 100 GB file on a cluster with 10 BEs: timeout >= 1000 s = 10 GB / 10 MB/s
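The lower bound from step ii, computed for the example (10 GB per BE, 10 MB/s floor speed, using the decimal 1000 MB per GB convention the text uses):

```shell
# Lower bound on the job timeout: data one BE handles / 10 MB/s.
per_be_gb=10
floor_speed_mb_s=10
min_timeout_s=$((per_be_gb * 1000 / floor_speed_mb_s))
echo "timeout >= ${min_timeout_s}s"   # timeout >= 1000s
```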
    

    iii. If the timeout computed in step ii exceeds the system default maximum of 4 hours, raising the maximum load timeout is not the recommended fix. When a single load would take more than the default 4-hour maximum, it is better to split the source file and load it in several jobs, mainly because retrying a failed load that ran for over 4 hours is very costly.

    expected maximum file size per load = 14400 s * 10 MB/s * number of BEs
    For example, for a cluster with 10 BEs:
    expected maximum file size per load = 14400 s * 10 MB/s * 10 = 1440000 MB ≈ 1440 GB

    Note: many environments will not reach 10 MB/s, so it is advisable to split any file larger than 500 GB before loading.
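The capacity formula above as a quick check for a 10-BE cluster:

```shell
# Largest file that fits in the default 4-hour (14400 s) window,
# assuming the 10 MB/s floor speed, for a 10-BE cluster.
be_count=10
max_mb=$((14400 * 10 * be_count))
echo "${max_mb} MB"   # 1440000 MB, i.e. about 1440 GB
```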
    

That wraps up this article's examples for Stream Load and Broker Load. If you need the test data used here, leave a comment; if there is enough interest, I can follow up with a post on quickly generating test data.

[Copyright notice] This article is original content by a Huawei Cloud community user. When reproducing it, you must credit the source (Huawei Cloud community) and include the article link, author, and other basic information; otherwise the author and the community reserve the right to pursue liability. If you find suspected plagiarism in this community, report it by email with supporting evidence to cloudbbs@huaweicloud.com; confirmed infringing content will be removed immediately.