RabbitMQ发布订阅模式(广播)

RabbitMQ 发布订阅(Fanout)模式介绍

发布订阅(Fanout)模式是RabbitMQ中的一种消息传递模式,属于交换机(Exchange)类型之一。在这种模式下,生产者将消息发送到一个Fanout类型的交换机,交换机会将消息广播到所有绑定到该交换机的队列中,而无需关心路由键(Routing Key)。每个消费者都可以从自己的队列中接收消息。
这种模式的特点是消息广播,即所有绑定到交换机的队列都会收到相同的消息副本。

应用场景

  1. 日志系统:
    生产者生成日志消息,不同的消费者(如日志存储服务、监控服务等)可以同时接收这些日志消息并进行处理。
  2. 实时通知系统:
    例如,用户在某个平台发布动态后,需要将动态推送给所有关注者。Fanout模式可以确保所有关注者的队列都能接收到消息。
  3. 广播消息:
    需要将某些全局配置更新或事件通知广播到所有相关服务时,Fanout模式非常适合。

流程图

image-20210702-3

总结

Fanout模式的核心在于广播机制,适用于需要将消息同时分发给多个消费者的场景。通过RabbitMQ的交换机和队列机制,可以轻松实现高效的消息广播功能。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
package com.lixiang.rabbitmq.utils;

import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import lombok.SneakyThrows;

/**
* RabbitUtils 类用于创建和管理 RabbitMQ 的连接。
* 该类使用静态代码块初始化连接工厂,并提供一个静态方法来获取 RabbitMQ 连接。
*/
public class RabbitUtils {
/**
* 静态的 ConnectionFactory 实例,用于创建 RabbitMQ 连接。
*/
private static ConnectionFactory connectionFactory = new ConnectionFactory();

static {
// 设置 RabbitMQ 服务器的主机地址
connectionFactory.setHost("192.168.31.230");
// 设置 RabbitMQ 服务器的端口号,5672 是 RabbitMQ 的默认端口号
connectionFactory.setPort(5672);
// 设置连接 RabbitMQ 服务器的用户名
connectionFactory.setUsername("admin");
// 设置连接 RabbitMQ 服务器的密码
connectionFactory.setPassword("admin");
// 设置连接 RabbitMQ 服务器的虚拟主机
connectionFactory.setVirtualHost("my_vhost");
}

/**
* 获取 RabbitMQ 的连接。
* 该方法使用静态初始化的 ConnectionFactory 创建一个新的连接。
*
* @return 一个 RabbitMQ 的连接对象
* @throws Exception 如果在创建连接过程中发生错误
*/
@SneakyThrows
public static Connection getConnection() {
// 使用 ConnectionFactory 创建一个新的连接
Connection conn = connectionFactory.newConnection();
return conn;
}
}
1
2
3
4
5
6
7
package com.lixiang.rabbitmq.utils;

public class RabbitConstant {
public static final String QUEUE_BAIDU = "baidu";
public static final String QUEUE_SINA = "sina";
public static final String EXCHANGE_WEATHER_FANOUT = "weather.fanout";
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
package com.lixiang.rabbitmq;

import java.time.LocalDate;
import java.util.Random;

import com.lixiang.rabbitmq.utils.RabbitConstant;
import com.lixiang.rabbitmq.utils.RabbitUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class WeatherBureau {
@SneakyThrows
public static void main(String[] args) {
try (Connection connection = RabbitUtils.getConnection();
Channel channel = connection.createChannel();) {

// 声明一个交换机,使用 RabbitMQ 通道的 exchangeDeclare 方法。
// 参数解释:
// 1. RabbitConstant.EXCHANGE_WEATHER:交换机的名称,由常量类 RabbitConstant 定义,用于后续消息的发布和路由。
// 2. "fanout":交换机的类型,这里是扇形(fanout)类型,这种类型会将接收到的消息广播到所有与之绑定的队列中。
// 3. true:表示该交换机是持久化的,即 RabbitMQ 服务器重启后,交换机不会丢失。
channel.exchangeDeclare(RabbitConstant.EXCHANGE_WEATHER_FANOUT, "fanout", true);

// 声明一个队列,使用 RabbitMQ 通道的 queueDeclare 方法。
// 参数解释:
// 1. RabbitConstant.QUEUE_BAIDU:队列的名称,由常量类 RabbitConstant 定义,用于存储接收到的消息。
// 2. true:表示该队列是持久化的,即 RabbitMQ 服务器重启后,队列不会丢失,队列中的消息也会保留。
// 3. false:表示该队列不是排他性的,即多个连接可以同时访问该队列。
// 4. false:表示该队列在不再使用时不会自动删除。
// 5. null:表示不使用额外的队列参数。
channel.queueDeclare(RabbitConstant.QUEUE_BAIDU, true, false, false, null);

// 将队列绑定到交换机上,使用 RabbitMQ 通道的 queueBind 方法。
// 参数解释:
// 1. RabbitConstant.QUEUE_BAIDU:要绑定的队列名称,由常量类 RabbitConstant 定义。
// 2. RabbitConstant.EXCHANGE_WEATHER:要绑定到的交换机名称,由常量类 RabbitConstant 定义。
// 3. "":绑定键,在扇形交换机中,绑定键会被忽略,因为扇形交换机会将消息广播到所有绑定的队列。
channel.queueBind(RabbitConstant.QUEUE_BAIDU, RabbitConstant.EXCHANGE_WEATHER_FANOUT, "");

channel.queueDeclare(RabbitConstant.QUEUE_SINA, true, false, false, null);
channel.queueBind(RabbitConstant.QUEUE_SINA, RabbitConstant.EXCHANGE_WEATHER_FANOUT, "");

Random random = new Random();

String[] cities = { "Beijing", "New York", "Tokyo", "Paris", "London" };
String[] countries = { "China", "USA", "Japan", "France", "UK" };
String[] weathers = { "Sunny", "Rainy", "Cloudy", "Snowy" };

while (true) {
String weather = weathers[random.nextInt(weathers.length)];
// 根据天气类型生成合理温度
int maxTemp, minTemp;
switch (weather) {
case "Sunny":
maxTemp = random.nextInt(11) + 25; // 25-35℃
minTemp = maxTemp - 10; // 日温差约10℃
break;
case "Rainy":
maxTemp = random.nextInt(6) + 20; // 20-25℃
minTemp = random.nextInt(10) + 10; // 10-20℃
break;
case "Cloudy":
maxTemp = random.nextInt(9) + 20; // 20-28℃
minTemp = random.nextInt(8) + 15; // 15-22℃
break;
default: // Snowy
maxTemp = random.nextInt(6) - 5; // -5~0℃
minTemp = random.nextInt(6) - 10; // -10~-5℃
}
int idx = random.nextInt(countries.length);
String json = String.format(
"{\"date\":\"%s\", \"country\":\"%s\", \"city\":\"%s\", \"weather\":\"%s\", " +
"\"minTemp\":%d, \"maxTemp\":%d}",
LocalDate.now(),
countries[idx],
cities[idx],
weather,
minTemp,
maxTemp);
log.info("天气信息:{}", json);
// 调用 RabbitMQ 通道的 basicPublish 方法,将生成的天气预报 JSON 数据发送到指定的交换机。
// 参数解释:
// 1. RabbitConstant.EXCHANGE_WEATHER:指定要将消息发布到的交换机名称,由常量类 RabbitConstant 定义。
// 2. "":路由键,发布订阅模式不需要。
// 3. null:消息的属性,传入 null 表示使用默认的消息属性。
// 4. json.getBytes():要发送的消息内容,将之前生成的 JSON 字符串转换为字节数组。
channel.basicPublish(RabbitConstant.EXCHANGE_WEATHER_FANOUT, "", null, json.getBytes());
Thread.sleep(1000);
}
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
package com.lixiang.rabbitmq;

import com.lixiang.rabbitmq.utils.RabbitConstant;
import com.lixiang.rabbitmq.utils.RabbitUtils;
import com.rabbitmq.client.*;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;

@Slf4j
public class Sina {
public static void main(String[] args) throws IOException {
Connection connection = RabbitUtils.getConnection();
final Channel channel = connection.createChannel();
channel.basicQos(1);
channel.basicConsume(RabbitConstant.QUEUE_SINA, false, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
log.info("新浪收到气象信息:{}", new String(body));
channel.basicAck(envelope.getDeliveryTag(), false);
}
});
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
package com.lixiang.rabbitmq;

import com.lixiang.rabbitmq.utils.RabbitConstant;
import com.lixiang.rabbitmq.utils.RabbitUtils;
import com.rabbitmq.client.*;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;

@Slf4j
public class Baidu {
public static void main(String[] args) throws IOException {
Connection connection = RabbitUtils.getConnection();
final Channel channel = connection.createChannel();
channel.basicQos(1);
channel.basicConsume(RabbitConstant.QUEUE_BAIDU, false, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
log.info("百度收到气象信息:{}", new String(body));
channel.basicAck(envelope.getDeliveryTag(), false);
}
});
}
}