kedro-org / kedro

Kedro is a toolbox for production-ready data science. It uses software engineering best practices to help you create data engineering and data science pipelines that are reproducible, maintainable, and modular.
https://kedro.org
Apache License 2.0

Partition-wise pipeline runs #432

Closed: seeM closed this issue 4 years ago

seeM commented 4 years ago

Description

Most of my usage requires running the same pipeline independently on each of a set of partitions. For example, an ETL pipeline that selects 1 week's worth of data, applies a sequence of transformations, and writes the result to a document store or another database.
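To make the pattern concrete, here is a minimal sketch of such a pipeline. The node functions, dataset names, and parameters are hypothetical, not from a real project:

```python
from kedro.pipeline import Pipeline, node

def select_week(raw_data, start_date, end_date):
    # Keep only one week's worth of rows (assumes a pandas DataFrame
    # with a "date" column).
    mask = (raw_data["date"] >= start_date) & (raw_data["date"] < end_date)
    return raw_data[mask]

def transform(weekly_data):
    # Placeholder for the sequence of transformations.
    return weekly_data

def write_to_store(transformed):
    # Placeholder for persisting to a document store or another database.
    pass

pipeline = Pipeline(
    [
        node(select_week, ["raw_data", "params:start_date", "params:end_date"], "weekly_data"),
        node(transform, "weekly_data", "transformed_data"),
        node(write_to_store, "transformed_data", None),
    ]
)
```

The whole pipeline should run once per week of data, independently of every other week.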

I consider this a common usage pattern. The core requirement is to run pipelines "partition-wise" rather than "node-wise" (see Context for more detail), as in https://github.com/quantumblacklabs/kedro/issues/191 as well as this Stack Overflow question. So I would like to know:

  1. Whether this is already supported, with pointers to docs and/or examples. https://github.com/quantumblacklabs/kedro/issues/191 suggests not.
  2. If not, is this a case that the framework could directly support or is planning to support?
  3. If not, what would be a good workaround?

If I'm not alone in considering this a common pattern without good support, I'd like to propose either supporting this in the framework or, if that's not possible, documenting the preferred strategy.

Context

The primary way to deal with partitions, PartitionedDataSet, is well documented but is something of a red herring here. It falls short in at least three ways, all because computation is node-wise (i.e. transformations are computed for a given node across all partitions before moving to the next node, with persistence as the final step) rather than partition-wise (see the sketch after this list):

  1. Memory inefficient: all partitions are held in memory at the same time, even though transformations are independent.
  2. Computation inefficient: pipelines can run independent nodes in parallel, but cannot parallelise across partitions, requiring the node logic to handle that instead (via pandas, dask, spark, multiprocessing, etc.). Note that node-level logic can't solve the memory inefficiency.
  3. Persistence is artificially delayed: ideally you could compute all nodes and persist the result for the first partition, then repeat for the second, and so on. Not being able to do so is bad because an error at the persistence stage in one partition wastes computation, and because it makes pipelines harder to debug than if you could run a single partition at a time.
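For reference, here is what node-wise computation over a PartitionedDataSet looks like: a node receives all partitions at once as a dict mapping partition ids to load callables, so every partition's result sits in memory before the next node runs. transform_one is a hypothetical per-partition transformation:

```python
from typing import Any, Callable, Dict

def transform_one(partition_data: Any) -> Any:
    """Hypothetical per-partition transformation."""
    return partition_data

def transform_all_partitions(
    partitions: Dict[str, Callable[[], Any]]
) -> Dict[str, Any]:
    # Every partition is loaded and transformed inside this one node,
    # so all results are held in memory together until the node returns;
    # only then does the next node (or the save step) run.
    return {pid: transform_one(load()) for pid, load in partitions.items()}
```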

Possible Alternatives

IMO the best available strategy is to parameterise the pipeline by some partition id (in the above case, some identifier of the week's worth of data, like the start and end dates), and then to have a higher-level script (or perhaps another pipeline) run the lower-level parameterised pipelines, possibly in parallel (see the sketch below). If using a higher-level pipeline, we run into issues around unique dataset names (relates to https://github.com/quantumblacklabs/kedro/issues/162 and https://github.com/quantumblacklabs/kedro/issues/396) that I'm not sure how to solve.
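A minimal sketch of such a higher-level script, assuming a hypothetical partition_id parameter passed via the kedro run --params CLI flag (the exact params syntax may differ across Kedro versions):

```python
import subprocess
from concurrent.futures import ThreadPoolExecutor

# Hypothetical weekly partition ids; in practice these could be
# start/end date pairs as described above.
PARTITIONS = ["2020-01-06", "2020-01-13", "2020-01-20"]

def run_partition(partition_id: str) -> None:
    # Each call is an independent `kedro run`, so partitions stay
    # isolated: a failure in one week does not waste the others.
    subprocess.run(
        ["kedro", "run", "--params", f"partition_id:{partition_id}"],
        check=True,
    )

if __name__ == "__main__":
    # Threads suffice here because each run is its own OS process.
    with ThreadPoolExecutor(max_workers=3) as pool:
        list(pool.map(run_partition, PARTITIONS))
```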

limdauto commented 4 years ago

Hi @seeM, thank you for a very thoughtful issue. At the moment, you can already emulate this behaviour using the before_pipeline_run Hook. The idea is that you supply the partition's name as a parameter to the pipeline run and dynamically add an input dataset and an output dataset to the catalog accordingly. Please see this repo for a demo pipeline: https://github.com/limdauto/demo-kedro-parameterised-runs. The README contains some information about how it works.
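In essence, the demo does something like the following. The dataset names, file paths, and partition_id parameter name here are illustrative; see the repo for the real implementation:

```python
from kedro.extras.datasets.pandas import CSVDataSet
from kedro.framework.hooks import hook_impl

class PartitionedRunHooks:
    @hook_impl
    def before_pipeline_run(self, run_params, pipeline, catalog):
        # Read the partition name passed to this run, e.g.
        # `kedro run --params partition_id:2020-01-06`.
        partition = (run_params.get("extra_params") or {}).get("partition_id")
        if partition is None:
            return
        # Register input and output datasets pointing at this
        # partition's files before the run starts.
        catalog.add(
            "partition_input",
            CSVDataSet(filepath=f"data/01_raw/{partition}.csv"),
        )
        catalog.add(
            "partition_output",
            CSVDataSet(filepath=f"data/07_model_output/{partition}.csv"),
        )
```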

Generally speaking, parameterising pipeline runs is definitely something on our radar, and there might be a first-class construct to facilitate this. However, I think coordinating parameterised pipeline runs is beyond the scope of Kedro. Like you said, it is the job of a higher-level script, or of a scheduler that is aware of your business requirements.

seeM commented 4 years ago

@limdauto, thanks for the demo, it's very helpful! In hindsight, "partition-wise pipeline runs" are simply "pipeline runs", making the issue more about parameterised pipelines. I would still like to see what first-class support for this might look like, but your demo presents a good alternative for now.

I'm happy for you to close this.