You need to enable JavaScript to run this app.
导航
Java SDK(AMQP 协议)
最近更新时间:2024.07.25 17:03:29首次发布时间:2022.01.28 15:17:02

本文以调用 Client SDK for Java 为例,介绍通过开源 SDK 实现 AMQP 协议下消息收发的操作过程,并提供对应的示例代码。其他语言或框架的 SDK 消息收发过程相似。

注意事项

  • 本文档以 AMQP 协议下收发消息为例,演示消息生产与消费的示例代码。请根据业务的协议类型,在控制台中复制 AMQP 接入点进行相关的参数配置。
  • 在消息收发过程中,请使用长连接,避免在每条消息的收发过程中创建连接,否则会造成大量不必要的资源消耗,甚至引起服务端 SYN Flood 防护。
  • 直接使用管理员用户进行生产和消费,可能会有信息泄露风险。您在进行生产和消费前,需要先在 RabbitMQ 集群管理 Web UI 上创建一个非 Admin 角色的新用户,并为新用户绑定 Virtual Host。操作步骤,请参见绑定用户和 Vhost

示例项目

RabbitMQ 收发消息的示例代码请查看 RabbitMQ 的官方文档 RabbitMQ SDK 使用样例GitHub 中的常见场景的示例代码工程 rabbitmq-tutorials
除此之外,消息队列 RabbitMQ版提供示例项目 AMQP 协议的 Java SDK Demo 供您参考。

环境准备

  1. 在客户端环境安装 JDK 8 或更高版本的 JDK。
  2. 添加依赖。
    • 如果项目使用 Maven 构建,请在 pom.xml 文件中增加如下依赖。

      <dependency>
          <groupId>com.rabbitmq</groupId>
          <artifactId>amqp-client</artifactId>
          <version>5.12.0</version>
      </dependency>
      
    • 如果项目使用 Gradle 构建,请添加如下依赖:

      compile 'com.rabbitmq:amqp-client:5.12.0'
      

接入配置

在使用 Client 接入 RabbitMQ 实例进行消息生产和消费时,需在代码中配置如下参数,请确保下列参数在 Client 代码中配置正确。

参数名

参数说明

host

接入点域名,进入火山引擎 RabbitMQ 控制台实例详情页面,取 AMQP接入点展示的域名部分。

port

接入点端口,进入火山引擎 RabbitMQ 控制台实例详情页面,取 AMQP接入点展示的端口部分。

username

接入用户名,请进入火山引擎 RabbitMQ 控制台实例详情页面,通过 Web UI接入点进入 RabbitMQ 集群的Web控制台页面,在 Users 标签页查看。
请确保使用的 User 已经绑定了待访问 vhost,具体操作请参见绑定用户和 Vhost

password

接入用户密码,请进入火山引擎 RabbitMQ 控制台实例详情页面,通过Web UI接入点进入 RabbitMQ 集群的 Web 控制台页面,在 Users 标签页查看。

消息生产

消息生产的示例代码如下。

import com.rabbitmq.client.*;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;
import java.util.concurrent.TimeoutException;
import java.util.HashMap;
import java.util.UUID;

public class Producer {
    public static void main(String[] args) throws IOException, TimeoutException, NoSuchAlgorithmException, KeyManagementException {
        // 未开启SSL认证,使用私网接入点域名。开启SSL认证,使用公网接入点域名。
        final String host = "rbtmq-xxxxx.rabbitmq.volces.com";
        // 私网接入点端口为5672,公网接入点端口为5671。
        final int port = 5671;
        //建议使用非 Admin 角色的用户。
        final String username = "demo-user";
        final String password = "demo-user-password";

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(host);
        factory.setPort(port);
        factory.setUsername(username);
        factory.setPassword(password);
        //如果使用SSL方式收发消息,请取消以下注释。
        //factory.useSslProtocol();

        // 配置连接相关的参数,请根据业务特点与网络状况配置
        factory.setAutomaticRecoveryEnabled(true);
        factory.setNetworkRecoveryInterval(5000);
        factory.setConnectionTimeout(30 * 1000);
        factory.setHandshakeTimeout(30 * 1000);
        factory.setShutdownTimeout(0);

        // 下列是消息发布的AMQP-0-9-1协议对象配置
        final String vhost = "/";
        final String exchange = "demo-exchange";
        final String exchangeType = "direct";
        final String queue = "demo-queue";
        final String bindingKey = "foo";

        // 设置Vhost名称,请确保已经在RabbitMQ集群Web控制台创建
        factory.setVirtualHost(vhost);

        // 创建Connection和Channel
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        // 创建direct类型的exchange,如果已经在RabbitMQ集群Web控制台手动创建,则可跳过此步骤
        channel.exchangeDeclare(exchange, exchangeType, true, false, false, null);
        HashMap<String, Object> arg = new HashMap<>();
        channel.queueDeclare(queue, true, false, false, arg);
        channel.queueBind(queue, exchange, bindingKey);

        // 开始消息生产
        for (int i = 0; i < 100; i++  ) {
            // BindingKey根据业务需求填入相应的BindingKey。
            AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().messageId(UUID.randomUUID().toString()).build();
            channel.basicPublish(exchange, bindingKey, true, props,
                    ("demo message "  + i).getBytes(StandardCharsets.UTF_8));
        }

        channel.close();
        connection.close();
    }
}

消息消费

注意

使用 AMQP 0-9-1 协议时,不建议通过 basic.get 协议方法逐条获取消息,因为该方式与持续在线的消费者相比效率较低。详细说明请参考 RabbitMQ 官方文档

消息消费的示例代码如下。

import com.rabbitmq.client.*;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;
import java.util.HashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeoutException;

public class Consumer {
    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException, NoSuchAlgorithmException, KeyManagementException {
        // 未开启SSL认证,使用私网接入点域名。开启SSL认证,使用公网接入点域名。
        final String host = "rbtmq-xxxxx.rabbitmq.volces.com";
        // 私网接入点端口为5672,公网接入点端口为5671。
        final int port = 5671;
        //建议使用非 Admin 角色的用户。
        final String username = "demo-user";
        final String password = "demo-user-password";

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(host);
        factory.setPort(port);
        factory.setUsername(username);
        factory.setPassword(password);
        //如果使用SSL方式收发消息,请取消以下注释。
        //factory.useSslProtocol();

        // 配置连接相关的参数,请根据业务特点与网络状况配置
        factory.setAutomaticRecoveryEnabled(true);
        factory.setNetworkRecoveryInterval(5000);
        factory.setConnectionTimeout(30 * 1000);
        factory.setHandshakeTimeout(30 * 1000);
        factory.setShutdownTimeout(0);

        // 下列是消息发布的AMQP-0-9-1协议对象配置
        final String vhost = "/";
        final String exchange = "demo-exchange";
        final String exchangeType = "direct";
        final String queue = "demo-queue";
        final String bindingKey = "foo";

        // 设置Vhost名称,请确保已经在RabbitMQ集群Web控制台创建
        factory.setVirtualHost(vhost);

        // 创建Connection和Channel
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        // 创建direct类型的exchange,如果已经在RabbitMQ集群Web控制台手动创建,则可跳过此步骤
        channel.exchangeDeclare(exchange, exchangeType, true, false, false, null);
        HashMap<String, Object> arg = new HashMap<>();
        channel.queueDeclare(queue, true, false, false, arg);
        channel.queueBind(queue, exchange, bindingKey);

        // 开始消费消息。
        channel.basicConsume(queue, false, "consumertag", new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                System.out.printf("receive message: "  + new String(body, StandardCharsets.UTF_8) +  ", deliveryTag: "  + envelope.getDeliveryTag()  + ", messageId: " +  properties.getMessageId());
                // 单条消息回复ack
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        });
        CountDownLatch latch = new CountDownLatch(1);
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            try {
                channel.getConnection().close();
            } catch (IOException e) {
                System.out.println("close connection error." + e);
            }
            latch.countDown();
        }));
        latch.await();

        connection.close();
    }
}

错误码(AMQP)

错误码

描述

处理方法

200

请求成功。

无。

403

权限错误。

请确保使用的 user 具备 vhost 的访问权限,同时请确保 username 和 password 配置正确。

404

资源不存在。

确保资源已经存在并符合预期。

406

预检查失败。

确保调用 AMQP 协议方法时的参数正确。

503

无效的命令。

停止使用消息队列 RabbitMQ版当前不支持的 AMQP 协议方法或 Exchange 类型。

541

服务端内部错误。

根据返回信息的错误原因定位问题后,联系消息队列 RabbitMQ版技术人员获取帮助。