brandonhilkert / sucker_punch

Sucker Punch is a Ruby asynchronous processing library using concurrent-ruby, heavily influenced by Sidekiq and girl_friday.
MIT License

Safely Wait For All Queues? #250

Closed. metaskills closed 2 years ago

metaskills commented 2 years ago

Cheers! Assuming I am in a Lambda function with Rollbar configured to use_sucker_punch, I'd like a way to flush all queues of their work after some ActiveJobs have run. Is it safe to assume that SuckerPunch::Queue.shutdown_all would be this interface? From what I can read in the code, Queue.find_or_create will recreate the pools if needed via the Job.perform_* methods. I'm wondering if shutdown_all is also idempotent and safe to call if no jobs are in the queue. I think it is, but I just wanted to check. Thank you!

brandonhilkert commented 2 years ago

shutdown_all will tell all queues to no longer accept any new work and start the countdown based on the setting SuckerPunch.shutdown_timeout. If the enqueued jobs aren't finished in that time period, it will kill the process mid-job and bail.

If you want to make sure all work that had been enqueued is completed, I'd set the shutdown_timeout to something high (but reasonable) to make sure the lambda function waits before it shuts it all down.

metaskills commented 2 years ago

Thanks for the shutdown suggestion, but that would not work without writing a specific Lambda Extension, which would negate the need for SuckerPunch. Something like https://github.com/customink/lambda_punch

What I need is some interface that tells SuckerPunch to do all the work. Like a #clear, but a blocking method that just works off the jobs. The idea is that I would use this in an ActiveJob after_perform callback to flush things like Rollbar jobs if any were present after the invoke and before a potential freeze of the execution environment.

Is my assumption incorrect that calling SuckerPunch::Queue.shutdown_all leaves the queues in an inoperable state?

brandonhilkert commented 2 years ago

I'm not sure I follow the difference. You wouldn't want to call a method and then just wait until it finished b/c it would wait a really long time. I assume you have a timeout on the function itself, so what would be the difference in setting the SuckerPunch shutdown timeout to the same value as the function timeout?

re: shutdown_all - Not sure what you mean by inoperable, but the queues will no longer accept any work after shutdown_all is called.

brandonhilkert commented 2 years ago

FWIW - shutdown_all is blocking and returns when the work is done or it reaches the timeout.

metaskills commented 2 years ago

Thanks for the help, I'll try to explain a bit more. Imagine using K8s container images with AWS Lambda for background jobs and event-driven messages using a different ENTRYPOINT and COMMAND. Same container image, different execution env. In this situation, the function's timeout is, say, 5 minutes. It could be more, but the idea is that you are doing some work that takes far less than X minutes. During that time there may be 0 to 1 Rollbar errors. Because the work is done, and because I want to keep rarely reused functions (perhaps frozen once and eventually reaped) from dropping error reports, I'd like to flush the SuckerPunch queue after each unit of work is done. Sending 1 or 2 Rollbar errors is not going to take long. At most a few seconds.

Technically the at_exit might flush a function's frozen runtime's SuckerPunch queues when it moves to the SHUTDOWN state. But from what I have found, that is not guaranteed. Hence why I would like to tell the queues to process work thru some declarative interface after each INVOKE's main work is done before the runtime is frozen. I hope this helps explain things?
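The flush-after-each-invoke pattern described here can be sketched with only the Ruby standard library. This is a hedged illustration, not SuckerPunch or Rollbar code: `PENDING_REPORTS`, `report_async`, `flush_reports`, and `handler` are hypothetical names, and plain threads stand in for the async error reports.

```ruby
# Hypothetical stand-ins: each thread represents one async error report.
PENDING_REPORTS = Queue.new # reports started during the current invoke

def report_async(message, sink)
  PENDING_REPORTS << Thread.new { sink << message }
end

# Join every pending report, spending at most `budget` seconds in total.
# A report that outlives the budget is abandoned rather than waited on.
def flush_reports(budget)
  deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + budget
  until PENDING_REPORTS.empty?
    remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
    break if remaining <= 0
    PENDING_REPORTS.pop.join(remaining)
  end
end

# The unit of work. The `ensure` clause flushes before the method returns,
# i.e. before the execution environment could be frozen.
def handler(event, sink)
  report_async("failed: #{event}", sink) if event == :bad
  :ok
ensure
  flush_reports(2)
end
```

The budget of 2 seconds is an arbitrary value chosen to match the "at most a few seconds" estimate above.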

Not sure what you mean by inoperable...

Basically asking would things break?

but the queues will no longer accept any work after shutdown_all is called...

And that is the answer. I re-read the code, and it looks like the Queue#kill method is where things hit that point. Does that sound right? Do you have thoughts on a simple "wait_for_all" implementation?

brandonhilkert commented 2 years ago

Technically the at_exit might flush a function's frozen runtime's SuckerPunch queues when it moves to the SHUTDOWN state. But from what I have found, that is not guaranteed

I think this is where I'm not following. I'm not super familiar with the runtime, so it's hard for me to offer much insight here.

If shutdown_all isn't what you're looking for, I'm honestly not sure what the implementation would look like. I'm open to reviewing code, but I'm not sure this has large-scale value. Feels a little bit like a monkey-patch to me.

The shutdown_all method:

  1. Stops queues from accepting new work
  2. Starts a kill timer
  3. Allows existing work to be processed

From what I gather, you're saying all work would've been submitted, so that rules out the value of 1. 2 you say is not super important, but my experience is that this can never hurt. And 3 is what you're directly looking to do.
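To make the three steps concrete, here is a minimal standard-library model of that shutdown sequence. It is a sketch under stated assumptions, not SuckerPunch's implementation: `ToyQueue`, `post`, and `shutdown` are hypothetical names, and a single worker thread stands in for the thread pool.

```ruby
# Hypothetical stand-in for a job queue; not SuckerPunch's own code.
class ToyQueue
  class Closed < StandardError; end

  attr_reader :completed

  def initialize
    @jobs = Queue.new # stdlib thread-safe FIFO
    @completed = []
    @worker = Thread.new do
      # Queue#pop returns nil once the queue is closed and drained.
      while (job = @jobs.pop)
        @completed << job.call
      end
    end
  end

  def post(&job)
    raise Closed, "queue is shut down" if @jobs.closed?
    @jobs << job
  end

  # Returns true if all work finished, false if the timer expired first.
  def shutdown(timeout)
    @jobs.close                      # 1. stop accepting new work
    finished = @worker.join(timeout) # 2 + 3. let existing work run, up to the timer
    @worker.kill unless finished     # timer expired: abandon remaining work
    !finished.nil?
  end
end

queue = ToyQueue.new
3.times { |i| queue.post { i * 2 } }
queue.shutdown(5) # => true, the work finished before the timer
queue.completed   # => [0, 2, 4]
```

Note that after `shutdown` the toy queue rejects new work, which is the "no longer accept any work" behavior discussed earlier in the thread.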

Does that sound right?

Yup

I recognize I still may be missing something and if so, I'm sorry.

metaskills commented 2 years ago

Yup, number 3 is all I need. Essentially something like this? Warning, pseudo code, just typed up.

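    # Assumes `all` returns every queue and that Queue#idle? and PAUSE_TIME exist.
    def self.wait
      queues = all
      # Poll until no queue has pending or running jobs.
      sleep PAUSE_TIME until queues.all?(&:idle?)
    end

brandonhilkert commented 2 years ago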

I guess I just don't see the value of not having 1 and 2 for this purpose. Either way, if that's all you want, I'd recommend monkey-patching a new method into Queue with the method and details of your choice. You'll also want to call shutdown on the underlying @pool as is done in Queue#shutdown.
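For illustration, a polling wait that leaves the queues usable could look roughly like the following standard-library sketch. Everything here is an assumption rather than SuckerPunch's API: `PollableQueue`, `register`, and the optional `timeout:` keyword are hypothetical, with the timeout added so the wait cannot block forever.

```ruby
# Hypothetical model of a "wait until idle" helper; not SuckerPunch's code.
class PollableQueue
  PAUSE_TIME = 0.01 # seconds between polls (illustrative value)

  @all = []
  @lock = Mutex.new

  class << self
    def all
      @lock.synchronize { @all.dup }
    end

    def register(queue)
      @lock.synchronize { @all << queue }
    end

    # Block until every queue is idle. Returns true once idle, or false
    # if `timeout` seconds pass first. Queues keep accepting work afterwards.
    def wait(timeout: nil)
      deadline = timeout && now + timeout
      until all.all?(&:idle?)
        return false if deadline && now >= deadline
        sleep PAUSE_TIME
      end
      true
    end

    private

    def now
      Process.clock_gettime(Process::CLOCK_MONOTONIC)
    end
  end

  def initialize
    @lock = Mutex.new
    @pending = 0 # jobs enqueued or currently running
    self.class.register(self)
  end

  def post(&job)
    @lock.synchronize { @pending += 1 }
    Thread.new do
      begin
        job.call
      ensure
        @lock.synchronize { @pending -= 1 }
      end
    end
  end

  def idle?
    @lock.synchronize { @pending.zero? }
  end
end
```

Calling `PollableQueue.wait` when nothing is enqueued returns immediately, so in this model the call is safe to repeat after every unit of work.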

metaskills commented 2 years ago

Thanks. Assuming I could convince you there is value in having a wait method without 1 and 2, would you be open to a pull request?

brandonhilkert commented 2 years ago

I'm always up for looking at code.

metaskills commented 2 years ago

Done: https://github.com/brandonhilkert/sucker_punch/pull/251