RabbitMQ Streams流处理

RabbitMQ Streams概述

RabbitMQ Streams是一种持久化且可复制的数据结构,旨在执行与传统队列相同的功能:即作为生产者发送的消息与消费者接收消息之间的缓冲区。尽管如此,Streams在消息的存储机制和消费模式上与传统队列存在显著差异:
● 非破坏性消费语义:Stream模仿了一个仅追加的日志模型,其中的消息可以被重复读取直至其自然过期。这一特性意味着所有通过Stream传输的信息都将保持持久性和跨节点复制状态。
● 灵活的消息访问:不同于队列中消息一旦被消费便会被移除的行为,在Stream中,一个或多个订阅者能够独立地多次访问同一条信息。
使用RabbitMQ Stream的方式有两种:一是通过标准的RabbitMQ客户端库;二是采用专为此设计的二进制协议插件及配套客户端。后者不仅全面支持Stream的所有高级功能,还确保了最佳的数据处理性能。

应用场景

RabbitMQ Stream技术的发展初衷在于解决以下几种传统队列难以高效应对的问题场景:

  1. 广泛分发(大规模扇出):对于需要向众多订阅者广播同一消息的情况,现有方法要求为每位订阅者单独分配一个队列,这在面对大量受众时显得效率低下。而利用Stream,则可以让多位订阅者以不相互干扰的方式共享同一份数据源,无需额外设置多个队列。
  2. 历史消息重放:鉴于当前RabbitMQ中的队列类型都遵循“消耗即删除”的原则,因此无法实现对已消费消息的回溯查询。相比之下,Stream允许用户定位到日志中的任意点,并从该位置开始读取内容。
  3. 高性能吞吐量:没有一种现有的持久化队列解决方案能够在吞吐能力方面与基于日志架构的消息系统相匹敌。Stream正是针对这一点进行了专门优化。
  4. 大容量存储需求:多数RabbitMQ队列倾向于维持低占用状态,当面临海量未处理消息时,其性能往往会受到影响。Stream则采取了更为有效的策略来管理大量待处理项目,同时将内存使用率控制在较低水平。

创建RabbitMQ Stream

要创建一个新的Stream,您需要将x-queue-type参数设置为stream,而非默认值classic。值得注意的是,此配置项必须在首次声明队列时由客户端明确指定,不能通过策略进行后期修改或设定。这是因为虽然策略可以动态调整,但队列类型一旦确定后便不可更改。下面是一个使用AMQP 0.9.1 Java客户端建立Stream的示例代码片段。

1
2
3
4
5
6
7
8
9
ConnectionFactory factory = new ConnectionFactory();
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(
"my-stream",
true, // 持久化
false, false, // 非独占,非自动删除
Collections.singletonMap("x-queue-type", "stream")
);

带有x-queue-type参数设置为stream的队列,将在每个配置的RabbitMQ节点上创建一个具有副本的流。鉴于流基于仲裁机制运行,强烈推荐使用奇数个节点组成的集群来确保系统的稳定性和数据的一致性。
尽管流本质上仍然是遵循AMQP 0.9.1协议定义的队列,这意味着它们可以像其他任何RabbitMQ队列一样,在创建后与任意交换器进行绑定操作。
此外,针对流型队列,还支持一系列额外的配置选项,这些选项既可以直接作为队列声明的一部分指定,也可以通过策略来设定:
● x-max-length-bytes:此参数用于限制流的最大存储容量(以字节为单位)。详情请参考保留策略文档。默认情况下不设上限。
● x-max-age:用来定义流中消息能够存在的最长时限。同样地,请查阅相关保留策略说明获取更多信息。默认值表示无时间限制。
● x-stream-max-segment-size-bytes:该属性决定了流在磁盘上的分割文件大小(单位:字节),其中流由多个固定大小的段组成。其标准设置为5亿字节。
● x-stream-filter-size-bytes:指定了用于过滤目的Bloom过滤器的尺寸(以字节计)。允许的有效范围是从16到255字节,而初始配置通常采用最小值16字节。
值得注意的是,尽管可以通过策略调整x-stream-max-segment-size-bytes和x-stream-filter-size-bytes这两个参数,但只有当相应策略在流首次声明时就已经存在的情况下才会生效。对于已有的流而言,即使更新了匹配策略中的上述参数设置,实际上也不会对现有流产生影响。因此,为了保证预期行为,建议直接通过队列声明过程中的参数来管理这些配置。
下面提供了一个Java语言编写的示例代码片段,演示了如何在应用层面上于流创建时指定这些特定参数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
Map<String, Object> arguments = new HashMap<>();
arguments.put("x-queue-type", "stream");
// 最大流大小:20 GB
arguments.put("x-max-length-bytes", 20_000_000_000);
// 段文件大小:100 MB
arguments.put("x-stream-max-segment-size-bytes", 100_000_000);
// 流Bloom过滤器大小:32
arguments.put("x-stream-filter-size-bytes", 32);

channel.queueDeclare(
"my-stream",
true, // 持久化
false, false, // 非独占,非自动删除
arguments
);

客户端操作

消费
鉴于流机制不会删除任何消息,因此所有消费者皆可从日志中的任意位置开始读取或消费消息。此过程受控于x-stream-offset这一消费者参数。若未特别指定,则默认情况下,消费者将从其启动之后写入日志的下一个偏移量处开始读取。支持的参数值包括但不限于:
● first:指示从日志中最早可用的消息开始读取。
● last:意味着从最新写入的“消息块”开始读取;这里所谓的“块”,是指流中用于存储和传输的一组消息集合,其大小根据输入情况可能涵盖数条至数千条不等的消息。
● next:等同于未指定偏移量的情况。
● Offset:允许直接指定一个确切的数值偏移量。如果给定的偏移量在日志内不存在,则会自动调整到最接近的有效偏移量,即要么是日志起始处要么是末尾。
● Timestamp:通过提供具体的时间戳来确定起始读取位置。该时间戳会被映射到最邻近的实际存在之偏移量上;如若提供的时刻超出了现有记录范围,则同样会自动调整至日志首尾之一。对于采用AMQP 0.9.1协议的情况,所使用的时间戳为POSIX标准下的秒级精度时间(自1970年1月1日零时起算)。值得注意的是,在这种模式下,用户可能会接收到早于所设定时间点之前发布的消息。
● Interval:允许基于当前时间加上一段相对时间间隔来定位起始阅读点。此选项遵循与x-max-age保留策略相同的定义方式。
下面是一个利用first偏移量设置的具体示例代码片段:

1
2
3
4
5
6
7
8
9
10
11
channel.basicQos(100); // 必须指定QoS
channel.basicConsume(
"my-stream",
false,
Collections.singletonMap("x-stream-offset", "first"), // "first"偏移量规范
(consumerTag, message) -> {
// 消息处理
// ...
channel.basicAck(message.getEnvelope().getDeliveryTag(), false); // 需要确认
},
consumerTag -> { });

以下代码片段展示如何指定特定偏移量进行消费:

1
2
3
4
5
6
7
8
9
10
11
channel.basicQos(100); // 必须指定QoS
channel.basicConsume(
"my-stream",
false,
Collections.singletonMap("x-stream-offset", 5000), // 偏移量值
(consumerTag, message) -> {
// 消息处理
// ...
channel.basicAck(message.getEnvelope().getDeliveryTag(), false); // 需要确认
},
consumerTag -> { });

以下代码片段展示如何指定特定时间戳进行消费:

1
2
3
4
5
6
7
8
9
10
11
12
13
// 一小时前
Date timestamp = new Date(System.currentTimeMillis() - 60 * 60 * 1_000)
channel.basicQos(100); // 必须指定QoS
channel.basicConsume(
"my-stream",
false,
Collections.singletonMap("x-stream-offset", timestamp), // 时间戳偏移量
(consumerTag, message) -> {
// 消息处理
// ...
channel.basicAck(message.getEnvelope().getDeliveryTag(), false); // 需要确认
},
consumerTag -> { });

其他流操作

以下操作可按照与经典队列及仲裁队列相似的方式执行,但其中部分操作具有特定于该类型队列的独特行为特征:
● 声明队列
● 删除队列
● 发布确认
● 消息消费(订阅):此过程需设定服务质量(QoS)预取值。在此机制中,确认作为一种信用手段发挥作用,用以更新消费者的当前偏移位置。
● 为消费者指定QoS预取值
● 消费者确认(请遵守QoS预取限制)
● 取消消费者订阅
以上各项功能共同确保了消息传递系统的高效性和可靠性。

流的单一活跃消费者特性

单一活跃消费者特性是RabbitMQ自3.11版本起引入的一项功能,旨在支持独占性消费及确保消息流的连续处理。在多个共享同一消息流与名称的消费者实例中启用此特性的条件下,仅有一个实例能够处于活动状态并接收消息,其余实例则保持待机状态。
该功能主要带来两大优势:

  1. 顺序消息处理:通过保证任何时候仅有单一消费者进行消息处理,从而实现对消息按序处理的需求。
  2. 消费过程的无缝切换:当当前活动中的消费者因故停止运行或发生故障时,系统将自动选择另一个备用消费者接管工作,以维持服务的持续性和稳定性。

超级流

超级流是一种通过将大规模的数据流分割为较小的子流来实现扩展的技术方案。这种机制与单一活跃消费者的模式相兼容,以确保各分区内消息传递的顺序性。自RabbitMQ 3.11版本起,该功能正式可用。
从结构上看,超级流是由一系列独立的传统流组成的逻辑单元。这种方法允许通过RabbitMQ流架构来拓展发布和消费能力:即把一个大的逻辑流拆分成多个分区流,并将这些分区分布在不同的集群节点上,从而达到负载均衡的效果。尽管如此,对于应用程序而言,整个超级流仍被视为一个统一的“大型”流,这主要归功于客户端库提供的抽象层支持。
在设计层面,超级流遵循了AMQP 0.9.1标准中的核心概念,包括交换器、队列以及它们之间的绑定关系。这意味着可以通过任一符合AMQP 0.9.1协议的库或管理插件来构建超级流的相关配置,具体操作包括定义Direct类型的交换器、“分区”形式的流及其间的关联规则。为了简化这一过程,推荐使用rabbitmq-streams add_super_stream命令行工具。例如,要创建一个名为invoices且包含3个分区的超级流,可执行如下指令:

1
rabbitmq-streams add_super_stream invoices --partitions 3

更多关于此命令的帮助信息,可通过运行rabbitmq-streams add_super_stream –help获取。
需要注意的是,相较于单一流处理方式,采用超级流会引入额外的复杂度。因此,在实际部署时不应无条件地将其作为所有涉及流处理场景的标准选择。仅当确认现有的单一流架构已无法满足需求时,才建议转向超级流方案。
RabbitMQ流提供了服务器端过滤机制,从而避免了客户端接收并处理整个消息流的必要性。当消费应用程序仅需处理特定子集的消息(例如来自特定地理区域的信息)时,这种机制有助于显著减少网络带宽消耗。流过滤功能支持多种协议,包括Stream Protocol、AMQP 0.9.1以及STOMP。以下示例将以AMQP 0.9.1为基础进行说明。
为了使过滤生效,发布消息时必须附加相应的过滤值。此值通过x-stream-filter-value头部字段指定:

1
2
3
4
5
6
7
8
channel.basicPublish(
"", // 默认交换机
"my-stream",
new AMQP.BasicProperties.Builder()
.headers(Collections.singletonMap("x-stream-filter-value", "california")) // 设置过滤值
.build(),
body
);

消费者若希望仅接收到与特定过滤条件匹配的消息,则需在订阅时指明x-stream-filter参数:

1
2
3
4
5
6
7
8
9
10
11
12
13
channel.basicQos(100); // 配置服务质量
channel.basicConsume(
"my-stream",
false,
Collections.singletonMap("x-stream-filter", "california"), // 指定过滤器
(consumerTag, message) -> {
Map<String, Object> headers = message.getProperties().getHeaders();
if ("california".equals(headers.get("x-stream-filter-value"))) { // 客户端二次确认
// 处理消息
}
channel.basicAck(message.getEnvelope().getDeliveryTag(), false); // 确认消息已处理
},
consumerTag -> {});

值得注意的是,尽管实现了服务器端过滤,但该过程本质上是基于概率实现的——即使用Bloom Filter等空间高效的数据结构来近似匹配。这意味着偶尔可能会出现误报情况,因此建议在客户端层面也实施一定的过滤逻辑以确保准确性。
关于过滤机制的一些额外注意事项如下:
● 同一流中可以同时存在带有和不带有过滤值的消息。
● 当消费者设置了过滤器后,默认情况下未标记过滤值的消息不会被传递给该消费者;不过,通过设置x-stream-match-unfiltered=true参数可以改变这一行为,允许接收所有未经过滤的消息。
● x-stream-filter消费者参数既接受单个字符串形式的过滤条件,也支持数组形式以适应多个不同过滤标准的需求。
特性对比:常规队列 vs. 流
从传统意义上讲,RabbitMQ流并非真正的队列,因此它们并不完全符合AMQP 0.9.1定义下的队列语义。鉴于其本质特征,某些适用于其他类型队列的功能对于流来说可能无法提供或永远不可能实现。然而,只要采用了消费者确认机制,原本设计用于常规队列的AMQP 0.9.1客户端库同样能够有效地利用流服务。此外,由于流采用非破坏性读取模式,这进一步限制了一些功能的适用性。

数据保留

流被设计为不可变的仅追加磁盘日志。这意味着,除非采取特定措施,否则日志将无限制地增长直至磁盘空间耗尽。为了避免这种情形,可以针对每个流配置数据保留策略,基于日志的数据总量或数据年龄自动清除最旧的日志记录。
对于流的保留设置,有两个主要参数可用来控制:
● max-age:定义了数据在被删除前能够存在的最长持续时间。支持的时间单位包括年(Y)、月(M)、天(D)、小时(h)、分钟(m)和秒(s)。例如,设置为7D表示数据保留期限为一周。
● max-length-bytes:设定了整个日志文件允许的最大尺寸(以字节计)。
需要注意的是,这些保留规则是基于日志段进行评估执行的,因此还有一个额外的因素会影响实际的数据保留行为——即单个日志段的大小。无论怎样,只要某段内含有至少一条消息,那么该段就将被保留下来。此外,在使用代理提供的偏移跟踪功能时,消费者的偏移信息会被视为非消息数据存储于流中。

性能特性

鉴于所有数据操作前都会先写入磁盘以确保持久性,推荐采用速度最快的存储介质来提升性能。由于涉及到大量的磁盘I/O活动,随着每条消息体积增大,整体吞吐量可能会有所下降。与仲裁队列类似,增加副本数量虽然提高了数据冗余度,但也导致了更低的吞吐率,因为这增加了复制过程中的工作负荷及达成一致性所需的资源消耗。

流的行为

每个流都配备了一个主写入者(称为领导者)以及零到多个跟随其后的副本。当创建新的流时,会随机挑选一组节点来承载这些副本,但始终包含客户端最初连接的那个节点。领导者的初始指定可以通过以下几种方式之一实现:
● 通过x-queue-leader-locator这一可选参数,
● 或者设置名为queue-leader-locator的策略键,
● 亦或是直接修改配置文件里的相应字段。
关于领导者的选择策略,有如下选项:
● client-local:默认选择与发起请求的客户端相连的节点作为领导者。
● balanced:如果系统内的总队列数少于1000,则选取当前拥有最少领导者角色的节点;反之则随机指派。
为了正常运作,流需要在其声明所在的节点集群内保持足够数量的健康成员在线。一旦负责管理某个流的领导者节点发生故障,将立即从现有的副本集合中选举出一个新的领导者继续提供服务。重新加入网络的副本会尝试追赶最新的状态,而不会影响现有领导者的正常运行。新加入的副本必须经历一个完整的同步过程才能开始参与服务。

数据安全性

流通过跨多个节点复制数据的方式来保障信息的安全,并且只有当数据已经被成功复制到了预定数量的副本后才会向发布者返回确认信号。尽管如此,出于效率考虑,流并不会主动触发操作系统级别的刷新操作(fsync),而是依赖于系统的后台调度机制。因此,在异常断电的情况下,个别节点上的数据可能存在丢失风险。对于那些对数据完整性要求极高的应用场景,建议采用仲裁队列,因为它会在数据被正式写入并刷新至物理存储设备之后才发出确认通知。

可用性

一般来说,即使少数几个副本暂时离线,也不会显著影响整个流的服务可用性。根据具体的部署方案,RabbitMQ可能在恢复过程中自动重启某些节点,但这通常不会干扰到服务的整体连续性。例如,一个设置了三个副本的流能够在失去其中一个节点的情况下继续正常工作;同样地,五副本配置下可以容忍两处故障而不中断服务。然而,假如超过一半以上的节点永久失效,则需人工介入修复。

资源占用情况

相比于传统的仲裁队列,流在CPU和内存使用方面表现得更为高效。除了正在处理中的未完成写入之外,所有其他数据均保存于磁盘上。

偏移量追踪

利用代理内置的偏移量追踪功能(目前仅限于配合流插件使用),消费者的状态信息将以非消息形式存放在流内部。每次更新偏移值都会造成少量额外的磁盘空间开销。

消息编码限制

流使用AMQP 1.0标准对消息进行编码。这意味着,当接收来自AMQP 0.9.1的消息时会发生格式转换。虽然大部分情况下这种转换是可以无缝完成的,但对于包含复杂结构如数组或表作为头信息的情况,则无法完全保留原有格式,因为AMQP 1.0的应用属性部分仅支持基本类型如字符串和数字。

1
2
3
4
5
package com.lixiang.rabbitmq.utils;

public class RabbitConstant {
public static final String QUEUE_SMS_STREAM = "sms.stream";
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
package com.lixiang.rabbitmq.utils;


import java.util.HashMap;
import java.util.Map;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import lombok.SneakyThrows;

public class RabbitUtils {
private static ConnectionFactory connectionFactory = new ConnectionFactory();

static {
connectionFactory.setHost("192.168.31.230");
connectionFactory.setPort(5672);
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
connectionFactory.setVirtualHost("my_vhost");
}

@SneakyThrows
public static Connection getConnection() {
Connection conn = connectionFactory.newConnection();
return conn;
}

@SneakyThrows
public static void init(Channel channel){
Map<String, Object> quArgs = new HashMap<>();
quArgs.put("x-queue-type", "stream");
quArgs.put("x-max-length-bytes", 1000000000); // 1GB队列大小限制
quArgs.put("max-age", "7d"); // 1GB每个段大小限制
channel.queueDeclare(RabbitConstant.QUEUE_SMS_STREAM, true, false, false, quArgs);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
package com.lixiang.rabbitmq;

import com.google.gson.Gson;
import com.lixiang.rabbitmq.entity.SMS;
import com.lixiang.rabbitmq.utils.RabbitConstant;
import com.lixiang.rabbitmq.utils.RabbitUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;

@Slf4j
public class OrderSystem {
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
try (Connection connection = RabbitUtils.getConnection();
Channel channel = connection.createChannel();) {
RabbitUtils.init(channel);

for (int i = 100; i < 200; i++) {
SMS sms = new SMS("乘客" + i, "13900000" + i, "您的车票已预订成功");
String jsonSMS = new Gson().toJson(sms);
channel.basicPublish("", RabbitConstant.QUEUE_SMS_STREAM, null, jsonSMS.getBytes());
}

log.info("订单信息已投递至Broker");
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
package com.lixiang.rabbitmq;

import java.io.IOException;
import java.util.Collections;

import com.lixiang.rabbitmq.utils.RabbitConstant;
import com.lixiang.rabbitmq.utils.RabbitUtils;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class SmsSender {
public static void main(String[] args) throws IOException {
Connection connection = RabbitUtils.getConnection();
final Channel channel = connection.createChannel();
channel.basicQos(1);
RabbitUtils.init(channel);
channel.basicConsume(RabbitConstant.QUEUE_SMS_STREAM,
false, Collections.singletonMap("x-stream-offset", "first"),
new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
String jsonSMS = new String(body);
log.info("SMSSender-短信发送成功:{}", jsonSMS);
channel.basicAck(envelope.getDeliveryTag(), false);
}
});
}
}