You need to enable JavaScript to run this app.
导航
Kafka
最近更新时间:2024.11.12 17:26:32首次发布时间:2024.11.12 17:26:32

Kafka Pipeline 连接器可以用作 Pipeline 的 Data Sink,将数据写入 Kafka。 本文档介绍如何设置 Kafka Pipeline 连接器。

使用限制

  • Kafka Pipeline 连接器暂时仅支持在 Flink 1.16-volcano 及以上引擎版本中使用。

连接器的功能

  • 自动建表
  • 表结构变更同步
  • 数据实时同步

如何创建 Pipeline

从 MySQL 读取数据同步到 Kafka 的 Pipeline 可以定义如下:

source:
  type: mysql
  name: MySQL Source
  hostname: 127.0.0.1
  port: 3306
  username: admin
  password: pass
  tables: adb.\., bdb.user_table_[0-9]+, [app|web].order_\.
  server-id: 5401-5404

sink:
  type: kafka
  name: Kafka Sink
  properties.bootstrap.servers: PLAINTEXT://localhost:62510

pipeline:
  name: MySQL to Kafka Pipeline
  parallelism: 2

Pipeline 连接器配置项

Option

Required

Default

Type

Description

type

required

(none)

String

指定要使用的连接器, 这里需要设置成 'kafka'。

name

optional

(none)

String

Sink 的名称。

value.format

optional

(none)

String

用于序列化 Kafka 消息的值部分数据的格式。可选的填写值包括 debezium-jsoncanal-json, 默认值为 debezium-json,并且目前不支持用户自定义输出格式。

properties.bootstrap.servers

required

(none)

String

用于建立与 Kafka 集群初始连接的主机/端口对列表。

topic

optional

(none)

String

如果配置了这个参数,所有的消息都会发送到这一个主题。

sink.add-tableId-to-header-enabled

optional

(none)

Boolean

如果配置了这个参数,所有的消息都会带上键为 namespace, 'schemaName', 'tableName',值为事件 TableId 里对应的 字符串的 header。

properties.*

optional

(none)

String

将 Kafka 支持的参数传递给 pipeline,参考 Kafka consume options

sink.custom-header

optional

(none)

String

Kafka 记录自定义的 Header。每个 Header 使用 ','分割, 键值使用 ':' 分割。举例来说,可以使用这种方式 'key1:value1,key2:value2'。

使用说明

  • 写入 Kafka 的 topic 默认会是上游表 namespace.schemaName.tableName 对应的字符串,可以通过 pipeline 的 route 功能进行修改。
  • 如果配置了 topic 参数,所有的消息都会发送到这一个主题。
  • 写入 Kafka 的 topic 如果不存在,则会默认创建。

输出格式

对于不同的内置 value.format 选项,输出的格式也是不同的:

debezium-json

参考 Debezium docs, debezium-json 格式会包含 before,after,op,source 几个元素, 但是 ts_ms 字段并不会包含在 source 元素中。
一个输出的示例是:

{
  "before": null,
  "after": {
    "col1": "1",
    "col2": "1"
  },
  "op": "c",
  "source": {
    "db": "default_namespace",
    "table": "table1"
  }
}

canal-json

参考 Canal | Apache Flink, canal-json 格式会包含 old,data,type,database,table,pkNames 几个元素, 但是 ts 并不会包含在其中。
一个输出的示例是:

{
    "old": null,
    "data": [
        {
            "col1": "1",
            "col2": "1"
        }
    ],
    "type": "INSERT",
    "database": "default_schema",
    "table": "table1",
    "pkNames": [
        "col1"
    ]
}

数据类型映射

CDC type

JSON type

TINYINT

TINYINT

SMALLINT

SMALLINT

INT

INT

BIGINT

BIGINT

FLOAT

FLOAT

DOUBLE

DOUBLE

DECIMAL(p, s)

DECIMAL(p, s)

BOOLEAN

BOOLEAN

DATE

DATE

TIMESTAMP

DATETIME

TIMESTAMP_LTZ

TIMESTAMP_LTZ

CHAR(n)

CHAR(n)

VARCHAR(n)

VARCHAR(n)