湖仓一体电商项目(十六):业务实现之编写写入ODS层业务代码

举报
Lansonli 发表于 2022/09/25 06:07:04 2022/09/25
【摘要】 文章目录 业务实现之编写写入ODS层业务代码 一、代码编写 二、创建Iceberg-ODS层表 1、在Hive中添加Iceberg表格式需要的包 2、创建Iceberg表 三、代码测试 1、在Kafka中创建对应的topic 2、将代码中消费Kafka数据改成从头开始消费 3、启动日志采集接口,启动Flume监控 4...

文章目录

业务实现之编写写入ODS层业务代码

一、代码编写

二、创建Iceberg-ODS层表

1、在Hive中添加Iceberg表格式需要的包

2、创建Iceberg表

三、代码测试

1、在Kafka中创建对应的topic

2、将代码中消费Kafka数据改成从头开始消费

3、启动日志采集接口,启动Flume监控

4、执行代码,查看对应topic中的结果

5、执行模拟生产用户日志代码,查看对应topic中的结果


业务实现之编写写入ODS层业务代码

由于本业务涉及到MySQL业务数据和用户日志数据,两类数据是分别采集存储在不同的Kafka Topic中的,所以这里写入ODS层代码由两个代码组成。

一、代码编写

处理MySQL业务库binlog数据的代码复用第一个业务代码只需要在”ProduceKafkaDBDataToODS.scala” 代码中写入存入Icebeg-ODS层表的代码即可,“ProduceKafkaDBDataToODS.scala”代码文件中加入代码如下:


  
  1. //向Iceberg ods 层 ODS_PRODUCT_CATEGORY 表插入数据
  2. tblEnv.executeSql(
  3. """
  4. |insert into hadoop_iceberg.icebergdb.ODS_PRODUCT_CATEGORY
  5. |select
  6. | data['id'] as id ,
  7. | data['p_id'] as p_id,
  8. | data['name'] as name,
  9. | data['pic_url'] as pic_url,
  10. | data['gmt_create'] as gmt_create
  11. | from kafka_db_bussiness_tbl where `table` = 'pc_product_category'
  12. """.stripMargin)
  13. //向Iceberg ods 层 ODS_PRODUCT_INFO 表插入数据
  14. tblEnv.executeSql(
  15. """
  16. |insert into hadoop_iceberg.icebergdb.ODS_PRODUCT_INFO
  17. |select
  18. | data['product_id'] as product_id ,
  19. | data['category_id'] as category_id,
  20. | data['product_name'] as product_name,
  21. | data['gmt_create'] as gmt_create
  22. | from kafka_db_bussiness_tbl where `table` = 'pc_product'
  23. """.stripMargin)
  24. 处理用户日志的代码需要自己编写,代码中的业务逻辑主要是读取存储用户浏览日志数据topic “KAFKA-USER-LOG-DATA”中的数据,通过Flink代码处理将不同类型用户日志处理成json类型数据,将该json结果后续除了存储在Iceberg-ODS层对应的表之外还要将数据存储在Kafka topic “KAFKA-ODS-TOPIC” 中方便后续的业务处理。具体代码参照“ProduceKafkaLogDataToODS.scala”,主要代码逻辑如下:
  25. object ProduceKafkaLogDataToODS {
  26. private val kafkaBrokers: String = ConfigUtil.KAFKA_BROKERS
  27. private val kafkaOdsTopic: String = ConfigUtil.KAFKA_ODS_TOPIC
  28. private val kafkaDwdBrowseLogTopic: String = ConfigUtil.KAFKA_DWD_BROWSELOG_TOPIC
  29. def main(args: Array[String]): Unit = {
  30. val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
  31. val tblEnv: StreamTableEnvironment = StreamTableEnvironment.create(env)
  32. env.enableCheckpointing(5000)
  33. import org.apache.flink.streaming.api.scala._
  34. /**
  35. * 1.需要预先创建 Catalog
  36. * 创建Catalog,创建表需要在Hive中提前创建好,不在代码中创建,因为在Flink中创建iceberg表不支持create table if not exists ...语法
  37. */
  38. tblEnv.executeSql(
  39. """
  40. |create catalog hadoop_iceberg with (
  41. | 'type'='iceberg',
  42. | 'catalog-type'='hadoop',
  43. | 'warehouse'='hdfs://mycluster/lakehousedata'
  44. |)
  45. """.stripMargin)
  46. /**
  47. * {
  48. * "logtype": "browselog",
  49. * "data": {
  50. * "browseProductCode": "eSHd1sFat9",
  51. * "browseProductTpCode": "242",
  52. * "userIp": "251.100.236.37",
  53. * "obtainPoints": 32,
  54. * "userId": "uid208600",
  55. * "frontProductUrl": "https://f/dcjp/nVnE",
  56. * "logTime": 1646980514321,
  57. * "browseProductUrl": "https://kI/DXSNBeP/"
  58. * }
  59. * }
  60. */
  61. /**
  62. * 2.创建 Kafka Connector,连接消费Kafka中数据
  63. * 注意:1).关键字要使用 " 飘"符号引起来 2).对于json对象使用 map < String,String>来接收
  64. */
  65. tblEnv.executeSql(
  66. """
  67. |create table kafka_log_data_tbl(
  68. | logtype string,
  69. | data map<string,string>
  70. |) with (
  71. | 'connector' = 'kafka',
  72. | 'topic' = 'KAFKA-USER-LOG-DATA',
  73. | 'properties.bootstrap.servers'='node1:9092,node2:9092,node3:9092',
  74. | 'scan.startup.mode'='earliest-offset', --也可以指定 earliest-offset 、latest-offset
  75. | 'properties.group.id' = 'my-group-id',
  76. | 'format' = 'json'
  77. |)
  78. """.stripMargin)
  79. /**
  80. * 3.将不同的业务库数据存入各自的Iceberg表
  81. */
  82. tblEnv.executeSql(
  83. """
  84. |insert into hadoop_iceberg.icebergdb.ODS_BROWSELOG
  85. |select
  86. | data['logTime'] as log_time ,
  87. | data['userId'] as user_id,
  88. | data['userIp'] as user_ip,
  89. | data['frontProductUrl'] as front_product_url,
  90. | data['browseProductUrl'] as browse_product_url,
  91. | data['browseProductTpCode'] as browse_product_tpcode,
  92. | data['browseProductCode'] as browse_product_code,
  93. | data['obtainPoints'] as obtain_points
  94. | from kafka_log_data_tbl where `logtype` = 'browselog'
  95. """.stripMargin)
  96. //4.将用户所有日志数据组装成Json数据存入 kafka topic ODS-TOPIC 中
  97. //读取 Kafka 中的数据,将维度数据另外存储到 Kafka 中
  98. val kafkaLogTbl: Table = tblEnv.sqlQuery("select logtype,data from kafka_log_data_tbl")
  99. //将 kafkaLogTbl Table 转换成 DataStream 数据
  100. val userLogDS: DataStream[Row] = tblEnv.toAppendStream[Row](kafkaLogTbl)
  101. //将 userLogDS 数据转换成JSON 数据写出到 kafka topic ODS-TOPIC
  102. val odsSinkDS: DataStream[String] = userLogDS.map(row => {
  103. //最后返回给Kafka 日志数据的json对象
  104. val returnJsonObj = new JSONObject()
  105. val logType: String = row.getField(0).toString
  106. val data: String = row.getField(1).toString
  107. val nObject = new JSONObject()
  108. val arr: Array[String] = data.stripPrefix("{").stripSuffix("}").split(",")
  109. for (elem <- arr) {
  110. //有些数据 “data”中属性没有值,就没有“=”
  111. if (elem.contains("=") && elem.split("=").length == 2) {
  112. val split: Array[String] = elem.split("=")
  113. nObject.put(split(0).trim, split(1).trim)
  114. } else {
  115. nObject.put(elem.stripSuffix("=").trim, "")
  116. }
  117. }
  118. if ("browselog".equals(logType)) {
  119. returnJsonObj.put("iceberg_ods_tbl_name", "ODS_BROWSELOG")
  120. returnJsonObj.put("kafka_dwd_topic",kafkaDwdBrowseLogTopic)
  121. returnJsonObj.put("data",nObject.toString)
  122. } else {
  123. //其他日志,这里目前没有
  124. }
  125. returnJsonObj.toJSONString
  126. })
  127. val props = new Properties()
  128. props.setProperty("bootstrap.servers",kafkaBrokers)
  129. odsSinkDS.addSink(new FlinkKafkaProducer[String](kafkaOdsTopic,new KafkaSerializationSchema[String] {
  130. override def serialize(element: String, timestamp: java.lang.Long): ProducerRecord[Array[Byte], Array[Byte]] = {
  131. new ProducerRecord[Array[Byte],Array[Byte]](kafkaOdsTopic,null,element.getBytes())
  132. }
  133. },props,FlinkKafkaProducer.Semantic.AT_LEAST_ONCE))
  134. env.execute()
  135. }
  136. }

二、​​​​​​​创建Iceberg-ODS层表

代码在执行之前需要在Hive中预先创建对应的Iceberg表,创建Icebreg表方式如下:

1、在Hive中添加Iceberg表格式需要的包

启动HDFS集群,node1启动Hive metastore服务,在Hive客户端启动Hive添加Iceberg依赖包:


  
  1. #node1节点启动Hive metastore服务
  2. [root@node1 ~]# hive --service metastore &
  3. #在hive客户端node3节点加载两个jar包
  4. add jar /software/hive-3.1.2/lib/iceberg-hive-runtime-0.12.1.jar;
  5. add jar /software/hive-3.1.2/lib/libfb303-0.9.3.jar;

2、创建Iceberg表

这里创建Iceberg表有“ODS_PRODUCT_CATEGORY”、“ODS_PRODUCT_INFO”,创建语句如下:


  
  1. CREATE TABLE ODS_PRODUCT_CATEGORY (
  2. id string,
  3. p_id string,
  4. name string,
  5. pic_url string,
  6. gmt_create string
  7. )STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler'
  8. LOCATION 'hdfs://mycluster/lakehousedata/icebergdb/ODS_PRODUCT_CATEGORY/'
  9. TBLPROPERTIES ('iceberg.catalog'='location_based_table',
  10. 'write.metadata.delete-after-commit.enabled'= 'true',
  11. 'write.metadata.previous-versions-max' = '3'
  12. );
  13. CREATE TABLE ODS_PRODUCT_INFO (
  14. product_id string,
  15. category_id string,
  16. product_name string,
  17. gmt_create string
  18. )STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler'
  19. LOCATION 'hdfs://mycluster/lakehousedata/icebergdb/ODS_PRODUCT_INFO/'
  20. TBLPROPERTIES ('iceberg.catalog'='location_based_table',
  21. 'write.metadata.delete-after-commit.enabled'= 'true',
  22. 'write.metadata.previous-versions-max' = '3'
  23. );
  24. CREATE TABLE ODS_BROWSELOG (
  25. log_time string,
  26. user_id string,
  27. user_ip string,
  28. front_product_url string,
  29. browse_product_url string,
  30. browse_product_tpcode string,
  31. browse_product_code string,
  32. obtain_points string
  33. )STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler'
  34. LOCATION 'hdfs://mycluster/lakehousedata/icebergdb/ODS_BROWSELOG/'
  35. TBLPROPERTIES ('iceberg.catalog'='location_based_table',
  36. 'write.metadata.delete-after-commit.enabled'= 'true',
  37. 'write.metadata.previous-versions-max' = '3'
  38. );

以上语句在Hive客户端执行完成之后,在HDFS中可以看到对应的Iceberg数据目录:

三、代码测试

以上代码编写完成后,代码执行测试步骤如下:

1、在Kafka中创建对应的topic


  
  1. #在Kafka 中创建 KAFKA-USER-LOG-DATA topic
  2. ./kafka-topics.sh --zookeeper node3:2181,node4:2181,node5:2181 --create --topic KAFKA-USER-LOG-DATA --partitions 3 --replication-factor 3
  3. #在Kafka 中创建 KAFKA-ODS-TOPIC topic(第一个业务已创建可忽略)
  4. ./kafka-topics.sh --zookeeper node3:2181,node4:2181,node5:2181 --create --topic KAFKA-ODS-TOPIC --partitions 3 --replication-factor 3
  5. #在Kafka 中创建 KAFKA-DIM-TOPIC topic(第一个业务已创建可忽略)
  6. ./kafka-topics.sh --zookeeper node3:2181,node4:2181,node5:2181 --create --topic KAFKA-DIM-TOPIC --partitions 3 --replication-factor 3
  7. #监控以上两个topic数据
  8. [root@node1 bin]# ./kafka-console-consumer.sh --bootstrap-server node1:9092,node2:9092,node3:9092 --topic KAFKA-ODS-TOPIC
  9. [root@node1 bin]# ./kafka-console-consumer.sh --bootstrap-server node1:9092,node2:9092,node3:9092 --topic KAFKA-DIM-TOPIC

2、将代码中消费Kafka数据改成从头开始消费

代码中Kafka Connector中属性“scan.startup.mode”设置为“earliest-offset”,从头开始消费数据。

这里也可以不设置从头开始消费Kafka数据,而是直接启动实时向MySQL表中写入数据代码“RTMockDBData.java”代码,实时向MySQL对应的表中写入数据,这里需要启动maxwell监控数据,代码才能实时监控到写入MySQL的业务数据。

针对用户日志数据可以启动代码“RTMockUserLogData.java”,实时向日志采集接口写入数据。

3、启动日志采集接口,启动Flume监控

如果上个步骤中设置从“earliest-offset”消费kafka数据,可以暂时不启动日志采集接口和Flume


  
  1. #在node5节点上启动日志采集接口
  2. [root@node5 ~]# cd /software/
  3. [root@node5 software]# java -jar logcollector-0.0.1-SNAPSHOT.jar
  4. #在node5节点上启动Flume
  5. [root@node5 software]# flume-ng agent --name a -f /software/a.properties -Dflume.root.logger=INFO,console

4、执行代码,查看对应topic中的结果

以上代码执行后在,在对应的Kafka “KAFKA-DIM-TOPIC”和“KAFKA-ODS-TOPIC”中都有对应的数据。在Iceberg-ODS层中对应的表中也有数据。

5、执行模拟生产用户日志代码,查看对应topic中的结果

执行模拟产生用户日志数据代码:RTMockUserLogData.java,观察对应的Kafak “KAFKA-ODS-TOPIC”中有实时数据被采集。


  • 📢博客主页:https://lansonli.blog.csdn.net
  • 📢欢迎点赞 👍 收藏 ⭐留言 📝 如有错误敬请指正!
  • 📢本文由 Lansonli 原创,首发于 CSDN博客🙉
  • 📢停下休息的时候不要忘了别人还在奔跑,希望大家抓紧时间学习,全力奔赴更美好的生活✨

文章来源: lansonli.blog.csdn.net,作者:Lansonli,版权归原作者所有,如需转载,请联系作者。

原文链接:lansonli.blog.csdn.net/article/details/126926304

【版权声明】本文为华为云社区用户转载文章,如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。