初学Flink-使用Flink读写HBase

举报
Lettle whale 发表于 2020/09/29 16:46:27 2020/09/29
【摘要】 Flink初学:读写HBase

前言

刚刚开始接触Flink,由于之前有点HBase经验,便想着使用Flink去做个简单的HBase读写程序,后续的Flink学习也就都基于HBase来做。

准备

部署一个单机版的HBase,单机版部署HBase可以参考https://bbs.huaweicloud.com/blogs/197806,在HBase中创建两张表,一个是T2,一个是T3,这个例子便是使用FlinkT2表把数据读取出来并写入到T3表中,我们提前在T2表中写入一定量的数据。

Flink就不单独部署了,这里例子中,我们使用IDE启动的方式,方便调试。

 

实现FlinkHBase的应用代码

1.建立一个maven工程,pom.xml中加入如下依赖,其中加入org.apache.flink:flink-dist_2.11是
为了在IDE里面运行Flink mini cluster

<dependencies>
      <
dependency>
          <
groupId>commons-logging</groupId>
          <
artifactId>commons-logging</artifactId>
          <
version>${commons-logging.version}</version>
      </
dependency>
      <
dependency>
          <
groupId>commons-lang</groupId>
          <
artifactId>commons-lang</artifactId>
          <
version>${commons-lang.version}</version>
      </
dependency>
      <
dependency>
          <
groupId>org.apache.hbase</groupId>
          <
artifactId>hbase-common</artifactId>
          <
version>${hbase.version}</version>
          <
exclusions>
              <
exclusion>
                  <
groupId>org.apache.zookeeper</groupId>
                  <
artifactId>zookeeper</artifactId>
              </
exclusion>
          </
exclusions>
      </
dependency>
      <
dependency>
          <
groupId>org.apache.zookeeper</groupId>
          <
artifactId>zookeeper</artifactId>
          <
version>${zookeeper.version}</version>
      </
dependency>
      <
dependency>
          <
groupId>org.apache.hbase</groupId>
          <
artifactId>hbase-client</artifactId>
          <
version>${hbase.version}</version>
          <
exclusions>
              <
exclusion>
                  <
groupId>org.apache.zookeeper</groupId>
                  <
artifactId>zookeeper</artifactId>
              </
exclusion>
          </
exclusions>
      </
dependency>
      <
dependency>
          <
groupId>org.apache.hadoop</groupId>
          <
artifactId>hadoop-common</artifactId>
          <
version>${hadoop.version}</version>
      </
dependency>
      <
dependency>
          <
groupId>com.google.guava</groupId>
          <
artifactId>guava</artifactId>
          <
version>${guava.version}</version>
      </
dependency>
      <
dependency>
          <
groupId>com.google.protobuf</groupId>
          <
artifactId>protobuf-java</artifactId>
          <
version>${protobuf.version}</version>
      </
dependency>
      <
dependency>
          <
groupId>com.google.code.gson</groupId>
          <
artifactId>gson</artifactId>
          <
version>${gson.version}</version>
      </
dependency>
      <
dependency>
          <
groupId>org.apache.flink</groupId>
          <
artifactId>flink-streaming-java_2.11</artifactId>
          <
version>${flink.version}</version>
      </
dependency>
      <
dependency>
          <
groupId>org.apache.flink</groupId>
          <
artifactId>flink-dist_2.11</artifactId>
          <
version>${flink.version}</version>
      </
dependency>

 

2.       实现一个读取HBase表的自定义Source

实现一个简单的自定义Source,从HBase表读取数据,

public class HBaseReaderSource extends RichSourceFunction<Tuple2<String, List<Cell>>>

 

open方法里面初始化HBase的连接和HTable

@Override
   public void open(Configuration parameters) throws Exception {
     System.out.println("Read source open");
     super.open(parameters);
     org.apache.hadoop.conf.Configuration hbaseConf = HBaseConfiguration.create();
     conn = ConnectionFactory.createConnection(hbaseConf);
     ParameterTool parameterTool = (ParameterTool) getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
     String sourceTable = parameterTool.get(HBaseReaderStream.CONF_FROM_TABLE,"T2");
     table = conn.getTable(TableName.valueOf(sourceTable));
     scan = new Scan();
 }

Run方法里面实现简单的读数据逻辑

@Override
   public void run(SourceContext<Tuple2<String, List<Cell>>> sourceContext) throws Exception {
     System.out.println("Read source run");
     ResultScanner scanner = table.getScanner(scan);
     Iterator<Result> iterator = scanner.iterator();
     while (iterator.hasNext()) {
         Result result = iterator.next();
         String rowkey = Bytes.toString(result.getRow());
         List<Cell> cells = result.listCells();
         Tuple2<String, List<Cell>> tuple2 = new Tuple2<>();
         tuple2.setFields(rowkey, cells);
         sourceContext.collect(tuple2);
     }
 }

cancel方法中释放htable

@Override
   public void cancel() {
     try {
         if (table != null) {
             table.close();
         }
         if (conn != null) {
             conn.close();
         }
     } catch (IOException e) {
         logger.error("Close HBase Exception:", e.toString());
     }
 }

 

3.       实现一个写入HBase表的自定义Sink

实现一个简单的自定义Sink,向HBase目的表写入数据

public class HBaseWriterSink extends RichSinkFunction<Tuple2<String, List<Cell>>>

Open方法和Close方法和上面source的写法类似,就不重复写了

Invoke方法实现写入HBase

@Override
   public void invoke(Tuple2<String, List<Cell>> value, Context context) throws Exception {
     Put put = new Put(Bytes.toBytes(value.f0));
     List<Cell> cells = value.f1;
     for (Cell cell : cells) {
         put.add(cell);
     }
     table.put(put);
 }

 

4.       应用的启动类

写一个flink应用的启动类,主要是设置sourcesink,然后执行任务

    Main函数主要代码为:

ParameterTool parameterTool = ParameterTool.fromArgs(args);
   final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
   
   // 参数设置
   env.getConfig().setGlobalJobParameters(parameterTool);
   
 env.enableCheckpointing(5000);
 env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
 env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
 System.out.println("HBase Reader add source");
 DataStreamSource<Tuple2<String, List<Cell>>> stream = env.addSource(new HBaseReaderSource());
   // stream.print().setParallelism(1);
 // NO.1 Sink
   System.out.println("HBase Reader add sink");
 stream.addSink(new HBaseWriterSink());
   // NO.2 Output
 //System.out.println("HBase Reader output format");
 //stream.writeUsingOutputFormat(new HBaseOutputFormat());
   env.execute();

 

工程中配置hbase-site.xml

在工程中加入一个hbase-site.xml,用于连接HBase,这里因为是非kerberos集群,只需要配置zookeeper地址即可

<?xml version="1.0" encoding="UTF-8"?>
   <configuration>
     <property>
         <name>hbase.zookeeper.quorum</name>
         <value>ecs-XXXXXX</value>
     </property>
   </configuration>

 

启动Flink应用

1.       工程写好了之后,可以将此工程打包,使用flink客户端提交的方式去运行,这里我们为了方便,就直接在本地的IDEA里面直接运行程序,

本地运行会启动一个Flink mini cluster,调试起来比较方便,在Main函数的类上直接执行

2.       hbase中查看,T2表为源表,T3表为目的表,数据已经完完整整的写入到了T3


【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。