RabbitMQ消费者优先级与优先级队列

什么是消费者优先级

消费者优先级是RabbitMQ提供的一种机制,它允许为不同的消费者设置不同的优先级,从而影响消息的分发顺序。当多个消费者同时监听同一个队列时,高优先级的消费者会比低优先级的消费者获得更多的消息。

基本概念:

● 高优先级消费者:优先接收消息,只有在高优先级消费者无法处理时(如达到预取限制或繁忙状态),消息才会被分发给低优先级消费者
● 公平分发:在相同优先级的消费者之间,RabbitMQ仍然采用轮询(round-robin)的方式公平分发消息
● 优先级范围:通常支持0-255的优先级值,数字越大表示优先级越高

如何实现消费者优先级?

声明消费者时设置优先级

1
2
3
4
5
Channel channel = ...;
Consumer consumer = ...;
Map<String, Object> args = new HashMap<>();
args.put("x-priority", 10); // 设置优先级为10
channel.basicConsume("queue_name", false, args, consumer);

消费者优先级的工作原理

RabbitMQ实现消费者优先级的核心机制:

  1. 活跃消费者列表:RabbitMQ维护一个按优先级排序的消费者列表
  2. 消息分发算法:
    ○ 首先尝试将消息分发给最高优先级的可用消费者
    ○ 如果高优先级消费者达到预取限制(prefetch limit)或处于繁忙状态,则尝试下一优先级
    ○ 相同优先级的消费者之间使用轮询策略
  3. 优先级动态调整:消费者优先级可以在运行时动态修改

最佳实践与注意事项

  1. 合理设置优先级范围:建议使用适度的优先级值(如1-10),避免过度细分
  2. 预取计数配置:高优先级消费者可设置较大的prefetch count以提高吞吐量
    channel.basicQos(20); // 高优先级消费者预取20条
  3. 监控消费者分布:使用RabbitMQ管理插件监控消息分发情况
    rabbitmqctl list_consumers
  4. 与消息优先级配合使用:可以同时使用消息优先级和消费者优先级实现更精细控制
  5. 避免饥饿现象:确保低优先级消费者仍能获得部分消息,防止完全被阻塞

性能考量

  1. 内存开销:优先级实现需要额外的内存维护消费者列表,但影响通常很小
  2. CPU开销:优先级排序在消息分发时进行,对性能影响可以忽略
  3. 网络利用率:合理设置优先级可以优化网络资源使用,让重要消息优先传输

与其他RabbitMQ特性的交互

  1. 消息优先级:消费者优先级和消息优先级是两个独立但互补的特性
  2. 死信队列:被拒绝或过期的消息不受消费者优先级影响
  3. TTL设置:消息过期检查优先于消费者优先级
  4. 集群环境:优先级机制在集群中的所有节点上保持一致
1
2
3
4
5
package com.lixiang.rabbitmq.utils;

public class RabbitConstant {
public static final String QUEUE_SMS = "sms.quorum";
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
package com.lixiang.rabbitmq;

import com.google.gson.Gson;
import com.lixiang.rabbitmq.entity.SMS;
import com.lixiang.rabbitmq.utils.RabbitConstant;
import com.lixiang.rabbitmq.utils.RabbitUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
* OrderSystem 类用于模拟订单系统,将 SMS 消息发送到 RabbitMQ 队列。
* 该类创建与 RabbitMQ 的连接,声明队列,并将 SMS 消息转换为 JSON 格式后发送到队列。
*/
@Slf4j
public class OrderSystem {
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
/*
* ### 1. 连接(Connection)
Connection 代表了应用程序与RabbitMQ服务器之间的物理TCP连接。当你使用 RabbitUtils.getConnection() 方法时,实际上是在建立一个到RabbitMQ服务器的网络连接。这个连接是一个相对较重的资源,因为它涉及到网络套接字的创建、TCP握手等操作。在一个应用程序中,通常只需要创建一个 Connection 实例,并且在整个应用程序的生命周期内复用它。

### 2. 通道(Channel)
Channel 是建立在 Connection 之上的轻量级抽象。它可以看作是一个虚拟连接,允许应用程序在同一个 Connection 上进行多个独立的操作。每个 Channel 都有自己独立的ID和状态,并且可以独立地进行消息的发送和接收。由于创建和销毁 Channel 的开销相对较小,因此在需要进行大量并发操作时,可以创建多个 Channel 来提高效率。
*/
try (Connection connection = RabbitUtils.getConnection();
Channel channel = connection.createChannel();) {

// 声明消息队列(参数说明:队列名称,持久化,排他性,自动删除,其他参数)
// durable=true 表示队列持久化(服务重启后队列不会消失)
// exclusive=false 表示非排他队列(允许多消费者连接)
// autoDelete=false 表示不会自动删除(没有消费者也不会删除队列)
channel.queueDeclare(RabbitConstant.QUEUE_SMS, true, false, false, null);

// 循环发送 100 到 200 条 SMS 消息
for (int i = 100; i <= 200; i++) {
// 创建一个 SMS 对象
SMS sms = new SMS("乘客" + i, "13900000" + i, "您的车票已预订成功");
// 将 SMS 对象转换为 JSON 字符串
String jsonSMS = new Gson().toJson(sms);
// 发送 JSON 格式的 SMS 消息到 RabbitMQ 队列
channel.basicPublish("", RabbitConstant.QUEUE_SMS, null, jsonSMS.getBytes());
Thread.sleep(100);
}

// 记录日志,表示订单信息已成功投递到 Broker
log.info("订单信息已投递至Broker");
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
package com.lixiang.rabbitmq;

import com.lixiang.rabbitmq.utils.RabbitConstant;
import com.lixiang.rabbitmq.utils.RabbitUtils;
import com.rabbitmq.client.*;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

@Slf4j
public class SmsSenderHigh {
public static void main(String[] args) throws IOException {
Connection connection = RabbitUtils.getConnection();
final Channel channel = connection.createChannel();

channel.basicQos(5);
channel.queueDeclare(RabbitConstant.QUEUE_SMS, true, false, false, null);

// 创建消费者参数Map,用于设置消费者属性
Map<String, Object> cuArgs = new HashMap<>();
// 设置消费者优先级为10(数值越大优先级越高)
cuArgs.put("x-priority", 10);
// 开始消费队列消息,参数说明:
// RabbitConstant.QUEUE_SMS - 要消费的队列名称
// false - 关闭自动确认,需要手动调用basicAck
// cuArgs - 消费者参数配置
// new DefaultConsumer... - 消息处理回调函数
channel.basicConsume(RabbitConstant.QUEUE_SMS, false, cuArgs, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
String jsonSMS = new String(body);
log.info("SMSSender-短信发送成功:{}", jsonSMS);
try {
Thread.sleep(50);
} catch (InterruptedException e) {
e.printStackTrace();
}
channel.basicAck(envelope.getDeliveryTag(), false);
}
});
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
package com.lixiang.rabbitmq;

import com.lixiang.rabbitmq.utils.RabbitConstant;
import com.lixiang.rabbitmq.utils.RabbitUtils;
import com.rabbitmq.client.*;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

@Slf4j
public class SmsSenderLow {
public static void main(String[] args) throws IOException {
Connection connection = RabbitUtils.getConnection();
final Channel channel = connection.createChannel();
channel.basicQos(5);

channel.queueDeclare(RabbitConstant.QUEUE_SMS, true, false, false, null);

Map<String, Object> cuArgs = new HashMap<>();
cuArgs.put("x-priority", 1);

channel.basicConsume(RabbitConstant.QUEUE_SMS, false,cuArgs, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
String jsonSMS = new String(body);
log.info("SMSSender-短信发送成功:{}", jsonSMS);
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
channel.basicAck(envelope.getDeliveryTag(), false);
}
});
}
}

RabbitMQ优先级队列

RabbitMQ 提供了为经典队列添加“优先级”功能的支持。启用了这一特性的经典队列通常被定义为“优先级队列”。该系统允许设置的优先级范围是从1到255,但官方推荐使用1至5之间的值以优化性能。值得注意的是,较高的优先级设定会相应地增加对CPU和内存资源的需求,这是因为RabbitMQ需要针对每个指定的优先级(从1到队列配置中所设的最大值)维护独立的子队列。
要创建一个优先级队列,必须在队列声明时指定x-max-priority参数。此参数应设置为介于1与255之间的一个整数,用以表示该队列能够接受的消息最高优先级水平。以下示例展示了如何使用Java API完成这一配置:

1
2
3
4
Channel ch = ...;
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-max-priority", 10);
ch.queueDeclare("my-priority-queue", true, false, false, args);

生产者可以通过basic.properties中的priority字段来发布具有优先级的消息,其中数值越大代表消息的优先级越高。

优先级队列的行为

AMQP 0-9-1规范对于消息优先级的具体实现方式描述较为宽泛。该规范要求所有队列至少支持两个不同的优先级,并且最多可以支持至十个优先级层级。然而,对于那些未明确指定优先级属性的消息如何处理,规范中并未给出明确规定。
默认情况下,RabbitMQ的标准队列并不直接支持消息优先级功能。当创建支持优先级的队列时,可以根据具体需求设定最大允许的优先级值。在选择具体的优先级数目时,需要综合考量以下几个关键因素:
● 资源消耗:每个队列内的每一个优先级层次都会额外占用内存和磁盘空间,同时也会增加CPU使用率(尤其是在消息被消费的过程中)。因此,不推荐设置过多的优先级层次。
● 优先级范围:根据定义,消息优先级是一个无符号字节,这意味着其有效取值区间为0到255。
● 默认优先级:对于那些没有显式设置优先级属性的消息,默认将以优先级0进行处理。
● 超出限制的情况:任何超过队列所配置的最大优先级数的消息都将按照最高优先级来进行处理。

最大优先级数与资源消耗的关系

基于上述考虑,建议将优先级的数量控制在一个相对较小的范围内,例如1到5之间。如果确实需要更多层级,则建议上限不超过1到10之间的单个位数。这是因为当前系统架构下,随着优先级数量的增长,会相应地启动更多的Erlang进程,这不仅会增加CPU资源的消耗,还可能对运行时的任务调度产生不利影响。

消费者与优先级队列的协作机制

当消费者连接到一个空的优先级队列时,随后发布的消息可能会被立即消费,而不必在队列中等待。这种情况下,优先级队列实际上并未执行任何消息排序功能,从而使得优先级机制未能发挥作用。
然而,在大多数实际应用场景中,推荐的做法是让消费者采用手动确认模式,并通过basic.qos(预取)方法来控制每次可接收的消息数量。这样做可以确保优先级机制的有效性。basic.qos参数是在消费者连接至队列时设定的,它定义了该消费者一次能够处理的最大消息数。
以下将详细探讨消费者与优先级队列之间的协作方式,并指出在某些情况下,即使是高优先级的消息也可能需要等待低优先级消息先被处理的现象。

示例演示

  1. 一名新消费者以basic.qos=10作为连接参数加入了一个空的经典(非优先级)队列。
  2. 一旦有一条消息发布,则这条消息会被立即传递给消费者进行处理。
  3. 接着快速连续地发布了5条消息;由于当前只有1条未被确认的消息(少于qos设置值10),这些新增的消息也会立刻被发送给消费者。
  4. 如果紧接着再迅速发布10条消息,在已有条件下仅允许额外4条进入处理流程(因已达到qos上限10),剩余6条则需暂时停留在队列中处于就绪状态。
  5. 当消费者完成对前5条消息的确认后,原先等待中的6条消息中有5条会即刻被投递出去。

引入优先级后的场景

  1. 同样按照上述示例,假设消费者以basic.qos=10的方式连接。
  2. 发布了10条属于较低优先级别的消息,并且这些消息立刻被分配给了消费者(此时已达到qos限制)。
  3. 如果在此之后又尝试发布一条最高优先级的消息,但由于当前预取量已达上限,这条高优先级消息只能等待直到有空间可用为止。

与其他特性的交互考量

通常来说,优先级队列具备标准RabbitMQ队列的所有特性,但在设计时需要注意一些特殊交互情况:
● 对设置了TTL(Time To Live)属性的消息而言,它们总是从队列头部开始过期。这意味着即使指定了单个队列级别的TTL策略,低优先级的消息仍可能因为被高优先级消息阻挡而无法按时过期。尽管这些信息永远不会被投递给消费者,但它们仍然会计入队列统计之中。
● 当设置了最大长度(max-length)限制的队列达到其容量上限时,系统将从队首开始删除旧消息以便为新到来的数据腾出空间。在这种情形下,可能存在为了容纳新的低优先级消息而不得不丢弃部分现有高优先级记录的风险,这显然违背了预期目的。

优先级队列的应用场景

在专业环境中,RabbitMQ的优先级队列功能可以应用于多种场景,以确保关键消息能够得到及时处理。

  1. 紧急任务调度:在需要对任务按紧急程度进行区分的情况下,如系统监控警报、故障恢复请求等,通过设置不同的优先级来保证高优先级的消息被优先消费。
  2. 客户服务支持:对于客服系统而言,可以根据客户的重要性和问题的严重性给求助信息分配不同的优先级,从而使得重要客户的询问或严重的投诉能够更快地获得响应。
  3. 金融交易处理:在金融市场中,某些类型的订单(比如大额交易)可能需要比普通订单更快速地被执行。利用优先级队列可以帮助实现这一目标。
  4. 资源分配优化:当面临有限计算资源时,可以通过为不同类型的任务设定不同级别的优先级,来优化资源使用效率,确保最重要或者最紧迫的任务首先完成。
    总之,通过灵活配置RabbitMQ中的优先级队列,企业能够在多个领域内提升服务质量与效率,更好地满足业务需求。

代码案例

1
2
3
4
5
6
7
package com.lixiang.rabbitmq.utils;

public class RabbitConstant {
public static final String PRIORITY_EXCHANGE = "priority.exchange";
public static final String PRIORITY_QUEUE = "priority.queue";
public static final String PRIORITY_ROUTING_KEY = "priority_routing_key";
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
package com.lixiang.rabbitmq.utils;

import java.util.HashMap;
import java.util.Map;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import lombok.SneakyThrows;

public class RabbitUtils {
private static ConnectionFactory connectionFactory = new ConnectionFactory();

static {
connectionFactory.setHost("192.168.31.230");
connectionFactory.setPort(5672);
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
connectionFactory.setVirtualHost("my_vhost");
}

@SneakyThrows
public static Connection getConnection() {
Connection conn = connectionFactory.newConnection();
return conn;
}

@SneakyThrows
public static void init(Channel channel) {
// 设置队列参数,启用优先级支持(最大优先级为5)
Map<String, Object> arguments = new HashMap<>();
arguments.put("x-max-priority", 5);

// 声明交换器(direct类型)
channel.exchangeDeclare(RabbitConstant.PRIORITY_EXCHANGE, "direct", true);
// 声明优先级队列
channel.queueDeclare(RabbitConstant.PRIORITY_QUEUE, true, false, false, arguments);
// 绑定队列到交换器
channel.queueBind(RabbitConstant.PRIORITY_QUEUE,
RabbitConstant.PRIORITY_EXCHANGE,
RabbitConstant.PRIORITY_ROUTING_KEY);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
package com.lixiang.rabbitmq.demo;

import com.lixiang.rabbitmq.utils.RabbitUtils;
import com.lixiang.rabbitmq.utils.RabbitConstant;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

import lombok.extern.slf4j.Slf4j;

import com.rabbitmq.client.AMQP;
@Slf4j
public class PriorityProducer {
public static void main(String[] args) throws Exception {
Connection conn = RabbitUtils.getConnection();
Channel channel = conn.createChannel();
RabbitUtils.init(channel);

// 发送5条不同优先级的消息
for (int i = 1; i <= 5; i++) {
String message = "优先级为" + i + "的消息";
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
.priority(i) // 设置优先级(1-5)
.build();

channel.basicPublish(RabbitConstant.PRIORITY_EXCHANGE,
RabbitConstant.PRIORITY_ROUTING_KEY,
props,
message.getBytes());
log.info("已发送: {}" , message);
}

channel.close();
conn.close();
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
package com.lixiang.rabbitmq.demo;

import com.lixiang.rabbitmq.utils.RabbitUtils;
import com.lixiang.rabbitmq.utils.RabbitConstant;
import com.rabbitmq.client.*;

import lombok.extern.slf4j.Slf4j;
@Slf4j
public class PriorityConsumer {
public static void main(String[] args) throws Exception {
Connection conn = RabbitUtils.getConnection();
Channel channel = conn.createChannel();
RabbitUtils.init(channel);

// 设置每次只消费一条消息
channel.basicQos(1);

// 创建消费者
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) {
String message = new String(body);
log.info("收到消息: " + message + " | 优先级: " + properties.getPriority());
try {
channel.basicAck(envelope.getDeliveryTag(), false);
} catch (Exception e) {
e.printStackTrace();
}
}
};

// 开始消费
channel.basicConsume(RabbitConstant.PRIORITY_QUEUE, false, consumer);
System.out.println("消费者已启动,等待接收消息...");
}
}