RabbitMQ死信队列DLX

死信交换机(Dead Letter Exchange, DLX)

死信交换机是RabbitMQ中用于管理那些因各种原因未能成功消费的消息的一种机制。当消息无法被消费者正常处理时,这些消息可被标记为“死信”(Dead Letter),随后将被转发至一个预设的交换机——即所谓的死信交换机,而非直接丢弃或无限制地重试。死信交换机本质上是一个标准的交换机,可以配置为直连型(direct)、主题型(topic)、扇出型(fanout)或头部匹配型(headers),其运作方式与普通交换机相同。
消息被视为死信并被路由到死信交换机的情形包括但不限于以下几种情况:

  1. 显式拒绝:若消费者通过basic.reject、basic.nack(针对AMQP 0-9-1协议版本)或是rejected outcome(对于AMQP 1.0协议版本)明确表示不接受某条消息,并且在调用上述方法时设置了requeue参数为false,则该消息将被视作死信。
  2. 消息存活时间超出:一旦消息设置了特定的生存期(TTL, Time-To-Live),并且在其所在队列中的停留时间超过了这个设定值,那么这条消息就会自动转变为死信状态。
  3. 队列容量溢出:如果某个队列定义了最大长度(x-max-length)或最大字节数量(x-max-length-bytes),而实际承载的消息数量或总大小超过了这些阈值,最早进入队列的消息可能会根据具体配置被移除或者转换成死信。
  4. 超过最大投递尝试次数:特别地,在仲裁队列(Quorum Queue)里,每当一条消息重新投送的次数达到了预先设定的最大尝试次数(delivery-limit),它同样会被认定为死信。
    值得注意的是,如果整个队列因为所有消息均超过其TTL而被系统自动清除,则其中包含的所有消息都不会经历成为死信的过程。

配置死信交换机

死信交换机的配置可以通过队列参数(Queue Arguments)或服务器策略(Policy)来实现。如果同时采用这两种方法进行配置,队列参数的优先级高于服务器策略。
使用队列参数(Queue Arguments)进行配置时,在Java客户端中声明队列的过程中,可以通过设置x-dead-letter-exchange参数来指定死信交换机。

1
2
3
4
5
6
7
channel.exchangeDeclare("dlx.exchange", "direct"); // 声明死信交换机

Map<String, Object> args = new HashMap<>();
args.put("x-dead-letter-exchange", "dlx.exchange"); // 设置死信交换机
args.put("x-dead-letter-routing-key", "error.key"); // 可选:设置死信路由键

channel.queueDeclare("my.queue", false, false, false, args); // 创建队列并绑定死信交换机

注意事项

为了确保消息能够正确进入死信处理流程,死信交换机必须在任何消息被标记为死信前预先创建。若此条件未满足,相关消息将被系统自动且无声地丢弃。

死信消息的路由机制

死信消息的具体路由方式由其所属队列的配置决定:
● 当队列已定义x-dead-letter-routing-key属性时,该消息将依据此键值被导向至指定的死信交换机。
● 若上述属性未设定,则采用消息初始发送时使用的路由键进行传递(尽管原始消息可能包含CC和BCC头部信息,但在转变为死信状态的过程中,BCC部分会被移除)。
示例说明:
● 假设一条消息最初通过foo作为路由键发布,并且所在队列没有特别设置死信路由键,则当它成为死信后,依旧会沿用foo作为路由键。
● 反之,如果队列中设置了x-dead-letter-routing-key: bar,则所有从该队列出站的死信都将使用bar作为新的路由键。

死信循环问题

当一个死信消息再次被分发回原队列时(例如因死信交换机缺乏明确的路由指示而导致消息经由默认交换机返回),RabbitMQ具备识别此类循环的能力,并采取措施终止循环,直接丢弃涉及的消息以避免无限重复的情况发生。

安全考量

鉴于死信处理本质上属于一种特殊形式的消息重发操作,因此存在一定的失败风险:
● 如果目标接收队列不可达(如由于仲裁队列未能达成必要的共识),那么尝试转发的死信将会被放弃,并生成相应的错误日志条目。
● 通常情况下,针对死信消息的重新投递过程并不激活发布者确认功能,这意味着在分布式环境中可能会出现数据丢失现象。然而,借助于仲裁队列的支持,可以实现至少一次交付保证,从而增强整个系统的可靠性。

消息头字段更新

一旦某条消息转换成死信状态,其携带的信息头会发生如下变更:
● exchange字段值更新为代表当前所使用的死信交换机名称。
● 根据队列配置情况,routing-key有可能变更为预设的死信路由键。
● CC与BCC相关的元数据不再保留。
● 此外,每经历一次死信事件,都会在消息头中追加x-death (适用于AMQP 0.9.1协议) 或 x-opt-deaths (适用于AMQP 1.0协议) 字段,记录下包括但不限于触发死信的原因、次数以及涉及到的队列等详细信息。

1
2
3
4
5
6
7
// 检查死信历史(AMQP 0.9.1)
List<Map<String, Object>> deaths = (List<Map<String, Object>>) headers.get("x-death");
if (deaths != null) {
Map<String, Object> lastDeath = deaths.get(0);
String reason = (String) lastDeath.get("reason"); // "rejected", "expired", etc.
String queue = (String) lastDeath.get("queue"); // 来源队列
}
1
2
3
4
5
6
7
8
package com.lixiang.rabbitmq.utils;

public class RabbitConstant {
public static final String QUEUE_SMS_DLX = "sms.dlx";
public static final String EXCHANGE_DLX = "exchange.dlx";
public static final String QUEUE_DLX = "queue.dlx";
public static final String ROUTING_KEY_DLX = "routing.key.dlx";
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
package com.lixiang.rabbitmq.utils;

import java.util.HashMap;
import java.util.Map;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import lombok.SneakyThrows;

public class RabbitUtils {
private static ConnectionFactory connectionFactory = new ConnectionFactory();

static {
connectionFactory.setHost("192.168.31.230");
connectionFactory.setPort(5672);
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
connectionFactory.setVirtualHost("my_vhost");
}

@SneakyThrows
public static Connection getConnection() {
Connection conn = connectionFactory.newConnection();
return conn;
}

@SneakyThrows
public static void init(Channel channel) {

// 声明死信交换机,类型为direct,持久化
channel.exchangeDeclare(RabbitConstant.EXCHANGE_DLX, "direct", true);
// 声明死信队列,持久化,非独占,不自动删除
channel.queueDeclare(RabbitConstant.QUEUE_DLX, true, false, false, null);
// 将死信队列绑定到死信交换机,使用指定的路由键
channel.queueBind(RabbitConstant.QUEUE_DLX, RabbitConstant.EXCHANGE_DLX, RabbitConstant.ROUTING_KEY_DLX);

// 创建队列参数Map
Map<String, Object> quArgs = new HashMap<>();
// 设置消息TTL(存活时间)为10秒(10000毫秒)
quArgs.put("x-message-ttl", 10000);
// 设置死信交换机
quArgs.put("x-dead-letter-exchange", RabbitConstant.EXCHANGE_DLX);
// 设置死信路由键
quArgs.put("x-dead-letter-routing-key", RabbitConstant.ROUTING_KEY_DLX);
// 声明短信队列,使用上面设置的参数
channel.queueDeclare(RabbitConstant.QUEUE_SMS_DLX, true, false, false, quArgs);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
package com.lixiang.rabbitmq;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.google.gson.Gson;
import com.lixiang.rabbitmq.entity.SMS;
import com.lixiang.rabbitmq.utils.RabbitConstant;
import com.lixiang.rabbitmq.utils.RabbitUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class OrderSystem {
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
try (Connection connection = RabbitUtils.getConnection();
Channel channel = connection.createChannel();) {

RabbitUtils.init(channel); // 使用工具类初始交换机、队列和绑定关系

for (int i = 100; i < 200; i++) {
SMS sms = new SMS("乘客" + i, "13900000" + i, "您的车票已预订成功");
String jsonSMS = new Gson().toJson(sms);
channel.basicPublish("", RabbitConstant.QUEUE_SMS_DLX, null, jsonSMS.getBytes());
}

log.info("订单信息已投递至Broker");
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
package com.lixiang.rabbitmq;

import java.io.IOException;

import com.lixiang.rabbitmq.utils.RabbitConstant;
import com.lixiang.rabbitmq.utils.RabbitUtils;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class DlxConsumer {
public static void main(String[] args) throws IOException {
Connection connection = RabbitUtils.getConnection();
Channel channel = connection.createChannel();

RabbitUtils.init(channel); // 使用工具类初始交换机、队列和绑定关系

channel.basicConsume(RabbitConstant.QUEUE_DLX, true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
String deadLetterMsg = new String(body);
log.info("[死信消费者] 收到死信消息,使用其他渠道发送短信: {}", deadLetterMsg);
}
});
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
package com.lixiang.rabbitmq;

import java.io.IOException;

import com.lixiang.rabbitmq.utils.RabbitConstant;
import com.lixiang.rabbitmq.utils.RabbitUtils;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class SmsSender {
public static void main(String[] args) throws IOException {
Connection connection = RabbitUtils.getConnection();
final Channel channel = connection.createChannel();

channel.basicQos(1);
RabbitUtils.init(channel); // 使用工具类初始交换机、队列和绑定关系

channel.basicConsume(RabbitConstant.QUEUE_SMS_DLX, false, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
String jsonSMS = new String(body);
log.info("SMSSender-短信发送成功:{}", jsonSMS);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
channel.basicAck(envelope.getDeliveryTag(), false);
}
});
}
}