本文档以 Confluent 官方的 Java 版本 SDK 为例介绍 Kafka 生产者和消费者的使用建议。推荐在使用消息队列 Kafka版进行消息生产与消费之前,阅读以下使用建议,提高接入效率和业务稳定性。
火山引擎 Kafka 实例的消息在同一分区中可以保证数据的先入先出。即写入同一分区的消息,若消息 A 先于消息 B 写入,那么在进行消息读取时,消息A也一定可以先于消息 B 被客户端读到。需要注意的是此处仅保证通过同一生产者先后发送的消息可以保证有序,不同生产者之间的消息因为无法确认到达服务端的先后顺序,所以无法保证有序。
基于以上特性,若要实现消息顺序性的能力,可以考虑以下方式:
单分区的 Topic 在生产消费性能上会有较大的限制。在实际使用中推荐选择分区有序的方式实现业务逻辑,将需要保序的消息写入相同的分区中实现同类消息的有序。
acks 配置定义了写入消息确认的方式,并支持以下三种配置:
acks=0
:不关心消息的写入结果,服务端对于该消息的写入,无论成功失败都不会有任何结果返回。acks=1
:服务端在写入主副本之后即可返回写入结果到生产者客户端。acks=-1
或 acks=all
:消息需要在主备副本都写入后才可返回写入结果到生产客户端。acks 的三种配置,从上到下,性能依次下降,数据可靠性依次上升。推荐您直接使用可靠性最高的配置方式。
对于分布式系统,因网络或者主节点切换等问题,可能存在偶现的发送失败问题。您可以通过 retries
参数配置写入失败的重试次数,重试次数默认为长整型的最大值;通过 retry.backoff.ms
配置重试的间隔,间隔默认为 100ms。推荐配置重试次数为 3 次、重试间隔为 1000ms。
消息实际在写入时会选择 Topic 中的某一分区进行写入。分区选择逻辑如下:
推荐使用默认的分区选择逻辑即可。无消息 key 时默认逻辑本身已经实现了消息在分区中的均衡。对于使用消息 key 的场景,减少消息 key 的 hash 冲突可以有效打散消息,避免部分分区中承载的业务和消息过多。对于自定义实现的分区选择同样也需要注意尽可能的保证分区选择的均衡,避免业务和消息过度集中在部分分区中。
生产者消息的发送实现当前为异步逻辑。即调用 send 方法写入消息时,实际消息仅仅是写入本地缓存中,实际并未发送到服务端。消息会在本地缓存中根据分区做一次消息聚合,之后由异步发送线程扫描将聚合后的消息发送到服务端中。
send 方法仅为写入缓存,不代表消息实际写入成功。要获取消息的实际写入结果,当前有以下方式可以选择:
interceptor.classes
注入一个自定义的实现ProducerInterceptor
接口的拦截器,该拦截器会将消息写入的结果或异常通过onAcknowledgement
方法进行传递。Future
对象,可直接调用该对象的get
方法,阻塞等待消息的写入结果。以上三种方式中,推荐使用前两者中的任意一种,第三种其实是一种伪同步的实现方式,会严重影响客户端的生产性能,不推荐使用。
生产者通过内存缓存,消息聚合的方式,减少和服务端之间的网络请求,从而达到吞吐性能的大幅度提升。对于生产端的聚合能力,当前支持以下配置的自定义:
消息的时间戳支持两种不同的填充方式,为服务端集群的配置,详细说明请参考修改参数配置。
消息的时间戳会被用于计算消息的过期老化等场景。客户端发送的消息需要保证具备合理的时间戳,一旦消息时间戳填写错误,可能会导致数据不会按照预期的时间进行老化删除。在写入消息后,可通过消息偏移量查询进行排查。通常较老版本的 API 会存在无消息时间戳的问题,建议使用推荐的客户端版本。
Confluent 默认的 SDK 在不指定消息时间戳的情况下,会填入生产者本地的当前时间。若您需要自行指定时间时,应注意填入正确的时间戳,以免影响服务端的消息老化等业务处理。
生产者为线程安全的实现方式,因而在客户端业务实现中,推荐使用生产者池的方式,将生产者提供给不同的多线程业务使用,避免每个生产业务创建独立生产者。
生产者为异步发送的方式,在预期关闭生产之前,推荐调用生产者的flush
方法,主动将缓存中的消息推送到服务端,再调用close
方法关闭生产者客户端。否则可能会导致部分缓存中的消息在关闭之前未能及时放松,导致消息丢失。
火山引擎 Kafka 实例为分布式集群部署,初始接入点使用域名的方式提供。当客户端使用域名接入时,推荐设置客户端的 DNS 解析方式为全部 IP 解析,即 client.dns.lookup=use_all_dns_ips
。保证分布式集群在发生后端节点变化的时候客户端仍然能够正常使用。