mosquito-cr / mosquito

A background task runner for crystal applications supporting periodic (CRON) and manually queued jobs
MIT License
227 stars 24 forks source link

Leaky Bucket Queue? #101

Open robacarp opened 2 years ago

robacarp commented 2 years ago

Re: #100

A leaky bucket rate limited is best implemented at the queue layer, but mosquito doesn't yet have the notion of configurable queue classes. If it did, this might work:

require "./queue"

module Mosquito::LeakyBucket
  class Bucket < Mosquito::Queue
    def metadata : Metadata
      Metadata.new @config_key
    end

    def can_enqueue? : Bool
      # todo max size needs configurable
      size < 15
    end

    def will_drip? : Bool
      # TODO drip interval needs configurable
      last_drip = metadata["last_drip"]?
      return true if last_drip.nil?
      last_drip = Time.unix_ms last_drip.to_i
      Time.utc - last_drip > 10.seconds
    end

    def drip!
      metadata["last_drip"] = Time.utc.to_unix_ms.to_s
    end

    def dequeue : Task?
      unless will_drip?
        puts "not dripping"
        return
      end
      Log.info { "time to drip!" }
      drip!
      super
    end
  end

  module Limiter
    module ClassMethods
      def queue
        Mosquito::LeakyBucket::Bucket.new queue_name
      end
    end

    macro included
      extend ClassMethods
    end

    def rescheduleable? : Bool
      false
    end

    # def enqueue : Task
    #   if self.class.queue.can_enqueue?
    #     previous_def
    #   else
    #     raise "No room left in bucket"
    #   end
    # end
  end
end