RabbitMQ消息可靠性保证
大约 2 分钟开源框架消息队列RabbitMq
生产者
生产者进行消息确认,可以通过发送方确认机制和事务机制两种方式保证消息的可靠性。事务机制会严重影响RabbitMq的性能,一般都采用发送方ACK确认机制实现。
以ACK为例:
声明回调处理
@Component public class MessageConfirmCallback implements RabbitTemplate.ConfirmCallback { @Autowired private RabbitTemplate rabbitTemplate; /** * 创建RabbitTemplate对象之后执行当前方法,为模板对象设置回调确认方法 * 设置消息确认回调方法 * 设置消息回退回调方法 */ @PostConstruct public void initRabbitTemplate(){ //设置消息确认回调方法 rabbitTemplate.setConfirmCallback(this::confirm); } @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { if (ack){ System.out.println("消息进入交换机成功{}"); } else { System.out.println("消息进入交换机失败{} , 失败原因:" + cause); } } }
配置
# 开启生产者确认模式:(confirm),投递到交换机,不论失败或者成功都回调 spring.rabbitmq.publisher-confirm-type=correlated # 开启return退回模式 spring.rabbitmq.publisher-returns=true
测试用例
@Test void send1() { String context = "hi, fanout msg "; System.out.println("Sender : " + context); this.rabbitTemplate.convertAndSend("durableExchange2","bbb", context); }
broker
使用镜像集群或者仲裁队列,可参考RabbitMQ持久化与高可用一文。
消费端
如果RabbitMQ成功的把消息发送给了消费者,那么RabbitMQ的ack机制会自动的返回成功,表明发送消息成功,下次就不会发送这个消息。但如果就在此时,消费者还没处理完该消息发送宕机,这个消息将会丢失,
解决方案:关闭自动ACK机制
## 消费端开启手动ACK
spring.rabbitmq.listener.simple.acknowledge-mode=manual
消费端逻辑:
@Component
@RabbitListener(queues = {"direct.a"})
public class TestA {
@Autowired
private RabbitTemplate rabbitTemplate;
@RabbitHandler
public void receiveMessage(String message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) {
try {
System.out.println("接收的mq消息:" + message);
// long deliveryTag 消息接收tag boolean multiple 是否批量确认
System.out.println("deliveryTag=" + deliveryTag);
/**
* 无异常就确认消息
* basicAck(long deliveryTag, boolean multiple)
* deliveryTag:取出来当前消息在队列中的的索引;
* multiple:为true的话就是批量确认,如果当前deliveryTag为5,那么就会确认
* deliveryTag为5及其以下的消息;一般设置为false
*/
channel.basicAck(deliveryTag, true);
} catch (Exception e) {
/**
* 有异常就绝收消息
* basicNack(long deliveryTag, boolean multiple, boolean requeue)
* requeue:true为将消息重返当前消息队列,还可以重新发送给消费者;
* false:将消息丢弃
*/
// long deliveryTag, boolean multiple, boolean requeue
try {
//拒绝消息
//重新丢入队列
channel.basicNack(deliveryTag, false, true);
} catch (Exception e1) {
e1.printStackTrace();
}
}
}
}