
[Feature Request] Activity specific worker tuning #663

Closed gregbrowndev closed 1 month ago

gregbrowndev commented 1 month ago

Is your feature request related to a problem? Please describe.

My application is a batch-oriented ETL pipeline. Throughout the workflow there are several async activities, each of which interacts with a different third-party API. For example, the geocoding activity uses the Google Places API, another activity might use the OpenAI API, etc.

Each activity is subject to external rate limits.

While I can control rate limiting within my code, I can't easily control the concurrency of activities executed by the worker. This means that when multiple activities of the same type are running, they contend for the same rate limit and simply take longer to finish, or, worse, an activity can end up timing out due to the global throttling.

My solution so far has been to deploy a separate worker for each activity and set the max_concurrent_activities worker option to control the concurrency, allowing one or two activities to complete as quickly as possible. However, this has created a lot of DevOps overhead, and it can be hard for people on the team to set the worker up correctly when they add a new activity that has a rate limit.
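For context, each of these dedicated workers is roughly a sketch like this (the task queue name, activity, and limit of 2 are illustrative):

```python
import asyncio

from temporalio import activity
from temporalio.client import Client
from temporalio.worker import Worker


@activity.defn
async def geocode_address(address: str) -> str:
    # Call the Google Places API here (placeholder for the example).
    return address


async def main() -> None:
    client = await Client.connect("localhost:7233")
    worker = Worker(
        client,
        task_queue="geocoding-task-queue",  # dedicated queue for this one activity
        activities=[geocode_address],
        max_concurrent_activities=2,  # keep within the external rate limit
    )
    await worker.run()


if __name__ == "__main__":
    asyncio.run(main())
```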

Describe the solution you'd like

I saw the experimental WorkerTuner option added in this PR: https://github.com/temporalio/sdk-python/pull/559.

It would be great for this to support tuning for specific activities, e.g. defining the number of slots for a given activity name. This would allow a single async worker to handle all of my activities but only let a controllable number of each type run at any given time.
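To illustrate the kind of API I'm imagining (purely hypothetical, nothing like this exists in the SDK today, and the helper and parameter names are made up):

```python
# Hypothetical sketch only -- this API does not exist; it just illustrates the request.
# One worker handles every activity, but each activity name gets its own slot cap.
worker = Worker(
    client,
    task_queue="etl-task-queue",
    activities=[geocode_address, summarize_with_openai],
    tuner=per_activity_tuner(  # made-up helper, not part of the SDK
        {
            "geocode_address": 2,        # Google Places rate limit
            "summarize_with_openai": 5,  # OpenAI rate limit
        }
    ),
)
```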

Additional context

I saw there is a Go-based (and I think Java) example for creating a Mutex Workflow that could control concurrency globally. This might be a better overall solution, but I struggled to apply the pattern in Python. I wouldn't expect the worker tuning to prevent contention if you scaled out the number of workers; however, I have specifically disabled autoscaling on this worker for that reason, so controlling this at the worker level would be a lot simpler.

cretz commented 1 month ago

This will not work with how slots are used. You should use separate task queues if you need separate tuning options.

A slot is a reserved spot to request work from a task queue on the server (and if no slot is available, we don't ask for work). The activity name is not known at slot reservation time. Granted, when we expand the slot supplier interface in Python to allow custom implementations, you can use the already-scheduled activities to help you derive the next slot reservation, but it will still always be per task queue, because Temporal distributes work to workers at the task queue level, not the activity name level.

gregbrowndev commented 1 month ago

Thanks, @cretz, that does make sense.

I think the only other option then is the mutex workflow (at least as I understand it). Basically, I just need to avoid scheduling an activity at all if a given number of them are already executing.

If I have a workflow and the next activity to schedule is geocoding, I assume the mutex workflow would allow me to make the calling workflow wait until the resource can be acquired before it schedules the activity.

I can't allow the workflow to schedule the activity and have the activity wait on a semaphore, because it would eventually time out if hundreds of these activities were scheduled in a short period.
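A rough sketch of what I think this would look like in Python (assuming a long-running semaphore workflow is started separately under a known workflow ID; all the names here are mine, not SDK APIs):

```python
from dataclasses import dataclass
from datetime import timedelta

from temporalio import workflow


@dataclass
class AcquireRequest:
    # Workflow ID of the caller, so the semaphore can signal the permit back.
    caller_workflow_id: str


@workflow.defn
class SemaphoreWorkflow:
    """Long-running workflow started once, e.g. with ID 'geocoding-semaphore'."""

    def __init__(self) -> None:
        self._pending: list[AcquireRequest] = []
        self._in_use = 0
        self._max_permits = 2  # how many geocoding activities may run at once

    @workflow.signal
    def acquire(self, req: AcquireRequest) -> None:
        self._pending.append(req)

    @workflow.signal
    def release(self) -> None:
        self._in_use -= 1

    @workflow.run
    async def run(self) -> None:
        # A real implementation should continue-as-new periodically to keep
        # history size bounded.
        while True:
            await workflow.wait_condition(
                lambda: len(self._pending) > 0 and self._in_use < self._max_permits
            )
            req = self._pending.pop(0)
            self._in_use += 1
            # Grant the permit by signalling the waiting caller workflow.
            await workflow.get_external_workflow_handle(
                req.caller_workflow_id
            ).signal("permit_granted")


@workflow.defn
class GeocodingCallerWorkflow:
    def __init__(self) -> None:
        self._granted = False

    @workflow.signal
    def permit_granted(self) -> None:
        self._granted = True

    @workflow.run
    async def run(self, address: str) -> str:
        semaphore = workflow.get_external_workflow_handle("geocoding-semaphore")
        # Ask for a permit and wait until it is granted before scheduling the activity.
        await semaphore.signal("acquire", AcquireRequest(workflow.info().workflow_id))
        await workflow.wait_condition(lambda: self._granted)
        try:
            return await workflow.execute_activity(
                "geocode_address",  # rate-limited activity registered on the worker
                address,
                start_to_close_timeout=timedelta(minutes=5),
            )
        finally:
            await semaphore.signal("release")
```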

Happy for you to close the issue šŸ‘šŸ»

cretz commented 1 month ago

> I think the only other option then is the mutex workflow

I think you should use separate task queues (i.e. separate workers) for activities that need to have their concurrency limited separately. This is a very common approach.

Granted, the only global setting there is the max task queue concurrent activities, so yes, a mutex workflow may work best for a global limit, but note it would not support high-throughput rate limiting.