Preface
I have only just started working with Flink. Since I already had some HBase experience, I decided to write a simple HBase read/write program with Flink, and to base my subsequent Flink learning on HBase as well.
Preparation
Deploy a standalone HBase instance (for a standalone deployment guide, see https://bbs.huaweicloud.com/blogs/197806). Create two tables in HBase, T2 and T3. In this example, Flink reads the data out of T2 and writes it into T3, so write some data into T2 in advance.
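For example, the two tables can be created and seeded from the HBase shell. Note that the column family name 'cf' and the sample row below are assumptions for illustration; use whatever schema your data needs (these commands run inside `hbase shell`, not in a regular terminal):

```
# run these inside `hbase shell`
create 'T2', 'cf'
create 'T3', 'cf'
# seed the source table with a sample row (hypothetical qualifier/value)
put 'T2', 'row1', 'cf:q1', 'value1'
```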
Flink itself is not deployed separately. In this example we launch everything from the IDE, which makes debugging easier.
Implementing the Flink application that reads and writes HBase
1. Create a Maven project and add the following dependencies to pom.xml. The org.apache.flink:flink-dist_2.11 dependency is included so that the Flink mini cluster can run inside the IDE.
<dependencies>
    <dependency>
        <groupId>commons-logging</groupId>
        <artifactId>commons-logging</artifactId>
        <version>${commons-logging.version}</version>
    </dependency>
    <dependency>
        <groupId>commons-lang</groupId>
        <artifactId>commons-lang</artifactId>
        <version>${commons-lang.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-common</artifactId>
        <version>${hbase.version}</version>
        <exclusions>
            <exclusion>
                <groupId>org.apache.zookeeper</groupId>
                <artifactId>zookeeper</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.apache.zookeeper</groupId>
        <artifactId>zookeeper</artifactId>
        <version>${zookeeper.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-client</artifactId>
        <version>${hbase.version}</version>
        <exclusions>
            <exclusion>
                <groupId>org.apache.zookeeper</groupId>
                <artifactId>zookeeper</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>${hadoop.version}</version>
    </dependency>
    <dependency>
        <groupId>com.google.guava</groupId>
        <artifactId>guava</artifactId>
        <version>${guava.version}</version>
    </dependency>
    <dependency>
        <groupId>com.google.protobuf</groupId>
        <artifactId>protobuf-java</artifactId>
        <version>${protobuf.version}</version>
    </dependency>
    <dependency>
        <groupId>com.google.code.gson</groupId>
        <artifactId>gson</artifactId>
        <version>${gson.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-dist_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
</dependencies>
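The ${...} placeholders above must be resolved in a &lt;properties&gt; block. The version numbers below are assumptions for illustration only; choose versions that match your actual cluster and each other:

```xml
<properties>
    <!-- all versions here are placeholders/assumptions; align them with your environment -->
    <commons-logging.version>1.2</commons-logging.version>
    <commons-lang.version>2.6</commons-lang.version>
    <hbase.version>2.2.3</hbase.version>
    <zookeeper.version>3.5.6</zookeeper.version>
    <hadoop.version>3.1.1</hadoop.version>
    <guava.version>18.0</guava.version>
    <protobuf.version>2.5.0</protobuf.version>
    <gson.version>2.8.6</gson.version>
    <flink.version>1.12.0</flink.version>
</properties>
```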
2. Implement a custom Source that reads an HBase table
Implement a simple custom Source that reads data from the HBase source table:
public class HBaseReaderSource extends RichSourceFunction<Tuple2<String, List<Cell>>> {
    // fields used by the lifecycle methods below
    private static final Logger logger = LoggerFactory.getLogger(HBaseReaderSource.class);
    private Connection conn;
    private Table table;
    private Scan scan;
Initialize the HBase connection and the table handle in the open method:
@Override
public void open(Configuration parameters) throws Exception {
    System.out.println("Read source open");
    super.open(parameters);
    org.apache.hadoop.conf.Configuration hbaseConf = HBaseConfiguration.create();
    conn = ConnectionFactory.createConnection(hbaseConf);
    ParameterTool parameterTool = (ParameterTool) getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
    String sourceTable = parameterTool.get(HBaseReaderStream.CONF_FROM_TABLE, "T2");
    table = conn.getTable(TableName.valueOf(sourceTable));
    scan = new Scan();
}
The run method implements the simple read logic:
@Override
public void run(SourceContext<Tuple2<String, List<Cell>>> sourceContext) throws Exception {
    System.out.println("Read source run");
    ResultScanner scanner = table.getScanner(scan);
    try {
        Iterator<Result> iterator = scanner.iterator();
        while (iterator.hasNext()) {
            Result result = iterator.next();
            String rowkey = Bytes.toString(result.getRow());
            List<Cell> cells = result.listCells();
            Tuple2<String, List<Cell>> tuple2 = new Tuple2<>();
            tuple2.setFields(rowkey, cells);
            sourceContext.collect(tuple2);
        }
    } finally {
        scanner.close(); // release the scanner's server-side resources
    }
}
Release the table and the connection in the cancel method:
@Override
public void cancel() {
    try {
        if (table != null) {
            table.close();
        }
        if (conn != null) {
            conn.close();
        }
    } catch (IOException e) {
        // pass the exception itself (not e.toString()) so the stack trace is logged
        logger.error("Close HBase Exception:", e);
    }
}
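The cancel method follows a common "close quietly" pattern: null-check each resource, close it, and log rather than propagate the exception. The same idea can be expressed generically in plain Java (a minimal sketch of the pattern, not HBase-specific code):

```java
public class CloseQuietly {
    // Close any AutoCloseable without propagating the exception,
    // mirroring what cancel() does for the Table and the Connection.
    static void closeQuietly(AutoCloseable c) {
        if (c == null) {
            return; // null-safe: nothing to do
        }
        try {
            c.close();
        } catch (Exception e) {
            // log and swallow, as cancel() does
            System.err.println("Close exception: " + e);
        }
    }

    public static void main(String[] args) {
        final boolean[] closed = {false};
        AutoCloseable resource = () -> closed[0] = true;
        closeQuietly(resource); // closes normally
        closeQuietly(null);     // safe on null
        System.out.println(closed[0]); // true
    }
}
```

Closing in reverse order of acquisition (table first, then connection) is deliberate: the table handle depends on the connection that created it.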
3. Implement a custom Sink that writes to an HBase table
Implement a simple custom Sink that writes data into the HBase destination table:

public class HBaseWriterSink extends RichSinkFunction<Tuple2<String, List<Cell>>> {
    // fields initialized in open(), used by invoke()
    private Connection conn;
    private Table table;
The open and close methods are written much like those of the source above, so they are not repeated here.
The invoke method performs the write into the HBase table:
@Override
public void invoke(Tuple2<String, List<Cell>> value, Context context) throws Exception {
    Put put = new Put(Bytes.toBytes(value.f0));
    List<Cell> cells = value.f1;
    for (Cell cell : cells) {
        put.add(cell);
    }
    table.put(put);
}
4. The application's entry point
Write a launcher class for the Flink application. It mainly wires up the source and the sink and then executes the job. The core of the main method:
ParameterTool parameterTool = ParameterTool.fromArgs(args);
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().setGlobalJobParameters(parameterTool);
env.enableCheckpointing(5000);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
System.out.println("HBase Reader add source");
DataStreamSource<Tuple2<String, List<Cell>>> stream = env.addSource(new HBaseReaderSource());
System.out.println("HBase Reader add sink");
stream.addSink(new HBaseWriterSink());
env.execute();
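ParameterTool.fromArgs(args) turns "--key value" pairs from the command line into a lookup table, which is what lets open() later read HBaseReaderStream.CONF_FROM_TABLE with a "T2" default. Conceptually it behaves like this minimal plain-Java sketch (an illustration only, not Flink's actual implementation):

```java
import java.util.HashMap;
import java.util.Map;

public class ArgsSketch {
    // Simplified model of ParameterTool.fromArgs: each "--key value"
    // (or "-key value") pair becomes a map entry; a trailing key with
    // no value becomes an entry with an empty string.
    public static Map<String, String> fromArgs(String[] args) {
        Map<String, String> map = new HashMap<>();
        int i = 0;
        while (i < args.length) {
            String key = args[i].replaceFirst("^--?", "");
            if (i + 1 < args.length && !args[i + 1].startsWith("-")) {
                map.put(key, args[i + 1]);
                i += 2;
            } else {
                map.put(key, ""); // flag without a value
                i += 1;
            }
        }
        return map;
    }

    public static void main(String[] args) {
        Map<String, String> p =
            fromArgs(new String[]{"--fromTable", "T2", "--toTable", "T3"});
        System.out.println(p.get("fromTable") + " -> " + p.get("toTable")); // T2 -> T3
    }
}
```

Because setGlobalJobParameters(parameterTool) is called on the environment, these parsed parameters become visible to every operator through getRuntimeContext().getExecutionConfig().getGlobalJobParameters(), which is exactly how the source picks up its table name.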
Configuring hbase-site.xml in the project
Add an hbase-site.xml to the project (e.g. under src/main/resources so it is on the classpath) for connecting to HBase. Since this is a non-Kerberos cluster, only the ZooKeeper quorum address needs to be configured:
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<property>
<name>hbase.zookeeper.quorum</name>
<value>ecs-XXXXXX</value>
</property>
</configuration>
Launching the Flink application
1. Once the project is finished, you can package it and submit it with the flink client. For convenience, here we simply run the program inside IntelliJ IDEA: a local run starts a Flink mini cluster, which makes debugging much easier. Just run the class containing the main method directly.
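If you do package the jar and submit it through the Flink client instead, the commands look roughly like the following. The main-class name, jar name, and the "--fromTable" key are assumptions for illustration (the real key is whatever HBaseReaderStream.CONF_FROM_TABLE is defined as), and running this requires a Flink installation:

```
mvn clean package
flink run -c com.example.HBaseReaderStream target/flink-hbase-demo.jar --fromTable T2
```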
2. Check in HBase: T2 is the source table and T3 the destination table; the data has been written into T3 in full.
[Copyright notice] This article is original content by a Huawei Cloud community user. When reprinting, you must credit the source (Huawei Cloud community) and include the article link, author, and other basic information; otherwise the author and the community reserve the right to pursue liability. If you find suspected plagiarism in this community, please report it with supporting evidence to cloudbbs@huaweicloud.com; confirmed infringing content will be removed immediately.