You need to enable JavaScript to run this app.
导航
Maxwell-JSON
最近更新时间:2024.08.20 17:38:32首次发布时间:2024.08.20 17:38:32

Maxwell 是一个 CDC(变更日志数据捕获)工具,可以将更改从 MySQL 实时流式传输到 Kafka、Kinesis 和其他流连接器。 Maxwell 为变更日志提供了统一的格式架构,并支持使用 JSON 序列化消息。
Flink 支持将 Maxwell-JSON 消息解释为 Flink SQL 系统中的 INSERT/UPDATE/DELETE 消息。在许多情况下,这对于利用此功能很有用,例如:

  • 将增量数据从数据库同步到其他系统。
  • 日志审计。
  • 数据库的实时物化视图。
  • 关联维度数据库的变更历史,等等。

Flink 还支持将 Flink SQL 中的 INSERT/UPDATE/DELETE 消息编码为 Maxwell-JSON 消息,并发送到 Kafka 等外部系统。但是,目前 Flink 无法将 UPDATE_BEFORE 和 UPDATE_AFTER 组合成单个 UPDATE 消息。因此,Flink 将 UPDATE_BEFORE 和 UDPATE_AFTER 编码为 DELETE 和 INSERT 的 Maxwell 消息。

如何使用 Maxwell-JSON

Maxwell 为变更日志提供了统一的格式,这是一个 JSON 格式的从 MySQL product 表捕获的更新操作的简单示例:

{
   "database":"test",
   "table":"e",
   "type":"insert",
   "ts":1477053217,
   "xid":23396,
   "commit":true,
   "position":"master.000006:800911",
   "server_id":23042,
   "thread_id":108,
   "primary_key": [1, "2016-10-21 05:33:37.523000"],
   "primary_key_columns": ["id", "c"],
   "data":{
     "id":111,
     "name":"scooter",
     "description":"Big 2-wheel scooter",
     "weight":5.15
   },
   "old":{
     "weight":5.18,
   }
}

MySQL 产品表有 4 列(idnamedescriptionweight)。上面的 JSON 消息是 products 表上的一条更新事件,其中 id = 111 的行的 weight 值从 5.18 更改为 5.15。假设此消息已同步到 Kafka 主题 products_binlog,则可以使用以下 DDL 来使用此主题并解析更改事件。

CREATE TABLE topic_products (
  -- schema is totally the same to the MySQL "products" table
  id BIGINT,
  name STRING,
  description STRING,
  weight DECIMAL(10, 2)
) WITH (
 'connector' = 'kafka',
 'topic' = 'products_binlog',
 'properties.bootstrap.servers' = 'localhost:9092',
 'properties.group.id' = 'testGroup',
 'format' = 'maxwell-json'
)

将主题注册为 Flink 表后,您可以使用 Maxwell 消息作为变更日志源。

-- a real-time materialized view on the MySQL "products"
-- which calculate the latest average of weight for the same products
SELECT name, AVG(weight) FROM topic_products GROUP BY name;

-- synchronize all the data and incremental changes of MySQL "products" table to
-- Elasticsearch "products" index for future searching
INSERT INTO elasticsearch_products
SELECT * FROM topic_products;

可用元数据 (Available Metadata)

以下格式元数据可在表定义中作为只读(VIRTUAL) 列公开。
注意:格式元数据字段只有在相应的连接器支持相应的格式元数据时才可用。目前,只有 Kafka 连接器能够公开格式元数据字段。

Key

数据类型

描述

database

STRING NULL

原始数据库。对应于 Maxwell 记录中的database字段(如果可用)。

table

STRING NULL

原始数据库表。对应于 Maxwell 记录中的table字段(如果可用)。

primary-key-columns

ARRAY<STRING> NULL

主键名称数组。对应于 Maxwell 记录中的 primary-key-columns 字段(如果可用)。

ingestion-timestamp

TIMESTAMP_LTZ(3) NULL

连接器处理事件的时间戳。对应 Maxwell 记录中的ts字段。

以下示例展示了如何访问 Kafka 中的 Maxwell 元数据字段:

CREATE TABLE KafkaTable (
  origin_database STRING METADATA FROM 'value.database' VIRTUAL,
  origin_table STRING METADATA FROM 'value.table' VIRTUAL,
  origin_primary_key_columns ARRAY<STRING> METADATA FROM 'value.primary-key-columns' VIRTUAL,
  origin_ts TIMESTAMP(3) METADATA FROM 'value.ingestion-timestamp' VIRTUAL,
  user_id BIGINT,
  item_id BIGINT,
  behavior STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_behavior',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'earliest-offset',
  'value.format' = 'maxwell-json'
);

Format 参数

参数

是否必须

默认值

类型

描述

format

required

(none)

String

指定要使用的格式,此处应为 'maxwell-json'

maxwell-json.ignore-parse-errors

optional

false

Boolean

跳过有解析错误的字段和行,而不是失败。如果出现错误,字段将被设置为null

maxwell-json.timestamp-format.standard

optional

'SQL'

String

声明输入和输出的时间戳格式。当前支持的格式为'SQL' 以及 'ISO-8601'

  • 可选参数 'SQL' 将会以 "yyyy-MM-dd HH:mm:ss.s{precision}" 的格式解析时间戳,,例如 '2020-12-30 12:13:14.123',且会以相同的格式输出。
  • 可选参数 'ISO-8601' 将会以 "yyyy-MM-ddTHH:mm:ss.s{precision}" 的格式解析输入时间戳,,,如 '2020-12-30T12:13:14.123' ,且会以相同的格式输出。

maxwell-json.map-null-key.mode

optional

'FAIL'

String

指定处理 Map 中 key 值为空的方法。。当前支持的值有 'FAIL', 'DROP''LITERAL':

  • Option 'FAIL' 将抛出异常,如果遇到 Map 中 key 值为空的数据。
  • Option 'DROP' 将丢弃 Map 中 key 值为空的数据项。
  • Option 'LITERAL' 将使用字符串常量来替换 Map 中的空 key 值。字符串常量的值由 'maxwell-json.map-null-key.literal' 定义。

maxwell-json.map-null-key.literal

optional

'null'

String

'maxwell-json.map-null-key.mode' 是 LITERAL 的时候,指定字符串常量替换 Map 中的空 key 值。

maxwell-json.encode.decimal-as-plain-number

optional

false

Boolean

将所有 DECIMAL 类型的数据保持原状,不使用科学计数法表示。例:0.000000027 默认会表示为 2.7E-8。当此选项设为 true 时,则会表示为 0.000000027

数据类型映射

目前,Maxwell-JSON 使用 JSON Format 进行序列化和反序列化。
下表列出了 Flink 中的数据类型与 JSON 中的数据类型的映射关系。

Flink SQL 数据类型

JSON 数据类型

CHAR / VARCHAR / STRING

string

BOOLEAN

boolean

BINARY / VARBINARY

string with encoding: base64

DECIMAL

number

TINYINT

number

SMALLINT

number

INT

number

BIGINT

number

FLOAT

number

DOUBLE

number

DATE

string with format: date

TIME

string with format: time

TIMESTAMP

string with format: date-time

TIMESTAMP_WITH_LOCAL_TIME_ZONE

string with format: date-time (with UTC time zone)

INTERVAL

number

ARRAY

array

MAP / MULTISET

object

ROW

object

注意事项

  • 重复的更改事件。
    在正常的操作环境下,Maxwell 应用能以 exactly-once 的语义投递每条变更事件。在这种情况下,Flink 消费 Maxwell 产生的变更事件能够工作得很好。如果 Maxwell 应用以 at-least-once 的语义投递每条变更事件, 它可能会向 Kafka 传递重复的变更事件,而 Flink 将获得重复的事件。这可能会导致 Flink 查询得到错误结果或意外异常。因此,建议在这种情况下,将作业参数 table.exec.source.cdc-events-duplicate 设置成 true,并在该 source 上定义 PRIMARY KEY。框架会生成一个额外的有状态算子,使用该 primary key 来对变更事件去重并生成一个规范化的 changelog 流。