环境准备

生产者CmProducer
生产者是一致的,循环生成10条普通消息投给给Broker,主题为:cm-sample-data ,Tag:test ,Key:n
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
| @Slf4j public class CmProducer { public static void main(String[] args) { DefaultMQProducer producer = new DefaultMQProducer("cm-producer-group"); producer.setNamesrvAddr("192.168.31.103:9876"); producer.setRetryTimesWhenSendAsyncFailed(2); try { producer.start(); for(Integer i = 0 ; i < 10 ; i++) { String data = "第" + i + "条消息数据"; Message message = new Message("cm-sample-data", "test", i.toString(), data.getBytes()); SendResult result = producer.send(message); log.info("Broker响应:" + result); } }catch (Exception e){ e.printStackTrace(); }finally { try { producer.shutdown(); log.info("连接已关闭"); }catch (Exception e){ e.printStackTrace(); } } } }
|
集群模式消费者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| @Slf4j public class CmClusterConsumer { public static void main(String[] args) throws Exception { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("cm-cluster-consumer-group"); consumer.setNamesrvAddr("192.168.31.103:9876"); consumer.setMessageModel(MessageModel.CLUSTERING); consumer.subscribe("cm-sample-data", "*"); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { list.forEach(msg->{ log.info("收到消息:" + new String(msg.getBody())); }); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); log.info("集群消费者启动成功,正在监听新消息"); } }
|
运行结果
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| 启动1-4个实例: 实例1: 21:54:58.944 [main] INFO com.lixiang.rocketmq.consumemode.CmClusterConsumer - 集群消费者启动成功,正在监听新消息 21:55:08.963 [ConsumeMessageThread_3] INFO com.lixiang.rocketmq.consumemode.CmClusterConsumer - 收到消息:第2条消息数据 21:55:08.979 [ConsumeMessageThread_5] INFO om.lixiang.rocketmq.consumemode.CmClusterConsumer - 收到消息:第6条消息数据 实例2: 21:55:01.010 [main] INFO com.lixiang.rocketmq.consumemode.CmClusterConsumer - 集群消费者启动成功,正在监听新消息 21:55:08.949 [ConsumeMessageThread_1] INFO com.lixiang.rocketmq.consumemode.CmClusterConsumer - 收到消息:第0条消息数据 21:55:08.949 [ConsumeMessageThread_3] INFO com.lixiang.rocketmq.consumemode.CmClusterConsumer - 收到消息:第4条消息数据 21:55:08.985 [ConsumeMessageThread_4] INFO com.lixiang.rocketmq.consumemode.CmClusterConsumer - 收到消息:第8条消息数据 实例3: 21:55:02.987 [main] INFO com.lixiang.rocketmq.consumemode.CmClusterConsumer - 集群消费者启动成功,正在监听新消息 21:55:08.965 [ConsumeMessageThread_1] INFO com.lixiang.rocketmq.consumemode.CmClusterConsumer - 收到消息:第1条消息数据 21:55:08.978 [ConsumeMessageThread_2] INFO com.lixiang.rocketmq.consumemode.CmClusterConsumer - 收到消息:第5条消息数据 21:55:08.988 [ConsumeMessageThread_3] INFO com.lixiang.rocketmq.consumemode.CmClusterConsumer - 收到消息:第9条消息数据 实例4: 21:55:04.490 [main] INFO com.lixiang.rocketmq.consumemode.CmClusterConsumer - 集群消费者启动成功,正在监听新消息 21:55:08.978 [ConsumeMessageThread_1] INFO com.lixiang.rocketmq.consumemode.CmClusterConsumer - 收到消息:第3条消息数据 21:55:08.982 [ConsumeMessageThread_2] INFO com.lixiang.rocketmq.consumemode.CmClusterConsumer - 收到消息:第7条消息数据
|
广播模式消费者
只有setMessageModel方法传入BROADCASTING常量,其他没有任何变化
1 2 3 4 5 6 7 8 9
| @Slf4j public class CmBroadcastConsumer { public static void main(String[] args) throws Exception { consumer.setMessageModel(MessageModel.BROADCASTING); } }
|
运行结果
1-4个实例均消费到10条消息,不过不同实例之间获取消息的前后顺序均有差别。
1 2 3 4 5 6 7 8 9 10 11
| 21:59:10.398 [main] INFO com.lixiang.rocketmq.consumemode.CmBroadcastConsumer - 广播消费者启动成功,正在监听新消息 21:59:16.379 [ConsumeMessageThread_5] INFO com.lixiang.rocketmq.consumemode.CmBroadcastConsumer - 收到消息:第4条消息数据 21:59:16.380 [ConsumeMessageThread_3] INFO com.lixiang.rocketmq.consumemode.CmBroadcastConsumer - 收到消息:第2条消息数据 21:59:16.380 [ConsumeMessageThread_1] INFO com.lixiang.rocketmq.consumemode.CmBroadcastConsumer - 收到消息:第0条消息数据 21:59:16.380 [ConsumeMessageThread_4] INFO com.lixiang.rocketmq.consumemode.CmBroadcastConsumer - 收到消息:第3条消息数据 21:59:16.380 [ConsumeMessageThread_2] INFO com.lixiang.rocketmq.consumemode.CmBroadcastConsumer - 收到消息:第1条消息数据 21:59:16.381 [ConsumeMessageThread_6] INFO com.lixiang.rocketmq.consumemode.CmBroadcastConsumer - 收到消息:第5条消息数据 21:59:16.386 [ConsumeMessageThread_7] INFO com.lixiang.rocketmq.consumemode.CmBroadcastConsumer - 收到消息:第6条消息数据 21:59:16.389 [ConsumeMessageThread_8] INFO com.lixiang.rocketmq.consumemode.CmBroadcastConsumer - 收到消息:第7条消息数据 21:59:16.399 [ConsumeMessageThread_9] INFO com.lixiang.rocketmq.consumemode.CmBroadcastConsumer - 收到消息:第8条消息数据 21:59:16.399 [ConsumeMessageThread_10] INFO com.lixiang.rocketmq.consumemode.CmBroadcastConsumer - 收到消息:第9条消息数据
|