RabbitMQ备用交换机(Alternate Exchange)

RabbitMQ备用交换机(Alternate Exchange)机制详解

image-20210708-1

什么是备用交换机?

备用交换机(Alternate Exchange)是RabbitMQ提供的一种优雅的消息处理机制,用于解决消息无法路由时的处理问题。当生产者发送的消息无法被路由到任何队列时,这些消息会被自动转发到预先配置的备用交换机,而不是被直接丢弃。

为什么需要备用交换机?

在RabbitMQ中,当消息无法路由到任何队列时,默认行为是直接丢弃这些消息。这可能导致以下问题:

  1. 重要消息丢失:无法路由的消息可能包含重要业务数据
  2. 问题排查困难:无法知道哪些消息被丢弃了
  3. 缺乏处理机制:无法对无法路由的消息进行后续处理
    备用交换机机制为解决这些问题提供了标准化的解决方案。

备用交换机的工作原理

  1. 配置阶段:在声明主交换机时,通过alternate-exchange参数指定一个备用交换机
  2. 消息路由阶段:当消息无法被主交换机的路由规则匹配到任何队列时
  3. 转发阶段:RabbitMQ自动将消息转发到备用交换机
  4. 最终处理:备用交换机将消息路由到绑定它的队列中

备用交换机的典型应用场景

  1. 死信处理:收集所有无法路由的消息进行统一处理
  2. 日志记录:记录所有无法路由的消息用于审计
  3. 异常通知:当消息无法路由时触发告警机制
  4. 消息重试:将无法路由的消息暂存后重新投递

备用交换机的设计建议

  1. 交换机类型选择:通常使用fanout类型,确保所有绑定队列都能收到消息
  2. 队列命名:使用清晰表明用途的名称,如unroutable.messages
  3. 消息处理:在备用队列的消费者中添加适当的处理逻辑
  4. 监控告警:对备用队列的消息量设置监控阈值

备用交换机与Return机制的对比

  1. Return机制:需要生产者设置mandatory=true并实现ReturnListener,适合需要即时反馈的场景
  2. 备用交换机:在服务端自动处理,无需生产者额外配置,适合长期稳定的无法路由消息处理

最佳实践建议

  1. 为每个主交换机配置专用的备用交换机
  2. 在备用队列的消费者中添加详细的日志记录
  3. 定期分析无法路由消息的原因,优化路由规则
  4. 对备用队列设置TTL或最大长度限制,防止堆积
    通过合理使用备用交换机机制,可以大大提高RabbitMQ消息系统的可靠性和可维护性。
1
2
3
4
5
6
7
8
9
10
package com.lixiang.rabbitmq.utils;

public class RabbitConstant {
public static final String QUEUE_BEIJING = "beijing";
public static final String QUEUE_PARIS = "paris";
public static final String QUEUE_WEATHER_UNROUTABLE = "weather.unroutable";
public static final String EXCHANGE_WEATHER_TOPIC = "weather.topic.1";
public static final String EXCHANGE_WEATHER_ALTERNATE = "weather.alternate";

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
package com.lixiang.rabbitmq.utils;

import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import lombok.SneakyThrows;

/**
* RabbitUtils 类用于创建和管理 RabbitMQ 的连接。
* 该类使用静态代码块初始化连接工厂,并提供一个静态方法来获取 RabbitMQ 连接。
*/
public class RabbitUtils {
/**
* 静态的 ConnectionFactory 实例,用于创建 RabbitMQ 连接。
*/
private static ConnectionFactory connectionFactory = new ConnectionFactory();

static {
// 设置 RabbitMQ 服务器的主机地址
connectionFactory.setHost("192.168.31.230");
// 设置 RabbitMQ 服务器的端口号,5672 是 RabbitMQ 的默认端口号
connectionFactory.setPort(5672);
// 设置连接 RabbitMQ 服务器的用户名
connectionFactory.setUsername("admin");
// 设置连接 RabbitMQ 服务器的密码
connectionFactory.setPassword("admin");
// 设置连接 RabbitMQ 服务器的虚拟主机
connectionFactory.setVirtualHost("my_vhost");
}

/**
* 获取 RabbitMQ 的连接。
* 该方法使用静态初始化的 ConnectionFactory 创建一个新的连接。
*
* @return 一个 RabbitMQ 的连接对象
* @throws Exception 如果在创建连接过程中发生错误
*/
@SneakyThrows
public static Connection getConnection() {
// 使用 ConnectionFactory 创建一个新的连接
Connection conn = connectionFactory.newConnection();
return conn;
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
package com.lixiang.rabbitmq;

import java.time.LocalDate;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;

import com.lixiang.rabbitmq.utils.RabbitConstant;
import com.lixiang.rabbitmq.utils.RabbitUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class WeatherProducer {
static Random random = new Random();
static String[] cities = { "Beijing", "New York", "Tokyo", "Paris", "London" };
static String[] countries = { "China", "USA", "Japan", "France", "UK" };
static String[] weathers = { "Sunny", "Rainy", "Cloudy", "Snowy" };

private static String createJSON(int idx) {
String weather = weathers[random.nextInt(weathers.length)];
int maxTemp, minTemp;
switch (weather) {
case "Sunny":
maxTemp = random.nextInt(11) + 25; // 25-35℃
minTemp = maxTemp - 10; // 日温差约10℃
break;
case "Rainy":
maxTemp = random.nextInt(6) + 20; // 20-25℃
minTemp = random.nextInt(10) + 10; // 10-20℃
break;
case "Cloudy":
maxTemp = random.nextInt(9) + 20; // 20-28℃
minTemp = random.nextInt(8) + 15; // 15-22℃
break;
default: // Snowy
maxTemp = random.nextInt(6) - 5; // -5~0℃
minTemp = random.nextInt(6) - 10; // -10~-5℃
}

String json = String.format(
"{\"date\":\"%s\", \"country\":\"%s\", \"city\":\"%s\", \"weather\":\"%s\", " +
"\"minTemp\":%d, \"maxTemp\":%d}",
LocalDate.now(),
countries[idx],
cities[idx],
weather,
minTemp,
maxTemp);
return json;
}

@SneakyThrows
public static void main(String[] args) {
try (Connection connection = RabbitUtils.getConnection();
Channel channel = connection.createChannel();) {
// 声明备用交换机和队列
channel.exchangeDeclare(RabbitConstant.EXCHANGE_WEATHER_ALTERNATE, "fanout", true);
channel.queueDeclare(RabbitConstant.QUEUE_WEATHER_UNROUTABLE, true, false, false, null);
channel.queueBind(RabbitConstant.QUEUE_WEATHER_UNROUTABLE,
RabbitConstant.EXCHANGE_WEATHER_ALTERNATE, "");

// 声明主交换机时添加alternate-exchange参数
Map<String, Object> exArgs = new HashMap<>();
exArgs.put("alternate-exchange", RabbitConstant.EXCHANGE_WEATHER_ALTERNATE);

channel.exchangeDeclare(RabbitConstant.EXCHANGE_WEATHER_TOPIC, "topic", true, false, exArgs);
channel.queueDeclare(RabbitConstant.QUEUE_BEIJING, true, false, false, null);

channel.queueBind(RabbitConstant.QUEUE_BEIJING, RabbitConstant.EXCHANGE_WEATHER_TOPIC, "city.Beijing.#");
channel.queueDeclare(RabbitConstant.QUEUE_PARIS, true, false, false, null);
channel.queueBind(RabbitConstant.QUEUE_PARIS, RabbitConstant.EXCHANGE_WEATHER_TOPIC, "city.Paris.*");

while (true) {
int idx = random.nextInt(countries.length);
String routingKey = "city." + cities[idx] + ".weather";
String json = createJSON(idx);
log.info("路由键:{},天气信息:{}", routingKey, json);

// 修改发布消息的代码,添加mandatory参数
channel.basicPublish(RabbitConstant.EXCHANGE_WEATHER_TOPIC,
routingKey,
/*
* 当 mandatory = true 时,如果消息无法路由到任何队列,消息会通过 `addReturnListener` 监听器返回
* 当 mandatory = false 时(默认值),无法路由的消息会被直接丢弃
*/
true,
null,
json.getBytes());
Thread.sleep(1000);
}
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
package com.lixiang.rabbitmq;

import com.lixiang.rabbitmq.utils.RabbitConstant;
import com.lixiang.rabbitmq.utils.RabbitUtils;
import com.rabbitmq.client.*;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;

@Slf4j
public class BeijingConsumer {
public static void main(String[] args) throws IOException {
Connection connection = RabbitUtils.getConnection();
final Channel channel = connection.createChannel();
channel.basicQos(1);
channel.basicConsume(RabbitConstant.QUEUE_BEIJING, false, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
log.info("收到北京气象信息:{}", new String(body));
channel.basicAck(envelope.getDeliveryTag(), false);
}
});
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
package com.lixiang.rabbitmq;

import com.lixiang.rabbitmq.utils.RabbitConstant;
import com.lixiang.rabbitmq.utils.RabbitUtils;
import com.rabbitmq.client.*;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;

@Slf4j
public class ParisConsumer {
public static void main(String[] args) throws IOException {
Connection connection = RabbitUtils.getConnection();
final Channel channel = connection.createChannel();
channel.basicQos(1);
channel.basicConsume(RabbitConstant.QUEUE_PARIS, false, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
log.info("收到巴黎气象信息:{}", new String(body));
channel.basicAck(envelope.getDeliveryTag(), false);
}
});
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
package com.lixiang.rabbitmq;

import com.lixiang.rabbitmq.utils.RabbitConstant;
import com.lixiang.rabbitmq.utils.RabbitUtils;
import com.rabbitmq.client.*;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;

@Slf4j
public class UnroutableConsumer {
public static void main(String[] args) throws IOException {
Connection connection = RabbitUtils.getConnection();
final Channel channel = connection.createChannel();
channel.basicQos(1);
channel.basicConsume(RabbitConstant.QUEUE_WEATHER_UNROUTABLE, false, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
log.info("收到未路由信息:{}", new String(body));
channel.basicAck(envelope.getDeliveryTag(), false);
}
});
}
}