You need to enable JavaScript to run this app.
导航
数据订阅格式
最近更新时间:2024.12.31 11:00:36首次发布时间:2022.08.11 14:28:36

本文介绍数据库传输服务 DTS 支持的数据订阅格式。

订阅格式

数据库传输服务 DTS 支持订阅格式如下表所示。

说明

Avro 格式Debezium JSON 格式当前属于邀测阶段,如需使用请提交工单联系技术支持。

订阅格式说明
火山引擎 Proto关于火山引擎 Proto 格式的详细示例,请参见 Volc Proto
Canal Proto数据传输服务 DTS 可以解析数据库的增量日志,并将增量数据封装成 Canal Proto 格式同步到数据中间件,实现增量数据的订阅和消费。关于 Canal Proto 格式的详细示例,请参见 Canal Proto
Canal JSONCanal JSON 是 Canal 定义的一种解析数据库增量日志的数据存储格式。数据传输服务 DTS 可以将增量数据封装成 Canal JSON 格式同步至数据中间件,实现增量数据的订阅和消费。关于 Canal JSON 格式的字段信息和相应示例,请参见 Canal JSON

Avro 格式

Avro 订阅格式是一种数据序列化格式,可以将数据结构或对象转化成便于传输的格式,主要适用于大数据处理和复杂分布式系统的场景。关于 Avro 格式的详细实例,请参见 Avro

Debezium JSON 格式Debezium 是一个开源的分布式平台,用于捕获数据库变化并将其以事件流的形式发送到 Kafka 等消息队列中。关于 Debezium JSON 格式的更多信息,请参见 Debezium JSON

Canal JSON

下文介绍火山引擎数据库传输服务 DTS 如何实现 Canal JSON 的数据格式。

字段说明

字段类型说明示例
idInteger操作的序列号。默认取值为 0。0
databaseString数据库的名称。dts_test
tableString表名。test

pkNames

Array of String

组成主键的所有列名。

[
    "id"
]

isDdl

Bool

数据库传输服务 DTS 判断数据库执行的语句是否是 DDL 操作。取值为:

  • true:表示是。

  • false:表示否。

true

typeString数据库的操作类型,包括 INSERTUPDATEDELETECREATETRUNCATEERASEQUERY等。CREATE
esIntegerBinlog 中的毫秒级时间戳,即数据原始变更的时间。1682578031000
tsInteger数据库传输服务 DTS 生成该条消息的毫秒级时间戳。1682578031120

sql

String

SQL 语句。

说明

在执行 DML 操作时,该取值为空。

CREATE TABLE test (id INT NOT NULL PRIMARY KEY, name CHAR(255), message LONGTEXT)

sqlType

Map

在执行 DML 操作时,记录每一列数据在 Java 应用程序接口中的类型。

{
     "id":4,
     "message":2005,
     "name":1
 }

mysqlType

Map

在执行 DML 操作时,记录每一列数据在 MySQL 中的类型。

{
     "id":"int(11)",
     "message":"longtext",
     "name":"char(255)"
 }

data

Array of Map

在执行 DML 操作时,变更后的数据,包含每一个表结构字段的 Key-value 结构。

[
   {
         "id":"1",
         "message":"This is a longtext",
         "name":"Bob"
    }
]

old

Array of Map

在执行 DML 操作时,变更前的数据,包含变更前每一个表结构字段的 Key-value 结构。

说明

UPDATE 操作时不为空。

[
     {
          "id":"1",
          "message":"This is a longtext",
          "name":"Bob"
     }
]

pgType

Map

在数据源的数据库类型为 PostgreSQL 且在执行 DML 操作时,PostgreSQL 数据库中每一列的数据类型。

说明

数据库传输服务 DTS 的新增字段。

null

示例

下文以云数据库 MySQL 版为例,介绍数据库传输服务 DTS 将数据库的 DDL 或 DML 操作编码解析为 Canal JSON 格式的示例:

  • DDL

    {
        "id":0,
        "database":"dts_test",
        "table":"test",
        "pkNames":null,
        "isDdl":true,
        "type":"CREATE",
        "es":1682578031000,
        "ts":1682578031120,
        "sql":"CREATE TABLE test (id INT NOT NULL PRIMARY KEY, name CHAR(255), message LONGTEXT)",
        "sqlType":null,
        "mysqlType":null,
        "data":null,
        "old":null,
        "pgType":null
    }
    
  • DML

    {
    	    "id":0,
    	    "database":"dts_test",
    	    "table":"test",
    	    "pkNames":[
    	        "id"
    	    ],
    	    "isDdl":false,
    	    "type":"INSERT",
    	    "es":1682578770000,
    	    "ts":1682578770185,
    	    "sql":"",
    	    "sqlType":{
    	        "id":4,
    	        "message":2005,
    	        "name":1
    	    },
    	    "mysqlType":{
    	        "id":"int(11)",
    	        "message":"longtext",
    	        "name":"char(255)"
    	    },
    	    "data":[
    	        {
    	            "id":"1",
    	            "message":"This is a longtext",
    	            "name":"Bob"
    	        }
    	    ],
    	    "old":null,
    	    "pgType":{
    	    
    	    }
    	}
    	

Debezium JSON 格式

字段说明

Debezium JSON 格式由以下两部分字段组成:

  • schema:表示事件的架构,包括字段类型和结构。用于了解消费端的数据结构。

  • payload:表示实际的事件数据。DML 事件包括的主要字段如下表所示:

    子字段说明
    before表示前镜像,包含在更新或删除操作之前的行数据。如果是插入操作,该字段为 null。
    after表示后镜像,包含在插入或更新操作之后的行数据。如果是删除操作,该字段为 null。
    source表示数据源信息,为数据源相关的信息。
    op表示 DML 类型。
    ts_ms表示 DTS 处理事件发生的时间戳(以毫秒为单位),用于记录事件的实际发生时间。

更多字段详细信息,请参见 Debezium JSON

示例

下文以云数据库 MySQL 版为例,介绍数据库传输服务 DTS 将数据库的 DML 操作编码解析为 Debezium JSON 格式的示例:
表结构示例如下。

create table score_table(id int primary key, namr varchar(32), score double);
  • 对表进行插入操作:

    insert into score_table values(1, 'tom', 86.0);
    

    预期输出:

    {
        "schema": {...},
        "payload": {
            "before": null,
            "after": {
                "id": 1,
                "name": "tom",
                "score": 86.0
            },
            "source": {...},
            "op": "c",
            "ts_ms": 1731068683523,
            "transaction": null
        }
    }
    
  • 对表进行更新操作:

    update score_table set score = 95.0 where id = 2;
    

    预期输出:

    {
        "schema": {...},
        "payload": {
            "before": {
                "id": 1,
                "name": "tom",
                "score": 86.0
            },
            "after": {
                "id": 1,
                "name": "tom",
                "score": 95.0
            },
            "source": {...},
            "op": "u",
            "ts_ms": 1731068693523,
            "transaction": null
        }
    }
    
  • 对表进行删除操作:

    delete from score_table where id = 1;
    

    预期输出:

    {
        "schema": {...},
        "payload": {
            "before": {
                "id": 1,
                "name": "tom",
                "score": 86.0
            },
            "after": null,
            "source": {...},
            "op": "d",
            "ts_ms": 1731068713052,
            "transaction": null
        }
    }
    

相关文档