Flink 是一个兼容 Apache Flink 的全托管流式计算平台,支持对海量实时数据的高效处理。LAS 是湖仓一体架构的 Serverless 数据平台,提供海量数据存储、管理、计算和交互分析功能。本文通过一个示例场景模拟 Flink 与 LAS 的联动,从而体验跨源查询分析、元数据自动发现等能力。
本文模拟场景主要实现:读取消息队列 Kafka 数据写入对象存储 TOS,并映射为湖仓一体分析服务 LAS 外表进行数据分析。
在 Flink 控制台通过开发 Flink SQL 任务,实现 Datagen -> Kafka -> TOS 的数据流转链路,然后在 LAS 控制台创建外表,从 TOS 数据源读取文件并映射到新建的外表中。
bucket/库/表/文件
或者bucket/库/表/分区/文件
。tos://doc_bucket/las_db/las_table_1
。在项目左侧导航栏选择任务开发 > Jupyter lab,然后单击加号按钮创建任务,也可以单击 Launcher 页签下的 Flink Stream SQL 区块。
在创建任务对话框,设置任务名称、类型、文件夹和引擎版本,然后单击确定。
配置 | 说明 |
---|---|
任务名称 | 自定义设置任务的名称,如“datagen-kafka-tos”。 |
任务类型 | 选择 Flink 任务 > Flink Stream > SQL。 |
所属文件夹 | 系统提供文件夹管理功能,用于分类管理任务。 |
引擎版本 | 按需选择引擎版本,本文选择引擎版本为 Flink 1.16-volcano 版本。 |
任务描述 | 输入任务的描述语句,一般描述任务实现的功能。 |
在任务编辑区编写 SQL 任务的业务逻辑代码。
示例代码含义为:将 Datagen 连接器实时生成的随机数写入 Kafka Topic 中;然后读取 Kafka Topic 数据并输出到 TOS Bucket。
注意
Table:xxx should not be both source and sink.
的报错信息。create table orders ( order_id bigint, order_product_id bigint, order_customer_id bigint, order_status varchar, order_update_time as localtimestamp ) WITH ( 'connector' = 'datagen', 'rows-per-second'='1', 'fields.order_status.length' = '3', 'fields.order_id.min' = '1', 'fields.order_id.max' = '10000', 'fields.order_product_id.min' = '1', 'fields.order_product_id.max' = '100', 'fields.order_customer_id.min' = '1', 'fields.order_customer_id.max' = '1000' ); create table kafka_table ( order_id bigint, order_product_id bigint, order_customer_id bigint, order_status varchar, order_update_time timestamp ) WITH ( 'connector' = 'kafka', --安全协议设置为SASL_PLAINTEXT。 'properties.security.protocol' = 'SASL_PLAINTEXT', --SASL 机制为 PLAIN。 'properties.sasl.mechanism' = 'PLAIN', -- 配置JAAS。 'properties.sasl.jaas.config' = 'org.apache.flink.kafka.shaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="doc-user" password="qaP***6";', --Kafka实例的SASL_PLAINTEXT接入点。 'properties.bootstrap.servers' = 'kafka-***hd8md.kafka.ivolces.com:9093', --Group和Topic。 'topic' = 'topic-b', 'properties.group.id' = 'group-b', --读取数据的启动模式,“earliest-offset”表示从最早分区开始读取。 'scan.startup.mode' = 'earliest-offset', --定期扫描并发现新的Topic和Partition的时间间隔。 'scan.topic-partition-discovery.interval' = '120s', 'format' = 'json', --关闭幂等性。 'properties.enable.idempotence' = 'false' ); insert into kafka_table select * from orders; CREATE TABLE tos_sink ( order_id bigint, order_product_id bigint, order_customer_id bigint, order_status varchar, order_update_time timestamp, dt STRING, `hour` STRING ) PARTITIONED BY (dt, `hour`) WITH ( 'connector' = 'filesystem', --访问 TOS 资源时使用 filesystem 连接器。 'path' = 'tos://wuch-doc-tos/las_db/las_table_1', --tos 路径,由 Bucket 名称和文件夹名称组成,文件夹名称为 LAS DB 和 Table 名称。 'sink.rolling-policy.file-size' = '1M', --文件内存最大限制,达到该值关闭文件并新打开一个文件写入。 'sink.rolling-policy.rollover-interval' = '5 min', --文件持续写入时间,达到该值关闭文件并打开一个新文件写入。 'format' = 'parquet' ); insert into tos_sink select order_id, order_product_id, order_customer_id, order_status, order_update_time, DATE_FORMAT (order_update_time, 'yyyy-MM-dd') as dt, DATE_FORMAT (order_update_time, 'HH') as `hour` from kafka_table;
在代码编辑区上方,单击验证按钮。
系统会自动校验您的 SQL 语句正确性,如果报错,请根据提示自主完成 SQL 语句修改。检验通过后,系统提示success
。
设置执行方式和引擎版本,然后单击上线。
本文场景中执行方式设置为 STREAMING,引擎版本设置为 Flink 1.16-volcano。
在任务上线设置对话框,选择运行资源池、设置任务优先级和调度策略,然后单击确定。
配置 | 说明 |
---|---|
运行资源池 | 从下拉列表中选择任务运行的 Flink 资源池。 |
任务优先级 | 系统默认预置的优先级为 L3,您可以按需设置任务优先级,数字越小优先级越高。 |
调度策略 | 根据需求配置任务调度策略:
|
调度时长 | 设置为 GANG 调度策略时,需要设置调度时长。 |
在 Flink 控制台通过开发 Flink SQL 任务,实现 Datagen -> Kafka -> TOS 的数据流转链路。您可以通过以下三种方式验证任务结果:
Flink SQL 任务正常运行后,您可以进入 Flink UI 页面,查看任务运行情况。
Flink SQL 任务正常运行后,您可以在 Kafka 控制台查看目标 Topic 中的数据。
Flink SQL 任务正常运行后,您可以在 TOS Bucket 目标路径下查看文件。
当 Flink SQL 任务正常运行后,您可以在 LAS 控制台创建 TOS 外表,然后便可以从 TOS 读取文件数据并映射到 LAS 外表。
创建 LAS 数据库。
创建 LAS 外表。
CREATE EXTERNAL TABLE IF NOT EXISTS `las_db`.`las_table_1` ( order_id bigint, order_product_id bigint, order_customer_id bigint, order_status string, order_update_time timestamp ) PARTITIONED BY (dt STRING, hour STRING) STORED AS PARQUET location 'tos://wuch-doc-tos/las_db/las_table_1'
刷新外表分区。
由于创建的 TOS 外表是分区表,需要手动刷新分区以加载分区信息。请先选择 Schema,然后单击表后方的刷新分区。
刷新分区需要一定时长,请耐心等待。
预览外表数据。
分区刷新成功后,您可以在表格的数据预览页签下查看数据。LAS 外部有数据后,您便可以按需进行查询分析。
(可选)配置元数据发现任务。