mosquito-cr / mosquito

A background task runner for crystal applications supporting periodic (CRON) and manually queued jobs
MIT License

Add persist_run_count flag #100

Closed xaviablaza closed 2 years ago

xaviablaza commented 2 years ago

Context for this change:

  1. I have a mosquito queued job that calls an API. The call's duration varies widely, from about 1 second to 3 minutes or more.
  2. I use: throttle limit: 10, per: 1.minute. This limits the job to 10 runs per minute; if 11 jobs run concurrently, the API responds with status code 429 TOO MANY REQUESTS.
  3. persist_run_count persists the run count across window intervals. The run count goes up when a job is executed; if the job has persist_run_count set to true and succeeds, the run count goes back down, allowing more jobs to be enqueued.
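
The counting behaviour described in the list above can be sketched without mosquito. This is a minimal illustration only: WindowThrottle and its methods (allow?, started, finished, window_elapsed) are hypothetical names invented for this sketch, not mosquito's API and not the code from this pull request.

```crystal
# Hypothetical stand-in for a windowed throttle; NOT mosquito's implementation.
class WindowThrottle
  getter run_count = 0

  def initialize(@limit : Int32, @persist_run_count : Bool = false)
  end

  # True when another job may start.
  def allow? : Bool
    @run_count < @limit
  end

  # Called when a job starts executing: the run count goes up.
  def started : Nil
    @run_count += 1
  end

  # Called when a job finishes. A slot is freed only when the count is
  # persisted and the job succeeded.
  def finished(success : Bool) : Nil
    @run_count -= 1 if @persist_run_count && success && @run_count > 0
  end

  # Called when a window interval ends. Without persistence the count resets;
  # with persistence it carries over into the next window.
  def window_elapsed : Nil
    @run_count = 0 unless @persist_run_count
  end
end

throttle = WindowThrottle.new(limit: 10, persist_run_count: true)
10.times { throttle.started }
throttle.window_elapsed          # the count carries over, so the limit still holds
blocked = !throttle.allow?
throttle.finished(success: true) # a successful job frees one slot
freed = throttle.allow?
```

With persist_run_count left at false, the same sequence would allow new jobs as soon as window_elapsed is called, even while the ten slow API calls are still in flight. That is the situation that leads to the 429 responses described above.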

robacarp commented 2 years ago

@xaviablaza thanks for jumping in! If the reference implementation of the rate limiter doesn't fit for you, it's easy enough to implement a custom rate limiter and use that instead. Let me know if I can help!

xaviablaza commented 2 years ago

I wanted to build something like a leaky bucket rate limiter, but in the end I settled on using run_from in the initial worker configuration and strictly limiting, on the infrastructure side, the number of workers that process a specific task.

robacarp commented 2 years ago

@xaviablaza if you're still interested, I prototyped this leaky bucket style rate limiter:

require "./queue"

module Mosquito::LeakyBucket
  # When rate limited, this will cause the job order to be shuffled.  A proper
  # leaky bucket would be implemented at the Queue level, but mosquito doesn't
  # currently have the ability to instantiate multiple types of queues. See
  # Runner#fetch_queues.
  #
  # When rate limited, the mosquito runner will verbosely complain about the
  # many job failures.
  module Limiter
    DEFAULT_DRIP_RATE = 10.milliseconds
    DEFAULT_BUCKET_SIZE = 15

    module ClassMethods
      def leaky_bucket(*,
          drip_rate : Time::Span = DEFAULT_DRIP_RATE,
          bucket_size : Int32 = DEFAULT_BUCKET_SIZE
      )
        @@drip_rate = drip_rate
        @@bucket_size = bucket_size
      end
    end

    macro included
      extend ClassMethods

      @@drip_rate = DEFAULT_DRIP_RATE
      @@bucket_size = DEFAULT_BUCKET_SIZE

      before do
        retry_later unless will_drip?
      end

      after do
        drip! if executed?
      end
    end

    def rescheduleable? : Bool
      rate_limited?
    end

    def reschedule_interval(retry_count : Int32) : Time::Span
      if rate_limited?
        time_to_drip
      else
        super
      end
    end

    def rate_limited? : Bool
      !will_drip?
    end

    def enqueue : Task
      if can_enqueue?
        super
      else
        raise "No room left in bucket"
      end
    end

    def can_enqueue? : Bool
      self.class.queue.size < @@bucket_size
    end

    def will_drip? : Bool
      time_to_drip <= 0.seconds
    end

    @_time_to_drip : Time::Span? = nil
    def time_to_drip : Time::Span
      @_time_to_drip ||= begin
        last_drip = metadata["last_drip"]?
        return 0.seconds if last_drip.nil?
        last_drip = Time.unix_ms last_drip.to_i64
        last_drip + @@drip_rate - Time.utc
      end
    end

    def drip!
      now = Time.utc.to_unix_ms
      last_drip = metadata["last_drip"]?

      if last_drip
        return unless last_drip.to_i64 < now
      end

      metadata["last_drip"] = now.to_s
    end
  end
end
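
The timing check behind time_to_drip and will_drip? can be looked at in isolation. The sketch below repeats the same arithmetic (last drip + drip rate - now) as a free-standing function. Passing the stored timestamp and the current time as arguments is an assumption made here so the example runs without mosquito or job metadata.

```crystal
# Hypothetical free-standing version of the prototype's time_to_drip.
# last_drip_ms stands in for metadata["last_drip"] (milliseconds since epoch).
def time_to_drip(last_drip_ms : Int64?, drip_rate : Time::Span, now : Time) : Time::Span
  # Nothing has dripped yet, so the next job may run immediately.
  return 0.seconds if last_drip_ms.nil?

  Time.unix_ms(last_drip_ms) + drip_rate - now
end

# A job may run once the remaining wait is zero or negative.
def will_drip?(remaining : Time::Span) : Bool
  remaining <= 0.seconds
end

now = Time.unix_ms(1_000_000_i64)

never_dripped = time_to_drip(nil, 1.second, now)         # no wait
too_soon      = time_to_drip(999_600_i64, 1.second, now) # 400 ms since last drip
overdue       = time_to_drip(998_000_i64, 1.second, now) # 2 s since last drip
```

A positive result is the time the job still has to wait, which is what the prototype returns from reschedule_interval while rate limited.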

If you add that module to your project, then you can use it like this:

class LeakyBucketJob < Mosquito::QueuedJob
  include Mosquito::LeakyBucket::Limiter

  leaky_bucket(drip_rate: 1.second, bucket_size: 10)

  def perform
    log "drip"
  end
end
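
One consequence of the prototype's enqueue override is that a full bucket raises a plain exception with the message "No room left in bucket", so calling code has to be ready to rescue it. The sketch below shows that calling pattern against a hypothetical in-memory FakeBucket class. It stands in for the job class only so the example runs on its own; it is not part of mosquito.

```crystal
# Hypothetical in-memory stand-in for a job class whose enqueue raises when
# the bucket is full, mirroring the can_enqueue? check in the prototype.
class FakeBucket
  def initialize(@bucket_size : Int32)
    @queue = [] of String
  end

  def enqueue(payload : String) : Nil
    raise "No room left in bucket" unless @queue.size < @bucket_size
    @queue << payload
  end
end

bucket = FakeBucket.new(bucket_size: 2)
rejected = [] of String

%w(a b c).each do |payload|
  begin
    bucket.enqueue(payload)
  rescue ex
    # The bucket is full: remember the payload so the caller can retry later.
    rejected << payload
  end
end
```

Here the third payload is the one that ends up in rejected. What to do with rejected work (drop it, retry later, or push back on the producer) is left to the caller.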

At a minimum I need to write tests for it, but it seems like a great candidate for release. Thanks!