Netflix / metaflow

Open Source Platform for developing, scaling and deploying serious ML, AI, and data science systems
https://metaflow.org
Apache License 2.0

Error when a resumed flow calls `S3(run=self).get` for key that was "put" during origin run #664

Open bishax opened 3 years ago

bishax commented 3 years ago

When a flow that stores an object in S3 with `S3(run=self).put` fails and is later resumed, the cloned results of the previous tasks do not extend to that object. As a result, subsequent calls to `S3(run=self).get` fail, but I would expect `resume` to cover this case since the data is versioned under a specific flow run.

See below for a flow that reproduces this behaviour:

# file: flow.py
from metaflow import current, FlowSpec, Run, step, S3
from metaflow.datatools.s3 import MetaflowS3NotFound

class BugFlow(FlowSpec):
    @step
    def start(self):
        """No-op."""
        self.next(self.store_to_s3)

    @step
    def store_to_s3(self):
        """Store some data in S3 under this run."""
        with S3(run=self) as s3:
            s3.put("mykey", b"myvalue")
        self.next(self.fail_on_run_but_not_resume)

    @step
    def fail_on_run_but_not_resume(self):
        """Trigger a failure on run but not resume."""
        if current.origin_run_id is None:
            raise Exception("I fail on run but not resume.")
        self.next(self.end)

    @step
    def end(self):
        """Access S3 artifacts."""
        origin_run = Run(f"BugFlow/{current.origin_run_id}")
        with S3(run=origin_run) as s3:
            s3_obj = s3.get("mykey")
            print(s3_obj.text)  # Prints "myvalue"

        try:
            with S3(run=self) as s3:
                s3.get("mykey")
        except MetaflowS3NotFound:
            print("Task failed successfully")

if __name__ == "__main__":
    BugFlow()

$ python flow.py run; python flow.py resume
savingoyal commented 3 years ago

@bishax this is a valid callout. With `resume`, a new run id is minted for the workflow execution, and that id is what gets passed to the S3 object via `S3(run=self)`. When resuming a failed workflow, it may not be immediately obvious to Metaflow which S3 object is the right one to construct: in many cases it may well be desirable for `S3(run=self)` to point to objects versioned under the current run id rather than the origin run id. Would something of this sort work for you?

with S3(run=origin_run or current.run_id) as s3:
    s3_obj = s3.get("mykey")
    print(s3_obj.text)
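
For concreteness, a hedged sketch of how that suggestion could be packaged as a small helper (the helper name is hypothetical): it builds a `Run` object from `current.flow_name` plus either `current.origin_run_id` (when resuming) or `current.run_id`, and passes that to `S3(run=...)`, just as the reproduction above already does with `S3(run=origin_run)`.

```python
from metaflow import current, Run, S3


def resolve_s3_source_run():
    """Hypothetical helper: pick the run whose S3 prefix should be read.

    On a resumed run, prefer the origin run (where the data was actually
    put); otherwise use the current run. Must be called from inside a
    running step, since `current` is only populated there.
    """
    run_id = current.origin_run_id or current.run_id
    return Run(f"{current.flow_name}/{run_id}")


# Inside a step:
#     with S3(run=resolve_s3_source_run()) as s3:
#         print(s3.get("mykey").text)
```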
bishax commented 3 years ago

I can broadly see three ways to get the desired behaviour:

1) Modify the behaviour of `S3`, e.g. define a fallback `S3._s3root` when `S3` is constructed with `run` and `run.origin_run_id` is not None, such that if `get` (or similar non-destructive actions) fails, it is retried with an `S3._s3root` corresponding to `run.origin_run_id` (a rough user-side sketch of this fallback follows below).
2) When `resume` is called, copy the objects in `metaflow/data/<flow name>/<origin run id>` to `metaflow/data/<flow name>/<run id>`.
3) Rely on the user not to trip up.
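
To make option 1's intended semantics concrete, here is a hedged, user-land sketch of the fallback (the helper name is hypothetical, and a real fix would live inside `S3` itself rather than in flow code):

```python
from metaflow import current, Run, S3
from metaflow.datatools.s3 import MetaflowS3NotFound


def get_with_origin_fallback(flow, key):
    """Hypothetical helper: read `key` from the current run's S3 prefix and,
    on a miss during a resumed run, retry against the origin run's prefix.

    Returns the object's bytes. Call from inside a step, e.g.
    `get_with_origin_fallback(self, "mykey")`.
    """
    try:
        with S3(run=flow) as s3:
            return s3.get(key).blob
    except MetaflowS3NotFound:
        if current.origin_run_id is None:
            raise
        origin_run = Run(f"{current.flow_name}/{current.origin_run_id}")
        with S3(run=origin_run) as s3:
            return s3.get(key).blob
```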

Each solution has its issues:

1) Requires changes across several methods of `S3` and complicates the constructor further.
2) Has to be called/implemented in a way that is compatible with other datastores (current and future).
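
For illustration, a hedged, S3-only sketch of what option 2 amounts to if done from flow code rather than inside `resume` itself (the helper name is hypothetical, and it assumes the `list_recursive`, `get_many`, and `put_many` methods of the S3 client); the fact that it is tied to S3 is exactly the compatibility concern noted above:

```python
from metaflow import current, Run, S3


def copy_origin_s3_prefix(flow):
    """Hypothetical helper for option 2, done in flow code: mirror the origin
    run's S3 prefix into the current run's prefix so that later
    `S3(run=self).get(...)` calls succeed after a resume.

    Call at the top of the `start` step; it is a no-op on a normal run.
    """
    if current.origin_run_id is None:
        return
    origin_run = Run(f"{current.flow_name}/{current.origin_run_id}")
    with S3(run=origin_run) as src:
        keys = [obj.key for obj in src.list_recursive()]
        objs = src.get_many(keys) if keys else []
        # Read the blobs while the downloaded files still exist.
        payload = [(obj.key, obj.blob) for obj in objs]
    if payload:
        with S3(run=flow) as dst:
            dst.put_many(payload)
```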

When you say...

Would something of this sort work for you?

with S3(run=origin_run or current.run_id) as s3:
    s3_obj = s3.get("mykey")
    print(s3_obj.text)

Do you mean something along the lines of 1 (changing the behaviour of `S3`) or 3 (relying on me to pass in something different)?