初学Flink-使用Flink读写HBase

举报
Lettle whale 发表于 2020/09/29 16:46:27 2020/09/29
【摘要】 Flink初学:读写HBase

前言

刚刚开始接触Flink,由于之前有点HBase经验,便想着使用Flink去做个简单的HBase读写程序,后续的Flink学习也就都基于HBase来做。

准备

部署一个单机版的HBase,单机版部署HBase可以参考https://bbs.huaweicloud.com/blogs/197806,在HBase中创建两张表,一个是T2,一个是T3,这个例子便是使用FlinkT2表把数据读取出来并写入到T3表中,我们提前在T2表中写入一定量的数据。

Flink就不单独部署了,这里例子中,我们使用IDE启动的方式,方便调试。

 

实现FlinkHBase的应用代码

1.建立一个maven工程,pom.xml中加入如下依赖,其中加入org.apache.flink:flink-dist_2.11是
为了在IDE里面运行Flink mini cluster

<dependencies>
      <
dependency>
          <
groupId>commons-logging</groupId>
          <
artifactId>commons-logging</artifactId>
          <
version>${commons-logging.version}</version>
      </
dependency>
      <
dependency>
          <
groupId>commons-lang</groupId>
          <
artifactId>commons-lang</artifactId>
          <
version>${commons-lang.version}</version>
      </
dependency>
      <
dependency>
          <
groupId>org.apache.hbase</groupId>
          <
artifactId>hbase-common</artifactId>
          <
version>${hbase.version}</version>
          <
exclusions>
              <
exclusion>
                  <
groupId>org.apache.zookeeper</groupId>
                  <
artifactId>zookeeper</artifactId>
              </
exclusion>
          </
exclusions>
      </
dependency>
      <
dependency>
          <
groupId>org.apache.zookeeper</groupId>
          <
artifactId>zookeeper</artifactId>
          <
version>${zookeeper.version}</version>
      </
dependency>
      <
dependency>
          <
groupId>org.apache.hbase</groupId>
          <
artifactId>hbase-client</artifactId>
          <
version>${hbase.version}</version>
          <
exclusions>
              <
exclusion>
                  <
groupId>org.apache.zookeeper</groupId>
                  <
artifactId>zookeeper</artifactId>
              </
exclusion>
          </
exclusions>
      </
dependency>
      <
dependency>
          <
groupId>org.apache.hadoop</groupId>
          <
artifactId>hadoop-common</artifactId>
          <
version>${hadoop.version}</version>
      </
dependency>
      <
dependency>
          <
groupId>com.google.guava</groupId>
          <
artifactId>guava</artifactId>
          <
version>${guava.version}</version>
      </
dependency>
      <
dependency>
          <
groupId>com.google.protobuf</groupId>
          <
artifactId>protobuf-java</artifactId>
          <
version>${protobuf.version}</version>
      </
dependency>
      <
dependency>
          <
groupId>com.google.code.gson</groupId>
          <
artifactId>gson</artifactId>
          <
version>${gson.version}</version>
      </
dependency>
      <
dependency>
          <
groupId>org.apache.flink</groupId>
          <
artifactId>flink-streaming-java_2.11</artifactId>
          <
version>${flink.version}</version>
      </
dependency>
      <
dependency>
          <
groupId>org.apache.flink</groupId>
          <
artifactId>flink-dist_2.11</artifactId>
          <
version>${flink.version}</version>
      </
dependency>

 

2.       实现一个读取HBase表的自定义Source

实现一个简单的自定义Source,从HBase表读取数据,

public class HBaseReaderSource extends RichSourceFunction<Tuple2<String, List<Cell>>>

 

open方法里面初始化HBase的连接和HTable

@Override
 
public void open(Configuration parameters) throws Exception {
    System.
out.println("Read source open");
   
super.open(parameters);
    org.apache.hadoop.conf.Configuration hbaseConf = HBaseConfiguration.create();
   
conn = ConnectionFactory.createConnection(hbaseConf);
    ParameterTool parameterTool = (ParameterTool) getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
    String sourceTable = parameterTool.get(HBaseReaderStream.
CONF_FROM_TABLE,"T2");
   
table = conn.getTable(TableName.valueOf(sourceTable));
   
scan = new Scan();
}

Run方法里面实现简单的读数据逻辑

@Override
 
public void run(SourceContext<Tuple2<String, List<Cell>>> sourceContext) throws Exception {
    System.
out.println("Read source run");
    ResultScanner scanner =
table.getScanner(scan);
    Iterator<Result> iterator = scanner.iterator();
   
while (iterator.hasNext()) {
        Result result = iterator.next();
        String rowkey = Bytes.toString(result.getRow());
        List<Cell> cells = result.listCells();
        Tuple2<String, List<Cell>> tuple2 =
new Tuple2<>();
        tuple2.setFields(rowkey, cells);
        sourceContext.collect(tuple2);
    }
}

cancel方法中释放htable

@Override
 
public void cancel() {
   
try {
       
if (table != null) {
           
table.close();
        }
       
if (conn != null) {
           
conn.close();
        }
    }
catch (IOException e) {
       
logger.error("Close HBase Exception:", e.toString());
    }
}

 

3.       实现一个写入HBase表的自定义Sink

实现一个简单的自定义Sink,向HBase目的表写入数据

public class HBaseWriterSink extends RichSinkFunction<Tuple2<String, List<Cell>>>

Open方法和Close方法和上面source的写法类似,就不重复写了

Invoke方法实现写入HBase

@Override
 
public void invoke(Tuple2<String, List<Cell>> value, Context context) throws Exception {
    Put put =
new Put(Bytes.toBytes(value.f0));
    List<Cell> cells = value.
f1;
   
for (Cell cell : cells) {
        put.add(cell);
    }
   
table.put(put);
}

 

4.       应用的启动类

写一个flink应用的启动类,主要是设置sourcesink,然后执行任务

    Main函数主要代码为:

ParameterTool parameterTool = ParameterTool.fromArgs(args);
 
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
 
// 参数设置
 
env.getConfig().setGlobalJobParameters(parameterTool);
 
env.enableCheckpointing(
5000);
env.setStreamTimeCharacteristic(TimeCharacteristic.
EventTime);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.
EXACTLY_ONCE);
System.
out.println("HBase Reader add source");
DataStreamSource<Tuple2<String, List<Cell>>> stream = env.addSource(
new HBaseReaderSource());
 
// stream.print().setParallelism(1);
// NO.1 Sink
 
System.out.println("HBase Reader add sink");
stream.addSink(
new HBaseWriterSink());
 
// NO.2 Output
//System.out.println("HBase Reader output format");
//stream.writeUsingOutputFormat(new HBaseOutputFormat());
 
env.execute();

 

工程中配置hbase-site.xml

在工程中加入一个hbase-site.xml,用于连接HBase,这里因为是非kerberos集群,只需要配置zookeeper地址即可

<?xml version="1.0" encoding="UTF-8"?>
 
<configuration>
    <
property>
        <
name>hbase.zookeeper.quorum</name>
        <
value>ecs-XXXXXX</value>
    </
property>
  </
configuration>

 

启动Flink应用

1.       工程写好了之后,可以将此工程打包,使用flink客户端提交的方式去运行,这里我们为了方便,就直接在本地的IDEA里面直接运行程序,

本地运行会启动一个Flink mini cluster,调试起来比较方便,在Main函数的类上直接执行

2.       hbase中查看,T2表为源表,T3表为目的表,数据已经完完整整的写入到了T3


【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。