本文以 Go 客户端为例,介绍如何在 VPC 或公网环境下通过 SASL_SSL 接入点 PLAIN 机制接入消息队列 Kafka版,并收发消息。
已完成准备工作。详细说明请参考准备工作。
创建消息队列 Kafka版配置文件 config.json
。 通过 SASL_SSL 接入点 PLAIN 机制接入时,配置文件示例如下。配置文件字段的详细说明,请参考配置文件。
说明
{ "bootstrap.servers": "xxxxx", // 修改配置为实例的 SASL 接入点 "security.protocol": "SASL_SSL", // 固定为 SASL_SSL "topic": "xxxx", // 修改配置为待发送的 topic 名称 "consumer": { "group.id": "xxxx" // 修改为指定消费组的名称 }, "sasl": { "enabled": true, // 使用 SASL 接入点时,必须设置为 true "mechanism": "PLAIN", // 用户类型为 Plain 时配置为 PLAIN,账号类型为Scram 时配置为 SCRAM-SHA-256 "username": "xxxx", // 用户名 "password": "xxxx" // 用户密码 } }
producer.go
。producer.go
发送消息。通过 SASL_SSL 接入点生产消息的示例代码如下,您也可以参考 Demo 中的示例文件 {DemoPath}/client/producer.go
,实现相关业务逻辑。
package client import ( "fmt" "github.com/confluentinc/confluent-kafka-go/kafka" ) func RunProduce(config *KafkaConf) error { // 构造生产配置 configMap := &kafka.ConfigMap{ "bootstrap.servers": config.BootstrapServers, "security.protocol": config.Protocol, "acks": config.Producer.Acks, "batch.size": config.Producer.BatchSize, } if config.Debug { // 开启Debug能力 configMap.SetKey("debug", "ALL") } if config.Sasl.Enabled { // 配置SASL认证 configMap.SetKey("sasl.mechanism", config.Sasl.Mechanism) configMap.SetKey("sasl.username", config.Sasl.UserName) configMap.SetKey("sasl.password", config.Sasl.Password) } // 创建一个Kafka生产者对象 producer, err := kafka.NewProducer(configMap) if err != nil { return err } // 处理消息发送的结果 go callBack(producer)() // 获取发送channel sendChannel := producer.ProduceChannel() // 循环发送10条消息 for count := 0; count < 10; count++ { // 构造消息对象 msg := &kafka.Message{ // 消息写入位置 TopicPartition: kafka.TopicPartition{ // 消息需要写入的Topic名称 Topic: &config.Topic, // 消息写入的分区编号,可以指定Topic特定的某一分区写入,或者设置为kafka.PartitionAny由系统自行选择 Partition: kafka.PartitionAny, }, // 消息内容,可以为nil Value: []byte(fmt.Sprintf("Bytedance test msg %d", count)), // 消息Key值,可以为nil。若消息key不为空且为指定分区进行写入时,相同key的消息会落在同一分区内 Key: []byte(fmt.Sprintf("Bytedance test key %d", count)), // 消息的属性值,作为额外的扩展属性,可以为nil Headers: []kafka.Header{ {Key: "service", Value: []byte("kafka")}, {Key: "version", Value: []byte("2.2.0")}, }, } // 发送消息 sendChannel <- msg } // 因为发送是异步动作,关闭生产者之前需要将消息都推送到服务端 producer.Flush(10000) // 关闭发送者 producer.Close() return nil } // 消息发送结果的一部方法回调,注意,若配置ACKS为0时,不会有消息回调产生 func callBack(p *kafka.Producer) func() { return func() { for event := range p.Events() { switch eType := event.(type) { case *kafka.Message: // 写入结果处理,可修改此处结果实现 if eType.TopicPartition.Error != nil { // 写入报错,异常处理 fmt.Printf("Delivery failed: %v\n", eType.TopicPartition.Error) } else { // 写入成功,正常处理 fmt.Printf("Delivered message to topic %s [%d] at offset %v\n", *eType.TopicPartition.Topic, eType.TopicPartition.Partition, eType.TopicPartition.Offset) } } } } }
consumer.go
。consumer.go
消费消息。通过 SASL_SSL 接入点消费消息的示例代码如下,您也可以参考 Demo 中的示例文件 {DemoPath}/client/consumer.go
,实现相关业务逻辑。
package client import ( "fmt" "github.com/confluentinc/confluent-kafka-go/kafka" "strconv" ) func RunConsumer(config *KafkaConf) error { // 构造消费配置 configMap := &kafka.ConfigMap{ "bootstrap.servers": config.BootstrapServers, "security.protocol": config.Protocol, "group.id": config.Consumer.GroupId, "auto.offset.reset": config.Consumer.AutoOffsetRest, "enable.auto.commit": strconv.FormatBool(config.Consumer.AutoCommit), } if config.Debug { // 开启Debug能力 if err := configMap.SetKey("debug", "ALL"); err != nil { return err } } if config.Sasl.Enabled { // 配置SASL认证 configMap.SetKey("sasl.mechanism", config.Sasl.Mechanism) configMap.SetKey("sasl.username", config.Sasl.UserName) configMap.SetKey("sasl.password", config.Sasl.Password) } // 创建消费者对象 consumer, err := kafka.NewConsumer(configMap) if err != nil { return err } // 订阅指定的Topic列表 err = consumer.SubscribeTopics([]string{config.Topic}, nil) if err != nil { return err } // 循环读取10条消息 for count := 0; count < 10; { // 从服务端拉取消息,Poll方法需要进行周期性调用 event := consumer.Poll(1000) switch msg := event.(type) { case *kafka.Message: // 消费到数据后,对数据执行处理。此处处理时间不宜过长,过长会导致链接断开 handleMessage(msg) count++ } } if !config.Consumer.AutoCommit { // 未开启自动提交的场景下,手动提交消费进度,用户可根据自身需求在合适的实际执行消费进度提交 tps, err := consumer.Commit() if err != nil { fmt.Printf("Commit offsets failed. %s\n", err.Error()) } else { fmt.Printf("Offset commited: \n") for _, tp := range tps { fmt.Printf("\t%s-%d: %d", *tp.Topic, tp.Partition, tp.Offset) } } } // 使用完毕后关闭消费者 return consumer.Close() } // 对获取到的消息进行处理 func handleMessage(msg *kafka.Message) { fmt.Printf("Consumed a message from topic %s [%d] at offset %d. key: (%s), value: (%s), Headers: ", *msg.TopicPartition.Topic, msg.TopicPartition.Partition, msg.TopicPartition.Offset, msg.Key, msg.Value) if msg.Headers != nil { fmt.Printf("(") for _, header := range msg.Headers { fmt.Printf("%s->%s,", header.Key, header.Value) } fmt.Printf(")") } fmt.Printf("\n") }