Spring AMQP概述
Spring AMQP是Spring框架对AMQP协议的支持项目,主要包含两个部分:
- spring-amqp:基础抽象
- spring-rabbit:RabbitMQ实现
Spring AMQP提供了:
● 用于异步处理入站消息的监听器容器
● RabbitTemplate用于发送和接收消息
● 声明队列、交换机和绑定的支持
代码案例
● pom.xml
1 2 3 4
| <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
|
● 配置文件
1 2 3 4 5 6 7 8 9 10 11 12
| spring: rabbitmq: host: 192.168.31.230 port: 5672 virtual-host: v_myhost username: admin password: admin listener: simple: acknowledge-mode: manual application: name: amqp
|
● RabbitMQ配置类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| @Configuration public class RabbitMQConfig { @Bean public Queue myQueue() { return new Queue(RabbitMQConstant.QUEUE_NAME, true, false, false, null); } @Bean public Exchange myExchange() { return new FanoutExchange(RabbitMQConstant.EXCHANGE_NAME, true, false, null); } @Bean public Binding myBinding() { return BindingBuilder.bind(myQueue()).to((FanoutExchange) myExchange()); } }
|
● 消息生产者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| @Service @Slf4j public class RabbitMQProducer { @Autowired private RabbitTemplate rabbitTemplate;
public void sendMessage(String message) { rabbitTemplate.convertAndSend( RabbitMQConstant.EXCHANGE_NAME, "", message ); log.info("消息发送成功: {}", message); } }
|
● 消息消费者(手动确认)
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| @Service @Slf4j public class RabbitMQConsumer { @RabbitListener(queues = RabbitMQConstant.QUEUE_NAME) public void receiveMessage(String message, Message amqpMessage, Channel channel) throws IOException { try { log.info("收到消息: {}", message); channel.basicAck(amqpMessage.getMessageProperties().getDeliveryTag(), false); } catch (Exception e) { channel.basicNack(amqpMessage.getMessageProperties().getDeliveryTag(), false, true); } } }
|