RabbitMQ Producer Confirm与Return机制

RabbitMQ的Confirm(发布确认)机制和Return(消息退回)机制是保障消息可靠投递的两个重要机制,它们分别作用于消息传输的不同阶段,共同提升消息系统的可靠性。

image-20210703-6

一、Confirm机制(发布确认)

作用阶段:生产者发送消息到Exchange(交换机)的过程。
核心目标:确保消息被Broker(RabbitMQ服务端)成功接收。

工作原理:

  1. 开启Confirm模式:生产者通过设置信道(Channel)为Confirm模式,进入消息确认状态。
  2. 消息发送与异步确认:
    ○ 生产者发送消息到Exchange后,RabbitMQ会异步返回一个确认信号(Basic.Ack),表示消息已被Broker接收。
    ○ 若消息因路由失败(如Exchange不存在)或Broker内部错误未能处理,可能返回Basic.Nack(否定确认)或超时无响应。
  3. 确认类型:
    ○ 单条确认:每发送一条消息,等待一个确认结果。
    ○ 批量确认:累积多条消息后一次性确认,牺牲部分实时性以提升吞吐量。
    ○ 异步监听:通过回调函数处理确认结果,避免阻塞生产者线程。

应用场景:

● 防止消息在传输过程中丢失(如网络故障或Broker宕机)。
● 需确保消息至少到达Exchange的场景(如金融交易、订单提交)。

二、Return机制(消息退回)

作用阶段:消息从Exchange路由到队列的过程。
核心目标:处理无法路由到任何队列的消息。

工作原理:

  1. 启用退回机制:发送消息时需设置mandatory=true,告知Broker在路由失败时退回消息。
  2. 路由失败处理:
    ○ 当Exchange无法根据路由键(Routing Key)和绑定规则找到匹配的队列时,触发Return机制。
    ○ Broker通过Basic.Return命令将消息退回给生产者,附带失败原因(如NO_ROUTE)。
  3. 生产者处理:需监听Return回调,对退回消息进行日志记录、重定向或告警。

应用场景:

● 动态路由场景中防止消息静默丢失(如配置错误的路由键)。
● 需要严格监控路由状态的系统(如告警通知、审计日志)。

三、关键区别与协作

image-20210703-7

协作价值:

● Confirm失败:消息未到达Exchange(如Broker宕机),需生产者重发。
● Return触发:消息已到达Exchange但路由失败,需修复绑定关系或处理死信。
两者结合可覆盖消息从生产到消费的全链路可靠性,避免消息在传输和路由环节丢失。

四、最佳实践建议

  1. Confirm机制:
    ○ 异步监听确认结果,结合持久化(Persistent Messages)防止Broker重启丢数据。
    ○ 设计重试策略(如指数退避)处理Nack或超时场景。
  2. Return机制:
    ○ 始终设置mandatory=true,避免消息静默丢弃。
    ○ 实现退回消息的监控和告警,及时发现路由配置错误。
    通过合理配置Confirm与Return,可以构建高可靠的异步消息系统,平衡性能与数据一致性需求。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
package com.lixiang.rabbitmq;

import java.time.LocalDate;
import java.util.Random;

import com.lixiang.rabbitmq.utils.RabbitConstant;
import com.lixiang.rabbitmq.utils.RabbitUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class WeatherProducer {
static Random random = new Random();
static String[] cities = { "Beijing", "New York", "Tokyo", "Paris", "London" };
static String[] countries = { "China", "USA", "Japan", "France", "UK" };
static String[] weathers = { "Sunny", "Rainy", "Cloudy", "Snowy" };

private static String createJSON(int idx) {
String weather = weathers[random.nextInt(weathers.length)];
int maxTemp, minTemp;
switch (weather) {
case "Sunny":
maxTemp = random.nextInt(11) + 25; // 25-35℃
minTemp = maxTemp - 10; // 日温差约10℃
break;
case "Rainy":
maxTemp = random.nextInt(6) + 20; // 20-25℃
minTemp = random.nextInt(10) + 10; // 10-20℃
break;
case "Cloudy":
maxTemp = random.nextInt(9) + 20; // 20-28℃
minTemp = random.nextInt(8) + 15; // 15-22℃
break;
default: // Snowy
maxTemp = random.nextInt(6) - 5; // -5~0℃
minTemp = random.nextInt(6) - 10; // -10~-5℃
}

String json = String.format(
"{\"date\":\"%s\", \"country\":\"%s\", \"city\":\"%s\", \"weather\":\"%s\", " +
"\"minTemp\":%d, \"maxTemp\":%d}",
LocalDate.now(),
countries[idx],
cities[idx],
weather,
minTemp,
maxTemp);
return json;
}

@SneakyThrows
public static void main(String[] args) {
try (Connection connection = RabbitUtils.getConnection();
Channel channel = connection.createChannel();) {

// 启用Confirm模式
channel.confirmSelect();
// 添加Confirm监听器
channel.addConfirmListener((deliveryTag, multiple) -> {
log.info("消息确认成功,编号:{},是否批量:{}", deliveryTag, multiple);
}, (deliveryTag, multiple) -> {
log.error("消息确认失败,编号:{},是否批量:{}", deliveryTag, multiple);
});

// 添加Return监听器
channel.addReturnListener((returnMessage) -> {
log.warn("消息无法路由! 应答码:{} 原因:{} 路由键:{}",
returnMessage.getReplyCode(),
returnMessage.getReplyText(),
returnMessage.getRoutingKey());
});

channel.exchangeDeclare(RabbitConstant.EXCHANGE_WEATHER_TOPIC, "topic", true);
channel.queueDeclare(RabbitConstant.QUEUE_BEIJING, true, false, false, null);
channel.queueBind(RabbitConstant.QUEUE_BEIJING, RabbitConstant.EXCHANGE_WEATHER_TOPIC, "city.Beijing.#");
channel.queueDeclare(RabbitConstant.QUEUE_PARIS, true, false, false, null);
channel.queueBind(RabbitConstant.QUEUE_PARIS, RabbitConstant.EXCHANGE_WEATHER_TOPIC, "city.Paris.*");

while (true) {
int idx = random.nextInt(countries.length);
String routingKey = "city." + cities[idx] + ".weather";
String json = createJSON(idx);
log.info("路由键:{},天气信息:{}", routingKey, json);

// 修改发布消息的代码,添加mandatory参数
channel.basicPublish(RabbitConstant.EXCHANGE_WEATHER_TOPIC,
routingKey,
/*
当 mandatory = true 时,如果消息无法路由到任何队列,消息会通过 `addReturnListener` 监听器返回
当 mandatory = false 时(默认值),无法路由的消息会被直接丢弃
*/
true,
null,
json.getBytes());
Thread.sleep(1000);
}
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
package com.lixiang.rabbitmq;

import com.lixiang.rabbitmq.utils.RabbitConstant;
import com.lixiang.rabbitmq.utils.RabbitUtils;
import com.rabbitmq.client.*;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;

@Slf4j
public class ParisConsumer {
public static void main(String[] args) throws IOException {
Connection connection = RabbitUtils.getConnection();
final Channel channel = connection.createChannel();
channel.basicQos(1);
channel.basicConsume(RabbitConstant.QUEUE_PARIS, false, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
log.info("收到巴黎气象信息:{}", new String(body));
channel.basicAck(envelope.getDeliveryTag(), false);
}
});
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
package com.lixiang.rabbitmq;

import com.lixiang.rabbitmq.utils.RabbitConstant;
import com.lixiang.rabbitmq.utils.RabbitUtils;
import com.rabbitmq.client.*;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;

@Slf4j
public class BeijingConsumer {
public static void main(String[] args) throws IOException {
Connection connection = RabbitUtils.getConnection();
final Channel channel = connection.createChannel();
channel.basicQos(1);
channel.basicConsume(RabbitConstant.QUEUE_BEIJING, false, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
log.info("收到北京气象信息:{}", new String(body));
channel.basicAck(envelope.getDeliveryTag(), false);
}
});
}
}