延迟消息队列

1. JDK自带的延时队列

JDK中提供了一种延迟队列数据结构DelayQueue,其本质是封装了PriorityQueue,可以把元素进行排序。

jdk延迟队列
  1. 把订单插入DelayQueue中,以超时时间作为排序条件,将订单按照超时时间从小到大排序。
  2. 起一个线程不停轮询队列的头部,如果订单的超时时间到了,就出队进行超时处理,并更新订单状态到数据库中。
  3. 为了防止机器重启导致内存中的DelayQueue数据丢失,每次机器启动的时候,需要从数据库中初始化未结束的订单,加入到DelayQueue中。
  • 优点:简单,不需要借助其他第三方组件,成本低。

  • 缺点:

    • 所有超时处理订单都要加入到DelayQueue中,占用内存大。
    • 没法做到分布式处理,只能在集群中选一台leader专门处理,效率低。
    • 不适合订单量比较大的场景。

2.RabbitMQ的延时消息

RabbitMQ的延时消息主要有两个解决方案:

  • RabbitMQ Delayed Message Plugin

​ RabbitMQ Delayed Message Plugin是官方提供的延时消息插件,虽然使用起来比较方便,但是不是高可用的,如果节点挂了会导致消息丢失

  • 消息的TTL+死信Exchange

  • TTL:即消息的存活时间。RabbitMQ可以对队列和消息分别设置TTL,如果对队列设置,则队列中所有的消息都具有相同的过期时间。超过了这个时间,我们认为这个消息就死了,称之为死信。

  • 死信Exchange(DLX):一个消息在满足以下条件会进入死信交换机

    • 一个消息被Consumer拒收了,并且reject方法的参数里requeue是false。也就是说不会被再次放在队列里,被其他消费者使用。
    • TTL到期的消息。
    • 队列满了被丢弃的消息。
  • rabbitMq延时队列实现

  1. 定义一个BizQueue,用来接收死信消息,并进行业务消费。
  2. 定义一个死信交换机(DLXExchange),绑定BizQueue,接收延时队列的消息,并转发给BizQueue。
  3. 定义一组延时队列DelayQueue_xx,分别配置不同的TTL,用来处理固定延时5s、10s、30s等延时等级,并绑定到DLXExchange。
  4. 定义DelayExchange,用来接收业务发过来的延时消息,并根据延时时间转发到不同的延时队列中。
  • 优点:可以支持海量延时消息,支持分布式处理。

  • 缺点:

    • 不灵活,只能支持固定延时等级。
    • 使用复杂,要配置一堆延时队列。

引入文章: https://mp.weixin.qq.com/s/OmbyxkufVm-XzwIv_A514w

3. redis 延迟方案

3.1 redis key 过期监听

​ 定期删除(随机选取),惰性删除, 无法保证实时性。

​ Redis过期通知也是不可靠的,Redis在过期通知的时候,如果应用正好重启了,那么就有可能通知事件就丢了。要有其他方案兜底。

3.2 redis zset 方案

4 RocketMQ 方案

实现原理: 支持18个等级延迟队列,创建18个延迟对应内部topic,为每一个延迟队列开启延迟任务,到期写入真正的topic队列中

RocketMq延迟消息原理

5. 定时任务分布式批处理

定时任务分布式批处理解决方案,即通过定时任务不停轮询数据库的订单,将已经超时的订单捞出来,分发给不同的机器分布式处理:

RocketMq延迟消息原理

使用定时任务分布式批处理的方案具有如下优势:

  • 稳定性强:基于通知的方案(比如MQ和Redis),比较担心在各种极端情况下导致通知的事件丢了。使用定时任务跑批,只需要保证业务幂等即可,如果这个批次有些订单没有捞出来,或者处理订单的时候应用重启了,下一个批次还是可以捞出来处理,稳定性非常高。
  • 效率高:基于MQ的方案,需要一个订单一个定时消息,consumer处理定时消息的时候也需要一个订单一个订单更新,对数据库tps很高。使用定时任务跑批方案,一次捞出一批订单,处理完了,可以批量更新订单状态,减少数据库的tps。在海量订单处理场景下,批量处理效率最高。
  • 可运维:基于数据库存储,可以很方便的对订单进行修改、暂停、取消等操作,所见即所得。如果业务跑失败了,还可以直接通过sql修改数据库来进行批量运维。
  • 成本低:相对于其他解决方案要借助第三方存储组件,复用数据库的成本大大降低。

但是使用定时任务有个天然的缺点:没法做到精度很高。定时任务的延迟时间,由定时任务的调度周期决定。如果把频率设置很小,就会导致数据库的qps比较高,容易造成数据库压力过大,从而影响线上的正常业务。

所以一般需要抽离出超时中心和超时库来单独做订单的超时调度,在阿里内部,几乎所有的业务都使用基于定时任务分布式批处理的超时中心来做订单超时处理,SLA可以做到30秒以内:

阿里跑批定时任务方案