You need to enable JavaScript to run this app.
导航
参数说明
最近更新时间:2022.12.14 11:37:07首次发布时间:2022.01.28 15:16:40

本文介绍您在使用 Go SDK 接入火山引擎消息队列 RocketMQ版时,需要配置的参数。

消息发送参数

配置示例:

// 通用的使用方式
p, _ := rocketmq.NewProducer(
   producer.WithNsResolver(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
   producer.WithRetry(2),
   // 在这里添加新的配置参数
)

参数说明:

参数
是否必选
说明
配置方式

WithGroupName

可选

Producer 组名,即您在火山引擎 RocketMQ 控制台上创建的 Group ID。
使用事务消息时需要指定 Producer 的 groupID,并保证使用相同 groupID 的客户端处理逻辑是相同的,RocketMQ 服务端会在回查事务消息提交状态的时候发送到相同 groupID 中的任何一个客户端。

说明

对于2022年12月16日之前创建的实例,此处配置的格式为 RocketMQ实例ID%Group名称,例如实例 ID为MQ_INST_1111313Group 名称为 Demo,则拼接后为 MQ_INST_1111313%Demo

producer.WithGroupName("demo-group")

WithInstanceName在单个进程启动多个客户端时必选客户端标识(Client ID)。一个进程中需要创建多个 RocketMQ 客户端连接不同实例时,需要通过 InstanceName 区分客户端连接。producer.WithInstanceName("instance")
WithSendMsgTimeout可选发送消息的超时时间。producer.WithSendMsgTimeout(3 * time.Second)
Retry可选重试次数。producer.Retry(3)
WithInterceptor可选拦截器。primitive.Interceptor

WithQueueSelector

顺序消息时需要填写

消息投递选择队列的逻辑。RocketMQ 提供内置的队列选择器,您可以根据业务需要任选一种。
内置的队列选择器包括以下几种。

  • producer.NewRandomQueueSelector 随机选择队列。

  • producer.NewRoundRobinQueueSelector 按照轮训方式选择队列。

  • producer.NewManualQueueSelector 直接选择消息中配置的队列。

queueSelector := producer.NewRandomQueueSelector()
p, _ := rocketmq.NewProducer(
   ...
   producer.WithQueueSelector(queueSelector),
)
// 也可以实现 producer.QueueSelector 接口,指定自己的队列选择器
//type QueueSelector interface {
//   Select(*primitive.Message, []*primitive.MessageQueue) *primitive.MessageQueue
//}

WithCredentials

必选

密钥信息,包括 AccessKey 和 SecretKey。

producer.WithCredentials(primitive.Credentials{
   AccessKey: "",
   SecretKey: "",
}),

WithNsResolver

必选

指定用于解析 NameServer 地址的路由,NameServer 地址和域名任选其一。

p, _ := rocketmq.NewProducer(
   producer.WithNsResolver(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
   ....
)

消息消费参数

参数
是否必选
说明
配置方式

WithConsumerModel

可选

消费模式。默认为 Clustering,即集群模式。

  • BroadCasting:广播模式

  • Clustering:集群模式

c, err := rocketmq.NewPushConsumer(
   consumer.WithConsumerModel(consumer.Clustering),
   ....
)

WithConsumeFromWhere

可选

新的Consumer Group启动后,用于确定从何处开始拉取。取值包括:

  • ConsumeFromLastOffset:从上次消费位置继续消费。新建的Consumer Group选择此参数将从起始位置开始消费。

  • ConsumeFromFirstOffset:从起始位置开始消费。选择此参数时,可能发生重复消费,请根据实际业务情况选择。

  • ConsumeFromTimestamp:从指定时间戳开始消费。小于指定时间戳的消息将不会被消费。

// 1. ConsumeFromLastOffset 从最新位置开始消费.
c, err := rocketmq.NewPushConsumer(
   consumer.WithConsumeFromWhere(consumer.ConsumeFromLastOffset),
   ....
)
// 2. ConsumeFromFirstOffset 从起始位置开始消费.
c, err := rocketmq.NewPushConsumer(
   consumer.WithConsumeFromWhere(consumer.ConsumeFromFirstOffset),
   ....
)

// 3. ConsumeFromTimestamp 从指定时间戳开始消费.
c, err := rocketmq.NewPushConsumer(
   consumer.WithConsumeFromWhere(consumer.ConsumeFromTimestamp),
   consumer.WithConsumeTimestamp("20131223171201")
   ....
)

WithConsumerOrder

顺序消费时必选

配置是否为顺序消费模式,默认为 false,即不顺序消费。

c, _ := rocketmq.NewPushConsumer(
   ....
   consumer.WithConsumerOrder(true),
)

WithConsume
MessageBatchMaxSize

可选

每次消费一批消息的时候包含多少条消息,默认为 1。

c, _ := rocketmq.NewPushConsumer(
   ....
   consumer.WithConsumeMessageBatchMaxSize(1),
)
WithInterceptor可选拦截器,用于拦截消费者。primitive.Interceptor

WithGroupName

必选

Consumer 的组名。即您在控制台上创建的 GroupID。

说明

对于2022年12月16日之前创建的实例,此处配置的格式为 RocketMQ实例ID%Group名称,例如实例 ID 为 MQ_INST_1111313Group 名称为 Demo,则拼接后为 MQ_INST_1111313%Demo

c, _ := rocketmq.NewPushConsumer(
   ....
   consumer.WithGroupName("demo-group"),
)

WithInstance

在单个进程启动多个客户端时必选

客户端标识(Client ID)。一个进程中需要创建多个 RocketMQ 客户端连接不同实例时,需要通过 InstanceName 区分客户端连接。

c, _ := rocketmq.NewPushConsumer(
   ....
   consumer.WithInstance("aaaaa"),
)

WithCredentials

必选

RocketMQ 实例的密钥,即您在火山引擎控制台中、RocketMQ 实例的密钥管理页面创建的密钥,用于身份认证。
注意:此处的密钥并非火山引擎主账号AccessKey。

c, err := rocketmq.NewPushConsumer(
   consumer.WithGroupName("demo-group"),
   consumer.WithNsResolver(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
   consumer.WithCredentials(primitive.Credentials{
      AccessKey: "RocketMQ",
      SecretKey: "12345678",
   }),
)

WithMaxReconsumeTimes

可选

消费最大重试次数,默认为 -1,即重试 16 次。

c, err := rocketmq.NewPushConsumer(
   consumer.WithMaxReconsumeTimes(10),
)

WithStrategy

可选

集群模式下队列分配逻辑。详细信息请 consumer 包里面的 strategy.go 文件。

c, err := rocketmq.NewPushConsumer(
consumer.WithStrategy(consumer.AllocateByAveragely),
)