大数据物流项目:实时增量ETL存储Kudu(八点五)

举报
Maynor学长 发表于 2022/06/29 20:23:11 2022/06/29
【摘要】 08-[掌握]-实时ETL开发之数据转换Bean及测试任务:==编写代码,解析JSON字符串为MessageBean对象,属于实时ETL转换第一步。==1)、如何解析JSON字符串为JavaBean对象呢???使用阿里巴巴JSON库:fastJson,既能解析JSON为Bean对象,又能转换Bean对象为JSON字符串为什么使用fastJson解析?? fastJson解析Json字符串时...

08-[掌握]-实时ETL开发之数据转换Bean及测试

任务:==编写代码,解析JSON字符串为MessageBean对象,属于实时ETL转换第一步。==

  • 1)、如何解析JSON字符串为JavaBean对象呢???

    使用阿里巴巴JSON库:fastJson,既能解析JSON为Bean对象,又能转换Bean对象为JSON字符串

    为什么使用fastJson解析??
    	fastJson解析Json字符串时,使用起来比较简单,此外库基于Java语言开发,对JavaBean对象支持非常的好,对Scala语言支持不好,所以MessageBean使用Java语言定义的,没有使用Scala语言。
    
    • 转换JSON为Bean对象:JSON.parseObject(jsonStr, classOf[StuBean])

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-DiTGRUf4-1652014600919)(1616054946045.)]

    • 将Bean对象转换为JSON字符串:JSON.toJSONString(stuBean, true)

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-wAfJcjCm-1652014600920)(1616054978516.)]

package cn.itcast.logistics.test;

import java.util.Objects;

public class StuBean {

	private Integer id ;
	private String name ;

	public StuBean() {
	}

	public StuBean(Integer id, String name) {
		this.id = id;
		this.name = name;
	}

	public Integer getId() {
		return id;
	}

	public void setId(Integer id) {
		this.id = id;
	}

	public String getName() {
		return name;
	}

	public void setName(String name) {
		this.name = name;
	}

	@Override
	public boolean equals(Object o) {
		if (this == o) return true;
		if (o == null || getClass() != o.getClass()) return false;
		StuBean stuBean = (StuBean) o;
		return Objects.equals(id, stuBean.id) &&
			Objects.equals(name, stuBean.name);
	}

	@Override
	public int hashCode() {
		return Objects.hash(id, name);
	}

	@Override
	public String toString() {
		return "StuBean{" +
			"id=" + id +
			", name='" + name + '\'' +
			'}';
	}
}

// ===============================================================

package cn.itcast.logistics.test

import com.alibaba.fastjson.JSON

object FastJsonTest {
	
	
	def main(args: Array[String]): Unit = {
		
		// 定义json字符串
		val jsonStr: String =
			"""
			  |{
			  |  "id": 10001,
			  |  "name": "zhangsan"
			  |}
			  |""".stripMargin
		// JSON -> JavaBean
		val stuBean: StuBean = JSON.parseObject(jsonStr, classOf[StuBean])
		println(stuBean)
		
		
		// JavaBean转换为JSON字符串
		val stuJson: String = JSON.toJSONString(stuBean, true)
		println(stuJson)
	}
	
}

任务:编写代码,将从Kafka消费JSON字符串数据,解析为MessageBean对象即可

  • 1)、首先对物流系统数据,使用OGG采集数数据进行解析处理,核心代码如下:
			case "logistics" =>
				val oggBeanStreamDS: Dataset[OggMessageBean] = streamDF
					// 由于从Kafka消费数据,只获取value消息,将其转换DataSet
					.as[String]
					// 过滤数据
					.filter(msg => null != msg && msg.trim.length > 0)
					// 解析每条数据
					.map{
						msg => JSON.parseObject(msg, classOf[OggMessageBean])
					}(Encoders.bean(classOf[OggMessageBean])) // TODO: 指定编码器
				// 返回数据
				oggBeanStreamDS.toDF()

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-R4yvOxD0-1652014600920)(1616055524533.)]

​ 当将DataFrame转换Dataset进行操作时,尤其调用转换函数(比如map、filter、flatMap)等等,需要指定编码器Encoder(Dataset 强类型数据结构,指定类型编码)。

默认情况下,当Dataset数据类型为:元组类型、CaseClass或基本数据类型,都会提供默认编码器Encoder,除此之外数据类型,比如自定义Java语言Bean对象,必须指定编码器。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-PdlhZQfN-1652014600921)(1616055716033.)]

运行测试程序,在Oracle数据库中操作数据,查看控制台打印结果

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-VvKeLUp6-1652014600921)(1616055935505.)]

  • 2)、将Canal采集JSON数据,转换MessageBean对象,核心代码
// TODO: CRM系统业务数据
			case "crm" =>
				implicit val canalEncoder: Encoder[CanalMessageBean] = Encoders.bean(classOf[CanalMessageBean])
				val canalBeanStreamDS: Dataset[CanalMessageBean] = streamDF
					// 过滤数据
					.filter(row => !row.isNullAt(0))
					// 解析数据,对分区数据操作
					.mapPartitions { iter =>
						iter.map { row =>
							val jsonValue: String = row.getAs[String]("value")
							// 解析JSON字符串
							JSON.parseObject(jsonValue, classOf[CanalMessageBean])
						}
					}
				
				// 返回转换后的数据
				canalBeanStreamDS.toDF()

其中采用隐式参数方式,传递定义编码器Encoder

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-EC1VljWg-1652014600921)(1616056559325.)]

再次运行流式计算程序:KuduStreamApp,在MySQL数据库中修改表的数据,查看控制台打印结果

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-Rg1sVo7i-1652014600923)(1616056711227.)]

09-[理解]-实时ETL开发之转换POJO【思路】

任务:分析从Kafka消费数据(JSON转换MessageBean对象),哪些字段是关系值。

  • OGG采集Oracle数据库数据:7个字段
  • Canal采集MySQL数据库数据:12个字段

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-tc5InAk0-1652014600923)(image-20210526072519874.)]

OGG采集Oracle数据库表的数据,发送到Kafka中JSON字符串,转换为MessageBean以后:

  • 第一个字段:table,对哪个表进行操作
  • 第二个字段:op_type,数据操作类型,三个值【插入I、更新U、删除D
  • 第三个字段:数据字段,可能是after(插入和更新),可能是before(删除),提供方法getValue

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-VOZkVqp6-1652014600923)(1616118817076-1621984868452.)]

Canal采集MySQL数据库数据时,业务中关心字段:与OGG采集数据关心字段基本一致

  • 第一个字段:table,表的名称,对哪个表进行操作
  • 第二个字段:type,数据操作类型,三个值【INSERT、UPDATE、DELETE
  • 第三个字段:data,真正操作数据

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-4F8lXt9R-1652014600924)(1616119230880-1621984868452.)]

​ 经过前面分析可知,无论OGG采集海Canal采集数据,将JSON字符串封装为MessageBean以后,需要提取其中核心字段数据,进行转换操作。

将提取字段【type】类型和【data】数据,封装到具体表table的实体类【POJO】中,后面方便进行操作。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-7WYVty8J-1652014600924)(1616119695869-1621984868453.)]

10-[掌握]-实时ETL开发之OggBean转换POJO编程

任务:以OGG采集数据为例,针对其中一个张表【tbl_areas】进行转换操作。

1)、依据table字段判断数据:tbl_areas
2)、获取数据字段值:getValue方法,将其转换为POJO对象
3)、过滤掉转换为null数据
  • 1)、定义表名称的隐射

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-Ff2HZHtE-1652014600924)(1616120220576-1621984868453.)]

package cn.itcast.logistics.common

/**
 * 定义表名:根据表的名字定义属性
 */
object TableMapping {
	
	// Logistics 物流系统表名称
	val AREAS: String = "tbl_areas"
	val CHARGE_STANDARD: String = "tbl_charge_standard"
	val CODES: String = "tbl_codes"
	val COLLECT_PACKAGE: String = "tbl_collect_package"
	val COMPANY: String = "tbl_company"
	val COMPANY_DOT_MAP: String = "tbl_company_dot_map"
	val COMPANY_TRANSPORT_ROUTE_MA: String = "tbl_company_transport_route_ma"
	val COMPANY_WAREHOUSE_MAP: String = "tbl_company_warehouse_map"
	val CONSUMER_SENDER_INFO: String = "tbl_consumer_sender_info"
	val COURIER: String = "tbl_courier"
	val DELIVER_PACKAGE: String = "tbl_deliver_package"
	val DELIVER_REGION: String = "tbl_deliver_region"
	val DELIVERY_RECORD: String = "tbl_delivery_record"
	val DEPARTMENT: String = "tbl_department"
	val DOT: String = "tbl_dot"
	val DOT_TRANSPORT_TOOL: String = "tbl_dot_transport_tool"
	val DRIVER: String = "tbl_driver"
	val EMP: String = "tbl_emp"
	val EMP_INFO_MAP: String = "tbl_emp_info_map"
	val EXPRESS_BILL: String = "tbl_express_bill"
	val EXPRESS_PACKAGE: String = "tbl_express_package"
	val FIXED_AREA: String = "tbl_fixed_area"
	val GOODS_RACK: String = "tbl_goods_rack"
	val JOB: String = "tbl_job"
	val OUT_WAREHOUSE: String = "tbl_out_warehouse"
	val OUT_WAREHOUSE_DETAIL: String = "tbl_out_warehouse_detail"
	val PKG: String = "tbl_pkg"
	val POSTAL_STANDARD: String = "tbl_postal_standard"
	val PUSH_WAREHOUSE: String = "tbl_push_warehouse"
	val PUSH_WAREHOUSE_DETAIL: String = "tbl_push_warehouse_detail"
	val ROUTE: String = "tbl_route"
	val SERVICE_EVALUATION: String = "tbl_service_evaluation"
	val STORE_GRID: String = "tbl_store_grid"
	val TRANSPORT_RECORD: String = "tbl_transport_record"
	val TRANSPORT_TOOL: String = "tbl_transport_tool"
	val VEHICLE_MONITOR: String = "tbl_vehicle_monitor"
	val WAREHOUSE: String = "tbl_warehouse"
	val WAREHOUSE_EMP: String = "tbl_warehouse_emp"
	val WAREHOUSE_RACK_MAP: String = "tbl_warehouse_rack_map"
	val WAREHOUSE_RECEIPT: String = "tbl_warehouse_receipt"
	val WAREHOUSE_RECEIPT_DETAIL: String = "tbl_warehouse_receipt_detail"
	val WAREHOUSE_SEND_VEHICLE: String = "tbl_warehouse_send_vehicle"
	val WAREHOUSE_TRANSPORT_TOOL: String = "tbl_warehouse_transport_tool"
	val WAREHOUSE_VEHICLE_MAP: String = "tbl_warehouse_vehicle_map"
	val WAY_BILL: String = "tbl_waybill"
	val WAYBILL_LINE: String = "tbl_waybill_line"
	val WAYBILL_STATE_RECORD: String = "tbl_waybill_state_record"
	val WORK_TIME: String = "tbl_work_time"
	
	// CRM 系统业务数据表名称
	val ADDRESS: String = "tbl_address"
	val CONSUMER_ADDRESS_MAP: String = "tbl_consumer_address_map"
	val CUSTOMER: String = "tbl_customer"
}
  • 2)、编写核心代码,从OggMessageBean中提取字段值,封装到具体POJO对象中

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-clUR4VHN-1652014600926)(1616120552233-1621984868453.)]

11-[掌握]-实时ETL开发之转换POJO【数据解析器】

任务:编写数据解析器中方法【toAreaBean】实现,从MessageBean中提取字段值,封装到POJO实体类对象。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-i2Q21jEU-1652014600927)(1616121581555-1621984868453.)]

  • 当提取MessageBean中数据字段值,如何将其封装到POJO对象中呢??

首先将数据段值:Map数据类型 转换 JSON字符串,再将JSON字符串转换为 POJO对象

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-5HvECsad-1652014600927)(1616121249979-1621984868453.)]

最终实现代码如下所示:

package cn.itcast.logistics.etl.parse

import java.util

import cn.itcast.logistics.common.beans.logistics.AreasBean
import cn.itcast.logistics.common.beans.parser.{CanalMessageBean, MessageBean, OggMessageBean}
import com.alibaba.fastjson.JSON

object DataParser {
	
	/**
	 * 判断messageBean是否是OggMessageBean
	 */
	private def getOggMessageBean(bean: MessageBean): OggMessageBean = {
		bean match {
			case ogg: OggMessageBean => ogg
		}
	}
	
	/**
	 * 判断messageBean是否是CanalMessageBean
	 */
	private def getCanalMessageBean(bean: MessageBean): CanalMessageBean = {
		bean match {
			case canal: CanalMessageBean => canal
		}
	}
	
	/**
	 * 提取ogg(I、U、D)和canal(insert、update、delete)数据的optype属性,转换成统一的操作字符串
	 *
	 * @param opType 数据操作类型:insert、update、delete,任意一种
	 */
	private def getOpType(opType: String): String = {
		opType match {
			case "I" => "insert"
			case "U" => "update"
			case "D" => "delete"
			case "INSERT" => "insert"
			case "UPDATE" => "update"
			case "DELETE" => "delete"
			case _ => "insert"
		}
	}
	
	// ================== 物流Logistics系统业务数据解析 ==================
	/*
		从MessageBean提取数据字段值和数据操作类型,将其封装到具体POJO对象中
		TODO: 将物流Logistics系统:tbl_areas表的字段信息转换成AreaBean对象
	 */
	def toAreaBean(bean: MessageBean): AreasBean = {
		// a. 转换MessageBean对象为OggMessageBean对象
		val oggMessageBean: OggMessageBean = getOggMessageBean(bean)
		// b. 获取数据操作类型
		val opType: String = getOpType(oggMessageBean.getOp_type)
		// c. 获取操作数据值
		val dataValue: util.Map[String, AnyRef] = oggMessageBean.getValue
		// d. 将数据封装到POJO对象
		// 第一步、转换map为json字符串
		val dataJson: String = JSON.toJSONString(dataValue, true)
		println(dataJson)
		// 第二步、json字符串为pojo
		val areasBean = JSON.parseObject(dataJson, classOf[AreasBean])
		// 第三步、判断解析不为null时,设置数据操作类型
		if(null != areasBean){
			areasBean.setOpType(opType)
		}
		// e. 返回封装对象
		areasBean
	}
	
}

12-[理解]-实时ETL开发之转换POJO【隐式转换】

​ 当对Dataset数据结构进行操作时(调用函数,转换函数,比如map、flatMap)等,数据返回类型如果不是元组、CaseClass及基本类型,需要指定编码器Encoder

将JSON字符串转换为MessageBean对象时,指定Encoder编码器。

  • 方式一、方法后面紧跟圆括号,指定对应编码器

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-A9yz4kyl-1652014600927)(1616123417063-1621984868453.)]

  • 方式二、隐式参数,指定编码器,程序自动传入

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-TtT2n2Ek-1652014600928)(1616123450168-1621984868453.)]

​ 当从MessageBean中提取数据字段值以后,将其封装到对应POJO对象时,需要指定编码器,否则程序报错,具体如下图所示:

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-KoWkGwAc-1652014600928)(1616123541413-1621984868453.)]

学习SparkSQL框架,如何导入元组类型、CaseClass类型和基本类型 编码器。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-MhaVBfT0-1652014600928)(1616123609397-1621984868453.)]

​ 查看implicits父类:SQLImplicits对象,包含很对定义隐式转换函数,返回类型都是各种编码器Encoder

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-kXuGyKtt-1652014600929)(1616123726466-1621984868453.)]

参考SQLImplicits对象中编码器定义,自己定义编码器:BeanImplicits,实现隐式导入和转换操作。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-kraEvGbD-1652014600929)(1616123830045-1621984868453.)]

package cn.itcast.logistics.common

import cn.itcast.logistics.common.beans.crm._
import cn.itcast.logistics.common.beans.logistics._
import cn.itcast.logistics.common.beans.parser.{CanalMessageBean, OggMessageBean}
import org.apache.spark.sql.{Encoder, Encoders}

/**
 * 扩展自定义POJO的隐式转换实现
 */
object BeanImplicits {
	
	// 定义MessageBean隐式参数Encoder值
	implicit def newOggMessageBeanEncoder: Encoder[OggMessageBean] = Encoders.bean(classOf[OggMessageBean])
	
	implicit def newCanalMessageBeanEncoder: Encoder[CanalMessageBean] = Encoders.bean(classOf[CanalMessageBean])
	
	// Logistics Bean
	implicit def newAreasBeanEncoder: Encoder[AreasBean] = Encoders.bean(classOf[AreasBean])
	
	implicit def newChargeStandardBeanEncoder: Encoder[ChargeStandardBean] = Encoders.bean(classOf[ChargeStandardBean])
	
	implicit def newCodesBeanEncoder: Encoder[CodesBean] = Encoders.bean(classOf[CodesBean])
	
	implicit def newCollectPackageBeanEncoder: Encoder[CollectPackageBean] = Encoders.bean(classOf[CollectPackageBean])
	
	implicit def newCompanyBeanEncoder: Encoder[CompanyBean] = Encoders.bean(classOf[CompanyBean])
	
	implicit def newCompanyDotMapBeanEncoder: Encoder[CompanyDotMapBean] = Encoders.bean(classOf[CompanyDotMapBean])
	
	implicit def newCompanyTransportRouteMaBeanEncoder: Encoder[CompanyTransportRouteMaBean] = Encoders.bean(classOf[CompanyTransportRouteMaBean])
	
	implicit def newCompanyWarehouseMapBeanEncoder: Encoder[CompanyWarehouseMapBean] = Encoders.bean(classOf[CompanyWarehouseMapBean])
	
	implicit def newConsumerSenderInfoBeanEncoder: Encoder[ConsumerSenderInfoBean] = Encoders.bean(classOf[ConsumerSenderInfoBean])
	
	implicit def newCourierBeanEncoder: Encoder[CourierBean] = Encoders.bean(classOf[CourierBean])
	
	implicit def newDeliverPackageBeanEncoder: Encoder[DeliverPackageBean] = Encoders.bean(classOf[DeliverPackageBean])
	
	implicit def newDeliverRegionBeanEncoder: Encoder[DeliverRegionBean] = Encoders.bean(classOf[DeliverRegionBean])
	
	implicit def newDeliveryRecordBeanEncoder: Encoder[DeliveryRecordBean] = Encoders.bean(classOf[DeliveryRecordBean])
	
	implicit def newDepartmentBeanEncoder: Encoder[DepartmentBean] = Encoders.bean(classOf[DepartmentBean])
	
	implicit def newDotBeanEncoder: Encoder[DotBean] = Encoders.bean(classOf[DotBean])
	
	implicit def newDotTransportToolBeanEncoder: Encoder[DotTransportToolBean] = Encoders.bean(classOf[DotTransportToolBean])
	
	implicit def newDriverBeanEncoder: Encoder[DriverBean] = Encoders.bean(classOf[DriverBean])
	
	implicit def newEmpBeanEncoder: Encoder[EmpBean] = Encoders.bean(classOf[EmpBean])
	
	implicit def newEmpInfoMapBeanEncoder: Encoder[EmpInfoMapBean] = Encoders.bean(classOf[EmpInfoMapBean])
	
	implicit def newExpressBillBeanEncoder: Encoder[ExpressBillBean] = Encoders.bean(classOf[ExpressBillBean])
	
	implicit def newExpressPackageBeanEncoder: Encoder[ExpressPackageBean] = Encoders.bean(classOf[ExpressPackageBean])
	
	implicit def newFixedAreaBeanEncoder: Encoder[FixedAreaBean] = Encoders.bean(classOf[FixedAreaBean])
	
	implicit def newGoodsRackBeanEncoder: Encoder[GoodsRackBean] = Encoders.bean(classOf[GoodsRackBean])
	
	implicit def newJobBeanEncoder: Encoder[JobBean] = Encoders.bean(classOf[JobBean])
	
	implicit def newOutWarehouseBeanEncoder: Encoder[OutWarehouseBean] = Encoders.bean(classOf[OutWarehouseBean])
	
	implicit def newOutWarehouseDetailBeanEncoder: Encoder[OutWarehouseDetailBean] = Encoders.bean(classOf[OutWarehouseDetailBean])
	
	implicit def newPkgBeanEncoder: Encoder[PkgBean] = Encoders.bean(classOf[PkgBean])
	
	implicit def newPostalStandardBeanEncoder: Encoder[PostalStandardBean] = Encoders.bean(classOf[PostalStandardBean])
	
	implicit def newPushWarehouseBeanEncoder: Encoder[PushWarehouseBean] = Encoders.bean(classOf[PushWarehouseBean])
	
	implicit def newPushWarehouseDetailBeanEncoder: Encoder[PushWarehouseDetailBean] = Encoders.bean(classOf[PushWarehouseDetailBean])
	
	implicit def newRouteBeanEncoder: Encoder[RouteBean] = Encoders.bean(classOf[RouteBean])
	
	implicit def newServiceEvaluationBeanEncoder: Encoder[ServiceEvaluationBean] = Encoders.bean(classOf[ServiceEvaluationBean])
	
	implicit def newStoreGridBeanEncoder: Encoder[StoreGridBean] = Encoders.bean(classOf[StoreGridBean])
	
	implicit def newTransportToolBeanEncoder: Encoder[TransportToolBean] = Encoders.bean(classOf[TransportToolBean])
	
	implicit def newVehicleMonitorBeanEncoder: Encoder[VehicleMonitorBean] = Encoders.bean(classOf[VehicleMonitorBean])
	
	implicit def newWarehouseBeanEncoder: Encoder[WarehouseBean] = Encoders.bean(classOf[WarehouseBean])
	
	implicit def newWarehouseEmpBeanEncoder: Encoder[WarehouseEmpBean] = Encoders.bean(classOf[WarehouseEmpBean])
	
	implicit def newWarehouseRackMapBeanEncoder: Encoder[WarehouseRackMapBean] = Encoders.bean(classOf[WarehouseRackMapBean])
	
	implicit def newWarehouseReceiptBeanEncoder: Encoder[WarehouseReceiptBean] = Encoders.bean(classOf[WarehouseReceiptBean])
	
	implicit def newWarehouseReceiptDetailBeanEncoder: Encoder[WarehouseReceiptDetailBean] = Encoders.bean(classOf[WarehouseReceiptDetailBean])
	
	implicit def newWarehouseSendVehicleBeanEncoder: Encoder[WarehouseSendVehicleBean] = Encoders.bean(classOf[WarehouseSendVehicleBean])
	
	implicit def newWarehouseTransportToolBeanEncoder: Encoder[WarehouseTransportToolBean] = Encoders.bean(classOf[WarehouseTransportToolBean])
	
	implicit def newWarehouseVehicleMapBeanEncoder: Encoder[WarehouseVehicleMapBean] = Encoders.bean(classOf[WarehouseVehicleMapBean])
	
	implicit def newWaybillBeanEncoder: Encoder[WaybillBean] = Encoders.bean(classOf[WaybillBean])
	
	implicit def newWaybillLineBeanEncoder: Encoder[WaybillLineBean] = Encoders.bean(classOf[WaybillLineBean])
	
	implicit def newWaybillStateRecordBeanEncoder: Encoder[WaybillStateRecordBean] = Encoders.bean(classOf[WaybillStateRecordBean])
	
	implicit def newWorkTimeBeanEncoder: Encoder[WorkTimeBean] = Encoders.bean(classOf[WorkTimeBean])
	
	implicit def newTransportRecordBeanEncoder: Encoder[TransportRecordBean] = Encoders.bean(classOf[TransportRecordBean])
	
	// CRM Bean
	implicit def newCustomerBeanEncoder: Encoder[CustomerBean] = Encoders.bean(classOf[CustomerBean])
	
	implicit def newAddressBeanEncoder: Encoder[AddressBean] = Encoders.bean(classOf[AddressBean])
	
	implicit def newConsumerAddressMapBeanEncoder: Encoder[ConsumerAddressMapBean] = Encoders.bean(classOf[ConsumerAddressMapBean])
	
}

在流式程序代码中,导入自定义隐式转换对象即可。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-oNnoiPwH-1652014600929)(1616123959706-1621984868453.)]

13-[掌握]-实时ETL开发之OggBean转换POJO测试

任务:启动容器,进入容器中启动Oracle数据库和OGG,修改Oracle数据库表的数据测试应用

  • 1)、启动Kafka消息队列

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-iEpB4Ge7-1652014600930)(1616124159348-1621984868453.)]

  • 2)、启动Oracle数据库和OGG

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-BWH9eKqo-1652014600930)(1616124185566-1621984868453.)]

  • 3)、运行流式应用程序

首先,如果检查点目录存在,最好将其删除;此外,注释掉消费CRM系统数据代码,此时仅仅测试物流系统数据

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-t8nzhTJ0-1652014600930)(1616124259387-1621984868453.)]

启动流式应用程式,执行操作,查看控制台输出结果,可以看到将Map集合(存储数据)转换为JSON字符串

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-7dGFJr0B-1652014600931)(1616124564721-1621984868453.)]

  • 4)、使用DBeave对数据库表的数据进行更新和删除

更新2条数据,删除1条数据,看到如下界面:

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-4ADiDT4K-1652014600931)(1616124523908-1621984868453.)]

【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。