cdrx / faktory_worker_python

Python worker for the Faktory project
BSD 3-Clause "New" or "Revised" License

Slow Job Processing due to Concurrency Architecture #56

Open ttstarck opened 5 months ago

ttstarck commented 5 months ago

In some simple testing I noticed that this worker doesn't process jobs very fast. The cause appears to be how jobs are fetched: the Thread/Process that pulls jobs can only pull one when an Executor Thread is free. If no Executor Thread is ready, it hits a 0.25s sleep before checking again (source).

This means that once a job completes, it can take up to 0.25s before the next job starts.
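To make the effect concrete, here is a minimal simulation of that fetch loop. It is only a sketch of the behaviour described above, not the library's actual code: `run_polling_worker`, `poll_interval`, and the in-memory job list are hypothetical stand-ins for the real worker and the Faktory connection.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def run_polling_worker(jobs, concurrency=1, poll_interval=0.25):
    """Dispatch jobs from a single fetching thread.

    Returns the time (seconds since the start) at which each job was handed
    to an executor. `jobs` is a list of zero-argument callables standing in
    for jobs fetched from Faktory (an assumption made for this sketch).
    """
    queue = list(jobs)
    pending = []      # futures for jobs that are still running
    start_times = []
    t0 = time.monotonic()
    with ThreadPoolExecutor(max_workers=concurrency) as pool:
        while queue:
            # Forget about jobs that have already finished.
            pending = [f for f in pending if not f.done()]
            if len(pending) >= concurrency:
                # At the concurrency limit: sleep, then check again.
                # A job that finishes during this sleep is not noticed
                # until the sleep is over.
                time.sleep(poll_interval)
                continue
            start_times.append(time.monotonic() - t0)
            pending.append(pool.submit(queue.pop(0)))
    return start_times
```

With `concurrency=1` and jobs that finish in a few milliseconds, consecutive start times in this model are spaced about one `poll_interval` apart, regardless of how short the jobs are.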

I know that in the Ruby Faktory client the "Executor" (Processor) threads do the fetching themselves, so each one pulls the next job as soon as it finishes its current one.

Would you be open to a pull request to refactor this code to match the Ruby implementation where the Executor Threads are the ones doing the fetching?
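A rough sketch of that executor-fetches design, for illustration only. A `queue.Queue` stands in for the Faktory server here, and `executor_loop` and `run_executors` are hypothetical names; a real implementation would need each executor to fetch over a server connection, which this sketch does not attempt.

```python
import operator
import queue
import threading


def executor_loop(jobs, results, stop):
    """Fetch and run jobs in the same thread until told to stop."""
    while True:
        try:
            # Blocks until a job is available, so the next job starts as
            # soon as this thread is free. The short timeout only exists
            # so the thread can notice the stop flag.
            func, args = jobs.get(timeout=0.05)
        except queue.Empty:
            if stop.is_set():
                return
            continue
        try:
            results.append(func(*args))
        except Exception as exc:  # record the failure instead of dying
            results.append(exc)
        finally:
            jobs.task_done()


def run_executors(job_list, concurrency=1):
    """Run (func, args) pairs on `concurrency` self-fetching threads."""
    jobs = queue.Queue()
    for job in job_list:
        jobs.put(job)
    results = []
    stop = threading.Event()
    threads = [
        threading.Thread(target=executor_loop, args=(jobs, results, stop))
        for _ in range(concurrency)
    ]
    for t in threads:
        t.start()
    jobs.join()   # wait until every queued job has been processed
    stop.set()
    for t in threads:
        t.join()
    return results


if __name__ == "__main__":
    print(run_executors([(operator.add, (1, 2)), (operator.sub, (10, 5))]))
```

Because the thread that runs a job is also the one that fetches, there is no separate dispatcher that has to discover that an executor became free.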

cdrx commented 5 months ago

The 0.25s delay is only applied when the worker is either disconnecting or at the concurrency limit.

If the worker has capacity to be running jobs then the delay is not used and new jobs will be processed near instantly.

ttstarck commented 5 months ago

I'm running the example producer and worker code with some small tweaks to queue the jobs faster and add additional logging (Producer, Worker).

The Producer code queues ~6 jobs a second. However, the Worker cannot keep up with that at a concurrency of one.

Here are the log outputs of the worker:

DEBUG:faktory.worker:Running task: add(1, 2)
2024-05-21 15:16:10.826007 - add: 1 + 2 = 3
DEBUG:faktory.worker:Running task: subtract(10, 5)
2024-05-21 15:16:11.082362 subtract: 10 - 5 = 5
DEBUG:faktory.worker:Running task: multiply(8, 8)
2024-05-21 15:16:11.338471 multiply: 8 * 8 = 64
DEBUG:faktory.worker:Running task: add(1, 2)
2024-05-21 15:16:11.595590 - add: 1 + 2 = 3
DEBUG:faktory.worker:Running task: subtract(10, 5)
2024-05-21 15:16:11.852505 subtract: 10 - 5 = 5
DEBUG:faktory.worker:Running task: multiply(8, 8)
2024-05-21 15:16:12.109596 multiply: 8 * 8 = 64
DEBUG:faktory.worker:Running task: add(1, 2)
2024-05-21 15:16:12.361325 - add: 1 + 2 = 3
DEBUG:faktory.worker:Running task: subtract(10, 5)
2024-05-21 15:16:12.618927 subtract: 10 - 5 = 5
DEBUG:faktory.worker:Running task: multiply(8, 8)
2024-05-21 15:16:12.871962 multiply: 8 * 8 = 64

Each job takes ~0.25s to process, even though the number of enqueued jobs in Faktory keeps growing.
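The spacing can be checked directly from the timestamps in the log above. The snippet below only does that arithmetic; the `intervals` helper is a name made up for this example.

```python
from datetime import datetime

# Timestamps copied from the worker log output above.
LOG_TIMESTAMPS = [
    "2024-05-21 15:16:10.826007",
    "2024-05-21 15:16:11.082362",
    "2024-05-21 15:16:11.338471",
    "2024-05-21 15:16:11.595590",
    "2024-05-21 15:16:11.852505",
    "2024-05-21 15:16:12.109596",
    "2024-05-21 15:16:12.361325",
    "2024-05-21 15:16:12.618927",
    "2024-05-21 15:16:12.871962",
]


def intervals(stamps):
    """Return the gaps, in seconds, between consecutive timestamps."""
    times = [datetime.strptime(s, "%Y-%m-%d %H:%M:%S.%f") for s in stamps]
    return [(b - a).total_seconds() for a, b in zip(times, times[1:])]


gaps = intervals(LOG_TIMESTAMPS)
mean_gap = sum(gaps) / len(gaps)
print(f"min={min(gaps):.3f}s max={max(gaps):.3f}s mean={mean_gap:.3f}s")
print(f"throughput ~ {1 / mean_gap:.1f} jobs/s")
```

The gaps all fall between roughly 0.252s and 0.258s, with a mean of about 0.256s. That works out to about 3.9 jobs per second, below the ~6 jobs per second being queued.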

cdrx commented 5 months ago

Increase your concurrency and you will get a better throughput.

The 0.25s delay is there to stop the worker process from spinning the CPU while it waits for in-flight jobs to complete at the concurrency limit. When your workers are scaled appropriately you won't hit that delay at all and everything will be near instant.

ttstarck commented 5 months ago

I'm looking to run this for jobs that are heavily CPU-intensive with very minimal IO (machine learning), so I wasn't planning on increasing concurrency for my use case.

I just tested increasing the concurrency to 2, and this made the processing of the jobs nearly instantaneous (which is what I originally expected).

cdrx commented 5 months ago

To answer your original question:

> Would you be open to a pull request to refactor this code to match the Ruby implementation where the Executor Threads are the ones doing the fetching?

I would, but I think this would not be trivial. Jobs can be processed in other processes, so the socket would need to be shared. I think it would be difficult to get this right.

The easy path would be to make the 250ms timeout configurable, so you can tune to something more suitable for your use case.

A better alternative would be to make the time.sleep cancellable. If Worker._process could "cancel" the sleep(0.25) when a job completes, the worker would fetch the next job immediately. That would remove the delay at the concurrency limit without spinning the CPU.

I think this could be implemented with threading.Event fairly easily.
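As a rough illustration of the idea, and not a patch against the actual worker, the sketch below replaces the fixed sleep with `threading.Event.wait(timeout=...)`. The names `run_event_worker`, `job_done`, and `poll_interval` are hypothetical, and a plain list of callables stands in for jobs fetched from Faktory. Making `poll_interval` a parameter also covers the simpler "configurable timeout" option.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


def run_event_worker(jobs, concurrency=1, poll_interval=0.25):
    """Dispatch jobs, waking up early whenever a job completes.

    Returns the time (seconds since the start) at which each job was
    handed to an executor.
    """
    job_done = threading.Event()
    queue = list(jobs)
    pending = []      # futures for jobs that are still running
    start_times = []
    t0 = time.monotonic()
    with ThreadPoolExecutor(max_workers=concurrency) as pool:
        while queue:
            pending = [f for f in pending if not f.done()]
            if len(pending) >= concurrency:
                # Blocks without spinning the CPU. Returns as soon as a
                # finished job sets the event, or after poll_interval at
                # the latest.
                job_done.wait(timeout=poll_interval)
                job_done.clear()
                continue
            start_times.append(time.monotonic() - t0)
            future = pool.submit(queue.pop(0))
            # The callback runs once the future is finished, so the
            # re-check of `pending` after a wake-up sees it as done.
            future.add_done_callback(lambda _f: job_done.set())
            pending.append(future)
    return start_times
```

In this model the timeout remains as a safety net, but with `concurrency=1` the next job starts right after the previous one finishes instead of up to `poll_interval` later. Jobs that run in other processes would need a cross-process signal such as `multiprocessing.Event` instead, which this sketch does not cover.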