You need to enable JavaScript to run this app.
导航
Protobuf
最近更新时间:2024.08.20 17:38:32首次发布时间:2024.08.20 17:38:32

Google Protocol Buffer( 简称 Protobuf)是 Google 公司内部的混合语言数据标准,Protobuf 主要用于 RPC 系统和持续数据存储系统。Protobuf 格式可以读写基于 Protobuf 生成类的 Protobuf 数据。

如何用 Protobuf 格式创建一个表格

以下是使用 Kafka 连接器和 Protobuf 格式创建表的示例。
以下是 proto 定义文件:

syntax = "proto2";
package com.example;
option java_package = "com.example";
option java_multiple_files = true;

message SimpleTest {
    optional int64 uid = 1;
    optional string name = 2;
    optional int32 category_type = 3;
    optional bytes content = 4;
    optional double price = 5;
    map<int64, InnerMessageTest> value_map = 6;
    repeated  InnerMessageTest value_arr = 7;
    optional Corpus corpus_int = 8; 
    optional Corpus corpus_str = 9; 
    
    message InnerMessageTest{
          optional int64 v1 =1;
          optional int32 v2 =2;
    }
    
    enum Corpus {
        UNIVERSAL = 0;
        WEB = 1;
        IMAGES = 2;
        LOCAL = 3;
        NEWS = 4;
        PRODUCTS = 5;
        VIDEO = 7;
      }
}
  1. 使用 protoc 命令将.proto 文件编译成 Java 类。
  2. 然后编译并打包这些类(无需将原 Java 打包到 jar 中)。
  3. 最后,您应在类路径中提供 jar包,例如,在 sql-client 中使用 -j 将其传入。
CREATE TABLE simple_test (
  uid BIGINT,
  name STRING,
  category_type INT,
  content BINARY,
  price DOUBLE,
  value_map map<BIGINT, row<v1 BIGINT, v2 INT>>,
  value_arr array<row<v1 BIGINT, v2 INT>>,
  corpus_int INT,
  corpus_str STRING
) WITH (
 'connector' = 'kafka',
 'topic' = 'user_behavior',
 'properties.bootstrap.servers' = 'localhost:9092',
 'properties.group.id' = 'testGroup',
 'format' = 'protobuf',
 'protobuf.message-class-name' = 'com.example.SimpleTest',
 'protobuf.ignore-parse-errors' = 'true'
)

Format 参数

参数

是否必选

向前兼容

默认值

类型

描述

format

必选

no

(none)

String

声明使用的格式,这里应为'protobuf'

protobuf.message-class-name

必选

no

(none)

String

Protobuf 生成类的全名。该名称必须与 proto 定义文件中的消息名称一致。内层类名称支持 $,如 "com.exmample.OuterClass$MessageClass

protobuf.ignore-parse-errors

可选

no

false

Boolean

可选标记,用于跳过出现解析错误的记录,而不是失败。

protobuf.read-default-values

可选

yes

false

Boolean

仅当生成的类的版本是 proto2 时,此选项才有效。如果该值设置为 true,则格式将读取空值作为 proto 文件中定义的默认值。如果该值设置为 false,则当二进制 protobuf 消息中不存在该数据元素时,该格式将生成空值。如果 proto 语法是 proto3,这个值会被强制设置为 true,因为 proto3 的标准是使用默认值。

protobuf.write-null-string-literal

可选

no

""

String

当序列化为 protobuf 数据时,这是一个可选配置,用于在出现空值时在 Protobuf 的 array/map 中指定字符串字面量。

数据类型映射

下表列出了从 Flink 类型到 Protobuf 类型的映射。

Flink SQL 数据类型

Protobuf 数据类型

描述

CHAR / VARCHAR / STRING

string

BOOLEAN

bool

BINARY / VARBINARY

bytes

INT

int32

BIGINT

int64

FLOAT

float

DOUBLE

double

ARRAY

repeated

元素不能为空,可通过 write-null-string-literal指定字符串默认值。

MAP

map

键或值不能为 null,字符串默认值可以通过 write-null-string-literal 指定。

ROW

message

VARCHAR / CHAR / TINYINT / SMALLINT / INTEGER / BIGINT

enum

protobuf 的枚举值可以相应地映射到字符串或 flink 行数。

注意事项

  • 不能为空。
    由于 protobuf 不允许在映射和数组中使用空值,因此我们需要在从 Flink Rows 转换到 Protobuf 时自动生成默认值。

    Protobuf 数据类型

    默认值

    int32 / int64 / float / double

    0

    string

    ""

    bool

    false

    enum

    first element of enum

    binary

    ByteString.EMPTY

    message

    MESSAGE.getDefaultInstance()

  • OneOf 类型字段。
    "oneof" 是 protobuf 中的一种特殊字段类型,它允许您将消息定义为只能有一个字段是非零值。例如,在下面的消息定义中,一个 Person 只能有一种类型(email 或 phone):

    message Person {
      oneof contact_info {
        string email = 1;
        string phone = 2;
      }
    }
    

    在序列化过程中,无法保证同一 oneof 组中的 Flink 字段最多只包含一个有效值。在序列化时,每个字段都是按照 Flink 架构的顺序设置的,因此在同一 oneof 组中,位置较高的字段会优先于位置较低的字段。