mosquito-cr / mosquito

A background task runner for crystal applications supporting periodic (CRON) and manually queued jobs
MIT License

Implement single-binary concurrent workers #18

Open robacarp opened 6 years ago

robacarp commented 6 years ago

This would also be a good opportunity to clean up the Runner class, as it's currently wired up like a pile of spaghetti.

Off the top of my head, here's a checklist:

m-o-e commented 2 years ago

FYI, I'm using this to run multiple Mosquito::Runner instances in one process (works with Mosquito v1.0.0rc2):

class Swamp
  Log = ::Log.for("swamp")

  @@fibers = [] of Fiber
  def self.start(n=1)
    raise Exception.new("Swamp already started") unless @@fibers.empty?
    n.times do
      @@fibers << spawn { Mosquito::Runner.start }
    end
    Log.info &.emit("Swamp started (#{n} workers)", { ev: "swamp.started", workers: n })
  end

  def self.stop(timeout = 28.seconds)
    Log.info &.emit("Swamp stopping", { ev: "swamp.stopping", workers: @@fibers.size, timeout: timeout.total_seconds })
    t = Time.monotonic
    Mosquito::Runner.stop
    begin
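      until @@fibers.empty?
        # Wait on the first fiber without removing it, so a timeout still counts it as running.
        fiber = @@fibers.first
        until fiber.dead?
          sleep 0.42.seconds
          raise IO::TimeoutError.new if Time.monotonic - t > timeout
        end
        @@fibers.shift
      end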
    rescue ex : IO::TimeoutError
      Log.error &.emit("Swamp failed to stop", { ev: "swamp.stop_failed", reason: "timeout", workers: @@fibers.size })
    else
      Log.info &.emit("Swamp stopped", { ev: "swamp.stopped", workers: @@fibers.size })
    end
  end
end
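
For context, a minimal sketch of how a class like this might be wired into an application entry point. This is an assumption and not part of the comment above: the worker count of 4 is arbitrary, the signal handling is illustrative, and it relies on Mosquito::Runner.start blocking the fiber it runs in (which the spawn in Swamp.start implies).

```crystal
require "mosquito"
# Assumes the Swamp class from the snippet above is defined or required here.

# Hypothetical worker count; pick whatever suits the workload.
Swamp.start(4)

# Buffered channel so a signal handler never blocks on send.
shutdown = Channel(Nil).new(1)
Signal::INT.trap { shutdown.send(nil) }
Signal::TERM.trap { shutdown.send(nil) }

# Park the main fiber until a signal arrives, then stop the workers.
shutdown.receive
Swamp.stop
```

Because all runners live in fibers of one process, this gives concurrency between jobs that wait on IO. It does not give parallelism across cores unless the binary is built with multi-threading support.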