You need to enable JavaScript to run this app.
导航
Pulsar
最近更新时间:2025.01.14 17:41:38首次发布时间:2023.09.12 16:22:50

Apache Pulsar 是一个开源的的分布式 pub-sub 消息系统。Pulsar 连接器提供从 Pulsar Topic 中消费和写入数据的能力,支持做数据源表和结果表。

注意事项

Pulsar 连接器暂时仅支持在 Flink V1.11 引擎版本中使用。

DDL定义

用作数据源

CREATE TABLE pulsar_source (
    order_id bigint,
    order_product_id bigint,
    order_customer_id bigint,
    order_status varchar
 ) WITH (
     'connector' = 'pulsar',
     'topic' = 'persistent://public/default/topic82547611',
     'service-url' = 'pulsar://localhost:6650',
     'admin-url' = 'http://localhost:8080', 
     'auth-params' = 'token:{topic token}',
     'auth-plugin-classname'='org.apache.pulsar.client.impl.auth.AuthenticationToken',     
     'format' = 'json',
     'scan.startup.mode' = 'latest'
 );

用作数据目的

CREATE TABLE pulsar_sink (
    order_id bigint,
    order_product_id bigint,
    order_customer_id bigint,
    order_status varchar
 ) WITH (
     'connector' = 'pulsar',
     'topic' = 'persistent://public/default/topic82547611',
     'service-url' = 'pulsar://localhost:6650',
     'admin-url' = 'http://localhost:8080', 
     'auth-params' = 'token:{topic token}',
     'auth-plugin-classname'='org.apache.pulsar.client.impl.auth.AuthenticationToken',     
     'format' = 'json',
     'scan.startup.mode' = 'latest'
 );

WITH参数

参数

是否必选

默认值

描述

connector

(none)

指定使用的连接器,此处是 pulsar 连接器。

service-url

(none)

Pulsar broker 的服务地址,如pulsar://localhost:6650

admin-url

(none)

Pulsar Admin 的服务地址,如http://localhost:8080

topic

(none)

指定读取数据的 Topic 的名称。如果从多个 Topic 中读取数据,Topic 间使用英文逗号(,)分隔。
该参数与topic-pattern参数互斥,两者只用其一。

topic-pattern

(none)

使用正则表达式匹配读取数据的 Topic。
该参数与topic参数互斥,两者只用其一。

scan.startup.mode

latest

Source 的启动模式。支持 earliestlatestexternal-subscriptionspecific-offsets 选项。

scan.startup.specific-offsets

(none)

指定消息偏移量。
当选择specific-offsets启动模式时,需要设置消息偏移量。

scan.startup.sub-name

(none)

当选择external-subscription启动模式时,需要设置该参数。

discovery topic interval

(none)

分区发现时间间隔,单位毫秒。

properties

empty

Pulsar 可选的配置集,格式为 properties.key='value'。支持的的配置参数,请参见本文中的可选配置参集

key.format

(none)

Pulsar 消息的键的序列化格式。支持 rawavrojson 等格式。

key.fields

(none)

序列化键时需要使用的 SQL 定义字段。如有多个字段,使用英文逗号(,)连接。

key.fields-prefix

(none)

为键格式的所有字段定义一个自定义前缀,以避免名称与值格式的字段冲突。
默认情况下,前缀为空。如果定义了自定义前缀,则 Table 模式和 'key.fields' 都将使用带前缀的名称。构造密钥格式的数据类型时,前缀将被删除,并且密钥格式内使用非前缀名称。

format或value.format

(none)

Pulsar 消息正文的序列化格式。支持 jsonavro 等格式。

value.fields-include

ALL

Pulsar 消息正文包含的字段策略。支持 ALLEXCEPT_KEY 选项。

sink.message-router

key-hash

写消息到 Pulsar 分区的路由方式。支持 key-hashround-robin、自定义 MessageRouter 实现类的引用路径。

sink.semantic

at-least-once

Sink 写消息的保障级别。支持 at-least-onceexactly-oncenone 选项。

可选配置集

参数

默认值

描述

topic

(none)

Pulsar Topic。

topics

(none)

使用英文逗号(,)连接的多个 Pulsar Topic。

topicspattern

(none)

使用 Java 正则表达式匹配多个 pulsar Topic。

partition.discovery.interval-millis

-1

自动获取 Topic 元数据的时间间隔。 取值为 -1,表示禁用该功能;取值大于 0,表示启用该功能。

clientcachesize

100

Pulsar 客户端的缓存数量。

auth-plugin-classname

(none)

Pulsar 客户端的鉴权类。

auth-params

(none)

Pulsar 客户端的鉴权参数。

polltimeoutms

120000

等待获取下一条消息的超时时间,单位为毫秒。

pulsar.reader.fail-on-data-loss

true

数据丢失时,是否失败。

pulsar.reader.use-earliest-when-data-loss

false

数据丢失时,使用earliest重置offset。

commitmaxretries

3

向 Pulsar 消息偏移 offset 时,最大重试次数。

scan.startup.mode

latest

Source 的启动模式。支持 earliestlatestexternal-subscriptionspecific-offsets 选项。

enable-key-hash-range

false

开启 Pulsar Key-Shared 订阅模式。

pulsar.reader.*

Pulsar reader 的详细配置。

pulsar.reader.subscriptionRolePrefix

flink-pulsar-

未指定订阅者时,自动创建订阅者名称的前缀。

pulsar.reader.receiverQueueSize

1000

接收队列大小。

flushoncheckpoint

true

在 Flink snapshotState 时,向 Pulsar Topic 中写入消息。

failonwrite

false

Sink 出错时,继续确认消息。

send-delay-millisecond

0

延迟消息发送,单位毫秒。

pulsar.producer.sendTimeoutMs

30000

发送消息时的超时时间,单位为毫秒。

pulsar.producer.blockIfQueueFull

false

Producer 写入消息的队列满时,支持阻塞方法,而不是抛出异常。

示例代码

create table pulsar_table (
    order_id bigint,
    order_product_id bigint,
    order_customer_id bigint,
    order_status varchar
) WITH (
  'connector' = 'pulsar',
  'topic' = 'persistent://public/default/topic82547611',
  'service-url' = 'pulsar://localhost:6650',
  'admin-url' = 'http://localhost:8080', 
  'auth-params' = 'token:{topic token}',
  'auth-plugin-classname'='org.apache.pulsar.client.impl.auth.AuthenticationToken',    
  'format' = 'json',
  'scan.startup.mode' = 'latest'
);

create table print_sink (
    order_id bigint,
    order_product_id bigint,
    order_customer_id bigint,
    order_status varchar
) WITH (
  'connector' = 'print',
  'print-sample-ratio' = '1'
);

insert into print_sink
select * from pulsar_table;