dagster-io / dagster

An orchestration platform for the development, production, and observation of data assets.
https://dagster.io
Apache License 2.0
10.39k stars 1.29k forks source link

[dagster-k8s] Set k8s configuration at runtime #21299

Open abhinavDhulipala opened 1 month ago

abhinavDhulipala commented 1 month ago

What's the use case?

Howdy, I think a really useful feature would be to dynamically set a downstream op k8s config based on the output of an upstream or or at least through a op run config. That way we could at least launch other jobs from the output of a current job.

I have 2 particular use cases that I am having a tough time using dagster to implement.

Scenario 1:

I have an op that based on op configs builds a particular docker container. This container has built software dependencies that we use to run experiments. Think A/B comparison of a particular piece of software or a regression analysis. I.E how performant is our code today compared to yesterday. I want my users to be able to create a docker container that installs a particular version of our compiler and use that images to compile and run a suite of programs.

Here is an example run config:

ops:
  docker_build_image:
    config:
      dep_list:
        manifest
         - name: apt_package_name
            version: pinned.version.semver
         custom_tag: exp1-volatile
# test how well our new compiler performs on varying matrix multiplication shapes. I.e [M X K] * [K X N]
  run_gemm:
    config:
      shapes:
        M: [128, 256, 1024, ...]
        N: [128, 256, 1024, ...]

A block Diagram for clarity

erDiagram
    "Docker Build" ||--o{ "Run Experiment" : "use upstream image"

Scenario 2: Variable resource hinting Similar to the above scenario, given an experiment with different shapes and sizes, we can imagine that we'd want more memory for larger and larger matrix sizes.

ops:
# test how well our new compiler performs on varying matrix multiplications, with oyr 
  param_gemm:
    config:
      shapes:
        M: [128, 256, 1024, ...]
        N: [128, 256, 1024, ...]
flowchart LR
   up[[Parameterize builds]] -->|500Mib mem request/limit| down1([128 X matrix])
   up[[Parameterize builds]] -->|1Gi mem request/limit| down2([256 X matrix])
   up[[Parameterize builds]] -->|4Gi mem request/limit| down3([1024 X matrix])

Ideas of implementation

The ideal solution would be to add this as Output/DynamicOutput metadata:


@dataclass
class MatrixConf:
    M: int
    N: int

@op(out=DynamicOut(MatrixConf))
def paramatrize(context: OpExecutionContext) -> Generator[DynamicOutput[MatrixConf], None, None]:
    yield DynamicOutput(
        MatrixConf(128, 128),
        metadata={"dagster_k8s/config": {"container_spec_config": {"resources": {"limits": {"memory": '1Gi'}}}}},
    )
    yield DynamicOutput(
        MatrixConf(256, 128),
        metadata={"dagster_k8s/config": {"container_spec_config": {"resources": {"limits": {"memory": '2Gi'}}}}},
    )
    ...

@op(ins=In(MatrixConf))
def run():
    ...

@job(executor_def=k8s_job_executor)
def job():
    paramatrize().map(run).collect()

Another solution thats nearly as good, but would be sufficient in the interim is being able to set tags during op config.

Additional information

The current workaround is independently building the images as a separate job, then launching a run job with modified execution paramaters. This is a really poor solution and only works for trivially small op enumerations.

execution:
  config:
    job_image: <image built>:tags-built

I don't think the graphql api allows setting execution parameters (could be wrong there), but this means that we need a manual step here.

Message from the maintainers

Impacted by this issue? Give it a 👍! We factor engagement into prioritization. I would be willing to contribute here if the maintainers express interest in potentially upstreaming this and agreeing on a implementation.

abhinavDhulipala commented 1 month ago

A potential solutions here in the interim is to use dagster pipes and the k8s pipe client as described here

danielgafni commented 3 weeks ago

I think this is a big missing feature which is really important for complex K8s workloads.

I want to be able to specify from config:

And also dynamically set these values based on previous ops outputs.