本文以 Go 客户端为例,介绍如何在 VPC 或公网环境下通过 SASL_PLAINTEXT 接入点 PLAIN 机制接入消息队列 Kafka版,并收发消息。
已完成准备工作。详细说明请参考准备工作。
创建消息队列 Kafka版配置文件 config.json
。配置文件字段的详细说明,请参考配置文件。
通过 SASL_PLAINTEXT 接入点 PLAIN 机制接入时,配置文件示例如下。
说明
{
"bootstrap.servers": "xxxxx", // 修改配置为实例的 SASL 接入点
"security.protocol": "SASL_PLAINTEXT", // 固定为 SASL_PLAINTEXT
"topic": "xxxx", // 修改配置为待发送的 topic 名称
"consumer": {
"group.id": "xxxx" // 修改为指定消费组的名称
},
"sasl": {
"enabled": true, // 使用SASL接入点时,必须设置为 true
"mechanism": "PLAIN", // 用户类型为 Plain 时配置为 PLAIN,账号类型为Scram 时配置为 SCRAM-SHA-256
"username": "xxxx", // 用户名
"password": "xxxx" // 用户密码
}
}
producer.go
。producer.go
发送消息。通过 SASL_PLAINTEXT 接入点生产消息的示例代码如下,您也可以参考 Demo 中的示例文件 {DemoPath}/client/producer.go
,实现相关业务逻辑。
package client
import (
"fmt"
"github.com/confluentinc/confluent-kafka-go/kafka"
)
func RunProduce(config *KafkaConf) error {
// 构造生产配置
configMap := &kafka.ConfigMap{
"bootstrap.servers": config.BootstrapServers,
"security.protocol": config.Protocol,
"acks": config.Producer.Acks,
"batch.size": config.Producer.BatchSize,
}
if config.Debug {
// 开启Debug能力
configMap.SetKey("debug", "ALL")
}
if config.Sasl.Enabled {
// 配置SASL认证
configMap.SetKey("sasl.mechanism", config.Sasl.Mechanism)
configMap.SetKey("sasl.username", config.Sasl.UserName)
configMap.SetKey("sasl.password", config.Sasl.Password)
}
// 创建一个Kafka生产者对象
producer, err := kafka.NewProducer(configMap)
if err != nil {
return err
}
// 处理消息发送的结果
go callBack(producer)()
// 获取发送channel
sendChannel := producer.ProduceChannel()
// 循环发送10条消息
for count := 0; count < 10; count++ {
// 构造消息对象
msg := &kafka.Message{
// 消息写入位置
TopicPartition: kafka.TopicPartition{
// 消息需要写入的Topic名称
Topic: &config.Topic,
// 消息写入的分区编号,可以指定Topic特定的某一分区写入,或者设置为kafka.PartitionAny由系统自行选择
Partition: kafka.PartitionAny,
},
// 消息内容,可以为nil
Value: []byte(fmt.Sprintf("Bytedance test msg %d", count)),
// 消息Key值,可以为nil。若消息key不为空且为指定分区进行写入时,相同key的消息会落在同一分区内
Key: []byte(fmt.Sprintf("Bytedance test key %d", count)),
// 消息的属性值,作为额外的扩展属性,可以为nil
Headers: []kafka.Header{
{Key: "service", Value: []byte("kafka")},
{Key: "version", Value: []byte("2.2.0")},
},
}
// 发送消息
sendChannel <- msg
}
// 因为发送是异步动作,关闭生产者之前需要将消息都推送到服务端
producer.Flush(10000)
// 关闭发送者
producer.Close()
return nil
}
// 消息发送结果的一部方法回调,注意,若配置ACKS为0时,不会有消息回调产生
func callBack(p *kafka.Producer) func() {
return func() {
for event := range p.Events() {
switch eType := event.(type) {
case *kafka.Message:
// 写入结果处理,可修改此处结果实现
if eType.TopicPartition.Error != nil {
// 写入报错,异常处理
fmt.Printf("Delivery failed: %v\n", eType.TopicPartition.Error)
} else {
// 写入成功,正常处理
fmt.Printf("Delivered message to topic %s [%d] at offset %v\n",
*eType.TopicPartition.Topic, eType.TopicPartition.Partition, eType.TopicPartition.Offset)
}
}
}
}
}
consumer.go
。consumer.go
消费消息。通过 SASL_PLAINTEXT 接入点消费消息的示例代码如下,您也可以参考 Demo 中的示例文件 {DemoPath}/client/consumer.go
,实现相关业务逻辑。
package client
import (
"fmt"
"github.com/confluentinc/confluent-kafka-go/kafka"
"strconv"
)
func RunConsumer(config *KafkaConf) error {
// 构造消费配置
configMap := &kafka.ConfigMap{
"bootstrap.servers": config.BootstrapServers,
"security.protocol": config.Protocol,
"group.id": config.Consumer.GroupId,
"auto.offset.reset": config.Consumer.AutoOffsetRest,
"enable.auto.commit": strconv.FormatBool(config.Consumer.AutoCommit),
}
if config.Debug {
// 开启Debug能力
if err := configMap.SetKey("debug", "ALL"); err != nil {
return err
}
}
if config.Sasl.Enabled {
// 配置SASL认证
configMap.SetKey("sasl.mechanism", config.Sasl.Mechanism)
configMap.SetKey("sasl.username", config.Sasl.UserName)
configMap.SetKey("sasl.password", config.Sasl.Password)
}
// 创建消费者对象
consumer, err := kafka.NewConsumer(configMap)
if err != nil {
return err
}
// 订阅指定的Topic列表
err = consumer.SubscribeTopics([]string{config.Topic}, nil)
if err != nil {
return err
}
// 循环读取10条消息
for count := 0; count < 10; {
// 从服务端拉取消息,Poll方法需要进行周期性调用
event := consumer.Poll(1000)
switch msg := event.(type) {
case *kafka.Message:
// 消费到数据后,对数据执行处理。此处处理时间不宜过长,过长会导致链接断开
handleMessage(msg)
count++
}
}
if !config.Consumer.AutoCommit {
// 未开启自动提交的场景下,手动提交消费进度,用户可根据自身需求在合适的实际执行消费进度提交
tps, err := consumer.Commit()
if err != nil {
fmt.Printf("Commit offsets failed. %s\n", err.Error())
} else {
fmt.Printf("Offset commited: \n")
for _, tp := range tps {
fmt.Printf("\t%s-%d: %d", *tp.Topic, tp.Partition, tp.Offset)
}
}
}
// 使用完毕后关闭消费者
return consumer.Close()
}
// 对获取到的消息进行处理
func handleMessage(msg *kafka.Message) {
fmt.Printf("Consumed a message from topic %s [%d] at offset %d. key: (%s), value: (%s), Headers: ",
*msg.TopicPartition.Topic, msg.TopicPartition.Partition, msg.TopicPartition.Offset, msg.Key, msg.Value)
if msg.Headers != nil {
fmt.Printf("(")
for _, header := range msg.Headers {
fmt.Printf("%s->%s,", header.Key, header.Value)
}
fmt.Printf(")")
}
fmt.Printf("\n")
}