You need to enable JavaScript to run this app.
导航
Protobuf
最近更新时间:2024.08.20 17:38:32首次发布时间:2024.08.20 17:38:32
我的收藏
有用
有用
无用
无用

Google Protocol Buffer( 简称 Protobuf)是 Google 公司内部的混合语言数据标准,Protobuf 主要用于 RPC 系统和持续数据存储系统。Protobuf 格式可以读写基于 Protobuf 生成类的 Protobuf 数据。

如何用 Protobuf 格式创建一个表格

以下是使用 Kafka 连接器和 Protobuf 格式创建表的示例。
以下是 proto 定义文件:

syntax = "proto2";
package com.example;
option java_package = "com.example";
option java_multiple_files = true;

message SimpleTest {
    optional int64 uid = 1;
    optional string name = 2;
    optional int32 category_type = 3;
    optional bytes content = 4;
    optional double price = 5;
    map<int64, InnerMessageTest> value_map = 6;
    repeated  InnerMessageTest value_arr = 7;
    optional Corpus corpus_int = 8; 
    optional Corpus corpus_str = 9; 
    
    message InnerMessageTest{
          optional int64 v1 =1;
          optional int32 v2 =2;
    }
    
    enum Corpus {
        UNIVERSAL = 0;
        WEB = 1;
        IMAGES = 2;
        LOCAL = 3;
        NEWS = 4;
        PRODUCTS = 5;
        VIDEO = 7;
      }
}
  1. 使用 protoc 命令将.proto 文件编译成 Java 类。
  2. 然后编译并打包这些类(无需将原 Java 打包到 jar 中)。
  3. 最后,您应在类路径中提供 jar包,例如,在 sql-client 中使用 -j 将其传入。
CREATE TABLE simple_test (
  uid BIGINT,
  name STRING,
  category_type INT,
  content BINARY,
  price DOUBLE,
  value_map map<BIGINT, row<v1 BIGINT, v2 INT>>,
  value_arr array<row<v1 BIGINT, v2 INT>>,
  corpus_int INT,
  corpus_str STRING
) WITH (
 'connector' = 'kafka',
 'topic' = 'user_behavior',
 'properties.bootstrap.servers' = 'localhost:9092',
 'properties.group.id' = 'testGroup',
 'format' = 'protobuf',
 'protobuf.message-class-name' = 'com.example.SimpleTest',
 'protobuf.ignore-parse-errors' = 'true'
)

Format 参数

参数

是否必选

向前兼容

默认值

类型

描述

format

必选

no

(none)

String

声明使用的格式,这里应为'protobuf'

protobuf.message-class-name

必选

no

(none)

String

Protobuf 生成类的全名。该名称必须与 proto 定义文件中的消息名称一致。内层类名称支持 $,如 "com.exmample.OuterClass$MessageClass

protobuf.ignore-parse-errors

可选

no

false

Boolean

可选标记,用于跳过出现解析错误的记录,而不是失败。

protobuf.read-default-values

可选

yes

false

Boolean

仅当生成的类的版本是 proto2 时,此选项才有效。如果该值设置为 true,则格式将读取空值作为 proto 文件中定义的默认值。如果该值设置为 false,则当二进制 protobuf 消息中不存在该数据元素时,该格式将生成空值。如果 proto 语法是 proto3,这个值会被强制设置为 true,因为 proto3 的标准是使用默认值。

protobuf.write-null-string-literal

可选

no

""

String

当序列化为 protobuf 数据时,这是一个可选配置,用于在出现空值时在 Protobuf 的 array/map 中指定字符串字面量。

数据类型映射

下表列出了从 Flink 类型到 Protobuf 类型的映射。

Flink SQL 数据类型

Protobuf 数据类型

描述

CHAR / VARCHAR / STRING

string

BOOLEAN

bool

BINARY / VARBINARY

bytes

INT

int32

BIGINT

int64

FLOAT

float

DOUBLE

double

ARRAY

repeated

元素不能为空,可通过 write-null-string-literal指定字符串默认值。

MAP

map

键或值不能为 null,字符串默认值可以通过 write-null-string-literal 指定。

ROW

message

VARCHAR / CHAR / TINYINT / SMALLINT / INTEGER / BIGINT

enum

protobuf 的枚举值可以相应地映射到字符串或 flink 行数。

注意事项

  • 不能为空。
    由于 protobuf 不允许在映射和数组中使用空值,因此我们需要在从 Flink Rows 转换到 Protobuf 时自动生成默认值。

    Protobuf 数据类型

    默认值

    int32 / int64 / float / double

    0

    string

    ""

    bool

    false

    enum

    first element of enum

    binary

    ByteString.EMPTY

    message

    MESSAGE.getDefaultInstance()

  • OneOf 类型字段。
    "oneof" 是 protobuf 中的一种特殊字段类型,它允许您将消息定义为只能有一个字段是非零值。例如,在下面的消息定义中,一个 Person 只能有一种类型(email 或 phone):

    message Person {
      oneof contact_info {
        string email = 1;
        string phone = 2;
      }
    }
    

    在序列化过程中,无法保证同一 oneof 组中的 Flink 字段最多只包含一个有效值。在序列化时,每个字段都是按照 Flink 架构的顺序设置的,因此在同一 oneof 组中,位置较高的字段会优先于位置较低的字段。