客快物流大数据项目(五十九):定义解析kafka数据的Bean对象类

举报
Lansonli 发表于 2022/03/09 00:25:26 2022/03/09
【摘要】 定义解析kafka数据的Bean对象类 一、定义消费kafka字符串的Bean对象基类 根据数据来源不同可以分为OGG数据和Canal数据,两者之间有相同的属性:table,因此将该属性作为公共属性进行提取,抽象成基类 实现步骤: 在公共模块的java目录下的 parser 包下创建 MessageBean ...

定义解析kafka数据的Bean对象类

一、定义消费kafka字符串的Bean对象基类

根据数据来源不同可以分为OGG数据Canal数据,两者之间有相同的属性:table,因此将该属性作为公共属性进行提取,抽象成基类

实现步骤:

  • 公共模块java目录下的 parser 包下创建 MessageBean 抽象类
  • 编写代码
    • 继承自 Serializable 接口
    • 创建 serialVersionUID 属性
    • 定义 table 属性,实现 setter/getter 方法

参考代码:


      package cn.it.logistics.common.beans.parser;
      import java.io.Serializable;
      /**
       * 根据数据源定义抽象类,数据源:
       * 1)ogg
       * 2)canal
       * 两者有共同的table属性
       */
      public abstract class MessageBean implements Serializable {
         private static final long serialVersionUID = -8216415778785426469L;
         private String table;
         public String getTable() {
             return table;
          }
         public void setTable(String table) {
             this.table = table;
          }
         @Override
         public String toString() {
             return table;
          }
      }
  

为什么创建serialVersionUID

serialVersionUID适用于Java的序列化机制。简单来说,Java的序列化机制是通过判断类的serialVersionUID来验证版本一致性的。在进行反序列化时,JVM会把传来的字节流中的serialVersionUID与本地相应实体类的serialVersionUID进行比较,如果相同就认为是一致的,可以进行反序列化,否则就会出现序列化版本不一致的异常,即是InvalidCastException。

使用idea生成serialVersionUID

操作步骤

说明

1

设置自动生成 serialVersionUID

 

 


2

选中对应的类名,然后按 alt+enter 快捷键

 

 


3

结果显示

 

 



二、​​​​​​​定义消费OGG字符串的Bean对象

实现步骤:

  • 公共模块parser 包下创建 OggMessageBean
  • 继承自 MessageBean 抽象类

参考代码:


      package cn.it.logistics.common.beans.parser;
      import javax.print.DocFlavor;
      import java.util.Map;
      /**
       * 定义消费出来的ogg的数据的javaBean对象
       * {
       * "table": "IT.tbl_route", //表名:库名.表名
       * "op_type": "U", //操作类型:U表示修改
       * "op_ts": "2020-10-08 09:10:54.000774",
       * "current_ts": "2020-10-08T09:11:01.925000",
       * "pos": "00000000200006645758",
       * "before": { //操作前的字段集合
       * "id": 104,
       * "start_station": "东莞中心",
       * "start_station_area_id": 441900,
       * "start_warehouse_id": 1,
       * "end_station": "蚌埠中转部",
       * "end_station_area_id": 340300,
       * "end_warehouse_id": 107,
       * "mileage_m": 1369046,
       * "time_consumer_minute": 56172,
       * "state": 1,
       * "cdt": "2020-02-02 18:51:39",
       * "udt": "2020-02-02 18:51:39",
       * "remark": null
       * },
       * "after": { //操作后的字段集合
       * "id": 104,
       * "start_station": "东莞中心",
       * "start_station_area_id": 441900,
       * "start_warehouse_id": 1,
       * "end_station": "TBD",
       * "end_station_area_id": 340300,
       * "end_warehouse_id": 107,
       * "mileage_m": 1369046,
       * "time_consumer_minute": 56172,
       * "state": 1,
       * "cdt": "2020-02-02 18:51:39",
       * "udt": "2020-02-02 18:51:39",
       * "remark": null
       * }
       * }
       */
      public class OggMessageBean extends MessageBean {
         //定义操作类型
         private String op_type;
         @Override
         public void setTable(String table) {
             //如果表名不为空
             if (table != null && !table.equals("")) {
                  table = table.replaceAll("[A-Z]+\\.", "");
              }
             super.setTable(table);
          }
         public String getOp_type() {
             return op_type;
          }
         public void setOp_type(String op_type) {
             this.op_type = op_type;
          }
         public String getOp_ts() {
             return op_ts;
          }
         public void setOp_ts(String op_ts) {
             this.op_ts = op_ts;
          }
         public String getCurrent_ts() {
             return current_ts;
          }
         public void setCurrent_ts(String current_ts) {
             this.current_ts = current_ts;
          }
         public String getPos() {
             return pos;
          }
         public void setPos(String pos) {
             this.pos = pos;
          }
         public Map<String, Object> getBefore() {
             return before;
          }
         public void setBefore(Map<String, Object> before) {
             this.before = before;
          }
         public Map<String, Object> getAfter() {
             return after;
          }
         public void setAfter(Map<String, Object> after) {
             this.after = after;
          }
         //操作时间
         private String op_ts;
         @Override
         public String toString() {
             return "OggMessageBean{" +
                     "table='" + super.getTable() + '\'' +
                     ", op_type='" + op_type + '\'' +
                     ", op_ts='" + op_ts + '\'' +
                     ", current_ts='" + current_ts + '\'' +
                     ", pos='" + pos + '\'' +
                     ", before=" + before +
                     ", after=" + after +
                     '}';
          }
         /**
       * 返回需要处理的列的集合
       * @return
       */
         public Map<String, Object> getValue() {
             //如果执行的是删除操作,则返回before节点的列的集合,如果执行的是插入和更新操作,则返回after节点的列的集合
             if (after == null) {
                 return before;
              } else {
                 return after;
              }
          }
         //同步时间
         private String current_ts;
         //偏移量
         private String pos;
         //操作之前的数据
         private Map<String, Object> before;
         //操作之后的数据
         private Map<String, Object> after;
      }
  


三、​​​​​​​定义消费Canal字符串的Bean对象

实现步骤:

  • 公共模块parser 包下创建 CanalMessageBean
  • 继承自 MessageBean 抽象类

参考代码:


      package cn.it.logistics.common.beans.parser;
      import java.util.List;
      import java.util.Map;
      /**
       * 定义消费出来的canal的数据对应的javaBean对象
       * {
       * "data": [{
       * "id": "1",
       * "name": "北京",
       * "tel": "222",
       * "mobile": "1111",
       * "detail_addr": "北京",
       * "area_id": "1",
       * "gis_addr": "1",
       * "cdt": "2020-10-08 17:20:12",
       * "udt": "2020-11-05 17:20:16",
       * "remark": null
       * }],
       * "database": "crm",
       * "es": 1602148867000,
       * "id": 15,
       * "isDdl": false,
       * "mysqlType": {
       * "id": "bigint(20)",
       * "name": "varchar(50)",
       * "tel": "varchar(20)",
       * "mobile": "varchar(20)",
       * "detail_addr": "varchar(100)",
       * "area_id": "bigint(20)",
       * "gis_addr": "varchar(20)",
       * "cdt": "datetime",
       * "udt": "datetime",
       * "remark": "varchar(100)"
       * },
       * "old": [{
       * "tel": "111"
       * }],
       * "sql": "",
       * "sqlType": {
       * "id": -5,
       * "name": 12,
       * "tel": 12,
       * "mobile": 12,
       * "detail_addr": 12,
       * "area_id": -5,
       * "gis_addr": 12,
       * "cdt": 93,
       * "udt": 93,
       * "remark": 12
       * },
       * "table": "crm_address",
       * "ts": 1602148867311,
       * "type": "UPDATE" //修改数据
       * }
       */
      public class CanalMessageBean extends MessageBean {
         //操作的数据集合
         private List<Map<String, Object>> data;
         public List<Map<String, Object>> getData() {
             return data;
          }
         public void setData(List<Map<String, Object>> data) {
             this.data = data;
          }
         public String getDatabase() {
             return database;
          }
         public void setDatabase(String database) {
             this.database = database;
          }
         public Long getEs() {
             return es;
          }
         public void setEs(Long es) {
             this.es = es;
          }
         public Long getId() {
             return id;
          }
         public void setId(Long id) {
             this.id = id;
          }
         public boolean isDdl() {
             return isDdl;
          }
         public void setDdl(boolean ddl) {
              isDdl = ddl;
          }
         public Map<String, Object> getMysqlType() {
             return mysqlType;
          }
         public void setMysqlType(Map<String, Object> mysqlType) {
             this.mysqlType = mysqlType;
          }
         public String getOld() {
             return old;
          }
         public void setOld(String old) {
             this.old = old;
          }
         public String getSql() {
             return sql;
          }
         public void setSql(String sql) {
             this.sql = sql;
          }
         public Map<String, Object> getSqlType() {
             return sqlType;
          }
         public void setSqlType(Map<String, Object> sqlType) {
             this.sqlType = sqlType;
          }
         public Long getTs() {
             return ts;
          }
         public void setTs(Long ts) {
             this.ts = ts;
          }
         public String getType() {
             return type;
          }
         public void setType(String type) {
             this.type = type;
          }
         //数据库名称
         private String database;
         private Long es;
         private Long id;
         private boolean isDdl;
         private Map<String, Object> mysqlType;
         private String old;
         private String sql;
         private Map<String, Object> sqlType;
         private Long ts;
         private String type;
         /**
       * 重写父类的settable方法,将表名修改成统一的前缀
       * @param table
       */
         @Override
         public void setTable(String table) {
             if(table!=null && !table.equals("")){
                 if(table.startsWith("crm_")) {
                      table = table.replace("crm_", "tbl_");
                  }
              }
             super.setTable(table);
          }
      }
  

【版权声明】本文为华为云社区用户原创内容,未经允许不得转载,如需转载请自行联系原作者进行授权。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。