大数据Kudu(十):Flink操作Kudu

举报
Lansonli 发表于 2022/12/30 22:48:50 2022/12/30
1.6k+ 0 1
【摘要】 Flink操作KuduFlink主要应用场景是流式数据处理上,有些公司针对流式数据使用Flink实时分析后将结果存入Kudu,例如快手公司。这里将实时计算的结果存入Kudu需要自定义Flink Kudu Sink。场景:Flink实时读取Socket数据,将结果存入Kudu表t_flink_result,为了方便操作不再创建Kudu外表,这里在Impala中创建Kudu内表t_flink_r...

Flink操作Kudu

Flink主要应用场景是流式数据处理上,有些公司针对流式数据使用Flink实时分析后将结果存入Kudu,例如快手公司。这里将实时计算的结果存入Kudu需要自定义Flink Kudu Sink。

场景:Flink实时读取Socket数据,将结果存入Kudu表t_flink_result,为了方便操作不再创建Kudu外表,这里在Impala中创建Kudu内表t_flink_result:

create table t_flink_result
(
	id  int,
	name string,
	age int,
	primary key (id)
)
partition by hash partitions 3
stored as kudu
tblproperties(
 'kudu.master_address' = 'cm1:7150,cm2:7150'
)

在Maven中导入以下Flink 包依赖:

<!-- Flink 开发Scala需要导入以下依赖 -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>1.9.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.11</artifactId>
    <version>1.9.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-scala_2.11</artifactId>
    <version>1.9.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_2.11</artifactId>
    <version>1.9.1</version>
</dependency>

Flink 自定义KuduSink 代码如下:

val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
val ds: DataStream[String] = env.socketTextStream("cm3",9999)

//自定义KuduSink
ds.addSink(new RichSinkFunction[String] {
  //初始化连接Kudu对象
  var kuduClient :KuduClient = _

  //Kudu 表对象
  var kuduTable :KuduTable = _

  //创建KuduSession 客户端会话
  var session: KuduSession = _

  //初始化时调用一次,这里初始化连接Kudu的对象
  override def open(parameters: Configuration): Unit = {
    kuduClient = new KuduClientBuilder("cm1:7051,cm2:7051").build()
    kuduTable = kuduClient.openTable("impala::default.t_flink_result")
    session = kuduClient.newSession()
    //设置插入数据策略
    session.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_SYNC);
  }

  //来一条数据这里调用一次invoke方法
  override def invoke(one: String, context: SinkFunction.Context[_]): Unit = {
    val arr: Array[String] = one.split(",")
    val id: Int = arr(0).toInt
    val name: String = arr(1)
    val age: Int = arr(2).toInt

    //准备插入的数据
    val insert: Insert = kuduTable.newInsert()
    val row: PartialRow = insert.getRow
    row.addInt("id",id)
    row.addString("name",name)
    row.addInt("age",age)

    //插入到Kudu表中
    session.apply(insert)
  }

  //当Flink 关闭时调用一次,回收连接对象
  override def close(): Unit ={
    session.close()
    kuduClient.close()
  }
})

env.execute()

启动以上Flink 代码,打开Socket 服务器,输入数据,可以在impala 中查询表t_flink_result数据,数据被写入。

【声明】本内容来自华为云开发者社区博主,不代表华为云及华为云开发者社区的观点和立场。转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息,否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

作者其他文章

评论(0

抱歉,系统识别当前为高风险访问,暂不支持该操作

    全部回复

    上滑加载中

    设置昵称

    在此一键设置昵称,即可参与社区互动!

    *长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

    *长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。