本文介绍数据库传输服务 DTS 支持的数据订阅格式。
数据库传输服务 DTS 支持订阅格式如下表所示。
说明
Avro 格式订阅格式当前属于邀测阶段,如需使用请提交工单联系技术支持。
订阅格式 | 说明 |
---|---|
火山引擎 Proto | 关于火山引擎 Proto 格式的详细示例,请参见 Volc Proto。 |
Canal Proto | 数据传输服务 DTS 可以解析数据库的增量日志,并将增量数据封装成 Canal Proto 格式同步到数据中间件,实现增量数据的订阅和消费。关于 Canal Proto 格式的详细示例,请参见 Canal Proto。 |
Canal JSON | Canal JSON 是 Canal 定义的一种解析数据库增量日志的数据存储格式。数据传输服务 DTS 可以将增量数据封装成 Canal JSON 格式同步至数据中间件,实现增量数据的订阅和消费。关于 Canal JSON 格式的字段信息和相应示例,请参见 Canal JSON。 |
Avro 格式 | Avro 订阅格式是一种数据序列化格式,可以将数据结构或对象转化成便于传输的格式,主要适用于大数据处理和复杂分布式系统的场景。关于 Avro 格式的详细实例,请参见 Avro。 |
下文介绍火山引擎数据库传输服务 DTS 如何实现 Canal JSON 的数据格式。
字段 | 类型 | 说明 | 示例 |
---|---|---|---|
id | Integer | 操作的序列号。默认取值为 0。 | 0 |
database | String | 数据库的名称。 | dts_test |
table | String | 表名。 | test |
pkNames | Array of String | 组成主键的所有列名。 |
|
isDdl | Bool | 数据库传输服务 DTS 判断数据库执行的语句是否是 DDL 操作。取值为:
| true |
type | String | 数据库的操作类型,包括 INSERT 、UPDATE 、DELETE 、CREATE 、TRUNCATE 、ERASE 、QUERY 等。 | CREATE |
es | Integer | Binlog 中的毫秒级时间戳,即数据原始变更的时间。 | 1682578031000 |
ts | Integer | 数据库传输服务 DTS 生成该条消息的毫秒级时间戳。 | 1682578031120 |
sql | String | SQL 语句。 说明 在执行 DML 操作时,该取值为空。 |
|
sqlType | Map | 在执行 DML 操作时,记录每一列数据在 Java 应用程序接口中的类型。 |
|
mysqlType | Map | 在执行 DML 操作时,记录每一列数据在 MySQL 中的类型。 |
|
data | Array of Map | 在执行 DML 操作时,变更后的数据,包含每一个表结构字段的 Key-value 结构。 |
|
old | Array of Map | 在执行 DML 操作时,变更前的数据,包含变更前每一个表结构字段的 Key-value 结构。 说明 仅 |
|
pgType | Map | 在数据源的数据库类型为 PostgreSQL 且在执行 DML 操作时,PostgreSQL 数据库中每一列的数据类型。 说明 数据库传输服务 DTS 的新增字段。 | null |
下文以云数据库 MySQL 版为例,介绍数据库传输服务 DTS 将数据库的 DDL 或 DML 操作编码解析为 Canal JSON 格式的示例:
DDL
{ "id":0, "database":"dts_test", "table":"test", "pkNames":null, "isDdl":true, "type":"CREATE", "es":1682578031000, "ts":1682578031120, "sql":"CREATE TABLE test (id INT NOT NULL PRIMARY KEY, name CHAR(255), message LONGTEXT)", "sqlType":null, "mysqlType":null, "data":null, "old":null, "pgType":null }
DML
{ "id":0, "database":"dts_test", "table":"test", "pkNames":[ "id" ], "isDdl":false, "type":"INSERT", "es":1682578770000, "ts":1682578770185, "sql":"", "sqlType":{ "id":4, "message":2005, "name":1 }, "mysqlType":{ "id":"int(11)", "message":"longtext", "name":"char(255)" }, "data":[ { "id":"1", "message":"This is a longtext", "name":"Bob" } ], "old":null, "pgType":{ } }