本文介绍数据库传输服务 DTS 支持的数据订阅格式。
数据库传输服务 DTS 支持订阅格式如下表所示。
说明
Avro 格式和 Debezium JSON 格式当前属于邀测阶段,如需使用请提交工单联系技术支持。
订阅格式 | 说明 |
---|---|
火山引擎 Proto | 关于火山引擎 Proto 格式的详细示例,请参见 Volc Proto。 |
Canal Proto | 数据传输服务 DTS 可以解析数据库的增量日志,并将增量数据封装成 Canal Proto 格式同步到数据中间件,实现增量数据的订阅和消费。关于 Canal Proto 格式的详细示例,请参见 Canal Proto。 |
Canal JSON | Canal JSON 是 Canal 定义的一种解析数据库增量日志的数据存储格式。数据传输服务 DTS 可以将增量数据封装成 Canal JSON 格式同步至数据中间件,实现增量数据的订阅和消费。关于 Canal JSON 格式的字段信息和相应示例,请参见 Canal JSON。 |
Avro 格式 | Avro 订阅格式是一种数据序列化格式,可以将数据结构或对象转化成便于传输的格式,主要适用于大数据处理和复杂分布式系统的场景。关于 Avro 格式的详细实例,请参见 Avro。 |
Debezium JSON 格式 | Debezium 是一个开源的分布式平台,用于捕获数据库变化并将其以事件流的形式发送到 Kafka 等消息队列中。关于 Debezium JSON 格式的更多信息,请参见 Debezium JSON。 |
下文介绍火山引擎数据库传输服务 DTS 如何实现 Canal JSON 的数据格式。
字段 | 类型 | 说明 | 示例 |
---|---|---|---|
id | Integer | 操作的序列号。默认取值为 0。 | 0 |
database | String | 数据库的名称。 | dts_test |
table | String | 表名。 | test |
pkNames | Array of String | 组成主键的所有列名。 |
|
isDdl | Bool | 数据库传输服务 DTS 判断数据库执行的语句是否是 DDL 操作。取值为:
| true |
type | String | 数据库的操作类型,包括 INSERT 、UPDATE 、DELETE 、CREATE 、TRUNCATE 、ERASE 、QUERY 等。 | CREATE |
es | Integer | Binlog 中的毫秒级时间戳,即数据原始变更的时间。 | 1682578031000 |
ts | Integer | 数据库传输服务 DTS 生成该条消息的毫秒级时间戳。 | 1682578031120 |
sql | String | SQL 语句。 说明 在执行 DML 操作时,该取值为空。 |
|
sqlType | Map | 在执行 DML 操作时,记录每一列数据在 Java 应用程序接口中的类型。 |
|
mysqlType | Map | 在执行 DML 操作时,记录每一列数据在 MySQL 中的类型。 |
|
data | Array of Map | 在执行 DML 操作时,变更后的数据,包含每一个表结构字段的 Key-value 结构。 |
|
old | Array of Map | 在执行 DML 操作时,变更前的数据,包含变更前每一个表结构字段的 Key-value 结构。 说明 仅 |
|
pgType | Map | 在数据源的数据库类型为 PostgreSQL 且在执行 DML 操作时,PostgreSQL 数据库中每一列的数据类型。 说明 数据库传输服务 DTS 的新增字段。 | null |
下文以云数据库 MySQL 版为例,介绍数据库传输服务 DTS 将数据库的 DDL 或 DML 操作编码解析为 Canal JSON 格式的示例:
DDL
{ "id":0, "database":"dts_test", "table":"test", "pkNames":null, "isDdl":true, "type":"CREATE", "es":1682578031000, "ts":1682578031120, "sql":"CREATE TABLE test (id INT NOT NULL PRIMARY KEY, name CHAR(255), message LONGTEXT)", "sqlType":null, "mysqlType":null, "data":null, "old":null, "pgType":null }
DML
{ "id":0, "database":"dts_test", "table":"test", "pkNames":[ "id" ], "isDdl":false, "type":"INSERT", "es":1682578770000, "ts":1682578770185, "sql":"", "sqlType":{ "id":4, "message":2005, "name":1 }, "mysqlType":{ "id":"int(11)", "message":"longtext", "name":"char(255)" }, "data":[ { "id":"1", "message":"This is a longtext", "name":"Bob" } ], "old":null, "pgType":{ } }
Debezium JSON 格式由以下两部分字段组成:
schema
:表示事件的架构,包括字段类型和结构。用于了解消费端的数据结构。
payload
:表示实际的事件数据。DML 事件包括的主要字段如下表所示:
子字段 | 说明 |
---|---|
before | 表示前镜像,包含在更新或删除操作之前的行数据。如果是插入操作,该字段为 null。 |
after | 表示后镜像,包含在插入或更新操作之后的行数据。如果是删除操作,该字段为 null。 |
source | 表示数据源信息,为数据源相关的信息。 |
op | 表示 DML 类型。 |
ts_ms | 表示 DTS 处理事件发生的时间戳(以毫秒为单位),用于记录事件的实际发生时间。 |
更多字段详细信息,请参见 Debezium JSON。
下文以云数据库 MySQL 版为例,介绍数据库传输服务 DTS 将数据库的 DML 操作编码解析为 Debezium JSON 格式的示例:
表结构示例如下。
create table score_table(id int primary key, namr varchar(32), score double);
对表进行插入操作:
insert into score_table values(1, 'tom', 86.0);
预期输出:
{ "schema": {...}, "payload": { "before": null, "after": { "id": 1, "name": "tom", "score": 86.0 }, "source": {...}, "op": "c", "ts_ms": 1731068683523, "transaction": null } }
对表进行更新操作:
update score_table set score = 95.0 where id = 2;
预期输出:
{ "schema": {...}, "payload": { "before": { "id": 1, "name": "tom", "score": 86.0 }, "after": { "id": 1, "name": "tom", "score": 95.0 }, "source": {...}, "op": "u", "ts_ms": 1731068693523, "transaction": null } }
对表进行删除操作:
delete from score_table where id = 1;
预期输出:
{ "schema": {...}, "payload": { "before": { "id": 1, "name": "tom", "score": 86.0 }, "after": null, "source": {...}, "op": "d", "ts_ms": 1731068713052, "transaction": null } }