RocketMQ Push与Pull模式
RocketMQ在消费者层面拥有两种数据获取方式:
● 推送模式Push

推送模式顾名思义,是由Broker作为发起方,主动向消费者将最新产生的数据推送过来,Push模式使用的消费者类为DefaultMQPushConsumer,在前面我们一直在利用DefaultMQPushConsumer进行开发。
1 | DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer-group"); |
● 拉取模式Pull

拉取模式是指由消费者作为发起方,定时向Broker发起队列查询请求,Broker将最近还没有消费的消息返回给消费者进行处理,拉取模式最显著的特点是发起者为消费方,定时拉取最近未消费数据,Broker返回后再有消费者确认接收应答。
Pull模式
在RocketMQ 4.6版本后,默认Pull模式不再使用DefaultMQPullConsumer拉取消息,改为使用DefaultLitePullConsumer,DefaultLitePullConsumer是更加轻量级的Pull模式消费者实现,以前使用DefaultMQPullConsumer实现Pull模式时要关注大量拉取时的开发细节,但这些在引入DefaultLitePullConsumer以后变的非常简单,RocketMQ做好了大量封装,让开发者只是用简单的几个API便可以实现复杂的拉取操作。
生产者PullProducer
生产者部分和标准代码相同,这里模拟发送100条税务数据
1 | public static void main(String[] args) { |
消费者PullConsumer
消费者部分变化较大,体现在三方面:
● 采用DefaultLitePullConsumer 对象拉取
● 调用poll方法拉取未消费数据
● 需要自定义获取间隔
1 | public static void main(String[] args) throws Exception { |
运行结果
● 情况1:先启动消费者,再启动生产者通过观察可以得到,消费者轮询每个队列,每次只取1个消息进行处理,因为发送消息的时候是1条1条发送的。
1 | 14:51:13.689 [main] INFO com.lixiang.rocketmq.pull.PullConsumer - 消费者启动成功,正在监听新 |
● 情况2:先启动生产者,在启动消费者通过观察得到,消费者在启动时会自动将之前积压在队列中的数据批量消费,默认每次10条,直到将所有积压数据消费掉然后再1条1条获取。类似情况下,如果生产者后续一次批量发送10条,消费者也是poll一次10条,最多不超过PullBatchSize默认10。
1 | 14:55:03.757 [main] INFO com.lixiang.rocketmq.pull.PullConsumer - 消费者启动成功,正在监听新 |
poll方法默认自动提交,可以手动提交吗
可以,需要两点调整:
● consumer.setAutoCommit(false); //关闭自动提交
● consumer.commitSync();//手动提交消费进度
注意:一定要在业务处理完毕后确保手动执行commitSync方法,否则下次消息会被重复拉取。工作中更推荐使用这种方式,在处理业务后手动提交,可以避免数据因为业务失败导致的消息丢失。
完整代码如下:
1 | public static void main(String[] args) throws Exception { |
如何调整批量获取数量
1 | consumer.setPullBatchSize(5); |
运行结果
1 | 15:03:04.160 [main] INFO com.lixiang.rocketmq.pull.PullConsumer - 消费者启动成功,正在监听新 |