RabbitMQ RPC模式

RabbitMQ 的 RPC 模式介绍

RPC(Remote Procedure Call,远程过程调用)是一种常见的通信模式,允许客户端通过网络调用远程服务器上的方法或服务,就像调用本地函数一样。RabbitMQ提供了对 RPC 模式的原生支持,使得开发者可以通过消息队列实现高效的分布式系统。
在RabbitMQ的RPC模式中,客户端发送一个请求消息到服务器,服务器处理请求后返回响应消息给客户端。这种模式的核心思想是通过消息队列实现异步的远程调用,并确保请求和响应能够正确匹配。

image-20210708-2

RPC模式的工作流程

  1. 客户端发送请求:
    ○ 客户端创建一个临时的专属队列(通常称为“回调队列”),用于接收服务器的响应。
    ○ 客户端将请求消息发送到一个指定的队列(通常是服务器监听的队列),并在消息中附带回调队列的名称以及一个唯一标识符(如 correlation_id),以便后续匹配响应。
  2. 服务器处理请求:
    ○ 服务器从请求队列中消费消息,执行相应的业务逻辑。
    ○ 处理完成后,服务器将结果封装为响应消息,并将其发送到客户端指定的回调队列中。响应消息中会包含与请求相同的 correlation_id,以确保客户端能够正确匹配请求和响应。
  3. 客户端接收响应:
    ○ 客户端从回调队列中消费响应消息,并根据 correlation_id 找到对应的请求。
    ○ 客户端处理响应数据,完成整个 RPC 调用。

关键概念

● 回调队列(Callback Queue):
客户端创建的临时队列,用于接收服务器的响应消息。每个客户端通常都有自己的回调队列,以避免消息混淆。
● Correlation ID:
每个请求消息都附带一个唯一的标识符(correlation_id),用于在客户端匹配请求和响应。
● 消息确认机制:
RabbitMQ 提供了消息确认机制(ACK/NACK),确保消息不会丢失。例如,服务器可以确认已成功处理请求,客户端可以确认已成功接收到响应。

RPC模式的应用场景

  1. 微服务架构:
    在微服务架构中,不同服务之间需要通过网络进行通信。RPC模式可以帮助服务之间高效地传递请求和响应,而无需直接暴露服务的内部细节。
  2. 分布式计算:
    当需要将计算任务分发到多个节点时,RPC模式可以用来协调任务分发和结果收集。
  3. API 网关:
    API 网关可以通过 RPC模式与后端服务通信,从而实现负载均衡、请求路由和响应聚合。
  4. 实时数据处理:
    在需要实时处理用户请求的场景中(如在线游戏、聊天应用等),RPC模式可以快速响应用户的操作请求。
  5. 异步任务调度:
    当某些任务需要异步执行时,RPC模式可以用来提交任务并获取最终结果。例如,文件上传后的处理、图片压缩等。

总结

RabbitMQ的RPC模式是一种强大的通信机制,特别适用于分布式系统中的远程调用。它通过消息队列实现了请求和响应的解耦,提供了高可靠性和灵活性。然而,在使用RPC模式时需要注意以下几点:
● 性能开销: 由于涉及消息的序列化、网络传输和反序列化,RPC模式可能会引入一定的延迟。
● 错误处理: 需要设计完善的错误处理机制,以应对网络故障、超时等问题。
● 安全性: 在生产环境中,建议使用 TLS 加密通信,确保消息的安全性。
通过合理设计和优化,RabbitMQ 的 RPC 模式可以成为构建高效分布式系统的有力工具。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
package com.lixiang.rabbitmq.client;

import com.lixiang.rabbitmq.utils.RabbitUtils;
import com.rabbitmq.client.*;

import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/**
* RPCClient 类用于实现基于 RabbitMQ 的远程过程调用(RPC)客户端。
* 该类实现了 AutoCloseable 接口,确保资源可以正确关闭。
*/
@Slf4j
public class RPCClient implements AutoCloseable {

// 请求队列的名称
private String requestQueueName = "rpc_queue";

/**
* 向 RPC 服务端发送请求并等待响应。
*
* @param message 要发送的请求消息
* @return 服务端返回的响应消息
* @throws IOException 如果在与 RabbitMQ 通信时发生 I/O 错误
* @throws InterruptedException 如果在等待响应时线程被中断
*/

@SneakyThrows
public String call(String message) throws IOException, InterruptedException {
try (
// 从工具类获取 RabbitMQ 连接
Connection connection = RabbitUtils.getConnection();
// 创建一个新的通道
Channel channel = connection.createChannel();) {
// 声明请求队列,如果队列不存在则创建
channel.queueDeclare(requestQueueName, true, false, false, null);
// 生成一个唯一的关联 ID,用于匹配请求和响应
final String corrId = UUID.randomUUID().toString();

// 声明一个临时的回复队列,并获取其名称
String replyQueueName = channel.queueDeclare().getQueue();
log.info("replyQueueName:{},correlationId:{}", replyQueueName,corrId);

// 构建 AMQP 消息属性,设置关联 ID 和回复队列
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
.correlationId(corrId)
// 设置回复队列,服务端将响应发送到该队列
.replyTo(replyQueueName)
.build();

// 发布请求消息到请求队列
channel.basicPublish("", requestQueueName, props, message.getBytes(StandardCharsets.UTF_8));


// 创建一个阻塞队列,用于存储响应消息
final BlockingQueue<String> response = new ArrayBlockingQueue<>(1);

/**
* 开始消费回复队列中的消息,此方法会返回一个消费者标签。
*
* @param replyQueueName 要消费的回复队列的名称
* @param true 表示自动确认消息,即消费者接收到消息后,RabbitMQ 会自动将消息标记为已消费
* @param (consumerTag, delivery) -> {...} 消息消费的回调函数,当接收到消息时会执行此函数
* @param consumerTag -> {...} 消费者取消时的回调函数,这里为空实现
* @return 消费者标签,用于后续取消消费等操作
*/
String ctag = channel.basicConsume(replyQueueName, true, (consumerTag, delivery) -> {
// 检查响应消息的关联 ID 是否与请求的关联 ID 匹配
if (delivery.getProperties().getCorrelationId().equals(corrId)) {
// 将响应消息添加到阻塞队列中,方便后续取出处理
response.offer(new String(delivery.getBody(), StandardCharsets.UTF_8));
}
}, consumerTag -> {
// 消费者取消时的回调,这里为空实现
});
log.info("ctag:{}", ctag);

// 从阻塞队列中获取响应消息,如果队列为空则阻塞等待
String result = response.take();
// 取消对回复队列的消费,不再对回复队列进行监听
channel.basicCancel(ctag);

return result;
}
}

/**
* 程序入口,用于测试 RPCClient 类。
* 发送一系列斐波那契数列请求并打印响应。
*
* @param argv 命令行参数
*/
@SneakyThrows
public static void main(String[] argv) {
try (RPCClient fibonacciRpc = new RPCClient()) {
// 循环发送 32 个请求
for (int i = 0; i < 32; i++) {
// 将整数转换为字符串
String i_str = Integer.toString(i);
// 打印请求信息
System.out.println(" [x] Requesting fib(" + i_str + ")");
// 调用 call 方法发送请求并获取响应
String response = fibonacciRpc.call(i_str);
// 打印响应信息
System.out.println(" [.] Got '" + response + "'");
}
}
}

/**
* 关闭资源的方法,当前未实现。
*
* @throws Exception 如果关闭资源时发生错误
*/
@Override
public void close() throws Exception {
// TODO Auto-generated method stub
log.info("close()");
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
package com.lixiang.rabbitmq.server;

import com.lixiang.rabbitmq.utils.RabbitUtils;
import com.rabbitmq.client.*;

import java.nio.charset.StandardCharsets;


public class RPCServer {
private static final String RPC_QUEUE_NAME = "rpc_queue";

public static void main(String[] argv) throws Exception {


try (Connection connection = RabbitUtils.getConnection();
Channel channel = connection.createChannel()) {
channel.queueDeclare(RPC_QUEUE_NAME, true, false, false, null);
channel.basicQos(1);

System.out.println(" [x] Awaiting RPC requests");

Object monitor = new Object();
/**
* 定义一个 DeliverCallback 回调函数,用于处理从队列中接收到的消息。
* 当 RabbitMQ 服务器将消息传递给消费者时,会调用这个回调函数。
*
* @param consumerTag 消费者的标签,用于标识这个消费者。
* @param delivery 包含接收到的消息的详细信息,如消息体、属性等。
*/
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
// 构建一个 AMQP 消息属性对象,用于在回复消息时携带必要的信息
// 使用 AMQP.BasicProperties.Builder 来创建属性对象
AMQP.BasicProperties replyProps = new AMQP.BasicProperties.Builder()
// 设置关联 ID,该 ID 与接收到的消息的关联 ID 相同
// 关联 ID 用于将请求消息和响应消息进行关联,确保客户端能正确匹配响应
.correlationId(delivery.getProperties().getCorrelationId())
// 构建最终的 AMQP 消息属性对象
.build();

String response = "";

try {
String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
int n = Integer.parseInt(message);

System.out.println(" [.] fib(" + message + ")");
response += fib(n);
} catch (RuntimeException e) {
System.out.println(" [.] " + e.toString());
} finally {
/**
* 向客户端发送响应消息。
* exchange 参数为空字符串,表示使用默认的交换器。
* delivery.getProperties().getReplyTo() 为客户端指定的回复队列名称,响应消息将被发送到该队列。
* replyProps 是之前构建好的包含关联 ID 的消息属性,用于客户端匹配请求和响应。
* response.getBytes(StandardCharsets.UTF_8) 将响应字符串转换为 UTF-8 编码的字节数组进行发送。
*/
channel.basicPublish("", delivery.getProperties().getReplyTo(), replyProps,
response.getBytes(StandardCharsets.UTF_8));
/**
* 向 RabbitMQ 服务器确认已成功处理消息。
* delivery.getEnvelope().getDeliveryTag() 是消息的唯一标识,用于指定要确认的消息。
* 第二个参数为 false 表示只确认当前消息,而不是批量确认。
* 确认消息后,RabbitMQ 服务器会将该消息从队列中移除。
*/
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
// RabbitMq consumer worker thread notifies the RPC server owner thread
synchronized (monitor) {
monitor.notify();
}
}
};

channel.basicConsume(RPC_QUEUE_NAME, false, deliverCallback, (consumerTag -> {
}));
// Wait and be prepared to consume the message from RPC client.
while (true) {
synchronized (monitor) {
try {
monitor.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}

private static int fib(int n) {
if (n == 0)
return 0;
if (n == 1)
return 1;
return fib(n - 1) + fib(n - 2);
}
}