客快物流大数据项目(六十):将消费的kafka数据转换成bean对象

举报
Lansonli 发表于 2022/03/09 23:21:57 2022/03/09
【摘要】 目录 将消费的kafka数据转换成bean对象 一、将OGG数据转换成bean对象 二、将Canal数据转换成bean对象 三、完整代码 将消费的kafka数据转换成bean对象 一、​​​​​​​将OGG数据转换成bean对象 实现步骤: 消费kafka的 logistics Topic数据将消费到的数据转换...

目录

将消费的kafka数据转换成bean对象

一、将OGG数据转换成bean对象

二、将Canal数据转换成bean对象

三、完整代码


将消费的kafka数据转换成bean对象

一、​​​​​​​将OGG数据转换成bean对象

实现步骤:

  • 消费kafka的 logistics Topic数据
  • 将消费到的数据转换成OggMessageBean对象
  • 递交作业启动运行

实现过程:

  • 消费kafka的 logistics Topic数据

      //2.1:获取物流系统相关的数据
      val logisticsDF: DataFrame = getKafkaSource(sparkSession, Configuration.kafkaLogisticsTopic)
  
 
  • 将消费到的数据转换成OggMessageBean对象
    • 默认情况下表名带有数据库名,因此需要删除掉数据库名

      //3.1:物流相关数据的转换
      val logsticsMessageBean: Dataset[OggMessageBean] = logisticsDF.filter(!_.isNullAt(0)).mapPartitions(iters => {
        iters.map(row => {
         //获取到value列的值(字符串)
         val jsonStr: String = row.getAs[String](0)
         //将字符串转换成javabean对象
         JSON.parseObject(jsonStr, classOf[OggMessageBean])
        }).toList.iterator
      })(Encoders.bean(classOf[OggMessageBean]))
  
 
  • 递交作业启动运行

      // 设置Streaming应用输出及启动
      logisticsDF.writeStream.outputMode(OutputMode.Update())
        .format("console").queryName("logistics").start()
  
 

二、​​​​​​​将Canal数据转换成bean对象

实现步骤:

  • 消费kafka的 crm Topic数据
  • 将消费到的数据转换成 CanalMessageBean 对象
  • 递交作业启动运行

实现过程:

  • 消费kafka的 crm Topic数据

      //2.2:获取客户关系系统相关的数据
      val crmDF: DataFrame = getKafkaSource(sparkSession, Configuration.kafkaCrmTopic)
  
 
  • 将消费到的数据转换成CanalMessageBean 对象

      //3.2:客户关系相关数据的转换
      val crmMessageBean: Dataset[CanalMessageBean] = crmDF.filter(!_.isNullAt(0)).mapPartitions(iters=>{
       //canal同步的数据除了增删改操作以外,还有清空表数据的操作,因此将清空表数据的操作过滤掉
        iters.filter(row=>{
         //取到value列的数据
         val line: String = row.getAs[String](0)
         //如果value列的值不为空,且是清空表的操作
         if(line!=null && line.toUpperCase().contains("TRUNCATE")) false else true
        }).map(row=>{
         //取到value列的数据
         val jsonStr: String = row.getAs[String](0)
         //将json字符串转换成javaBean对象
         JSON.parseObject(jsonStr, classOf[CanalMessageBean])
        }).toList.toIterator
      })(Encoders.bean(classOf[CanalMessageBean]))
  
 
  • 递交作业启动运行

      crmDF.writeStream.outputMode(OutputMode.Update())
        .format("console").queryName("crm").start()
  
 

三、完整代码


      package cn.it.logistics.etl.realtime
      import java.sql.Connection
      import cn.it.logistics.common.{Configuration, SparkUtils, TableMapping, Tools}
      import cn.it.logistics.common.beans.parser.{CanalMessageBean, OggMessageBean}
      import cn.it.logistics.etl.parser.DataParser
      import com.alibaba.fastjson.JSON
      import org.apache.spark.SparkConf
      import org.apache.spark.sql.streaming.OutputMode
      import org.apache.spark.sql.{DataFrame, Dataset, Encoders, SparkSession}
      /**
       * 实现KUDU数据库的实时ETL操作
       */
      object KuduStreamApp2 extends StreamApp {
       /**
       * 入口方法
       * @param args
       */
       def main(args: Array[String]): Unit = {
         //创建sparkConf对象
         val sparkConf: SparkConf = SparkUtils.autoSettingEnv(
           SparkUtils.sparkConf(this.getClass.getSimpleName)
          )
         //数据处理
          execute(sparkConf)
        }
       /**
       * 数据的处理
       *
       * @param sparkConf
       */
       override def execute(sparkConf: SparkConf): Unit = {
         /**
       * 实现步骤:
       * 1)创建sparksession对象
       * 2)获取数据源(获取物流相关数据以及crm相关数据)
       * 3)对数据进行处理(返回的数据是字符串类型,需要转换成javabean对象)
       * 4)抽取每条数据的字段信息
       * 5)将过滤出来的每张表写入到kudu数据库
       */
         //1)创建sparksession对象
         val sparkSession: SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()
          sparkSession.sparkContext.setLogLevel(Configuration.LOG_OFF)
         //2)获取数据源(获取物流相关数据以及crm相关数据)
         //2.1:获取物流系统相关的数据
         val logisticsDF: DataFrame = getKafkaSource(sparkSession, Configuration.kafkaLogisticsTopic)
         //2.2:获取客户关系系统相关的数据
         val crmDF: DataFrame = getKafkaSource(sparkSession, Configuration.kafkaCrmTopic)
         //导入隐式转换
         import  sparkSession.implicits._
         //导入自定义的POJO的隐士转换
         import  cn.it.logistics.common.BeanImplicit._
         //3)对数据进行处理(返回的数据是字符串类型,需要转换成javabean对象)
         //3.1:物流相关数据的转换
         val logsticsMessageBean: Dataset[OggMessageBean] = logisticsDF.filter(!_.isNullAt(0)).mapPartitions(iters => {
            iters.map(row => {
             //获取到value列的值(字符串)
             val jsonStr: String = row.getAs[String](0)
             //将字符串转换成javabean对象
             JSON.parseObject(jsonStr, classOf[OggMessageBean])
            }).toList.iterator
          })(Encoders.bean(classOf[OggMessageBean]))
         //3.2:客户关系相关数据的转换
         val crmMessageBean: Dataset[CanalMessageBean] = crmDF.filter(!_.isNullAt(0)).mapPartitions(iters=>{
           //canal同步的数据除了增删改操作以外,还有清空表数据的操作,因此将清空表数据的操作过滤掉
            iters.filter(row=>{
             //取到value列的数据
             val line: String = row.getAs[String](0)
             //如果value列的值不为空,且是清空表的操作
             if(line!=null && line.toUpperCase().contains("TRUNCATE")) false else true
            }).map(row=>{
             //取到value列的数据
             val jsonStr: String = row.getAs[String](0)
             //将json字符串转换成javaBean对象
             JSON.parseObject(jsonStr, classOf[CanalMessageBean])
            }).toList.toIterator
          })(Encoders.bean(classOf[CanalMessageBean]))
         //输出数据
         /**
       * +--------------------+--------------------+--------------------+--------------------+-------+--------------------+-------------------+
       * | after| before| current_ts| op_ts|op_type| pos| table|
       * +--------------------+--------------------+--------------------+--------------------+-------+--------------------+-------------------+
       * |[eid -> [], cdt -...|[eid -> [], cdt -...|2020-10-10T02:35:...|2020-10-10 02:35:...| U|00000000200006647808|tbl_collect_package|
       * +--------------------+--------------------+--------------------+--------------------+-------+--------------------+-------------------+
       */
          logsticsMessageBean.writeStream.outputMode(OutputMode.Update()).format("console").queryName("logistics").start()
         /**
       * +--------------------+--------+-----+-------------+---+--------------------+------------------+---+--------------------+-----------+-------------+------+
       * | data|database| ddl| es| id| mysqlType| old|sql| sqlType| table| ts| type|
       * +--------------------+--------+-----+-------------+---+--------------------+------------------+---+--------------------+-----------+-------------+------+
       * |[[cdt -> [], gis_...| crm|false|1602297244000| 18|[cdt -> [], gis_a...|[ {"gis_addr":"1"}]| |[cdt -> [], gis_a...|tbl_address|1602297244211|UPDATE|
       * +--------------------+--------+-----+-------------+---+--------------------+------------------+---+--------------------+-----------+-------------+------+
       */
          crmMessageBean.writeStream.outputMode(OutputMode.Update()).format("console").queryName("crm").start()
         //8)启动运行等待停止
         val stream = sparkSession.streams
         //stream.active:获取当前活动流式查询的列表
          stream.active.foreach(query => println(s"准备启动的查询:${query.name}"))
         //线程阻塞,等待终止
          stream.awaitAnyTermination()
        }
       /**
       * 数据的保存
       * @param dataFrame
       * @param tableName
       * @param isAutoCreateTable
       */
       override def save(dataFrame: DataFrame, tableName: String, isAutoCreateTable: Boolean = true): Unit = {
        }
  
 


  • 📢博客主页:https://lansonli.blog.csdn.net
  • 📢欢迎点赞 👍 收藏 ⭐留言 📝 如有错误敬请指正!
  • 📢本文由 Lansonli 原创,首发于 CSDN博客🙉
  • 📢大数据系列文章会每天更新,停下休息的时候不要忘了别人还在奔跑,希望大家抓紧时间学习,全力奔赴更美好的生活✨

文章来源: lansonli.blog.csdn.net,作者:Lansonli,版权归原作者所有,如需转载,请联系作者。

原文链接:lansonli.blog.csdn.net/article/details/123364958

【版权声明】本文为华为云社区用户转载文章,如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。