Apache CarbonData 2.0 Practical Development Series, Part 4: Loading Flink Streaming Data into a CarbonData Table
【Summary】
This article describes how to use Flink to load streaming data into a CarbonData table, and how to query the table data in real time with Spark SQL.
【Preparing Flink】
1. Download Flink 1.8.0
Download page: https://archive.apache.org/dist/flink/flink-1.8.0/
Choose flink-1.8.0-bin-scala_2.11.tgz
2. Extract it to a directory (referred to as FLINK_HOME below)
【Preparing Spark】
See the 【Preparing Spark】 section of [Apache CarbonData 2.0 Practical Development Series, Part 1: Integrating with Spark SQL].
【Installing CarbonData】
1. Download the Flink CarbonData FileSystem integration jars and the writer's dependency jars
Download the attached dependency_jar.zip and extract the jars; it contains the following files:
carbondata-flink-proxy-2.0.1.jar
carbondata-flink-2.0.1.jar
commons-configuration-1.6.jar
commons-lang-2.6.jar
guava-11.0.2.jar
hadoop-auth-2.7.7.jar
hadoop-common-2.7.7.jar
hadoop-mapreduce-client-core-2.7.7.jar
Alternatively, download the files listed above from the Maven repository:
https://repository.apache.org/content/repositories/public
2. Download the CarbonData package
Download address:
3. Copy all of the above jars to the ${FLINK_HOME}/lib directory
Note: carbondata-flink-proxy-2.0.1.jar must be placed in the lib directory; the other jars could also be added with --addclasspath. For simplicity, all of them are placed in the lib directory here.
【Creating the CarbonData Table】
1. Start spark-sql
cd $SPARK_HOME
./bin/spark-sql \
  --conf spark.sql.extensions=org.apache.spark.sql.CarbonExtensions \
  --jars /opt/bigdata/flink-1.8.0/apache-carbondata-2.0.1-bin-spark2.4.5-hadoop2.7.2.jar
2. Create the test table
create database flinkdb;
create table flinkdb.test(col1 string, col2 string, col3 string) stored as carbondata;
3. Query the data
-- The result shows 0
select count(*) from flinkdb.test;
4. Check the table's location
describe formatted flinkdb.test;
Find Location in the output; the path is roughly ${SPARK_HOME}/spark-warehouse/flinkdb/test (a spark-shell sketch for reading it programmatically follows this list).
5. Keep the spark-sql window open; it will be used to query the data after loading.
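If you prefer not to read the Location out of the DESCRIBE output by hand, the following is a minimal sketch of extracting it from spark-shell. It assumes spark-shell is started with the same --conf spark.sql.extensions and --jars options as spark-sql above, and that the DESCRIBE FORMATTED output uses Spark 2.4's col_name/data_type columns; the variable name tableLocation is only for illustration.
import org.apache.spark.sql.functions.col
// The row whose col_name is "Location" carries the table path in the data_type column.
val tableLocation = spark.sql("describe formatted flinkdb.test")
  .filter(col("col_name") === "Location")
  .select("data_type")
  .first()
  .getString(0)
println(tableLocation)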
【Loading Data】
1. Start the Flink Scala shell
cd ${FLINK_HOME}
export TERM=xterm-color
bin/start-scala-shell.sh local
2. Load the data
Run the following code. Note that the location variable must be set to the Location found in step 4 ("Check the table's location") of the 【Creating the CarbonData Table】 section.
import java.util.Properties

import org.apache.carbon.flink.CarbonLocalProperty
import org.apache.carbon.flink.CarbonWriterFactory
import org.apache.carbon.flink.ProxyFileSystem
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.flink.api.common.restartstrategy.RestartStrategies
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
import org.apache.flink.streaming.api.functions.source.SourceFunction

val databaseName = "flinkdb"
val tableName = "test"
// Replace location with the actual path found in step 4 ("Check the table's location")
// of the 【Creating the CarbonData Table】 section.
val location = "${SPARK_HOME}/spark-warehouse/flinkdb/test"
val tempLocation = location + "/temp"

val tableProperties = new Properties
val writerProperties = new Properties
writerProperties.setProperty(CarbonLocalProperty.DATA_TEMP_PATH, tempLocation)

val carbonProperties = new Properties
carbonProperties.setProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
carbonProperties.setProperty(CarbonCommonConstants.CARBON_DATE_FORMAT, CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT)
carbonProperties.setProperty(CarbonCommonConstants.UNSAFE_WORKING_MEMORY_IN_MB, "1024")

val writerFactory = CarbonWriterFactory.builder("Local").build(
  databaseName,
  tableName,
  location,
  tableProperties,
  writerProperties,
  carbonProperties
)

senv.setParallelism(1)
senv.enableCheckpointing(2000L)
senv.setRestartStrategy(RestartStrategies.noRestart)

// Define a custom source that emits one row per second, 1000 rows in total.
val source = new SourceFunction[Array[AnyRef]]() {
  var count = 0
  override def run(sourceContext: SourceFunction.SourceContext[Array[AnyRef]]): Unit = {
    val data = Array[AnyRef]("value1", "value2", "value3")
    while (count < 1000) {
      sourceContext.collect(data)
      count = count + 1
      Thread.sleep(1000)
    }
  }
  override def cancel(): Unit = {
    // Stop the emitting loop.
    count = 1000
  }
}

val stream = senv.addSource(source)
val streamSink = StreamingFileSink.forBulkFormat(new Path(ProxyFileSystem.DEFAULT_URI), writerFactory).build

stream.addSink(streamSink)

try {
  senv.execute
} catch {
  case exception: Exception =>
    // TODO
    throw new UnsupportedOperationException(exception)
}
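The source above emits the same three constant values once per second. As a purely illustrative variation (not part of the original example), the sketch below uses the same SourceFunction API but makes each row distinct, which makes it easier to see new data arriving when you query the table; the name variedSource and the per-row suffix are assumptions.
// Illustrative variation: emit a different value per row.
val variedSource = new SourceFunction[Array[AnyRef]]() {
  @volatile var running = true
  var count = 0
  override def run(ctx: SourceFunction.SourceContext[Array[AnyRef]]): Unit = {
    while (running && count < 1000) {
      ctx.collect(Array[AnyRef]("value1_" + count, "value2_" + count, "value3_" + count))
      count += 1
      Thread.sleep(1000)
    }
  }
  override def cancel(): Unit = {
    running = false
  }
}
// Wire it up the same way as the original source:
// senv.addSource(variedSource).addSink(streamSink)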
【Querying Data】
1. Switch back to the spark-sql window and wait about two minutes so that Flink has written some data.
2. Mount the data Flink has already written (move it from the stage area into the CarbonData table)
insert into flinkdb.test stage;
3. Query the data
-- The result shows the number of rows loaded so far
select count(*) from flinkdb.test;
4. Repeat step 2 to mount newly written data into the CarbonData table; repeating step 3 will then return more rows (a spark-shell sketch that combines both steps follows this list).
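If you would rather drive steps 2 and 3 from spark-shell than type them in spark-sql repeatedly, the following is a rough sketch; it assumes spark-shell is launched with the same --conf spark.sql.extensions and --jars options as spark-sql above.
// Mount newly arrived stage data, then report the current row count.
spark.sql("insert into flinkdb.test stage")
val rows = spark.sql("select count(*) from flinkdb.test").first().getLong(0)
println(s"flinkdb.test now has $rows rows")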