RocketMQ五种消息类型

RocketMQ支持五种消息类型

● 普通消息:普通消息也称为并发消息,和传统的队列相比,并发消息没有顺序,但是生产消费都是并行进行的,单机性能可达十万级别的TPS。
● 分区有序消息:与Kafka中的分区类似,把一个Topic消息分为多个分区“保存”和消费,在一个分区内的消息就是传统的队列,遵循FIFO(先进先出)原则。
● 全局有序消息:如果把一个Topic的分区数设置为1,那么该Topic中的消息就是单分区,所有消息都遵循FIFO(先进先出)的原则。
● 延迟消息:消息发送后,消费者要在一定时间后,或者指定某个时间点才可以消费。在没有延迟消息时,基本的做法是基于定时计划任务调度,定时发送消息。在 RocketMQ中只需要在发送消息时设置延迟级别即可实现。
● 事务消息:主要涉及分布式事务,即需要保证在多个操作同时成功或者同时失败时,消费者才能消费消息。RocketMQ通过发送Half消息、处理本地事务、提交(Commit)消息或者回滚(Rollback)消息优雅地实现分布式事务。

image-20220116-1

Broker、分区、队列的关系

image-20220116-2

RocketMQ发送之普通消息

image-20220116-3

● NameServer:192.168.31.103
● Master:192.168.31.105
● Slave:192.168.31.111

执行流程

1. Master与Slave启动向NameServer注册
2. 生产者Producer发送数据前从NameServer获取Master的IP、端口等通信参数
3. 生产者Producer向Master发送消息
4. Master向Slave进行消息同步

代码案例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
//普通消息类型 
@Slf4j 
public class MessageType1 
    public static void main(String[] args) 
        //DefaultMQProducer用于发送非事务消息 
        DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName"); 
        //注册NameServer地址 
        producer.setNamesrvAddr("192.168.31.103:9876"); 
        //异步发送失败后Producer自动重试2次 
        producer.setRetryTimesWhenSendAsyncFailed(2); 
        try { 
            //启动生产者实例 
            producer.start(); 
            //消息数据 
            String data = "{\"title\":\"汇总数据\"}"
            //消息主题             
Message message = new Message("tax-data""2021S4", data.getBytes()); 
            //发送结果 
            SendResult result = producer.send(message); 
            log.info("Broker响应:" + result); 
        } catch (Exception e) { 
            e.printStackTrace(); 
        } finally { 
            try { 
                //关闭连接 
                producer.shutdown(); 
                log.info("连接已关闭"); 
            }catch (Exception e){ 
                e.printStackTrace(); 
            } 
        } 
    } 

运行结果

1
2
3
4
5
6
7
8
23:00:48.741 [main] INFO com.lixiang.rocketmq.mtype.MessageType1 - Broker响应:
SendResult [ 
    sendStatus=SEND_OK,  
    msgId=7F000001347018B4AAC20A1687DC0000,  
    offsetMsgId=C0A81F6900002A9F00000000000A21CB,  
    messageQueue=MessageQueue [topic=tax-data, brokerName=broker-a, queueId=3],  
    queueOffset=2 

● sendStatus:发送状态,SEND_OK代表成功
● msgId:消息由RocketMQ分配的全局唯一Id,由producer客户端生成,调用方法MessageClientIDSetter.createUniqID()生成全局唯一的Id
● offsetMsgId:Broker服务端将消息追加到内存后会返回其物理偏移量,即在commitlog文件中的偏移量,然后会生成一个Id
● messageQueue:消息队列内容
● topic:主题名称
● brokerName:broker服务器名字,在RocketMQ xxx.propertites配置文件中brokerName项定义
● queueId:queueId队列Id,默认会初始化4个(0-3)
● queueOffset:queueId对应队列逻辑上的位置(偏移量)