数据同步是工业物联网的典型需求,通过数据同步机制,可实现 IoTDB 之间的数据共享,搭建完整的数据链路来满足内网外网数据互通、端边云同步、数据迁移、数据备份等需求。
1. 功能概述
1.1 数据同步
一个数据同步任务包含 3 个阶段:

- 抽取(Source)阶段:该部分用于从源 IoTDB 抽取数据,在 SQL 语句中的 source 部分定义
- 处理(Process)阶段:该部分用于处理从源 IoTDB 抽取出的数据,在 SQL 语句中的 processor 部分定义
- 发送(Sink)阶段:该部分用于向目标 IoTDB 发送数据,在 SQL 语句中的 sink 部分定义
通过 SQL 语句声明式地配置 3 个部分的具体内容,可实现灵活的数据同步能力。
1.2 功能限制及说明
- 支持 1.x 系列版本 IoTDB 数据同步到 2.x 以及以上系列版本版本的 IoTDB。
- 不支持 2.x 系列版本 IoTDB 数据同步到 1.x 系列版本版本的 IoTDB。
- 在进行数据同步任务时,请避免执行任何删除操作,防止两端状态不一致。
2. 使用说明
数据同步任务有三种状态:RUNNING、STOPPED 和 DROPPED。任务状态转换如下图所示:

提供以下 SQL 语句对同步任务进行状态管理。
2.1 创建任务
为选填项,输入 SQL 时注意 SOURCE
SQL 示例如下:
CREATE PIPE [IF NOT EXISTS] <PipeId> -- PipeId 是能够唯一标定任务的名字
-- 数据抽取插件,可选插件
[<parameter> = <value>,],
-- 数据处理插件,可选插件
[<parameter> = <value>,],
-- 数据连接插件,必填插件
[<parameter> = <value>,],
IF NOT EXISTS 语义:用于创建操作中,确保当指定 Pipe 不存在时,执行创建命令,防止因尝试创建已存在的 Pipe 而导致报错。
2.2 开始任务
创建之后,任务直接进入运行状态,不需要执行启动任务。当使用STOP PIPE
语句停止任务时需手动使用START PIPE
语句来启动任务,PIPE 发生异常情况停止后会自动重新启动任务,从而开始处理数据:
2.3 停止任务
2.4 删除任务
IF EXISTS 语义:用于删除操作中,确保当指定 Pipe 存在时,执行删除命令,防止因尝试删除不存在的 Pipe 而导致报错。
2.5 查看任务
pipe 的 show pipes 结果示例:
| ID| CreationTime| State|PipeSource|PipeProcessor| PipeSink|ExceptionMessage|RemainingEventCount|EstimatedRemainingSeconds|
|59abf95db892428b9d01c5fa318014ea|2024-06-17T14:03:44.189|RUNNING| {}| {}|{sink=iotdb-thrift-sink, sink.ip=, sink.port=6668}| | 128| 1.03|
- ID:同步任务的唯一标识符
- CreationTime:同步任务的创建的时间
- State:同步任务的状态
- PipeSource:同步数据流的来源
- PipeProcessor:同步数据流在传输过程中的处理逻辑
- PipeSink:同步数据流的目的地
- ExceptionMessage:显示同步任务的异常信息
- RemainingEventCount(统计存在延迟):剩余 event 数,当前数据同步任务中的所有 event 总数,包括数据同步的 event,以及系统和用户自定义的 event。
- EstimatedRemainingSeconds(统计存在延迟):剩余时间,基于当前 event 个数和 pipe 处速率,预估完成传输的剩余时间。
2.6 同步插件
为了使得整体架构更加灵活以匹配不同的同步场景需求,我们支持在同步任务框架中进行插件组装。系统为您预置了一些常用插件可直接使用,同时您也可以自定义 processor 插件 和 Sink 插件,并加载至 IoTDB 系统进行使用。查看系统中的插件(含自定义与内置插件)可以用以下语句:
| PluginName|PluginType| ClassName| PluginJar|
| DO-NOTHING-PROCESSOR| Builtin| org.apache.iotdb.commons.pipe.plugin.builtin.processor.donothing.DoNothingProcessor| |
| DO-NOTHING-SINK| Builtin| org.apache.iotdb.commons.pipe.plugin.builtin.connector.donothing.DoNothingConnector| |
| IOTDB-SOURCE| Builtin| org.apache.iotdb.commons.pipe.plugin.builtin.extractor.iotdb.IoTDBExtractor| |
| IOTDB-THRIFT-SINK| Builtin| org.apache.iotdb.commons.pipe.plugin.builtin.connector.iotdb.thrift.IoTDBThriftConnector| |
| IOTDB-THRIFT-SSL-SINK| Builtin| org.apache.iotdb.commons.pipe.plugin.builtin.connector.iotdb.thrift.IoTDBThriftSslConnector| |
类型 | 自定义插件 | 插件名称 | 介绍 |
source 插件 | 不支持 | iotdb-source | 默认的 extractor 插件,用于抽取 IoTDB 历史或实时数据 |
processor 插件 | 支持 | do-nothing-processor | 默认的 processor 插件,不对传入的数据做任何的处理 |
sink 插件 | 支持 | do-nothing-sink | 不对发送出的数据做任何的处理 |
iotdb-thrift-sink | 默认的 sink 插件,用于 IoTDB 到 IoTDB(V2.0.0 及以上)之间的数据传输。使用 Thrift RPC 框架传输数据,多线程 async non-blocking IO 模型,传输性能高,尤其适用于目标端为分布式时的场景 | ||
iotdb-thrift-ssl-sink | 用于 IoTDB 与 IoTDB(V2.0.0 及以上)之间的数据传输。使用 Thrift RPC 框架传输数据,多线程 sync blocking IO 模型,适用于安全需求较高的场景 |
3. 使用示例
3.1 全量数据同步
本例子用来演示将一个 IoTDB 的所有数据同步至另一个 IoTDB,数据链路如下图所示:

在这个例子中,我们可以创建一个名为 A2B 的同步任务,用来同步 A IoTDB 到 B IoTDB 间的全量数据,这里需要用到用到 sink 的 iotdb-thrift-sink 插件(内置插件),需通过 node-urls 配置目标端 IoTDB 中 DataNode 节点的数据服务端口的 url,如下面的示例语句:
create pipe A2B
with sink (
'node-urls' = '', -- 目标端 IoTDB 中 DataNode 节点的数据服务端口的 url
3.2 部分数据同步
本例子用来演示同步某个历史时间范围( 2023 年 8 月 23 日 8 点到 2023 年 10 月 23 日 8 点)的数据至另一个 IoTDB,数据链路如下图所示:

在这个例子中,我们可以创建一个名为 A2B 的同步任务。首先我们需要在 source 中定义传输数据的范围,由于传输的是历史数据(历史数据是指同步任务创建之前存在的数据),需要配置数据的起止时间 start-time 和 end-time 以及传输的模式 mode.streaming。通过 node-urls 配置目标端 IoTDB 中 DataNode 节点的数据服务端口的 url。
create pipe A2B
'source'= 'iotdb-source',
'mode.streaming' = 'true' -- 新插入数据(pipe创建后)的抽取模式:是否按流式抽取(false 时为批式)
'start-time' = '2023.08.23T08:00:00+00:00', -- 同步所有数据的开始 event time,包含 start-time
'end-time' = '2023.10.23T08:00:00+00:00' -- 同步所有数据的结束 event time,包含 end-time
with SINK (
'node-urls' = '', -- 目标端 IoTDB 中 DataNode 节点的数据服务端口的 url
3.3 双向数据传输
本例子用来演示两个 IoTDB 之间互为双活的场景,数据链路如下图所示:

在这个例子中,为了避免数据无限循环,需要将 A 和 B 上的参数source.mode.double-living
均设置为 true
,表示不转发从另一 pipe 传输而来的数据。
在 A IoTDB 上执行下列语句:
create pipe AB
with source (
'source.mode.double-living' ='true' --不转发由其他 Pipe 写入的数据
with sink (
'node-urls' = '', -- 目标端 IoTDB 中 DataNode 节点的数据服务端口的 url
在 B IoTDB 上执行下列语句:
create pipe BA
with source (
'source.mode.double-living' ='true' --不转发由其他 Pipe 写入的数据
with sink (
'node-urls' = '', -- 目标端 IoTDB 中 DataNode 节点的数据服务端口的 url
3.4 边云数据传输
本例子用来演示多个 IoTDB 之间边云传输数据的场景,数据由 B 、C、D 集群分别都同步至 A 集群,数据链路如下图所示:

在这个例子中,为了将 B 、C、D 集群的数据同步至 A,在 BA 、CA、DA 之间的 pipe 需要配置database-name 和 table-name 限制范围,详细语句如下:
在 B IoTDB 上执行下列语句,将 B 中数据同步至 A:
create pipe BA
with source (
'database-name'='db_b.*', -- 限制范围
'table-name'='.*', -- 可选择匹配所有
with sink (
'node-urls' = '', -- 目标端 IoTDB 中 DataNode 节点的数据服务端口的 url
在 C IoTDB 上执行下列语句,将 C 中数据同步至 A:
create pipe CA
with source (
'database-name'='db_c.*', -- 限制范围
'table-name'='.*', -- 可选择匹配所有
with sink (
'node-urls' = '', -- 目标端 IoTDB 中 DataNode 节点的数据服务端口的 url
在 D IoTDB 上执行下列语句,将 D 中数据同步至 A:
create pipe DA
with source (
'database-name'='db_d.*', -- 限制范围
'table-name'='.*', -- 可选择匹配所有
with sink (
'node-urls' = '', -- 目标端 IoTDB 中 DataNode 节点的数据服务端口的 url
3.5 级联数据传输
本例子用来演示多个 IoTDB 之间级联传输数据的场景,数据由 A 集群同步至 B 集群,再同步至 C 集群,数据链路如下图所示:

在这个例子中,为了将 A 集群的数据同步至 C,在 BC 之间的 pipe 需要将 source.mode.double-living
在 A IoTDB 上执行下列语句,将 A 中数据同步至 B:
create pipe AB
with sink (
'node-urls' = '', -- 目标端 IoTDB 中 DataNode 节点的数据服务端口的 url
在 B IoTDB 上执行下列语句,将 B 中数据同步至 C:
create pipe BC
with source (
'source.mode.double-living' ='true' --不转发由其他 Pipe 写入的数据
with sink (
'node-urls' = '', -- 目标端 IoTDB 中 DataNode 节点的数据服务端口的 url
3.6 压缩同步
IoTDB 支持在同步过程中指定数据压缩方式。可通过配置 compressor
目前支持 snappy / gzip / lz4 / zstd / lzma2 5 种可选算法,且可以选择多种压缩算法组合,按配置的顺序进行压缩。rate-limit-bytes-per-second
(V1.3.3 及以后版本支持)每秒最大允许传输的byte数,计算压缩后的byte,若小于0则不限制。
如创建一个名为 A2B 的同步任务:
create pipe A2B
with sink (
'node-urls' = '', -- 目标端 IoTDB 中 DataNode 节点的数据服务端口的 url
'compressor' = 'snappy,lz4' --
'rate-limit-bytes-per-second'='1048576' -- 每秒最大允许传输的byte数
3.7 加密同步
IoTDB 支持在同步过程中使用 SSL 加密,从而在不同的 IoTDB 实例之间安全地传输数据。通过配置 SSL 相关的参数,如证书地址和密码(ssl.trust-store-path
)可以确保数据在同步过程中被 SSL 加密所保护。
如创建名为 A2B 的同步任务:
create pipe A2B
with sink (
'node-urls'='', -- 目标端 IoTDB 中 DataNode 节点的数据服务端口的 url
'ssl.trust-store-path'='pki/trusted', -- 连接目标端 DataNode 所需的 trust store 证书路径
'ssl.trust-store-pwd'='root' -- 连接目标端 DataNode 所需的 trust store 证书密码
可通过修改 IoTDB 配置文件(iotdb-system.properties
# pipe_receiver_file_dir
# If this property is unset, system will save the data in the default relative path directory under the IoTDB folder(i.e., %IOTDB_HOME%/${cn_system_dir}/pipe/receiver).
# If it is absolute, system will save the data in the exact location it points to.
# If it is relative, system will save the data in the relative path directory it indicates under the IoTDB folder.
# Note: If pipe_receiver_file_dir is assigned an empty string(i.e.,zero-size), it will be handled as a relative path.
# effectiveMode: restart
# For windows platform
# If its prefix is a drive specifier followed by "\\", or if its prefix is "\\\\", then the path is absolute. Otherwise, it is relative.
# pipe_receiver_file_dir=data\\confignode\\system\\pipe\\receiver
# For Linux platform
# If its prefix is "/", then the path is absolute. Otherwise, it is relative.
### Pipe Configuration
# Uncomment the following field to configure the pipe lib directory.
# effectiveMode: first_start
# For Windows platform
# If its prefix is a drive specifier followed by "\\", or if its prefix is "\\\\", then the path is
# absolute. Otherwise, it is relative.
# pipe_lib_dir=ext\\pipe
# For Linux platform
# If its prefix is "/", then the path is absolute. Otherwise, it is relative.
# The maximum number of threads that can be used to execute the pipe subtasks in PipeSubtaskExecutor.
# The actual value will be min(pipe_subtask_executor_max_thread_num, max(1, CPU core number / 2)).
# effectiveMode: restart
# Datatype: int
# The connection timeout (in milliseconds) for the thrift client.
# effectiveMode: restart
# Datatype: int
# The maximum number of selectors that can be used in the sink.
# Recommend to set this value to less than or equal to pipe_sink_max_client_number.
# effectiveMode: restart
# Datatype: int
# The maximum number of clients that can be used in the sink.
# effectiveMode: restart
# Datatype: int
# The total bytes that all pipe sinks can transfer per second.
# When given a value less than or equal to 0, it means no limit.
# default value is -1, which means no limit.
# effectiveMode: hot_reload
# Datatype: double
source 参数
参数 | 描述 | value 取值范围 | 是否必填 | 默认取值 |
source | iotdb-source | String: iotdb-source | 必填 | - |
mode.streaming | 此参数指定时序数据写入的捕获来源。适用于 mode.streaming 为 false 模式下的场景,决定inclusion 中data.insert 数据的捕获来源。提供两种捕获策略:true: 动态选择捕获的类型。系统将根据下游处理速度,自适应地选择是捕获每个写入请求还是仅捕获 TsFile 文件的封口请求。当下游处理速度快时,优先捕获写入请求以减少延迟;当处理速度慢时,仅捕获文件封口请求以避免处理堆积。这种模式适用于大多数场景,能够实现处理延迟和吞吐量的最优平衡。false:固定按批捕获方式。仅捕获 TsFile 文件的封口请求,适用于资源受限的应用场景,以降低系统负载。注意,pipe 启动时捕获的快照数据只会以文件的方式供下游处理。 | Boolean: true / false | 否 | true |
mode.strict | 在使用 time / path / database-name / table-name 参数过滤数据时,是否需要严格按照条件筛选:true : 严格筛选。系统将完全按照给定条件过滤筛选被捕获的数据,确保只有符合条件的数据被选中。false :非严格筛选。系统在筛选被捕获的数据时可能会包含一些额外的数据,适用于性能敏感的场景,可降低 CPU 和 IO 消耗。 | Boolean: true / false | 否 | true |
mode.snapshot | 此参数决定时序数据的捕获方式,影响inclusion 中的data 数据。提供两种模式:true :静态数据捕获。启动 pipe 时,会进行一次性的数据快照捕获。当快照数据被完全消费后,pipe 将自动终止(DROP PIPE SQL 会自动执行)。false :动态数据捕获。除了在 pipe 启动时捕获快照数据外,还会持续捕获后续的数据变更。pipe 将持续运行以处理动态数据流。 | Boolean: true / false | 否 | false |
database-name | 当用户连接指定的 sql_dialect 为 table 时可以指定。此参数决定时序数据的捕获范围,影响inclusion 中的data 数据。表示要过滤的数据库的名称。它可以是具体的数据库名,也可以是 Java 风格正则表达式来匹配多个数据库。默认情况下,匹配所有的库。 | String:数据库名或数据库正则模式串,可以匹配未创建的、不存在的库 | 否 | ".*" |
table-name | 当用户连接指定的 sql_dialect 为 table 时可以指定。此参数决定时序数据的捕获范围,影响inclusion 中的data 数据。表示要过滤的表的名称。它可以是具体的表名,也可以是 Java 风格正则表达式来匹配多个表。默认情况下,匹配所有的表。 | String:数据表名或数据表正则模式串,可以是未创建的、不存在的表 | 否 | ".*" |
start-time | 此参数决定时序数据的捕获范围,影响inclusion 中的data 数据。当数据的 event time 大于等于该参数时,数据会被筛选出来进入流处理 pipe。 | Long: [Long.MIN_VALUE, Long.MAX_VALUE] (unix 裸时间戳)或 String:IoTDB 支持的 ISO 格式时间戳 | 否 | Long.MIN_VALUE(unix 裸时间戳) |
end-time | 此参数决定时序数据的捕获范围,影响inclusion 中的data 数据。当数据的 event time 小于等于该参数时,数据会被筛选出来进入流处理 pipe。 | Long: [Long.MIN_VALUE, Long.MAX_VALUE](unix 裸时间戳)或String:IoTDB 支持的 ISO 格式时间戳 | 否 | Long.MAX_VALUE(unix 裸时间戳) |
forwarding-pipe-requests | 是否转发由 pipe 数据同步而来的集群外的数据。一般供搭建双活集群时使用,双活集群模式下该参数为 false,以此避免无限的环形同步。 | Boolean: true / false | 否 | true |
💎 说明:数据抽取模式 mode.streaming 取值 true 和 false 的差异
- true(推荐):该取值下,任务将对数据进行实时处理、发送,其特点是高时效、低吞吐
- false:该取值下,任务将对数据进行批量(按底层数据文件)处理、发送,其特点是低时效、高吞吐
sink 参数
参数 | 描述 | value 取值范围 | 是否必填 | 默认取值 |
sink | iotdb-thrift-sink 或 iotdb-thrift-async-sink | String: iotdb-thrift-sink 或 iotdb-thrift-async-sink | 必填 | - |
node-urls | 目标端 IoTDB 任意多个 DataNode 节点的数据服务端口的 url(请注意同步任务不支持向自身服务进行转发) | String. 例:',,', '' | 必填 | - |
user/usename | 连接接收端使用的用户名,同步要求该用户具备相应的操作权限 | String | 选填 | root |
password | 连接接收端使用的用户名对应的密码,同步要求该用户具备相应的操作权限 | String | 选填 | root |
batch.enable | 是否开启日志攒批发送模式,用于提高传输吞吐,降低 IOPS | Boolean: true, false | 选填 | true |
batch.max-delay-seconds | 在开启日志攒批发送模式时生效,表示一批数据在发送前的最长等待时间(单位:s) | Integer | 选填 | 1 |
batch.size-bytes | 在开启日志攒批发送模式时生效,表示一批数据最大的攒批大小(单位:byte) | Long | 选填 | 1610241024 |
compressor | 所选取的 rpc 压缩算法,可配置多个,对每个请求顺序采用 | String: snappy / gzip / lz4 / zstd / lzma2 | 选填 | "" |
compressor.zstd.level | 所选取的 rpc 压缩算法为 zstd 时,可使用该参数额外配置 zstd 算法的压缩等级 | Int: [-131072, 22] | 选填 | 3 |
rate-limit-bytes-per-second | 每秒最大允许传输的 byte 数,计算压缩后的 byte(如压缩),若小于 0 则不限制 | Double: [Double.MIN_VALUE, Double.MAX_VALUE] | 选填 | -1 |
参数 | 描述 | value 取值范围 | 是否必填 | 默认取值 |
sink | iotdb-thrift-ssl-sink | String: iotdb-thrift-ssl-sink | 必填 | - |
node-urls | 目标端 IoTDB 任意多个 DataNode 节点的数据服务端口的 url(请注意同步任务不支持向自身服务进行转发) | String. 例:',,', '' | 必填 | - |
user/usename | 连接接收端使用的用户名,同步要求该用户具备相应的操作权限 | String | 选填 | root |
password | 连接接收端使用的用户名对应的密码,同步要求该用户具备相应的操作权限 | String | 选填 | root |
batch.enable | 是否开启日志攒批发送模式,用于提高传输吞吐,降低 IOPS | Boolean: true, false | 选填 | true |
batch.max-delay-seconds | 在开启日志攒批发送模式时生效,表示一批数据在发送前的最长等待时间(单位:s) | Integer | 选填 | 1 |
batch.size-bytes | 在开启日志攒批发送模式时生效,表示一批数据最大的攒批大小(单位:byte) | Long | 选填 | 1610241024 |
compressor | 所选取的 rpc 压缩算法,可配置多个,对每个请求顺序采用 | String: snappy / gzip / lz4 / zstd / lzma2 | 选填 | "" |
compressor.zstd.level | 所选取的 rpc 压缩算法为 zstd 时,可使用该参数额外配置 zstd 算法的压缩等级 | Int: [-131072, 22] | 选填 | 3 |
rate-limit-bytes-per-second | 每秒最大允许传输的 byte 数,计算压缩后的 byte(如压缩),若小于 0 则不限制 | Double: [Double.MIN_VALUE, Double.MAX_VALUE] | 选填 | -1 |
ssl.trust-store-path | 连接目标端 DataNode 所需的 trust store 证书路径 | String.Example: ',,', '' | 必填 | - |
ssl.trust-store-pwd | 连接目标端 DataNode 所需的 trust store 证书密码 | Integer | 必填 | - |