Google Protocol Buffer( 简称 Protobuf)是 Google 公司内部的混合语言数据标准,Protobuf 主要用于 RPC 系统和持续数据存储系统。Protobuf 格式可以读写基于 Protobuf 生成类的 Protobuf 数据。
以下是使用 Kafka 连接器和 Protobuf 格式创建表的示例。
以下是 proto 定义文件:
syntax = "proto2"; package com.example; option java_package = "com.example"; option java_multiple_files = true; message SimpleTest { optional int64 uid = 1; optional string name = 2; optional int32 category_type = 3; optional bytes content = 4; optional double price = 5; map<int64, InnerMessageTest> value_map = 6; repeated InnerMessageTest value_arr = 7; optional Corpus corpus_int = 8; optional Corpus corpus_str = 9; message InnerMessageTest{ optional int64 v1 =1; optional int32 v2 =2; } enum Corpus { UNIVERSAL = 0; WEB = 1; IMAGES = 2; LOCAL = 3; NEWS = 4; PRODUCTS = 5; VIDEO = 7; } }
protoc
命令将.proto
文件编译成 Java 类。jar
包,例如,在 sql-client 中使用 -j
将其传入。CREATE TABLE simple_test ( uid BIGINT, name STRING, category_type INT, content BINARY, price DOUBLE, value_map map<BIGINT, row<v1 BIGINT, v2 INT>>, value_arr array<row<v1 BIGINT, v2 INT>>, corpus_int INT, corpus_str STRING ) WITH ( 'connector' = 'kafka', 'topic' = 'user_behavior', 'properties.bootstrap.servers' = 'localhost:9092', 'properties.group.id' = 'testGroup', 'format' = 'protobuf', 'protobuf.message-class-name' = 'com.example.SimpleTest', 'protobuf.ignore-parse-errors' = 'true' )
参数 | 是否必选 | 向前兼容 | 默认值 | 类型 | 描述 |
---|---|---|---|---|---|
format | 必选 | no | (none) | String | 声明使用的格式,这里应为 |
protobuf.message-class-name | 必选 | no | (none) | String | Protobuf 生成类的全名。该名称必须与 proto 定义文件中的消息名称一致。内层类名称支持 |
protobuf.ignore-parse-errors | 可选 | no | false | Boolean | 可选标记,用于跳过出现解析错误的记录,而不是失败。 |
protobuf.read-default-values | 可选 | yes | false | Boolean | 仅当生成的类的版本是 proto2 时,此选项才有效。如果该值设置为 true,则格式将读取空值作为 proto 文件中定义的默认值。如果该值设置为 false,则当二进制 protobuf 消息中不存在该数据元素时,该格式将生成空值。如果 proto 语法是 proto3,这个值会被强制设置为 true,因为 proto3 的标准是使用默认值。 |
protobuf.write-null-string-literal | 可选 | no | "" | String | 当序列化为 protobuf 数据时,这是一个可选配置,用于在出现空值时在 Protobuf 的 array/map 中指定字符串字面量。 |
下表列出了从 Flink 类型到 Protobuf 类型的映射。
Flink SQL 数据类型 | Protobuf 数据类型 | 描述 |
---|---|---|
|
| |
|
| |
|
| |
|
| |
|
| |
|
| |
|
| |
|
| 元素不能为空,可通过 |
|
| 键或值不能为 null,字符串默认值可以通过 |
|
| |
|
| protobuf 的枚举值可以相应地映射到字符串或 flink 行数。 |
不能为空。
由于 protobuf 不允许在映射和数组中使用空值,因此我们需要在从 Flink Rows 转换到 Protobuf 时自动生成默认值。
Protobuf 数据类型 | 默认值 |
---|---|
int32 / int64 / float / double | 0 |
string | "" |
bool | false |
enum | first element of enum |
binary | ByteString.EMPTY |
message | MESSAGE.getDefaultInstance() |
OneOf 类型字段。
"oneof" 是 protobuf 中的一种特殊字段类型,它允许您将消息定义为只能有一个字段是非零值。例如,在下面的消息定义中,一个 Person 只能有一种类型(email 或 phone):
message Person { oneof contact_info { string email = 1; string phone = 2; } }
在序列化过程中,无法保证同一 oneof 组中的 Flink 字段最多只包含一个有效值。在序列化时,每个字段都是按照 Flink 架构的顺序设置的,因此在同一 oneof 组中,位置较高的字段会优先于位置较低的字段。