RabbitMQ路由模式与主题模式

RabbitMQ 路由模式(Direct)介绍

Direct模式是RabbitMQ中交换机(Exchange)的一种工作模式,通过精确匹配路由键(Routing Key)和绑定键(Binding Key),将消息路由到对应的队列。适用于需要根据特定规则精确分发消息的场景。

核心原理

  1. 生产者发送消息时指定一个路由键。
  2. Direct Exchange 将消息转发给所有绑定键(Binding Key)与路由键完全匹配的队列。
  3. 消费者监听特定队列,处理匹配的消息。

应用场景

  1. 日志分级处理
    不同级别的日志(如 error、info)分发到不同队列,由消费者独立处理。
  2. 订单类型路由
    根据订单类型(如 digital、physical)将消息路由到对应的业务处理服务。
  3. 消息分类通知
    例如,将sms、email消息分发到不同的通知通道。

image-20210703-1

注意事项

● 精确匹配:路由键和绑定键需完全一致。
● 多队列绑定:若多个队列绑定相同键(如两个队列均绑定error),消息会同时分发到这些队列(类似广播)。
● 性能:Direct模式效率高,适合需要精确控制的场景,但不支持通配符(需使用Topic模式)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
package com.lixiang.rabbitmq;

import java.time.LocalDate;
import java.util.Random;

import com.lixiang.rabbitmq.utils.RabbitConstant;
import com.lixiang.rabbitmq.utils.RabbitUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class WeatherProducer {
@SneakyThrows
public static void main(String[] args) {
try (Connection connection = RabbitUtils.getConnection();
Channel channel = connection.createChannel();) {

/**
* 声明一个直连交换机。
*
* @param exchange 交换机的名称,使用常量类 RabbitConstant 中定义的 EXCHANGE_WEATHER_DIRECT。
* @param type 交换机的类型,这里指定为 "direct",表示直连交换机,根据路由键精确匹配队列。
* @param durable 是否持久化,设置为 true 表示该交换机在 RabbitMQ 服务器重启后不会丢失。
*/
channel.exchangeDeclare(RabbitConstant.EXCHANGE_WEATHER_DIRECT, "direct", true);

// 声明一个队列,使用 RabbitMQ 通道的 queueDeclare 方法。
channel.queueDeclare(RabbitConstant.QUEUE_BAIDU, true, false, false, null);


/**
* 将队列绑定到交换机,并指定路由键。
*
* @param queue 要绑定的队列名称,这里使用常量类 RabbitConstant 中定义的 QUEUE_BAIDU 队列。
* @param exchange 要绑定到的交换机名称,这里使用常量类 RabbitConstant 中定义的 EXCHANGE_WEATHER_DIRECT 交换机。
* @param routingKey 绑定的路由键,用于指定消息的路由规则,这里设置为 "baidu"。
*/
channel.queueBind(RabbitConstant.QUEUE_BAIDU, RabbitConstant.EXCHANGE_WEATHER_DIRECT, "baidu");
channel.queueBind(RabbitConstant.QUEUE_BAIDU, RabbitConstant.EXCHANGE_WEATHER_DIRECT, "duxiaoman");

channel.queueDeclare(RabbitConstant.QUEUE_SINA, true, false, false, null);
channel.queueBind(RabbitConstant.QUEUE_SINA, RabbitConstant.EXCHANGE_WEATHER_DIRECT, "sina");
channel.queueBind(RabbitConstant.QUEUE_SINA, RabbitConstant.EXCHANGE_WEATHER_DIRECT, "weibo");

Random random = new Random();
String[] cities = { "Beijing", "New York", "Tokyo", "Paris", "London" };
String[] countries = { "China", "USA", "Japan", "France", "UK" };
String[] weathers = { "Sunny", "Rainy", "Cloudy", "Snowy" };

while (true) {
String weather = weathers[random.nextInt(weathers.length)];
// 根据天气类型生成合理温度
int maxTemp, minTemp;
switch (weather) {
case "Sunny":
maxTemp = random.nextInt(11) + 25; // 25-35℃
minTemp = maxTemp - 10; // 日温差约10℃
break;
case "Rainy":
maxTemp = random.nextInt(6) + 20; // 20-25℃
minTemp = random.nextInt(10) + 10; // 10-20℃
break;
case "Cloudy":
maxTemp = random.nextInt(9) + 20; // 20-28℃
minTemp = random.nextInt(8) + 15; // 15-22℃
break;
default: // Snowy
maxTemp = random.nextInt(6) - 5; // -5~0℃
minTemp = random.nextInt(6) - 10; // -10~-5℃
}
int idx = random.nextInt(countries.length);
String json = String.format(
"{\"date\":\"%s\", \"country\":\"%s\", \"city\":\"%s\", \"weather\":\"%s\", " +
"\"minTemp\":%d, \"maxTemp\":%d}",
LocalDate.now(),
countries[idx],
cities[idx],
weather,
minTemp,
maxTemp);
String key = random.nextBoolean()?"baidu": "sina";
log.info("路由键:{},天气信息:{}",key, json);
// 调用 RabbitMQ 通道的 basicPublish 方法,将生成的天气预报 JSON 数据发送到指定的交换机。
// 参数解释:
// 1. RabbitConstant.EXCHANGE_WEATHER:指定要将消息发布到的交换机名称,由常量类 RabbitConstant 定义。
// 2. "":路由键,发布订阅模式不需要。
// 3. null:消息的属性,传入 null 表示使用默认的消息属性。
// 4. json.getBytes():要发送的消息内容,将之前生成的 JSON 字符串转换为字节数组。
channel.basicPublish(RabbitConstant.EXCHANGE_WEATHER_DIRECT, key, null, json.getBytes());
Thread.sleep(1000);
}
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
package com.lixiang.rabbitmq;

import com.lixiang.rabbitmq.utils.RabbitConstant;
import com.lixiang.rabbitmq.utils.RabbitUtils;
import com.rabbitmq.client.*;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;

@Slf4j
public class Baidu {
public static void main(String[] args) throws IOException {
Connection connection = RabbitUtils.getConnection();
final Channel channel = connection.createChannel();
channel.basicQos(1);
channel.basicConsume(RabbitConstant.QUEUE_BAIDU, false, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
log.info("百度收到气象信息:{}", new String(body));
channel.basicAck(envelope.getDeliveryTag(), false);
}
});
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
package com.lixiang.rabbitmq;

import com.lixiang.rabbitmq.utils.RabbitConstant;
import com.lixiang.rabbitmq.utils.RabbitUtils;
import com.rabbitmq.client.*;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;

@Slf4j
public class Sina {
public static void main(String[] args) throws IOException {
Connection connection = RabbitUtils.getConnection();
final Channel channel = connection.createChannel();
channel.basicQos(1);
channel.basicConsume(RabbitConstant.QUEUE_SINA, false, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
log.info("新浪收到气象信息:{}", new String(body));
channel.basicAck(envelope.getDeliveryTag(), false);
}
});
}
}

RabbitMQ主题模式(TOPIC)

RabbitMQ 的主题模式(Topic Exchange)是一种支持基于消息路由键(Routing Key)进行灵活匹配的消息分发模式。它允许生产者发送带有特定路由键的消息,消费者通过绑定键(Binding Key)订阅符合规则的消息。
在主题模式中,路由键和绑定键可以包含多个单词,以点号(.)分隔,例如stock.usd.nyse或weather.europe.london。此外,绑定键支持两种通配符:
● *:匹配一个单词。
● #:匹配零个或多个单词。
工作原理

  1. 生产者发送消息时,指定一个路由键。
  2. 消息被发送到 Topic Exchange。
  3. Topic Exchange 根据绑定键的规则将消息路由到符合条件的队列。
    ○ 如果绑定键与路由键完全匹配,则消息会被路由到对应的队列。
    ○ 如果绑定键包含通配符,则会根据通配符规则进行匹配。

image-20210703-2

image-20210703-3

image-20210703-4

示例

假设有一个Topic Exchange,以下是绑定键和路由键的例子:
● 绑定键:*.orange.*
○ 匹配路由键:quick.orange.rabbit、lazy.orange.elephant
○ 不匹配路由键:orange、quick.orange.male.rabbit
● 绑定键:lazy.#
○ 匹配路由键:lazy.orange.elephant、lazy.brown.fox、lazy
○ 不匹配路由键:quick.orange.rabbit

应用场景

主题模式适用于需要对消息进行多维度分类和灵活订阅的场景。以下是一些典型的应用场景:

  1. 日志系统:
    ○ 路由键可以表示日志的来源和级别,例如 app.error、db.warning。
    ○ 消费者可以根据不同的需求订阅特定类型的日志,例如 *.error 订阅所有错误日志。
  2. 事件驱动架构:
    ○ 在微服务架构中,不同服务之间通过消息通信。
    ○ 主题模式允许服务根据事件类型和子类型灵活订阅感兴趣的消息,例如 order.created、payment.failed。
  3. 物联网(IoT)数据处理:
    ○ 设备上报的数据可以用路由键标识设备类型和位置,例如 sensor.temp.zone1。
    ○ 数据处理模块可以根据绑定键订阅特定区域或特定类型的传感器数据。

主题模式(TOPIC)与直接模式(DIRECT)的区别

image-20210703-5

具体对比

  1. 灵活性:
    ○ DIRECT 模式要求路由键和绑定键完全匹配,适用于消息分类较为固定的场景。
    ○ TOPIC 模式支持通配符,允许更灵活的消息订阅,适合需要动态分类和多维度过滤的场景。
  2. 性能:
    ○ DIRECT 模式的匹配逻辑简单,性能较高。
    ○ TOPIC 模式需要解析通配符规则,可能带来一定的性能开销,但通常在现代硬件上影响较小。
  3. 使用场景选择:
    ○ 如果消息分类明确且固定(例如订单状态更新),建议使用 DIRECT 模式。
    ○ 如果需要支持多条件组合订阅(例如日志系统或事件驱动架构),建议使用 TOPIC 模式。

总结

RabbitMQ 的主题模式(TOPIC)是一种强大的消息路由机制,通过支持通配符的方式提供了极大的灵活性,适合需要多维度分类和动态订阅的场景。而直接模式(DIRECT)则更加简单高效,适用于分类明确且固定的场景。在实际应用中,应根据业务需求选择合适的模式,从而实现高效的系统设计。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
package com.lixiang.rabbitmq;

import java.time.LocalDate;
import java.util.Random;

import com.lixiang.rabbitmq.utils.RabbitConstant;
import com.lixiang.rabbitmq.utils.RabbitUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class WeatherProducer {
@SneakyThrows
public static void main(String[] args) {
try (Connection connection = RabbitUtils.getConnection();
Channel channel = connection.createChannel();) {

channel.exchangeDeclare(RabbitConstant.EXCHANGE_WEATHER_TOPIC, "topic", true);

channel.queueDeclare(RabbitConstant.QUEUE_BEIJING, true, false, false, null);

channel.queueBind(RabbitConstant.QUEUE_BEIJING, RabbitConstant.EXCHANGE_WEATHER_TOPIC, "city.Beijing.#");

channel.queueDeclare(RabbitConstant.QUEUE_PARIS, true, false, false, null);
channel.queueBind(RabbitConstant.QUEUE_PARIS, RabbitConstant.EXCHANGE_WEATHER_TOPIC, "city.Paris.*");

Random random = new Random();

String[] cities = { "Beijing", "New York", "Tokyo", "Paris", "London" };
String[] countries = { "China", "USA", "Japan", "France", "UK" };
String[] weathers = { "Sunny", "Rainy", "Cloudy", "Snowy" };

while (true) {
String weather = weathers[random.nextInt(weathers.length)];
int maxTemp, minTemp;
switch (weather) {
case "Sunny":
maxTemp = random.nextInt(11) + 25; // 25-35℃
minTemp = maxTemp - 10; // 日温差约10℃
break;
case "Rainy":
maxTemp = random.nextInt(6) + 20; // 20-25℃
minTemp = random.nextInt(10) + 10; // 10-20℃
break;
case "Cloudy":
maxTemp = random.nextInt(9) + 20; // 20-28℃
minTemp = random.nextInt(8) + 15; // 15-22℃
break;
default: // Snowy
maxTemp = random.nextInt(6) - 5; // -5~0℃
minTemp = random.nextInt(6) - 10; // -10~-5℃
}
int idx = random.nextInt(countries.length);
String json = String.format(
"{\"date\":\"%s\", \"country\":\"%s\", \"city\":\"%s\", \"weather\":\"%s\", " +
"\"minTemp\":%d, \"maxTemp\":%d}",
LocalDate.now(),
countries[idx],
cities[idx],
weather,
minTemp,
maxTemp);
String routingKey = "city." + cities[idx] + ".weather";
log.info("路由键:{},天气信息:{}", routingKey, json);

channel.basicPublish(RabbitConstant.EXCHANGE_WEATHER_TOPIC, routingKey, null, json.getBytes());
Thread.sleep(1000);
}
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
package com.lixiang.rabbitmq;

import com.lixiang.rabbitmq.utils.RabbitConstant;
import com.lixiang.rabbitmq.utils.RabbitUtils;
import com.rabbitmq.client.*;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;

@Slf4j
public class BeijingConsumer {
public static void main(String[] args) throws IOException {
Connection connection = RabbitUtils.getConnection();
final Channel channel = connection.createChannel();
channel.basicQos(1);
channel.basicConsume(RabbitConstant.QUEUE_BEIJING, false, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
log.info("收到北京气象信息:{}", new String(body));
channel.basicAck(envelope.getDeliveryTag(), false);
}
});
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
package com.lixiang.rabbitmq;

import com.lixiang.rabbitmq.utils.RabbitConstant;
import com.lixiang.rabbitmq.utils.RabbitUtils;
import com.rabbitmq.client.*;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;

@Slf4j
public class ParisConsumer {
public static void main(String[] args) throws IOException {
Connection connection = RabbitUtils.getConnection();
final Channel channel = connection.createChannel();
channel.basicQos(1);
channel.basicConsume(RabbitConstant.QUEUE_PARIS, false, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
log.info("收到巴黎气象信息:{}", new String(body));
channel.basicAck(envelope.getDeliveryTag(), false);
}
});
}
}