快速的把HDFS中的数据导入ClickHouse

举报
1226741228 发表于 2019/01/16 11:28:57 2019/01/16
【摘要】 之前介绍的有关数据入库的经验都是基于实时数据流,数据存储在kafka中,使用Java或者Golang将数据从kafka中读取、解析、清洗之后写入ClickHouse中,实现数据的快速接入。在很多的使用场景中,数据都不是实时的,可能需要将HDFS或者是hive的数据导入ClickHouse,可以通过编写Spark程序实现数据的导入。HDFS to ClickHouse假设日志存储在HDFS中,...

之前介绍的有关数据入库的经验都是基于实时数据流,数据存储在kafka中,使用Java或者Golang将数据从kafka中读取、解析、清洗之后写入ClickHouse中,实现数据的快速接入。在很多的使用场景中,数据都不是实时的,可能需要将HDFS或者是hive的数据导入ClickHouse,可以通过编写Spark程序实现数据的导入。

HDFS to ClickHouse

假设日志存储在HDFS中,需要将日志进行解析并筛选出关心的字段,将对应的字段写入ClickHouse表中。

Log Sample

在HDFS存储的日志格式如下,

10.41.1.28 github.com 114.250.140.241 0.001s "127.0.0.1:80" [26/Oct/2018:03:09:32 +0800] "GET /InterestingLab/waterdrop HTTP/1.1" 200 0 "-" - "Dalvik/2.1.0 (Linux; U; Android 7.1.1; OPPO R11 Build/NMF26X)" "196" "-" "mainpage" "443" "-" "172.16.181.129"

ClickHouse Schema

ClickHouse建表如下,表按日进行分区

****** TABLE cms.cms_msg (    date Date,    datetime DateTime,    url String,    request_time Float32,    status String,    hostname String,    domain String,    remote_addr String,    data_size Int32,    pool String ) ENGINE = MergeTree PARTITION BY date ORDER BY date SETTINGS index_granularity = 16384

Waterdrop with ClickHouse

Waterdrop

Waterdrop是一个非常易用、高性能,能够应对海量数据的实时数据处理产品,它构建在Spark之上。Waterdrop拥有着非常丰富的插件,kafka、HDFS、kadu中读取数据,进行各种各样的数据处理,并将结果写入ClickHouse、Elasticsearch或者kafka中。

Prerequisites

首先需要安装Waterdrop,安装十分简单,无需配置环境变量

1.准备Spark环境

2.安装Waterdrop

3.配置Waterdrop

cd /usr/local wget https://archive.apache.org/dist/spark/spark-2.2.0/spark-2.2.0-bin-hadoop2.7.tgz tar -xvf https://archive.apache.org/dist/spark/spark-2.2.0/spark-2.2.0-bin-hadoop2.7.tgz wget https://github.com/InterestingLab/waterdrop/releases/download/v1.1.1/waterdrop-1.1.1.zip unzip waterdrop-1.1.1.zip cd waterdrop-1.1.1 vim config/waterdrop-env.sh # 指定Spark安装路径 SPARK_HOME=${SPARK_HOME:-/usr/local/spark-2.2.0-bin-hadoop2.7}

Waterdrop Pipeline

仅需要编写一个Waterdrop Pipeline的配置文件即可完成数据的导入。配置文件包括四个部分,分别是Spark、Input、filter和Output。

Spark

这一部分是Spark的相关配置,主要配置Spark执行时所需的资源大小。

spark {

spark.app.name = "Waterdrop"

spark.executor.instances = 2

spark.executor.cores = 1

spark.executor.memory = "1g"

}

Input

这一部分定义数据源,如下是从HDFS文件中读取text格式数据的配置案例。

input {

hdfs {

path = "hdfs://nomanode:8020/rowlog/accesslog"

table_name = "access_log"

format = "text"

}

}

Filter

在Filter部分,这里配置一系列的转化,包括正则解析将日志进行拆分、时间转换将HTTPDATE转化为ClickHouse支持的日志格式、对Number类型的字段进行类型转换以及通过SQL进行字段筛减等

filter {

# 使用正则解析原始日志

grok {

source_field = "raw_message"

pattern = '%{IP:ha_ip}\\s%{NOTSPACE:domain}\\s%{IP:remote_addr}\\s%{NUMBER:request_time}s\\s\"%{DATA:upstream_ip}\"\\s\\[%{HTTPDATE:timestamp}\\]\\s\"%{NOTSPACE:method}\\s%{DATA:url}\\s%{NOTSPACE:http_ver}\"\\s%{NUMBER:status}\\s%{NUMBER:body_bytes_send}\\s%{DATA:referer}\\s%{NOTSPACE:cookie_info}\\s\"%{DATA:user_agent}\"\\s%{DATA:uid}\\s%{DATA:session_id}\\s\"%{DATA:pool}\"\\s\"%{DATA:tag2}\"\\s%{DATA:tag3}\\s%{DATA:tag4}'

}

# 将"dd/MMM/yyyy:HH:mm:ss Z"格式的数据转换为

# "yyyy/MM/dd HH:mm:ss"格式的数据

date {

source_field = "timestamp"

target_field = "datetime"

source_time_format = "dd/MMM/yyyy:HH:mm:ss Z"

target_time_format = "yyyy/MM/dd HH:mm:ss"

}

# 使用SQL筛选关注的字段,并对字段进行处理

# 甚至可以通过过滤条件过滤掉不关心的数据

sql {

table_name = "access"

sql = "select substring(date, 1, 10) as date, datetime, hostname, url, http_code, float(request_time), int(data_size), domain from access"

}

}

Output

最后将处理好的结构化数据写入ClickHouse

output {

clickhouse {

host = "your.clickhouse.host:8123"

database = "waterdrop"

table = "access_log"

fields = ["date", "datetime", "hostname", "uri", "http_code", "request_time", "data_size", "domain"]

username = "username"

password = "password"

}

}

Running Waterdrop

将上述四部分配置组合成为配置文件config/batch.conf

vim config/batch.conf

spark {

spark.app.name = "Waterdrop"

spark.executor.instances = 2

spark.executor.cores = 1

spark.executor.memory = "1g"

}

input {

hdfs {

path = "hdfs://nomanode:8020/rowlog/accesslog"

table_name = "access_log"

format = "text"

}

}

filter {

# 使用正则解析原始日志

grok {

source_field = "raw_message"

pattern = '%{IP:ha_ip}\\s%{NOTSPACE:domain}\\s%{IP:remote_addr}\\s%{NUMBER:request_time}s\\s\"%{DATA:upstream_ip}\"\\s\\[%{HTTPDATE:timestamp}\\]\\s\"%{NOTSPACE:method}\\s%{DATA:url}\\s%{NOTSPACE:http_ver}\"\\s%{NUMBER:status}\\s%{NUMBER:body_bytes_send}\\s%{DATA:referer}\\s%{NOTSPACE:cookie_info}\\s\"%{DATA:user_agent}\"\\s%{DATA:uid}\\s%{DATA:session_id}\\s\"%{DATA:pool}\"\\s\"%{DATA:tag2}\"\\s%{DATA:tag3}\\s%{DATA:tag4}'

}

# 将"dd/MMM/yyyy:HH:mm:ss Z"格式的数据转换为

# "yyyy/MM/dd HH:mm:ss"格式的数据

date {

source_field = "timestamp"

target_field = "datetime"

source_time_format = "dd/MMM/yyyy:HH:mm:ss Z"

target_time_format = "yyyy/MM/dd HH:mm:ss"

}

# 使用SQL筛选关注的字段,并对字段进行处理

# 甚至可以通过过滤条件过滤掉不关心的数据

sql {

table_name = "access"

sql = "select substring(date, 1, 10) as date, datetime, hostname, url, http_code, float(request_time), int(data_size), domain from access"

}

}

output {

clickhouse {

host = "your.clickhouse.host:8123"

database = "waterdrop"

table = "access_log"

fields = ["date", "datetime", "hostname", "uri", "http_code", "request_time", "data_size", "domain"]

username = "username"

password = "password"

}

}

执行命令,指定配置文件,运行Waterdrop,即可将数据写入ClickHouse。

./bin/start-waterdrop.sh --config config/batch.conf -e client -m 'local[2]'

以上介绍了如何使用Waterdrop将HDFS中的Nginx日志文件导入ClickHouse中。仅通过一个配置文件便可快速完成数据的导入,无需编写任何代码。除了支持HDFS数据源之外,Waterdrop同样支持将数据从kafka中实时读取处理写入ClickHouse中。


【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。