Using spring-rabbit

Spring AMQP Overview

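Spring AMQP is the Spring project that adds support for the AMQP protocol. It consists of two main modules:

  1. spring-amqp: the base abstractions
  2. spring-rabbit: the RabbitMQ implementation

Spring AMQP provides:

  ● Listener containers for asynchronously processing inbound messages
  ● RabbitTemplate for sending and receiving messages
  ● Support for declaring queues, exchanges, and bindings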

Code Example

● pom.xml

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

● Configuration file

spring:
  rabbitmq:
    host: 192.168.31.230
    port: 5672
    virtual-host: v_myhost
    username: admin
    password: admin
    listener:
      simple:
        acknowledge-mode: manual
  application:
    name: amqp

● RabbitMQ configuration class

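import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMQConfig {
    @Bean
    public Queue myQueue() {
        // Arguments: name, durable, exclusive, autoDelete, arguments
        return new Queue(RabbitMQConstant.QUEUE_NAME, true, false, false, null);
    }

    @Bean
    public FanoutExchange myExchange() {
        // Arguments: name, durable, autoDelete, arguments
        return new FanoutExchange(RabbitMQConstant.EXCHANGE_NAME, true, false, null);
    }

    @Bean
    public Binding myBinding() {
        // Returning FanoutExchange from myExchange() avoids a cast here
        return BindingBuilder.bind(myQueue()).to(myExchange());
    }
}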
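
The configuration class refers to RabbitMQConstant, which is not shown in this article. A minimal sketch of such a constants holder might look like the following; the two string values are placeholders chosen for illustration, not names taken from the original project.

```java
// Hypothetical constants holder assumed by the examples in this article.
// The actual queue and exchange names are not given in the source, so the
// values below are placeholders.
final class RabbitMQConstant {
    static final String QUEUE_NAME = "my.queue";              // placeholder name
    static final String EXCHANGE_NAME = "my.fanout.exchange"; // placeholder name

    // Not meant to be instantiated.
    private RabbitMQConstant() {
    }
}
```

Because both fields are compile-time constants, they can be used in annotation attributes such as @RabbitListener(queues = RabbitMQConstant.QUEUE_NAME). If the classes live in different packages, the class and its fields would need to be declared public.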

● Message producer

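import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
@Slf4j
public class RabbitMQProducer {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void sendMessage(String message) {
        rabbitTemplate.convertAndSend(
                RabbitMQConstant.EXCHANGE_NAME,
                "", // a fanout exchange ignores the routing key
                message
        );
        log.info("Message sent: {}", message);
    }
}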
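
The producer passes an empty routing key because a fanout exchange delivers every message to all bound queues regardless of the key. The toy model below illustrates that behavior in plain Java. It is only a sketch: FanoutModel and its methods are invented for illustration and are not part of Spring AMQP or the RabbitMQ client.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy in-memory model of fanout routing (hypothetical, for illustration only).
final class FanoutModel {
    // Queue name -> messages delivered to that queue.
    private final Map<String, List<String>> queues = new LinkedHashMap<>();

    // Bind a queue to the exchange.
    void bind(String queueName) {
        queues.putIfAbsent(queueName, new ArrayList<>());
    }

    // Publish a message: the routing key is accepted but never consulted,
    // so every bound queue receives a copy.
    void publish(String routingKey, String message) {
        for (List<String> queue : queues.values()) {
            queue.add(message);
        }
    }

    // Messages currently held by the given queue (empty if it was never bound).
    List<String> messagesIn(String queueName) {
        return queues.getOrDefault(queueName, new ArrayList<>());
    }
}
```

In this model, publishing with "" or with any other key gives the same result, which is why the producer above can leave the routing key empty.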

● Message consumer (manual acknowledgment)

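import java.io.IOException;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;

@Service
@Slf4j
public class RabbitMQConsumer {
    @RabbitListener(queues = RabbitMQConstant.QUEUE_NAME)
    public void receiveMessage(String message, Message amqpMessage, Channel channel) throws IOException {
        long deliveryTag = amqpMessage.getMessageProperties().getDeliveryTag();
        try {
            log.info("Received message: {}", message);
            // Business logic goes here...
            channel.basicAck(deliveryTag, false); // multiple = false: ack this delivery only
        } catch (Exception e) {
            log.error("Failed to process message, requeueing: {}", message, e);
            // multiple = false, requeue = true: the broker redelivers the message
            channel.basicNack(deliveryTag, false, true);
        }
    }
}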
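
One caveat with the consumer above: calling basicNack with requeue set to true on every failure means a message that always fails is redelivered indefinitely. A common mitigation is to requeue only on the first failed delivery, based on the AMQP redelivered flag that Spring AMQP exposes on MessageProperties. The helper below is a minimal sketch of that decision; RequeuePolicy is a hypothetical name and is not part of Spring AMQP.

```java
// Hypothetical helper (not a Spring AMQP class): decides whether a failed
// message should be put back on the queue.
final class RequeuePolicy {
    private RequeuePolicy() {
    }

    // Requeue only if this is the first delivery attempt. A message that was
    // already redelivered once is not requeued again, which breaks the
    // otherwise endless redelivery loop.
    static boolean shouldRequeue(boolean redelivered) {
        return !redelivered;
    }
}
```

In the consumer, the result would be passed as the third argument of basicNack in place of the hard-coded true. A message that is rejected without requeue is discarded by the broker, or routed to a dead-letter exchange if one is configured for the queue.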