You need to enable JavaScript to run this app.
导航
Canal-JSON
最近更新时间:2024.08.20 17:38:32首次发布时间:2024.08.20 17:38:32

Canal 是一个 CDC(ChangeLog Data Capture,变更日志数据捕获)工具,可以实时地将 MySQL 变更传输到其他系统。Canal 为变更日志提供了统一的数据格式,并支持使用 JSON 序列化消息。
Flink 支持将 Canal-JSON 消息解析为 INSERT / UPDATE / DELETE 消息到 Flink SQL 系统中。在很多情况下,利用这个特性非常的有用,例如:

  • 将增量数据从数据库同步到其他系统。
  • 日志审计。
  • 数据库的实时物化视图。
  • 关联维度数据库的变更历史,等等。

Flink 还支持将 Flink SQL 中的 INSERT / UPDATE / DELETE 消息编码为 Canal 格式的 JSON 消息,输出到 Kafka 等存储中。但需要注意的是,目前 Flink 还不支持将 UPDATE_BEFORE 和 UPDATE_AFTER 合并为一条 UPDATE 消息。因此,Flink 将 UPDATE_BEFORE 和 UPDATE_AFTER 分别编码为 DELETE 和 INSERT 类型的 Canal 消息。
注意:仅适用于 Flink1.16 版本。

如何使用 Canal Format

Canal 为变更日志提供了统一的格式,下面是一个从 MySQL 库 products 表中捕获更新操作的简单示例:

{
  "data": [
    {
      "id": "111",
      "name": "scooter",
      "description": "Big 2-wheel scooter",
      "weight": "5.18"
    }
  ],
  "database": "inventory",
  "es": 1589373560000,
  "id": 9,
  "isDdl": false,
  "mysqlType": {
    "id": "INTEGER",
    "name": "VARCHAR(255)",
    "description": "VARCHAR(512)",
    "weight": "FLOAT"
  },
  "old": [
    {
      "weight": "5.15"
    }
  ],
  "pkNames": [
    "id"
  ],
  "sql": "",
  "sqlType": {
    "id": 4,
    "name": 12,
    "description": 12,
    "weight": 7
  },
  "table": "products",
  "ts": 1589373560798,
  "type": "UPDATE"
}

MySQL products 表有 4 列(idnamedescriptionweight)。上面的 JSON 消息是 products 表上的一个更新事件,表示 id = 111 的行数据上 weight 字段值从5.15变更成为 5.18。假设消息已经同步到了一个 Kafka 主题:products_binlog,那么就可以使用以下 DDL 来从这个主题消费消息并解析变更事件。

CREATE TABLE topic_products (
  -- 元数据与 MySQL "products" 表完全相同
  id BIGINT,
  name STRING,
  description STRING,
  weight DECIMAL(10, 2)
) WITH (
 'connector' = 'kafka',
 'topic' = 'products_binlog',
 'properties.bootstrap.servers' = 'localhost:9092',
 'properties.group.id' = 'testGroup',
 'format' = 'canal-json'  -- 使用 canal-json 格式
)

将 Kafka 主题注册成 Flink 表之后,就可以将 Canal 消息用作变更日志源。

-- 关于MySQL "products" 表的实时物化视图
-- 计算相同产品的最新平均重量
SELECT name, AVG(weight) FROM topic_products GROUP BY name;

-- 将 MySQL "products" 表的所有数据和增量更改同步到
-- Elasticsearch "products" 索引以供将来搜索
INSERT INTO elasticsearch_products
SELECT * FROM topic_products;

Available Metadata

注意:Available Metadata 仅适用于 Flink1.16 版本。

以下格式元数据可在表定义中作为只读(VIRTUAL) 列公开。
注意:格式元数据字段只有在相应的连接器支持相应的格式元数据时才可用。目前,只有 Kafka 连接器能够公开格式元数据字段。

Key

数据类型

描述

database

STRING NULL

原始数据库。如果有的话,与 Canal 记录中的database字段相对应。

table

STRING NULL

源数据库表。如果有的话,与 Canal 记录中的table字段相对应。

sql-type

MAP<STRING, INT> NULL

各种 sql 类型的映射。对应于 Canal 记录中的 sqlType 字段(如果可用)。

pk-names

ARRAY<STRING> NULL

主键名称数组。如果有的话,与 Canal 记录中的 pkNames 字段相对应。

ingestion-timestamp

TIMESTAMP_LTZ(3) NULL

连接器处理事件的时间戳。对应 Canal 记录中的ts字段。

以下示例展示了如何访问 Kafka 中的 Canal 元数据字段:

CREATE TABLE KafkaTable (
  origin_database STRING METADATA FROM 'value.database' VIRTUAL,
  origin_table STRING METADATA FROM 'value.table' VIRTUAL,
  origin_sql_type MAP<STRING, INT> METADATA FROM 'value.sql-type' VIRTUAL,
  origin_pk_names ARRAY<STRING> METADATA FROM 'value.pk-names' VIRTUAL,
  origin_ts TIMESTAMP(3) METADATA FROM 'value.ingestion-timestamp' VIRTUAL,
  user_id BIGINT,
  item_id BIGINT,
  behavior STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_behavior',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'earliest-offset',
  'value.format' = 'canal-json'
);

Format 参数

参数

是否必选

默认

类型

描述

format

必填

(none)

String

指定要使用的格式,此处应为 'canal-json'.

canal-json.ignore-parse-errors

选填

false

Boolean

当解析异常时,是跳过当前字段或行,还是抛出错误失败(默认为 false,即抛出错误失败)。如果忽略字段的解析异常,则会将该字段值设置为null

canal-json.timestamp-format.standard

选填

'SQL'

String

指定输入和输出时间戳格式。当前支持的值是 'SQL''ISO-8601':

  • 选项 'SQL' 将解析 "yyyy-MM-dd HH:mm:ss.s{precision}" 格式的输入时间戳,例如 '2020-12-30 12:13:14.123',并以相同格式输出时间戳。
  • 选项 'ISO-8601' 将解析 "yyyy-MM-ddTHH:mm:ss.s{precision}" 格式的输入时间戳,例如 '2020-12-30T12:13:14.123',并以相同的格式输出时间戳。

canal-json.map-null-key.mode

选填

'FAIL'

String

指定处理 Map 中 key 值为空的方法。 当前支持的值有 'FAIL', 'DROP''LITERAL':

  • Option 'FAIL' 将抛出异常,如果遇到 Map 中 key 值为空的数据。
  • Option 'DROP' 将丢弃 Map 中 key 值为空的数据项。
  • Option 'LITERAL' 将使用字符串常量来替换 Map 中的空 key 值。字符串常量的值由 'canal-json.map-null-key.literal' 定义。

canal-json.map-null-key.literal

选填

'null'

String

'canal-json.map-null-key.mode' 是 LITERAL 的时候,指定字符串常量替换 Map 中的空 key 值。

canal-json.encode.decimal-as-plain-number

选填

false

Boolean

将所有 DECIMAL 类型的数据保持原状,不使用科学计数法表示。例:0.000000027 默认会表示为 2.7E-8。当此选项设为 true 时,则会表示为 0.000000027

canal-json.database.include

optional

(none)

String

一个可选的正则表达式,通过正则匹配 Canal 记录中的 "database" 元字段,仅读取指定数据库的 changelog 记录。正则字符串与 Java 的 Pattern 兼容。

canal-json.table.include

optional

(none)

String

一个可选的正则表达式,通过正则匹配 Canal 记录中的 "table" 元字段,仅读取指定表的 changelog 记录。正则字符串与 Java 的 Pattern 兼容。

数据类型映射

目前,Canal Format 使用 JSON Format 进行序列化和反序列化。
下表列出了 Flink 中的数据类型与 JSON 中的数据类型的映射关系。

Flink SQL 数据类型

JSON 数据类型

CHAR / VARCHAR / STRING

string

BOOLEAN

boolean

BINARY / VARBINARY

string with encoding: base64

DECIMAL

number

TINYINT

number

SMALLINT

number

INT

number

BIGINT

number

FLOAT

number

DOUBLE

number

DATE

string with format: date

TIME

string with format: time

TIMESTAMP

string with format: date-time

TIMESTAMP_WITH_LOCAL_TIME_ZONE

string with format: date-time (with UTC time zone)

INTERVAL

number

ARRAY

array

MAP / MULTISET

object

ROW

object

注意事项

  • 重复的变更事件。

在正常的操作环境中,Canal 应用能够以 exactly-once 的语义投递每条变更事件。在此种情况下,Flink 消费 Canal 所产生的变更事件能够运行良好。然而,当出现故障时,Canal 应用只能保证 at-least-once 的投递语义。这也就意味着,在非正常的情形下,Canal 可能会向消息队列投递重复的变更事件,当 Flink 从消息队列中进行消费时,就会获取到重复的事件。这有可能导致 Flink query 的运行得出错误的结果或者出现非预期的异常。所以,建议在这种状况下,将作业参数table.exec.source.cdc-events-duplicate 设置为 true,并在该数据源上定义 PRIMARY KEY。框架将会生成一个额外的有状态算子,利用该主键对变更事件进行去重,进而生成一个规范化的变更日志流。