本文以调用 Client SDK for Java 为例,介绍通过开源 SDK 实现 AMQP 协议下消息收发的操作过程,并提供对应的示例代码。其他语言或框架的 SDK 消息收发过程相似。
RabbitMQ 收发消息的示例代码请查看 RabbitMQ 的官方文档 RabbitMQ SDK 使用样例 和 GitHub 中的常见场景的示例代码工程 rabbitmq-tutorials。
除此之外,消息队列 RabbitMQ版提供示例项目 AMQP 协议的 Java SDK Demo 供您参考。
如果项目使用 Maven 构建,请在 pom.xml
文件中增加如下依赖。
<dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.12.0</version> </dependency>
如果项目使用 Gradle 构建,请添加如下依赖:
compile 'com.rabbitmq:amqp-client:5.12.0'
在使用 Client 接入 RabbitMQ 实例进行消息生产和消费时,需在代码中配置如下参数,请确保下列参数在 Client 代码中配置正确。
参数名 | 参数说明 |
---|---|
host | 接入点域名,进入火山引擎 RabbitMQ 控制台实例详情页面,取 AMQP接入点展示的域名部分。 |
port | 接入点端口,进入火山引擎 RabbitMQ 控制台实例详情页面,取 AMQP接入点展示的端口部分。 |
username | 接入用户名,请进入火山引擎 RabbitMQ 控制台实例详情页面,通过 Web UI接入点进入 RabbitMQ 集群的Web控制台页面,在 Users 标签页查看。 |
password | 接入用户密码,请进入火山引擎 RabbitMQ 控制台实例详情页面,通过Web UI接入点进入 RabbitMQ 集群的 Web 控制台页面,在 Users 标签页查看。 |
消息生产的示例代码如下。
import com.rabbitmq.client.*; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.security.KeyManagementException; import java.security.NoSuchAlgorithmException; import java.util.concurrent.TimeoutException; import java.util.HashMap; import java.util.UUID; public class Producer { public static void main(String[] args) throws IOException, TimeoutException, NoSuchAlgorithmException, KeyManagementException { // 未开启SSL认证,使用私网接入点域名。开启SSL认证,使用公网接入点域名。 final String host = "rbtmq-xxxxx.rabbitmq.volces.com"; // 私网接入点端口为5672,公网接入点端口为5671。 final int port = 5671; //建议使用非 Admin 角色的用户。 final String username = "demo-user"; final String password = "demo-user-password"; ConnectionFactory factory = new ConnectionFactory(); factory.setHost(host); factory.setPort(port); factory.setUsername(username); factory.setPassword(password); //如果使用SSL方式收发消息,请取消以下注释。 //factory.useSslProtocol(); // 配置连接相关的参数,请根据业务特点与网络状况配置 factory.setAutomaticRecoveryEnabled(true); factory.setNetworkRecoveryInterval(5000); factory.setConnectionTimeout(30 * 1000); factory.setHandshakeTimeout(30 * 1000); factory.setShutdownTimeout(0); // 下列是消息发布的AMQP-0-9-1协议对象配置 final String vhost = "/"; final String exchange = "demo-exchange"; final String exchangeType = "direct"; final String queue = "demo-queue"; final String bindingKey = "foo"; // 设置Vhost名称,请确保已经在RabbitMQ集群Web控制台创建 factory.setVirtualHost(vhost); // 创建Connection和Channel Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); // 创建direct类型的exchange,如果已经在RabbitMQ集群Web控制台手动创建,则可跳过此步骤 channel.exchangeDeclare(exchange, exchangeType, true, false, false, null); HashMap<String, Object> arg = new HashMap<>(); channel.queueDeclare(queue, true, false, false, arg); channel.queueBind(queue, exchange, bindingKey); // 开始消息生产 for (int i = 0; i < 100; i++ ) { // BindingKey根据业务需求填入相应的BindingKey。 AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().messageId(UUID.randomUUID().toString()).build(); channel.basicPublish(exchange, bindingKey, true, props, ("demo message " + i).getBytes(StandardCharsets.UTF_8)); } channel.close(); connection.close(); } }
注意
使用 AMQP 0-9-1 协议时,不建议通过 basic.get
协议方法逐条获取消息,因为该方式与持续在线的消费者相比效率较低。详细说明请参考 RabbitMQ 官方文档。
消息消费的示例代码如下。
import com.rabbitmq.client.*; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.security.KeyManagementException; import java.security.NoSuchAlgorithmException; import java.util.HashMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeoutException; public class Consumer { public static void main(String[] args) throws IOException, TimeoutException, InterruptedException, NoSuchAlgorithmException, KeyManagementException { // 未开启SSL认证,使用私网接入点域名。开启SSL认证,使用公网接入点域名。 final String host = "rbtmq-xxxxx.rabbitmq.volces.com"; // 私网接入点端口为5672,公网接入点端口为5671。 final int port = 5671; //建议使用非 Admin 角色的用户。 final String username = "demo-user"; final String password = "demo-user-password"; ConnectionFactory factory = new ConnectionFactory(); factory.setHost(host); factory.setPort(port); factory.setUsername(username); factory.setPassword(password); //如果使用SSL方式收发消息,请取消以下注释。 //factory.useSslProtocol(); // 配置连接相关的参数,请根据业务特点与网络状况配置 factory.setAutomaticRecoveryEnabled(true); factory.setNetworkRecoveryInterval(5000); factory.setConnectionTimeout(30 * 1000); factory.setHandshakeTimeout(30 * 1000); factory.setShutdownTimeout(0); // 下列是消息发布的AMQP-0-9-1协议对象配置 final String vhost = "/"; final String exchange = "demo-exchange"; final String exchangeType = "direct"; final String queue = "demo-queue"; final String bindingKey = "foo"; // 设置Vhost名称,请确保已经在RabbitMQ集群Web控制台创建 factory.setVirtualHost(vhost); // 创建Connection和Channel Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); // 创建direct类型的exchange,如果已经在RabbitMQ集群Web控制台手动创建,则可跳过此步骤 channel.exchangeDeclare(exchange, exchangeType, true, false, false, null); HashMap<String, Object> arg = new HashMap<>(); channel.queueDeclare(queue, true, false, false, arg); channel.queueBind(queue, exchange, bindingKey); // 开始消费消息。 channel.basicConsume(queue, false, "consumertag", new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.printf("receive message: " + new String(body, StandardCharsets.UTF_8) + ", deliveryTag: " + envelope.getDeliveryTag() + ", messageId: " + properties.getMessageId()); // 单条消息回复ack channel.basicAck(envelope.getDeliveryTag(), false); } }); CountDownLatch latch = new CountDownLatch(1); Runtime.getRuntime().addShutdownHook(new Thread(() -> { try { channel.getConnection().close(); } catch (IOException e) { System.out.println("close connection error." + e); } latch.countDown(); })); latch.await(); connection.close(); } }
错误码 | 描述 | 处理方法 |
---|---|---|
200 | 请求成功。 | 无。 |
403 | 权限错误。 | 请确保使用的 user 具备 vhost 的访问权限,同时请确保 username 和 password 配置正确。 |
404 | 资源不存在。 | 确保资源已经存在并符合预期。 |
406 | 预检查失败。 | 确保调用 AMQP 协议方法时的参数正确。 |
503 | 无效的命令。 | 停止使用消息队列 RabbitMQ版当前不支持的 AMQP 协议方法或 Exchange 类型。 |
541 | 服务端内部错误。 | 根据返回信息的错误原因定位问题后,联系消息队列 RabbitMQ版技术人员获取帮助。 |