You need to enable JavaScript to run this app.
导航
Debezium-JSON
最近更新时间:2024.08.20 17:38:33首次发布时间:2024.08.20 17:38:33

Debezium 是一个 CDC(Changelog Data Capture,变更数据捕获)的工具,可以把来自 MySQL、PostgreSQL、Oracle、Microsoft SQL Server 和许多其他数据库的更改实时流式传输到 Kafka 中。 Debezium 为变更日志提供了统一的格式结构,并支持使用 JSON 和 Apache Avro 序列化消息。
Flink 支持将 Debezium JSON 消息解析为 INSERT / UPDATE / DELETE 消息到 Flink SQL 系统中。在很多情况下,利用这个特性非常的有用,例如。

  • 将增量数据从数据库同步到其他系统。
  • 日志审计
  • 数据库的实时物化视图。
  • 关联维度数据库的变更历史,等等。

Flink 还支持将 Flink SQL 中的 INSERT / UPDATE / DELETE 消息编码为 Debezium 格式的 JSON 消息,输出到 Kafka 等存储中。但需要注意的是,目前 Flink 还不支持将 UPDATE_BEFORE 和 UPDATE_AFTER 合并为一条 UPDATE 消息。因此,Flink 将 UPDATE_BEFORE 和 UPDATE_AFTER 分别编码为 DELETE 和 INSERT 类型的 Debezium 消息。

如何使用 Debezium-JSON

Debezium 为变更日志提供了统一的格式,这是一个 JSON 格式的从 MySQL product 表捕获的更新操作的简单示例::

{
  "before": {
    "id": 111,
    "name": "scooter",
    "description": "Big 2-wheel scooter",
    "weight": 5.18
  },
  "after": {
    "id": 111,
    "name": "scooter",
    "description": "Big 2-wheel scooter",
    "weight": 5.15
  },
  "source": {...},
  "op": "u",
  "ts_ms": 1589362330904,
  "transaction": null
}

MySQL 产品表有 4 列( idnamedescriptionweight)。上面的 JSON 消息是 products 表上的一条更新事件,其中 id = 111 的行的 weight 值从 5.18 更改为 5.15。假设此消息已同步到 Kafka 主题 products_binlog,则可以使用以下 DDL 来使用此主题并解析更改事件:

CREATE TABLE topic_products (
  -- schema 与 MySQL 的 products 表完全相同
  id BIGINT,
  name STRING,
  description STRING,
  weight DECIMAL(10, 2)
) WITH (
 'connector' = 'kafka',
 'topic' = 'products_binlog',
 'properties.bootstrap.servers' = 'localhost:9092',
 'properties.group.id' = 'testGroup',
  -- 使用 'debezium-json' format 来解析 Debezium 的 JSON 消息
 'format' = 'debezium-json'
)

在某些情况下,用户在设置 Debezium Kafka Connect 时,可能会开启 Kafka 的配置 'value.converter.schemas.enable',用来在消息体中包含 schema 信息。然后,Debezium JSON 消息可能如下所示:
::为了解析这一类信息,你需要在上述 DDL WITH 子句中添加选项 'debezium-json.schema-include' = 'true'(默认为 false)。通常情况下,建议不要包含 schema 的描述,因为这样会使消息变得非常冗长,并降低解析性能。
在将主题注册为 Flink 表之后,可以将 Debezium 消息用作变更日志源:

-- MySQL "products" 的实时物化视图
-- 计算相同产品的最新平均重量
SELECT name, AVG(weight) FROM topic_products GROUP BY name;

-- 将 MySQL "products" 表的所有数据和增量更改同步到
-- Elasticsearch "products" 索引,供将来查找
INSERT INTO elasticsearch_products
SELECT * FROM topic_products;

支持的元数据 (Available Metadata)

注意:仅适用于 Flink1.16 版本。

以下格式元数据可在表定义中作为只读(VIRTUAL) 列公开。
注意:格式元数据字段只有在相应的连接器支持相应的格式元数据时才可用。目前,只有 Kafka 连接器能够公开格式元数据字段。

Key

数据类型

描述

schema

STRING NULL

描述有效载荷模式的 JSON 字符串。如果模式未包含在 Debezium 记录中,则为空。

ingestion-timestamp

TIMESTAMP_LTZ(3) NULL

连接器处理事件的时间戳。对应于 Debezium 记录中的ts_ms字段。

source.timestamp

TIMESTAMP_LTZ(3) NULL

源系统创建事件的时间戳。与 Debezium 记录中的 source.ts_ms 字段相对应。

source.database

STRING NULL

源数据库。对应 Debezium 记录中的 source.db 字段(如果有)。

source.schema

STRING NULL

源数据库模式。对应 Debezium 记录中的 source.schema 字段(如果有)。

source.table

STRING NULL

源数据库表。与 Debezium 记录中的source.tablesource.collection(如果有)相对应。

source.properties

MAP<STRING, STRING> NULL

各种源属性的映射。对应于 Debezium 记录中的source字段。

下面的示例展示了如何在 Kafka 中访问 Debezium 元数据字段:

CREATE TABLE KafkaTable (
  origin_ts TIMESTAMP(3) METADATA FROM 'value.ingestion-timestamp' VIRTUAL,
  event_time TIMESTAMP(3) METADATA FROM 'value.source.timestamp' VIRTUAL,
  origin_database STRING METADATA FROM 'value.source.database' VIRTUAL,
  origin_schema STRING METADATA FROM 'value.source.schema' VIRTUAL,
  origin_table STRING METADATA FROM 'value.source.table' VIRTUAL,
  origin_properties MAP<STRING, STRING> METADATA FROM 'value.source.properties' VIRTUAL,
  user_id BIGINT,
  item_id BIGINT,
  behavior STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_behavior',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'earliest-offset',
  'value.format' = 'debezium-json'
);

Format 参数

Flink 提供了 debezium-avro-confluentdebezium-json 两种 format 来解析 Debezium 生成的 JSON 格式和 Avro 格式的消息。请使 debezium-avro-confluent 来解析 Debezium 的 Avro 消息,使用 debezium-json 来解析 Debezium 的 JSON 消息。

参数

是否必选

默认值

类型

描述

format

必选

(none)

String

指定要使用的格式,此处应为 'debezium-json'

debezium-json.schema-include

可选

false

Boolean

设置 Debezium Kafka Connect 时,用户可以启用 Kafka 配置 'value.converter.schemas.enable' 以在消息中包含 schema。此选项表明 Debezium JSON 消息是否包含 schema。

debezium-json.ignore-parse-errors

可选

false

Boolean

当解析异常时,是跳过当前字段或行,还是抛出错误失败(默认为 false,即抛出错误失败)。如果忽略字段的解析异常,则会将该字段值设置为null

debezium-json.timestamp-format.standard

可选

'SQL'

String

声明输入和输出的时间戳格式。当前支持的格式为'SQL' 以及 'ISO-8601'

  • 可选参数 'SQL' 将会以 "yyyy-MM-dd HH:mm:ss.s{precision}" 的格式解析时间戳,,例如 '2020-12-30 12:13:14.123',且会以相同的格式输出。
  • 可选参数 'ISO-8601' 将会以 "yyyy-MM-ddTHH:mm:ss.s{precision}" 的格式解析输入时间戳,如 '2020-12-30T12:13:14.123' ,且会以相同的格式输出。

debezium-json.map-null-key.mode

注意:仅适用于 Flink1.16 版本

选填

'FAIL'

String

指定处理 Map 中 key 值为空的方法。。当前支持的值有 'FAIL', 'DROP''LITERAL':

  • Option 'FAIL' 将抛出异常,如果遇到 Map 中 key 值为空的数据。
  • Option 'DROP' 将丢弃 Map 中 key 值为空的数据项。
  • Option 'LITERAL' 将使用字符串常量来替换 Map 中的空 key 值。字符串常量的值由 'debezium-json.map-null-key.literal' 定义。

debezium-json.map-null-key.literal

注意:仅适用于 Flink1.16 版本

选填

'null'

String

'debezium-json.map-null-key.mode' 是 LITERAL 的时候,指定字符串常量替换 Map 中的空 key 值。

debezium-json.encode.decimal-as-plain-number

注意:仅适用于 Flink1.16 版本

选填

false

Boolean

将所有 DECIMAL 类型的数据保持原状,不使用科学计数法表示。例:0.000000027 默认会表示为 2.7E-8。当此选项设为 true 时,则会表示为 0.000000027

数据类型映射

目前,Debezium Format 使用 JSON Format 进行序列化和反序列化。
下表列出了 Flink 中的数据类型与 JSON 中的数据类型的映射关系。

Flink SQL 数据类型

JSON 数据类型

CHAR / VARCHAR / STRING

string

BOOLEAN

boolean

BINARY / VARBINARY

string with encoding: base64

DECIMAL

number

TINYINT

number

SMALLINT

number

INT

number

BIGINT

number

FLOAT

number

DOUBLE

number

DATE

string with format: date

TIME

string with format: time

TIMESTAMP

string with format: date-time

TIMESTAMP_WITH_LOCAL_TIME_ZONE

string with format: date-time (with UTC time zone)

INTERVAL

number

ARRAY

array

MAP / MULTISET

object

ROW

object

注意事项

  • 处理重复的变更事件。

在正常的操作环境中,Debezium 应用能够以 exactly-once 的语义传递每条变更事件。在此情况下,Flink 消费 Debezium 生成的变更事件能够良好运行。然而,当出现故障时,Debezium 应用仅能保证 at-least-once 的投递语义。您可以查阅 Debezium 官方文档以获取更多关于 Debezium 的消息投递语义相关信息。这意味着在非正常状况下,Debezium 可能会向 Kafka 投递重复的变更事件,而当 Flink 从 Kafka 中进行消费时,就会获取到重复的事件。这可能会致使 Flink query 的运行得到错误的结果或者出现非预期的异常。因此,建议在这种情形下,将作业参数“table.exec.source.cdc-events-duplicate”设置为 true,并在该数据源上定义 PRIMARY KEY。框架会生成一个额外的有状态算子,利用该主键对变更事件进行去重,并生成一个规范化的变更日志流。

  • 消费 Debezium Postgres Connector 产生的数据

如果您正在使用 Debezium PostgreSQL Connector 将变更捕获至 Kafka,那么请确保被监控表的 REPLICA IDENTITY 已被配置为 FULL ,其默认值是 DEFAULT 。否则,Flink SQL 将无法正确解析 Debezium 数据。
当配置为 FULL 时,更新和删除事件将会完整包含所有列的之前的值。而当配置为其他情况时,更新和删除事件的“before”字段将只包含主键字段的值,或者为 null(在没有主键的情况下)。您可以通过运行“ALTER TABLE REPLICA IDENTITY FULL”来更改 REPLICA IDENTITY 的配置。