客快物流大数据项目(六十):将消费的kafka数据转换成bean对象
【摘要】
目录
将消费的kafka数据转换成bean对象
一、将OGG数据转换成bean对象
二、将Canal数据转换成bean对象
三、完整代码
将消费的kafka数据转换成bean对象
一、将OGG数据转换成bean对象
实现步骤:
消费kafka的 logistics Topic数据将消费到的数据转换...
目录
将消费的kafka数据转换成bean对象
一、将OGG数据转换成bean对象
实现步骤:
- 消费kafka的 logistics Topic数据
- 将消费到的数据转换成OggMessageBean对象
- 递交作业启动运行
实现过程:
- 消费kafka的 logistics Topic数据
-
//2.1:获取物流系统相关的数据
-
val logisticsDF: DataFrame = getKafkaSource(sparkSession, Configuration.kafkaLogisticsTopic)
- 将消费到的数据转换成OggMessageBean对象
- 默认情况下表名带有数据库名,因此需要删除掉数据库名
-
//3.1:物流相关数据的转换
-
val logsticsMessageBean: Dataset[OggMessageBean] = logisticsDF.filter(!_.isNullAt(0)).mapPartitions(iters => {
-
iters.map(row => {
-
//获取到value列的值(字符串)
-
val jsonStr: String = row.getAs[String](0)
-
//将字符串转换成javabean对象
-
JSON.parseObject(jsonStr, classOf[OggMessageBean])
-
}).toList.iterator
-
})(Encoders.bean(classOf[OggMessageBean]))
- 递交作业启动运行
-
// 设置Streaming应用输出及启动
-
logisticsDF.writeStream.outputMode(OutputMode.Update())
-
.format("console").queryName("logistics").start()
二、将Canal数据转换成bean对象
实现步骤:
- 消费kafka的 crm Topic数据
- 将消费到的数据转换成 CanalMessageBean 对象
- 递交作业启动运行
实现过程:
- 消费kafka的 crm Topic数据
-
//2.2:获取客户关系系统相关的数据
-
val crmDF: DataFrame = getKafkaSource(sparkSession, Configuration.kafkaCrmTopic)
- 将消费到的数据转换成CanalMessageBean 对象
-
//3.2:客户关系相关数据的转换
-
val crmMessageBean: Dataset[CanalMessageBean] = crmDF.filter(!_.isNullAt(0)).mapPartitions(iters=>{
-
//canal同步的数据除了增删改操作以外,还有清空表数据的操作,因此将清空表数据的操作过滤掉
-
iters.filter(row=>{
-
//取到value列的数据
-
val line: String = row.getAs[String](0)
-
//如果value列的值不为空,且是清空表的操作
-
if(line!=null && line.toUpperCase().contains("TRUNCATE")) false else true
-
}).map(row=>{
-
//取到value列的数据
-
val jsonStr: String = row.getAs[String](0)
-
//将json字符串转换成javaBean对象
-
JSON.parseObject(jsonStr, classOf[CanalMessageBean])
-
}).toList.toIterator
-
})(Encoders.bean(classOf[CanalMessageBean]))
- 递交作业启动运行
-
crmDF.writeStream.outputMode(OutputMode.Update())
-
.format("console").queryName("crm").start()
三、完整代码
-
package cn.it.logistics.etl.realtime
-
import java.sql.Connection
-
-
import cn.it.logistics.common.{Configuration, SparkUtils, TableMapping, Tools}
-
import cn.it.logistics.common.beans.parser.{CanalMessageBean, OggMessageBean}
-
import cn.it.logistics.etl.parser.DataParser
-
import com.alibaba.fastjson.JSON
-
import org.apache.spark.SparkConf
-
import org.apache.spark.sql.streaming.OutputMode
-
import org.apache.spark.sql.{DataFrame, Dataset, Encoders, SparkSession}
-
-
/**
-
* 实现KUDU数据库的实时ETL操作
-
*/
-
object KuduStreamApp2 extends StreamApp {
-
-
/**
-
* 入口方法
-
* @param args
-
*/
-
def main(args: Array[String]): Unit = {
-
//创建sparkConf对象
-
val sparkConf: SparkConf = SparkUtils.autoSettingEnv(
-
SparkUtils.sparkConf(this.getClass.getSimpleName)
-
)
-
-
//数据处理
-
execute(sparkConf)
-
}
-
-
/**
-
* 数据的处理
-
*
-
* @param sparkConf
-
*/
-
override def execute(sparkConf: SparkConf): Unit = {
-
/**
-
* 实现步骤:
-
* 1)创建sparksession对象
-
* 2)获取数据源(获取物流相关数据以及crm相关数据)
-
* 3)对数据进行处理(返回的数据是字符串类型,需要转换成javabean对象)
-
* 4)抽取每条数据的字段信息
-
* 5)将过滤出来的每张表写入到kudu数据库
-
*/
-
//1)创建sparksession对象
-
val sparkSession: SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()
-
sparkSession.sparkContext.setLogLevel(Configuration.LOG_OFF)
-
-
//2)获取数据源(获取物流相关数据以及crm相关数据)
-
//2.1:获取物流系统相关的数据
-
val logisticsDF: DataFrame = getKafkaSource(sparkSession, Configuration.kafkaLogisticsTopic)
-
-
//2.2:获取客户关系系统相关的数据
-
val crmDF: DataFrame = getKafkaSource(sparkSession, Configuration.kafkaCrmTopic)
-
-
//导入隐式转换
-
import sparkSession.implicits._
-
-
//导入自定义的POJO的隐士转换
-
import cn.it.logistics.common.BeanImplicit._
-
-
//3)对数据进行处理(返回的数据是字符串类型,需要转换成javabean对象)
-
//3.1:物流相关数据的转换
-
val logsticsMessageBean: Dataset[OggMessageBean] = logisticsDF.filter(!_.isNullAt(0)).mapPartitions(iters => {
-
iters.map(row => {
-
//获取到value列的值(字符串)
-
val jsonStr: String = row.getAs[String](0)
-
//将字符串转换成javabean对象
-
JSON.parseObject(jsonStr, classOf[OggMessageBean])
-
}).toList.iterator
-
})(Encoders.bean(classOf[OggMessageBean]))
-
-
//3.2:客户关系相关数据的转换
-
val crmMessageBean: Dataset[CanalMessageBean] = crmDF.filter(!_.isNullAt(0)).mapPartitions(iters=>{
-
//canal同步的数据除了增删改操作以外,还有清空表数据的操作,因此将清空表数据的操作过滤掉
-
iters.filter(row=>{
-
//取到value列的数据
-
val line: String = row.getAs[String](0)
-
//如果value列的值不为空,且是清空表的操作
-
if(line!=null && line.toUpperCase().contains("TRUNCATE")) false else true
-
}).map(row=>{
-
//取到value列的数据
-
val jsonStr: String = row.getAs[String](0)
-
//将json字符串转换成javaBean对象
-
JSON.parseObject(jsonStr, classOf[CanalMessageBean])
-
}).toList.toIterator
-
})(Encoders.bean(classOf[CanalMessageBean]))
-
-
//输出数据
-
/**
-
* +--------------------+--------------------+--------------------+--------------------+-------+--------------------+-------------------+
-
* | after| before| current_ts| op_ts|op_type| pos| table|
-
* +--------------------+--------------------+--------------------+--------------------+-------+--------------------+-------------------+
-
* |[eid -> [], cdt -...|[eid -> [], cdt -...|2020-10-10T02:35:...|2020-10-10 02:35:...| U|00000000200006647808|tbl_collect_package|
-
* +--------------------+--------------------+--------------------+--------------------+-------+--------------------+-------------------+
-
*/
-
logsticsMessageBean.writeStream.outputMode(OutputMode.Update()).format("console").queryName("logistics").start()
-
-
/**
-
* +--------------------+--------+-----+-------------+---+--------------------+------------------+---+--------------------+-----------+-------------+------+
-
* | data|database| ddl| es| id| mysqlType| old|sql| sqlType| table| ts| type|
-
* +--------------------+--------+-----+-------------+---+--------------------+------------------+---+--------------------+-----------+-------------+------+
-
* |[[cdt -> [], gis_...| crm|false|1602297244000| 18|[cdt -> [], gis_a...|[ {"gis_addr":"1"}]| |[cdt -> [], gis_a...|tbl_address|1602297244211|UPDATE|
-
* +--------------------+--------+-----+-------------+---+--------------------+------------------+---+--------------------+-----------+-------------+------+
-
*/
-
crmMessageBean.writeStream.outputMode(OutputMode.Update()).format("console").queryName("crm").start()
-
-
//8)启动运行等待停止
-
val stream = sparkSession.streams
-
//stream.active:获取当前活动流式查询的列表
-
stream.active.foreach(query => println(s"准备启动的查询:${query.name}"))
-
//线程阻塞,等待终止
-
stream.awaitAnyTermination()
-
}
-
-
/**
-
* 数据的保存
-
* @param dataFrame
-
* @param tableName
-
* @param isAutoCreateTable
-
*/
-
override def save(dataFrame: DataFrame, tableName: String, isAutoCreateTable: Boolean = true): Unit = {
-
}
- 📢博客主页:https://lansonli.blog.csdn.net
- 📢欢迎点赞 👍 收藏 ⭐留言 📝 如有错误敬请指正!
- 📢本文由 Lansonli 原创,首发于 CSDN博客🙉
- 📢大数据系列文章会每天更新,停下休息的时候不要忘了别人还在奔跑,希望大家抓紧时间学习,全力奔赴更美好的生活✨
文章来源: lansonli.blog.csdn.net,作者:Lansonli,版权归原作者所有,如需转载,请联系作者。
原文链接:lansonli.blog.csdn.net/article/details/123364958
【版权声明】本文为华为云社区用户转载文章,如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱:
cloudbbs@huaweicloud.com
- 点赞
- 收藏
- 关注作者
评论(0)