DataLeap接入了流式计算 Flink 版,在关联 Flink 的项目和资源池后,可以进行 Flink 作业开发。可以通过 Serverless Pyjstorm Flink 作业实现任务的托管和运维。本文以一个简单的示例,将为您介绍 Serverless Pyjstorm Flink 作业相关的开发流程操作。
DataLeap产品需开通 DataOps敏捷研发、大数据分析、数据开发特惠版或分布式数据自治服务后,才可绑定流式计算 Flink 引擎。绑定引擎操作详见:项目管理。
子账号操作项目绑定 Flink 引擎实例时:
主账号需要先在流式计算 Flink 版控制台导入 IAM 用户。操作详见:用户管理。
并将子账号加入到对应引擎项目中。操作详见:引擎项目成员管理。
目前 Flink 版本只支持 1.11。
登录 DataLeap租户控制台。
在概览界面,显示加入的项目中,单击数据开发进入对应项目。
在任务开发界面,左侧导航栏中,单击新建任务按钮,进入新建任务页面。
选择任务类型:
分类:数据开发。
绑定引擎:流式计算 Flink 版。
关联引擎项目:默认选择引擎绑定时选择的引擎项目,不可更改。
选择任务:流式数据 Serverless Pyjstorm Flink。
填写任务基本信息:
任务名称:输入任务的名称,只能由数字、字母、下划线、-和.组成, 首尾只能是数字、字母,且允许输入 1~63 个字符。
保存至: 选择任务存放的目标文件夹目录。
单击确认按钮,成功创建任务。
选择 SCM 资源类型,可通过以下方式选择资源:
资源库中选取已有资源。
直接在下拉框中新建资源,或在左侧导航栏资源库中新建流式计算 Flink 资源,详见:资源库操作。
注意
SCM 资源类型,需先创建好 SCM 仓库,并获取其对应代码版本号和部署路径。
执行引擎:选择执行引擎 Flink-1.11。
YamlConf配置案例:
详细参数说明参考域内 Flink 文档:PyJstorm 参数配置及含义
environment: SEC_KV_AUTH: '1' PYTHONPATH: /opt/tiger/pyFlink:/opt/tiger:/opt/tiger/pyutil:/opt/tiger/ss_thrift_gen:/opt/tiger/ss_lib topology_standalone_mode: 0 flink_args: containerized.heap-cutoff-ratio: 0.3 blacklist.enabled: true smart-resources.enable: false smart-resources.durtion.minutes: 1440 smart-resources.cpu-reserve-ratio: 0.2 smart-resources.mem-reserve-ratio: 0.2 smart-resources.disable-mem-adjust: true containerized.master.env.SEC_KV_AUTH: 1 containerized.taskmanager.env.SEC_KV_AUTH: 1 docker.image: default docker.namespace: yarn yarn.provided.lib.dirs.enabled: true ipv6.enabled: true yarn.res-lake.enabled: true gts.migration.platform: GTS spout: common_args: consumer_group: pyFlink_hello_word_lq output_fields: - msg script_name: /opt/tiger/pyFlink/pytopology/runner/kafka_spout.py is_thread_num_determined_by_partition: true whence: 1 read_num: 100 spout_list: - spout_name: simple_spout args: kafka_cluster: bmq_test_lq bootstrap_servers: 'kafka-cnngiu8m5tg2otd0.kafka.ivolces.com:9092' auto_partition: true topic_name: pyjstorm-test-1 partition: 3 prefer_service: dcleader tolerate_exception_num: 3 flink_resource_args: tm_slot: 12 tm_cores: 0.5 jm_cores: 1.2 tm_memory: 4096 jm_memory: 4096 tm_num: 2 bolt: bolt_list: - thread_num: 1 bolt_name: split_bolt script_name: /opt/tiger/pyFlink/pytopology/runner/bolt.py output_fields: - word group_list: - group_type: shuffle group_field: null stream_from: - simple_spout args: debug_dumper.enable: true debug_dumper.tcc_service_name: stream.pyFlink.dump debug_dumper.tcc_config_key: wordcount_py.split_bolt debug_dumper.tcc_check_interval_sec: 30 debug_dumper.max_queue_len: 1000 - thread_num: 1 bolt_name: wordcount_bolt script_name: /opt/tiger/pyFlink/pytopology/runner/bolt.py output_fields: null group_list: - group_type: fields group_field: word stream_from: - split_bolt args: debug_dumper.enable: true bootstrap_servers: 'kafka-cnngiu8m5tg2otd0.kafka.ivolces.com:9092' kafka_cluster: bmq_test_lq topic_name: pyjstorm-test-2 topology_name: pyjstorm_11_test topology_standalone_time: 500 common_args: metrics_namespace_prefix: pyflink.wordcount_py log_file: /var/log/tiger/wordcount_py.log log_format: '%(asctime)s %(filename)s %(levelname)s %(message)s'
单击右侧导航栏中参数设置,进行任务的基本信息、资源设置、数据源登记配置。
Serverless Pyjstorm Flink 任务的基本信息配置如下:
参数名称 | 描述 |
---|---|
任务名称 | 显示创建任务时输入的任务名称,参数设置中不支持修改,可以在左侧任务目录结构中的任务名称右侧更多单击重命名进行修改。 |
任务类型 | Serverless Pyjstorm Flink |
引擎类型 | 流式计算 Flink 版 |
关联引擎项目 | DataLeap侧关联的引擎项目名称。 |
任务描述 | 非必填,可对任务进行详细描述,方便后续查看和管理。 |
责任人 | 仅限一个成员,默认为任务创建人(任务执行失败、复查通过或者失败时的默认接收者),可根据实际需要,修改为其他项目成员。
|
计算资源 | 从关联的引擎项目中选择目标资源池。 |
优先级 | 您可对流式任务设置任务优先级,指定当前任务的优先级情况:
|
标签 | 您可以自定义标签,用于标识某一类任务,以便快速搜索过滤,操作即时生效,无需重新上线任务。
|
设置任务运行时相关资源分配情况
参数名称 | 描述 |
---|---|
TaskManager个数 | 设置 flink 作业中 TaskManager 的数量。 |
单TaskManagerCPU数 | 设置单个 TaskManager 所占用的CPU数量。 |
单TaskManager内存大小(MB) | 设置单个 TaskManager 所占用的内存大小。 |
单TaskManager slot数 | 设置单个 TaskManager 中slot的数量。 |
JobManager CPU数 | 设置单个 JobManager 所占用的CPU数量。 |
JobManager内存 | 设置单个 JobManager 所占用的内存大小。 |
登记该任务使用的 Source 和 Sink 信息,以用于后续监控配置和血缘构建,支持通过手动添加。
手动添加:单击手动添加按钮,可根据 SQL 逻辑进行数据源上下游登记。
源/目标:支持源/目标 Source 和 Sink 的选择。
数据源类型:支持选择 DataGen、Rmq、Bmq、JDBC、Abase、火山/自建 Kafka 这些 Connector 的数据源。
各数据源类型需补充填写相应的信息,您可根据实际场景进行配置,如 Kafka 的数据源类型,需填写具体的Bootstrap Servers、Topic 名称、消费组等信息。
直接上游任务:您可通过手动输入任务名称方式,进行搜索上游任务,单击添加按钮,进行手动添加。
直接下游任务:无法通过手动添加的方式进行操作,仅展现通过数据源匹配后的结果。
说明
Abase 和 Rmq 数据源类型,仅支持解析操作,暂不支持上线发布后运行。且 Abase 数据源通常是在火山引擎内部业务上云中使用。
Serverless Pyjstorm Flink 暂不支持自动解析功能来登记数据源。
批量删除:手动添加的数据源,您可通过勾选已登记的数据源后,单击右侧删除或批量删除按钮 ,删除勾选中的数据源。
上下游任务查看:数据源登记解析出的上下游任务,您可对上游依赖任务进行编辑、添加、删除、禁用等操作;下游任务仅支持查看。
任务所需参数配置完成后,将任务提交发布到运维中心实时任务运维中执行。
单击操作栏中的保存和提交上线按钮,在弹窗中,需先通过任务上线检查和提交上线等上线流程,最后单击确认按钮,完成作业提交。详见概述---流式任务提交发布。
后续任务运维操作详见:实时任务运维。