Data Subscription API
IoTDB provides a data subscription feature that lets users receive newly written data from IoTDB in real time through subscription APIs. For the functional definition and an introduction, see: Data subscription
1. Core Steps
- Create Topic: Create a Topic that includes the measurement points you wish to subscribe to.
- Subscribe to Topic: The topic must already exist before a consumer subscribes to it; otherwise the subscription fails. Consumers in the same consumer group share the topic's data evenly among themselves.
- Consume Data: A consumer receives data from a topic only after it has explicitly subscribed to that topic.
- Unsubscribe: When a consumer is closed, it will exit the corresponding consumer group and cancel all existing subscriptions.
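The lifecycle rules above can be pictured with a small in-memory model. This is only a sketch of the described behavior, not the IoTDB client: the class `SubscriptionLifecycleSketch` and all of its methods are hypothetical and talk to no server.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Toy in-memory model of the topic/subscription lifecycle; not the IoTDB client. */
public class SubscriptionLifecycleSketch {
  // topic name -> path pattern (for example "root.**")
  private final Map<String, String> topics = new HashMap<>();
  // consumer id -> names of the topics it is subscribed to
  private final Map<String, Set<String>> subscriptions = new HashMap<>();

  public void createTopic(String name, String pathPattern) {
    topics.put(name, pathPattern);
  }

  /** Subscribing fails when the topic has not been created yet. */
  public void subscribe(String consumerId, String topicName) {
    if (!topics.containsKey(topicName)) {
      throw new IllegalStateException("topic does not exist: " + topicName);
    }
    subscriptions.computeIfAbsent(consumerId, id -> new HashSet<>()).add(topicName);
  }

  public Set<String> subscribedTopics(String consumerId) {
    Set<String> empty = Collections.emptySet();
    return subscriptions.getOrDefault(consumerId, empty);
  }

  /** Closing a consumer cancels all of its subscriptions. */
  public void closeConsumer(String consumerId) {
    subscriptions.remove(consumerId);
  }

  public static void main(String[] args) {
    SubscriptionLifecycleSketch model = new SubscriptionLifecycleSketch();
    try {
      model.subscribe("c1", "topic_all"); // topic not created yet
    } catch (IllegalStateException e) {
      System.out.println("subscribe before create failed: " + e.getMessage());
    }
    model.createTopic("topic_all", "root.**");
    model.subscribe("c1", "topic_all");
    System.out.println("c1 subscriptions: " + model.subscribedTopics("c1"));
    model.closeConsumer("c1");
    System.out.println("after close: " + model.subscribedTopics("c1"));
  }
}
```

The model deliberately leaves out data distribution inside a consumer group, since the exact distribution policy is not described here.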
2. Detailed Steps
This section illustrates the core development process and does not cover every parameter and interface. For the full set of features and parameters, see: Java Native API
2.1 Create a Maven project
Create a Maven project and add the following dependency (JDK >= 1.8, Maven >= 3.6):
<dependencies>
  <dependency>
    <groupId>org.apache.iotdb</groupId>
    <artifactId>iotdb-session</artifactId>
    <!-- The version number is the same as the database version number -->
    <version>${project.version}</version>
  </dependency>
</dependencies>
2.2 Code Example
2.2.1 Topic operations
import java.util.Optional;
import java.util.Properties;
import java.util.Set;

import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.StatementExecutionException;
import org.apache.iotdb.rpc.subscription.config.TopicConstant;
import org.apache.iotdb.session.subscription.SubscriptionSession;
import org.apache.iotdb.session.subscription.model.Topic;

public class DataConsumerExample {

  public static void main(String[] args)
      throws IoTDBConnectionException, StatementExecutionException {
    try (SubscriptionSession session =
        new SubscriptionSession("127.0.0.1", 6667, "root", "root", 67108864)) {
      // 1. open session
      session.open();

      // 2. create a topic that covers all data
      Properties topicConfig = new Properties();
      topicConfig.put(TopicConstant.PATH_KEY, "root.**");
      session.createTopic("allData", topicConfig);

      // 3. show all topics
      Set<Topic> topics = session.getTopics();
      System.out.println(topics);

      // 4. show a specific topic; the Optional is empty if the topic does not exist
      Optional<Topic> allData = session.getTopic("allData");
      allData.ifPresent(System.out::println);
    }
  }
}
2.2.2 Data Consumption
Scenario-1: Subscribing to newly added real-time data in IoTDB (for scenarios such as dashboard or configuration display)
import java.io.IOException;
import java.util.List;
import java.util.Properties;

import org.apache.iotdb.rpc.subscription.config.ConsumerConstant;
import org.apache.iotdb.session.subscription.consumer.SubscriptionPullConsumer;
import org.apache.iotdb.session.subscription.payload.SubscriptionMessage;
import org.apache.iotdb.session.subscription.payload.SubscriptionMessageType;
import org.apache.iotdb.session.subscription.payload.SubscriptionSessionDataSet;
import org.apache.tsfile.read.common.RowRecord;

public class DataConsumerExample {

  public static void main(String[] args) throws IOException {
    // 1. create a pull consumer; its subscriptions are cancelled automatically
    //    when the try-with-resources block exits
    Properties consumerConfig = new Properties();
    consumerConfig.put(ConsumerConstant.CONSUMER_ID_KEY, "c1");
    consumerConfig.put(ConsumerConstant.CONSUMER_GROUP_ID_KEY, "cg1");
    consumerConfig.put(ConsumerConstant.USERNAME_KEY, "root");
    consumerConfig.put(ConsumerConstant.PASSWORD_KEY, "root");
    try (SubscriptionPullConsumer pullConsumer = new SubscriptionPullConsumer(consumerConfig)) {
      pullConsumer.open();
      // 2. subscribe to a topic that has already been created
      pullConsumer.subscribe("topic_all");
      // 3. poll in a loop, waiting up to 10000 ms per call
      while (true) {
        List<SubscriptionMessage> messages = pullConsumer.poll(10000);
        for (final SubscriptionMessage message : messages) {
          final short messageType = message.getMessageType();
          if (SubscriptionMessageType.isValidatedMessageType(messageType)) {
            // 4. print every row of every data set carried by the message
            for (final SubscriptionSessionDataSet dataSet : message.getSessionDataSetsHandler()) {
              while (dataSet.hasNext()) {
                final RowRecord record = dataSet.next();
                System.out.println(record);
              }
            }
          }
        }
      }
    }
  }
}
Scenario-2: Subscribing to newly added TsFiles (for scenarios such as regular data backup)
Prerequisite: The format of the topic to be consumed must be TsFileHandler. For example: create topic topic_all_tsfile with ('path'='root.**','format'='TsFileHandler')
import java.io.IOException;
import java.util.List;
import java.util.Properties;

import org.apache.iotdb.rpc.subscription.config.ConsumerConstant;
import org.apache.iotdb.session.subscription.consumer.SubscriptionPullConsumer;
import org.apache.iotdb.session.subscription.payload.SubscriptionMessage;

public class DataConsumerExample {

  public static void main(String[] args) throws IOException {
    // 1. create a pull consumer; its subscriptions are cancelled automatically
    //    when the try-with-resources block exits
    Properties consumerConfig = new Properties();
    consumerConfig.put(ConsumerConstant.CONSUMER_ID_KEY, "c1");
    consumerConfig.put(ConsumerConstant.CONSUMER_GROUP_ID_KEY, "cg1");
    // 2. set the credentials and the directory in which received TsFiles are stored
    consumerConfig.put(ConsumerConstant.USERNAME_KEY, "root");
    consumerConfig.put(ConsumerConstant.PASSWORD_KEY, "root");
    consumerConfig.put(ConsumerConstant.FILE_SAVE_DIR_KEY, "/Users/iotdb/Downloads");
    try (SubscriptionPullConsumer pullConsumer = new SubscriptionPullConsumer(consumerConfig)) {
      pullConsumer.open();
      // 3. subscribe to a topic whose format is TsFileHandler
      pullConsumer.subscribe("topic_all_tsfile");
      while (true) {
        List<SubscriptionMessage> messages = pullConsumer.poll(10000);
        for (final SubscriptionMessage message : messages) {
          // 4. copy the received TsFile; every file goes to the same target path here
          message.getTsFileHandler().copyFile("/Users/iotdb/Downloads/1.tsfile");
        }
      }
    }
  }
}
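The example above copies every received file to the same target path, `/Users/iotdb/Downloads/1.tsfile`. For a backup job it is usually preferable to give each file its own name. The helper below is a minimal sketch of one way to do that; `TsFileTargetNamer` and `nextTarget` are hypothetical names, not part of the IoTDB API, and the numbering scheme is an assumption made for illustration.

```java
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical helper that hands out a distinct target path for each received TsFile. */
public class TsFileTargetNamer {
  private final Path directory;
  // monotonically increasing sequence number, safe to use from several threads
  private final AtomicLong counter = new AtomicLong();

  public TsFileTargetNamer(String directory) {
    this.directory = Paths.get(directory);
  }

  /** Returns "<directory>/<n>.tsfile", where n starts at 1 and grows by 1 per call. */
  public String nextTarget() {
    return directory.resolve(counter.incrementAndGet() + ".tsfile").toString();
  }

  public static void main(String[] args) {
    TsFileTargetNamer namer = new TsFileTargetNamer("/Users/iotdb/Downloads");
    System.out.println(namer.nextTarget());
    System.out.println(namer.nextTarget());
  }
}
```

In the consumption loop, the fixed path passed to `copyFile` could then be replaced with `namer.nextTarget()`. The counter restarts at 1 whenever the program restarts, so a real backup job would need a name that survives restarts.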
3. Java Native API Description
3.1 Parameter List
The consumer-related parameters can be set through the Properties parameter object. The specific parameters are as follows:
SubscriptionConsumer
Parameter | required or optional with default | Parameter Meaning |
---|---|---|
host | optional: 127.0.0.1 | String : The RPC host of a DataNode in IoTDB |
port | optional: 6667 | Integer : The RPC port of a DataNode in IoTDB |
node-urls | optional: 127.0.0.1:6667 | List<String> : The RPC addresses of all DataNodes in IoTDB, which can be multiple; either host:port or node-urls can be filled. If both host:port and node-urls are filled, the union of host:port and node-urls will be taken to form a new node-urls for application |
username | optional: root | String : The username of the DataNode in IoTDB |
password | optional: root | String : The password of the DataNode in IoTDB |
groupId | optional | String : consumer group id. If not specified, it is assigned randomly (creating a new consumer group), which ensures that different consumer groups have different ids |
consumerId | optional | String : consumer client id. If not specified, it is assigned randomly, which ensures that each consumer client id within a consumer group is unique |
heartbeatIntervalMs | optional: 30000 (min: 1000) | Long : The interval at which the consumer sends periodic heartbeat requests to the IoTDB DataNode |
endpointsSyncIntervalMs | optional: 120000 (min: 5000) | Long : The interval at which the consumer detects the expansion or contraction of IoTDB cluster nodes and adjusts the subscription connection |
fileSaveDir | optional: Paths.get(System.getProperty("user.dir"), "iotdb-subscription").toString() | String : The temporary directory path where the consumer stores the subscribed TsFile files |
fileSaveFsync | optional: false | Boolean : Whether the consumer actively calls fsync during the subscription of TsFiles |
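The rule for combining host, port and node-urls described in the table can be pictured with a short standalone sketch. `NodeUrlsSketch` and `mergeNodeUrls` are hypothetical names; this only illustrates the "union" behavior stated above and is not the client's actual implementation.

```java
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/** Illustrates the documented union of host:port and node-urls; not the IoTDB client code. */
public class NodeUrlsSketch {

  /** Returns the distinct endpoints from host:port (if both are given) and nodeUrls. */
  public static Set<String> mergeNodeUrls(String host, Integer port, List<String> nodeUrls) {
    // LinkedHashSet removes duplicates while keeping insertion order
    Set<String> merged = new LinkedHashSet<>();
    if (host != null && port != null) {
      merged.add(host + ":" + port);
    }
    if (nodeUrls != null) {
      merged.addAll(nodeUrls);
    }
    return merged;
  }

  public static void main(String[] args) {
    Set<String> merged =
        mergeNodeUrls("127.0.0.1", 6667, Arrays.asList("127.0.0.1:6667", "127.0.0.1:6668"));
    // the endpoint given twice appears only once in the result
    System.out.println(merged);
  }
}
```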
Special configurations in SubscriptionPushConsumer:
Parameter | required or optional with default | Parameter Meaning |
---|---|---|
ackStrategy | optional: AckStrategy.AFTER_CONSUME | The acknowledgment mechanism for consumption progress. Options: AckStrategy.BEFORE_CONSUME (the consumer submits the consumption progress immediately upon receiving the data, before onReceive); AckStrategy.AFTER_CONSUME (the consumer submits the consumption progress after consuming the data, after onReceive) |
consumeListener | optional | The callback function for consuming data, which needs to implement the ConsumeListener interface, defining the processing logic for consuming SessionDataSetsHandler and TsFileHandler formatted data |
autoPollIntervalMs | optional: 5000 (min: 500) | Long: The time interval at which the consumer automatically pulls data, in ms |
autoPollTimeoutMs | optional: 10000 (min: 1000) | Long: The timeout duration for the consumer to pull data each time, in ms |
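The difference between the two acknowledgment options (BEFORE_CONSUME and AFTER_CONSUME) is only the order in which progress is submitted relative to the onReceive callback. The toy model below records that order. `AckOrderSketch`, `AckMode` and `deliver` are hypothetical names used for illustration; nothing here calls the real push consumer.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of acknowledgment ordering; not the IoTDB push consumer. */
public class AckOrderSketch {

  /** Stand-in for the two documented acknowledgment options. */
  public enum AckMode {
    BEFORE_CONSUME,
    AFTER_CONSUME
  }

  /** Returns the sequence of steps taken for one message under the given mode. */
  public static List<String> deliver(AckMode mode, String message) {
    List<String> events = new ArrayList<>();
    if (mode == AckMode.BEFORE_CONSUME) {
      events.add("ack " + message); // progress is submitted first
    }
    events.add("onReceive " + message); // user callback runs
    if (mode == AckMode.AFTER_CONSUME) {
      events.add("ack " + message); // progress is submitted last
    }
    return events;
  }

  public static void main(String[] args) {
    System.out.println(deliver(AckMode.BEFORE_CONSUME, "m1"));
    System.out.println(deliver(AckMode.AFTER_CONSUME, "m1"));
  }
}
```

As a general observation about this kind of ordering, submitting progress before the callback runs means a failure inside the callback can leave a message acknowledged but unprocessed, while submitting it afterwards can lead to a message being seen again after a failure.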
Special configurations in SubscriptionPullConsumer:
Parameter | required or optional with default | Parameter Meaning |
---|---|---|
autoCommit | optional: true | Boolean: Whether to automatically commit the consumption progress. If this parameter is set to false, the commit method needs to be called manually to submit the consumption progress |
autoCommitInterval | optional: 5000 (min: 500) | Long: The time interval for automatically committing the consumption progress, in ms. This parameter only takes effect when the autoCommit parameter is set to true |
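Several interval parameters in the tables above list a minimum value. How the client reacts to a value below the minimum is not stated here, so an application may want to check its own settings before building a consumer. The sketch below is one hypothetical way to do that; `IntervalPreCheck` and `isAllowed` are illustrative names, and the minimums are copied from the tables.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical application-side check against the documented minimum intervals. */
public class IntervalPreCheck {
  // parameter name -> documented minimum in milliseconds
  private static final Map<String, Long> MINIMUM_MS = new LinkedHashMap<>();

  static {
    MINIMUM_MS.put("heartbeatIntervalMs", 1000L);
    MINIMUM_MS.put("endpointsSyncIntervalMs", 5000L);
    MINIMUM_MS.put("autoPollIntervalMs", 500L);
    MINIMUM_MS.put("autoPollTimeoutMs", 1000L);
    MINIMUM_MS.put("autoCommitInterval", 500L);
  }

  /** Returns true when valueMs is at least the documented minimum for the parameter. */
  public static boolean isAllowed(String parameter, long valueMs) {
    Long minimum = MINIMUM_MS.get(parameter);
    if (minimum == null) {
      throw new IllegalArgumentException("unknown parameter: " + parameter);
    }
    return valueMs >= minimum;
  }

  public static void main(String[] args) {
    System.out.println(isAllowed("heartbeatIntervalMs", 30000L));
    System.out.println(isAllowed("autoCommitInterval", 100L));
  }
}
```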
3.2 Function List
Data subscription
SubscriptionPullConsumer
Function name | Description | Parameter |
---|---|---|
open() | Opens the consumer connection and starts message consumption. If autoCommit is enabled, it will start the automatic commit worker. | None |
close() | Closes the consumer connection. If autoCommit is enabled, it will commit all uncommitted messages before closing. | None |
poll(final Duration timeout) | Pulls messages with a specified timeout. | timeout : The timeout duration. |
poll(final long timeoutMs) | Pulls messages with a specified timeout in milliseconds. | timeoutMs : The timeout duration in milliseconds. |
poll(final Set<String> topicNames, final Duration timeout) | Pulls messages from specified topics with a specified timeout. | topicNames : The set of topics to pull messages from. timeout : The timeout duration. |
poll(final Set<String> topicNames, final long timeoutMs) | Pulls messages from specified topics with a specified timeout in milliseconds. | topicNames : The set of topics to pull messages from. timeoutMs : The timeout duration in milliseconds. |
commitSync(final SubscriptionMessage message) | Synchronously commits a single message. | message : The message object to be committed. |
commitSync(final Iterable<SubscriptionMessage> messages) | Synchronously commits multiple messages. | messages : The collection of message objects to be committed. |
commitAsync(final SubscriptionMessage message) | Asynchronously commits a single message. | message : The message object to be committed. |
commitAsync(final Iterable<SubscriptionMessage> messages) | Asynchronously commits multiple messages. | messages : The collection of message objects to be committed. |
commitAsync(final SubscriptionMessage message, final AsyncCommitCallback callback) | Asynchronously commits a single message with a specified callback. | message : The message object to be committed. callback : The callback function to be executed after asynchronous commit. |
commitAsync(final Iterable<SubscriptionMessage> messages, final AsyncCommitCallback callback) | Asynchronously commits multiple messages with a specified callback. | messages : The collection of message objects to be committed. callback : The callback function to be executed after asynchronous commit. |
SubscriptionPushConsumer
Function name | Description | Parameter |
---|---|---|
open() | Opens the consumer connection, starts message consumption, and submits the automatic polling worker. | None |
close() | Closes the consumer connection and stops message consumption. | None |
toString() | Returns the core configuration information of the consumer object. | None |
coreReportMessage() | Obtains the key-value representation of the consumer's core configuration. | None |
allReportMessage() | Obtains the key-value representation of all the consumer's configurations. | None |
buildPushConsumer() | Builds a SubscriptionPushConsumer instance through the Builder. | None |
ackStrategy(final AckStrategy ackStrategy) | Configures the message acknowledgment strategy for the consumer. | ackStrategy : The specified message acknowledgment strategy. |
consumeListener(final ConsumeListener consumeListener) | Configures the message consumption logic for the consumer. | consumeListener : The processing logic when the consumer receives messages. |
autoPollIntervalMs(final long autoPollIntervalMs) | Configures the interval for automatic polling. | autoPollIntervalMs : The interval for automatic polling, in milliseconds. |
autoPollTimeoutMs(final long autoPollTimeoutMs) | Configures the timeout for automatic polling. | autoPollTimeoutMs : The timeout for automatic polling, in milliseconds. |