You need to enable JavaScript to run this app.
导航
MiniBatch 配置
最近更新时间:2023.11.07 10:27:22首次发布时间:2023.11.07 10:27:22

MiniBatch 的作用是缓存一定的数据后再触发处理,以减少对 State 的访问次数,从而提升吞吐量和减少数据输出量。

适用场景

  1. 降低频繁访问状态导致的 CPU 开销
    如果使用 RocksDB 作为 State Backend。每次访问 State,都需要进行序列化和反序列化,导致 CPU 开销比较大。开启 MiniBatch,可以减少对 State 的访问次数,降低一些 CPU 开销。
  2. 减少聚合的输出量
    在普通聚合中,每来一条数据就直接输出一条当前的聚合结果;但是开启 MiniBatch 后,如果在一个微批处理内有多条数据,只会输出一次,这样可以有效降低输出数据的量。

如需详细了解 MiniBatch 原理,请参见开源文档:MiniBatch Aggregation

如何启用 MiniBatch

MiniBatch 默认关闭。如需启用,您可以在 Flink 参数配置中配置以下参数:

配置

示例值

说明

table.exec.mini-batch.enabled

true

是否启用 MiniBatch。

  • true:启用
  • false:默认值,关闭。

table.exec.mini-batch.allow-latency

5s

MiniBatch 的时间间隔。

说明

启用 MiniBatch,会缓冲一批数据而不是立即处理数据,这会产生一些延迟。您需要根据业务需要配置mini-batch.allow-latency,在任务吞吐和数据时效性之间进行折中配置。

table.exec.mini-batch.size

10000

单个并发 buffer 数据的条数。
建议通过合理配置mini-batch.size以控制异常流量(如数据回溯,lag 过大等情况)下 buffer 的数据条数,防止异常流量下 buffer 数据条数过多直接造成内存 OOM。
您可以通过以下公式粗略计算,然后合理配置mini-batch.size来限制 buffer 的最大数据条数。
图片

  • 分子中的 f 表示系数,Max(QPS) 表示算子最大写入的 QPS,f * Max(QPS) 表示允许的最大流量。
  • 分子中的 latency 取自mini-batch.allow-latency,单位为秒。
  • 分母为作业的并发数,通过 Tm 的数据量乘以 slot 的数量计算得出。

说明

数据条数和时间间隔属于的关系,满足任意条件就组成了一个 MiniBatch。

常见问题

  1. MiniBatch 跟 LocalGlobal 的关系。
    LocalGlobal 将原先的 Aggregate 分成 Local 和 Global 两阶段聚合。第一阶段在上游节点本地攒一批数据进行聚合(localAgg),并输出这次微批处理的增量值(Accumulator);第二阶段再将收到的 Accumulator 合并(Merge)得到最终的结果(GlobalAgg)。
    LocalGlobal 能够靠 LocalAgg 的聚合筛除部分倾斜数据,从而降低 GlobalAgg 的热点,提升性能。如需详细了解 LocalGlobal 原理,请参见开源文档:Local-Global Aggregation
    LocalGlobal 的前提是必须要开启 MiniBatch。LocalGlobal 不需要手动开启,默认就是开启的。所以开启 MiniBatch 配置后,LocalGlobal 就会自动优化了。

  2. 开启 MiniBatch 是否可以从已有 Checkpoint 恢复?
    开启 MiniBatch 后,如果想要恢复开启 MiniBatch 前的状态,需要关闭 LocalGlobal。
    请通过以下参数关闭 LocalGlobal。

    // 开启 MiniBatch,LocalGlobal 也默认开启,则默认值为 TWO_PHASE。需要关闭时,设置为 ONE_PHASE。
    table.optimizer.agg-phase-strategy = ONE_PHASE