实时即未来,大数据项目车联网之原始数据实时ETL任务HBase调优(9)

举报
Maynor学长 发表于 2022/10/31 12:29:27 2022/10/31
【摘要】 theme: smartblue持续创作,加速成长!这是我参与「掘金日新计划 · 10 月更文挑战」的第9天,点击查看活动详情 1. 原始数据实时ETL任务HBase调优 1.1 数据写入hbase优化上一节写入数据,一条条数据put到表中,对于大量数据的写入,效率极低,因此针对此项进行优化使用hbase客户端写缓存进行批量写入数据到hbase中hbase客户端写缓存对象:Buffer...

theme: smartblue

持续创作,加速成长!这是我参与「掘金日新计划 · 10 月更文挑战」的第9天,点击查看活动详情

1. 原始数据实时ETL任务HBase调优

1.1 数据写入hbase优化

上一节写入数据,一条条数据put到表中,对于大量数据的写入,效率极低,因此针对此项进行优化

  • 使用hbase客户端写缓存进行批量写入数据到hbase中

  • hbase客户端写缓存对象:BufferedMutator

hbase的每一次put操作写入数据,实际上是一个RPC操作,将客户端的数据传输到hbase服务器再返回结果,适合小数据量写入,当需要写入大量数据时,每一次put连接一次RPC连接,会增加连接次数,因此会增加传输时间和IO消耗

BufferedMutator通过mutate方法提交数据,flush方法可以强制刷新缓冲区提交数据,最后执行close方法也会刷新缓冲区

使用BufferedMutator数据写与直接写数据到hbase的区别:

image-20221018103117195

  • 写入步骤与第一节类似,这里针对hbase客户端写入数据方式进行优化

1.2 优化后的代码实现

  • 自定义sink,继承RichSinkFunction类

  • json解析成功后需要存到hbase的数据类型为:

img

  • 创建全局变量,应用于程序逻辑实现
public class SourceDataHBaseSinkOptimize extends RichSinkFunction<ItcastDataObj> {
    private Logger logger = LoggerFactory.getLogger("SourceDataHBaseSinkOptimize");

    private String tableName;
    private Connection conn = null;
    // hbase客户端中的数据写缓存对象
    private BufferedMutator mutator = null;
    private String cf = "cf";

    public SourceDataHBaseSinkOptimize(String tableName) {
        this.tableName = tableName;
    }
}
  • 重写方法,实现数据写入hbase逻辑

  • 重写open方法:加载资源配置

创建hbase配置对象,设置hbase配置信息,添加客户端相关配置

跟据hbase的连接地址,获得hbase连接

根据hbase连接和表名,获得hbase客户端Table对象

@Override
public void open(Configuration parameters) throws Exception {
    org.apache.hadoop.conf.Configuration config = HBaseConfiguration.create();
    config.set("hbase.zookeeper.property.clientPort", ConfigLoader.getProperty("zookeeper.clientPort"));
    config.set("hbase.zookeeper.quorum", ConfigLoader.getProperty("zookeeper.quorum"));

    TableName tname = TableName.valueOf(tableName);
    config.set(TableInputFormat.INPUT_TABLE, tname.getNameAsString());

    conn = ConnectionFactory.createConnection(config);
    BufferedMutatorParams params = new BufferedMutatorParams(tname);
    //设置缓存10MB,当达到10MB时数据会自动刷到HBase
    params.writeBufferSize(1024 * 1024 * 10);
    // 强制缓冲区提交数据
    mutator = conn.getBufferedMutator(params);
}	
  • 重写invoke方法:数据写入hbase逻辑

设计hbase的rowkey、根据列族封装Put对象

把封装好的Put对象,写入HBase表中

@Override
public void open(Configuration parameters) throws Exception {
    org.apache.hadoop.conf.Configuration config = HBaseConfiguration.create();
    config.set("hbase.zookeeper.property.clientPort", ConfigLoader.getProperty("zookeeper.clientPort"));
    config.set("hbase.zookeeper.quorum", ConfigLoader.getProperty("zookeeper.quorum"));

    TableName tname = TableName.valueOf(tableName);
    config.set(TableInputFormat.INPUT_TABLE, tname.getNameAsString());

    conn = ConnectionFactory.createConnection(config);
    BufferedMutatorParams params = new BufferedMutatorParams(tname);
    //设置缓存10MB,当达到10MB时数据会自动刷到HBase
    params.writeBufferSize(1024 * 1024 * 10);
    // 强制缓冲区提交数据
    mutator = conn.getBufferedMutator(params);
}

由于hbasePut方法中属性比较多,提取到文件中:讲义关联资料\hbasePut方法.md

  • 重写close方法:销毁对象,释放资源

遵循“先创建后关闭“原则

@Override
public void close() throws Exception {
    if (mutator != null) {
        // 也会强制缓冲区提交数据
        mutator.close();
    }
    if (conn != null || !conn.isClosed()) {
        conn.close();
    }
}
  • 在KafkaSourceDataTask主任务中调用

  • 创建自定义sink对象,同时传入参数:hbase表名

// 定义hbaseSink
SourceDataHBaseSinkOptimize hbaseSinkOptimize  = new SourceDataHBaseSinkOptimize("itcast_src");
// 保存原始数据存入 hbase
successJsonData.addSink(hbaseSinkOptimize);

1.3 数据写入hbase进行预分区

针对原始数据实时ETL落地hbase,设置好预分区,解决数据热点问题,当已知rowkey的值的时候,设置预分区可以根据rowkey的区间进行分区

  • 创建预分区语法

create ‘itcast_src’,‘cf’,{SPLITS => [‘10000’,‘50000’]}

  • 预分区的含义

  • rowkey所在的范围

分区1: startKey 为空,endKey为10000

分区2: startKey 10000,endKey为50000

分区3: startKey 50000,endKey为空

1.4 数据写入hbase预写日志(WAL)

1.4.1 预写日志(WAL)

预写日志(WAL)最重要的作用是容灾,当服务器发生宕机,我们可以通过hbase客户端操作时预写到服务器上的日志(内存和hdfs上)恢复宕机之前的数据,如果预写日志失败,客户端的操作也将失败

image-20221018103142925

  • 当客户端发起修改数据操作请求:put新增数据、delete删除数据、incr增量计数器

  • 每一个修改动作被封装成Key/Value对象,通过RPC请求到master服务

l hbase服务master将单个或批量请求发送给匹配region的RegionServer处理

  • 数据先被写入到WAL,然后放到实际拥有记录的存储文件内存中(MemStore)

  • 当内存达到一定大小,数据会异步持续写入到文件系统中(HDFS)

1.4.2 memstore在hbase读写中的作用

l RegionServer收到写请求时,会将请求定向到特定的Region。每个区域存储一组行。行数据可以分为多个列族。特定列族的数据存储在HStore中,该HStore由Memstore和一组HFiles组成。Memstore保留在RegionServer主存储器中,而HFiles被写入HDFS。处理写请求后,首先将数据写入Memstore。然后,当满足某些阈值时(主内存是有限的),Memstore数据将刷新到HFile中。

image-20221018103157587

  • 针对hbase优化,可以从数据写入方式,预分区,预写日志等方面着手,思考如何在同一磁盘空间上,存储更多的数据?

1.5 数据写入hbase合理使用压缩与编码

l HBase基于列簇存储数据,列簇是表的schema信息的一部分,列是基于列簇为前缀加上属性存储,因此对hbase表进行编码压缩,可以认为是对列数据的压缩。

注意:数据Block编码/压缩可以一起使用在同一个列簇。

  • 官方说明:

https://hbase.apache.org/book.html#compression

  • 编码压缩的优势:

  • 默认建表不启用压缩编码,对hbase开发人员不友好

  • 设置编码压缩能够节约磁盘占用空间,节约硬件磁盘存储费用

  • 编码压缩通常能提高系统吞吐率,提高服务器系统性能

  • 编码类型

  • Prefix

键非常相似。具体而言,keys通常共享一个公共前缀,并且仅在末尾有所不同。例如:一个键可能是RowKey:Family:Qualifier0,下一个键可能是RowKey:Family:Qualifier1。

在prefix编码中,添加了一个额外的列,用于保存当前key和上一个key之间共享的前缀长度。假设此处的第一个键与之前的键完全不同,则其前缀长度为0。

  • 没有编码的ColumnFamily

img

具有prefix编码的ColumnFamily

  • 第二个键的前缀长度为23,因为它们具有前23个共同的字符。

  • 显然,如果密钥之间没有共同之处,那么前缀将不会带来太多好处。

img

  • Diff

diff编码在前缀编码的基础上扩展。代替顺序地将密钥视为一个整体的字节序列,可以拆分每个密钥字段,以便可以更有效地压缩密钥的每个部分。

添加了两个新字段:timestamp和type

如果ColumnFamily与上一行相同,则将其从当前行中省略。如果键的长度,值的长度或类型与上一行相同,则省略该字段。

此外,为了增加压缩率,时间戳记将存储为前一行时间戳记的Diff,而不是全部存储。默认情况下,Diff编码是禁用的,因为写入和扫描速度较慢,但是会缓存更多数据。

具有Diff编码的ColumnFamily

img

  • Fast Diff

Fast Diff与Diff相似,但是使用了更快的实现。它还添加了另一个字段,该字段存储一个位以跟踪数据本身是否与上一行相同。如果是,则不会再次存储数据。

如果您有长键或多列,建议使用Fast Diff编解码器。

数据格式几乎与Diff编码相同

  • Prefix Tree

前缀树编码是HBase 0.96中的一项实验功能,在hbase-2.0.0中将其删除,当前版本中已被删除

它提供了与前缀,差异和快速差异编码器类似的节省内存,但以较低的编码速度为代价提供了更快的随机访问。在hbase-2.0.0中将其删除。

  • 编解码器内置在HBase中,因此不需要额外的配置。通过设置DATA_BLOCK_ENCODING属性在表上启用编解码器。在更改其DATA_BLOCK_ENCODING设置之前,请禁用该表。

查看表的编码:

img

更改表的编码

alter ‘itcast_src’, { NAME => ‘cf’, DATA_BLOCK_ENCODING => ‘FAST_DIFF’ }

  • 压缩算法

  • none

默认创建表压缩方式:数据存储无压缩设置

  • GZ

CLASSPATH上有本地Hadoop库,HBase使用Java的内置GZip支持

已配置好

  • 使用”gz”压缩格式创建原始数据实时ETL写入hbase表

create ‘itcast_src’,{NAME => ‘cf’,COMPRESSION => ‘gz’}

alter ‘itcast_src’,{NAME => ‘cf’,COMPRESSION => ‘gz’}

效果:http://node01:60010/

  1. 找到Tables Details

img

  1. 进入Tables Details

img

  1. 选择对应的表进入,找到Table Regions的表存储的RegionServer

img

  1. 进入表所在的RegionServer,查看Storefile Metrics(如果有多个region,则重复第3步和本步骤)

img

  • Snappy

snappy压缩需要手动在hadoop与snappy中配置

  • 已配置好

  • 配置参考:

http://archive.cloudera.com/cdh5/cdh/5/hbase-0.98.6-cdh5.3.1/book/apds03.html#compressor.install

  • 使用”snappy”压缩格式创建原始数据实时ETL写入hbase表

create ‘itcast_src_snappy’, { NAME => ‘cf’, COMPRESSION => ‘SNAPPY’ }

  • 效果

img

  • LZO

HBase无法与LZO一起使用,因为使用Apache软件许可证(ASL)的HBase和使用GPL许可证的LZO之间不兼容

未配置

  • LZ4

LZ4支持与Hadoop捆绑在一起。启动HBase时,请确保可访问hadoop共享库(libhadoop.so

  • 已配置好

  • 使用”lz4”压缩格式创建原始数据实时ETL写入hbase表

create ‘itcast_src_lz4’, { NAME => ‘cf’, COMPRESSION => ‘LZ4’ }

  • 效果

img

  • 压缩方式对比(参考)

同等数据98192 row(s),虚拟机服务环境(6g{node01},3g{node02},3g{node03})压缩率顺序为gz > snappy > lz4

压缩率对比,还应该看压数据总量,压缩速度和解压速度,也就是压缩和解压消耗的时间

数据量更大的情况下,可能snappy压缩率更高,按实际情况测试选择压缩算法

  • 思考:原始数据合理存储到hive和hbase上之后,如何利用现有数据进行分析,分析哪些指标,用户关心的、车企关心的、政府车辆监控关心的指标有哪些?
【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。