客快物流大数据项目(六十):将消费的kafka数据转换成bean对象

举报
Lansonli 发表于 2022/03/09 23:21:57 2022/03/09
【摘要】 目录 将消费的kafka数据转换成bean对象 一、将OGG数据转换成bean对象 二、将Canal数据转换成bean对象 三、完整代码 将消费的kafka数据转换成bean对象 一、​​​​​​​将OGG数据转换成bean对象 实现步骤: 消费kafka的 logistics Topic数据将消费到的数据转换...

目录

将消费的kafka数据转换成bean对象

一、将OGG数据转换成bean对象

二、将Canal数据转换成bean对象

三、完整代码


将消费的kafka数据转换成bean对象

一、​​​​​​​将OGG数据转换成bean对象

实现步骤:

  • 消费kafka的 logistics Topic数据
  • 将消费到的数据转换成OggMessageBean对象
  • 递交作业启动运行

实现过程:

  • 消费kafka的 logistics Topic数据

  
  1. //2.1:获取物流系统相关的数据
  2. val logisticsDF: DataFrame = getKafkaSource(sparkSession, Configuration.kafkaLogisticsTopic)
  • 将消费到的数据转换成OggMessageBean对象
    • 默认情况下表名带有数据库名,因此需要删除掉数据库名

  
  1. //3.1:物流相关数据的转换
  2. val logsticsMessageBean: Dataset[OggMessageBean] = logisticsDF.filter(!_.isNullAt(0)).mapPartitions(iters => {
  3. iters.map(row => {
  4. //获取到value列的值(字符串)
  5. val jsonStr: String = row.getAs[String](0)
  6. //将字符串转换成javabean对象
  7. JSON.parseObject(jsonStr, classOf[OggMessageBean])
  8. }).toList.iterator
  9. })(Encoders.bean(classOf[OggMessageBean]))
  • 递交作业启动运行

  
  1. // 设置Streaming应用输出及启动
  2. logisticsDF.writeStream.outputMode(OutputMode.Update())
  3. .format("console").queryName("logistics").start()

二、​​​​​​​将Canal数据转换成bean对象

实现步骤:

  • 消费kafka的 crm Topic数据
  • 将消费到的数据转换成 CanalMessageBean 对象
  • 递交作业启动运行

实现过程:

  • 消费kafka的 crm Topic数据

  
  1. //2.2:获取客户关系系统相关的数据
  2. val crmDF: DataFrame = getKafkaSource(sparkSession, Configuration.kafkaCrmTopic)
  • 将消费到的数据转换成CanalMessageBean 对象

  
  1. //3.2:客户关系相关数据的转换
  2. val crmMessageBean: Dataset[CanalMessageBean] = crmDF.filter(!_.isNullAt(0)).mapPartitions(iters=>{
  3. //canal同步的数据除了增删改操作以外,还有清空表数据的操作,因此将清空表数据的操作过滤掉
  4. iters.filter(row=>{
  5. //取到value列的数据
  6. val line: String = row.getAs[String](0)
  7. //如果value列的值不为空,且是清空表的操作
  8. if(line!=null && line.toUpperCase().contains("TRUNCATE")) false else true
  9. }).map(row=>{
  10. //取到value列的数据
  11. val jsonStr: String = row.getAs[String](0)
  12. //将json字符串转换成javaBean对象
  13. JSON.parseObject(jsonStr, classOf[CanalMessageBean])
  14. }).toList.toIterator
  15. })(Encoders.bean(classOf[CanalMessageBean]))
  • 递交作业启动运行

  
  1. crmDF.writeStream.outputMode(OutputMode.Update())
  2. .format("console").queryName("crm").start()

三、完整代码


  
  1. package cn.it.logistics.etl.realtime
  2. import java.sql.Connection
  3. import cn.it.logistics.common.{Configuration, SparkUtils, TableMapping, Tools}
  4. import cn.it.logistics.common.beans.parser.{CanalMessageBean, OggMessageBean}
  5. import cn.it.logistics.etl.parser.DataParser
  6. import com.alibaba.fastjson.JSON
  7. import org.apache.spark.SparkConf
  8. import org.apache.spark.sql.streaming.OutputMode
  9. import org.apache.spark.sql.{DataFrame, Dataset, Encoders, SparkSession}
  10. /**
  11. * 实现KUDU数据库的实时ETL操作
  12. */
  13. object KuduStreamApp2 extends StreamApp {
  14. /**
  15. * 入口方法
  16. * @param args
  17. */
  18. def main(args: Array[String]): Unit = {
  19. //创建sparkConf对象
  20. val sparkConf: SparkConf = SparkUtils.autoSettingEnv(
  21. SparkUtils.sparkConf(this.getClass.getSimpleName)
  22. )
  23. //数据处理
  24. execute(sparkConf)
  25. }
  26. /**
  27. * 数据的处理
  28. *
  29. * @param sparkConf
  30. */
  31. override def execute(sparkConf: SparkConf): Unit = {
  32. /**
  33. * 实现步骤:
  34. * 1)创建sparksession对象
  35. * 2)获取数据源(获取物流相关数据以及crm相关数据)
  36. * 3)对数据进行处理(返回的数据是字符串类型,需要转换成javabean对象)
  37. * 4)抽取每条数据的字段信息
  38. * 5)将过滤出来的每张表写入到kudu数据库
  39. */
  40. //1)创建sparksession对象
  41. val sparkSession: SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()
  42. sparkSession.sparkContext.setLogLevel(Configuration.LOG_OFF)
  43. //2)获取数据源(获取物流相关数据以及crm相关数据)
  44. //2.1:获取物流系统相关的数据
  45. val logisticsDF: DataFrame = getKafkaSource(sparkSession, Configuration.kafkaLogisticsTopic)
  46. //2.2:获取客户关系系统相关的数据
  47. val crmDF: DataFrame = getKafkaSource(sparkSession, Configuration.kafkaCrmTopic)
  48. //导入隐式转换
  49. import sparkSession.implicits._
  50. //导入自定义的POJO的隐士转换
  51. import cn.it.logistics.common.BeanImplicit._
  52. //3)对数据进行处理(返回的数据是字符串类型,需要转换成javabean对象)
  53. //3.1:物流相关数据的转换
  54. val logsticsMessageBean: Dataset[OggMessageBean] = logisticsDF.filter(!_.isNullAt(0)).mapPartitions(iters => {
  55. iters.map(row => {
  56. //获取到value列的值(字符串)
  57. val jsonStr: String = row.getAs[String](0)
  58. //将字符串转换成javabean对象
  59. JSON.parseObject(jsonStr, classOf[OggMessageBean])
  60. }).toList.iterator
  61. })(Encoders.bean(classOf[OggMessageBean]))
  62. //3.2:客户关系相关数据的转换
  63. val crmMessageBean: Dataset[CanalMessageBean] = crmDF.filter(!_.isNullAt(0)).mapPartitions(iters=>{
  64. //canal同步的数据除了增删改操作以外,还有清空表数据的操作,因此将清空表数据的操作过滤掉
  65. iters.filter(row=>{
  66. //取到value列的数据
  67. val line: String = row.getAs[String](0)
  68. //如果value列的值不为空,且是清空表的操作
  69. if(line!=null && line.toUpperCase().contains("TRUNCATE")) false else true
  70. }).map(row=>{
  71. //取到value列的数据
  72. val jsonStr: String = row.getAs[String](0)
  73. //将json字符串转换成javaBean对象
  74. JSON.parseObject(jsonStr, classOf[CanalMessageBean])
  75. }).toList.toIterator
  76. })(Encoders.bean(classOf[CanalMessageBean]))
  77. //输出数据
  78. /**
  79. * +--------------------+--------------------+--------------------+--------------------+-------+--------------------+-------------------+
  80. * | after| before| current_ts| op_ts|op_type| pos| table|
  81. * +--------------------+--------------------+--------------------+--------------------+-------+--------------------+-------------------+
  82. * |[eid -> [], cdt -...|[eid -> [], cdt -...|2020-10-10T02:35:...|2020-10-10 02:35:...| U|00000000200006647808|tbl_collect_package|
  83. * +--------------------+--------------------+--------------------+--------------------+-------+--------------------+-------------------+
  84. */
  85. logsticsMessageBean.writeStream.outputMode(OutputMode.Update()).format("console").queryName("logistics").start()
  86. /**
  87. * +--------------------+--------+-----+-------------+---+--------------------+------------------+---+--------------------+-----------+-------------+------+
  88. * | data|database| ddl| es| id| mysqlType| old|sql| sqlType| table| ts| type|
  89. * +--------------------+--------+-----+-------------+---+--------------------+------------------+---+--------------------+-----------+-------------+------+
  90. * |[[cdt -> [], gis_...| crm|false|1602297244000| 18|[cdt -> [], gis_a...|[ {"gis_addr":"1"}]| |[cdt -> [], gis_a...|tbl_address|1602297244211|UPDATE|
  91. * +--------------------+--------+-----+-------------+---+--------------------+------------------+---+--------------------+-----------+-------------+------+
  92. */
  93. crmMessageBean.writeStream.outputMode(OutputMode.Update()).format("console").queryName("crm").start()
  94. //8)启动运行等待停止
  95. val stream = sparkSession.streams
  96. //stream.active:获取当前活动流式查询的列表
  97. stream.active.foreach(query => println(s"准备启动的查询:${query.name}"))
  98. //线程阻塞,等待终止
  99. stream.awaitAnyTermination()
  100. }
  101. /**
  102. * 数据的保存
  103. * @param dataFrame
  104. * @param tableName
  105. * @param isAutoCreateTable
  106. */
  107. override def save(dataFrame: DataFrame, tableName: String, isAutoCreateTable: Boolean = true): Unit = {
  108. }


  • 📢博客主页:https://lansonli.blog.csdn.net
  • 📢欢迎点赞 👍 收藏 ⭐留言 📝 如有错误敬请指正!
  • 📢本文由 Lansonli 原创,首发于 CSDN博客🙉
  • 📢大数据系列文章会每天更新,停下休息的时候不要忘了别人还在奔跑,希望大家抓紧时间学习,全力奔赴更美好的生活✨

文章来源: lansonli.blog.csdn.net,作者:Lansonli,版权归原作者所有,如需转载,请联系作者。

原文链接:lansonli.blog.csdn.net/article/details/123364958

【版权声明】本文为华为云社区用户转载文章,如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。