RabbtiMQ:发布确认高级(rabbitmq生产者消息确认)

在生产环境中由于一些不明原因,导致 rabbitmq 重启,在 RabbitMQ 重启期间生产者消息投递失败,导致消息丢失,需要手动处理和恢复。那么,如何才能进行 RabbitMQ 的消息可靠投递呢? 特别是在这样比较极端的情况,RabbitMQ 集群不可用的时候,无法投递的消息该如何处理呢

发布确认springboot版

确认机制方案

RabbtiMQ:发布确认高级(rabbitmq生产者消息确认)

代码架构图

RabbtiMQ:发布确认高级(rabbitmq生产者消息确认)

配置文件

在配置文件当中需要添加
spring.rabbitmq.publisher-confirm-type=correlated
⚫ NONE
禁用发布确认模式,是默认值
⚫ CORRELATED
发布消息成功到交换器后会触发回调方法
⚫ SIMPLE

经测试有两种效果,其一效果和 CORRELATED 值一样会触发回调方法,
其二在发布消息成功后使用 rabbitTemplate 调用 waitForConfirms 或 waitForConfirmsOrDie 方法等待 broker 节点返回发送结果,根据返回结果来判定下一步的逻辑,要注意的点是waitForConfirmsOrDie 方法如果返回 false 则会关闭 channel,则接下来无法发送消息到 broker

回退消息

在仅开启了生产者确认机制的情况下,交换机接收到消息后,会直接给消息生产者发送确认消息,如果发现该消息不可路由,那么消息会被直接丢弃,此时生产者是不知道消息被丢弃这个事件的。

通过设置 mandatory 参数可以在当消息传递过程中不可达目的地时将消息返回给生产者。

配置文件中加入: spring.rabbitmq.publisher-returns=true

实现 在下方的   MyCallBack.java

代码

配置类:ConfirmConfig

消息生产者:ProducerController

回调接口:MyCallBack

消息消费者:Consumer

测试

修改上面 生产者中的代码,使其发送两条消息,routingKey为key1(存在),key12(不存在),结果:

RabbtiMQ:发布确认高级(rabbitmq生产者消息确认)

 备份交换机

备份交换机可以理解为 RabbitMQ 中交换机的“备胎”,当我们为某一个交换机声明一个对应的备份交换机时,就是为它创建一个备胎,当交换机接收到一条不可路由消息时,将会把这条消息转发到备份交换机中,由备份交换机来进行转发和处理,通常备份交换机的类型为 Fanout ,这样就能把所有消息都投递到与其绑定的队列中,然后我们在备份交换机下绑定一个队列,这样所有那些原交换机无法被路由的消息,就会都进入这个队列了。当然,我们还可以建立一个报警队列,用独立的消费者来进行监测和报警。

代码结构

RabbtiMQ:发布确认高级(rabbitmq生产者消息确认)

修改配置类

 报警消费者

测试

修改上面 生产者中的代码,使其发送两条消息,routingKey为key1(存在),key12(不存在),结果:

RabbtiMQ:发布确认高级(rabbitmq生产者消息确认)

正文完
 0