跳至主要內容

RabbitMQ消息可靠性保证

xw大约 2 分钟开源框架消息队列RabbitMq

生产者

生产者进行消息确认,可以通过发送方确认机制和事务机制两种方式保证消息的可靠性。事务机制会严重影响RabbitMq的性能,一般都采用发送方ACK确认机制实现

以ACK为例:

  1. 声明回调处理

    @Component
    public class MessageConfirmCallback implements RabbitTemplate.ConfirmCallback {
    
        @Autowired
        private RabbitTemplate rabbitTemplate;
        /**
         * 创建RabbitTemplate对象之后执行当前方法,为模板对象设置回调确认方法
         * 设置消息确认回调方法
         * 设置消息回退回调方法
         */
        @PostConstruct
        public void initRabbitTemplate(){
            //设置消息确认回调方法
            rabbitTemplate.setConfirmCallback(this::confirm);
        }
    
        @Override
        public void confirm(CorrelationData correlationData, boolean ack, String cause) {
            if (ack){
                System.out.println("消息进入交换机成功{}");
            } else {
                System.out.println("消息进入交换机失败{} , 失败原因:" + cause);
            }
        }
    }
    
    
  2. 配置

    # 开启生产者确认模式:(confirm),投递到交换机,不论失败或者成功都回调
    spring.rabbitmq.publisher-confirm-type=correlated
    # 开启return退回模式
    spring.rabbitmq.publisher-returns=true
    
  3. 测试用例

        @Test
        void send1() {
            String context = "hi, fanout msg ";
            System.out.println("Sender : " + context);
            this.rabbitTemplate.convertAndSend("durableExchange2","bbb", context);
        }
    

broker

使用镜像集群或者仲裁队列,可参考RabbitMQ持久化与高可用一文。

消费端

如果RabbitMQ成功的把消息发送给了消费者,那么RabbitMQ的ack机制会自动的返回成功,表明发送消息成功,下次就不会发送这个消息。但如果就在此时,消费者还没处理完该消息发送宕机,这个消息将会丢失,

解决方案:关闭自动ACK机制

## 消费端开启手动ACK
spring.rabbitmq.listener.simple.acknowledge-mode=manual

消费端逻辑:

@Component
@RabbitListener(queues = {"direct.a"})
public class TestA {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @RabbitHandler
    public void receiveMessage(String message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) {
        try {
            System.out.println("接收的mq消息:" + message);
            // long deliveryTag 消息接收tag boolean multiple 是否批量确认
            System.out.println("deliveryTag=" + deliveryTag);
            /**
             * 无异常就确认消息
             * basicAck(long deliveryTag, boolean multiple)
             * deliveryTag:取出来当前消息在队列中的的索引;
             * multiple:为true的话就是批量确认,如果当前deliveryTag为5,那么就会确认
             * deliveryTag为5及其以下的消息;一般设置为false
             */ 
            channel.basicAck(deliveryTag, true);
        } catch (Exception e) {

            /**
             * 有异常就绝收消息
             * basicNack(long deliveryTag, boolean multiple, boolean requeue)
             * requeue:true为将消息重返当前消息队列,还可以重新发送给消费者;
             *         false:将消息丢弃
             */

            // long deliveryTag, boolean multiple, boolean requeue
            try {
                //拒绝消息
                //重新丢入队列
                channel.basicNack(deliveryTag, false, true);
            } catch (Exception e1) {
                e1.printStackTrace();
            }

        }
    }
}