You need to enable JavaScript to run this app.
导航
普通消息
最近更新时间:2023.10.31 11:56:19首次发布时间:2022.01.28 15:16:40

火山引擎消息队列 RocketMQ版提供同步发送、异步发送和单向(Oneway)发送三种方式来发送普通消息。本文介绍如何通过不同方式发送普通消息。

前提条件

发送方式

火山引擎消息队列 RocketMQ版提供的普通消息发送方式包括以下三种,您可以根据业务要求选择合适的发送方式。

同步发送

异步发送

单向发送

发送方式

消息发送方发出一条消息到服务端,服务端接收并处理消息,然后返回响应给发送方,发送方收到响应后才会发送下一条消息。

消息发送方发出一条消息后,不等服务端返回响应,直接发送下一条消息。

发送方只负责发送消息,不等待服务端返回响应且没有回调函数触发,即只发送请求不等待应答。此方式发送消息的过程耗时非常短,一般在微秒级别。

应用场景

重要通知邮件、报名短信通知、营销短信系统等。

一般用于链路耗时较长,对响应时间较为敏感的业务场景,例如,您视频上传后通知启动转码服务,转码完成后通知推送转码结果等。

适用于某些耗时非常短,但对可靠性要求并不高的场景,例如日志收集。

TPS

最快

是否反馈发送结果

反馈

反馈

不反馈

可靠性

不丢失消息

不丢失消息

可能丢失消息

同步发送

同步发送是指消息发送方发出一条消息后,会在收到服务端返回响应之后才发下一条消息的通讯方式。一般用于较为重要的消息发送场景。
同步发送方式发送普通消息的示例代码如下。

import (
   "context"
   "fmt"
   "os"
   "strconv"

   "github.com/apache/rocketmq-client-go/v2"
   "github.com/apache/rocketmq-client-go/v2/primitive"
   "github.com/apache/rocketmq-client-go/v2/producer"
)

func main() {
   p, _ := rocketmq.NewProducer(
      // 配置实例的接入点信息
      producer.WithNsResolver(primitive.NewPassthroughResolver([]string{"http://rocketmq-cnoea09856be****.rocketmq.volces.com:9876"})),
      producer.WithRetry(2),
      // 配置RocketMQ实例的AccessKey ID和AccessKey Secret。
      producer.WithCredentials(primitive.Credentials{
          AccessKey: "ACCCESSKEY",
          SecretKey: "SECRETKEY",
      }),
   )
   err := p.Start()
   if err != nil {
      fmt.Printf("start producer error: %s", err.Error())
      os.Exit(1)
   }
   
   // 配置消息写入的Topic名称。对于实例 ID 格式为 MQ_INST_xxxx 的实例,此处配置的格式为${实例ID%TopicID},例如 "MQ_INST_****%testTopic"。详细说明请参考https://www.volcengine.com/docs/6410/153010#注意事项。
   topic := "testTopic"

   for i := 0; i < 10; i++ {
      msg := &primitive.Message{
         Topic: topic,
         Body:  []byte("Hello RocketMQ Go Client! " + strconv.Itoa(i)),
      }
      
      // 使用同步方式发送消息
      res, err := p.SendSync(context.Background(), msg)

      if err != nil {
         fmt.Printf("send message error: %s\n", err)
      } else {
         fmt.Printf("send message success: result=%s\n", res.String())
      }
   }
   err = p.Shutdown()
   if err != nil {
      fmt.Printf("shutdown producer error: %s", err.Error())
   }
}

异步发送

异步发送是指发送方发出一条消息后,不等服务端返回响应,接着发送下一条消息的通讯方式。
火山引擎消息队列 RocketMQ版的异步发送,需要实现异步发送回调接口(SendCallback)。消息发送方在发送了一条消息后,不需要等待服务端响应即可发送第二条消息。发送方通过回调接口接收服务端响应,并处理响应结果。
异步发送的示例代码如下。

import (
   "context"
   "fmt"
   "os"
   "sync"

   "github.com/apache/rocketmq-client-go/v2"
   "github.com/apache/rocketmq-client-go/v2/primitive"
   "github.com/apache/rocketmq-client-go/v2/producer"
)

func main() {
   p, _ := rocketmq.NewProducer(
      // 配置接入点信息
      producer.WithNsResolver(primitive.NewPassthroughResolver([]string{"http://rocketmq-cnoea09856be****.rocketmq.volces.com:9876"})),
      producer.WithRetry(2),
      // 配置RocketMQ实例的AccessKey ID和AccessKey Secret。
      producer.WithCredentials(primitive.Credentials{
          AccessKey: "ACCCESSKEY",
          SecretKey: "SECRETKEY",
      }),
   )

   err := p.Start()
   if err != nil {
      fmt.Printf("start producer error: %s", err.Error())
      os.Exit(1)
   }
   
   // 使用异步方式发送消息
   var wg sync.WaitGroup
   for i := 0; i < 30; i++ {
      wg.Add(1)
      err := p.SendAsync(context.TODO(),
         // 异步发送消息的回调
         // result 中获取发送的结果
         // error 获取发送中的异常
         func(ctx context.Context, result *primitive.SendResult, e error) {
            if e != nil {
               fmt.Printf("send message error: %s\n", err)
            } else {
               fmt.Printf("send message success: result=%s\n", result.String())
            }
            wg.Done()
            // 对于实例 ID 格式为 MQ_INST_xxxx 的实例,此处配置的格式为${实例ID%TopicID},例如 "MQ_INST_****%testTopic"。详细说明请参考https://www.volcengine.com/docs/6410/153010#注意事项。
         }, primitive.NewMessage("testTopic", []byte("Hello World")))

      if err != nil {
         fmt.Printf("send message error: %s\n", err)
      }
   }
   wg.Wait()
   err = p.Shutdown()
   if err != nil {
      fmt.Printf("shutdown producer error: %s", err.Error())
   }
}

单向发送

发送方只负责发送消息,不等待服务端返回响应且没有回调函数触发,即只发送请求不等待应答。此方式发送消息的过程耗时非常短,一般在微秒级别。
单向发送适用于某些耗时非常短,但对可靠性要求并不高的场景,例如日志收集等场景。
单向发送的示例代码如下。

import (
   "context"
   "fmt"
   "os"
   "strconv"

   "github.com/apache/rocketmq-client-go/v2"
   "github.com/apache/rocketmq-client-go/v2/primitive"
   "github.com/apache/rocketmq-client-go/v2/producer"
)

func main() {
   p, _ := rocketmq.NewProducer(
      // 配置实例的接入点信息
      producer.WithNsResolver(primitive.NewPassthroughResolver([]string{"http://rocketmq-cnoea09856be****.rocketmq.volces.com:9876"})),
      producer.WithRetry(2),
      // 配置RocketMQ实例的AccessKey ID和AccessKey Secret。
      producer.WithCredentials(primitive.Credentials{
          AccessKey: "ACCCESSKEY",
          SecretKey: "SECRETKEY",
      }),
   )
   err := p.Start()
   if err != nil {
      fmt.Printf("start producer error: %s", err.Error())
      os.Exit(1)
   }
   
   // 配置消息写入的Topic名称。对于实例 ID 格式为 MQ_INST_xxxx 的实例,此处配置的格式为${实例ID%TopicID},例如 "MQ_INST_****%testTopic"。详细说明请参考https://www.volcengine.com/docs/6410/153010#注意事项。
   topic := "test"

   for i := 0; i < 10; i++ {
      msg := &primitive.Message{
         Topic: topic,
         Body:  []byte("Hello RocketMQ Go Client! " + strconv.Itoa(i)),
      }
      
      // 使用单向方式发送消息
      err := p.SendOneWay(context.Background(), msg)

      if err != nil {
         fmt.Printf("send message error: %s\n", err)
      } else {
         fmt.Printf("send message success: result=%s\n", res.String())
      }
   }
   err = p.Shutdown()
   if err != nil {
      fmt.Printf("shutdown producer error: %s", err.Error())
   }
}

订阅普通消息

集群模式消费

集群模式消费的示例代码如下。

import (
   "context"
   "fmt"
   "github.com/apache/rocketmq-client-go/v2"
   "github.com/apache/rocketmq-client-go/v2/consumer"
   "github.com/apache/rocketmq-client-go/v2/primitive"
   "os"
)

func main() {
   c, _ := rocketmq.NewPushConsumer(
      // 配置使用的group信息,对于实例 ID 格式为 MQ_INST_xxxx 的实例,此处配置的格式为${实例ID%GroupID},例如 "MQ_INST_****%demo-group"。详细说明请参考https://www.volcengine.com/docs/6410/153010#注意事项。
      consumer.WithGroupName("demo-group"),
      consumer.WithNsResolver(primitive.NewPassthroughResolver([]string{
         "http://rocketmq-cnoea09856be****.rocketmq.volces.com:9876",
      })),
      // 配置RocketMQ实例的AccessKey ID和AccessKey Secret。
      consumer.WithCredentials(primitive.Credentials{
          AccessKey: "ACCESS KEY",
          SecretKey: "SECRETKEY",
      }),
   )
   
   // 配置订阅的group信息,和处理消息的回调,对于实例 ID 格式为 MQ_INST_xxxx 的实例,此处配置的格式为${实例ID%GroupID},例如 "MQ_INST_****%demo-group"。详细说明请参考https://www.volcengine.com/docs/6410/153010#注意事项。
   err := c.Subscribe("test", consumer.MessageSelector{}, func(ctx context.Context,
      msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
      for i := range msgs {
         fmt.Printf("subscribe callback: %v \n", msgs[i])
      }
      // 如果消息处理成功则返回consumer.ConsumeSuccess
      // 如果消息处理失败则返回consumer.ConsumeRetryLater
      return consumer.ConsumeSuccess, nil
   })
   if err != nil {
      fmt.Println(err.Error())
   }
   // Note: start after subscribe
   err = c.Start()
   if err != nil {
      fmt.Println(err.Error())
      os.Exit(-1)
   }

   waitChan := make(chan interface{}, 0)
   <-waitChan

}

广播模式消费

广播模式消费的示例代码如下。

import (
   "context"
   "fmt"
   "os"
   "time"

   "github.com/apache/rocketmq-client-go/v2"
   "github.com/apache/rocketmq-client-go/v2/consumer"
   "github.com/apache/rocketmq-client-go/v2/primitive"
)

func main() {
   c, _ := rocketmq.NewPushConsumer(
      // 配置使用的group信息,对于实例 ID 格式为 MQ_INST_xxxx 的实例,此处配置的格式为${实例ID%GroupID},例如 "MQ_INST_****%demo-group"。详细说明请参考https://www.volcengine.com/docs/6410/153010#注意事项。
      consumer.WithGroupName("demo-group"),
      // 配置使用的实例接入点
      consumer.WithNsResolver(primitive.NewPassthroughResolver([]string{
         "http://rocketmq-cnoea09856be****.rocketmq.volces.com:9876",
      })),
      // 配置RocketMQ实例的AccessKey ID和AccessKey Secret。
      consumer.WithCredentials(primitive.Credentials{
          AccessKey: "ACCESS KEY",
          SecretKey: "SECRETKEY",
      }),
      // 配置使用的消费模型:配置为广播模式
      consumer.WithConsumerModel(consumer.BroadCasting),
   )
   
   // 配置处理消息的回调,于实例 ID 格式为 MQ_INST_xxxx 的实例,此处配置的格式为${实例ID%TopicID},例如 "MQ_INST_****%testTopic"。详细说明请参考https://www.volcengine.com/docs/6410/153010#注意事项。
   err := c.Subscribe("testTopic", consumer.MessageSelector{}, func(ctx context.Context,
      msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
      fmt.Printf("subscribe callback: %v \n", msgs)
      return consumer.ConsumeSuccess, nil
   })
   if err != nil {
      fmt.Println(err.Error())
   }
   // Note: start after subscribe
   err = c.Start()
   if err != nil {
      fmt.Println(err.Error())
      os.Exit(-1)
   }
   time.Sleep(time.Hour)
   err = c.Shutdown()
   if err != nil {
      fmt.Printf("Shutdown Consumer error: %s", err.Error())
   }
}