RabbitMQ六种队列模式

image-20210702-1

  1. 简单队列模式(Simple Queue)
    ● 特点:单生产者 → 单队列 → 单消费者,直接绑定默认交换机。
    ● 流程:生产者发送消息到指定队列,消费者监听该队列消费。
    ● 场景:基础入门场景,如单任务处理。
    ● 缺点:不支持多消费者负载均衡,队列需提前创建。

  2. 工作队列模式(Work Queue / Task Queue)
    ● 特点:单生产者 → 单队列 → 多消费者竞争消费,默认轮询分发(公平分发需配置prefetch)。
    ● 核心机制:消费者通过basic.qos设置预取数量,避免消息堆积不均。
    ● 场景:异步任务处理(如邮件发送、文件处理),横向扩展消费者提升吞吐量。

  3. 发布订阅模式(Publish/Subscribe)
    ● 特点:生产者 → Fanout交换机 → 绑定多个队列 → 每个队列独立消费者。
    ● 路由规则:Fanout交换机会将消息广播到所有绑定的队列,无视路由键。
    ● 场景:事件广播(如系统通知、日志采集到多终端)。

  4. 路由模式(Routing)
    ● 特点:生产者 → Direct交换机 → 根据精确路由键匹配队列。
    ● 绑定方式:队列绑定交换机时指定路由键(如error、info)。
    ● 场景:按消息类型分类处理(如将错误日志单独存储,普通日志丢弃)。

  5. 主题模式(Topics)
    ● 特点:生产者 → Topic交换机 → 基于通配符路由键(匹配单词,#匹配多级)灵活过滤。
    ● 示例:路由键stock.usd.nyse可匹配
    .usd.*或stock.#。
    ● 场景:复杂消息分类(如根据不同维度处理订单:支付成功、物流状态等)

  6. RPC 模式(Remote Procedure Call)
    ● 特点:双向通信,通过临时回调队列实现请求-响应机制。
    ● 流程:
    a. 客户端发送请求消息,附带reply_to(回调队列)和correlation_id(请求标识)。
    b. 服务端处理请求后,将结果发送到指定回调队列。
    c. 客户端监听回调队列,通过correlation_id匹配响应。
    ● 场景:分布式系统间的同步调用(如订单支付状态查询)。

模式选择指南

● 简单任务分发:工作队列模式(多消费者负载均衡)。
● 事件广播:发布订阅模式(Fanout交换机)。
● 按类型过滤:路由模式(Direct交换机)或主题模式(Topic交换机,更灵活)。
● 同步调用需求:RPC模式(注意性能开销)。

RabbitMQ的工作队列模式

工作队列模式(Work Queue Pattern),也称为任务队列模式,是RabbitMQ中最常见的消息传递模式之一。在这种模式下,生产者将任务发送到队列中,而多个消费者从队列中获取任务并处理。每个任务只会被一个消费者处理,从而实现任务的负载均衡。

● 核心特点

  1. 任务分发:任务由生产者发送到队列中,多个消费者竞争从队列中获取任务。
  2. 公平分发:默认情况下,RabbitMQ会以轮询的方式将消息分发给消费者。
  3. 任务确认机制:消费者在完成任务后需要向RabbitMQ发送确认(ACK),以确保任务不会丢失。
  4. 持久化支持:可以配置消息和队列的持久化,防止因服务重启导致任务丢失。

● 应用场景

  1. 任务分发:适用于需要将大量任务分发给多个工作节点进行处理的场景,例如批量数据处理、日志分析等。
  2. 负载均衡:通过多个消费者处理任务,避免单个消费者过载。
  3. 异步任务处理:将耗时任务放入队列中,由后台消费者异步处理,提升系统响应速度。
  4. 可靠性保障:通过消息确认机制和持久化配置,确保任务不会因系统故障而丢失。

流程图

image-20210702-2

流程说明

  1. 生产者将任务发送到任务队列中。
  2. 任务队列中的任务会被分发给多个消费者(Consumer1、Consumer2、Consumer3)。
  3. 每个消费者从队列中获取任务并进行处理。
  4. 消费者完成任务后,向RabbitMQ发送确认消息(ACK)。
  5. RabbitMQ收到确认后,将任务从队列中移除。

总结

工作队列模式是一种简单而强大的消息传递模式,能够有效解决任务分发和负载均衡问题。通过合理配置消息确认和持久化机制,可以确保系统的可靠性和稳定性。上述流程图清晰地展示了工作队列模式的核心流程,便于理解和实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>com.lixiang</groupId>
<artifactId>ex00100</artifactId>
<version>1.0-SNAPSHOT</version>

<properties>
<maven.compiler.source>21</maven.compiler.source>
<maven.compiler.target>21</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<!-- 添加RabbitMQ Java客户端 -->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.20.0</version>
</dependency>
<!-- 新增Lombok依赖 -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.30</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.10</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.4.14</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>2.0.12</version>
</dependency>
</dependencies>
</project>
1
2
3
4
5
package com.lixiang.rabbitmq.utils;

public class RabbitConstant {
public static final String QUEUE_SMS = "sms";
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
package com.lixiang.rabbitmq.utils;

import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import lombok.SneakyThrows;

/**
* RabbitUtils 类用于创建和管理 RabbitMQ 的连接。
* 该类使用静态代码块初始化连接工厂,并提供一个静态方法来获取 RabbitMQ 连接。
*/
public class RabbitUtils {
/**
* 静态的 ConnectionFactory 实例,用于创建 RabbitMQ 连接。
*/
private static ConnectionFactory connectionFactory = new ConnectionFactory();

static {
// 设置 RabbitMQ 服务器的主机地址
connectionFactory.setHost("192.168.31.230");
// 设置 RabbitMQ 服务器的端口号,5672 是 RabbitMQ 的默认端口号
connectionFactory.setPort(5672);
// 设置连接 RabbitMQ 服务器的用户名
connectionFactory.setUsername("admin");
// 设置连接 RabbitMQ 服务器的密码
connectionFactory.setPassword("admin");
// 设置连接 RabbitMQ 服务器的虚拟主机
connectionFactory.setVirtualHost("my_vhost");
}

/**
* 获取 RabbitMQ 的连接。
* 该方法使用静态初始化的 ConnectionFactory 创建一个新的连接。
*
* @return 一个 RabbitMQ 的连接对象
* @throws Exception 如果在创建连接过程中发生错误
*/
@SneakyThrows
public static Connection getConnection() {
// 使用 ConnectionFactory 创建一个新的连接
Connection conn = connectionFactory.newConnection();
return conn;
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
package com.lixiang.rabbitmq.entity;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@AllArgsConstructor
@NoArgsConstructor
public class SMS {
private String name;
private String mobile;
private String content;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
package com.lixiang.rabbitmq;

import com.google.gson.Gson;
import com.lixiang.rabbitmq.entity.SMS;
import com.lixiang.rabbitmq.utils.RabbitConstant;
import com.lixiang.rabbitmq.utils.RabbitUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
* OrderSystem 类用于模拟订单系统,将 SMS 消息发送到 RabbitMQ 队列。
* 该类创建与 RabbitMQ 的连接,声明队列,并将 SMS 消息转换为 JSON 格式后发送到队列。
*/
@Slf4j
public class OrderSystem {
public static void main(String[] args) throws IOException, TimeoutException {
/*
* ### 1. 连接(Connection)
Connection 代表了应用程序与RabbitMQ服务器之间的物理TCP连接。当你使用 RabbitUtils.getConnection() 方法时,实际上是在建立一个到RabbitMQ服务器的网络连接。这个连接是一个相对较重的资源,因为它涉及到网络套接字的创建、TCP握手等操作。在一个应用程序中,通常只需要创建一个 Connection 实例,并且在整个应用程序的生命周期内复用它。

### 2. 通道(Channel)
Channel 是建立在 Connection 之上的轻量级抽象。它可以看作是一个虚拟连接,允许应用程序在同一个 Connection 上进行多个独立的操作。每个 Channel 都有自己独立的ID和状态,并且可以独立地进行消息的发送和接收。由于创建和销毁 Channel 的开销相对较小,因此在需要进行大量并发操作时,可以创建多个 Channel 来提高效率。
*/
try (Connection connection = RabbitUtils.getConnection();
Channel channel = connection.createChannel();) {

// 声明消息队列(参数说明:队列名称,持久化,排他性,自动删除,其他参数)
// durable=true 表示队列持久化(服务重启后队列不会消失)
// exclusive=false 表示非排他队列(允许多消费者连接)
// autoDelete=false 表示不会自动删除(没有消费者也不会删除队列)
channel.queueDeclare(RabbitConstant.QUEUE_SMS, true, false, false, null);

// 循环发送 100 到 200 条 SMS 消息
for (int i = 100; i <= 200; i++) {
// 创建一个 SMS 对象
SMS sms = new SMS("乘客" + i, "13900000" + i, "您的车票已预订成功");
// 将 SMS 对象转换为 JSON 字符串
String jsonSMS = new Gson().toJson(sms);
// 发送 JSON 格式的 SMS 消息到 RabbitMQ 队列
channel.basicPublish("", RabbitConstant.QUEUE_SMS, null, jsonSMS.getBytes());
}

// 记录日志,表示订单信息已成功投递到 Broker
log.info("订单信息已投递至Broker");
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
package com.lixiang.rabbitmq;

import com.lixiang.rabbitmq.utils.RabbitConstant;
import com.lixiang.rabbitmq.utils.RabbitUtils;
import com.rabbitmq.client.*;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;

/**
* SMSSender 类用于从 RabbitMQ 队列中消费短信消息,并模拟短信发送操作。
* 该类通过 RabbitMQ 客户端连接到指定队列,接收消息并进行处理。
*/
@Slf4j
public class SmsSender {
public static void main(String[] args) throws IOException {
// 通过 RabbitUtils 工具类获取与 RabbitMQ 的连接
Connection connection = RabbitUtils.getConnection();
// 创建一个新的通道
final Channel channel = connection.createChannel();
// 声明一个队列,如果队列不存在则创建
channel.queueDeclare(RabbitConstant.QUEUE_SMS, true, false, false, null);
// 设置 QoS,确保每次只处理一条消息,避免消费者过载
channel.basicQos(1);
// 开始消费队列中的消息
channel.basicConsume(RabbitConstant.QUEUE_SMS, false, new DefaultConsumer(channel) {
/**
* 处理接收到的消息。
* 当从队列中接收到消息时,将消息转换为字符串并记录日志,模拟短信发送操作,
* 然后手动确认消息已处理。
*
* @param consumerTag 消费者标签,用于标识消费者。
* @param envelope 消息信封,包含消息的元数据。
* @param properties 消息的属性。
* @param body 消息的字节数组内容。
* @throws IOException 如果在处理消息过程中发生 I/O 错误。
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
// 将接收到的字节数组消息转换为字符串
String jsonSMS = new String(body);
// 记录日志,表示短信发送成功
log.info("SMSSender-短信发送成功:{}", jsonSMS);
try {
// 模拟短信发送的耗时操作
Thread.sleep(10);
} catch (InterruptedException e) {
// 打印异常堆栈信息
e.printStackTrace();
}
// 手动确认消息已处理,通知 RabbitMQ 可以从队列中移除该消息
channel.basicAck(envelope.getDeliveryTag(), false);
}
});
}
}