Open bmb1603 opened 4 months ago
Hey @bmb1603, thanks for writing this up. We have something quite similar in our backlog: automatically restarting after a certain number of jobs. I'm not sure this will make it to v1.0 as I have other priorities, but if you or anyone wants to take a stab at this, please do! I'd probably go for a max number of jobs as the starting criterion because it seems the simplest, and it might make sense to make it configurable globally but also per worker.
Thank you for your reply @rosa
In this context, is there currently a built-in mechanism to clean up unused queues created via wildcard patterns? Otherwise, could you advise on the recommended way to manually shut down specific workers that are no longer needed? For example, if we have workers listening to `object_*_queue`, how can we shut down the worker for `object_123_queue` when it's no longer required?
Not sure I completely understand 🤔 Would it be enough to remove that worker from your configuration and restart?
So if I configure:

```yaml
production:
  workers:
    - queues: "object_*_queue"
      processes: 1
      threads: 1
```
And then have a job like this:

```ruby
class ParallelJob < ApplicationJob
  queue_as do
    object_id = arguments.first.id
    "object_#{object_id}_queue"
  end
end
```
In this example, when enqueueing `ParallelJob.perform_later(object)`, SolidQueue would "dynamically" start a new worker for each queue/object, right? The question is: how do I stop workers when I don't plan to schedule any further jobs for a queue/object?
Ahh, ok!
> queues: "object_*_queue"

This won't quite work: `*` is only allowed at the end, for prefix matches. This pattern would be ignored, so your worker won't fetch any queues. You'd need to specify it as:

```yaml
queues: "object_*"
```
Assuming that, then:

> SolidQueue would "dynamically" start a new Worker for each Queue/Object, right?

Not exactly; it'll start just one worker for all queues that match that prefix. The same worker will process jobs in queue `object_a` as in queue `object_b` or `object_123`.
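For illustration, the prefix-only matching described above can be modeled in a few lines. This is a simplified sketch, not SolidQueue's actual implementation:

```ruby
# Simplified model of prefix-only wildcard matching (not SolidQueue's
# actual code): "*" only counts as a wildcard at the end of the pattern;
# anywhere else, the pattern is treated as a literal queue name.
def matches_queue?(pattern, queue_name)
  if pattern.end_with?("*")
    queue_name.start_with?(pattern.delete_suffix("*"))
  else
    queue_name == pattern
  end
end

matches_queue?("object_*", "object_123_queue")       # => true
matches_queue?("object_*_queue", "object_123_queue") # => false (mid-pattern "*" is literal)
```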
Ahh got it, thanks for clarifying, it all makes sense now!
@rosa I'm taking a run at implementing this since I'm deployed on Heroku which necessitates periodically killing workers that have grown too big for their own good... (Heroku gives one a choice between large bills or ridiculously limited resources)
Before V0.5, I was able to use an around_perform block and send a SIGTERM to the worker when I hit my OOM criteria. After V0.5, signaling the worker has become less reliable (it doesn't reliably restart the worker and generates prune errors). Given how hacky that solution was to start with, I've decided to try and do things the Right Way™ and submit a PR.
I've been trying to find the right alchemy to gracefully stop the worker. If I don't stop the worker cleanly, it seems some jobs can get left in a state where they don't get restarted (or if they do, it takes so long that I've not been patient enough to wait for the SolidQueue maintenance processes to pick them up again).
It looks like calling `Worker#stop` is almost right, but it can leave jobs in a claimed state that can take a fair amount of time to clear. Interestingly, while this should be equivalent to using signals to terminate the worker, the workers reliably restart, which wasn't the case when signaling from the `around_perform` block.
Any suggestions for a way to gracefully release outstanding claims and let the worker finish any in-flight jobs would be appreciated.
@rosa I've tracked down the root cause of my confusion.
When I force restart a worker with an in-flight job, that leaves any blocked executions waiting for the concurrency expiry + the maintenance window in order to release the concurrency lock.
Is there any database-backed bookkeeping that would let me identify the in-flight job that got killed without finishing? And then, is there any tooling that I could use to effectively trigger an immediate maintenance processing step based on those jobs, or is this part of what I have to build to complete this feature?
Thanks.
Hey @hms! Sorry for the delay in reviewing this; I was busy with other recent Solid Queue stuff.
> When I force restart a worker with an in-flight job, that leaves any blocked executions waiting for the concurrency expiry + the maintenance window in order to release the concurrency lock.
Aha! This seems like a bug/oversight related to some new behaviour I introduced so that in-flight jobs are marked as failed when their worker terminates without giving them time to finish. This happens here, when the process is pruned because it died without the chance to deregister and still had claimed jobs, and here, when the supervisor detects the process has died and replaces it with another one. These ultimately call this method, which marks the job as failed but doesn't unblock other jobs waiting on the same concurrency key, because that's done as part of `perform` here; in this case, `failed_with` is called outside `perform`.
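To make that failure mode concrete, here's a toy model of the situation (all names are illustrative, not SolidQueue internals): failing a claimed job must also release its concurrency slot and promote the next blocked execution, otherwise blocked jobs sit until the concurrency expiry plus the maintenance window.

```ruby
# Toy model (illustrative names, not SolidQueue internals): when a claimed
# job fails, its concurrency slot should be released and the next blocked
# execution promoted; failing the job without releasing skips that step.
def fail_job(job, blocked_executions, release_semaphore: true)
  job[:status] = :failed
  release_semaphore ? blocked_executions.shift : nil
end

job = { status: :claimed }
blocked = [:waiting_job]

# Failing outside `perform` (the bug): the blocked execution stays stuck.
fail_job(job, blocked, release_semaphore: false)
blocked # => [:waiting_job]
```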
This is something I need to fix... I need to make some changes around process pruning and processes dying/reviving as part of an issue that came up in #324, so I'll handle that. I think that answers your question:
> And then, is there any tooling that I could use to effectively trigger an immediate maintenance processing step based on those jobs or is this part of what I have to build to complete this feature?
However, looking at this feature in particular, which is the main goal, I think I'd go with something simpler that avoids having to kill a worker mid-job from the job. I assume your workers' memory grows as they run more jobs... would it be simpler to configure a max number of jobs that a worker is allowed to perform before it terminates itself? It could also be a memory limit, but the number of jobs is much easier to measure, as you just need a counter. Then it'd be up to the worker to count how many jobs it has processed and shut down gracefully when the limit is reached 🤔. The supervisor would then replace it. Do you think that could work for your case?
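The counter idea above can be sketched in a few lines. This is a self-contained illustration; the class and option names are hypothetical, not SolidQueue's actual API:

```ruby
# Hypothetical sketch of the "max jobs per worker" idea: count completed
# jobs and request a graceful stop once the threshold is reached.
class CountingWorker
  attr_reader :jobs_performed

  def initialize(max_jobs:)
    @max_jobs = max_jobs
    @jobs_performed = 0
    @stopping = false
  end

  def perform(job)
    job.call
  ensure
    @jobs_performed += 1
    @stopping = true if @jobs_performed >= @max_jobs
  end

  def stopping?
    @stopping
  end
end
```

On an orderly exit like this, the supervisor would notice the worker is gone and boot a replacement, just as it does for a TERM-triggered shutdown.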
@rosa Thank you for getting back to me.
I found all of the bits of code referenced above and even had some "fixes" for the issues at hand. But since this was quickly shifting from feature work to architectural / philosophical design questions, I didn't want to go any further without A) coordinating with you; B) seeing where you want to head with some of these issues (if you address them at all!); and C) checking, since some of the changes could end up being pretty fundamental, whether you're OK with my taking a crack at them or whether you feel they have to be handled by the SolidQueue core team.
At the high level, I think the notion of worker recycling is a pretty important feature. But looking under the hood, it reveals a couple of significant questions and potentially touches a lot of the overall implementation:
If we allow for worker recycling, do we want to support the notion of "wait until the pool is empty / drained" before shutdown? (Given that I have some long-running and very resource-expensive jobs that are not decomposable any further, I'd prefer to have the choice/control.)
The worker constraints under consideration and whether they are worth the effort:
Both job count and worker aging address jobs with resource leaks (possibly outside the developer's control, via third-party gems / APIs). Job count can also be a less-than-ideal way to implement memory control, since restarting with N = 1 absolutely bounds memory. So all three are interesting and potentially valuable to implement. While the configuration for these options should, I think, fall under the per-queue worker section, they would be tracked and enforced on a per-process basis, without cross-process accounting.
How to communicate to the supervisor that a worker stopped due to a shutdown request vs. a crash, and with vs. without in-flight jobs. This would make it easy to decide whether to run or skip `handle_claimed_jobs_by(terminated_fork, status)`.
@hms
This is not hard to implement if you are open to a new SolidQueue config -- a Proc that returns the current memory in bytes. This way, SolidQueue doesn't have to implement something that works across platforms/OSs or take on a Gem dependency.
Ah, that's a good idea! +1 to a Proc you provide to return the memory, combined with a configured threshold. We can start with that one and not do job count. It was just an idea to do it in a simpler way, but if a Proc works, then that's better. And yes, this could be configurable per worker. I'd keep it as simple as possible now and would only have a single threshold, no reporting threshold, just the termination one.
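As a sketch of the app-supplied Proc idea, the application could return its current RSS in bytes, here via the portable `ps` utility (the `memory_used` name and the threshold wiring are hypothetical, not an actual SolidQueue config):

```ruby
# Hypothetical app-supplied memory probe: returns the current process's
# RSS in bytes using `ps` (rss is reported in kilobytes).
memory_used = -> { `ps -o rss= -p #{Process.pid}`.to_i * 1024 }

# The worker would compare it against a configured per-worker threshold:
recycle_threshold = 300 * 1024 * 1024 # e.g. 300 MB (illustrative)
should_recycle = memory_used.call > recycle_threshold
```

Keeping the probe in app code sidesteps the cross-platform problem SolidQueue would otherwise have to solve.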
> If we allow for worker recycling, then do we want to support the notion of "wait until the Pool is empty / drained" before shutdown?
I think I'd rely on the existing `SolidQueue.shutdown_timeout` for this: the time a worker has to finish in-flight work before exiting. I'd have that apply to this threshold as well. When the memory threshold is reached, we initiate the shutdown as if it were triggered by a `TERM` or `INT` signal, and this timeout applies.
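The timeout behaviour described above can be sketched in a self-contained way; the `idle` block here stands in for the real pool check, and the names are illustrative:

```ruby
# Sketch of a shutdown deadline: wait for in-flight jobs up to `timeout`
# seconds, polling the caller-supplied idleness check, then force the exit.
def graceful_stop(timeout:, poll: 0.05)
  deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
  until yield
    return :forced if Process.clock_gettime(Process::CLOCK_MONOTONIC) >= deadline
    sleep poll
  end
  :graceful
end

graceful_stop(timeout: 1) { true } # => :graceful, pool already drained
```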
> How to communicate with the Supervisor that this worker stopped based on a shutdown request Vs. crash and with Vs. without inflight jobs.
Do you think the current way the supervisor detects this wouldn't work in this case?
@rosa
> Do you think the current way the supervisor detects this wouldn't work in this case?
The issue I sort of backed into with the supervisor is that it currently fails claimed jobs via `handle_claimed_jobs_by(terminated_fork, status)`. I assumed this was intentional and that I didn't understand the use case(s), so I was looking for tooling to disambiguate between workers that were pruned (where the job should be failed, to maintain the existing behavior) and workers that are being recycled intentionally.
> The issue I sort of backed into with the supervisor is that it currently fails claimed jobs via `handle_claimed_jobs_by(terminated_fork, status)`. I assumed this was intentional
Yes! But this only happens if the supervisor thinks the process exited ungracefully. I added it because, before, we would have problematic jobs that caused a worker to get killed; these jobs would then be released and put back in the queue, only to be picked up by another worker that would be killed, and so on 😅 Not desirable. In that case, we want the jobs to fail. In the regular situation, however, if a worker has time to shut down in an orderly way, it should run this callback, which in turn destroys the process record and releases claimed jobs.
Are you finding a scenario where this is not happening correctly?
@rosa
> Are you finding a scenario where this is not happening correctly?
Yes, I am running into this as an issue. Because SolidQueue does a lot via concurrent-ruby, can have many jobs with their "fingers" in the database (introducing lock waits and random ordering of database writes), and has a shutdown process that is, by design, async, with soft and hard time limits that might not align with a running job's time to complete, it seems easy to leave a job in a state that gets reaped by the supervisor.
Since the fork reaping is there specifically to address a real-world problem, rather than remove it I'd like to enhance it by making orderly / disorderly shutdown information available to the supervisor. This way, jobs that come from an orderly shutdown are 100% safe to not be failed. As a quick, touch-as-little-code-as-possible approach, I could update the `Process.metadata` hash with an orderly-shutdown flag. The supervisor could then interrogate that value and fail or restart the job accordingly.
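A minimal sketch of that flag (the metadata key and helper names are hypothetical, not SolidQueue's schema): the worker marks its process record on a clean stop, and the supervisor consults the flag before failing claimed jobs.

```ruby
# Worker side: record that the shutdown was orderly in the process metadata
# (the "orderly_shutdown" key is a hypothetical addition).
def record_orderly_shutdown(metadata)
  metadata.merge("orderly_shutdown" => true)
end

# Supervisor side: only fail claimed jobs after a disorderly exit.
def fail_claimed_jobs?(metadata)
  !metadata["orderly_shutdown"]
end

metadata = { "kind" => "Worker" }
fail_claimed_jobs?(metadata)                          # => true  (crash path)
fail_claimed_jobs?(record_orderly_shutdown(metadata)) # => false (recycle path)
```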
On to separate issues where I would appreciate a little feedback and/or guidance from you:
My current approach for recycle-on-OOM is: mix a recyclable concern into the worker and then pass the worker into `pool.post` (since the logic for recycling belongs to the worker). The worker makes recycle decisions as close as possible to the actual completion of the job run via the pool's thread. As long as there is the possibility of other threads still running, I'm uncomfortable calling `#exit!` on job completion and OOM. To this end, what I'm doing is:
```ruby
def post(execution)
  available_threads.decrement
  future = Concurrent::Future.new(args: [ execution ], executor: executor) do |thread_execution|
    wrap_in_app_executor do
      thread_execution.perform
    ensure
      shutdown if worker.recycle_on_oom!
      available_threads.increment
      mutex.synchronize { on_idle.try(:call) if idle? }
    end
  end
  # ....
end
```
`Worker#recycle_on_oom!` here calls `#stop` on itself. This way, the pool drains of resources (just in case someone were to use multi-threading as an option with worker recycling, which offers the opportunity to forever trap a slow but memory-inexpensive job behind a fast but memory-intensive job. That would be Very Bad™).
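For reference, here's a self-contained sketch of what such an OOM check could look like; the class name, accessor, and probe are hypothetical, not the actual SolidQueue API:

```ruby
# Hypothetical OOM check: compare a caller-supplied memory probe against
# a per-worker limit and report whether the worker should recycle itself.
class OomCheck
  def initialize(limit_bytes:, memory_probe:)
    @limit_bytes = limit_bytes
    @memory_probe = memory_probe
  end

  def recycle?
    @memory_probe.call >= @limit_bytes
  end
end

OomCheck.new(limit_bytes: 100, memory_probe: -> { 150 }).recycle? # => true
```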
@rosa Unfortunately, I'm still missing something and this is not quite right. Randomly, jobs don't finish processing (despite reaching the code in the `ensure`). Even worse, the worker becomes inoperative: it continues to poll but never gets new work, and it ignores signals. If I stop SolidQueue, this worker becomes a zombie. Any help or suggestions would be great.
On a side note, I would love some feedback on the tradeoff of simply documenting the risks of multithreaded workers that can recycle on OOM Vs. simply not allowing workers to recycle if they are set to use threads. It's one heck of a foot-gun and it will be more than "just a flesh wound" if the application falls into the wrong timing pattern.
Lastly, I'm running into what looks like a SolidQueue deadlock on job completion, where the worker gets into a bad state: it polls in a tight loop and ignores shutdown requests/demands from the supervisor. Should the supervisor stop, it becomes a Unix zombie process (not sure what happens on Windows). Unix zombies are a real DevOps issue. So, I've added this for your consideration:
```ruby
def prune
  deregister(pruned: true)
  kill_pruned_process(self) # New
end

# We only get here if a process has stopped heartbeating and was just removed
# from the process table (i.e., it's dead to SolidQueue). Unfortunately, that
# only makes it invisible and can leave it with runnable access to the
# solid_queue tables, doing who knows what or when.
#
# This makes sure that if we remove it from the database (above, via
# deregister), we remove it from the world.
# Note: when in this state, it's become a petulant child (process) and ignores
# SIGTERM and SIGQUIT, so we are reaching for a bigger... signal.
def kill_pruned_process(pruned_process)
  SolidQueue.logger.debug { "Killing pruned process #{pruned_process.inspect}" }
  ::Process.kill :SIGKILL, pruned_process.pid
rescue SystemCallError
  # Ignored
end
```
Currently, Solid Queue worker processes run indefinitely, which can lead to memory bloat and potential resource leaks over time. This is especially problematic for long-running, high-volume systems.
Feature request: Implement an automatic worker process recycling mechanism. This could:
- restart a worker after it has processed a configurable number of jobs
- restart a worker once its memory usage exceeds a configurable threshold
- restart a worker after a maximum age
This feature would help maintain a clean state, prevent memory leaks, and ensure consistent performance without manual intervention.
Potential configuration options: a max job count, a memory threshold, and/or a max worker age, settable globally with per-worker overrides.