JSON Format 能读写 JSON 格式的数据。当前,JSON schema 是从 table schema 中自动推导而得的。
以下是一个利用 Kafka 以及 JSON Format 构建表的例子。
CREATE TABLE user_behavior ( user_id BIGINT, item_id BIGINT, category_id BIGINT, behavior STRING, ts TIMESTAMP(3) ) WITH ( 'connector' = 'kafka', 'topic' = 'user_behavior', 'properties.bootstrap.servers' = 'localhost:9092', 'properties.group.id' = 'testGroup', 'format' = 'json', 'json.fail-on-missing-field' = 'false', 'json.ignore-parse-errors' = 'true' )
参数 | 是否必须 | 默认值 | 类型 | 描述 |
---|---|---|---|---|
format | 必选 | (none) | String | 声明使用的格式,这里应为 |
json.fail-on-missing-field | 可选 | false | Boolean | 当解析字段缺失时,是跳过当前字段或行,还是抛出错误失败(默认为 false,即抛出错误失败)。 |
json.ignore-parse-errors | 可选 | false | Boolean | 当解析异常时,是跳过当前字段或行,还是抛出错误失败(默认为 false,即抛出错误失败)。如果忽略字段的解析异常,则会将该字段值设置为 |
json.timestamp-format.standard | 可选 |
| String | 声明输入和输出的
|
json.map-null-key.mode
| 选填 |
| String | 指定处理 Map 中 key 值为空的方法。。当前支持的值有
|
json.map-null-key.literal
| 选填 | 'null' | String | 当 |
json.encode.decimal-as-plain-number
| 选填 | false | Boolean | 将所有 DECIMAL 类型的数据保持原状,不使用科学计数法表示。例: |
当前,JSON schema 将会自动从 table schema 之中自动推导得到。不支持显式地定义 JSON schema。
在 Flink 中,JSON Format 使用 jackson databind API 去解析和生成 JSON。
下表列出了 Flink 中的数据类型与 JSON 中的数据类型的映射关系。
Flink SQL 数据类型 | JSON 数据类型 |
---|---|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
create table
中声明的列名(column name)和列类型(column type)进行解析。当前的 SQL 任务仅实现了部分的自动适配。例如,声明为 int
类型,但实际在 JSON 中是 int
的字符串表示形式,SQL 能够自动识别并转换回 int
类型。然而,如果声明为 int
但实际为 long
类型,或者声明为 int
但实际是带有字母的字符串等情况,SQL 无法直接转换,会报错。{"a": "a", "b": {"c": 1, "d": "s"}}
,在声明嵌套结构时,需要声明为 b Row<c int, d varchar>
,这样就表明字段 c
、d
是嵌套于字段 b
中的。JSON 也支持数组形式。
"a": [1,2,3,4]
,则可以声明为 a Array
,这意味着 a
是一个 int
类型的数组。需要注意的是,在进行 select
操作时,下标是从 1 开始的。例如 "a": [6,7,8,9]
,执行 select a[1]
会返回 6 。"a": [{"b": 3}, {"b": 6}]
,则可以声明为 a Array<Row>
,这表明 a
是一个对象类型的数组,a
的结构中,有一个类型为 int
的 b
列。获取数据的方式如 select a[1].b
会返回 3 。Invalid UTF-8 start byte
。我们内部的实现使用的是 Jackson,在反序列化方面仅支持 UTF-8、UTF-16 以及 UTF-32 这几种编码方式,若使用其他编码方式就会报上述错误。同时,打出的异常消息会存在乱码(以 UTF-8 方式转换成字符串打出)。
如果能够忽略此类消息,可以配置 json.ignore-parse-errors
来忽略掉这类消息;如果必须要解析此类消息,可以指定format
格式为 bytes
,自行编写 UDF(用户自定义函数)进行解析。