Apache CarbonData 2.0 开发实用系列之四:Flink流数据入库CarbonData表
【摘要】 本文介绍如何使用Flink将流数据入库到CarbonData表,并使用Spark SQL实时查询CarbonData表数据
【内容提要】
本文介绍如何使用Flink将流数据入库到CarbonData表,并使用Spark SQL实时查询CarbonData表数据
【准备Flink】
1. 下载Flink 1.8.0
下载页面 https://archive.apache.org/dist/flink/flink-1.8.0/
选择下载flink-1.8.0-bin-scala_2.11.tgz
2. 解压到某个目录(FLINK_HOME)
【准备Spark】
参考 [Apache CarbonData 2.0 开发实用系列之一:与Spark SQL集成使用]的【准备Spark】章节
【安装CarbonData】
1. 下载flink carbon FileSystem集成包和writer相关依赖包
直接下载附件dependency_jar.zip后解压出jar包,包含以下文件:
carbondata-flink-proxy-2.0.1.jar
carbondata-flink-2.0.1.jar
commons-configuration-1.6.jar
commons-lang-2.6.jar
guava-11.0.2.jar
hadoop-auth-2.7.7.jar
hadoop-common-2.7.7.jar
hadoop-mapreduce-client-core-2.7.7.jar
或者在maven仓库下载上述文件
https://repository.apache.org/content/repositories/public
2. 下载carbondata包
下载地址:
3. 将上述所有jar包拷贝到${FLINK_HOME}/lib目录
备注:其中carbondata-flink-proxy-2.0.1.jar需要放置在lib目录中,其他包也可以通过--addclasspath添加,此次为了简单处理,都放置在lib目录中。
【创建CarbonData表】
1.启动spark-sql
cd $SPARK_HOME
./bin/spark-sql \
--conf spark.sql.extensions=org.apache.spark.sql.CarbonExtensions \
--jars /opt/bigdata/flink-1.8.0/apache-carbondata-2.0.1-bin-spark2.4.5-hadoop2.7.2.jar
2. 创建test表
create database flinkdb;
create table flinkdb.test(col1 string, col2 string, col3 string) stored as carbondata;
3. 查询数据
--结果显示0
select count(*) from flinkdb.test;
4. 查看表的location
describe formatted flinkdb.test
从输出结果中找到Location,大概路径为:${SPARK_HOME}/spark-warehouse/flinkdb/test
5. 保持spark-sql窗口,待入库后查询数据
【入库数据】
1. 启动Flink scala shell
cd ${FLINK_HOME}
export TERM=xterm-color
bin/start-scala-shell.sh local
2. 入库数据
执行以下代码,注意需要将location变量设置为【创建CarbonData表】章节的“4. 查看表的location”中找到的Location(红色标记)
import java.util.Properties
import org.apache.carbon.flink.CarbonLocalProperty
import org.apache.carbon.flink.CarbonWriterFactory
import org.apache.carbon.flink.ProxyFileSystem
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.flink.api.common.restartstrategy.RestartStrategies
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
import org.apache.flink.streaming.api.functions.source.SourceFunction
val databaseName = "flinkdb"
val tableName = "test"
// location需替换为实际的路径,使用【创建Carbon表】章节的“4. 查看表的location”的location,
val location = "${SPARK_HOME}/spark-warehouse/flinkdb/test"
val tempLocation = location + "/temp"
val tableProperties = new Properties
val writerProperties = new Properties
writerProperties.setProperty(CarbonLocalProperty.DATA_TEMP_PATH, tempLocation)
val carbonProperties = new Properties
carbonProperties.setProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
carbonProperties.setProperty(CarbonCommonConstants.CARBON_DATE_FORMAT,
CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT)
carbonProperties.setProperty(CarbonCommonConstants.UNSAFE_WORKING_MEMORY_IN_MB, "1024")
val writerFactory = CarbonWriterFactory.builder("Local").build(
databaseName,
tableName,
location,
tableProperties,
writerProperties,
carbonProperties
)
senv.setParallelism(1)
senv.enableCheckpointing(2000L)
senv.setRestartStrategy(RestartStrategies.noRestart)
// Define a custom source.
val source = new SourceFunction[Array[AnyRef]]() {
var count = 0
override
def run(sourceContext: SourceFunction.SourceContext[Array[AnyRef]]): Unit = {
val data = Array[AnyRef]("value1", "value2", "value3")
while (count < 1000) {
sourceContext.collect(data)
count = count + 1
Thread.sleep(1000)
}
}
override
def cancel(): Unit = {
// do something.
count = 1000
}
}
val stream = senv.addSource(source)
val streamSink = StreamingFileSink.forBulkFormat(
new Path(ProxyFileSystem.DEFAULT_URI), writerFactory).build
stream.addSink(streamSink)
try {
senv.execute
} catch {
case exception: Exception =>
// TODO
throw new UnsupportedOperationException(exception)
}
【查询数据】
1. 切换回spark-sql窗口,等待2分钟,让Flink写入一些数据
2. 挂载Flink已入库数据(从stage区挂载到carbondata表空间)
insert into finkdb.test stage
3. 查询数据
--结果显示已入库数据条数
select count(*) from flinkdb.test
4. 重复步骤2,可以将新入库数据挂载到carbondata表, 然后在步骤3,将查询到更多的数据
【声明】本内容来自华为云开发者社区博主,不代表华为云及华为云开发者社区的观点和立场。转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息,否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱:
cloudbbs@huaweicloud.com
- 点赞
- 收藏
- 关注作者
作者其他文章
养猪技术专家2020/10/31 02:00:501楼编辑删除举报
ihao2020/10/31 02:10:232楼编辑删除举报