Hive 拉链表详解及实例

举报
Byyyi耀 发表于 2024/05/06 10:57:48 2024/05/06
【摘要】 拉链表版本迭代:hive 0.14 slowly changing dimension => hive 2.6.0 merge 事务管理原来采用分区表,用户分区存储历史增量数据,缺点是重复数据太多定义:数仓用于解决持续增长且存在一定时间时间范围内重复的数据存储:创建拉链表时使用列式存储ORC:不能使用load加载数据压缩比高 效率高场景:【数据规模庞大】,新数据【在有限的时间】内存在多种状...

拉链表

  • 版本迭代:hive 0.14 slowly changing dimension => hive 2.6.0 merge 事务管理

    • 原来采用分区表,用户分区存储历史增量数据,缺点是重复数据太多
  • 定义:数仓用于解决持续增长且存在一定时间时间范围内重复的数据

  • 存储:创建拉链表时使用列式存储ORC
    不能使用load加载数据
    压缩比高 效率高

  • 场景:【数据规模庞大】,新数据【在有限的时间】内存在多种状态变化

  • 优点:节约空间(一份订单只有一条数据)

  • 举例:

原始表订单:
		order_id,order_timestamp,user_id,order_status
		1,2024_01_21 16:12:37.259,87986321,0
		1,2024_01_21 16:12:47.003,87986321,1
		1,2024_01_22 09:00:28.022,87986321,2
		1,2024_01_24 15:00:00.123,87986321,3
		1,2024_02_01 00:30:00.227,87986321,4
		order_detail_id,fk_order_id,goods_id,buy_count,goods_price

		拉链表订单:
		order_id,user_id,order_create_timestamp,order_modify_timestamp,order_amount,order_current_status
		1,87986321,2024_01_21 16:12:37.259,2024_02_01 00:30:00.227,3242.66,4
  • 配置:
set hive.support.concurrency=true;
set hive.enforce.bucketing=true;
set hive.exec.dynamic.partition.mode=nonstrict;
set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
set hive.compactor.initiator.on=true; -- 表合并开启
set hive.compactor.worker.threads=1; -- 表合并线程必须为一
set hive.auto.convert.join=false; -- 关闭 mapjoin
set hive.merge.cardinality.check=false; -- 关闭检查数据列的基数(列值的差异性)
set mapreduce.job.reduces=4;
  • 拉链表实例:
// 创建原始表格
	create table yb12211_2.hive_zipper_order(
		order_id bigint,
		user_id bigint,
		order_modify_dt timestamp,
		order_money decimal(10,2),
		current_status int
	)
	row format delimited fields terminated by ',';
	// 将数据文件导入原始表格
	load data local inpath '/root/hive/data/course/order_record.log'
	overwrite into table yb12211_2.hive_zipper_order;
	
	// 创建拉链表
	// 操作历史全量数据用动态分区
	set hive.support.concurrency=true;
	set hive.enforce.bucketing=true;
	set hive.exec.dynamic.partition.mode=nonstrict;
	set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
	set hive.compactor.initiator.on=true;
	set hive.compactor.worker.threads=1;
	set hive.auto.convert.join=false;
	set hive.merge.cardinality.check=false;
	set mapreduce.job.reduces=4;

	drop table if exists yb12211_2.hive_zipper_pc_order;
	create table yb12211_2.hive_zipper_pc_order(
		order_id bigint,
		user_id bigint,
		order_create_dt timestamp,
		order_modify_dt timestamp,
		order_money decimal(10,2),
		current_status int
	) partitioned by(year int,month int,day int)
	clustered by(order_create_dt) into 4 buckets
	row format delimited fields terminated by ','
	stored as orc
	tblproperties("transactional"="true");
	
	// 对拉链表的数据进行聚合,获取订单信息的创建日期、修改日期和订单状态
	with zip_src as (
		select order_id,user_id,order_money,
			min(order_modify_dt) as order_create_dt,
			max(order_modify_dt) as order_modify_dt,
			max(current_status) as current_status
		from yb12211_2.hive_zipper_order
		group by order_id,user_id,order_money
	)
	
	// 将原始数据灌入拉链表
	insert overwrite table yb12211_2.hive_zipper_pc_order partition(year,month,day)
	select
		order_id,
		user_id,
		order_create_dt,
		order_modify_dt,
		order_money,
		current_status,
		year(order_create_dt) as year,
		month(order_create_dt) as month,
		day(order_create_dt) as day
	from zip_src;
	
	// 拉链表查询 查询之前必须先有这两句配置
	set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
	set hive.support.concurrency=true;
	select * from yb12211_2.hive_zipper_pc_order
	where to_date(order_modify_dt)='2021-02-04'
	order by order_modify_dt desc;
	
	// 对于追加增量数据,将增量数据覆盖在原始数据表中
	load data local inpath '/root/hive/data/course/order_record_2021_02_05.log'
	overwrite into table yb12211_2.hive_zipper_order;	
	
	// 将原始数据表中的增量数据插入拉链表
	// 利用源数据和目标表的order_id进行匹配,若匹配则更新现有订单信息,若不匹配则插入新订单。
	merge into yb12211_2.hive_zipper_pc_order as O
	using (
		select
			order_id,
			user_id,
			order_create_dt,
			order_modify_dt,
			order_money,
			current_status,
			year(order_create_dt) as year,
			month(order_create_dt) as month,
			day(order_create_dt) as day
		from (
			select order_id,user_id,order_money,
				min(order_modify_dt) as order_create_dt,
				max(order_modify_dt) as order_modify_dt,
				max(current_status) as current_status
			from yb12211_2.hive_zipper_order
			group by order_id,user_id,order_money
		)T
	) as H
	on O.order_id=H.order_id
	when matched then
	update set order_modify_dt=H.order_modify_dt,current_status=H.current_status
	when not matched then
	insert values(H.order_id,H.user_id,H.order_create_dt,H.order_modify_dt,H.order_money,H.current_status,H.year,H.month,H.day);
	
	// 验证拉链结果:最后修改时间是否大于创建时间
	select * from yb12211_2.hive_zipper_pc_order
	where to_date(order_modify_dt)>to_date(order_create_dt);
  • 验证数据变化的三种情况:
    • 新增数据,插入原始表中的所有字段信息。
    • 更改数据,更改修改时间|结束时间|数据状态。
    • 删除数据:只需将结束日期改为删除当天即可。

构造增量数据

此处提供了订单日增量数据的自动生成代码,读者可利用代码实现对增量数据文件的生成,以便于体验拉链表的作用。

  • HiveZipMaker类
package cn.ybg;

import java.io.*;
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.*;


public class HiveZipTableMaker {
    static Properties pro = new Properties();
    static Random rand = new Random();
    static Calendar calendar = Calendar.getInstance(Locale.CHINA);
    static SimpleDateFormat timeFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:dd.SSS");
    static SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd");
    static long orderId = 0;
    static long timestamp = 0;
    static String order_log = "order/order_log.properties";
    static String order_record = "order/order_record_2021_02_05.log";
    static String order_list = "order/order_list.log";
    static BufferedWriter bw;
    static long size;
    static LinkedList<Order> orders;

    static {
        try {
            pro.load(new FileReader(order_log));
            timestamp = Long.parseLong(pro.getProperty("timestamp"));
            calendar.setTimeInMillis(timestamp);
            orderId = Long.parseLong(pro.getProperty("orderId"));
            size = new File(order_record).length();
            bw = new BufferedWriter(new FileWriter(order_record,true));
            File orderList = new File(order_list);
            if (orderList.exists() && orderList.length()>0){
                ObjectInputStream ois = new ObjectInputStream(new FileInputStream(orderList));
                orders = (LinkedList<Order>) ois.readObject();
                ois.close();
            }else{
                orders = new LinkedList<>();
            }
        } catch (Exception e) {
            e.printStackTrace();
            System.exit(-1);
        }
    }

    static int userId(){
        return rand.nextInt(10000000)+1;
    }

    static long orderId(){
        return ++orderId;
    }

    static Date now(){
        calendar.add(Calendar.MILLISECOND,rand.nextInt(5000));
        return calendar.getTime();
    }

    static void write(String line) throws IOException {
        if (orderId>1){
            bw.newLine();
        }
        bw.write(line);
    }

    static void close(){
        try {
            bw.close();
            pro.setProperty("timestamp",String.valueOf(timestamp));
            pro.setProperty("orderId",String.valueOf(orderId));
            pro.store(new FileWriter(order_log,false),timeFormat.format(new Date()));
            if (orders.size()>0){
                ObjectOutputStream oos = new ObjectOutputStream(new FileOutputStream(order_list,false));
                oos.writeObject(orders);
                oos.close();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    static double randMoney(){
        return new BigDecimal(rand.nextInt(500000)/100+10)
                .setScale(2, RoundingMode.HALF_UP)
                .doubleValue();
    }

    static boolean percentage(){
        return rand.nextInt(100)<65;
    }

    public static void main(String[] args) throws IOException, ParseException {
        /**
         * 订单编号:
         * 用户编号:
         * 操作日期:
         * 订单金额:
         * 当前状态:0(suspending),1(paid),2(signed),3(completed),4(return)
         */

        final long TWO_WEEKS = 14*24*60*60*1000;
        String format = dateFormat.format(now());
        Order order;
        Date now;
        Date dateEnd = dateFormat.parse("2021-02-06");
        while ((now = now()).before(dateEnd)){
            timestamp = now.getTime();
            String nowFormat = dateFormat.format(now);
            if (!format.equals(nowFormat)){
                format = nowFormat;
                if (orders.size()>0) {
                    Date finalNow = now;
                    orders.removeIf(o2-> finalNow.getTime()-o2.getModifyDate().getTime()>TWO_WEEKS);
                    orders.remove(rand.nextInt(orders.size()));
                }
            }
            if (percentage() && orders.size()>=20+rand.nextInt(21)) {
                for (int j = 0; j < rand.nextInt(orders.size()/15) ; j++){
                    int index = rand.nextInt(orders.size());
                    order = orders.get(index);
                    if (order.getStatus()<2){
                        order.setStatus(order.getStatus()+1);
                    }else{
                        order.setStatus(percentage()?3:4);
                        orders.remove(index);
                    }
                    order.setModifyDate(now);
                    write(order.toString());
                }
            }else{
                orderId = orderId();
                int userId = userId();
                double money = randMoney();
                order = new Order(orderId, userId, now, money, 0);
                orders.add(order);
                write(order.toString());
            }
        }
        close();
    }
}

  • Order类
package cn.ybg;

import java.io.Serializable;
import java.text.SimpleDateFormat;
import java.util.Date;

/**
 * 订单编号:
 * 用户编号:
 * 操作日期:
 * 订单金额:
 * 当前状态:0(suspending),1(paid),2(signed),3(completed),4(return)
 */
public class Order implements Serializable {
    static SimpleDateFormat timeFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:dd.SSS");
    private long orderId;
    private int userId;
    private Date modifyDate;
    private double money;
    private int status;

    public Order(long orderId, int userId, Date modifyDate, double money, int status) {
        this.orderId = orderId;
        this.userId = userId;
        this.modifyDate = modifyDate;
        this.money = money;
        this.status = status;
    }

    public long getOrderId() {
        return orderId;
    }

    public void setOrderId(long orderId) {
        this.orderId = orderId;
    }

    public int getUserId() {
        return userId;
    }

    public void setUserId(int userId) {
        this.userId = userId;
    }

    public Date getModifyDate() {
        return modifyDate;
    }

    public void setModifyDate(Date modifyDate) {
        this.modifyDate = modifyDate;
    }

    public double getMoney() {
        return money;
    }

    public void setMoney(double money) {
        this.money = money;
    }

    public int getStatus() {
        return status;
    }

    public void setStatus(int status) {
        this.status = status;
    }

    @Override
    public String toString() {
        return String.format("%d,%d,%s,%.2f,%d",orderId,userId,timeFormat.format(modifyDate),money,status);
    }
}

【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。