Apache IoTDB开发系统整合之TsFile-Spark-Connector
TsFile-Spark-Connector用户指南
1. TsFile-Spark-Connector简介
TsFile-Spark-Connector 实现了 Spark 对 Tsfile 类型的外部数据源的支持。这使用户能够通过Spark读取,写入和查询Tsfile。
使用此连接器,您可以
- 将单个 TsFile 从本地文件系统或 hdfs 加载到 Spark 中
- 将特定目录中的所有文件从本地文件系统或HDFS加载到Spark中
- 将数据从 Spark 写入 TsFile
2. 系统要求
Spark Version | Scala Version | Java Version | TsFile |
---|---|---|---|
2.4.3 |
2.11.8 |
1.8 |
0.10.0 |
注意:有关如何下载和使用 TsFile 的更多信息,请参阅以下链接:https://github.com/apache/incubator-iotdb/tree/master/tsfile.
3. 快速入门
本地模式
在本地模式下使用 TsFile-Spark-Connector 启动 Spark:
./<spark-shell-path> --jars tsfile-spark-connector.jar,tsfile-0.10.0-jar-with-dependencies.jar
注意:
- is the real path of your spark-shell.
- Multiple jar packages are separated by commas without any spaces.
分布式模式
在分布式模式下使用 TsFile-Spark-Connector 启动 Spark(即 Spark 集群通过 spark-shell 连接):
. /<spark-shell-path> --jars tsfile-spark-connector.jar,tsfile-{version}-jar-with-dependencies.jar --master spark://ip:7077
4. Data Type Correspondence
TsFile data type | SparkSQL data type |
---|---|
BOOLEAN | BooleanType |
INT32 | IntegerType |
INT64 | LongType |
FLOAT | FloatType |
DOUBLE | DoubleType |
TEXT | StringType |
5. 模式推理
显示 TsFile 的方式取决于架构。以以下 TsFile 结构为例: TsFile 架构中有三个度量:状态、温度和硬件。这三项测量的基本信息如下:
Name | Type | Encode | |||
---|---|---|---|---|---|
status | Boolean | PLAIN | |||
temperature | Float | RLE | |||
hardware | Text | PLAIN |
TsFile 中的现有数据如下:
time | root.ln.wf02.wt02.temperature | root.ln.wf02.wt02.status | root.ln.wf02.wt02.hardware | root.ln.wf01.wt01.temperature | root.ln.wf01.wt01.status | root.ln.wf01.wt01.hardware |
---|---|---|---|---|---|---|
1 | null | true | null | 2.2 | true | null |
2 | null | false | aaa | 2.2 | null | null |
3 | null | null | null | 2.1 | true | null |
4 | null | true | bbb | null | null | null |
5 | null | null | null | null | false | null |
6 | null | null | ccc | null |
您也可以使用窄表形式,如下所示:(您可以看到第 6 部分有关如何使用窄格式)
time | device_name | status | hardware | temperature |
---|---|---|---|---|
1 | root.ln.wf02.wt01 | true | null | 2.2 |
1 | root.ln.wf02.wt02 | true | null | null |
2 | root.ln.wf02.wt01 | null | null | 2.2 |
2 | root.ln.wf02.wt02 | false | aaa | null |
3 | root.ln.wf02.wt01 | true | null | 2.1 |
4 | root.ln.wf02.wt02 | true | bbb | null |
5 | root.ln.wf02.wt01 | false | null | null |
6 | root.ln.wf02.wt02 | null | ccc | null |
6. Scala API
注意:请记住提前分配必要的读取和写入权限。
示例 1:从本地文件系统读取
import org.apache.iotdb.tsfile._
val wide_df = spark.read.tsfile("test.tsfile")
wide_df.show
val narrow_df = spark.read.tsfile("test.tsfile", true)
narrow_df.show
示例 2:从 Hadoop文件系统读取
import org.apache.iotdb.tsfile._
val wide_df = spark.read.tsfile("hdfs://localhost:9000/test.tsfile")
wide_df.show
val narrow_df = spark.read.tsfile("hdfs://localhost:9000/test.tsfile", true)
narrow_df.show
示例 3:从特定目录读取
import org.apache.iotdb.tsfile._
val df = spark.read.tsfile("hdfs://localhost:9000/usr/hadoop")
df.show
注 1:目前不支持目录中所有 TsFiles 的全局时间排序。
注 2:同名测量应具有相同的架构。
示例 4:宽格式查询
import org.apache.iotdb.tsfile._
val df = spark.read.tsfile("hdfs://localhost:9000/test.tsfile")
df.createOrReplaceTempView("tsfile_table")
val newDf = spark.sql("select * from tsfile_table where `device_1.sensor_1`>0 and `device_1.sensor_2` < 22")
newDf.show
import org.apache.iotdb.tsfile._
val df = spark.read.tsfile("hdfs://localhost:9000/test.tsfile")
df.createOrReplaceTempView("tsfile_table")
val newDf = spark.sql("select count(*) from tsfile_table")
newDf.show
示例 5:窄格式查询
import org.apache.iotdb.tsfile._
val df = spark.read.tsfile("hdfs://localhost:9000/test.tsfile", true)
df.createOrReplaceTempView("tsfile_table")
val newDf = spark.sql("select * from tsfile_table where device_name = 'root.ln.wf02.wt02' and temperature > 5")
newDf.show
import org.apache.iotdb.tsfile._
val df = spark.read.tsfile("hdfs://localhost:9000/test.tsfile", true)
df.createOrReplaceTempView("tsfile_table")
val newDf = spark.sql("select count(*) from tsfile_table")
newDf.show
示例 6:以宽格式写入
// we only support wide_form table to write
import org.apache.iotdb.tsfile._
val df = spark.read.tsfile("hdfs://localhost:9000/test.tsfile")
df.show
df.write.tsfile("hdfs://localhost:9000/output")
val newDf = spark.read.tsfile("hdfs://localhost:9000/output")
newDf.show
示例 6:以窄格式书写
// we only support wide_form table to write
import org.apache.iotdb.tsfile._
val df = spark.read.tsfile("hdfs://localhost:9000/test.tsfile", true)
df.show
df.write.tsfile("hdfs://localhost:9000/output", true)
val newDf = spark.read.tsfile("hdfs://localhost:9000/output", true)
newDf.show
附录 A:架构推理的旧设计
显示 TsFile 的方式与 TsFile 架构有关。以以下 TsFile 结构为例:TsFile 的架构中有三个度量:状态、温度和硬件。这三个测量的基本信息如下:
名字 | 类型 | 编码 | |||
---|---|---|---|---|---|
地位 | 布尔 | 平原 | |||
温度 | 浮 | RLE | |||
硬件 | 发短信 | 平原 |
测量基本信息
文件中的现有数据如下所示:
delta_object:root.ln.wf01.wt01 | delta_object:root.ln.wf02.wt02 | delta_object:root.sgcc.wf03.wt01 | |||||||||
---|---|---|---|---|---|---|---|---|---|---|---|
地位 | 温度 | 硬件 | 地位 | 地位 | 温度 | ||||||
时间 | 价值 | 时间 | 价值 | 时间 | 价值 | 时间 | 价值 | 时间 | 价值 | 时间 | 价值 |
1 | 真 | 1 | 2.2 | 2 | “啊” | 1 | 真 | 2 | 真 | 3 | 3.3 |
3 | 真 | 2 | 2.2 | 4 | “咔嚓” | 2 | 假 | 3 | 真 | 6 | 6.6 |
5 | 假 | 3 | 2.1 | 6 | “抄送” | 4 | 真 | 4 | 真 | 8 | 8.8 |
7 | 真 | 4 | 2.0 | 8 | “爹” | 5 | 假 | 6 | 真 | 9 | 9.9 |
一组时间序列数据
有两种方法可以显示它:
默认方式
将创建两列来存储设备的完整路径:time(LongType)和delta_object(StringType)。
time
:时间戳,长类型delta_object
: Delta_object ID, 字符串类型
接下来,为每个度量创建一个列来存储特定数据。SparkSQL 表结构如下:
时间(长型) | delta_object(字符串类型) | 状态(布尔类型) | 温度(浮子型) | 硬件(字符串类型) |
---|---|---|---|---|
1 | 根.ln.wf01.wt01 | 真 | 2.2 | 零 |
1 | 根.ln.wf02.wt02 | 真 | 零 | 零 |
2 | 根.ln.wf01.wt01 | 零 | 2.2 | 零 |
2 | 根.ln.wf02.wt02 | 假 | 零 | “啊” |
2 | 根.sgcc.wf03.wt01 | 真 | 零 | 零 |
3 | 根.ln.wf01.wt01 | 真 | 2.1 | 零 |
3 | 根.sgcc.wf03.wt01 | 真 | 3.3 | 零 |
4 | 根.ln.wf01.wt01 | 零 | 2.0 | 零 |
4 | 根.ln.wf02.wt02 | 真 | 零 | “咔嚓” |
4 | 根.sgcc.wf03.wt01 | 真 | 零 | 零 |
5 | 根.ln.wf01.wt01 | 假 | 零 | 零 |
5 | 根.ln.wf02.wt02 | 假 | 零 | 零 |
5 | 根.sgcc.wf03.wt01 | 真 | 零 | 零 |
6 | 根.ln.wf02.wt02 | 零 | 零 | “抄送” |
6 | 根.sgcc.wf03.wt01 | 零 | 6.6 | 零 |
7 | 根.ln.wf01.wt01 | 真 | 零 | 零 |
8 | 根.ln.wf02.wt02 | 零 | 零 | “爹” |
8 | 根.sgcc.wf03.wt01 | 零 | 8.8 | 零 |
9 | 根.sgcc.wf03.wt01 | 零 | 9.9 | 零 |
展开delta_object列
将设备列按“.”展开为多个列,忽略根目录“root”。方便进行更丰富的聚合操作。如果用户想使用这种显示方式,需要在表创建语句中设置参数“delta_object_name”(参考本手册第5.5节中的示例1),如本例中参数“delta_object_name”设置为“root.device.turbine”。路径层数需要一对一。此时,将为设备路径的每个层(“根”层除外)创建一列。列名是参数中的名称,值是设备相应层的名称。接下来,将为每个测量创建一个列来存储特定数据。
那么 SparkSQL 表结构如下:
time(LongType) | group(StringType) | field(StringType) | device(StringType) | status(BooleanType) | temperature(FloatType) | hardware(StringType) |
---|---|---|---|---|---|---|
1 | ln | wf01 | wt01 | True | 2.2 | null |
1 | ln | wf02 | wt02 | True | null | null |
2 | ln | wf01 | wt01 | null | 2.2 | null |
2 | ln | wf02 | wt02 | False | null | “aaa” |
2 | sgcc | wf03 | wt01 | True | null | null |
3 | ln | wf01 | wt01 | True | 2.1 | null |
3 | sgcc | wf03 | wt01 | True | 3.3 | null |
4 | ln | wf01 | wt01 | null | 2.0 | null |
4 | ln | wf02 | wt02 | True | null | “bbb” |
4 | sgcc | wf03 | wt01 | True | null | null |
5 | ln | wf01 | wt01 | False | null | null |
5 | ln | wf02 | wt02 | False | null | null |
5 | sgcc | wf03 | wt01 | True | null | null |
6 | ln | wf02 | wt02 | null | null | “ccc” |
6 | sgcc | wf03 | wt01 | null | 6.6 | null |
7 | ln | wf01 | wt01 | True | null | null |
8 | ln | wf02 | wt02 | null | null | “ddd” |
8 | sgcc | wf03 | wt01 | null | 8.8 | null |
9 | sgcc | wf03 | wt01 | null | 9.9 |
TsFile-Spark-Connector 可以在 SparkSQL By SparkSQL 中将一个或多个 TsFiles 显示为表。它还允许用户指定单个目录或使用通配符来匹配多个目录。如果有多个 TsFiles,则所有 TsFiles 中测量值的并集将保留在表中,并且默认情况下,具有相同名称的度量将具有相同的数据类型。请注意,如果存在名称相同但数据类型不同的情况,TsFile-Spark-Connector 将不保证结果的正确性。
写入过程是将数据帧写入为一个或多个 TsFiles。默认情况下,需要包含两列:时间和delta_object。其余列用作度量。
- 点赞
- 收藏
- 关注作者
评论(0)