Canal 是一个 CDC(ChangeLog Data Capture,变更日志数据捕获)工具,可以实时地将 MySQL 变更传输到其他系统。Canal 为变更日志提供了统一的数据格式,并支持使用 JSON 序列化消息。
Flink 支持将 Canal-JSON 消息解析为 INSERT / UPDATE / DELETE 消息到 Flink SQL 系统中。在很多情况下,利用这个特性非常的有用,例如:
Flink 还支持将 Flink SQL 中的 INSERT / UPDATE / DELETE 消息编码为 Canal 格式的 JSON 消息,输出到 Kafka 等存储中。但需要注意的是,目前 Flink 还不支持将 UPDATE_BEFORE 和 UPDATE_AFTER 合并为一条 UPDATE 消息。因此,Flink 将 UPDATE_BEFORE 和 UPDATE_AFTER 分别编码为 DELETE 和 INSERT 类型的 Canal 消息。
注意:仅适用于 Flink1.16 版本。
Canal 为变更日志提供了统一的格式,下面是一个从 MySQL 库 products
表中捕获更新操作的简单示例:
{ "data": [ { "id": "111", "name": "scooter", "description": "Big 2-wheel scooter", "weight": "5.18" } ], "database": "inventory", "es": 1589373560000, "id": 9, "isDdl": false, "mysqlType": { "id": "INTEGER", "name": "VARCHAR(255)", "description": "VARCHAR(512)", "weight": "FLOAT" }, "old": [ { "weight": "5.15" } ], "pkNames": [ "id" ], "sql": "", "sqlType": { "id": 4, "name": 12, "description": 12, "weight": 7 }, "table": "products", "ts": 1589373560798, "type": "UPDATE" }
MySQL products
表有 4 列(id
,name
,description
和 weight
)。上面的 JSON 消息是 products
表上的一个更新事件,表示 id = 111
的行数据上 weight
字段值从5.15
变更成为 5.18
。假设消息已经同步到了一个 Kafka 主题:products_binlog
,那么就可以使用以下 DDL 来从这个主题消费消息并解析变更事件。
CREATE TABLE topic_products ( -- 元数据与 MySQL "products" 表完全相同 id BIGINT, name STRING, description STRING, weight DECIMAL(10, 2) ) WITH ( 'connector' = 'kafka', 'topic' = 'products_binlog', 'properties.bootstrap.servers' = 'localhost:9092', 'properties.group.id' = 'testGroup', 'format' = 'canal-json' -- 使用 canal-json 格式 )
将 Kafka 主题注册成 Flink 表之后,就可以将 Canal 消息用作变更日志源。
-- 关于MySQL "products" 表的实时物化视图 -- 计算相同产品的最新平均重量 SELECT name, AVG(weight) FROM topic_products GROUP BY name; -- 将 MySQL "products" 表的所有数据和增量更改同步到 -- Elasticsearch "products" 索引以供将来搜索 INSERT INTO elasticsearch_products SELECT * FROM topic_products;
注意:Available Metadata 仅适用于 Flink1.16 版本。
以下格式元数据可在表定义中作为只读(VIRTUAL
) 列公开。
注意:格式元数据字段只有在相应的连接器支持相应的格式元数据时才可用。目前,只有 Kafka 连接器能够公开格式元数据字段。
Key | 数据类型 | 描述 |
---|---|---|
|
| 原始数据库。如果有的话,与 Canal 记录中的 |
|
| 源数据库表。如果有的话,与 Canal 记录中的 |
|
| 各种 sql 类型的映射。对应于 Canal 记录中的 |
|
| 主键名称数组。如果有的话,与 Canal 记录中的 |
|
| 连接器处理事件的时间戳。对应 Canal 记录中的 |
以下示例展示了如何访问 Kafka 中的 Canal 元数据字段:
CREATE TABLE KafkaTable ( origin_database STRING METADATA FROM 'value.database' VIRTUAL, origin_table STRING METADATA FROM 'value.table' VIRTUAL, origin_sql_type MAP<STRING, INT> METADATA FROM 'value.sql-type' VIRTUAL, origin_pk_names ARRAY<STRING> METADATA FROM 'value.pk-names' VIRTUAL, origin_ts TIMESTAMP(3) METADATA FROM 'value.ingestion-timestamp' VIRTUAL, user_id BIGINT, item_id BIGINT, behavior STRING ) WITH ( 'connector' = 'kafka', 'topic' = 'user_behavior', 'properties.bootstrap.servers' = 'localhost:9092', 'properties.group.id' = 'testGroup', 'scan.startup.mode' = 'earliest-offset', 'value.format' = 'canal-json' );
参数 | 是否必选 | 默认 | 类型 | 描述 |
---|---|---|---|---|
format | 必填 | (none) | String | 指定要使用的格式,此处应为 |
canal-json.ignore-parse-errors | 选填 | false | Boolean | 当解析异常时,是跳过当前字段或行,还是抛出错误失败(默认为 false,即抛出错误失败)。如果忽略字段的解析异常,则会将该字段值设置为 |
canal-json.timestamp-format.standard | 选填 |
| String | 指定输入和输出时间戳格式。当前支持的值是
|
canal-json.map-null-key.mode | 选填 |
| String | 指定处理 Map 中 key 值为空的方法。 当前支持的值有
|
canal-json.map-null-key.literal | 选填 | 'null' | String | 当 |
canal-json.encode.decimal-as-plain-number | 选填 | false | Boolean | 将所有 DECIMAL 类型的数据保持原状,不使用科学计数法表示。例: |
canal-json.database.include | optional | (none) | String | 一个可选的正则表达式,通过正则匹配 Canal 记录中的 "database" 元字段,仅读取指定数据库的 changelog 记录。正则字符串与 Java 的 Pattern 兼容。 |
canal-json.table.include | optional | (none) | String | 一个可选的正则表达式,通过正则匹配 Canal 记录中的 "table" 元字段,仅读取指定表的 changelog 记录。正则字符串与 Java 的 Pattern 兼容。 |
目前,Canal Format 使用 JSON Format 进行序列化和反序列化。
下表列出了 Flink 中的数据类型与 JSON 中的数据类型的映射关系。
Flink SQL 数据类型 | JSON 数据类型 |
---|---|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
在正常的操作环境中,Canal 应用能够以 exactly-once 的语义投递每条变更事件。在此种情况下,Flink 消费 Canal 所产生的变更事件能够运行良好。然而,当出现故障时,Canal 应用只能保证 at-least-once
的投递语义。这也就意味着,在非正常的情形下,Canal 可能会向消息队列投递重复的变更事件,当 Flink 从消息队列中进行消费时,就会获取到重复的事件。这有可能导致 Flink query 的运行得出错误的结果或者出现非预期的异常。所以,建议在这种状况下,将作业参数table.exec.source.cdc-events-duplicate
设置为 true,并在该数据源上定义 PRIMARY KEY
。框架将会生成一个额外的有状态算子,利用该主键对变更事件进行去重,进而生成一个规范化的变更日志流。