Maxwell 是一个 CDC(变更日志数据捕获)工具,可以将更改从 MySQL 实时流式传输到 Kafka、Kinesis 和其他流连接器。 Maxwell 为变更日志提供了统一的格式架构,并支持使用 JSON 序列化消息。
Flink 支持将 Maxwell-JSON 消息解释为 Flink SQL 系统中的 INSERT/UPDATE/DELETE 消息。在许多情况下,这对于利用此功能很有用,例如:
Flink 还支持将 Flink SQL 中的 INSERT/UPDATE/DELETE 消息编码为 Maxwell-JSON 消息,并发送到 Kafka 等外部系统。但是,目前 Flink 无法将 UPDATE_BEFORE 和 UPDATE_AFTER 组合成单个 UPDATE 消息。因此,Flink 将 UPDATE_BEFORE 和 UDPATE_AFTER 编码为 DELETE 和 INSERT 的 Maxwell 消息。
Maxwell 为变更日志提供了统一的格式,这是一个 JSON 格式的从 MySQL product 表捕获的更新操作的简单示例:
{ "database":"test", "table":"e", "type":"insert", "ts":1477053217, "xid":23396, "commit":true, "position":"master.000006:800911", "server_id":23042, "thread_id":108, "primary_key": [1, "2016-10-21 05:33:37.523000"], "primary_key_columns": ["id", "c"], "data":{ "id":111, "name":"scooter", "description":"Big 2-wheel scooter", "weight":5.15 }, "old":{ "weight":5.18, } }
MySQL 产品表有 4 列(id
、name
、description
、weight
)。上面的 JSON 消息是 products
表上的一条更新事件,其中 id = 111
的行的 weight
值从 5.18
更改为 5.15
。假设此消息已同步到 Kafka 主题 products_binlog
,则可以使用以下 DDL 来使用此主题并解析更改事件。
CREATE TABLE topic_products ( -- schema is totally the same to the MySQL "products" table id BIGINT, name STRING, description STRING, weight DECIMAL(10, 2) ) WITH ( 'connector' = 'kafka', 'topic' = 'products_binlog', 'properties.bootstrap.servers' = 'localhost:9092', 'properties.group.id' = 'testGroup', 'format' = 'maxwell-json' )
将主题注册为 Flink 表后,您可以使用 Maxwell 消息作为变更日志源。
-- a real-time materialized view on the MySQL "products" -- which calculate the latest average of weight for the same products SELECT name, AVG(weight) FROM topic_products GROUP BY name; -- synchronize all the data and incremental changes of MySQL "products" table to -- Elasticsearch "products" index for future searching INSERT INTO elasticsearch_products SELECT * FROM topic_products;
以下格式元数据可在表定义中作为只读(VIRTUAL
) 列公开。
注意:格式元数据字段只有在相应的连接器支持相应的格式元数据时才可用。目前,只有 Kafka 连接器能够公开格式元数据字段。
Key | 数据类型 | 描述 |
---|---|---|
|
| 原始数据库。对应于 Maxwell 记录中的 |
|
| 原始数据库表。对应于 Maxwell 记录中的 |
|
| 主键名称数组。对应于 Maxwell 记录中的 |
|
| 连接器处理事件的时间戳。对应 Maxwell 记录中的 |
以下示例展示了如何访问 Kafka 中的 Maxwell 元数据字段:
CREATE TABLE KafkaTable ( origin_database STRING METADATA FROM 'value.database' VIRTUAL, origin_table STRING METADATA FROM 'value.table' VIRTUAL, origin_primary_key_columns ARRAY<STRING> METADATA FROM 'value.primary-key-columns' VIRTUAL, origin_ts TIMESTAMP(3) METADATA FROM 'value.ingestion-timestamp' VIRTUAL, user_id BIGINT, item_id BIGINT, behavior STRING ) WITH ( 'connector' = 'kafka', 'topic' = 'user_behavior', 'properties.bootstrap.servers' = 'localhost:9092', 'properties.group.id' = 'testGroup', 'scan.startup.mode' = 'earliest-offset', 'value.format' = 'maxwell-json' );
参数 | 是否必须 | 默认值 | 类型 | 描述 |
---|---|---|---|---|
format | required | (none) | String | 指定要使用的格式,此处应为 |
maxwell-json.ignore-parse-errors | optional | false | Boolean | 跳过有解析错误的字段和行,而不是失败。如果出现错误,字段将被设置为 |
maxwell-json.timestamp-format.standard | optional |
| String | 声明输入和输出的时间戳格式。当前支持的格式为
|
maxwell-json.map-null-key.mode | optional |
| String | 指定处理 Map 中 key 值为空的方法。。当前支持的值有
|
maxwell-json.map-null-key.literal | optional | 'null' | String | 当 |
maxwell-json.encode.decimal-as-plain-number | optional | false | Boolean | 将所有 DECIMAL 类型的数据保持原状,不使用科学计数法表示。例: |
目前,Maxwell-JSON 使用 JSON Format 进行序列化和反序列化。
下表列出了 Flink 中的数据类型与 JSON 中的数据类型的映射关系。
Flink SQL 数据类型 | JSON 数据类型 |
---|---|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
table.exec.source.cdc-events-duplicate
设置成 true
,并在该 source 上定义 PRIMARY KEY。框架会生成一个额外的有状态算子,使用该 primary key 来对变更事件去重并生成一个规范化的 changelog 流。