实时即未来,大数据项目车联网之原始数据实时ETL任务HBase调优(9)
theme: smartblue
持续创作,加速成长!这是我参与「掘金日新计划 · 10 月更文挑战」的第9天,点击查看活动详情
1. 原始数据实时ETL任务HBase调优
1.1 数据写入hbase优化
上一节写入数据,一条条数据put到表中,对于大量数据的写入,效率极低,因此针对此项进行优化
-
使用hbase客户端写缓存进行批量写入数据到hbase中
-
hbase客户端写缓存对象:BufferedMutator
hbase的每一次put操作写入数据,实际上是一个RPC操作,将客户端的数据传输到hbase服务器再返回结果,适合小数据量写入,当需要写入大量数据时,每一次put连接一次RPC连接,会增加连接次数,因此会增加传输时间和IO消耗
BufferedMutator通过mutate方法提交数据,flush方法可以强制刷新缓冲区提交数据,最后执行close方法也会刷新缓冲区
使用BufferedMutator数据写与直接写数据到hbase的区别:
- 写入步骤与第一节类似,这里针对hbase客户端写入数据方式进行优化
1.2 优化后的代码实现
-
自定义sink,继承RichSinkFunction类
-
json解析成功后需要存到hbase的数据类型为:
- 创建全局变量,应用于程序逻辑实现
public class SourceDataHBaseSinkOptimize extends RichSinkFunction<ItcastDataObj> {
private Logger logger = LoggerFactory.getLogger("SourceDataHBaseSinkOptimize");
private String tableName;
private Connection conn = null;
// hbase客户端中的数据写缓存对象
private BufferedMutator mutator = null;
private String cf = "cf";
public SourceDataHBaseSinkOptimize(String tableName) {
this.tableName = tableName;
}
}
-
重写方法,实现数据写入hbase逻辑
-
重写open方法:加载资源配置
创建hbase配置对象,设置hbase配置信息,添加客户端相关配置
跟据hbase的连接地址,获得hbase连接
根据hbase连接和表名,获得hbase客户端Table对象
@Override
public void open(Configuration parameters) throws Exception {
org.apache.hadoop.conf.Configuration config = HBaseConfiguration.create();
config.set("hbase.zookeeper.property.clientPort", ConfigLoader.getProperty("zookeeper.clientPort"));
config.set("hbase.zookeeper.quorum", ConfigLoader.getProperty("zookeeper.quorum"));
TableName tname = TableName.valueOf(tableName);
config.set(TableInputFormat.INPUT_TABLE, tname.getNameAsString());
conn = ConnectionFactory.createConnection(config);
BufferedMutatorParams params = new BufferedMutatorParams(tname);
//设置缓存10MB,当达到10MB时数据会自动刷到HBase
params.writeBufferSize(1024 * 1024 * 10);
// 强制缓冲区提交数据
mutator = conn.getBufferedMutator(params);
}
- 重写invoke方法:数据写入hbase逻辑
设计hbase的rowkey、根据列族封装Put对象
把封装好的Put对象,写入HBase表中
@Override
public void open(Configuration parameters) throws Exception {
org.apache.hadoop.conf.Configuration config = HBaseConfiguration.create();
config.set("hbase.zookeeper.property.clientPort", ConfigLoader.getProperty("zookeeper.clientPort"));
config.set("hbase.zookeeper.quorum", ConfigLoader.getProperty("zookeeper.quorum"));
TableName tname = TableName.valueOf(tableName);
config.set(TableInputFormat.INPUT_TABLE, tname.getNameAsString());
conn = ConnectionFactory.createConnection(config);
BufferedMutatorParams params = new BufferedMutatorParams(tname);
//设置缓存10MB,当达到10MB时数据会自动刷到HBase
params.writeBufferSize(1024 * 1024 * 10);
// 强制缓冲区提交数据
mutator = conn.getBufferedMutator(params);
}
由于hbasePut方法中属性比较多,提取到文件中:讲义关联资料\hbasePut方法.md
- 重写close方法:销毁对象,释放资源
遵循“先创建后关闭“原则
@Override
public void close() throws Exception {
if (mutator != null) {
// 也会强制缓冲区提交数据
mutator.close();
}
if (conn != null || !conn.isClosed()) {
conn.close();
}
}
-
在KafkaSourceDataTask主任务中调用
-
创建自定义sink对象,同时传入参数:hbase表名
// 定义hbaseSink
SourceDataHBaseSinkOptimize hbaseSinkOptimize = new SourceDataHBaseSinkOptimize("itcast_src");
// 保存原始数据存入 hbase
successJsonData.addSink(hbaseSinkOptimize);
1.3 数据写入hbase进行预分区
针对原始数据实时ETL落地hbase,设置好预分区,解决数据热点问题,当已知rowkey的值的时候,设置预分区可以根据rowkey的区间进行分区
- 创建预分区语法
create ‘itcast_src’,‘cf’,{SPLITS => [‘10000’,‘50000’]}
-
预分区的含义
-
rowkey所在的范围
分区1: startKey 为空,endKey为10000
分区2: startKey 10000,endKey为50000
分区3: startKey 50000,endKey为空
1.4 数据写入hbase预写日志(WAL)
1.4.1 预写日志(WAL)
预写日志(WAL)最重要的作用是容灾,当服务器发生宕机,我们可以通过hbase客户端操作时预写到服务器上的日志(内存和hdfs上)恢复宕机之前的数据,如果预写日志失败,客户端的操作也将失败
-
当客户端发起修改数据操作请求:put新增数据、delete删除数据、incr增量计数器
-
每一个修改动作被封装成Key/Value对象,通过RPC请求到master服务
l hbase服务master将单个或批量请求发送给匹配region的RegionServer处理
-
数据先被写入到WAL,然后放到实际拥有记录的存储文件内存中(MemStore)
-
当内存达到一定大小,数据会异步持续写入到文件系统中(HDFS)
1.4.2 memstore在hbase读写中的作用
l RegionServer收到写请求时,会将请求定向到特定的Region。每个区域存储一组行。行数据可以分为多个列族。特定列族的数据存储在HStore中,该HStore由Memstore和一组HFiles组成。Memstore保留在RegionServer主存储器中,而HFiles被写入HDFS。处理写请求后,首先将数据写入Memstore。然后,当满足某些阈值时(主内存是有限的),Memstore数据将刷新到HFile中。
- 针对hbase优化,可以从数据写入方式,预分区,预写日志等方面着手,思考如何在同一磁盘空间上,存储更多的数据?
1.5 数据写入hbase合理使用压缩与编码
l HBase基于列簇存储数据,列簇是表的schema信息的一部分,列是基于列簇为前缀加上属性存储,因此对hbase表进行编码压缩,可以认为是对列数据的压缩。
注意:数据Block编码/压缩可以一起使用在同一个列簇。
- 官方说明:
https://hbase.apache.org/book.html#compression
-
编码压缩的优势:
-
默认建表不启用压缩编码,对hbase开发人员不友好
-
设置编码压缩能够节约磁盘占用空间,节约硬件磁盘存储费用
-
编码压缩通常能提高系统吞吐率,提高服务器系统性能
-
编码类型
-
Prefix
键非常相似。具体而言,keys通常共享一个公共前缀,并且仅在末尾有所不同。例如:一个键可能是RowKey:Family:Qualifier0,下一个键可能是RowKey:Family:Qualifier1。
在prefix编码中,添加了一个额外的列,用于保存当前key和上一个key之间共享的前缀长度。假设此处的第一个键与之前的键完全不同,则其前缀长度为0。
- 没有编码的ColumnFamily
具有prefix编码的ColumnFamily
-
第二个键的前缀长度为23,因为它们具有前23个共同的字符。
-
显然,如果密钥之间没有共同之处,那么前缀将不会带来太多好处。
- Diff
diff编码在前缀编码的基础上扩展。代替顺序地将密钥视为一个整体的字节序列,可以拆分每个密钥字段,以便可以更有效地压缩密钥的每个部分。
添加了两个新字段:timestamp和type
如果ColumnFamily与上一行相同,则将其从当前行中省略。如果键的长度,值的长度或类型与上一行相同,则省略该字段。
此外,为了增加压缩率,时间戳记将存储为前一行时间戳记的Diff,而不是全部存储。默认情况下,Diff编码是禁用的,因为写入和扫描速度较慢,但是会缓存更多数据。
具有Diff编码的ColumnFamily
- Fast Diff
Fast Diff与Diff相似,但是使用了更快的实现。它还添加了另一个字段,该字段存储一个位以跟踪数据本身是否与上一行相同。如果是,则不会再次存储数据。
如果您有长键或多列,建议使用Fast Diff编解码器。
数据格式几乎与Diff编码相同
- Prefix Tree
前缀树编码是HBase 0.96中的一项实验功能,在hbase-2.0.0中将其删除,当前版本中已被删除。
它提供了与前缀,差异和快速差异编码器类似的节省内存,但以较低的编码速度为代价提供了更快的随机访问。在hbase-2.0.0中将其删除。
- 编解码器内置在HBase中,因此不需要额外的配置。通过设置DATA_BLOCK_ENCODING属性在表上启用编解码器。在更改其DATA_BLOCK_ENCODING设置之前,请禁用该表。
查看表的编码:
更改表的编码
alter ‘itcast_src’, { NAME => ‘cf’, DATA_BLOCK_ENCODING => ‘FAST_DIFF’ }
-
压缩算法
-
none
默认创建表压缩方式:数据存储无压缩设置
- GZ
CLASSPATH上有本地Hadoop库,HBase使用Java的内置GZip支持
已配置好
- 使用”gz”压缩格式创建原始数据实时ETL写入hbase表
create ‘itcast_src’,{NAME => ‘cf’,COMPRESSION => ‘gz’}
alter ‘itcast_src’,{NAME => ‘cf’,COMPRESSION => ‘gz’}
- 找到Tables Details
- 进入Tables Details
- 选择对应的表进入,找到Table Regions的表存储的RegionServer
- 进入表所在的RegionServer,查看Storefile Metrics(如果有多个region,则重复第3步和本步骤)
- Snappy
snappy压缩需要手动在hadoop与snappy中配置
-
已配置好
-
配置参考:
http://archive.cloudera.com/cdh5/cdh/5/hbase-0.98.6-cdh5.3.1/book/apds03.html#compressor.install
- 使用”snappy”压缩格式创建原始数据实时ETL写入hbase表
create ‘itcast_src_snappy’, { NAME => ‘cf’, COMPRESSION => ‘SNAPPY’ }
- 效果
- LZO
HBase无法与LZO一起使用,因为使用Apache软件许可证(ASL)的HBase和使用GPL许可证的LZO之间不兼容
未配置
- LZ4
LZ4支持与Hadoop捆绑在一起。启动HBase时,请确保可访问hadoop共享库(libhadoop.so)
-
已配置好
-
使用”lz4”压缩格式创建原始数据实时ETL写入hbase表
create ‘itcast_src_lz4’, { NAME => ‘cf’, COMPRESSION => ‘LZ4’ }
- 效果
- 压缩方式对比(参考)
同等数据98192 row(s),虚拟机服务环境(6g{node01},3g{node02},3g{node03})压缩率顺序为gz > snappy > lz4
压缩率对比,还应该看压数据总量,压缩速度和解压速度,也就是压缩和解压消耗的时间
数据量更大的情况下,可能snappy压缩率更高,按实际情况测试选择压缩算法
- 思考:原始数据合理存储到hive和hbase上之后,如何利用现有数据进行分析,分析哪些指标,用户关心的、车企关心的、政府车辆监控关心的指标有哪些?
- 点赞
- 收藏
- 关注作者
评论(0)