Apache CarbonData 2.0 开发实用系列之四:Flink流数据入库CarbonData表

举报
david_caiqiang 发表于 2020/10/29 19:59:43 2020/10/29
【摘要】 本文介绍如何使用Flink将流数据入库到CarbonData表,并使用Spark SQL实时查询CarbonData表数据

【内容提要】

本文介绍如何使用Flink将流数据入库到CarbonData表,并使用Spark SQL实时查询CarbonData表数据

【准备Flink】

1. 下载Flink 1.8.0

  下载页面 https://archive.apache.org/dist/flink/flink-1.8.0/

  选择下载flink-1.8.0-bin-scala_2.11.tgz

2. 解压到某个目录(FLINK_HOME)

【准备Spark】

参考 [Apache CarbonData 2.0 开发实用系列之一:与Spark SQL集成使用]的【准备Spark】章节

【安装CarbonData】

1. 下载flink carbon FileSystem集成包和writer相关依赖包

  直接下载附件dependency_jar.zip后解压出jar包,包含以下文件:

  carbondata-flink-proxy-2.0.1.jar
  carbondata-flink-2.0.1.jar
  commons-configuration-1.6.jar           
  commons-lang-2.6.jar      
  guava-11.0.2.jar         
  hadoop-auth-2.7.7.jar
  hadoop-common-2.7.7.jar
  hadoop-mapreduce-client-core-2.7.7.jar

  或者在maven仓库下载上述文件

  https://repository.apache.org/content/repositories/public

2. 下载carbondata包

下载地址:

https://dist.apache.org/repos/dist/release/carbondata/2.0.1/apache-carbondata-2.0.1-bin-spark2.4.5-hadoop2.7.2.jar

3. 将上述所有jar包拷贝到${FLINK_HOME}/lib目录

备注:其中carbondata-flink-proxy-2.0.1.jar需要放置在lib目录中,其他包也可以通过--addclasspath添加,此次为了简单处理,都放置在lib目录中。

【创建CarbonData表】

1.启动spark-sql

cd $SPARK_HOME

./bin/spark-sql \
--conf spark.sql.extensions=org.apache.spark.sql.CarbonExtensions  \
--jars /opt/bigdata/flink-1.8.0/apache-carbondata-2.0.1-bin-spark2.4.5-hadoop2.7.2.jar

2. 创建test表

create database flinkdb;

create table flinkdb.test(col1 string, col2 string, col3 string) stored as carbondata;

3. 查询数据

--结果显示0
select count(*) from flinkdb.test;

4. 查看表的location

describe formatted flinkdb.test

从输出结果中找到Location,大概路径为:${SPARK_HOME}/spark-warehouse/flinkdb/test

5. 保持spark-sql窗口,待入库后查询数据

【入库数据】

1. 启动Flink scala shell

cd ${FLINK_HOME}

export TERM=xterm-color

bin/start-scala-shell.sh local

2. 入库数据

执行以下代码,注意需要将location变量设置为【创建CarbonData表】章节的“4. 查看表的location”中找到的Location(红色标记)

    import java.util.Properties
    import org.apache.carbon.flink.CarbonLocalProperty
    import org.apache.carbon.flink.CarbonWriterFactory
    import org.apache.carbon.flink.ProxyFileSystem
    import org.apache.carbondata.core.constants.CarbonCommonConstants
    import org.apache.flink.api.common.restartstrategy.RestartStrategies
    import org.apache.flink.core.fs.Path
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
    import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
    import org.apache.flink.streaming.api.functions.source.SourceFunction
    
    val databaseName = "flinkdb"
    val tableName = "test"
    // location需替换为实际的路径,使用【创建Carbon表】章节的“4. 查看表的location”的location, 
    val location = "${SPARK_HOME}/spark-warehouse/flinkdb/test"
    val tempLocation = location + "/temp"
    val tableProperties = new Properties
    val writerProperties = new Properties
    writerProperties.setProperty(CarbonLocalProperty.DATA_TEMP_PATH, tempLocation)
    val carbonProperties = new Properties
    carbonProperties.setProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
      CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
    carbonProperties.setProperty(CarbonCommonConstants.CARBON_DATE_FORMAT,
      CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT)
    carbonProperties.setProperty(CarbonCommonConstants.UNSAFE_WORKING_MEMORY_IN_MB, "1024")
    val writerFactory = CarbonWriterFactory.builder("Local").build(
      databaseName,
      tableName,
      location,
      tableProperties,
      writerProperties,
      carbonProperties
    )
    
    senv.setParallelism(1)
    senv.enableCheckpointing(2000L)
    senv.setRestartStrategy(RestartStrategies.noRestart)
    // Define a custom source.
    val source = new SourceFunction[Array[AnyRef]]() {
      var count = 0
      override
      def run(sourceContext: SourceFunction.SourceContext[Array[AnyRef]]): Unit = {
        val data = Array[AnyRef]("value1", "value2", "value3")
        while (count < 1000) {
          sourceContext.collect(data)
          count = count + 1
          Thread.sleep(1000)
        }
      }
      override
      def cancel(): Unit = {
        // do something.
        count = 1000
      }
    }
    val stream = senv.addSource(source)
    val streamSink = StreamingFileSink.forBulkFormat(
        new Path(ProxyFileSystem.DEFAULT_URI), writerFactory).build
    stream.addSink(streamSink)
    try {
      senv.execute
    } catch {
      case exception: Exception =>
        // TODO
        throw new UnsupportedOperationException(exception)
    }

【查询数据】

1. 切换回spark-sql窗口,等待2分钟,让Flink写入一些数据

2. 挂载Flink已入库数据(从stage区挂载到carbondata表空间)

insert into finkdb.test stage

3. 查询数据

--结果显示已入库数据条数
select count(*) from flinkdb.test

4. 重复步骤2,可以将新入库数据挂载到carbondata表, 然后在步骤3,将查询到更多的数据

【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。