Filesystem 连接器提供对常见的文件系统(如 CloudFS)的读写能力,支持做数据源表和结果表。您可以创建 source 流从文件系统目录下获取数据,作为作业的输入数据;也可以将作业输出数据写入到文件系统指定目录下。
CREATE TABLE filesystem_source (
name String,
score INT
) WITH (
'connector' = 'filesystem',
'path' = '<yourfilepath>',
'format' = 'json'
);
CREATE TABLE filesystem_sink (
name String,
score INT
) WITH (
'connector' = 'filesystem',
'path' = '<yourfilepath>',
'sink.rolling-policy.file-size' = '1M',
'sink.rolling-policy.rollover-interval' = '10 min',
'format' = 'json'
);
参数 | 是否必选 | 默认值 | 数据类型 | 描述 |
---|---|---|---|---|
connector | 是 | (none) | String | 指定使用的连接器,此处是 Filesystem 连接器。 |
path | 是 | (none) | String | 文件路径。 |
sink.rolling-policy.file-size | 否 | 128MB | MemorySize | 文件内存最大限制。 注意 写入文件时,如果需要尽快在文件系统中看到写入的文件,需要增加部分配置。您可以设置连接器的 sink.rolling-policy.file-size、sink.rolling-policy.rollover-interval 参数,以及在 Flink 参数中开启 Checkpoint。 |
sink.rolling-policy.rollover-interval | 否 | 30min | Duration | 文件最大持续写入时间。 注意 写入文件时,如果需要尽快在文件系统中看到写入的文件,需要增加部分配置。您可以设置连接器的 sink.rolling-policy.file-size、sink.rolling-policy.rollover-interval 参数,以及在 Flink 参数中开启 Checkpoint。 |
sink.rolling-policy.check-interval | 否 | 1min | Duration | 文件检查间隔。 |
sink.partition-commit.trigger | 否 | process-time | String | 分区关闭策略。取值如下:
|
sink.partition-commit.delay | 否 | 0s | Duration | 分区关闭延迟。 |
partition.time-extractor.kind | 否 | default | String | 分区时间抽取方式。 |
partition.time-extractor.class | 否 | (none) | String | 分区时间抽取类。 |
partition.time-extractor.timestamp-pattern | 否 | (none) | String | 分区时间戳的抽取格式。需要写成 yyyy-MM-dd HH:mm:ss 的形式,并用 Hive 表中相应的分区字段做占位符替换。默认支持第一个字段为 yyyy-mm-dd hh:mm:ss。
|
sink.partition-commit.policy.kind | 是 | (none) | String | 用于提交分区的策略。取值如下:
|
sink.partition-commit.policy.class | 否 | (none) | String | 分区提交类。 |
format | 是 | (none) | String | 必须要指定文件格式。当前支持文件格式可以参考 概览。 |
结果表
CREATE TABLE datagen_source (
order_id bigint,
order_product_id bigint,
order_customer_id bigint,
order_status varchar
)
WITH
(
'connector' = 'datagen',
'rows-per-second' = '1',
'fields.order_status.length' = '3',
'fields.order_id.min' = '1',
'fields.order_id.max' = '1000',
'fields.order_product_id.min' = '1',
'fields.order_product_id.max' = '100',
'fields.order_customer_id.min' = '1',
'fields.order_customer_id.max' = '1000'
);
CREATE TABLE file_sink (
order_id bigint,
order_product_id bigint,
order_customer_id bigint,
order_status varchar
)
WITH
(
'connector' = 'filesystem',
'path' = 'tos://doc-test/test_write_tos/',
'format' = 'json'
);
INSERT INTO file_sink
SELECT * FROM datagen_source;
源表
CREATE TABLE file_source(
order_id bigint,
order_product_id bigint,
order_customer_id bigint,
order_status varchar
) WITH (
'connector' = 'filesystem',
'path' = 'tos://doc-test/test_write_tos/',
'format' = 'json'
);
create table print_sink (
order_id bigint,
order_product_id bigint,
order_customer_id bigint,
order_status varchar
)
with
('connector' = 'print');
insert into print_sink
SELECT * FROM file_source;