什么是延时队列,延时队列应用于什么场景
延时队列顾名思义,即放置在该队列里面的消息是不需要立即消费的,而是等待一段时间之后取出消费。
那么,为什么需要延迟消费呢?我们来看以下的场景
- 网上商城下订单后30分钟后没有完成支付,取消订单(如:淘宝、去哪儿网)
- 系统创建了预约之后,需要在预约时间到达前一小时提醒被预约的双方参会
- 系统中的业务失败之后,需要重试
这些场景都非常常见,我们可以思考,比如第二个需求,系统创建了预约之后,需要在预约时间到达前一小时提醒被预约的双方参会。那么一天之中肯定是会有很多个预约的,时间也是不一定的,假设现在有1点 2点 3点 三个预约,如何让系统知道在当前时间等于0点 1点 2点给用户发送信息呢,是不是需要一个轮询,一直去查看所有的预约,比对当前的系统时间和预约提前一小时的时间是否相等呢?这样做非常浪费资源而且轮询的时间间隔不好控制。如果我们使用延时消息队列呢,我们在创建时把需要通知的预约放入消息中间件中,并且设置该消息的过期时间,等过期时间到达时再取出消费即可。
Rabbitmq实现延时队列一般而言有两种形式:
第一种方式:利用两个特性: Time To Live(TTL)、Dead Letter Exchanges(DLX)
第二种方式:利用rabbitmq中的插件x-delay-message
利用TTL DLX实现延时队列的方式
TTL DLX是什么
TTL
RabbitMQ可以针对队列设置x-expires(则队列中所有的消息都有相同的过期时间)或者针对Message设置x-message-ttl(对消息进行单独设置,每条消息TTL可以不同),来控制消息的生存时间,如果超时(两者同时设置以最先到期的时间为准),则消息变为dead letter(死信)
Dead Letter Exchanges(DLX)
RabbitMQ的Queue可以配置x-dead-letter-exchange和x-dead-letter-routing-key(可选)两个参数,如果队列内出现了dead letter,则按照这两个参数重新路由转发到指定的队列。
x-dead-letter-exchange:出现dead letter之后将dead letter重新发送到指定exchange
x-dead-letter-routing-key:出现dead letter之后将dead letter重新按照指定的routing-key发送
Springboot集成rabbitmq实现第一种方式
在pom.xml文件中增加rabbitmq的依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
初始化queue exchange和queue及exchange之间的binding关系
Config.java
package com.example.demo.deadLetter;
import java.util.HashMap;
import java.util.Map;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import com.example.demo.Constants.Constants;
@Configuration
public class Config {
// 创建一个立即消费队列
@Bean
public Queue immediateQueue() {
// 第一个参数是创建的queue的名字,第二个参数是是否支持持久化
return new Queue(Constants.IMMEDIATE_QUEUE, true);
}
// 创建一个延时队列
@Bean
public Queue delayQueue() {
Map<String, Object> params = new HashMap<>();
// x-dead-letter-exchange 声明了队列里的死信转发到的DLX名称,
params.put("x-dead-letter-exchange", Constants.IMMEDIATE_EXCHANGE);
// x-dead-letter-routing-key 声明了这些死信在转发时携带的 routing-key 名称。
params.put("x-dead-letter-routing-key", Constants.IMMEDIATE_ROUTING_KEY);
return new Queue(Constants.DELAY_QUEUE, true, false, false, params);
}
@Bean
public DirectExchange immediateExchange() {
// 一共有三种构造方法,可以只传exchange的名字, 第二种,可以传exchange名字,是否支持持久化,是否可以自动删除,
//第三种在第二种参数上可以增加Map,Map中可以存放自定义exchange中的参数
return new DirectExchange(Constants.IMMEDIATE_EXCHANGE, true, false);
}
@Bean
public DirectExchange deadLetterExchange() {
// 一共有三种构造方法,可以只传exchange的名字, 第二种,可以传exchange名字,是否支持持久化,是否可以自动删除,
//第三种在第二种参数上可以增加Map,Map中可以存放自定义exchange中的参数
return new DirectExchange(Constants.DEAD_LETTER_EXCHANGE, true, false);
}
@Bean
//把立即消费的队列和立即消费的exchange绑定在一起
public Binding immediateBinding() {
return BindingBuilder.bind(immediateQueue()).to(immediateExchang#У3У{'V{;{.KnZ螊8^ik[#Юh&VcGG3r&&F6vF"&V&WFW&r"F&vWC&&FVE6vU.Knh2i#Ю[nZ螊8^Xz>Xb673&66У&R673''W6&6#ЧW&&&FVE6vUs#R禗 У&SУFcУnhz{X&&&FZ螊8^yNzУb673&66У&R673''W6&6#Ч7VF&&&FVE6vUs#R&F&FfW"Rv&SУFcУ&hУb673&66У&R673''W6&6#Ч&&&FvV&&&FVE6vUУ&SУFcУw>YcУ3&nh&&&FZKxk[#У3УV6rfУb673&66У&R673''W6f#Ч6vR6VРЦf6ЦfРЦ7&&v&RЦ7&&v&RVW#Ц7&&v&R7F6Ц7&&v&RWVSЦ7&&v6BFV7&&v6BFwW&FЦ6РФ6wW&FV&2672FV6rРТ[KKzXkhK&VТV&2VWVRVFFUVWVRТKKXiyGVWV^yNYXiniJK^XТ&WGW&WrVWVR6DDUTUG'VRТТ&VТV&27W7F6FVW6Тf7G&7BfwC&w26f7G&7BfwCТ&w2B'VVBR"&F&V7B"Т&WGW&Wr7W7F66TE'VVBW76vR"G'VRfR&w2ТТ&VТV&2&&Т&WGW&VW"VFFUVWVRFVW6F6DU&w2ТУFcУV6V"fУb673&66У&R673''W6f#Ч6vR6VРЦfBFFTfCЦfFFSРЦ7&&vW6WF7&&v&R76vSЦ7&&v&R76vU&67&&v&&B&R&&EFVFSЦ7&&v&Vf7FFFWFVCЦ7&&v7FW&VR'f6SРЦ6Ц6FV&РФ6W'f6PЧV&2672FV6V"РТWFV@Т&fFR&&&EFVFR&&&EFVFSРТV&2f6V&&FVFТ77FV&FVF"FVFТ6FFTfB6Fb6FFTfB'FB72"ТF2&&EFVFR'DV6TE6DU&76vU&6Т&FPТV&276vR&476vR76vR76vRF&W6WFТ76vRD76vU&FW2DFVFVFТ77FV&Fb&BFFR"FV6V"Т&WGW&W76vSТТУ&SУFcУV&V6VfW"fУb673&66У&R673''W6f#Ч6vR6VРЦ7&&v&&BF&&&CЦ7&&v&&BF&&D7FVЦ7&&v6BFwW&F7&&v7FW&VRCРЦ6Ц6FV&РФ6@ФV&&&@Ф6wW&FV&2672FV&V6VfW"РТ&&&D7FVVWVW26DDUTUТV&2fvWB&&Т77FV&&V6VfR"&ТУ&SУFcУ7BfУb673&66У&R673''W6f#Ч6vR6РЦfFFSРЦV7CЦV"FЦ7&&v&Vf7FFFWFVCЦ7&&v&FW7BB&W7CЦ7&&vFW7BBB&V#РЦ6VV6V#Ц6FV&РФ'VF7&V"72Ф7&W7@ЧV&2672&&&DFW7D6F7G2Т ТWFV@ТFV6V"FV6V#ТFW7@ТV&2fFW7CТ&&&Т&6WD&&"Т&6WD&.(NZТ&6WD&FFRТ&6WDF&VVТFV6V"&#ТУFcУjX[>K&&&&FZni{nNKNzxk[yNih~z[K{NZIyX[57&&&&F[ni{n^Z{J.zKK^XNih~zhn{~{XyNyX[>ih~z[ZJ~Z^YZIiJ |