CSV Format 允许我们基于 CSV schema 进行解析和生成 CSV 数据。目前 CSV schema 是基于 table schema 推断而来的。
以下是一个使用 Kafka 连接器和 CSV 格式创建表的示例。
CREATE TABLE user_behavior ( user_id BIGINT, item_id BIGINT, category_id BIGINT, behavior STRING, ts TIMESTAMP(3) ) WITH ( 'connector' = 'kafka', 'topic' = 'user_behavior', 'properties.bootstrap.servers' = 'localhost:9092', 'properties.group.id' = 'testGroup', 'format' = 'csv', 'csv.ignore-parse-errors' = 'true', 'csv.allow-comments' = 'true' )
参数 | 是否必选 | 默认值 | 类型 | 描述 |
---|---|---|---|---|
format | 必选 | (none) | String | 指定要使用的格式,这里应该是 |
csv.field-delimiter | 可选 |
| String | 字段分隔符 (默认 |
csv.disable-quote-character | 可选 | false | Boolean | 是否禁止对引用的值使用引号 (默认是 false). 如果禁止,选项 |
csv.quote-character | 可选 |
| String | 用于围住字段值的引号字符 (默认 |
csv.allow-comments | 可选 | false | Boolean | 是否允许忽略注释行(默认不允许),注释行以 |
csv.ignore-parse-errors | 可选 | false | Boolean | 当解析异常时,是跳过当前字段或行,还是抛出错误失败(默认为 false,即抛出错误失败)。如果忽略字段的解析异常,则会将该字段值设置为 |
csv.array-element-delimiter | 可选 |
| String | 分隔数组和行元素的字符串(默认 |
csv.escape-character | 可选 | (none) | String | 转义字符(默认关闭)。 |
csv.null-literal | 可选 | (none) | String | 是否将 "null" 字符串转化为 null 值。 |
csv.write-bigdecimal-in-scientific-notation | 可选 | true | Boolean | 设置将 Bigdecimal 类型的数据表示为科学计数法(默认为true,即需要转为科学计数法),例如一个BigDecimal的值为100000,设置true,结果为 '1E+5';设置为false,结果为 100000。注意:只有当值不等于0且是10的倍数才会转为科学计数法。
|
目前 CSV 的 schema 都是从 table schema 推断而来的。显式地定义 CSV schema 暂不支持。 Flink 的 CSV Format 数据使用 jackson databind API 去解析 CSV 字符串。
下面的表格列出了 flink 数据和 CSV 数据的对应关系。
Flink SQL 数据类型 | CSV 数据类型 |
---|---|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|