You need to enable JavaScript to run this app.
导航
TOS
最近更新时间:2025.03.28 15:32:06首次发布时间:2025.03.28 15:32:06
我的收藏
有用
有用
无用
无用

火山引擎对象存储 TOS(Torch Object Storage)是火山引擎提供的海量、安全、低成本、易用、高可靠、高可用的分布式云存储服务。通过 Flink 内置的 Filesytem Connector,您可以轻松访问和管理火山引擎 TOS 上的数据。

DDL 定义

用作数据源(Source)

将 TOS 文件/目录作为数据源非常简单,只要用如下建表语句,指定 filesystem connector,以及 tos://开头的文件路径,以及文件格式即可:

CREATE TABLE filesystem_source (
    name String,
    score INT
 ) WITH (
     'connector' = 'filesystem',
     'path' = 'tos://<bucket>/<path>',
     'format' = 'json'
 );

用作数据目的(Sink)

将 TOS 文件/目录作为数据下游,用如下建表语句,指定 filesystem connector,以及 tos://开头的文件路径、文件格式,以及文件生成规则:

CREATE TABLE filesystem_sink (
    name String,
    score INT
 ) WITH (
     'connector' = 'filesystem',
     'path' = 'tos://<bucket>/<path>',
     'sink.rolling-policy.file-size' = '1M',
     'sink.rolling-policy.rollover-interval' = '10 min',
     'format' = 'json' 
 );

WITH 参数

参数

是否必选

默认值

数据类型

描述

connector

(none)

String

指定使用的连接器,此处是 Filesystem 连接器。

path

(none)

String

文件路径。
在使用对象存储 TOS 时,路径格式为tos://bucket_name/file_path/

sink.rolling-policy.file-size

128MB

MemorySize

文件内存最大限制。
当前写入的文件大小达到阈值时,写入的文件将被关闭,并打开一个新的文件进行写入。

注意

写入文件时,如果需要尽快在文件系统中看到写入的文件,需要增加部分配置。您可以设置连接器的 sink.rolling-policy.file-sizesink.rolling-policy.rollover-interval 参数,以及在 Flink 参数中开启 Checkpoint。
如需详细了解 Filesystem 连接器的滚动策略,请参见开源文档Filesystem-Rolling Policy

sink.rolling-policy.rollover-interval

30min

Duration

文件最大持续写入时间。
当前写入的时间超过了阈值时,写入的文件将被关闭,并打开一个新的文件进行写入。

注意

写入文件时,如果需要尽快在文件系统中看到写入的文件,需要增加部分配置。您可以设置连接器的 sink.rolling-policy.file-sizesink.rolling-policy.rollover-interval 参数,以及在 Flink 参数中开启 Checkpoint。
如需详细了解 Filesystem 连接器的滚动策略,请参见开源文档Filesystem-Rolling Policy

sink.rolling-policy.check-interval

1min

Duration

文件检查间隔。
FileSystem 按照这个间隔检查文件的写入时间是否已经满足了关闭条件,并将满足条件的文件进行关闭。

sink.partition-commit.trigger

process-time

String

分区关闭策略。取值如下:

  • process-time:当分区创建超过一定时间后将这个分区关闭,分区创建时间为分区创建时的物理时间。
  • partition-time:当分区创建超过一定时间之后将这个分区关闭,分区创建时间从分区中抽取出来。
    partition-time 依赖于 watermark 生成,需要配合 wartermark 才能支持自动分区发现。当 watermark 时间超过了从分区抽取的时间delay 参数配置时间之和后会提交分区。

sink.partition-commit.delay

0s

Duration

分区关闭延迟。
当分区在创建超过一定时间之后将被关闭。

partition.time-extractor.kind

default

String

分区时间抽取方式。
这个配置仅当 sink.partition-commit.trigger 配置为 partition-time 时生效。如果用户有自定义的分区时间抽取方法,配置为 custom

partition.time-extractor.class

(none)

String

分区时间抽取类。

partition.time-extractor.timestamp-pattern

(none)

String

分区时间戳的抽取格式。需要写成 yyyy-MM-dd HH:mm:ss 的形式,并用 Hive 表中相应的分区字段做占位符替换。默认支持第一个字段为 yyyy-mm-dd hh:mm:ss。

  • 如果时间戳应该从单个分区字段 'dt' 提取,可以配置 '$dt'。
  • 如果时间戳应该从多个分区字段中提取,例如 'year'、'month'、'day' 和 'hour',可以配置 '$year-$month-$day $hour:00:00'。
  • 如果时间戳应该从两个分区字段中提取,例如 'dt' 和 'hour' ,可以配置 '$dt $hour:00:00'。

sink.partition-commit.policy.kind

(none)

String

用于提交分区的策略。取值如下:

  • success-file:当分区关闭时将在分区对应的目录下生成一个 _success 的文件。
  • custom:用户自定义分区提交策略。

sink.partition-commit.policy.class

(none)

String

分区提交类。
这个类必须实现 PartitionCommitPolicy。

format

(none)

String

必须要指定文件格式。当前支持文件格式可以参考 概览

示例代码

  • 结果表
CREATE TABLE datagen_source (
    order_id bigint,
    order_product_id bigint,
    order_customer_id bigint,
    order_status varchar
  )
WITH
  (
    'connector' = 'datagen',
    'rows-per-second' = '1',
    'fields.order_status.length' = '3',
    'fields.order_id.min' = '1',
    'fields.order_id.max' = '1000',
    'fields.order_product_id.min' = '1',
    'fields.order_product_id.max' = '100',
    'fields.order_customer_id.min' = '1',
    'fields.order_customer_id.max' = '1000'
  );

CREATE TABLE file_sink (
    order_id bigint,
    order_product_id bigint,
    order_customer_id bigint,
    order_status varchar
  )
WITH
  (
    'connector' = 'filesystem',
    'path' = 'tos://doc-test/test_write_tos/',
    'format' = 'json'
  );

INSERT INTO file_sink
SELECT * FROM datagen_source;
  • 源表
CREATE TABLE file_source(
  order_id bigint, 
  order_product_id bigint,
  order_customer_id bigint,
  order_status varchar
) WITH (
    'connector' = 'filesystem',
    'path' = 'tos://doc-test/test_write_tos/',
    'format' = 'json'
);

create table print_sink (
  order_id bigint, 
  order_product_id bigint,
  order_customer_id bigint,
  order_status varchar
  )
with
  ('connector' = 'print');

insert into print_sink
SELECT * FROM file_source;