Apache IoTDB开发系统整合之TsFile-Spark-Connector

【摘要】 TsFile-Spark-Connector 可以在 SparkSQL By SparkSQL 中将一个或多个 TsFiles 显示为表。它还允许用户指定单个目录或使用通配符来匹配多个目录。如果有多个 TsFiles,则所有 TsFiles 中测量值的并集将保留在表中,并且默认情况下,具有相同名称的度量将具有相同的数据类型。


1. TsFile-Spark-Connector简介

TsFile-Spark-Connector 实现了 Spark 对 Tsfile 类型的外部数据源的支持。这使用户能够通过Spark读取,写入和查询Tsfile。


  • 将单个 TsFile 从本地文件系统或 hdfs 加载到 Spark 中
  • 将特定目录中的所有文件从本地文件系统或HDFS加载到Spark中
  • 将数据从 Spark 写入 TsFile

2. 系统要求

Spark Version Scala Version Java Version TsFile
2.4.3 2.11.8 1.8 0.10.0

注意:有关如何下载和使用 TsFile 的更多信息,请参阅以下链接:https://github.com/apache/incubator-iotdb/tree/master/tsfile.

3. 快速入门


在本地模式下使用 TsFile-Spark-Connector 启动 Spark:

  1. ./<spark-shell-path> --jars tsfile-spark-connector.jar,tsfile-0.10.0-jar-with-dependencies.jar


  • is the real path of your spark-shell.
  • Multiple jar packages are separated by commas without any spaces.


在分布式模式下使用 TsFile-Spark-Connector 启动 Spark(即 Spark 集群通过 spark-shell 连接):

  1. . /<spark-shell-path> --jars tsfile-spark-connector.jar,tsfile-{version}-jar-with-dependencies.jar --master spark://ip:7077

4. Data Type Correspondence

TsFile data type SparkSQL data type
BOOLEAN BooleanType
INT32 IntegerType
INT64 LongType
FLOAT FloatType
DOUBLE DoubleType
TEXT StringType

5. 模式推理

显示 TsFile 的方式取决于架构。以以下 TsFile 结构为例: TsFile 架构中有三个度量:状态、温度和硬件。这三项测量的基本信息如下:

Name Type Encode
status Boolean PLAIN
temperature Float RLE
hardware Text PLAIN

TsFile 中的现有数据如下:

time root.ln.wf02.wt02.temperature root.ln.wf02.wt02.status root.ln.wf02.wt02.hardware root.ln.wf01.wt01.temperature root.ln.wf01.wt01.status root.ln.wf01.wt01.hardware
1 null true null 2.2 true null
2 null false aaa 2.2 null null
3 null null null 2.1 true null
4 null true bbb null null null
5 null null null null false null
6 null null ccc null

您也可以使用窄表形式,如下所示:(您可以看到第 6 部分有关如何使用窄格式)

time device_name status hardware temperature
1 root.ln.wf02.wt01 true null 2.2
1 root.ln.wf02.wt02 true null null
2 root.ln.wf02.wt01 null null 2.2
2 root.ln.wf02.wt02 false aaa null
3 root.ln.wf02.wt01 true null 2.1
4 root.ln.wf02.wt02 true bbb null
5 root.ln.wf02.wt01 false null null
6 root.ln.wf02.wt02 null ccc null

6. Scala API


示例 1:从本地文件系统读取

  1. import org.apache.iotdb.tsfile._
  2. val wide_df = spark.read.tsfile("test.tsfile")
  3. wide_df.show
  4. val narrow_df = spark.read.tsfile("test.tsfile", true)
  5. narrow_df.show

示例 2:从 Hadoop文件系统读取

  1. import org.apache.iotdb.tsfile._
  2. val wide_df = spark.read.tsfile("hdfs://localhost:9000/test.tsfile")
  3. wide_df.show
  4. val narrow_df = spark.read.tsfile("hdfs://localhost:9000/test.tsfile", true)
  5. narrow_df.show

示例 3:从特定目录读取

  1. import org.apache.iotdb.tsfile._
  2. val df = spark.read.tsfile("hdfs://localhost:9000/usr/hadoop")
  3. df.show

注 1:目前不支持目录中所有 TsFiles 的全局时间排序。

注 2:同名测量应具有相同的架构。

示例 4:宽格式查询

  1. import org.apache.iotdb.tsfile._
  2. val df = spark.read.tsfile("hdfs://localhost:9000/test.tsfile")
  3. df.createOrReplaceTempView("tsfile_table")
  4. val newDf = spark.sql("select * from tsfile_table where `device_1.sensor_1`>0 and `device_1.sensor_2` < 22")
  5. newDf.show

  1. import org.apache.iotdb.tsfile._
  2. val df = spark.read.tsfile("hdfs://localhost:9000/test.tsfile")
  3. df.createOrReplaceTempView("tsfile_table")
  4. val newDf = spark.sql("select count(*) from tsfile_table")
  5. newDf.show

示例 5:窄格式查询

  1. import org.apache.iotdb.tsfile._
  2. val df = spark.read.tsfile("hdfs://localhost:9000/test.tsfile", true)
  3. df.createOrReplaceTempView("tsfile_table")
  4. val newDf = spark.sql("select * from tsfile_table where device_name = 'root.ln.wf02.wt02' and temperature > 5")
  5. newDf.show

  1. import org.apache.iotdb.tsfile._
  2. val df = spark.read.tsfile("hdfs://localhost:9000/test.tsfile", true)
  3. df.createOrReplaceTempView("tsfile_table")
  4. val newDf = spark.sql("select count(*) from tsfile_table")
  5. newDf.show

示例 6:以宽格式写入

  1. // we only support wide_form table to write
  2. import org.apache.iotdb.tsfile._
  3. val df = spark.read.tsfile("hdfs://localhost:9000/test.tsfile")
  4. df.show
  5. df.write.tsfile("hdfs://localhost:9000/output")
  6. val newDf = spark.read.tsfile("hdfs://localhost:9000/output")
  7. newDf.show

示例 6:以窄格式书写

  1. // we only support wide_form table to write
  2. import org.apache.iotdb.tsfile._
  3. val df = spark.read.tsfile("hdfs://localhost:9000/test.tsfile", true)
  4. df.show
  5. df.write.tsfile("hdfs://localhost:9000/output", true)
  6. val newDf = spark.read.tsfile("hdfs://localhost:9000/output", true)
  7. newDf.show

附录 A:架构推理的旧设计

显示 TsFile 的方式与 TsFile 架构有关。以以下 TsFile 结构为例:TsFile 的架构中有三个度量:状态、温度和硬件。这三个测量的基本信息如下:

名字 类型 编码
地位 布尔 平原
温度 RLE
硬件 发短信 平原



delta_object:root.ln.wf01.wt01 delta_object:root.ln.wf02.wt02 delta_object:root.sgcc.wf03.wt01
地位 温度 硬件 地位 地位 温度
时间 价值 时间 价值 时间 价值 时间 价值 时间 价值 时间 价值
1 1 2.2 2 “啊” 1 2 3 3.3
3 2 2.2 4 “咔嚓” 2 3 6 6.6
5 3 2.1 6 “抄送” 4 4 8 8.8
7 4 2.0 8 “爹” 5 6 9 9.9





  • time:时间戳,长类型
  • delta_object: Delta_object ID, 字符串类型

接下来,为每个度量创建一个列来存储特定数据。SparkSQL 表结构如下:

时间(长型) delta_object(字符串类型) 状态(布尔类型) 温度(浮子型) 硬件(字符串类型)
1 根.ln.wf01.wt01 2.2
1 根.ln.wf02.wt02
2 根.ln.wf01.wt01 2.2
2 根.ln.wf02.wt02 “啊”
2 根.sgcc.wf03.wt01
3 根.ln.wf01.wt01 2.1
3 根.sgcc.wf03.wt01 3.3
4 根.ln.wf01.wt01 2.0
4 根.ln.wf02.wt02 “咔嚓”
4 根.sgcc.wf03.wt01
5 根.ln.wf01.wt01
5 根.ln.wf02.wt02
5 根.sgcc.wf03.wt01
6 根.ln.wf02.wt02 “抄送”
6 根.sgcc.wf03.wt01 6.6
7 根.ln.wf01.wt01
8 根.ln.wf02.wt02 “爹”
8 根.sgcc.wf03.wt01 8.8
9 根.sgcc.wf03.wt01 9.9



那么 SparkSQL 表结构如下:

time(LongType) group(StringType) field(StringType) device(StringType) status(BooleanType) temperature(FloatType) hardware(StringType)
1 ln wf01 wt01 True 2.2 null
1 ln wf02 wt02 True null null
2 ln wf01 wt01 null 2.2 null
2 ln wf02 wt02 False null “aaa”
2 sgcc wf03 wt01 True null null
3 ln wf01 wt01 True 2.1 null
3 sgcc wf03 wt01 True 3.3 null
4 ln wf01 wt01 null 2.0 null
4 ln wf02 wt02 True null “bbb”
4 sgcc wf03 wt01 True null null
5 ln wf01 wt01 False null null
5 ln wf02 wt02 False null null
5 sgcc wf03 wt01 True null null
6 ln wf02 wt02 null null “ccc”
6 sgcc wf03 wt01 null 6.6 null
7 ln wf01 wt01 True null null
8 ln wf02 wt02 null null “ddd”
8 sgcc wf03 wt01 null 8.8 null
9 sgcc wf03 wt01 null 9.9

TsFile-Spark-Connector 可以在 SparkSQL By SparkSQL 中将一个或多个 TsFiles 显示为表。它还允许用户指定单个目录或使用通配符来匹配多个目录。如果有多个 TsFiles,则所有 TsFiles 中测量值的并集将保留在表中,并且默认情况下,具有相同名称的度量将具有相同的数据类型。请注意,如果存在名称相同但数据类型不同的情况,TsFile-Spark-Connector 将不保证结果的正确性。

写入过程是将数据帧写入为一个或多个 TsFiles。默认情况下,需要包含两列:时间和delta_object。其余列用作度量。

