为了保证消息从队列可靠的达到消费者,RabbitMQ 提供了 消息确认机制 (Message Acknowledgement)。消费者在订阅队列时,可以指定 autoAck 参数,当 autoAck 参数等于 false 时,RabbitMQ 会等待消费者显式地回复确认信号后才从内存(或者磁盘)中移除消息(实际上是先打上删除标记,之后在删除)。当 autoAck 参数等于 true 时,RabbitMQ 会自动把发送出去的消息置为确认,然后从内存(或者磁盘)中删除,而不管消费者是否真正地消费到了这些消息。
采用消息确认机制,只要将自动ack设置为false,消费者有足够的时间处理消息数据,消费者可以在处理完后续的业务逻辑后再进行提交ack,确保消息确实是被消费了,防止服务宕机可能导致的消息丢失。而MQ会一直等待消费者手动提交ack!!
在rabbitmq管理页面上可以详细看到队列中的消息情况:
自动ACK: 消费者 配置中如果是自动ack机制, MQ将消息发送给消费者后直接就将消息给删除了 ,这个的前提条件是消费者程序没有出现异常,如果消费者接收消息后处理时出现异常,那么MQ将会尝试重发消息给消费者直至达到了 消费者服务 中配置的最大重试次数后将会直接抛出异常不再重试。
手动ACK: 消费者 设置了手动ACK机制后,可以显式的提交/拒绝消息(这一步骤叫做发送ACK),如果消息被消费后正常被提交了ack,那么此消息可以说是流程走完了,然后MQ将此消息从队列中删除。而如果消息被消费后被拒绝了,消费者可选择让MQ重发此消息或者让MQ直接移除此消息。后面可以使用死信队列来进行接收这些被消费者拒绝的消息,再进行后续的业务处理。
RabbitMQ 消息确认机制分为两大类:发送方确认、接收方确认。
其中发送方确认又分为:生产者到交换机到确认、交换机到队列的确认。(借用下大佬的图)
使用
application.yml
spring: rabbitmq: host: 127.0.0.1 port: 5672 username: admin password: admin # 消息确认(ACK) publisher-confirm-type: correlated #确认消息已发送到交换机(Exchange) publisher-returns: true #确认消息已发送到队列(Queue)理解:springboot中需要给RabbitTemplate设置一些方法的回调即可。
通常情况下我们可以直接在配置类中设置好这些东西,但是可能由于某些业务需求,并不是所有的消息都使用常用的方式,也可以将我们的消息发送服务实现接口然后重写这些回调。
配置类方式(全局方式):
@Beanpublic RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) { RabbitTemplate rabbitTemplate = new RabbitTemplate(); rabbitTemplate.setConnectionFactory(connectionFactory); //设置开启Mandatory,才能触发回调函数,无论消息推送结果怎么样都强制调用回调函数 rabbitTemplate.setMandatory(true); //确认消息送到交换机(Exchange)回调 rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> { System.out.println("
确认消息送到交换机(Exchange)结果:"); System.out.println("相关数据:" + correlationData); System.out.println("是否成功:" + ack); System.out.println("错误原因:" + cause); }); //确认消息送到队列(Queue)回调 rabbitTemplate.setReturnsCallback(returnedMessage -> { System.out.println("
确认消息送到队列(Queue)结果:"); System.out.println("发生消息:" + returnedMessage.getMessage()); System.out.println("回应码:" + returnedMessage.getReplyCode()); System.out.println("回应信息:" + returnedMessage.getReplyText()); System.out.println("交换机:" + returnedMessage.getExchange()); System.out.println("路由键:" + returnedMessage.getRoutingKey()); }); return rabbitTemplate;}以接口的形式访问发送一下。注意:确认消息送到队列(Queue)回调,只有在出现错误时才回调。
将发送的服务类实现接口,实现回调
@Servicepublic class SendMessageService implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnsCallback { private static Logger logger = LoggerFactory.getLogger(SendMessageService.class); @Autowired public RabbitTemplate rabbitTemplate; public void sendMessage(String str){ rabbitTemplate.setMandatory(true); rabbitTemplate.setReturnsCallback(this); rabbitTemplate.setConfirmCallback(this); // CorrelationData构造函数中的id可以随便写,但是必须要非null而且是唯一的 rabbitTemplate.convertAndSend("exchange","routingKey", str,new CorrelationData(UUID.randomUUID().toString())); } @Override public void returnedMessage(ReturnedMessage returnedMessage) { System.out.println("sender return success" + returnedMessage.toString()); } @Override public void confirm(CorrelationData correlationData, boolean b, String s) { logger.info("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!"); if (!b) { logger.error("消息发送异常!"); // 进行处理 } else { logger.info("发送者已经收到确认,correlationData={} ,ack={}, cause={}", correlationData.getId(), b, s); } }}接口方式访问下。没问题
需要注意的是:配置类方式和局部方式只能选择其一,如果一个RabbitTemplate设置了两个或者多个ConfirmCallback/ReturnCallback,会报错的不支持。类似这样的报错:Only one ConfirmCallback/ReturnCallback is supported by each RabbitTemplate。在开发过程中需要注意!!
消费者确认发生在监听队列的消费者处理业务失败,如:发生了异常,不符合要求的数据等,这些场景我们就需要手动处理,比如重新发送或者丢弃。
RabbitMQ 消息确认机制(ACK)默认是自动确认的,自动确认会在消息发送给消费者后立即确认,但存在丢失消息的可能,如果消费端消费逻辑抛出异常,假如你用回滚了也只是保证了数据的一致性,但是消息还是丢了,也就是消费端没有处理成功这条消息,那么就相当于丢失了消息。
消费者收到消息后,手动调用 Basic.Ack 或 Basic.Nack 或 Basic.Reject 后,RabbitMQ 收到这些消息后,才认为本次投递完成。
配置,注意是simple模式的ack还是direct模式,或者两个都设置上
server: port: 9000spring: rabbitmq: username: admin password: admin virtual-host: / listener: simple: acknowledge-mode: manual direct: acknowledge-mode: manual # 集群配置,集群配置时使用 rabbitmq.addresses即可,不用配置rabbitmq.port rabbitmq.host了 addresses: 192.168.0.101:5672,192.168.0.101:5673,192.168.0.101:5673消费者接收数据
其中在调用basiAck或basicNack时必须要携带一个tag,它代表了 RabbitMQ 向该 Channel 投递的这条消息的唯一标识 ID,是一个单调递增的正整数,delivery tag 的范围仅限于 Channel。而在接收者方法上使用 @Header(AmqpHeaders.DELIVERY_TAG) 可以直接获取到这个tag。
@Componentpublic class MQConsumer { @Autowired private DispatcherService dispatcherService; @RabbitListener(queues = "order.queue") public void messageConsumer(String orderMsg, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception { try { System.out.println("消息:" + orderMsg); JSONObject order = JSONObject.parseObject(orderMsg); String orderId = order.getString("orderId"); // 派单处理 dispatcherService.dispatch(orderId); System.out.println(1 / 0); // 出现异常 // 手动确认 channel.basicAck(tag, false); } catch (Exception e) { // 如果出现异常的情况下 根据实际情况重发 // 重发一次后,丢失 // 参数1:消息的tag // 参数2:多条处理 // 参数3:重发 // false 不会重发,会把消息打入到死信队列 // true 重发,建议不使用try/catch 否则会死循环 // 手动拒绝消息 channel.basicNack(tag, false, false); } } }消息的接收者也可使用普通类实现 ChannelAwareMessageListener 接口,重写方法完成,这种是直接全局性接收的。没有最好的,只有最合适的,根据项目情况选择全局接收还是单个类接收自己监听的。
/** * 接收者 * **/@Componentpublic class Consumer implements ChannelAwareMessageListener{ @Override public void onMessage(Message message, Channel channel) throws Exception { long deliveryTag = message.getMessageProperties().getDeliveryTag(); try { if ("queue_name".equals(message.getMessageProperties().getConsumerQueue())) { System.out.println("消费的消息来自的队列名为:"+message.getMessageProperties().getConsumerQueue()); System.out.println("接收消息: " + new String(message.getBody(), "UTF-8")); System.out.println("执行queue_name中的消息的业务处理流程......"); } if ("fanout.A".equals(message.getMessageProperties().getConsumerQueue())) { System.out.println("消费的消息来自的队列名为:" + message.getMessageProperties().getConsumerQueue()); System.out.println("接收消息: " + new String(message.getBody(), "UTF-8")); System.out.println("执行fanout.A中的消息的业务处理流程......"); } // 手动提交ack,并且批量确认消息 channel.basicAck(deliveryTag, true); } catch (Exception e) { e.printStackTrace(); /** * 拒绝消息,参数说明: * long deliveryTag:唯一标识 ID。 * boolean requeue:如果 requeue 参数设置为 true, * 则 RabbitMQ 会重新将这条消息存入队列,以便发送给下一个订阅的消费者; * 如果 requeue 参数设置为 false,则 RabbitMQ 立即会还把消息从队列中移除, * 而不会把它发送给新的消费者。 */ channel.basicReject(deliveryTag, true); } }} | 留言与评论(共有 0 条评论) “” |