From 4b04738532c8de2ca277630220dc57602330dc56 Mon Sep 17 00:00:00 2001 From: Zhijia Cao Date: Fri, 20 Dec 2024 19:16:38 +0800 Subject: [PATCH 1/5] rebuild java api docs --- .../latest/API/Programming-Java-Native-API.md | 1290 ++++++++--------- 1 file changed, 629 insertions(+), 661 deletions(-) diff --git a/src/zh/UserGuide/latest/API/Programming-Java-Native-API.md b/src/zh/UserGuide/latest/API/Programming-Java-Native-API.md index 8c68005d8..4668c4e77 100644 --- a/src/zh/UserGuide/latest/API/Programming-Java-Native-API.md +++ b/src/zh/UserGuide/latest/API/Programming-Java-Native-API.md @@ -1,511 +1,581 @@ -# Java 原生接口 +# Java编程接口 + +## Session原生API -## 安装 +IoTDB 原生 API 中的 Session 是实现与数据库交互的核心接口,它集成了丰富的方法,支持数据写入、查询以及元数据操作等功能。通过实例化 Session,能够建立与 IoTDB 服务器的连接,在该连接所构建的环境中执行各类数据库操作。 -### 依赖 +Session为单线程安全模型,如实际的应用场景为多线程并发场景,强烈推荐使用SessionPool编程。SessionPool 是 Session 的池化形式,专门针对多线程并发场景进行了优化,在多线程并发的情形下,SessionPool 能够合理地管理和分配连接资源,以提升系统性能与资源利用效率。 -* JDK >= 1.8 -* Maven >= 3.6 +### 步骤概览 -### 在 MAVEN 中使用原生接口 +使用SessionPool的核心步骤: +1. 创建会话池实例:初始化一个SessionPool对象,用于管理多个Session实例。 +2. 执行操作:直接从SessionPool中获取Session实例,并执行数据库操作,无需每次都打开和关闭连接。 +3. 关闭会话池资源:在不再需要进行数据库操作时,关闭SessionPool,释放所有相关资源。 + +### 详细步骤 +本章节用于说明开发的核心流程,并未演示所有的参数和接口,如需了解全部功能及参数请参见: [附录](./Programming-Java-Native-API.md#附录) 或 查阅: [源码](https://github.com/apache/iotdb/tree/master/example/session/src/main/java/org/apache/iotdb) + +#### 1. 创建maven项目 +创建一个maven项目,并导入以下依赖(JDK >= 1.8, Maven >= 3.6) ```xml org.apache.iotdb iotdb-session + ${project.version} ``` - -## 语法说明 - - - 对于 IoTDB-SQL 接口:传入的 SQL 参数需要符合 [语法规范](../User-Manual/Syntax-Rule.md#字面值常量) ,并且针对 JAVA 字符串进行反转义,如双引号前需要加反斜杠。(即:经 JAVA 转义之后与命令行执行的 SQL 语句一致。) - - 对于其他接口: - - 经参数传入的路径或路径前缀中的节点: 在 SQL 语句中需要使用反引号(`)进行转义的,此处均需要进行转义。 - - 经参数传入的标识符(如模板名):在 SQL 语句中需要使用反引号(`)进行转义的,均可以不用进行转义。 - - 语法说明相关代码示例可以参考:`example/session/src/main/java/org/apache/iotdb/SyntaxConventionRelatedExample.java` - -## 基本接口说明 - -下面将给出 Session 对应的接口的简要介绍和对应参数: - -### Session管理 - -* 初始化 Session - -``` java -// 全部使用默认配置 -session = new Session.Builder.build(); - -// 指定一个可连接节点 -session = - new Session.Builder() - .host(String host) - .port(int port) - .build(); - -// 指定多个可连接节点 -session = - new Session.Builder() - .nodeUrls(List nodeUrls) - .build(); - -// 其他配置项 -session = - new Session.Builder() - .fetchSize(int fetchSize) - .username(String username) - .password(String password) - .thriftDefaultBufferSize(int thriftDefaultBufferSize) - .thriftMaxFrameSize(int thriftMaxFrameSize) - .enableRedirection(boolean enableRedirection) - .version(Version version) - .build(); -``` - -其中,version 表示客户端使用的 SQL 语义版本,用于升级 0.13 时兼容 0.12 的 SQL 语义,可能取值有:`V_0_12`、`V_0_13`、`V_1_0`等。 - - -* 开启 Session - -``` java -void open() -``` - -* 开启 Session,并决定是否开启 RPC 压缩 - -``` java -void open(boolean enableRPCCompression) -``` - -注意: 客户端的 RPC 压缩开启状态需和服务端一致 - -* 关闭 Session - -``` java -void close() +#### 2. 创建会话实例 + +```java +public class IoTDBSessionExample { + public static void main(String[] args) { + // Using nodeUrls ensures that when one node goes down, other nodes are automatically connected to retry + List nodeUrls = new ArrayList<>(); + nodeUrls.add("127.0.0.1:6667"); + nodeUrls.add("127.0.0.1:6668"); + sessionPool = + new SessionPool.Builder() + .nodeUrls(nodeUrls) + .user("root") + .password("root") + .maxSize(3) + .build(); + } +} ``` -* SessionPool - -我们提供了一个针对原生接口的连接池 (`SessionPool`),使用该接口时,你只需要指定连接池的大小,就可以在使用时从池中获取连接。 -如果超过 60s 都没得到一个连接的话,那么会打印一条警告日志,但是程序仍将继续等待。 - -当一个连接被用完后,他会自动返回池中等待下次被使用; -当一个连接损坏后,他会从池中被删除,并重建一个连接重新执行用户的操作; -你还可以像创建 Session 那样在创建 SessionPool 时指定多个可连接节点的 url,以保证分布式集群中客户端的高可用性。 - -对于查询操作: +#### 执行数据库操作 +##### 数据写入 +在工业场景中,数据写入可以根据设备数量、写入频率和数据类型分为以下几类:多(单)设备同一时刻写入、单设备不同时刻数据(攒批)写入。业务允许的情况下,推荐攒批写入以提高写入效率。下面按不同场景对写入接口进行介绍。 -1. 使用 SessionPool 进行查询时,得到的结果集是`SessionDataSet`的封装类`SessionDataSetWrapper`; -2. 若对于一个查询的结果集,用户并没有遍历完且不再想继续遍历时,需要手动调用释放连接的操作`closeResultSet`; -3. 若对一个查询的结果集遍历时出现异常,也需要手动调用释放连接的操作`closeResultSet`. -4. 可以调用 `SessionDataSetWrapper` 的 `getColumnNames()` 方法得到结果集列名 +###### 多(单)设备同一时刻写入 +场景:多(单)个设备的实时状态或传感器数据批量写入,特点是采集一次上传一次。 -使用示例可以参见 `session/src/test/java/org/apache/iotdb/session/pool/SessionPoolTest.java` +适用接口: -或 `example/session/src/main/java/org/apache/iotdb/SessionPoolExample.java` +| 接口名称 | 功能描述 | +|------------------------------------------------------------------------------------------------------------------------------------------------------------------------|-----------------------------------------------------------------------------------------------| +| `insertRecords(List deviceIds, List times, List> measurementsList, List> typesList, List> valuesList)` | 插入多个设备,每个设备多个测点的一个时刻的记录 | +| `insertRecords(List deviceIds, List times, List> measurementsList, List> valuesList)` |同上,不需要指定数据类型,会根据传入的值进行推断。推断规则可在服务端配置,详细配置在iotdb-system.properties.template中的搜索`infer_type`关键字 | +| `insertAlignedRecords(List deviceIds, List times, List> measurementsList, List> typesList, List> valuesList)` | 插入多个设备,每个设备多个测点的一个时刻的记录。每个设备为对齐设备 | +| `insertAlignedRecords(List deviceIds, List times, List> measurementsList, List> valuesList)` | 同上,不需要指定数据类型,会根据传入的值进行推断。推断规则可在服务端配置,详细配置在iotdb-system.properties.template中的搜索`infer_type`关键字 | -### 测点管理接口 +代码案例: +```java +public class SessionPoolExample { -#### Database 管理 + private static SessionPool sessionPool; -* 设置 database + public static void main(String[] args) { -``` java -void setStorageGroup(String storageGroupId) -``` - -* 删除单个或多个 database - -``` java -void deleteStorageGroup(String storageGroup) -void deleteStorageGroups(List storageGroups) -``` -#### 时间序列管理 - -* 创建单个或多个时间序列 - -``` java -void createTimeseries(String path, TSDataType dataType, - TSEncoding encoding, CompressionType compressor, Map props, - Map tags, Map attributes, String measurementAlias) - -void createMultiTimeseries(List paths, List dataTypes, - List encodings, List compressors, - List> propsList, List> tagsList, - List> attributesList, List measurementAliasList) -``` - -* 创建对齐时间序列 + // 1. init SessionPool + constructSessionPool(); -``` -void createAlignedTimeseries(String prefixPath, List measurements, - List dataTypes, List encodings, - List compressors, List measurementAliasList); -``` - -注意:目前**暂不支持**使用传感器别名。 - -* 删除一个或多个时间序列 - -``` java -void deleteTimeseries(String path) -void deleteTimeseries(List paths) -``` + // 2. execute insert data + insertRecordsExample(); -* 检测时间序列是否存在 + // 3. close SessionPool + closeSessionPool(); -``` java -boolean checkTimeseriesExists(String path) -``` - -#### 元数据模版 - -* 创建元数据模板,可以通过先后创建 Template、MeasurementNode 的对象,描述模板内物理量结构与类型、编码方式、压缩方式等信息,并通过以下接口创建模板 + } -``` java -public void createSchemaTemplate(Template template); + private static void constructSessionPool() { + // Using nodeUrls ensures that when one node goes down, other nodes are automatically connected to retry + List nodeUrls = new ArrayList<>(); + nodeUrls.add("127.0.0.1:6667"); + nodeUrls.add("127.0.0.1:6668"); + sessionPool = + new SessionPool.Builder() + .nodeUrls(nodeUrls) + .user("root") + .password("root") + .maxSize(3) + .build(); + } -Class Template { - private String name; - private boolean directShareTime; - Map children; - public Template(String name, boolean isShareTime); - - public void addToTemplate(Node node); - public void deleteFromTemplate(String name); - public void setShareTime(boolean shareTime); -} + public static void insertRecordsExample(){ + String deviceId = "root.sg1.d1"; + List measurements = new ArrayList<>(); + measurements.add("s1"); + measurements.add("s2"); + measurements.add("s3"); + List deviceIds = new ArrayList<>(); + List> measurementsList = new ArrayList<>(); + List> valuesList = new ArrayList<>(); + List timestamps = new ArrayList<>(); + List> typesList = new ArrayList<>(); + + for (long time = 0; time < 500; time++) { + List values = new ArrayList<>(); + List types = new ArrayList<>(); + values.add(1L); + values.add(2L); + values.add(3L); + types.add(TSDataType.INT64); + types.add(TSDataType.INT64); + types.add(TSDataType.INT64); + + deviceIds.add(deviceId); + measurementsList.add(measurements); + valuesList.add(values); + typesList.add(types); + timestamps.add(time); + if (time != 0 && time % 100 == 0) { + try { + sessionPool.insertRecords(deviceIds, timestamps, measurementsList, typesList, valuesList); + } catch (IoTDBConnectionException | StatementExecutionException e) { + // solve exception + } + deviceIds.clear(); + measurementsList.clear(); + valuesList.clear(); + typesList.clear(); + timestamps.clear(); + } + } -Abstract Class Node { - private String name; - public void addChild(Node node); - public void deleteChild(Node node); -} + try { + sessionPool.insertRecords(deviceIds, timestamps, measurementsList, typesList, valuesList); + } catch (IoTDBConnectionException | StatementExecutionException e) { + // solve exception + } + } -Class MeasurementNode extends Node { - TSDataType dataType; - TSEncoding encoding; - CompressionType compressor; - public MeasurementNode(String name, - TSDataType dataType, - TSEncoding encoding, - CompressionType compressor); + public static void closeSessionPool(){ + sessionPool.close(); + } } ``` -通过上述类的实例描述模板时,Template 内应当仅能包含单层的 MeasurementNode,具体可以参见如下示例: +###### 批量数据上传 +场景:多个设备的大量数据同时上传,适合大规模分布式数据接入。 -``` java -MeasurementNode nodeX = new MeasurementNode("x", TSDataType.FLOAT, TSEncoding.RLE, CompressionType.SNAPPY); -MeasurementNode nodeY = new MeasurementNode("y", TSDataType.FLOAT, TSEncoding.RLE, CompressionType.SNAPPY); -MeasurementNode nodeSpeed = new MeasurementNode("speed", TSDataType.DOUBLE, TSEncoding.GORILLA, CompressionType.SNAPPY); +适用接口: -// This is the template we suggest to implement -Template flatTemplate = new Template("flatTemplate"); -template.addToTemplate(nodeX); -template.addToTemplate(nodeY); -template.addToTemplate(nodeSpeed); +| 接口名称 | 功能描述 | +|-----------------------------------------------------------------------------------------|------------------------------------------| +| `insertTablet(Tablet tablet)` | 插入单个设备多个测点,每个测点多个时刻的数据 | +| `insertAlignedTablet(Tablet tablet)` | 插入单个设备多个测点,每个测点多个时刻的数据,该设备为对齐设备 | -createSchemaTemplate(flatTemplate); -``` - -* 完成模板挂载操作后,可以通过如下的接口在给定的设备上使用模板注册序列,或者也可以直接向相应的设备写入数据以自动使用模板注册序列。 +代码案例: +```java +public class SessionPoolExample { -``` java -void createTimeseriesUsingSchemaTemplate(List devicePathList) -``` + private static SessionPool sessionPool; -* 将名为'templateName'的元数据模板挂载到'prefixPath'路径下,在执行这一步之前,你需要创建名为'templateName'的元数据模板 -* **请注意,我们强烈建议您将模板设置在 database 或 database 下层的节点中,以更好地适配未来版本更新及各模块的协作** + public static void main(String[] args) throws IoTDBConnectionException, StatementExecutionException { -``` java -void setSchemaTemplate(String templateName, String prefixPath) -``` + // 1. init SessionPool + constructSessionPool(); -- 将模板挂载到 MTree 上之后,你可以随时查询所有模板的名称、某模板被设置到 MTree 的所有路径、所有正在使用某模板的所有路径,即如下接口: + // 2. execute insert data + insertTabletExample(); -``` java -/** @return All template names. */ -public List showAllTemplates(); + // 3. close SessionPool + closeSessionPool(); -/** @return All paths have been set to designated template. */ -public List showPathsTemplateSetOn(String templateName); + } -/** @return All paths are using designated template. */ -public List showPathsTemplateUsingOn(String templateName) -``` + private static void constructSessionPool() { + // Using nodeUrls ensures that when one node goes down, other nodes are automatically connected to retry + List nodeUrls = new ArrayList<>(); + nodeUrls.add("127.0.0.1:6667"); + nodeUrls.add("127.0.0.1:6668"); + sessionPool = + new SessionPool.Builder() + .nodeUrls(nodeUrls) + .user("root") + .password("root") + .maxSize(3) + .build(); + } -- 如果你需要删除某一个模板,请确保在进行删除之前,MTree 上已经没有节点被挂载了模板,对于已经被挂载模板的节点,可以用如下接口卸载模板; + private static void insertTabletExample() throws IoTDBConnectionException, StatementExecutionException { + /* + * A Tablet example: + * device1 + * time s1, s2, s3 + * 1, 1, 1, 1 + * 2, 2, 2, 2 + * 3, 3, 3, 3 + */ + // The schema of measurements of one device + // only measurementId and data type in MeasurementSchema take effects in Tablet + List schemaList = new ArrayList<>(); + schemaList.add(new MeasurementSchema("s1", TSDataType.INT64)); + schemaList.add(new MeasurementSchema("s2", TSDataType.INT64)); + schemaList.add(new MeasurementSchema("s3", TSDataType.INT64)); + + Tablet tablet = new Tablet("root.sg.d1", schemaList, 100); + + // Method 1 to add tablet data + long timestamp = System.currentTimeMillis(); + + Random random = new Random(); + for (long row = 0; row < 100; row++) { + int rowIndex = tablet.rowSize++; + tablet.addTimestamp(rowIndex, timestamp); + for (int s = 0; s < 3; s++) { + long value = random.nextLong(); + tablet.addValue(schemaList.get(s).getMeasurementId(), rowIndex, value); + } + if (tablet.rowSize == tablet.getMaxRowNumber()) { + sessionPool.insertTablet(tablet); + tablet.reset(); + } + timestamp++; + } + if (tablet.rowSize != 0) { + sessionPool.insertTablet(tablet); + tablet.reset(); + } + } -``` java -void unsetSchemaTemplate(String prefixPath, String templateName); -public void dropSchemaTemplate(String templateName); -``` -* 请注意,如果一个子树中有多个孩子节点需要使用模板,可以在其共同父母节点上使用 setSchemaTemplate 。而只有在已有数据点插入模板对应的物理量时,模板才会被设置为激活状态,进而被 show timeseries 等查询检测到。 -* 卸载'prefixPath'路径下的名为'templateName'的元数据模板。你需要保证给定的路径'prefixPath'下需要有名为'templateName'的元数据模板。 - -注意:目前不支持从曾经在'prefixPath'路径及其后代节点使用模板插入数据后(即使数据已被删除)卸载模板。 - - -### 数据写入接口 - -推荐使用 insertTablet 帮助提高写入效率 - -* 插入一个 Tablet,Tablet 是一个设备若干行数据块,每一行的列都相同 - * **写入效率高** - * **支持批量写入** - * **支持写入空值**:空值处可以填入任意值,然后通过 BitMap 标记空值 - -``` java -void insertTablet(Tablet tablet) - -public class Tablet { - /** deviceId of this tablet */ - public String prefixPath; - /** the list of measurement schemas for creating the tablet */ - private List schemas; - /** timestamps in this tablet */ - public long[] timestamps; - /** each object is a primitive type array, which represents values of one measurement */ - public Object[] values; - /** each bitmap represents the existence of each value in the current column. */ - public BitMap[] bitMaps; - /** the number of rows to include in this tablet */ - public int rowSize; - /** the maximum number of rows for this tablet */ - private int maxRowNumber; - /** whether this tablet store data of aligned timeseries or not */ - private boolean isAligned; + public static void closeSessionPool(){ + sessionPool.close(); + } } ``` -* 插入多个 Tablet +##### 数据查询 +```java +public class SessionPoolExample { -``` java -void insertTablets(Map tablets) -``` - -* 插入一个 Record,一个 Record 是一个设备一个时间戳下多个测点的数据。这里的 value 是 Object 类型,相当于提供了一个公用接口,后面可以通过 TSDataType 将 value 强转为原类型 - - 其中,Object 类型与 TSDataType 类型的对应关系如下表所示: - - | TSDataType | Object | - |------------|--------------| - | BOOLEAN | Boolean | - | INT32 | Integer | - | DATE | LocalDate | - | INT64 | Long | - | TIMESTAMP | Long | - | FLOAT | Float | - | DOUBLE | Double | - | TEXT | String, Binary | - | STRING | String, Binary | - | BLOB | Binary | - -``` java -void insertRecord(String prefixPath, long time, List measurements, - List types, List values) -``` + private static SessionPool sessionPool; -* 插入多个 Record + public static void main(String[] args) throws IoTDBConnectionException, StatementExecutionException { -``` java -void insertRecords(List deviceIds, - List times, - List> measurementsList, - List> typesList, - List> valuesList) -``` - -* 插入同属于一个 device 的多个 Record + // 1. init SessionPool + constructSessionPool(); -``` java -void insertRecordsOfOneDevice(String deviceId, List times, - List> measurementsList, List> typesList, - List> valuesList) -``` + // 2. executes a non-query SQL statement, such as a DDL or DML command. + executeQueryExample(); -#### 带有类型推断的写入 + // 3. executes a query SQL statement and returns the result set. + executeNonQueryExample(); -当数据均是 String 类型时,我们可以使用如下接口,根据 value 的值进行类型推断。例如:value 为 "true" ,就可以自动推断为布尔类型。value 为 "3.2" ,就可以自动推断为数值类型。服务器需要做类型推断,可能会有额外耗时,速度较无需类型推断的写入慢 + // 4. close SessionPool + closeSessionPool(); -* 插入一个 Record,一个 Record 是一个设备一个时间戳下多个测点的数据 + } -``` java -void insertRecord(String prefixPath, long time, List measurements, List values) -``` -* 插入多个 Record + private static void executeNonQueryExample() throws IoTDBConnectionException, StatementExecutionException { -``` java -void insertRecords(List deviceIds, List times, - List> measurementsList, List> valuesList) -``` + // 1. create time series + sessionPool.executeNonQueryStatement("create timeseries root.test.d1.s1 with dataType = int32"); -* 插入同属于一个 device 的多个 Record - -``` java -void insertStringRecordsOfOneDevice(String deviceId, List times, - List> measurementsList, List> valuesList) -``` + // 2. set ttl + sessionPool.executeNonQueryStatement("set TTL to root.test.** 10000"); -#### 对齐时间序列的写入 + // 3. delete time series + sessionPool.executeNonQueryStatement("delete timeseries root.test.d1.s1"); + } -对齐时间序列的写入使用 insertAlignedXXX 接口,其余与上述接口类似: + + private static void executeQueryExample() throws IoTDBConnectionException, StatementExecutionException { + // 1. execute normal query + try(SessionDataSetWrapper wrapper = sessionPool.executeQueryStatement("select s1 from root.sg1.d1 limit 10")) { + while (wrapper.hasNext()) { + System.out.println(wrapper.next()); + } + } -* insertAlignedRecord -* insertAlignedRecords -* insertAlignedRecordsOfOneDevice -* insertAlignedStringRecordsOfOneDevice -* insertAlignedTablet -* insertAlignedTablets + // 2. execute aggregate query + try(SessionDataSetWrapper wrapper = sessionPool.executeQueryStatement("select count(s1) from root.sg1.d1 group by ([0, 40), 5ms) ")) { + while (wrapper.hasNext()) { + System.out.println(wrapper.next()); + } + } -### 数据删除接口 + } -* 删除一个或多个时间序列在某个时间点前或这个时间点的数据 + private static void constructSessionPool() { + // Using nodeUrls ensures that when one node goes down, other nodes are automatically connected to retry + List nodeUrls = new ArrayList<>(); + nodeUrls.add("127.0.0.1:6667"); + nodeUrls.add("127.0.0.1:6668"); + sessionPool = + new SessionPool.Builder() + .nodeUrls(nodeUrls) + .user("root") + .password("root") + .maxSize(3) + .build(); + } -``` java -void deleteData(String path, long endTime) -void deleteData(List paths, long endTime) + public static void closeSessionPool(){ + sessionPool.close(); + } +} ``` -### 数据查询接口 - -* 时间序列原始数据范围查询: - - 指定的查询时间范围为左闭右开区间,包含开始时间但不包含结束时间。 +##### 数据删除 +```java +public class SessionPoolExample { -``` java -SessionDataSet executeRawDataQuery(List paths, long startTime, long endTime); -``` + private static SessionPool sessionPool; -* 最新点查询: - - 查询最后一条时间戳大于等于某个时间点的数据。 - ``` java - SessionDataSet executeLastDataQuery(List paths, long lastTime); - ``` - - 快速查询单设备下指定序列最新点,支持重定向;如果您确认使用的查询路径是合法的,可将`isLegalPathNodes`置为true以避免路径校验带来的性能损失。 - ``` java - SessionDataSet executeLastDataQueryForOneDevice( - String db, String device, List sensors, boolean isLegalPathNodes); - ``` - -* 聚合查询: - - 支持指定查询时间范围。指定的查询时间范围为左闭右开区间,包含开始时间但不包含结束时间。 - - 支持按照时间区间分段查询。 - -``` java -SessionDataSet executeAggregationQuery(List paths, List aggregations); - -SessionDataSet executeAggregationQuery( - List paths, List aggregations, long startTime, long endTime); - -SessionDataSet executeAggregationQuery( - List paths, - List aggregations, - long startTime, - long endTime, - long interval); - -SessionDataSet executeAggregationQuery( - List paths, - List aggregations, - long startTime, - long endTime, - long interval, - long slidingStep); -``` + public static void main(String[] args) throws IoTDBConnectionException, StatementExecutionException { -* 直接执行查询语句 + // 1. init SessionPool + constructSessionPool(); -``` java -SessionDataSet executeQueryStatement(String sql) -``` + // 2. delete data + deleteDataExample(); -### 数据订阅 + // 3. close SessionPool + closeSessionPool(); -#### 1 Topic 管理 - -IoTDB 订阅客户端中的 `SubscriptionSession` 类提供了 Topic 管理的相关接口。Topic状态变化如下图所示: + } -
- -
+ public static void deleteDataExample() throws IoTDBConnectionException, StatementExecutionException { + // 1. Delete data at a precise point in time + String path = "root.sg.d1.s1"; + long deleteTime = 99; + sessionPool.deleteData(path, deleteTime); -##### 1.1 创建 Topic + // 2. The data of a certain period is deleted + long startTime = 1; + sessionPool.deleteData(Collections.singletonList(path),startTime, deleteTime); -```Java - void createTopicIfNotExists(String topicName, Properties properties) throws Exception; -``` + // 3. Delete a measurement point + sessionPool.deleteTimeseries(path); + } -示例: + private static void constructSessionPool() { + // Using nodeUrls ensures that when one node goes down, other nodes are automatically connected to retry + List nodeUrls = new ArrayList<>(); + nodeUrls.add("127.0.0.1:6667"); + nodeUrls.add("127.0.0.1:6668"); + sessionPool = + new SessionPool.Builder() + .nodeUrls(nodeUrls) + .user("root") + .password("root") + .maxSize(3) + .build(); + } -```Java -try (final SubscriptionSession session = new SubscriptionSession(host, port)) { - session.open(); - final Properties config = new Properties(); - config.put(TopicConstant.PATH_KEY, "root.db.**"); - session.createTopic(topicName, config); + public static void closeSessionPool(){ + sessionPool.close(); + } } ``` -##### 1.2 删除 Topic +## 数据订阅 +IoTDB 提供了强大的数据订阅功能,允许用户通过订阅获取数据更新通知。订阅功能支持多种消费模式,包括拉取模式(Pull)和推送模式(Push)。章节旨在帮助用户理解并使用 IoTDB 的数据订阅功能。详细的功能定义及介绍:[数据订阅](../../User-Manual/Data-Sync_timecho.md#数据同步) -```Java -void dropTopicIfExists(String topicName) throws Exception; -``` +### 核心步骤 -##### 1.3 查看 Topic +1. 创建Topic:创建一个Topic,Topic中包含指定的数据范围。 +2. 订阅Topic:在 consumer 订阅 topic 前,topic 必须已经被创建,否则订阅会失败。如果该 consumer 所在的 consumer group 中已经有 consumers 订阅了相同的 topics,那么该 consumer 将会复用对应的消费进度。 +3. 消费数据:支持Pull和Push两种消费模型。只有显式订阅了某个 topic,才会收到对应 topic 的数据。若在创建后没有订阅任何 topics,此时该 consumer 无法消费到任何数据,即使该 consumer 所在的 consumer group 中其它的 consumers 订阅了一些 topics. +4. 取消订阅: consumer close 时会退出对应的 consumer group,同时自动 unsubscribe 该 consumer 现存订阅的所有 topics. consumer 在 close 后生命周期即结束,无法再重新 open 订阅并消费数据。 -```Java -// 获取所有 topics -Set getTopics() throws Exception; -// 获取单个 topic -Optional getTopic(String topicName) throws Exception; -``` +### 详细步骤 +本章节用于说明开发的核心流程,并未演示所有的参数和接口,如需了解全部功能及参数请参见: [附录](./Programming-Java-Native-API.md#附录) -#### 2 查看订阅状态 -IoTDB 订阅客户端中的 `SubscriptionSession` 类提供了获取订阅状态的相关接口: +#### 1. 创建maven项目 +创建一个maven项目,并导入以下依赖(JDK >= 1.8, Maven >= 3.6) -```Java -Set getSubscriptions() throws Exception; -Set getSubscriptions(final String topicName) throws Exception; +```xml + + + org.apache.iotdb + iotdb-session + + ${project.version} + + ``` -#### 3 创建 Consumer +```java +public class DataConsumerExample { + + public static void main(String[] args) throws IoTDBConnectionException, StatementExecutionException { + try (SubscriptionSession session = new SubscriptionSession("127.0.0.1", 6667)) { + // 1. open session + session.open(); + + // 2. create a topic of all data + Properties sessionConfig = new Properties(); + sessionConfig.put(TopicConstant.PATH_KEY, "root.db.**"); + + session.createTopic("allData", sessionConfig); + + // 3. show all topics + Set topics = session.getTopics(); + System.out.println(topics); + + // 4. show a specific topic + Optional allData = session.getTopic("allData"); + System.out.println(allData.get()); + } + + // 5. create a pull consumer, the subscription is automatically cancelled when the logic in the try resources is completed + Properties consumerConfig = new Properties(); + consumerConfig.put(ConsumerConstant.CONSUMER_ID_KEY, "c1"); + consumerConfig.put(ConsumerConstant.CONSUMER_GROUP_ID_KEY, "cg1"); + try (SubscriptionPullConsumer pullConsumer = new SubscriptionPullConsumer(consumerConfig)) { + pullConsumer.open(); + pullConsumer.subscribe("allData"); + int i = 0; + while (i < 100) { + List messages = pullConsumer.poll(10000); + for (SubscriptionMessage message : messages) { + for (SubscriptionSessionDataSet dataSet : message.getSessionDataSetsHandler()) { + System.out.println(dataSet.getColumnNames()); + System.out.println(dataSet.getColumnTypes()); + while (dataSet.hasNext()) { + System.out.println(dataSet.next()); + } + } + } + i++; + } + } + + + // 6. create a push consumer, the subscription is automatically cancelled when the logic in the try resources is completed + try (SubscriptionPushConsumer consumer2 = + new SubscriptionPushConsumer.Builder().consumerId("c2").consumerGroupId("sg2") + .fileSaveDir(System.getProperty("java.io.tmpdir")) + .ackStrategy(AckStrategy.AFTER_CONSUME) + .consumeListener( + message -> { + for (SubscriptionSessionDataSet dataSet : message.getSessionDataSetsHandler()) { + System.out.println(dataSet.getColumnNames()); + System.out.println(dataSet.getColumnTypes()); + while (dataSet.hasNext()) { + System.out.println(dataSet.next()); + } + } + return ConsumeResult.SUCCESS; + }) + .buildPushConsumer()) { + consumer2.open(); + consumer2.subscribe("allData"); + } + } +} -在使用 JAVA 原生接口创建 consumer 时,需要指定 consumer 所应用的参数。 +``` -对于 `SubscriptionPullConsumer` 和 `SubscriptionPushConsumer` 而言,有以下公共配置: +## 附录 +### 参数列表 +#### Session +| 字段名 | 类型 | 说明 | +|--------------------------------|-------------------------------|----------------------------------------------------------------------| +| `nodeUrls` | `List` | 数据库节点的 URL 列表,支持多节点连接 | +| `username` | `String` | 用户名 | +| `password` | `String` | 密码 | +| `fetchSize` | `int` | 查询结果的默认批量返回大小 | +| `useSSL` | `boolean` | 是否启用 SSL | +| `trustStore` | `String` | 信任库路径 | +| `trustStorePwd` | `String` | 信任库密码 | +| `queryTimeoutInMs` | `long` | 查询的超时时间,单位毫秒 | +| `enableRPCCompression` | `boolean` | 是否启用 RPC 压缩 | +| `connectionTimeoutInMs` | `int` | 连接超时时间,单位毫秒 | +| `zoneId` | `ZoneId` | 会话的时区设置 | +| `thriftDefaultBufferSize` | `int` | Thrift 默认缓冲区大小 | +| `thriftMaxFrameSize` | `int` | Thrift 最大帧大小 | +| `defaultEndPoint` | `TEndPoint` | 默认的数据库端点信息 | +| `defaultSessionConnection` | `SessionConnection` | 默认的会话连接对象 | +| `isClosed` | `boolean` | 当前会话是否已关闭 | +| `enableRedirection` | `boolean` | 是否启用重定向功能 | +| `enableRecordsAutoConvertTablet` | `boolean` | 是否启用记录自动转换为 Tablet 的功能 | +| `deviceIdToEndpoint` | `Map` | 设备 ID 和数据库端点的映射关系 | +| `endPointToSessionConnection` | `Map` | 数据库端点和会话连接的映射关系 | +| `executorService` | `ScheduledExecutorService` | 用于定期更新节点列表的线程池 | +| `availableNodes` | `INodeSupplier` | 可用节点的供应器 | +| `enableQueryRedirection` | `boolean` | 是否启用查询重定向功能 | +| `version` | `Version` | 客户端的版本号,用于与服务端的兼容性判断 | +| `enableAutoFetch` | `boolean` | 是否启用自动获取功能 | +| `maxRetryCount` | `int` | 最大重试次数 | +| `retryIntervalInMs` | `long` | 重试的间隔时间,单位毫秒 | +需要额外说明的参数 + +nodeUrls: 多节点 URL 列表,支持自动切换到下一个可用节点。格式为 ip:port。 + +queryTimeoutInMs: 如果为负数,则表示使用服务端默认配置;如果为 0,则禁用查询超时功能。 + +enableRPCCompression: 启用后,RPC 数据传输将启用压缩,适用于高带宽延迟场景。 + +zoneId: 会话时区,可用值参考 Java 的 ZoneId 标准,例如 Asia/Shanghai。 + +#### SessionPool +| 字段名 | 类型 | 说明 | +|--------------------------------|-------------------------------|----------------------------------------------------------------------| +| `host` | `String` | 数据库主机地址 | +| `port` | `int` | 数据库端口 | +| `user` | `String` | 数据库用户名 | +| `password` | `String` | 数据库密码 | +| `nodeUrls` | `List` | 多节点的 URL 列表 | +| `maxSize` | `int` | 连接池的最大连接数 | +| `fetchSize` | `int` | 查询结果的默认批量返回大小 | +| `waitToGetSessionTimeoutInMs` | `long` | 获取连接的等待超时时间(毫秒) | +| `enableCompression` | `boolean` | 是否启用 RPC 压缩 | +| `enableRedirection` | `boolean` | 是否启用重定向功能 | +| `enableRecordsAutoConvertTablet` | `boolean` | 是否启用记录自动转换为 Tablet 的功能 | +| `thriftDefaultBufferSize` | `int` | Thrift 默认缓冲区大小 | +| `thriftMaxFrameSize` | `int` | Thrift 最大帧大小 | +| `queryTimeoutInMs` | `long` | 查询超时时间,单位毫秒 | +| `version` | `Version` | 客户端版本号 | +| `connectionTimeoutInMs` | `int` | 连接超时时间,单位毫秒 | +| `zoneId` | `ZoneId` | 时区设置 | +| `useSSL` | `boolean` | 是否启用 SSL | +| `trustStore` | `String` | 信任库路径 | +| `trustStorePwd` | `String` | 信任库密码 | +| `enableQueryRedirection` | `boolean` | 是否启用查询重定向功能 | +| `executorService` | `ScheduledExecutorService` | 定期更新节点列表的线程池 | +| `availableNodes` | `INodeSupplier` | 可用节点的供应器 | +| `maxRetryCount` | `int` | 最大重试次数 | +| `retryIntervalInMs` | `long` | 重试间隔时间,单位毫秒 | +| `closed` | `boolean` | 当前连接池是否已关闭 | +| `queue` | `ConcurrentLinkedDeque` | 可用会话连接的队列 | +| `occupied` | `ConcurrentMap` | 已占用的会话连接映射 | +| `deviceIdToEndpoint` | `Map` | 设备 ID 到数据库端点的映射 | +| `formattedNodeUrls` | `String` | 格式化后的节点 URL 字符串 | +需要额外说明的字段 + +nodeUrls:一个包含多个节点地址的列表,用于支持集群环境的连接。格式为 ["host1:port1", "host2:port2"]。 + +queue:保存所有可用的会话连接。当需要连接时会从队列中取出。 + +occupied:用于记录正在被占用的连接 + +#### SubscriptionConsumer | 参数 | 是否必填(默认值) | 参数含义 | | :-------------------------------------------- | :--------------------------------- | :----------------------------------------------------------- | @@ -521,10 +591,8 @@ Set getSubscriptions(final String topicName) throws Exception; | fileSaveDir | optional: Paths.get(System.getProperty("user.dir"), "iotdb-subscription").toString() | `String`: consumer 订阅出的 TsFile 文件临时存放的目录路径 | | fileSaveFsync | optional: false | `Boolean`: consumer 订阅 TsFile 的过程中是否主动调用 fsync | +`SubscriptionPushConsumer` 中的特殊配置: -##### 3.1 SubscriptionPushConsumer - -以下为 `SubscriptionPushConsumer` 中的特殊配置: | 参数 | 是否必填(默认值) | 参数含义 | | :-------------------------------------------- | :--------------------------------- | :----------------------------------------------------------- | | ackStrategy | optional: `ACKStrategy.AFTER_CONSUME` | 消费进度的确认机制包含以下选项:`ACKStrategy.BEFORE_CONSUME`(当 consumer 收到数据时立刻提交消费进度,`onReceive` 前)`ACKStrategy.AFTER_CONSUME`(当 consumer 消费完数据再去提交消费进度,`onReceive` 后) | @@ -532,261 +600,161 @@ Set getSubscriptions(final String topicName) throws Exception; | autoPollIntervalMs | optional: 5000 (min: 500) | Long: consumer 自动拉取数据的时间间隔,单位为**毫秒** | | autoPollTimeoutMs | optional: 10000 (min: 1000) | Long: consumer 每次拉取数据的超时时间,单位为**毫秒** | -其中,`ConsumerListener` 接口定义如下: - -```Java -@FunctionInterface -interface ConsumeListener { - default ConsumeResult onReceive(Message message) { - return ConsumeResult.SUCCESS; - } -} - -enum ConsumeResult { - SUCCESS, - FAILURE, -} -``` - -##### 3.2 SubscriptionPullConsumer - -以下为 `SubscriptionPullConsumer` 中的特殊配置: +`SubscriptionPullConsumer` 中的特殊配置: | 参数 | 是否必填(默认值) | 参数含义 | | :-------------------------------------------- | :--------------------------------- | :----------------------------------------------------------- | | autoCommit | optional: true | Boolean: 是否自动提交消费进度如果此参数设置为 false,则需要调用 `commit` 方法来手动提交消费进度 | | autoCommitInterval | optional: 5000 (min: 500) | Long: 自动提交消费进度的时间间隔,单位为**毫秒**仅当 autoCommit 参数为 true 的时候才会生效 | -在创建 consumer 后,需要手动调用 consumer 的 open 方法: - -```Java -void open() throws Exception; -``` - -此时,IoTDB 订阅客户端才会校验 consumer 的配置正确性,在校验成功后 consumer 就会加入对应的 consumer group。也就是说,在打开 consumer 后,才可以使用返回的 consumer 对象进行订阅 Topic,消费数据等操作。 - -#### 4 订阅 Topic - -`SubscriptionPushConsumer` 和 `SubscriptionPullConsumer` 提供了下述 JAVA 原生接口用于订阅 Topics: - -```Java -// 订阅 topics -void subscribe(String topic) throws Exception; -void subscribe(List topics) throws Exception; -``` - -- 在 consumer 订阅 topic 前,topic 必须已经被创建,否则订阅会失败 -- 一个 consumer 在已经订阅了某个 topic 的情况下再次订阅这个 topic,不会报错 -- 如果该 consumer 所在的 consumer group 中已经有 consumers 订阅了相同的 topics,那么该 consumer 将会复用对应的消费进度 - -#### 5 消费数据 - -无论是 push 模式还是 pull 模式的 consumer: - -- 只有显式订阅了某个 topic,才会收到对应 topic 的数据 -- 若在创建后没有订阅任何 topics,此时该 consumer 无法消费到任何数据,即使该 consumer 所在的 consumer group 中其它的 consumers 订阅了一些 topics - -##### 5.1 SubscriptionPushConsumer - -SubscriptionPushConsumer 在订阅 topics 后,无需手动拉取数据,其消费数据的逻辑在创建 SubscriptionPushConsumer 指定的 `consumeListener` 配置中。 - -##### 5.2 SubscriptionPullConsumer - -SubscriptionPullConsumer 在订阅 topics 后,需要主动调用 `poll` 方法拉取数据: - -```Java -List poll(final Duration timeout) throws Exception; -List poll(final long timeoutMs) throws Exception; -List poll(final Set topicNames, final Duration timeout) throws Exception; -List poll(final Set topicNames, final long timeoutMs) throws Exception; -``` - -在 poll 方法中可以指定需要拉取的 topic 名称(如果不指定则默认拉取该 consumer 已订阅的所有 topics)和超时时间。 - -当 SubscriptionPullConsumer 配置 autoCommit 参数为 false 时,需要手动调用 commitSync 和 commitAsync 方法同步或异步提交某批数据的消费进度: - -```Java -void commitSync(final SubscriptionMessage message) throws Exception; -void commitSync(final Iterable messages) throws Exception; - -CompletableFuture commitAsync(final SubscriptionMessage message); -CompletableFuture commitAsync(final Iterable messages); -void commitAsync(final SubscriptionMessage message, final AsyncCommitCallback callback); -void commitAsync(final Iterable messages, final AsyncCommitCallback callback); -``` - -AsyncCommitCallback 类定义如下: - -```Java -public interface AsyncCommitCallback { - default void onComplete() { - // Do nothing - } - - default void onFailure(final Throwable e) { - // Do nothing - } -} -``` - -#### 6 取消订阅 - -`SubscriptionPushConsumer` 和 `SubscriptionPullConsumer` 提供了下述 JAVA 原生接口用于取消订阅并关闭 consumer: - -```Java -// 取消订阅 topics -void unsubscribe(String topic) throws Exception; -void unsubscribe(List topics) throws Exception; - -// 关闭 consumer -void close(); -``` - -- 在 topic 存在的情况下,如果一个 consumer 在没有订阅了某个 topic 的情况下取消订阅某个 topic,不会报错 -- consumer close 时会退出对应的 consumer group,同时自动 unsubscribe 该 consumer 现存订阅的所有 topics -- consumer 在 close 后生命周期即结束,无法再重新 open 订阅并消费数据 - -#### 7 代码示例 - -##### 7.1 单 Pull Consumer 消费 SessionDataSetsHandler 形式的数据 - -```Java -// Create topics -try (final SubscriptionSession session = new SubscriptionSession(HOST, PORT)) { - session.open(); - final Properties config = new Properties(); - config.put(TopicConstant.PATH_KEY, "root.db.**"); - session.createTopic(TOPIC_1, config); -} - -// Subscription: property-style ctor -final Properties config = new Properties(); -config.put(ConsumerConstant.CONSUMER_ID_KEY, "c1"); -config.put(ConsumerConstant.CONSUMER_GROUP_ID_KEY, "cg1"); - -final SubscriptionPullConsumer consumer1 = new SubscriptionPullConsumer(config); -consumer1.open(); -consumer1.subscribe(TOPIC_1); -while (true) { - LockSupport.parkNanos(SLEEP_NS); // wait some time - final List messages = consumer1.poll(POLL_TIMEOUT_MS); - for (final SubscriptionMessage message : messages) { - for (final SubscriptionSessionDataSet dataSet : message.getSessionDataSetsHandler()) { - System.out.println(dataSet.getColumnNames()); - System.out.println(dataSet.getColumnTypes()); - while (dataSet.hasNext()) { - System.out.println(dataSet.next()); - } - } - } - // Auto commit -} - -// Show topics and subscriptions -try (final SubscriptionSession session = new SubscriptionSession(HOST, PORT)) { - session.open(); - session.getTopics().forEach((System.out::println)); - session.getSubscriptions().forEach((System.out::println)); -} - -consumer1.unsubscribe(TOPIC_1); -consumer1.close(); -``` - -##### 7.2 多 Push Consumer 消费 TsFileHandler 形式的数据 - -```Java -// Create topics -try (final SubscriptionSession subscriptionSession = new SubscriptionSession(HOST, PORT)) { - subscriptionSession.open(); - final Properties config = new Properties(); - config.put(TopicConstant.FORMAT_KEY, TopicConstant.FORMAT_TS_FILE_HANDLER_VALUE); - subscriptionSession.createTopic(TOPIC_2, config); -} - -final List threads = new ArrayList<>(); -for (int i = 0; i < 8; ++i) { - final int idx = i; - final Thread thread = - new Thread( - () -> { - // Subscription: builder-style ctor - try (final SubscriptionPushConsumer consumer2 = - new SubscriptionPushConsumer.Builder() - .consumerId("c" + idx) - .consumerGroupId("cg2") - .fileSaveDir(System.getProperty("java.io.tmpdir")) - .ackStrategy(AckStrategy.AFTER_CONSUME) - .consumeListener( - message -> { - doSomething(message.getTsFileHandler()); - return ConsumeResult.SUCCESS; - }) - .buildPushConsumer()) { - consumer2.open(); - consumer2.subscribe(TOPIC_2); - // block the consumer main thread - Thread.sleep(Long.MAX_VALUE); - } catch (final IOException | InterruptedException e) { - throw new RuntimeException(e); - } - }); - thread.start(); - threads.add(thread); -} - -for (final Thread thread : threads) { - thread.join(); -} -``` - - -### 其他功能(直接执行SQL语句) - -``` java -void executeNonQueryStatement(String sql) -``` - -### 写入测试接口 (用于分析网络带宽) - -不实际写入数据,只将数据传输到 server 即返回 - -* 测试 insertRecord - -``` java -void testInsertRecord(String deviceId, long time, List measurements, List values) - -void testInsertRecord(String deviceId, long time, List measurements, - List types, List values) -``` - -* 测试 testInsertRecords - -``` java -void testInsertRecords(List deviceIds, List times, - List> measurementsList, List> valuesList) - -void testInsertRecords(List deviceIds, List times, - List> measurementsList, List> typesList, - List> valuesList) -``` - -* 测试 insertTablet - -``` java -void testInsertTablet(Tablet tablet) -``` - -* 测试 insertTablets - -``` java -void testInsertTablets(Map tablets) -``` - -### 示例代码 - -浏览上述接口的详细信息,请参阅代码 ```session/src/main/java/org/apache/iotdb/session/Session.java``` -使用上述接口的示例代码在 ```example/session/src/main/java/org/apache/iotdb/SessionExample.java``` +### 函数列表 +#### 会话管理 + +| 方法名 | 功能描述 | 参数解释 | +|-----------------------------------------------------------------------------------------|--------------------------------------------|------------------------------------------------------------------------------------------------------------| +| `open()` | 打开会话 | 无参数 | +| `open(boolean enableRPCCompression)` | 打开会话并启用RPC压缩 | `enableRPCCompression`: 是否启用RPC压缩 | +| `open(boolean enableRPCCompression, int connectionTimeoutInMs)` | 打开会话并设置连接超时 | `enableRPCCompression`: 是否启用RPC压缩,`connectionTimeoutInMs`: 连接超时时间(毫秒) | +| `open(boolean enableRPCCompression, int connectionTimeoutInMs, Map deviceIdToEndpoint, INodeSupplier nodeSupplier)` | 打开会话并配置节点 | `enableRPCCompression`: 是否启用RPC压缩,`connectionTimeoutInMs`: 超时时间,`deviceIdToEndpoint`: 设备映射 | +| `close()` | 关闭会话 | 无参数 | +| `getVersion()` | 获取会话版本 | 无参数 | +| `setVersion(Version version)` | 设置会话版本 | `version`: 要设置的版本 | +| `getTimeZone()` | 获取当前时区 | 无参数 | +| `setTimeZone(String zoneId)` | 设置时区 | `zoneId`: 时区标识符(例如 `Asia/Shanghai`) | +| `setTimeZoneOfSession(String zoneId)` | 设置会话时区 | `zoneId`: 时区标识符 | +| `getFetchSize()` | 获取批量查询的记录数限制 | 无参数 | +| `setFetchSize(int fetchSize)` | 设置批量查询的记录数限制 | `fetchSize`: 每批查询返回的最大记录数 | +| `setQueryTimeout(long timeoutInMs)` | 设置查询超时时间 | `timeoutInMs`: 查询的超时时间(毫秒) | +| `getQueryTimeout()` | 获取查询超时时间 | 无参数 | +| `isEnableQueryRedirection()` | 检查是否启用查询重定向 | 无参数 | +| `setEnableQueryRedirection(boolean enableQueryRedirection)` | 设置查询重定向 | `enableQueryRedirection`: 是否启用查询重定向 | +| `isEnableRedirection()` | 检查是否启用重定向 | 无参数 | +| `setEnableRedirection(boolean enableRedirection)` | 设置重定向 | `enableRedirection`: 是否启用重定向 | + + +#### 元数据管理 + +| 方法名 | 功能描述 | 参数解释 | +|-----------------------------------------------------------------------------------------|--------------------------------------------|------------------------------------------------------------------------------------------------------------| +| `createDatabase(String database)` | 创建数据库 | `database`: 数据库名称 | +| `deleteDatabase(String database)` | 删除指定数据库 | `database`: 要删除的数据库名称 | +| `deleteDatabases(List databases)` | 批量删除数据库 | `databases`: 要删除的数据库名称列表 | +| `createTimeseries(String path, TSDataType dataType, TSEncoding encoding, CompressionType compressor)` | 创建单个时间序列 | `path`: 时间序列路径,`dataType`: 数据类型,`encoding`: 编码类型,`compressor`: 压缩类型 | +| `createAlignedTimeseries(...)` | 创建对齐时间序列 | 设备ID、测点列表、数据类型列表、编码列表、压缩类型列表 | +| `createMultiTimeseries(...)` | 批量创建时间序列 | 多个路径、数据类型、编码、压缩类型、属性、标签、别名等 | +| `deleteTimeseries(String path)` | 删除时间序列 | `path`: 要删除的时间序列路径 | +| `deleteTimeseries(List paths)` | 批量删除时间序列 | `paths`: 要删除的时间序列路径列表 | +| `setSchemaTemplate(String templateName, String prefixPath)` | 设置模式模板 | `templateName`: 模板名称,`prefixPath`: 应用模板的路径 | +| `createSchemaTemplate(Template template)` | 创建模式模板 | `template`: 模板对象 | +| `dropSchemaTemplate(String templateName)` | 删除模式模板 | `templateName`: 要删除的模板名称 | +| `addAlignedMeasurementsInTemplate(...)` | 添加对齐测点到模板 | 模板名称、测点路径列表、数据类型、编码类型、压缩类型 | +| `addUnalignedMeasurementsInTemplate(...)` | 添加非对齐测点到模板 | 同上 | +| `deleteNodeInTemplate(String templateName, String path)` | 删除模板中的节点 | `templateName`: 模板名称,`path`: 要删除的路径 | +| `countMeasurementsInTemplate(String name)` | 统计模板中测点数量 | `name`: 模板名称 | +| `isMeasurementInTemplate(String templateName, String path)` | 检查模板中是否存在某测点 | `templateName`: 模板名称,`path`: 测点路径 | +| `isPathExistInTemplate(String templateName, String path)` | 检查模板中路径是否存在 | 同上 | +| `showMeasurementsInTemplate(String templateName)` | 显示模板中的测点 | `templateName`: 模板名称 | +| `showMeasurementsInTemplate(String templateName, String pattern)` | 按模式显示模板中的测点 | `templateName`: 模板名称,`pattern`: 匹配模式 | +| `showAllTemplates()` | 显示所有模板 | 无参数 | +| `showPathsTemplateSetOn(String templateName)` | 显示模板应用的路径 | `templateName`: 模板名称 | +| `showPathsTemplateUsingOn(String templateName)` | 显示模板实际使用的路径 | 同上 | +| `unsetSchemaTemplate(String prefixPath, String templateName)` | 取消路径的模板设置 | `prefixPath`: 路径,`templateName`: 模板名称 | + + +#### 数据写入 +| 方法名 | 功能描述 | 参数解释 | +|-----------------------------------------------------------------------------------------|--------------------------------------------|------------------------------------------------------------------------------------------------------------| +| `insertRecord(String deviceId, long time, List measurements, List types, Object... values)` | 插入单条记录 | `deviceId`: 设备ID,`time`: 时间戳,`measurements`: 测点列表,`types`: 数据类型列表,`values`: 值列表 | +| `insertRecord(String deviceId, long time, List measurements, List values)` | 插入单条记录 | `deviceId`: 设备ID,`time`: 时间戳,`measurements`: 测点列表,`values`: 值列表 | +| `insertRecords(List deviceIds, List times, List> measurementsList, List> valuesList)` | 插入多条记录 | `deviceIds`: 设备ID列表,`times`: 时间戳列表,`measurementsList`: 测点列表列表,`valuesList`: 值列表 | +| `insertRecords(List deviceIds, List times, List> measurementsList, List> typesList, List> valuesList)` | 插入多条记录 | 同上,增加 `typesList`: 数据类型列表 | +| `insertRecordsOfOneDevice(String deviceId, List times, List> measurementsList, List> typesList, List> valuesList)` | 插入单设备的多条记录 | `deviceId`: 设备ID,`times`: 时间戳列表,`measurementsList`: 测点列表列表,`typesList`: 类型列表,`valuesList`: 值列表 | +| `insertRecordsOfOneDevice(String deviceId, List times, List> measurementsList, List> typesList, List> valuesList, boolean haveSorted)` | 插入排序后的单设备多条记录 | 同上,增加 `haveSorted`: 数据是否已排序 | +| `insertStringRecordsOfOneDevice(String deviceId, List times, List> measurementsList, List> valuesList)` | 插入字符串格式的单设备记录 | `deviceId`: 设备ID,`times`: 时间戳列表,`measurementsList`: 测点列表,`valuesList`: 值列表 | +| `insertStringRecordsOfOneDevice(String deviceId, List times, List> measurementsList, List> valuesList, boolean haveSorted)` | 插入排序的字符串格式单设备记录 | 同上,增加 `haveSorted`: 数据是否已排序 | +| `insertAlignedRecord(String deviceId, long time, List measurements, List types, List values)` | 插入单条对齐记录 | `deviceId`: 设备ID,`time`: 时间戳,`measurements`: 测点列表,`types`: 类型列表,`values`: 值列表 | +| `insertAlignedRecord(String deviceId, long time, List measurements, List values)` | 插入字符串格式的单条对齐记录 | `deviceId`: 设备ID,`time`: 时间戳,`measurements`: 测点列表,`values`: 值列表 | +| `insertAlignedRecords(List deviceIds, List times, List> measurementsList, List> valuesList)` | 插入多条对齐记录 | `deviceIds`: 设备ID列表,`times`: 时间戳列表,`measurementsList`: 测点列表,`valuesList`: 值列表 | +| `insertAlignedRecords(List deviceIds, List times, List> measurementsList, List> typesList, List> valuesList)` | 插入多条对齐记录 | 同上,增加 `typesList`: 数据类型列表 | +| `insertAlignedRecordsOfOneDevice(String deviceId, List times, List> measurementsList, List> typesList, List> valuesList)` | 插入单设备的多条对齐记录 | 同上 | +| `insertAlignedRecordsOfOneDevice(String deviceId, List times, List> measurementsList, List> typesList, List> valuesList, boolean haveSorted)` | 插入排序的单设备多条对齐记录 | 同上,增加 `haveSorted`: 数据是否已排序 | +| `insertAlignedStringRecordsOfOneDevice(String deviceId, List times, List> measurementsList, List> valuesList)` | 插入字符串格式的单设备对齐记录 | `deviceId`: 设备ID,`times`: 时间戳列表,`measurementsList`: 测点列表,`valuesList`: 值列表 | +| `insertAlignedStringRecordsOfOneDevice(String deviceId, List times, List> measurementsList, List> valuesList, boolean haveSorted)` | 插入排序的字符串格式单设备对齐记录 | 同上,增加 `haveSorted`: 数据是否已排序 | +| `insertTablet(Tablet tablet)` | 插入单个Tablet数据 | `tablet`: 要插入的Tablet数据 | +| `insertTablet(Tablet tablet, boolean sorted)` | 插入排序的Tablet数据 | 同上,增加 `sorted`: 数据是否已排序 | +| `insertAlignedTablet(Tablet tablet)` | 插入对齐的Tablet数据 | `tablet`: 要插入的Tablet数据 | +| `insertAlignedTablet(Tablet tablet, boolean sorted)` | 插入排序的对齐Tablet数据 | 同上,增加 `sorted`: 数据是否已排序 | +| `insertTablets(Map tablets)` | 批量插入多个Tablet数据 | `tablets`: 设备ID到Tablet的映射表 | +| `insertTablets(Map tablets, boolean sorted)` | 批量插入排序的多个Tablet数据 | 同上,增加 `sorted`: 数据是否已排序 | +| `insertAlignedTablets(Map tablets)` | 批量插入多个对齐Tablet数据 | `tablets`: 设备ID到Tablet的映射表 | +| `insertAlignedTablets(Map tablets, boolean sorted)` | 批量插入排序的多个对齐Tablet数据 | 同上,增加 `sorted`: 数据是否已排序 | + +#### 数据删除 + +| 方法名 | 功能描述 | 参数解释 | +|-----------------------------------------------------------------------------------------|--------------------------------------------|------------------------------------------------------------------------------------------------------------| +| `deleteTimeseries(String path)` | 删除单个时间序列 | `path`: 时间序列路径 | +| `deleteTimeseries(List paths)` | 批量删除时间序列 | `paths`: 时间序列路径列表 | +| `deleteData(String path, long endTime)` | 删除指定路径的历史数据 | `path`: 路径,`endTime`: 结束时间戳 | +| `deleteData(List paths, long endTime)` | 批量删除路径的历史数据 | `paths`: 路径列表,`endTime`: 结束时间戳 | +| `deleteData(List paths, long startTime, long endTime)` | 删除路径时间范围内的历史数据 | 同上,增加 `startTime`: 起始时间戳 | + + +#### 数据查询 +| 方法名 | 功能描述 | 参数解释 | +|-----------------------------------------------------------------------------------------|--------------------------------------------|------------------------------------------------------------------------------------------------------------| +| `executeQueryStatement(String sql)` | 执行查询语句 | `sql`: 查询SQL语句 | +| `executeQueryStatement(String sql, long timeoutInMs)` | 执行带超时的查询语句 | `sql`: 查询SQL语句,`timeoutInMs`: 查询超时时间(毫秒) | +| `executeRawDataQuery(List paths, long startTime, long endTime)` | 查询指定路径的原始数据 | `paths`: 查询路径列表,`startTime`: 起始时间戳,`endTime`: 结束时间戳 | +| `executeRawDataQuery(List paths, long startTime, long endTime, long timeOut)` | 查询指定路径的原始数据(带超时) | 同上,增加 `timeOut`: 超时时间 | +| `executeLastDataQuery(List paths)` | 查询最新数据 | `paths`: 查询路径列表 | +| `executeLastDataQuery(List paths, long lastTime)` | 查询指定时间的最新数据 | `paths`: 查询路径列表,`lastTime`: 指定的时间戳 | +| `executeLastDataQuery(List paths, long lastTime, long timeOut)` | 查询指定时间的最新数据(带超时) | 同上,增加 `timeOut`: 超时时间 | +| `executeLastDataQueryForOneDevice(String db, String device, List sensors, boolean isLegalPathNodes)` | 查询单个设备的最新数据 | `db`: 数据库名,`device`: 设备名,`sensors`: 传感器列表,`isLegalPathNodes`: 是否合法路径节点 | +| `executeAggregationQuery(List paths, List aggregations)` | 执行聚合查询 | `paths`: 查询路径列表,`aggregations`: 聚合类型列表 | +| `executeAggregationQuery(List paths, List aggregations, long startTime, long endTime)` | 执行带时间范围的聚合查询 | 同上,增加 `startTime`: 起始时间戳,`endTime`: 结束时间戳 | +| `executeAggregationQuery(List paths, List aggregations, long startTime, long endTime, long interval)` | 执行带时间间隔的聚合查询 | 同上,增加 `interval`: 时间间隔 | +| `executeAggregationQuery(List paths, List aggregations, long startTime, long endTime, long interval, long slidingStep)` | 执行滑动窗口聚合查询 | 同上,增加 `slidingStep`: 滑动步长 | +| `fetchAllConnections()` | 获取所有活动连接信息 | 无参数 | + +#### 系统状态与备份 +| 方法名 | 功能描述 | 参数解释 | +|-----------------------------------------------------------------------------------------|--------------------------------------------|------------------------------------------------------------------------------------------------------------| +| `getBackupConfiguration()` | 获取备份配置信息 | 无参数 | +| `fetchAllConnections()` | 获取所有活动的连接信息 | 无参数 | +| `getSystemStatus()` | 获取系统状态 | 已废弃,默认返回 `SystemStatus.NORMAL` | + +#### 数据订阅 +##### SubscriptionPullConsumer + +| **函数名** | **说明** | **参数** | +|-------------------------------------|--------------------------------------------------------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| `open()` | 打开消费者连接,启动消息消费。如果 `autoCommit` 启用,会启动自动提交工作器。 | 无 | +| `close()` | 关闭消费者连接。如果 `autoCommit` 启用,会在关闭前提交所有未提交的消息。 | 无 | +| `poll(final Duration timeout)` | 拉取消息,指定超时时间。 | `timeout` (Duration): 拉取的超时时间。 | +| `poll(final long timeoutMs)` | 拉取消息,指定超时时间(毫秒)。 | `timeoutMs` (long): 超时时间,单位为毫秒。 | +| `poll(final Set topicNames, final Duration timeout)` | 拉取指定主题的消息,指定超时时间。 | `topicNames` (Set): 要拉取的主题集合。
`timeout` (Duration): 超时时间。 | +| `poll(final Set topicNames, final long timeoutMs)` | 拉取指定主题的消息,指定超时时间(毫秒)。 | `topicNames` (Set): 要拉取的主题集合。
`timeoutMs` (long): 超时时间,单位为毫秒。 | +| `commitSync(final SubscriptionMessage message)` | 同步提交单条消息。 | `message` (SubscriptionMessage): 需要提交的消息对象。 | +| `commitSync(final Iterable messages)` | 同步提交多条消息。 | `messages` (Iterable): 需要提交的消息集合。 | +| `commitAsync(final SubscriptionMessage message)` | 异步提交单条消息。 | `message` (SubscriptionMessage): 需要提交的消息对象。 | +| `commitAsync(final Iterable messages)` | 异步提交多条消息。 | `messages` (Iterable): 需要提交的消息集合。 | +| `commitAsync(final SubscriptionMessage message, final AsyncCommitCallback callback)` | 异步提交单条消息并指定回调函数。 | `message` (SubscriptionMessage): 需要提交的消息对象。
`callback` (AsyncCommitCallback): 异步提交完成后的回调函数。 | +| `commitAsync(final Iterable messages, final AsyncCommitCallback callback)` | 异步提交多条消息并指定回调函数。 | `messages` (Iterable): 需要提交的消息集合。
`callback` (AsyncCommitCallback): 异步提交完成后的回调函数。 | + +##### SubscriptionPushConsumer + +| **函数名** | **说明** | **参数** | +|-------------------------------------|----------------------------------------------------------------------------------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| `open()` | 打开消费者连接,启动消息消费,提交自动轮询工作器。 | 无 | +| `close()` | 关闭消费者连接,停止消息消费。 | 无 | +| `toString()` | 返回消费者对象的核心配置信息。 | 无 | +| `coreReportMessage()` | 获取消费者核心配置的键值对表示形式。 | 无 | +| `allReportMessage()` | 获取消费者所有配置的键值对表示形式。 | 无 | +| `buildPushConsumer()` | 通过 `Builder` 构建 `SubscriptionPushConsumer` 实例。 | 无 | +| `ackStrategy(final AckStrategy ackStrategy)` | 配置消费者的消息确认策略。 | `ackStrategy` (AckStrategy): 指定的消息确认策略。 | +| `consumeListener(final ConsumeListener consumeListener)` | 配置消费者的消息消费逻辑。 | `consumeListener` (ConsumeListener): 消费者接收消息时的处理逻辑。 | +| `autoPollIntervalMs(final long autoPollIntervalMs)` | 配置自动轮询的时间间隔。 | `autoPollIntervalMs` (long): 自动轮询的间隔时间,单位为毫秒。 | +| `autoPollTimeoutMs(final long autoPollTimeoutMs)` | 配置自动轮询的超时时间。 | `autoPollTimeoutMs` (long): 自动轮询的超时时间,单位为毫秒。 | -使用对齐时间序列和元数据模板的示例可以参见 `example/session/src/main/java/org/apache/iotdb/AlignedTimeseriesSessionExample.java` \ No newline at end of file From b13974cbda24888b06164d58a5db28e357cbc143 Mon Sep 17 00:00:00 2001 From: Zhijia Cao Date: Fri, 20 Dec 2024 19:25:43 +0800 Subject: [PATCH 2/5] rebuild java api docs --- src/zh/UserGuide/latest/API/Programming-Java-Native-API.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/zh/UserGuide/latest/API/Programming-Java-Native-API.md b/src/zh/UserGuide/latest/API/Programming-Java-Native-API.md index 4668c4e77..dbdf39de4 100644 --- a/src/zh/UserGuide/latest/API/Programming-Java-Native-API.md +++ b/src/zh/UserGuide/latest/API/Programming-Java-Native-API.md @@ -420,6 +420,8 @@ IoTDB 提供了强大的数据订阅功能,允许用户通过订阅获取数 ``` +#### 2. 代码案例 + ```java public class DataConsumerExample { From 4715b4206c033feff7afcc51fc09e23e191603ff Mon Sep 17 00:00:00 2001 From: Zhijia Cao Date: Fri, 20 Dec 2024 19:44:40 +0800 Subject: [PATCH 3/5] fix CI --- .../latest/API/Programming-Java-Native-API.md | 28 +++++++++---------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/src/zh/UserGuide/latest/API/Programming-Java-Native-API.md b/src/zh/UserGuide/latest/API/Programming-Java-Native-API.md index dbdf39de4..89bb6f464 100644 --- a/src/zh/UserGuide/latest/API/Programming-Java-Native-API.md +++ b/src/zh/UserGuide/latest/API/Programming-Java-Native-API.md @@ -734,16 +734,16 @@ occupied:用于记录正在被占用的连接 |-------------------------------------|--------------------------------------------------------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| | `open()` | 打开消费者连接,启动消息消费。如果 `autoCommit` 启用,会启动自动提交工作器。 | 无 | | `close()` | 关闭消费者连接。如果 `autoCommit` 启用,会在关闭前提交所有未提交的消息。 | 无 | -| `poll(final Duration timeout)` | 拉取消息,指定超时时间。 | `timeout` (Duration): 拉取的超时时间。 | -| `poll(final long timeoutMs)` | 拉取消息,指定超时时间(毫秒)。 | `timeoutMs` (long): 超时时间,单位为毫秒。 | -| `poll(final Set topicNames, final Duration timeout)` | 拉取指定主题的消息,指定超时时间。 | `topicNames` (Set): 要拉取的主题集合。
`timeout` (Duration): 超时时间。 | -| `poll(final Set topicNames, final long timeoutMs)` | 拉取指定主题的消息,指定超时时间(毫秒)。 | `topicNames` (Set): 要拉取的主题集合。
`timeoutMs` (long): 超时时间,单位为毫秒。 | -| `commitSync(final SubscriptionMessage message)` | 同步提交单条消息。 | `message` (SubscriptionMessage): 需要提交的消息对象。 | -| `commitSync(final Iterable messages)` | 同步提交多条消息。 | `messages` (Iterable): 需要提交的消息集合。 | -| `commitAsync(final SubscriptionMessage message)` | 异步提交单条消息。 | `message` (SubscriptionMessage): 需要提交的消息对象。 | -| `commitAsync(final Iterable messages)` | 异步提交多条消息。 | `messages` (Iterable): 需要提交的消息集合。 | -| `commitAsync(final SubscriptionMessage message, final AsyncCommitCallback callback)` | 异步提交单条消息并指定回调函数。 | `message` (SubscriptionMessage): 需要提交的消息对象。
`callback` (AsyncCommitCallback): 异步提交完成后的回调函数。 | -| `commitAsync(final Iterable messages, final AsyncCommitCallback callback)` | 异步提交多条消息并指定回调函数。 | `messages` (Iterable): 需要提交的消息集合。
`callback` (AsyncCommitCallback): 异步提交完成后的回调函数。 | +| `poll(final Duration timeout)` | 拉取消息,指定超时时间。 | `timeout` : 拉取的超时时间。 | +| `poll(final long timeoutMs)` | 拉取消息,指定超时时间(毫秒)。 | `timeoutMs` : 超时时间,单位为毫秒。 | +| `poll(final Set topicNames, final Duration timeout)` | 拉取指定主题的消息,指定超时时间。 | `topicNames` : 要拉取的主题集合。`timeout`: 超时时间。 | +| `poll(final Set topicNames, final long timeoutMs)` | 拉取指定主题的消息,指定超时时间(毫秒)。 | `topicNames` : 要拉取的主题集合。`timeoutMs`: 超时时间,单位为毫秒。 | +| `commitSync(final SubscriptionMessage message)` | 同步提交单条消息。 | `message` : 需要提交的消息对象。 | +| `commitSync(final Iterable messages)` | 同步提交多条消息。 | `messages` : 需要提交的消息集合。 | +| `commitAsync(final SubscriptionMessage message)` | 异步提交单条消息。 | `message` : 需要提交的消息对象。 | +| `commitAsync(final Iterable messages)` | 异步提交多条消息。 | `messages` : 需要提交的消息集合。 | +| `commitAsync(final SubscriptionMessage message, final AsyncCommitCallback callback)` | 异步提交单条消息并指定回调函数。 | `message` : 需要提交的消息对象。`callback` : 异步提交完成后的回调函数。 | +| `commitAsync(final Iterable messages, final AsyncCommitCallback callback)` | 异步提交多条消息并指定回调函数。 | `messages` : 需要提交的消息集合。`callback` : 异步提交完成后的回调函数。 | ##### SubscriptionPushConsumer @@ -755,8 +755,8 @@ occupied:用于记录正在被占用的连接 | `coreReportMessage()` | 获取消费者核心配置的键值对表示形式。 | 无 | | `allReportMessage()` | 获取消费者所有配置的键值对表示形式。 | 无 | | `buildPushConsumer()` | 通过 `Builder` 构建 `SubscriptionPushConsumer` 实例。 | 无 | -| `ackStrategy(final AckStrategy ackStrategy)` | 配置消费者的消息确认策略。 | `ackStrategy` (AckStrategy): 指定的消息确认策略。 | -| `consumeListener(final ConsumeListener consumeListener)` | 配置消费者的消息消费逻辑。 | `consumeListener` (ConsumeListener): 消费者接收消息时的处理逻辑。 | -| `autoPollIntervalMs(final long autoPollIntervalMs)` | 配置自动轮询的时间间隔。 | `autoPollIntervalMs` (long): 自动轮询的间隔时间,单位为毫秒。 | -| `autoPollTimeoutMs(final long autoPollTimeoutMs)` | 配置自动轮询的超时时间。 | `autoPollTimeoutMs` (long): 自动轮询的超时时间,单位为毫秒。 | +| `ackStrategy(final AckStrategy ackStrategy)` | 配置消费者的消息确认策略。 | `ackStrategy`: 指定的消息确认策略。 | +| `consumeListener(final ConsumeListener consumeListener)` | 配置消费者的消息消费逻辑。 | `consumeListener`: 消费者接收消息时的处理逻辑。 | +| `autoPollIntervalMs(final long autoPollIntervalMs)` | 配置自动轮询的时间间隔。 | `autoPollIntervalMs` : 自动轮询的间隔时间,单位为毫秒。 | +| `autoPollTimeoutMs(final long autoPollTimeoutMs)` | 配置自动轮询的超时时间。 | `autoPollTimeoutMs`: 自动轮询的超时时间,单位为毫秒。 | From b0ebf517d532c9439c66c22771c061634cf40a53 Mon Sep 17 00:00:00 2001 From: Zhijia Cao Date: Tue, 31 Dec 2024 19:46:41 +0800 Subject: [PATCH 4/5] upgrade docs --- src/.vuepress/sidebar_timecho/V1.3.3/zh.ts | 7 +- .../latest/API/Programming-Data-Sync.md | 247 +++++++++++ .../latest/API/Programming-Java-Native-API.md | 414 +++--------------- 3 files changed, 326 insertions(+), 342 deletions(-) create mode 100644 src/zh/UserGuide/latest/API/Programming-Data-Sync.md diff --git a/src/.vuepress/sidebar_timecho/V1.3.3/zh.ts b/src/.vuepress/sidebar_timecho/V1.3.3/zh.ts index e4a00e0d0..9fe10e08b 100644 --- a/src/.vuepress/sidebar_timecho/V1.3.3/zh.ts +++ b/src/.vuepress/sidebar_timecho/V1.3.3/zh.ts @@ -137,7 +137,12 @@ export const zhSidebar = { prefix: 'API/', // children: 'structure', children: [ - { text: 'Java原生接口', link: 'Programming-Java-Native-API' }, + { text: 'Java原生接口', collapsible: true, + children: [ + { text: 'Java原生API', link: 'Programming-Java-Native-API' }, + { text: '数据订阅API', link: 'Programming-Data-Sync' }, + ], + }, { text: 'Python原生接口', link: 'Programming-Python-Native-API' }, { text: 'C++原生接口', link: 'Programming-Cpp-Native-API' }, { text: 'Go原生接口', link: 'Programming-Go-Native-API' }, diff --git a/src/zh/UserGuide/latest/API/Programming-Data-Sync.md b/src/zh/UserGuide/latest/API/Programming-Data-Sync.md new file mode 100644 index 000000000..fcfc6e183 --- /dev/null +++ b/src/zh/UserGuide/latest/API/Programming-Data-Sync.md @@ -0,0 +1,247 @@ + + +# 数据订阅编程 +IoTDB 提供了强大的数据订阅功能,允许用户通过订阅SDK实时获取IoTDB新增的数据。详细的功能定义及介绍:[数据订阅](../../User-Manual/Data-Sync_timecho.md#数据同步) + +## 1 核心步骤 + +1. 创建Topic:创建一个Topic,Topic中包含希望订阅的测点。 +2. 订阅Topic:在 consumer 订阅 topic 前,topic 必须已经被创建,否则订阅会失败。同一个 consumer group 下的 consumers 会均分数据。 +3. 消费数据:只有显式订阅了某个 topic,才会收到对应 topic 的数据。 +4. 取消订阅: consumer close 时会退出对应的 consumer group,同时取消现存的所有订阅。 + + +## 2 详细步骤 +本章节用于说明开发的核心流程,并未演示所有的参数和接口,如需了解全部功能及参数请参见: [全量接口说明](./Programming-Java-Native-API.md#全量接口说明) + + +### 2.1 创建maven项目 +创建一个maven项目,并导入以下依赖(JDK >= 1.8, Maven >= 3.6) + +```xml + + + org.apache.iotdb + iotdb-session + + ${project.version} + + +``` + +### 2.2 代码案例 +#### 2.2.1 Topic操作 +```java +import java.util.Optional; +import java.util.Properties; +import java.util.Set; +import org.apache.iotdb.rpc.IoTDBConnectionException; +import org.apache.iotdb.rpc.StatementExecutionException; +import org.apache.iotdb.rpc.subscription.config.TopicConstant; +import org.apache.iotdb.session.subscription.SubscriptionSession; +import org.apache.iotdb.session.subscription.model.Topic; + +public class DataConsumerExample { + + public static void main(String[] args) throws IoTDBConnectionException, StatementExecutionException { + try (SubscriptionSession session = new SubscriptionSession("127.0.0.1", 6667)) { + // 1. open session + session.open(); + + // 2. create a topic of all data + Properties sessionConfig = new Properties(); + sessionConfig.put(TopicConstant.PATH_KEY, "root.**"); + + session.createTopic("allData", sessionConfig); + + // 3. show all topics + Set topics = session.getTopics(); + System.out.println(topics); + + // 4. show a specific topic + Optional allData = session.getTopic("allData"); + System.out.println(allData.get()); + } + } +} +``` +#### 2.2.2 数据消费 + +##### 场景-1: 订阅IoTDB中新增的实时数据(大屏或组态展示的场景) + +```java +import java.io.IOException; +import java.util.List; +import java.util.Properties; +import org.apache.iotdb.rpc.subscription.config.ConsumerConstant; +import org.apache.iotdb.rpc.subscription.config.TopicConstant; +import org.apache.iotdb.session.subscription.consumer.SubscriptionPullConsumer; +import org.apache.iotdb.session.subscription.payload.SubscriptionMessage; +import org.apache.iotdb.session.subscription.payload.SubscriptionMessageType; +import org.apache.iotdb.session.subscription.payload.SubscriptionSessionDataSet; +import org.apache.tsfile.read.common.RowRecord; + +public class DataConsumerExample { + + public static void main(String[] args) throws IOException { + + // 5. create a pull consumer, the subscription is automatically cancelled when the logic in the try resources is completed + Properties consumerConfig = new Properties(); + consumerConfig.put(ConsumerConstant.CONSUMER_ID_KEY, "c1"); + consumerConfig.put(ConsumerConstant.CONSUMER_GROUP_ID_KEY, "cg1"); + consumerConfig.put(ConsumerConstant.CONSUME_LISTENER_KEY, TopicConstant.FORMAT_SESSION_DATA_SETS_HANDLER_VALUE); + try (SubscriptionPullConsumer pullConsumer = new SubscriptionPullConsumer(consumerConfig)) { + pullConsumer.open(); + pullConsumer.subscribe("topoc_all"); + while (true) { + List messages = pullConsumer.poll(10000); + for (final SubscriptionMessage message : messages) { + final short messageType = message.getMessageType(); + if (SubscriptionMessageType.isValidatedMessageType(messageType)) { + for (final SubscriptionSessionDataSet dataSet : message.getSessionDataSetsHandler()) { + while (dataSet.hasNext()) { + final RowRecord record = dataSet.next(); + System.out.println(record); + } + } + } + } + } + } + } +} + + +``` +##### 场景-2:订阅新增的 Tsfile(定期数据备份的场景) + +前提:需要被消费的topic为TsfileHandler类型,举例:`create topic topic_all_tsfile with ('path'='root.**','format'='TsFileHandler')` + +```java +import java.io.IOException; +import java.util.List; +import java.util.Properties; +import org.apache.iotdb.rpc.subscription.config.ConsumerConstant; +import org.apache.iotdb.rpc.subscription.config.TopicConstant; +import org.apache.iotdb.session.subscription.consumer.SubscriptionPullConsumer; +import org.apache.iotdb.session.subscription.payload.SubscriptionMessage; + + +public class DataConsumerExample { + + public static void main(String[] args) throws IOException { + // 1. create a pull consumer, the subscription is automatically cancelled when the logic in the try resources is completed + Properties consumerConfig = new Properties(); + consumerConfig.put(ConsumerConstant.CONSUMER_ID_KEY, "c1"); + consumerConfig.put(ConsumerConstant.CONSUMER_GROUP_ID_KEY, "cg1"); + // 2. Specify the consumption type as the tsfile type + consumerConfig.put(TopicConstant.FORMAT_KEY, TopicConstant.FORMAT_TS_FILE_HANDLER_VALUE); + consumerConfig.put(ConsumerConstant.FILE_SAVE_DIR_KEY, "/Users/caozhijia/Desktop"); + try (SubscriptionPullConsumer pullConsumer = new SubscriptionPullConsumer(consumerConfig)) { + pullConsumer.open(); + pullConsumer.subscribe("topic_all_tsfile"); + while (true) { + List messages = pullConsumer.poll(10000); + for (final SubscriptionMessage message : messages) { + message.getTsFileHandler().copyFile("/Users/caozhijia/Downloads/1.tsfile"); + } + } + } + } +} +``` + + + + +## 2 全量接口说明 + +### 2.1 参数列表 +可通过Properties参数对象设置消费者相关参数,具体参数如下。 + +#### 2.1.1 SubscriptionConsumer + + +| 参数 | 是否必填(默认值) | 参数含义 | +| :-------------------------------------------- | :--------------------------------- | :----------------------------------------------------------- | +| host | optional: 127.0.0.1 | `String`: IoTDB 中某 DataNode 的 RPC host | +| port | optional: 6667 | `Integer`: IoTDB 中某 DataNode 的 RPC port | +| node-urls | optional: 127.0.0.1:6667 | `List`: IoTDB 中所有 DataNode 的 RPC 地址,可以是多个;host:port 和 node-urls 选填一个即可。当 host:port 和 node-urls 都填写了,则取 host:port 和 node-urls 的**并集**构成新的 node-urls 应用 | +| username | optional: root | `String`: IoTDB 中 DataNode 的用户名 | +| password | optional: root | `String`: IoTDB 中 DataNode 的密码 | +| groupId | optional | `String`: consumer group id,若未指定则随机分配(新的 consumer group),保证不同的 consumer group 对应的 consumer group id 均不相同 | +| consumerId | optional | `String`: consumer client id,若未指定则随机分配,保证同一个 consumer group 中每一个 consumer client id 均不相同 | +| heartbeatIntervalMs | optional: 30000 (min: 1000) | `Long`: consumer 向 IoTDB DataNode 定期发送心跳请求的间隔 | +| endpointsSyncIntervalMs | optional: 120000 (min: 5000) | `Long`: consumer 探测 IoTDB 集群节点扩缩容情况调整订阅连接的间隔 | +| fileSaveDir | optional: Paths.get(System.getProperty("user.dir"), "iotdb-subscription").toString() | `String`: consumer 订阅出的 TsFile 文件临时存放的目录路径 | +| fileSaveFsync | optional: false | `Boolean`: consumer 订阅 TsFile 的过程中是否主动调用 fsync | + +`SubscriptionPushConsumer` 中的特殊配置: + +| 参数 | 是否必填(默认值) | 参数含义 | +| :-------------------------------------------- | :--------------------------------- | :----------------------------------------------------------- | +| ackStrategy | optional: `ACKStrategy.AFTER_CONSUME` | 消费进度的确认机制包含以下选项:`ACKStrategy.BEFORE_CONSUME`(当 consumer 收到数据时立刻提交消费进度,`onReceive` 前)`ACKStrategy.AFTER_CONSUME`(当 consumer 消费完数据再去提交消费进度,`onReceive` 后) | +| consumeListener | optional | 消费数据的回调函数,需实现 `ConsumeListener` 接口,定义消费 `SessionDataSetsHandler` 和 `TsFileHandler` 形式数据的处理逻辑 | +| autoPollIntervalMs | optional: 5000 (min: 500) | Long: consumer 自动拉取数据的时间间隔,单位为**毫秒** | +| autoPollTimeoutMs | optional: 10000 (min: 1000) | Long: consumer 每次拉取数据的超时时间,单位为**毫秒** | + +`SubscriptionPullConsumer` 中的特殊配置: + +| 参数 | 是否必填(默认值) | 参数含义 | +| :-------------------------------------------- | :--------------------------------- | :----------------------------------------------------------- | +| autoCommit | optional: true | Boolean: 是否自动提交消费进度如果此参数设置为 false,则需要调用 `commit` 方法来手动提交消费进度 | +| autoCommitInterval | optional: 5000 (min: 500) | Long: 自动提交消费进度的时间间隔,单位为**毫秒**仅当 autoCommit 参数为 true 的时候才会生效 | + + +### 函数列表 +#### 数据订阅 +##### SubscriptionPullConsumer + +| **函数名** | **说明** | **参数** | +|-------------------------------------|--------------------------------------------------------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| `open()` | 打开消费者连接,启动消息消费。如果 `autoCommit` 启用,会启动自动提交工作器。 | 无 | +| `close()` | 关闭消费者连接。如果 `autoCommit` 启用,会在关闭前提交所有未提交的消息。 | 无 | +| `poll(final Duration timeout)` | 拉取消息,指定超时时间。 | `timeout` : 拉取的超时时间。 | +| `poll(final long timeoutMs)` | 拉取消息,指定超时时间(毫秒)。 | `timeoutMs` : 超时时间,单位为毫秒。 | +| `poll(final Set topicNames, final Duration timeout)` | 拉取指定主题的消息,指定超时时间。 | `topicNames` : 要拉取的主题集合。`timeout`: 超时时间。 | +| `poll(final Set topicNames, final long timeoutMs)` | 拉取指定主题的消息,指定超时时间(毫秒)。 | `topicNames` : 要拉取的主题集合。`timeoutMs`: 超时时间,单位为毫秒。 | +| `commitSync(final SubscriptionMessage message)` | 同步提交单条消息。 | `message` : 需要提交的消息对象。 | +| `commitSync(final Iterable messages)` | 同步提交多条消息。 | `messages` : 需要提交的消息集合。 | +| `commitAsync(final SubscriptionMessage message)` | 异步提交单条消息。 | `message` : 需要提交的消息对象。 | +| `commitAsync(final Iterable messages)` | 异步提交多条消息。 | `messages` : 需要提交的消息集合。 | +| `commitAsync(final SubscriptionMessage message, final AsyncCommitCallback callback)` | 异步提交单条消息并指定回调函数。 | `message` : 需要提交的消息对象。`callback` : 异步提交完成后的回调函数。 | +| `commitAsync(final Iterable messages, final AsyncCommitCallback callback)` | 异步提交多条消息并指定回调函数。 | `messages` : 需要提交的消息集合。`callback` : 异步提交完成后的回调函数。 | + +##### SubscriptionPushConsumer + +| **函数名** | **说明** | **参数** | +|-------------------------------------|----------------------------------------------------------------------------------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| `open()` | 打开消费者连接,启动消息消费,提交自动轮询工作器。 | 无 | +| `close()` | 关闭消费者连接,停止消息消费。 | 无 | +| `toString()` | 返回消费者对象的核心配置信息。 | 无 | +| `coreReportMessage()` | 获取消费者核心配置的键值对表示形式。 | 无 | +| `allReportMessage()` | 获取消费者所有配置的键值对表示形式。 | 无 | +| `buildPushConsumer()` | 通过 `Builder` 构建 `SubscriptionPushConsumer` 实例。 | 无 | +| `ackStrategy(final AckStrategy ackStrategy)` | 配置消费者的消息确认策略。 | `ackStrategy`: 指定的消息确认策略。 | +| `consumeListener(final ConsumeListener consumeListener)` | 配置消费者的消息消费逻辑。 | `consumeListener`: 消费者接收消息时的处理逻辑。 | +| `autoPollIntervalMs(final long autoPollIntervalMs)` | 配置自动轮询的时间间隔。 | `autoPollIntervalMs` : 自动轮询的间隔时间,单位为毫秒。 | +| `autoPollTimeoutMs(final long autoPollTimeoutMs)` | 配置自动轮询的超时时间。 | `autoPollTimeoutMs`: 自动轮询的超时时间,单位为毫秒。 | + diff --git a/src/zh/UserGuide/latest/API/Programming-Java-Native-API.md b/src/zh/UserGuide/latest/API/Programming-Java-Native-API.md index 89bb6f464..a77ee10ed 100644 --- a/src/zh/UserGuide/latest/API/Programming-Java-Native-API.md +++ b/src/zh/UserGuide/latest/API/Programming-Java-Native-API.md @@ -18,26 +18,22 @@ --> -# Java编程接口 +# Session原生API -## Session原生API +IoTDB 原生 API 中的 Session 是实现与数据库交互的核心接口,它集成了丰富的方法,支持数据写入、查询以及元数据操作等功能。通过实例化 Session,能够建立与 IoTDB 服务器的连接,在该连接所构建的环境中执行各类数据库操作。Session为非线程安全,不能被多线程同时调用。 -IoTDB 原生 API 中的 Session 是实现与数据库交互的核心接口,它集成了丰富的方法,支持数据写入、查询以及元数据操作等功能。通过实例化 Session,能够建立与 IoTDB 服务器的连接,在该连接所构建的环境中执行各类数据库操作。 +SessionPool 是 Session 的连接池,推荐使用SessionPool编程。在多线程并发的情形下,SessionPool 能够合理地管理和分配连接资源,以提升系统性能与资源利用效率。 -Session为单线程安全模型,如实际的应用场景为多线程并发场景,强烈推荐使用SessionPool编程。SessionPool 是 Session 的池化形式,专门针对多线程并发场景进行了优化,在多线程并发的情形下,SessionPool 能够合理地管理和分配连接资源,以提升系统性能与资源利用效率。 - -### 步骤概览 - -使用SessionPool的核心步骤: -1. 创建会话池实例:初始化一个SessionPool对象,用于管理多个Session实例。 +## 1 步骤概览 +1. 创建连接池实例:初始化一个SessionPool对象,用于管理多个Session实例。 2. 执行操作:直接从SessionPool中获取Session实例,并执行数据库操作,无需每次都打开和关闭连接。 -3. 关闭会话池资源:在不再需要进行数据库操作时,关闭SessionPool,释放所有相关资源。 +3. 关闭连接池资源:在不再需要进行数据库操作时,关闭SessionPool,释放所有相关资源。 -### 详细步骤 -本章节用于说明开发的核心流程,并未演示所有的参数和接口,如需了解全部功能及参数请参见: [附录](./Programming-Java-Native-API.md#附录) 或 查阅: [源码](https://github.com/apache/iotdb/tree/master/example/session/src/main/java/org/apache/iotdb) +## 2 详细步骤 +本章节用于说明开发的核心流程,并未演示所有的参数和接口,如需了解全部功能及参数请参见: [全量接口说明](./Programming-Java-Native-API.md#) 或 查阅: [源码](https://github.com/apache/iotdb/tree/master/example/session/src/main/java/org/apache/iotdb) -#### 1. 创建maven项目 -创建一个maven项目,并导入以下依赖(JDK >= 1.8, Maven >= 3.6) +### 2.1 创建maven项目 +创建一个maven项目,并在pom.xml文件中添加以下依赖(JDK >= 1.8, Maven >= 3.6) ```xml @@ -49,10 +45,16 @@ Session为单线程安全模型,如实际的应用场景为多线程并发场 ``` -#### 2. 创建会话实例 +### 2.2 创建连接池实例 ```java -public class IoTDBSessionExample { +import java.util.ArrayList; +import java.util.List; +import org.apache.iotdb.session.pool.SessionPool; + +public class IoTDBSessionPoolExample { + private static SessionPool sessionPool; + public static void main(String[] args) { // Using nodeUrls ensures that when one node goes down, other nodes are automatically connected to retry List nodeUrls = new ArrayList<>(); @@ -69,30 +71,35 @@ public class IoTDBSessionExample { } ``` -#### 执行数据库操作 -##### 数据写入 -在工业场景中,数据写入可以根据设备数量、写入频率和数据类型分为以下几类:多(单)设备同一时刻写入、单设备不同时刻数据(攒批)写入。业务允许的情况下,推荐攒批写入以提高写入效率。下面按不同场景对写入接口进行介绍。 +### 2.3 执行数据库操作 +#### 2.3.1 数据写入 +在工业场景中,数据写入可分为以下几类:多行数据写入、单设备多行数据写入,下面按不同场景对写入接口进行介绍。 -###### 多(单)设备同一时刻写入 -场景:多(单)个设备的实时状态或传感器数据批量写入,特点是采集一次上传一次。 +##### 多行数据写入接口 +接口说明:支持一次写入多行数据,每一行对应一个设备一个时间戳的多个测点值。 -适用接口: +接口列表: -| 接口名称 | 功能描述 | -|------------------------------------------------------------------------------------------------------------------------------------------------------------------------|-----------------------------------------------------------------------------------------------| -| `insertRecords(List deviceIds, List times, List> measurementsList, List> typesList, List> valuesList)` | 插入多个设备,每个设备多个测点的一个时刻的记录 | -| `insertRecords(List deviceIds, List times, List> measurementsList, List> valuesList)` |同上,不需要指定数据类型,会根据传入的值进行推断。推断规则可在服务端配置,详细配置在iotdb-system.properties.template中的搜索`infer_type`关键字 | -| `insertAlignedRecords(List deviceIds, List times, List> measurementsList, List> typesList, List> valuesList)` | 插入多个设备,每个设备多个测点的一个时刻的记录。每个设备为对齐设备 | -| `insertAlignedRecords(List deviceIds, List times, List> measurementsList, List> valuesList)` | 同上,不需要指定数据类型,会根据传入的值进行推断。推断规则可在服务端配置,详细配置在iotdb-system.properties.template中的搜索`infer_type`关键字 | +| 接口名称 | 功能描述 | +|------------------------------------------------------------------------------------------------------------------------------------------------------------------------|-----------------------| +| `insertRecords(List deviceIds, List times, List> measurementsList, List> typesList, List> valuesList)` | 插入多行数据,适用于不同测点独立采集的场景 | +| `insertAlignedRecords(List deviceIds, List times, List> measurementsList, List> typesList, List> valuesList)` | 插入多行数据,适用于不同测点同时采集的场景 | 代码案例: ```java +import java.util.ArrayList; +import java.util.List; +import org.apache.iotdb.rpc.IoTDBConnectionException; +import org.apache.iotdb.rpc.StatementExecutionException; +import org.apache.iotdb.session.pool.SessionPool; +import org.apache.tsfile.enums.TSDataType; + public class SessionPoolExample { private static SessionPool sessionPool; - public static void main(String[] args) { + public static void main(String[] args) throws IoTDBConnectionException, StatementExecutionException { // 1. init SessionPool constructSessionPool(); @@ -119,7 +126,7 @@ public class SessionPoolExample { .build(); } - public static void insertRecordsExample(){ + public static void insertRecordsExample() throws IoTDBConnectionException, StatementExecutionException { String deviceId = "root.sg1.d1"; List measurements = new ArrayList<>(); measurements.add("s1"); @@ -173,18 +180,28 @@ public class SessionPoolExample { } ``` -###### 批量数据上传 -场景:多个设备的大量数据同时上传,适合大规模分布式数据接入。 +##### 单设备多行数据写入接口 +接口说明:支持一次写入单个设备的多行数据,每一行对应一个时间戳的多个测点值。 -适用接口: +接口列表: -| 接口名称 | 功能描述 | -|-----------------------------------------------------------------------------------------|------------------------------------------| -| `insertTablet(Tablet tablet)` | 插入单个设备多个测点,每个测点多个时刻的数据 | -| `insertAlignedTablet(Tablet tablet)` | 插入单个设备多个测点,每个测点多个时刻的数据,该设备为对齐设备 | +| 接口名称 | 功能描述 | +|-----------------------------------------------------------------------------------------|----------------------------| +| `insertTablet(Tablet tablet)` | 插入单个设备的多行数据,适用于不同测点独立采集的场景 | +| `insertAlignedTablet(Tablet tablet)` | 插入单个设备的多行数据,适用于不同测点同时采集的场景 | 代码案例: ```java +import java.util.ArrayList; +import java.util.List; +import java.util.Random; +import org.apache.iotdb.rpc.IoTDBConnectionException; +import org.apache.iotdb.rpc.StatementExecutionException; +import org.apache.iotdb.session.pool.SessionPool; +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.write.record.Tablet; +import org.apache.tsfile.write.schema.MeasurementSchema; + public class SessionPoolExample { private static SessionPool sessionPool; @@ -265,8 +282,15 @@ public class SessionPoolExample { } ``` -##### 数据查询 +#### 2.3.2 数据查询 ```java +import java.util.ArrayList; +import java.util.List; +import org.apache.iotdb.isession.pool.SessionDataSetWrapper; +import org.apache.iotdb.rpc.IoTDBConnectionException; +import org.apache.iotdb.rpc.StatementExecutionException; +import org.apache.iotdb.session.pool.SessionPool; + public class SessionPoolExample { private static SessionPool sessionPool; @@ -300,7 +324,7 @@ public class SessionPoolExample { sessionPool.executeNonQueryStatement("delete timeseries root.test.d1.s1"); } - + private static void executeQueryExample() throws IoTDBConnectionException, StatementExecutionException { // 1. execute normal query try(SessionDataSetWrapper wrapper = sessionPool.executeQueryStatement("select s1 from root.sg1.d1 limit 10")) { @@ -338,165 +362,11 @@ public class SessionPoolExample { } ``` -##### 数据删除 -```java -public class SessionPoolExample { - - private static SessionPool sessionPool; - - public static void main(String[] args) throws IoTDBConnectionException, StatementExecutionException { - - // 1. init SessionPool - constructSessionPool(); - - // 2. delete data - deleteDataExample(); +### 3 全量接口说明 - // 3. close SessionPool - closeSessionPool(); +#### 3.1 参数列表 +Session具有如下的字段,可以通过构造函数或Session.Builder方式设置如下参数 - } - - public static void deleteDataExample() throws IoTDBConnectionException, StatementExecutionException { - // 1. Delete data at a precise point in time - String path = "root.sg.d1.s1"; - long deleteTime = 99; - sessionPool.deleteData(path, deleteTime); - - // 2. The data of a certain period is deleted - long startTime = 1; - sessionPool.deleteData(Collections.singletonList(path),startTime, deleteTime); - - // 3. Delete a measurement point - sessionPool.deleteTimeseries(path); - } - - private static void constructSessionPool() { - // Using nodeUrls ensures that when one node goes down, other nodes are automatically connected to retry - List nodeUrls = new ArrayList<>(); - nodeUrls.add("127.0.0.1:6667"); - nodeUrls.add("127.0.0.1:6668"); - sessionPool = - new SessionPool.Builder() - .nodeUrls(nodeUrls) - .user("root") - .password("root") - .maxSize(3) - .build(); - } - - public static void closeSessionPool(){ - sessionPool.close(); - } -} -``` - -## 数据订阅 -IoTDB 提供了强大的数据订阅功能,允许用户通过订阅获取数据更新通知。订阅功能支持多种消费模式,包括拉取模式(Pull)和推送模式(Push)。章节旨在帮助用户理解并使用 IoTDB 的数据订阅功能。详细的功能定义及介绍:[数据订阅](../../User-Manual/Data-Sync_timecho.md#数据同步) - -### 核心步骤 - -1. 创建Topic:创建一个Topic,Topic中包含指定的数据范围。 -2. 订阅Topic:在 consumer 订阅 topic 前,topic 必须已经被创建,否则订阅会失败。如果该 consumer 所在的 consumer group 中已经有 consumers 订阅了相同的 topics,那么该 consumer 将会复用对应的消费进度。 -3. 消费数据:支持Pull和Push两种消费模型。只有显式订阅了某个 topic,才会收到对应 topic 的数据。若在创建后没有订阅任何 topics,此时该 consumer 无法消费到任何数据,即使该 consumer 所在的 consumer group 中其它的 consumers 订阅了一些 topics. -4. 取消订阅: consumer close 时会退出对应的 consumer group,同时自动 unsubscribe 该 consumer 现存订阅的所有 topics. consumer 在 close 后生命周期即结束,无法再重新 open 订阅并消费数据。 - - -### 详细步骤 -本章节用于说明开发的核心流程,并未演示所有的参数和接口,如需了解全部功能及参数请参见: [附录](./Programming-Java-Native-API.md#附录) - - -#### 1. 创建maven项目 -创建一个maven项目,并导入以下依赖(JDK >= 1.8, Maven >= 3.6) - -```xml - - - org.apache.iotdb - iotdb-session - - ${project.version} - - -``` - -#### 2. 代码案例 - -```java -public class DataConsumerExample { - - public static void main(String[] args) throws IoTDBConnectionException, StatementExecutionException { - try (SubscriptionSession session = new SubscriptionSession("127.0.0.1", 6667)) { - // 1. open session - session.open(); - - // 2. create a topic of all data - Properties sessionConfig = new Properties(); - sessionConfig.put(TopicConstant.PATH_KEY, "root.db.**"); - - session.createTopic("allData", sessionConfig); - - // 3. show all topics - Set topics = session.getTopics(); - System.out.println(topics); - - // 4. show a specific topic - Optional allData = session.getTopic("allData"); - System.out.println(allData.get()); - } - - // 5. create a pull consumer, the subscription is automatically cancelled when the logic in the try resources is completed - Properties consumerConfig = new Properties(); - consumerConfig.put(ConsumerConstant.CONSUMER_ID_KEY, "c1"); - consumerConfig.put(ConsumerConstant.CONSUMER_GROUP_ID_KEY, "cg1"); - try (SubscriptionPullConsumer pullConsumer = new SubscriptionPullConsumer(consumerConfig)) { - pullConsumer.open(); - pullConsumer.subscribe("allData"); - int i = 0; - while (i < 100) { - List messages = pullConsumer.poll(10000); - for (SubscriptionMessage message : messages) { - for (SubscriptionSessionDataSet dataSet : message.getSessionDataSetsHandler()) { - System.out.println(dataSet.getColumnNames()); - System.out.println(dataSet.getColumnTypes()); - while (dataSet.hasNext()) { - System.out.println(dataSet.next()); - } - } - } - i++; - } - } - - - // 6. create a push consumer, the subscription is automatically cancelled when the logic in the try resources is completed - try (SubscriptionPushConsumer consumer2 = - new SubscriptionPushConsumer.Builder().consumerId("c2").consumerGroupId("sg2") - .fileSaveDir(System.getProperty("java.io.tmpdir")) - .ackStrategy(AckStrategy.AFTER_CONSUME) - .consumeListener( - message -> { - for (SubscriptionSessionDataSet dataSet : message.getSessionDataSetsHandler()) { - System.out.println(dataSet.getColumnNames()); - System.out.println(dataSet.getColumnTypes()); - while (dataSet.hasNext()) { - System.out.println(dataSet.next()); - } - } - return ConsumeResult.SUCCESS; - }) - .buildPushConsumer()) { - consumer2.open(); - consumer2.subscribe("allData"); - } - } -} - -``` - -## 附录 -### 参数列表 -#### Session | 字段名 | 类型 | 说明 | |--------------------------------|-------------------------------|----------------------------------------------------------------------| | `nodeUrls` | `List` | 数据库节点的 URL 列表,支持多节点连接 | @@ -526,116 +396,12 @@ public class DataConsumerExample { | `enableAutoFetch` | `boolean` | 是否启用自动获取功能 | | `maxRetryCount` | `int` | 最大重试次数 | | `retryIntervalInMs` | `long` | 重试的间隔时间,单位毫秒 | -需要额外说明的参数 -nodeUrls: 多节点 URL 列表,支持自动切换到下一个可用节点。格式为 ip:port。 -queryTimeoutInMs: 如果为负数,则表示使用服务端默认配置;如果为 0,则禁用查询超时功能。 -enableRPCCompression: 启用后,RPC 数据传输将启用压缩,适用于高带宽延迟场景。 +#### 3.2 接口列表 -zoneId: 会话时区,可用值参考 Java 的 ZoneId 标准,例如 Asia/Shanghai。 - -#### SessionPool -| 字段名 | 类型 | 说明 | -|--------------------------------|-------------------------------|----------------------------------------------------------------------| -| `host` | `String` | 数据库主机地址 | -| `port` | `int` | 数据库端口 | -| `user` | `String` | 数据库用户名 | -| `password` | `String` | 数据库密码 | -| `nodeUrls` | `List` | 多节点的 URL 列表 | -| `maxSize` | `int` | 连接池的最大连接数 | -| `fetchSize` | `int` | 查询结果的默认批量返回大小 | -| `waitToGetSessionTimeoutInMs` | `long` | 获取连接的等待超时时间(毫秒) | -| `enableCompression` | `boolean` | 是否启用 RPC 压缩 | -| `enableRedirection` | `boolean` | 是否启用重定向功能 | -| `enableRecordsAutoConvertTablet` | `boolean` | 是否启用记录自动转换为 Tablet 的功能 | -| `thriftDefaultBufferSize` | `int` | Thrift 默认缓冲区大小 | -| `thriftMaxFrameSize` | `int` | Thrift 最大帧大小 | -| `queryTimeoutInMs` | `long` | 查询超时时间,单位毫秒 | -| `version` | `Version` | 客户端版本号 | -| `connectionTimeoutInMs` | `int` | 连接超时时间,单位毫秒 | -| `zoneId` | `ZoneId` | 时区设置 | -| `useSSL` | `boolean` | 是否启用 SSL | -| `trustStore` | `String` | 信任库路径 | -| `trustStorePwd` | `String` | 信任库密码 | -| `enableQueryRedirection` | `boolean` | 是否启用查询重定向功能 | -| `executorService` | `ScheduledExecutorService` | 定期更新节点列表的线程池 | -| `availableNodes` | `INodeSupplier` | 可用节点的供应器 | -| `maxRetryCount` | `int` | 最大重试次数 | -| `retryIntervalInMs` | `long` | 重试间隔时间,单位毫秒 | -| `closed` | `boolean` | 当前连接池是否已关闭 | -| `queue` | `ConcurrentLinkedDeque` | 可用会话连接的队列 | -| `occupied` | `ConcurrentMap` | 已占用的会话连接映射 | -| `deviceIdToEndpoint` | `Map` | 设备 ID 到数据库端点的映射 | -| `formattedNodeUrls` | `String` | 格式化后的节点 URL 字符串 | -需要额外说明的字段 - -nodeUrls:一个包含多个节点地址的列表,用于支持集群环境的连接。格式为 ["host1:port1", "host2:port2"]。 - -queue:保存所有可用的会话连接。当需要连接时会从队列中取出。 - -occupied:用于记录正在被占用的连接 - -#### SubscriptionConsumer - -| 参数 | 是否必填(默认值) | 参数含义 | -| :-------------------------------------------- | :--------------------------------- | :----------------------------------------------------------- | -| host | optional: 127.0.0.1 | `String`: IoTDB 中某 DataNode 的 RPC host | -| port | optional: 6667 | `Integer`: IoTDB 中某 DataNode 的 RPC port | -| node-urls | optional: 127.0.0.1:6667 | `List`: IoTDB 中所有 DataNode 的 RPC 地址,可以是多个;host:port 和 node-urls 选填一个即可。当 host:port 和 node-urls 都填写了,则取 host:port 和 node-urls 的**并集**构成新的 node-urls 应用 | -| username | optional: root | `String`: IoTDB 中 DataNode 的用户名 | -| password | optional: root | `String`: IoTDB 中 DataNode 的密码 | -| groupId | optional | `String`: consumer group id,若未指定则随机分配(新的 consumer group),保证不同的 consumer group 对应的 consumer group id 均不相同 | -| consumerId | optional | `String`: consumer client id,若未指定则随机分配,保证同一个 consumer group 中每一个 consumer client id 均不相同 | -| heartbeatIntervalMs | optional: 30000 (min: 1000) | `Long`: consumer 向 IoTDB DataNode 定期发送心跳请求的间隔 | -| endpointsSyncIntervalMs | optional: 120000 (min: 5000) | `Long`: consumer 探测 IoTDB 集群节点扩缩容情况调整订阅连接的间隔 | -| fileSaveDir | optional: Paths.get(System.getProperty("user.dir"), "iotdb-subscription").toString() | `String`: consumer 订阅出的 TsFile 文件临时存放的目录路径 | -| fileSaveFsync | optional: false | `Boolean`: consumer 订阅 TsFile 的过程中是否主动调用 fsync | - -`SubscriptionPushConsumer` 中的特殊配置: - -| 参数 | 是否必填(默认值) | 参数含义 | -| :-------------------------------------------- | :--------------------------------- | :----------------------------------------------------------- | -| ackStrategy | optional: `ACKStrategy.AFTER_CONSUME` | 消费进度的确认机制包含以下选项:`ACKStrategy.BEFORE_CONSUME`(当 consumer 收到数据时立刻提交消费进度,`onReceive` 前)`ACKStrategy.AFTER_CONSUME`(当 consumer 消费完数据再去提交消费进度,`onReceive` 后) | -| consumeListener | optional | 消费数据的回调函数,需实现 `ConsumeListener` 接口,定义消费 `SessionDataSetsHandler` 和 `TsFileHandler` 形式数据的处理逻辑 | -| autoPollIntervalMs | optional: 5000 (min: 500) | Long: consumer 自动拉取数据的时间间隔,单位为**毫秒** | -| autoPollTimeoutMs | optional: 10000 (min: 1000) | Long: consumer 每次拉取数据的超时时间,单位为**毫秒** | - -`SubscriptionPullConsumer` 中的特殊配置: - -| 参数 | 是否必填(默认值) | 参数含义 | -| :-------------------------------------------- | :--------------------------------- | :----------------------------------------------------------- | -| autoCommit | optional: true | Boolean: 是否自动提交消费进度如果此参数设置为 false,则需要调用 `commit` 方法来手动提交消费进度 | -| autoCommitInterval | optional: 5000 (min: 500) | Long: 自动提交消费进度的时间间隔,单位为**毫秒**仅当 autoCommit 参数为 true 的时候才会生效 | - - -### 函数列表 -#### 会话管理 - -| 方法名 | 功能描述 | 参数解释 | -|-----------------------------------------------------------------------------------------|--------------------------------------------|------------------------------------------------------------------------------------------------------------| -| `open()` | 打开会话 | 无参数 | -| `open(boolean enableRPCCompression)` | 打开会话并启用RPC压缩 | `enableRPCCompression`: 是否启用RPC压缩 | -| `open(boolean enableRPCCompression, int connectionTimeoutInMs)` | 打开会话并设置连接超时 | `enableRPCCompression`: 是否启用RPC压缩,`connectionTimeoutInMs`: 连接超时时间(毫秒) | -| `open(boolean enableRPCCompression, int connectionTimeoutInMs, Map deviceIdToEndpoint, INodeSupplier nodeSupplier)` | 打开会话并配置节点 | `enableRPCCompression`: 是否启用RPC压缩,`connectionTimeoutInMs`: 超时时间,`deviceIdToEndpoint`: 设备映射 | -| `close()` | 关闭会话 | 无参数 | -| `getVersion()` | 获取会话版本 | 无参数 | -| `setVersion(Version version)` | 设置会话版本 | `version`: 要设置的版本 | -| `getTimeZone()` | 获取当前时区 | 无参数 | -| `setTimeZone(String zoneId)` | 设置时区 | `zoneId`: 时区标识符(例如 `Asia/Shanghai`) | -| `setTimeZoneOfSession(String zoneId)` | 设置会话时区 | `zoneId`: 时区标识符 | -| `getFetchSize()` | 获取批量查询的记录数限制 | 无参数 | -| `setFetchSize(int fetchSize)` | 设置批量查询的记录数限制 | `fetchSize`: 每批查询返回的最大记录数 | -| `setQueryTimeout(long timeoutInMs)` | 设置查询超时时间 | `timeoutInMs`: 查询的超时时间(毫秒) | -| `getQueryTimeout()` | 获取查询超时时间 | 无参数 | -| `isEnableQueryRedirection()` | 检查是否启用查询重定向 | 无参数 | -| `setEnableQueryRedirection(boolean enableQueryRedirection)` | 设置查询重定向 | `enableQueryRedirection`: 是否启用查询重定向 | -| `isEnableRedirection()` | 检查是否启用重定向 | 无参数 | -| `setEnableRedirection(boolean enableRedirection)` | 设置重定向 | `enableRedirection`: 是否启用重定向 | - - -#### 元数据管理 +##### 3.2.1 元数据管理 | 方法名 | 功能描述 | 参数解释 | |-----------------------------------------------------------------------------------------|--------------------------------------------|------------------------------------------------------------------------------------------------------------| @@ -664,7 +430,7 @@ occupied:用于记录正在被占用的连接 | `unsetSchemaTemplate(String prefixPath, String templateName)` | 取消路径的模板设置 | `prefixPath`: 路径,`templateName`: 模板名称 | -#### 数据写入 +##### 3.2.2 数据写入 | 方法名 | 功能描述 | 参数解释 | |-----------------------------------------------------------------------------------------|--------------------------------------------|------------------------------------------------------------------------------------------------------------| | `insertRecord(String deviceId, long time, List measurements, List types, Object... values)` | 插入单条记录 | `deviceId`: 设备ID,`time`: 时间戳,`measurements`: 测点列表,`types`: 数据类型列表,`values`: 值列表 | @@ -692,7 +458,7 @@ occupied:用于记录正在被占用的连接 | `insertAlignedTablets(Map tablets)` | 批量插入多个对齐Tablet数据 | `tablets`: 设备ID到Tablet的映射表 | | `insertAlignedTablets(Map tablets, boolean sorted)` | 批量插入排序的多个对齐Tablet数据 | 同上,增加 `sorted`: 数据是否已排序 | -#### 数据删除 +##### 3.2.3 数据删除 | 方法名 | 功能描述 | 参数解释 | |-----------------------------------------------------------------------------------------|--------------------------------------------|------------------------------------------------------------------------------------------------------------| @@ -703,7 +469,7 @@ occupied:用于记录正在被占用的连接 | `deleteData(List paths, long startTime, long endTime)` | 删除路径时间范围内的历史数据 | 同上,增加 `startTime`: 起始时间戳 | -#### 数据查询 +##### 3.2.4 数据查询 | 方法名 | 功能描述 | 参数解释 | |-----------------------------------------------------------------------------------------|--------------------------------------------|------------------------------------------------------------------------------------------------------------| | `executeQueryStatement(String sql)` | 执行查询语句 | `sql`: 查询SQL语句 | @@ -720,43 +486,9 @@ occupied:用于记录正在被占用的连接 | `executeAggregationQuery(List paths, List aggregations, long startTime, long endTime, long interval, long slidingStep)` | 执行滑动窗口聚合查询 | 同上,增加 `slidingStep`: 滑动步长 | | `fetchAllConnections()` | 获取所有活动连接信息 | 无参数 | -#### 系统状态与备份 +##### 3.2.5 系统状态与备份 | 方法名 | 功能描述 | 参数解释 | |-----------------------------------------------------------------------------------------|--------------------------------------------|------------------------------------------------------------------------------------------------------------| | `getBackupConfiguration()` | 获取备份配置信息 | 无参数 | | `fetchAllConnections()` | 获取所有活动的连接信息 | 无参数 | -| `getSystemStatus()` | 获取系统状态 | 已废弃,默认返回 `SystemStatus.NORMAL` | - -#### 数据订阅 -##### SubscriptionPullConsumer - -| **函数名** | **说明** | **参数** | -|-------------------------------------|--------------------------------------------------------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| -| `open()` | 打开消费者连接,启动消息消费。如果 `autoCommit` 启用,会启动自动提交工作器。 | 无 | -| `close()` | 关闭消费者连接。如果 `autoCommit` 启用,会在关闭前提交所有未提交的消息。 | 无 | -| `poll(final Duration timeout)` | 拉取消息,指定超时时间。 | `timeout` : 拉取的超时时间。 | -| `poll(final long timeoutMs)` | 拉取消息,指定超时时间(毫秒)。 | `timeoutMs` : 超时时间,单位为毫秒。 | -| `poll(final Set topicNames, final Duration timeout)` | 拉取指定主题的消息,指定超时时间。 | `topicNames` : 要拉取的主题集合。`timeout`: 超时时间。 | -| `poll(final Set topicNames, final long timeoutMs)` | 拉取指定主题的消息,指定超时时间(毫秒)。 | `topicNames` : 要拉取的主题集合。`timeoutMs`: 超时时间,单位为毫秒。 | -| `commitSync(final SubscriptionMessage message)` | 同步提交单条消息。 | `message` : 需要提交的消息对象。 | -| `commitSync(final Iterable messages)` | 同步提交多条消息。 | `messages` : 需要提交的消息集合。 | -| `commitAsync(final SubscriptionMessage message)` | 异步提交单条消息。 | `message` : 需要提交的消息对象。 | -| `commitAsync(final Iterable messages)` | 异步提交多条消息。 | `messages` : 需要提交的消息集合。 | -| `commitAsync(final SubscriptionMessage message, final AsyncCommitCallback callback)` | 异步提交单条消息并指定回调函数。 | `message` : 需要提交的消息对象。`callback` : 异步提交完成后的回调函数。 | -| `commitAsync(final Iterable messages, final AsyncCommitCallback callback)` | 异步提交多条消息并指定回调函数。 | `messages` : 需要提交的消息集合。`callback` : 异步提交完成后的回调函数。 | - -##### SubscriptionPushConsumer - -| **函数名** | **说明** | **参数** | -|-------------------------------------|----------------------------------------------------------------------------------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| -| `open()` | 打开消费者连接,启动消息消费,提交自动轮询工作器。 | 无 | -| `close()` | 关闭消费者连接,停止消息消费。 | 无 | -| `toString()` | 返回消费者对象的核心配置信息。 | 无 | -| `coreReportMessage()` | 获取消费者核心配置的键值对表示形式。 | 无 | -| `allReportMessage()` | 获取消费者所有配置的键值对表示形式。 | 无 | -| `buildPushConsumer()` | 通过 `Builder` 构建 `SubscriptionPushConsumer` 实例。 | 无 | -| `ackStrategy(final AckStrategy ackStrategy)` | 配置消费者的消息确认策略。 | `ackStrategy`: 指定的消息确认策略。 | -| `consumeListener(final ConsumeListener consumeListener)` | 配置消费者的消息消费逻辑。 | `consumeListener`: 消费者接收消息时的处理逻辑。 | -| `autoPollIntervalMs(final long autoPollIntervalMs)` | 配置自动轮询的时间间隔。 | `autoPollIntervalMs` : 自动轮询的间隔时间,单位为毫秒。 | -| `autoPollTimeoutMs(final long autoPollTimeoutMs)` | 配置自动轮询的超时时间。 | `autoPollTimeoutMs`: 自动轮询的超时时间,单位为毫秒。 | - +| `getSystemStatus()` | 获取系统状态 | 已废弃,默认返回 `SystemStatus.NORMAL` | \ No newline at end of file From 21c2290cb613fb79c23a4f53f8caa4e20c45e148 Mon Sep 17 00:00:00 2001 From: Zhijia Cao Date: Tue, 31 Dec 2024 20:52:36 +0800 Subject: [PATCH 5/5] upgrade docs --- src/zh/UserGuide/latest/API/Programming-Data-Sync.md | 12 ++++++------ .../latest/API/Programming-Java-Native-API.md | 9 +++++---- 2 files changed, 11 insertions(+), 10 deletions(-) diff --git a/src/zh/UserGuide/latest/API/Programming-Data-Sync.md b/src/zh/UserGuide/latest/API/Programming-Data-Sync.md index fcfc6e183..76eebf23c 100644 --- a/src/zh/UserGuide/latest/API/Programming-Data-Sync.md +++ b/src/zh/UserGuide/latest/API/Programming-Data-Sync.md @@ -18,7 +18,7 @@ --> -# 数据订阅编程 +# 数据订阅API IoTDB 提供了强大的数据订阅功能,允许用户通过订阅SDK实时获取IoTDB新增的数据。详细的功能定义及介绍:[数据订阅](../../User-Manual/Data-Sync_timecho.md#数据同步) ## 1 核心步骤 @@ -110,7 +110,7 @@ public class DataConsumerExample { consumerConfig.put(ConsumerConstant.CONSUME_LISTENER_KEY, TopicConstant.FORMAT_SESSION_DATA_SETS_HANDLER_VALUE); try (SubscriptionPullConsumer pullConsumer = new SubscriptionPullConsumer(consumerConfig)) { pullConsumer.open(); - pullConsumer.subscribe("topoc_all"); + pullConsumer.subscribe("topic_all"); while (true) { List messages = pullConsumer.poll(10000); for (final SubscriptionMessage message : messages) { @@ -131,9 +131,9 @@ public class DataConsumerExample { ``` -##### 场景-2:订阅新增的 Tsfile(定期数据备份的场景) +##### 场景-2:订阅新增的 TsFile(定期数据备份的场景) -前提:需要被消费的topic为TsfileHandler类型,举例:`create topic topic_all_tsfile with ('path'='root.**','format'='TsFileHandler')` +前提:需要被消费的topic的格式为TsfileHandler类型,举例:`create topic topic_all_tsfile with ('path'='root.**','format'='TsFileHandler')` ```java import java.io.IOException; @@ -154,14 +154,14 @@ public class DataConsumerExample { consumerConfig.put(ConsumerConstant.CONSUMER_GROUP_ID_KEY, "cg1"); // 2. Specify the consumption type as the tsfile type consumerConfig.put(TopicConstant.FORMAT_KEY, TopicConstant.FORMAT_TS_FILE_HANDLER_VALUE); - consumerConfig.put(ConsumerConstant.FILE_SAVE_DIR_KEY, "/Users/caozhijia/Desktop"); + consumerConfig.put(ConsumerConstant.FILE_SAVE_DIR_KEY, "/Users/iotdb/Downloads"); try (SubscriptionPullConsumer pullConsumer = new SubscriptionPullConsumer(consumerConfig)) { pullConsumer.open(); pullConsumer.subscribe("topic_all_tsfile"); while (true) { List messages = pullConsumer.poll(10000); for (final SubscriptionMessage message : messages) { - message.getTsFileHandler().copyFile("/Users/caozhijia/Downloads/1.tsfile"); + message.getTsFileHandler().copyFile("/Users/iotdb/Downloads/1.tsfile"); } } } diff --git a/src/zh/UserGuide/latest/API/Programming-Java-Native-API.md b/src/zh/UserGuide/latest/API/Programming-Java-Native-API.md index a77ee10ed..3e38fbda7 100644 --- a/src/zh/UserGuide/latest/API/Programming-Java-Native-API.md +++ b/src/zh/UserGuide/latest/API/Programming-Java-Native-API.md @@ -83,7 +83,6 @@ public class IoTDBSessionPoolExample { | 接口名称 | 功能描述 | |------------------------------------------------------------------------------------------------------------------------------------------------------------------------|-----------------------| | `insertRecords(List deviceIds, List times, List> measurementsList, List> typesList, List> valuesList)` | 插入多行数据,适用于不同测点独立采集的场景 | -| `insertAlignedRecords(List deviceIds, List times, List> measurementsList, List> typesList, List> valuesList)` | 插入多行数据,适用于不同测点同时采集的场景 | 代码案例: @@ -188,7 +187,6 @@ public class SessionPoolExample { | 接口名称 | 功能描述 | |-----------------------------------------------------------------------------------------|----------------------------| | `insertTablet(Tablet tablet)` | 插入单个设备的多行数据,适用于不同测点独立采集的场景 | -| `insertAlignedTablet(Tablet tablet)` | 插入单个设备的多行数据,适用于不同测点同时采集的场景 | 代码案例: ```java @@ -282,7 +280,10 @@ public class SessionPoolExample { } ``` -#### 2.3.2 数据查询 +#### 2.3.2 SQL操作 + +SQL操作分为查询和非查询两类操作,对应的接口为`executeQuery`和`executeNonQuery`操作,其区别为前者执行的是具体的查询语句,会返回一个结果集,后者是执行的是增、删、改操作,不返回结果集。 + ```java import java.util.ArrayList; import java.util.List; @@ -314,7 +315,7 @@ public class SessionPoolExample { private static void executeNonQueryExample() throws IoTDBConnectionException, StatementExecutionException { - // 1. create time series + // 1. create a nonAligned time series sessionPool.executeNonQueryStatement("create timeseries root.test.d1.s1 with dataType = int32"); // 2. set ttl