0%

RabbitMQ进阶知识

1. 消息可靠性

可靠性是评估消息队列优劣的一个重要标准之一,在一些核心业务,尤其是一些涉及到 money 方面的一些业务中,可靠性至关重要!消息队列在传递消息的过程中要保证消息的不丢失。在消息的传递过程中,涉及到生产者、RabbitMQ和消费者,那么消息可能在哪些情况下丢失呢?主要有以下三个方面:

  • 消息在生产者到 RabbitMQ 的传递之间丢失。
  • RabbitMQ 宕机,导致消息丢失。
  • 消费者发生异常,导致消息丢失。
image-20220319100542408

相对应的解决方案如下:

  • 生产者丢失消息
    • 开启 RabbitMQ 事务(同步)
    • 开启 confirm 机制(异步,推荐)
  • RabbitMQ 丢失消息:开启 RabbitMQ 持久化
  • 消费者丢失消息:采用 ack 机制。

1.1 生产者丢失消息

在生产者将消息投递给消息队列时,可能出现以下问题:

  1. 外界环境问题导致:发生网络丢包、网络故障等造成消息丢失。
  2. 代码层面,配置层面,考虑不全导致消息丢失,比如发送给不存在的交换器、发送给路由不到的队列。

一般来说,可采用以下两个方案解决消息丢失问题:

事务机制

  • RabbitMQ 提供了事务功能,生产者发送数据之前开启 RabbitMQ 事务 channel.txSelect() ,然后发送消息,如果消息没有成功被 RabbitMQ 接收到,中间出现了某些问题,那么生产者会收到异常报错,此时就可以回滚事务 channel.txRollback() ,重新发送消息;如果 RabbitMQ 收到了消息,那么可以提交事务 channel.txCommit()
  • 优点在于事务操作可以保证消息一定能够发送到RabbitMQ中,发送端不会出现消息丢失的情况;
  • 缺点在于事务机制是阻塞(同步)的,每次发送消息必须要等到mq回应之后才能继续发送消息,生产者生产消息的吞吐量和性能都会大大降低。

confirm 机制(推荐)

  • 由于事务操作会大大降低生产者的性能,RabbitMQ 提供了一种confirm机制来避免消息发送给 MQ 的过程中出现丢失情况,并且这种机制是『异步的』,在发送完一个消息后可以继续发送下一个消息,MQ 接收到消息后会异步回调接口告知消息接收结果,克服了事务『同步』所固有的缺点。

  • 具体而言,生产者开启 confirm 机制后,为每个消息指定一个唯一的id,如果消息成功发送到了 MQ 中,那么 MQ 就会返回 一个 ack 消息,表示消息接收成功,反之会返回一个 nack 消息,表示消息接收失败,可以进行重试。依据这个机制,我们可以维护每个消息 id 的状态,如果超过一定时间还是没有接收到 MQ 的回调,那么就重发消息。

  • 在 SpringBoot 配置文件中,配置以下内容

    1
    2
    3
    4
    5
    6
    spring:
    rabbitmq:
    publisher-confirm-type: correlated # 开启消息到达exchange的回调,发送成功失败都会触发回调
    publisher-returns: true # 开启消息从exhcange路由到queue的回调,只有路由失败时才会触发回调
    template:
    mandatory: true # 如果exchange根据routingKey将消息路由到queue时找不到匹配的queue,触发return回调,为false时,exchange直接丢弃消息。
    • publisher-confirm-type 可以设置为三种类型:simple、correlated,、none;

      • NONE 值是禁用发布确认模式,默认值。
      • CORRELATED 值是发布消息成功/失败到交换器后会触发回调方法 ConfirmCallback
      • SIMPLE 值是同步等待 confirm 结果,直到超时。

      我们可以在 RabbitTemplate 配置 ConfirmCallBack 回调函数,用于接收 MQ 返回的回调信息。可以结合这个机制自己在内存里维护每个消息 id 的状态,如果超过一定时间还没接收到这个消息的回调,那么可以重发消息。

    • publish-returns:开启消息从交换机到队列的回调,只有路由失败时才会触发回调。我们可以在 RabbitTemplate 配置 ReturnCallback 回调函数。

    • template.mandatory:定义消息路由失败时的策略。true,则调用ReturnCallback;false:则直接丢弃消息

    • 总结:

      • 消息正确到达交换机,触发ConfirmCallback 回调,返回 ack,消息没有正确到达交换机,触发ConfirmReturnCallback 回调,返回 nack,并带有异常信息(比如交换机不存在);
      • 当消息正确发送交换机后,如果消息正确的从交换机路由到队列,不触发 ReturnCallback 回调,而消息没有正确的从交换机路由到队列,在设置 mandory=true 的情况下,触发 ReturnCallback 回调(比如队列不存在);
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    @Component
    @Slf4j
    public class Producer {
    @Value("${exchange-name}")
    private String exchangeName;

    @Resource
    private RabbitTemplate rabbitTemplate;

    public void send(){
    // 1. 配置ConfirmCallBack 回调函数
    rabbitTemplate.setConfirmCallback((correlationData, ack, cause)->{
    if(ack){
    log.info("消息{}接收成功",correlationData.getId());
    }else{
    log.error("消息{}接收失败,原因{}",correlationData.getId(),cause);
    }
    });
    // 2.配置ReturnCallback 回调函数
    rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey)->{
    log.error("消息{}发送失败,应答码{},原因{},交换机{},路由键{}",message.toString(),replyCode,replyText,exchange,routingKey);
    });

    // 3.发送消息,注意每个消息要指定一个唯一的id
    for (int i = 0; i < 10; i++) {
    CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
    rabbitTemplate.convertAndSend(exchangeName,"","消息==>"+i,correlationData);
    }
    }
    }

注意:事务机制和确认机制二者不能共存!

1.2 RabbitMQ 丢失消息

RabbitMQ 的消息默认存放在内存上面,如果不特别声明设置,消息不会持久化保存到硬盘上面的。如果消息已经由生产者传递到了 RabbitMQ 的队列中,而消费者还没来得及消费时,RabbitMQ 意外宕机,此时消息就会丢失。

解决方法就是开启 RabbitMQ 持久化功能。消息到达队列后,将其持久化到磁盘中,那么即使 RabbitMQ 意外宕机了,也能在重启后自动从磁盘中读取数据,恢复原始消息。

持久化分为以下三个步骤:

  1. 交换机设置持久化。

    SpringAMQP 中可以通过以下代码指定交换机持久化

    1
    2
    3
    4
    5
    @Bean
    public DirectExchange simpleExchange(){
    // 三个参数:交换机名称、是否持久化、当没有queue与其绑定时是否自动删除
    return new DirectExchange("simple.direct", true, false);
    }

    事实上,默认情况下,由 SpringAMQP 声明的交换机都是持久化的,不用特意指定。

  2. 队列设置持久化。

    SpringAMQP 中可以通过代码指定交换机持久化:

    1
    2
    3
    4
    5
    @Bean
    public Queue simpleQueue(){
    // 使用QueueBuilder构建队列,durable就是持久化的
    return QueueBuilder.durable("simple.queue").build();
    }

    事实上,默认情况下,由 SpringAMQP 声明的队列都是持久化的,不用特意指定。

  3. 消息设置持久化。

    利用 SpringAMQP 发送消息时,可以设置消息的属性(MessageProperties),指定 delivery-mode:

    • 1 代表非持久化(NON_PERSISTENT)
    • 2 代表持久化(PERSISTENT)
    1
    2
    3
    4
    5
    6
    7
    8
    9
    @Test
    public void testDurableMessage() {
    // 1.准备消息
    Message message = MessageBuilder.withBody("hello, spring".getBytes(StandardCharsets.UTF_8))
    .setDeliveryMode(MessageDeliveryMode.PERSISTENT)
    .build();
    // 2.发送消息
    rabbitTemplate.convertAndSend("simple.queue", message);
    }

    事实上,默认情况下,由 SpringAMQP 发出的消息都是持久化的,不用特意指定。

注意:即使 RabbitMQ 开启了持久化机制,也存在丢失数据的一种可能,即消息写到了 RabbitMQ 中,但是还没来得及持久化到磁盘上,此时 RabbitMQ 挂了,就会导致内存里的一点点数据丢失。解决方案是持久化可以跟生产者那边的 confirm 机制配合起来,只有消息被持久化到磁盘之后,才会通知生产者 ack 了,所以哪怕是在持久化到磁盘之前,RabbitMQ 挂了,数据丢了,生产者收不到 ack ,那么生产者也可以重发消息。

1.3 消费者丢失消息

如果消费者收到消息后没来得及处理或者在处理过程中出现了异常(比如重启或断电等),此时消费者进程挂掉了,而 RabbitMQ 不知道消费者挂掉了,以为消息已经成功被消费了,就会从队列中删除消息,从而导致消息丢失。

解决方法是采用 RabbitMQ 提供的 ack 确认机制。当消费者获取消息后,需要向 RabbitMQ 发送 ack 回执,表明自己已经处理消息。没有收到 ACK 的消息,消费者断开连接后,RabbitMQ 会把这条消息发送给其他消费者。如果没有其他消费者,消费者重启后会重新消费这条消息,重复执行业务逻辑。其中 ack 在 AMQP 中有三种确认模式:

1
2
3
4
5
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: auto # none,关闭ack;manual,手动ack;auto:自动ack
  • manual:手动 ack,需要在业务代码结束后,调用 api (basicAck/basicNack )发送 ack/nack

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    @Component
    @Slf4j
    public class MyConsumer {

    @RabbitListener(queues = {"${queue-name}"}")
    public void msgConsumer(String msg, Channel channel, Message message) throws IOException {
    try {
    int temp = 10/0;
    log.info("消息{}消费成功",msg);
    channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    } catch (Exception e) {
    log.error("接收消息过程中出现异常,执行nack");
    // 第三个参数为true表示异常消息重新返回队列,会导致一直在刷新消息,且返回的消息处于队列头部,影响后续消息的处理
    channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
    }
    }
    }
  • auto:自动 ack(默认),由 spring 监测 listener 代码是否出现异常,没有异常则返回 ack,反之返回 nack。

    注意:如果消费者执行异常的话,就相当于执行了 nack 方法,消息会requeue(重新入队)到队列头部,然后再次发送给消费者,但是可能消费者继续出现异常,周而复始,消息会被无限期的执行,从而导致后续的消息无法消费。发生这种原因所在便是因为 RabbitMQ的消息失败重试机制。但是在很多情况下,我们并不想无限重试,而是重试到一定阈值后,就认为此消息无法被正确处理,就放弃处理或专用人工处理等。

    为了解决这一问题,我们可以在配置文件中对 RabbitMQ 的消息重试(retry)进行重新配置。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    spring:
    rabbitmq:
    listener:
    simple:
    prefetch: 1
    acknowledge-mode: auto # 消费者自动ack消息确认
    retry:
    enabled: true # 开启消费者失败重试
    initial-interval: 1000 # 初始失败重试间隔
    multiplier: 3 # 失败的等待时长配置,下次重试间隔= multiplier * initial-interval
    max-attempts: 4 # 最大重试次数
    stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false

    在开启重试模式后,重试次数耗尽,如果消息依然失败,则需要有MessageRecoverer接口来处理,它包含三种不同的实现:

    • RejectAndDontRequeueRecoverer:(默认)重试耗尽后,直接 reject,丢弃消息。
    • ImmediateRequeueMessageRecoverer:重试耗尽后,返回 nack,消息重新入队
    • RepublishMessageRecoverer:(推荐)重试耗尽后,将失败消息投递到指定的交换机

    可以看出前两种实现都不太好,推荐第三种方式,当重试几次后,仍然得不到好的处理,就将无法被正确处理的消息投递到指定的交换机中,然后在存储到专门用于存储异常消息的队列中,后续可采取人工方式进行集中处理!

    image-20220319155905258
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    @Configuration
    public class ErrorMessageConfig {

    // 处理异常消息的交换机
    @Bean
    public DirectExchange errorMessageExchange(){
    return new DirectExchange("error.direct");
    }

    // 处理异常消息的队列
    @Bean
    public Queue errorQueue(){
    return new Queue("error.queue");
    }

    // 绑定异常交换机和队列
    @Bean
    public Binding errorMessageBinding(){
    return BindingBuilder.bind(errorQueue()).to(errorMessageExchange()).with("error");
    }

    // 配置失败消息投递到指定的交换机和Routing key
    @Bean
    public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
    return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
    }
    }
  • none:关闭 ack,MQ 在消息投递后会立即删除消息。

2. 消息有效期

默认情况下,RabbitMQ中的消息是不会过期的,即使消息没被消费掉,也会一直存储在队列中。

2.1 TTL

如果我们想给消息指定一个有效时间,那么通过指定TTL(Time-To-Live,即存活时间)来实现,当消息在队列中的存活时间超过指定的 TTL 时(注意是从入队时开始算),这个消息就会被清除。

RabbitMQ 支持以下两种方式设置 TTL:

  • 在声明队列时,指定此队列中消息的有效期,那么所有进入该队列的消息都会有一个相同的有效期。

    当消息过期了就会被立马删除,因为消息进入 RabbitMQ 后是存在一个消息队列中,队列的头部是最早要过期的消息,所以 RabbitMQ 只需要一个定时任务,从头部开始扫描是否有过期消息,有的话就直接删除。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    // 指定队列中消息的有效期,下面两种方式均可以
    @Bean
    public Queue ttlQueue(){
    return QueueBuilder
    .durable("ttl.queue")
    .ttl(10000) // 指定队列中消息的过期时间为10s
    .build();
    }

    @Bean
    Queue queue() {
    Map<String, Object> args = new HashMap<>();
    args.put("x-message-ttl", 10000);
    return new Queue("ttl.queue", true, false, false, args);
    }
  • 在发送消息时,指定此消息的有效期,那么不同的消息就具有不同的有效期。

    当消息过期后并不会立马被删除,而是当消息要投递给消费者的时候才会去删除,因为每条消息的过期时间都不一样,想要知道哪条消息过期,必须要遍历队列中的所有消息才能实现,当消息比较多时这样就比较耗费性能,因此针对这种情况,消息要投递给消费者的时候才去删除。

    1
    2
    3
    4
    // 指定消息的有效期
    Message message = MessageBuilder.withBody("hello TTL message".getBytes())
    .setExpiration("10000") // 设置消息的过期时间为10s
    .build();

如果两种方式 TTL 同时指定,会以时间短的为准。

2.2 死信交换机

当一个消息过期后,它实际上会成为一个死信(dead-lettered),不仅如此,以下几种情况均会使得消息成为死信:

  • 消费者拒收消息(basic.reject/basic.nack)并且没有重新入队 requeue=false。

  • 消息 TTL 过期

  • 队列达到最大长度,已经堆积满了,最早的消息被丢弃

当一个消息成为死信时:

  1. 如果当前队列配置了dead-letter-exchange 属性,指定了一个死信交换机(Dead-Letter-Exchange,DLX),那么它就会被投递到这个交换机中,之后发送到绑定死信交换机的死信队列中。

    可以利用死信交换机收集所有消费者处理失败的消息(死信),交由人工处理,进一步提高消息队列的可靠性(注意和消费者消息重试的 RepublishMessageRecoverer 区分)。

    image-20220319171652756
  2. 如果当前队列没有指定交换机,这个消息就会被丢弃。

实际上,DLX 本身就是一个普通交换机,我们可以为任意队列指定 DLX,当该队列中存在死信时,RabbitMQ 就会自动的将这个死信投递到 DLX 上去,进而被路由到死信队列中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
// 配置死信交换机和死信队列
@Configuration
public class DeadLetterConfig {
// 正常交换机、队列名称
public static final String NORMAL_EXCHANGE = "normal_exchange";
public static final String NORMAL_QUEUE = "normal_queue";
// 死信交换机、队列名称
public static final String DL_EXCHANGE = "dl_exchange";
public static final String DL_QUEUE = "dl_queue";

// 声明死信的交换机、队列
@Bean("dlExchange")
public Exchange dlExchange(){
return ExchangeBuilder.topicExchange(DL_EXCHANGE).durable(true).build();
}
@Bean("dlQueue")
public Queue dlQueue(){
return QueueBuilder.durable(DL_QUEUE).build();
}

// 声明正常的交换机、队列
@Bean("normalExchange")
public Exchange normalExchange(){
return ExchangeBuilder.topicExchange(NORMAL_EXCHANGE).durable(true).build();
}
@Bean("normalQueue")
public Queue normalQueue(){
Map<String,Object> params = new HashMap<>();
// 正常队列绑定死信交换机
params.put("x-dead-letter-exchange",DL_EXCHANGE); // x-dead-letter-exchange:死信交换机的名称
params.put("x-dead-letter-routing-key","dl.message"); // x-dead-letter-routing-key:死信交换机的路由键,注意这个key要能够使得死信交换机和死信队列之间绑定的key生效
params.put("x-message-ttl",10000); // 设置过期时间
params.put("x-max-length",10); // 设置队列的最大长度
return QueueBuilder.durable(NORMAL_QUEUE).withArguments(params).build();

/*
// 下面方式也可以
return QueueBuilder.durable(NORMAL_QUEUE) // 指定队列名称,并持久化
.ttl(10000) // 设置队列的超时时间,10秒
.deadLetterExchange(DL_EXCHANGE) // 指定死信交换机
.build();
*/

}

// 正常队列和正常交换机绑定
@Bean
public Binding normalQueueExchange(@Qualifier("normalQueue") Queue queue, @Qualifier("normalExchange") Exchange exchange){
return BindingBuilder.bind(queue).to(exchange).with("normal.#").noargs();
}

// 死信队列和死信交换机绑定
@Bean
public Binding dlQueueExchange(@Qualifier("dlQueue") Queue queue, @Qualifier("dlExchange") Exchange exchange){
return BindingBuilder.bind(queue).to(exchange).with("dl.#").noargs();
}
}

3. 延迟队列

延迟队列是⼀种带有延迟功能的消息队列, 生产者将消息发送到消息队列服务端,但并不期望这条消息立马投递,而是推迟到在当前时间点之后的某⼀个时间投递到消费者进行消费,该消息即定时消息。

我们经常在业务中需要用到延迟功能,比如:

  • 比如在电商交易中,当用户下单后,需要在 30 分钟之内付款,否则订单将被自动取消。
  • 在会议预定成功后,会在会议开始前几分钟通知所有预定该会议的用户。
  • 用户下单外卖以后,距离超时时间还有 10 分钟时提醒外卖小哥即将超时。

RabbitMQ默认没有延迟队列,但可以通过以下两种方式实现延迟队列:

3.1 死信队列 + TTL

我们在前面已经介绍过死信队列和 TTL 的概念与用法,通过这两个东西我们可以巧妙的实现延迟队列功能。基本思想是:

  • 给一个设置有 TTL 的队列 Q1 指定一个死信交换机 DLX。
  • 让一个死信队列 Q2 绑定此死信交换机 DLX。
  • 对于Q1,我们不添加任何消费者对其进行消费,那么一旦生产者发送消息投递到 Q1 中后,由于没有消费者进行消费,那么消息一定会超时,从而会被投递到 DLX,从而被投递到 Q2中,我们设置消费者用于对 Q2 进行消费,此时就实现了生产者与消费者之间消息的延迟接收。
image-20220319175550818

3.2 官方插件

因为延迟队列的需求非常多,所以 RabbitMQ 的官方也推出了一个 DelayExchange 插件,用于支持延迟队列效果。通过安装插件,自定义交换机,让交换机拥有延迟发送消息的能力,从而实现延迟消息。

与死信队列 + TTL 实现延迟队列的方式相比:

  • 延迟插件方式只需创建一个交换机和一个队列,而死信队列方式需要创建两个交换机(死信队列交换机+处理队列交换机)、两个队列(死信队列+处理队列)。
  • 死信队列的方式不需要格外安装任何内容,而延迟插件需要额外下载插件并安装。
  • 延迟交换机主要是变更了消息存储的维度到交换机,但是假如消息正在交换机中存储,但是还未路由到队里中,一旦服务宕机,延迟交换机中存储的消息直接就丢失了。只适用于对于数据少量丢失容忍性比较强的业务场景。

延迟插件开源地址为:https://github.com/rabbitmq/rabbitmq-delaed-message-exchange

选择合适的版本,我的 RabbitMQ 版本为3.8.15,那么插件可以选择3.8.9版本,然后下载 .ez文件

image-20220319214015332

将.ez文件传到服务器,并复制到 docker 容器内部

1
2
sudo docker cp rabbitmq_delayed_message_exchange-3.8.9-0199d11c.ez rabbitmq_study:/plugins
# 这里的rabbitmq_study是我的rabbitmq容器的名称(按自己的名称来),也可以用容器id代替

进入 rabbitmq 容器内部,并启动插件功能

1
2
sudo docker exec -it rabbitmq_study /bin/bash
rabbitmq-plugins enable rabbitmq_delayed_message_exchange

此时我们在 RabbitMQ 的管理页面创建交换机时,就会发现多了一个 x-delayed-message 类型。

image-20220319214751218

从名称上就可以看出 rabbitmq-delaed-message-exchange实际上是针对交换机做的延迟,而不是队列,其原理。当我们发送消息到类型为 x-delayed-message 的交换机时,此交换机会有以下处理步骤:

  1. 接收消息
  2. 判断消息 header 中是否具备 x-delay 属性,如果有 x-delay 属性,说明是延迟消息,则将持久化到硬盘,并读取 x-delay 值,作为延迟时间。如果没有就按正常消息处理。
  3. 当 x-delay 时间到期后,再将消息投递到指定队列,实现消息的延迟发送。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// 创建延迟交换机
// 方式1:注解的方式,将delayed属性设置为true即可
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "delay.queue", durable = "true"),
exchange = @Exchange(name = "delay.direct", delayed = "true"),
key = "delay"
))
public void listenDelayExchange(String msg) {
log.info("消费者接收到了delay.queue的延迟消息");
}

// 方式2:Bean的方式设置 delayed属性为true即可
@Bean
public Exchange delayedExchange(){
DirectExchange directExchange = new DirectExchange(RDELAYED_EXCHANGE, true, false);
directExchange.setDelayed(true);
return directExchange;
}
1
2
// 生产者发送消息,注意需要在请求头中设置消息的延迟时间
Message msg = MessageBuilder.withBody(("hello Delay Exchange").getBytes("UTF-8")).setHeader("x-delay", 10000).build();

4. 消息堆积

消息堆积是指当生产者发送消息的速度大于消费者处理消息的速度,不可避免的会导致队列中消息越来越多,不断堆积,直到队列存储到达上限。

针对于消息堆积,一般的解决方案为:

  1. 排查消费者内部逻辑,分析出性能瓶颈(比如消费者可能出现异常宕机了),并且可以采取多线程形式提升消息处理速度。
  2. 增加多个消费者,从而增加处理速度。
  3. 扩大队列的容量,提升存储上限。
  4. 惰性队列。

惰性队列与普通队列的区别:

  • 普通队列中的消息会尽可能的存储在内存之中,这样可以更加快速的将消息发送给消费者。即使是持久化的消息,在被写入磁盘的同时也会在内存中驻留一份备份。当 RabbitMQ 需要释放内存的时候,会将内存中的消息换页至磁盘中,这个操作会耗费较长的时间,也会阻塞队列的操作,进而无法接收新的消息。
  • 惰性队列会尽可能的将消息存入磁盘中,而在消费者消费到相应的消息时才会被加载到内存中。它的一个重要的设计目标是能够支持更长的队列,即支持更多的消息存储。当消费者由于各种各样的原因(比如消费者下线、宕机亦或者是由于维护而关闭等)而致使长时间内不能消费消息造成堆积时,惰性队列就很有必要了。
    • lazy queue 消息不持久化 , 但是这种模式还是会把消息放到硬盘里,RAM的使用率会一直很稳定,但是重启后一样会丢失消息。
    • lazy queue 消息持久化,这种方式无疑是最佳搭配,消息放到硬盘并且不会因为服务器重启而丢失,面对高并发也是从容不已。
  • 惰性队列减少了内存的消耗,但会增加 I/O 的使用,相当于以空间换时间,当然惰性队列的发送速度不一定比普通队列慢,尤其在高并发场景下可能比普通队列还快,原因是普通队列会由于内存不足而不得不将消息换页至磁盘。

在队列声明的时候可以通过x-queue-mode参数来设置队列的模式,取值为『default』和『lazy』,其中 lazy 就是定义惰性队列。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// Springboot 声明队列为LazyQueue

// 方式1
@Bean
public Queue lazyQueue() {
return QueueBuilder.durable("lazy.queue")
.lazy() // 惰性队列
.build();
}

// 方式2
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "lazy.queue", durable = "true",arguments = @Argument(name = "x-queue-mode",value = "lazy")),
exchange = @Exchange(name = "lazy.direct"),
key = "lazy"
))
public void listenLazyQueue(String msg) {
log.info("消费者接收到了lazy.queue的延迟消息");
}

5. 消费端限流

在业务高峰时期,可能会出现生产者发送消息的速度远超消费者处理消息的速度的情况,比如双十一时,大量的订单涌入,此时消息队列就会囤积大量的消息,而如果此时消费者一次取出大量的消息, 但是又无法同时处理这么多消息, 就可能导致服务崩溃, 所以需要对消费端进行限流。

RabbitMQ 提供了一种 qos (服务质量保证)功能,在手动确认消息的前提下,如果指定 Qos 数目的消息没有被确认前,不会消费新的消息。

可以在 springboot 配置类中配置 prefetch=n,此值表示消费端每次从 MQ 拉 n 条消息消费,直到手动确认消费完毕后,才会继续拉去下一条消息。比如将 perfetch 设置为1000,那么一次拉 1000 条消息,等待手动确认后,再拉取 1000 条消息,就可以实现消息的削峰限流。

1
2
3
4
5
6
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: manual # 手动确认消息
prefetch: 1000 # 一次最多取消息的个数

6. 幂等性(消息重复消费)

消息的幂等性是指对于同一个系统,在同样条件下,一次请求和重复多次请求对资源的影响是一致的,不会因为多次点击而产生了副作用。

生活中常见要求幂等性的场景如:

  1. 用户支付时,一笔订单应当只能扣一次钱,无论是网络问题或者其他问题而重新付款,都只应该扣一次钱;
  2. 用户在APP上连续点击了多次提交订单,后台应该只产生一个订单;
  3. 用户对一篇文章进行点赞,无论点了多少次,最终实际上也能将点赞数+1;
  4. 前端重复提交选中的数据,后台也只会产生对应这个数据的一个反应结果。

增删改查的幂等性:

  • Get 用于获取资源,不会对系统资源进行改变,因此是幂等的。
  • Delete 用于删除资源,虽然改变了系统资源,但是第一次和第N次删除操作对系统的作用是相同的,所以是幂等的。比如要删除一个 id 为 1 的资源,可能第一次调用时会删除,而后面所有调用的时候由于系统中已经没有这个 id 的资源了,就无法进行删除了。但是第一次操作和后面的操作对系统的作用是相同的,所以这也是幂等的,调用者可以多次调用这个接口不必担心错误。
  • Post 用于新增资源,这意味着每次调用都会在系统中产生新的资源,所以该操作注定不是幂等操作。
  • Put 用于修改资源,如果是设置具体值,则是幂等的,比如设置用户张三的分数为98,那么无论修改一次还是多次,最终结果都一样。但如果是增量修改,则可能不是幂等的,比如将用户张三的分数减10,那么多次修改和一次修改的效果是完全不同的。

RabbitMQ 的幂等性问题

  • 消息重复投递。生产者发送消息到 broker后,broker在confirm 确认的时候出现网络故障,使得生产者没收到该消息ACk 而重新发送消息到 broker 中。

  • 消息重复消费消费者处理完一条消息后,在向 MQ 发送 ack 确认时出现了网络故障,使得 MQ 没有收到此 ack 确认, 那么 MQ 并不会将该条消息删除,而是重新发送给其他的消费者或者当重新建立起连接后,再次发送给该消费者,这就造成了消息的重复消费。

  • 解决了消息重复消费的问题其实就解决了消息重复投递的问题。

解决方案:

  • ==全局唯一ID + 唯一索引==

    通过时间戳或者UUID或按自己的规则生成一个全局唯一id,每次消费消息时用该id先判断该消息是否已消费过。比如基于数据库的唯一键来保证重复数据不会重复插入多条。因为有唯一键约束了,重复数据插入只会报错,不会导致数据库中出现脏数据。不过在高并发时,如果是单个数据库就会有写入性能瓶颈(可采用分库分表提升性能)。

  • ==Redis原子性==

    利用 redis 执行 setnx 命令,天然具有幂等性。从而实现不重复消费,如果 ack 失败,在 RabbitMQ 将消息交给其他的消费者时,先执行 setnx,如果 key 已经存在(说明之前有人消费过该消息),获取他的值,如果是 0,当前消费者就什么都不做,如果是 1,直接 ack。

7. 集群(高可用)

使用了 MQ 后,整个系统会依赖于MQ的可靠性、安全性、吞吐能力,比如若MQ宕机,整个系统都会崩溃。因而会降低系统的可用性,那么如何改善这一缺点呢?答案是搭建集群环境。

RabbitMQ的集群有两种模式:

  • 普通集群【无高可用】:是一种分布式集群,将队列分散到集群的各个节点,从而提高整个集群的并发能力。
  • 镜像集群【高可用】:是一种主从集群,普通集群的基础上,添加了主从备份功能,提高集群的数据可用性。

7.1 普通集群

普通集群模式是使用多台服务器,在每台服务器中启动一个 RabbitMQ 实例,我们创建的每一个队列 Queue,它的元数据(主要就是 Queue 的一些配置信息)会在所有的 RabbitMQ 实例中进行同步,但是队列中的消息只会存在于一个 RabbitMQ 实例上,而不会同步。

当消费者拉取消息时,如果连接到了另外一个实例,那么此实例实际会通过元数据定位到 队列所在的位置,之后访问队列所在的实例,并拉取数据发送给消费者。

可见,这种集群可以提高 RabbitMQ 的消息吞吐能力,但是无法保证高可用,因为一旦一个 RabbitMQ 实例挂了,其中的消息就访问不到了,其他实例中并不会保存挂掉的实例中的消息。

  • 如果 MQ 做了持久化,那么实例恢复后,才可以继续访问;
  • 如果 MQ 没做持久化,那么消息就丢了。
image-20220320204329857

7.2 镜像集群

由于普通集群并没有实现 RabbitMQ 的高可用,实际生产条件下一般不会使用,而是会使用镜像集群。

相较于普通集群,镜像集群会将队列的所有数据(包括实际消息)同时存储在多台机器上,而不是仅仅存储队列的元数据。每个 RabbitMQ 实例都有一份镜像数据(副本数据)。在每次写入消息的时候都会自动把数据同步到多台实例上去,这样一旦其中一台机器发生故障,其他机器还有一份副本数据可以继续提供服务,也就实现了高可用。

本质上是主从结构,创建队列的节点被称为该队列的主节点,备份到的其它节点叫做该队列的镜像节点。一旦某个 RabbitMQ 示例挂机了,那么其镜像节点就会成为新的主节点。

image-20220320205249660

镜像集群模式并不需要额外搭建,只需要我们在普通集群的基础上将队列配置为镜像队列即可。

-------本 文 结 束 感 谢 您 的 阅 读-------