Apache Avro format 允许基于 Avro schema 读取和写入 Avro 数据。目前,Avro schema 从 table schema 推导而来。
这是使用 Kafka 连接器和 Avro format 创建表的示例。
CREATE TABLE user_behavior ( user_id BIGINT, item_id BIGINT, category_id BIGINT, behavior STRING, ts TIMESTAMP(3) ) WITH ( 'connector' = 'kafka', 'topic' = 'user_behavior', 'properties.bootstrap.servers' = 'localhost:9092', 'properties.group.id' = 'testGroup', 'format' = 'avro' )
参数 | 是否必选 | 默认值 | 类型 | 描述 |
---|---|---|---|---|
format | 必要 | (none) | String | 指定使用什么 format,这里应该是 |
avro.codec | 可选 | (none) | String | avro 压缩编解码器,仅用于 filesystem。默认 snappy 压缩。目前支持:null, deflate、snappy、bzip2、xz。 |
目前,Avro schema 通常是从 table schema 中推导而来。尚不支持显式定义 Avro schema。因此,下表列出了从 Flink 类型到 Avro 类型的类型映射。
Flink SQL 数据类型 | Avro 数据类型 | Avro 逻辑类型 |
---|---|---|
CHAR / VARCHAR / STRING | string | |
|
| |
|
| |
|
|
|
|
| |
|
| |
|
| |
|
| |
|
| |
|
| |
|
|
|
|
|
|
|
|
|
|
| |
|
| |
|
| |
|
|
除了上面列出的类型,Flink 支持读取/写入 nullable 的类型。Flink 将 nullable 的类型映射到 Avro union(something, null)
,其中 something
是从 Flink 类型转换的 Avro 类型。