rails / solid_queue

Database-backed Active Job backend
MIT License

Dynamic scheduled tasks #186

Open wollistik opened 8 months ago

wollistik commented 8 months ago

Hi @rosa, I really appreciate the work you have done here and I am eager to switch over to solid_queue. There is only one thing left, which is holding me back.

Other gems like resque-scheduler or sidekiq-scheduler offer the ability to dynamically add or remove tasks to the schedule (see https://github.com/resque/resque-scheduler?tab=readme-ov-file#dynamic-schedules). Since everything seems to be stored in the database for solid_queue, this should be quite easy to achieve.

Maybe it is only a documentation issue and it is already possible (this would be awesome 🤞 ). Happy to hear from you!

rosa commented 8 months ago

@wollistik, no, this is not supported yet, you didn't miss it 😅

This is something we can consider adding for sure. May I ask what's your use case for it?

wollistik commented 8 months ago

Hi @rosa, we have some cron like jobs, where the user is able to

Therefore the dynamic scheduling feature was really nice to implement these requirements.

But after thinking about this for the last two days, I might completely redesign this feature and come up with a different solution, because it was rarely used and still required some quirks to get it working.

rosa commented 8 months ago

Ahh got it! That makes sense 👍 Yes, I thought a bit more about it yesterday and this is something I want to implement eventually 😊

abrunner94 commented 7 months ago

Maybe this helps, but the way I implemented dynamic cron schedules is by having one job run every minute and check if the cron schedule matches the one stored in my database table, with my table containing cron expressions. I use Fugit to check if the cron matches the current time.
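A rough sketch of the "check every minute" idea described above, in plain Ruby so it runs on its own. The `CronMatch` module, the `ScheduledTask` struct, and `due_tasks` are hypothetical names; the matcher is a simplified stand-in for what Fugit would do in a real app (it only understands `*`, `*/n`, numbers, and comma lists, and it requires every field to match).

```ruby
# Simplified stand-in for a cron matcher (hypothetical helper, not Fugit).
# Fields: minute hour day-of-month month day-of-week.
module CronMatch
  # Lowest legal value per field, needed so "*/n" steps start at the right place.
  FIELD_MINIMUMS = [0, 0, 1, 1, 0].freeze

  module_function

  def field_match?(field, value, min)
    field.split(",").any? do |part|
      if part == "*"
        true
      elsif (m = part.match(/\A\*\/(\d+)\z/))
        step = m[1].to_i
        step.positive? && ((value - min) % step).zero?
      elsif part.match?(/\A\d+\z/)
        part.to_i == value
      else
        false # ranges, names, etc. are not handled in this sketch
      end
    end
  end

  def match?(expression, time)
    fields = expression.split
    return false unless fields.size == 5

    values = [time.min, time.hour, time.day, time.month, time.wday]
    fields.each_with_index.all? do |field, i|
      field_match?(field, values[i], FIELD_MINIMUMS[i])
    end
  end
end

# Stand-in for a database row holding a cron expression.
ScheduledTask = Struct.new(:name, :cron)

# What the every-minute job would do: pick the tasks whose cron matches "now".
def due_tasks(tasks, now)
  tasks.select { |task| CronMatch.match?(task.cron, now) }
end
```

The every-minute job would then enqueue whatever `due_tasks` returns for the current time.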

AquisTech commented 7 months ago

@abrunner94 I have done something similar. Mine is not exactly cron-like scheduling, but I do have different schedules. The first task is scheduled to run after 15 days, and from then on every other day. So while executing the first job I enqueue the next job, and in that job I keep enqueueing the next one every 2 days, depending on a condition.
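The self-rescheduling chain described above could look roughly like this. It is a plain-Ruby simulation, not the commenter's code: `FakeQueue`, `ChainedTask`, and the `max_runs` stop condition are all hypothetical. In a Rails app the enqueue step would presumably be something like `self.class.set(wait: 2.days).perform_later(...)` inside the job.

```ruby
DAY = 24 * 60 * 60 # seconds

# In-memory stand-in for a job backend: records [run_at, run_number] pairs.
class FakeQueue
  attr_reader :scheduled

  def initialize
    @scheduled = []
  end

  def enqueue_at(run_at, run_number)
    @scheduled << [run_at, run_number]
  end
end

class ChainedTask
  FIRST_DELAY = 15 * DAY  # first run after 15 days
  REPEAT_DELAY = 2 * DAY  # then every 2 days

  def initialize(queue, max_runs:)
    @queue = queue
    @max_runs = max_runs # hypothetical condition for ending the chain
  end

  # Schedule the very first run.
  def start(now)
    @queue.enqueue_at(now + FIRST_DELAY, 1)
  end

  # One run: do the work, then enqueue the next run if the condition holds.
  def perform(run_at, run_number)
    # ... the real work would go here ...
    return unless run_number < @max_runs

    @queue.enqueue_at(run_at + REPEAT_DELAY, run_number + 1)
  end
end
```

Each run only knows about the next one, so stopping the chain is just a matter of not enqueueing again.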

jason-rutherford commented 3 weeks ago

SolidQueue is great. I’ve never embraced a new Rails addition more warmly than this one. Thanks to every contributor!

I took a look into dynamic recurring jobs and I think SolidQueue is really close to supporting them, at least the way I envision them. There are just a couple of missing parts, which I will try to detail below.

TL;DR: The Scheduler is to recurring_tasks what a media player is to your music/video playlist. It currently only supports (static) playlists predefined in a config file, which is only loaded at application startup. It would be awesome if we could update our playlist (recurring_tasks) while the music is playing, without having to restart playback.

The dirty details of exploration and a hacky workaround

I wanted to dive into SolidQueue in the hope of uncovering what it would take to run dynamic recurring tasks. First, as a reminder, the README tells us that the Scheduler handles recurring tasks. Currently it seems that only static recurring tasks work out of the box, while dynamic ones have some plumbing in place. What is a static vs. dynamic recurring task? If I understand correctly,

I will give an example of how to create a dynamic recurring task in my use case described way down below. But even if you created one now, it would not run automatically. Remember, the Scheduler holds the list of recurring tasks, which:

  1. currently only knows about static recurring tasks (loaded from config file) and
  2. only gets loaded upon boot

So here is one way to solve (1). In SolidQueue::Configuration#recurring_tasks, let's return static + dynamic. BTW, we're just hacking this to work.

# in lib/solid_queue/configuration.rb
      def recurring_tasks
        @recurring_tasks ||= recurring_tasks_config.map do |id, options|
          RecurringTask.from_configuration(id, **options)
        end.select(&:valid?)

+       @recurring_tasks + RecurringTask.where(static: false)
      end

So now, assuming that we have at least one static recurring job and a dynamic one, all we need to do is restart our application for the Scheduler to know about them. Yeah I know, that's not ideal, so let's try to address that now.

I could not figure out if or how to reach the instance of the Scheduler in an attempt to update its attributes with a reloaded recurring_tasks list. So instead, what if we just restart the Scheduler right after we create our dynamic recurring tasks, hoping that it will reload the recurring tasks list?

# after dynamic recurring task creation...
SolidQueue::Process.where(kind: "Scheduler").all.map(&:deregister)

Assuming that SolidQueue::Process is public API, deregister will restart the Scheduler process fine, but it can take the Supervisor up to a minute to do so, which totally works for my use case but maybe not for others. Regardless, we still have a problem. The Supervisor holds the Configuration (with recurring_tasks) from boot, meaning the newly spawned Scheduler process will not have our newly created dynamic recurring task. No bueno.

To address that, first we can update Configuration so that we can tell it to reload and return a fresh recurring_tasks list.

# in lib/solid_queue/configuration.rb
+    def reload_recurring_tasks
+      recurring_tasks
+    end

    private
    ...

Then let's go to Supervisor#replace_fork and, just before we fork the new Scheduler process, make sure it will have a freshly reloaded list of recurring tasks.

# in lib/solid_queue/supervisor.rb
      def replace_fork(pid, status)
        SolidQueue.instrument(:replace_fork, supervisor_pid: ::Process.pid, pid: pid, status: status) do |payload|
          if terminated_fork = forks.delete(pid)
            payload[:fork] = terminated_fork
            handle_claimed_jobs_by(terminated_fork, status)

+            if configured_processes[pid].kind == :scheduler
+              configured_processes[pid].attributes[:recurring_tasks] = configuration.reload_recurring_tasks
+            end

            start_process(configured_processes.delete(pid))
          end
        end
      end

I tried to find a better way. This feels pretty hackish but it does get the job done. But now we seem to have everything in place for my use case.

Final thoughts. I think what it boils down to is that it would be awesome if the Scheduler could reload the recurring_tasks as they get updated. It could poll for updates periodically. It could receive a signal that triggers a reload. It could have some after hooks that trigger a reload. Whatever it is, I am super excited for it.
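To make the polling option a bit more concrete, here is a minimal plain-Ruby sketch of the bookkeeping a reload would need: compare the task keys that are currently loaded with the keys currently stored, then schedule the new ones and unschedule the removed ones. `diff_recurring_tasks` and `ReloadingSchedule` are hypothetical names and not part of SolidQueue; how the real Scheduler would apply the result is left open.

```ruby
# Work out which task keys are new and which have disappeared.
def diff_recurring_tasks(loaded_keys, stored_keys)
  {
    to_schedule: (stored_keys - loaded_keys).sort,
    to_unschedule: (loaded_keys - stored_keys).sort
  }
end

# Hypothetical holder for the scheduler's in-memory view of its tasks.
class ReloadingSchedule
  attr_reader :loaded_keys

  def initialize(loaded_keys = [])
    @loaded_keys = loaded_keys.uniq.sort
  end

  # Called periodically (or on a signal/hook) with the keys found in storage.
  # Returns the changes so the caller can schedule/unschedule accordingly.
  def reload(stored_keys)
    changes = diff_recurring_tasks(@loaded_keys, stored_keys.uniq)
    @loaded_keys = stored_keys.uniq.sort
    changes
  end
end
```

The same `reload` call would work whether it is triggered by a timer, a signal, or an after hook; only the trigger differs.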

Use Case

I have an Event AR that has a start_time and end_time. When an event is created I would like to schedule a one-time job at the start_time. I call this job LiveEventDispatcherJob. When it runs at the scheduled event.start_time it creates some other jobs, one of which is the dynamic recurring task LiveEventPollingJob which runs every 15 seconds or so. There is also a LiveEventCleanupJob which runs at event.end_time which deletes the recurring task and, somewhat unfortunately, restarts the Scheduler just to refresh its recurring_tasks list.

# config/recurring.yml
 development:
   periodic_hello:
     command: "puts 'Hello, static recurring world!'"
     priority: 2
     schedule: every 10 minutes

Beware if you are doing development for dynamic recurring tasks. The current SolidQueue Supervisor will not start a Scheduler process if there are no recurring jobs. As a workaround I was using the above dummy recurring task.

class Event < ApplicationRecord
  after_create :create_event
  validates :name, :start_time, :end_time, presence: true

  private

  def create_event
    LiveEventDispatcherJob.set(wait_until: self.start_time).perform_later(event_id: self.id)
  end
end

class LiveEventDispatcherJob < ApplicationJob
  queue_as :default

  POLLING_SCHEDULE = "every 15 seconds"

  def perform(event_id:)
    puts "dispatching for the start of live event with id: #{event_id}"
    event = Event.find(event_id)

    # What I wish ActiveJob had (passing schedule to set):
    # LiveEventPollingJob.set(schedule: POLLING_SCHEDULE).perform_later(event_id: event.id)

    # What I am doing instead:
    SolidQueue::RecurringTask.find_or_create_by(
      static: false,
      key: "LiveEventPollingEvent#{event.id}",
      schedule: POLLING_SCHEDULE,
      class_name: "LiveEventPollingJob",
      arguments: [ { event_id: event.id } ]
    )

    # Restart the Scheduler to pick up dynamic recurring task changes
    SolidQueue::Process.where(kind: "Scheduler").all.map(&:deregister)

    # Schedule the cleanup job
    LiveEventCleanupJob.set(wait_until: event.end_time).perform_later(event_id: event.id)
  end
end

class LiveEventPollingJob < ApplicationJob
  queue_as :default

  def perform(event_id:)
    puts "polling for live event with id: #{event_id}"
    sleep 5
  end
end

class LiveEventCleanupJob < ApplicationJob
  queue_as :default

  def perform(event_id:)
    puts "cleaning up after the end of live event with id: #{event_id}"
    SolidQueue::RecurringTask.where(key: "LiveEventPollingEvent#{event_id}").destroy_all
    # ...

    # Restart the Scheduler to pick up dynamic recurring task changes
    SolidQueue::Process.where(kind: "Scheduler").all.map(&:deregister)
  end
end

So, my hacks aside, I think SolidQueue is very close to supporting dynamic recurring tasks, at least for my use case. Thank you, SolidQueue team.