Java应用接入RocketMQ生产消息
● pom依赖
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| <repositories> <repository> <id>aliyun</id> <name>aliyun</name> <url>https: </repository> </repositories> <pluginRepositories> <pluginRepository> <id>aliyun</id> <name>aliyun</name> <url>https: </pluginRepository> </pluginRepositories> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.9.2</version> </dependency>
|
● 生产者投递消息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
| public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException { DefaultMQProducer producer = new DefaultMQProducer("pg1"); producer.setNamesrvAddr("192.168.31.103:9876"); try{ producer.start(); for(int i = 0 ; i< 100 ; i++) { String data = "{\"title\":\"X市2021年度第四季度税务汇总数据\"}";
Message message = new Message("TAX", "2021S4", data.getBytes()); SendResult result = null; result = producer.send(message); System.out.println("消息已发送:MsgId:" + result.getMsgId() + ",发送状态:" + result.getSendStatus()); } } finally { try { producer.shutdown(); }catch (Exception e){ } } }
|
● 执行结果,同步复制下,数据持久化到Master主队列,由于没有从节点复制失败
1 2 3
| 消息已发送:MsgId:C0A81F6D310418B4AAC26F9FD63F0061,发送状态:SLAVE_NOT_AVAILABLE 消息已发送:MsgId:C0A81F6D310418B4AAC26F9FD6430062,发送状态:SLAVE_NOT_AVAILABLE 消息已发送:MsgId:C0A81F6D310418B4AAC26F9FD6460063,发送状态:SLAVE_NOT_AVAILABLE
|
同步复制与异步复制的区别
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| #关闭Broker sh /usr/local/rocketmq-4.9.2/bin/mqshutdown broker #重新生成配置文件,改为异步复制 cat > single-master.properties <<-'EOF' #集群名称,同一个集群下的broker要求统一 brokerClusterName=DefaultCluster #broker名称 brokerName=broker-a #brokerId=0代表主节点,大于零代表从节点 brokerId=0 #删除日志文件时间点,默认凌晨 4 点 deleteWhen=04 #日志文件保留时间,默认 48 小时 fileReservedTime=48 #Broker 复制策略 #- ASYNC_MASTER 异步复制Master #- SYNC_MASTER 同步双写Master brokerRole=ASYNC_MASTER #刷盘方式 #- ASYNC_FLUSH 异步刷盘,性能好宕机会丢数 #- SYNC_FLUSH 同步刷盘,性能较差不会丢数 flushDiskType=SYNC_FLUSH #末尾追加,NameServer节点列表,使用分号分割 namesrvAddr=192.168.31.103:9876 autoCreateTopicEnable=true EOF #防火墙放行 firewall-cmd --zone=public --add-port=10911/tcp --permanent firewall-cmd --reload #启动broker服务 nohup sh ./bin/mqbroker -c ./single-master.properties > broker.log &
|
● 改为异步复制后,发送结果
1 2 3 4
| 消息已发送:MsgId:C0A81F6D264418B4AAC26FA3EF500060,发送状态:SEND_OK 消息已发送:MsgId:C0A81F6D264418B4AAC26FA3EF530061,发送状态:SEND_OK 消息已发送:MsgId:C0A81F6D264418B4AAC26FA3EF550062,发送状态:SEND_OK 消息已发送:MsgId:C0A81F6D264418B4AAC26FA3EF580063,发送状态:SEND_OK
|
● 同步复制

● 异步复制

Java应用接入RocketMQ消费数据
● pom依赖
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| <repositories> <repository> <id>aliyun</id> <name>aliyun</name> <url>https: </repository> </repositories> <pluginRepositories> <pluginRepository> <id>aliyun</id> <name>aliyun</name> <url>https: </pluginRepository> </pluginRepositories> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.9.2</version> </dependency>
|
● 消费者消费消息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("cg1"); try { consumer.setNamesrvAddr("192.168.31.103:9876");
consumer.subscribe("tax-data", "*"); consumer.registerMessageListener(new MessageListenerConcurrently() { public ConsumeConcurrentlyStatus consumeMessage( List<MessageExt> msgs, ConsumeConcurrentlyContext context) { for (MessageExt msg : msgs) { System.out.println("消费者获取数据:" + msg.getMsgId() + "==>" + new String(msg.getBody())); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); } catch (Exception e) { e.printStackTrace(); }
|