PrefectHQ / prefect-aws

Prefect integrations with AWS.
https://PrefectHQ.github.io/prefect-aws/
Apache License 2.0

Task definition caching does not work if the task definitions come from separate deployments. #368

Closed: taylor-curran closed this issue 6 months ago

taylor-curran commented 8 months ago

Expectation / Proposal

We just moved from defining our own task definition for the prefect-ecs-task to letting Prefect define them itself at runtime.

After the change, we seem to have an issue where Prefect registers a task definition per task run. Error from AWS:

botocore.errorfactory.ClientException: An error occurred (ClientException) when calling the RegisterTaskDefinition operation: Too many concurrent attempts to create a new revision of the specified family.
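
For context, the error is scoped to the task definition family. Below is a minimal sketch, assuming boto3 credentials and a hypothetical family name (none of this is what the worker itself does), of how concurrent RegisterTaskDefinition calls on one family reproduce the throttle quoted above:

import concurrent.futures

import boto3

ecs = boto3.client("ecs")

def register(i: int):
    # Every call attempts to create a new revision of the same family;
    # AWS limits concurrent revision creation within a family.
    return ecs.register_task_definition(
        family="my-family",  # hypothetical family name
        containerDefinitions=[
            {"name": "app", "image": "busybox", "memory": 128, "command": ["true"]}
        ],
    )

with concurrent.futures.ThreadPoolExecutor(max_workers=50) as pool:
    # Expect botocore.errorfactory.ClientException once AWS starts throttling.
    list(pool.map(register, range(200)))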

Task-definition diff: (attached as an image)

The main thing I see that I can change in the work pool config regarding this is Auto Deregister Task Definition (Optional); we have tried with this both enabled and disabled.

When doing a simple test like the one below, where the same deployment is run multiple times, it runs OK and the same task definition is reused, both with and without Auto Deregister Task Definition (Optional):

#!/usr/bin/env bash

_run_prefect_task() {
    echo "Trigger task: $1"
    poetry run prefect deployment run 'ecs-task-flow/snowflake_access_test'
    echo "Task done: $1"
}

_run_task_in_parallel() {
    CONCURRENT_RUNS="$1"
    # Fire all runs in the background, then wait for them to finish.
    for i in $(seq 1 "$CONCURRENT_RUNS"); do
        _run_prefect_task "$i" &
    done
    echo "Waiting for tasks to complete"
    wait
}

_run_task_in_parallel 200

Traceback / Example

In the case where we are being throttled, we have different deployments using the same image and tag, which could therefore share the same task definition.

Looking at prefect_aws/workers/ecs_worker.py#L735, you can see that the worker caches the task definition to avoid over-registering task definitions, but this user in particular has too many different deployments.

We might have a situation where the deployment ID is used as the cache key, so the same task definition is added to the cache multiple times and therefore registered multiple times. Maybe we could use a hash of the task definition as the cache key, together with some other values from the worker/deployment, but not the deployment ID?
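
As a rough illustration of that idea, here is a minimal sketch of a content-based cache key: hash the task definition itself, so deployments that produce identical definitions share one cache entry. The helper names and the cache layout are hypothetical, not the worker's actual internals:

import hashlib
import json

def task_definition_cache_key(task_definition: dict) -> str:
    # Serialize with sorted keys so semantically identical definitions
    # always hash to the same digest, regardless of key order.
    canonical = json.dumps(task_definition, sort_keys=True, default=str)
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()

# Hypothetical cache: digest -> registered task definition ARN.
_cache: dict[str, str] = {}

def get_or_register(task_definition: dict, register) -> str:
    # "register" stands in for the boto3 register_task_definition call;
    # deployments with identical definitions reuse one revision.
    key = task_definition_cache_key(task_definition)
    if key not in _cache:
        _cache[key] = register(task_definition)
    return _cache[key]

With a key like this, the deployments in the example below, which differ only in parameters that do not appear in the task definition, would collapse to a single registration per worker.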

mch-sb commented 8 months ago

A small example of when this presents an issue for us:

import asyncio

from prefect import flow, deploy
from prefect.deployments import run_deployment

@flow(log_prints=True)
def hello(deployment_index):
    print(f"hello {deployment_index}")

async def main():
    # Build 200 deployments of the same flow, differing only in parameters.
    deployments = []
    for x in range(200):
        deployments.append(
            await hello.to_deployment(
                name=f"deployment-{x}",
                parameters=dict(
                    deployment_index=x,
                ),
            )
        )
    # All deployments share one image and one work pool.
    deployed_deployments = await deploy(
        *deployments,
        image="same image for all deployments",
        work_pool_name="same workpool for all deployments",
        build=False,
    )
    # Trigger a run of every deployment without waiting for completion.
    for uuid in deployed_deployments:
        await run_deployment(
            name=uuid,
            timeout=0,
        )

if __name__ == "__main__":
    asyncio.run(main())

Basically: a set of deployments with different parameters but the same image and work pool, which results in one task definition revision being registered per deployment, and we get the ClientException.

mbgrabbe commented 7 months ago

I ran into this very same issue yesterday while trying out ECS work pool deployments. This is not a problem we see today with our infrastructure-block agent deployments.