RabbitMQ Quorum队列

Quorum队列:增强消息传递的可靠性与一致性

image-20210711-1

image-20210711-2

image-20210711-3

RabbitMQ 4引入了Quorum队列(Quorum Queues),这是一种全新的队列类型,旨在解决传统经典队列在高并发和分布式环境中的局限性。与经典队列不同,Quorum队列采用基于Raft共识算法的设计,能够在分布式节点之间实现强一致性和高可用性。这种特性使其特别适用于对消息可靠性要求极高的场景,例如金融交易系统或订单处理平台。
Quorum队列的核心工作原理在于其分布式存储机制。每个队列的数据会被复制到多个节点上,形成一个逻辑上的“仲裁组”(Quorum Group)。当生产者向队列发送消息时,消息需要被大多数节点确认后才会被视为成功写入。这种机制确保了即使部分节点发生故障,队列仍能保持可用性,并且不会丢失任何已提交的消息。相比之下,经典队列虽然提供了快速的本地存储,但在节点故障或网络分区的情况下容易出现消息丢失或不一致的问题。
Quorum队列的优势主要体现在以下几个方面。首先,它提供了更高的容错能力。由于数据被分布在多个节点上,单个节点的失效不会影响整个队列的运行。其次,Quorum队列支持自动化的领导者选举机制,能够在主节点崩溃时迅速切换到备用节点,从而减少服务中断时间。最后,Quorum队列的强一致性保证了生产者和消费者之间的数据同步,避免了因消息乱序或重复而导致的业务逻辑错误。
然而,Quorum队列也有一定的局限性。由于其基于Raft算法的设计,写入操作需要经过多数节点的确认,因此在高吞吐量场景下可能会引入额外的延迟。此外,维护多个副本会增加存储成本和网络开销,这在资源受限的环境中可能成为一个挑战。尽管如此,Quorum队列仍然是对经典队列的一种重要补充,尤其适合那些优先考虑消息可靠性和系统稳定性的应用。

与经典队列区别

https://www.rabbitmq.com/docs/quorum-queues#feature-comparison

image-20210711-4

  1. 声明与操作
    ○ 声明队列:声明仲裁队列时,需将x-queue-type参数设为quorum,默认最多有三个副本,分布在集群节点上,声明后可与其他队列一样绑定交换器。
    ○ 客户端操作:消费、确认、取消消费者、清除队列消息、删除队列等操作与经典队列类似,但声明和设置消费者QoS预取时有差异。
  2. 复制与管理
    ○ 复制因子与成员管理:默认初始复制因子为三,建议复制因子为奇数且不超过集群节点数的多数。可通过x-quorum-initial-group-size参数控制初始复制因子,通过命令行工具管理副本成员。
    ○ 队列领导者管理:每个仲裁队列有一个领导者副本,负责处理队列操作并复制到追随者副本。可通过多种方式设置初始领导者,还可使用rabbitmq-queues rebalance命令重新平衡领导者分布。
    ○ 连续成员协调(CMR):是对副本管理的补充,可自动尝试将副本数量调整到目标值。可通过配置文件、策略或队列参数设置目标副本数量等相关参数,由特定事件触发,默认每60分钟检查一次。
  3. 行为与性能
    ○ 行为表现:依赖Raft协议确保数据一致性和安全性。领导者选举在集群形成或领导者不可用时进行,故障节点恢复或新副本加入时会进行同步。集群节点数量影响容错能力,建议在3 - 7个节点的集群中使用仲裁队列。
    ○ 性能特征:设计为用延迟换取吞吐量,在使用消费者确认和发布者确认的场景中,吞吐量优于经典镜像队列。受磁盘I/O影响大,消息越大、副本越多,吞吐量越低,建议使用高速磁盘和合理设置消费者预取值。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
package com.lixiang.rabbitmq;

import com.google.gson.Gson;
import com.lixiang.rabbitmq.entity.SMS;
import com.lixiang.rabbitmq.utils.RabbitConstant;
import com.lixiang.rabbitmq.utils.RabbitUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;

/**
* OrderSystem 类用于模拟订单系统,将 SMS 消息发送到 RabbitMQ 队列。
* 该类创建与 RabbitMQ 的连接,声明队列,并将 SMS 消息转换为 JSON 格式后发送到队列。
*/
@Slf4j
public class OrderSystem {
public static void main(String[] args) throws IOException, TimeoutException {

try (Connection connection = RabbitUtils.getConnection();
Channel channel = connection.createChannel();) {
Map<String, Object> arguments = new HashMap<>();
// 设置队列类型为quorum(仲裁队列),提供高可用性
arguments.put("x-queue-type", "quorum");
// 设置初始仲裁组大小为3,即3个副本

/*
一个三节点集群将会有三个副本,每个节点上一个副本。而在七节点集群中,三个节点会各自持有一个副本,但另外四个节点不会托管新声明队列的任何副本。

可以为仲裁队列配置复制因子(队列拥有的副本数量)。

实际可用的最小因子值为三。强烈建议使用奇数作为因子值,这样可以计算出明确的节点多数(quorum)。例如,在两节点集群中就不存在"多数"节点。这将在下文的"容错和最小在线副本数"部分通过更多示例说明。

对于大型集群或节点数为偶数的集群,这可能不太理想。要控制仲裁队列成员的数量,可以在声明队列时设置x-quorum-initial-group-size队列参数。提供的组大小参数应是一个大于零且小于或等于当前RabbitMQ集群大小的整数。仲裁队列将在声明时随机选择集群中的RabbitMQ节点子集来运行。

如果仲裁队列是在所有集群节点加入集群之前声明的,并且初始副本数大于集群成员总数,则实际使用的有效值将等于集群节点总数。当更多节点加入集群时,副本数不会自动增加,但操作员可以手动增加。

*/
arguments.put("x-quorum-initial-group-size", 3);
channel.queueDeclare(RabbitConstant.QUEUE_SMS_QUORUM, true, false, false, arguments);

for (int i = 100; i < 200; i++) {
SMS sms = new SMS("乘客" + i, "13900000" + i, "您的车票已预订成功");
String jsonSMS = new Gson().toJson(sms);
channel.basicPublish("", RabbitConstant.QUEUE_SMS_QUORUM, null, jsonSMS.getBytes());
}
log.info("订单信息已投递至Broker");
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
package com.lixiang.rabbitmq;

import com.lixiang.rabbitmq.utils.RabbitConstant;
import com.lixiang.rabbitmq.utils.RabbitUtils;
import com.rabbitmq.client.*;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;

@Slf4j
public class SmsSender {
public static void main(String[] args) throws IOException {
Connection connection = RabbitUtils.getConnection();
final Channel channel = connection.createChannel();

channel.basicConsume(RabbitConstant.QUEUE_SMS_QUORUM, false, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
String jsonSMS = new String(body);
log.info("SMSSender-短信发送成功:{}", jsonSMS);
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
channel.basicAck(envelope.getDeliveryTag(), false);
}
});
}
}