Upsert Kafka 连接器支持以 upsert 方式从 Kafka topic 中读取数据并将数据写入 Kafka topic,支持做数据源表和结果表。
Upsert-kafka 连接器暂时仅支持在 Flink 1.16-volcano 引擎版本中使用。
CREATE TABLE upsert_kafka_sink ( user_region STRING, pv BIGINT, uv BIGINT, PRIMARY KEY (user_region) NOT ENFORCED ) WITH ( 'connector' = 'upsert-kafka', 'topic' = '<yourTopicName>', 'properties.bootstrap.servers' = '...', 'key.format' = 'avro', 'value.format' = 'avro' );
参数 | 是否必选 | 默认值 | 数据类型 | 描述 |
---|---|---|---|---|
connector | 是 | (none) | String | 指定使用的连接器,此处是 upsert-kafka 连接器。 |
topic | 是 | (none) | String | 指定用于读取或写入的 Kafka topic 名称。 |
properties.bootstrap.servers | 是 | (none) | String | 以逗号分隔的 Kafka brokers 列表,格式为 |
properties.* | 否 | (none) | String | 传递给 Kafka 的配置参数,如需了解具体的参数,请参见configuration。 |
key.format | 是 | (none) | String | 读取或写入 Kafka 消息 key 部分时使用的序列化和反序列化的格式,支持 |
key.fields | 否 | (none) | String | Kafka 消息 key 部分对应的源表或结果表字段。多个字段名以分号(;)分隔。例如 |
key.fields-prefix | 否 | (none) | String | 为 说明
|
value.format | 是 | (none) | String | 读取或写入 Kafka 消息 value 部分时使用的序列化和反序列化的格式,支持 |
value.fields-include | 否 | ALL | String | 控制哪些字段应该出现在 value 中。
|
scan.parallelism | 否 | (none) | Integer | 单独设置 Source 并发。如果不设置,则并行度为作业的默认并发数。 |
sink.parallelism | 否 | (none) | Integer | 定义 upsert-kafka sink 算子的并行度。默认情况下,与上游算子的并行度保持一致,由框架确定并行度。 |
sink.buffer-flush.max-rows | 否 | 0 | Integer | 最多能缓存多少条记录。默认值为 0,表示不开启缓存。 说明 如果需要开启缓存,则需要同时设置 |
sink.buffer-flush.interval | 否 | 0 | Duration | 缓存刷新的间隔时间,超过该时间后将刷新缓存数据。默认值为 0,表示不开启缓存。 说明 如果需要开启缓存,则需要同时设置 |
源表
CREATE TABLE upsert_source ( order_id bigint, order_product_id bigint, order_customer_id bigint, order_status varchar, order_update_time timestamp, PRIMARY KEY (order_id) NOT ENFORCED ) WITH ( 'connector' = 'upsert-kafka', 'topic' = 'source', 'properties.bootstrap.servers' = 'localhost:9092', 'properties.group.id' = 'group_01', 'key.format' = 'json', 'value.format' = 'json' ); CREATE TABLE print_sink ( order_id bigint, order_product_id bigint, order_customer_id bigint, order_status varchar, order_update_time timestamp ) WITH ( 'connector' = 'print' ); insert into print_sink select * from upsert_source;
结果表
CREATE TABLE datagen_source ( order_id bigint, order_product_id bigint, order_customer_id bigint, order_status varchar, order_update_time as localtimestamp ) WITH ( 'connector' = 'datagen', 'rows-per-second' = '5' ); CREATE TABLE upsert_sink ( order_id bigint, order_product_id bigint, order_customer_id bigint, order_status varchar, order_update_time timestamp, PRIMARY KEY (order_id) NOT ENFORCED ) WITH ( 'connector' = 'upsert-kafka', 'topic' = 'sink', 'properties.bootstrap.servers' = 'localhost:9092', 'properties.group.id' = 'group_01', 'key.format' = 'json', 'value.format' = 'json' ); INSERT INTO upsert_sink SELECT * FROM datagen_source;