火山引擎对象存储 TOS(Torch Object Storage)是火山引擎提供的海量、安全、低成本、易用、高可靠、高可用的分布式云存储服务。通过 Flink 内置的 Filesytem Connector,您可以轻松访问和管理火山引擎 TOS 上的数据。
将 TOS 文件/目录作为数据源非常简单,只要用如下建表语句,指定 filesystem connector,以及 tos://
开头的文件路径,以及文件格式即可:
CREATE TABLE filesystem_source ( name String, score INT ) WITH ( 'connector' = 'filesystem', 'path' = 'tos://<bucket>/<path>', 'format' = 'json' );
将 TOS 文件/目录作为数据下游,用如下建表语句,指定 filesystem connector,以及 tos://
开头的文件路径、文件格式,以及文件生成规则:
CREATE TABLE filesystem_sink ( name String, score INT ) WITH ( 'connector' = 'filesystem', 'path' = 'tos://<bucket>/<path>', 'sink.rolling-policy.file-size' = '1M', 'sink.rolling-policy.rollover-interval' = '10 min', 'format' = 'json' );
参数 | 是否必选 | 默认值 | 数据类型 | 描述 |
---|---|---|---|---|
connector | 是 | (none) | String | 指定使用的连接器,此处是 Filesystem 连接器。 |
path | 是 | (none) | String | 文件路径。 |
sink.rolling-policy.file-size | 否 | 128MB | MemorySize | 文件内存最大限制。 注意 写入文件时,如果需要尽快在文件系统中看到写入的文件,需要增加部分配置。您可以设置连接器的 sink.rolling-policy.file-size、sink.rolling-policy.rollover-interval 参数,以及在 Flink 参数中开启 Checkpoint。 |
sink.rolling-policy.rollover-interval | 否 | 30min | Duration | 文件最大持续写入时间。 注意 写入文件时,如果需要尽快在文件系统中看到写入的文件,需要增加部分配置。您可以设置连接器的 sink.rolling-policy.file-size、sink.rolling-policy.rollover-interval 参数,以及在 Flink 参数中开启 Checkpoint。 |
sink.rolling-policy.check-interval | 否 | 1min | Duration | 文件检查间隔。 |
sink.partition-commit.trigger | 否 | process-time | String | 分区关闭策略。取值如下:
|
sink.partition-commit.delay | 否 | 0s | Duration | 分区关闭延迟。 |
partition.time-extractor.kind | 否 | default | String | 分区时间抽取方式。 |
partition.time-extractor.class | 否 | (none) | String | 分区时间抽取类。 |
partition.time-extractor.timestamp-pattern | 否 | (none) | String | 分区时间戳的抽取格式。需要写成 yyyy-MM-dd HH:mm:ss 的形式,并用 Hive 表中相应的分区字段做占位符替换。默认支持第一个字段为 yyyy-mm-dd hh:mm:ss。
|
sink.partition-commit.policy.kind | 是 | (none) | String | 用于提交分区的策略。取值如下:
|
sink.partition-commit.policy.class | 否 | (none) | String | 分区提交类。 |
format | 是 | (none) | String | 必须要指定文件格式。当前支持文件格式可以参考 概览。 |
CREATE TABLE datagen_source ( order_id bigint, order_product_id bigint, order_customer_id bigint, order_status varchar ) WITH ( 'connector' = 'datagen', 'rows-per-second' = '1', 'fields.order_status.length' = '3', 'fields.order_id.min' = '1', 'fields.order_id.max' = '1000', 'fields.order_product_id.min' = '1', 'fields.order_product_id.max' = '100', 'fields.order_customer_id.min' = '1', 'fields.order_customer_id.max' = '1000' ); CREATE TABLE file_sink ( order_id bigint, order_product_id bigint, order_customer_id bigint, order_status varchar ) WITH ( 'connector' = 'filesystem', 'path' = 'tos://doc-test/test_write_tos/', 'format' = 'json' ); INSERT INTO file_sink SELECT * FROM datagen_source;
CREATE TABLE file_source( order_id bigint, order_product_id bigint, order_customer_id bigint, order_status varchar ) WITH ( 'connector' = 'filesystem', 'path' = 'tos://doc-test/test_write_tos/', 'format' = 'json' ); create table print_sink ( order_id bigint, order_product_id bigint, order_customer_id bigint, order_status varchar ) with ('connector' = 'print'); insert into print_sink SELECT * FROM file_source;