Preface
I have only just started learning Flink. Since I have some prior HBase experience, I wanted to build a simple HBase read/write program with Flink, and base my subsequent Flink learning on HBase as well.
Preparation
Deploy a standalone HBase instance (see https://bbs.huaweicloud.com/blogs/197806 for a standalone deployment guide). Create two tables in HBase, T2 and T3. In this example, Flink reads the data out of table T2 and writes it into table T3, so we write some data into T2 in advance.
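If you would rather prepare the tables programmatically than from the HBase shell, here is a minimal sketch using the HBase 2.x client API. The column family name cf, the qualifier q1, the row-key pattern, and the row count are illustrative assumptions, not part of the original setup.

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;

public class PrepareTables {
    public static void main(String[] args) throws Exception {
        // hbase-site.xml on the classpath supplies the connection settings
        try (Connection conn = ConnectionFactory.createConnection(HBaseConfiguration.create());
             Admin admin = conn.getAdmin()) {
            // Create the source table T2 and the destination table T3 if missing
            for (String name : new String[]{"T2", "T3"}) {
                TableName tn = TableName.valueOf(name);
                if (!admin.tableExists(tn)) {
                    admin.createTable(TableDescriptorBuilder.newBuilder(tn)
                            .setColumnFamily(ColumnFamilyDescriptorBuilder.of("cf")) // assumed family
                            .build());
                }
            }
            // Seed T2 with some sample rows so the Flink job has data to copy
            try (Table t2 = conn.getTable(TableName.valueOf("T2"))) {
                for (int i = 0; i < 100; i++) {
                    Put put = new Put(Bytes.toBytes(String.format("row%03d", i)));
                    put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("q1"), Bytes.toBytes("value" + i));
                    t2.put(put);
                }
            }
        }
    }
}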
We will not deploy Flink separately; in this example we launch the job from the IDE, which makes debugging easier.
Implementing the Flink application that writes to HBase
1. Create a Maven project and add the following dependencies to pom.xml. org.apache.flink:flink-dist_2.11 is included so that a Flink mini cluster can run inside the IDE. The ${...} version placeholders are assumed to be defined in the pom's <properties> section; set them to match the versions in your environment.
<dependencies>
    <dependency>
        <groupId>commons-logging</groupId>
        <artifactId>commons-logging</artifactId>
        <version>${commons-logging.version}</version>
    </dependency>
    <dependency>
        <groupId>commons-lang</groupId>
        <artifactId>commons-lang</artifactId>
        <version>${commons-lang.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-common</artifactId>
        <version>${hbase.version}</version>
        <exclusions>
            <exclusion>
                <groupId>org.apache.zookeeper</groupId>
                <artifactId>zookeeper</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.apache.zookeeper</groupId>
        <artifactId>zookeeper</artifactId>
        <version>${zookeeper.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-client</artifactId>
        <version>${hbase.version}</version>
        <exclusions>
            <exclusion>
                <groupId>org.apache.zookeeper</groupId>
                <artifactId>zookeeper</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>${hadoop.version}</version>
    </dependency>
    <dependency>
        <groupId>com.google.guava</groupId>
        <artifactId>guava</artifactId>
        <version>${guava.version}</version>
    </dependency>
    <dependency>
        <groupId>com.google.protobuf</groupId>
        <artifactId>protobuf-java</artifactId>
        <version>${protobuf.version}</version>
    </dependency>
    <dependency>
        <groupId>com.google.code.gson</groupId>
        <artifactId>gson</artifactId>
        <version>${gson.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-dist_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
</dependencies>
2. Implement a custom source that reads from the HBase table
Implement a simple custom source that reads data from the HBase table:
public class HBaseReaderSource extends RichSourceFunction<Tuple2<String, List<Cell>>>
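The original snippet does not show the class members that the methods below rely on; a minimal sketch of what they presumably look like, with names inferred from their usage in open, run, and cancel:

// Assumed member declarations, inferred from how open/run/cancel use them
private static final Logger logger = LoggerFactory.getLogger(HBaseReaderSource.class); // org.slf4j
private Connection conn; // HBase connection, created in open()
private Table table;     // handle on the source table (T2 by default)
private Scan scan;       // full-table scan consumed by run()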
In the open method, initialize the HBase connection and the Table handle:
@Override
public void open(Configuration parameters) throws Exception {
    System.out.println("Read source open");
    super.open(parameters);
    org.apache.hadoop.conf.Configuration hbaseConf = HBaseConfiguration.create();
    conn = ConnectionFactory.createConnection(hbaseConf);
    ParameterTool parameterTool = (ParameterTool) getRuntimeContext()
            .getExecutionConfig().getGlobalJobParameters();
    String sourceTable = parameterTool.get(HBaseReaderStream.CONF_FROM_TABLE, "T2");
    table = conn.getTable(TableName.valueOf(sourceTable));
    scan = new Scan();
}
In the run method, implement the simple read logic:
@Override
public void run(SourceContext<Tuple2<String, List<Cell>>> sourceContext) throws Exception {
    System.out.println("Read source run");
    ResultScanner scanner = table.getScanner(scan);
    try {
        for (Result result : scanner) {
            String rowkey = Bytes.toString(result.getRow());
            List<Cell> cells = result.listCells();
            Tuple2<String, List<Cell>> tuple2 = new Tuple2<>();
            tuple2.setFields(rowkey, cells);
            sourceContext.collect(tuple2);
        }
    } finally {
        scanner.close(); // release the scanner once the table has been read
    }
}
In the cancel method, release the Table and the connection:
@Override
public void cancel() {
    try {
        if (table != null) {
            table.close();
        }
        if (conn != null) {
            conn.close();
        }
    } catch (IOException e) {
        logger.error("Close HBase Exception:", e);
    }
}
3. Implement a custom sink that writes into the HBase table
Implement a simple custom sink that writes data into the destination HBase table:
public class HBaseWriterSink extends RichSinkFunction<Tuple2<String, List<Cell>>>
The open and close methods are written much like the source's open and cancel above, so I won't repeat them; a sketch of what they might look like is given below.
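For completeness, here is a minimal sketch of those two methods. It simply mirrors the source's pattern; the constant CONF_TO_TABLE and the default table name T3 are assumptions, not code from the original post.

// Sketch only -- mirrors HBaseReaderSource.open(); CONF_TO_TABLE is an assumed constant
@Override
public void open(Configuration parameters) throws Exception {
    super.open(parameters);
    conn = ConnectionFactory.createConnection(HBaseConfiguration.create());
    ParameterTool parameterTool = (ParameterTool) getRuntimeContext()
            .getExecutionConfig().getGlobalJobParameters();
    String sinkTable = parameterTool.get(HBaseReaderStream.CONF_TO_TABLE, "T3");
    table = conn.getTable(TableName.valueOf(sinkTable));
}

@Override
public void close() throws Exception {
    super.close();
    if (table != null) {
        table.close();
    }
    if (conn != null) {
        conn.close();
    }
}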
The invoke method performs the write into the HBase table:
@Override
public void invoke(Tuple2<String, List<Cell>> value, Context context) throws Exception {
    Put put = new Put(Bytes.toBytes(value.f0));
    List<Cell> cells = value.f1;
    for (Cell cell : cells) {
        put.add(cell);
    }
    table.put(put);
}
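Issuing one Put per element keeps the example easy to follow, but it also means one RPC per record. If throughput matters, the HBase client's BufferedMutator batches writes; the following is a sketch of that variation under the same assumed fields, not part of the original code.

// Alternative invoke() using BufferedMutator for batched writes (sketch)
// The mutator would be created in open():
//   mutator = conn.getBufferedMutator(TableName.valueOf(sinkTable));
private BufferedMutator mutator;

@Override
public void invoke(Tuple2<String, List<Cell>> value, Context context) throws Exception {
    Put put = new Put(Bytes.toBytes(value.f0));
    for (Cell cell : value.f1) {
        put.add(cell);
    }
    mutator.mutate(put); // buffered; flushed in batches rather than per record
}

// close() should call mutator.flush() and mutator.close() so buffered writes are persisted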
4. The application's entry class
Write a launcher class for the Flink application; it mainly wires up the source and sink and then executes the job.
The core of the main method:
ParameterTool parameterTool = ParameterTool.fromArgs(args);
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// parameter settings
env.getConfig().setGlobalJobParameters(parameterTool);
env.enableCheckpointing(5000);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

System.out.println("HBase Reader add source");
DataStreamSource<Tuple2<String, List<Cell>>> stream = env.addSource(new HBaseReaderSource());
// stream.print().setParallelism(1);

// NO.1 Sink
System.out.println("HBase Reader add sink");
stream.addSink(new HBaseWriterSink());

// NO.2 Output
//System.out.println("HBase Reader output format");
//stream.writeUsingOutputFormat(new HBaseOutputFormat());

env.execute();
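The snippet above is only the method body. Judging by the HBaseReaderStream.CONF_FROM_TABLE constant referenced in the source's open method, the enclosing class is presumably HBaseReaderStream; a sketch of its skeleton follows, where the constants' values are assumptions.

public class HBaseReaderStream {
    // Assumed parameter keys -- only the constant names appear in the original code
    public static final String CONF_FROM_TABLE = "from.table";
    public static final String CONF_TO_TABLE = "to.table";

    public static void main(String[] args) throws Exception {
        // ... body shown above ...
    }
}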
Adding hbase-site.xml to the project
Add an hbase-site.xml to the project for connecting to HBase. Since this is a non-Kerberos cluster, only the ZooKeeper quorum address needs to be configured. HBaseConfiguration.create() loads this file from the classpath (for example, src/main/resources in a Maven project).
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
    <property>
        <name>hbase.zookeeper.quorum</name>
        <value>ecs-XXXXXX</value>
    </property>
</configuration>
Launching the Flink application
1. Once the project is written, you can package it and submit it with the Flink client. Here, for convenience, we run it directly in the local IDEA instead: local execution starts a Flink mini cluster, which makes debugging easier. Simply run the class containing the main method.
2. Checking in HBase (T2 is the source table and T3 the destination), the data has been written to T3 in full.