Apache IoTDB开发系统之TsFile API

举报
小云悠悠zZ 发表于 2023/09/11 10:25:10 2023/09/11
【摘要】 时间序列被视为四元组序列。四重定义为(设备、测量、时间、值)。 测量:时间序列正在采取的物理或形式测量,例如,城市的温度、某些商品的销售数量或不同时间的火车速度。由于传统的传感器(如温度计)也进行单次测量并产生时间序列,因此我们将在下面互换使用测量和传感器。 设备:设备是指进行多次测量(产生多个时间序列)的实体,例如,正在运行的火车监控其速度、油表、运行里程、当前乘客每个被传送到一个时间序列。

TsFile是我们在IoTDB中使用的时间序列的文件格式。在本节中,我们想介绍这种文件格式的用法。

Ts文件库安装

有两种方法可以在您自己的项目中使用 TsFile。

  • Using as jars:

    • 编译源代码并构建到 jar

      1. git clone https://github.com/apache/incubator-iotdb.git
      2. cd tsfile/
      3. mvn clean package -Dmaven.test.skip=true

      然后,所有 jar 都可以在名为 的文件夹中获取。导入到您的项目。target/target/tsfile-0.10.0-jar-with-dependencies.jar

  • Using as a maven dependency:

    编译源代码并通过三个步骤部署到本地存储库:

    • 获取源代码

      1. git clone https://github.com/apache/incubator-iotdb.git
    • 编译源代码并部署

      1. cd tsfile/
      2. mvn clean install -Dmaven.test.skip=true
    • 将依赖项添加到项目中:

      1. <dependency>
      2. <groupId>org.apache.iotdb</groupId>
      3. <artifactId>tsfile</artifactId>
      4. <version>0.10.0</version>
      5. </dependency>
  1. Or, you can download the dependencies from official Maven repository:
  2. - First, find your maven `settings.xml` on path: `${username}\.m2\settings.xml` , add this `<profile>` to `<profiles>`:
  3. ```
  4. <profile>
  5. <id>allow-snapshots</id>
  6. <activation><activeByDefault>true</activeByDefault></activation>
  7. <repositories>
  8. <repository>
  9. <id>apache.snapshots</id>
  10. <name>Apache Development Snapshot Repository</name>
  11. <url>https://repository.apache.org/content/repositories/snapshots/</url>
  12. <releases>
  13. <enabled>false</enabled>
  14. </releases>
  15. <snapshots>
  16. <enabled>true</enabled>
  17. </snapshots>
  18. </repository>
  19. </repositories>
  20. </profile>
  21. ```
  22. - Then add dependencies into your project:
  23. ```
  24. <dependency>
  25. <groupId>org.apache.iotdb</groupId>
  26. <artifactId>tsfile</artifactId>
  27. <version>0.10.0</version>
  28. </dependency>
  29. ```

TSFile 用法

本节演示 TsFile 的详细用法。

时间序列数据

时间序列被视为四元组序列。四重定义为(设备、测量、时间、值)。

  • 测量:时间序列正在采取的物理或形式测量,例如,城市的温度、某些商品的销售数量或不同时间的火车速度。由于传统的传感器(如温度计)也进行单次测量并产生时间序列,因此我们将在下面互换使用测量和传感器。

  • 设备:设备是指进行多次测量(产生多个时间序列)的实体,例如,正在运行的火车监控其速度、油表、运行里程、当前乘客每个被传送到一个时间序列。

表 1 说明了一组时间序列数据。下表中显示的集包含一个名为“device_1”的设备,其中包含名为“sensor_1”、“sensor_2”和“sensor_3”的三个测量值。

device_1
sensor_1 sensor_2 sensor_3
时间 价值 时间 价值 时间 价值
1 1.2 1 20 2 50
3 1.4 2 20 4 51
5 1.1 3 21 6 52
7 1.8 4 20 8 53

一组时间序列数据

一行数据:在许多工业应用中,一个设备通常包含多个传感器,这些传感器的值可能位于同一时间戳,这称为一行数据

形式上,一行数据由 、 一个表示自 1 年 1970 月 00 日 00:00:<> 以来的毫秒的时间戳和由 和 对应的几个数据对组成。一行中的所有数据对都属于此行,并且具有相同的时间戳。如果其中一个在 中没有 ,请改用空格(实际上,TsFile 不存储空值)。其格式如下所示:device_idmeasurement_idvaluedevice_idmeasurementsvaluetimestamp

  1. device_id, timestamp, <measurement_id, value>...

示例如下所示。在此示例中,两个度量的数据类型分别为 。INT32FLOAT

  1. device_1, 1490860659000, m1, 10, m2, 12.12

正在写入 Ts文件

生成 tsfile 文件。

可以通过以下三个步骤生成 TsFile,完整的代码将在“编写 TsFile 的示例”一节中给出。

  • 首先,构造一个实例。TsFileWriter

    以下是可用的构造函数:

    • 没有预定义的架构
    1. public TsFileWriter(File file) throws IOException
    • 使用预定义的架构
    1. public TsFileWriter(File file, Schema schema) throws IOException

    这个用于使用 HDFS 文件系统。 可以是类 的实例。TsFileOutputHDFSOutput

    1. public TsFileWriter(TsFileOutput output, Schema schema) throws IOException

    如果你想自己设置一些 TSFile 配置,你可以使用 param 。例如:config

    1. TSFileConfig conf = new TSFileConfig();
    2. conf.setTSFileStorageFs("HDFS");
    3. TsFileWriter tsFileWriter = new TsFileWriter(file, schema, conf);

    在此示例中,数据文件将存储在 HDFS 中,而不是本地文件系统中。如果要将数据文件存储在本地文件系统中,可以使用 ,这也是默认配置。conf.setTSFileStorageFs("LOCAL")

    您还可以通过 和 配置 HDFS 的 IP 和端口。默认 ip 为 ,默认端口为 。config.setHdfsIp(...)config.setHdfsPort(...)localhost9000

    参数:

    • 文件 : 要写入的 Ts文件

    • 架构 :文件架构将在下一部分介绍。

    • config : TsFile 的配置。

  • 第二,添加测量值

    或者你可以先创建一个类的实例,然后将其传递给类的构造函数SchemaTsFileWriter

    该类包含一个映射,其键是一个度量架构的名称,值是架构本身。Schema

    以下是接口:

    1. // Create an empty Schema or from an existing map
    2. public Schema()
    3. public Schema(Map<String, MeasurementSchema> measurements)
    4. // Use this two interfaces to add measurements
    5. public void registerMeasurement(MeasurementSchema descriptor)
    6. public void registerMeasurements(Map<String, MeasurementSchema> measurements)
    7. // Some useful getter and checker
    8. public TSDataType getMeasurementDataType(String measurementId)
    9. public MeasurementSchema getMeasurementSchema(String measurementId)
    10. public Map<String, MeasurementSchema> getAllMeasurementSchema()
    11. public boolean hasMeasurement(String measurementId)

    您始终可以在类中使用以下接口来添加其他测量:TsFileWriter

    1. public void addMeasurement(MeasurementSchema measurementSchema) throws WriteProcessException

    该类包含一个度量的信息,有几个构造函数:MeasurementSchema

    1. public MeasurementSchema(String measurementId, TSDataType type, TSEncoding encoding)
    2. public MeasurementSchema(String measurementId, TSDataType type, TSEncoding encoding, CompressionType compressionType)
    3. public MeasurementSchema(String measurementId, TSDataType type, TSEncoding encoding, CompressionType compressionType,
    4. Map<String, String> props)

    参数:

    • 测量 ID:此测量的名称,通常是传感器的名称。

    • 类型:数据类型,现在支持六种类型:BOOLEANINT32INT64FLOATDOUBLETEXT;

    • 编码:数据编码。看第2-3章.

    • 压缩:数据压缩。现在支持和 .UNCOMPRESSEDSNAPPY

    • props:特殊数据类型的属性。如 for 和 , for 。用作映射中的字符串对,例如 (“max_point_number”、“3”)。max_point_numberFLOATDOUBLEmax_string_lengthTEXT

  1. > **Notice:** Although one measurement name can be used in multiple deltaObjects, the properties cannot be changed. I.e. it's not allowed to add one measurement name for multiple times with different type or encoding. Here is a bad example:
  2. ```
  3. // The measurement "sensor_1" is float type
  4. addMeasurement(new MeasurementSchema("sensor_1", TSDataType.FLOAT, TSEncoding.RLE));
  5. // This call will throw a WriteProcessException exception
  6. addMeasurement(new MeasurementSchema("sensor_1", TSDataType.INT32, TSEncoding.RLE));
  7. ```
  • 第三,不断插入和写入数据。

    使用此接口创建新的(时间戳和设备对)。TSRecord

    1. public TSRecord(long timestamp, String deviceId)

    然后创建一个(度量值和值对),并使用 addTuple 方法将数据点添加到正确的 TsRecord。DataPoint

    使用此方法写入

    1. public void write(TSRecord record) throws IOException, WriteProcessException
  • 最后,打电话完成这个写作过程。close

    1. public void close() throws IOException

我们还能够将数据写入封闭的 TsFile。

  • 用于打开已关闭的文件。ForceAppendTsFileWriter
  1. public ForceAppendTsFileWriter(File file) throws IOException
  • 调用截断元数据的一部分doTruncate

  • 然后用于构造一个新的ForceAppendTsFileWriterTsFileWriter

  1. public TsFileWriter(TsFileIOWriter fileWriter) throws IOException

请注意,在将新数据写入 TsFile 之前,我们应该重做添加测量值的步骤。

写入 TsFile 的示例

您应该将 TsFile 安装到本地 maven 存储库中。

  1. mvn clean install -pl tsfile -am -DskipTests

如果您有未对齐(例如,并非所有传感器都包含值)时间序列数据,则可以通过构造 TSRecord 来编写 TsFile。

更全面的例子可以在/example/tsfile/src/main/java/org/apache/iotdb/tsfile/TsFileWriteWithTSRecord.java

如果您有对齐的时间序列数据,则可以通过构造平板电脑来编写 TsFile。

更全面的例子可以在/example/tsfile/src/main/java/org/apache/iotdb/tsfile/TsFileWriteWithTablet.java

您可以使用 ForceAppendTsFileWriter 将数据写入关闭的 TsFile。

更全面的例子可以在/example/tsfile/src/main/java/org/apache/iotdb/tsfile/TsFileForceAppendWrite.java

用于读取 TsFile 的接口

开始之前

此处使用“时间序列数据”部分中的时间序列数据集进行本节的具体介绍。下表中显示的集合包含一个名为“device_1”的 deltaObject,其中包含名为“sensor_1”、“sensor_2”和“sensor_3”的三个测量值。测量已简化为一个简单的说明,每个图仅包含 4 个时间值对。

device_1
sensor_1 sensor_2 sensor_3
时间 价值 时间 价值 时间 价值
1 1.2 1 20 2 50
3 1.4 2 20 4 51
5 1.1 3 21 6 52
7 1.8 4 20 8 53

一组时间序列数据

路径的定义

路径是一个点分隔的字符串,它唯一标识 TsFile 中的时间序列,例如“root.area_1.device_1.sensor_1”。最后一部分“sensor_1”称为“measurementId”,而其余部分“root.area_1.device_1”称为deviceId。如上所述,不同设备中的相同测量具有相同的数据类型和编码,并且设备也是唯一的。

在读取接口中,该参数指示要选择的测量值。paths

路径实例可以通过类轻松构造。例如:Path

  1. Path p = new Path("device_1.sensor_1");

我们将为最终查询调用传递路径的 ArrayList 以支持多个路径。

  1. List<Path> paths = new ArrayList<Path>();
  2. paths.add(new Path("device_1.sensor_1"));
  3. paths.add(new Path("device_1.sensor_3"));

通知:构造 Path 时,参数的格式应为点分隔字符串,最后一部分将被识别为 measurementId,而其余部分将被识别为 deviceId。

过滤器的定义

使用场景

过滤器在 TsFile 读取过程中用于选择满足一个或多个给定条件的数据。

IExpression

这是一个过滤器表达式接口,它将传递给我们的最终查询调用。我们创建一个或多个过滤器表达式,并可能使用二进制过滤器运算符将它们链接到我们的最终表达式。IExpression

  • 创建筛选表达式

    有两种类型的过滤器。

    • 时间筛选器:用于时间序列数据的筛选器。time

      1. IExpression timeFilterExpr = new GlobalTimeExpression(TimeFilter);

      使用以下关系获取对象(值是长整型变量)。TimeFilter

      关系 描述
      TimeFilter.eq(value) 选择等于值的时间
      TimeFilter.lt(值) 选择小于值的时间
      TimeFilter.gt(值) 选择大于值的时间
      TimeFilter.ltEq(value) 选择小于或等于值的时间
      TimeFilter.gtEq(value) 选择大于或等于该值的时间
      TimeFilter.notEq(value) 选择不等于值的时间
      TimeFilter.not(TimeFilter) 选择不满足另一个时间过滤器的时间
    • 值筛选器:用于时序数据的筛选器。value

      1. IExpression valueFilterExpr = new SingleSeriesExpression(Path, ValueFilter);

      的用法与使用 相同,只是为了确保值的类型等于测量值的类型(在路径中定义)。ValueFilterTimeFilter

  • 二进制筛选器运算符

    二进制筛选器运算符可用于链接两个单个表达式。

    • BinaryExpression.and(Expression, Expression):为两个表达式选择满足值。
    • BinaryExpression.or(Expression, Expression):选择至少一个表达式的值 ati。
筛选器表达式示例
  • 时间过滤器表达式示例

    1. IExpression timeFilterExpr = new GlobalTimeExpression(TimeFilter.eq(15)); // series time = 15
    1. IExpression timeFilterExpr = new GlobalTimeExpression(TimeFilter.ltEq(15)); // series time <= 15
    1. IExpression timeFilterExpr = new GlobalTimeExpression(TimeFilter.lt(15)); // series time < 15
    1. IExpression timeFilterExpr = new GlobalTimeExpression(TimeFilter.gtEq(15)); // series time >= 15
    1. IExpression timeFilterExpr = new GlobalTimeExpression(TimeFilter.notEq(15)); // series time != 15
    1. IExpression timeFilterExpr = BinaryExpression.and(new GlobalTimeExpression(TimeFilter.gtEq(15L)),
    2. new GlobalTimeExpression(TimeFilter.lt(25L))); // 15 <= series time < 25
    1. IExpression timeFilterExpr = BinaryExpression.or(new GlobalTimeExpression(TimeFilter.gtEq(15L)),
    2. new GlobalTimeExpression(TimeFilter.lt(25L))); // series time >= 15 or series time < 25

读取接口

首先,我们打开 TsFile 并从 文件路径字符串 .ReadOnlyTsFilepath

  1. TsFileSequenceReader reader = new TsFileSequenceReader(path);
  2. ReadOnlyTsFile readTsFile = new ReadOnlyTsFile(reader);

接下来,我们准备路径数组和查询表达式,然后通过此接口获取最终对象:QueryExpression

  1. QueryExpression queryExpression = QueryExpression.create(paths, statement);

类有两种方法来执行查询。query

  • 方法 1

    1. public QueryDataSet query(QueryExpression queryExpression) throws IOException
  • 方法 2

    1. public QueryDataSet query(QueryExpression queryExpression, long partitionStartOffset, long partitionEndOffset) throws IOException

    此方法专为高级应用程序(如 TsFile-Spark 连接器)而设计。

    • params :对于方法 2,添加了两个附加参数以支持部分查询:

      • partitionStartOffset:TsFile 的起始偏移量
      • partitionEndOffset:TsFile 的结束偏移量

      什么是部分查询?

      在一些分布式文件系统(例如HDFS)中,文件被分成几个部分,称为“块”并存储在不同的节点中。在所涉及的每个节点中并行执行查询可以提高效率。因此需要部分查询。Paritial Query 仅选择存储在由 TsFile 分割的零件中的结果。QueryConstant.PARTITION_START_OFFSETQueryConstant.PARTITION_END_OFFSET

查询数据集接口

上面执行的查询将返回一个对象。QueryDataset

这是对用户有用的界面。

  • bool hasNext();

    如果此数据集仍有元素,则返回 true。

  • List<Path> getPaths()

    获取此数据集中的路径。

  • List<TSDataType> getDataTypes();

    获取数据类型。类 TSDataType 是一个枚举类,其值将是以下值之一:

    1. BOOLEAN,
    2. INT32,
    3. INT64,
    4. FLOAT,
    5. DOUBLE,
    6. TEXT;
  • RowRecord next() throws IOException;

    获取下一条记录。

    该类由时间戳和不同传感器中的数据组成,我们可以使用两种 getter 方法来获取它们。RowRecordlongList<Field>

    1. long getTimestamp();
    2. List<Field> getFields();

    要从一个字段获取数据,请使用以下方法:

    1. TSDataType getDataType();
    2. Object getObjectValue();

读取现有 TsFile 的示例

您应该将 TsFile 安装到本地 maven 存储库中。

有关查询语句的更全面示例,请参见/tsfile/example/src/main/java/org/apache/iotdb/tsfile/TsFileRead.java

  1. package org.apache.iotdb.tsfile;
  2. import java.io.IOException;
  3. import java.util.ArrayList;
  4. import org.apache.iotdb.tsfile.read.ReadOnlyTsFile;
  5. import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
  6. import org.apache.iotdb.tsfile.read.common.Path;
  7. import org.apache.iotdb.tsfile.read.expression.IExpression;
  8. import org.apache.iotdb.tsfile.read.expression.QueryExpression;
  9. import org.apache.iotdb.tsfile.read.expression.impl.BinaryExpression;
  10. import org.apache.iotdb.tsfile.read.expression.impl.GlobalTimeExpression;
  11. import org.apache.iotdb.tsfile.read.expression.impl.SingleSeriesExpression;
  12. import org.apache.iotdb.tsfile.read.filter.TimeFilter;
  13. import org.apache.iotdb.tsfile.read.filter.ValueFilter;
  14. import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
  15. /**
  16. * The class is to show how to read TsFile file named "test.tsfile".
  17. * The TsFile file "test.tsfile" is generated from class TsFileWrite.
  18. * Run TsFileWrite to generate the test.tsfile first
  19. */
  20. public class TsFileRead {
  21. private static void queryAndPrint(ArrayList<Path> paths, ReadOnlyTsFile readTsFile, IExpression statement)
  22. throws IOException {
  23. QueryExpression queryExpression = QueryExpression.create(paths, statement);
  24. QueryDataSet queryDataSet = readTsFile.query(queryExpression);
  25. while (queryDataSet.hasNext()) {
  26. System.out.println(queryDataSet.next());
  27. }
  28. System.out.println("------------");
  29. }
  30. public static void main(String[] args) throws IOException {
  31. // file path
  32. String path = "test.tsfile";
  33. // create reader and get the readTsFile interface
  34. TsFileSequenceReader reader = new TsFileSequenceReader(path);
  35. ReadOnlyTsFile readTsFile = new ReadOnlyTsFile(reader);
  36. // use these paths(all sensors) for all the queries
  37. ArrayList<Path> paths = new ArrayList<>();
  38. paths.add(new Path("device_1.sensor_1"));
  39. paths.add(new Path("device_1.sensor_2"));
  40. paths.add(new Path("device_1.sensor_3"));
  41. // no query statement
  42. queryAndPrint(paths, readTsFile, null);
  43. //close the reader when you left
  44. reader.close();
  45. }
  46. }

更改 Ts文件配置

  1. TSFileConfig config = TSFileDescriptor.getInstance().getConfig();
  2. config.setXXX();
【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。