You need to enable JavaScript to run this app.
导航
Go SDK
最近更新时间:2024.08.14 17:53:35 首次发布时间:2024.03.25 11:24:05

本文介绍如何通过 Go SDK 接入云原生消息引擎 BMQ 并收发消息。

背景信息

您可以直接使用开源 SDK sarama 进行收发消息。如需了解详细信息,请参见 sarama。

前提条件

  • 创建资源实例,并获取接入点地址,请参见管理资源池
  • 您需要提前在实例所属安全组中放开 9092 端口。具体操作,请参见添加安全组访问规则
  • (可选)您如果需要通过 SASL 用户名和密码进行鉴权,还需提前创建用户并获取密码。具体操作,请参见创建 SASL 用户

发送消息

编写并运行BmqProducerDemo.go发送消息。

PLAINTEXT

使用PLAINTEXT协议接入点地址连接 BMQ 实例时,无需鉴权。

import (
        "log"

        "github.com/IBM/sarama"
)

func main() {
        brokers := []string{"xxx"} // 接入点地址

        config := sarama.NewConfig()
        config.Version = sarama.V0_10_2_0
        config.Producer.Return.Successes = true

        // 创建生产者
        producer, err := sarama.NewSyncProducer(brokers, config)
        if err != nil {
                log.Println("Failed to create producer: ", err)
                return
        }
        defer producer.Close()

        // 生产消息
        message := &sarama.ProducerMessage{
                Topic: "my_topic", //Topic名称
                Key:   sarama.StringEncoder("test_key"),
                Value: sarama.StringEncoder("test_value"),
        }

        partition, offset, err := producer.SendMessage(message)
        if err != nil {
                log.Println("Failed to send message: ", err)
                return
        }

        log.Printf("Message sent successfully. Partition: %d, Offset: %d\n", partition, offset)
}

SASL_PLAINTEXT

通过 SASL 用户名和密码进行鉴权。您需要获取有权限用户的名称和密码,如何获取请参见查看 SASL 用户密码

import (
        "log"

        "github.com/IBM/sarama"
)

func main() {
        brokers := []string{"xxx"} // 接入点地址

        config := sarama.NewConfig()
        config.Version = sarama.V0_10_2_0
        config.Producer.Return.Successes = true
        
        // SASL_PLAINTEXT配置
        config.Net.SASL.Enable = true
        config.Net.SASL.User = "用户名"
        config.Net.SASL.Password = "密码"
        config.Net.SASL.Mechanism = sarama.SASLTypePlaintext
        config.Net.SASL.Version = 0

        // 创建生产者
        producer, err := sarama.NewSyncProducer(brokers, config)
        if err != nil {
                log.Println("Failed to create producer: ", err)
                return
        }
        defer producer.Close()

        // 生产消息
        message := &sarama.ProducerMessage{
                Topic: "my_topic", //Topic名称
                Key:   sarama.StringEncoder("test_key"),
                Value: sarama.StringEncoder("test_value"),
        }

        partition, offset, err := producer.SendMessage(message)
        if err != nil {
                log.Println("Failed to send message: ", err)
                return
        }

        log.Printf("Message sent successfully. Partition: %d, Offset: %d\n", partition, offset)
}

SASL_SSL

通过 SASL 用户名和密码进行鉴权,并使用 SSL/TLS 对传输链路加密。您需要获取有权限用户的名称和密码,如何获取请参见查看 SASL 用户密码。

import (
        "log"

        "github.com/IBM/sarama"
)

func main() {
        brokers := []string{"xxx"} // 接入点地址

        config := sarama.NewConfig()
        config.Version = sarama.V0_10_2_0
        config.Producer.Return.Successes = true
        config.Net.SASL.Version = 0
        config.Net.TLS.Config = &tls.Config{}
        config.Net.TLS.Config.InsecureSkipVerify = true
        
        // SASL_SSL配置
        config.Net.SASL.Enable = true
        config.Net.SASL.User = "用户"
        config.Net.SASL.Password = "密码"
        config.Net.SASL.Mechanism = sarama.SASLTypePlaintext
        config.Net.TLS.Enable = true

        // 创建生产者
        producer, err := sarama.NewSyncProducer(brokers, config)
        if err != nil {
                log.Println("Failed to create producer: ", err)
                return
        }
        defer producer.Close()

        // 生产消息
        message := &sarama.ProducerMessage{
                Topic: "my_topic", //Topic名称
                Key:   sarama.StringEncoder("test_key"),
                Value: sarama.StringEncoder("test_value"),
        }

        partition, offset, err := producer.SendMessage(message)
        if err != nil {
                log.Println("Failed to send message: ", err)
                return
        }

        log.Printf("Message sent successfully. Partition: %d, Offset: %d\n", partition, offset)
}

消费消息

编写并运行BmqConsumerDemo.go消费消息。

PLAINTEXT

使用PLAINTEXT协议接入点地址连接 BMQ 实例时,无需鉴权。

package main

import (
        "fmt"
        "os"
        "os/signal"
        "sync"

        "github.com/IBM/sarama"
)

// main consumes every partition of "my_topic" through a PLAINTEXT
// (unauthenticated) access point, one goroutine per partition, until the
// process receives an interrupt signal. Offsets are tracked for the consumer
// group "test_cg" through a sarama OffsetManager.
//
// Every setup step returns on failure: continuing with a nil client, consumer
// or offset manager would panic in the next call or in a deferred Close.
func main() {
        brokers := []string{"xxx"} // access point address
        topic := "my_topic"        // topic to consume
        cg := "test_cg"            // consumer group

        config := sarama.NewConfig()
        config.Version = sarama.V0_10_2_0

        // 1. create client
        client, err := sarama.NewClient(brokers, config)
        if err != nil {
                fmt.Println("create sarama client error:", err)
                return
        }
        defer func() {
                if err := client.Close(); err != nil {
                        fmt.Println("client close error:", err)
                } else {
                        fmt.Println("Close client!")
                }
        }()

        // 2. create consumer
        consumer, err := sarama.NewConsumerFromClient(client)
        if err != nil {
                fmt.Println("new consumer error:", err)
                return
        }
        // Registered only after the error check so Close never runs on a nil consumer.
        defer func() {
                if err := consumer.Close(); err != nil {
                        fmt.Println("consumer close error:", err)
                } else {
                        fmt.Println("Close consumer!")
                }
        }()

        // 3. offset manager
        offsetManager, err := sarama.NewOffsetManagerFromClient(cg, client)
        if err != nil {
                fmt.Println("new offset manager error:", err)
                return
        }
        defer func() {
                if err := offsetManager.Close(); err != nil {
                        fmt.Println("close offset manager error:", err)
                } else {
                        fmt.Println("Close offset manager!")
                }
        }()

        // With auto-commit disabled, commit the marked offsets once on the way
        // out. Deferred last, so it runs before the offset manager is closed.
        if !config.Consumer.Offsets.AutoCommit.Enable {
                defer offsetManager.Commit()
        }

        // 4. start to consume partitions
        partitions, err := client.Partitions(topic)
        if err != nil {
                fmt.Println("get partitions error:", err)
                return
        }
        fmt.Println("start consume. topic:", topic, "partitions:", partitions)

        var thr sync.WaitGroup
        quit := make(chan int) // closed to tell every partition goroutine to stop
        signals := make(chan os.Signal, 1)
        signal.Notify(signals, os.Interrupt)

        for _, partition := range partitions {
                thr.Add(1)
                go consumePartition(quit, &thr, consumer, offsetManager, partition, topic)
        }

        // Block until interrupted, then stop the workers and wait for them so
        // the deferred Close calls run after all partition consumers are done.
        <-signals
        close(quit)
        thr.Wait()
}

// consumePartition consumes one partition of topic until quit is closed.
//
// It starts from the offset reported by the partition's offset manager, prints
// each received message, and marks the following offset (msg.Offset+1) as the
// next one to read. thr.Done is called when the function returns, including
// on the early-return error paths.
func consumePartition(quit <-chan int, thr *sync.WaitGroup, consumer sarama.Consumer,
        offsetManager sarama.OffsetManager, partition int32, topic string) {
        defer thr.Done()

        partitionOffsetManager, err := offsetManager.ManagePartition(topic, partition)
        if err != nil {
                fmt.Println("create partitionOffsetManager err:", err)
                // Without an offset manager the NextOffset call below would
                // dereference nil, so give up on this partition.
                return
        }

        // The second result is the stored metadata string, not an error.
        offset, _ := partitionOffsetManager.NextOffset()
        if offset == -1 {
                offset = sarama.OffsetNewest
        }

        partitionConsumer, err := consumer.ConsumePartition(topic, partition, offset)
        if err != nil {
                fmt.Println("create partition consumer err:", err)
                return
        }
        defer func() {
                if err := partitionConsumer.Close(); err != nil {
                        fmt.Println(err)
                } else {
                        fmt.Println("Close partition", partition)
                }
        }()

        for {
                select {
                case msg := <-partitionConsumer.Messages():
                        // Key and Value are byte slices; convert them so the text is
                        // printed rather than a list of byte values.
                        fmt.Println("[Consume] topic:",
                                msg.Topic, "parition:", msg.Partition, "offset:", msg.Offset,
                                "key:", string(msg.Key), "value:", string(msg.Value))
                        partitionOffsetManager.MarkOffset(msg.Offset+1, "")
                case err := <-partitionConsumer.Errors():
                        fmt.Println("consume error:", err)
                case <-quit:
                        return
                }
        }
}

SASL_PLAINTEXT

通过 SASL 用户名和密码进行鉴权。您需要获取有权限用户的名称和密码,如何获取请参见查看 SASL 用户密码

package main

import (
        "fmt"
        "os"
        "os/signal"
        "sync"

        "github.com/IBM/sarama"
)

// main consumes every partition of "my_topic" through a SASL_PLAINTEXT access
// point (SASL user name and password), one goroutine per partition, until the
// process receives an interrupt signal. Offsets are tracked for the consumer
// group "test_cg" through a sarama OffsetManager.
//
// Every setup step returns on failure: continuing with a nil client, consumer
// or offset manager would panic in the next call or in a deferred Close.
func main() {
        brokers := []string{"xxx"} // access point address
        topic := "my_topic"        // topic to consume
        cg := "test_cg"            // consumer group

        config := sarama.NewConfig()
        config.Version = sarama.V0_10_2_0

        // SASL_PLAINTEXT settings: replace the user name and password
        // placeholders with the credentials of an authorized SASL user.
        config.Net.SASL.Enable = true
        config.Net.SASL.User = "用户名"
        config.Net.SASL.Password = "密码"
        config.Net.SASL.Mechanism = sarama.SASLTypePlaintext
        config.Net.SASL.Version = 0

        // 1. create client
        client, err := sarama.NewClient(brokers, config)
        if err != nil {
                fmt.Println("create sarama client error:", err)
                return
        }
        defer func() {
                if err := client.Close(); err != nil {
                        fmt.Println("client close error:", err)
                } else {
                        fmt.Println("Close client!")
                }
        }()

        // 2. create consumer
        consumer, err := sarama.NewConsumerFromClient(client)
        if err != nil {
                fmt.Println("new consumer error:", err)
                return
        }
        // Registered only after the error check so Close never runs on a nil consumer.
        defer func() {
                if err := consumer.Close(); err != nil {
                        fmt.Println("consumer close error:", err)
                } else {
                        fmt.Println("Close consumer!")
                }
        }()

        // 3. offset manager
        offsetManager, err := sarama.NewOffsetManagerFromClient(cg, client)
        if err != nil {
                fmt.Println("new offset manager error:", err)
                return
        }
        defer func() {
                if err := offsetManager.Close(); err != nil {
                        fmt.Println("close offset manager error:", err)
                } else {
                        fmt.Println("Close offset manager!")
                }
        }()

        // With auto-commit disabled, commit the marked offsets once on the way
        // out. Deferred last, so it runs before the offset manager is closed.
        if !config.Consumer.Offsets.AutoCommit.Enable {
                defer offsetManager.Commit()
        }

        // 4. start to consume partitions
        partitions, err := client.Partitions(topic)
        if err != nil {
                fmt.Println("get partitions error:", err)
                return
        }
        fmt.Println("start consume. topic:", topic, "partitions:", partitions)

        var thr sync.WaitGroup
        quit := make(chan int) // closed to tell every partition goroutine to stop
        signals := make(chan os.Signal, 1)
        signal.Notify(signals, os.Interrupt)

        for _, partition := range partitions {
                thr.Add(1)
                go consumePartition(quit, &thr, consumer, offsetManager, partition, topic)
        }

        // Block until interrupted, then stop the workers and wait for them so
        // the deferred Close calls run after all partition consumers are done.
        <-signals
        close(quit)
        thr.Wait()
}

// consumePartition consumes one partition of topic until quit is closed.
//
// It starts from the offset reported by the partition's offset manager, prints
// each received message, and marks the following offset (msg.Offset+1) as the
// next one to read. thr.Done is called when the function returns, including
// on the early-return error paths.
func consumePartition(quit <-chan int, thr *sync.WaitGroup, consumer sarama.Consumer,
        offsetManager sarama.OffsetManager, partition int32, topic string) {
        defer thr.Done()

        partitionOffsetManager, err := offsetManager.ManagePartition(topic, partition)
        if err != nil {
                fmt.Println("create partitionOffsetManager err:", err)
                // Without an offset manager the NextOffset call below would
                // dereference nil, so give up on this partition.
                return
        }

        // The second result is the stored metadata string, not an error.
        offset, _ := partitionOffsetManager.NextOffset()
        if offset == -1 {
                offset = sarama.OffsetNewest
        }

        partitionConsumer, err := consumer.ConsumePartition(topic, partition, offset)
        if err != nil {
                fmt.Println("create partition consumer err:", err)
                return
        }
        defer func() {
                if err := partitionConsumer.Close(); err != nil {
                        fmt.Println(err)
                } else {
                        fmt.Println("Close partition", partition)
                }
        }()

        for {
                select {
                case msg := <-partitionConsumer.Messages():
                        // Key and Value are byte slices; convert them so the text is
                        // printed rather than a list of byte values.
                        fmt.Println("[Consume] topic:",
                                msg.Topic, "parition:", msg.Partition, "offset:", msg.Offset,
                                "key:", string(msg.Key), "value:", string(msg.Value))
                        partitionOffsetManager.MarkOffset(msg.Offset+1, "")
                case err := <-partitionConsumer.Errors():
                        fmt.Println("consume error:", err)
                case <-quit:
                        return
                }
        }
}

SASL_SSL

通过 SASL 用户名和密码进行鉴权,并使用 SSL/TLS 对传输链路加密。您需要获取有权限用户的名称和密码,如何获取请参见查看 SASL 用户密码。

package main

import (
        "fmt"
        "os"
        "os/signal"
        "sync"

        "github.com/IBM/sarama"
)

func main() {
        brokers := []string{"xxx"} // 接入点地址
        topic := "my_topic"                    //要消费的topic
        cg := "test_cg"                       //消费者组

        config := sarama.NewConfig()
        config.Version = sarama.V0_10_2_0
        
        // SASL_SSL配置
        config.Net.SASL.Enable = true
        config.Net.SASL.User = "用户"
        config.Net.SASL.Password = "密码"
        config.Net.SASL.Mechanism = sarama.SASLTypePlaintext
        config.Net.TLS.Enable = true
        config.Net.SASL.Version = 0
        config.Net.TLS.Config = &tls.Config{}
        config.Net.TLS.Config.InsecureSkipVerify = true
        
        // 1. create client
        var client sarama.Client
        var err error
        client, err = sarama.NewClient(brokers, config)
        if err != nil {
                fmt.Println("create sarama client error:", err)
        }
        defer func() {
                if err := client.Close(); err != nil {
                        fmt.Println("client close error:", err)
                } else {
                        fmt.Println("Close client!")
                }
        }()

        // 2. create consumer
        consumer, err := sarama.NewConsumerFromClient(client)
        defer func() {
                if err := consumer.Close(); err != nil {
                        fmt.Println("consumer close error:", err)
                } else {
                        fmt.Println("Close consumer!")
                }
        }()
        if err != nil {
                fmt.Println("new consumer error:", err)
        }

        // 3. offset manager
        offsetManager, err := sarama.NewOffsetManagerFromClient(cg, client)
        defer func() {
                if err := offsetManager.Close(); err != nil {
                        fmt.Println("close offset manager error:", err)
                } else {
                        fmt.Println("Close offset manager!")
                }
        }()
        if err != nil {
                fmt.Println("new offset manager error:", err)
        }

        if !config.Consumer.Offsets.AutoCommit.Enable {
                defer offsetManager.Commit()
        }

        thr := sync.WaitGroup{}
        signals := make(chan os.Signal, 1)
        signal.Notify(signals, os.Interrupt)
        quit := make(chan int)

        // 4. start to consume partitions
        partitions, err := client.Partitions(topic)
        if err != nil {
                fmt.Println("get partitions error:", err)
        }
        fmt.Println("start consume. topic:", topic, "partitions:", partitions)
        for _, partition := range partitions {
                thr.Add(1)
                go consumePartition(quit, &thr, consumer, offsetManager, int32(partition), topic)
        }

        <-signals
        close(quit)
        thr.Wait()
}

// consumePartition consumes one partition of topic until quit is closed.
//
// It starts from the offset reported by the partition's offset manager, prints
// each received message, and marks the following offset (msg.Offset+1) as the
// next one to read. thr.Done is called when the function returns, including
// on the early-return error paths.
func consumePartition(quit <-chan int, thr *sync.WaitGroup, consumer sarama.Consumer,
        offsetManager sarama.OffsetManager, partition int32, topic string) {
        defer thr.Done()

        partitionOffsetManager, err := offsetManager.ManagePartition(topic, partition)
        if err != nil {
                fmt.Println("create partitionOffsetManager err:", err)
                // Without an offset manager the NextOffset call below would
                // dereference nil, so give up on this partition.
                return
        }

        // The second result is the stored metadata string, not an error.
        offset, _ := partitionOffsetManager.NextOffset()
        if offset == -1 {
                offset = sarama.OffsetNewest
        }

        partitionConsumer, err := consumer.ConsumePartition(topic, partition, offset)
        if err != nil {
                fmt.Println("create partition consumer err:", err)
                return
        }
        defer func() {
                if err := partitionConsumer.Close(); err != nil {
                        fmt.Println(err)
                } else {
                        fmt.Println("Close partition", partition)
                }
        }()

        for {
                select {
                case msg := <-partitionConsumer.Messages():
                        // Key and Value are byte slices; convert them so the text is
                        // printed rather than a list of byte values.
                        fmt.Println("[Consume] topic:",
                                msg.Topic, "parition:", msg.Partition, "offset:", msg.Offset,
                                "key:", string(msg.Key), "value:", string(msg.Value))
                        partitionOffsetManager.MarkOffset(msg.Offset+1, "")
                case err := <-partitionConsumer.Errors():
                        fmt.Println("consume error:", err)
                case <-quit:
                        return
                }
        }
}