RocketMQ生产者投递消息同步复制和异步复制

Java应用接入RocketMQ生产消息

● pom依赖

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
<repositories> 
    <repository> 
        <id>aliyun</id> 
        <name>aliyun</name> 
        <url>https://maven.aliyun.com/repository/public</url> 
    </repository> 
</repositories> 
<pluginRepositories> 
    <pluginRepository> 
        <id>aliyun</id> 
        <name>aliyun</name> 
        <url>https://maven.aliyun.com/repository/public</url> 
    </pluginRepository> 
</pluginRepositories> 
<dependency> 
    <groupId>org.apache.rocketmq</groupId> 
    <artifactId>rocketmq-client</artifactId> 
    <version>4.9.2</version> 
</dependency> 

● 生产者投递消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException 
    //创建DefaultMQProducer消息生产者对象 
    DefaultMQProducer producer = new DefaultMQProducer("pg1"); 
    //设置NameServer 
    producer.setNamesrvAddr("192.168.31.103:9876"); 
    //设置NameServer节点地址,多个节点间用分号分割 
    //producer.setNamesrvAddr("192.168.31.103:9876");
    try
        //与NameServer建立长连接 
        producer.start(); 
        //发送一百条数据 
        for(int i = 0 ; i< 100 ; i++) { 
            //数据正文 
            String data = "{\"title\":\"X市2021年度第四季度税务汇总数据\"}"
            /*创建消息 
              Message消息三个参数 
              topic:代表消息主题,自定义为tax-data-topic说明是税务数据tags代表标志,用于消费者接收数据时进行数据筛选。
2021S1:代表2021年第一季度数据
body:代表消息内容 
            */ 
            Message message = new Message("TAX""2021S4", data.getBytes()); 
            //发送消息,获取发送结果 
            SendResult result = null
            result = producer.send(message); 
            //将发送结果对象打印在控制台 
            System.out.println("消息已发送:MsgId:" + result.getMsgId() + ",发送状态:" + result.getSendStatus()); 
        } 
    }  finally { 
        try { 
            producer.shutdown(); 
        }catch (Exception e){ 
        } 
    } 

● 执行结果,同步复制下,数据持久化到Master主队列,由于没有从节点复制失败

1
2
3
消息已发送:MsgId:C0A81F6D310418B4AAC26F9FD63F0061,发送状态:SLAVE_NOT_AVAILABLE 
消息已发送:MsgId:C0A81F6D310418B4AAC26F9FD6430062,发送状态:SLAVE_NOT_AVAILABLE 
消息已发送:MsgId:C0A81F6D310418B4AAC26F9FD6460063,发送状态:SLAVE_NOT_AVAILABLE 

同步复制与异步复制的区别

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
#关闭Broker 
sh /usr/local/rocketmq-4.9.2/bin/mqshutdown broker 
#重新生成配置文件,改为异步复制 
cat > single-master.properties <<-'EOF' 
#集群名称,同一个集群下的broker要求统一 
brokerClusterName=DefaultCluster 
#broker名称 
brokerName=broker-a 
#brokerId=0代表主节点,大于零代表从节点 
brokerId=0 
#删除日志文件时间点,默认凌晨 4 点 
deleteWhen=04 
#日志文件保留时间,默认 48 小时 fileReservedTime=48 
#Broker 复制策略 
#- ASYNC_MASTER 异步复制Master 
#- SYNC_MASTER 同步双写Master 
brokerRole=ASYNC_MASTER 
#刷盘方式 
#- ASYNC_FLUSH 异步刷盘,性能好宕机会丢数 
#- SYNC_FLUSH 同步刷盘,性能较差不会丢数 
flushDiskType=SYNC_FLUSH 
#末尾追加,NameServer节点列表,使用分号分割 
namesrvAddr=192.168.31.103:9876 
autoCreateTopicEnable=true 
EOF 
#防火墙放行 
firewall-cmd --zone=public --add-port=10911/tcp --permanent 
firewall-cmd  --reload 
#启动broker服务 
nohup sh ./bin/mqbroker -c ./single-master.properties > broker.log & 

● 改为异步复制后,发送结果

1
2
3
4
消息已发送:MsgId:C0A81F6D264418B4AAC26FA3EF500060,发送状态:SEND_OK 
消息已发送:MsgId:C0A81F6D264418B4AAC26FA3EF530061,发送状态:SEND_OK 
消息已发送:MsgId:C0A81F6D264418B4AAC26FA3EF550062,发送状态:SEND_OK 
消息已发送:MsgId:C0A81F6D264418B4AAC26FA3EF580063,发送状态:SEND_OK

● 同步复制

image-20220114-1

● 异步复制

image-20220114-2

Java应用接入RocketMQ消费数据

● pom依赖

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
<repositories> 
    <repository> 
        <id>aliyun</id> 
        <name>aliyun</name> 
        <url>https://maven.aliyun.com/repository/public</url> 
    </repository> 
</repositories> 
<pluginRepositories> 
    <pluginRepository> 
        <id>aliyun</id> 
        <name>aliyun</name> 
        <url>https://maven.aliyun.com/repository/public</url> 
    </pluginRepository> 
</pluginRepositories>
<dependency> 
    <groupId>org.apache.rocketmq</groupId> 
    <artifactId>rocketmq-client</artifactId> 
    <version>4.9.2</version> 
</dependency> 

● 消费者消费消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
//创建消费者对象 
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("cg1"); 
try { 
    //设置NameServer节点 
    consumer.setNamesrvAddr("192.168.31.103:9876"); 
    /*订阅主题, 
    consumer.subscribe包含两个参数: 
    topic: 说明消费者从Broker订阅哪一个主题,这一项要与Provider保持一致。 
    subExpression: 子表达式用于筛选tags。 
        同一个主题下可以包含很多不同的tags,subExpression用于筛选符合条件的tags进行接收。
        例如:设置为*,则代表接收所有tags数据。 
        例如:设置为2020S4,则Broker中只有tags=2020S4的消息会被接收,而2020S2就会被排除在外。
    */ 
    consumer.subscribe("tax-data""*"); 
    //创建监听,当有新的消息监听程序会及时捕捉并加以处理。     
consumer.registerMessageListener(new MessageListenerConcurrently() { 
        public ConsumeConcurrentlyStatus consumeMessage
             List<MessageExt> msgs, ConsumeConcurrentlyContext context) 
             //批量数据处理 
             for (MessageExt msg : msgs) { 
                 System.out.println("消费者获取数据:" + msg.getMsgId() + "==>" + new String(msg.getBody())); 
                } 
                //返回数据已接收标识 
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; 
            } 
        }); 
    //启动消费者,与Broker建立长连接,开始监听。 
    consumer.start(); 
catch (Exception e) { 
    e.printStackTrace();