You need to enable JavaScript to run this app.
导航
配置 Flink 参数
最近更新时间:2024.12.05 13:36:57首次发布时间:2023.04.20 17:35:37

在任务开发侧,您可以配置 Flink 相关参数,包括 Task Manager 和 Job Manager 使用资源配置,以及 Checkpoint 、Task 故障重启策略、任务失败重试、依赖文件、变量作用域等配置。

注意事项

  • 对于已上线的任务,如果更新 Flink 参数配置,需要重新上线才能同步更新。
  • 流式任务和批式任务的 Flink 参数配置有所差异,主要体现在 Flink Btach 任务不支持 Checkpoint、不支持自动调优。当您选择任务的执行方式STREAMINGBatch,此时打开参数配置,将会自动根据您选择的执行方式切换适配的参数。
  • 不同引擎版本支持的 Flink 参数配置也有差异,主要体现在 Flink 1.11 支持任务自动调优配置,其他引擎版本不支持。

功能入口

在任务编辑区的右侧,单击参数配置,然后根据任务需求进行 Flink 参数配置。

说明

您也可以直接引用提前创建好的配置模板,快速配置任务的 Flink 参数。相关文档,请参见使用配置模板快速配置 Flink 参数

Image

配置

说明

并行度

任务全局并发数,默认值为 2。

单个 TaskManager CPU 数

单个 TaskManager 的 CPU 核数,默认值为 1,最少支持设置为 0.5。

单个 TaskManager 内存大小

单个 TaskManager 的内存大小,将根据您设置的 CPU 核数自动调整,呈 1C4G 的关系。

单个 TaskManager slot 数

单个 TaskManager 的 Slot 数量,默认值为 2。

JobManager CPU 数

JobManager 的 CPU 核数,默认值为 1,最少支持设置为 0.5。

JobManager 内存大小

JobManager 的内存大小,将根据您设置的 CPU 核数自动调整,呈 1C4G 的关系。

Checkpoint 配置

开启 Checkpoint 之后,Flink 可以将任务的状态信息定期保存到持久化存储中。当任务失败或意外停止时,Flink 可以从最近的一次 Checkpoint 中恢复任务的状态,提高任务的可靠性和容错能力。

说明

只有 Flink STREAM 类型任务支持 Checkpoint 配置,Flink Batch 类型任务不显示 Checkpoint 配置。

配置

说明

启用 Checkpoint

是否启用 Checkpoint,默认开启 Checkpoint。
开启 Checkpoint 之后,Flink 可以将任务的状态信息定期保存到持久化存储中,也支持手动创建快照。相关文档,请参见创建任务 Savepoint

Checkpoint 间隔

系统自动执行 Checkpoint 的时间间隔。

Checkpoint 超时时间

Checkpoint 的超时时间。

Region Checkpoint

在任务并发度高;个别节点失败导致全局的 Checkpoint 失败;网络延迟、写入超时对任务影响明显等情况,可以启用该功能以提升 Checkpoint 成功率。

说明

Region Checkpoint 配置仅支持在 Flink 1.11-volcano 引擎版本中使用。

  • Region Checkpoint 将任务中彼此没有连接的 Task 集合划分为单独的Region,将 Checkpoint 以 Region 为单位进行管理。当出现 Checkpoint 失败时,会找出需要重启的最小 Region 集合,这样保证了需要重启的 Task 数量更少。
  • 当某个 Region 出现 Checkpoint 失败时,将会从上一次成功 Checkpoint 文件中提取对应 Region 的成功的 state 文件,并进行逻辑映射。
    支持设置允许 Region Checkpoint 失败的最大比例。如允许 50%,包含 4 个 Region,那么最多支持两个 Region 失败。
  • 支持设置允许 Region 连续失败次数,以避免存储过多的 Checkpoint 历史数据。如允许 2 次,则表示最多能拉取前面两次成功的 state 文件。

如需了解更加详细的 Region Checkpoint原理,请参见Regional Checkpoint实践

Backend Cache Enable

Flink 平台目前使用 RocksDB 作为 StateBackend。由于使用二进制与 RocksDBStateBackend 进行交互,这就意味着每次 State 访问都需要将数据进行序列化或反序列化,需要消耗大量的 CPU,导致吞吐量低。
您可以开启该功能,并设置最大堆内存,能有效提高任务吞吐量,提高资源利用率。

说明

Backend Cache 配置仅支持在 Flink 1.11-volcano 引擎版本中使用。

  • 通过拆分 Cache,实现缓存策略可插拔,数据存储抽象为单独的 StateStore,以适配不同业务场景;
  • 通过控制 TaskManager 中所有 Cache 内存的最大值,并实现 Cache 内存自动分配和回收,以保证 Cache 使用总和不超过最大值,避免出现内存溢出和泄露;
  • 根据 GC 频率和耗时来衡量内存资源是否紧张,是否需要进行自动扩缩容,以降低 JVM 的 GC 压力;
  • 同步阶段 Flush 脏数据到底层 StateBackend,并根据高低水位线机制划分区间,以保证快照同步阶段 Cache 中的数据就是低于高水位线的,从而降低快照同步时间。

如需了解更加详细的 Backend Cache 信息,请参见StateBackend性能提升
最大堆内存(Backend Cache MaxHeapSize):表示 TaskManager 中所有 Cache 累计允许的内存最大占用量,当超过该限制时,Cache 将无法分配内存。您可通过“backend.cache.initial.size * 单个TM中State总数”计算出最大堆内存大小。由于实行按需分配机制,所以也可以设置一个较大的内存上限。

Task 重启策略配置

用于控制任务中的 Task 在出现失败或异常情况时的重启行为。通过配置 Task 的重启策略,可以指定 Flink 在遇到 Task 失败时采取的处理方式,以确保任务能够稳定可靠地运行。

配置

说明

不重启策略(No Restart Strategy)

选择该策略时,表示任务故障时不重启。

固定时间间隔重启策略(Fixed Delay Restart Strategy)

选择该策略时,表示任务故障在固定的时间间隔内不会重启。

说明

固定时间间隔重启策略是 Flink 1.16-volcano 版本的默认重启策略。

  • 最多重启的次数:任务尝试重启的最大次数,可设置为大于 0 的整数,默认值为 10。
  • 每次重启时间间隔:一次重启失败后至第二次重启之间的间隔时间。可用的时间单位有 second、 minute、hour、day,默认值为 20 second。

故障率重启策略(Failure Rate Restart Strategy)

选择该策略时,表示会根据任务故障率进行重启。

说明

故障率重启策略是 Flink 1.17-volcano 版本的默认重启策略。

  • 故障率统计时间间隔:统计任务故障率的时间间隔。可用的时间单位有 second、 minute、hour、day,默认值为 10 minute。
  • 时间间隔内的最大失败次数:在时间间隔内支持的最大重启次数,可设置为大于 0 的整数,默认值为 20。
  • 每次重启时间间隔:一次重启失败后至第二次重启之间的间隔时间。可用的时间单位有 second、 minute、hour、day,默认值为 20 second。

指数延迟重启策略
Exponential Delay Restart Strategy

选择该策略时,当任务失败时,会根据延迟规则以指数递增的方式无限次重启任务,直至任务恢复。

说明

指数延迟重启策略仅支持在 Flink 1.16、Flink 1.17 版本中使用。

  • 初次失败重启时间间隔(exponential-delay.initial-backoff):重启之间的初始延迟。单位为s,默认为1s。
  • 最大重启时间间隔(Exponential-delay.max-backoff):重启之间的最大延迟。单位为s,默认为300s。
  • 重启时间间隔的增长系数(Exponential-delay.backoff-multiplier):重启时间间隔的增长系数,默认为2。延迟时间乘以此值作为新的延迟时间,直到达到最大延迟。
  • 重置失败重启间隔的时间阈值(Exponential-delay.reset-backoff-threshold):任务恢复运行的持续时间。运行达到该时长后,将延迟时间恢复为初始延迟。默认为3600s。
  • 重启时间间隔最大抖动值(Exponential-delay.jitter-factor):在延迟时间中添加或减去的随机值,用于规避同时重启启动多个任务,从而降低负载峰值。默认值为0.1。

聚合故障率重启策略(Aggregated Failure Rate Restart Strategy)

该策略是火山引擎自研的任务故障重启策略。
选择该策略时,会统计指定时间内的失败次数,在到达阈值后阻止 Application Failure,在 Task Failure 之前 delay 指定的时间;会聚合 delay 时间内的多次 Task Failure,减少短时间大量 Task Failure 导致的 Application 提前退出。

说明

聚合故障率重启策略是 Flink 1.11-volcano 版本默认的重启策略,且仅支持在 Flink 1.11-volcano 版本中使用。

  • 重启前 delay 时间:Task Failover 重启前的 delay 时间,即多个 Task Failure 可以被聚合的时间。可用的时间单位有 second、 minute、hour、day,默认值为 20 second。
  • 聚合 Failure 次数的时间间隔:用于统计任务失败次数的时间间隔。可用的时间单位有 second、 minute、hour、day,默认值为 10 minute。
  • 最大重启次数:在聚合时间间隔内允许的最大重启次数。可设置为大于 0 的整数,默认值为 20。

任务失败重试拉起

当 Task 重启策略生效时,任务仍然运行失败,配置的任务失败重试拉起规则会进行整个任务维度的拉起操作。
Flink 平台已经提供了任务重启策略配置功能,但 Failover 策略有一定的理解门槛,配置起来相对困难。您可以使用任务失败重试拉起功能,配置简单易理解。

配置

说明

启用

是否启用任务失败重试拉起功能,默认启用。
启用该功能,您需要根据业务场景配置任务失败重启规则,即配置最大重试拉起次数重试拉起时间间隔

最大重试拉起次数

任务运行过程中,如果出现异常导致整个任务运行失败,系统在该次任务失败生命周期中的最大重试拉起次数。
默认值为 3 次,可设范围为 (0, 100] 的整数值。

重试拉起时间间隔

任务失败拉起,却还是运行失败,两次重试拉起之间的时间间隔。
默认值为 1 min,可设范围为 [1, 300] 的整数值。

值得注意的是,任务失败重试拉起策略需要同时满足以下条件才能生效:

  1. 作业必须设置开启任务失败重试策略;
  2. 作业必须开启 Checkpoint;
  3. 作业当前的实例必须至少成功生成一次 Checkpoint 或者 Savepoint。

注意:如果开启了作业失败重试策略,但是任务却没有重试拉起,可以从几个方向检查:

  1. 参考 Checkpoint 配置 检查任务是否开启 Checkpoint;
  2. 检查运行事件中是否有 “Automatic restart is suppressed because the job can only be initiated from new” 类似关键信息,如果有则可能因为该作业实例没有成功生成 Checkpoint 或者 Savepoint 而导致无法重试拉起。

自定义参数

通过设置自定义参数,可以对 Flink 任务进行更精细的控制和优化,使其更好地适应不同的业务需求和环境变化。同时,还可以提高任务的可维护性和灵活性。自定义参数的生效优先级低于已在配置面板上填写的配置。
此处提供几个示例参数配置样例,请根据实际情况设置。

类别

参数

描述

快照

savepoint.scheduler.default.interval

系统自动创建 Savepoint 的间隔时间,单位为毫秒,最小取值为 60000 毫秒。
系统快照间隔时间设置后,表示开启系统快照功能。系统每隔一次快照间隔时间就会自动触发创建任务快照。

说明

Flink 1.11-volcano 版本支持系统快照。

访问公网

kubernetes.jobmanager.annotations.gro.openstudio/network-policy

当需要 Flink 访问公网时,可以将该参数取值设置为external-internet
如何实现 Flink 访问公网数据源,请参见Flink 访问公网

kubernetes.taskmanager.annotations.gro.openstudio/network-policy

当需要 Flink 访问公网时,可以将该参数取值设置为external-internet
如何实现 Flink 访问公网数据源,请参见Flink 访问公网

限制 Log 日志等级

env.log.level

指定任务输出的 log 日志等级,支持取值为 trace、debug、info、warn、error、fatal。指定输出日志等级后,在任务日志中只会输出当前等级日志,以及更高等级的日志。
比如指定输出日志级别为 debug,则会输出 debug~fatal 级别的日志。

说明

  • 更新任务输出日志等级,需要重新上线和启动任务,配置的日志等级才会生效。
  • 仅能指定 TaskManager 和 JobManager 输出的日志等级,Client 日志不受控制。

推断 Source 并行度

table.exec.hive.infer-source-parallelism

table.exec.iceberg.infer-source-parallelism

Flink Batch 类型任务读取 Hive 或 Iceberg 数据时,是否自动推断 Source 并行度。
默认值为 true,表示自动推断并行度。支持您设置为 false,关闭自动推断并行度,此时并行度由配置的并行度决定。

table.exec.hive.infer-source-parallelism.max

table.exec.iceberg.infer-source-parallelism.max

Flink Batch 类型任务读取 Hive 或 Iceberg 数据时,Source 的最大推断并行度。默认值为 1000。

自动调优

Flink 平台已经支持为 Stream 类型任务开启自动调优。开启后,系统将自动分析运行任务,将根据上游 Kafka 的 Lag 调整 Flink 计算任务的资源使用量,从而降低资源使用量。

注意

开启任务自动调优前,需要先了解以下注意事项:

  • 仅支持为 Flink 1.11 引擎版本的 Stream 类型任务开启自动调优。
  • 仅支持 Source 端为 Kafka 的任务开启自动调优。
  • 开启自动调优功能的任务仅支持运行在按量计费的资源池上。

Image
如果需要为任务开启自动调优,请根据业务情况配置相关参数。

配置

说明

不生效时间

设置每天自动调优的不生效时间段,即在该时间段内智能调优正常运行但不进行任何调优调整。

最大资源限制

设置任务在自动调整资源时,可以扩容的最大资源上限,单位为 CU。
默认值为 100CU。

调整间隔时间

设置任务在调优重启生效一次之后,下一次再进行调优的时间间隔,单位为 min。
默认值为 30min。

依赖文件

依赖文件的配置主要用于指定任务所需的外部文件或资源的路径,在任务执行期间会自动加载所需的文件或资源,并将其传递给相应的组件或函数使用。
单击添加,然后选择添加依赖文件的方式:
Image

  • 选择资源:从资源库中选择目标文件。如果还未上传文件,请参见资源文件管理
  • 填写 TOS 地址:填写 TOS 地址,从 TOS Bucket 中读取依赖文件,格式为tos://bucket/path/依赖文件名。示例:tos://test-init/tos_jar/flink-faker-0.5.3.jar
    当您选择从 TOS Bucket 中读取依赖文件时,您设置的 Bucket 需要和 Flink 产品初始化时的 Bucket 保持一致,否则将无法正确读取依赖文件。

    注意

    填写 Bucket 时,请先了解以下两个注意事项:

    • Flink 平台历史用户:与产品初始化时选择的 TOS Bucket 保持一致,您可以在创建资源池的页面确认 Bucket 的名称。
    • Flink 平台新用户:如果您是在 2024 年 02 月 04 日以后开通 Flink 产品的新用户,产品初始化时将会自动生成并绑定一个 TOS Bucket,格式固定为volc-flink-meta-{账号ID}-{region}。示例:volc-flink-meta-2100xxxx-cn-beijing

变量作用域

当 SQL 代码中引用了指定域变量,需要在 Flink 参数配置中选择生效的变量作用域。如何创建变量,请参见创建变量
Image
系统将会先从您指定的变量作用域中查找变量,再从全局变量中查找变量,指定域变量优先级高于全局变量。如果变量不存在则运行报错。

  • 全局变量:所有任务引用均可生效。
  • 指定域变量:只有在任务参数配置中指定该域,引用才可生效。