Filesystem 连接器提供对常见的文件系统(如 CloudFS)的读写能力,支持做数据源表和结果表。您可以创建 source 流从文件系统目录下获取数据,作为作业的输入数据;也可以将作业输出数据写入到文件系统指定目录下。
CREATE TABLE filesystem_source ( name String, score INT ) WITH ( 'connector' = 'filesystem', 'path' = '<yourfilepath>' );
CREATE TABLE filesystem_sink ( name String, score INT ) WITH ( 'connector' = 'filesystem', 'path' = '<yourfilepath>', 'sink.rolling-policy.file-size' = '1M', 'sink.rolling-policy.rollover-interval' = '10 min', 'format' = 'json' );
参数 | 是否必选 | 默认值 | 数据类型 | 描述 |
---|---|---|---|---|
connector | 是 | (none) | String | 指定使用的连接器,此处是 Filesystem 连接器。 |
path | 是 | (none) | String | 文件路径。 |
sink.rolling-policy.file-size | 否 | 128MB | MemorySize | 文件内存最大限制。 注意 写入文件时,如果需要尽快在文件系统中看到写入的文件,需要增加部分配置。您可以设置连接器的 sink.rolling-policy.file-size、sink.rolling-policy.rollover-interval 参数,以及在 Flink 参数中开启 Checkpoint。 |
sink.rolling-policy.rollover-interval | 否 | 30min | Duration | 文件最大持续写入时间。 注意 写入文件时,如果需要尽快在文件系统中看到写入的文件,需要增加部分配置。您可以设置连接器的 sink.rolling-policy.file-size、sink.rolling-policy.rollover-interval 参数,以及在 Flink 参数中开启 Checkpoint。 |
sink.rolling-policy.check-interval | 否 | 1min | Duration | 文件检查间隔。 |
sink.partition-commit.trigger | 否 | process-time | String | 分区关闭策略。取值如下:
|
sink.partition-commit.delay | 否 | 0s | Duration | 分区关闭延迟。 |
partition.time-extractor.kind | 否 | default | String | 分区时间抽取方式。 |
partition.time-extractor.class | 否 | (none) | String | 分区时间抽取类。 |
partition.time-extractor.timestamp-pattern | 否 | (none) | String | 分区时间戳的抽取格式。需要写成 yyyy-MM-dd HH:mm:ss 的形式,并用 Hive 表中相应的分区字段做占位符替换。默认支持第一个字段为 yyyy-mm-dd hh:mm:ss。
|
sink.partition-commit.policy.kind | 是 | (none) | String | 用于提交分区的策略。取值如下:
|
sink.partition-commit.policy.class | 否 | (none) | String | 分区提交类。 |
结果表
CREATE TABLE datagen_source ( order_id bigint, order_product_id bigint, order_customer_id bigint, order_status varchar ) WITH ( 'connector' = 'datagen', 'rows-per-second' = '1', 'fields.order_status.length' = '3', 'fields.order_id.min' = '1', 'fields.order_id.max' = '1000', 'fields.order_product_id.min' = '1', 'fields.order_product_id.max' = '100', 'fields.order_customer_id.min' = '1', 'fields.order_customer_id.max' = '1000' ); CREATE TABLE file_sink ( order_id bigint, order_product_id bigint, order_customer_id bigint, order_status varchar ) WITH ( 'connector' = 'filesystem', 'path' = 'tos://doc-test/test_write_tos/', 'format' = 'json' ); INSERT INTO file_sink SELECT * FROM datagen_source;
源表
CREATE TABLE file_source( order_id bigint, order_product_id bigint, order_customer_id bigint, order_status varchar ) WITH ( 'connector' = 'filesystem', 'path' = 'tos://doc-test/test_write_tos/', 'format' = 'json' ); create table print_sink ( order_id bigint, order_product_id bigint, order_customer_id bigint, order_status varchar ) with ('connector' = 'print'); insert into print_sink SELECT * FROM file_source;