火山引擎消息队列 RabbitMQ版兼容开源 RabbitMQ 协议,创建 RabbitMQ 实例后,您可以通过 SSL 认证连接实例并生产、消费消息。
关于 RabbitMQ 的使用教程,请参考 RabbitMQ 官网提供的不同语言的连接和使用向导。详细信息请查看RabbitMQ官网。
安装 Java 依赖库。
pom.xml
文件中增加如下依赖。<dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.12.0</version> </dependency>
compile 'com.rabbitmq:amqp-client:5.12.0'
连接实例并生产消息。
为了提高稳定性和可用性,建议参考实例代码,开启客户端自动重连、Publish Confirm等机制。
package org.example.amqp.producer; import com.rabbitmq.client.*; import java.nio.charset.StandardCharsets; import java.util.HashMap; import java.util.UUID; public class SimpleProducerSSL { private static final String host = "your-end-point"; //AMQP协议公网接入地址。 private static final int port = 5671; private static final String userName = "your-user-name"; private static final String password = "your-password"; private static final String vhost = "/"; private static final String exchangeName = "your-exchange"; private static final String queueName = "your-queue"; private static final String bindingKey = "your-key"; private static final int deliveryMode = 2; private static final int batchSize = 50; private static final int publishConfirmTimeout = 10000; // publish confirm超时时间10秒 public static void main(String[] args) throws Exception{ ConnectionFactory factory = new ConnectionFactory(); // 设置接入点,在RabbitMQ版控制台实例详情页面查看 factory.setHost(host); // 设置端口,AMQP协议SSL加密端口5671 factory.setPort(port); // 用户名,在WebUI控制台配置并管理 factory.setUsername(userName); // 密码,在WebUI控制台配置并管理 factory.setPassword(password); // 设置Vhost名称,请确保已在RabbitMQ版控制台上创建完成。 factory.setVirtualHost(vhost); // 启用SSL(可选) factory.useSslProtocol(); // 开启自动重连,默认值为true factory.setAutomaticRecoveryEnabled(true); // 设置自动重连间隔 factory.setNetworkRecoveryInterval(5000); // 设置连接建立的协商超时时间 factory.setConnectionTimeout(20000); // 建立Connection Connection connection = factory.newConnection(); // 创建Channel Channel channel = connection.createChannel(); // 开启publish confirm(可选,推荐开启) channel.confirmSelect(); // 创建Exchange(可选) channel.exchangeDeclare(exchangeName, "direct", true, false, false, null); // 创建Classic类型队列(可选) channel.queueDeclare(queueName, true, false, false, new HashMap<String, Object>()); // 绑定Exchange和队列(可选) channel.queueBind(queueName, exchangeName, bindingKey); // 注册publish confirm的回调 channel.addConfirmListener(new ConfirmListener() { public void handleNack(long deliveryTag, boolean multiple) { // 处理nack回调 System.out.println("nack received: " + deliveryTag); } public void handleAck(long deliveryTag, boolean multiple) { // 处理ack回调 System.out.println("ack received: " + deliveryTag); } }); int msgsToSend = 10000; // 持续发送消息 while (msgsToSend > 0) { // 设置消息属性 AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().messageId(UUID.randomUUID().toString()).deliveryMode(deliveryMode).build(); String msgBody = "hello rabbitmq"; try { channel.basicPublish(exchangeName, bindingKey, true, props, msgBody.getBytes(StandardCharsets.UTF_8)); msgsToSend--; // 等待上批次发送消息的confirm if (msgsToSend%batchSize == 0) { channel.waitForConfirms(publishConfirmTimeout); } } catch (Exception e) { e.printStackTrace(); } } // 关闭Channel和Connection channel.close(); connection.close(); } }
连接实例并消费消息。
package org.example.amqp.consumer; import com.rabbitmq.client.*; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.security.KeyManagementException; import java.security.NoSuchAlgorithmException; import java.util.HashMap; import java.util.concurrent.TimeoutException; public class SimpleConsumerSSL { private static final String host = "your-end-point"; // AMQP协议公网接入地址。 private static final int port = 5671; private static final String userName = "your-user-name"; private static final String password = "your-password"; private static final String vhost = "/"; private static final String queueName = "your-queue"; public static void main(String[] args) throws IOException, TimeoutException, KeyManagementException, NoSuchAlgorithmException { ConnectionFactory factory = new ConnectionFactory(); // 设置接入点,在RabbitMQ版控制台实例详情页面查看 factory.setHost(host); // 设置端口,AMQP协议默认端口5672 factory.setPort(port); // 用户名,在WebUI控制台配置并管理 factory.setUsername(userName); // 密码,在WebUI控制台配置并管理 factory.setPassword(password); // 设置Vhost名称,请确保已在RabbitMQ版控制台上创建完成。 factory.setVirtualHost(vhost); // 启用SSL factory.useSslProtocol(); // 建立Connection Connection connection = factory.newConnection(); // 创建Channel Channel channel = connection.createChannel(); // 创建Classic类型队列(非必须,如果已在WebUI控制台创建可跳过) channel.queueDeclare(queueName, true, false, false, new HashMap<String, Object>()); // 启动消费 consume(connection); // 关闭Channel和Connection channel.close(); connection.close(); } private static void consume(Connection connection) throws IOException { final Channel channel = connection.createChannel(); channel.basicConsume(queueName, true, "test_consumer", new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { // 处理收到的消息 System.out.printf("Received Message: %s, consumerTag: %s, deliveryTag: %s, messageId: %s\n", new String(body, StandardCharsets.UTF_8), consumerTag, envelope.getDeliveryTag(), properties.getMessageId()); } }); } }