RabbitMQ集群
4369 端口
- 功能:这个端口是epmd(Erlang Port Mapper Daemon)所使用的端口。 epmd是Erlang分布式系统的基础服务,它负责管理和维护Erlang节点的名称和端口映射关系。当RabbitMQ节点需要与其他节点进行通信时,会借助epmd来定位目标节点的具体位置。
25672 端口
- 功能:该端口是RabbitMQ节点之间进行内部通信的端口。RabbitMQ采用Erlang语言编写,利用Erlang的分布式特性来实现集群功能。 25672 端口用于在集群内的节点之间传输各种控制信息和数据,像队列同步、镜像队列的复制等操作都依赖这个端口。
.erlang.cookie 文件是一个非常重要的安全凭证文件,它的主要作用是:
- 集群节点认证:RabbitMQ节点之间通过比较这个cookie值来验证彼此的身份
- 安全机制 :相当于集群节点间的共享密钥,只有cookie值相同的节点才能组成集群
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52
| mkdir /home/rabbitmq-cluster cd /home/rabbitmq-cluster cat > docker-compose.yaml <<-'EOF' services: rabbit1: image: rabbitmq:4.0-management container_name: rabbit1 hostname: rabbit1 ports: - "25672:25672" - "5672:5672" - "15672:15672" - "4369:4369" environment: - RABBITMQ_NODENAME=rabbit1 - RABBITMQ_ERLANG_COOKIE=CURIOAPPLICATION - RABBITMQ_DEFAULT_USER=admin - RABBITMQ_DEFAULT_PASS=admin - RABBITMQ_DEFAULT_VHOST=my_vhost volumes: - "/var/lib/rabbitmq/.erlang.cookie:/root/.erlang.cookie"
rabbit2: image: rabbitmq:4.0-management container_name: rabbit2 hostname: rabbit2 environment: - RABBITMQ_NODENAME=rabbit2 - RABBITMQ_ERLANG_COOKIE=CURIOAPPLICATION - RABBITMQ_DEFAULT_VHOST=my_vhost ports: - "5673:5672" - "15673:15672" volumes: - "/var/lib/rabbitmq/.erlang.cookie:/root/.erlang.cookie"
rabbit3: image: rabbitmq:4.0-management container_name: rabbit3 hostname: rabbit3 environment: - RABBITMQ_NODENAME=rabbit3 - RABBITMQ_ERLANG_COOKIE=CURIOAPPLICATION - RABBITMQ_DEFAULT_VHOST=my_vhost ports: - "5674:5672" - "15674:15672" volumes: - "/var/lib/rabbitmq/.erlang.cookie:/root/.erlang.cookie" EOF
docker-compose up -d
|
● rabbitmqctl:这是RabbitMQ提供的一个命令行工具,用于管理RabbitMQ节点和集群。
● join_cluster:是rabbitmqctl工具的一个子命令,其作用是将当前运行此命令的RabbitMQ节点加入到指定的集群中。
● rabbit1@rabbit1:这是目标集群中一个节点的名称。在RabbitMQ里,节点名称的格式为nodename@hostname,nodename是节点的名称,hostname是节点所在主机的名称。
1 2 3 4 5 6 7 8 9 10 11
| [root@docker-vm rabbitmq-cluster]# docker exec -it rabbit1 /bin/bash root@rabbit1:/# rabbitmqctl stop_app rabbitmqctl reset rabbitmqctl join_cluster rabbit1@rabbit1 rabbitmqctl start_app Stopping rabbit application on node rabbit1@rabbit1 ... Resetting node rabbit1@rabbit1 ... Clustering node rabbit1@rabbit1 with rabbit1@rabbit1 Error: Error: cannot cluster node with itself: rabbit1@rabbit1 Starting node rabbit1@rabbit1 ...
|
1 2 3 4 5
| [root@docker-vm rabbitmq-cluster]# docker exec rabbit2 rabbitmqctl join_cluster rabbit1@rabbit1 Clustering node rabbit2@rabbit2 with rabbit1@rabbit1
[root@docker-vm rabbitmq-cluster]# docker exec rabbit3 rabbitmqctl join_cluster rabbit1@rabbit1 Clustering node rabbit3@rabbit3 with rabbit1@rabbit1
|
http://192.168.31.230:15672/#/queues/%2F/sms.quorum
1
| arguments.put("x-quorum-initial-group-size", 1);
|

1
| arguments.put("x-quorum-initial-group-size", 3);
|

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53
| package com.lixiang.rabbitmq;
import com.google.gson.Gson; import com.lixiang.rabbitmq.entity.SMS; import com.lixiang.rabbitmq.utils.RabbitConstant; import com.lixiang.rabbitmq.utils.RabbitUtils; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import lombok.extern.slf4j.Slf4j; import java.io.IOException; import java.util.HashMap; import java.util.Map; import java.util.concurrent.TimeoutException;
@Slf4j public class OrderSystem { public static void main(String[] args) throws IOException, TimeoutException {
try (Connection connection = RabbitUtils.getConnection(); Channel channel = connection.createChannel();) { Map<String, Object> arguments = new HashMap<>(); arguments.put("x-queue-type", "quorum");
arguments.put("x-quorum-initial-group-size", 3); channel.queueDeclare(RabbitConstant.QUEUE_SMS_QUORUM, true, false, false, arguments); for (int i = 100; i < 200; i++) { SMS sms = new SMS("乘客" + i, "13900000" + i, "您的车票已预订成功"); String jsonSMS = new Gson().toJson(sms); channel.basicPublish("", RabbitConstant.QUEUE_SMS_QUORUM, null, jsonSMS.getBytes()); } log.info("订单信息已投递至Broker"); } } }
|