本文档以 Confluent 官方 Java 版本客户端 SDK 为例,介绍使用火山引擎 Kafka 实例时的消费者最佳实践。
在同一个消费组内部,每个消息都预期仅仅只被消费组内的某个消费者消费一次,因而使用同一个消费组的不同消费者之间,即可实现消息的单播消费。
在不同的消费组之间,每个消息都预期可以被每个消费组分别消费一次,因而使用不同消费组的不同消费者之间,即可实现消息的广播消费。
消息是否被客户端消费,在服务端的认知中,仅和保存在服务端的消费位点有关。而消费位点是由消费者调用相关 API 从而记录到服务端,那么在客户端起停导致的重均衡过程中,很可能会出现消费位点未及时同步到服务端的现象。
因而,即使在同一个消费组内的不同消费者,也无法完全保证一条消息仅仅只会被消费一次。消费者若需要实现完全的幂等,可以通过在消息中添加额外的标识字段等方式在消费到消息后,再进行二次校验。
消费者支持通过以下方式指定 Topic:
推荐直接使用订阅(Subscribe)的方式。
消费者使用拉模型进行数据读取,需要保证拉取的线程不会异常退出或者被阻塞,否则会导致无法正常发起消费请求。
消费者的所有请求发送和响应几乎都基于消费者poll
方法的调用。
若客户端使用订阅(Subscribe)的方式进行消费,那么在使用过程中,需要保证poll
方法在固定的周期内进行调用,最长不能超过max.poll.interval.ms
的配置,默认 300000ms,该参数定义了两次poll
方法调用的最大时间间隔,超过该时间间隔,会导致服务端认为消费者异常,从而将其从消费组中踢出。同时过长的间隔,也可能会影响到消费组重均衡的执行,导致长时间的消费卡顿。
客户端使用订阅(Subscribe)的方式进行消费时,在消费组的生命周期中将可能在以下不同的状态之间进行流转:
通常一个正常的消费组预期应该长期保持在 Stable
状态进行正常的消费业务处理。
当一个订阅中的消费组有新的消费者加入或者老的消费者退出/失败时,将会触发一次消费组的重均衡动作。消费组将进入PreparingRebalance
状态,然后等待当前所有的消费者重新加入消费组。所有消费组触发完成后,重新计算分区的分配并进入CompletingRebalance
状态,并等待各个消费者完成各自分区的获取,进入 Stable
状态,开始正常处理消费逻辑。
从以上逻辑可以看出,重均衡逻辑中存在两处场景需要与消费者进行业务交互,而消费者的所有请求处理都需要通过明确的poll
方法调用来进行触发,因而阻塞poll
方法的正常调用,很容易导致消费组重均衡任务的长期卡顿,甚至超时。
建议在消费者使用中保证两次poll
方法的调用间隔不要超过10s,对于下游业务消息处理慢的场景,可以考虑有优化下游处理速度或者通过异步消息处理的方式来实现。
另外还需要注意的是,若使用自由分配(Assign)的方式来进行消费的话,消费组的状态将一直保持在Empty
状态。
客户端默认的提交方式为定期自动提交的方式,由以下配置决定:
在实际业务中推荐关闭自动提交,在消息处理完成后,由业务侧调用commitAsync
的方法进行消费位点的提交,以避免消息处理失败后,因自动提交可能导致无法重试的问题。同时需要注意业务主动调用提交方法的频率不宜太快,处理完一批消息后定时提交一次即可,推荐 5s 提交一次。
消费组中可以同时运行的消费者的并发数,与所消费的 Topic 分区数相关,最多不能超过分区个数。因而当消费组产生堆积时可以参考以下方式处理:
receive.buffer.bytes
调整 TCP 的接受缓存区大小,默认为 64KB。建议修改为 1MB。消费者与生产者不同,不是线程安全的,不支持多个线程调用相同的消费者对象。每个消费者都需要放在一个独立的业务线程中调用。
消费者退出时,推荐调用close
方法进行关闭,主动中断与服务端的业务。否则可能会导致消费者未正常发送退出请求,阻塞服务端消费组的业务,默认阻塞 10s。
避免频繁的创建和关闭消费者,每次创建或关闭都会引起消费组的重均衡,重均衡状态的消费组无法正常获取消息。
火山引擎 Kafka 实例为分布式集群部署,初始接入点使用域名的方式提供。当客户端使用域名接入时,推荐设置客户端的 DNS 解析方式为全部 IP 解析,即 client.dns.lookup=use_all_dns_ips
。保证分布式集群在发生后端节点变化的时候客户端仍然能够正常使用。