流式 EMR SQL 底层的计算引擎为 Apache Flink,其符合标准 SQL 语义,降低了用户进行实时开发的门槛,支持在线创建、解析和运维流式任务。
本文将为您介绍 EMR Flink SQL 任务的相关使用。
DataLeap产品需开通数据开发特惠版、DataOps敏捷研发、大数据分析或分布式数据自治服务后,才可创建火山引擎 E-MapReduce(EMR)流式数据开发任务。
EMR 引擎绑定的集群类型、版本及依赖的服务,需满足以下条件之一,方可创建 EMR Flink SQL 任务:
支持集群版本 | 支持集群类型 | 依赖集群服务 |
---|---|---|
EMR-1.3.1 | Hadoop | Flink |
Flink | Flink | |
EMR-3.2.1及以上 | Hadoop | Flink 和 GTS |
EMR Flink SQL 目前仅支持原生Connector:kafka、datagen、print。
DataLeap 项目控制台首次绑定 EMR 集群时,会提示在 EMR 集群关联的安全组中添加 8898 和 9030 端口,您单击确定按钮即可实现自动添加。添加后,为确保能在 DataLeap 上正常进行数据开发和执行任务,需保证相关端口一直存在于安全组中,不要删除。
详见创建项目。
选择流式数据 > EMR Flink SQL 任务类型新建任务。
新建任务成功后,进入代码开发编辑界面,通过 DDL 和 DML 编写 SQL,下图以DataGen生成数据写入Kafka为例进行说明,详细语法可参考对应版本 Flink 官方文档。
CREATE TABLE datagen_source (f_sequence INT) WITH ('connector' = 'datagen', 'rows-per-second' = '60'); CREATE TABLE kafka_sink (f_sequence INT, f_ts BIGINT) WITH ( 'connector' = 'kafka', 'topic' = '{{kafka_topic}}', 'properties.bootstrap.servers' = '{{bootstrap.servers}}', 'format' = 'csv', 'scan.startup.mode' = 'earliest-offset' ); INSERT INTO kafka_sink SELECT f_sequence, UNIX_TIMESTAMP() AS f_ts FROM datagen_source;
plaintext
注意
任务被模板使用后,格式化、表管理、执行引擎选择功能将不支持使用。
功能名称 | 描述 |
---|---|
格式化 | 依据在个性化设置中的 SQL 格式化风格的设置,格式化书写的代码,使其语法结构看起来简洁明了。 |
解析 | 解析检查书写的 SQL 代码的语法和语义正确性,运行前检查语法错误信息,防止运行出错。 |
任务模板 | 您可以选择是否通过任务模板方式,便捷快速的复用代码模板逻辑,在弹窗中选择 EMR Flink SQL 任务模板,并选择相应的版本号,输入替换的参数即可完成复用。 注意
|
执行引擎 | 下拉选择 EMR 集群中 Flink 组件对应的执行引擎版本。 |
单击右侧导航栏中参数设置,进行任务的基本信息、任务输入参数、资源设置、数据源登记、Flink 运行参数配置。
EMR Flink SQL 任务的基本信息配置如下:
参数名称 | 描述 |
---|---|
任务名称 | 显示创建任务时输入的任务名称,参数设置中不支持修改,可以在左侧任务目录结构中的任务名称右侧更多单击重命名进行修改。 |
任务类型 | EMR Flink SQL |
任务描述 | 非必填,可对任务进行详细描述,方便后续查看和管理。 |
责任人 | 仅限一个成员,默认为任务创建人(任务执行失败、复查通过或者失败时的默认接收者),可根据实际需要,修改为其他项目成员。
|
Yarn 队列 | 支持下拉选择任务运行所需的 Yarn 队列信息。您可在创建项目 > 服务绑定 > EMR Yarn 队列绑定时,可添加绑定多个队列信息。详见创建项目。 说明 EMR Yarn 队列的更多操作详见 YARN 队列管理。 |
优先级 | 您可对流式任务设置任务优先级,指定当前任务的优先级情况:
|
标签 | 您可以自定义标签,用于标识某一类任务,以便快速搜索过滤,操作即时生效,无需重新上线任务。
|
Flink SQL 任务支持通过引入资源 Jar 包,并在 Jar 包中自定义 connector 的方式,进行 Flink SQL 任务的编辑、提交上线执行等操作。
说明
Jar 包中自定义 connector 和内置 connector 重名时,不支持在同一个任务中使用,否则任务执行时会出现冲突异常的情形。
在下拉框中,选择已通过资源库上传的 Jar 包资源,可支持多选。若您还未上传资源,您可单击“新建资源”按钮,前往资源库进行资源创建,操作详见资源库。
设置 EMR Flink SQL 运行时相关资源分配:
参数名称 | 描述 |
---|---|
TaskManager个数 | 设置 flink 作业中 TaskManager 的数量 |
单TaskManagerCPU数 | 设置单个 TaskManager 所占用的 CPU 数量 |
单TaskManager内存大小(MB) | 设置单个 TaskManager 所占用的内存大小 |
单TaskManager slot数 | 设置单个 TaskManager 中 slot 的数量 |
JobManager CPU数 | 设置单个 JobManager 所占用的 CPU 数量 |
JobManager内存 | 设置单个 JobManager 所占用的内存大小 |
您可通过手动添加的方式,选择项目参数或自定义参数,以变量或常量形式传入流式任务中使用,来实现同一套代码在不同执行环境下能够自动替换参数来执行。添加操作详见 3.4.4-任务输入参数。
登记该任务使用的 Source 和 Sink 信息,以用于后续监控配置和血缘构建,支持通过手动添加。
说明
可填写 Flink 相关的动态参数和执行参数,平台已为您提供一些常用的 SQL 参数、State 参数、Runtime 参数等,您可以根据实际情况进行选择,或者自行输入所需参数。更多参数详见 Flink 官方文档。
参数配置操作详见 4.2 Flink 运行参数。
任务所需参数配置完成后,将任务提交发布到运维中心实时任务运维中执行。
单击操作栏中的保存和提交上线按钮,在弹窗中,需先通过任务上线检查和提交上线等上线流程,最后单击确认按钮,完成作业提交。详见概述---流式任务提交发布。
后续任务运维操作详见:实时任务运维。
流式任务启动后,可按照以下步骤,可以查看数据是否已写入目的端,以kafka为例,详细介绍请参考:消息查询。