使用Java自带的定时任务TimeTask实现订单超时取消,但是有小伙伴提出这种实现,会有以下几个问题:线上服务挂了,导致服务下所有的定时任务失效。服务重启,定时任务也会失效。服务上线需要发布新的服务,原来服务也会关闭。针对上述服务挂了、或者服务重启导致消息失效的问题,需要使用独立于项目的服务,比如消息中间件,比如Redis或者RabbitMQ。本文主要讲解消息队列RabbitMQ。实现效果创建一个订单,超时30分钟未支付就取消订单。RabbitMQ本身是不支持延迟队列的,但可以利用RabbitMQ的存活时间 + 死信队列来实现消息延迟。TTL + DLX存活时间 TTLTTL全称为:time to live,意思为存活时间,当消息没有配置消费者,消息就一直停留在队列中,停留时间超过存活时间后,消息会被自动删除RabbitMQ支持两种TTL设置:对消息本身设置存活时间,每条消息的存活时间可以灵活设置为不同的存活时间。对传递的队列设置存活时间,每条传到到队列的过期时间都一致。当消息过期还没有被消费,此时消息会变成死信消息dead letter,这是实现延迟队列的关键。消息变为死信的条件:消息被拒绝basic.reject/basic.nack,并且requeue=false。消息的过期时间到期了。队列达到最大长度。死信交换机 DLX当上面的消息变成死信消息之后,它不会立即被删除,首先它要看有没有对应的死信交换机,如果有绑定的死信交换机,消息就会从发送到对应的死信交换机上。DLX全程为Dead Letter Exchanges,意思为死信交换机。死信交换机和普通交换机没什么区别,不同的是死信交换机会绑定在其他队列上,当队列的消息变成死信消息后,死信消息会发送到死信交换上。队列绑定死信交换机需要两个参数:x-dead-letter-exchange: 绑定的死信交换机名称。x-dead-letter-routing-key: 绑定的死信交换机routingKey。死信交换机和普通交换机的区别就是死信交换机的Exchange和routingKey作为绑定参数,绑定在其他队列上。项目实战消息发送的流程图:生产者将带有TTL的消息发送给交换机,由交换机路由到队列中。队列由于没有消费,消息一直停留在队列中,一直等到消息超时,变成死信消息。死信消息转发到死信交换机在路由到死信队列上,最后给消费者消费。创建死信队列@Configuration(5797812)public class DelayQueueRabbitConfig { // 下面是死信队列 /** * 死信队列 */ public static final String DLX_QUEUE = "queue.dlx"; /** * 死信交换机 */ public static final String DLX_EXCHANGE = "exchange.dlx"; /** * 死信routing-key */ public static final String DLX_ROUTING_KEY = "routingKey.dlx"; /** * 死信队列 * @return(756076230) */ @Bean(540609) public Queue dlxQueue() { return new Queue(DLX_QUEUE,true); } /** * 死信交换机 * @return(756076230) */ @Bean(540609) public DirectExchange dlxExchange() { return new DirectExchange(DLX_EXCHANGE,true,false); } /** * 死信队列和交换机绑定 * @return(756076230) */ @Bean(540609) public Binding bindingDLX() { return BindingBuilder.bind(dlxQueue()).to(dlxExchange()).with(DLX_ROUTING_KEY); }}创建延迟队列,并绑定死信队列 // 下面的是延迟队列 /** * 订单延迟队列 */ public static final String ORDER_QUEUE = "queue.order"; /** * 订单交换机 */ public static final String ORDER_EXCHANGE = "exchange.order"; /** * 订单routing-key */ public static final String ORDER_ROUTING_KEY = "routingkey.order"; /** * 订单延迟队列 * @return(756076230) */ @Bean(540609) public Queue orderQueue() { Map<String,Object> params = new HashMap<>(); params.put("x-dead-letter-exchange", DLX_EXCHANGE); params.put("x-dead-letter-routing-key", DLX_ROUTING_KEY); return new Queue(ORDER_QUEUE, true, false, false, params); } /** * 订单交换机 * @return(756076230) */ @Bean(540609) public DirectExchange orderExchange() { return new DirectExchange(ORDER_EXCHANGE,true,false); } /** * 订单队列和交换机绑定 * @return(756076230) */ @Bean(540609) public Binding orderBinding() { return BindingBuilder.bind(orderQueue()).to(orderExchange()).with(ORDER_ROUTING_KEY); }绑定死信交换通过添加x-dead-letter-exchange、x-dead-letter-routing-key参数指定对应的交换机和路由。发送消息设置五秒超时时间@RestController(113091385)public class SendController { @Autowired private RabbitTemplate rabbitTemplate; @GetMapping("/dlx") public String dlx() { String date = DateUtil.dateFormat(new Date()); String delayTime = "5000"; System.out.println("【发送消息】延迟 5 秒 发送时间 " + date); rabbitTemplate.convertAndSend(DelayQueueRabbitConfig.ORDER_EXCHANGE,DelayQueueRabbitConfig.ORDER_ROUTING_KEY, message, message1 -> { message1.getMessageProperties().setExpiration(delayTime); return message1; }); return "ok"; } class DateUtil{ public static String dateFormat(Date date) { SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss"); return sdf.format(date); } } } 消费消息@RabbitListener(queues = DelayQueueRabbitConfig.DLX_QUEUE)public void delayPrecss(String msg,Channel channel,Message message){ System.out.println("【接收消息】" + msg + " 接收时间" + DateUtil.dateFormat(new Date()));} 控制台输出:【发送消息】延迟5 秒 发送时间 21:32:15【接收消息】延迟5 秒 发送时间 21:32:15 接收时间21:32:20发送消息,5秒之后消费者后会收到消息。说明延迟成功。队列都有先进先出的特点,如果队列前面的消息延迟比后的消息延迟更长,会出现什么情况。消息时序问题发送三条消息,延迟分别是10s、2s、5s:@RestController(113091385)public class SendController { @Autowired private RabbitTemplate rabbitTemplate; @GetMapping("/dlx") public String dlx() { dlxSend("延迟10秒","10000"); dlxSend("延迟2 秒","2000"); dlxSend("延迟5 秒","5000"); return "ok"; } private void dlxSend(String message,String delayTime) { System.out.println("【发送消息】" + message + "当前时间" + DateUtil.dateFormat(new Date())); rabbitTemplate.convertAndSend(DelayQueueRabbitConfig.ORDER_EXCHANGE,DelayQueueRabbitConfig.ORDER_ROUTING_KEY, message, message1 -> { message1.getMessageProperties().setExpiration(delayTime); return message1; }); }控制台输出:【发送消息】延迟10秒当前时间21:54:36【发送消息】延迟2 秒当前时间21:54:36【发送消息】延迟5 秒当前时间21:54:36【接收消息】延迟10秒 当前时间21:54:46【接收消息】延迟2 秒 当前时间21:54:46【接收消息】延迟5 秒 当前时间21:54:46所有的消息都要等10s的消息消费完才能消费,当10s消息未被消费,其他消息也会阻塞,即使消息设置了更短的延迟。因为队列有先进先出的特征,当队列有多条消息,延迟时间就没用作用了,前面的消息消费后,后的消息才能被消费,不然会被阻塞到队列中。插件实现解决消息时序问题针对上面消息的时序问题,RabbitMQ开发一个延迟消息的插件delayed_message_exchange,延迟消息交换机。使用该插件可以解决上面时序的问题。在Github官网找到对应的版本,我选择的是3.8.17:将文件下载下来放到服务器的/usr/lib/rabbitmq/lib/rabbitmq_server-3.8.9/plugins目录下,执行以下命令,启动插件:rabbitmq-plugins enable rabbitmq_delayed_message_exchange启动插件,交换机会有新的类型x-delayed-message:x-delayed-message类型的交换机,支持延迟投递消息。发送消息给x-delayed-message类型的交换流程图:x-delayed-message类型的交换机接收消息投递后,并未将直接路由到队列中,而是存储到mnesia(一个分布式数据系统),该系统会检测消息延迟时间。消息达到可投递时间,消息会被投递到目标队列。配置延迟队列@Configuration(5797812)public class XDelayedMessageConfig { /** * 队列 */ public static final String DIRECT_QUEUE = "queue.delayed"; /** * 延迟交换机 */ public static final String DELAYED_EXCHANGE = "exchange.delayed"; /** * 绑定的routing key */ public static final String ROUTING_KEY = "routingKey.bind"; @Bean(540609) public Queue directQueue() { return new Queue(DIRECT_QUEUE,true); } /** * 定义延迟交换机 * 交换机的类型为 x-delayed-message * @return(756076230) */ @Bean(540609) public CustomExchange delayedExchange() { Map<String,Object> map = new HashMap<>(); map.put("x-delayed-type","direct"); return new CustomExchange(DELAYED_EXCHANGE,"x-delayed-message",true,false,map); } @Bean(540609) public Binding delayOrderBinding() { return BindingBuilder.bind(directQueue()).to(delayedExchange()).with(ROUTING_KEY).noargs(); }}发送消息: @GetMapping("/delay") public String delay() { delaySend("延迟队列10 秒",10000); delaySend("延迟队列5 秒",5000); delaySend("延迟队列2 秒",2000); return "ok"; } private void delaySend(String message,Integer delayTime) { message = message + " " + DateUtil.dateFormat(new Date()); System.out.println("【发送消息】" + message); rabbitTemplate.convertAndSend(XDelayedMessageConfig.DELAYED_EXCHANGE,XDelayedMessageConfig.ROUTING_KEY, message, message1 -> { message1.getMessageProperties().setDelay(delayTime); //message1.getMessageProperties().setHeader("x-delay",delayTime); return message1; }); } 消费消息: @RabbitListener(queues = XDelayedMessageConfig.DIRECT_QUEUE) public void delayProcess(String msg,Channel channel, Message message) { System.out.println("【接收消息】" + msg + " 当前时间" + DateUtil.dateFormat(new Date())); }控制台输出:【发送消息】延迟队列10 秒 22:00:01【发送消息】延迟队列5 秒 22:00:01【发送消息】延迟队列2 秒 22:00:01【接收消息】延迟队列2 秒 22:00:01 当前时间22:00:03【接收消息】延迟队列5 秒 22:00:01 当前时间22:00:05【接收消息】延迟队列10 秒 22:00:01 当前时间22:00:10解决了消息的时序问题。总结使用Java自带的延迟消息,系统重启或者挂了之后,消息就无法发送,不适于用在生产环境上。RabbitMQ本身不支持延迟队列,可以使用存活时间ttl + 死信队列dlx实现消息延迟。发送的消息设置ttl,所在的队列不设置消费者。队列绑定死信队列,消息超时之后,变成死信消息,再发送给死信队列,最后发送给消费者。发送多条不同延迟时间消息,前面消息没有到延迟时间,会阻塞后面延迟更低的消息,因为队列有先进先出的特性。RabbitMQ的x-delay-message插件可以解决消息时序问题。带有ttl的消息发送x-delayed-message类型的交换机,消息不会直接路由到队列中。而且存储到分布式数据系统中,该系统会检测消息延迟时间。消息到达延迟时间,消息才能会投递到队列中,最后发送给消费者。