You need to enable JavaScript to run this app.
导航
JSON
最近更新时间:2024.08.20 17:38:32首次发布时间:2024.08.20 17:38:32

JSON Format 能读写 JSON 格式的数据。当前,JSON schema 是从 table schema 中自动推导而得的。

如何创建一张基于 JSON Format 的表

以下是一个利用 Kafka 以及 JSON Format 构建表的例子。

CREATE TABLE user_behavior (
  user_id BIGINT,
  item_id BIGINT,
  category_id BIGINT,
  behavior STRING,
  ts TIMESTAMP(3)
) WITH (
 'connector' = 'kafka',
 'topic' = 'user_behavior',
 'properties.bootstrap.servers' = 'localhost:9092',
 'properties.group.id' = 'testGroup',
 'format' = 'json',
 'json.fail-on-missing-field' = 'false',
 'json.ignore-parse-errors' = 'true'
)

Format 参数

参数

是否必须

默认值

类型

描述

format

必选

(none)

String

声明使用的格式,这里应为'json'

json.fail-on-missing-field

可选

false

Boolean

当解析字段缺失时,是跳过当前字段或行,还是抛出错误失败(默认为 false,即抛出错误失败)。

json.ignore-parse-errors

可选

false

Boolean

当解析异常时,是跳过当前字段或行,还是抛出错误失败(默认为 false,即抛出错误失败)。如果忽略字段的解析异常,则会将该字段值设置为null

json.timestamp-format.standard

可选

'SQL'

String

声明输入和输出的 TIMESTAMPTIMESTAMP_LTZ 的格式。当前支持的格式为'SQL' 以及 'ISO-8601'

  • 可选参数 'SQL' 将会以 "yyyy-MM-dd HH:mm:ss.s{precision}" 的格式解析 TIMESTAMP, 例如 "2020-12-30 12:13:14.123",以 "yyyy-MM-dd HH:mm:ss.s{precision}'Z'" 的格式解析 TIMESTAMP_LTZ, 例如 "2020-12-30 12:13:14.123Z" 且会以相同的格式输出。
  • 可选参数 'ISO-8601' 将会以 "yyyy-MM-ddTHH:mm:ss.s{precision}" 的格式解析输入 TIMESTAMP, 例如 "2020-12-30T12:13:14.123" ,以 "yyyy-MM-ddTHH:mm:ss.s{precision}'Z'" 的格式解析 TIMESTAMP_LTZ, 例如 "2020-12-30T12:13:14.123Z" 且会以相同的格式输出。

json.map-null-key.mode

注意:仅适用于 Flink1.16 版本

选填

'FAIL'

String

指定处理 Map 中 key 值为空的方法。。当前支持的值有 'FAIL', 'DROP''LITERAL':

  • Option 'FAIL' 将抛出异常,如果遇到 Map 中 key 值为空的数据。
  • Option 'DROP' 将丢弃 Map 中 key 值为空的数据项。
  • Option 'LITERAL' 将使用字符串常量来替换 Map 中的空 key 值。字符串常量的值由 'json.map-null-key.literal' 定义。

json.map-null-key.literal

注意:仅适用于 Flink1.16 版本

选填

'null'

String

'json.map-null-key.mode' 是 LITERAL 的时候,指定字符串常量替换 Map 中的空 key 值。

json.encode.decimal-as-plain-number

注意:仅适用于 Flink1.16 版本

选填

false

Boolean

将所有 DECIMAL 类型的数据保持原状,不使用科学计数法表示。例:0.000000027 默认会表示为 2.7E-8。当此选项设为 true 时,则会表示为 0.000000027

数据类型映射关系

当前,JSON schema 将会自动从 table schema 之中自动推导得到。不支持显式地定义 JSON schema。
在 Flink 中,JSON Format 使用 jackson databind API 去解析和生成 JSON。
下表列出了 Flink 中的数据类型与 JSON 中的数据类型的映射关系。

Flink SQL 数据类型

JSON 数据类型

CHAR / VARCHAR / STRING

string

BOOLEAN

boolean

BINARY / VARBINARY

string with encoding: base64

DECIMAL

number

TINYINT

number

SMALLINT

number

INT

number

BIGINT

number

FLOAT

number

DOUBLE

number

DATE

string with format: date

TIME

string with format: time

TIMESTAMP

string with format: date-time

TIMESTAMP_WITH_LOCAL_TIME_ZONE

string with format: date-time (with UTC time zone)

INTERVAL

number

ARRAY

array

MAP / MULTISET

object

ROW

object

注意事项

  • JSON 类型会依据用户在 create table 中声明的列名(column name)和列类型(column type)进行解析。当前的 SQL 任务仅实现了部分的自动适配。例如,声明为 int 类型,但实际在 JSON 中是 int 的字符串表示形式,SQL 能够自动识别并转换回 int 类型。然而,如果声明为 int 但实际为 long 类型,或者声明为 int 但实际是带有字母的字符串等情况,SQL 无法直接转换,会报错。
  • JSON 类型支持嵌套。例如,{"a": "a", "b": {"c": 1, "d": "s"}} ,在声明嵌套结构时,需要声明为 b Row<c int, d varchar> ,这样就表明字段 cd 是嵌套于字段 b 中的。JSON 也支持数组形式。
    • 如果是简单的数组,例如 "a": [1,2,3,4] ,则可以声明为 a Array ,这意味着 a 是一个 int 类型的数组。需要注意的是,在进行 select 操作时,下标是从 1 开始的。例如 "a": [6,7,8,9] ,执行 select a[1] 会返回 6 。
    • 如果是嵌套 JSON 的数组,例如 "a": [{"b": 3}, {"b": 6}] ,则可以声明为 a Array<Row> ,这表明 a 是一个对象类型的数组,a 的结构中,有一个类型为 intb 列。获取数据的方式如 select a[1].b 会返回 3 。

常见问题

  • 反序列化时出现报错 Invalid UTF-8 start byte

我们内部的实现使用的是 Jackson,在反序列化方面仅支持 UTF-8、UTF-16 以及 UTF-32 这几种编码方式,若使用其他编码方式就会报上述错误。同时,打出的异常消息会存在乱码(以 UTF-8 方式转换成字符串打出)。
如果能够忽略此类消息,可以配置 json.ignore-parse-errors来忽略掉这类消息;如果必须要解析此类消息,可以指定format格式为 bytes,自行编写 UDF(用户自定义函数)进行解析。