You need to enable JavaScript to run this app.
导航
Confluent Avro
最近更新时间:2024.08.20 17:38:32首次发布时间:2024.08.20 17:38:32

Avro Schema Registry (avro-confluent) 格式能让你读取被 io.confluent.kafka.serializers.KafkaAvroSerializer 序列化的记录,以及可以写入成能被 io.confluent.kafka.serializers.KafkaAvroDeserializer 反序列化的记录。
当以这种格式读取(反序列化)记录时,将根据记录中编码的 schema 版本 id 从配置的 Confluent Schema Registry 中获取 Avro writer schema ,而从 table schema 中推断出 reader schema。
当以这种格式写入(序列化)记录时,Avro schema 是从 table schema 中推断出来的,并会用来检索要与数据一起编码的 schema id。我们会在配置的 Confluent Schema Registry 中配置的 subject 下,检索 schema id。subject 通过 avro-confluent.subject 参数来指定。
Avro Schema Registry 格式只能与 Apache Kafka SQL 连接器或 Upsert Kafka SQL 连接器一起使用。

如何创建使用 Avro-Confluent 格式的表

以下是一个使用 Kafka 连接器和 Confluent Avro 格式创建表的示例。
SQL 使用原始的 UTF-8 字符串作为 Kafka 的 key,Schema Registry 中注册的 Avro 记录作为 Kafka 的 values 的表的示例:

CREATE TABLE user_created (

  -- 该列映射到 Kafka 原始的 UTF-8 key
  the_kafka_key STRING,
  
  -- 映射到 Kafka value 中的 Avro 字段的一些列
  id STRING,
  name STRING,
  email STRING

) WITH (

  'connector' = 'kafka',
  'topic' = 'user_events_example1',
  'properties.bootstrap.servers' = 'localhost:9092',

  -- UTF-8 字符串作为 Kafka 的 keys,使用表中的 'the_kafka_key' 列
  'key.format' = 'raw',
  'key.fields' = 'the_kafka_key',

  'value.format' = 'avro-confluent',
  'value.avro-confluent.url' = 'http://localhost:8082',
  'value.fields-include' = 'EXCEPT_KEY'
)

我们可以像下面这样将数据写入到 kafka 表中:

INSERT INTO user_created
SELECT
  -- 将 user id 复制至映射到 kafka key 的列中
  id as the_kafka_key,

  -- 所有的 values
  id, name, email
FROM some_table

Kafka 的 key 和 value 在 Schema Registry 中都注册为 Avro 记录的表的示例:

CREATE TABLE user_created (
  
  -- 该列映射到 Kafka key 中的 Avro 字段 'id'
  kafka_key_id STRING,
  
  -- 映射到 Kafka value 中的 Avro 字段的一些列
  id STRING,
  name STRING, 
  email STRING
  
) WITH (

  'connector' = 'kafka',
  'topic' = 'user_events_example2',
  'properties.bootstrap.servers' = 'localhost:9092',

  -- 注意:由于哈希分区,在 Kafka key 的上下文中,schema 升级几乎从不向后也不向前兼容。
  'key.format' = 'avro-confluent',
  'key.avro-confluent.url' = 'http://localhost:8082',
  'key.fields' = 'kafka_key_id',

  -- 在本例中,我们希望 Kafka 的 key 和 value 的 Avro 类型都包含 'id' 字段
  -- => 给表中与 Kafka key 字段关联的列添加一个前缀来避免冲突
  'key.fields-prefix' = 'kafka_key_',

  'value.format' = 'avro-confluent',
  'value.avro-confluent.url' = 'http://localhost:8082',
  'value.fields-include' = 'EXCEPT_KEY',
   
  -- 自 Flink 1.13 起,subjects 具有一个默认值, 但是可以被覆盖:
  'key.avro-confluent.subject' = 'user_events_example2-key2',
  'value.avro-confluent.subject' = 'user_events_example2-value2'
)

使用 upsert-kafka 连接器,Kafka 的 value 在 Schema Registry 中注册为 Avro 记录的表的示例:

CREATE TABLE user_created (
  
  -- 该列映射到 Kafka 原始的 UTF-8 key
  kafka_key_id STRING,
  
  -- 映射到 Kafka value 中的 Avro 字段的一些列
  id STRING, 
  name STRING, 
  email STRING, 
  
  -- upsert-kafka 连接器需要一个主键来定义 upsert 行为
  PRIMARY KEY (kafka_key_id) NOT ENFORCED

) WITH (

  'connector' = 'upsert-kafka',
  'topic' = 'user_events_example3',
  'properties.bootstrap.servers' = 'localhost:9092',

  -- UTF-8 字符串作为 Kafka 的 keys
  -- 在本例中我们不指定 'key.fields',因为它由表的主键决定
  'key.format' = 'raw',
  
  -- 在本例中,我们希望 Kafka 的 key 和 value 的 Avro 类型都包含 'id' 字段
  -- => 给表中与 Kafka key 字段关联的列添加一个前缀来避免冲突
  'key.fields-prefix' = 'kafka_key_',

  'value.format' = 'avro-confluent',
  'value.avro-confluent.url' = 'http://localhost:8082',
  'value.fields-include' = 'EXCEPT_KEY'
)

Format 参数

参数

是否必选

默认值

类型

描述

format

必选

(none)

String

指定要使用的格式,此处应为 'avro-confluent'

avro-confluent.basic-auth.credentials-source

可选

(none)

String

指定 Schema Registry 的基本认证资格 source。

avro-confluent.basic-auth.user-info

可选

(none)

String

指定 Schema Registry 的基本认证用户信息。

avro-confluent.bearer-auth.credentials-source

可选

(none)

String

指定 Schema Registry 的持有者认证资格 source。

avro-confluent.bearer-auth.token

可选

(none)

String

指定 Schema Registry 的持有者认证令牌。

avro-confluent.properties

可选

(none)

Map

属性映射,该映射被转发到 Schema Registry。对于没有通过 Flink 配置选项正式公开的选项非常有用。

重要 Flink 选项具有更高的优先级。

avro-confluent.ssl.keystore.location

可选

(none)

String

指定 SSL keystore 的位置。

avro-confluent.ssl.keystore.password

可选

(none)

String

指定 SSL keystore 的密码。

avro-confluent.ssl.truststore.location

可选

(none)

String

指定 SSL truststore 的位置。

avro-confluent.ssl.truststore.password

可选

(none)

String

指定 SSL truststore 的密码。

avro-confluent.subject

可选

(none)

String

指定 Confluent Schema Registry subject,在该 subject 下注册此格式在序列化期间使用的模式。默认情况下,如果使用 kafka 和 upsert-kafka 连接器作为值或键格式,则使用-value 或-key 作为默认 subject 名称。对于 filesystem 连接器,当其作为结果表使用时,必须使用 subject 选项。

avro-confluent.url

必选

(none)

String

指定用于获取或注册 schemas 的 Confluent Schema Registry 的 URL。

数据类型映射

目前 Apache Flink 都是从 table schema 去推断反序列化期间的 Avro reader schema 和序列化期间的 Avro writer schema。显式地定义 Avro schema 暂不支持。因此下表列出了从 Flink 类型到 Avro 类型的类型映射。

Flink SQL 数据类型

Avro 数据类型

Avro 逻辑类型

CHAR / VARCHAR / STRING

string

BOOLEAN

boolean

BINARY / VARBINARY

bytes

DECIMAL

fixed

decimal

TINYINT

int

SMALLINT

int

INT

int

BIGINT

long

FLOAT

float

DOUBLE

double

DATE

int

date

TIME

int

time-millis

TIMESTAMP

long

timestamp-millis

ARRAY

array

MAP
(key 必须是 string/char/varchar 类型)

map

MULTISET
(元素必须是 string/char/varchar 类型)

map

ROW

record

除了此处列出的类型之外,Flink 还支持读取/写入可为空(nullable)的类型。 Flink 将可为空的类型映射到 Avro union(something, null), 其中 something 是从 Flink 类型转换的 Avro 类型。