Kafka Pipeline 连接器可以用作 Pipeline 的 Data Sink,将数据写入 Kafka。 本文档介绍如何设置 Kafka Pipeline 连接器。
从 MySQL 读取数据同步到 Kafka 的 Pipeline 可以定义如下:
source: type: mysql name: MySQL Source hostname: 127.0.0.1 port: 3306 username: admin password: pass tables: adb.\., bdb.user_table_[0-9]+, [app|web].order_\. server-id: 5401-5404 sink: type: kafka name: Kafka Sink properties.bootstrap.servers: PLAINTEXT://localhost:62510 pipeline: name: MySQL to Kafka Pipeline parallelism: 2
Option | Required | Default | Type | Description |
---|---|---|---|---|
type | required | (none) | String | 指定要使用的连接器, 这里需要设置成 'kafka'。 |
name | optional | (none) | String | Sink 的名称。 |
value.format | optional | (none) | String | 用于序列化 Kafka 消息的值部分数据的格式。可选的填写值包括 debezium-json 和 canal-json, 默认值为 |
properties.bootstrap.servers | required | (none) | String | 用于建立与 Kafka 集群初始连接的主机/端口对列表。 |
topic | optional | (none) | String | 如果配置了这个参数,所有的消息都会发送到这一个主题。 |
sink.add-tableId-to-header-enabled | optional | (none) | Boolean | 如果配置了这个参数,所有的消息都会带上键为 |
properties.* | optional | (none) | String | 将 Kafka 支持的参数传递给 pipeline,参考 Kafka consume options。 |
sink.custom-header | optional | (none) | String | Kafka 记录自定义的 Header。每个 Header 使用 ','分割, 键值使用 ':' 分割。举例来说,可以使用这种方式 'key1:value1,key2:value2'。 |
namespace.schemaName.tableName
对应的字符串,可以通过 pipeline 的 route 功能进行修改。topic
参数,所有的消息都会发送到这一个主题。对于不同的内置 value.format
选项,输出的格式也是不同的:
参考 Debezium docs, debezium-json 格式会包含 before
,after
,op
,source
几个元素, 但是 ts_ms
字段并不会包含在 source
元素中。
一个输出的示例是:
{ "before": null, "after": { "col1": "1", "col2": "1" }, "op": "c", "source": { "db": "default_namespace", "table": "table1" } }
参考 Canal | Apache Flink, canal-json 格式会包含 old
,data
,type
,database
,table
,pkNames
几个元素, 但是 ts
并不会包含在其中。
一个输出的示例是:
{ "old": null, "data": [ { "col1": "1", "col2": "1" } ], "type": "INSERT", "database": "default_schema", "table": "table1", "pkNames": [ "col1" ] }
CDC type | JSON type |
---|---|
TINYINT | TINYINT |
SMALLINT | SMALLINT |
INT | INT |
BIGINT | BIGINT |
FLOAT | FLOAT |
DOUBLE | DOUBLE |
DECIMAL(p, s) | DECIMAL(p, s) |
BOOLEAN | BOOLEAN |
DATE | DATE |
TIMESTAMP | DATETIME |
TIMESTAMP_LTZ | TIMESTAMP_LTZ |
CHAR(n) | CHAR(n) |
VARCHAR(n) | VARCHAR(n) |