在生产环境中由于一些不明原因,导致 rabbitmq 重启,在 RabbitMQ 重启期间生产者消息投递失败,导致消息丢失,需要手动处理和恢复。那么,如何才能进行 RabbitMQ 的消息可靠投递呢? 特别是在这样比较极端的情况,RabbitMQ 集群不可用的时候,无法投递的消息该如何处理呢
发布确认springboot版
确认机制方案

代码架构图

配置文件
在配置文件当中需要添加
spring.rabbitmq.publisher-confirm-type=correlated
⚫ NONE
禁用发布确认模式,是默认值
⚫ CORRELATED
发布消息成功到交换器后会触发回调方法
⚫ SIMPLE
经测试有两种效果,其一效果和 CORRELATED 值一样会触发回调方法,
其二在发布消息成功后使用 rabbitTemplate 调用 waitForConfirms 或 waitForConfirmsOrDie 方法等待 broker 节点返回发送结果,根据返回结果来判定下一步的逻辑,要注意的点是waitForConfirmsOrDie 方法如果返回 false 则会关闭 channel,则接下来无法发送消息到 broker
回退消息
在仅开启了生产者确认机制的情况下,交换机接收到消息后,会直接给消息生产者发送确认消息,如果发现该消息不可路由,那么消息会被直接丢弃,此时生产者是不知道消息被丢弃这个事件的。
通过设置 mandatory 参数可以在当消息传递过程中不可达目的地时将消息返回给生产者。
配置文件中加入: spring.rabbitmq.publisher-returns=true
实现 在下方的 MyCallBack.java
代码
配置类:ConfirmConfig
消息生产者:ProducerController
回调接口:MyCallBack
消息消费者:Consumer
测试
修改上面 生产者中的代码,使其发送两条消息,routingKey为key1(存在),key12(不存在),结果:

备份交换机
备份交换机可以理解为 RabbitMQ 中交换机的“备胎”,当我们为某一个交换机声明一个对应的备份交换机时,就是为它创建一个备胎,当交换机接收到一条不可路由消息时,将会把这条消息转发到备份交换机中,由备份交换机来进行转发和处理,通常备份交换机的类型为 Fanout ,这样就能把所有消息都投递到与其绑定的队列中,然后我们在备份交换机下绑定一个队列,这样所有那些原交换机无法被路由的消息,就会都进入这个队列了。当然,我们还可以建立一个报警队列,用独立的消费者来进行监测和报警。
代码结构

修改配置类
报警消费者
测试
修改上面 生产者中的代码,使其发送两条消息,routingKey为key1(存在),key12(不存在),结果:
