Oracle GoldenGate(又名 ogg)是一个用于实现异构 IT 环境间数据实时集成和复制的综合性软件包。该产品集合支持高可用性解决方案、实时数据集成、事务更改数据捕获、运营和分析企业系统之间的数据复制、转换及验证。Ogg 为变更日志提供了统一的格式结构,并支持使用 JSON 对消息进行序列化。
Flink 支持把 Ogg 的 JSON 消息解析为 INSERT/UPDATE/DELETE 消息并导入到 Flink SQL 系统中。在许多情况下,利用这一特性是非常有用的,例如:
Flink 也支持将 Flink SQL 中的 INSERT/UPDATE/DELETE 消息编码为 Ogg JSON 格式的消息,并输出到 Kafka 等存储中。但需要注意的是,目前 Flink 尚不支持将 UPDATE_BEFORE 和 UPDATE_AFTER 合并为一条 UPDATE 消息。因此,Flink 会将 UPDATE_BEFORE 和 UPDATE_AFTER 分别编码为 DELETE 和 INSERT 类型的 Ogg 消息。
Ogg 为变更日志提供了统一的格式,这是一个 JSON 格式的从 Oracle PRODUCTS
表捕获的更新操作的简单示例:
{ "before": { "id": 111, "name": "scooter", "description": "Big 2-wheel scooter", "weight": 5.18 }, "after": { "id": 111, "name": "scooter", "description": "Big 2-wheel scooter", "weight": 5.15 }, "op_type": "U", "op_ts": "2020-05-13 15:40:06.000000", "current_ts": "2020-05-13 15:40:07.000000", "primary_keys": [ "id" ], "pos": "00000000000000000000143", "table": "PRODUCTS" }
Oracle PRODUCTS
表有 4 列 (id
, name
, description
and weight
)。上面的 JSON 消息是 PRODUCTS
表上的一条更新事件,其中 id = 111
的行的 weight
值从 5.18
更改为 5.15
。假设此消息已同步到 Kafka 的 Topic products_ogg
, 则可以使用以下 DDL 来使用该 Topic 并解析更新事件。
CREATE TABLE topic_products ( -- schema is totally the same to the Oracle "products" table id BIGINT, name STRING, description STRING, weight DECIMAL(10, 2) ) WITH ( 'connector' = 'kafka', 'topic' = 'products_ogg', 'properties.bootstrap.servers' = 'localhost:9092', 'properties.group.id' = 'testGroup', 'format' = 'ogg-json' )
再将 Kafka Topic 注册为 Flink 表之后,可以将 OGG 消息变为变更日志源。
-- a real-time materialized view on the Oracle "PRODUCTS" -- which calculate the latest average of weight for the same products SELECT name, AVG(weight) FROM topic_products GROUP BY name; -- synchronize all the data and incremental changes of Oracle "PRODUCTS" table to -- Elasticsearch "products" index for future searching INSERT INTO elasticsearch_products SELECT * FROM topic_products;
以下格式元数据可在表定义中作为只读(VIRTUAL
) 列公开。
注意:格式元数据字段只有在相应的连接器支持相应的格式元数据时才可用。目前,只有 Kafka 连接器能够公开格式元数据字段。
Key | 数据类型 | 描述 |
---|---|---|
|
| 包含完全限定的表名。完全限定表名的格式为:CATALOG NAME.SCHEMA NAME.TABLE NAME |
|
| 保存源表主键的列名的数组变量。仅当 |
|
| 连接器处理事件的时间戳。与 Ogg 记录中的 |
|
| 源系统创建事件的时间戳。与 Ogg 记录中的 |
以下示例展示了如何访问 Kafka 中的 Ogg 元数据字段:
CREATE TABLE KafkaTable ( origin_ts TIMESTAMP(3) METADATA FROM 'value.ingestion-timestamp' VIRTUAL, event_time TIMESTAMP(3) METADATA FROM 'value.event-timestamp' VIRTUAL, origin_table STRING METADATA FROM 'value.table' VIRTUAL, primary_keys ARRAY<STRING> METADATA FROM 'value.primary_keys' VIRTUAL, user_id BIGINT, item_id BIGINT, behavior STRING ) WITH ( 'connector' = 'kafka', 'topic' = 'user_behavior', 'properties.bootstrap.servers' = 'localhost:9092', 'properties.group.id' = 'testGroup', 'scan.startup.mode' = 'earliest-offset', 'value.format' = 'ogg-json' );
参数 | 是否必选 | 默认 | 类型 | 描述 |
---|---|---|---|---|
format | 必填 | (none) | String | 指定要使用的格式,此处应为 |
ogg-json.ignore-parse-errors | 选填 | false | Boolean | 当解析异常时,是跳过当前字段或行,还是抛出错误失败(默认为 false,即抛出错误失败)。如果忽略字段的解析异常,则会将该字段值设置为 null。 |
ogg-json.timestamp-format.standard | 可选 |
| String | 声明输入和输出的时间戳格式。当前支持的格式为
|
ogg-json.map-null-key.mode | 选填 |
| String | 指定处理 Map 中 key 值为空的方法。 当前支持的值有
|
ogg-json.map-null-key.literal | 选填 | 'null' | String | 当 |
目前, Ogg format 使用 JSON format 进行序列化和反序列化。
下表列出了 Flink 中的数据类型与 JSON 中的数据类型的映射关系。
Flink SQL 数据类型 | JSON 数据类型 |
---|---|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|