如果对数据可用性和可靠性要求较高,您可以选择使用一致性 hash 插件或 Quorum 队列来保障单节点故障场景的服务高可用。本文档介绍各种高可用方案的配置方式。
Classic 队列,即经典队列,是 RabbitMQ 中最常见的基本队列类型,Classic 队列提供了基本的消息传递和处理功能,简单而高效。Classic 队列使用先进先出(FIFO)的消息传递方式,确保消息按照发送的顺序进行处理。
但 Classic 队列在 RabbitMQ 中基于单节点的存储结构,不具备数据冗余的特性。如果节点发生故障,队列中的消息可能会丢失。
如果仍然需要使用 Classic 队列,您也可以通过rabbitmq_consistent_hash_exchange
插件实现 Classic 队列高可用。rabbitmq_consistent_hash_exchange
是 RabbitMQ 提供的一致性 hash 交换器,用于通过一致性 hash 算法将消息分发到已绑定 Exchange 的队列上。该插件可以将原有的一个队列拆分为多个队列,并将拆分出的队列分别绑定到不同的节点上,以应对单节点故障的情况下队列不可用的场景。
Quorum 队列提供了数据冗余和高可用性的特性、对消息的可靠性保障更高。由于 Quorum 队列是 RabbitMQ 新引入特性,可能存在稳定性和一致性问题,请谨慎使用 Quorum 队列。
您可以在控制台启用rabbitmq_consistent_hash_exchange
插件,然后便会通过一致性 hash 算法将消息分发到已绑定 Exchange 的队列上。x-consistent-hash
类型的 Exchange 会根据消息的指定方式计算哈希进行路由。Exchange上绑定有不同权重的队列,并根据 Queue 的权重将消息分发到不同的队列中。
rabbitmq_consistent_hash_exchange
插件。消息生产示例代码如下。
public class classicHashDemoProducer { public static void main(String[] args) throws IOException, TimeoutException, NoSuchAlgorithmException, KeyManagementException, InterruptedException { // 下列变量按照实际情况进行配置。 final String host = "xxxxxxx.rabbitmq.volces.com"; final int port = 5671; final String username = "xxxx"; final String password = "xxxxxx"; ConnectionFactory factory = new ConnectionFactory(); factory.setHost(host); factory.setPort(port); factory.setUsername(username); factory.setPassword(password); // 配置连接相关的参数,请根据业务特点与网络状况配置 factory.setAutomaticRecoveryEnabled(true); factory.setNetworkRecoveryInterval(5000); factory.setConnectionTimeout(30 * 1000); factory.setHandshakeTimeout(30 * 1000); factory.setShutdownTimeout(0); factory.useSslProtocol(); // 下列是消息发布的AMQP-0-9-1协议对象配置。 // 设置Vhost名称和Exchange名称,请确保已经在RabbitMQ集群Web控制台创建。 final String vhost = "/"; final String EXCHANGE_NAME = "HashExchange"; factory.setVirtualHost(vhost); // 创建Connection和Channel Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); for (int i = 0; i < 10000; i++) { String message = "Hello, RabbitMQ!"; String messageId = UUID.randomUUID().toString(); // 为每条消息生成一个随机的RoutingKey,用于随机散列 String RoutingKey = UUID.randomUUID().toString(); channel.basicPublish(EXCHANGE_NAME, RoutingKey, MessageProperties.PERSISTENT_TEXT_PLAIN.builder() .messageId(messageId) .build(), message.getBytes()); System.out.println(" Sent message: '" + message + "' with id: " + messageId); } channel.close(); connection.close(); } }
通过一致性 hash 插件实现 Classic 队列高可用之后,您可以参考以下示例,验证单节点故障场景下的 Classic 队列可用性。
在 RabbitMQ 实例各节点状态正常的场景下,您可以登录 RabbitMQ WebUI,在顶部菜单栏 Queues 下观察各队列生产消费情况。例如以下示例表示消息散列到不同节点的不同队列上,减小了单节点压力。
在 RabbitMQ 实例存在单节点故障时,您可以在 RabbitMQ WebUI 中查看节点状态及生产消费情况,验证消息生产消费是否正常。
查看故障节点。
在顶部菜单栏 Overview 页签中查看节点状态。例如下图表示存在单节点故障。
查看队列生产消费情况。
在顶部菜单栏 Queues 页签中,观察各队列生产消费情况。其中,故障节点队列不可用,正常节点可以继续生产消费,证实了 Claasic 队列的可用性。
RabbitMQ Quorum 队列是在 RabbitMQ 3.8 版本中引入的一种队列类型,它具有高可用性和数据冗余的特性。相比于传统的 Classic 队列,Quorum 队列提供了更好的可用性和数据保护。
注意
由于 Quorum 队列是 RabbitMQ 新引入特性,可能存在稳定性和一致性问题,请谨慎使用 Quorum 队列。
Quorum 队列的特性如下:
创建 Quorum 队列的操作步骤如下: