dduo518 / hexo-blog

hexo静态blog点击 https://github.com/chong0808/hexo-blog/issues
3 stars 0 forks source link

延时队列 #75

Open dduo518 opened 1 year ago

dduo518 commented 1 year ago

延时队列

1688615206469

使用场景

延时队列是一种常见的数据结构,用于处理需要延迟执行的任务或事件。它常见的使用场景包括:

  1. 任务调度:延时队列可用于调度需要在未来特定时间执行的任务,如定时任务、定时提醒等。
  2. 消息队列:延时队列可用于处理具有延迟要求的消息,如消息的发送时间或处理时间有要求的场景。
  3. 缓存过期:延时队列可用于管理缓存中的数据过期,使得过期的数据可以在一定时间后被清除或更新。

分类

分布式延时队列

分布式延时队列是一种在分布式系统中处理延时任务的解决方案。它可以用于在多个节点上进行任务调度和执行,并确保任务按预期的延时时间执行。

实现分布式延时队列的一种常见方式是使用分布式消息队列。以下是一个基本的分布式延时队列的实现思路:

  1. 消息发布:将需要延时执行的任务作为消息发布到分布式消息队列中,并设置任务的延时时间。
  2. 消息消费:每个节点都有一个消费者进程,负责从消息队列中获取即将到期的任务。
  3. 延时任务处理:消费者进程获取到即将到期的任务后,进行相应的处理。这可以是执行任务的实际逻辑,或者是将任务发送到其他服务进行处理。
  4. 重试机制:如果某个节点的消费者进程由于故障或其他原因无法处理任务,系统需要有相应的重试机制来确保任务得到处理。这可以包括将任务重新放回消息队列中,或者将任务转发到其他可用节点进行处理。
  5. 监控和管理:分布式延时队列需要监控和管理机制来跟踪任务的状态和执行情况。这可以包括任务的计数、成功/失败的记录、任务超时等信息。

分布式延时队列的实现中,还需要考虑一些其他因素,如数据一致性、分片处理、负载均衡等。这些因素取决于具体的系统需求和设计。

开源的分布式消息队列系统(如Apache Kafka、RabbitMQ、RocketMQ、Plusar等)提供了一些原生支持或扩展来实现分布式延时队列的功能。此外,也可以使用其他分布式技术(如分布式锁、分布式调度器等)结合消息队列来构建自定义的分布式延时队列解决方案。

Apache Kafka、RabbitMQ、RocketMQ处理延时消息都并不是投递到要发送的topic中,都是先投递到一个delay topic,通过一个中间的topic 进行实现。

Apache Kafka & RocketMQ

Kafka 跟 RocketMQ 的实现方式大同小异,都是通过另外一个任务进程进行轮询消费检查。

Kafka并不支持延迟队列的功能,需要我们手动去实现,一般来说Kafka不支持任意时间精度的延迟消息,只支持固定级别的延迟,创建一个topic,该topic创建18个partiition,每个partition对应不同的延迟级别。

1688628499924

原理
  1. 首先创建一个单独针对延迟队列的 topic,同时创建 18 个 partition 针对不同的延迟级别
  2. 发送消息的时候根据延迟参数发送到延迟 topic 对应的 partition,对应的 key为延迟时间,同时把原 topic 保存到 header 中
  3. 内嵌的 consumer单独设置一个 ConsumerGroup去消费延迟 topic 消息,消费到消息之后如果没有达到延迟时间那么就进行 pause,然后 seek到当前 ConsumerRecordoffset位置,同时使用定时器去轮询延迟的 TopicPartition,达到延迟时间之后进行 resume
  4. 如果达到了延迟时间,那么就获取到 header中的真实 topic ,直接转发

1688628626012

RabbitMQ

RabbitMQ本身并不存在延迟队列的概念,但是可以通过DLX死信交换机和TTL消息过期来实现延迟队列。

原理
  1. 设置TTL(Time to live) 过期时间 a) 通过队列属性设置,队列中所有的消息都会拥有相同的过期时间,一般在实践中设置为消息默认的最大超时时间 b) 对消息单独设置过期时间,每条消息都有不同的过期时间 a、b俩个过期时间会以时间较小值为准
  2. 配置DLX(Dead Letter Exchange)死信交换机 一个消息要成为死信消息有以下3种 a) 消息被拒绝处理或者处理失败,reject b) 消息过期 c) 队列达到最大长度

1688629253860

Plusar

消息被存储在BookKeeper中。在消息发布到代理服务器后,DelayedDeliveryTracker在内存中维护时间索引(time -> messageId)。一旦指定的延迟时间结束,该消息将被传递给消费者。

1688629516408

内存单点延时队列

内存单点延时队列是一种在单个节点上使用内存存储来处理延时任务的解决方案。它适用于小规模或单节点环境下的延时任务处理。

以下是内存单点延时队列的基本实现思路:

  1. 任务存储:延时任务以数据结构的形式存储在内存中。可以使用数据结构如堆、链表、红黑树等来组织任务,并根据任务的到期时间进行排序。
  2. 任务调度:使用定时器或周期性的检查机制来扫描任务队列,找出到期的任务并进行处理。定时器可以使用操作系统提供的定时器或者自定义的定时器。
  3. 任务处理:一旦任务到期,将其从队列中移除,并进行相应的处理,如执行任务逻辑、发送通知、调用回调函数等。
  4. 容错机制:需要考虑异常情况和故障恢复。例如,当系统崩溃或重启时,需要从持久化存储或日志中恢复任务队列,并确保不会丢失或重复执行任务。
  5. 监控和管理:需要监控任务队列的状态、任务的执行情况以及系统的健康状况。可以记录任务执行日志、统计任务执行时间等信息,并提供管理接口来添加、删除或修改任务。

内存单点延时队列适用于简单的延时任务场景,如定时任务调度、简单的消息提醒等。由于是单点模式,系统的可靠性和可扩展性受限,不适用于高负载或分布式环境。对于大规模、高并发的延时任务需求,分布式延时队列是更合适的选择。

实现方式

延时队列的实现方式有多种,常见的有以下几种:

  1. 基于堆或优先队列:使用堆或优先队列数据结构来存储延时任务,根据任务的执行时间进行排序。每次取出队首元素(最近要执行的任务)进行处理。
  2. 时间轮(Time Wheel):类似于定时器轮盘,但时间轮可以动态调整时间间隔的大小。时间轮将时间划分为多个槽,每个槽对应一个时间间隔,每个槽中存放需要在该时间间隔内执行的任务。时间轮可以根据需要动态地增加或减少槽的数量,以适应不同的延时需求。
  3. 基于红黑树或平衡二叉搜索树:使用红黑树或平衡二叉搜索树来存储延时任务,并根据任务的执行时间进行排序。通过定时器周期性地检查红黑树或平衡二叉搜索树中是否有到期的任务,并进行处理。

基于时间堆的实现

延迟队列旨在存储具有特定延迟的项目,并在延迟过去后处理它们。 它可用于调度将来需要执行的任务或事件。

两个模块组成

queue.go 文件定义了管理延迟队列的数据结构和实现时间堆排序的方法

scheduler.go 文件提供了处理队列中项目的调度程序逻辑。

queueItem Struct

type queueItem struct {
    order *domain.TradeOrder
    delay int64
    index int
}

堆元素由订单Order、延迟时间delay和位置索引组成

delayQueue Type

type delayQueue []*queueItem

自定义一个底层为数组的大小堆数据结构

delayQueue Methods

delayQueue 定义了以下方法实现 container/heap 官方包的大小堆接口

scheduler Struct

type scheduler struct {
    dq       *delayQueue
    callback orderCallBack
    mu       sync.Mutex
}

定义 scheduler 结构体,表示处理延时队列中项目的调度器。该结构体包含了延时队列和一个回调函数,用于处理延时结束回调。

Push Method

方法用于将具有指定延迟的订单添加到延时队列中。该方法计算订单应该被处理的绝对时间,并将其插入到延时队列中。

func (t *scheduler) Push(order *domain.TradeOrder, delay int64) {
    t.mu.Lock()
    heap.Push(t.dq, &queueItem{
        delay: time.Now().UnixMilli() + delay,
        order: order,
    })
    t.mu.Unlock()
}

RunScheduler Method

调度器的主循环,它在一个独立的 goroutine 中运行。该方法使用定时器触发,并在每次触发时尝试处理延时队列中。

func (t *scheduler) RunScheduler(ctx context.Context) {
    var tick = time.NewTicker(time.Millisecond)
    defer tick.Stop()
    for range tick.C {
        select {
        case <-ctx.Done():
            return
        default:
        }
        t.pop(ctx)
    }
}

pop Method

用于从延时队列中取出最早的项目,并检查其是否已经达到延迟时间。如果还未达到延迟时间,将重新将项目放回延时队列,并延迟处理。否则,将在新的 goroutine 中调用回调函数处理。

func (t *scheduler) pop(ctx context.Context) {
    if t.dq.Len() == 0 {
        return
    }
    t.mu.Lock()
    item := heap.Pop(t.dq).(*queueItem)
    t.mu.Unlock()
    delay := item.delay - time.Now().UnixMilli()
    if delay > 0 {
        t.Push(item.order, delay)
        return
    }
    go t.callback(ctx, item.order)
}