Google Protocol Buffer (简称 Protobuf) 是 Google 公司内部的混合语言数据标准,Protobuf 主要用于 RPC 系统和持续数据存储系统。Protobuf 格式可以读写基于 Protobuf 生成类的 Protobuf 数据。
以下是使用 Kafka 连接器和 Protobuf 格式创建表的步骤。
火山 Flink 基于 jdk8 环境运行,为了保证编译的 pb class 能在 Flink 正常运行,推荐本地使用 jdk8 环境来编译。
由于 Flink 1.16 & 1.17 内置的 pb format 版本是 3.21,因此推荐下载 3.21.2 版本的 protoc,以保证兼容性,下载地址:Link
解压后,在 bin 目录下找到 protoc 二进制可执行文件。
proto 定义文件以 SimpleTest.proto 文件为例:
以下是 proto 定义文件:
syntax = "proto2"; package com.example; option java_package = "com.example"; option java_multiple_files = true; message SimpleTest { optional int64 uid = 1; optional string name = 2; optional int32 category_type = 3; optional bytes content = 4; optional double price = 5; map<int64, InnerMessageTest> value_map = 6; repeated InnerMessageTest value_arr = 7; optional Corpus corpus_int = 8; optional Corpus corpus_str = 9; message InnerMessageTest{ optional int64 v1 =1; optional int32 v2 =2; } enum Corpus { UNIVERSAL = 0; WEB = 1; IMAGES = 2; LOCAL = 3; NEWS = 4; PRODUCTS = 5; VIDEO = 7; } }
执行生成 Java 文件的命令:
./protoc --java_out=src/main/java SimpleTest.proto
采用 maven 的方式,把生成的 Java 文件编译并打包为 Jar 包,pom.xml 如下:
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.example</groupId> <artifactId>pb-demo</artifactId> <version>1.0-SNAPSHOT</version> <properties> <maven.compiler.source>8</maven.compiler.source> <maven.compiler.target>8</maven.compiler.target> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> </properties> <dependencies> <!-- 引入 protobuf-java 3.21.2,和 Flink 内置的 protobuf 版本保持一致 --> <dependency> <groupId>com.google.protobuf</groupId> <artifactId>protobuf-java</artifactId> <version>3.21.2</version> <scope>provided</scope> </dependency> </dependencies> <build> <plugins> <plugin> <artifactId>maven-shade-plugin</artifactId> <executions> <execution> <id>shade-flink</id> <phase>package</phase> <goals> <goal>shade</goal> </goals> <configuration> <shadeTestJar>false</shadeTestJar> <artifactSet> <includes> <include>*:*</include> </includes> </artifactSet> </configuration> </execution> </executions> </plugin> </plugins> </build> </project>
执行打包命令:
mvn clean package
在 target 目录下,找到最终编译好的 Jar 包。
新建 Flink SQL 1.16 & 1.17 作业,并使用如下 SQL 模版:
CREATE TABLE datagen_source ( uid BIGINT, name STRING, category_type INT, content BINARY, price DOUBLE, value_map map<BIGINT, row<v1 BIGINT, v2 INT>>, value_arr array<row<v1 BIGINT, v2 INT>>, corpus_int INT, corpus_str STRING ) WITH ( 'connector' = 'datagen' ); CREATE TABLE kafka_sink ( uid BIGINT, name STRING, category_type INT, content BINARY, price DOUBLE, value_map map<BIGINT, row<v1 BIGINT, v2 INT>>, value_arr array<row<v1 BIGINT, v2 INT>>, corpus_int INT, corpus_str STRING ) WITH ( 'connector' = 'kafka', 'topic' = 'pb-test', 'properties.bootstrap.servers' = 'kafka-xxx.kafka.ivolces.com:9092', 'properties.group.id' = 'testGroup', 'format' = 'protobuf', 'protobuf.message-class-name' = 'org.example.SimpleTest', 'protobuf.ignore-parse-errors' = 'true' ); INSERT INTO kafka_sink SELECT * FROM datagen_source;
CREATE TABLE kafka_source ( uid BIGINT, name STRING, category_type INT, content BINARY, price DOUBLE, value_map map<BIGINT, row<v1 BIGINT, v2 INT>>, value_arr array<row<v1 BIGINT, v2 INT>>, corpus_int INT, corpus_str STRING ) WITH ( 'connector' = 'kafka', 'topic' = 'pb-test', 'properties.bootstrap.servers' = 'kafka-xxx.kafka.ivolces.com:9092', 'properties.group.id' = 'testGroup', 'scan.startup.mode' = 'earliest-offset', 'format' = 'protobuf', 'protobuf.message-class-name' = 'com.example.SimpleTest', 'protobuf.ignore-parse-errors' = 'true' ); CREATE TABLE print_sink ( uid BIGINT, name STRING, category_type INT, content BINARY, price DOUBLE, value_map map<BIGINT, row<v1 BIGINT, v2 INT>>, value_arr array<row<v1 BIGINT, v2 INT>>, corpus_int INT, corpus_str STRING ) WITH ( 'connector' = 'print' ); INSERT INTO print_sink SELECT * FROM kafka_source;
通过文件依赖的方式,把上述生成的 Jar 包先上传到 Flink 平台,再添加该文件依赖到 Flink 作业里。
文件上传参见:https://www.volcengine.com/docs/6581/152196
添加文件依赖的方式如下:
参数 | 是否必选 | 向前兼容 | 默认值 | 类型 | 描述 |
|---|---|---|---|---|---|
format | 必选 | no | (none) | String | 声明使用的格式,这里应为 |
protobuf.message-class-name | 必选 | no | (none) | String | Protobuf 生成类的全名。该名称必须与 proto 定义文件中的消息名称一致。内层类名称支持 |
protobuf.ignore-parse-errors | 可选 | no | false | Boolean | 可选标记,用于跳过出现解析错误的记录,而不是失败。 |
protobuf.read-default-values | 可选 | yes | false | Boolean | 仅当生成的类的版本是 proto2 时,此选项才有效。如果该值设置为 true,则格式将读取空值作为 proto 文件中定义的默认值。如果该值设置为 false,则当二进制 protobuf 消息中不存在该数据元素时,该格式将生成空值。如果 proto 语法是 proto3,这个值会被强制设置为 true,因为 proto3 的标准是使用默认值。 |
protobuf.write-null-string-literal | 可选 | no | "" | String | 当序列化为 protobuf 数据时,这是一个可选配置,用于在出现空值时在 Protobuf 的 array/map 中指定字符串字面量。 |
下表列出了从 Flink 类型到 Protobuf 类型的映射。
Flink SQL 数据类型 | Protobuf 数据类型 | 描述 |
|---|---|---|
|
| |
|
| |
|
| |
|
| |
|
| |
|
| |
|
| |
|
| 元素不能为空,可通过 |
|
| 键或值不能为 null,字符串默认值可以通过 |
|
| |
|
| protobuf 的枚举值可以相应地映射到字符串或 flink 行数。 |
不能为空。
由于 protobuf 不允许在映射和数组中使用空值,因此我们需要在从 Flink Rows 转换到 Protobuf 时自动生成默认值。
Protobuf 数据类型 | 默认值 |
|---|---|
int32 / int64 / float / double | 0 |
string | "" |
bool | false |
enum | first element of enum |
binary | ByteString.EMPTY |
message | MESSAGE.getDefaultInstance() |
OneOf 类型字段。
"oneof" 是 protobuf 中的一种特殊字段类型,它允许您将消息定义为只能有一个字段是非零值。例如,在下面的消息定义中,一个 Person 只能有一种类型(email 或 phone):
message Person { oneof contact_info { string email = 1; string phone = 2; } }
在序列化过程中,无法保证同一 oneof 组中的 Flink 字段最多只包含一个有效值。在序列化时,每个字段都是按照 Flink 架构的顺序设置的,因此在同一 oneof 组中,位置较高的字段会优先于位置较低的字段。