Quorum队列:增强消息传递的可靠性与一致性



RabbitMQ 4引入了Quorum队列(Quorum Queues),这是一种全新的队列类型,旨在解决传统经典队列在高并发和分布式环境中的局限性。与经典队列不同,Quorum队列采用基于Raft共识算法的设计,能够在分布式节点之间实现强一致性和高可用性。这种特性使其特别适用于对消息可靠性要求极高的场景,例如金融交易系统或订单处理平台。
Quorum队列的核心工作原理在于其分布式存储机制。每个队列的数据会被复制到多个节点上,形成一个逻辑上的“仲裁组”(Quorum Group)。当生产者向队列发送消息时,消息需要被大多数节点确认后才会被视为成功写入。这种机制确保了即使部分节点发生故障,队列仍能保持可用性,并且不会丢失任何已提交的消息。相比之下,经典队列虽然提供了快速的本地存储,但在节点故障或网络分区的情况下容易出现消息丢失或不一致的问题。
Quorum队列的优势主要体现在以下几个方面。首先,它提供了更高的容错能力。由于数据被分布在多个节点上,单个节点的失效不会影响整个队列的运行。其次,Quorum队列支持自动化的领导者选举机制,能够在主节点崩溃时迅速切换到备用节点,从而减少服务中断时间。最后,Quorum队列的强一致性保证了生产者和消费者之间的数据同步,避免了因消息乱序或重复而导致的业务逻辑错误。
然而,Quorum队列也有一定的局限性。由于其基于Raft算法的设计,写入操作需要经过多数节点的确认,因此在高吞吐量场景下可能会引入额外的延迟。此外,维护多个副本会增加存储成本和网络开销,这在资源受限的环境中可能成为一个挑战。尽管如此,Quorum队列仍然是对经典队列的一种重要补充,尤其适合那些优先考虑消息可靠性和系统稳定性的应用。
与经典队列区别
https://www.rabbitmq.com/docs/quorum-queues#feature-comparison

- 声明与操作
○ 声明队列:声明仲裁队列时,需将x-queue-type参数设为quorum,默认最多有三个副本,分布在集群节点上,声明后可与其他队列一样绑定交换器。
○ 客户端操作:消费、确认、取消消费者、清除队列消息、删除队列等操作与经典队列类似,但声明和设置消费者QoS预取时有差异。
- 复制与管理
○ 复制因子与成员管理:默认初始复制因子为三,建议复制因子为奇数且不超过集群节点数的多数。可通过x-quorum-initial-group-size参数控制初始复制因子,通过命令行工具管理副本成员。
○ 队列领导者管理:每个仲裁队列有一个领导者副本,负责处理队列操作并复制到追随者副本。可通过多种方式设置初始领导者,还可使用rabbitmq-queues rebalance命令重新平衡领导者分布。
○ 连续成员协调(CMR):是对副本管理的补充,可自动尝试将副本数量调整到目标值。可通过配置文件、策略或队列参数设置目标副本数量等相关参数,由特定事件触发,默认每60分钟检查一次。
- 行为与性能
○ 行为表现:依赖Raft协议确保数据一致性和安全性。领导者选举在集群形成或领导者不可用时进行,故障节点恢复或新副本加入时会进行同步。集群节点数量影响容错能力,建议在3 - 7个节点的集群中使用仲裁队列。
○ 性能特征:设计为用延迟换取吞吐量,在使用消费者确认和发布者确认的场景中,吞吐量优于经典镜像队列。受磁盘I/O影响大,消息越大、副本越多,吞吐量越低,建议使用高速磁盘和合理设置消费者预取值。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53
| package com.lixiang.rabbitmq;
import com.google.gson.Gson; import com.lixiang.rabbitmq.entity.SMS; import com.lixiang.rabbitmq.utils.RabbitConstant; import com.lixiang.rabbitmq.utils.RabbitUtils; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import lombok.extern.slf4j.Slf4j; import java.io.IOException; import java.util.HashMap; import java.util.Map; import java.util.concurrent.TimeoutException;
@Slf4j public class OrderSystem { public static void main(String[] args) throws IOException, TimeoutException {
try (Connection connection = RabbitUtils.getConnection(); Channel channel = connection.createChannel();) { Map<String, Object> arguments = new HashMap<>(); arguments.put("x-queue-type", "quorum");
arguments.put("x-quorum-initial-group-size", 3); channel.queueDeclare(RabbitConstant.QUEUE_SMS_QUORUM, true, false, false, arguments); for (int i = 100; i < 200; i++) { SMS sms = new SMS("乘客" + i, "13900000" + i, "您的车票已预订成功"); String jsonSMS = new Gson().toJson(sms); channel.basicPublish("", RabbitConstant.QUEUE_SMS_QUORUM, null, jsonSMS.getBytes()); } log.info("订单信息已投递至Broker"); } } }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| package com.lixiang.rabbitmq;
import com.lixiang.rabbitmq.utils.RabbitConstant; import com.lixiang.rabbitmq.utils.RabbitUtils; import com.rabbitmq.client.*; import lombok.extern.slf4j.Slf4j; import java.io.IOException;
@Slf4j public class SmsSender { public static void main(String[] args) throws IOException { Connection connection = RabbitUtils.getConnection(); final Channel channel = connection.createChannel(); channel.basicConsume(RabbitConstant.QUEUE_SMS_QUORUM, false, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String jsonSMS = new String(body); log.info("SMSSender-短信发送成功:{}", jsonSMS); try { Thread.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } channel.basicAck(envelope.getDeliveryTag(), false); } }); } }
|