本文介绍如何通过 Go SDK 接入云原生消息引擎 BMQ 并收发消息。
您可以直接使用开源 SDK sarama 进行收发消息。如需了解详细信息,请参见sarama。
编写并运行BmqProducerDemo.go
发送消息。
使用PLAINTEXT
协议接入点地址连接 BMQ 实例时,无需鉴权。
import ( "log" "github.com/IBM/sarama" ) func main() { brokers := []string{"xxx"} // 接入点地址 config := sarama.NewConfig() config.Version = sarama.V0_10_2_0 config.Producer.Return.Successes = true // 创建生产者 producer, err := sarama.NewSyncProducer(brokers, config) if err != nil { log.Println("Failed to create producer: ", err) return } defer producer.Close() // 生产消息 message := &sarama.ProducerMessage{ Topic: "my_topic", //Topic名称 Key: sarama.StringEncoder("test_key"), Value: sarama.StringEncoder("test_value"), } partition, offset, err := producer.SendMessage(message) if err != nil { log.Println("Failed to send message: ", err) return } log.Printf("Message sent successfully. Partition: %d, Offset: %d\n", partition, offset) }
通过 SASL 用户名和密码进行鉴权。您需要获取有权限用户的名称和密码,如何获取请参见查看 SASL 用户密码。
import ( "log" "github.com/IBM/sarama" ) func main() { brokers := []string{"xxx"} // 接入点地址 config := sarama.NewConfig() config.Version = sarama.V0_10_2_0 config.Producer.Return.Successes = true // SASL_PLAINTEXT配置 config.Net.SASL.Enable = true config.Net.SASL.User = "用户名" config.Net.SASL.Password = "密码" config.Net.SASL.Mechanism = sarama.SASLTypePlaintext config.Net.SASL.Version = 0 // 创建生产者 producer, err := sarama.NewSyncProducer(brokers, config) if err != nil { log.Println("Failed to create producer: ", err) return } defer producer.Close() // 生产消息 message := &sarama.ProducerMessage{ Topic: "my_topic", //Topic名称 Key: sarama.StringEncoder("test_key"), Value: sarama.StringEncoder("test_value"), } partition, offset, err := producer.SendMessage(message) if err != nil { log.Println("Failed to send message: ", err) return } log.Printf("Message sent successfully. Partition: %d, Offset: %d\n", partition, offset) }
通过 SASL 用户名和密码进行鉴权。您需要获取有权限用户的名称和密码,如何获取请参见查看 SASL 用户密码。
import ( "log" "github.com/IBM/sarama" ) func main() { brokers := []string{"xxx"} // 接入点地址 config := sarama.NewConfig() config.Version = sarama.V0_10_2_0 config.Producer.Return.Successes = true config.Net.SASL.Version = 0 config.Net.TLS.Config = &tls.Config{} config.Net.TLS.Config.InsecureSkipVerify = true // SASL_SSL配置 config.Net.SASL.Enable = true config.Net.SASL.User = "用户" config.Net.SASL.Password = "密码" config.Net.SASL.Mechanism = sarama.SASLTypePlaintext config.Net.TLS.Enable = true // 创建生产者 producer, err := sarama.NewSyncProducer(brokers, config) if err != nil { log.Println("Failed to create producer: ", err) return } defer producer.Close() // 生产消息 message := &sarama.ProducerMessage{ Topic: "my_topic", //Topic名称 Key: sarama.StringEncoder("test_key"), Value: sarama.StringEncoder("test_value"), } partition, offset, err := producer.SendMessage(message) if err != nil { log.Println("Failed to send message: ", err) return } log.Printf("Message sent successfully. Partition: %d, Offset: %d\n", partition, offset) }
编写并运行BmqConsumerDemo.go
消费消息。
使用PLAINTEXT
协议接入点地址连接 BMQ 实例时,无需鉴权。
package main import ( "fmt" "os" "os/signal" "sync" "github.com/IBM/sarama" ) func main() { brokers := []string{"xxx"} // 接入点地址 topic := "my_topic" //要消费的topic cg := "test_cg" //消费者组 config := sarama.NewConfig() config.Version = sarama.V0_10_2_0 // 1. create client var client sarama.Client var err error client, err = sarama.NewClient(brokers, config) if err != nil { fmt.Println("create sarama client error:", err) } defer func() { if err := client.Close(); err != nil { fmt.Println("client close error:", err) } else { fmt.Println("Close client!") } }() // 2. create consumer consumer, err := sarama.NewConsumerFromClient(client) defer func() { if err := consumer.Close(); err != nil { fmt.Println("consumer close error:", err) } else { fmt.Println("Close consumer!") } }() if err != nil { fmt.Println("new consumer error:", err) } // 3. offset manager offsetManager, err := sarama.NewOffsetManagerFromClient(cg, client) defer func() { if err := offsetManager.Close(); err != nil { fmt.Println("close offset manager error:", err) } else { fmt.Println("Close offset manager!") } }() if err != nil { fmt.Println("new offset manager error:", err) } if !config.Consumer.Offsets.AutoCommit.Enable { defer offsetManager.Commit() } thr := sync.WaitGroup{} signals := make(chan os.Signal, 1) signal.Notify(signals, os.Interrupt) quit := make(chan int) // 4. start to consume partitions partitions, err := client.Partitions(topic) if err != nil { fmt.Println("get partitions error:", err) } fmt.Println("start consume. topic:", topic, "partitions:", partitions) for _, partition := range partitions { thr.Add(1) go consumePartition(quit, &thr, consumer, offsetManager, int32(partition), topic) } <-signals close(quit) thr.Wait() } func consumePartition(quit <-chan int, thr *sync.WaitGroup, consumer sarama.Consumer, offsetManager sarama.OffsetManager, partition int32, topic string) { defer thr.Done() partitionOffsetManager, err := offsetManager.ManagePartition(topic, partition) if err != nil { fmt.Println("create partitionOffsetManager err:", err) } offset, _ := partitionOffsetManager.NextOffset() if offset == -1 { offset = sarama.OffsetNewest } var partitionConsumer sarama.PartitionConsumer partitionConsumer, err = consumer.ConsumePartition(topic, partition, offset) if err != nil { fmt.Println("create partition consumer err:", err) return } defer func() { if err := partitionConsumer.Close(); err != nil { fmt.Println(err) } else { fmt.Println("Close partition", partition) } }() for { select { case msg := <-partitionConsumer.Messages(): fmt.Println("[Consume] topic:", msg.Topic, "parition:", msg.Partition, "offset:", msg.Offset, "key:", msg.Key, "value:", msg.Value) partitionOffsetManager.MarkOffset(msg.Offset+1, "") case err := <-partitionConsumer.Errors(): fmt.Println("consume error:", err) case <-quit: return } } }
通过 SASL 用户名和密码进行鉴权。您需要获取有权限用户的名称和密码,如何获取请参见查看 SASL 用户密码。
package main import ( "fmt" "os" "os/signal" "sync" "github.com/IBM/sarama" ) func main() { brokers := []string{"xxx"} // 接入点地址 topic := "my_topic" //要消费的topic cg := "test_cg" //消费者组 config := sarama.NewConfig() config.Version = sarama.V0_10_2_0 // SASL_PLAINTEXT配置 config.Net.SASL.Enable = true config.Net.SASL.User = "用户名" config.Net.SASL.Password = "密码" config.Net.SASL.Mechanism = sarama.SASLTypePlaintext config.Net.SASL.Version = 0 // 1. create client var client sarama.Client var err error client, err = sarama.NewClient(brokers, config) if err != nil { fmt.Println("create sarama client error:", err) } defer func() { if err := client.Close(); err != nil { fmt.Println("client close error:", err) } else { fmt.Println("Close client!") } }() // 2. create consumer consumer, err := sarama.NewConsumerFromClient(client) defer func() { if err := consumer.Close(); err != nil { fmt.Println("consumer close error:", err) } else { fmt.Println("Close consumer!") } }() if err != nil { fmt.Println("new consumer error:", err) } // 3. offset manager offsetManager, err := sarama.NewOffsetManagerFromClient(cg, client) defer func() { if err := offsetManager.Close(); err != nil { fmt.Println("close offset manager error:", err) } else { fmt.Println("Close offset manager!") } }() if err != nil { fmt.Println("new offset manager error:", err) } if !config.Consumer.Offsets.AutoCommit.Enable { defer offsetManager.Commit() } thr := sync.WaitGroup{} signals := make(chan os.Signal, 1) signal.Notify(signals, os.Interrupt) quit := make(chan int) // 4. start to consume partitions partitions, err := client.Partitions(topic) if err != nil { fmt.Println("get partitions error:", err) } fmt.Println("start consume. topic:", topic, "partitions:", partitions) for _, partition := range partitions { thr.Add(1) go consumePartition(quit, &thr, consumer, offsetManager, int32(partition), topic) } <-signals close(quit) thr.Wait() } func consumePartition(quit <-chan int, thr *sync.WaitGroup, consumer sarama.Consumer, offsetManager sarama.OffsetManager, partition int32, topic string) { defer thr.Done() partitionOffsetManager, err := offsetManager.ManagePartition(topic, partition) if err != nil { fmt.Println("create partitionOffsetManager err:", err) } offset, _ := partitionOffsetManager.NextOffset() if offset == -1 { offset = sarama.OffsetNewest } var partitionConsumer sarama.PartitionConsumer partitionConsumer, err = consumer.ConsumePartition(topic, partition, offset) if err != nil { fmt.Println("create partition consumer err:", err) return } defer func() { if err := partitionConsumer.Close(); err != nil { fmt.Println(err) } else { fmt.Println("Close partition", partition) } }() for { select { case msg := <-partitionConsumer.Messages(): fmt.Println("[Consume] topic:", msg.Topic, "parition:", msg.Partition, "offset:", msg.Offset, "key:", msg.Key, "value:", msg.Value) partitionOffsetManager.MarkOffset(msg.Offset+1, "") case err := <-partitionConsumer.Errors(): fmt.Println("consume error:", err) case <-quit: return } } }
通过 SASL 用户名和密码进行鉴权。您需要获取有权限用户的名称和密码,如何获取请参见查看 SASL 用户密码。
package main import ( "fmt" "os" "os/signal" "sync" "github.com/IBM/sarama" ) func main() { brokers := []string{"xxx"} // 接入点地址 topic := "my_topic" //要消费的topic cg := "test_cg" //消费者组 config := sarama.NewConfig() config.Version = sarama.V0_10_2_0 // SASL_SSL配置 config.Net.SASL.Enable = true config.Net.SASL.User = "用户" config.Net.SASL.Password = "密码" config.Net.SASL.Mechanism = sarama.SASLTypePlaintext config.Net.TLS.Enable = true config.Net.SASL.Version = 0 config.Net.TLS.Config = &tls.Config{} config.Net.TLS.Config.InsecureSkipVerify = true // 1. create client var client sarama.Client var err error client, err = sarama.NewClient(brokers, config) if err != nil { fmt.Println("create sarama client error:", err) } defer func() { if err := client.Close(); err != nil { fmt.Println("client close error:", err) } else { fmt.Println("Close client!") } }() // 2. create consumer consumer, err := sarama.NewConsumerFromClient(client) defer func() { if err := consumer.Close(); err != nil { fmt.Println("consumer close error:", err) } else { fmt.Println("Close consumer!") } }() if err != nil { fmt.Println("new consumer error:", err) } // 3. offset manager offsetManager, err := sarama.NewOffsetManagerFromClient(cg, client) defer func() { if err := offsetManager.Close(); err != nil { fmt.Println("close offset manager error:", err) } else { fmt.Println("Close offset manager!") } }() if err != nil { fmt.Println("new offset manager error:", err) } if !config.Consumer.Offsets.AutoCommit.Enable { defer offsetManager.Commit() } thr := sync.WaitGroup{} signals := make(chan os.Signal, 1) signal.Notify(signals, os.Interrupt) quit := make(chan int) // 4. start to consume partitions partitions, err := client.Partitions(topic) if err != nil { fmt.Println("get partitions error:", err) } fmt.Println("start consume. topic:", topic, "partitions:", partitions) for _, partition := range partitions { thr.Add(1) go consumePartition(quit, &thr, consumer, offsetManager, int32(partition), topic) } <-signals close(quit) thr.Wait() } func consumePartition(quit <-chan int, thr *sync.WaitGroup, consumer sarama.Consumer, offsetManager sarama.OffsetManager, partition int32, topic string) { defer thr.Done() partitionOffsetManager, err := offsetManager.ManagePartition(topic, partition) if err != nil { fmt.Println("create partitionOffsetManager err:", err) } offset, _ := partitionOffsetManager.NextOffset() if offset == -1 { offset = sarama.OffsetNewest } var partitionConsumer sarama.PartitionConsumer partitionConsumer, err = consumer.ConsumePartition(topic, partition, offset) if err != nil { fmt.Println("create partition consumer err:", err) return } defer func() { if err := partitionConsumer.Close(); err != nil { fmt.Println(err) } else { fmt.Println("Close partition", partition) } }() for { select { case msg := <-partitionConsumer.Messages(): fmt.Println("[Consume] topic:", msg.Topic, "parition:", msg.Partition, "offset:", msg.Offset, "key:", msg.Key, "value:", msg.Value) partitionOffsetManager.MarkOffset(msg.Offset+1, "") case err := <-partitionConsumer.Errors(): fmt.Println("consume error:", err) case <-quit: return } } }