You need to enable JavaScript to run this app.
导航
Ogg-JSON
最近更新时间:2024.08.20 17:38:32首次发布时间:2024.08.20 17:38:32

Oracle GoldenGate(又名 ogg)是一个用于实现异构 IT 环境间数据实时集成和复制的综合性软件包。该产品集合支持高可用性解决方案、实时数据集成、事务更改数据捕获、运营和分析企业系统之间的数据复制、转换及验证。Ogg 为变更日志提供了统一的格式结构,并支持使用 JSON 对消息进行序列化。
Flink 支持把 Ogg 的 JSON 消息解析为 INSERT/UPDATE/DELETE 消息并导入到 Flink SQL 系统中。在许多情况下,利用这一特性是非常有用的,例如:

  • 将增量数据从数据库同步到其他系统;
  • 进行日志审计;
  • 构建数据库的实时物化视图;
  • 关联维度数据库的变更历史等等。

Flink 也支持将 Flink SQL 中的 INSERT/UPDATE/DELETE 消息编码为 Ogg JSON 格式的消息,并输出到 Kafka 等存储中。但需要注意的是,目前 Flink 尚不支持将 UPDATE_BEFORE 和 UPDATE_AFTER 合并为一条 UPDATE 消息。因此,Flink 会将 UPDATE_BEFORE 和 UPDATE_AFTER 分别编码为 DELETE 和 INSERT 类型的 Ogg 消息。

如何使用 Ogg-JSON

Ogg 为变更日志提供了统一的格式,这是一个 JSON 格式的从 Oracle PRODUCTS 表捕获的更新操作的简单示例:

{
  "before": {
    "id": 111,
    "name": "scooter",
    "description": "Big 2-wheel scooter",
    "weight": 5.18
  },
  "after": {
    "id": 111,
    "name": "scooter",
    "description": "Big 2-wheel scooter",
    "weight": 5.15
  },
  "op_type": "U",
  "op_ts": "2020-05-13 15:40:06.000000",
  "current_ts": "2020-05-13 15:40:07.000000",
  "primary_keys": [
    "id"
  ],
  "pos": "00000000000000000000143",
  "table": "PRODUCTS"
}

Oracle PRODUCTS 表有 4 列 (id, name, description and weight)。上面的 JSON 消息是 PRODUCTS 表上的一条更新事件,其中 id = 111 的行的 weight 值从 5.18 更改为 5.15。假设此消息已同步到 Kafka 的 Topic products_ogg, 则可以使用以下 DDL 来使用该 Topic 并解析更新事件。

CREATE TABLE topic_products (
  -- schema is totally the same to the Oracle "products" table
  id BIGINT,
  name STRING,
  description STRING,
  weight DECIMAL(10, 2)
) WITH (
  'connector' = 'kafka',
  'topic' = 'products_ogg',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'format' = 'ogg-json'
)

再将 Kafka Topic 注册为 Flink 表之后,可以将 OGG 消息变为变更日志源。

-- a real-time materialized view on the Oracle "PRODUCTS"
-- which calculate the latest average of weight for the same products
SELECT name, AVG(weight)
FROM topic_products
GROUP BY name;

-- synchronize all the data and incremental changes of Oracle "PRODUCTS" table to
-- Elasticsearch "products" index for future searching
INSERT INTO elasticsearch_products
SELECT *
FROM topic_products;

可用元数据 (Available Metadata)

以下格式元数据可在表定义中作为只读(VIRTUAL) 列公开。
注意:格式元数据字段只有在相应的连接器支持相应的格式元数据时才可用。目前,只有 Kafka 连接器能够公开格式元数据字段。

Key

数据类型

描述

table

STRING NULL

包含完全限定的表名。完全限定表名的格式为:CATALOG NAME.SCHEMA NAME.TABLE NAME

primary-keys

ARRAY<STRING> NULL

保存源表主键的列名的数组变量。仅当 includePrimaryKeys 配置属性设置为 true 时,主键字段才会包含在 JSON 输出中。

ingestion-timestamp

TIMESTAMP_LTZ(6) NULL

连接器处理事件的时间戳。与 Ogg 记录中的 current_ts 字段相对应。

event-timestamp

TIMESTAMP_LTZ(6) NULL

源系统创建事件的时间戳。与 Ogg 记录中的 op_ts 字段相对应。

以下示例展示了如何访问 Kafka 中的 Ogg 元数据字段:

CREATE TABLE KafkaTable (
  origin_ts TIMESTAMP(3) METADATA FROM 'value.ingestion-timestamp' VIRTUAL,
  event_time TIMESTAMP(3) METADATA FROM 'value.event-timestamp' VIRTUAL,
  origin_table STRING METADATA FROM 'value.table' VIRTUAL,
  primary_keys ARRAY<STRING> METADATA FROM 'value.primary_keys' VIRTUAL,
  user_id BIGINT,
  item_id BIGINT,
  behavior STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_behavior',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'earliest-offset',
  'value.format' = 'ogg-json'
);

Format 参数

参数

是否必选

默认

类型

描述

format

必填

(none)

String

指定要使用的格式,此处应为 'ogg-json'.

ogg-json.ignore-parse-errors

选填

false

Boolean

当解析异常时,是跳过当前字段或行,还是抛出错误失败(默认为 false,即抛出错误失败)。如果忽略字段的解析异常,则会将该字段值设置为 null。

ogg-json.timestamp-format.standard

可选

'SQL'

String

声明输入和输出的时间戳格式。当前支持的格式为'SQL' 以及 'ISO-8601'

  • 可选参数 'SQL' 将会以 "yyyy-MM-dd HH:mm:ss.s{precision}" 的格式解析时间戳, 例如 '2020-12-30 12:13:14.123',且会以相同的格式输出。
  • 可选参数 'ISO-8601' 将会以 "yyyy-MM-ddTHH:mm:ss.s{precision}" 的格式解析输入时间戳, 例如 '2020-12-30T12:13:14.123' ,且会以相同的格式输出。

ogg-json.map-null-key.mode

选填

'FAIL'

String

指定处理 Map 中 key 值为空的方法。 当前支持的值有 'FAIL', 'DROP''LITERAL':

  • Option 'FAIL' 将抛出异常。
  • Option 'DROP' 将丢弃 Map 中 key 值为空的数据项。
  • Option 'LITERAL' 将使用字符串常量来替换 Map 中的空 key 值。字符串常量的值由 ogg-json.map-null-key.literal 定义。

ogg-json.map-null-key.literal

选填

'null'

String

'ogg-json.map-null-key.mode' 是 LITERAL 的时候,指定字符串常量替换 Map 中的空 key 值。

数据类型映射

目前, Ogg format 使用 JSON format 进行序列化和反序列化。
下表列出了 Flink 中的数据类型与 JSON 中的数据类型的映射关系。

Flink SQL 数据类型

JSON 数据类型

CHAR / VARCHAR / STRING

string

BOOLEAN

boolean

BINARY / VARBINARY

string with encoding: base64

DECIMAL

number

TINYINT

number

SMALLINT

number

INT

number

BIGINT

number

FLOAT

number

DOUBLE

number

DATE

string with format: date

TIME

string with format: time

TIMESTAMP

string with format: date-time

TIMESTAMP_WITH_LOCAL_TIME_ZONE

string with format: date-time (with UTC time zone)

INTERVAL

number

ARRAY

array

MAP / MULTISET

object

ROW

object