You need to enable JavaScript to run this app.
导航
Protobuf
最近更新时间:2025.08.25 14:34:13首次发布时间:2024.08.20 17:38:32
复制全文
我的收藏
有用
有用
无用
无用

Google Protocol Buffer (简称 Protobuf) 是 Google 公司内部的混合语言数据标准,Protobuf 主要用于 RPC 系统和持续数据存储系统。Protobuf 格式可以读写基于 Protobuf 生成类的 Protobuf 数据。

使用限制

  • Protobuf 格式目前仅支持在 Flink 1.16-volcano 及以上引擎版本中使用 。

使用步骤

以下是使用 Kafka 连接器和 Protobuf 格式创建表的步骤。

本地安装 jdk8 环境

火山 Flink 基于 jdk8 环境运行,为了保证编译的 pb class 能在 Flink 正常运行,推荐本地使用 jdk8 环境来编译。

下载 3.21.2 版本的 protoc

由于 Flink 1.16 & 1.17 内置的 pb format 版本是 3.21,因此推荐下载 3.21.2 版本的 protoc,以保证兼容性,下载地址:Link
解压后,在 bin 目录下找到 protoc 二进制可执行文件。

使用 protoc 把 proto 文件生成 Java 文件

proto 定义文件以 SimpleTest.proto 文件为例:
以下是 proto 定义文件:

syntax = "proto2";
package com.example;
option java_package = "com.example";
option java_multiple_files = true;

message SimpleTest {
    optional int64 uid = 1;
    optional string name = 2;
    optional int32 category_type = 3;
    optional bytes content = 4;
    optional double price = 5;
    map<int64, InnerMessageTest> value_map = 6;
    repeated  InnerMessageTest value_arr = 7;
    optional Corpus corpus_int = 8; 
    optional Corpus corpus_str = 9; 
    
    message InnerMessageTest{
          optional int64 v1 =1;
          optional int32 v2 =2;
    }
    
    enum Corpus {
        UNIVERSAL = 0;
        WEB = 1;
        IMAGES = 2;
        LOCAL = 3;
        NEWS = 4;
        PRODUCTS = 5;
        VIDEO = 7;
      }
}

执行生成 Java 文件的命令:

./protoc --java_out=src/main/java SimpleTest.proto

把 Java 文件编译为 Jar 包

采用 maven 的方式,把生成的 Java 文件编译并打包为 Jar 包,pom.xml 如下:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.example</groupId>
  <artifactId>pb-demo</artifactId>
  <version>1.0-SNAPSHOT</version>

  <properties>
    <maven.compiler.source>8</maven.compiler.source>
    <maven.compiler.target>8</maven.compiler.target>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  </properties>

  <dependencies>
    <!-- 引入 protobuf-java 3.21.2,和 Flink 内置的 protobuf 版本保持一致 -->
    <dependency>
      <groupId>com.google.protobuf</groupId>
      <artifactId>protobuf-java</artifactId>
      <version>3.21.2</version>
      <scope>provided</scope>
    </dependency>
  </dependencies>

  <build>
    <plugins>
    <plugin>
      <artifactId>maven-shade-plugin</artifactId>
      <executions>
        <execution>
          <id>shade-flink</id>
          <phase>package</phase>
          <goals>
            <goal>shade</goal>
          </goals>
          <configuration>
            <shadeTestJar>false</shadeTestJar>
            <artifactSet>
              <includes>
                <include>*:*</include>
              </includes>
            </artifactSet>
          </configuration>
        </execution>
      </executions>
    </plugin>
  </plugins>
</build>

</project>

执行打包命令:

mvn clean package

在 target 目录下,找到最终编译好的 Jar 包。

新建 Flink SQL 1.16 & 1.17 作业,并使用如下 SQL 模版:

写入 pb 格式的数据

CREATE TABLE datagen_source (
  uid BIGINT,
  name STRING,
  category_type INT,
  content BINARY,
  price DOUBLE,
  value_map map<BIGINT, row<v1 BIGINT, v2 INT>>,
  value_arr array<row<v1 BIGINT, v2 INT>>,
  corpus_int INT,
  corpus_str STRING
) WITH (
     'connector' = 'datagen'
);

CREATE TABLE kafka_sink (
  uid BIGINT,
  name STRING,
  category_type INT,
  content BINARY,
  price DOUBLE,
  value_map map<BIGINT, row<v1 BIGINT, v2 INT>>,
  value_arr array<row<v1 BIGINT, v2 INT>>,
  corpus_int INT,
  corpus_str STRING
) WITH (
     'connector' = 'kafka',
     'topic' = 'pb-test',
     'properties.bootstrap.servers' = 'kafka-xxx.kafka.ivolces.com:9092',
     'properties.group.id' = 'testGroup',
     'format' = 'protobuf',
     'protobuf.message-class-name' = 'org.example.SimpleTest',
     'protobuf.ignore-parse-errors' = 'true'
);

INSERT INTO kafka_sink
SELECT *
FROM datagen_source;

读取 pb 格式的数据

CREATE TABLE kafka_source (
  uid BIGINT,
  name STRING,
  category_type INT,
  content BINARY,
  price DOUBLE,
  value_map map<BIGINT, row<v1 BIGINT, v2 INT>>,
  value_arr array<row<v1 BIGINT, v2 INT>>,
  corpus_int INT,
  corpus_str STRING
) WITH (
     'connector' = 'kafka',
     'topic' = 'pb-test',
     'properties.bootstrap.servers' = 'kafka-xxx.kafka.ivolces.com:9092',
     'properties.group.id' = 'testGroup',
     'scan.startup.mode' = 'earliest-offset',
     'format' = 'protobuf',
     'protobuf.message-class-name' = 'com.example.SimpleTest',
     'protobuf.ignore-parse-errors' = 'true'
);

CREATE TABLE print_sink (
  uid BIGINT,
  name STRING,
  category_type INT,
  content BINARY,
  price DOUBLE,
  value_map map<BIGINT, row<v1 BIGINT, v2 INT>>,
  value_arr array<row<v1 BIGINT, v2 INT>>,
  corpus_int INT,
  corpus_str STRING
) WITH (
     'connector' = 'print'
);

INSERT INTO print_sink
SELECT *
FROM kafka_source;

通过文件依赖的方式,把上述生成的 Jar 包先上传到 Flink 平台,再添加该文件依赖到 Flink 作业里。
文件上传参见:https://www.volcengine.com/docs/6581/152196
添加文件依赖的方式如下:

Format 参数

参数

是否必选

向前兼容

默认值

类型

描述

format

必选

no

(none)

String

声明使用的格式,这里应为'protobuf'

protobuf.message-class-name

必选

no

(none)

String

Protobuf 生成类的全名。该名称必须与 proto 定义文件中的消息名称一致。内层类名称支持 $,如 "com.exmample.OuterClass$MessageClass

protobuf.ignore-parse-errors

可选

no

false

Boolean

可选标记,用于跳过出现解析错误的记录,而不是失败。

protobuf.read-default-values

可选

yes

false

Boolean

仅当生成的类的版本是 proto2 时,此选项才有效。如果该值设置为 true,则格式将读取空值作为 proto 文件中定义的默认值。如果该值设置为 false,则当二进制 protobuf 消息中不存在该数据元素时,该格式将生成空值。如果 proto 语法是 proto3,这个值会被强制设置为 true,因为 proto3 的标准是使用默认值。

protobuf.write-null-string-literal

可选

no

""

String

当序列化为 protobuf 数据时,这是一个可选配置,用于在出现空值时在 Protobuf 的 array/map 中指定字符串字面量。

数据类型映射

下表列出了从 Flink 类型到 Protobuf 类型的映射。

Flink SQL 数据类型

Protobuf 数据类型

描述

CHAR / VARCHAR / STRING

string

BOOLEAN

bool

BINARY / VARBINARY

bytes

INT

int32

BIGINT

int64

FLOAT

float

DOUBLE

double

ARRAY

repeated

元素不能为空,可通过 write-null-string-literal指定字符串默认值。

MAP

map

键或值不能为 null,字符串默认值可以通过 write-null-string-literal 指定。

ROW

message

VARCHAR / CHAR / TINYINT / SMALLINT / INTEGER / BIGINT

enum

protobuf 的枚举值可以相应地映射到字符串或 flink 行数。

注意事项

  • 不能为空。
    由于 protobuf 不允许在映射和数组中使用空值,因此我们需要在从 Flink Rows 转换到 Protobuf 时自动生成默认值。

    Protobuf 数据类型

    默认值

    int32 / int64 / float / double

    0

    string

    ""

    bool

    false

    enum

    first element of enum

    binary

    ByteString.EMPTY

    message

    MESSAGE.getDefaultInstance()

  • OneOf 类型字段。
    "oneof" 是 protobuf 中的一种特殊字段类型,它允许您将消息定义为只能有一个字段是非零值。例如,在下面的消息定义中,一个 Person 只能有一种类型(email 或 phone):

    message Person {
      oneof contact_info {
        string email = 1;
        string phone = 2;
      }
    }
    

    在序列化过程中,无法保证同一 oneof 组中的 Flink 字段最多只包含一个有效值。在序列化时,每个字段都是按照 Flink 架构的顺序设置的,因此在同一 oneof 组中,位置较高的字段会优先于位置较低的字段。