Java Native API
In the native API of IoTDB, the Session is the core interface for interacting with the database. It provides methods for data writing, querying, and metadata operations. By instantiating a Session, you establish a connection to the IoTDB server and perform database operations over that connection. A Session is not thread-safe and must not be called by multiple threads at the same time.
SessionPool is a connection pool for Session, and it is the recommended way to program against IoTDB. Under multi-threaded concurrency, SessionPool manages and allocates connection resources, which improves system performance and resource utilization.
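The borrow-and-return idea behind a connection pool can be sketched with the standard library alone. The `BoundedPool` class below is a hypothetical illustration and not the SessionPool implementation: it assumes a fixed number of pre-created resources, lets each caller borrow one for the duration of a task, and blocks further callers until a resource is handed back.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical illustration of a bounded pool; this is NOT the SessionPool implementation.
class BoundedPoolDemo {

    static class BoundedPool<R> {
        private final BlockingQueue<R> idle;

        BoundedPool(int maxSize, Supplier<R> factory) {
            idle = new ArrayBlockingQueue<>(maxSize);
            for (int i = 0; i < maxSize; i++) {
                idle.add(factory.get());
            }
        }

        // Borrow a resource, run the task with it, and always hand it back.
        <T> T withResource(Function<R, T> task) {
            R resource;
            try {
                resource = idle.take(); // blocks while every resource is in use
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting for a resource", e);
            }
            try {
                return task.apply(resource);
            } finally {
                idle.add(resource); // a slot is free because this caller took one
            }
        }
    }

    // Runs `threads` workers against a pool of `maxSize` resources and returns
    // the highest number of resources that were in use at the same time.
    static int peakConcurrentUse(int maxSize, int threads) {
        BoundedPool<Object> pool = new BoundedPool<>(maxSize, Object::new);
        AtomicInteger inUse = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> pool.withResource(resource -> {
                peak.accumulateAndGet(inUse.incrementAndGet(), Math::max);
                try {
                    Thread.sleep(20); // stands in for a database call
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                inUse.decrementAndGet();
                return null;
            }));
            workers[i].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return peak.get();
    }

    public static void main(String[] args) {
        System.out.println("peak resources in use: " + peakConcurrentUse(3, 10));
    }
}
```

However many threads call in, the number of resources in use never exceeds the pool size. This is the property that makes a pool safe to share where a single non-thread-safe object is not.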
1. Overview of Steps
- Create a Connection Pool Instance: Initialize a SessionPool object to manage multiple Session instances.
- Perform Operations: Call database operations directly on the SessionPool, which supplies a Session instance for each call, so there is no need to open and close a connection each time.
- Close Connection Pool Resources: When database operations are no longer needed, close the SessionPool to release all related resources.
2. Detailed Steps
This section walks through the core development process and does not demonstrate every parameter and interface. For the complete list of functions and parameters, refer to the Java Native API reference or check the source code.
2.1 Create a Maven Project
Create a Maven project and add the following dependencies to the pom.xml file (JDK >= 1.8, Maven >= 3.6):
<dependencies>
    <dependency>
        <groupId>org.apache.iotdb</groupId>
        <artifactId>iotdb-session</artifactId>
        <!-- The version number is the same as the database version number -->
        <version>${project.version}</version>
    </dependency>
</dependencies>
2.2 Creating a Connection Pool Instance
import java.util.ArrayList;
import java.util.List;
import org.apache.iotdb.session.pool.SessionPool;

public class IoTDBSessionPoolExample {
    private static SessionPool sessionPool;

    public static void main(String[] args) {
        // Listing several nodeUrls lets the client retry against another node when one node goes down
        List<String> nodeUrls = new ArrayList<>();
        nodeUrls.add("127.0.0.1:6667");
        nodeUrls.add("127.0.0.1:6668");
        sessionPool =
            new SessionPool.Builder()
                .nodeUrls(nodeUrls)
                .user("root")
                .password("root")
                .maxSize(3)
                .build();
    }
}
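Since each nodeUrls entry in the example above is a `host:port` string, it can help to check the list before handing it to the builder. The `NodeUrlCheck` helper below is a hypothetical sketch using only the standard library. It assumes the plain `host:port` form shown above, and the checks the library itself applies may differ.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical pre-check for nodeUrls entries; not part of the IoTDB client.
class NodeUrlCheck {

    // Returns the entries that are not of the form host:port with a port in 1..65535.
    static List<String> invalidEntries(List<String> nodeUrls) {
        List<String> invalid = new ArrayList<>();
        for (String url : nodeUrls) {
            int colon = url.lastIndexOf(':');
            // A host must precede the colon and a port must follow it.
            boolean ok = colon > 0 && colon < url.length() - 1;
            if (ok) {
                try {
                    int port = Integer.parseInt(url.substring(colon + 1));
                    ok = port >= 1 && port <= 65535;
                } catch (NumberFormatException e) {
                    ok = false;
                }
            }
            if (!ok) {
                invalid.add(url);
            }
        }
        return invalid;
    }

    public static void main(String[] args) {
        List<String> nodeUrls = Arrays.asList("127.0.0.1:6667", "127.0.0.1", "127.0.0.1:port");
        System.out.println(invalidEntries(nodeUrls)); // prints [127.0.0.1, 127.0.0.1:port]
    }
}
```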
2.3 Performing Database Operations
2.3.1 Data Insertion
In industrial scenarios, data insertion can be categorized into the following types: inserting multiple rows of data, and inserting multiple rows of data for a single device. Below, we introduce the insertion interfaces for different scenarios.
Multi-Row Data Insertion Interface
Interface Description: Supports inserting multiple rows of data at once, where each row corresponds to multiple measurement values for a device at a specific timestamp.
Interface List:
Interface Name | Function Description |
---|---|
insertRecords(List<String> deviceIds, List<Long> times, List<List<String>> measurementsList, List<List<TSDataType>> typesList, List<List<Object>> valuesList) | Inserts multiple rows of data, suitable for scenarios where measurements are independently collected. |
Code Example:
import java.util.ArrayList;
import java.util.List;
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.StatementExecutionException;
import org.apache.iotdb.session.pool.SessionPool;
import org.apache.tsfile.enums.TSDataType;

public class SessionPoolExample {
    private static SessionPool sessionPool;

    public static void main(String[] args) throws IoTDBConnectionException, StatementExecutionException {
        // 1. init SessionPool
        constructSessionPool();
        // 2. execute insert data
        insertRecordsExample();
        // 3. close SessionPool
        closeSessionPool();
    }

    private static void constructSessionPool() {
        // Listing several nodeUrls lets the client retry against another node when one node goes down
        List<String> nodeUrls = new ArrayList<>();
        nodeUrls.add("127.0.0.1:6667");
        nodeUrls.add("127.0.0.1:6668");
        sessionPool =
            new SessionPool.Builder()
                .nodeUrls(nodeUrls)
                .user("root")
                .password("root")
                .maxSize(3)
                .build();
    }

    public static void insertRecordsExample() throws IoTDBConnectionException, StatementExecutionException {
        String deviceId = "root.sg1.d1";
        List<String> measurements = new ArrayList<>();
        measurements.add("s1");
        measurements.add("s2");
        measurements.add("s3");
        // Parallel lists: index i in each list describes row i
        List<String> deviceIds = new ArrayList<>();
        List<List<String>> measurementsList = new ArrayList<>();
        List<List<Object>> valuesList = new ArrayList<>();
        List<Long> timestamps = new ArrayList<>();
        List<List<TSDataType>> typesList = new ArrayList<>();

        for (long time = 0; time < 500; time++) {
            List<Object> values = new ArrayList<>();
            List<TSDataType> types = new ArrayList<>();
            values.add(1L);
            values.add(2L);
            values.add(3L);
            types.add(TSDataType.INT64);
            types.add(TSDataType.INT64);
            types.add(TSDataType.INT64);

            deviceIds.add(deviceId);
            measurementsList.add(measurements);
            valuesList.add(values);
            typesList.add(types);
            timestamps.add(time);

            // Send the buffered rows each time the timestamp reaches a multiple of 100
            if (time != 0 && time % 100 == 0) {
                try {
                    sessionPool.insertRecords(deviceIds, timestamps, measurementsList, typesList, valuesList);
                } catch (IoTDBConnectionException | StatementExecutionException e) {
                    // handle the exception, for example log it or retry
                }
                deviceIds.clear();
                measurementsList.clear();
                valuesList.clear();
                typesList.clear();
                timestamps.clear();
            }
        }

        // Send the rows that remain after the loop
        try {
            sessionPool.insertRecords(deviceIds, timestamps, measurementsList, typesList, valuesList);
        } catch (IoTDBConnectionException | StatementExecutionException e) {
            // handle the exception, for example log it or retry
        }
    }

    public static void closeSessionPool() {
        sessionPool.close();
    }
}
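The loop in the example above flushes when the timestamp reaches a multiple of 100, so its first batch holds 101 rows (timestamps 0 to 100) and its last holds 99. If fixed-size batches are preferred, flushing on buffer size is one option. The `BatchBuffer` class below is a hypothetical stand-alone sketch: its `sink` callback stands in for a call such as `sessionPool.insertRecords(...)`, and nothing in it comes from the IoTDB library.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical helper: buffers rows and hands them to a sink in fixed-size batches.
class BatchBuffer<T> {
    private final int batchSize;
    private final Consumer<List<T>> sink; // stand-in for the real insert call
    private final List<T> buffer = new ArrayList<>();

    BatchBuffer(int batchSize, Consumer<List<T>> sink) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        this.batchSize = batchSize;
        this.sink = sink;
    }

    // Adds one row and flushes as soon as the buffer is full.
    void add(T row) {
        buffer.add(row);
        if (buffer.size() == batchSize) {
            flush();
        }
    }

    // Sends whatever is buffered; does nothing when the buffer is empty.
    void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        sink.accept(new ArrayList<>(buffer)); // copy, so the sink may keep the batch
        buffer.clear();
    }

    // Demo: returns the sizes of the batches produced for `rows` rows.
    static List<Integer> batchSizes(int rows, int batchSize) {
        List<Integer> sizes = new ArrayList<>();
        BatchBuffer<Long> batches = new BatchBuffer<>(batchSize, batch -> sizes.add(batch.size()));
        for (long time = 0; time < rows; time++) {
            batches.add(time);
        }
        batches.flush(); // send the remainder, if any
        return sizes;
    }

    public static void main(String[] args) {
        System.out.println(batchSizes(500, 100)); // prints [100, 100, 100, 100, 100]
    }
}
```

Skipping the flush when the buffer is empty also avoids sending an empty request after the loop when the row count is an exact multiple of the batch size.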
Single-Device Multi-Row Data Insertion Interface
Interface Description: Supports inserting multiple rows of data for a single device at once, where each row corresponds to multiple measurement values for a specific timestamp.
Interface List:
Interface Name | Function Description |
---|---|
insertTablet(Tablet tablet) | Inserts multiple rows of data for a single device, suitable for scenarios where measurements are independently collected. |
Code Example:
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.StatementExecutionException;
import org.apache.iotdb.session.pool.SessionPool;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.write.record.Tablet;
import org.apache.tsfile.write.schema.MeasurementSchema;

public class SessionPoolExample {
    private static SessionPool sessionPool;

    public static void main(String[] args) throws IoTDBConnectionException, StatementExecutionException {
        // 1. init SessionPool
        constructSessionPool();
        // 2. execute insert data
        insertTabletExample();
        // 3. close SessionPool
        closeSessionPool();
    }

    private static void constructSessionPool() {
        // Listing several nodeUrls lets the client retry against another node when one node goes down
        List<String> nodeUrls = new ArrayList<>();
        nodeUrls.add("127.0.0.1:6667");
        nodeUrls.add("127.0.0.1:6668");
        sessionPool =
            new SessionPool.Builder()
                .nodeUrls(nodeUrls)
                .user("root")
                .password("root")
                .maxSize(3)
                .build();
    }

    private static void insertTabletExample() throws IoTDBConnectionException, StatementExecutionException {
        /*
         * A Tablet example:
         *      device1
         * time s1, s2, s3
         * 1,   1,  1,  1
         * 2,   2,  2,  2
         * 3,   3,  3,  3
         */
        // The schema of the measurements of one device.
        // Only the measurementId and data type in MeasurementSchema take effect in a Tablet.
        List<MeasurementSchema> schemaList = new ArrayList<>();
        schemaList.add(new MeasurementSchema("s1", TSDataType.INT64));
        schemaList.add(new MeasurementSchema("s2", TSDataType.INT64));
        schemaList.add(new MeasurementSchema("s3", TSDataType.INT64));

        Tablet tablet = new Tablet("root.sg.d1", schemaList, 100);

        // Method 1 to add tablet data
        long timestamp = System.currentTimeMillis();

        Random random = new Random();
        for (long row = 0; row < 100; row++) {
            int rowIndex = tablet.rowSize++;
            tablet.addTimestamp(rowIndex, timestamp);
            for (int s = 0; s < 3; s++) {
                long value = random.nextLong();
                tablet.addValue(schemaList.get(s).getMeasurementId(), rowIndex, value);
            }
            // Send the tablet once it is full, then reuse it
            if (tablet.rowSize == tablet.getMaxRowNumber()) {
                sessionPool.insertTablet(tablet);
                tablet.reset();
            }
            timestamp++;
        }
        // Send the rows that remain after the loop
        if (tablet.rowSize != 0) {
            sessionPool.insertTablet(tablet);
            tablet.reset();
        }
    }

    public static void closeSessionPool() {
        sessionPool.close();
    }
}
2.3.2 SQL Operations
SQL operations are divided into two categories: queries and non-queries. The corresponding interfaces are executeQueryStatement and executeNonQueryStatement. The former executes a query statement and returns a result set, while the latter executes statements such as insert, delete, and update operations and does not return a result set.
import java.util.ArrayList;
import java.util.List;
import org.apache.iotdb.isession.pool.SessionDataSetWrapper;
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.StatementExecutionException;
import org.apache.iotdb.session.pool.SessionPool;

public class SessionPoolExample {
    private static SessionPool sessionPool;

    public static void main(String[] args) throws IoTDBConnectionException, StatementExecutionException {
        // 1. init SessionPool
        constructSessionPool();
        // 2. execute query SQL statements and print the result sets
        executeQueryExample();
        // 3. execute non-query SQL statements, such as DDL or DML commands
        executeNonQueryExample();
        // 4. close SessionPool
        closeSessionPool();
    }

    private static void executeNonQueryExample() throws IoTDBConnectionException, StatementExecutionException {
        // 1. create a nonAligned time series
        sessionPool.executeNonQueryStatement("create timeseries root.test.d1.s1 with dataType = int32");
        // 2. set ttl
        sessionPool.executeNonQueryStatement("set TTL to root.test.** 10000");
        // 3. delete time series
        sessionPool.executeNonQueryStatement("delete timeseries root.test.d1.s1");
    }

    private static void executeQueryExample() throws IoTDBConnectionException, StatementExecutionException {
        // 1. execute normal query; try-with-resources closes the result set when done
        try (SessionDataSetWrapper wrapper = sessionPool.executeQueryStatement("select s1 from root.sg1.d1 limit 10")) {
            while (wrapper.hasNext()) {
                System.out.println(wrapper.next());
            }
        }
        // 2. execute aggregate query
        try (SessionDataSetWrapper wrapper = sessionPool.executeQueryStatement("select count(s1) from root.sg1.d1 group by ([0, 40), 5ms) ")) {
            while (wrapper.hasNext()) {
                System.out.println(wrapper.next());
            }
        }
    }

    private static void constructSessionPool() {
        // Listing several nodeUrls lets the client retry against another node when one node goes down
        List<String> nodeUrls = new ArrayList<>();
        nodeUrls.add("127.0.0.1:6667");
        nodeUrls.add("127.0.0.1:6668");
        sessionPool =
            new SessionPool.Builder()
                .nodeUrls(nodeUrls)
                .user("root")
                .password("root")
                .maxSize(3)
                .build();
    }

    public static void closeSessionPool() {
        sessionPool.close();
    }
}
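The aggregate query above uses `group by ([0, 40), 5ms)`. As a rough guide to which time windows such a clause covers, the sketch below enumerates them with plain Java. It assumes left-closed, right-open windows as the bracket notation suggests, an optional sliding step that defaults to the interval, and clipping of the last window to the end time. The `TimeWindows` class is hypothetical, so confirm the exact semantics against the IoTDB SQL manual.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper that lists the windows of a time-based GROUP BY clause.
class TimeWindows {

    // Returns windows {start, end} meaning [start, end), each `interval` long,
    // spaced `step` apart, and clipped so that none extends past `endTime`.
    static List<long[]> windows(long startTime, long endTime, long interval, long step) {
        if (interval <= 0 || step <= 0) {
            throw new IllegalArgumentException("interval and step must be positive");
        }
        List<long[]> result = new ArrayList<>();
        for (long start = startTime; start < endTime; start += step) {
            result.add(new long[] {start, Math.min(start + interval, endTime)});
        }
        return result;
    }

    public static void main(String[] args) {
        // group by ([0, 40), 5ms): the step equals the interval
        for (long[] window : windows(0, 40, 5, 5)) {
            System.out.println("[" + window[0] + ", " + window[1] + ")");
        }
    }
}
```

For the query above this yields eight windows, from [0, 5) to [35, 40), so the result set is expected to contain one count per window.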
3. Native Interface Description
3.1 Parameter List
The Session class has the following fields, which can be set through the constructor or the Session.Builder method:
Field Name | Type | Description |
---|---|---|
nodeUrls | List<String> | List of URLs for database nodes, supporting multiple node connections |
username | String | Username |
password | String | Password |
fetchSize | int | Default batch size for query results |
useSSL | boolean | Whether to enable SSL |
trustStore | String | Path to the trust store |
trustStorePwd | String | Password for the trust store |
queryTimeoutInMs | long | Query timeout in milliseconds |
enableRPCCompression | boolean | Whether to enable RPC compression |
connectionTimeoutInMs | int | Connection timeout in milliseconds |
zoneId | ZoneId | Time zone setting for the session |
thriftDefaultBufferSize | int | Default buffer size for Thrift |
thriftMaxFrameSize | int | Maximum frame size for Thrift |
defaultEndPoint | TEndPoint | Default database endpoint information |
defaultSessionConnection | SessionConnection | Default session connection object |
isClosed | boolean | Whether the current session is closed |
enableRedirection | boolean | Whether to enable redirection |
enableRecordsAutoConvertTablet | boolean | Whether to automatically convert record inserts into Tablet inserts |
deviceIdToEndpoint | Map<String, TEndPoint> | Mapping of device IDs to database endpoints |
endPointToSessionConnection | Map<TEndPoint, SessionConnection> | Mapping of database endpoints to session connections |
executorService | ScheduledExecutorService | Thread pool for periodically updating the node list |
availableNodes | INodeSupplier | Supplier of available nodes |
enableQueryRedirection | boolean | Whether to enable query redirection |
version | Version | Client version number, used for compatibility judgment with the server |
enableAutoFetch | boolean | Whether to enable automatic fetching |
maxRetryCount | int | Maximum number of retries |
retryIntervalInMs | long | Retry interval in milliseconds |
3.2 Interface List
3.2.1 Metadata Management
Method Name | Function Description | Parameter Explanation |
---|---|---|
createDatabase(String database) | Create a database | database : The name of the database to be created |
deleteDatabase(String database) | Delete a specified database | database : The name of the database to be deleted |
deleteDatabases(List<String> databases) | Batch delete databases | databases : A list of database names to be deleted |
createTimeseries(String path, TSDataType dataType, TSEncoding encoding, CompressionType compressor) | Create a single time series | path : The path of the time series, dataType : The data type, encoding : The encoding type, compressor : The compression type |
createAlignedTimeseries(...) | Create aligned time series | Device ID, list of measurement points, list of data types, list of encodings, list of compression types |
createMultiTimeseries(...) | Batch create time series | Multiple paths, data types, encodings, compression types, properties, tags, aliases, etc. |
deleteTimeseries(String path) | Delete a time series | path : The path of the time series to be deleted |
deleteTimeseries(List<String> paths) | Batch delete time series | paths : A list of time series paths to be deleted |
setSchemaTemplate(String templateName, String prefixPath) | Set a schema template | templateName : The name of the template, prefixPath : The path where the template is applied |
createSchemaTemplate(Template template) | Create a schema template | template : The template object |
dropSchemaTemplate(String templateName) | Delete a schema template | templateName : The name of the template to be deleted |
addAlignedMeasurementsInTemplate(...) | Add aligned measurements to a template | Template name, list of measurement paths, data type, encoding type, compression type |
addUnalignedMeasurementsInTemplate(...) | Add unaligned measurements to a template | Same as above |
deleteNodeInTemplate(String templateName, String path) | Delete a node in a template | templateName : The name of the template, path : The path to be deleted |
countMeasurementsInTemplate(String name) | Count the number of measurements in a template | name : The name of the template |
isMeasurementInTemplate(String templateName, String path) | Check if a measurement exists in a template | templateName : The name of the template, path : The path of the measurement |
isPathExistInTemplate(String templateName, String path) | Check if a path exists in a template | templateName : The name of the template, path : The path to check |
showMeasurementsInTemplate(String templateName) | Show measurements in a template | templateName : The name of the template |
showMeasurementsInTemplate(String templateName, String pattern) | Show measurements in a template by pattern | templateName : The name of the template, pattern : The matching pattern |
showAllTemplates() | Show all templates | No parameters |
showPathsTemplateSetOn(String templateName) | Show paths where a template is set | templateName : The name of the template |
showPathsTemplateUsingOn(String templateName) | Show actual paths using a template | Same as above |
unsetSchemaTemplate(String prefixPath, String templateName) | Unset the template setting for a path | prefixPath : The path, templateName : The name of the template |
3.2.2 Data Insertion
Method Name | Function Description | Parameter Explanation |
---|---|---|
insertRecord(String deviceId, long time, List<String> measurements, List<TSDataType> types, Object... values) | Insert a single record | deviceId : Device ID, time : Timestamp, measurements : List of measurement points, types : List of data types, values : The values, one per measurement point |
insertRecord(String deviceId, long time, List<String> measurements, List<String> values) | Insert a single record | deviceId : Device ID, time : Timestamp, measurements : List of measurement points, values : List of values |
insertRecords(List<String> deviceIds, List<Long> times, List<List<String>> measurementsList, List<List<Object>> valuesList) | Insert multiple records | deviceIds : List of device IDs, times : List of timestamps, measurementsList : List of lists of measurement points, valuesList : List of lists of values |
insertRecords(List<String> deviceIds, List<Long> times, List<List<String>> measurementsList, List<List<TSDataType>> typesList, List<List<Object>> valuesList) | Insert multiple records | Same as above, plus typesList : List of lists of data types |
insertRecordsOfOneDevice(String deviceId, List<Long> times, List<List<String>> measurementsList, List<List<TSDataType>> typesList, List<List<Object>> valuesList) | Insert multiple records for a single device | deviceId : Device ID, times : List of timestamps, measurementsList : List of lists of measurement points, typesList : List of lists of types, valuesList : List of lists of values |
insertRecordsOfOneDevice(String deviceId, List<Long> times, List<List<String>> measurementsList, List<List<TSDataType>> typesList, List<List<Object>> valuesList, boolean haveSorted) | Insert sorted multiple records for a single device | Same as above, plus haveSorted : Whether the data is already sorted |
insertStringRecordsOfOneDevice(String deviceId, List<Long> times, List<List<String>> measurementsList, List<List<String>> valuesList) | Insert string-formatted records for a single device | deviceId : Device ID, times : List of timestamps, measurementsList : List of lists of measurement points, valuesList : List of lists of values |
insertStringRecordsOfOneDevice(String deviceId, List<Long> times, List<List<String>> measurementsList, List<List<String>> valuesList, boolean haveSorted) | Insert sorted string-formatted records for a single device | Same as above, plus haveSorted : Whether the data is already sorted |
insertAlignedRecord(String deviceId, long time, List<String> measurements, List<TSDataType> types, List<Object> values) | Insert a single aligned record | deviceId : Device ID, time : Timestamp, measurements : List of measurement points, types : List of types, values : List of values |
insertAlignedRecord(String deviceId, long time, List<String> measurements, List<String> values) | Insert a single string-formatted aligned record | deviceId : Device ID, time : Timestamp, measurements : List of measurement points, values : List of values |
insertAlignedRecords(List<String> deviceIds, List<Long> times, List<List<String>> measurementsList, List<List<Object>> valuesList) | Insert multiple aligned records | deviceIds : List of device IDs, times : List of timestamps, measurementsList : List of lists of measurement points, valuesList : List of lists of values |
insertAlignedRecords(List<String> deviceIds, List<Long> times, List<List<String>> measurementsList, List<List<TSDataType>> typesList, List<List<Object>> valuesList) | Insert multiple aligned records | Same as above, plus typesList : List of lists of data types |
insertAlignedRecordsOfOneDevice(String deviceId, List<Long> times, List<List<String>> measurementsList, List<List<TSDataType>> typesList, List<List<Object>> valuesList) | Insert multiple aligned records for a single device | deviceId : Device ID, times : List of timestamps, measurementsList : List of lists of measurement points, typesList : List of lists of types, valuesList : List of lists of values |
insertAlignedRecordsOfOneDevice(String deviceId, List<Long> times, List<List<String>> measurementsList, List<List<TSDataType>> typesList, List<List<Object>> valuesList, boolean haveSorted) | Insert sorted multiple aligned records for a single device | Same as above, plus haveSorted : Whether the data is already sorted |
insertAlignedStringRecordsOfOneDevice(String deviceId, List<Long> times, List<List<String>> measurementsList, List<List<String>> valuesList) | Insert string-formatted aligned records for a single device | deviceId : Device ID, times : List of timestamps, measurementsList : List of lists of measurement points, valuesList : List of lists of values |
insertAlignedStringRecordsOfOneDevice(String deviceId, List<Long> times, List<List<String>> measurementsList, List<List<String>> valuesList, boolean haveSorted) | Insert sorted string-formatted aligned records for a single device | Same as above, plus haveSorted : Whether the data is already sorted |
insertTablet(Tablet tablet) | Insert a single Tablet | tablet : The Tablet to be inserted |
insertTablet(Tablet tablet, boolean sorted) | Insert a single sorted Tablet | Same as above, plus sorted : Whether the data is already sorted |
insertAlignedTablet(Tablet tablet) | Insert a single aligned Tablet | tablet : The Tablet to be inserted |
insertAlignedTablet(Tablet tablet, boolean sorted) | Insert a single sorted aligned Tablet | Same as above, plus sorted : Whether the data is already sorted |
insertTablets(Map<String, Tablet> tablets) | Insert multiple Tablets in one batch | tablets : Mapping from device IDs to Tablets |
insertTablets(Map<String, Tablet> tablets, boolean sorted) | Insert multiple sorted Tablets in one batch | Same as above, plus sorted : Whether the data is already sorted |
insertAlignedTablets(Map<String, Tablet> tablets) | Insert multiple aligned Tablets in one batch | tablets : Mapping from device IDs to Tablets |
insertAlignedTablets(Map<String, Tablet> tablets, boolean sorted) | Insert multiple sorted aligned Tablets in one batch | Same as above, plus sorted : Whether the data is already sorted |
3.2.3 Data Deletion
Method Name | Function Description | Parameter Explanation |
---|---|---|
deleteTimeseries(String path) | Delete a single time series | path : The path of the time series |
deleteTimeseries(List<String> paths) | Batch delete time series | paths : A list of time series paths |
deleteData(String path, long endTime) | Delete historical data for a specified path | path : The path,endTime : The end timestamp |
deleteData(List<String> paths, long endTime) | Batch delete historical data for specified paths | paths : A list of paths,endTime : The end timestamp |
deleteData(List<String> paths, long startTime, long endTime) | Delete historical data within a time range for specified paths | Same as above, plus startTime : The start timestamp |
3.2.4 Data Query
Method Name | Function Description | Parameter Explanation |
---|---|---|
executeQueryStatement(String sql) | Execute a query statement | sql : The query SQL statement |
executeQueryStatement(String sql, long timeoutInMs) | Execute a query statement with timeout | sql : The query SQL statement, timeoutInMs : The query timeout (in milliseconds) |
executeRawDataQuery(List<String> paths, long startTime, long endTime) | Query raw data for specified paths | paths: A list of query paths, startTime : The start timestamp, endTime : The end timestamp |
executeRawDataQuery(List<String> paths, long startTime, long endTime, long timeOut) | Query raw data for specified paths (with timeout) | Same as above, plus timeOut : The timeout time |
executeLastDataQuery(List<String> paths) | Query the latest data | paths : A list of query paths |
executeLastDataQuery(List<String> paths, long lastTime) | Query the latest data at a specified time | paths : A list of query paths, lastTime : The specified timestamp |
executeLastDataQuery(List<String> paths, long lastTime, long timeOut) | Query the latest data at a specified time (with timeout) | Same as above, plus timeOut : The timeout time |
executeLastDataQueryForOneDevice(String db, String device, List<String> sensors, boolean isLegalPathNodes) | Query the latest data for a single device | db : The database name, device : The device name, sensors : A list of sensors, isLegalPathNodes : Whether the path nodes are legal |
executeAggregationQuery(List<String> paths, List<TAggregationType> aggregations) | Execute an aggregation query | paths : A list of query paths, aggregations : A list of aggregation types |
executeAggregationQuery(List<String> paths, List<TAggregationType> aggregations, long startTime, long endTime) | Execute an aggregation query with a time range | Same as above, plus startTime : The start timestamp, endTime : The end timestamp |
executeAggregationQuery(List<String> paths, List<TAggregationType> aggregations, long startTime, long endTime, long interval) | Execute an aggregation query with a time interval | Same as above, plus interval : The time interval |
executeAggregationQuery(List<String> paths, List<TAggregationType> aggregations, long startTime, long endTime, long interval, long slidingStep) | Execute a sliding window aggregation query | Same as above, plus slidingStep : The sliding step |
fetchAllConnections() | Get information of all active connections | No parameters |
3.2.5 System Status and Backup
Method Name | Function Description | Parameter Explanation |
---|---|---|
getBackupConfiguration() | Get backup configuration information | No parameters |
fetchAllConnections() | Get information of all active connections | No parameters |
getSystemStatus() | Get the system status | Deprecated, returns SystemStatus.NORMAL |