0%

RabbitMQ基础知识

1. 简介

1.1. 概述

消息队列简称 MQ,英文全称为 Message Queue,它是一种跨进程的通信机制,用于上下游传递消息。采取典型的生产者和消费者模型,生产者不断的向队列中发送消息,消费者不断的从队列中获取消息。

MQ 的优点:

  • 任务异步处理

    在项目中,可将一些无需即时返回且耗时的操作提取出来,进行异步处理,而这种异步处理的方式大大的节省了服务器的请求响应时间,从而提高了系统的吞吐量。此时MQ 就类似于一个中转站,使得消息的发送和接收可以异步进行,而生产者无需同步等待消费者及时返回。

    比如在商品购物业务中,用户支付完之后 ,支付业务直接将消息投递到 MQ 中,就直接可以返回用户结果,而无需再同步等待订单服务、短信服务等模块的响应,实现了任务的异步处理,从而实现了异步通信。

    image-20220320164432393
  • 应用程序解耦合

    MQ 相当于一个中介,生产者只需要将消息投递到 MQ中,而无需直接调用消费者模块的相关接口,消费者只需要监听 MQ 即可,从而实现应用程序的解耦合。并且如果之后需要增加一些业务需求,无需改变生产者代码,只需让新的业务监听 MQ 消息即可。

    比如在商品购物业务中,需要增加积分服务,那么只需要写完积分服务代码后,让其监听 MQ 即可,不需要再修改支付服务的代码,从而实现了解耦。而如果积分服务挂了,但并不会影响支付服务,从而实现了故障隔离(如果是同步调用,那么一个挂了,则相当于所有都挂了,消息队列机制则不会。)

    image-20220320164619922
  • 削峰填谷

    如果一时间有大量的消息涌入,使用 MQ 可以应对生产者的流量冲击(类似于流量控制),消费者只需要按照自己的处理能力对消息进行处理即可。

    比如在双十一秒杀时,有大量的订单消息,如果不对流量进行控制,那么订单服务可能会由于无法支撑那么大的并发量而挂掉。假设订单服务每秒只能处理10000次订单,而实际每秒有20000次订单,此时可以通过消息队列做一个缓冲作用,让订单服务分2秒处理掉这20000次订单。

    image-20220320165734757

MQ 的缺点:

  • 系统可用性降低:系统依赖于MQ的可靠性、安全性、吞吐能力。比如若MQ宕机,整个系统都会崩溃。
  • 数据链路复杂性增加:本来生产者只需要直接将消息发送给消费者,而此时加入一个 MQ做中转,可能出现消息丢失消息的转发顺序改变消息重复调用等等问题。
  • 数据一致性问题:生产者发送消息需要MQ和消费者共同处理,如果MQ处理成功,而消费者处理失败,则可能会造成数据一致性问题。或者是多个消费者要处理修改同一条消息时,也可能会造成一致性问题。

1.2. AMQP 和 JMS

MQ 是消息通信的模型;实现 MQ 的大致有两种主流方式:AMQP、JMS。

1.2.1. AMQP

AMQP 是一种协议,更准确的说是一种 binary wire-level protocol(链接协议)。这是其和 JMS 的本质差别,AMQP不从 API 层进行限定,而是直接定义网络交换的数据格式。

1.2.2. JMS

JMS 即 Java 消息服务(JavaMessage Service)应用程序接口,是一个 Java 平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。

1.2.3. AMQP 与 JMS 区别

  • JMS 是定义了统一的接口,来对消息操作进行统一;AMQP 是通过规定协议来统一数据交互的格式
  • JMS 限定了必须使用Java语言;AMQP 只是协议,不规定实现方式,因此是跨语言的。
  • JMS 规定了两种消息模式;而 AMQP 的消息模式更加丰富

1.3. 消息队列产品

市场上常见的消息队列有如下:

名称 开发语言 支持协议 可用性 单机吞吐量 消息延迟 可靠性
ActiveMQ Java AMQP… 高(主从架构) 万级 毫秒级 一般
RabbitMQ Erlang(并发强) AMQP 高(主从架构) 万级 微秒级
RocketMQ Java 自定义协议 非常高(分布式架构) 十万级 毫秒级
Kafka Scala 自定义协议 非常高(分布式架构) 十万级 毫米级以内 一般
  • RabbitMQ的可靠性高,消息延迟最低,社区活跃。
  • Kafka的吞吐量极高,但可能会丢失数据,社区活跃。
  • 如果是对可靠性要求较高的服务(如订单服务),可考虑使用 RabbitMQ/RocketMQ,如果对可靠性要求不太高、追求高吞吐量的服务(如日志服务),可考虑使用 Kafka。

1.4. RabbitMQ

RabbitMQ 是由 erlang 语言开发,基于 AMQP(Advanced Message Queue 高级消息队列协议)协议实现的消息队列。官方地址:http://www.rabbitmq.com/

RabbitMQ 基本结构:

image-20220320171238323
  • publisher:生产者,负责产生消息
  • exchange:交换机,负责路由消息到队列中
  • queue:队列,负责缓存消息
  • virtual host:虚拟主机,是对queue、exchange等资源的逻辑分组
  • consumer:消费者,负责处理消息。

RabbitMQ 提供了 6 种模式:简单模式,work 模式,Publish/Subscribe 发布与订阅模式,Routing 路由模式,Topics主题模式,RPC 远程调用模式(远程调用,不太算MQ);官网对应模式介绍:https://www.rabbitmq.com/getstarted.html

1555988678324

2. 安装

官方安装教程:https://www.rabbitmq.com/download.html

2.1 CentOS7 安装

2.1.1 安装依赖环境

1
yum install -y make gcc gcc-c++ m4 openssl openssl-devel ncurses-devel unixODBC unixODBC-devel java java-devel

2.1.2 安装 Erlang

注意 Erlang 和 RabbitMQ 需要版本对应,详情见https://www.rabbitmq.com/which-erlang.html

下载 Erlang,网址为 http://erlang.org/download/ ,比如下载 otp_src_23.3.tar.gz,然后通过 ftp 工具传输到 linux 服务器中,也可以使用 wget 命令在线下载(此文件下载非常慢)

1
2
3
4
5
6
7
8
9
wget http://erlang.org/download/otp_src_23.3.tar.gz # wget在线下载erlang
tar -zxvf otp_src_23.3.tar.gz # 解压
cd otp_src_23.3 # 进入解压文件夹目录
./configure --prefix=/usr/local/erlang # 设置erlang的安装目录
make && make install # 编译安装erlang
ll /usr/local/erlang/bin # 检查erlang是否安装
echo 'export PATH=$PATH:/usr/local/erlang/bin' >> /etc/profile # 将erlang添加到环境变量中
source /etc/profile # 刷新环境变量,使配置生效
erl # 输入erl,若出现下面信息,则erlang安装和配置成功。注意:使用halt().退出

image-20210511160105518

2.1.3 安装 RabbitMQ

下载 RabbitMQ,网址为https://www.rabbitmq.com/install-generic-unix.html#downloads,下载后通过 ftp 工具传输到 linux 服务器中,也可以使用 wget 命令在线下载

1
2
3
4
5
6
7
wget https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.8.16/rabbitmq-server-generic-unix-3.8.16.tar.xz  # 下载RabbitMQ
tar -xvf rabbitmq-server-generic-unix-3.8.16.tar.xz # 解压
mv rabbitmq_server-3.8.16/ /usr/local/ # 将文件移动到/usr/local/目录下,其实可以直接解压到该目录。
echo 'export PATH=$PATH:/usr/local/rabbitmq_server-3.8.16/sbin' >> /etc/profile # 配置环境变量
source /etc/profile # 刷新环境变量,使配置生效
rabbitmq-plugins enable rabbitmq_management # 开启web管理插件,即开启管理界面
rabbitmq-server -detached # 启动mq

此时如果在浏览器输入http://ip:15672/就会看到web界面的登录界面,默认用户名密码均为guest,但如果此时我们登录,会遇到 User can only log in via localhost 这个问题。

image-20210511163839251

主要原因是 RabbitMQ 从 3.3.0 开始禁止使用 guest/guest 权限通过除 localhost 外的访问,所以我们需要进行一定配置,以使得其他主机也能访问此 RabbitMQ(注意开放端口15672,或者关闭防火墙 systemctl stop firewalld.service)

  • 在3.8版本以前,可以采取下面方法,然后就可以使用账号guest,密码guest进行登录了

    1
    2
    3
    4
    vim rabbitmq_server-3.7.8/ebin/rabbit.app 

    将:{loopback_users, [<<”guest”>>]},
    改为:{loopback_users, [guest]},
  • 在3.8版本以后,就不能使用上述方法了,因为都没有 rabbit.app 文件了,这个时候可以通过新添加一个用户账号来解决。

    1
    2
    3
    4
    5
    rabbitmqctl list_users # 查看所有用户
    rabbitmqctl add_user zhang 123 # 新增加一个用户zhang,密码为123
    rabbitmqctl set_user_tags zhang administrator # 设置用户zhang的标签为管理员
    rabbitmqctl set_permissions -p "/" zhang ".*" ".*" ".*" # 配置用户zhang的权限,指定允许访问的vhost以及write/read,该命令使用户user_admin具有/vhost1这个virtual host中所有资源的配置、写、读权限以便管理其中的资源
    rabbitmqctl list_user_permissions zhang # 查看用户zhang的权限

    当配置好新用户后,就可以使用新用户的账号和密码进行管理界面的登录了,不需要重启服务

image-20210511163536006

RabbitMQ 的其他命令

1
2
rabbitmq-server -detached # 启动服务,rabbitmq-server start也可以启动,但并木有后台运行
rabbitmqctl stop # 停止服务

2.2 Docker 安装

  1. 拉取 RabbitMQ 镜像(其中标签带 management 的,表示启动 Rabbitmq 后可以打开 web 管理界面,否则不行)

    1
    sudo docker pull rabbitmq:3.8.15-management
  2. 查看镜像

    1
    sudo docker images
  3. 创建挂载(映射)目录(文件夹路径可自己选,后面对应上就行)

    1
    mkdir -p /home/zhang/rabbitmq
  4. 创建容器 rabbitmq

    1
    2
    3
    4
    5
    6
    7
    8
    sudo docker run -d --name rabbitmq  --hostname mq1 -p 5672:5672 -p 15672:15672 -v /home/zhang/rabbitmq:/var/lib/rabbitmq -h myRabbit -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin rabbitmq:3.8.15-management
    # -p 5672:5672 -p 15672:15672:端口映射,(5672:应用访问端口;15672:控制台Web端口号)
    # --name:容器名
    # --hostname:设置主机名,注意用于集群部署时使用
    # -v:映射目录,前表示主机部分,:后表示容器部分。
    # -h 主机名(RabbitMQ的一个重要注意事项是它根据所谓的 “节点名称” 存储数据,默认为主机名)
    # -d:容器后台运行
    # -e 指定环境变量:RABBITMQ_DEFAULT_USER:默认的用户名;RABBITMQ_DEFAULT_PASS:默认用户名的密码)
  5. 浏览器输入http://ip:15672即可访问 Rabbitmq 的 web 管理页面,用户名密码是-e参数所指定的用户名和密码。

3. Java 操作 RabbitMQ

创建一个 maven 项目,并添加 rabbitmq 的依赖。

1
2
3
4
5
6
7
8
9
<dependencies>
<!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
<!--rabbitmq java 客户端-->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.11.0</version>
</dependency>
</dependencies>

3.1. Hello World

RabbitMQ 是消息代理:它接受并转发消息。可以将其视为邮局,将要发布的邮件放在邮箱中时,而邮局可以确保最终将邮件传递给相应收件人。

下面是 RabbitMQ 最简单的一种模式,“ P”表示生产者,“ C”是表示消费者。中间的框是一个队列 Queue,即保留的消息缓冲区。

image-20210513163253459
  • 生产仅意味着发送。发送消息的程序是生产者
  • 消费与接收具有相似的含义。一个消费者是一个程序,主要是等待接收信息:
  • 队列是 RabbitMQ 内部的邮政信箱的名称。尽管消息流经 RabbitMQ,但它只能存储在队列中队列仅由主机的存储器&磁盘限制约束,它本质上是一个大的消息缓冲器。许多生产者可以发送进入一个队列的消息,许多消费者可以尝试从一个队列接收数据。

下面将将用Java编写两个程序。一个是生产者,用于发送消息;一个是消费者,用于接收消息。

3.1.1. 编写生产者代码

创建一个类 com.tju.producer.Producer_HelloWorld

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public class Producer_HelloWorld {
public static void main(String[] args) throws IOException,TimeoutException {
// 1. 创建工厂
ConnectionFactory factory = new ConnectionFactory();
// 2. 设置参数
factory.setHost("127.0.0.1"); // ip,默认值"localhost"
factory.setPort(5672); // 端口,默认值5672
factory.setVirtualHost("tju_virtual"); // 虚拟机,默认值"/"
factory.setUsername("zhang"); // 用户名,默认值guest
factory.setPassword("123"); // 密码,默认值guest
// 3. 创建连接 Connection
Connection connection = factory.newConnection();
// 4. 创建 Channel
Channel channel = connection.createChannel();
// 5. 创建队列 Queue
/*
参数1:队列名称(如果没有,则会自动创建)
参数二:是否持久化
参数三:是否独占,只能有一个消费者监听队列
参数四:是否自动删除(当没有消费者时)
参数五:一些其他参数设置
*/
channel.queueDeclare("hello", false, false, false, null);
// 6. 发送消息
String message = "Hello World!";
/*
参数1:交换机名称,简单模式下交换机会使用默认的"",设置为空字符串即可
参数2:路由key名称
参数3:配置信息
参数4:真实发送的消息数据(字节数组的形式byte[])
*/
channel.basicPublish("", "hello", null, message.getBytes(StandardCharsets.UTF_8));
System.out.println(" [x] Sent '" + message + "'");

// 7. 释放资源
channel.close();
connection.close();
}
}

3.1.2. 编写消费者代码

创建一个类 com.tjuconsumer.consumer_HelloWorld

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
public class Consumer_HelloWorld {
public static void main(String[] args) throws IOException, TimeoutException {
// 1. 创建工厂
ConnectionFactory factory = new ConnectionFactory();
// 2. 设置参数
factory.setHost("127.0.0.1"); // ip,默认值"localhost"
factory.setPort(5672); // 端口,默认值5672
factory.setVirtualHost("tju_virtual"); // 虚拟机,默认值"/"
factory.setUsername("zhang"); // 用户名,默认值guest
factory.setPassword("123"); // 密码,默认值guest
// 3. 创建连接 Connection
Connection connection = factory.newConnection();
// 4. 创建 Channel
Channel channel = connection.createChannel();
// 5. 创建队列 Queue(其实如果队列已经被生产者创建后,就无需再创建,但由于可能不知道生产者消费者的执行顺序,所以就双方都建立,避免队列不存在)
/*
参数1:队列名称(如果没有,则会自动创建)
参数二:是否持久化
参数三:是否独占,只能有一个消费者监听队列
参数四:是否自动删除(当没有消费者时)
参数五:一些其他参数设置
*/
channel.queueDeclare("hello", false, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

// 6. 接收消息
Consumer consumer = new DefaultConsumer(channel){
/**
* 回调方法,当收到消息后,会自动执行该方法
* @param consumerTag 标识
* @param envelope 获取一些信息(比如交换机,路由key...)
* @param properties 配置信息
* @param body 真实数据
* @throws IOException
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("consumerTag:" + consumerTag);
System.out.println("Exchange:" + envelope.getExchange());
System.out.println("RoutingKey:" + envelope.getRoutingKey());
System.out.println("properties:" + properties);
System.out.println("body:" + new String(body));
}
};

/*
参数1:队列名称
参数2:是否自动确认
参数3:回调函数
*/
channel.basicConsume("hello", true, consumer);
// 注意消费者不要关闭资源,一直监听
}
}

3.1.3. 执行

执行消费者和生产者两个代码(顺序无所谓),然后就会发现,生产者发送的消息,被消费者所接收了。

image-20210513165036727

3.2. Work Queues

在 Hello World 模式下,一个队列只用于特定的一个生产者和一个消费者。在工作队列(Work queues)模式中,我们将创建一个工作队列,该队列将用于在多个工作人员之间分配耗时的任务。

在此模式下,生产者发送的多个消息,可以有多个消费者依次接收(但一个消息只有一个人能接收)。

应用场景:对于 任务过重或任务较多情况使用工作队列可以提高任务处理的速度。

image-20210513165617572

下面将将用Java编写三个程序。一个是生产者,用于发送消息;两个是消费者,用于接收消息。

3.2.1. 编写生产者代码

创建一个类com.tju.producer.Producer_WorkQueues

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
public class Producer_WorkQueues {
public static void main(String[] args) throws IOException, TimeoutException {
// 1. 创建工厂
ConnectionFactory factory = new ConnectionFactory();
// 2. 设置参数
factory.setHost("127.0.0.1"); // ip,默认值"localhost"
factory.setPort(5672); // 端口,默认值5672
factory.setVirtualHost("tju_virtual"); // 虚拟机,默认值"/"
factory.setUsername("zhang"); // 用户名,默认值guest
factory.setPassword("123"); // 密码,默认值guest
// 3. 创建连接 Connection
Connection connection = factory.newConnection();
// 4. 创建 Channel
Channel channel = connection.createChannel();
// 5. 创建队列 Queue
/*
参数1:队列名称(如果没有,则会自动创建)
参数二:是否持久化
参数三:是否独占,只能有一个消费者监听队列
参数四:是否自动删除(当没有消费者时)
参数五:一些其他参数设置
*/
channel.queueDeclare("work_queues", false, false, false, null);
// 6. 发送消息
for (int i = 1; i <= 10; i++) {
String message = "【" + i + "】 Hello World!";
/*
参数1:交换机名称,简单模式下交换机会使用默认的"",设置为空字符串即可
参数2:路由名称
参数3:配置信息
参数4:真实发送的消息数据(字节数组的形式byte[])
*/
channel.basicPublish("", "work_queues", null, message.getBytes(StandardCharsets.UTF_8));
System.out.println(" [x] Sent '" + message + "'");
}

// 7. 释放资源
channel.close();
connection.close();
}
}

3.2.2. 编写消费者代码

创建第一个类com.tjuconsumer.Consumer_WorkQueues1

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
public class Consumer_WorkQueues1 {
public static void main(String[] args) throws IOException, TimeoutException {
// 1. 创建工厂
ConnectionFactory factory = new ConnectionFactory();
// 2. 设置参数
factory.setHost("127.0.0.1"); // ip,默认值"localhost"
factory.setPort(5672); // 端口,默认值5672
factory.setVirtualHost("tju_virtual"); // 虚拟机,默认值"/"
factory.setUsername("zhang"); // 用户名,默认值guest
factory.setPassword("123"); // 密码,默认值guest
// 3. 创建连接 Connection
Connection connection = factory.newConnection();
// 4. 创建 Channel
Channel channel = connection.createChannel();
// 5. 创建队列 Queue (参数1:队列名称(如果没有,则会自动创建) 参数二:是否持久化 参数三:是否独占,只能有一个消费者监听队列 参数四:是否自动删除(当没有消费者时)参数五:一些其他参数设置)
channel.queueDeclare("work_queues", false, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

// 6. 接收消息
Consumer consumer = new DefaultConsumer(channel) {
/**
* 回调方法,当收到消息后,会自动执行该方法
* @param consumerTag 标识
* @param envelope 获取一些信息(比如交换机,路由key...)
* @param properties 配置信息
* @param body 真实数据
* @throws IOException
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("body:" + new String(body));
}
};

// 参数1:队列名称 参数2:是否自动确认 参数3:回调函数
channel.basicConsume("work_queues", true, consumer);

// 注意消费者不要关闭资源,一直监听
}
}

创建第二个类com.tjuconsumer.Consumer_WorkQueues2

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public class Consumer_WorkQueues2 {
public static void main(String[] args) throws IOException, TimeoutException {
// 1. 创建工厂
ConnectionFactory factory = new ConnectionFactory();
// 2. 设置参数
factory.setHost("127.0.0.1"); // ip,默认值"localhost"
factory.setPort(5672); // 端口,默认值5672
factory.setVirtualHost("tju_virtual"); // 虚拟机,默认值"/"
factory.setUsername("zhang"); // 用户名,默认值guest
factory.setPassword("123"); // 密码,默认值guest
// 3. 创建连接 Connection
Connection connection = factory.newConnection();
// 4. 创建 Channel
Channel channel = connection.createChannel();
// 5. 创建队列 Queue (参数1:队列名称(如果没有,则会自动创建) 参数二:是否持久化 参数三:是否独占,只能有一个消费者监听队列 参数四:是否自动删除(当没有消费者时)参数五:一些其他参数设置)
channel.queueDeclare("work_queues", false, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

// 6. 接收消息
Consumer consumer = new DefaultConsumer(channel) {
/**
* 回调方法,当收到消息后,会自动执行该方法
* @param consumerTag 标识
* @param envelope 获取一些信息(比如交换机,路由key...)
* @param properties 配置信息
* @param body 真实数据
* @throws IOException
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("body:" + new String(body));
}
};

// 参数1:队列名称 参数2:是否自动确认 参数3:回调函数
channel.basicConsume("work_queues", true, consumer);
// 注意消费者不要关闭资源,一直监听
}
}

3.2.3. 执行

首先执行两个消费者代码,接着执行生产者代码,然后就会发现,生产者发送的消息,被两个消费者所依次接收了。

  • 第一个消费者输出的内容

    image-20210513175350936

  • 第二个消费者输出的内容

    image-20210513175437273

3.3. Publish/Subscribe

在 Work queues 模式下,每个消息都恰好交付给一个消费者。在发布/订阅(Publish/Subscribe)模式中,我们将同一个消息可以传达给多个消费者。

在此模式下,生产者发送的单个消息,可以有多个消费者进行接收。

相比于之前的模式,发布/订阅模式多了一个交换机(Exchange),由下图中的X表示。生产者不再直接把消息发送给队列,而是发送给交换机。交换机可以通过与某个队列绑定,会把从生产者获得的消息传递给相应满足规则(路由key)的队列。

image-20210513180049284
  • 生产者:也就是要发送消息的程序,但是不再发送到队列中,而是发给X(交换机)
  • 消费者:消息的接受者,会一直等待消息到来。
  • 消息队列用于接收消息、缓存消息、发送消息。
  • 交换机:图中的X。一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。Exchange有常见以下3种类型:
    • Fanout:广播,将消息交给所有绑定到交换机的队列
    • Direct:定向,把消息交给符合指定routing key 的队列
    • Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列

具体流程为:

  • 每个消费者监听自己的队列。

  • 生产者将消息发给 broker(其实就是发给交换机),由交换机将消息转发到绑定此交换机的每个队列,每个绑定交换机的队列都将接收到消息

发布订阅模式与工作队列模式的区别

1、工作队列模式不用定义交换机,而发布/订阅模式需要定义交换机。

2、发布/订阅模式的生产方是面向交换机发送消息,工作队列模式的生产方是面向队列发送消息(底层使用默认交换机)。

3、发布/订阅模式需要设置队列和交换机的绑定,工作队列模式不需要设置,实际上工作队列模式会将队列绑 定到默认的交换机 。

下面将将用Java编写三个程序。一个是生产者,用于发送消息;两个是消费者,用于接收消息。

3.3.1. 编写生产者代码

创建一个类com.tju.producer.Producer_PubSub

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
public class Producer_PubSub {
public static void main(String[] args) throws IOException, TimeoutException {
// 1. 创建工厂
ConnectionFactory factory = new ConnectionFactory();
// 2. 设置参数
factory.setHost("127.0.0.1"); // ip,默认值"localhost"
factory.setPort(5672); // 端口,默认值5672
factory.setVirtualHost("tju_virtual"); // 虚拟机,默认值"/"
factory.setUsername("zhang"); // 用户名,默认值guest
factory.setPassword("123"); // 密码,默认值guest
// 3. 创建连接 Connection
Connection connection = factory.newConnection();
// 4. 创建 Channel
Channel channel = connection.createChannel();
// 5. 创建交换机
/*
参数1:交换机名称
参数2:交换机的类型(direct定向、fanout广播、topic通配符、headers参数匹配)
参数3:是否持久化
参数4:自动删除
参数5:内部使用,一般false
参数6:其他参数
*/
channel.exchangeDeclare("test_fanout", BuiltinExchangeType.FANOUT,true,false,false,null);
// 6. 创建队列
channel.queueDeclare("test_fanout_queue1",true,false,false,null);
channel.queueDeclare("test_fanout_queue2",true,false,false,null);
// 7. 绑定队列和交换机(参数:队列名称、交换机名称、路由key,即绑定规则(如果交换机类型为fanout,则只需设置为""))
channel.queueBind("test_fanout_queue1","test_fanout","");
channel.queueBind("test_fanout_queue2","test_fanout","");
// 8. 发送消息
for (int i = 0; i < 10 ; i++) {
String message = "这是日志信息【" + i + "】";
channel.basicPublish("test_fanout","",null,message.getBytes(StandardCharsets.UTF_8));
}
// 9. 释放资源
channel.close();
connection.close();
}
}

3.3.2. 编写消费者代码

创建第一个类 com.tjuconsumer.Consumer_PubSub1

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public class Consumer_PubSub1 {
public static void main(String[] args) throws IOException, TimeoutException {
// 1. 创建工厂
ConnectionFactory factory = new ConnectionFactory();
// 2. 设置参数
factory.setHost("127.0.0.1"); // ip,默认值"localhost"
factory.setPort(5672); // 端口,默认值5672
factory.setVirtualHost("tju_virtual"); // 虚拟机,默认值"/"
factory.setUsername("zhang"); // 用户名,默认值guest
factory.setPassword("123"); // 密码,默认值guest
// 3. 创建连接 Connection
Connection connection = factory.newConnection();
// 4. 创建 Channel
Channel channel = connection.createChannel();
// // 5. 创建队列 Queue (参数1:队列名称(如果没有,则会自动创建) 参数二:是否持久化 参数三:是否独占,只能有一个消费者监听队列 参数四:是否自动删除(当没有消费者时)参数五:一些其他参数设置)
// channel.queueDeclare("test_fanout_queue1", false, false, false, null);
// 当创建过队列后,其实就没必要再重新创建了,直接根据队列名接收消息就可以了
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

// 6. 接收消息
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者1接收:" + new String(body));
}
};
// 参数1:队列名称 参数2:是否自动确认 参数3:回调函数
channel.basicConsume("test_fanout_queue1", true, consumer);
}
}

创建第二个类 com.tjuconsumer.Consumer_PubSub2

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public class Consumer_PubSub2 {
public static void main(String[] args) throws IOException, TimeoutException {
// 1. 创建工厂
ConnectionFactory factory = new ConnectionFactory();

// 2. 设置参数
factory.setHost("127.0.0.1"); // ip,默认值"localhost"
factory.setPort(5672); // 端口,默认值5672
factory.setVirtualHost("tju_virtual"); // 虚拟机,默认值"/"
factory.setUsername("zhang"); // 用户名,默认值guest
factory.setPassword("123"); // 密码,默认值guest

// 3. 创建连接 Connection
Connection connection = factory.newConnection();

// 4. 创建 Channel
Channel channel = connection.createChannel();

System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

// 5. 接收消息
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者2接收:" + new String(body));
}
};

// 参数1:队列名称 参数2:是否自动确认 参数3:回调函数
channel.basicConsume("test_fanout_queue2", true, consumer);
}
}

3.3.3. 执行

首先执行两个消费者代码,接着执行生产者代码,然后就会发现,生产者发送的消息,被两个消费者均接收了。

  • 第一个消费者输出的内容

    image-20210513181738309

  • 第二个消费者输出的内容

    image-20210513181750695

3.4. Routing

路由模式特点:

  • 队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由key)
  • 消息的发送方在向 Exchange发送消息时,也必须指定消息的 RoutingKey
  • Exchange不再把消息交给每一个绑定的队列,而是根据消息的Routing Key进行判断,只有队列的Routingkey与消息的 Routing key完全一致,才会接收到消息。
image-20210513182141441
  • P:生产者,向Exchange发送消息,发送消息时,会指定一个routing key。
  • X:Exchange(交换机),接收生产者的消息,然后把消息递交给 与routing key完全匹配的队列
  • C1:消费者,其所在队列指定了需要routing key 为 error 的消息
  • C2:消费者,其所在队列指定了需要routing key 为 info、error、warning 的消息

具体流程为:

  • 每个消费者监听自己的队列。

  • 生产者将消息发给 broker(其实就是发给交换机),由交换机将消息转发到绑定此交换机的每个队列,每个绑定交换机的队列都将接收到消息

下面将将用Java编写三个程序。一个是生产者,用于发送消息;两个是消费者,用于接收消息。

在编码上与 Publish/Subscribe发布与订阅模式 的区别是交换机的类型为:Direct,还有队列绑定交换机的时候需要指定routing key。

3.4.1. 编写生产者代码

创建一个类com.tju.producer.Producer_Routing

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
public class Producer_Routing {
public static void main(String[] args) throws IOException, TimeoutException {
// 1. 创建工厂
ConnectionFactory factory = new ConnectionFactory();
// 2. 设置参数
factory.setHost("127.0.0.1"); // ip,默认值"localhost"
factory.setPort(5672); // 端口,默认值5672
factory.setVirtualHost("tju_virtual"); // 虚拟机,默认值"/"
factory.setUsername("zhang"); // 用户名,默认值guest
factory.setPassword("123"); // 密码,默认值guest
// 3. 创建连接 Connection
Connection connection = factory.newConnection();
// 4. 创建 Channel
Channel channel = connection.createChannel();
// 5. 创建交换机
/*
参数1:交换机名称
参数2:交换机的类型(direct定向、fanout广播、topic通配符、headers参数匹配)
参数3:是否持久化
参数4:自动删除
参数5:内部使用,一般false
参数6:其他参数
*/
channel.exchangeDeclare("test_direct", BuiltinExchangeType.DIRECT,true,false,false,null);
// 6. 创建队列
channel.queueDeclare("test_direct_queue1",true,false,false,null);
channel.queueDeclare("test_direct_queue2",true,false,false,null);
// 7. 绑定队列和交换机(参数:队列名称、交换机名称、路由key,即绑定规则(如果交换机类型为fanout,则只需设置为""))
// 队列1与交换机绑定,,规则为error
channel.queueBind("test_direct_queue1","test_direct","error");
// 队列2与交换机绑定,,规则为error、info、warning
channel.queueBind("test_direct_queue2","test_direct","info");
channel.queueBind("test_direct_queue2","test_direct","error");
channel.queueBind("test_direct_queue2","test_direct","warning");
// 8. 发送消息
String message = "这是一条日志信息";
/*
参数1:交换机名称
参数2:Routing key,即绑定的规则
参数3:配置信息
参数4:要发送的消息
*/
channel.basicPublish("test_direct","info",null,message.getBytes(StandardCharsets.UTF_8));
// 9. 释放资源
channel.close();
connection.close();
}
}

3.4.2. 编写消费者代码

创建第一个类com.tjuconsumer.Consumer_Routing1

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public class Consumer_Routing1 {
public static void main(String[] args) throws IOException, TimeoutException {
// 1. 创建工厂
ConnectionFactory factory = new ConnectionFactory();
// 2. 设置参数
factory.setHost("127.0.0.1"); // ip,默认值"localhost"
factory.setPort(5672); // 端口,默认值5672
factory.setVirtualHost("tju_virtual"); // 虚拟机,默认值"/"
factory.setUsername("zhang"); // 用户名,默认值guest
factory.setPassword("123"); // 密码,默认值guest
// 3. 创建连接 Connection
Connection connection = factory.newConnection();
// 4. 创建 Channel
Channel channel = connection.createChannel();
// 5. 接收消息
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("body:" + new String(body));
System.out.println("将日志信息打印到控制台...");
}
};
// 参数1:队列名称 参数2:是否自动确认 参数3:回调函数
channel.basicConsume("test_direct_queue1", true, consumer);
}
}

创建第二个类com.tjuconsumer.Consumer_Routing2

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public class Consumer_Routing2 {
public static void main(String[] args) throws IOException, TimeoutException {
// 1. 创建工厂
ConnectionFactory factory = new ConnectionFactory();
// 2. 设置参数
factory.setHost("127.0.0.1"); // ip,默认值"localhost"
factory.setPort(5672); // 端口,默认值5672
factory.setVirtualHost("tju_virtual"); // 虚拟机,默认值"/"
factory.setUsername("zhang"); // 用户名,默认值guest
factory.setPassword("123"); // 密码,默认值guest
// 3. 创建连接 Connection
Connection connection = factory.newConnection();
// 4. 创建 Channel
Channel channel = connection.createChannel();
// 5. 接收消息
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("body:" + new String(body));
System.out.println("将日志信息保存到磁盘...");
}
};
// 参数1:队列名称 参数2:是否自动确认 参数3:回调函数
channel.basicConsume("test_direct_queue2", true, consumer);
}
}

3.4.3. 执行

首先执行两个消费者代码,接着执行生产者代码,然后就会发现,由于我们发送消息时,绑定的规则为info,只与队列2的规则匹配,而与队列1的规则不匹配,从而消息只会发送到队列2,不会发送到队列1,因此消费者1不能收到消息,消费者2可以收到消息。

  • 第一个消费者输出的内容

    image-20210513182938856

  • 第二个消费者输出的内容

    image-20210513182900443

3.5. Topics

Topic类型与Direct相比,都是可以根据RoutingKey把消息路由到不同的队列。只不过Topic类型Exchange可以让队列在绑定Routing key 的时候使用通配符

Routingkey 一般都是有一个或多个单词组成,多个单词之间以”.”分割,例如: item.insert

通配符规则:

  • 路由格式必须以 . 分隔,比如 user.email 或者 user.aaa.email

  • #:匹配一个或多个词

  • *:匹配不多不少恰好1个词

举例:

item.#:能够匹配item.insert.abc 或者 item.insert

item.*:只能匹配item.insert

image-20210513183140138

具体流程为:

  • 每个消费者监听自己的队列。

  • 生产者将消息发给broker(其实就是发给交换机),由交换机将消息转发到绑定此交换机的每个队列,每个绑定交换机的队列都将接收到消息

下面将将用Java编写三个程序。一个是生产者,用于发送消息;两个是消费者,用于接收消息。

3.4.1. 编写生产者代码

创建一个类com.tju.producer.Producer_Topics

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
public class Producer_Topics {
public static void main(String[] args) throws IOException, TimeoutException {
// 1. 创建工厂
ConnectionFactory factory = new ConnectionFactory();
// 2. 设置参数
factory.setHost("127.0.0.1"); // ip,默认值"localhost"
factory.setPort(5672); // 端口,默认值5672
factory.setVirtualHost("tju_virtual"); // 虚拟机,默认值"/"
factory.setUsername("zhang"); // 用户名,默认值guest
factory.setPassword("123"); // 密码,默认值guest
// 3. 创建连接 Connection
Connection connection = factory.newConnection();
// 4. 创建 Channel
Channel channel = connection.createChannel();
// 5. 创建交换机
/*
参数1:交换机名称
参数2:交换机的类型(direct定向、fanout广播、topic通配符、headers参数匹配)
参数3:是否持久化
参数4:自动删除
参数5:内部使用,一般false
参数6:其他参数
*/
channel.exchangeDeclare("test_topic", BuiltinExchangeType.TOPIC,true,false,false,null);
// 6. 创建队列
channel.queueDeclare("test_topic_queue1",true,false,false,null);
channel.queueDeclare("test_topic_queue2",true,false,false,null);
// 7. 绑定队列和交换机(参数:队列名称、交换机名称、路由key,即绑定规则(如果交换机类型为fanout,则只需设置为""))
// 队列1与交换机绑定,规则为以order开头或者error结尾
channel.queueBind("test_topic_queue1","test_topic","#.error");
channel.queueBind("test_topic_queue1","test_topic","order.#");
// 队列2与交换机绑定,规则为任意
channel.queueBind("test_topic_queue2","test_topic","#.#");
// 8. 发送消息
String message = "这是一条日志信息";
/*
参数1:交换机名称
参数2:Routing key,即绑定的规则
参数3:配置信息
参数4:要发送的消息
*/
channel.basicPublish("test_topic","x",null,message.getBytes(StandardCharsets.UTF_8));
// 9. 释放资源
channel.close();
connection.close();
}
}

3.4.2. 编写消费者代码

创建第一个类com.tjuconsumer.Consumer_Topics1

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public class Consumer_Topics1 {
public static void main(String[] args) throws IOException, TimeoutException {
// 1. 创建工厂
ConnectionFactory factory = new ConnectionFactory();
// 2. 设置参数
factory.setHost("127.0.0.1"); // ip,默认值"localhost"
factory.setPort(5672); // 端口,默认值5672
factory.setVirtualHost("tju_virtual"); // 虚拟机,默认值"/"
factory.setUsername("zhang"); // 用户名,默认值guest
factory.setPassword("123"); // 密码,默认值guest
// 3. 创建连接 Connection
Connection connection = factory.newConnection();
// 4. 创建 Channel
Channel channel = connection.createChannel();
// 5. 接收消息
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("body:" + new String(body));
System.out.println("将日志信息打印到控制台...");
}
};
// 参数1:队列名称 参数2:是否自动确认 参数3:回调函数
channel.basicConsume("test_topic_queue1", true, consumer);
}
}

创建第二个类com.tjuconsumer.Consumer_Topics2

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public class Consumer_Topics2 {
public static void main(String[] args) throws IOException, TimeoutException {
// 1. 创建工厂
ConnectionFactory factory = new ConnectionFactory();
// 2. 设置参数
factory.setHost("127.0.0.1"); // ip,默认值"localhost"
factory.setPort(5672); // 端口,默认值5672
factory.setVirtualHost("tju_virtual"); // 虚拟机,默认值"/"
factory.setUsername("zhang"); // 用户名,默认值guest
factory.setPassword("123"); // 密码,默认值guest
// 3. 创建连接 Connection
Connection connection = factory.newConnection();
// 4. 创建 Channel
Channel channel = connection.createChannel();
// 5. 接收消息
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("body:" + new String(body));
System.out.println("将日志信息保存到磁盘...");
}
};
// 参数1:队列名称 参数2:是否自动确认 参数3:回调函数
channel.basicConsume("test_topic_queue2", true, consumer);
}
}

3.4.3. 执行

首先执行两个消费者代码,接着执行生产者代码,然后就会发现,由于我们发送消息时,绑定的规则为x,只与队列2的规则匹配,而与队列1的规则不匹配,从而消息只会发送到队列2,不会发送到队列1,因此消费者1不能收到消息,消费者2可以收到消息。

  • 第一个消费者输出的内容

    image-20210513182938856

  • 第二个消费者输出的内容

    image-20210513182900443

4. SpringBoot 整合 RabbitMQ

前面已经介绍了 Java 代码如何操作 RabbitMQ了,但是可以发现,代码非常繁琐,可以利用 SpringBoot 直接整合 RabbitMQ 简化操作,提升开发效率。

4.1 Hello World

一个生产者,一个消费者。

Hello World 是 RabbitMQ 最简单的一种模式,“ P”表示生产者,“ C”是表示消费者。中间的框是一个队列 Queue,即保留的消息缓冲区。

image-20210513163253459
  • 生产仅意味着发送。发送消息的程序是生产者
  • 消费与接收具有相似的含义。一个消费者是一个程序,主要是等待接收信息:
  • 队列是 RabbitMQ 内部的邮政信箱的名称。尽管消息流经 RabbitMQ,但它只能存储在队列中队列仅由主机的存储器&磁盘限制约束,它本质上是一个大的消息缓冲器。许多生产者可以发送进入一个队列的消息,许多消费者可以尝试从一个队列接收数据。

生产者发送消息

引入依赖

1
2
3
4
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

在 application.properties 中配置 rabbitmq

1
2
3
4
5
6
# 配置rabbitmq连接信息
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.virtual-host=tju_virtual
spring.rabbitmq.username=zhang
spring.rabbitmq.password=123

创建消息队列(也可以在rabbitmq web界面直接创建)

1
2
3
4
5
6
7
8
9
10
@Configuration
public class RabbitmqConfig {
//队列名称
public static final String HELLO_WORLD = "hello.world";
//声明队列
@Bean()
public Queue helloWorldQueue(){
return QueueBuilder.durable(HELLO_WORLD).build();// 创建一个名称为 hello.world的队列
}
}

使用 springboot 提供的 RabbitmqTemplate 进行消息的发送与接收

1
2
3
4
5
6
7
8
9
10
11
12
13
@SpringBootTest
public class SpringAmqpTest {
@Autowired
private RabbitTemplate rabbitTemplate;

// Hello World 模式
@Test
public void testHelloWorld(){
String queue = "hello.world"; // 注意队列应该提前创建好(配置类或直接在web界面创建)
String message = "Hello RabbitTemplate!"; // 消息可以为任意类型,会自动转换
rabbitTemplate.convertAndSend(queue,message); // 发送消息
}
}

消费者接收消息

引入依赖、在 application.properties 中配置 rabbitmq。

监听消息,新建一个类注入到 IOC 中,并使用 @RabbitListener 注解指定监听的队列。

1
2
3
4
5
6
7
8
9
10
11
@Component
public class RabbitmqListener {
/**
* 【 Hello World 模式】监听某个队列的消息
* @param message 接收到的消息
*/
@RabbitListener(queues = "hello.world")
public void helloWorldListener(String message){
System.out.println("消费者接收到的消息为:" + message);
}
}

启动 SpringBoot 服务即可。

注意:消息一旦消费就会从队列删除,RabbitMQ 没有消息回溯功能

4.2 Work Queues

一个生产者,多个消费者,且一条消息只能被一个消费者所消费。

在 Hello World 模式下,一个队列只用于特定的一个生产者和一个消费者。在工作队列(Work queues)模式中,我们将创建一个工作队列,该队列将用于在多个工作人员之间分配耗时的任务。

在此模式下,生产者发送的多个消息,可以有多个消费者依次接收(但一个消息只有一个人能接收)。

应用场景:对于 任务过重或任务较多情况,可能生产消息的速度会 远远大于消息的消费速度。长此以往,消息就会堆积越来越多,无法及时处理,此时使用工作队列可以提高任务处理的速度。

image-20210513165617572

生产者发送消息

在配置类中创建队列

1
2
3
4
5
public static final String WORK_QUEUES ="work.queues";
@Bean()
public Queue workQueuesQueue(){
return QueueBuilder.durable(WORK_QUEUES).build();
}

使用 RabbitTemplate 发送消息【和 Hello World 模式一致】

1
2
3
4
5
6
7
8
9
// Work Queues 模式
@Test
public void testWorkQueues(){
String queue = "work.queues";
String message = "Hello RabbitTemplate,Message_";
for(int i=1;i<=50;i++){
rabbitTemplate.convertAndSend(queue,message + i);
}
}

消费者接收消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/**
* 【 Work Queues 模式】监听某个队列的消息
* @param message 接收到的消息
*/
@RabbitListener(queues = "work.queues")
public void workQueuesListener1(String message) throws InterruptedException {
System.err.println("消费者1接收到的消息为:" + message);
Thread.sleep(20); // 模拟不同消费者,有不同的消息处理能力
}

/**
* 【 Work Queues 模式】监听某个队列的消息
* @param message 接收到的消息
*/
@RabbitListener(queues = "work.queues")
public void WorkQueuesListener2(String message) throws InterruptedException {
System.out.println("消费者2接收到的消息为:" + message);
Thread.sleep(200);
}

此时会发现,消费者 1 和消费者 2 的消息处理能力虽然不同,却消费同样多的消息,也就是说消息被『平均消费』了。

然而由于不同的消费者有不同的处理速度,处理速度快的应该处理更多的消息,我们可以通过添加以下配置达到这种『能者多劳』的效果,此时消费者 1 和消费者 2 处理的消息数量将不同。

1
2
# 每次只能获取一条消息,处理完成才能获取下一个消息(也可以实现限流的效果)
spring.rabbitmq.listener.simple.prefetch=1

4.3 Publish/Subscribe

生产者发送的单个消息,所有消费者都进行接收。

在 Work queues 模式下,每个消息都恰好交付给一个消费者。在发布/订阅(Publish/Subscribe)模式中,我们将同一个消息可以传达给多个消费者。

相比于之前的模式,发布/订阅模式多了一个交换机(Exchange),由下图中的X表示。生产者不再直接把消息发送给队列,而是发送给交换机。交换机可以通过与某个队列绑定,会把从生产者获得的消息传递给相应满足规则(路由key)的队列。

image-20210513180049284
  • 生产者:也就是要发送消息的程序,但是不再发送到队列中,而是发给X(交换机)
  • 消费者:消息的接受者,会一直等待消息到来。
  • 消息队列用于接收消息、缓存消息、发送消息。
  • 交换机:图中的X。一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。Exchange有常见以下3种类型:
    • Fanout:广播,将消息交给所有绑定到交换机的队列
    • Direct:定向,把消息交给符合指定 routing key 的队列
    • Topic:通配符,把消息交给符合 routing pattern(路由模式) 的队列

具体流程为:

  • 每个消费者监听自己的队列。

  • 生产者将消息发给 broker(其实就是发给交换机),由交换机将消息转发到绑定此交换机的每个队列,每个绑定交换机的队列都将接收到消息

消费者发送消息

在配置类中创建队列和交换机,并进行绑定

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// 队列名称
public static final String FANOUT_QUEUE1 ="fanout.queue1";
public static final String FANOUT_QUEUE2 ="fanout.queue2";
// 交换机名称
public static final String FANOUT_EXCHANGE ="fanout.exchange";

// 声明队列
@Bean()
public Queue fanoutQueue1(){
return QueueBuilder.durable(FANOUT_QUEUE1).build();
}
@Bean()
public Queue fanoutQueue2(){
return QueueBuilder.durable(FANOUT_QUEUE2).build();
}
// 声明交换机(广播形式)
@Bean
public FanoutExchange fanoutExchange(){
return ExchangeBuilder.fanoutExchange(FANOUT_EXCHANGE).durable(true).build();
}
// 绑定队列和交换机
@Bean
public Binding bindingQueue1(@Qualifier("fanoutQueue1") Queue queue, @Qualifier("fanoutExchange") FanoutExchange exchange){
return BindingBuilder.bind(queue).to(exchange);
}
@Bean
public Binding bindingQueue2(@Qualifier("fanoutQueue2") Queue queue, @Qualifier("fanoutExchange") FanoutExchange exchange){
return BindingBuilder.bind(queue).to(exchange);
}

使用 RabbitTemplate 发送消息【和 Hello World 模式一致】

1
2
3
4
5
6
7
8
9
// Publish/Subscribe 模式,交换机使用 Fanout 广播
@Test
public void testFanoutExchange(){
String exchange = "fanout.exchange";
String message = "Hello RabbitTemplate!,Message_";
for(int i=1;i<=50;i++){
rabbitTemplate.convertAndSend(exchange,"",message + i); // 会将消息发送到交换机,而不是队列
}
}

消费者接收消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/**
* 【 Publish/Subscribe 模式】监听某个队列的消息
* @param message 接收到的消息
*/
@RabbitListener(queues = "fanout.queue1")
public void fanoutQueueListener1(String message) throws InterruptedException {
System.err.println("消费者1接收到的消息为:" + message);
}

/**
* 【 Publish/Subscribe 模式】监听某个队列的消息
* @param message 接收到的消息
*/
@RabbitListener(queues = "fanout.queue2")
public void fanoutQueueListener2(String message) throws InterruptedException {
System.out.println("消费者2接收到的消息为:" + message);
}

此时消费者 1 和消费者 2 均接收到生产者发送的所有消息。

4.4 Routing

生产者发送的单个消息,可以绑定多个消费者进行接收。

路由模式特点:

  • 队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由key),因此成为路由模式。
  • 消息的发送方在向 Exchange 发送消息时,也必须指定消息的 RoutingKey
  • Exchange 不再把消息交给每一个绑定的队列,而是根据消息的Routing Key进行判断,只有队列的Routingkey与消息的 Routing key完全一致,才会接收到消息。
image-20210513182141441
  • P:生产者,向Exchange发送消息,发送消息时,会指定一个routing key。
  • X:Exchange(交换机),接收生产者的消息,然后把消息递交给 与routing key完全匹配的队列
  • C1:消费者,其所在队列指定了需要routing key 为 error 的消息
  • C2:消费者,其所在队列指定了需要routing key 为 info、error、warning 的消息

具体流程为:

  • 每个消费者监听自己的队列。

  • 生产者将消息发给 broker(其实就是发给交换机),由交换机将消息转发到绑定此交换机的每个队列,每个绑定交换机的队列都将接收到消息

我们可以向之前那样,先在配置类中声明队列、交换机、绑定等信息,不过还有一种更简单的写法,直接在消费者的监听类中,在 @RabbitListener 注解中声明各种信息!

消费者接收消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
/**
* 【 Routing 模式】监听某个队列的消息,交换机采用 direct定向方式
* @param message 接收到的消息
*/
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue1"), // 队列名称
exchange = @Exchange(name = "direct.exchange", type = ExchangeTypes.DIRECT), // 交换机名称和类型
key = {"user"}) // key 可以写多个,用逗号分隔开
)
public void directQueueListener1(String message) throws InterruptedException {
System.out.println("用户接收到的消息为:" + message);
}

/**
* 【 Routing 模式】监听某个队列的消息,交换机采用 direct定向方式
* @param message 接收到的消息
*/
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue2"), // 队列名称
exchange = @Exchange(name = "direct.exchange", type = ExchangeTypes.DIRECT), // 交换机名称和类型
key = {"admin"}) // key 可以写多个,用逗号分隔开
)
public void directQueueListener2(String message) throws InterruptedException {
System.out.println("管理员接收到的消息为:" + message);
}

生产者发送消息

1
2
3
4
5
6
7
8
9
// Routing 模式,交换机使用 direct 定向
@Test
public void testDirectExchange(){
String exchange = "direct.exchange";
String message2User = "这条消息只有用户可以收到!";
String message2Admin = "这条消息只有管理员可以收到!";
rabbitTemplate.convertAndSend(exchange,"user",message2User); // 会将消息发送到交换机,而不是队列
rabbitTemplate.convertAndSend(exchange,"admin",message2Admin); // 会将消息发送到交换机,而不是队列
}

此时不同消费者只能接收到符合所绑定key的消息。

4.5 Topics

生产者发送的单个消息,可以绑定多个消费者(支持通配符)进行接收。

Topic 类型与 Direct 相比,都是可以根据 RoutingKey 把消息路由到不同的队列。只不过 TopicExchange可以让队列在绑定Routing key 的时候使用通配符

Routingkey 一般都是有一个或多个单词组成,多个单词之间以点分割,例如: item.insert

通配符规则:

  • 路由格式必须以 . 分隔,比如 user.email 或者 user.aaa.email

  • #:匹配一个或多个词

  • *:匹配不多不少恰好1个词

举例:

item.#:能够匹配item.insert.abc 或者 item.insert

item.*:只能匹配item.insert

image-20210513183140138

具体流程为:

  • 每个消费者监听自己的队列。

  • 生产者将消息发给broker(其实就是发给交换机),由交换机将消息转发到绑定此交换机的每个队列,每个绑定交换机的队列都将接收到消息

消费者接收消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
/**
* 【 Topics 模式】监听某个队列的消息,交换机采用 Topic 方式
* @param message 接收到的消息
*/
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "topic.queue1"), // 队列名称
exchange = @Exchange(name = "topic.exchange", type = ExchangeTypes.TOPIC), // 交换机名称和类型
key = {"china.*"}) // key 可以写多个,并且支持通配符写法!!用逗号分隔开
)
public void topicQueueListener1(String message) throws InterruptedException {
System.out.println("收到前缀为china的消息:" + message);
}

/**
* 【 Topics 模式】监听某个队列的消息,交换机采用 Topic 方式
* @param message 接收到的消息
*/
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "topic.queue2"), // 队列名称
exchange = @Exchange(name = "topic.exchange", type = ExchangeTypes.TOPIC), // 交换机名称和类型
key = {"#.videos"}) // key 可以写多个,并且支持通配符写法!!用逗号分隔开
)
public void topicQueueListener2(String message) throws InterruptedException {
System.out.println("收到后缀为videos的消息:" + message);
}

生产者发送消息

1
2
3
4
5
6
7
8
9
10
11
// Topics 模式,交换机使用 topic 定向
@Test
public void testTopicExchange(){
String exchange = "topic.exchange";
String message1 = "消息为:china.people";
String message2 = "消息为:us.videos";
String message3 = "消息为:china.videos";
rabbitTemplate.convertAndSend(exchange,"china.people",message1);
rabbitTemplate.convertAndSend(exchange,"us.videos",message2);
rabbitTemplate.convertAndSend(exchange,"china.videos",message3);
}

4.6. 模式总结

RabbitMQ工作模式:

1、简单模式 HelloWorld
一个生产者、一个消费者,不需要设置交换机(使用默认的交换机)

2、工作队列模式 Work Queue
一个生产者、多个消费者(竞争关系),不需要设置交换机(使用默认的交换机)

3、发布订阅模式 Publish/Subscribe
需要设置类型为fanout的交换机,并且交换机和队列进行绑定,当发送消息到交换机后,交换机会将消息发送到绑定的队列

4、路由模式 Routing
需要设置类型为 direct 的交换机,交换机和队列进行绑定,并且指定routing key,当发送消息到交换机后,交换机会根据routing key 将消息发送到对应的队列

5、通配符模式 Topics
需要设置类型为topic的交换机,交换机和队列进行绑定,并且指定通配符方式的 routing key,当发送消息到交换机后,交换机会根据 routing key 将消息发送到对应的队列

笼统的说,RabbitMQ工作模式分为两类

  • 一类是直接使用队列,包含Hello World 和 Work Queues 模式,不同的在于消费者数量而已。

  • 另一类使用交换机,可以使得不同消费者消费相同或者不同的消息,包含 Publish/Subscribe、Routing、Topics模式,不同的是交换机将消息发送到队列的方式不同,Pushlish/Subscribe 采用广播形式,所有队列都存有消息;Routing 采用路由形式,会根据 routing key,将消息发送到指定的队列;Topics 和 Routing 基本一致,只不过 routing key支持通配符而已!

-------本 文 结 束 感 谢 您 的 阅 读-------