1. 简介
1.1. 概述
消息队列简称 MQ,英文全称为 Message Queue,它是一种跨进程的通信机制,用于上下游传递消息。采取典型的生产者和消费者模型,生产者不断的向队列中发送消息,消费者不断的从队列中获取消息。
MQ 的优点:
任务异步处理
在项目中,可将一些无需即时返回且耗时的操作提取出来,进行异步处理,而这种异步处理的方式大大的节省了服务器的请求响应时间,从而提高了系统的吞吐量。此时MQ 就类似于一个中转站,使得消息的发送和接收可以异步进行,而生产者无需同步等待消费者及时返回。
比如在商品购物业务中,用户支付完之后 ,支付业务直接将消息投递到 MQ 中,就直接可以返回用户结果,而无需再同步等待订单服务、短信服务等模块的响应,实现了任务的异步处理,从而实现了异步通信。
应用程序解耦合
MQ 相当于一个中介,生产者只需要将消息投递到 MQ中,而无需直接调用消费者模块的相关接口,消费者只需要监听 MQ 即可,从而实现应用程序的解耦合。并且如果之后需要增加一些业务需求,无需改变生产者代码,只需让新的业务监听 MQ 消息即可。
比如在商品购物业务中,需要增加积分服务,那么只需要写完积分服务代码后,让其监听 MQ 即可,不需要再修改支付服务的代码,从而实现了解耦。而如果积分服务挂了,但并不会影响支付服务,从而实现了故障隔离(如果是同步调用,那么一个挂了,则相当于所有都挂了,消息队列机制则不会。)
削峰填谷
如果一时间有大量的消息涌入,使用 MQ 可以应对生产者的流量冲击(类似于流量控制),消费者只需要按照自己的处理能力对消息进行处理即可。
比如在双十一秒杀时,有大量的订单消息,如果不对流量进行控制,那么订单服务可能会由于无法支撑那么大的并发量而挂掉。假设订单服务每秒只能处理10000次订单,而实际每秒有20000次订单,此时可以通过消息队列做一个缓冲作用,让订单服务分2秒处理掉这20000次订单。

MQ 的缺点:
- 系统可用性降低:系统依赖于MQ的可靠性、安全性、吞吐能力。比如若MQ宕机,整个系统都会崩溃。
- 数据链路复杂性增加:本来生产者只需要直接将消息发送给消费者,而此时加入一个 MQ做中转,可能出现消息丢失,消息的转发顺序改变,消息重复调用等等问题。
- 数据一致性问题:生产者发送消息需要MQ和消费者共同处理,如果MQ处理成功,而消费者处理失败,则可能会造成数据一致性问题。或者是多个消费者要处理修改同一条消息时,也可能会造成一致性问题。
1.2. AMQP 和 JMS
MQ 是消息通信的模型;实现 MQ 的大致有两种主流方式:AMQP、JMS。
1.2.1. AMQP
AMQP 是一种协议,更准确的说是一种 binary wire-level protocol(链接协议)。这是其和 JMS 的本质差别,AMQP不从 API 层进行限定,而是直接定义网络交换的数据格式。
1.2.2. JMS
JMS 即 Java 消息服务(JavaMessage Service)应用程序接口,是一个 Java 平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。
1.2.3. AMQP 与 JMS 区别
- JMS 是定义了统一的接口,来对消息操作进行统一;AMQP 是通过规定协议来统一数据交互的格式
- JMS 限定了必须使用Java语言;AMQP 只是协议,不规定实现方式,因此是跨语言的。
- JMS 规定了两种消息模式;而 AMQP 的消息模式更加丰富
1.3. 消息队列产品
市场上常见的消息队列有如下:
| 名称 |
开发语言 |
支持协议 |
可用性 |
单机吞吐量 |
消息延迟 |
可靠性 |
| ActiveMQ |
Java |
AMQP… |
高(主从架构) |
万级 |
毫秒级 |
一般 |
| RabbitMQ |
Erlang(并发强) |
AMQP |
高(主从架构) |
万级 |
微秒级 |
高 |
| RocketMQ |
Java |
自定义协议 |
非常高(分布式架构) |
十万级 |
毫秒级 |
高 |
| Kafka |
Scala |
自定义协议 |
非常高(分布式架构) |
十万级 |
毫米级以内 |
一般 |
- RabbitMQ的可靠性高,消息延迟最低,社区活跃。
- Kafka的吞吐量极高,但可能会丢失数据,社区活跃。
- 如果是对可靠性要求较高的服务(如订单服务),可考虑使用 RabbitMQ/RocketMQ,如果对可靠性要求不太高、追求高吞吐量的服务(如日志服务),可考虑使用 Kafka。
1.4. RabbitMQ
RabbitMQ 是由 erlang 语言开发,基于 AMQP(Advanced Message Queue 高级消息队列协议)协议实现的消息队列。官方地址:http://www.rabbitmq.com/
RabbitMQ 基本结构:
publisher:生产者,负责产生消息
exchange:交换机,负责路由消息到队列中
queue:队列,负责缓存消息
virtual host:虚拟主机,是对queue、exchange等资源的逻辑分组
consumer:消费者,负责处理消息。
RabbitMQ 提供了 6 种模式:简单模式,work 模式,Publish/Subscribe 发布与订阅模式,Routing 路由模式,Topics主题模式,RPC 远程调用模式(远程调用,不太算MQ);官网对应模式介绍:https://www.rabbitmq.com/getstarted.html
2. 安装
官方安装教程:https://www.rabbitmq.com/download.html
2.1 CentOS7 安装
2.1.1 安装依赖环境
1
| yum install -y make gcc gcc-c++ m4 openssl openssl-devel ncurses-devel unixODBC unixODBC-devel java java-devel
|
2.1.2 安装 Erlang
注意 Erlang 和 RabbitMQ 需要版本对应,详情见https://www.rabbitmq.com/which-erlang.html
下载 Erlang,网址为 http://erlang.org/download/ ,比如下载 otp_src_23.3.tar.gz,然后通过 ftp 工具传输到 linux 服务器中,也可以使用 wget 命令在线下载(此文件下载非常慢)
1 2 3 4 5 6 7 8 9
| wget http://erlang.org/download/otp_src_23.3.tar.gz tar -zxvf otp_src_23.3.tar.gz cd otp_src_23.3 ./configure --prefix=/usr/local/erlang make && make install ll /usr/local/erlang/bin echo 'export PATH=$PATH:/usr/local/erlang/bin' >> /etc/profile source /etc/profile erl
|

2.1.3 安装 RabbitMQ
下载 RabbitMQ,网址为https://www.rabbitmq.com/install-generic-unix.html#downloads,下载后通过 ftp 工具传输到 linux 服务器中,也可以使用 wget 命令在线下载
1 2 3 4 5 6 7
| wget https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.8.16/rabbitmq-server-generic-unix-3.8.16.tar.xz tar -xvf rabbitmq-server-generic-unix-3.8.16.tar.xz mv rabbitmq_server-3.8.16/ /usr/local/ echo 'export PATH=$PATH:/usr/local/rabbitmq_server-3.8.16/sbin' >> /etc/profile source /etc/profile rabbitmq-plugins enable rabbitmq_management rabbitmq-server -detached
|
此时如果在浏览器输入http://ip:15672/就会看到web界面的登录界面,默认用户名密码均为guest,但如果此时我们登录,会遇到 User can only log in via localhost 这个问题。
主要原因是 RabbitMQ 从 3.3.0 开始禁止使用 guest/guest 权限通过除 localhost 外的访问,所以我们需要进行一定配置,以使得其他主机也能访问此 RabbitMQ(注意开放端口15672,或者关闭防火墙 systemctl stop firewalld.service)
在3.8版本以前,可以采取下面方法,然后就可以使用账号guest,密码guest进行登录了
1 2 3 4
| vim rabbitmq_server-3.7.8/ebin/rabbit.app
将:{loopback_users, [<<”guest”>>]}, 改为:{loopback_users, [guest]},
|
在3.8版本以后,就不能使用上述方法了,因为都没有 rabbit.app 文件了,这个时候可以通过新添加一个用户账号来解决。
1 2 3 4 5
| rabbitmqctl list_users rabbitmqctl add_user zhang 123 rabbitmqctl set_user_tags zhang administrator rabbitmqctl set_permissions -p "/" zhang ".*" ".*" ".*" rabbitmqctl list_user_permissions zhang
|
当配置好新用户后,就可以使用新用户的账号和密码进行管理界面的登录了,不需要重启服务

RabbitMQ 的其他命令
1 2
| rabbitmq-server -detached rabbitmqctl stop
|
2.2 Docker 安装
拉取 RabbitMQ 镜像(其中标签带 management 的,表示启动 Rabbitmq 后可以打开 web 管理界面,否则不行)
1
| sudo docker pull rabbitmq:3.8.15-management
|
查看镜像
创建挂载(映射)目录(文件夹路径可自己选,后面对应上就行)
1
| mkdir -p /home/zhang/rabbitmq
|
创建容器 rabbitmq
1 2 3 4 5 6 7 8
| sudo docker run -d --name rabbitmq --hostname mq1 -p 5672:5672 -p 15672:15672 -v /home/zhang/rabbitmq:/var/lib/rabbitmq -h myRabbit -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin rabbitmq:3.8.15-management
|
浏览器输入http://ip:15672即可访问 Rabbitmq 的 web 管理页面,用户名密码是-e参数所指定的用户名和密码。
3. Java 操作 RabbitMQ
创建一个 maven 项目,并添加 rabbitmq 的依赖。
1 2 3 4 5 6 7 8 9
| <dependencies> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.11.0</version> </dependency> </dependencies>
|
3.1. Hello World
RabbitMQ 是消息代理:它接受并转发消息。可以将其视为邮局,将要发布的邮件放在邮箱中时,而邮局可以确保最终将邮件传递给相应收件人。
下面是 RabbitMQ 最简单的一种模式,“ P”表示生产者,“ C”是表示消费者。中间的框是一个队列 Queue,即保留的消息缓冲区。
- 生产仅意味着发送。发送消息的程序是生产者:
- 消费与接收具有相似的含义。一个消费者是一个程序,主要是等待接收信息:
- 队列是 RabbitMQ 内部的邮政信箱的名称。尽管消息流经 RabbitMQ,但它只能存储在队列中。队列仅由主机的存储器&磁盘限制约束,它本质上是一个大的消息缓冲器。许多生产者可以发送进入一个队列的消息,许多消费者可以尝试从一个队列接收数据。
下面将将用Java编写两个程序。一个是生产者,用于发送消息;一个是消费者,用于接收消息。
3.1.1. 编写生产者代码
创建一个类 com.tju.producer.Producer_HelloWorld
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39
| public class Producer_HelloWorld { public static void main(String[] args) throws IOException,TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); factory.setPort(5672); factory.setVirtualHost("tju_virtual"); factory.setUsername("zhang"); factory.setPassword("123"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel();
channel.queueDeclare("hello", false, false, false, null); String message = "Hello World!";
channel.basicPublish("", "hello", null, message.getBytes(StandardCharsets.UTF_8)); System.out.println(" [x] Sent '" + message + "'");
channel.close(); connection.close(); } }
|
3.1.2. 编写消费者代码
创建一个类 com.tjuconsumer.consumer_HelloWorld
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54
| public class Consumer_HelloWorld { public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); factory.setPort(5672); factory.setVirtualHost("tju_virtual"); factory.setUsername("zhang"); factory.setPassword("123"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel();
channel.queueDeclare("hello", false, false, false, null); System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
Consumer consumer = new DefaultConsumer(channel){
@Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("consumerTag:" + consumerTag); System.out.println("Exchange:" + envelope.getExchange()); System.out.println("RoutingKey:" + envelope.getRoutingKey()); System.out.println("properties:" + properties); System.out.println("body:" + new String(body)); } };
channel.basicConsume("hello", true, consumer); } }
|
3.1.3. 执行
执行消费者和生产者两个代码(顺序无所谓),然后就会发现,生产者发送的消息,被消费者所接收了。

3.2. Work Queues
在 Hello World 模式下,一个队列只用于特定的一个生产者和一个消费者。在工作队列(Work queues)模式中,我们将创建一个工作队列,该队列将用于在多个工作人员之间分配耗时的任务。
在此模式下,生产者发送的多个消息,可以有多个消费者依次接收(但一个消息只有一个人能接收)。
应用场景:对于 任务过重或任务较多情况使用工作队列可以提高任务处理的速度。
下面将将用Java编写三个程序。一个是生产者,用于发送消息;两个是消费者,用于接收消息。
3.2.1. 编写生产者代码
创建一个类com.tju.producer.Producer_WorkQueues
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41
| public class Producer_WorkQueues { public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); factory.setPort(5672); factory.setVirtualHost("tju_virtual"); factory.setUsername("zhang"); factory.setPassword("123"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel();
channel.queueDeclare("work_queues", false, false, false, null); for (int i = 1; i <= 10; i++) { String message = "【" + i + "】 Hello World!";
channel.basicPublish("", "work_queues", null, message.getBytes(StandardCharsets.UTF_8)); System.out.println(" [x] Sent '" + message + "'"); }
channel.close(); connection.close(); } }
|
3.2.2. 编写消费者代码
创建第一个类com.tjuconsumer.Consumer_WorkQueues1
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40
| public class Consumer_WorkQueues1 { public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); factory.setPort(5672); factory.setVirtualHost("tju_virtual"); factory.setUsername("zhang"); factory.setPassword("123"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare("work_queues", false, false, false, null); System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
Consumer consumer = new DefaultConsumer(channel) {
@Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("body:" + new String(body)); } };
channel.basicConsume("work_queues", true, consumer);
} }
|
创建第二个类com.tjuconsumer.Consumer_WorkQueues2
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39
| public class Consumer_WorkQueues2 { public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); factory.setPort(5672); factory.setVirtualHost("tju_virtual"); factory.setUsername("zhang"); factory.setPassword("123"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare("work_queues", false, false, false, null); System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
Consumer consumer = new DefaultConsumer(channel) {
@Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("body:" + new String(body)); } };
channel.basicConsume("work_queues", true, consumer); } }
|
3.2.3. 执行
首先执行两个消费者代码,接着执行生产者代码,然后就会发现,生产者发送的消息,被两个消费者所依次接收了。
第一个消费者输出的内容

第二个消费者输出的内容

3.3. Publish/Subscribe
在 Work queues 模式下,每个消息都恰好交付给一个消费者。在发布/订阅(Publish/Subscribe)模式中,我们将同一个消息可以传达给多个消费者。
在此模式下,生产者发送的单个消息,可以有多个消费者进行接收。
相比于之前的模式,发布/订阅模式多了一个交换机(Exchange),由下图中的X表示。生产者不再直接把消息发送给队列,而是发送给交换机。交换机可以通过与某个队列绑定,会把从生产者获得的消息传递给相应满足规则(路由key)的队列。
- 生产者:也就是要发送消息的程序,但是不再发送到队列中,而是发给X(交换机)
- 消费者:消息的接受者,会一直等待消息到来。
- 消息队列用于接收消息、缓存消息、发送消息。
- 交换机:图中的X。一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。Exchange有常见以下3种类型:
Fanout:广播,将消息交给所有绑定到交换机的队列
Direct:定向,把消息交给符合指定routing key 的队列
Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列
具体流程为:
发布订阅模式与工作队列模式的区别
1、工作队列模式不用定义交换机,而发布/订阅模式需要定义交换机。
2、发布/订阅模式的生产方是面向交换机发送消息,工作队列模式的生产方是面向队列发送消息(底层使用默认交换机)。
3、发布/订阅模式需要设置队列和交换机的绑定,工作队列模式不需要设置,实际上工作队列模式会将队列绑 定到默认的交换机 。
下面将将用Java编写三个程序。一个是生产者,用于发送消息;两个是消费者,用于接收消息。
3.3.1. 编写生产者代码
创建一个类com.tju.producer.Producer_PubSub
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40
| public class Producer_PubSub { public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); factory.setPort(5672); factory.setVirtualHost("tju_virtual"); factory.setUsername("zhang"); factory.setPassword("123"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel();
channel.exchangeDeclare("test_fanout", BuiltinExchangeType.FANOUT,true,false,false,null); channel.queueDeclare("test_fanout_queue1",true,false,false,null); channel.queueDeclare("test_fanout_queue2",true,false,false,null); channel.queueBind("test_fanout_queue1","test_fanout",""); channel.queueBind("test_fanout_queue2","test_fanout",""); for (int i = 0; i < 10 ; i++) { String message = "这是日志信息【" + i + "】"; channel.basicPublish("test_fanout","",null,message.getBytes(StandardCharsets.UTF_8)); } channel.close(); connection.close(); } }
|
3.3.2. 编写消费者代码
创建第一个类 com.tjuconsumer.Consumer_PubSub1
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| public class Consumer_PubSub1 { public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); factory.setPort(5672); factory.setVirtualHost("tju_virtual"); factory.setUsername("zhang"); factory.setPassword("123"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel();
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消费者1接收:" + new String(body)); } }; channel.basicConsume("test_fanout_queue1", true, consumer); } }
|
创建第二个类 com.tjuconsumer.Consumer_PubSub2
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| public class Consumer_PubSub2 { public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1"); factory.setPort(5672); factory.setVirtualHost("tju_virtual"); factory.setUsername("zhang"); factory.setPassword("123");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消费者2接收:" + new String(body)); } };
channel.basicConsume("test_fanout_queue2", true, consumer); } }
|
3.3.3. 执行
首先执行两个消费者代码,接着执行生产者代码,然后就会发现,生产者发送的消息,被两个消费者均接收了。
第一个消费者输出的内容

第二个消费者输出的内容

3.4. Routing
路由模式特点:
- 队列与交换机的绑定,不能是任意绑定了,而是要指定一个
RoutingKey(路由key)
- 消息的发送方在向 Exchange发送消息时,也必须指定消息的
RoutingKey。
- Exchange不再把消息交给每一个绑定的队列,而是根据消息的
Routing Key进行判断,只有队列的Routingkey与消息的 Routing key完全一致,才会接收到消息。
- P:生产者,向Exchange发送消息,发送消息时,会指定一个routing key。
- X:Exchange(交换机),接收生产者的消息,然后把消息递交给 与routing key完全匹配的队列
- C1:消费者,其所在队列指定了需要routing key 为 error 的消息
- C2:消费者,其所在队列指定了需要routing key 为 info、error、warning 的消息
具体流程为:
下面将将用Java编写三个程序。一个是生产者,用于发送消息;两个是消费者,用于接收消息。
在编码上与 Publish/Subscribe发布与订阅模式 的区别是交换机的类型为:Direct,还有队列绑定交换机的时候需要指定routing key。
3.4.1. 编写生产者代码
创建一个类com.tju.producer.Producer_Routing
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48
| public class Producer_Routing { public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); factory.setPort(5672); factory.setVirtualHost("tju_virtual"); factory.setUsername("zhang"); factory.setPassword("123"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel();
channel.exchangeDeclare("test_direct", BuiltinExchangeType.DIRECT,true,false,false,null); channel.queueDeclare("test_direct_queue1",true,false,false,null); channel.queueDeclare("test_direct_queue2",true,false,false,null); channel.queueBind("test_direct_queue1","test_direct","error"); channel.queueBind("test_direct_queue2","test_direct","info"); channel.queueBind("test_direct_queue2","test_direct","error"); channel.queueBind("test_direct_queue2","test_direct","warning"); String message = "这是一条日志信息";
channel.basicPublish("test_direct","info",null,message.getBytes(StandardCharsets.UTF_8)); channel.close(); connection.close(); } }
|
3.4.2. 编写消费者代码
创建第一个类com.tjuconsumer.Consumer_Routing1
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| public class Consumer_Routing1 { public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); factory.setPort(5672); factory.setVirtualHost("tju_virtual"); factory.setUsername("zhang"); factory.setPassword("123"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("body:" + new String(body)); System.out.println("将日志信息打印到控制台..."); } }; channel.basicConsume("test_direct_queue1", true, consumer); } }
|
创建第二个类com.tjuconsumer.Consumer_Routing2
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| public class Consumer_Routing2 { public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); factory.setPort(5672); factory.setVirtualHost("tju_virtual"); factory.setUsername("zhang"); factory.setPassword("123"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("body:" + new String(body)); System.out.println("将日志信息保存到磁盘..."); } }; channel.basicConsume("test_direct_queue2", true, consumer); } }
|
3.4.3. 执行
首先执行两个消费者代码,接着执行生产者代码,然后就会发现,由于我们发送消息时,绑定的规则为info,只与队列2的规则匹配,而与队列1的规则不匹配,从而消息只会发送到队列2,不会发送到队列1,因此消费者1不能收到消息,消费者2可以收到消息。
第一个消费者输出的内容

第二个消费者输出的内容

3.5. Topics
Topic类型与Direct相比,都是可以根据RoutingKey把消息路由到不同的队列。只不过Topic类型Exchange可以让队列在绑定Routing key 的时候使用通配符!
Routingkey 一般都是有一个或多个单词组成,多个单词之间以”.”分割,例如: item.insert
通配符规则:
举例:
item.#:能够匹配item.insert.abc 或者 item.insert
item.*:只能匹配item.insert

具体流程为:
下面将将用Java编写三个程序。一个是生产者,用于发送消息;两个是消费者,用于接收消息。
3.4.1. 编写生产者代码
创建一个类com.tju.producer.Producer_Topics
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47
| public class Producer_Topics { public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); factory.setPort(5672); factory.setVirtualHost("tju_virtual"); factory.setUsername("zhang"); factory.setPassword("123"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel();
channel.exchangeDeclare("test_topic", BuiltinExchangeType.TOPIC,true,false,false,null); channel.queueDeclare("test_topic_queue1",true,false,false,null); channel.queueDeclare("test_topic_queue2",true,false,false,null); channel.queueBind("test_topic_queue1","test_topic","#.error"); channel.queueBind("test_topic_queue1","test_topic","order.#"); channel.queueBind("test_topic_queue2","test_topic","#.#"); String message = "这是一条日志信息";
channel.basicPublish("test_topic","x",null,message.getBytes(StandardCharsets.UTF_8)); channel.close(); connection.close(); } }
|
3.4.2. 编写消费者代码
创建第一个类com.tjuconsumer.Consumer_Topics1
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| public class Consumer_Topics1 { public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); factory.setPort(5672); factory.setVirtualHost("tju_virtual"); factory.setUsername("zhang"); factory.setPassword("123"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("body:" + new String(body)); System.out.println("将日志信息打印到控制台..."); } }; channel.basicConsume("test_topic_queue1", true, consumer); } }
|
创建第二个类com.tjuconsumer.Consumer_Topics2
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| public class Consumer_Topics2 { public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); factory.setPort(5672); factory.setVirtualHost("tju_virtual"); factory.setUsername("zhang"); factory.setPassword("123"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("body:" + new String(body)); System.out.println("将日志信息保存到磁盘..."); } }; channel.basicConsume("test_topic_queue2", true, consumer); } }
|
3.4.3. 执行
首先执行两个消费者代码,接着执行生产者代码,然后就会发现,由于我们发送消息时,绑定的规则为x,只与队列2的规则匹配,而与队列1的规则不匹配,从而消息只会发送到队列2,不会发送到队列1,因此消费者1不能收到消息,消费者2可以收到消息。
第一个消费者输出的内容

第二个消费者输出的内容

4. SpringBoot 整合 RabbitMQ
前面已经介绍了 Java 代码如何操作 RabbitMQ了,但是可以发现,代码非常繁琐,可以利用 SpringBoot 直接整合 RabbitMQ 简化操作,提升开发效率。
4.1 Hello World
一个生产者,一个消费者。
Hello World 是 RabbitMQ 最简单的一种模式,“ P”表示生产者,“ C”是表示消费者。中间的框是一个队列 Queue,即保留的消息缓冲区。
- 生产仅意味着发送。发送消息的程序是生产者:
- 消费与接收具有相似的含义。一个消费者是一个程序,主要是等待接收信息:
- 队列是 RabbitMQ 内部的邮政信箱的名称。尽管消息流经 RabbitMQ,但它只能存储在队列中。队列仅由主机的存储器&磁盘限制约束,它本质上是一个大的消息缓冲器。许多生产者可以发送进入一个队列的消息,许多消费者可以尝试从一个队列接收数据。
生产者发送消息
引入依赖
1 2 3 4
| <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
|
在 application.properties 中配置 rabbitmq
1 2 3 4 5 6
| spring.rabbitmq.host=127.0.0.1 spring.rabbitmq.port=5672 spring.rabbitmq.virtual-host=tju_virtual spring.rabbitmq.username=zhang spring.rabbitmq.password=123
|
创建消息队列(也可以在rabbitmq web界面直接创建)
1 2 3 4 5 6 7 8 9 10
| @Configuration public class RabbitmqConfig { public static final String HELLO_WORLD = "hello.world"; @Bean() public Queue helloWorldQueue(){ return QueueBuilder.durable(HELLO_WORLD).build(); } }
|
使用 springboot 提供的 RabbitmqTemplate 进行消息的发送与接收
1 2 3 4 5 6 7 8 9 10 11 12 13
| @SpringBootTest public class SpringAmqpTest { @Autowired private RabbitTemplate rabbitTemplate;
@Test public void testHelloWorld(){ String queue = "hello.world"; String message = "Hello RabbitTemplate!"; rabbitTemplate.convertAndSend(queue,message); } }
|
消费者接收消息
引入依赖、在 application.properties 中配置 rabbitmq。
监听消息,新建一个类注入到 IOC 中,并使用 @RabbitListener 注解指定监听的队列。
1 2 3 4 5 6 7 8 9 10 11
| @Component public class RabbitmqListener {
@RabbitListener(queues = "hello.world") public void helloWorldListener(String message){ System.out.println("消费者接收到的消息为:" + message); } }
|
启动 SpringBoot 服务即可。
注意:消息一旦消费就会从队列删除,RabbitMQ 没有消息回溯功能
4.2 Work Queues
一个生产者,多个消费者,且一条消息只能被一个消费者所消费。
在 Hello World 模式下,一个队列只用于特定的一个生产者和一个消费者。在工作队列(Work queues)模式中,我们将创建一个工作队列,该队列将用于在多个工作人员之间分配耗时的任务。
在此模式下,生产者发送的多个消息,可以有多个消费者依次接收(但一个消息只有一个人能接收)。
应用场景:对于 任务过重或任务较多情况,可能生产消息的速度会 远远大于消息的消费速度。长此以往,消息就会堆积越来越多,无法及时处理,此时使用工作队列可以提高任务处理的速度。
生产者发送消息
在配置类中创建队列
1 2 3 4 5
| public static final String WORK_QUEUES ="work.queues"; @Bean() public Queue workQueuesQueue(){ return QueueBuilder.durable(WORK_QUEUES).build(); }
|
使用 RabbitTemplate 发送消息【和 Hello World 模式一致】
1 2 3 4 5 6 7 8 9
| @Test public void testWorkQueues(){ String queue = "work.queues"; String message = "Hello RabbitTemplate,Message_"; for(int i=1;i<=50;i++){ rabbitTemplate.convertAndSend(queue,message + i); } }
|
消费者接收消息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
|
@RabbitListener(queues = "work.queues") public void workQueuesListener1(String message) throws InterruptedException { System.err.println("消费者1接收到的消息为:" + message); Thread.sleep(20); }
@RabbitListener(queues = "work.queues") public void WorkQueuesListener2(String message) throws InterruptedException { System.out.println("消费者2接收到的消息为:" + message); Thread.sleep(200); }
|
此时会发现,消费者 1 和消费者 2 的消息处理能力虽然不同,却消费同样多的消息,也就是说消息被『平均消费』了。
然而由于不同的消费者有不同的处理速度,处理速度快的应该处理更多的消息,我们可以通过添加以下配置达到这种『能者多劳』的效果,此时消费者 1 和消费者 2 处理的消息数量将不同。
1 2
| spring.rabbitmq.listener.simple.prefetch=1
|
4.3 Publish/Subscribe
生产者发送的单个消息,所有消费者都进行接收。
在 Work queues 模式下,每个消息都恰好交付给一个消费者。在发布/订阅(Publish/Subscribe)模式中,我们将同一个消息可以传达给多个消费者。
相比于之前的模式,发布/订阅模式多了一个交换机(Exchange),由下图中的X表示。生产者不再直接把消息发送给队列,而是发送给交换机。交换机可以通过与某个队列绑定,会把从生产者获得的消息传递给相应满足规则(路由key)的队列。
- 生产者:也就是要发送消息的程序,但是不再发送到队列中,而是发给X(交换机)
- 消费者:消息的接受者,会一直等待消息到来。
- 消息队列用于接收消息、缓存消息、发送消息。
- 交换机:图中的X。一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。Exchange有常见以下3种类型:
Fanout:广播,将消息交给所有绑定到交换机的队列
Direct:定向,把消息交给符合指定 routing key 的队列
Topic:通配符,把消息交给符合 routing pattern(路由模式) 的队列
具体流程为:
消费者发送消息
在配置类中创建队列和交换机,并进行绑定
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| public static final String FANOUT_QUEUE1 ="fanout.queue1"; public static final String FANOUT_QUEUE2 ="fanout.queue2";
public static final String FANOUT_EXCHANGE ="fanout.exchange";
@Bean() public Queue fanoutQueue1(){ return QueueBuilder.durable(FANOUT_QUEUE1).build(); } @Bean() public Queue fanoutQueue2(){ return QueueBuilder.durable(FANOUT_QUEUE2).build(); }
@Bean public FanoutExchange fanoutExchange(){ return ExchangeBuilder.fanoutExchange(FANOUT_EXCHANGE).durable(true).build(); }
@Bean public Binding bindingQueue1(@Qualifier("fanoutQueue1") Queue queue, @Qualifier("fanoutExchange") FanoutExchange exchange){ return BindingBuilder.bind(queue).to(exchange); } @Bean public Binding bindingQueue2(@Qualifier("fanoutQueue2") Queue queue, @Qualifier("fanoutExchange") FanoutExchange exchange){ return BindingBuilder.bind(queue).to(exchange); }
|
使用 RabbitTemplate 发送消息【和 Hello World 模式一致】
1 2 3 4 5 6 7 8 9
| @Test public void testFanoutExchange(){ String exchange = "fanout.exchange"; String message = "Hello RabbitTemplate!,Message_"; for(int i=1;i<=50;i++){ rabbitTemplate.convertAndSend(exchange,"",message + i); } }
|
消费者接收消息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
|
@RabbitListener(queues = "fanout.queue1") public void fanoutQueueListener1(String message) throws InterruptedException { System.err.println("消费者1接收到的消息为:" + message); }
@RabbitListener(queues = "fanout.queue2") public void fanoutQueueListener2(String message) throws InterruptedException { System.out.println("消费者2接收到的消息为:" + message); }
|
此时消费者 1 和消费者 2 均接收到生产者发送的所有消息。
4.4 Routing
生产者发送的单个消息,可以绑定多个消费者进行接收。
路由模式特点:
- 队列与交换机的绑定,不能是任意绑定了,而是要指定一个
RoutingKey(路由key),因此成为路由模式。
- 消息的发送方在向 Exchange 发送消息时,也必须指定消息的
RoutingKey。
- Exchange 不再把消息交给每一个绑定的队列,而是根据消息的
Routing Key进行判断,只有队列的Routingkey与消息的 Routing key完全一致,才会接收到消息。
- P:生产者,向Exchange发送消息,发送消息时,会指定一个routing key。
- X:Exchange(交换机),接收生产者的消息,然后把消息递交给 与routing key完全匹配的队列
- C1:消费者,其所在队列指定了需要routing key 为 error 的消息
- C2:消费者,其所在队列指定了需要routing key 为 info、error、warning 的消息
具体流程为:
我们可以向之前那样,先在配置类中声明队列、交换机、绑定等信息,不过还有一种更简单的写法,直接在消费者的监听类中,在 @RabbitListener 注解中声明各种信息!
消费者接收消息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
|
@RabbitListener(bindings = @QueueBinding( value = @Queue(name = "direct.queue1"), // 队列名称 exchange = @Exchange(name = "direct.exchange", type = ExchangeTypes.DIRECT), // 交换机名称和类型 key = {"user"}) // key 可以写多个,用逗号分隔开 ) public void directQueueListener1(String message) throws InterruptedException { System.out.println("用户接收到的消息为:" + message); }
@RabbitListener(bindings = @QueueBinding( value = @Queue(name = "direct.queue2"), // 队列名称 exchange = @Exchange(name = "direct.exchange", type = ExchangeTypes.DIRECT), // 交换机名称和类型 key = {"admin"}) // key 可以写多个,用逗号分隔开 ) public void directQueueListener2(String message) throws InterruptedException { System.out.println("管理员接收到的消息为:" + message); }
|
生产者发送消息
1 2 3 4 5 6 7 8 9
| @Test public void testDirectExchange(){ String exchange = "direct.exchange"; String message2User = "这条消息只有用户可以收到!"; String message2Admin = "这条消息只有管理员可以收到!"; rabbitTemplate.convertAndSend(exchange,"user",message2User); rabbitTemplate.convertAndSend(exchange,"admin",message2Admin); }
|
此时不同消费者只能接收到符合所绑定key的消息。
4.5 Topics
生产者发送的单个消息,可以绑定多个消费者(支持通配符)进行接收。
Topic 类型与 Direct 相比,都是可以根据 RoutingKey 把消息路由到不同的队列。只不过 Topic 类Exchange可以让队列在绑定Routing key 的时候使用通配符!
Routingkey 一般都是有一个或多个单词组成,多个单词之间以点分割,例如: item.insert
通配符规则:
举例:
item.#:能够匹配item.insert.abc 或者 item.insert
item.*:只能匹配item.insert

具体流程为:
消费者接收消息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
|
@RabbitListener(bindings = @QueueBinding( value = @Queue(name = "topic.queue1"), // 队列名称 exchange = @Exchange(name = "topic.exchange", type = ExchangeTypes.TOPIC), // 交换机名称和类型 key = {"china.*"}) // key 可以写多个,并且支持通配符写法!!用逗号分隔开 ) public void topicQueueListener1(String message) throws InterruptedException { System.out.println("收到前缀为china的消息:" + message); }
@RabbitListener(bindings = @QueueBinding( value = @Queue(name = "topic.queue2"), // 队列名称 exchange = @Exchange(name = "topic.exchange", type = ExchangeTypes.TOPIC), // 交换机名称和类型 key = {"#.videos"}) // key 可以写多个,并且支持通配符写法!!用逗号分隔开 ) public void topicQueueListener2(String message) throws InterruptedException { System.out.println("收到后缀为videos的消息:" + message); }
|
生产者发送消息
1 2 3 4 5 6 7 8 9 10 11
| @Test public void testTopicExchange(){ String exchange = "topic.exchange"; String message1 = "消息为:china.people"; String message2 = "消息为:us.videos"; String message3 = "消息为:china.videos"; rabbitTemplate.convertAndSend(exchange,"china.people",message1); rabbitTemplate.convertAndSend(exchange,"us.videos",message2); rabbitTemplate.convertAndSend(exchange,"china.videos",message3); }
|
4.6. 模式总结
RabbitMQ工作模式:
1、简单模式 HelloWorld
一个生产者、一个消费者,不需要设置交换机(使用默认的交换机)
2、工作队列模式 Work Queue
一个生产者、多个消费者(竞争关系),不需要设置交换机(使用默认的交换机)
3、发布订阅模式 Publish/Subscribe
需要设置类型为fanout的交换机,并且交换机和队列进行绑定,当发送消息到交换机后,交换机会将消息发送到绑定的队列
4、路由模式 Routing
需要设置类型为 direct 的交换机,交换机和队列进行绑定,并且指定routing key,当发送消息到交换机后,交换机会根据routing key 将消息发送到对应的队列
5、通配符模式 Topics
需要设置类型为topic的交换机,交换机和队列进行绑定,并且指定通配符方式的 routing key,当发送消息到交换机后,交换机会根据 routing key 将消息发送到对应的队列
笼统的说,RabbitMQ工作模式分为两类
一类是直接使用队列,包含Hello World 和 Work Queues 模式,不同的在于消费者数量而已。
另一类使用交换机,可以使得不同消费者消费相同或者不同的消息,包含 Publish/Subscribe、Routing、Topics模式,不同的是交换机将消息发送到队列的方式不同,Pushlish/Subscribe 采用广播形式,所有队列都存有消息;Routing 采用路由形式,会根据 routing key,将消息发送到指定的队列;Topics 和 Routing 基本一致,只不过 routing key支持通配符而已!