tensorflow / tfx

TFX is an end-to-end platform for deploying production ML pipelines
https://tensorflow.github.io/tfx/
Apache License 2.0

`LocalDagRunner` ignores output artifacts from container components #4555

Closed IanTayler closed 1 year ago

IanTayler commented 2 years ago

System information

Describe the current behavior

Even though output artifacts are correctly written inside the container, these files are never written to the pipeline root and are never passed to the subsequent components that need them.

After running the code below, docker container ls --all shows two container runs, but the second one received an empty value for output x, even though docker cp <first_container_hash>:/tmp/output/ first_out confirms that the first container correctly wrote the value to the path it was given under /tmp/output.

I have also tried changing the InputValuePlaceholder for the String input to an InputUriPlaceholder (while changing echo to cat), which makes the second container fail with a "No such file or directory" error from cat.
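For reference, the InputUriPlaceholder variant I tried looks roughly like this (it reuses create_container_component, String and component_a from the repro code further down; the only extra import is InputUriPlaceholder):

from tfx.dsl.component.experimental.placeholders import InputUriPlaceholder

# Same as component B in the repro below, but reading the artifact's URI with
# cat instead of echoing its value. This fails with "No such file or
# directory" because the file at that URI only exists inside the container
# that wrote it.
component_b_uri = create_container_component(
    name="B",
    image="python:3.8",
    inputs={"x": String},
    command=["cat", InputUriPlaceholder("x")],
)(x=component_a.outputs["x"])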

Of course, the original pipeline where I hit this problem was more complicated than the one shown below; in that one I also saw the problem with tfx.types.experimental.simple_artifacts.Metrics artifacts. String and Metrics are the only two artifact types I have tested the LocalDagRunner with, so I don't want to give the impression that it works for other artifact types.

docker inspect shows there are no volumes in the containers created by the LocalDagRunner.

Describe the expected behavior

The file written inside the container to the output location should be copied to the host pipeline_root, where it can be inspected and passed to every component that needs it. Probably the most reasonable way to achieve this would be by mounting volumes.
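To make concrete what I mean by volumes, something along these lines (using the docker Python SDK directly, completely outside of TFX) is the mechanism I have in mind; this is only a sketch of bind-mounting the pipeline root, not a proposal for the exact implementation:

import docker

client = docker.from_env()
# Bind-mount the host pipeline root into the container, so a file written to
# /tmp/output inside the container also shows up at /tmp/output on the host.
client.containers.run(
    "python:3.8",
    ["python", "-c", "open('/tmp/output/x', 'w').write('someoutput')"],
    volumes={"/tmp/output": {"bind": "/tmp/output", "mode": "rw"}},
)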

Standalone code to reproduce the issue

import click
from tfx.dsl.component.experimental.container_component import (
    create_container_component,
)
from tfx.dsl.component.experimental.placeholders import (
    ConcatPlaceholder,
    InputValuePlaceholder,
    OutputUriPlaceholder,
)
from tfx.orchestration.metadata import sqlite_metadata_connection_config
from tfx.types.standard_artifacts import String
from tfx.v1.dsl import Pipeline
from tfx.v1.orchestration import LocalDagRunner

PIPELINE_NAME = "mlops-dummy-pipeline"
PIPELINE_ROOT = "/tmp/output/"
METADATA_CONNECTION_CONFIG = sqlite_metadata_connection_config(
    "/tmp/pipeline_metadata.db"
)

# Inline Python script run inside component A's container: create the output
# directory and write 'someoutput' to the URI TFX provides for output x.
python_script_cmd = ConcatPlaceholder(
    [
        """
import os

os.makedirs(os.path.dirname('""",
        OutputUriPlaceholder("x"),
        """'), exist_ok=True)
with open('""",
        OutputUriPlaceholder("x"),
        """', 'w') as f:
    f.write('someoutput')
""",
    ]
)

@click.command(help=__doc__)
def _broken_localdag_runner():
    component_image = "python:3.8"
    # Component A writes a String artifact to the output URI provided by TFX.
    component_a = create_container_component(
        name="A",
        image=component_image,
        outputs={"x": String},
        command=["python", "-c", python_script_cmd],
    )()
    # Component B should receive the value of A's output x and echo it.
    component_b = create_container_component(
        name="B",
        image=component_image,
        inputs={"x": String},
        command=["echo", InputValuePlaceholder("x")],
    )(x=component_a.outputs["x"])
    components = [
        component_a,
        component_b,
    ]
    pipeline = Pipeline(
        pipeline_name=PIPELINE_NAME,
        pipeline_root=PIPELINE_ROOT,
        components=components,
        metadata_connection_config=METADATA_CONNECTION_CONFIG,
    )
    runner = LocalDagRunner()
    out = runner.run(pipeline)
    print(out)

if __name__ == "__main__":
    _broken_localdag_runner()

Other info / logs

Containers after running:

[Screenshot from 2022-01-03 17-04-22]

Value of output inside container (run number 7):

[Screenshot from 2022-01-03 17-07-39]

Let me know if there's any other information I can provide or any other way I can help with this issue.

charlesccychen commented 2 years ago

Hi @IanTayler, thank you for the report and apologies for the late reply.

You are right that the current implementation does not use volumes to achieve value-passing here. In typical usage (and on cloud-based runners), users set their pipeline root to a storage bucket (say on GCS or S3), so subsequent writes to the intermediate files are accessible to all containers running in that cloud project. The limitation is that this scheme does not work as well for local execution, and we have not yet implemented the local volume type approach.
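Concretely, the supported pattern today looks something like the snippet below, reusing the names from your repro (the bucket name is just a placeholder):

# Remote pipeline root: every container reads and writes intermediate
# artifacts through the bucket, so no volume mounts are needed.
PIPELINE_ROOT = "gs://my-example-bucket/tfx-pipeline-root"

pipeline = Pipeline(
    pipeline_name=PIPELINE_NAME,
    pipeline_root=PIPELINE_ROOT,
    components=components,
    metadata_connection_config=METADATA_CONNECTION_CONFIG,
)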

SoleRivas commented 2 years ago

Hi @charlesccychen, thank you very much for the response. Soledad here; I work on the same team as @IanTayler and he's OOO today.

Is the local volume implementation on your development roadmap? If so, do you have an estimate of when it will be implemented?

Thanks in advance!

IanTayler commented 2 years ago

For future reference, I was actually able to work around this by using a BeamDagRunner and attaching a platform_config_pb2.DockerPlatformConfig() to the components, setting the volumes field to the local pipeline root (which should be an absolute path for it to work properly). A rough sketch follows.
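Roughly, the relevant part of our setup looks like this (adapted to the repro code above). The exact DockerPlatformConfig fields and the with_platform_config call are written from memory, so treat them as an approximation and double-check them against the protos in your TFX version:

from tfx.orchestration.beam.beam_dag_runner import BeamDagRunner
from tfx.proto.orchestration import platform_config_pb2

# Must be an absolute host path for the bind mount to work properly.
PIPELINE_ROOT = "/tmp/output"

# Mount the pipeline root into every component container so outputs written
# inside the container also land on the host. The exact shape of the volumes
# field is from memory; check the DockerPlatformConfig proto definition.
docker_config = platform_config_pb2.DockerPlatformConfig(
    volumes=[f"{PIPELINE_ROOT}:{PIPELINE_ROOT}"]
)

for component in [component_a, component_b]:
    component.with_platform_config(docker_config)

BeamDagRunner().run(pipeline)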

singhniraj08 commented 1 year ago

@IanTayler,

Could you please confirm whether this issue can be closed, since the workaround of using BeamDagRunner and setting the volumes field to the local pipeline root works for you? Thank you!

IanTayler commented 1 year ago

The workaround works, although it's a bit hacky.

Whether that means this issue can be closed depends on whether the tfx team thinks container component outputs with local pipeline roots should be supported by the LocalDagRunner or not. On our side, we have a working alternative and would not mind too much either way.

jiyongjung0 commented 1 year ago

Thank you for the response. We still don't have a good plan to improve LocalDagRunner integration with container component support. Let me close the bug for now.
