MQ 面向的问题
1、分布式架构下的服务之间相互调用,在某些场景下不需要立刻返回结果,同步操作影响性能;
2、服务之间的耦合度过高,某个服务宕机会导致整个程序不可用。
常用 MQ 比较
市面上常用的 MQ 有 ActiveMQ、RocketMQ、Kafka、RabbitMQ。
- 语言支持:ActiveMQ、RocketMQ 只支持 Java 语言,Kafka(底层 Scala) 和 RabbitMQ(底层 Erlang)支持多种语言。
- 效率:ActiveMQ、RocketMQ、Kafka 都是毫秒级别的,RabbitMQ 是微秒级别的。
- 消息丢失、重复问题:RabbitMQ 针对消息的持久化和重复问题有比较成熟的解决方案。
RabbitMQ 严格遵守 AMQP 协议(高级消息队列协议),可以帮助我们在进程之间传递异步消息。
RabbitMQ的使用
新手也能看懂,消息队列其实很简单
windows10环境下的RabbitMQ安装步骤(图文)
RabbitMQ 架构
RabbitMQ 基本架构图:

上图中出现的名词的释义:
- Publisher(生产者):生产、发布消息到 Exchange。
- Exchange(交换机):将从 Publisher 接受到的消息转发到相应的 Queue。
- Routes(路由):Exchange 转发消息的策略(即对消息进行过虑)。
- Queue(队列):存储 Exchange 转发的消息,并转发给相应的 Consumer。
- Consumer(消费者):接收、消费 Queue 中的消息。
RabbitMQ 详细架构图:

RabbitMQ与AMQP协议详解
RabbitMQ 工作模式
RabbitMQ 常用的工作模式有以下五种:

RabbitMQ Tutorials
RabbitMQ的六种工作模式
Hello World
收发模式:一个生产者,一个默认交换机,一个队列,一个消费者。
1、Publisher 生产消息,将消息放入默认 Exchange,Exchange 将消息转发到指定的 Queue。
2、Consumer 监听 Queue,如果有消息就消费掉。

下面分别从 Publisher 和 Consumer 的角度展示 RabbitMQ 的具体用法。
一、Publisher 发布消息
1、建立 Connection:
public class RabbitMQClient {
public static Connection getConnection() {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(5672);
factory.setUsername("test");
factory.setPassword("test");
factory.setVirtualHost("/test");
Connection connection = null;
try {
connection = factory.newConnection();
} catch (Exception e) {
e.printStackTrace();
}
return connection;
}
}
2、创建 Channel:
Connection connection = getConnection();
Channel channel = connection.createChannel();
3、发布消息到 Exchange:
// 参数1(String exchange):指定exchange,""表示使用默认的exchange
// 参数2(String routingKey):指定路由规则,这里使用具体的队列名称
// 参数3(BasicProperties props):指定发布消息所携带的属性
// 参数4(byte[] body):指定发布的具体消息
channel.basicPublish("", "helloworld", null, "Hello-World!".getBytes());
4、释放资源:
// 和JDBC一样,从小到大依次关闭
channel.close();
connection.close();
二、Consumer 消费消息
1、获取 Connection、创建 Channel:
Connection connection = getConnection();
Channel channel = connection.createChannel();
2、声明 Queue:
// 参数1(String queue):指定队列名称
// 参数2(boolean durable):当前队列是否需要持久化
// 参数3(boolean exclusive):是否排外。设置为true有两个效果:1、connection.close()————当前队列会自动删除
// 2、当前队列只能被一个消费者消费
// 参数4(boolean autoDelete):若该队列没有消费者消费,队列自动删除
// 参数5(Map<String, Object> arguments):当前队列的其他信息
channel.queueDeclare("helloworld",true,false,false,null);
3、监听 Queue:
// 参数1(String queue):指定消费哪个队列
// 参数2(boolean autoAck):是否自动ACK
// 设置为true表示Consumer接收到消息后,告知RabbitMQ将该消息已消费,RabbitMQ将该消息从队列中删除
// 参数3(Consumer callback):指定消费回调
channel.basicConsume("helloworld", true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
BasicProperties properties, byte[] body) throws IOException {
// do something....
}
});
4、释放资源:
channel.close();
connection.close();
RabbitMQ队列、交换器和绑定的操作
Work queues
工作队列模式(资源竞争):一个生产者,一个默认交换机,一个队列,多个消费者。
适用场景:验证码、邮件。
1、Publisher 生产消息,将消息放入默认 Exchange,Exchange 将消息转发到指定的 Queue。
2、多个 Consumer 监听同一个 Queue。

默认情况下,RabbitMQ 会将 Queue 中的消息平均分配给多个 Consumer 去消费。但是在实际情况中,不同服务器的性能可能是不一样的,因此需要让多个 Consumer 根据自身的实际消费的能力去获取消息。之前的代码中,我们将 basicConsume() 的 autoAck 参数设为 true,这表示 Consumer 接收到消息就会告知 RabbitMQ 消息被消费,此处需要修改为 Consumer 真正消费完消息后再告知 RabbitMQ 消息被消费。
这里会创建两个 Consumer 连接同一个 Queue,两个 Consumer 的代码中可以重复声明同一个队列,但是声明所使用的参数必须一致,否则会抛出异常。另外 Exchange 也是可以重复声明的,只要保证声明所使用的参数一致即可。
基于以上事实,需要修改的地方有:
1、指定 Consumer 缓存消息的数量。在实际使用 RabbitMQ 过程中,如果完全不配置 Qos,这样 RabbitMQ 会尽可能快速地发送队列中的所有消息到 Consumer。由于 Consumer 会在本地缓存所有的消息,从而极有可能导致服务器内存不足影响其它进程的正常运行。因此我们需要通过设置 Qos 的 prefetchCount(默认大小是 Integer.MAX_VALUE,这也就是为什么说不设置 Qos 很有可能会 OOM) 来控制 Consumer 的流量。
// 参数(int prefetchCount):预取数量
channel.basicQos(2);
2、设置 Consumer 不自动应答(autoAck = false),等实际消费完成后再应答:
channel.basicConsume("helloworld", false, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
BasicProperties properties, byte[] body) throws IOException {
// do something.....
// 参数1(long deliveryTag):发布的每一条消息都会获得一个唯一的 deliveryTag,
// 任何 channel 上发布的第一条消息的deliveryTag为1,此后的每一条消息都会加1,
// deliveryTag 在 channel范围内是唯一的。
// 参数2(boolean multiple):批量确认标志。
// true表示执行批量确认,此deliveryTag之前收到的消息全部进行确认;
// false则表示只对当前收到的消息进行确认.
channel.basicAck(envelope.getDeliveryTag(), false);
}
});
RabbitMQ之Qos prefetch
RabbitMQ如何保证消息的可靠性?
Publish/Subscribe
发布订阅模式(资源共享):一个生产者,一个交换机,多个队列,多个消费者。
适用场景:公众号订阅。
1、Publisher 生产消息,将消息放入指定的 Exchange,Exchange 将消息转发到绑定的 Queue。
2、多个 Consumer 监听各自的 Queue。

使用发布订阅模式,同一条消息会被转发到多个队列中。由于这里需要指定 Exchange 的类型为 Publish/Subscribe,因此这里不能再使用默认的 Exchange。
需要修改的地方是:创建 FANOUT 类型的 Exchange,并将 Exchange 和 指定的 Queue 绑定:
// 参数1(String exchange):指定exchange
// 参数2(BuiltinExchangeType type):指定exchange类型
// FANOUT-Publish/Subscribe,DIRECT-Routing,TOPIC-Topics
channel.exchangeDeclare("pubsub-exchange", BuiltinExchangeType.FANOUT);
// 参数1(String queue):指定队列
// 参数2(String exchange):指定exchange
// 参数3(String routingKey):指定路由规则,FANOUT类型exchange不需要路由规则
channel.queueBind("pubsub-queue1", "pubsub-exchange","");
channel.queueBind("pubsub-queue2", "pubsub-exchange","");
// 修改 exchange 名称
channel.basicPublish("pubsub-exchange", "", null, "Hello-World!".getBytes());
Routing
路由模式:一个生产者,一个交换机,多个队列,多个消费者。
1、Publisher 生产消息,将消息放入指定的 Exchange,Exchange 根据路由规则,将消息转发到绑定的 Queue。
2、多个 Consumer 监听各自的 Queue。

需要修改的地方是:创建 DIRECT 类型的 Exchange,将 Exchange 和 Queue 绑定并设置路由规则:
// 指定Exchage类型为DIRECT
channel.exchangeDeclare("routing-exchange", BuiltinExchangeType.DIRECT);
// 绑定Queue,并为不同的Queue设置不同的路由规则
channel.queueBind("routing-queue-error", "routing-exchange","ERROR");
channel.queueBind("routing-queue-info", "routing-exchange","INFO");
// 使用不同的路由规则发送消息
channel.basicPublish("routing-queue-error", "ERROR", null, "ERROR1".getBytes());
channel.basicPublish("routing-queue-infoe", "INFO", null, "INFO1".getBytes());
Topics
主题模式(路由的一种):一个生产者,一个交换机,多个队列,多个消费者。
1、Publisher 生产消息,将消息放入指定的 Exchange,Exchange 根据路由规则,将消息转发到指定的绑定的 Queue。
2、多个 Consumer 监听各自的 Queue。

主题模式可以认为是拥有模糊功能的路由模式,使用占位符和通配符匹配主题。
需要修改的地方是:创建 TOPIC 类型的 Exchange,将 Exchange 和 Queue 绑定并设置主题匹配规则:
// 指定Exchage类型为TOPIC
channel.exchangeDeclare("topic-exchange", BuiltinExchangeType.TOPIC);
// 路由匹配规则: * 占位符 # 通配符
// 例如,动物主题: speed.color.what
// *.red.* -> 红色的动物
// fast.# -> 快的动物
// *.*.rabbit -> 兔子
channel.queueBind("routing-queue1", "topic-exchange","*.red.*");
channel.queueBind("routing-queue2", "topic-exchange","fast.#");
channel.queueBind("routing-queue2", "topic-exchange","*.*.rabbit");
// 发送同一主题的消息
channel.basicPublish("topic-exchange", "fast.red.monkey", null, "红快猴".getBytes());
channel.basicPublish("topic-exchange", "slow.black.dog", null, "黑慢狗".getBytes());
channel.basicPublish("topic-exchange", "fast.white.cat", null, "快白猫".getBytes());
RabbitMQ 交换机类型
纵观上述的几种工作模式,其实都是依赖于不同的交换机类型,所以交换机类型是工作模式的核心。
RabbitMQ 交换机主要有如下 4 种类型:
- Direct exchange:直连交换机,对应 Routing 工作模式;
- Fanout exchange:扇型交换机,对应 Publish/Subscribe 工作模式;
- Topic exchange:主题交换机,对应 Topic 工作模式;
- Headers exchange:头交换机,设置 Exchange 的类型为 BuiltinExchangeType.HEADERS 即可。
Direct exchange
直连型交换机是根据消息携带的路由键(routing key)将消息发送给对应队列的,步骤如下:
- 将一个队列绑定到某个交换机上,同时赋予该绑定一个路由键;
- 当一个携带着路由键为 RK 的消息被发送给直连交换机时,交换机会把它发送给绑定键同样为 RK 的队列。
Fanout exchange
扇型交换机将消息发送给绑定到它身上的所有队列。不同于直连交换机,路由键在此类型上不启任务作用。如果 N 个队列绑定到某个扇型交换机上,当有消息发送给此扇型交换机时,交换机会将消息的发送给这所有的 N 个队列。
Topic exchange
主题交换机中,队列通过路由键绑定到交换机上,然后交换机根据消息里的路由键,将消息发送给一个或多个绑定队列。
扇型交换机和主题交换机异同:
- 对于扇型交换机路由键是没有意义的,只要有消息,它都发送到它绑定的所有队列上;
- 对于主题交换机,路由规则由路由键决定,只有满足路由键的规则,消息才可以发送到对应的队列上。
类似主题交换机,但是头交换机使用多个消息属性来代替路由键建立路由规则。通过判断消息头的值能否与指定的绑定相匹配来确立路由规则。
该交换机有个重要参数:”x-match”,
- 当 ”x-match” 为 “any” 时,消息头的任意一个值被匹配就可以满足条件;
- 当 ”x-match” 设置为 “all” 的时候,就需要消息头的所有值都匹配成功。
代码示例:
Dictionary<string, object> aHeader = new Dictionary<string, object>();
aHeader.Add("format", "pdf");
aHeader.Add("type", "report");
aHeader.Add("x-match", "all");
channel.QueueBind(queue: "queue.A", exchange: "agreements",
routingKey: string.Empty, arguments: aHeader);
其他交换机
除此上述 4 种类型的交换机之外,RabbitMQ 内部定义了一些交换机:
- 默认交换机:是 RabbitMQ 预先声明好的名字为 “” 的直连交换机,每个新建的队列都会自动绑定到默认交换机上,绑定的路由键和队列名称相同。
- amq.* exchanges:是 RabbitMQ 默认创建的交换机。这些队列名称被预留做 RabbitMQ 内部使用,不能被应用使用,否则抛出403 (ACCESS_REFUSED)错误。
还有一类特殊的交换机:Dead Letter Exchange(死信交换机)。死信交换机会用于处理 Consumer 不能处理到的消息,将这类消息重新发布到另外一个 Queue 中,等待重试或者人工干预。
RabbitMQ Exchange类型详解
中间件系列三 RabbitMQ之交换机的四种类型和属性
【RabbitMQ】一文带你搞定RabbitMQ死信队列
RabbitMQ 高可用
通过上面的介绍,我们已经了解了 RabbitMQ 的基本用法,但是在实际使用过程中必然会出现各种各样的问题。以下图为例,消息在 Publisher 到 Consumer 的传递过程中,都有可能出现问题:

1、Publisher 发送消息时,由于网络问题,导致消息没有发送到 RabbitMQ。在这种情况下, Publisher 需要重新发送消息,RabbitMQ 提供了事务操作和 Confirm 机制。
2、消息到达 RabbitMQ,还没发送给 Consumer,但是 RabbitMQ 宕机了,消息是否就丢失了?针对这种情况,RabbitMQ 提供了 Return 机制和持久化操作。
3、Consumer 在消费消息时,出现异常或者宕机了。因此在消费消息时,我们应该手动 ACK。手动 ACK 在 Work queues 已经介绍过了。
Confirm 机制
RabbitMQ 的事务保证消息 100% 传递,可以通过事务的回滚去记录日志,后面定时再次发送当前消息。但是事务操作效率太低。加入事务操作后,比平时的操作效率至少要慢 100 倍。这里就不再介绍事务了。
RabbitMQ 除了事务,还提供了 Confirm 机制。
1、普通 Confirm 方式。每发送一条消息,调用 waitForConfirms() 方法等待 RabbitMQ Confirm。实际上是一种串行化的 Confirm。
// 开启 Confirm
channel.confirmSelect();
channel.basicPublish("", "helloworld", null, "Hello World!".getBytes());
// 判断消息是否发送成功
if (channel.waitForConfirms()) {
System.out.println("消息发送成功");
} else {
System.out.println("消息发送失败");
// do something....
}
2、批量 Confirm 方式。每次发送一批消息后,调用 waitForConfirms() 方法等待 RabbitMQ Confirm。提高了 Confirm效率,但是如果一条消息出现问题,则这一批消息全部需要重发。
// 开启confirm
channel.confirmSelect();
// 批量发送消息
for (int i = 0; i < 10; i++) {
channel.basicPublish("", "helloworld", null, ("Hello World!" + i).getBytes());
};
// waitForConfirmsOrDie() 作用和 waitForConfirms() 一样。只是前者没有返回值,若有一个消息发送失败,
// 就全部失败,抛出异常。
channel.waitForConfirmsOrDie();
3、异步 Confirm 方式(推荐)。上面提到的两种 Confirm 机制,都会造成程序阻塞,而异步 Confirm 采用监听器的方式,不会出现程序的阻塞的情况。
// 开启 Confirm
channel.confirmSelect();
// 设置confirm监听器
channel.addConfirmListener(new ConfirmListener() {
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
System.out.println("消息发送成功");
}
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
// do something
}
});
// 批量发送消息
for (int i = 0; i < 10; i++) {
channel.basicPublish("", "helloworld", null, ("Hello World!" + i).getBytes());
};
深入学习RabbitMQ(三):channel的confirm模式
持久化
RabbitMQ 的持久化包含以下 3 个部分:
- Exchange 持久化:在声明时指定 durable 为 true。
- Queue 持久化:在声明时指定 durable 为 true。
- Message 持久化:在投递时指定 delivery_mode = 2(1是非持久化)。
Exchange 和 Queue 的持久化只能保证本身的元数据不会因异常而丢失,但是不能保证内部的 Message 不会丢失。要确保 Message 不丢失,还需要将 Message 也持久化。
除此之外,Exchange 并不能持久化 Message。也就是说即使 Exchange 和 Message 都持久化,如果 RabbitMQ 宕机了,那么 Exchange 中还未发送给 Queue 的消息也会丢失掉,只有 Queue 可以持久化 Message。
另外,如果 Exchange 和 Queue 都是持久化的,那么它们之间的 Binding 也是持久化的。因此,一旦确定了 Exchange 和 Queue 的 durable,就不能修改了。如果非要修改,唯一的办法就是删除原来的 Exchange 或 Queue 后,重现声明。
Publisher 发送持久化消息代码示例:
// MessageProperties.MINIMAL_PERSISTENT_BASIC 设置消息持久化属性
channel.basicPublish("queue1", "", MessageProperties.MINIMAL_PERSISTENT_BASIC,
"Hello World!".getBytes());
RabbitMQ之消息持久化
RabbitMQ 持久化
Return 机制
我们知道 Confirm 机制保证消息发送到 Exchange,但是无法保证 Exchange 将消息转发到 Queue,因此我们需要 Return 机制保证消息确实发送到了 Queue中。
Return 机制由 Publisher 发布消息时的两个标志位控制:
- mandatory:当 mandatory 标志位设置为 true 时,如果 Exchange 根据自身类型和消息 routeKey 无法找到一个符合条件的 Queue,那么会调用 basic.return 方法将消息返还给 Publisher;当 mandatory 设为 false 时,出现上述情形 RabbitMQ 会直接将消息扔掉。
- immediate:当 immediate 标志位设置为 true 时,如果 Exchange 在将消息转发到 Queue(s) 时发现对应的 Queue 上没有 Consumer,那么这条消息不会放入 Queue 中。当与消息 routeKey 关联的所有 Queue(一个或多个)都没有 Consumer 时,该消息会通过 basic.return 方法返还给给 Publisher。
RabbitMQ 3.0以后的版本里,去掉了 immediate 参数的支持,发送带 immediate 标记的 publish 会报错。
这里需要作出的修改是:1、为 channel 设置 Return 监听器;2、Publisher 发布消息时开启 mandatory。
// 设置Return监听器
channel.addReturnListener(new ReturnListener() {
// 消息没有送达才会调用
public void handleReturn(int arg0, String arg1, String arg2, String arg3,
BasicProperties arg4, byte[] arg5) throws IOException {
System.out.println(new String(arg5, "UTF-8") + "没有送达Queue");
}
});
// 参数1(String exchange):指定exchange,""表示使用默认的exchange
// 参数2(String routingKey):指定路由规则,这里使用具体的队列名称
// 参数3(boolean mandatory):是否开启 mandtory
// 参数4(BasicProperties props):指定传递消息所携带的属性
// 参数5(byte[] body):指定发布的具体消息
channel.basicPublish("", "no-exist-queue", true, null, ("Hello World!" + i).getBytes());
RabbitMQ之mandatory和immediate
消息重复消费
消息的消费一般分为两种:
- 幂等操作:同一条信息多次消费后得到的结果是一样的(如发送验证码);
- 非幂等操作:同一条消息只能消费一次,多次消费会得到不同的结果(如生成订单)。
对于非幂等操作,我们需要保证消息不能被重复消费。要做到这一点,首先我们要弄清楚出现重复消费的原因是什么?究其本质,就是因为 Consumer 没有给 RabbitMQ 返回一个 ack,导致 RabbitMQ 认为该 Consumer 没有成功消费消息,转而将消息让另一个 Consumer 消费。
针对这种情况,我们需要引入 Message-id 和 Redis 来解决。
整体思路是:Publisher 发布消息时,需要为该消息生成唯一的 Message-id。Consumer 在消费消息之前,先将消息的 Message-id 放到 Redis 中;消费完消息后,也要更新消息 Message-id 的状态。例如:
- Message-id:0 表示消息正在消费
- Message-id:1 表示消息消费成功
若该消费者由于种种原因没有给 RabbitMQ 返回 ack,那么 RabbitMQ 将消息交给其他 Consumer 消费时,其他 Consumer 需要先查看 Redis 中该 Message-id 的值。若 Message-id 不存在,则进行消息消费;若 Message-id 已经存在:
- Message-id 为 0,表示上一个 Consumer 还在消费,新的 Consumer 就什么都不做;
- Message-id 为 1,表示上一个 Consumer 消费完成,新的 Consumer 代替上一个 Consumer 返回 ack。
【RabbitMQ】保证消息的不重复消费
RabbitMQ系列(四)--消息如何保证可靠性传输以及幂等性 |