Flink 是一个兼容 Apache Flink 的全托管流式计算平台,支持对海量实时数据的高效处理。而 火山引擎对象存储 TOS(Torch Object Storage)是火山引擎提供的海量、安全、低成本、易用、高可靠、高可用的分布式云存储服务。通过 Flink 内置的 Filesytem Connector,您可以轻松访问和管理火山引擎 TOS 上的数据。
本文模拟场景主要实现:读取消息队列 Kafka 数据写入对象存储 TOS。
在 Flink 控制台通过开发 Flink SQL 任务,实现 Datagen -> Kafka -> TOS 的数据流转链路。
| | | \ |**配置** |**说明** | |---|---| | | | \ |任务名称 |自定义设置任务的名称,如“datagen-kafka-tos”。 |\ | |名称的字符长度限制在 1~48,支持数字、大小写英文字母、下划线(_)、短横线(-)和英文句号(.),且首尾只能是数字或字母。 | | | | \ |任务类型 |选择 作业类型 > Flink SQL > 流式。 | | | | \ |所属文件夹 |系统提供文件夹管理功能,用于分类管理任务。 |\ | |您可以直接选择系统默认存在的**数据开发文件夹**,也可以使用自定义创建的文件夹。 | | | | \ |引擎版本 |按需选择引擎版本,本文选择引擎版本为 **Flink 1.16-volcano** 版本。 | | | | \ |任务描述 |输入任务的描述语句,一般描述任务实现的功能。 |
在任务编辑区编写 SQL 任务的业务逻辑代码。
示例代码含义为:将 Datagen 连接器实时生成的随机数写入 Kafka Topic 中;然后读取 Kafka Topic 数据并输出到 TOS Bucket。
注意
Table:xxx should not be both source and sink.
的报错信息。create table orders (
order_id bigint,
order_product_id bigint,
order_customer_id bigint,
order_status varchar,
order_update_time as localtimestamp
) WITH (
'connector' = 'datagen',
'rows-per-second'='1',
'fields.order_status.length' = '3',
'fields.order_id.min' = '1',
'fields.order_id.max' = '10000',
'fields.order_product_id.min' = '1',
'fields.order_product_id.max' = '100',
'fields.order_customer_id.min' = '1',
'fields.order_customer_id.max' = '1000'
);
create table kafka_table (
order_id bigint,
order_product_id bigint,
order_customer_id bigint,
order_status varchar,
order_update_time timestamp
) WITH (
'connector' = 'kafka',
--安全协议设置为SASL_PLAINTEXT。
'properties.security.protocol' = 'SASL_PLAINTEXT',
--SASL 机制为 PLAIN。
'properties.sasl.mechanism' = 'PLAIN',
-- 配置JAAS。
'properties.sasl.jaas.config' = 'org.apache.flink.kafka.shaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="doc-user" password="qaP***6";',
--Kafka实例的SASL_PLAINTEXT接入点。
'properties.bootstrap.servers' = 'kafka-***hd8md.kafka.ivolces.com:9093',
--Group和Topic。
'topic' = 'topic-b',
'properties.group.id' = 'group-b',
--读取数据的启动模式,“earliest-offset”表示从最早分区开始读取。
'scan.startup.mode' = 'earliest-offset',
--定期扫描并发现新的Topic和Partition的时间间隔。
'scan.topic-partition-discovery.interval' = '120s',
'format' = 'json',
--关闭幂等性。
'properties.enable.idempotence' = 'false'
);
insert into kafka_table
select * from orders;
CREATE TABLE tos_sink (
order_id bigint,
order_product_id bigint,
order_customer_id bigint,
order_status varchar,
order_update_time timestamp,
dt STRING,
`hour` STRING
) PARTITIONED BY (dt, `hour`)
WITH (
'connector' = 'filesystem', --访问 TOS 资源时使用 filesystem 连接器。
'path' = 'tos://wuch-doc-tos/db/table_1', --tos 路径,由 Bucket 名称和文件夹名称组成,文件夹名称建议为 DB 和 Table 名称。
'sink.rolling-policy.file-size' = '1M', --文件内存最大限制,达到该值关闭文件并新打开一个文件写入。
'sink.rolling-policy.rollover-interval' = '5 min', --文件持续写入时间,达到该值关闭文件并打开一个新文件写入。
'format' = 'parquet'
);
insert into tos_sink
select
order_id,
order_product_id,
order_customer_id,
order_status,
order_update_time,
DATE_FORMAT (order_update_time, 'yyyy-MM-dd') as dt,
DATE_FORMAT (order_update_time, 'HH') as `hour`
from kafka_table;
SQL
在代码编辑区上方,单击验证按钮。
系统会自动校验您的 SQL 语句正确性,如果报错,请根据提示自主完成 SQL 语句修改。检验通过后,系统提示success
。
启用 Checkpoint。
在代码编辑区上方,单击参数配置,然后开启 Checkpoint。
上线任务。
设置执行方式和引擎版本,然后单击上线。
本文场景中执行方式设置为 STREAMING,引擎版本设置为 Flink 1.16-volcano。
在任务上线设置对话框,选择运行资源池、设置任务优先级和调度策略,然后单击确定。
配置 | 说明 |
---|---|
运行资源池 | 从下拉列表中选择任务运行的 Flink 资源池。 |
任务优先级 | 系统默认预置的优先级为 L3,您可以按需设置任务优先级,数字越小优先级越高。 |
调度策略 | 根据需求配置任务调度策略:
|
调度时长 | 设置为 GANG 调度策略时,需要设置调度时长。 |
启动任务。
在 Flink 控制台通过开发 Flink SQL 任务,实现 Datagen -> Kafka -> TOS 的数据流转链路。您可以通过以下三种方式验证任务结果:
Flink SQL 任务正常运行后,您可以进入 Flink UI 页面,查看任务运行情况。
Flink SQL 任务正常运行后,您可以在 Kafka 控制台查看目标 Topic 中的数据。
Flink SQL 任务正常运行后,您可以在 TOS Bucket 目标路径下查看文件。