RabbitMQ 的 RPC 模式介绍
RPC(Remote Procedure Call,远程过程调用)是一种常见的通信模式,允许客户端通过网络调用远程服务器上的方法或服务,就像调用本地函数一样。RabbitMQ提供了对 RPC 模式的原生支持,使得开发者可以通过消息队列实现高效的分布式系统。
在RabbitMQ的RPC模式中,客户端发送一个请求消息到服务器,服务器处理请求后返回响应消息给客户端。这种模式的核心思想是通过消息队列实现异步的远程调用,并确保请求和响应能够正确匹配。

RPC模式的工作流程
- 客户端发送请求:
○ 客户端创建一个临时的专属队列(通常称为“回调队列”),用于接收服务器的响应。
○ 客户端将请求消息发送到一个指定的队列(通常是服务器监听的队列),并在消息中附带回调队列的名称以及一个唯一标识符(如 correlation_id),以便后续匹配响应。
- 服务器处理请求:
○ 服务器从请求队列中消费消息,执行相应的业务逻辑。
○ 处理完成后,服务器将结果封装为响应消息,并将其发送到客户端指定的回调队列中。响应消息中会包含与请求相同的 correlation_id,以确保客户端能够正确匹配请求和响应。
- 客户端接收响应:
○ 客户端从回调队列中消费响应消息,并根据 correlation_id 找到对应的请求。
○ 客户端处理响应数据,完成整个 RPC 调用。
关键概念
● 回调队列(Callback Queue):
客户端创建的临时队列,用于接收服务器的响应消息。每个客户端通常都有自己的回调队列,以避免消息混淆。
● Correlation ID:
每个请求消息都附带一个唯一的标识符(correlation_id),用于在客户端匹配请求和响应。
● 消息确认机制:
RabbitMQ 提供了消息确认机制(ACK/NACK),确保消息不会丢失。例如,服务器可以确认已成功处理请求,客户端可以确认已成功接收到响应。
RPC模式的应用场景
- 微服务架构:
在微服务架构中,不同服务之间需要通过网络进行通信。RPC模式可以帮助服务之间高效地传递请求和响应,而无需直接暴露服务的内部细节。
- 分布式计算:
当需要将计算任务分发到多个节点时,RPC模式可以用来协调任务分发和结果收集。
- API 网关:
API 网关可以通过 RPC模式与后端服务通信,从而实现负载均衡、请求路由和响应聚合。
- 实时数据处理:
在需要实时处理用户请求的场景中(如在线游戏、聊天应用等),RPC模式可以快速响应用户的操作请求。
- 异步任务调度:
当某些任务需要异步执行时,RPC模式可以用来提交任务并获取最终结果。例如,文件上传后的处理、图片压缩等。
总结
RabbitMQ的RPC模式是一种强大的通信机制,特别适用于分布式系统中的远程调用。它通过消息队列实现了请求和响应的解耦,提供了高可靠性和灵活性。然而,在使用RPC模式时需要注意以下几点:
● 性能开销: 由于涉及消息的序列化、网络传输和反序列化,RPC模式可能会引入一定的延迟。
● 错误处理: 需要设计完善的错误处理机制,以应对网络故障、超时等问题。
● 安全性: 在生产环境中,建议使用 TLS 加密通信,确保消息的安全性。
通过合理设计和优化,RabbitMQ 的 RPC 模式可以成为构建高效分布式系统的有力工具。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126
| package com.lixiang.rabbitmq.client;
import com.lixiang.rabbitmq.utils.RabbitUtils; import com.rabbitmq.client.*;
import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j;
import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.UUID; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue;
@Slf4j public class RPCClient implements AutoCloseable {
private String requestQueueName = "rpc_queue";
@SneakyThrows public String call(String message) throws IOException, InterruptedException { try ( Connection connection = RabbitUtils.getConnection(); Channel channel = connection.createChannel();) { channel.queueDeclare(requestQueueName, true, false, false, null); final String corrId = UUID.randomUUID().toString();
String replyQueueName = channel.queueDeclare().getQueue(); log.info("replyQueueName:{},correlationId:{}", replyQueueName,corrId);
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder() .correlationId(corrId) .replyTo(replyQueueName) .build(); channel.basicPublish("", requestQueueName, props, message.getBytes(StandardCharsets.UTF_8)); final BlockingQueue<String> response = new ArrayBlockingQueue<>(1);
String ctag = channel.basicConsume(replyQueueName, true, (consumerTag, delivery) -> { if (delivery.getProperties().getCorrelationId().equals(corrId)) { response.offer(new String(delivery.getBody(), StandardCharsets.UTF_8)); } }, consumerTag -> { }); log.info("ctag:{}", ctag);
String result = response.take(); channel.basicCancel(ctag);
return result; } }
@SneakyThrows public static void main(String[] argv) { try (RPCClient fibonacciRpc = new RPCClient()) { for (int i = 0; i < 32; i++) { String i_str = Integer.toString(i); System.out.println(" [x] Requesting fib(" + i_str + ")"); String response = fibonacciRpc.call(i_str); System.out.println(" [.] Got '" + response + "'"); } } }
@Override public void close() throws Exception { log.info("close()"); } }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96
| package com.lixiang.rabbitmq.server;
import com.lixiang.rabbitmq.utils.RabbitUtils; import com.rabbitmq.client.*;
import java.nio.charset.StandardCharsets;
public class RPCServer { private static final String RPC_QUEUE_NAME = "rpc_queue";
public static void main(String[] argv) throws Exception {
try (Connection connection = RabbitUtils.getConnection(); Channel channel = connection.createChannel()) { channel.queueDeclare(RPC_QUEUE_NAME, true, false, false, null); channel.basicQos(1);
System.out.println(" [x] Awaiting RPC requests");
Object monitor = new Object();
DeliverCallback deliverCallback = (consumerTag, delivery) -> { AMQP.BasicProperties replyProps = new AMQP.BasicProperties.Builder() .correlationId(delivery.getProperties().getCorrelationId()) .build();
String response = "";
try { String message = new String(delivery.getBody(), StandardCharsets.UTF_8); int n = Integer.parseInt(message);
System.out.println(" [.] fib(" + message + ")"); response += fib(n); } catch (RuntimeException e) { System.out.println(" [.] " + e.toString()); } finally {
channel.basicPublish("", delivery.getProperties().getReplyTo(), replyProps, response.getBytes(StandardCharsets.UTF_8));
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); synchronized (monitor) { monitor.notify(); } } };
channel.basicConsume(RPC_QUEUE_NAME, false, deliverCallback, (consumerTag -> { })); while (true) { synchronized (monitor) { try { monitor.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } } } }
private static int fib(int n) { if (n == 0) return 0; if (n == 1) return 1; return fib(n - 1) + fib(n - 2); } }
|