You need to enable JavaScript to run this app.
导航
Filesystem
最近更新时间:2023.09.12 16:22:50首次发布时间:2022.09.08 17:27:42

Filesystem 连接器提供对常见的文件系统(如 CloudFS)的读写能力,支持做数据源表和结果表。您可以创建 source 流从文件系统目录下获取数据,作为作业的输入数据;也可以将作业输出数据写入到文件系统指定目录下。

DDL 定义

用作数据源(Source)

CREATE TABLE filesystem_source (
    name String,
    score INT
 ) WITH (
     'connector' = 'filesystem',
     'path' = '<yourfilepath>'
 );

用作数据目的(Sink)

CREATE TABLE filesystem_sink (
    name String,
    score INT
 ) WITH (
     'connector' = 'filesystem',
     'path' = '<yourfilepath>',
     'sink.rolling-policy.file-size' = '1M',
     'sink.rolling-policy.rollover-interval' = '10 min',
     'format' = 'json' 
 );

WITH 参数

参数

是否必选

默认值

数据类型

描述

connector

(none)

String

指定使用的连接器,此处是 Filesystem 连接器。

path

(none)

String

文件路径。
比如您在使用对象存储 TOS 时,路径格式为tos://bucket_name/file_path/

sink.rolling-policy.file-size

128MB

MemorySize

文件内存最大限制。
当前写入的文件大小达到阈值时,写入的文件将被关闭,并打开一个新的文件进行写入。

注意

写入文件时,如果需要尽快在文件系统中看到写入的文件,需要增加部分配置。您可以设置连接器的 sink.rolling-policy.file-sizesink.rolling-policy.rollover-interval 参数,以及在 Flink 参数中开启 Checkpoint。
如需详细了解 Filesystem 连接器的滚动策略,请参见开源文档Filesystem-Rolling Policy

sink.rolling-policy.rollover-interval

30min

Duration

文件最大持续写入时间。
当前写入的时间超过了阈值时,写入的文件将被关闭,并打开一个新的文件进行写入。

注意

写入文件时,如果需要尽快在文件系统中看到写入的文件,需要增加部分配置。您可以设置连接器的 sink.rolling-policy.file-sizesink.rolling-policy.rollover-interval 参数,以及在 Flink 参数中开启 Checkpoint。
如需详细了解 Filesystem 连接器的滚动策略,请参见开源文档Filesystem-Rolling Policy

sink.rolling-policy.check-interval

1min

Duration

文件检查间隔。
FileSystem 按照这个间隔检查文件的写入时间是否已经满足了关闭条件,并将满足条件的文件进行关闭。

sink.partition-commit.trigger

process-time

String

分区关闭策略。取值如下:

  • process-time:当分区创建超过一定时间后将这个分区关闭,分区创建时间为分区创建时的物理时间。
  • partition-time:当分区创建超过一定时间之后将这个分区关闭,分区创建时间从分区中抽取出来。
    partition-time 依赖于 watermark 生成,需要配合 wartermark 才能支持自动分区发现。当 watermark 时间超过了从分区抽取的时间delay 参数配置时间之和后会提交分区。

sink.partition-commit.delay

0s

Duration

分区关闭延迟。
当分区在创建超过一定时间之后将被关闭。

partition.time-extractor.kind

default

String

分区时间抽取方式。
这个配置仅当 sink.partition-commit.trigger 配置为 partition-time 时生效。如果用户有自定义的分区时间抽取方法,配置为 custom

partition.time-extractor.class

(none)

String

分区时间抽取类。

partition.time-extractor.timestamp-pattern

(none)

String

分区时间戳的抽取格式。需要写成 yyyy-MM-dd HH:mm:ss 的形式,并用 Hive 表中相应的分区字段做占位符替换。默认支持第一个字段为 yyyy-mm-dd hh:mm:ss。

  • 如果时间戳应该从单个分区字段 'dt' 提取,可以配置 '$dt'。
  • 如果时间戳应该从多个分区字段中提取,例如 'year'、'month'、'day' 和 'hour',可以配置 '$year-$month-$day $hour:00:00'。
  • 如果时间戳应该从两个分区字段中提取,例如 'dt' 和 'hour' ,可以配置 '$dt $hour:00:00'。

sink.partition-commit.policy.kind

(none)

String

用于提交分区的策略。取值如下:

  • success-file:当分区关闭时将在分区对应的目录下生成一个 _success 的文件。
  • custom:用户自定义分区提交策略。

sink.partition-commit.policy.class

(none)

String

分区提交类。
这个类必须实现 PartitionCommitPolicy。

示例代码

  • 结果表

    CREATE TABLE datagen_source (
        order_id bigint,
        order_product_id bigint,
        order_customer_id bigint,
        order_status varchar
      )
    WITH
      (
        'connector' = 'datagen',
        'rows-per-second' = '1',
        'fields.order_status.length' = '3',
        'fields.order_id.min' = '1',
        'fields.order_id.max' = '1000',
        'fields.order_product_id.min' = '1',
        'fields.order_product_id.max' = '100',
        'fields.order_customer_id.min' = '1',
        'fields.order_customer_id.max' = '1000'
      );
    
    
    CREATE TABLE file_sink (
        order_id bigint,
        order_product_id bigint,
        order_customer_id bigint,
        order_status varchar
      )
    WITH
      (
        'connector' = 'filesystem',
        'path' = 'tos://doc-test/test_write_tos/',
        'format' = 'json'
      );
    
    
    INSERT INTO file_sink
    SELECT * FROM datagen_source;
    
  • 源表

    CREATE TABLE file_source(
      order_id bigint, 
      order_product_id bigint,
      order_customer_id bigint,
      order_status varchar
    ) WITH (
        'connector' = 'filesystem',
        'path' = 'tos://doc-test/test_write_tos/',
        'format' = 'json'
    );
    create table print_sink (
      order_id bigint, 
      order_product_id bigint,
      order_customer_id bigint,
      order_status varchar
      )
    with
      ('connector' = 'print');
    
    
    insert into print_sink
    SELECT * FROM file_source;