Flink 支持开发 Python 类型任务。您可以自行编写 Python 程序,并将 Python 文件上传到资源库,即可在平台上开发 Python 任务。
本文为您介绍流式类型 Python 任务的开发流程。如需了解 Batch Python 任务,请参见 开发 Flink Batch Python 任务。
前提条件
- 项目管理员(Project_Admin)已经在项目内创建好 Flink 资源池,请参见创建资源池。
- 开发人员需提前编写 Python 程序,并将 Python 文件上传到资源库。如何上传文件,请参见上传资源文件。可以参考如下 Python 代码内容:
注意:以下代码文件名不能为 pyflink 等和 flink python 库冲突的名字,否则会造成任务失败等原因
from pyflink.datastream import StreamExecutionEnvironment
def main():
# 创建执行环境
env = StreamExecutionEnvironment.get_execution_environment()
# 创建一个数据流,元素为 1, 2, 3, 4, 5
data_stream = env.from_collection([1, 2, 3, 4, 5])
# 对数据流中的每个元素进行加 1 操作
result_stream = data_stream.map(lambda x: x + 1)
# 打印结果
result_stream.print()
# 执行程序
env.execute("Simple PyFlink Example")
if __name__ == "__main__":
main()
功能限制
目前仅 Flink 1.17-volcano 版本支持 Flink Python 任务。
步骤一:创建 Flink Python 任务
- 登录流式计算 Flink 版控制台。
- 在顶部菜单栏选择目标地域。
- 在左侧导航栏选择项目管理,在搜索框中根据项目名称进行模糊搜索,然后单击项目区块进入项目。
- 在项目左侧导航栏选择作业开发 > 作业开发。
- 在作业开发页面单击加号按钮,创建任务。
您也可以选择目标文件夹,直接在该文件夹中创建任务;也可以直接单击引导页面下的 Flink Python 作业。 - 在创建作业对话框,设置作业名称、存储位置、引擎版本等关键参数,然后单击确定。
配置 | 说明 |
---|
作业名称 | 自定义设置任务的名称。
名称的字符长度限制在 1~48,支持数字、大小写英文字母、下划线(_)、短横线(-)和英文句号(.),且首尾只能是数字或字母。 |
作业类型 | 选择 作业类型 > Flink Python 。 |
存储位置 | 从下拉列表中选择目标文件夹。
系统默认存在一个数据开发文件夹,但为了更方便的管理任务,您可以自由创建文件夹。如何创建任务文件夹,请参见管理任务文件夹。 |
引擎版本 | 仅 Flink 1.17-volcano 版本支持 Flink Python 任务。 |
- 在任务配置区域,设置任务关键参数。
配置 | 说明 |
---|
任务名称 | 创建作业时设置的名称。 |
Python File URI | 从下拉列表中选择已上传的 Python 文件。如果您还没有上传文件,请参见资源文件管理。 |
Entry Module | 程序的入口类。 - 如果 Python 作业文件为
.py 文件,则该项不需要填写。 - 如果 Python 作业文件为
.zip 文件,则需要在此处输入您的 Entry Module,例如 kafka_test。
|
Entry Point Main Arguments | 业务程序 main 函数的args 参数,非必填项。请根据界面提示填写。 |
Python Libraries | 第三方 Python 包。
第三方 Python 包会被添加到 Python worker 进程的 PYTHONPATH 中,从而在 Python 自定义函数中可以直接访问。 |
- 任务开发和配置完成后,单击保存。
步骤二:上线任务
开发与生产隔离,当任务开发者完成任务开发后,可以将任务上线到生产环境。
- 在作业开发栏目下查找并单击目标任务,单击上线。
- 在任务上线设置对话框,选择运行资源池、设置任务优先级和调度策略,然后单击确定。
系统会提示任务上线成功,可以前往任务管理页面查看。
配置 | 说明 |
---|
运行资源池 | 从下拉列表中选择任务运行的 Flink 资源池。 |
任务优先级 | 系统默认预置的优先级为 L3,您可以按需设置任务优先级,数字越小优先级越高。
任务优先级决定了任务内部的调度顺序,优先级高的任务先被调度,即 L3 先于 L4 被调度。 |
调度策略 | 根据需求配置任务调度策略: - GANG:保证任务的所有实例被一起调度,即当剩余资源满足任务正常运行所需资源时才进行分配;不满足所需资源则不分配。
该策略不会出现分配资源后,任务却不能启动的现象,解决了资源死锁问题。 - DRF:从多维资源考虑,更为合理地将资源公平分配给资源池内的各个任务,从而提升利用率。
例如:剩余10 核 40 GB 的资源,A 任务需要10 核 20 GB 资源;B 任务需要 2 核 8 GB 的资源。如果分配给 A,剩余 0 核 20 GB 资源无法被利用;DRF 策略会选择分配给 B,剩下 8 核 32 GB 可以继续给后来任务使用。
|
调度时长 | 设置为 GANG 调度策略时,需要设置调度时长。调度时长表示再次调度的时间间隔,即任务拉起不成功会再次重试调度。
如果超过调度时长,任务就会调度失败。如果设置为 0,则会一直重试。 |
步骤三:启动任务
任务开发者将任务上线到生产环境后,由运维人员启动任务。
在项目左侧导航栏选择任务运维 > 任务管理。
在任务列表页面,选择额目标任务,单击操作列中的启动。
在启动任务对话框,选择任务启动方式,然后单击确定。
配置 | 说明 |
---|
启动方式 | 请根据实际情况选择任务启动方式: - 从最新状态启动:以最新的 Checkpoint 或 Savepoint 启动。
- 全新启动:不使用 Checkpoint 或 Savepoint,直接启动。
- 指定快照启动:指定目标快照(Savepoint)启动。
|
参数配置 | 任务携带在开发侧的并行度、TaskManager 和 JobManager 的资源配置。在启动任务时支持您更新配置并快速生效。 说明 更新参数配置并启动任务后,将新增一个任务版本,并将最新配置同步到任务开发侧。 - 并行度:任务全局并发数。
- 单个 TaskManager CPU 数:单个 TaskManager 的 CPU 核数。
- 单个 TaskManager 内存大小:单个 TaskManager 占用的内存大小。
- 单个 TaskManager slot 数:单个 TaskManager 的 Slot 数量。
- JobManager CPU 数:单个 JobManager 的 CPU 核数。
- JobManager 内存大小:单个 JobManager 占用的内存大小。
|
更多设置 | 在任务开发变更时新增或修改算子,可能会导致任务无法从快照恢复,此时您可以选择启用允许忽略部分算子状态功能,保证任务能正常运行。
注意 - 仅当选择指定快照启动或从最新状态启动方式时,支持勾选忽略部分算子状态。
- 当您选择全新启动方式时,不支持忽略算子状态。
|
步骤四:查看 Flink UI
任务在生产环境上正常运行后,您可以在 Flink UI 界面了解任务运行、TaskManager、JobManager 的详细信息。
- 在项目左侧导航栏选择任务运维 > 任务管理。
- 在任务列表页面筛选目标任务,单击操作列下的 Flink UI。
浏览器将会自动打开 Apache Flink Dashboard 页面。 - 在 Apache Flink Dashboard 页面,查看任务运行详情。