From 3a6fd217c166505f6b3124d244e9d6447ade893a Mon Sep 17 00:00:00 2001 From: Xinyi Zhao Date: Wed, 27 Nov 2024 17:43:09 +0800 Subject: [PATCH] move subscription api to java api doc --- .../Tree/API/Programming-Java-Native-API.md | 376 ++++++++++++++++-- .../Tree/User-Manual/Data-subscription.md | 303 +------------- .../latest/API/Programming-Java-Native-API.md | 376 ++++++++++++++++-- .../latest/User-Manual/Data-subscription.md | 303 +------------- .../Tree/API/Programming-Java-Native-API.md | 355 +++++++++++++++-- .../Tree/User-Manual/Data-subscription.md | 289 +------------- .../latest/API/Programming-Java-Native-API.md | 355 +++++++++++++++-- .../latest/User-Manual/Data-subscription.md | 289 +------------- 8 files changed, 1324 insertions(+), 1322 deletions(-) diff --git a/src/UserGuide/Master/Tree/API/Programming-Java-Native-API.md b/src/UserGuide/Master/Tree/API/Programming-Java-Native-API.md index d76f63088..3d558c097 100644 --- a/src/UserGuide/Master/Tree/API/Programming-Java-Native-API.md +++ b/src/UserGuide/Master/Tree/API/Programming-Java-Native-API.md @@ -57,7 +57,7 @@ In root directory: Here we show the commonly used interfaces and their parameters in the Native API: -### Initialization +### Session Management * Initialize a Session @@ -91,7 +91,7 @@ session = .build(); ``` -Version represents the SQL semantic version used by the client, which is used to be compatible with the SQL semantics of 0.12 when upgrading 0.13. The possible values are: `V_0_12`, `V_0_13`, `V_1_0`. +Version represents the SQL semantic version used by the client, which is used to be compatible with the SQL semantics of 0.12 when upgrading 0.13. The possible values are: `V_0_12`, `V_0_13`, `V_1_0`, etc. * Open a Session @@ -114,7 +114,33 @@ Notice: this RPC compression status of client must comply with that of IoTDB ser void close() ``` -### Data Definition Interface (DDL Interface) +* SessionPool + +We provide a connection pool (`SessionPool) for Native API. +Using the interface, you need to define the pool size. + +If you can not get a session connection in 60 seconds, there is a warning log but the program will hang. + +If a session has finished an operation, it will be put back to the pool automatically. +If a session connection is broken, the session will be removed automatically and the pool will try +to create a new session and redo the operation. +You can also specify an url list of multiple reachable nodes when creating a SessionPool, just as you would when creating a Session. To ensure high availability of clients in distributed cluster. + +For query operations: + +1. When using SessionPool to query data, the result set is `SessionDataSetWrapper`; +2. Given a `SessionDataSetWrapper`, if you have not scanned all the data in it and stop to use it, +you have to call `SessionPool.closeResultSet(wrapper)` manually; +3. When you call `hasNext()` and `next()` of a `SessionDataSetWrapper` and there is an exception, then +you have to call `SessionPool.closeResultSet(wrapper)` manually; +4. You can call `getColumnNames()` of `SessionDataSetWrapper` to get the column names of query result; + +Examples: ```session/src/test/java/org/apache/iotdb/session/pool/SessionPoolTest.java``` + +Or `example/session/src/main/java/org/apache/iotdb/SessionPoolExample.java` + + +### Database & Timeseries Management API #### Database Management @@ -288,7 +314,7 @@ Attention: Unsetting the template named 'templateName' from node at path 'prefix ### Data Manipulation Interface (DML Interface) -#### Insert +### Data Insert API It is recommended to use insertTablet to help improve write efficiency. @@ -398,7 +424,7 @@ The Insert of aligned timeseries uses interfaces like insertAlignedXXX, and othe * insertAlignedTablet * insertAlignedTablets -#### Delete +### Data Delete API * Delete data before or equal to a timestamp of one or several timeseries @@ -407,7 +433,7 @@ void deleteData(String path, long time) void deleteData(List paths, long time) ``` -#### Query +### Data Query API * Time-series raw data query with time range: - The specified query time range is a left-closed right-open interval, including the start time but excluding the end time. @@ -454,20 +480,325 @@ SessionDataSet executeAggregationQuery( long slidingStep); ``` -### IoTDB-SQL Interface - * Execute query statement ``` java SessionDataSet executeQueryStatement(String sql) ``` +### Data Subscription + +#### 1 Topic Management + +The `SubscriptionSession` class in the IoTDB subscription client provides interfaces for topic management. The status changes of topics are illustrated in the diagram below: + +
+ +
+ +##### 1.1 Create Topic + +```Java + void createTopicIfNotExists(String topicName, Properties properties) throws Exception; +``` + +Example: + +```Java +try (final SubscriptionSession session = new SubscriptionSession(host, port)) { + session.open(); + final Properties config = new Properties(); + config.put(TopicConstant.PATH_KEY, "root.db.**"); + session.createTopic(topicName, config); +} +``` + +##### 1.2 Delete Topic + +```Java +void dropTopicIfExists(String topicName) throws Exception; +``` + +##### 1.3 View Topic + +```Java +// Get all topics +Set getTopics() throws Exception; + +// Get a specific topic +Optional getTopic(String topicName) throws Exception; +``` + +#### 2 Check Subscription Status +The `SubscriptionSession` class in the IoTDB subscription client provides interfaces to check the subscription status: + +```Java +Set getSubscriptions() throws Exception; +Set getSubscriptions(final String topicName) throws Exception; +``` + +#### 3 Create Consumer + +When creating a consumer using the JAVA native interface, you need to specify the parameters applied to the consumer. + +For both `SubscriptionPullConsumer` and `SubscriptionPushConsumer`, the following common configurations are available: + + +| key | **required or optional with default** | description | +| :---------------------- | :----------------------------------------------------------- | :----------------------------------------------------------- | +| host | optional: 127.0.0.1 | `String`: The RPC host of a certain DataNode in IoTDB | +| port | optional: 6667 | Integer: The RPC port of a certain DataNode in IoTDB | +| node-urls | optional: 127.0.0.1:6667 | `List`: The RPC addresses of all DataNodes in IoTDB, can be multiple; either host:port or node-urls can be filled in. If both host:port and node-urls are filled in, the union of host:port and node-urls will be used to form a new node-urls application | +| username | optional: root | `String`: The username of a DataNode in IoTDB | +| password | optional: root | `String`: The password of a DataNode in IoTDB | +| groupId | optional | `String`: consumer group id, if not specified, a new consumer group will be randomly assigned, ensuring that different consumer groups have different consumer group ids | +| consumerId | optional | `String`: consumer client id, if not specified, it will be randomly assigned, ensuring that each consumer client id in the same consumer group is unique | +| heartbeatIntervalMs | optional: 30000 (min: 1000) | `Long`: The interval at which the consumer sends heartbeat requests to the IoTDB DataNode | +| endpointsSyncIntervalMs | optional: 120000 (min: 5000) | `Long`: The interval at which the consumer detects the expansion and contraction of IoTDB cluster nodes and adjusts the subscription connection | +| fileSaveDir | optional: Paths.get(System.getProperty("user.dir"), "iotdb-subscription").toString() | `String`: The temporary directory path where the TsFile files subscribed by the consumer are stored | +| fileSaveFsync | optional: false | `Boolean`: Whether the consumer actively calls fsync during the subscription of TsFile | + + +##### 3.1 SubscriptionPushConsumer + +The following are special configurations for `SubscriptionPushConsumer`: + + +| key | **required or optional with default** | description | +| :----------------- | :------------------------------------ | :----------------------------------------------------------- | +| ackStrategy | optional: `ACKStrategy.AFTER_CONSUME` | Consumption progress confirmation mechanism includes the following options: `ACKStrategy.BEFORE_CONSUME` (submit consumption progress immediately when the consumer receives data, before `onReceive`) `ACKStrategy.AFTER_CONSUME` (submit consumption progress after the consumer has consumed the data, after `onReceive`) | +| consumeListener | optional | Consumption data callback function, need to implement the `ConsumeListener` interface, define the consumption logic of `SessionDataSetsHandler` and `TsFileHandler` form data| +| autoPollIntervalMs | optional: 5000 (min: 500) | Long: The interval at which the consumer automatically pulls data, in ms | +| autoPollTimeoutMs | optional: 10000 (min: 1000) | Long: The timeout time for the consumer to pull data each time, in ms | + +Among them, the ConsumerListener interface is defined as follows: + + +```Java +@FunctionInterface +interface ConsumeListener { + default ConsumeResult onReceive(Message message) { + return ConsumeResult.SUCCESS; + } +} + +enum ConsumeResult { + SUCCESS, + FAILURE, +} +``` + +##### 3.2 SubscriptionPullConsumer + +The following are special configurations for `SubscriptionPullConsumer` : + +| key | **required or optional with default** | description | +| :----------------- | :------------------------------------ | :----------------------------------------------------------- | +| autoCommit | optional: true | Boolean: Whether to automatically commit consumption progress. If this parameter is set to false, the commit method must be called to manually `commit` consumption progress. | +| autoCommitInterval | optional: 5000 (min: 500) | Long: The interval at which consumption progress is automatically committed, in milliseconds. This only takes effect when the autoCommit parameter is true. + | + +After creating a consumer, you need to manually call the consumer's open method: + + +```Java +void open() throws Exception; +``` + +At this point, the IoTDB subscription client will verify the correctness of the consumer's configuration. After a successful verification, the consumer will join the corresponding consumer group. That is, only after opening the consumer can you use the returned consumer object to subscribe to topics, consume data, and perform other operations. + +#### 4 Subscribe to Topics + +Both `SubscriptionPushConsumer` and `SubscriptionPullConsumer` provide the following JAVA native interfaces for subscribing to topics: + +```Java +// Subscribe to topics +void subscribe(String topic) throws Exception; +void subscribe(List topics) throws Exception; +``` + +- Before a consumer subscribes to a topic, the topic must have been created, otherwise, the subscription will fail. + +- If a consumer subscribes to a topic that it has already subscribed to, no error will occur. + +- If there are other consumers in the same consumer group that have subscribed to the same topics, the consumer will reuse the corresponding consumption progress. + + +#### 5 Consume Data + +For both push and pull mode consumers: + + +- Only after explicitly subscribing to a topic will the consumer receive data for that topic. + +- If no topics are subscribed to after creation, the consumer will not be able to consume any data, even if other consumers in the same consumer group have subscribed to some topics. + +##### 5.1 SubscriptionPushConsumer + +After `SubscriptionPushConsumer` subscribes to topics, there is no need to manually pull data. + +The data consumption logic is within the `consumeListener` configuration specified when creating `SubscriptionPushConsumer`. + +##### 5.2 SubscriptionPullConsumer + +After SubscriptionPullConsumer subscribes to topics, it needs to actively call the poll method to pull data: + +```Java +List poll(final Duration timeout) throws Exception; +List poll(final long timeoutMs) throws Exception; +List poll(final Set topicNames, final Duration timeout) throws Exception; +List poll(final Set topicNames, final long timeoutMs) throws Exception; +``` + +In the poll method, you can specify the topic names to be pulled (if not specified, it defaults to pulling all topics that the consumer has subscribed to) and the timeout period. + + +When the SubscriptionPullConsumer is configured with the autoCommit parameter set to false, it is necessary to manually call the commitSync and commitAsync methods to synchronously or asynchronously commit the consumption progress of a batch of data: + + +```Java +void commitSync(final SubscriptionMessage message) throws Exception; +void commitSync(final Iterable messages) throws Exception; + +CompletableFuture commitAsync(final SubscriptionMessage message); +CompletableFuture commitAsync(final Iterable messages); +void commitAsync(final SubscriptionMessage message, final AsyncCommitCallback callback); +void commitAsync(final Iterable messages, final AsyncCommitCallback callback); +``` + +The AsyncCommitCallback class is defined as follows: + +```Java +public interface AsyncCommitCallback { + default void onComplete() { + // Do nothing + } + + default void onFailure(final Throwable e) { + // Do nothing + } +} +``` + +#### 6 Unsubscribe + +The `SubscriptionPushConsumer` and `SubscriptionPullConsumer` provide the following JAVA native interfaces for unsubscribing and closing the consumer: + +```Java +// Unsubscribe from topics +void unsubscribe(String topic) throws Exception; +void unsubscribe(List topics) throws Exception; + +// Close consumer +void close(); +``` + +- If a consumer unsubscribes from a topic that it has not subscribed to, no error will occur. +- When a consumer is closed, it will exit the corresponding consumer group and automatically unsubscribe from all topics it is currently subscribed to. +- Once a consumer is closed, its lifecycle ends, and it cannot be reopened to subscribe to and consume data again. + + +#### 7 Code Examples + +##### 7.1 Single Pull Consumer Consuming SessionDataSetsHandler Format Data + +```Java +// Create topics +try (final SubscriptionSession session = new SubscriptionSession(HOST, PORT)) { + session.open(); + final Properties config = new Properties(); + config.put(TopicConstant.PATH_KEY, "root.db.**"); + session.createTopic(TOPIC_1, config); +} + +// Subscription: property-style ctor +final Properties config = new Properties(); +config.put(ConsumerConstant.CONSUMER_ID_KEY, "c1"); +config.put(ConsumerConstant.CONSUMER_GROUP_ID_KEY, "cg1"); + +final SubscriptionPullConsumer consumer1 = new SubscriptionPullConsumer(config); +consumer1.open(); +consumer1.subscribe(TOPIC_1); +while (true) { + LockSupport.parkNanos(SLEEP_NS); // wait some time + final List messages = consumer1.poll(POLL_TIMEOUT_MS); + for (final SubscriptionMessage message : messages) { + for (final SubscriptionSessionDataSet dataSet : message.getSessionDataSetsHandler()) { + System.out.println(dataSet.getColumnNames()); + System.out.println(dataSet.getColumnTypes()); + while (dataSet.hasNext()) { + System.out.println(dataSet.next()); + } + } + } + // Auto commit +} + +// Show topics and subscriptions +try (final SubscriptionSession session = new SubscriptionSession(HOST, PORT)) { + session.open(); + session.getTopics().forEach((System.out::println)); + session.getSubscriptions().forEach((System.out::println)); +} + +consumer1.unsubscribe(TOPIC_1); +consumer1.close(); +``` + +##### 7.2 Multiple Push Consumers Consuming TsFileHandler Format Data + +```Java +// Create topics +try (final SubscriptionSession subscriptionSession = new SubscriptionSession(HOST, PORT)) { + subscriptionSession.open(); + final Properties config = new Properties(); + config.put(TopicConstant.FORMAT_KEY, TopicConstant.FORMAT_TS_FILE_HANDLER_VALUE); + subscriptionSession.createTopic(TOPIC_2, config); +} + +final List threads = new ArrayList<>(); +for (int i = 0; i < 8; ++i) { + final int idx = i; + final Thread thread = + new Thread( + () -> { + // Subscription: builder-style ctor + try (final SubscriptionPushConsumer consumer2 = + new SubscriptionPushConsumer.Builder() + .consumerId("c" + idx) + .consumerGroupId("cg2") + .fileSaveDir(System.getProperty("java.io.tmpdir")) + .ackStrategy(AckStrategy.AFTER_CONSUME) + .consumeListener( + message -> { + doSomething(message.getTsFileHandler()); + return ConsumeResult.SUCCESS; + }) + .buildPushConsumer()) { + consumer2.open(); + consumer2.subscribe(TOPIC_2); + } catch (final IOException e) { + throw new RuntimeException(e); + } + }); + thread.start(); + threads.add(thread); +} + +for (final Thread thread : threads) { + thread.join(); +} +``` + +### Other Modules (Execute SQL Directly) + * Execute non query statement ``` java void executeNonQueryStatement(String sql) ``` + ### Write Test Interface (to profile network cost) These methods **don't** insert data into database and server just return after accept the request. @@ -510,31 +841,4 @@ To get more information of the following interfaces, please view session/src/mai The sample code of using these interfaces is in example/session/src/main/java/org/apache/iotdb/SessionExample.java,which provides an example of how to open an IoTDB session, execute a batch insertion. -For examples of aligned timeseries and measurement template, you can refer to `example/session/src/main/java/org/apache/iotdb/AlignedTimeseriesSessionExample.java` - - -## Session Pool for Native API - -We provide a connection pool (`SessionPool) for Native API. -Using the interface, you need to define the pool size. - -If you can not get a session connection in 60 seconds, there is a warning log but the program will hang. - -If a session has finished an operation, it will be put back to the pool automatically. -If a session connection is broken, the session will be removed automatically and the pool will try -to create a new session and redo the operation. -You can also specify an url list of multiple reachable nodes when creating a SessionPool, just as you would when creating a Session. To ensure high availability of clients in distributed cluster. - -For query operations: - -1. When using SessionPool to query data, the result set is `SessionDataSetWrapper`; -2. Given a `SessionDataSetWrapper`, if you have not scanned all the data in it and stop to use it, -you have to call `SessionPool.closeResultSet(wrapper)` manually; -3. When you call `hasNext()` and `next()` of a `SessionDataSetWrapper` and there is an exception, then -you have to call `SessionPool.closeResultSet(wrapper)` manually; -4. You can call `getColumnNames()` of `SessionDataSetWrapper` to get the column names of query result; - -Examples: ```session/src/test/java/org/apache/iotdb/session/pool/SessionPoolTest.java``` - -Or `example/session/src/main/java/org/apache/iotdb/SessionPoolExample.java` - +For examples of aligned timeseries and measurement template, you can refer to `example/session/src/main/java/org/apache/iotdb/AlignedTimeseriesSessionExample.java` \ No newline at end of file diff --git a/src/UserGuide/Master/Tree/User-Manual/Data-subscription.md b/src/UserGuide/Master/Tree/User-Manual/Data-subscription.md index ce353f65c..9a5522acd 100644 --- a/src/UserGuide/Master/Tree/User-Manual/Data-subscription.md +++ b/src/UserGuide/Master/Tree/User-Manual/Data-subscription.md @@ -132,309 +132,8 @@ Result set: ## 4. API interface -In addition to SQL statements, IoTDB also supports using data subscription features through Java native interfaces([link](../API/Programming-Java-Native-API.md)). +In addition to SQL statements, IoTDB also supports using data subscription features through Java native interfaces, more details see([link](../API/Programming-Java-Native-API.md)). -### 4.1 Topic Management - -The `SubscriptionSession` class in the IoTDB subscription client provides interfaces for topic management. The status changes of topics are illustrated in the diagram below: - -
- -
- -#### 4.1.1 Create Topic - -```Java - void createTopicIfNotExists(String topicName, Properties properties) throws Exception; -``` - -Example: - -```Java -try (final SubscriptionSession session = new SubscriptionSession(host, port)) { - session.open(); - final Properties config = new Properties(); - config.put(TopicConstant.PATH_KEY, "root.db.**"); - session.createTopic(topicName, config); -} -``` - -#### 4.1.2 Delete Topic - -```Java -void dropTopicIfExists(String topicName) throws Exception; -``` - -#### 4.1.3 View Topic - -```Java -// Get all topics -Set getTopics() throws Exception; - -// Get a specific topic -Optional getTopic(String topicName) throws Exception; -``` - -### 4.2 Check Subscription Status -The `SubscriptionSession` class in the IoTDB subscription client provides interfaces to check the subscription status: - -```Java -Set getSubscriptions() throws Exception; -Set getSubscriptions(final String topicName) throws Exception; -``` - -### 4.3 Create Consumer - -When creating a consumer using the JAVA native interface, you need to specify the parameters applied to the consumer. - -For both `SubscriptionPullConsumer` and `SubscriptionPushConsumer`, the following common configurations are available: - - -| key | **required or optional with default** | description | -| :---------------------- | :----------------------------------------------------------- | :----------------------------------------------------------- | -| host | optional: 127.0.0.1 | `String`: The RPC host of a certain DataNode in IoTDB | -| port | optional: 6667 | Integer: The RPC port of a certain DataNode in IoTDB | -| node-urls | optional: 127.0.0.1:6667 | `List`: The RPC addresses of all DataNodes in IoTDB, can be multiple; either host:port or node-urls can be filled in. If both host:port and node-urls are filled in, the union of host:port and node-urls will be used to form a new node-urls application | -| username | optional: root | `String`: The username of a DataNode in IoTDB | -| password | optional: root | `String`: The password of a DataNode in IoTDB | -| groupId | optional | `String`: consumer group id, if not specified, a new consumer group will be randomly assigned, ensuring that different consumer groups have different consumer group ids | -| consumerId | optional | `String`: consumer client id, if not specified, it will be randomly assigned, ensuring that each consumer client id in the same consumer group is unique | -| heartbeatIntervalMs | optional: 30000 (min: 1000) | `Long`: The interval at which the consumer sends heartbeat requests to the IoTDB DataNode | -| endpointsSyncIntervalMs | optional: 120000 (min: 5000) | `Long`: The interval at which the consumer detects the expansion and contraction of IoTDB cluster nodes and adjusts the subscription connection | -| fileSaveDir | optional: Paths.get(System.getProperty("user.dir"), "iotdb-subscription").toString() | `String`: The temporary directory path where the TsFile files subscribed by the consumer are stored | -| fileSaveFsync | optional: false | `Boolean`: Whether the consumer actively calls fsync during the subscription of TsFile | - - -#### 4.3.1 SubscriptionPushConsumer - -The following are special configurations for `SubscriptionPushConsumer`: - - -| key | **required or optional with default** | description | -| :----------------- | :------------------------------------ | :----------------------------------------------------------- | -| ackStrategy | optional: `ACKStrategy.AFTER_CONSUME` | Consumption progress confirmation mechanism includes the following options: `ACKStrategy.BEFORE_CONSUME` (submit consumption progress immediately when the consumer receives data, before `onReceive`) `ACKStrategy.AFTER_CONSUME` (submit consumption progress after the consumer has consumed the data, after `onReceive`) | -| consumeListener | optional | Consumption data callback function, need to implement the `ConsumeListener` interface, define the consumption logic of `SessionDataSetsHandler` and `TsFileHandler` form data| -| autoPollIntervalMs | optional: 5000 (min: 500) | Long: The interval at which the consumer automatically pulls data, in ms | -| autoPollTimeoutMs | optional: 10000 (min: 1000) | Long: The timeout time for the consumer to pull data each time, in ms | - -Among them, the ConsumerListener interface is defined as follows: - - -```Java -@FunctionInterface -interface ConsumeListener { - default ConsumeResult onReceive(Message message) { - return ConsumeResult.SUCCESS; - } -} - -enum ConsumeResult { - SUCCESS, - FAILURE, -} -``` - -#### 4.3.2 SubscriptionPullConsumer - -The following are special configurations for `SubscriptionPullConsumer` : - -| key | **required or optional with default** | description | -| :----------------- | :------------------------------------ | :----------------------------------------------------------- | -| autoCommit | optional: true | Boolean: Whether to automatically commit consumption progress. If this parameter is set to false, the commit method must be called to manually `commit` consumption progress. | -| autoCommitInterval | optional: 5000 (min: 500) | Long: The interval at which consumption progress is automatically committed, in milliseconds. This only takes effect when the autoCommit parameter is true. - | - -After creating a consumer, you need to manually call the consumer's open method: - - -```Java -void open() throws Exception; -``` - -At this point, the IoTDB subscription client will verify the correctness of the consumer's configuration. After a successful verification, the consumer will join the corresponding consumer group. That is, only after opening the consumer can you use the returned consumer object to subscribe to topics, consume data, and perform other operations. - -### 4.4 Subscribe to Topics - -Both `SubscriptionPushConsumer` and `SubscriptionPullConsumer` provide the following JAVA native interfaces for subscribing to topics: - -```Java -// Subscribe to topics -void subscribe(String topic) throws Exception; -void subscribe(List topics) throws Exception; -``` - -- Before a consumer subscribes to a topic, the topic must have been created, otherwise, the subscription will fail. - -- If a consumer subscribes to a topic that it has already subscribed to, no error will occur. - -- If there are other consumers in the same consumer group that have subscribed to the same topics, the consumer will reuse the corresponding consumption progress. - - -### 4.5 Consume Data - -For both push and pull mode consumers: - - -- Only after explicitly subscribing to a topic will the consumer receive data for that topic. - -- If no topics are subscribed to after creation, the consumer will not be able to consume any data, even if other consumers in the same consumer group have subscribed to some topics. - -#### 4.5.1 SubscriptionPushConsumer - -After `SubscriptionPushConsumer` subscribes to topics, there is no need to manually pull data. - -The data consumption logic is within the `consumeListener` configuration specified when creating `SubscriptionPushConsumer`. - -#### 4.5.2 SubscriptionPullConsumer - -After SubscriptionPullConsumer subscribes to topics, it needs to actively call the poll method to pull data: - -```Java -List poll(final Duration timeout) throws Exception; -List poll(final long timeoutMs) throws Exception; -List poll(final Set topicNames, final Duration timeout) throws Exception; -List poll(final Set topicNames, final long timeoutMs) throws Exception; -``` - -In the poll method, you can specify the topic names to be pulled (if not specified, it defaults to pulling all topics that the consumer has subscribed to) and the timeout period. - - -When the SubscriptionPullConsumer is configured with the autoCommit parameter set to false, it is necessary to manually call the commitSync and commitAsync methods to synchronously or asynchronously commit the consumption progress of a batch of data: - - -```Java -void commitSync(final SubscriptionMessage message) throws Exception; -void commitSync(final Iterable messages) throws Exception; - -CompletableFuture commitAsync(final SubscriptionMessage message); -CompletableFuture commitAsync(final Iterable messages); -void commitAsync(final SubscriptionMessage message, final AsyncCommitCallback callback); -void commitAsync(final Iterable messages, final AsyncCommitCallback callback); -``` - -The AsyncCommitCallback class is defined as follows: - -```Java -public interface AsyncCommitCallback { - default void onComplete() { - // Do nothing - } - - default void onFailure(final Throwable e) { - // Do nothing - } -} -``` - -### 4.6 Unsubscribe - -The `SubscriptionPushConsumer` and `SubscriptionPullConsumer` provide the following JAVA native interfaces for unsubscribing and closing the consumer: - -```Java -// Unsubscribe from topics -void unsubscribe(String topic) throws Exception; -void unsubscribe(List topics) throws Exception; - -// Close consumer -void close(); -``` - -- If a consumer unsubscribes from a topic that it has not subscribed to, no error will occur. -- When a consumer is closed, it will exit the corresponding consumer group and automatically unsubscribe from all topics it is currently subscribed to. -- Once a consumer is closed, its lifecycle ends, and it cannot be reopened to subscribe to and consume data again. - - -### 4.7 Code Examples - -#### 4.7.1 Single Pull Consumer Consuming SessionDataSetsHandler Format Data - -```Java -// Create topics -try (final SubscriptionSession session = new SubscriptionSession(HOST, PORT)) { - session.open(); - final Properties config = new Properties(); - config.put(TopicConstant.PATH_KEY, "root.db.**"); - session.createTopic(TOPIC_1, config); -} - -// Subscription: property-style ctor -final Properties config = new Properties(); -config.put(ConsumerConstant.CONSUMER_ID_KEY, "c1"); -config.put(ConsumerConstant.CONSUMER_GROUP_ID_KEY, "cg1"); - -final SubscriptionPullConsumer consumer1 = new SubscriptionPullConsumer(config); -consumer1.open(); -consumer1.subscribe(TOPIC_1); -while (true) { - LockSupport.parkNanos(SLEEP_NS); // wait some time - final List messages = consumer1.poll(POLL_TIMEOUT_MS); - for (final SubscriptionMessage message : messages) { - for (final SubscriptionSessionDataSet dataSet : message.getSessionDataSetsHandler()) { - System.out.println(dataSet.getColumnNames()); - System.out.println(dataSet.getColumnTypes()); - while (dataSet.hasNext()) { - System.out.println(dataSet.next()); - } - } - } - // Auto commit -} - -// Show topics and subscriptions -try (final SubscriptionSession session = new SubscriptionSession(HOST, PORT)) { - session.open(); - session.getTopics().forEach((System.out::println)); - session.getSubscriptions().forEach((System.out::println)); -} - -consumer1.unsubscribe(TOPIC_1); -consumer1.close(); -``` - -#### 4.7.2 Multiple Push Consumers Consuming TsFileHandler Format Data - -```Java -// Create topics -try (final SubscriptionSession subscriptionSession = new SubscriptionSession(HOST, PORT)) { - subscriptionSession.open(); - final Properties config = new Properties(); - config.put(TopicConstant.FORMAT_KEY, TopicConstant.FORMAT_TS_FILE_HANDLER_VALUE); - subscriptionSession.createTopic(TOPIC_2, config); -} - -final List threads = new ArrayList<>(); -for (int i = 0; i < 8; ++i) { - final int idx = i; - final Thread thread = - new Thread( - () -> { - // Subscription: builder-style ctor - try (final SubscriptionPushConsumer consumer2 = - new SubscriptionPushConsumer.Builder() - .consumerId("c" + idx) - .consumerGroupId("cg2") - .fileSaveDir(System.getProperty("java.io.tmpdir")) - .ackStrategy(AckStrategy.AFTER_CONSUME) - .consumeListener( - message -> { - doSomething(message.getTsFileHandler()); - return ConsumeResult.SUCCESS; - }) - .buildPushConsumer()) { - consumer2.open(); - consumer2.subscribe(TOPIC_2); - } catch (final IOException e) { - throw new RuntimeException(e); - } - }); - thread.start(); - threads.add(thread); -} - -for (final Thread thread : threads) { - thread.join(); -} -``` ## 5. Frequently Asked Questions diff --git a/src/UserGuide/latest/API/Programming-Java-Native-API.md b/src/UserGuide/latest/API/Programming-Java-Native-API.md index d76f63088..3d558c097 100644 --- a/src/UserGuide/latest/API/Programming-Java-Native-API.md +++ b/src/UserGuide/latest/API/Programming-Java-Native-API.md @@ -57,7 +57,7 @@ In root directory: Here we show the commonly used interfaces and their parameters in the Native API: -### Initialization +### Session Management * Initialize a Session @@ -91,7 +91,7 @@ session = .build(); ``` -Version represents the SQL semantic version used by the client, which is used to be compatible with the SQL semantics of 0.12 when upgrading 0.13. The possible values are: `V_0_12`, `V_0_13`, `V_1_0`. +Version represents the SQL semantic version used by the client, which is used to be compatible with the SQL semantics of 0.12 when upgrading 0.13. The possible values are: `V_0_12`, `V_0_13`, `V_1_0`, etc. * Open a Session @@ -114,7 +114,33 @@ Notice: this RPC compression status of client must comply with that of IoTDB ser void close() ``` -### Data Definition Interface (DDL Interface) +* SessionPool + +We provide a connection pool (`SessionPool) for Native API. +Using the interface, you need to define the pool size. + +If you can not get a session connection in 60 seconds, there is a warning log but the program will hang. + +If a session has finished an operation, it will be put back to the pool automatically. +If a session connection is broken, the session will be removed automatically and the pool will try +to create a new session and redo the operation. +You can also specify an url list of multiple reachable nodes when creating a SessionPool, just as you would when creating a Session. To ensure high availability of clients in distributed cluster. + +For query operations: + +1. When using SessionPool to query data, the result set is `SessionDataSetWrapper`; +2. Given a `SessionDataSetWrapper`, if you have not scanned all the data in it and stop to use it, +you have to call `SessionPool.closeResultSet(wrapper)` manually; +3. When you call `hasNext()` and `next()` of a `SessionDataSetWrapper` and there is an exception, then +you have to call `SessionPool.closeResultSet(wrapper)` manually; +4. You can call `getColumnNames()` of `SessionDataSetWrapper` to get the column names of query result; + +Examples: ```session/src/test/java/org/apache/iotdb/session/pool/SessionPoolTest.java``` + +Or `example/session/src/main/java/org/apache/iotdb/SessionPoolExample.java` + + +### Database & Timeseries Management API #### Database Management @@ -288,7 +314,7 @@ Attention: Unsetting the template named 'templateName' from node at path 'prefix ### Data Manipulation Interface (DML Interface) -#### Insert +### Data Insert API It is recommended to use insertTablet to help improve write efficiency. @@ -398,7 +424,7 @@ The Insert of aligned timeseries uses interfaces like insertAlignedXXX, and othe * insertAlignedTablet * insertAlignedTablets -#### Delete +### Data Delete API * Delete data before or equal to a timestamp of one or several timeseries @@ -407,7 +433,7 @@ void deleteData(String path, long time) void deleteData(List paths, long time) ``` -#### Query +### Data Query API * Time-series raw data query with time range: - The specified query time range is a left-closed right-open interval, including the start time but excluding the end time. @@ -454,20 +480,325 @@ SessionDataSet executeAggregationQuery( long slidingStep); ``` -### IoTDB-SQL Interface - * Execute query statement ``` java SessionDataSet executeQueryStatement(String sql) ``` +### Data Subscription + +#### 1 Topic Management + +The `SubscriptionSession` class in the IoTDB subscription client provides interfaces for topic management. The status changes of topics are illustrated in the diagram below: + +
+ +
+ +##### 1.1 Create Topic + +```Java + void createTopicIfNotExists(String topicName, Properties properties) throws Exception; +``` + +Example: + +```Java +try (final SubscriptionSession session = new SubscriptionSession(host, port)) { + session.open(); + final Properties config = new Properties(); + config.put(TopicConstant.PATH_KEY, "root.db.**"); + session.createTopic(topicName, config); +} +``` + +##### 1.2 Delete Topic + +```Java +void dropTopicIfExists(String topicName) throws Exception; +``` + +##### 1.3 View Topic + +```Java +// Get all topics +Set getTopics() throws Exception; + +// Get a specific topic +Optional getTopic(String topicName) throws Exception; +``` + +#### 2 Check Subscription Status +The `SubscriptionSession` class in the IoTDB subscription client provides interfaces to check the subscription status: + +```Java +Set getSubscriptions() throws Exception; +Set getSubscriptions(final String topicName) throws Exception; +``` + +#### 3 Create Consumer + +When creating a consumer using the JAVA native interface, you need to specify the parameters applied to the consumer. + +For both `SubscriptionPullConsumer` and `SubscriptionPushConsumer`, the following common configurations are available: + + +| key | **required or optional with default** | description | +| :---------------------- | :----------------------------------------------------------- | :----------------------------------------------------------- | +| host | optional: 127.0.0.1 | `String`: The RPC host of a certain DataNode in IoTDB | +| port | optional: 6667 | Integer: The RPC port of a certain DataNode in IoTDB | +| node-urls | optional: 127.0.0.1:6667 | `List`: The RPC addresses of all DataNodes in IoTDB, can be multiple; either host:port or node-urls can be filled in. If both host:port and node-urls are filled in, the union of host:port and node-urls will be used to form a new node-urls application | +| username | optional: root | `String`: The username of a DataNode in IoTDB | +| password | optional: root | `String`: The password of a DataNode in IoTDB | +| groupId | optional | `String`: consumer group id, if not specified, a new consumer group will be randomly assigned, ensuring that different consumer groups have different consumer group ids | +| consumerId | optional | `String`: consumer client id, if not specified, it will be randomly assigned, ensuring that each consumer client id in the same consumer group is unique | +| heartbeatIntervalMs | optional: 30000 (min: 1000) | `Long`: The interval at which the consumer sends heartbeat requests to the IoTDB DataNode | +| endpointsSyncIntervalMs | optional: 120000 (min: 5000) | `Long`: The interval at which the consumer detects the expansion and contraction of IoTDB cluster nodes and adjusts the subscription connection | +| fileSaveDir | optional: Paths.get(System.getProperty("user.dir"), "iotdb-subscription").toString() | `String`: The temporary directory path where the TsFile files subscribed by the consumer are stored | +| fileSaveFsync | optional: false | `Boolean`: Whether the consumer actively calls fsync during the subscription of TsFile | + + +##### 3.1 SubscriptionPushConsumer + +The following are special configurations for `SubscriptionPushConsumer`: + + +| key | **required or optional with default** | description | +| :----------------- | :------------------------------------ | :----------------------------------------------------------- | +| ackStrategy | optional: `ACKStrategy.AFTER_CONSUME` | Consumption progress confirmation mechanism includes the following options: `ACKStrategy.BEFORE_CONSUME` (submit consumption progress immediately when the consumer receives data, before `onReceive`) `ACKStrategy.AFTER_CONSUME` (submit consumption progress after the consumer has consumed the data, after `onReceive`) | +| consumeListener | optional | Consumption data callback function, need to implement the `ConsumeListener` interface, define the consumption logic of `SessionDataSetsHandler` and `TsFileHandler` form data| +| autoPollIntervalMs | optional: 5000 (min: 500) | Long: The interval at which the consumer automatically pulls data, in ms | +| autoPollTimeoutMs | optional: 10000 (min: 1000) | Long: The timeout time for the consumer to pull data each time, in ms | + +Among them, the ConsumerListener interface is defined as follows: + + +```Java +@FunctionInterface +interface ConsumeListener { + default ConsumeResult onReceive(Message message) { + return ConsumeResult.SUCCESS; + } +} + +enum ConsumeResult { + SUCCESS, + FAILURE, +} +``` + +##### 3.2 SubscriptionPullConsumer + +The following are special configurations for `SubscriptionPullConsumer` : + +| key | **required or optional with default** | description | +| :----------------- | :------------------------------------ | :----------------------------------------------------------- | +| autoCommit | optional: true | Boolean: Whether to automatically commit consumption progress. If this parameter is set to false, the commit method must be called to manually `commit` consumption progress. | +| autoCommitInterval | optional: 5000 (min: 500) | Long: The interval at which consumption progress is automatically committed, in milliseconds. This only takes effect when the autoCommit parameter is true. + | + +After creating a consumer, you need to manually call the consumer's open method: + + +```Java +void open() throws Exception; +``` + +At this point, the IoTDB subscription client will verify the correctness of the consumer's configuration. After a successful verification, the consumer will join the corresponding consumer group. That is, only after opening the consumer can you use the returned consumer object to subscribe to topics, consume data, and perform other operations. + +#### 4 Subscribe to Topics + +Both `SubscriptionPushConsumer` and `SubscriptionPullConsumer` provide the following JAVA native interfaces for subscribing to topics: + +```Java +// Subscribe to topics +void subscribe(String topic) throws Exception; +void subscribe(List topics) throws Exception; +``` + +- Before a consumer subscribes to a topic, the topic must have been created, otherwise, the subscription will fail. + +- If a consumer subscribes to a topic that it has already subscribed to, no error will occur. + +- If there are other consumers in the same consumer group that have subscribed to the same topics, the consumer will reuse the corresponding consumption progress. + + +#### 5 Consume Data + +For both push and pull mode consumers: + + +- Only after explicitly subscribing to a topic will the consumer receive data for that topic. + +- If no topics are subscribed to after creation, the consumer will not be able to consume any data, even if other consumers in the same consumer group have subscribed to some topics. + +##### 5.1 SubscriptionPushConsumer + +After `SubscriptionPushConsumer` subscribes to topics, there is no need to manually pull data. + +The data consumption logic is within the `consumeListener` configuration specified when creating `SubscriptionPushConsumer`. + +##### 5.2 SubscriptionPullConsumer + +After SubscriptionPullConsumer subscribes to topics, it needs to actively call the poll method to pull data: + +```Java +List poll(final Duration timeout) throws Exception; +List poll(final long timeoutMs) throws Exception; +List poll(final Set topicNames, final Duration timeout) throws Exception; +List poll(final Set topicNames, final long timeoutMs) throws Exception; +``` + +In the poll method, you can specify the topic names to be pulled (if not specified, it defaults to pulling all topics that the consumer has subscribed to) and the timeout period. + + +When the SubscriptionPullConsumer is configured with the autoCommit parameter set to false, it is necessary to manually call the commitSync and commitAsync methods to synchronously or asynchronously commit the consumption progress of a batch of data: + + +```Java +void commitSync(final SubscriptionMessage message) throws Exception; +void commitSync(final Iterable messages) throws Exception; + +CompletableFuture commitAsync(final SubscriptionMessage message); +CompletableFuture commitAsync(final Iterable messages); +void commitAsync(final SubscriptionMessage message, final AsyncCommitCallback callback); +void commitAsync(final Iterable messages, final AsyncCommitCallback callback); +``` + +The AsyncCommitCallback class is defined as follows: + +```Java +public interface AsyncCommitCallback { + default void onComplete() { + // Do nothing + } + + default void onFailure(final Throwable e) { + // Do nothing + } +} +``` + +#### 6 Unsubscribe + +The `SubscriptionPushConsumer` and `SubscriptionPullConsumer` provide the following JAVA native interfaces for unsubscribing and closing the consumer: + +```Java +// Unsubscribe from topics +void unsubscribe(String topic) throws Exception; +void unsubscribe(List topics) throws Exception; + +// Close consumer +void close(); +``` + +- If a consumer unsubscribes from a topic that it has not subscribed to, no error will occur. +- When a consumer is closed, it will exit the corresponding consumer group and automatically unsubscribe from all topics it is currently subscribed to. +- Once a consumer is closed, its lifecycle ends, and it cannot be reopened to subscribe to and consume data again. + + +#### 7 Code Examples + +##### 7.1 Single Pull Consumer Consuming SessionDataSetsHandler Format Data + +```Java +// Create topics +try (final SubscriptionSession session = new SubscriptionSession(HOST, PORT)) { + session.open(); + final Properties config = new Properties(); + config.put(TopicConstant.PATH_KEY, "root.db.**"); + session.createTopic(TOPIC_1, config); +} + +// Subscription: property-style ctor +final Properties config = new Properties(); +config.put(ConsumerConstant.CONSUMER_ID_KEY, "c1"); +config.put(ConsumerConstant.CONSUMER_GROUP_ID_KEY, "cg1"); + +final SubscriptionPullConsumer consumer1 = new SubscriptionPullConsumer(config); +consumer1.open(); +consumer1.subscribe(TOPIC_1); +while (true) { + LockSupport.parkNanos(SLEEP_NS); // wait some time + final List messages = consumer1.poll(POLL_TIMEOUT_MS); + for (final SubscriptionMessage message : messages) { + for (final SubscriptionSessionDataSet dataSet : message.getSessionDataSetsHandler()) { + System.out.println(dataSet.getColumnNames()); + System.out.println(dataSet.getColumnTypes()); + while (dataSet.hasNext()) { + System.out.println(dataSet.next()); + } + } + } + // Auto commit +} + +// Show topics and subscriptions +try (final SubscriptionSession session = new SubscriptionSession(HOST, PORT)) { + session.open(); + session.getTopics().forEach((System.out::println)); + session.getSubscriptions().forEach((System.out::println)); +} + +consumer1.unsubscribe(TOPIC_1); +consumer1.close(); +``` + +##### 7.2 Multiple Push Consumers Consuming TsFileHandler Format Data + +```Java +// Create topics +try (final SubscriptionSession subscriptionSession = new SubscriptionSession(HOST, PORT)) { + subscriptionSession.open(); + final Properties config = new Properties(); + config.put(TopicConstant.FORMAT_KEY, TopicConstant.FORMAT_TS_FILE_HANDLER_VALUE); + subscriptionSession.createTopic(TOPIC_2, config); +} + +final List threads = new ArrayList<>(); +for (int i = 0; i < 8; ++i) { + final int idx = i; + final Thread thread = + new Thread( + () -> { + // Subscription: builder-style ctor + try (final SubscriptionPushConsumer consumer2 = + new SubscriptionPushConsumer.Builder() + .consumerId("c" + idx) + .consumerGroupId("cg2") + .fileSaveDir(System.getProperty("java.io.tmpdir")) + .ackStrategy(AckStrategy.AFTER_CONSUME) + .consumeListener( + message -> { + doSomething(message.getTsFileHandler()); + return ConsumeResult.SUCCESS; + }) + .buildPushConsumer()) { + consumer2.open(); + consumer2.subscribe(TOPIC_2); + } catch (final IOException e) { + throw new RuntimeException(e); + } + }); + thread.start(); + threads.add(thread); +} + +for (final Thread thread : threads) { + thread.join(); +} +``` + +### Other Modules (Execute SQL Directly) + * Execute non query statement ``` java void executeNonQueryStatement(String sql) ``` + ### Write Test Interface (to profile network cost) These methods **don't** insert data into database and server just return after accept the request. @@ -510,31 +841,4 @@ To get more information of the following interfaces, please view session/src/mai The sample code of using these interfaces is in example/session/src/main/java/org/apache/iotdb/SessionExample.java,which provides an example of how to open an IoTDB session, execute a batch insertion. -For examples of aligned timeseries and measurement template, you can refer to `example/session/src/main/java/org/apache/iotdb/AlignedTimeseriesSessionExample.java` - - -## Session Pool for Native API - -We provide a connection pool (`SessionPool) for Native API. -Using the interface, you need to define the pool size. - -If you can not get a session connection in 60 seconds, there is a warning log but the program will hang. - -If a session has finished an operation, it will be put back to the pool automatically. -If a session connection is broken, the session will be removed automatically and the pool will try -to create a new session and redo the operation. -You can also specify an url list of multiple reachable nodes when creating a SessionPool, just as you would when creating a Session. To ensure high availability of clients in distributed cluster. - -For query operations: - -1. When using SessionPool to query data, the result set is `SessionDataSetWrapper`; -2. Given a `SessionDataSetWrapper`, if you have not scanned all the data in it and stop to use it, -you have to call `SessionPool.closeResultSet(wrapper)` manually; -3. When you call `hasNext()` and `next()` of a `SessionDataSetWrapper` and there is an exception, then -you have to call `SessionPool.closeResultSet(wrapper)` manually; -4. You can call `getColumnNames()` of `SessionDataSetWrapper` to get the column names of query result; - -Examples: ```session/src/test/java/org/apache/iotdb/session/pool/SessionPoolTest.java``` - -Or `example/session/src/main/java/org/apache/iotdb/SessionPoolExample.java` - +For examples of aligned timeseries and measurement template, you can refer to `example/session/src/main/java/org/apache/iotdb/AlignedTimeseriesSessionExample.java` \ No newline at end of file diff --git a/src/UserGuide/latest/User-Manual/Data-subscription.md b/src/UserGuide/latest/User-Manual/Data-subscription.md index ce353f65c..9a5522acd 100644 --- a/src/UserGuide/latest/User-Manual/Data-subscription.md +++ b/src/UserGuide/latest/User-Manual/Data-subscription.md @@ -132,309 +132,8 @@ Result set: ## 4. API interface -In addition to SQL statements, IoTDB also supports using data subscription features through Java native interfaces([link](../API/Programming-Java-Native-API.md)). +In addition to SQL statements, IoTDB also supports using data subscription features through Java native interfaces, more details see([link](../API/Programming-Java-Native-API.md)). -### 4.1 Topic Management - -The `SubscriptionSession` class in the IoTDB subscription client provides interfaces for topic management. The status changes of topics are illustrated in the diagram below: - -
- -
- -#### 4.1.1 Create Topic - -```Java - void createTopicIfNotExists(String topicName, Properties properties) throws Exception; -``` - -Example: - -```Java -try (final SubscriptionSession session = new SubscriptionSession(host, port)) { - session.open(); - final Properties config = new Properties(); - config.put(TopicConstant.PATH_KEY, "root.db.**"); - session.createTopic(topicName, config); -} -``` - -#### 4.1.2 Delete Topic - -```Java -void dropTopicIfExists(String topicName) throws Exception; -``` - -#### 4.1.3 View Topic - -```Java -// Get all topics -Set getTopics() throws Exception; - -// Get a specific topic -Optional getTopic(String topicName) throws Exception; -``` - -### 4.2 Check Subscription Status -The `SubscriptionSession` class in the IoTDB subscription client provides interfaces to check the subscription status: - -```Java -Set getSubscriptions() throws Exception; -Set getSubscriptions(final String topicName) throws Exception; -``` - -### 4.3 Create Consumer - -When creating a consumer using the JAVA native interface, you need to specify the parameters applied to the consumer. - -For both `SubscriptionPullConsumer` and `SubscriptionPushConsumer`, the following common configurations are available: - - -| key | **required or optional with default** | description | -| :---------------------- | :----------------------------------------------------------- | :----------------------------------------------------------- | -| host | optional: 127.0.0.1 | `String`: The RPC host of a certain DataNode in IoTDB | -| port | optional: 6667 | Integer: The RPC port of a certain DataNode in IoTDB | -| node-urls | optional: 127.0.0.1:6667 | `List`: The RPC addresses of all DataNodes in IoTDB, can be multiple; either host:port or node-urls can be filled in. If both host:port and node-urls are filled in, the union of host:port and node-urls will be used to form a new node-urls application | -| username | optional: root | `String`: The username of a DataNode in IoTDB | -| password | optional: root | `String`: The password of a DataNode in IoTDB | -| groupId | optional | `String`: consumer group id, if not specified, a new consumer group will be randomly assigned, ensuring that different consumer groups have different consumer group ids | -| consumerId | optional | `String`: consumer client id, if not specified, it will be randomly assigned, ensuring that each consumer client id in the same consumer group is unique | -| heartbeatIntervalMs | optional: 30000 (min: 1000) | `Long`: The interval at which the consumer sends heartbeat requests to the IoTDB DataNode | -| endpointsSyncIntervalMs | optional: 120000 (min: 5000) | `Long`: The interval at which the consumer detects the expansion and contraction of IoTDB cluster nodes and adjusts the subscription connection | -| fileSaveDir | optional: Paths.get(System.getProperty("user.dir"), "iotdb-subscription").toString() | `String`: The temporary directory path where the TsFile files subscribed by the consumer are stored | -| fileSaveFsync | optional: false | `Boolean`: Whether the consumer actively calls fsync during the subscription of TsFile | - - -#### 4.3.1 SubscriptionPushConsumer - -The following are special configurations for `SubscriptionPushConsumer`: - - -| key | **required or optional with default** | description | -| :----------------- | :------------------------------------ | :----------------------------------------------------------- | -| ackStrategy | optional: `ACKStrategy.AFTER_CONSUME` | Consumption progress confirmation mechanism includes the following options: `ACKStrategy.BEFORE_CONSUME` (submit consumption progress immediately when the consumer receives data, before `onReceive`) `ACKStrategy.AFTER_CONSUME` (submit consumption progress after the consumer has consumed the data, after `onReceive`) | -| consumeListener | optional | Consumption data callback function, need to implement the `ConsumeListener` interface, define the consumption logic of `SessionDataSetsHandler` and `TsFileHandler` form data| -| autoPollIntervalMs | optional: 5000 (min: 500) | Long: The interval at which the consumer automatically pulls data, in ms | -| autoPollTimeoutMs | optional: 10000 (min: 1000) | Long: The timeout time for the consumer to pull data each time, in ms | - -Among them, the ConsumerListener interface is defined as follows: - - -```Java -@FunctionInterface -interface ConsumeListener { - default ConsumeResult onReceive(Message message) { - return ConsumeResult.SUCCESS; - } -} - -enum ConsumeResult { - SUCCESS, - FAILURE, -} -``` - -#### 4.3.2 SubscriptionPullConsumer - -The following are special configurations for `SubscriptionPullConsumer` : - -| key | **required or optional with default** | description | -| :----------------- | :------------------------------------ | :----------------------------------------------------------- | -| autoCommit | optional: true | Boolean: Whether to automatically commit consumption progress. If this parameter is set to false, the commit method must be called to manually `commit` consumption progress. | -| autoCommitInterval | optional: 5000 (min: 500) | Long: The interval at which consumption progress is automatically committed, in milliseconds. This only takes effect when the autoCommit parameter is true. - | - -After creating a consumer, you need to manually call the consumer's open method: - - -```Java -void open() throws Exception; -``` - -At this point, the IoTDB subscription client will verify the correctness of the consumer's configuration. After a successful verification, the consumer will join the corresponding consumer group. That is, only after opening the consumer can you use the returned consumer object to subscribe to topics, consume data, and perform other operations. - -### 4.4 Subscribe to Topics - -Both `SubscriptionPushConsumer` and `SubscriptionPullConsumer` provide the following JAVA native interfaces for subscribing to topics: - -```Java -// Subscribe to topics -void subscribe(String topic) throws Exception; -void subscribe(List topics) throws Exception; -``` - -- Before a consumer subscribes to a topic, the topic must have been created, otherwise, the subscription will fail. - -- If a consumer subscribes to a topic that it has already subscribed to, no error will occur. - -- If there are other consumers in the same consumer group that have subscribed to the same topics, the consumer will reuse the corresponding consumption progress. - - -### 4.5 Consume Data - -For both push and pull mode consumers: - - -- Only after explicitly subscribing to a topic will the consumer receive data for that topic. - -- If no topics are subscribed to after creation, the consumer will not be able to consume any data, even if other consumers in the same consumer group have subscribed to some topics. - -#### 4.5.1 SubscriptionPushConsumer - -After `SubscriptionPushConsumer` subscribes to topics, there is no need to manually pull data. - -The data consumption logic is within the `consumeListener` configuration specified when creating `SubscriptionPushConsumer`. - -#### 4.5.2 SubscriptionPullConsumer - -After SubscriptionPullConsumer subscribes to topics, it needs to actively call the poll method to pull data: - -```Java -List poll(final Duration timeout) throws Exception; -List poll(final long timeoutMs) throws Exception; -List poll(final Set topicNames, final Duration timeout) throws Exception; -List poll(final Set topicNames, final long timeoutMs) throws Exception; -``` - -In the poll method, you can specify the topic names to be pulled (if not specified, it defaults to pulling all topics that the consumer has subscribed to) and the timeout period. - - -When the SubscriptionPullConsumer is configured with the autoCommit parameter set to false, it is necessary to manually call the commitSync and commitAsync methods to synchronously or asynchronously commit the consumption progress of a batch of data: - - -```Java -void commitSync(final SubscriptionMessage message) throws Exception; -void commitSync(final Iterable messages) throws Exception; - -CompletableFuture commitAsync(final SubscriptionMessage message); -CompletableFuture commitAsync(final Iterable messages); -void commitAsync(final SubscriptionMessage message, final AsyncCommitCallback callback); -void commitAsync(final Iterable messages, final AsyncCommitCallback callback); -``` - -The AsyncCommitCallback class is defined as follows: - -```Java -public interface AsyncCommitCallback { - default void onComplete() { - // Do nothing - } - - default void onFailure(final Throwable e) { - // Do nothing - } -} -``` - -### 4.6 Unsubscribe - -The `SubscriptionPushConsumer` and `SubscriptionPullConsumer` provide the following JAVA native interfaces for unsubscribing and closing the consumer: - -```Java -// Unsubscribe from topics -void unsubscribe(String topic) throws Exception; -void unsubscribe(List topics) throws Exception; - -// Close consumer -void close(); -``` - -- If a consumer unsubscribes from a topic that it has not subscribed to, no error will occur. -- When a consumer is closed, it will exit the corresponding consumer group and automatically unsubscribe from all topics it is currently subscribed to. -- Once a consumer is closed, its lifecycle ends, and it cannot be reopened to subscribe to and consume data again. - - -### 4.7 Code Examples - -#### 4.7.1 Single Pull Consumer Consuming SessionDataSetsHandler Format Data - -```Java -// Create topics -try (final SubscriptionSession session = new SubscriptionSession(HOST, PORT)) { - session.open(); - final Properties config = new Properties(); - config.put(TopicConstant.PATH_KEY, "root.db.**"); - session.createTopic(TOPIC_1, config); -} - -// Subscription: property-style ctor -final Properties config = new Properties(); -config.put(ConsumerConstant.CONSUMER_ID_KEY, "c1"); -config.put(ConsumerConstant.CONSUMER_GROUP_ID_KEY, "cg1"); - -final SubscriptionPullConsumer consumer1 = new SubscriptionPullConsumer(config); -consumer1.open(); -consumer1.subscribe(TOPIC_1); -while (true) { - LockSupport.parkNanos(SLEEP_NS); // wait some time - final List messages = consumer1.poll(POLL_TIMEOUT_MS); - for (final SubscriptionMessage message : messages) { - for (final SubscriptionSessionDataSet dataSet : message.getSessionDataSetsHandler()) { - System.out.println(dataSet.getColumnNames()); - System.out.println(dataSet.getColumnTypes()); - while (dataSet.hasNext()) { - System.out.println(dataSet.next()); - } - } - } - // Auto commit -} - -// Show topics and subscriptions -try (final SubscriptionSession session = new SubscriptionSession(HOST, PORT)) { - session.open(); - session.getTopics().forEach((System.out::println)); - session.getSubscriptions().forEach((System.out::println)); -} - -consumer1.unsubscribe(TOPIC_1); -consumer1.close(); -``` - -#### 4.7.2 Multiple Push Consumers Consuming TsFileHandler Format Data - -```Java -// Create topics -try (final SubscriptionSession subscriptionSession = new SubscriptionSession(HOST, PORT)) { - subscriptionSession.open(); - final Properties config = new Properties(); - config.put(TopicConstant.FORMAT_KEY, TopicConstant.FORMAT_TS_FILE_HANDLER_VALUE); - subscriptionSession.createTopic(TOPIC_2, config); -} - -final List threads = new ArrayList<>(); -for (int i = 0; i < 8; ++i) { - final int idx = i; - final Thread thread = - new Thread( - () -> { - // Subscription: builder-style ctor - try (final SubscriptionPushConsumer consumer2 = - new SubscriptionPushConsumer.Builder() - .consumerId("c" + idx) - .consumerGroupId("cg2") - .fileSaveDir(System.getProperty("java.io.tmpdir")) - .ackStrategy(AckStrategy.AFTER_CONSUME) - .consumeListener( - message -> { - doSomething(message.getTsFileHandler()); - return ConsumeResult.SUCCESS; - }) - .buildPushConsumer()) { - consumer2.open(); - consumer2.subscribe(TOPIC_2); - } catch (final IOException e) { - throw new RuntimeException(e); - } - }); - thread.start(); - threads.add(thread); -} - -for (final Thread thread : threads) { - thread.join(); -} -``` ## 5. Frequently Asked Questions diff --git a/src/zh/UserGuide/Master/Tree/API/Programming-Java-Native-API.md b/src/zh/UserGuide/Master/Tree/API/Programming-Java-Native-API.md index e24bfbafb..3cc23929a 100644 --- a/src/zh/UserGuide/Master/Tree/API/Programming-Java-Native-API.md +++ b/src/zh/UserGuide/Master/Tree/API/Programming-Java-Native-API.md @@ -60,7 +60,7 @@ mvn clean install -pl iotdb-client/session -am -DskipTests 下面将给出 Session 对应的接口的简要介绍和对应参数: -### 初始化 +### Session管理 * 初始化 Session @@ -94,7 +94,7 @@ session = .build(); ``` -其中,version 表示客户端使用的 SQL 语义版本,用于升级 0.13 时兼容 0.12 的 SQL 语义,可能取值有:`V_0_12`、`V_0_13`、`V_1_0`。 +其中,version 表示客户端使用的 SQL 语义版本,用于升级 0.13 时兼容 0.12 的 SQL 语义,可能取值有:`V_0_12`、`V_0_13`、`V_1_0`等。 * 开启 Session @@ -117,7 +117,28 @@ void open(boolean enableRPCCompression) void close() ``` -### 数据定义接口 DDL +* SessionPool + +我们提供了一个针对原生接口的连接池 (`SessionPool`),使用该接口时,你只需要指定连接池的大小,就可以在使用时从池中获取连接。 +如果超过 60s 都没得到一个连接的话,那么会打印一条警告日志,但是程序仍将继续等待。 + +当一个连接被用完后,他会自动返回池中等待下次被使用; +当一个连接损坏后,他会从池中被删除,并重建一个连接重新执行用户的操作; +你还可以像创建 Session 那样在创建 SessionPool 时指定多个可连接节点的 url,以保证分布式集群中客户端的高可用性。 + +对于查询操作: + +1. 使用 SessionPool 进行查询时,得到的结果集是`SessionDataSet`的封装类`SessionDataSetWrapper`; +2. 若对于一个查询的结果集,用户并没有遍历完且不再想继续遍历时,需要手动调用释放连接的操作`closeResultSet`; +3. 若对一个查询的结果集遍历时出现异常,也需要手动调用释放连接的操作`closeResultSet`. +4. 可以调用 `SessionDataSetWrapper` 的 `getColumnNames()` 方法得到结果集列名 + +使用示例可以参见 `session/src/test/java/org/apache/iotdb/session/pool/SessionPoolTest.java` + +或 `example/session/src/main/java/org/apache/iotdb/SessionPoolExample.java` + + +### 测点管理接口 #### Database 管理 @@ -262,9 +283,7 @@ public void dropSchemaTemplate(String templateName); 注意:目前不支持从曾经在'prefixPath'路径及其后代节点使用模板插入数据后(即使数据已被删除)卸载模板。 -### 数据操作接口 DML - -#### 数据写入 +### 数据写入接口 推荐使用 insertTablet 帮助提高写入效率 @@ -377,7 +396,7 @@ void insertStringRecordsOfOneDevice(String deviceId, List times, * insertAlignedTablet * insertAlignedTablets -#### 数据删除 +### 数据删除接口 * 删除一个或多个时间序列在某个时间点前或这个时间点的数据 @@ -386,7 +405,7 @@ void deleteData(String path, long endTime) void deleteData(List paths, long endTime) ``` -#### 数据查询 +### 数据查询接口 * 时间序列原始数据范围查询: - 指定的查询时间范围为左闭右开区间,包含开始时间但不包含结束时间。 @@ -432,15 +451,303 @@ SessionDataSet executeAggregationQuery( long slidingStep); ``` -### IoTDB-SQL 接口 - -* 执行查询语句 +* 直接执行查询语句 ``` java SessionDataSet executeQueryStatement(String sql) ``` -* 执行非查询语句 +### 数据订阅 + +#### 1 Topic 管理 + +IoTDB 订阅客户端中的 `SubscriptionSession` 类提供了 Topic 管理的相关接口。Topic状态变化如下图所示: + +
+ +
+ +##### 1.1 创建 Topic + +```Java + void createTopicIfNotExists(String topicName, Properties properties) throws Exception; +``` + +示例: + +```Java +try (final SubscriptionSession session = new SubscriptionSession(host, port)) { + session.open(); + final Properties config = new Properties(); + config.put(TopicConstant.PATH_KEY, "root.db.**"); + session.createTopic(topicName, config); +} +``` + +##### 1.2 删除 Topic + +```Java +void dropTopicIfExists(String topicName) throws Exception; +``` + +##### 1.3 查看 Topic + +```Java +// 获取所有 topics +Set getTopics() throws Exception; + +// 获取单个 topic +Optional getTopic(String topicName) throws Exception; +``` + +#### 2 查看订阅状态 + +IoTDB 订阅客户端中的 `SubscriptionSession` 类提供了获取订阅状态的相关接口: + +```Java +Set getSubscriptions() throws Exception; +Set getSubscriptions(final String topicName) throws Exception; +``` + +#### 3 创建 Consumer + +在使用 JAVA 原生接口创建 consumer 时,需要指定 consumer 所应用的参数。 + +对于 `SubscriptionPullConsumer` 和 `SubscriptionPushConsumer` 而言,有以下公共配置: + +| 参数 | 是否必填(默认值) | 参数含义 | +| :-------------------------------------------- | :--------------------------------- | :----------------------------------------------------------- | +| host | optional: 127.0.0.1 | `String`: IoTDB 中某 DataNode 的 RPC host | +| port | optional: 6667 | `Integer`: IoTDB 中某 DataNode 的 RPC port | +| node-urls | optional: 127.0.0.1:6667 | `List`: IoTDB 中所有 DataNode 的 RPC 地址,可以是多个;host:port 和 node-urls 选填一个即可。当 host:port 和 node-urls 都填写了,则取 host:port 和 node-urls 的**并集**构成新的 node-urls 应用 | +| username | optional: root | `String`: IoTDB 中 DataNode 的用户名 | +| password | optional: root | `String`: IoTDB 中 DataNode 的密码 | +| groupId | optional | `String`: consumer group id,若未指定则随机分配(新的 consumer group),保证不同的 consumer group 对应的 consumer group id 均不相同 | +| consumerId | optional | `String`: consumer client id,若未指定则随机分配,保证同一个 consumer group 中每一个 consumer client id 均不相同 | +| heartbeatIntervalMs | optional: 30000 (min: 1000) | `Long`: consumer 向 IoTDB DataNode 定期发送心跳请求的间隔 | +| endpointsSyncIntervalMs | optional: 120000 (min: 5000) | `Long`: consumer 探测 IoTDB 集群节点扩缩容情况调整订阅连接的间隔 | +| fileSaveDir | optional: Paths.get(System.getProperty("user.dir"), "iotdb-subscription").toString() | `String`: consumer 订阅出的 TsFile 文件临时存放的目录路径 | +| fileSaveFsync | optional: false | `Boolean`: consumer 订阅 TsFile 的过程中是否主动调用 fsync | + + +##### 3.1 SubscriptionPushConsumer + +以下为 `SubscriptionPushConsumer` 中的特殊配置: +| 参数 | 是否必填(默认值) | 参数含义 | +| :-------------------------------------------- | :--------------------------------- | :----------------------------------------------------------- | +| ackStrategy | optional: `ACKStrategy.AFTER_CONSUME` | 消费进度的确认机制包含以下选项:`ACKStrategy.BEFORE_CONSUME`(当 consumer 收到数据时立刻提交消费进度,`onReceive` 前)`ACKStrategy.AFTER_CONSUME`(当 consumer 消费完数据再去提交消费进度,`onReceive` 后) | +| consumeListener | optional | 消费数据的回调函数,需实现 `ConsumeListener` 接口,定义消费 `SessionDataSetsHandler` 和 `TsFileHandler` 形式数据的处理逻辑 | +| autoPollIntervalMs | optional: 5000 (min: 500) | Long: consumer 自动拉取数据的时间间隔,单位为**毫秒** | +| autoPollTimeoutMs | optional: 10000 (min: 1000) | Long: consumer 每次拉取数据的超时时间,单位为**毫秒** | + +其中,`ConsumerListener` 接口定义如下: + +```Java +@FunctionInterface +interface ConsumeListener { + default ConsumeResult onReceive(Message message) { + return ConsumeResult.SUCCESS; + } +} + +enum ConsumeResult { + SUCCESS, + FAILURE, +} +``` + +##### 3.2 SubscriptionPullConsumer + +以下为 `SubscriptionPullConsumer` 中的特殊配置: + +| 参数 | 是否必填(默认值) | 参数含义 | +| :-------------------------------------------- | :--------------------------------- | :----------------------------------------------------------- | +| autoCommit | optional: true | Boolean: 是否自动提交消费进度如果此参数设置为 false,则需要调用 `commit` 方法来手动提交消费进度 | +| autoCommitInterval | optional: 5000 (min: 500) | Long: 自动提交消费进度的时间间隔,单位为**毫秒**仅当 autoCommit 参数为 true 的时候才会生效 | + +在创建 consumer 后,需要手动调用 consumer 的 open 方法: + +```Java +void open() throws Exception; +``` + +此时,IoTDB 订阅客户端才会校验 consumer 的配置正确性,在校验成功后 consumer 就会加入对应的 consumer group。也就是说,在打开 consumer 后,才可以使用返回的 consumer 对象进行订阅 Topic,消费数据等操作。 + +#### 4 订阅 Topic + +`SubscriptionPushConsumer` 和 `SubscriptionPullConsumer` 提供了下述 JAVA 原生接口用于订阅 Topics: + +```Java +// 订阅 topics +void subscribe(String topic) throws Exception; +void subscribe(List topics) throws Exception; +``` + +- 在 consumer 订阅 topic 前,topic 必须已经被创建,否则订阅会失败 +- 一个 consumer 在已经订阅了某个 topic 的情况下再次订阅这个 topic,不会报错 +- 如果该 consumer 所在的 consumer group 中已经有 consumers 订阅了相同的 topics,那么该 consumer 将会复用对应的消费进度 + +#### 5 消费数据 + +无论是 push 模式还是 pull 模式的 consumer: + +- 只有显式订阅了某个 topic,才会收到对应 topic 的数据 +- 若在创建后没有订阅任何 topics,此时该 consumer 无法消费到任何数据,即使该 consumer 所在的 consumer group 中其它的 consumers 订阅了一些 topics + +##### 5.1 SubscriptionPushConsumer + +SubscriptionPushConsumer 在订阅 topics 后,无需手动拉取数据,其消费数据的逻辑在创建 SubscriptionPushConsumer 指定的 `consumeListener` 配置中。 + +##### 5.2 SubscriptionPullConsumer + +SubscriptionPullConsumer 在订阅 topics 后,需要主动调用 `poll` 方法拉取数据: + +```Java +List poll(final Duration timeout) throws Exception; +List poll(final long timeoutMs) throws Exception; +List poll(final Set topicNames, final Duration timeout) throws Exception; +List poll(final Set topicNames, final long timeoutMs) throws Exception; +``` + +在 poll 方法中可以指定需要拉取的 topic 名称(如果不指定则默认拉取该 consumer 已订阅的所有 topics)和超时时间。 + +当 SubscriptionPullConsumer 配置 autoCommit 参数为 false 时,需要手动调用 commitSync 和 commitAsync 方法同步或异步提交某批数据的消费进度: + +```Java +void commitSync(final SubscriptionMessage message) throws Exception; +void commitSync(final Iterable messages) throws Exception; + +CompletableFuture commitAsync(final SubscriptionMessage message); +CompletableFuture commitAsync(final Iterable messages); +void commitAsync(final SubscriptionMessage message, final AsyncCommitCallback callback); +void commitAsync(final Iterable messages, final AsyncCommitCallback callback); +``` + +AsyncCommitCallback 类定义如下: + +```Java +public interface AsyncCommitCallback { + default void onComplete() { + // Do nothing + } + + default void onFailure(final Throwable e) { + // Do nothing + } +} +``` + +#### 6 取消订阅 + +`SubscriptionPushConsumer` 和 `SubscriptionPullConsumer` 提供了下述 JAVA 原生接口用于取消订阅并关闭 consumer: + +```Java +// 取消订阅 topics +void unsubscribe(String topic) throws Exception; +void unsubscribe(List topics) throws Exception; + +// 关闭 consumer +void close(); +``` + +- 在 topic 存在的情况下,如果一个 consumer 在没有订阅了某个 topic 的情况下取消订阅某个 topic,不会报错 +- consumer close 时会退出对应的 consumer group,同时自动 unsubscribe 该 consumer 现存订阅的所有 topics +- consumer 在 close 后生命周期即结束,无法再重新 open 订阅并消费数据 + +#### 7 代码示例 + +##### 7.1 单 Pull Consumer 消费 SessionDataSetsHandler 形式的数据 + +```Java +// Create topics +try (final SubscriptionSession session = new SubscriptionSession(HOST, PORT)) { + session.open(); + final Properties config = new Properties(); + config.put(TopicConstant.PATH_KEY, "root.db.**"); + session.createTopic(TOPIC_1, config); +} + +// Subscription: property-style ctor +final Properties config = new Properties(); +config.put(ConsumerConstant.CONSUMER_ID_KEY, "c1"); +config.put(ConsumerConstant.CONSUMER_GROUP_ID_KEY, "cg1"); + +final SubscriptionPullConsumer consumer1 = new SubscriptionPullConsumer(config); +consumer1.open(); +consumer1.subscribe(TOPIC_1); +while (true) { + LockSupport.parkNanos(SLEEP_NS); // wait some time + final List messages = consumer1.poll(POLL_TIMEOUT_MS); + for (final SubscriptionMessage message : messages) { + for (final SubscriptionSessionDataSet dataSet : message.getSessionDataSetsHandler()) { + System.out.println(dataSet.getColumnNames()); + System.out.println(dataSet.getColumnTypes()); + while (dataSet.hasNext()) { + System.out.println(dataSet.next()); + } + } + } + // Auto commit +} + +// Show topics and subscriptions +try (final SubscriptionSession session = new SubscriptionSession(HOST, PORT)) { + session.open(); + session.getTopics().forEach((System.out::println)); + session.getSubscriptions().forEach((System.out::println)); +} + +consumer1.unsubscribe(TOPIC_1); +consumer1.close(); +``` + +##### 7.2 多 Push Consumer 消费 TsFileHandler 形式的数据 + +```Java +// Create topics +try (final SubscriptionSession subscriptionSession = new SubscriptionSession(HOST, PORT)) { + subscriptionSession.open(); + final Properties config = new Properties(); + config.put(TopicConstant.FORMAT_KEY, TopicConstant.FORMAT_TS_FILE_HANDLER_VALUE); + subscriptionSession.createTopic(TOPIC_2, config); +} + +final List threads = new ArrayList<>(); +for (int i = 0; i < 8; ++i) { + final int idx = i; + final Thread thread = + new Thread( + () -> { + // Subscription: builder-style ctor + try (final SubscriptionPushConsumer consumer2 = + new SubscriptionPushConsumer.Builder() + .consumerId("c" + idx) + .consumerGroupId("cg2") + .fileSaveDir(System.getProperty("java.io.tmpdir")) + .ackStrategy(AckStrategy.AFTER_CONSUME) + .consumeListener( + message -> { + doSomething(message.getTsFileHandler()); + return ConsumeResult.SUCCESS; + }) + .buildPushConsumer()) { + consumer2.open(); + consumer2.subscribe(TOPIC_2); + } catch (final IOException e) { + throw new RuntimeException(e); + } + }); + thread.start(); + threads.add(thread); +} + +for (final Thread thread : threads) { + thread.join(); +} +``` + + +### 其他功能(直接执行SQL语句) ``` java void executeNonQueryStatement(String sql) @@ -488,26 +795,4 @@ void testInsertTablets(Map tablets) 使用上述接口的示例代码在 ```example/session/src/main/java/org/apache/iotdb/SessionExample.java``` -使用对齐时间序列和元数据模板的示例可以参见 `example/session/src/main/java/org/apache/iotdb/AlignedTimeseriesSessionExample.java` - -## 针对原生接口的连接池 - -我们提供了一个针对原生接口的连接池 (`SessionPool`),使用该接口时,你只需要指定连接池的大小,就可以在使用时从池中获取连接。 -如果超过 60s 都没得到一个连接的话,那么会打印一条警告日志,但是程序仍将继续等待。 - -当一个连接被用完后,他会自动返回池中等待下次被使用; -当一个连接损坏后,他会从池中被删除,并重建一个连接重新执行用户的操作; -你还可以像创建 Session 那样在创建 SessionPool 时指定多个可连接节点的 url,以保证分布式集群中客户端的高可用性。 - -对于查询操作: - -1. 使用 SessionPool 进行查询时,得到的结果集是`SessionDataSet`的封装类`SessionDataSetWrapper`; -2. 若对于一个查询的结果集,用户并没有遍历完且不再想继续遍历时,需要手动调用释放连接的操作`closeResultSet`; -3. 若对一个查询的结果集遍历时出现异常,也需要手动调用释放连接的操作`closeResultSet`. -4. 可以调用 `SessionDataSetWrapper` 的 `getColumnNames()` 方法得到结果集列名 - -使用示例可以参见 `session/src/test/java/org/apache/iotdb/session/pool/SessionPoolTest.java` - -或 `example/session/src/main/java/org/apache/iotdb/SessionPoolExample.java` - - +使用对齐时间序列和元数据模板的示例可以参见 `example/session/src/main/java/org/apache/iotdb/AlignedTimeseriesSessionExample.java` \ No newline at end of file diff --git a/src/zh/UserGuide/Master/Tree/User-Manual/Data-subscription.md b/src/zh/UserGuide/Master/Tree/User-Manual/Data-subscription.md index e6fce18fe..18dd6d463 100644 --- a/src/zh/UserGuide/Master/Tree/User-Manual/Data-subscription.md +++ b/src/zh/UserGuide/Master/Tree/User-Manual/Data-subscription.md @@ -127,294 +127,7 @@ SHOW SUBSCRIPTIONS ON ## 4. API 接口 -除 SQL 语句外,IoTDB 还支持通过 Java 原生接口([链接](../API/Programming-Java-Native-API.md))使用数据订阅功能。 - -### 4.1 Topic 管理 - -IoTDB 订阅客户端中的 `SubscriptionSession` 类提供了 Topic 管理的相关接口。Topic状态变化如下图所示: - -
- -
- -#### 4.1.1 创建 Topic - -```Java - void createTopicIfNotExists(String topicName, Properties properties) throws Exception; -``` - -示例: - -```Java -try (final SubscriptionSession session = new SubscriptionSession(host, port)) { - session.open(); - final Properties config = new Properties(); - config.put(TopicConstant.PATH_KEY, "root.db.**"); - session.createTopic(topicName, config); -} -``` - -#### 4.1.2 删除 Topic - -```Java -void dropTopicIfExists(String topicName) throws Exception; -``` - -#### 4.1.3 查看 Topic - -```Java -// 获取所有 topics -Set getTopics() throws Exception; - -// 获取单个 topic -Optional getTopic(String topicName) throws Exception; -``` - -### 4.2 查看订阅状态 - -IoTDB 订阅客户端中的 `SubscriptionSession` 类提供了获取订阅状态的相关接口: - -```Java -Set getSubscriptions() throws Exception; -Set getSubscriptions(final String topicName) throws Exception; -``` - -### 4.3 创建 Consumer - -在使用 JAVA 原生接口创建 consumer 时,需要指定 consumer 所应用的参数。 - -对于 `SubscriptionPullConsumer` 和 `SubscriptionPushConsumer` 而言,有以下公共配置: - -| 参数 | 是否必填(默认值) | 参数含义 | -| :-------------------------------------------- | :--------------------------------- | :----------------------------------------------------------- | -| host | optional: 127.0.0.1 | `String`: IoTDB 中某 DataNode 的 RPC host | -| port | optional: 6667 | `Integer`: IoTDB 中某 DataNode 的 RPC port | -| node-urls | optional: 127.0.0.1:6667 | `List`: IoTDB 中所有 DataNode 的 RPC 地址,可以是多个;host:port 和 node-urls 选填一个即可。当 host:port 和 node-urls 都填写了,则取 host:port 和 node-urls 的**并集**构成新的 node-urls 应用 | -| username | optional: root | `String`: IoTDB 中 DataNode 的用户名 | -| password | optional: root | `String`: IoTDB 中 DataNode 的密码 | -| groupId | optional | `String`: consumer group id,若未指定则随机分配(新的 consumer group),保证不同的 consumer group 对应的 consumer group id 均不相同 | -| consumerId | optional | `String`: consumer client id,若未指定则随机分配,保证同一个 consumer group 中每一个 consumer client id 均不相同 | -| heartbeatIntervalMs | optional: 30000 (min: 1000) | `Long`: consumer 向 IoTDB DataNode 定期发送心跳请求的间隔 | -| endpointsSyncIntervalMs | optional: 120000 (min: 5000) | `Long`: consumer 探测 IoTDB 集群节点扩缩容情况调整订阅连接的间隔 | -| fileSaveDir | optional: Paths.get(System.getProperty("user.dir"), "iotdb-subscription").toString() | `String`: consumer 订阅出的 TsFile 文件临时存放的目录路径 | -| fileSaveFsync | optional: false | `Boolean`: consumer 订阅 TsFile 的过程中是否主动调用 fsync | - - -#### 4.3.1 SubscriptionPushConsumer - -以下为 `SubscriptionPushConsumer` 中的特殊配置: -| 参数 | 是否必填(默认值) | 参数含义 | -| :-------------------------------------------- | :--------------------------------- | :----------------------------------------------------------- | -| ackStrategy | optional: `ACKStrategy.AFTER_CONSUME` | 消费进度的确认机制包含以下选项:`ACKStrategy.BEFORE_CONSUME`(当 consumer 收到数据时立刻提交消费进度,`onReceive` 前)`ACKStrategy.AFTER_CONSUME`(当 consumer 消费完数据再去提交消费进度,`onReceive` 后) | -| consumeListener | optional | 消费数据的回调函数,需实现 `ConsumeListener` 接口,定义消费 `SessionDataSetsHandler` 和 `TsFileHandler` 形式数据的处理逻辑 | -| autoPollIntervalMs | optional: 5000 (min: 500) | Long: consumer 自动拉取数据的时间间隔,单位为**毫秒** | -| autoPollTimeoutMs | optional: 10000 (min: 1000) | Long: consumer 每次拉取数据的超时时间,单位为**毫秒** | - -其中,`ConsumerListener` 接口定义如下: - -```Java -@FunctionInterface -interface ConsumeListener { - default ConsumeResult onReceive(Message message) { - return ConsumeResult.SUCCESS; - } -} - -enum ConsumeResult { - SUCCESS, - FAILURE, -} -``` - -#### 4.3.2 SubscriptionPullConsumer - -以下为 `SubscriptionPullConsumer` 中的特殊配置: - -| 参数 | 是否必填(默认值) | 参数含义 | -| :-------------------------------------------- | :--------------------------------- | :----------------------------------------------------------- | -| autoCommit | optional: true | Boolean: 是否自动提交消费进度如果此参数设置为 false,则需要调用 `commit` 方法来手动提交消费进度 | -| autoCommitInterval | optional: 5000 (min: 500) | Long: 自动提交消费进度的时间间隔,单位为**毫秒**仅当 autoCommit 参数为 true 的时候才会生效 | - -在创建 consumer 后,需要手动调用 consumer 的 open 方法: - -```Java -void open() throws Exception; -``` - -此时,IoTDB 订阅客户端才会校验 consumer 的配置正确性,在校验成功后 consumer 就会加入对应的 consumer group。也就是说,在打开 consumer 后,才可以使用返回的 consumer 对象进行订阅 Topic,消费数据等操作。 - -### 4.4 订阅 Topic - -`SubscriptionPushConsumer` 和 `SubscriptionPullConsumer` 提供了下述 JAVA 原生接口用于订阅 Topics: - -```Java -// 订阅 topics -void subscribe(String topic) throws Exception; -void subscribe(List topics) throws Exception; -``` - -- 在 consumer 订阅 topic 前,topic 必须已经被创建,否则订阅会失败 -- 一个 consumer 在已经订阅了某个 topic 的情况下再次订阅这个 topic,不会报错 -- 如果该 consumer 所在的 consumer group 中已经有 consumers 订阅了相同的 topics,那么该 consumer 将会复用对应的消费进度 - -### 4.5 消费数据 - -无论是 push 模式还是 pull 模式的 consumer: - -- 只有显式订阅了某个 topic,才会收到对应 topic 的数据 -- 若在创建后没有订阅任何 topics,此时该 consumer 无法消费到任何数据,即使该 consumer 所在的 consumer group 中其它的 consumers 订阅了一些 topics - -#### 4.5.1 SubscriptionPushConsumer - -SubscriptionPushConsumer 在订阅 topics 后,无需手动拉取数据,其消费数据的逻辑在创建 SubscriptionPushConsumer 指定的 `consumeListener` 配置中。 - -#### 4.5.2 SubscriptionPullConsumer - -SubscriptionPullConsumer 在订阅 topics 后,需要主动调用 `poll` 方法拉取数据: - -```Java -List poll(final Duration timeout) throws Exception; -List poll(final long timeoutMs) throws Exception; -List poll(final Set topicNames, final Duration timeout) throws Exception; -List poll(final Set topicNames, final long timeoutMs) throws Exception; -``` - -在 poll 方法中可以指定需要拉取的 topic 名称(如果不指定则默认拉取该 consumer 已订阅的所有 topics)和超时时间。 - -当 SubscriptionPullConsumer 配置 autoCommit 参数为 false 时,需要手动调用 commitSync 和 commitAsync 方法同步或异步提交某批数据的消费进度: - -```Java -void commitSync(final SubscriptionMessage message) throws Exception; -void commitSync(final Iterable messages) throws Exception; - -CompletableFuture commitAsync(final SubscriptionMessage message); -CompletableFuture commitAsync(final Iterable messages); -void commitAsync(final SubscriptionMessage message, final AsyncCommitCallback callback); -void commitAsync(final Iterable messages, final AsyncCommitCallback callback); -``` - -AsyncCommitCallback 类定义如下: - -```Java -public interface AsyncCommitCallback { - default void onComplete() { - // Do nothing - } - - default void onFailure(final Throwable e) { - // Do nothing - } -} -``` - -### 4.6 取消订阅 - -`SubscriptionPushConsumer` 和 `SubscriptionPullConsumer` 提供了下述 JAVA 原生接口用于取消订阅并关闭 consumer: - -```Java -// 取消订阅 topics -void unsubscribe(String topic) throws Exception; -void unsubscribe(List topics) throws Exception; - -// 关闭 consumer -void close(); -``` - -- 在 topic 存在的情况下,如果一个 consumer 在没有订阅了某个 topic 的情况下取消订阅某个 topic,不会报错 -- consumer close 时会退出对应的 consumer group,同时自动 unsubscribe 该 consumer 现存订阅的所有 topics -- consumer 在 close 后生命周期即结束,无法再重新 open 订阅并消费数据 - -### 4.7 代码示例 - -#### 4.7.1 单 Pull Consumer 消费 SessionDataSetsHandler 形式的数据 - -```Java -// Create topics -try (final SubscriptionSession session = new SubscriptionSession(HOST, PORT)) { - session.open(); - final Properties config = new Properties(); - config.put(TopicConstant.PATH_KEY, "root.db.**"); - session.createTopic(TOPIC_1, config); -} - -// Subscription: property-style ctor -final Properties config = new Properties(); -config.put(ConsumerConstant.CONSUMER_ID_KEY, "c1"); -config.put(ConsumerConstant.CONSUMER_GROUP_ID_KEY, "cg1"); - -final SubscriptionPullConsumer consumer1 = new SubscriptionPullConsumer(config); -consumer1.open(); -consumer1.subscribe(TOPIC_1); -while (true) { - LockSupport.parkNanos(SLEEP_NS); // wait some time - final List messages = consumer1.poll(POLL_TIMEOUT_MS); - for (final SubscriptionMessage message : messages) { - for (final SubscriptionSessionDataSet dataSet : message.getSessionDataSetsHandler()) { - System.out.println(dataSet.getColumnNames()); - System.out.println(dataSet.getColumnTypes()); - while (dataSet.hasNext()) { - System.out.println(dataSet.next()); - } - } - } - // Auto commit -} - -// Show topics and subscriptions -try (final SubscriptionSession session = new SubscriptionSession(HOST, PORT)) { - session.open(); - session.getTopics().forEach((System.out::println)); - session.getSubscriptions().forEach((System.out::println)); -} - -consumer1.unsubscribe(TOPIC_1); -consumer1.close(); -``` - -#### 4.7.2 多 Push Consumer 消费 TsFileHandler 形式的数据 - -```Java -// Create topics -try (final SubscriptionSession subscriptionSession = new SubscriptionSession(HOST, PORT)) { - subscriptionSession.open(); - final Properties config = new Properties(); - config.put(TopicConstant.FORMAT_KEY, TopicConstant.FORMAT_TS_FILE_HANDLER_VALUE); - subscriptionSession.createTopic(TOPIC_2, config); -} - -final List threads = new ArrayList<>(); -for (int i = 0; i < 8; ++i) { - final int idx = i; - final Thread thread = - new Thread( - () -> { - // Subscription: builder-style ctor - try (final SubscriptionPushConsumer consumer2 = - new SubscriptionPushConsumer.Builder() - .consumerId("c" + idx) - .consumerGroupId("cg2") - .fileSaveDir(System.getProperty("java.io.tmpdir")) - .ackStrategy(AckStrategy.AFTER_CONSUME) - .consumeListener( - message -> { - doSomething(message.getTsFileHandler()); - return ConsumeResult.SUCCESS; - }) - .buildPushConsumer()) { - consumer2.open(); - consumer2.subscribe(TOPIC_2); - } catch (final IOException e) { - throw new RuntimeException(e); - } - }); - thread.start(); - threads.add(thread); -} - -for (final Thread thread : threads) { - thread.join(); -} -``` +除 SQL 语句外,IoTDB 还支持通过 Java 原生接口使用数据订阅功能。详细语法参见页面:Java 原生接口([链接](../API/Programming-Java-Native-API.md))。 ## 5. 常见问题 diff --git a/src/zh/UserGuide/latest/API/Programming-Java-Native-API.md b/src/zh/UserGuide/latest/API/Programming-Java-Native-API.md index e24bfbafb..3cc23929a 100644 --- a/src/zh/UserGuide/latest/API/Programming-Java-Native-API.md +++ b/src/zh/UserGuide/latest/API/Programming-Java-Native-API.md @@ -60,7 +60,7 @@ mvn clean install -pl iotdb-client/session -am -DskipTests 下面将给出 Session 对应的接口的简要介绍和对应参数: -### 初始化 +### Session管理 * 初始化 Session @@ -94,7 +94,7 @@ session = .build(); ``` -其中,version 表示客户端使用的 SQL 语义版本,用于升级 0.13 时兼容 0.12 的 SQL 语义,可能取值有:`V_0_12`、`V_0_13`、`V_1_0`。 +其中,version 表示客户端使用的 SQL 语义版本,用于升级 0.13 时兼容 0.12 的 SQL 语义,可能取值有:`V_0_12`、`V_0_13`、`V_1_0`等。 * 开启 Session @@ -117,7 +117,28 @@ void open(boolean enableRPCCompression) void close() ``` -### 数据定义接口 DDL +* SessionPool + +我们提供了一个针对原生接口的连接池 (`SessionPool`),使用该接口时,你只需要指定连接池的大小,就可以在使用时从池中获取连接。 +如果超过 60s 都没得到一个连接的话,那么会打印一条警告日志,但是程序仍将继续等待。 + +当一个连接被用完后,他会自动返回池中等待下次被使用; +当一个连接损坏后,他会从池中被删除,并重建一个连接重新执行用户的操作; +你还可以像创建 Session 那样在创建 SessionPool 时指定多个可连接节点的 url,以保证分布式集群中客户端的高可用性。 + +对于查询操作: + +1. 使用 SessionPool 进行查询时,得到的结果集是`SessionDataSet`的封装类`SessionDataSetWrapper`; +2. 若对于一个查询的结果集,用户并没有遍历完且不再想继续遍历时,需要手动调用释放连接的操作`closeResultSet`; +3. 若对一个查询的结果集遍历时出现异常,也需要手动调用释放连接的操作`closeResultSet`. +4. 可以调用 `SessionDataSetWrapper` 的 `getColumnNames()` 方法得到结果集列名 + +使用示例可以参见 `session/src/test/java/org/apache/iotdb/session/pool/SessionPoolTest.java` + +或 `example/session/src/main/java/org/apache/iotdb/SessionPoolExample.java` + + +### 测点管理接口 #### Database 管理 @@ -262,9 +283,7 @@ public void dropSchemaTemplate(String templateName); 注意:目前不支持从曾经在'prefixPath'路径及其后代节点使用模板插入数据后(即使数据已被删除)卸载模板。 -### 数据操作接口 DML - -#### 数据写入 +### 数据写入接口 推荐使用 insertTablet 帮助提高写入效率 @@ -377,7 +396,7 @@ void insertStringRecordsOfOneDevice(String deviceId, List times, * insertAlignedTablet * insertAlignedTablets -#### 数据删除 +### 数据删除接口 * 删除一个或多个时间序列在某个时间点前或这个时间点的数据 @@ -386,7 +405,7 @@ void deleteData(String path, long endTime) void deleteData(List paths, long endTime) ``` -#### 数据查询 +### 数据查询接口 * 时间序列原始数据范围查询: - 指定的查询时间范围为左闭右开区间,包含开始时间但不包含结束时间。 @@ -432,15 +451,303 @@ SessionDataSet executeAggregationQuery( long slidingStep); ``` -### IoTDB-SQL 接口 - -* 执行查询语句 +* 直接执行查询语句 ``` java SessionDataSet executeQueryStatement(String sql) ``` -* 执行非查询语句 +### 数据订阅 + +#### 1 Topic 管理 + +IoTDB 订阅客户端中的 `SubscriptionSession` 类提供了 Topic 管理的相关接口。Topic状态变化如下图所示: + +
+ +
+ +##### 1.1 创建 Topic + +```Java + void createTopicIfNotExists(String topicName, Properties properties) throws Exception; +``` + +示例: + +```Java +try (final SubscriptionSession session = new SubscriptionSession(host, port)) { + session.open(); + final Properties config = new Properties(); + config.put(TopicConstant.PATH_KEY, "root.db.**"); + session.createTopic(topicName, config); +} +``` + +##### 1.2 删除 Topic + +```Java +void dropTopicIfExists(String topicName) throws Exception; +``` + +##### 1.3 查看 Topic + +```Java +// 获取所有 topics +Set getTopics() throws Exception; + +// 获取单个 topic +Optional getTopic(String topicName) throws Exception; +``` + +#### 2 查看订阅状态 + +IoTDB 订阅客户端中的 `SubscriptionSession` 类提供了获取订阅状态的相关接口: + +```Java +Set getSubscriptions() throws Exception; +Set getSubscriptions(final String topicName) throws Exception; +``` + +#### 3 创建 Consumer + +在使用 JAVA 原生接口创建 consumer 时,需要指定 consumer 所应用的参数。 + +对于 `SubscriptionPullConsumer` 和 `SubscriptionPushConsumer` 而言,有以下公共配置: + +| 参数 | 是否必填(默认值) | 参数含义 | +| :-------------------------------------------- | :--------------------------------- | :----------------------------------------------------------- | +| host | optional: 127.0.0.1 | `String`: IoTDB 中某 DataNode 的 RPC host | +| port | optional: 6667 | `Integer`: IoTDB 中某 DataNode 的 RPC port | +| node-urls | optional: 127.0.0.1:6667 | `List`: IoTDB 中所有 DataNode 的 RPC 地址,可以是多个;host:port 和 node-urls 选填一个即可。当 host:port 和 node-urls 都填写了,则取 host:port 和 node-urls 的**并集**构成新的 node-urls 应用 | +| username | optional: root | `String`: IoTDB 中 DataNode 的用户名 | +| password | optional: root | `String`: IoTDB 中 DataNode 的密码 | +| groupId | optional | `String`: consumer group id,若未指定则随机分配(新的 consumer group),保证不同的 consumer group 对应的 consumer group id 均不相同 | +| consumerId | optional | `String`: consumer client id,若未指定则随机分配,保证同一个 consumer group 中每一个 consumer client id 均不相同 | +| heartbeatIntervalMs | optional: 30000 (min: 1000) | `Long`: consumer 向 IoTDB DataNode 定期发送心跳请求的间隔 | +| endpointsSyncIntervalMs | optional: 120000 (min: 5000) | `Long`: consumer 探测 IoTDB 集群节点扩缩容情况调整订阅连接的间隔 | +| fileSaveDir | optional: Paths.get(System.getProperty("user.dir"), "iotdb-subscription").toString() | `String`: consumer 订阅出的 TsFile 文件临时存放的目录路径 | +| fileSaveFsync | optional: false | `Boolean`: consumer 订阅 TsFile 的过程中是否主动调用 fsync | + + +##### 3.1 SubscriptionPushConsumer + +以下为 `SubscriptionPushConsumer` 中的特殊配置: +| 参数 | 是否必填(默认值) | 参数含义 | +| :-------------------------------------------- | :--------------------------------- | :----------------------------------------------------------- | +| ackStrategy | optional: `ACKStrategy.AFTER_CONSUME` | 消费进度的确认机制包含以下选项:`ACKStrategy.BEFORE_CONSUME`(当 consumer 收到数据时立刻提交消费进度,`onReceive` 前)`ACKStrategy.AFTER_CONSUME`(当 consumer 消费完数据再去提交消费进度,`onReceive` 后) | +| consumeListener | optional | 消费数据的回调函数,需实现 `ConsumeListener` 接口,定义消费 `SessionDataSetsHandler` 和 `TsFileHandler` 形式数据的处理逻辑 | +| autoPollIntervalMs | optional: 5000 (min: 500) | Long: consumer 自动拉取数据的时间间隔,单位为**毫秒** | +| autoPollTimeoutMs | optional: 10000 (min: 1000) | Long: consumer 每次拉取数据的超时时间,单位为**毫秒** | + +其中,`ConsumerListener` 接口定义如下: + +```Java +@FunctionInterface +interface ConsumeListener { + default ConsumeResult onReceive(Message message) { + return ConsumeResult.SUCCESS; + } +} + +enum ConsumeResult { + SUCCESS, + FAILURE, +} +``` + +##### 3.2 SubscriptionPullConsumer + +以下为 `SubscriptionPullConsumer` 中的特殊配置: + +| 参数 | 是否必填(默认值) | 参数含义 | +| :-------------------------------------------- | :--------------------------------- | :----------------------------------------------------------- | +| autoCommit | optional: true | Boolean: 是否自动提交消费进度如果此参数设置为 false,则需要调用 `commit` 方法来手动提交消费进度 | +| autoCommitInterval | optional: 5000 (min: 500) | Long: 自动提交消费进度的时间间隔,单位为**毫秒**仅当 autoCommit 参数为 true 的时候才会生效 | + +在创建 consumer 后,需要手动调用 consumer 的 open 方法: + +```Java +void open() throws Exception; +``` + +此时,IoTDB 订阅客户端才会校验 consumer 的配置正确性,在校验成功后 consumer 就会加入对应的 consumer group。也就是说,在打开 consumer 后,才可以使用返回的 consumer 对象进行订阅 Topic,消费数据等操作。 + +#### 4 订阅 Topic + +`SubscriptionPushConsumer` 和 `SubscriptionPullConsumer` 提供了下述 JAVA 原生接口用于订阅 Topics: + +```Java +// 订阅 topics +void subscribe(String topic) throws Exception; +void subscribe(List topics) throws Exception; +``` + +- 在 consumer 订阅 topic 前,topic 必须已经被创建,否则订阅会失败 +- 一个 consumer 在已经订阅了某个 topic 的情况下再次订阅这个 topic,不会报错 +- 如果该 consumer 所在的 consumer group 中已经有 consumers 订阅了相同的 topics,那么该 consumer 将会复用对应的消费进度 + +#### 5 消费数据 + +无论是 push 模式还是 pull 模式的 consumer: + +- 只有显式订阅了某个 topic,才会收到对应 topic 的数据 +- 若在创建后没有订阅任何 topics,此时该 consumer 无法消费到任何数据,即使该 consumer 所在的 consumer group 中其它的 consumers 订阅了一些 topics + +##### 5.1 SubscriptionPushConsumer + +SubscriptionPushConsumer 在订阅 topics 后,无需手动拉取数据,其消费数据的逻辑在创建 SubscriptionPushConsumer 指定的 `consumeListener` 配置中。 + +##### 5.2 SubscriptionPullConsumer + +SubscriptionPullConsumer 在订阅 topics 后,需要主动调用 `poll` 方法拉取数据: + +```Java +List poll(final Duration timeout) throws Exception; +List poll(final long timeoutMs) throws Exception; +List poll(final Set topicNames, final Duration timeout) throws Exception; +List poll(final Set topicNames, final long timeoutMs) throws Exception; +``` + +在 poll 方法中可以指定需要拉取的 topic 名称(如果不指定则默认拉取该 consumer 已订阅的所有 topics)和超时时间。 + +当 SubscriptionPullConsumer 配置 autoCommit 参数为 false 时,需要手动调用 commitSync 和 commitAsync 方法同步或异步提交某批数据的消费进度: + +```Java +void commitSync(final SubscriptionMessage message) throws Exception; +void commitSync(final Iterable messages) throws Exception; + +CompletableFuture commitAsync(final SubscriptionMessage message); +CompletableFuture commitAsync(final Iterable messages); +void commitAsync(final SubscriptionMessage message, final AsyncCommitCallback callback); +void commitAsync(final Iterable messages, final AsyncCommitCallback callback); +``` + +AsyncCommitCallback 类定义如下: + +```Java +public interface AsyncCommitCallback { + default void onComplete() { + // Do nothing + } + + default void onFailure(final Throwable e) { + // Do nothing + } +} +``` + +#### 6 取消订阅 + +`SubscriptionPushConsumer` 和 `SubscriptionPullConsumer` 提供了下述 JAVA 原生接口用于取消订阅并关闭 consumer: + +```Java +// 取消订阅 topics +void unsubscribe(String topic) throws Exception; +void unsubscribe(List topics) throws Exception; + +// 关闭 consumer +void close(); +``` + +- 在 topic 存在的情况下,如果一个 consumer 在没有订阅了某个 topic 的情况下取消订阅某个 topic,不会报错 +- consumer close 时会退出对应的 consumer group,同时自动 unsubscribe 该 consumer 现存订阅的所有 topics +- consumer 在 close 后生命周期即结束,无法再重新 open 订阅并消费数据 + +#### 7 代码示例 + +##### 7.1 单 Pull Consumer 消费 SessionDataSetsHandler 形式的数据 + +```Java +// Create topics +try (final SubscriptionSession session = new SubscriptionSession(HOST, PORT)) { + session.open(); + final Properties config = new Properties(); + config.put(TopicConstant.PATH_KEY, "root.db.**"); + session.createTopic(TOPIC_1, config); +} + +// Subscription: property-style ctor +final Properties config = new Properties(); +config.put(ConsumerConstant.CONSUMER_ID_KEY, "c1"); +config.put(ConsumerConstant.CONSUMER_GROUP_ID_KEY, "cg1"); + +final SubscriptionPullConsumer consumer1 = new SubscriptionPullConsumer(config); +consumer1.open(); +consumer1.subscribe(TOPIC_1); +while (true) { + LockSupport.parkNanos(SLEEP_NS); // wait some time + final List messages = consumer1.poll(POLL_TIMEOUT_MS); + for (final SubscriptionMessage message : messages) { + for (final SubscriptionSessionDataSet dataSet : message.getSessionDataSetsHandler()) { + System.out.println(dataSet.getColumnNames()); + System.out.println(dataSet.getColumnTypes()); + while (dataSet.hasNext()) { + System.out.println(dataSet.next()); + } + } + } + // Auto commit +} + +// Show topics and subscriptions +try (final SubscriptionSession session = new SubscriptionSession(HOST, PORT)) { + session.open(); + session.getTopics().forEach((System.out::println)); + session.getSubscriptions().forEach((System.out::println)); +} + +consumer1.unsubscribe(TOPIC_1); +consumer1.close(); +``` + +##### 7.2 多 Push Consumer 消费 TsFileHandler 形式的数据 + +```Java +// Create topics +try (final SubscriptionSession subscriptionSession = new SubscriptionSession(HOST, PORT)) { + subscriptionSession.open(); + final Properties config = new Properties(); + config.put(TopicConstant.FORMAT_KEY, TopicConstant.FORMAT_TS_FILE_HANDLER_VALUE); + subscriptionSession.createTopic(TOPIC_2, config); +} + +final List threads = new ArrayList<>(); +for (int i = 0; i < 8; ++i) { + final int idx = i; + final Thread thread = + new Thread( + () -> { + // Subscription: builder-style ctor + try (final SubscriptionPushConsumer consumer2 = + new SubscriptionPushConsumer.Builder() + .consumerId("c" + idx) + .consumerGroupId("cg2") + .fileSaveDir(System.getProperty("java.io.tmpdir")) + .ackStrategy(AckStrategy.AFTER_CONSUME) + .consumeListener( + message -> { + doSomething(message.getTsFileHandler()); + return ConsumeResult.SUCCESS; + }) + .buildPushConsumer()) { + consumer2.open(); + consumer2.subscribe(TOPIC_2); + } catch (final IOException e) { + throw new RuntimeException(e); + } + }); + thread.start(); + threads.add(thread); +} + +for (final Thread thread : threads) { + thread.join(); +} +``` + + +### 其他功能(直接执行SQL语句) ``` java void executeNonQueryStatement(String sql) @@ -488,26 +795,4 @@ void testInsertTablets(Map tablets) 使用上述接口的示例代码在 ```example/session/src/main/java/org/apache/iotdb/SessionExample.java``` -使用对齐时间序列和元数据模板的示例可以参见 `example/session/src/main/java/org/apache/iotdb/AlignedTimeseriesSessionExample.java` - -## 针对原生接口的连接池 - -我们提供了一个针对原生接口的连接池 (`SessionPool`),使用该接口时,你只需要指定连接池的大小,就可以在使用时从池中获取连接。 -如果超过 60s 都没得到一个连接的话,那么会打印一条警告日志,但是程序仍将继续等待。 - -当一个连接被用完后,他会自动返回池中等待下次被使用; -当一个连接损坏后,他会从池中被删除,并重建一个连接重新执行用户的操作; -你还可以像创建 Session 那样在创建 SessionPool 时指定多个可连接节点的 url,以保证分布式集群中客户端的高可用性。 - -对于查询操作: - -1. 使用 SessionPool 进行查询时,得到的结果集是`SessionDataSet`的封装类`SessionDataSetWrapper`; -2. 若对于一个查询的结果集,用户并没有遍历完且不再想继续遍历时,需要手动调用释放连接的操作`closeResultSet`; -3. 若对一个查询的结果集遍历时出现异常,也需要手动调用释放连接的操作`closeResultSet`. -4. 可以调用 `SessionDataSetWrapper` 的 `getColumnNames()` 方法得到结果集列名 - -使用示例可以参见 `session/src/test/java/org/apache/iotdb/session/pool/SessionPoolTest.java` - -或 `example/session/src/main/java/org/apache/iotdb/SessionPoolExample.java` - - +使用对齐时间序列和元数据模板的示例可以参见 `example/session/src/main/java/org/apache/iotdb/AlignedTimeseriesSessionExample.java` \ No newline at end of file diff --git a/src/zh/UserGuide/latest/User-Manual/Data-subscription.md b/src/zh/UserGuide/latest/User-Manual/Data-subscription.md index e6fce18fe..18dd6d463 100644 --- a/src/zh/UserGuide/latest/User-Manual/Data-subscription.md +++ b/src/zh/UserGuide/latest/User-Manual/Data-subscription.md @@ -127,294 +127,7 @@ SHOW SUBSCRIPTIONS ON ## 4. API 接口 -除 SQL 语句外,IoTDB 还支持通过 Java 原生接口([链接](../API/Programming-Java-Native-API.md))使用数据订阅功能。 - -### 4.1 Topic 管理 - -IoTDB 订阅客户端中的 `SubscriptionSession` 类提供了 Topic 管理的相关接口。Topic状态变化如下图所示: - -
- -
- -#### 4.1.1 创建 Topic - -```Java - void createTopicIfNotExists(String topicName, Properties properties) throws Exception; -``` - -示例: - -```Java -try (final SubscriptionSession session = new SubscriptionSession(host, port)) { - session.open(); - final Properties config = new Properties(); - config.put(TopicConstant.PATH_KEY, "root.db.**"); - session.createTopic(topicName, config); -} -``` - -#### 4.1.2 删除 Topic - -```Java -void dropTopicIfExists(String topicName) throws Exception; -``` - -#### 4.1.3 查看 Topic - -```Java -// 获取所有 topics -Set getTopics() throws Exception; - -// 获取单个 topic -Optional getTopic(String topicName) throws Exception; -``` - -### 4.2 查看订阅状态 - -IoTDB 订阅客户端中的 `SubscriptionSession` 类提供了获取订阅状态的相关接口: - -```Java -Set getSubscriptions() throws Exception; -Set getSubscriptions(final String topicName) throws Exception; -``` - -### 4.3 创建 Consumer - -在使用 JAVA 原生接口创建 consumer 时,需要指定 consumer 所应用的参数。 - -对于 `SubscriptionPullConsumer` 和 `SubscriptionPushConsumer` 而言,有以下公共配置: - -| 参数 | 是否必填(默认值) | 参数含义 | -| :-------------------------------------------- | :--------------------------------- | :----------------------------------------------------------- | -| host | optional: 127.0.0.1 | `String`: IoTDB 中某 DataNode 的 RPC host | -| port | optional: 6667 | `Integer`: IoTDB 中某 DataNode 的 RPC port | -| node-urls | optional: 127.0.0.1:6667 | `List`: IoTDB 中所有 DataNode 的 RPC 地址,可以是多个;host:port 和 node-urls 选填一个即可。当 host:port 和 node-urls 都填写了,则取 host:port 和 node-urls 的**并集**构成新的 node-urls 应用 | -| username | optional: root | `String`: IoTDB 中 DataNode 的用户名 | -| password | optional: root | `String`: IoTDB 中 DataNode 的密码 | -| groupId | optional | `String`: consumer group id,若未指定则随机分配(新的 consumer group),保证不同的 consumer group 对应的 consumer group id 均不相同 | -| consumerId | optional | `String`: consumer client id,若未指定则随机分配,保证同一个 consumer group 中每一个 consumer client id 均不相同 | -| heartbeatIntervalMs | optional: 30000 (min: 1000) | `Long`: consumer 向 IoTDB DataNode 定期发送心跳请求的间隔 | -| endpointsSyncIntervalMs | optional: 120000 (min: 5000) | `Long`: consumer 探测 IoTDB 集群节点扩缩容情况调整订阅连接的间隔 | -| fileSaveDir | optional: Paths.get(System.getProperty("user.dir"), "iotdb-subscription").toString() | `String`: consumer 订阅出的 TsFile 文件临时存放的目录路径 | -| fileSaveFsync | optional: false | `Boolean`: consumer 订阅 TsFile 的过程中是否主动调用 fsync | - - -#### 4.3.1 SubscriptionPushConsumer - -以下为 `SubscriptionPushConsumer` 中的特殊配置: -| 参数 | 是否必填(默认值) | 参数含义 | -| :-------------------------------------------- | :--------------------------------- | :----------------------------------------------------------- | -| ackStrategy | optional: `ACKStrategy.AFTER_CONSUME` | 消费进度的确认机制包含以下选项:`ACKStrategy.BEFORE_CONSUME`(当 consumer 收到数据时立刻提交消费进度,`onReceive` 前)`ACKStrategy.AFTER_CONSUME`(当 consumer 消费完数据再去提交消费进度,`onReceive` 后) | -| consumeListener | optional | 消费数据的回调函数,需实现 `ConsumeListener` 接口,定义消费 `SessionDataSetsHandler` 和 `TsFileHandler` 形式数据的处理逻辑 | -| autoPollIntervalMs | optional: 5000 (min: 500) | Long: consumer 自动拉取数据的时间间隔,单位为**毫秒** | -| autoPollTimeoutMs | optional: 10000 (min: 1000) | Long: consumer 每次拉取数据的超时时间,单位为**毫秒** | - -其中,`ConsumerListener` 接口定义如下: - -```Java -@FunctionInterface -interface ConsumeListener { - default ConsumeResult onReceive(Message message) { - return ConsumeResult.SUCCESS; - } -} - -enum ConsumeResult { - SUCCESS, - FAILURE, -} -``` - -#### 4.3.2 SubscriptionPullConsumer - -以下为 `SubscriptionPullConsumer` 中的特殊配置: - -| 参数 | 是否必填(默认值) | 参数含义 | -| :-------------------------------------------- | :--------------------------------- | :----------------------------------------------------------- | -| autoCommit | optional: true | Boolean: 是否自动提交消费进度如果此参数设置为 false,则需要调用 `commit` 方法来手动提交消费进度 | -| autoCommitInterval | optional: 5000 (min: 500) | Long: 自动提交消费进度的时间间隔,单位为**毫秒**仅当 autoCommit 参数为 true 的时候才会生效 | - -在创建 consumer 后,需要手动调用 consumer 的 open 方法: - -```Java -void open() throws Exception; -``` - -此时,IoTDB 订阅客户端才会校验 consumer 的配置正确性,在校验成功后 consumer 就会加入对应的 consumer group。也就是说,在打开 consumer 后,才可以使用返回的 consumer 对象进行订阅 Topic,消费数据等操作。 - -### 4.4 订阅 Topic - -`SubscriptionPushConsumer` 和 `SubscriptionPullConsumer` 提供了下述 JAVA 原生接口用于订阅 Topics: - -```Java -// 订阅 topics -void subscribe(String topic) throws Exception; -void subscribe(List topics) throws Exception; -``` - -- 在 consumer 订阅 topic 前,topic 必须已经被创建,否则订阅会失败 -- 一个 consumer 在已经订阅了某个 topic 的情况下再次订阅这个 topic,不会报错 -- 如果该 consumer 所在的 consumer group 中已经有 consumers 订阅了相同的 topics,那么该 consumer 将会复用对应的消费进度 - -### 4.5 消费数据 - -无论是 push 模式还是 pull 模式的 consumer: - -- 只有显式订阅了某个 topic,才会收到对应 topic 的数据 -- 若在创建后没有订阅任何 topics,此时该 consumer 无法消费到任何数据,即使该 consumer 所在的 consumer group 中其它的 consumers 订阅了一些 topics - -#### 4.5.1 SubscriptionPushConsumer - -SubscriptionPushConsumer 在订阅 topics 后,无需手动拉取数据,其消费数据的逻辑在创建 SubscriptionPushConsumer 指定的 `consumeListener` 配置中。 - -#### 4.5.2 SubscriptionPullConsumer - -SubscriptionPullConsumer 在订阅 topics 后,需要主动调用 `poll` 方法拉取数据: - -```Java -List poll(final Duration timeout) throws Exception; -List poll(final long timeoutMs) throws Exception; -List poll(final Set topicNames, final Duration timeout) throws Exception; -List poll(final Set topicNames, final long timeoutMs) throws Exception; -``` - -在 poll 方法中可以指定需要拉取的 topic 名称(如果不指定则默认拉取该 consumer 已订阅的所有 topics)和超时时间。 - -当 SubscriptionPullConsumer 配置 autoCommit 参数为 false 时,需要手动调用 commitSync 和 commitAsync 方法同步或异步提交某批数据的消费进度: - -```Java -void commitSync(final SubscriptionMessage message) throws Exception; -void commitSync(final Iterable messages) throws Exception; - -CompletableFuture commitAsync(final SubscriptionMessage message); -CompletableFuture commitAsync(final Iterable messages); -void commitAsync(final SubscriptionMessage message, final AsyncCommitCallback callback); -void commitAsync(final Iterable messages, final AsyncCommitCallback callback); -``` - -AsyncCommitCallback 类定义如下: - -```Java -public interface AsyncCommitCallback { - default void onComplete() { - // Do nothing - } - - default void onFailure(final Throwable e) { - // Do nothing - } -} -``` - -### 4.6 取消订阅 - -`SubscriptionPushConsumer` 和 `SubscriptionPullConsumer` 提供了下述 JAVA 原生接口用于取消订阅并关闭 consumer: - -```Java -// 取消订阅 topics -void unsubscribe(String topic) throws Exception; -void unsubscribe(List topics) throws Exception; - -// 关闭 consumer -void close(); -``` - -- 在 topic 存在的情况下,如果一个 consumer 在没有订阅了某个 topic 的情况下取消订阅某个 topic,不会报错 -- consumer close 时会退出对应的 consumer group,同时自动 unsubscribe 该 consumer 现存订阅的所有 topics -- consumer 在 close 后生命周期即结束,无法再重新 open 订阅并消费数据 - -### 4.7 代码示例 - -#### 4.7.1 单 Pull Consumer 消费 SessionDataSetsHandler 形式的数据 - -```Java -// Create topics -try (final SubscriptionSession session = new SubscriptionSession(HOST, PORT)) { - session.open(); - final Properties config = new Properties(); - config.put(TopicConstant.PATH_KEY, "root.db.**"); - session.createTopic(TOPIC_1, config); -} - -// Subscription: property-style ctor -final Properties config = new Properties(); -config.put(ConsumerConstant.CONSUMER_ID_KEY, "c1"); -config.put(ConsumerConstant.CONSUMER_GROUP_ID_KEY, "cg1"); - -final SubscriptionPullConsumer consumer1 = new SubscriptionPullConsumer(config); -consumer1.open(); -consumer1.subscribe(TOPIC_1); -while (true) { - LockSupport.parkNanos(SLEEP_NS); // wait some time - final List messages = consumer1.poll(POLL_TIMEOUT_MS); - for (final SubscriptionMessage message : messages) { - for (final SubscriptionSessionDataSet dataSet : message.getSessionDataSetsHandler()) { - System.out.println(dataSet.getColumnNames()); - System.out.println(dataSet.getColumnTypes()); - while (dataSet.hasNext()) { - System.out.println(dataSet.next()); - } - } - } - // Auto commit -} - -// Show topics and subscriptions -try (final SubscriptionSession session = new SubscriptionSession(HOST, PORT)) { - session.open(); - session.getTopics().forEach((System.out::println)); - session.getSubscriptions().forEach((System.out::println)); -} - -consumer1.unsubscribe(TOPIC_1); -consumer1.close(); -``` - -#### 4.7.2 多 Push Consumer 消费 TsFileHandler 形式的数据 - -```Java -// Create topics -try (final SubscriptionSession subscriptionSession = new SubscriptionSession(HOST, PORT)) { - subscriptionSession.open(); - final Properties config = new Properties(); - config.put(TopicConstant.FORMAT_KEY, TopicConstant.FORMAT_TS_FILE_HANDLER_VALUE); - subscriptionSession.createTopic(TOPIC_2, config); -} - -final List threads = new ArrayList<>(); -for (int i = 0; i < 8; ++i) { - final int idx = i; - final Thread thread = - new Thread( - () -> { - // Subscription: builder-style ctor - try (final SubscriptionPushConsumer consumer2 = - new SubscriptionPushConsumer.Builder() - .consumerId("c" + idx) - .consumerGroupId("cg2") - .fileSaveDir(System.getProperty("java.io.tmpdir")) - .ackStrategy(AckStrategy.AFTER_CONSUME) - .consumeListener( - message -> { - doSomething(message.getTsFileHandler()); - return ConsumeResult.SUCCESS; - }) - .buildPushConsumer()) { - consumer2.open(); - consumer2.subscribe(TOPIC_2); - } catch (final IOException e) { - throw new RuntimeException(e); - } - }); - thread.start(); - threads.add(thread); -} - -for (final Thread thread : threads) { - thread.join(); -} -``` +除 SQL 语句外,IoTDB 还支持通过 Java 原生接口使用数据订阅功能。详细语法参见页面:Java 原生接口([链接](../API/Programming-Java-Native-API.md))。 ## 5. 常见问题