Implementing a Recommender System Deployment from Scratch — 26. Recommender System Case Study (16): Deploying and Applying Waterdrop
Waterdrop is an easy-to-use, high-performance product for processing massive amounts of data, supporting both real-time streaming and offline batch processing, and built on top of Apache Spark and Apache Flink. It aims to make Spark simpler and more efficient to use by baking proven industry experience with Spark into the product itself, noticeably lowering the learning curve and speeding up the adoption of distributed data processing in production. Its features include ease of use, flexible configuration with no coding required, real-time stream processing, high performance on massive data, and a modular, plugin-based design that is easy to extend, so it fits scenarios such as massive-data ETL, massive-data aggregation, and multi-source data processing.
1. Install Waterdrop
unzip waterdrop-1.5.1-with-spark.zip
mv waterdrop-1.5.1-with-spark /opt/modules/waterdrop
2. Configure Waterdrop
(1) Edit /opt/modules/waterdrop/config/waterdrop-env.sh with vi and specify the required environment settings, such as SPARK_HOME:
SPARK_HOME=/opt/modules/spark
(2) Edit /opt/modules/waterdrop/config/application.conf with vi; this file determines how Waterdrop reads, processes, and writes data once it starts.
touch /opt/modules/waterdrop/config/application.conf
*******
######
###### This config file is a demonstration of streaming processing in waterdrop config
######
spark {
  # You can set Spark configuration here
  # Waterdrop defines the streaming batch duration in seconds
  spark.streaming.batchDuration = 5
  # see available properties defined by Spark: https://spark.apache.org/docs/latest/configuration.html#available-properties
  spark.app.name = "Waterdrop"
  spark.executor.instances = 2
  spark.executor.cores = 1
  spark.executor.memory = "1g"
}

input {
  # This is an example input plugin **only for testing and demonstrating the input plugin feature**
  fakestream {
    content = ["Hello World, InterestingLab"]
    rate = 1
  }
  # For more information about how to configure Waterdrop and the full list of input plugins,
  # please go to https://interestinglab.github.io/waterdrop/#/zh-cn/configuration/base
}

filter {
  split {
    fields = ["msg", "name"]
    delimiter = ","
  }
  # For more information about how to configure Waterdrop and the full list of filter plugins,
  # please go to https://interestinglab.github.io/waterdrop/#/zh-cn/configuration/base
}

output {
  stdout {}
  # For more information about how to configure Waterdrop and the full list of output plugins,
  # please go to https://interestinglab.github.io/waterdrop/#/zh-cn/configuration/base
}
*****
Start Waterdrop in local mode to verify the configuration:
cd /opt/modules/waterdrop
./bin/start-waterdrop.sh --master local[4] --deploy-mode client --config ./config/application.conf
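To make the demo concrete, here is roughly how one event flows through the pipeline (an illustration of the data shape, not captured console output; the exact stdout formatting depends on Spark):

```
# event produced by fakestream:
#   "Hello World, InterestingLab"
# after the split filter (delimiter ",", fields ["msg", "name"]):
#   msg  = "Hello World"
#   name = " InterestingLab"   # a plain split keeps the space after the
#                              # comma, unless the plugin trims it
```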
3. Quickly importing data from MySQL into ClickHouse with Waterdrop
First drop the employees table previously created in the myemployees database in ClickHouse, then recreate it, as sketched below.
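A minimal sketch of that step (run in clickhouse-client), assuming an HR-style employees schema inferred from the fields converted later in this post; the exact column list, the MergeTree engine, and the ORDER BY key are assumptions to adjust to your actual MySQL table:

```sql
DROP TABLE IF EXISTS myemployees.employees;

-- Column list inferred from the convert blocks further below; adjust to the real schema.
CREATE TABLE myemployees.employees
(
    employee_id    Int32,
    first_name     String,
    last_name      String,
    email          String,
    hiredate       DateTime,
    salary         Float64,
    commission_pct Float64,
    manager_id     Int32,
    department_id  Int32
)
ENGINE = MergeTree()   -- engine and sort key are illustrative choices
ORDER BY employee_id;
```

With the table in place, create the Waterdrop job config: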
touch /opt/modules/waterdrop/config/mysql_clickhouse.conf
******
spark {
  spark.streaming.batchDuration = 5
  spark.app.name = "Waterdrop"
  spark.executor.instances = 2
  spark.executor.cores = 1
  spark.executor.memory = "1g"
}

input {
  mysql {
    url = "jdbc:mysql://mas5.wuyi.com:3306/myemployees"
    table = "employees"
    result_table_name = "employees"
    user = "root"
    password = "123456"
  }
}

filter {
  sql {
    sql = "select * from employees"
  }
}

output {
  ClickHouse {
    host = "mas2.wuyi.com:8123"
    database = "myemployees"
    table = "employees"
  }
}
*****
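One environment note before launching (an assumption about this setup, not stated in the original steps): the mysql input plugin needs the MySQL JDBC driver on Spark's classpath, or Spark will not be able to load the driver. A simple way is to copy the connector jar into Spark's jars directory:

```bash
# The jar version here is illustrative; use one matching your MySQL server.
cp mysql-connector-java-5.1.48.jar /opt/modules/spark/jars/
```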
Start Waterdrop, this time on YARN:
cd /opt/modules/waterdrop
./bin/start-waterdrop.sh --master yarn --deploy-mode cluster --config /opt/modules/waterdrop/config/mysql_clickhouse.conf
Running this job fails with a type-conversion error. The cause is that fields such as Timestamp must be converted before being written to ClickHouse: use the SQL or Convert plugin in the filter section to cast each field to the matching format, otherwise the write errors out.
The conversion target types are listed below (types not listed are not yet supported):
| ClickHouse column type | Convert plugin target type | SQL cast expression | Description |
| --- | --- | --- | --- |
| Date | string | string() | string in yyyy-MM-dd format |
| DateTime | string | string() | string in yyyy-MM-dd HH:mm:ss format |
| String | string | string() | |
| Int8 | integer | int() | |
| UInt8 | integer | int() | |
| Int16 | integer | int() | |
| UInt16 | integer | int() | |
| Int32 | integer | int() | |
| UInt32 | long | bigint() | |
| Int64 | long | bigint() | |
| UInt64 | long | bigint() | |
| Float32 | float | float() | |
| Float64 | double | double() | |
| Decimal(P, S) | - | CAST(source AS DECIMAL(P, S)) | also applies to Decimal32(S), Decimal64(S), Decimal128(S) |
| Array(T) | - | - | |
| Nullable(T) | depends on T | depends on T | |
| LowCardinality(T) | depends on T | depends on T | |
Add the following convert blocks to the filter section:
convert {
  source_field = "employee_id"
  new_type = "integer"
}
convert {
  source_field = "salary"
  new_type = "double"
}
convert {
  source_field = "commission_pct"
  new_type = "double"
}
convert {
  source_field = "manager_id"
  new_type = "integer"
}
convert {
  source_field = "department_id"
  new_type = "integer"
}
convert {
  source_field = "hiredate"
  new_type = "string"
}
Run the job again and it completes successfully.
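As an alternative to stacking convert blocks, the same casts can be written directly in the sql filter using the cast expressions from the table above. A minimal sketch, assuming the same HR-style column list as before (every column the ClickHouse table expects must appear in the select):

```
filter {
  sql {
    sql = "select int(employee_id) as employee_id, first_name, last_name, email, string(hiredate) as hiredate, double(salary) as salary, double(commission_pct) as commission_pct, int(manager_id) as manager_id, int(department_id) as department_id from employees"
  }
}
```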
One more pitfall: if the ClickHouse write fails with a connection error, it means port 9000 (ClickHouse's native TCP port) cannot be used for writing; switch the output host to the HTTP port 8123.
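A quick way to confirm the HTTP interface is reachable; ClickHouse answers a plain Ok. on its HTTP port:

```bash
curl http://mas2.wuyi.com:8123
# expected response: Ok.
```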