构建RabbitMQ集群

RabbitMQ集群

4369 端口

  • 功能:这个端口是epmd(Erlang Port Mapper Daemon)所使用的端口。 epmd是Erlang分布式系统的基础服务,它负责管理和维护Erlang节点的名称和端口映射关系。当RabbitMQ节点需要与其他节点进行通信时,会借助epmd来定位目标节点的具体位置。

25672 端口

  • 功能:该端口是RabbitMQ节点之间进行内部通信的端口。RabbitMQ采用Erlang语言编写,利用Erlang的分布式特性来实现集群功能。 25672 端口用于在集群内的节点之间传输各种控制信息和数据,像队列同步、镜像队列的复制等操作都依赖这个端口。

.erlang.cookie 文件是一个非常重要的安全凭证文件,它的主要作用是:

  • 集群节点认证:RabbitMQ节点之间通过比较这个cookie值来验证彼此的身份
  • 安全机制 :相当于集群节点间的共享密钥,只有cookie值相同的节点才能组成集群
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
mkdir /home/rabbitmq-cluster
cd /home/rabbitmq-cluster
cat > docker-compose.yaml <<-'EOF'
services:
rabbit1:
image: rabbitmq:4.0-management
container_name: rabbit1
hostname: rabbit1
ports:
- "25672:25672"
- "5672:5672"
- "15672:15672"
- "4369:4369"
environment:
- RABBITMQ_NODENAME=rabbit1
- RABBITMQ_ERLANG_COOKIE=CURIOAPPLICATION
- RABBITMQ_DEFAULT_USER=admin
- RABBITMQ_DEFAULT_PASS=admin
- RABBITMQ_DEFAULT_VHOST=my_vhost
volumes:
- "/var/lib/rabbitmq/.erlang.cookie:/root/.erlang.cookie"

rabbit2:
image: rabbitmq:4.0-management
container_name: rabbit2
hostname: rabbit2
environment:
- RABBITMQ_NODENAME=rabbit2
- RABBITMQ_ERLANG_COOKIE=CURIOAPPLICATION
- RABBITMQ_DEFAULT_VHOST=my_vhost
ports:
- "5673:5672"
- "15673:15672"
volumes:
- "/var/lib/rabbitmq/.erlang.cookie:/root/.erlang.cookie"

rabbit3:
image: rabbitmq:4.0-management
container_name: rabbit3
hostname: rabbit3
environment:
- RABBITMQ_NODENAME=rabbit3
- RABBITMQ_ERLANG_COOKIE=CURIOAPPLICATION
- RABBITMQ_DEFAULT_VHOST=my_vhost
ports:
- "5674:5672"
- "15674:15672"
volumes:
- "/var/lib/rabbitmq/.erlang.cookie:/root/.erlang.cookie"
EOF

docker-compose up -d

● rabbitmqctl:这是RabbitMQ提供的一个命令行工具,用于管理RabbitMQ节点和集群。
● join_cluster:是rabbitmqctl工具的一个子命令,其作用是将当前运行此命令的RabbitMQ节点加入到指定的集群中。
● rabbit1@rabbit1:这是目标集群中一个节点的名称。在RabbitMQ里,节点名称的格式为nodename@hostname,nodename是节点的名称,hostname是节点所在主机的名称。

1
2
3
4
5
6
7
8
9
10
11
[root@docker-vm rabbitmq-cluster]# docker exec -it rabbit1 /bin/bash
root@rabbit1:/# rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl join_cluster rabbit1@rabbit1
rabbitmqctl start_app
Stopping rabbit application on node rabbit1@rabbit1 ...
Resetting node rabbit1@rabbit1 ...
Clustering node rabbit1@rabbit1 with rabbit1@rabbit1
Error:
Error: cannot cluster node with itself: rabbit1@rabbit1
Starting node rabbit1@rabbit1 ...
1
2
3
4
5
[root@docker-vm rabbitmq-cluster]# docker exec rabbit2 rabbitmqctl join_cluster rabbit1@rabbit1
Clustering node rabbit2@rabbit2 with rabbit1@rabbit1

[root@docker-vm rabbitmq-cluster]# docker exec rabbit3 rabbitmqctl join_cluster rabbit1@rabbit1
Clustering node rabbit3@rabbit3 with rabbit1@rabbit1

http://192.168.31.230:15672/#/queues/%2F/sms.quorum

1
arguments.put("x-quorum-initial-group-size", 1);

image-20210718-1

1
arguments.put("x-quorum-initial-group-size", 3);

image-20210718-2

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
package com.lixiang.rabbitmq;

import com.google.gson.Gson;
import com.lixiang.rabbitmq.entity.SMS;
import com.lixiang.rabbitmq.utils.RabbitConstant;
import com.lixiang.rabbitmq.utils.RabbitUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;

/**
* OrderSystem 类用于模拟订单系统,将 SMS 消息发送到 RabbitMQ 队列。
* 该类创建与 RabbitMQ 的连接,声明队列,并将 SMS 消息转换为 JSON 格式后发送到队列。
*/
@Slf4j
public class OrderSystem {
public static void main(String[] args) throws IOException, TimeoutException {

try (Connection connection = RabbitUtils.getConnection();
Channel channel = connection.createChannel();) {
Map<String, Object> arguments = new HashMap<>();
// 设置队列类型为quorum(仲裁队列),提供高可用性
arguments.put("x-queue-type", "quorum");
// 设置初始仲裁组大小为3,即3个副本

/*
一个三节点集群将会有三个副本,每个节点上一个副本。而在七节点集群中,三个节点会各自持有一个副本,但另外四个节点不会托管新声明队列的任何副本。

可以为仲裁队列配置复制因子(队列拥有的副本数量)。

实际可用的最小因子值为三。强烈建议使用奇数作为因子值,这样可以计算出明确的节点多数(quorum)。例如,在两节点集群中就不存在"多数"节点。这将在下文的"容错和最小在线副本数"部分通过更多示例说明。

对于大型集群或节点数为偶数的集群,这可能不太理想。要控制仲裁队列成员的数量,可以在声明队列时设置x-quorum-initial-group-size队列参数。提供的组大小参数应是一个大于零且小于或等于当前RabbitMQ集群大小的整数。仲裁队列将在声明时随机选择集群中的RabbitMQ节点子集来运行。

如果仲裁队列是在所有集群节点加入集群之前声明的,并且初始副本数大于集群成员总数,则实际使用的有效值将等于集群节点总数。当更多节点加入集群时,副本数不会自动增加,但操作员可以手动增加。

*/
arguments.put("x-quorum-initial-group-size", 3);
channel.queueDeclare(RabbitConstant.QUEUE_SMS_QUORUM, true, false, false, arguments);

for (int i = 100; i < 200; i++) {
SMS sms = new SMS("乘客" + i, "13900000" + i, "您的车票已预订成功");
String jsonSMS = new Gson().toJson(sms);
channel.basicPublish("", RabbitConstant.QUEUE_SMS_QUORUM, null, jsonSMS.getBytes());
}
log.info("订单信息已投递至Broker");
}
}
}