从零开始实施推荐系统的落地部署——26.推荐系统案例(十六)Waterdrop的部署和应用

举报
wuyicom 发表于 2021/01/29 01:04:48 2021/01/29
【摘要】 Waterdrop 是一个非常易用,高性能、支持实时流式和离线批处理的海量数据处理产品,架构于Apache Spark 和 Apache Flink之上。为了Spark的使用更简单,更高效,并将业界和使用Spark的优质经验固化到Waterdrop这个产品中,明显减少学习成本,加快分布式数据处理能力在生产环境落地。它的特性简单易用,灵活配置,无需开发,实时流式处理,高性能,海量数据处理能力,...

1.PNG

Waterdrop 是一个非常易用,高性能、支持实时流式和离线批处理的海量数据处理产品,架构于Apache Spark Apache Flink之上。为了Spark的使用更简单,更高效,并将业界和使用Spark的优质经验固化到Waterdrop这个产品中,明显减少学习成本,加快分布式数据处理能力在生产环境落地。它的特性简单易用,灵活配置,无需开发,实时流式处理,高性能,海量数据处理能力,模块化和插件化,易于扩展,所以能应用在海量数据ETL,海量数据聚合和多源数据处理等场景,

  1. 安装Waterdrop

Wget https://github.com/InterestingLab/waterdrop/releases/download/v1.5.1/waterdrop-1.5.1-with-spark.zip -O waterdrop-1.5.1-with-spark.zip

unzip waterdrop-1.5.1-with-spark.zip

mv waterdrop-1.5.1-with-spark/ waterdrop-1.5.1 /opt/modules/waterdrop

    2. 配置 Waterdrop

(1) 编辑 vi /opt/modules/waterdrop/config/waterdrop-env.sh, 指定必须环境配置如SPARK_HOME

SPARK_HOME=/opt/modules/spark

(2) 编辑 vi /opt/modules/waterdrop/config/application.conf, 它决定了Waterdrop启动后,数据输入,处理,输出的方式和逻辑。

touch /opt/modules/waterdrop/config/application.conf

*******

######

###### This config file is a demonstration of streaming processing in waterdrop config

######

 

spark {

  # You can set spark configuration here

  # Waterdrop defined streaming batch duration in seconds

  spark.streaming.batchDuration = 5

 

  # see available properties defined by spark: https://spark.apache.org/docs/latest/configuration.html#available-properties

  spark.app.name = "Waterdrop"

  spark.executor.instances = 2

  spark.executor.cores = 1

  spark.executor.memory = "1g"

}

 

input {

  # This is a example input plugin **only for test and demonstrate the feature input plugin**

  fakestream {

    content = ["Hello World, InterestingLab"]

    rate = 1

  }

 

 

  # If you would like to get more information about how to configure waterdrop and see full list of input plugins,

  # please go to https://interestinglab.github.io/waterdrop/#/zh-cn/configuration/base

}

 

filter {

  split {

    fields = ["msg", "name"]

    delimiter = ","

  }

 

  # If you would like to get more information about how to configure waterdrop and see full list of filter plugins,

  # please go to https://interestinglab.github.io/waterdrop/#/zh-cn/configuration/base

}

 

output {

  stdout {}

 

 

  # If you would like to get more information about how to configure waterdrop and see full list of output plugins,

  # please go to https://interestinglab.github.io/waterdrop/#/zh-cn/configuration/base

}

*****

cd /opt/modules/waterdrop

./bin/start-waterdrop.sh --master local[4] --deploy-mode client --config ./config/application.conf

2.PNG

3. 如何快速通过waterdropmysql中的数据导入ClickHouse

先删除之前clickhousemyemployees数据库里的employees表,再重新创建。

3.PNG

touch /opt/modules/waterdrop/config/mysql_clickhouse.conf

******

spark {

  spark.streaming.batchDuration = 5

  spark.app.name = "Waterdrop"

  spark.executor.instances = 2

  spark.executor.cores = 1

  spark.executor.memory = "1g"

}

 

input {

  mysql {

    url = "jdbc:mysql://mas5.wuyi.com:3306/myemployees"

    table = "employees"

    result_table_name = "employees"

    user = "root"

    password = "123456"

  }

}

 

filter {

  sql {

    sql = "select * from employees",

  }

 

 

}

 

output {

  ClickHouse {

    host = "mas2.wuyi.com:8123"

    database = "myemployees"

    table = "employees"

   }

 

}

*****

启动waterdrop

cd /opt/modules/waterdrop

./bin/start-waterdrop.sh --master yarn --deploy-mode cluster --config /opt/modules/waterdrop/config/mysql_clickhouse.conf

运行报错,出现下图的情况。

4.PNG

出现这种情况是因为要做类型的转换Timestamp,写入 ClickHouse 之前需要通过 Filter 中的 SQL 或者 Convert 插件将各字段转换为对应格式,否则会产生报错。

以下为转换目标类型对照表(未列出的类型暂不支持)

ClickHouse字段类型 Convert插件转化目标类型 SQL转化表达式 Description
Date string string() yyyy-MM-dd格式字符串
DateTime string string() yyyy-MM-dd HH:mm:ss格式字符串
String string string()
Int8 integer int()
Uint8 integer int()
Int16 integer int()
Uint16 integer int()
Int32 integer int()
Uint32 long bigint()
Int64 long bigint()
Uint64 long bigint()
Float32 float float()
Float64 double double()
Decimal(P, S) - CAST(source AS DECIMAL(P, S)) Decimal32(S), Decimal64(S), Decimal128(S)皆可使用
Array(T) - -
Nullable(T) 取决于T 取决于T
LowCardinality(T) 取决于T 取决于T

filter里添加

  convert {

    source_field = "employee_id"

    new_type = "integer"

  }

  convert {

    source_field = "salary"

    new_type = "double"

  }

  convert {

    source_field = "commission_pct"

    new_type = "double"

  }

  convert {

    source_field = "manager_id"

    new_type = "integer"

  }

  convert {

    source_field = "department_id"

    new_type = "integer"

  }

  convert {

    source_field = "hiredate"

    new_type = "string"

  }

在运行即可成功。


5.PNG

6.PNG

7.PNG

出现上面的情况时说明不支持9000端口写入数据到clickhouse,更换到8123即可。

【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。