
简单队列模式(Simple Queue)
● 特点:单生产者 → 单队列 → 单消费者,直接绑定默认交换机。
● 流程:生产者发送消息到指定队列,消费者监听该队列消费。
● 场景:基础入门场景,如单任务处理。
● 缺点:不支持多消费者负载均衡,队列需提前创建。
工作队列模式(Work Queue / Task Queue)
● 特点:单生产者 → 单队列 → 多消费者竞争消费,默认轮询分发(公平分发需配置prefetch)。
● 核心机制:消费者通过basic.qos设置预取数量,避免消息堆积不均。
● 场景:异步任务处理(如邮件发送、文件处理),横向扩展消费者提升吞吐量。
发布订阅模式(Publish/Subscribe)
● 特点:生产者 → Fanout交换机 → 绑定多个队列 → 每个队列独立消费者。
● 路由规则:Fanout交换机会将消息广播到所有绑定的队列,无视路由键。
● 场景:事件广播(如系统通知、日志采集到多终端)。
路由模式(Routing)
● 特点:生产者 → Direct交换机 → 根据精确路由键匹配队列。
● 绑定方式:队列绑定交换机时指定路由键(如error、info)。
● 场景:按消息类型分类处理(如将错误日志单独存储,普通日志丢弃)。
主题模式(Topics)
● 特点:生产者 → Topic交换机 → 基于通配符路由键(匹配单词,#匹配多级)灵活过滤。
● 示例:路由键stock.usd.nyse可匹配.usd.*或stock.#。
● 场景:复杂消息分类(如根据不同维度处理订单:支付成功、物流状态等)
RPC 模式(Remote Procedure Call)
● 特点:双向通信,通过临时回调队列实现请求-响应机制。
● 流程:
a. 客户端发送请求消息,附带reply_to(回调队列)和correlation_id(请求标识)。
b. 服务端处理请求后,将结果发送到指定回调队列。
c. 客户端监听回调队列,通过correlation_id匹配响应。
● 场景:分布式系统间的同步调用(如订单支付状态查询)。
模式选择指南
● 简单任务分发:工作队列模式(多消费者负载均衡)。
● 事件广播:发布订阅模式(Fanout交换机)。
● 按类型过滤:路由模式(Direct交换机)或主题模式(Topic交换机,更灵活)。
● 同步调用需求:RPC模式(注意性能开销)。
RabbitMQ的工作队列模式
工作队列模式(Work Queue Pattern),也称为任务队列模式,是RabbitMQ中最常见的消息传递模式之一。在这种模式下,生产者将任务发送到队列中,而多个消费者从队列中获取任务并处理。每个任务只会被一个消费者处理,从而实现任务的负载均衡。
● 核心特点
- 任务分发:任务由生产者发送到队列中,多个消费者竞争从队列中获取任务。
- 公平分发:默认情况下,RabbitMQ会以轮询的方式将消息分发给消费者。
- 任务确认机制:消费者在完成任务后需要向RabbitMQ发送确认(ACK),以确保任务不会丢失。
- 持久化支持:可以配置消息和队列的持久化,防止因服务重启导致任务丢失。
● 应用场景
- 任务分发:适用于需要将大量任务分发给多个工作节点进行处理的场景,例如批量数据处理、日志分析等。
- 负载均衡:通过多个消费者处理任务,避免单个消费者过载。
- 异步任务处理:将耗时任务放入队列中,由后台消费者异步处理,提升系统响应速度。
- 可靠性保障:通过消息确认机制和持久化配置,确保任务不会因系统故障而丢失。
流程图

流程说明
- 生产者将任务发送到任务队列中。
- 任务队列中的任务会被分发给多个消费者(Consumer1、Consumer2、Consumer3)。
- 每个消费者从队列中获取任务并进行处理。
- 消费者完成任务后,向RabbitMQ发送确认消息(ACK)。
- RabbitMQ收到确认后,将任务从队列中移除。
总结
工作队列模式是一种简单而强大的消息传递模式,能够有效解决任务分发和负载均衡问题。通过合理配置消息确认和持久化机制,可以确保系统的可靠性和稳定性。上述流程图清晰地展示了工作队列模式的核心流程,便于理解和实现。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46
| <?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion>
<groupId>com.lixiang</groupId> <artifactId>ex00100</artifactId> <version>1.0-SNAPSHOT</version>
<properties> <maven.compiler.source>21</maven.compiler.source> <maven.compiler.target>21</maven.compiler.target> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> </properties> <dependencies> <!-- 添加RabbitMQ Java客户端 --> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.20.0</version> </dependency> <!-- 新增Lombok依赖 --> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <version>1.18.30</version> <scope>provided</scope> </dependency> <dependency> <groupId>com.google.code.gson</groupId> <artifactId>gson</artifactId> <version>2.10</version> </dependency> <dependency> <groupId>ch.qos.logback</groupId> <artifactId>logback-classic</artifactId> <version>1.4.14</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> <version>2.0.12</version> </dependency> </dependencies> </project>
|
1 2 3 4 5
| package com.lixiang.rabbitmq.utils;
public class RabbitConstant { public static final String QUEUE_SMS = "sms"; }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43
| package com.lixiang.rabbitmq.utils;
import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import lombok.SneakyThrows;
public class RabbitUtils {
private static ConnectionFactory connectionFactory = new ConnectionFactory();
static { connectionFactory.setHost("192.168.31.230"); connectionFactory.setPort(5672); connectionFactory.setUsername("admin"); connectionFactory.setPassword("admin"); connectionFactory.setVirtualHost("my_vhost"); }
@SneakyThrows public static Connection getConnection() { Connection conn = connectionFactory.newConnection(); return conn; } }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| package com.lixiang.rabbitmq.entity;
import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor;
@Data @AllArgsConstructor @NoArgsConstructor public class SMS { private String name; private String mobile; private String content; }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50
| package com.lixiang.rabbitmq;
import com.google.gson.Gson; import com.lixiang.rabbitmq.entity.SMS; import com.lixiang.rabbitmq.utils.RabbitConstant; import com.lixiang.rabbitmq.utils.RabbitUtils; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import lombok.extern.slf4j.Slf4j; import java.io.IOException; import java.util.concurrent.TimeoutException;
@Slf4j public class OrderSystem { public static void main(String[] args) throws IOException, TimeoutException {
try (Connection connection = RabbitUtils.getConnection(); Channel channel = connection.createChannel();) {
channel.queueDeclare(RabbitConstant.QUEUE_SMS, true, false, false, null);
for (int i = 100; i <= 200; i++) { SMS sms = new SMS("乘客" + i, "13900000" + i, "您的车票已预订成功"); String jsonSMS = new Gson().toJson(sms); channel.basicPublish("", RabbitConstant.QUEUE_SMS, null, jsonSMS.getBytes()); }
log.info("订单信息已投递至Broker"); } } }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56
| package com.lixiang.rabbitmq;
import com.lixiang.rabbitmq.utils.RabbitConstant; import com.lixiang.rabbitmq.utils.RabbitUtils; import com.rabbitmq.client.*; import lombok.extern.slf4j.Slf4j; import java.io.IOException;
@Slf4j public class SmsSender { public static void main(String[] args) throws IOException { Connection connection = RabbitUtils.getConnection(); final Channel channel = connection.createChannel(); channel.queueDeclare(RabbitConstant.QUEUE_SMS, true, false, false, null); channel.basicQos(1); channel.basicConsume(RabbitConstant.QUEUE_SMS, false, new DefaultConsumer(channel) {
@Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String jsonSMS = new String(body); log.info("SMSSender-短信发送成功:{}", jsonSMS); try { Thread.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } channel.basicAck(envelope.getDeliveryTag(), false); } }); } }
|