火山引擎消息队列 RocketMQ版提供同步发送、异步发送和单向(Oneway)发送三种方式来发送普通消息。本文介绍如何通过不同方式发送普通消息。
火山引擎消息队列 RocketMQ版提供的普通消息发送方式包括以下三种,您可以根据业务要求选择合适的发送方式。
同步发送 | 异步发送 | 单向发送 | |
---|---|---|---|
发送方式 | 消息发送方发出一条消息到服务端,服务端接收并处理消息,然后返回响应给发送方,发送方收到响应后才会发送下一条消息。 | 消息发送方发出一条消息后,不等服务端返回响应,直接发送下一条消息。 | 发送方只负责发送消息,不等待服务端返回响应且没有回调函数触发,即只发送请求不等待应答。此方式发送消息的过程耗时非常短,一般在微秒级别。 |
应用场景 | 重要通知邮件、报名短信通知、营销短信系统等。 | 一般用于链路耗时较长,对响应时间较为敏感的业务场景,例如,您视频上传后通知启动转码服务,转码完成后通知推送转码结果等。 | 适用于某些耗时非常短,但对可靠性要求并不高的场景,例如日志收集。 |
TPS | 快 | 快 | 最快 |
是否反馈发送结果 | 反馈 | 反馈 | 不反馈 |
可靠性 | 不丢失消息 | 不丢失消息 | 可能丢失消息 |
同步发送是指消息发送方发出一条消息后,会在收到服务端返回响应之后才发下一条消息的通讯方式。一般用于较为重要的消息发送场景。
同步发送方式发送普通消息的示例代码如下。
import ( "context" "fmt" "os" "strconv" "github.com/apache/rocketmq-client-go/v2" "github.com/apache/rocketmq-client-go/v2/primitive" "github.com/apache/rocketmq-client-go/v2/producer" ) func main() { p, _ := rocketmq.NewProducer( // 配置实例的接入点信息 producer.WithNsResolver(primitive.NewPassthroughResolver([]string{"http://rocketmq-cnoea09856be****.rocketmq.volces.com:9876"})), producer.WithRetry(2), // 配置RocketMQ实例的AccessKey ID和AccessKey Secret。 producer.WithCredentials(primitive.Credentials{ AccessKey: "ACCCESSKEY", SecretKey: "SECRETKEY", }), ) err := p.Start() if err != nil { fmt.Printf("start producer error: %s", err.Error()) os.Exit(1) } // 配置消息写入的Topic名称。对于实例 ID 格式为 MQ_INST_xxxx 的实例,此处配置的格式为${实例ID%TopicID},例如 "MQ_INST_****%testTopic"。详细说明请参考https://www.volcengine.com/docs/6410/153010#注意事项。 topic := "testTopic" for i := 0; i < 10; i++ { msg := &primitive.Message{ Topic: topic, Body: []byte("Hello RocketMQ Go Client! " + strconv.Itoa(i)), } // 使用同步方式发送消息 res, err := p.SendSync(context.Background(), msg) if err != nil { fmt.Printf("send message error: %s\n", err) } else { fmt.Printf("send message success: result=%s\n", res.String()) } } err = p.Shutdown() if err != nil { fmt.Printf("shutdown producer error: %s", err.Error()) } }
异步发送是指发送方发出一条消息后,不等服务端返回响应,接着发送下一条消息的通讯方式。
火山引擎消息队列 RocketMQ版的异步发送,需要实现异步发送回调接口(SendCallback)。消息发送方在发送了一条消息后,不需要等待服务端响应即可发送第二条消息。发送方通过回调接口接收服务端响应,并处理响应结果。
异步发送的示例代码如下。
import ( "context" "fmt" "os" "sync" "github.com/apache/rocketmq-client-go/v2" "github.com/apache/rocketmq-client-go/v2/primitive" "github.com/apache/rocketmq-client-go/v2/producer" ) func main() { p, _ := rocketmq.NewProducer( // 配置接入点信息 producer.WithNsResolver(primitive.NewPassthroughResolver([]string{"http://rocketmq-cnoea09856be****.rocketmq.volces.com:9876"})), producer.WithRetry(2), // 配置RocketMQ实例的AccessKey ID和AccessKey Secret。 producer.WithCredentials(primitive.Credentials{ AccessKey: "ACCCESSKEY", SecretKey: "SECRETKEY", }), ) err := p.Start() if err != nil { fmt.Printf("start producer error: %s", err.Error()) os.Exit(1) } // 使用异步方式发送消息 var wg sync.WaitGroup for i := 0; i < 30; i++ { wg.Add(1) err := p.SendAsync(context.TODO(), // 异步发送消息的回调 // result 中获取发送的结果 // error 获取发送中的异常 func(ctx context.Context, result *primitive.SendResult, e error) { if e != nil { fmt.Printf("send message error: %s\n", err) } else { fmt.Printf("send message success: result=%s\n", result.String()) } wg.Done() // 对于实例 ID 格式为 MQ_INST_xxxx 的实例,此处配置的格式为${实例ID%TopicID},例如 "MQ_INST_****%testTopic"。详细说明请参考https://www.volcengine.com/docs/6410/153010#注意事项。 }, primitive.NewMessage("testTopic", []byte("Hello World"))) if err != nil { fmt.Printf("send message error: %s\n", err) } } wg.Wait() err = p.Shutdown() if err != nil { fmt.Printf("shutdown producer error: %s", err.Error()) } }
发送方只负责发送消息,不等待服务端返回响应且没有回调函数触发,即只发送请求不等待应答。此方式发送消息的过程耗时非常短,一般在微秒级别。
单向发送适用于某些耗时非常短,但对可靠性要求并不高的场景,例如日志收集等场景。
单向发送的示例代码如下。
import ( "context" "fmt" "os" "strconv" "github.com/apache/rocketmq-client-go/v2" "github.com/apache/rocketmq-client-go/v2/primitive" "github.com/apache/rocketmq-client-go/v2/producer" ) func main() { p, _ := rocketmq.NewProducer( // 配置实例的接入点信息 producer.WithNsResolver(primitive.NewPassthroughResolver([]string{"http://rocketmq-cnoea09856be****.rocketmq.volces.com:9876"})), producer.WithRetry(2), // 配置RocketMQ实例的AccessKey ID和AccessKey Secret。 producer.WithCredentials(primitive.Credentials{ AccessKey: "ACCCESSKEY", SecretKey: "SECRETKEY", }), ) err := p.Start() if err != nil { fmt.Printf("start producer error: %s", err.Error()) os.Exit(1) } // 配置消息写入的Topic名称。对于实例 ID 格式为 MQ_INST_xxxx 的实例,此处配置的格式为${实例ID%TopicID},例如 "MQ_INST_****%testTopic"。详细说明请参考https://www.volcengine.com/docs/6410/153010#注意事项。 topic := "test" for i := 0; i < 10; i++ { msg := &primitive.Message{ Topic: topic, Body: []byte("Hello RocketMQ Go Client! " + strconv.Itoa(i)), } // 使用单向方式发送消息 err := p.SendOneWay(context.Background(), msg) if err != nil { fmt.Printf("send message error: %s\n", err) } else { fmt.Printf("send message success: result=%s\n", res.String()) } } err = p.Shutdown() if err != nil { fmt.Printf("shutdown producer error: %s", err.Error()) } }
集群模式消费的示例代码如下。
import ( "context" "fmt" "github.com/apache/rocketmq-client-go/v2" "github.com/apache/rocketmq-client-go/v2/consumer" "github.com/apache/rocketmq-client-go/v2/primitive" "os" ) func main() { c, _ := rocketmq.NewPushConsumer( // 配置使用的group信息,对于实例 ID 格式为 MQ_INST_xxxx 的实例,此处配置的格式为${实例ID%GroupID},例如 "MQ_INST_****%demo-group"。详细说明请参考https://www.volcengine.com/docs/6410/153010#注意事项。 consumer.WithGroupName("demo-group"), consumer.WithNsResolver(primitive.NewPassthroughResolver([]string{ "http://rocketmq-cnoea09856be****.rocketmq.volces.com:9876", })), // 配置RocketMQ实例的AccessKey ID和AccessKey Secret。 consumer.WithCredentials(primitive.Credentials{ AccessKey: "ACCESS KEY", SecretKey: "SECRETKEY", }), ) // 配置订阅的group信息,和处理消息的回调,对于实例 ID 格式为 MQ_INST_xxxx 的实例,此处配置的格式为${实例ID%GroupID},例如 "MQ_INST_****%demo-group"。详细说明请参考https://www.volcengine.com/docs/6410/153010#注意事项。 err := c.Subscribe("test", consumer.MessageSelector{}, func(ctx context.Context, msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) { for i := range msgs { fmt.Printf("subscribe callback: %v \n", msgs[i]) } // 如果消息处理成功则返回consumer.ConsumeSuccess // 如果消息处理失败则返回consumer.ConsumeRetryLater return consumer.ConsumeSuccess, nil }) if err != nil { fmt.Println(err.Error()) } // Note: start after subscribe err = c.Start() if err != nil { fmt.Println(err.Error()) os.Exit(-1) } waitChan := make(chan interface{}, 0) <-waitChan }
广播模式消费的示例代码如下。
import ( "context" "fmt" "os" "time" "github.com/apache/rocketmq-client-go/v2" "github.com/apache/rocketmq-client-go/v2/consumer" "github.com/apache/rocketmq-client-go/v2/primitive" ) func main() { c, _ := rocketmq.NewPushConsumer( // 配置使用的group信息,对于实例 ID 格式为 MQ_INST_xxxx 的实例,此处配置的格式为${实例ID%GroupID},例如 "MQ_INST_****%demo-group"。详细说明请参考https://www.volcengine.com/docs/6410/153010#注意事项。 consumer.WithGroupName("demo-group"), // 配置使用的实例接入点 consumer.WithNsResolver(primitive.NewPassthroughResolver([]string{ "http://rocketmq-cnoea09856be****.rocketmq.volces.com:9876", })), // 配置RocketMQ实例的AccessKey ID和AccessKey Secret。 consumer.WithCredentials(primitive.Credentials{ AccessKey: "ACCESS KEY", SecretKey: "SECRETKEY", }), // 配置使用的消费模型:配置为广播模式 consumer.WithConsumerModel(consumer.BroadCasting), ) // 配置处理消息的回调,于实例 ID 格式为 MQ_INST_xxxx 的实例,此处配置的格式为${实例ID%TopicID},例如 "MQ_INST_****%testTopic"。详细说明请参考https://www.volcengine.com/docs/6410/153010#注意事项。 err := c.Subscribe("testTopic", consumer.MessageSelector{}, func(ctx context.Context, msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) { fmt.Printf("subscribe callback: %v \n", msgs) return consumer.ConsumeSuccess, nil }) if err != nil { fmt.Println(err.Error()) } // Note: start after subscribe err = c.Start() if err != nil { fmt.Println(err.Error()) os.Exit(-1) } time.Sleep(time.Hour) err = c.Shutdown() if err != nil { fmt.Printf("Shutdown Consumer error: %s", err.Error()) } }