RabbitMQ队列与消息TTL

队列 TTL(Time To Live)

通过 RabbitMQ,用户能够为消息及队列配置TTL(Time To Live,生存时间)参数或策略。TTL定义了消息或队列在系统中的最大存活期限。
对于消息而言,其TTL决定了该消息在被消费之前可以在队列中保留的最大时长。一旦消息停留的时间超过了设定的TTL值,则该消息将被视为过期,并从队列中移除。“移除”意味着这条消息既不会被分发给任何订阅者,也无法通过调用basic.get方法直接从队列中获取。值得注意的是,消息TTL可以针对单个消息、特定队列或是整个队列组进行设置。
此外,TTL不仅限于应用于消息层面,还可以对整个队列生效。这一特性与自动删除属性相结合使用时尤为有用。特别是对于非持久化的经典队列来说,设置队列级别的TTL通常更加有意义;然而,需指出的是流式队列并不支持这种过期机制。当一个队列处于未被活跃使用的状态时(即没有消费者在线连接到该队列),它会在达到指定的TTL后自动删除。
TTL的行为可以通过可选的队列参数来调控,而最推荐的做法是通过策略来进行这些配置。同时,管理员也可以通过制定策略来强制执行某些TTL相关的规则。

队列级别的消息 TTL

可通过策略设置 message-ttl 参数,或在声明队列时指定相同参数来为特定队列设置消息 TTL。
在队列中停留时间超过配置 TTL 的消息将被视为过期。注意,路由到多个队列的消息可能在每个队列中以不同时间过期(或完全不过期)。某队列中消息的过期不会影响其他队列中的同一消息。
服务器保证过期消息不会通过 basic.deliver 投递(给消费者),也不会在响应轮询消费者时发送(在 basic.get-ok 响应中)。
此外,服务器会尝试在消息基于 TTL 的过期时间到达时或之后尽快移除它们。
TTL 参数或策略的值必须是非负整数(≥0),以毫秒为单位描述 TTL 时长。例如,值 1000 表示消息在队列中存活 1 秒或直到被投递给消费者。该参数可以是 AMQP 0-9-1 类型的 short-short-int、short-int、long-int 或 long-long-int。

使用策略为队列定义消息 TTL

以下 Java 示例创建一个消息最多保留 60 秒的队列:

1
2
3
Map<String, Object> args = new HashMap<>();
args.put("x-message-ttl", 60000);
channel.queueDeclare("myqueue", false, false, false, args);

可以对已包含消息的队列应用消息 TTL 策略,但需注意以下情况:
● 当消息被重新入队时(例如使用带 requeue 参数的 AMQP 方法,或因通道关闭),其原始过期时间会被保留。
● 将 TTL 设置为 0 会使消息在到达队列时立即过期,除非能立即投递给消费者。这提供了替代 RabbitMQ 服务器不支持的 immediate 发布标志的方案。与该标志不同,不会生成 basic.returns,如果设置了死信交换,消息会被死信化。

发布者设置的逐条消息 TTL

在发布消息时,可以通过设置 expiration 属性来实现每条消息的生存时间(TTL)。expiration 字段应以毫秒为单位指定 TTL 的持续时间,这一要求与 x-message-ttl 参数的规定一致。需要注意的是,expiration 字段必须采用字符串形式提供,即代理仅接受该字段值作为数字的字符串表示。
如果同时设置了队列级别的 TTL 以及单个消息级别的 TTL,则实际生效的将是两者中较小的那个值。
以下 Java 示例使用 RabbitMQ 客户端发布一条最多保留 60 秒的消息:

1
2
3
4
5
byte[] messageBodyBytes = "Hello, world!".getBytes();
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
.expiration("60000")
.build();
channel.basicPublish("my-exchange", "routing-key", properties, messageBodyBytes);

逐条消息 TTL 与死信机制

仅当过期消息位于队列头部时,才会被实际删除(即标记为已删除)。消费者不会接收到这些过期的消息。需要注意的是,在消息过期与消费者接收之间可能存在一种自然的竞争状态,例如,一条消息可能在发送到套接字之后但在到达消费者之前就已经过期。
对于设置了逐条消息时间生存期(TTL)的情况,过期的消息可能会累积在尚未过期的消息之后,直到后者被消费或同样过期为止。因此,这些过期但未被清理的消息将继续占用系统资源,并且会被计入队列的统计信息中,比如队列中的总消息数量。
当对现有队列追溯性地应用逐条消息TTL策略时,建议保持消费者在线以加速过期消息的清除过程。
鉴于上述针对单个消息设置TTL的行为特点,在需要通过删除消息来释放存储空间等资源的情况下,推荐采用整个队列级别的TTL设置方法,或者考虑使用队列清空、队列删除等功能作为替代方案。

队列 TTL

时间至存活(TTL)特性不仅适用于队列中的消息,也适用于队列本身。这一机制能够与自动删除队列属性协同工作,从而实现更灵活的资源管理。
对于队列级别的 TTL(即过期设置),主要针对的是瞬态(非持久化)的经典队列类型;而流式队列则不支持此类过期功能。当一个队列处于未使用状态时,它将在预设的时间后被标记为可删除状态——这里“未使用”的定义是指该队列当前没有任何活跃消费者、最近没有被重新声明(重新声明操作会重置其生存期限),并且在过去的一个完整过期周期内没有执行过 basic.get 操作。这种配置特别适合于如远程过程调用(RPC)响应队列这样的场景,其中大量创建的队列可能最终并不需要实际处理任何消息。
系统确保在至少达到一次完整的设定过期时间之后,如果队列仍然保持未使用状态,则会被安全地移除。但是,请注意,虽然可以保证超过过期时限后队列将被安排删除,但并不能保证其会在恰好到达那个时刻立即被清除。
无论是通过 queue.declare 方法中指定 x-expires 参数还是通过设置 expires 策略来定义队列的过期行为,所给定的值都必须是以毫秒为单位的正整数(不同于消息级别 TTL 允许设置为 0 的情况)。例如,设定值为 1000 意味着如果该队列连续 1 秒钟未被访问,则满足了自动删除条件。

1
2
3
Map<String, Object> args = new HashMap<>();
args.put("x-expires", 1800000);
channel.queueDeclare("myqueue", false, false, false, args);
1
2
3
4
5
package com.lixiang.rabbitmq.utils;

public class RabbitConstant {
public static final String QUEUE_SMS_TTL = "sms.ttl";
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
package com.lixiang.rabbitmq;

import com.google.gson.Gson;
import com.lixiang.rabbitmq.entity.SMS;
import com.lixiang.rabbitmq.utils.RabbitConstant;
import com.lixiang.rabbitmq.utils.RabbitUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;

@Slf4j
public class OrderSystem {
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
try (Connection connection = RabbitUtils.getConnection();
Channel channel = connection.createChannel();) {
Map<String, Object> quArgs = new HashMap<>();
quArgs.put("x-message-ttl", 10000);
channel.queueDeclare(RabbitConstant.QUEUE_SMS_TTL, true, false, false, quArgs);

for (int i = 100; i < 200; i++) {
SMS sms = new SMS("乘客" + i, "13900000" + i, "您的车票已预订成功");
String jsonSMS = new Gson().toJson(sms);
channel.basicPublish("", RabbitConstant.QUEUE_SMS_TTL, null, jsonSMS.getBytes());
}

log.info("订单信息已投递至Broker");
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
package com.lixiang.rabbitmq;

import com.lixiang.rabbitmq.utils.RabbitConstant;
import com.lixiang.rabbitmq.utils.RabbitUtils;
import com.rabbitmq.client.*;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

@Slf4j
public class SmsSender {
public static void main(String[] args) throws IOException {
Connection connection = RabbitUtils.getConnection();
final Channel channel = connection.createChannel();
channel.basicQos(1);
Map<String, Object> quArgs = new HashMap<>();
quArgs.put("x-message-ttl", 10000);
channel.queueDeclare(RabbitConstant.QUEUE_SMS_TTL, true, false, false, quArgs);

channel.basicConsume(RabbitConstant.QUEUE_SMS_TTL, false, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
String jsonSMS = new String(body);
log.info("SMSSender-短信发送成功:{}", jsonSMS);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
channel.basicAck(envelope.getDeliveryTag(), false);
}
});
}
}