MiniBatch 的作用是缓存一定的数据后再触发处理,以减少对 State 的访问次数,从而提升吞吐量和减少数据输出量。
如需详细了解 MiniBatch 原理,请参见开源文档:MiniBatch Aggregation。
MiniBatch 默认关闭。如需启用,您可以在 Flink 参数配置中配置以下参数:
配置 | 示例值 | 说明 |
---|---|---|
table.exec.mini-batch.enabled | true | 是否启用 MiniBatch。
|
table.exec.mini-batch.allow-latency | 5s | MiniBatch 的时间间隔。 说明 启用 MiniBatch,会缓冲一批数据而不是立即处理数据,这会产生一些延迟。您需要根据业务需要配置 |
table.exec.mini-batch.size | 10000 | 单个并发 buffer 数据的条数。
说明 数据条数和时间间隔属于或的关系,满足任意条件就组成了一个 MiniBatch。 |
MiniBatch 跟 LocalGlobal 的关系。
LocalGlobal 将原先的 Aggregate 分成 Local 和 Global 两阶段聚合。第一阶段在上游节点本地攒一批数据进行聚合(localAgg),并输出这次微批处理的增量值(Accumulator);第二阶段再将收到的 Accumulator 合并(Merge)得到最终的结果(GlobalAgg)。
LocalGlobal 能够靠 LocalAgg 的聚合筛除部分倾斜数据,从而降低 GlobalAgg 的热点,提升性能。如需详细了解 LocalGlobal 原理,请参见开源文档:Local-Global Aggregation。
LocalGlobal 的前提是必须要开启 MiniBatch。LocalGlobal 不需要手动开启,默认就是开启的。所以开启 MiniBatch 配置后,LocalGlobal 就会自动优化了。
开启 MiniBatch 是否可以从已有 Checkpoint 恢复?
开启 MiniBatch 后,如果想要恢复开启 MiniBatch 前的状态,需要关闭 LocalGlobal。
请通过以下参数关闭 LocalGlobal。
// 开启 MiniBatch,LocalGlobal 也默认开启,则默认值为 TWO_PHASE。需要关闭时,设置为 ONE_PHASE。 table.optimizer.agg-phase-strategy = ONE_PHASE