客快物流大数据项目(五十九):定义解析kafka数据的Bean对象类

举报
Lansonli 发表于 2022/03/09 00:25:26 2022/03/09
【摘要】 定义解析kafka数据的Bean对象类 一、定义消费kafka字符串的Bean对象基类 根据数据来源不同可以分为OGG数据和Canal数据,两者之间有相同的属性:table,因此将该属性作为公共属性进行提取,抽象成基类 实现步骤: 在公共模块的java目录下的 parser 包下创建 MessageBean ...

定义解析kafka数据的Bean对象类

一、定义消费kafka字符串的Bean对象基类

根据数据来源不同可以分为OGG数据Canal数据,两者之间有相同的属性:table,因此将该属性作为公共属性进行提取,抽象成基类

实现步骤:

  • 公共模块java目录下的 parser 包下创建 MessageBean 抽象类
  • 编写代码
    • 继承自 Serializable 接口
    • 创建 serialVersionUID 属性
    • 定义 table 属性,实现 setter/getter 方法

参考代码:


  
   
    
     
    
    
     
      package cn.it.logistics.common.beans.parser;
     
    
   
    
     
    
    
      
     
    
   
    
     
    
    
     
      import java.io.Serializable;
     
    
   
    
     
    
    
      
     
    
   
    
     
    
    
     
      /**
     
    
   
    
     
    
    
     
       * 根据数据源定义抽象类,数据源:
     
    
   
    
     
    
    
     
       * 1)ogg
     
    
   
    
     
    
    
     
       * 2)canal
     
    
   
    
     
    
    
     
       * 两者有共同的table属性
     
    
   
    
     
    
    
     
       */
     
    
   
    
     
    
    
     
      public abstract class MessageBean implements Serializable {
     
    
   
    
     
    
    
      
     
    
   
    
     
    
    
         private static final long serialVersionUID = -8216415778785426469L;
     
    
   
    
     
    
    
      
     
    
   
    
     
    
    
         private String table;
     
    
   
    
     
    
    
      
     
    
   
    
     
    
    
         public String getTable() {
     
    
   
    
     
    
    
             return table;
     
    
   
    
     
    
    
     
          }
     
    
   
    
     
    
    
      
     
    
   
    
     
    
    
         public void setTable(String table) {
     
    
   
    
     
    
    
             this.table = table;
     
    
   
    
     
    
    
     
          }
     
    
   
    
     
    
    
      
     
    
   
    
     
    
    
         @Override
     
    
   
    
     
    
    
         public String toString() {
     
    
   
    
     
    
    
             return table;
     
    
   
    
     
    
    
     
          }
     
    
   
    
     
    
    
     
      }
     
    
  

为什么创建serialVersionUID

serialVersionUID适用于Java的序列化机制。简单来说,Java的序列化机制是通过判断类的serialVersionUID来验证版本一致性的。在进行反序列化时,JVM会把传来的字节流中的serialVersionUID与本地相应实体类的serialVersionUID进行比较,如果相同就认为是一致的,可以进行反序列化,否则就会出现序列化版本不一致的异常,即是InvalidCastException。

使用idea生成serialVersionUID

操作步骤

说明

1

设置自动生成 serialVersionUID

 

 


2

选中对应的类名,然后按 alt+enter 快捷键

 

 


3

结果显示

 

 



二、​​​​​​​定义消费OGG字符串的Bean对象

实现步骤:

  • 公共模块parser 包下创建 OggMessageBean
  • 继承自 MessageBean 抽象类

参考代码:


  
   
    
     
    
    
     
      package cn.it.logistics.common.beans.parser;
     
    
   
    
     
    
    
      
     
    
   
    
     
    
    
     
      import javax.print.DocFlavor;
     
    
   
    
     
    
    
     
      import java.util.Map;
     
    
   
    
     
    
    
      
     
    
   
    
     
    
    
     
      /**
     
    
   
    
     
    
    
     
       * 定义消费出来的ogg的数据的javaBean对象
     
    
   
    
     
    
    
     
       * {
     
    
   
    
     
    
    
     
       * "table": "IT.tbl_route", //表名:库名.表名
     
    
   
    
     
    
    
     
       * "op_type": "U", //操作类型:U表示修改
     
    
   
    
     
    
    
     
       * "op_ts": "2020-10-08 09:10:54.000774",
     
    
   
    
     
    
    
     
       * "current_ts": "2020-10-08T09:11:01.925000",
     
    
   
    
     
    
    
     
       * "pos": "00000000200006645758",
     
    
   
    
     
    
    
     
       * "before": { //操作前的字段集合
     
    
   
    
     
    
    
     
       * "id": 104,
     
    
   
    
     
    
    
     
       * "start_station": "东莞中心",
     
    
   
    
     
    
    
     
       * "start_station_area_id": 441900,
     
    
   
    
     
    
    
     
       * "start_warehouse_id": 1,
     
    
   
    
     
    
    
     
       * "end_station": "蚌埠中转部",
     
    
   
    
     
    
    
     
       * "end_station_area_id": 340300,
     
    
   
    
     
    
    
     
       * "end_warehouse_id": 107,
     
    
   
    
     
    
    
     
       * "mileage_m": 1369046,
     
    
   
    
     
    
    
     
       * "time_consumer_minute": 56172,
     
    
   
    
     
    
    
     
       * "state": 1,
     
    
   
    
     
    
    
     
       * "cdt": "2020-02-02 18:51:39",
     
    
   
    
     
    
    
     
       * "udt": "2020-02-02 18:51:39",
     
    
   
    
     
    
    
     
       * "remark": null
     
    
   
    
     
    
    
     
       * },
     
    
   
    
     
    
    
     
       * "after": { //操作后的字段集合
     
    
   
    
     
    
    
     
       * "id": 104,
     
    
   
    
     
    
    
     
       * "start_station": "东莞中心",
     
    
   
    
     
    
    
     
       * "start_station_area_id": 441900,
     
    
   
    
     
    
    
     
       * "start_warehouse_id": 1,
     
    
   
    
     
    
    
     
       * "end_station": "TBD",
     
    
   
    
     
    
    
     
       * "end_station_area_id": 340300,
     
    
   
    
     
    
    
     
       * "end_warehouse_id": 107,
     
    
   
    
     
    
    
     
       * "mileage_m": 1369046,
     
    
   
    
     
    
    
     
       * "time_consumer_minute": 56172,
     
    
   
    
     
    
    
     
       * "state": 1,
     
    
   
    
     
    
    
     
       * "cdt": "2020-02-02 18:51:39",
     
    
   
    
     
    
    
     
       * "udt": "2020-02-02 18:51:39",
     
    
   
    
     
    
    
     
       * "remark": null
     
    
   
    
     
    
    
     
       * }
     
    
   
    
     
    
    
     
       * }
     
    
   
    
     
    
    
     
       */
     
    
   
    
     
    
    
     
      public class OggMessageBean extends MessageBean {
     
    
   
    
     
    
    
         //定义操作类型
     
    
   
    
     
    
    
         private String op_type;
     
    
   
    
     
    
    
      
     
    
   
    
     
    
    
         @Override
     
    
   
    
     
    
    
         public void setTable(String table) {
     
    
   
    
     
    
    
             //如果表名不为空
     
    
   
    
     
    
    
             if (table != null && !table.equals("")) {
     
    
   
    
     
    
    
     
                  table = table.replaceAll("[A-Z]+\\.", "");
     
    
   
    
     
    
    
     
              }
     
    
   
    
     
    
    
             super.setTable(table);
     
    
   
    
     
    
    
     
          }
     
    
   
    
     
    
    
      
     
    
   
    
     
    
    
         public String getOp_type() {
     
    
   
    
     
    
    
             return op_type;
     
    
   
    
     
    
    
     
          }
     
    
   
    
     
    
    
      
     
    
   
    
     
    
    
         public void setOp_type(String op_type) {
     
    
   
    
     
    
    
             this.op_type = op_type;
     
    
   
    
     
    
    
     
          }
     
    
   
    
     
    
    
      
     
    
   
    
     
    
    
         public String getOp_ts() {
     
    
   
    
     
    
    
             return op_ts;
     
    
   
    
     
    
    
     
          }
     
    
   
    
     
    
    
      
     
    
   
    
     
    
    
         public void setOp_ts(String op_ts) {
     
    
   
    
     
    
    
             this.op_ts = op_ts;
     
    
   
    
     
    
    
     
          }
     
    
   
    
     
    
    
      
     
    
   
    
     
    
    
         public String getCurrent_ts() {
     
    
   
    
     
    
    
             return current_ts;
     
    
   
    
     
    
    
     
          }
     
    
   
    
     
    
    
      
     
    
   
    
     
    
    
         public void setCurrent_ts(String current_ts) {
     
    
   
    
     
    
    
             this.current_ts = current_ts;
     
    
   
    
     
    
    
     
          }
     
    
   
    
     
    
    
      
     
    
   
    
     
    
    
         public String getPos() {
     
    
   
    
     
    
    
             return pos;
     
    
   
    
     
    
    
     
          }
     
    
   
    
     
    
    
      
     
    
   
    
     
    
    
         public void setPos(String pos) {
     
    
   
    
     
    
    
             this.pos = pos;
     
    
   
    
     
    
    
     
          }
     
    
   
    
     
    
    
      
     
    
   
    
     
    
    
         public Map<String, Object> getBefore() {
     
    
   
    
     
    
    
             return before;
     
    
   
    
     
    
    
     
          }
     
    
   
    
     
    
    
      
     
    
   
    
     
    
    
         public void setBefore(Map<String, Object> before) {
     
    
   
    
     
    
    
             this.before = before;
     
    
   
    
     
    
    
     
          }
     
    
   
    
     
    
    
      
     
    
   
    
     
    
    
         public Map<String, Object> getAfter() {
     
    
   
    
     
    
    
             return after;
     
    
   
    
     
    
    
     
          }
     
    
   
    
     
    
    
      
     
    
   
    
     
    
    
         public void setAfter(Map<String, Object> after) {
     
    
   
    
     
    
    
             this.after = after;
     
    
   
    
     
    
    
     
          }
     
    
   
    
     
    
    
      
     
    
   
    
     
    
    
         //操作时间
     
    
   
    
     
    
    
         private String op_ts;
     
    
   
    
     
    
    
      
     
    
   
    
     
    
    
         @Override
     
    
   
    
     
    
    
         public String toString() {
     
    
   
    
     
    
    
             return "OggMessageBean{" +
     
    
   
    
     
    
    
                     "table='" + super.getTable() + '\'' +
     
    
   
    
     
    
    
                     ", op_type='" + op_type + '\'' +
     
    
   
    
     
    
    
                     ", op_ts='" + op_ts + '\'' +
     
    
   
    
     
    
    
                     ", current_ts='" + current_ts + '\'' +
     
    
   
    
     
    
    
                     ", pos='" + pos + '\'' +
     
    
   
    
     
    
    
                     ", before=" + before +
     
    
   
    
     
    
    
                     ", after=" + after +
     
    
   
    
     
    
    
                     '}';
     
    
   
    
     
    
    
     
          }
     
    
   
    
     
    
    
      
     
    
   
    
     
    
    
         /**
     
    
   
    
     
    
    
     
       * 返回需要处理的列的集合
     
    
   
    
     
    
    
     
       * @return
     
    
   
    
     
    
    
     
       */
     
    
   
    
     
    
    
         public Map<String, Object> getValue() {
     
    
   
    
     
    
    
             //如果执行的是删除操作,则返回before节点的列的集合,如果执行的是插入和更新操作,则返回after节点的列的集合
     
    
   
    
     
    
    
             if (after == null) {
     
    
   
    
     
    
    
                 return before;
     
    
   
    
     
    
    
     
              } else {
     
    
   
    
     
    
    
                 return after;
     
    
   
    
     
    
    
     
              }
     
    
   
    
     
    
    
     
          }
     
    
   
    
     
    
    
      
     
    
   
    
     
    
    
         //同步时间
     
    
   
    
     
    
    
         private String current_ts;
     
    
   
    
     
    
    
         //偏移量
     
    
   
    
     
    
    
         private String pos;
     
    
   
    
     
    
    
         //操作之前的数据
     
    
   
    
     
    
    
         private Map<String, Object> before;
     
    
   
    
     
    
    
         //操作之后的数据
     
    
   
    
     
    
    
         private Map<String, Object> after;
     
    
   
    
     
    
    
     
      }
     
    
  


三、​​​​​​​定义消费Canal字符串的Bean对象

实现步骤:

  • 公共模块parser 包下创建 CanalMessageBean
  • 继承自 MessageBean 抽象类

参考代码:


  
   
    
     
    
    
     
      package cn.it.logistics.common.beans.parser;
     
    
   
    
     
    
    
      
     
    
   
    
     
    
    
     
      import java.util.List;
     
    
   
    
     
    
    
     
      import java.util.Map;
     
    
   
    
     
    
    
      
     
    
   
    
     
    
    
     
      /**
     
    
   
    
     
    
    
     
       * 定义消费出来的canal的数据对应的javaBean对象
     
    
   
    
     
    
    
     
       * {
     
    
   
    
     
    
    
     
       * "data": [{
     
    
   
    
     
    
    
     
       * "id": "1",
     
    
   
    
     
    
    
     
       * "name": "北京",
     
    
   
    
     
    
    
     
       * "tel": "222",
     
    
   
    
     
    
    
     
       * "mobile": "1111",
     
    
   
    
     
    
    
     
       * "detail_addr": "北京",
     
    
   
    
     
    
    
     
       * "area_id": "1",
     
    
   
    
     
    
    
     
       * "gis_addr": "1",
     
    
   
    
     
    
    
     
       * "cdt": "2020-10-08 17:20:12",
     
    
   
    
     
    
    
     
       * "udt": "2020-11-05 17:20:16",
     
    
   
    
     
    
    
     
       * "remark": null
     
    
   
    
     
    
    
     
       * }],
     
    
   
    
     
    
    
     
       * "database": "crm",
     
    
   
    
     
    
    
     
       * "es": 1602148867000,
     
    
   
    
     
    
    
     
       * "id": 15,
     
    
   
    
     
    
    
     
       * "isDdl": false,
     
    
   
    
     
    
    
     
       * "mysqlType": {
     
    
   
    
     
    
    
     
       * "id": "bigint(20)",
     
    
   
    
     
    
    
     
       * "name": "varchar(50)",
     
    
   
    
     
    
    
     
       * "tel": "varchar(20)",
     
    
   
    
     
    
    
     
       * "mobile": "varchar(20)",
     
    
   
    
     
    
    
     
       * "detail_addr": "varchar(100)",
     
    
   
    
     
    
    
     
       * "area_id": "bigint(20)",
     
    
   
    
     
    
    
     
       * "gis_addr": "varchar(20)",
     
    
   
    
     
    
    
     
       * "cdt": "datetime",
     
    
   
    
     
    
    
     
       * "udt": "datetime",
     
    
   
    
     
    
    
     
       * "remark": "varchar(100)"
     
    
   
    
     
    
    
     
       * },
     
    
   
    
     
    
    
     
       * "old": [{
     
    
   
    
     
    
    
     
       * "tel": "111"
     
    
   
    
     
    
    
     
       * }],
     
    
   
    
     
    
    
     
       * "sql": "",
     
    
   
    
     
    
    
     
       * "sqlType": {
     
    
   
    
     
    
    
     
       * "id": -5,
     
    
   
    
     
    
    
     
       * "name": 12,
     
    
   
    
     
    
    
     
       * "tel": 12,
     
    
   
    
     
    
    
     
       * "mobile": 12,
     
    
   
    
     
    
    
     
       * "detail_addr": 12,
     
    
   
    
     
    
    
     
       * "area_id": -5,
     
    
   
    
     
    
    
     
       * "gis_addr": 12,
     
    
   
    
     
    
    
     
       * "cdt": 93,
     
    
   
    
     
    
    
     
       * "udt": 93,
     
    
   
    
     
    
    
     
       * "remark": 12
     
    
   
    
     
    
    
     
       * },
     
    
   
    
     
    
    
     
       * "table": "crm_address",
     
    
   
    
     
    
    
     
       * "ts": 1602148867311,
     
    
   
    
     
    
    
     
       * "type": "UPDATE" //修改数据
     
    
   
    
     
    
    
     
       * }
     
    
   
    
     
    
    
     
       */
     
    
   
    
     
    
    
     
      public class CanalMessageBean extends MessageBean {
     
    
   
    
     
    
    
         //操作的数据集合
     
    
   
    
     
    
    
         private List<Map<String, Object>> data;
     
    
   
    
     
    
    
      
     
    
   
    
     
    
    
         public List<Map<String, Object>> getData() {
     
    
   
    
     
    
    
             return data;
     
    
   
    
     
    
    
     
          }
     
    
   
    
     
    
    
      
     
    
   
    
     
    
    
         public void setData(List<Map<String, Object>> data) {
     
    
   
    
     
    
    
             this.data = data;
     
    
   
    
     
    
    
     
          }
     
    
   
    
     
    
    
      
     
    
   
    
     
    
    
         public String getDatabase() {
     
    
   
    
     
    
    
             return database;
     
    
   
    
     
    
    
     
          }
     
    
   
    
     
    
    
      
     
    
   
    
     
    
    
         public void setDatabase(String database) {
     
    
   
    
     
    
    
             this.database = database;
     
    
   
    
     
    
    
     
          }
     
    
   
    
     
    
    
      
     
    
   
    
     
    
    
         public Long getEs() {
     
    
   
    
     
    
    
             return es;
     
    
   
    
     
    
    
     
          }
     
    
   
    
     
    
    
      
     
    
   
    
     
    
    
         public void setEs(Long es) {
     
    
   
    
     
    
    
             this.es = es;
     
    
   
    
     
    
    
     
          }
     
    
   
    
     
    
    
      
     
    
   
    
     
    
    
         public Long getId() {
     
    
   
    
     
    
    
             return id;
     
    
   
    
     
    
    
     
          }
     
    
   
    
     
    
    
      
     
    
   
    
     
    
    
         public void setId(Long id) {
     
    
   
    
     
    
    
             this.id = id;
     
    
   
    
     
    
    
     
          }
     
    
   
    
     
    
    
      
     
    
   
    
     
    
    
         public boolean isDdl() {
     
    
   
    
     
    
    
             return isDdl;
     
    
   
    
     
    
    
     
          }
     
    
   
    
     
    
    
      
     
    
   
    
     
    
    
         public void setDdl(boolean ddl) {
     
    
   
    
     
    
    
     
              isDdl = ddl;
     
    
   
    
     
    
    
     
          }
     
    
   
    
     
    
    
      
     
    
   
    
     
    
    
         public Map<String, Object> getMysqlType() {
     
    
   
    
     
    
    
             return mysqlType;
     
    
   
    
     
    
    
     
          }
     
    
   
    
     
    
    
      
     
    
   
    
     
    
    
         public void setMysqlType(Map<String, Object> mysqlType) {
     
    
   
    
     
    
    
             this.mysqlType = mysqlType;
     
    
   
    
     
    
    
     
          }
     
    
   
    
     
    
    
      
     
    
   
    
     
    
    
         public String getOld() {
     
    
   
    
     
    
    
             return old;
     
    
   
    
     
    
    
     
          }
     
    
   
    
     
    
    
      
     
    
   
    
     
    
    
         public void setOld(String old) {
     
    
   
    
     
    
    
             this.old = old;
     
    
   
    
     
    
    
     
          }
     
    
   
    
     
    
    
      
     
    
   
    
     
    
    
         public String getSql() {
     
    
   
    
     
    
    
             return sql;
     
    
   
    
     
    
    
     
          }
     
    
   
    
     
    
    
      
     
    
   
    
     
    
    
         public void setSql(String sql) {
     
    
   
    
     
    
    
             this.sql = sql;
     
    
   
    
     
    
    
     
          }
     
    
   
    
     
    
    
      
     
    
   
    
     
    
    
         public Map<String, Object> getSqlType() {
     
    
   
    
     
    
    
             return sqlType;
     
    
   
    
     
    
    
     
          }
     
    
   
    
     
    
    
      
     
    
   
    
     
    
    
         public void setSqlType(Map<String, Object> sqlType) {
     
    
   
    
     
    
    
             this.sqlType = sqlType;
     
    
   
    
     
    
    
     
          }
     
    
   
    
     
    
    
      
     
    
   
    
     
    
    
      
     
    
   
    
     
    
    
         public Long getTs() {
     
    
   
    
     
    
    
             return ts;
     
    
   
    
     
    
    
     
          }
     
    
   
    
     
    
    
      
     
    
   
    
     
    
    
         public void setTs(Long ts) {
     
    
   
    
     
    
    
             this.ts = ts;
     
    
   
    
     
    
    
     
          }
     
    
   
    
     
    
    
      
     
    
   
    
     
    
    
         public String getType() {
     
    
   
    
     
    
    
             return type;
     
    
   
    
     
    
    
     
          }
     
    
   
    
     
    
    
      
     
    
   
    
     
    
    
         public void setType(String type) {
     
    
   
    
     
    
    
             this.type = type;
     
    
   
    
     
    
    
     
          }
     
    
   
    
     
    
    
      
     
    
   
    
     
    
    
         //数据库名称
     
    
   
    
     
    
    
         private String database;
     
    
   
    
     
    
    
         private Long es;
     
    
   
    
     
    
    
         private Long id;
     
    
   
    
     
    
    
         private boolean isDdl;
     
    
   
    
     
    
    
         private Map<String, Object> mysqlType;
     
    
   
    
     
    
    
         private String old;
     
    
   
    
     
    
    
         private String sql;
     
    
   
    
     
    
    
         private Map<String, Object> sqlType;
     
    
   
    
     
    
    
         private Long ts;
     
    
   
    
     
    
    
         private String type;
     
    
   
    
     
    
    
      
     
    
   
    
     
    
    
         /**
     
    
   
    
     
    
    
     
       * 重写父类的settable方法,将表名修改成统一的前缀
     
    
   
    
     
    
    
     
       * @param table
     
    
   
    
     
    
    
     
       */
     
    
   
    
     
    
    
         @Override
     
    
   
    
     
    
    
         public void setTable(String table) {
     
    
   
    
     
    
    
             if(table!=null && !table.equals("")){
     
    
   
    
     
    
    
                 if(table.startsWith("crm_")) {
     
    
   
    
     
    
    
     
                      table = table.replace("crm_", "tbl_");
     
    
   
    
     
    
    
     
                  }
     
    
   
    
     
    
    
     
              }
     
    
   
    
     
    
    
             super.setTable(table);
     
    
   
    
     
    
    
     
          }
     
    
   
    
     
    
    
     
      }
     
    
  

【版权声明】本文为华为云社区用户原创内容,未经允许不得转载,如需转载请自行联系原作者进行授权。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。