You need to enable JavaScript to run this app.
导航
开发 Flink Python 任务
最近更新时间:2025.01.14 17:41:38首次发布时间:2024.04.01 15:26:08

Flink 支持开发 Python 类型任务。您可以自行编写 Python 程序,并将 Python 文件上传到资源库,即可在平台上开发 Python 任务。
本文为您介绍流式类型 Python 任务的开发流程。如需了解 Batch Python 任务,请参见 开发 Flink Batch Python 任务

前提条件

  • 项目管理员(Project_Admin)已经在项目内创建好 Flink 资源池,请参见创建资源池
  • 开发人员需提前编写 Python 程序,并将 Python 文件上传到资源库。如何上传文件,请参见上传资源文件。可以参考如下 Python 代码内容:

注意:以下代码文件名不能为 pyflink 等和 flink python 库冲突的名字,否则会造成任务失败等原因

from pyflink.datastream import StreamExecutionEnvironment

def main():
    # 创建执行环境
    env = StreamExecutionEnvironment.get_execution_environment()
    # 创建一个数据流,元素为 1, 2, 3, 4, 5
    data_stream = env.from_collection([1, 2, 3, 4, 5])
    # 对数据流中的每个元素进行加 1 操作
    result_stream = data_stream.map(lambda x: x + 1)
    # 打印结果
    result_stream.print()
    # 执行程序
    env.execute("Simple PyFlink Example")

if __name__ == "__main__":
    main()

功能限制

目前仅 Flink 1.17-volcano 版本支持 Flink Python 任务。

  1. 登录流式计算 Flink 版控制台
  2. 在顶部菜单栏选择目标地域。
  3. 在左侧导航栏选择项目管理,在搜索框中根据项目名称进行模糊搜索,然后单击项目区块进入项目。
  4. 在项目左侧导航栏选择作业开发 > 作业开发
  5. 作业开发页面单击加号按钮,创建任务。
    您也可以选择目标文件夹,直接在该文件夹中创建任务;也可以直接单击引导页面下的 Flink Python 作业
  6. 创建作业对话框,设置作业名称、存储位置、引擎版本等关键参数,然后单击确定

Image

配置

说明

作业名称

自定义设置任务的名称。
名称的字符长度限制在 1~48,支持数字、大小写英文字母、下划线(_)、短横线(-)和英文句号(.),且首尾只能是数字或字母。

作业类型

选择 作业类型 > Flink Python

存储位置

从下拉列表中选择目标文件夹。
系统默认存在一个数据开发文件夹,但为了更方便的管理任务,您可以自由创建文件夹。如何创建任务文件夹,请参见管理任务文件夹

引擎版本

仅 Flink 1.17-volcano 版本支持 Flink Python 任务。

  1. 在任务配置区域,设置任务关键参数。

Image

配置

说明

任务名称

创建作业时设置的名称。

Python File URI

从下拉列表中选择已上传的 Python 文件。如果您还没有上传文件,请参见资源文件管理

Entry Module

程序的入口类。

  • 如果 Python 作业文件为.py文件,则该项不需要填写。
  • 如果 Python 作业文件为.zip文件,则需要在此处输入您的 Entry Module,例如 kafka_test。

Entry Point Main Arguments

业务程序 main 函数的args参数,非必填项。请根据界面提示填写。

Python Libraries

第三方 Python 包。
第三方 Python 包会被添加到 Python worker 进程的 PYTHONPATH 中,从而在 Python 自定义函数中可以直接访问。

  1. 任务开发和配置完成后,单击保存

步骤二:上线任务

开发与生产隔离,当任务开发者完成任务开发后,可以将任务上线到生产环境。

  1. 作业开发栏目下查找并单击目标任务,单击上线
  2. 任务上线设置对话框,选择运行资源池、设置任务优先级调度策略,然后单击确定
    系统会提示任务上线成功,可以前往任务管理页面查看。

Image

配置

说明

运行资源池

从下拉列表中选择任务运行的 Flink 资源池。

任务优先级

系统默认预置的优先级为 L3,您可以按需设置任务优先级,数字越小优先级越高。
任务优先级决定了任务内部的调度顺序,优先级高的任务先被调度,即 L3 先于 L4 被调度。

调度策略

根据需求配置任务调度策略:

  • GANG:保证任务的所有实例被一起调度,即当剩余资源满足任务正常运行所需资源时才进行分配;不满足所需资源则不分配。
    该策略不会出现分配资源后,任务却不能启动的现象,解决了资源死锁问题。
  • DRF:从多维资源考虑,更为合理地将资源公平分配给资源池内的各个任务,从而提升利用率。
    例如:剩余10 核 40 GB 的资源,A 任务需要10 核 20 GB 资源;B 任务需要 2 核 8 GB 的资源。如果分配给 A,剩余 0 核 20 GB 资源无法被利用;DRF 策略会选择分配给 B,剩下 8 核 32 GB 可以继续给后来任务使用。

调度时长

设置为 GANG 调度策略时,需要设置调度时长。调度时长表示再次调度的时间间隔,即任务拉起不成功会再次重试调度。
如果超过调度时长,任务就会调度失败。如果设置为 0,则会一直重试。

步骤三:启动任务

任务开发者将任务上线到生产环境后,由运维人员启动任务。

  1. 在项目左侧导航栏选择任务运维 > 任务管理

  2. 任务列表页面,选择额目标任务,单击操作列中的启动

  3. 启动任务对话框,选择任务启动方式,然后单击确定
    Image

    配置

    说明

    启动方式

    请根据实际情况选择任务启动方式:

    • 从最新状态启动:以最新的 Checkpoint 或 Savepoint 启动。
    • 全新启动:不使用 Checkpoint 或 Savepoint,直接启动。
    • 指定快照启动:指定目标快照(Savepoint)启动。

    说明

    首次上线的任务,只能是全新启动方式。

    参数配置

    任务携带在开发侧的并行度、TaskManager 和 JobManager 的资源配置。在启动任务时支持您更新配置并快速生效。

    说明

    更新参数配置并启动任务后,将新增一个任务版本,并将最新配置同步到任务开发侧。

    • 并行度:任务全局并发数。
    • 单个 TaskManager CPU 数:单个 TaskManager 的 CPU 核数。
    • 单个 TaskManager 内存大小:单个 TaskManager 占用的内存大小。
    • 单个 TaskManager slot 数:单个 TaskManager 的 Slot 数量。
    • JobManager CPU 数:单个 JobManager 的 CPU 核数。
    • JobManager 内存大小:单个 JobManager 占用的内存大小。

    更多设置

    在任务开发变更时新增或修改算子,可能会导致任务无法从快照恢复,此时您可以选择启用允许忽略部分算子状态功能,保证任务能正常运行。
    Image

    注意

    • 仅当选择指定快照启动从最新状态启动方式时,支持勾选忽略部分算子状态。
    • 当您选择全新启动方式时,不支持忽略算子状态。

任务在生产环境上正常运行后,您可以在 Flink UI 界面了解任务运行、TaskManager、JobManager 的详细信息。

  1. 在项目左侧导航栏选择任务运维 > 任务管理
  2. 任务列表页面筛选目标任务,单击操作列下的 Flink UI
    浏览器将会自动打开 Apache Flink Dashboard 页面。
  3. 在 Apache Flink Dashboard 页面,查看任务运行详情。
    Image