Closed strangemonad closed 1 month ago
Hi Shawn, awesome suggestion. I feel like this would also make post-execution in all our examples much more intuitive to new users.
Right now, the remote orchestrators (especially Airflow) are the main obstacle; we will probably need to rework those significantly. But IMO this is a great suggestion and worth the effort, so I'll look into how this can be done.
Hey @strangemonad. As it currently stands, we won't be able to implement exactly what you asked for anytime soon, as there are many open questions around this:
However, within the pull request that addresses your other issue, I have snuck in a small improvement that could at least offer some of the behaviour you are asking for:
First of all, you would be able to access all completed runs of the pipeline that you just ran:

```python
pipe = example_pipeline(...)
pipe.run()
last_completed_run = pipe.get_runs()[-1]
```
As this does not address the concern of race conditions, the alternative would be to define the run name yourself:

```python
import time

pipe = example_pipeline(...)
run_name = f"run_{time.time_ns()}"
pipe.run(run_name=run_name)
run_view = pipe.get_run(run_name=run_name)
```
Does this help you?
Hey @AlexejPenner, this idea of merging some of the PipelineView and RunView behavior is interesting. I think all the points you've mentioned are really appropriate and maybe hint that it's worthwhile using this use case to flesh out the right abstractions (getting this abstraction correct is probably more important to me long term, and I'm willing to wait a little, than quickly solving for just fetching the runs).
I think a solid abstraction over pipelines and runs will benefit us all, and the points you've made probably mean there's more detail to unpack (this probably means lots of smaller issues could get created and this issue could track a larger design?).
Please take all the remaining thoughts as a sketch of a design, not necessarily exact naming and proposals.
2. What if the run method contains a schedule

That's a really good question and one that I've been struggling with separately from this specific issue of fetching pipeline run details. E.g. when targeting KFP, because it's not clear at which point the pipeline exists on KFP, or whether it's an updated version of a pipeline that already exists, it's not easy right now to "upgrade" a scheduled pipeline.
2.1 I think I'd much prefer to have a separate `Pipeline#run()`, which implies "run once", and `Pipeline#schedule()`, than the current `run()`, which might mean either run once or run and schedule future runs.
2.2 If I'm deploying a new version of a pipeline that's already running on a schedule in KFP, I really want to know when the pipeline has been uploaded, then disable the existing schedule and create a new schedule on the new pipeline.
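The `run()`/`schedule()` split in 2.1 and the upgrade flow in 2.2 could be sketched roughly like this. All class and method names here are hypothetical, not ZenML API:

```python
import time
from dataclasses import dataclass
from typing import Optional

# Hypothetical sketch: splitting today's overloaded run() into an explicit
# "run once" entry point and a separate schedule() that never triggers a run.

@dataclass
class Schedule:
    cron: str  # e.g. "0 * * * *"

class Pipeline:
    def __init__(self, name: str):
        self.name = name
        self.scheduled: Optional[Schedule] = None

    def run(self) -> str:
        # "run once" semantics only; returns a unique run identifier
        return f"{self.name}_{time.time_ns()}"

    def schedule(self, schedule: Schedule) -> Schedule:
        # registers (or replaces) a recurring schedule, enabling the
        # "disable old schedule, create new one" upgrade flow from 2.2
        self.scheduled = schedule
        return schedule
```

The point of the split is that each method has exactly one meaning, so a caller never has to inspect configuration to know whether `run()` will also register future runs.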
1. async run creations / lazy run name creation in vertex ai

I think these are details that are probably important to model in the ZenML API, and they relate to the question above of handling schedules. I think Vertex "fixed" the KFP API here: creating a pipeline and creating a run of a pipeline can take a while, so exposing these as jobs in the Vertex API makes sense.
Locally, what I'd like to see is an abstraction that continues to wrap all of this up; ZenML mostly does this already. For example, `prepare_or_run` already does a lot locally, like collecting dependencies and possibly building a docker image. You could make a case that it would be nice for this to also run asynchronously in some cases. Internally, you're first preparing, then uploading, then possibly waiting for the pipeline-create job to complete, etc.
3. airflow DAG return

I think this is a problem with the current API: it leaks details about the orchestrator you're currently using. I agree something needs to be returned, but it should be consistent for all orchestrators (as you pointed out, to make it future proof).
All that being said, I'm not sure what the best implementation mechanism is, or even whether that should be a `PipelineView` or `RunView` object or some sort of URI.
There are probably 2 APIs: a lower-level one where the various async steps are exposed with more granularity, and a higher-level one that's synchronous and easier to use (e.g. in notebooks): `run()` vs `run_async()`. I realize there's already a flag that specifies sync vs async in `run()`, but that's just for the remote orchestrator run portion, and I think specifying this as a flag doesn't make for a great API since ideally the return type changes (async returns some sort of awaitable). I can get some sort of value back from `my_pipeline().run_async()` that I can await for completion. Under the hood this runs all parts of the workflow (e.g. in the case of Vertex, it awaits the pipeline creation job completion AND the pipeline run). This is really nice in cases where I can't locally do anything more until the pipeline has results.
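A minimal sketch of that return-type difference, using a thread pool as a stand-in for the remote orchestrator. The function names and the result string are illustrative, not ZenML API:

```python
import time
from concurrent.futures import Future, ThreadPoolExecutor

# run_async() returns an awaitable Future; run() is just run_async().result().
# The sleep stands in for pipeline creation plus the remote run itself.

_pool = ThreadPoolExecutor(max_workers=2)

def _execute_remote(name: str) -> str:
    time.sleep(0.01)  # placeholder for creation job + pipeline run
    return f"{name}: completed"

def run_async(name: str) -> "Future[str]":
    return _pool.submit(_execute_remote, name)

def run(name: str) -> str:
    # the synchronous variant simply awaits the asynchronous one
    return run_async(name).result()
```

Here the caller of `run_async()` can keep working locally and only block when it actually needs the result, while `run()` keeps the simple notebook-friendly behaviour.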
The returned value is the same datatype across ALL orchestrators. Each orchestrator might provide a specific implementation that gives access to orchestrator-specific details. I think this would more gracefully handle cases like getting the Airflow DAG, or how the Vertex pipeline currently logs the dashboard job url. These could all be wrapped up in this dataclass. For example:
```python
class PipelineView(BaseModel, ABC):
    name: str
    # ...
    native_pipeline: Any  # not sure about the right name for this

class AirflowPipelineView(PipelineView):
    native_pipeline: DAG

class VertexPipelineView(PipelineView):
    native_pipeline: ...
    uri: URI
```
If I want finer-grained control over what is sync vs what is async (e.g. I want to block on locally preparing and uploading, but not block on remotely running the pipeline), I can use the lower-level API to execute parts of the workflow blocking and other parts non-blocking. For example:
```python
p = my_pipeline().upload()  # sync await of local prepare and remote upload (e.g. in KFP and Vertex I now have a pipeline instance)
r = p.run_async()           # r is awaitable
```
A ZenML URI abstraction over all of these concepts could be stored in the artifact store, e.g. a `PipelineView` could have a `pipeline://` uri scheme and a `RunView` could have a `run://` uri scheme. Just enough state could be stored to know where the various workflows stand; e.g. once a Vertex pipeline-run job is created, you store the job ref and you know that you're now in the `AWAITING_REMOTE_JOB` state of the workflow.
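The URI idea could look roughly like this. The scheme names match the sketch above, but the parsing helper and the shape of the stored state are assumptions:

```python
from urllib.parse import urlparse

# Resolve a pipeline:// or run:// reference into (scheme, name).
def parse_ref(uri: str) -> tuple:
    parsed = urlparse(uri)
    if parsed.scheme not in ("pipeline", "run"):
        raise ValueError(f"unsupported scheme: {parsed.scheme!r}")
    return parsed.scheme, parsed.netloc

# Just enough persisted state to resume the workflow later, e.g. after a
# Vertex pipeline-run job has been created:
state = {
    "run://example_pipeline_123": {
        "state": "AWAITING_REMOTE_JOB",
        "job_ref": "vertex-job-42",  # hypothetical remote job reference
    },
}
```

Because the URI plus the persisted state fully identify where a workflow is, any process (not just the one that launched the run) could resolve the reference and pick up from there.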
The awaitable workflow above might also have some way of being polled for its current state. E.g. if I have `r = run_async()`, I might be able to poll it to know what state it's in, OR call `r.await()` to wait for completion. This could be very useful for creating interactive jupyter widgets, where I might have `r.repr()` show me progress (e.g. preparing docker image, waiting for vertex ai pipeline creation, waiting for vertex ai pipeline run, ...)
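A pollable handle along those lines might look like this. The states and method names are illustrative, and `wait()` stands in for the `r.await()` above, since `await` is a reserved word in Python:

```python
import threading
import time

# A run handle whose .state can be polled while the workflow progresses,
# and whose wait() blocks until completion. States are illustrative.
class RunHandle:
    STATES = ("PREPARING", "AWAITING_REMOTE_JOB", "RUNNING", "COMPLETED")

    def __init__(self):
        self.state = self.STATES[0]
        self._done = threading.Event()
        threading.Thread(target=self._advance, daemon=True).start()

    def _advance(self):
        for state in self.STATES[1:]:
            time.sleep(0.01)  # stand-in for real orchestrator work
            self.state = state
        self._done.set()

    def wait(self, timeout=None) -> str:
        # blocks until the run completes (or the timeout expires)
        self._done.wait(timeout)
        return self.state
```

A jupyter widget could then simply re-render `handle.state` on a timer to show progress without blocking the notebook.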
Let me know if this mostly makes sense. Also, let me know if sketching out some of these details in code might be helpful.
Thanks for all the detailed ideas and thoughts. You have given us a lot of valuable input for our design planning around this. As you mentioned, this will probably turn into a collection of design changes; polling state and handling asynchronous runs especially are due some love from our side. I'll keep you updated and would love to hear your input once we have come up with a coherent design document on this.
This discussion was extremely useful @strangemonad and has been well integrated into ZenML for a while, so closing this issue! :-)
Contact Details [Optional]
shawnmorel@gmail.com
Describe the feature you'd like
A very common case is to fetch the run results of a pipeline run. It would be very convenient to return a value from `BasePipeline#run()` that could be used to fetch the pipeline run details. Given a return value `run`:
Option 1

`run` could be an instance of a `RepositoryPipelineRunView`. This approach would mean that there's some sort of implicitly created, global `Repository()` instance.

Option 2

`run` could be some sort of `run_uri`, and there could be a `Repository().get_run(run_uri)` method. This pattern is familiar from other systems (e.g. mlflow has various uri schemes for different artifact types).

Both options could work for synchronous and asynchronous pipeline run executions. The cleanest option is probably for the `PipelineRunView` to fetch the most up-to-date status (possibly running). The run view could possibly also have an option to await pipeline termination by returning a Future or asyncio awaitable. Either approach means anyone could render an updating jupyter display of the pipeline status enum; e.g. I currently render the following and hyperlink it to the KFP run url:
Run: my_pipeline-25-Jun
Status: completed
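Option 2 above could be sketched as follows; `Repository`, `RunView`, and the registration flow are hypothetical names for illustration, not existing ZenML API:

```python
# run() would return an opaque run_uri; a Repository resolves it to a
# RunView that always reflects the latest status.

class RunView:
    def __init__(self, uri: str, status: str = "running"):
        self.uri = uri
        self.status = status

class Repository:
    _runs = {}  # shared registry, standing in for the artifact store

    def get_run(self, run_uri: str) -> RunView:
        return self._runs[run_uri]

    def register_run(self, run_uri: str) -> str:
        # would be called internally by BasePipeline#run()
        self._runs[run_uri] = RunView(run_uri)
        return run_uri
```

Because the URI is the only thing handed back to the caller, any `Repository()` instance, in any process, can resolve it, which avoids pinning the caller to the object that launched the run.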
Is your feature request related to a problem?
Yes. As more people concurrently run the same pipeline, either in different experiments or in different CI runs with different parameters, there's an inherent race condition when you try to query `repo.get_pipeline(pipeline_name=pipeline_name).runs[-1]`. There's no guarantee that's the run you just completed; it could be another concurrent run of the same pipeline.

How do you solve your current problem with the current status-quo of ZenML?
We don't solve it cleanly at the moment. A messy alternative would be renaming pipelines with various suffixes, or leveraging KF namespaces and profiles more heavily. Neither is a very elegant solution.
Any other comments?
No response