如何将spark-sql的Row转成Java对象?

举报
孙中明 发表于 2022/02/23 08:29:05 2022/02/23
【摘要】 如何将spark-sql的Row转成Java对象?Dataset转POJO将查询出的结果转为RDD将RDD创建为DataFrame,并传入schema参数调用as方法,将Dataset转为相应的POJO Dataset调用collectAsList()方法SparkSession spark = CloudUtils.getSparkSession(); // 查询原始数据 ...

如何将spark-sql的Row转成Java对象?

Dataset转POJO

  1. 将查询出的结果转为RDD
  2. 将RDD创建为DataFrame,并传入schema参数
  3. 调用as方法,将Dataset转为相应的POJO Dataset
  4. 调用collectAsList()方法
SparkSession spark = CloudUtils.getSparkSession();

        // 查询原始数据
        Dataset<Row> student = spark.sql("select * from `event`.`student`");
        // 生成schema
        List<StructField> fields = new ArrayList<>();
        fields.add(DataTypes.createStructField("id", DataTypes.StringType, true));
        fields.add(DataTypes.createStructField("name", DataTypes.StringType, true));
        fields.add(DataTypes.createStructField("major", DataTypes.StringType, true));
        StructType schema = DataTypes.createStructType(fields);

        // 转换查询结果为POJO List
        List<Student> students = spark.createDataFrame(student.toJavaRDD(), schema)
                .as(Encoders.bean(Student.class))
                .collectAsList();
        System.out.println(students);


Dataset中的日期类型为timestamp和java中的Date类型不兼容,和Timestamp类型相互兼容。
为了解决上述问题,我们可以先将Dataset转为JSON,然后将JSON转为POJO,代码

        // 查出数据并转为json集合
        List<String> jsonList = spark.sql("select * from `event`.`user`")
                .toJSON()
                .collectAsList();
        // 将json转为pojo,这里使用的是FastJSON        
        List<User> users = jsonList.stream()
                .map(jsonString -> JSON.parseObject(jsonString, User.class))
                .collect(Collectors.toList());
        System.out.println(users);

POJO转Dataset

        // 获取users列表
        List<User> users = createUsers();
        // 使用createDataFrame转为dataset
        Dataset<Row> ds = spark.createDataFrame(users, User.class);
        // 将驼峰式列名改为下划线式列名,camelToUnderline方法网上搜索
        String[] columns = ds.columns();
        String[] newColumns = Arrays.stream(columns)
                .map(column -> camelToUnderline(column))
                .toArray(String[]::new);
        // 转为新的df(重命名后的)
        ds.toDF(newColumns);
        ds.show();
【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。