Apache Pulsar 是一个开源的的分布式 pub-sub 消息系统。Pulsar 连接器提供从 Pulsar Topic 中消费和写入数据的能力,支持做数据源表和结果表。
Pulsar 连接器暂时仅支持在 Flink V1.11 引擎版本中使用。
CREATE TABLE pulsar_source ( order_id bigint, order_product_id bigint, order_customer_id bigint, order_status varchar ) WITH ( 'connector' = 'pulsar', 'topic' = 'persistent://public/default/topic82547611', 'service-url' = 'pulsar://localhost:6650', 'admin-url' = 'http://localhost:8080', 'auth-params' = 'token:{topic token}', 'auth-plugin-classname'='org.apache.pulsar.client.impl.auth.AuthenticationToken', 'format' = 'json', 'scan.startup.mode' = 'latest' );
CREATE TABLE pulsar_sink ( order_id bigint, order_product_id bigint, order_customer_id bigint, order_status varchar ) WITH ( 'connector' = 'pulsar', 'topic' = 'persistent://public/default/topic82547611', 'service-url' = 'pulsar://localhost:6650', 'admin-url' = 'http://localhost:8080', 'auth-params' = 'token:{topic token}', 'auth-plugin-classname'='org.apache.pulsar.client.impl.auth.AuthenticationToken', 'format' = 'json', 'scan.startup.mode' = 'latest' );
参数 | 是否必选 | 默认值 | 描述 |
---|---|---|---|
connector | 是 | (none) | 指定使用的连接器,此处是 pulsar 连接器。 |
service-url | 是 | (none) | Pulsar broker 的服务地址,如 |
admin-url | 是 | (none) | Pulsar Admin 的服务地址,如 |
topic | 否 | (none) | 指定读取数据的 Topic 的名称。如果从多个 Topic 中读取数据,Topic 间使用英文逗号(,)分隔。 |
topic-pattern | 否 | (none) | 使用正则表达式匹配读取数据的 Topic。 |
scan.startup.mode | 否 | latest | Source 的启动模式。支持 |
scan.startup.specific-offsets | 否 | (none) | 指定消息偏移量。 |
scan.startup.sub-name | 否 | (none) | 当选择 |
discovery topic interval | 否 | (none) | 分区发现时间间隔,单位毫秒。 |
properties | 否 | empty | Pulsar 可选的配置集,格式为 |
key.format | 否 | (none) | Pulsar 消息的键的序列化格式。支持 |
key.fields | 否 | (none) | 序列化键时需要使用的 SQL 定义字段。如有多个字段,使用英文逗号(,)连接。 |
key.fields-prefix | 否 | (none) | 为键格式的所有字段定义一个自定义前缀,以避免名称与值格式的字段冲突。 |
format或value.format | 是 | (none) | Pulsar 消息正文的序列化格式。支持 |
value.fields-include | 否 | ALL | Pulsar 消息正文包含的字段策略。支持 |
sink.message-router | 否 | key-hash | 写消息到 Pulsar 分区的路由方式。支持 |
sink.semantic | 否 | at-least-once | Sink 写消息的保障级别。支持 |
参数 | 默认值 | 描述 |
---|---|---|
topic | (none) | Pulsar Topic。 |
topics | (none) | 使用英文逗号(,)连接的多个 Pulsar Topic。 |
topicspattern | (none) | 使用 Java 正则表达式匹配多个 pulsar Topic。 |
partition.discovery.interval-millis | -1 | 自动获取 Topic 元数据的时间间隔。 取值为 -1,表示禁用该功能;取值大于 0,表示启用该功能。 |
clientcachesize | 100 | Pulsar 客户端的缓存数量。 |
auth-plugin-classname | (none) | Pulsar 客户端的鉴权类。 |
auth-params | (none) | Pulsar 客户端的鉴权参数。 |
polltimeoutms | 120000 | 等待获取下一条消息的超时时间,单位为毫秒。 |
pulsar.reader.fail-on-data-loss | true | 数据丢失时,是否失败。 |
pulsar.reader.use-earliest-when-data-loss | false | 数据丢失时,使用earliest重置offset。 |
commitmaxretries | 3 | 向 Pulsar 消息偏移 offset 时,最大重试次数。 |
scan.startup.mode | latest | Source 的启动模式。支持 |
enable-key-hash-range | false | 开启 Pulsar Key-Shared 订阅模式。 |
pulsar.reader.* | Pulsar reader 的详细配置。 | |
pulsar.reader.subscriptionRolePrefix | flink-pulsar- | 未指定订阅者时,自动创建订阅者名称的前缀。 |
pulsar.reader.receiverQueueSize | 1000 | 接收队列大小。 |
flushoncheckpoint | true | 在 Flink snapshotState 时,向 Pulsar Topic 中写入消息。 |
failonwrite | false | Sink 出错时,继续确认消息。 |
send-delay-millisecond | 0 | 延迟消息发送,单位毫秒。 |
pulsar.producer.sendTimeoutMs | 30000 | 发送消息时的超时时间,单位为毫秒。 |
pulsar.producer.blockIfQueueFull | false | Producer 写入消息的队列满时,支持阻塞方法,而不是抛出异常。 |
create table pulsar_table ( order_id bigint, order_product_id bigint, order_customer_id bigint, order_status varchar ) WITH ( 'connector' = 'pulsar', 'topic' = 'persistent://public/default/topic82547611', 'service-url' = 'pulsar://localhost:6650', 'admin-url' = 'http://localhost:8080', 'auth-params' = 'token:{topic token}', 'auth-plugin-classname'='org.apache.pulsar.client.impl.auth.AuthenticationToken', 'format' = 'json', 'scan.startup.mode' = 'latest' ); create table print_sink ( order_id bigint, order_product_id bigint, order_customer_id bigint, order_status varchar ) WITH ( 'connector' = 'print', 'print-sample-ratio' = '1' ); insert into print_sink select * from pulsar_table;