dagster-io / dagster

An orchestration platform for the development, production, and observation of data assets.
https://dagster.io
Apache License 2.0

Assign ECS cpu/memory in tags on a per-asset basis #20899

Open · leejlFG opened this issue 8 months ago

leejlFG commented 8 months ago

What's the use case?

With open source Dagster deployed to ECS via the EcsRunLauncher, memory and compute resources can be explicitly specified in one of two ways: in the dagster.yaml file or as job tags. Tagging individual assets with their requisite ECS resources is not supported, neither through the op_tags kwarg nor the new tags kwarg. This means that to configure those resources, you either have to give every run the same resources or limit yourself to materializing assets only via jobs, effectively taking auto-materializations and manual materializations through the web UI off the table.
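For reference, the job-tag mechanism that does work today looks like this (a minimal sketch; heavy_op and heavy_job are illustrative names, while ecs/cpu and ecs/memory are the tag keys the EcsRunLauncher reads):

from dagster import job, op

@op
def heavy_op():
    ...

# Run-level resources via job tags: tags set here become run tags,
# which is the only place the EcsRunLauncher currently looks.
@job(tags={"ecs/cpu": "2048", "ecs/memory": "8192"})
def heavy_job():
    heavy_op()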

Ideas of implementation

Ideally those resources could be optionally set on each asset, with the job tags taking precedence. If we have an asset group like

@asset(tags={"ecs/cpu": "1024", "ecs/memory": "8192"})
def thing_one():
    ...

@asset(tags={"ecs/cpu": "2048", "ecs/memory": "16384"})
def thing_two(thing_one):
    ...

thing_job = define_asset_job(tags={"ecs/cpu": "4096", "ecs/memory": "16384", ...)

the precedence would be: defaults from dagster.yaml, then the individual asset tags, then the job tags, with later entries winning. If no job tags are specified, the EcsRunLauncher should take the largest resource request from the individual assets and use that for the run.
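As a hypothetical sketch of that resolution order (resolve_resource and every value below are illustrative, not an existing Dagster API):

def resolve_resource(key, job_tags, asset_tag_sets, yaml_default):
    # Job tags take precedence over everything.
    if key in job_tags:
        return job_tags[key]
    # Otherwise take the largest per-asset request, so the ECS task
    # is big enough for every step in the run.
    asset_values = [int(tags[key]) for tags in asset_tag_sets if key in tags]
    if asset_values:
        return str(max(asset_values))
    # Fall back to the dagster.yaml default.
    return yaml_default

# thing_one and thing_two from the example above:
asset_tag_sets = [
    {"ecs/cpu": "1024", "ecs/memory": "8192"},
    {"ecs/cpu": "2048", "ecs/memory": "16384"},
]
resolve_resource("ecs/cpu", {}, asset_tag_sets, "256")                   # -> "2048"
resolve_resource("ecs/cpu", {"ecs/cpu": "4096"}, asset_tag_sets, "256")  # -> "4096"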

Additional information

No response

Message from the maintainers

Impacted by this issue? Give it a 👍! We factor engagement into prioritization.

leejlFG commented 8 months ago

I would happily work on this with a nudge in the right direction.

yuhan commented 6 months ago

Would this work for you? https://github.com/dagster-io/dagster/discussions/14544

ammerzon commented 6 months ago

The lack of this feature is a significant drawback for us, because we have to apply the highest hardware requirements of any asset to every run, whether or not they're needed, and therefore overprovision. It would also be beneficial to be able to set the capacityProvider on a per-asset basis.

@leejlFG What I don't understand is why this isn't working. Shouldn't the tags be applied with the get_cpu_and_memory_overrides function?

(Screenshot of the get_cpu_and_memory_overrides implementation.)

leejlFG commented 6 months ago

@yuhan using op_tags would be fine for our purposes, but since we use the EcsRunLauncher, the maximum resources of any asset/op in the run need to take priority for the entire ECS task, which is exactly the issue @ammerzon describes.

I may be wrong, but op_tags and plain tags on assets don't propagate and merge with run tags in the EcsRunLauncher. Tags applied to jobs become run tags; nothing else does. The get_cpu_and_memory_overrides function references only the run tags. If one new set of tags needs to be merged with run tags, I think it makes the most sense to avoid op_tags, as they are basically unused outside k8s and op-based workflows.
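Paraphrasing the launcher's lookup (a simplified sketch, not the exact source; the real function in dagster-aws also takes the container context into account):

# Simplified paraphrase of the EcsRunLauncher's override lookup.
# The point: only run.tags is consulted, so per-asset tags and
# op_tags never reach this code path.
def get_cpu_and_memory_overrides(run):
    overrides = {}
    cpu = run.tags.get("ecs/cpu")        # populated only by job/run tags
    memory = run.tags.get("ecs/memory")  # asset-level tags never land here
    if cpu:
        overrides["cpu"] = cpu
    if memory:
        overrides["memory"] = memory
    return overrides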

@ammerzon you really need an ECSStepLauncher instead to have different assets inside the same run spin up with different resources, right?

Getting some sort of tag on assets so that ad hoc and auto-materializations launch with the necessary resources instead of the defaults sounds like a related but distinct problem from getting individual assets within a run to spawn fresh containers, each with its own specified resources, via a step launcher.

ammerzon commented 6 months ago

@leejlFG, I see your point about these being only run_tags.

I hadn't considered that aspect before; good point 🤔 An ECSStepLauncher would be nice to have, but it's not a requirement for me. It's acceptable if all assets in the same run share the same resources.

What I care most about is having some sort of tag per asset so that auto-materializations/ad-hoc runs are launched with the necessary resources.