RabbitMQ队列与消息TTL
队列 TTL(Time To Live)
通过 RabbitMQ,用户能够为消息及队列配置TTL(Time To Live,生存时间)参数或策略。TTL定义了消息或队列在系统中的最大存活期限。
对于消息而言,其TTL决定了该消息在被消费之前可以在队列中保留的最大时长。一旦消息停留的时间超过了设定的TTL值,则该消息将被视为过期,并从队列中移除。“移除”意味着这条消息既不会被分发给任何订阅者,也无法通过调用basic.get方法直接从队列中获取。值得注意的是,消息TTL可以针对单个消息、特定队列或是整个队列组进行设置。
此外,TTL不仅限于应用于消息层面,还可以对整个队列生效。这一特性与自动删除属性相结合使用时尤为有用。特别是对于非持久化的经典队列来说,设置队列级别的TTL通常更加有意义;然而,需指出的是流式队列并不支持这种过期机制。当一个队列处于未被活跃使用的状态时(即没有消费者在线连接到该队列),它会在达到指定的TTL后自动删除。
TTL的行为可以通过可选的队列参数来调控,而最推荐的做法是通过策略来进行这些配置。同时,管理员也可以通过制定策略来强制执行某些TTL相关的规则。
队列级别的消息 TTL
可通过策略设置 message-ttl 参数,或在声明队列时指定相同参数来为特定队列设置消息 TTL。
在队列中停留时间超过配置 TTL 的消息将被视为过期。注意,路由到多个队列的消息可能在每个队列中以不同时间过期(或完全不过期)。某队列中消息的过期不会影响其他队列中的同一消息。
服务器保证过期消息不会通过 basic.deliver 投递(给消费者),也不会在响应轮询消费者时发送(在 basic.get-ok 响应中)。
此外,服务器会尝试在消息基于 TTL 的过期时间到达时或之后尽快移除它们。
TTL 参数或策略的值必须是非负整数(≥0),以毫秒为单位描述 TTL 时长。例如,值 1000 表示消息在队列中存活 1 秒或直到被投递给消费者。该参数可以是 AMQP 0-9-1 类型的 short-short-int、short-int、long-int 或 long-long-int。
使用策略为队列定义消息 TTL
以下 Java 示例创建一个消息最多保留 60 秒的队列:
1 | Map<String, Object> args = new HashMap<>(); |
可以对已包含消息的队列应用消息 TTL 策略,但需注意以下情况:
● 当消息被重新入队时(例如使用带 requeue 参数的 AMQP 方法,或因通道关闭),其原始过期时间会被保留。
● 将 TTL 设置为 0 会使消息在到达队列时立即过期,除非能立即投递给消费者。这提供了替代 RabbitMQ 服务器不支持的 immediate 发布标志的方案。与该标志不同,不会生成 basic.returns,如果设置了死信交换,消息会被死信化。
发布者设置的逐条消息 TTL
在发布消息时,可以通过设置 expiration 属性来实现每条消息的生存时间(TTL)。expiration 字段应以毫秒为单位指定 TTL 的持续时间,这一要求与 x-message-ttl 参数的规定一致。需要注意的是,expiration 字段必须采用字符串形式提供,即代理仅接受该字段值作为数字的字符串表示。
如果同时设置了队列级别的 TTL 以及单个消息级别的 TTL,则实际生效的将是两者中较小的那个值。
以下 Java 示例使用 RabbitMQ 客户端发布一条最多保留 60 秒的消息:
1 | byte[] messageBodyBytes = "Hello, world!".getBytes(); |
逐条消息 TTL 与死信机制
仅当过期消息位于队列头部时,才会被实际删除(即标记为已删除)。消费者不会接收到这些过期的消息。需要注意的是,在消息过期与消费者接收之间可能存在一种自然的竞争状态,例如,一条消息可能在发送到套接字之后但在到达消费者之前就已经过期。
对于设置了逐条消息时间生存期(TTL)的情况,过期的消息可能会累积在尚未过期的消息之后,直到后者被消费或同样过期为止。因此,这些过期但未被清理的消息将继续占用系统资源,并且会被计入队列的统计信息中,比如队列中的总消息数量。
当对现有队列追溯性地应用逐条消息TTL策略时,建议保持消费者在线以加速过期消息的清除过程。
鉴于上述针对单个消息设置TTL的行为特点,在需要通过删除消息来释放存储空间等资源的情况下,推荐采用整个队列级别的TTL设置方法,或者考虑使用队列清空、队列删除等功能作为替代方案。
队列 TTL
时间至存活(TTL)特性不仅适用于队列中的消息,也适用于队列本身。这一机制能够与自动删除队列属性协同工作,从而实现更灵活的资源管理。
对于队列级别的 TTL(即过期设置),主要针对的是瞬态(非持久化)的经典队列类型;而流式队列则不支持此类过期功能。当一个队列处于未使用状态时,它将在预设的时间后被标记为可删除状态——这里“未使用”的定义是指该队列当前没有任何活跃消费者、最近没有被重新声明(重新声明操作会重置其生存期限),并且在过去的一个完整过期周期内没有执行过 basic.get 操作。这种配置特别适合于如远程过程调用(RPC)响应队列这样的场景,其中大量创建的队列可能最终并不需要实际处理任何消息。
系统确保在至少达到一次完整的设定过期时间之后,如果队列仍然保持未使用状态,则会被安全地移除。但是,请注意,虽然可以保证超过过期时限后队列将被安排删除,但并不能保证其会在恰好到达那个时刻立即被清除。
无论是通过 queue.declare 方法中指定 x-expires 参数还是通过设置 expires 策略来定义队列的过期行为,所给定的值都必须是以毫秒为单位的正整数(不同于消息级别 TTL 允许设置为 0 的情况)。例如,设定值为 1000 意味着如果该队列连续 1 秒钟未被访问,则满足了自动删除条件。
1 | Map<String, Object> args = new HashMap<>(); |
1 | package com.lixiang.rabbitmq.utils; |
1 | package com.lixiang.rabbitmq; |
1 | package com.lixiang.rabbitmq; |