本文为您介绍 Flink 出现 Task 反压的常见原因和处理建议。
您在查看本文内容前,建议先查看背景信息中的内容。
什么是反压?
首先通过经典的“生产者-消费者模型”来理解反压。
在该模型中,有生产者、消费者,以及一个固定大小的队列,生产者负责生产数据并写入到队列中,消费者从队列中获取数据。当消费者的消费能力小于生产者的生产能力时,队列中的数据就会开始堆积,直至堆积满,此时生产者将被阻塞且无法继续生产数据。生产者还可能是其他队列的消费者,堆积现象还会继续往上层传递,直到源头,这就是反压现象。
Flink Job 部署在不同 TaskManager 时,从 TaskA 流转到 TaskB 的过程也有可能出现反压现象。
Flink内部通信模型
场景模拟:一个 Flink Job 包含 TaskA 和 TaskB,且并发度为 4,即 TaskA.1~4 和 TaskB.1~4。将这个 Flink Job 部署到 2 个 taskmanager 中,每个 taskmanager 分配 2 个 slots。由于 Flink 支持不同类型的 Task 可以放到同⼀个 slot 中,则整个分配方式如下:
B.1 | B.2 | B.3 | B.4 | |
---|---|---|---|---|
A.1 | local | remote | ||
A.2 | ||||
A.3 | remote | local | ||
A.4 |
TaskManager1 分配 A1、A2、B1、B2;TaskManager2 分配 A3、A4、B3、B4。位于相同 Taskmanager 内部的 Task 靠local
传输,位于不同的 TaskManager 之间的 Task 靠remote
传输。
TaskManager内部数据流转流程
两个 TaskManager 之间的 TCP Channel 是共享的,例如 A1 -> B3/B4 和 A2-> B3/B4 使用的是同一个 TCP Channel。
从上图中可以得出,生产者 TaskA 将数据发送到 TaskB,需要经历 2 个步骤:
这种反压机制有一个缺点,就是当产生数据倾斜时,Subtask A.2 往 Subtask B.4 产生数据量较大时,由于反压,会影响到 Subtask A.1 的生产能力。如果您想深入了解 Flink 反压机制,请参见Flink's Network Stack。
介绍出现反压的常见原因。
Container CPU 不足,导致计算能力不足,出现反压。
您可以观察 Container CPU 和 Application CPU 的使用情况。
由于 cgroup 策略给单个 Container 预留了 20% 的 Buffer,所以可以按照 100% 来区分高负载和低负载。
是否资源不足 | Container CPU < 100% * tm_cores | Container CPU > 100% * tm_cores |
---|---|---|
Application CPU < 80% * total_cores | 否 | 负载不均 |
Application CPU > 80% * total_cores | NA(不会出现) | 是 |
当资源不足时,可以选择缩小单 tm_slots,增加 tm_num 个数。 如果出现负载不均,请参照负载不均的排查思路。
当出现负载不均时,需要分析两个原因:
Task是否调度不均衡
如果 Task 并发度与 Container 并发度不能成正比,则代表不同 Container 分配的 Task 个数不同。
如何判断是否是由于反压导致任务产生 Lagsize?您可以在 Flink WebUI 上手动触发反压机制,查看反压状态;也可以查看 Flink Job 中间的 Task 的 InputQueueUsage 和 OutputQueueUsage 比例来判断。
此处假设 Job 分为两个 Task,使用 key 相连接,为了模拟反压效果,可以在 Map Operator 加上 Sleep 10s 逻辑。
GAG 图如下所示:
您可以在 Flink Web 中,针对任何一个 Task 做反压检测。反压检测需要在手动触发,触发后 TaskManager 使用 Thread.getStackTrace 来抽样检测 Task Thread 是否处于等待 NetworkBuffer 中。
根据抽样比例,来判断反压状态,反压状态分为 OK、LOW 和 HIGH 三种状态。Ratio 表示抽样 n 次,处于等待 NetworkBuffer 次数的比例。
ratio ≤ 0.10
0.10 < Ratio ≤ 0.5
0.5 < Ratio ≤ 1
您可以从 Sink -> Source 逆序依次进行检查,遇到的第一个处于 HIGH 状态的 Task 可能就是触发反压的根本原因(并不绝对),由于该 Task 处于反压状态,它会将该状态一直向上传递,直到 Source。 这种检测方式,本身存在一些缺点:
每一个 Flink Job 中间的 Task 都会有一个 InputQueue 和 OutputQueue,您可以通过查看 InputQueueUsage 和 OutputQueueUsage 的比例来判断。
比例情况 | OutputQueueUsage < 1.0 | OutputQueueUsage == 1.0 |
---|---|---|
InputQueueUsage < 1.0 | 正常 | 处于反压,原因可能是该 Task 下游处理能力不足。如果持续下去,该 Task 将会向上游传递反压。 |
InputQueueUsage == 1.0 | 处于反压,该 Task 可能是反压的源头。如果持续下去,该 Task 会向上游传递反压。 | 处于反压,原因可能是被下游阻塞。 |
当任务处于反压下,您可以从 Sink -> Source 逆序逐步检查,直到找到第一个 InputQueueUsage High,OutputQueueUsage Low 的 Task。