准备开发环境,并创建相关服务资源之后,您可以调用开源 TCP 协议的 RocketMQ SDK 收发普通消息。本文档以 TCP 协议的 Java SDK 为例,介绍消息队列 RocketMQ版收发普通消息的基本步骤。
在使用 Java SDK 接入火山引擎消息队列 RocketMQ版收发消息时,需要配置相应的消息生产或消费参数。您可以参考参数说明,了解相关的参数信息。消息队列 RocketMQ版提供常见场景的消息生产与消费示例代码,您也可以参考示例代码中的注释,了解各个关键参数。详细信息请参考SDK 参考。
RocketMQ Java 客户端 SDK 支持连接火山引擎消息队列 RocketMQ版,推荐使用的客户端版本为 4.8.0,详情请参见 Java SDK 下载地址 。
在pom.xml
中添加以下依赖。
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.8.0</version> </dependency> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-acl</artifactId> <version>4.8.0</version> </dependency>
您可以通过以下方式发送普通消息:
成功发送消息之后,如果可以通过消息查询功能检索到这条消息,表示消息已成功发送到服务端。查询消息的操作步骤请参考消息查询。
import java.util.Date; import org.apache.rocketmq.acl.common.AclClientRPCHook; import org.apache.rocketmq.acl.common.SessionCredentials; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendCallback; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.remoting.common.RemotingHelper; public class RocketMQAsyncProducer { //设置为您在火山引擎消息队列 RocketMQ版控制台上创建的GID,以及替换为您的AccessKey ID和AccessKey Secret。 private static RPCHook getAclRPCHook() { return new AclClientRPCHook(new SessionCredentials("YOUR ACCESS KEY", "YOUR SECRET KEY")); } public static void main(String[] args) throws MQClientException { /** * 创建Producer * 设置为您从火山引擎消息队列 RocketMQ版控制台获取的接入点信息,例如TCP私网接入点(VPC)为“http://{INSTANCE_ID}.rocketmq.ivolces.com.com:9876”。 * 设置AccessKey和Secret Key */ DefaultMQProducer producer = new DefaultMQProducer("YOUR GROUP ID", getAclRPCHook()); producer.setNamesrvAddr("YOUR ACCESS POINT"); //如果 SSL 认证策略设置为仅SSL连接,则通过公网访问实例时必须设置setUseTLS(true) //producer.setUseTLS(true); producer.start(); for (int i = 0; i < 128; i++) { try { Message msg = new Message("YOUR TOPIC", "YOUR MESSAGE TAG", "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET)); producer.send(msg, new SendCallback() { @Override public void onSuccess(SendResult result) { // 消费发送成功。 System.out.println("send message success. msgId= " + result.getMsgId()); } @Override public void onException(Throwable throwable) { // 消息发送失败,需要进行重试处理,可重新发送这条消息或持久化这条数据进行补偿处理。 System.out.println("send message failed."); throwable.printStackTrace(); } }); } catch (Exception e) { // 消息发送失败,需要进行重试处理,可重新发送这条消息或持久化这条数据进行补偿处理。 System.out.println(new Date() + " Send mq message failed."); e.printStackTrace(); } } // 在应用退出前,销毁Producer对象。 // 注意:如果不销毁也没有问题。 producer.shutdown(); } }
成功发送消息之后,需要启动消费者客户端,订阅消息。您可以参考以下示例代码,启动消费者并订阅普通消息。
import java.util.List; import org.apache.rocketmq.acl.common.AclClientRPCHook; import org.apache.rocketmq.acl.common.SessionCredentials; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.remoting.RPCHook; public class RocketMQPushConsumer { //设置为您在火山引擎消息队列 RocketMQ版控制台上创建的GID,以及替换为您的AccessKey ID和AccessKey Secret。 private static RPCHook getAclRPCHook() { return new AclClientRPCHook(new SessionCredentials("YOUR ACCESS KEY", "YOUR SECRET KEY")); } public static void main(String[] args) throws MQClientException { //设置为您在火山引擎消息队列 RocketMQ版控制台上创建的GID, 以及替换为您火山引擎账号的AccessKey ID和AccessKey Secret。 //设置为火山引擎消息队列 RocketMQ版实例的接入点。 // 设置为您在火山引擎消息队列 RocketMQ版控制台上创建的Topic。 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("YOUR GROUP ID", getAclRPCHook(), new AllocateMessageQueueAveragely()); consumer.setNamesrvAddr("http://{INSTANCE_ID}.rocketmq.volces.com.com:9876"); consumer.subscribe("YOUR TOPIC", "*"); //如果 SSL 认证策略设置为仅SSL连接,则通过公网访问实例时必须设置setUseTLS(true) //consumer.setUseTLS(true); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { System.out.printf("Receive New Messages: %s %n", msgs); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); } }