ml6team / fondant

Production-ready data processing made easy and shareable
https://fondant.ai/en/stable/
Apache License 2.0

Distributed execution #549

Open · RobbeSneyders opened 1 year ago

RobbeSneyders commented 1 year ago

Fondant currently supports execution across cores on a single machine, which enables workflows using millions of images, but becomes a bottleneck when scaling to tens or hundreds of millions of images.

This issue aims to describe how we can move to distributed execution across multiple machines.


The orchestrators we currently support, or are looking to support in the near future, share the following defining features:

Based on this description, we should implement distributed execution as a component that launches a distributed job "somewhere else" and monitors its progress.

This "somewhere else" can be different places:

It seems like the managed orchestrators provide managed versions of these options as well:


To decide how to move forward, I think we need to answer the following questions (among others):


Due to the complexity of this topic, we probably need to create a PoC to understand the possibilities and constraints of the different orchestrators.
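As a very rough illustration, the launch-and-monitor pattern described above boils down to something like the following sketch (all names are hypothetical; a real implementation would wrap the job API of whichever backend is chosen):

```python
import time


def run_remote_job(submit, get_status, poll_interval=30):
    """Launch a distributed job "somewhere else" and block until it finishes.

    `submit` and `get_status` are hypothetical hooks that would wrap the
    job API of the chosen backend (e.g. a Kubernetes, Dask, or Spark cluster).
    """
    job_id = submit()
    while (status := get_status(job_id)) not in ("SUCCEEDED", "FAILED"):
        time.sleep(poll_interval)
    return status
```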

### Tasks
- [ ] https://github.com/ml6team/fondant/issues/571
rom1504 commented 1 year ago

"Data is read and written in each container" does that mean data is stored in a local disk ? that can quickly be a bottleneck for downloading images (and even more for videos)

RobbeSneyders commented 1 year ago

Each component streams data from and to either local disk or remote storage, and passes a reference to the stored data to the next component. This "passing by reference" differs from the "passing by value" that happens in Apache Spark and similar frameworks.
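In other words, only a reference (e.g. a storage path) travels between components, never the data itself. A minimal sketch of the idea, with hypothetical paths and component functions:

```python
import dask.dataframe as dd


def component_a(input_path: str, output_path: str) -> str:
    """Stream data through this component and return a reference to the result."""
    df = dd.read_parquet(input_path)  # data is streamed from storage
    # ... transform df here ...
    df.to_parquet(output_path)        # results are materialized back to storage
    return output_path                # only this reference is passed on


def component_b(path: str) -> None:
    """The next component resolves the reference and streams the data itself."""
    df = dd.read_parquet(path)
```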

This design is a trade-off, but allows for the following optimizations:

The approaches for distributed execution mentioned above would make this trade-off a bit "softer": you could define a DAG within a single component, which is then executed on a cluster that does pass data by value between the DAG steps.

We already noticed that this trade-off plays a big part in deciding where to put component boundaries when designing a pipeline. Sometimes it's more efficient to bundle functionality in a single component, but this lowers the reusability of the separate functionalities.

Where do you see the bottleneck for downloading images and videos? I assume it's not in the downloading itself, but in combination with other functionality that needs the image / video data?

rom1504 commented 1 year ago

What we usually do in img2dataset and video2dataset is to do all intermediary processing in memory, read the input from remote storage with fsspec, and write the output to remote storage with fsspec. That allows scaling to any amount of data (for example, many dozens of TBs).
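A minimal sketch of that pattern with fsspec (bucket and shard names are made up, and `transform` stands in for the real subsampling logic):

```python
import fsspec


def transform(data: bytes) -> bytes:
    return data  # placeholder for the actual in-memory processing


# fsspec resolves the protocol, so the same code works for s3://, gs://, hdfs://, ...
with fsspec.open("s3://my-bucket/input/shard-00000.tar", "rb") as f:
    raw = f.read()

processed = transform(raw)  # all intermediary processing stays in memory

with fsspec.open("s3://my-bucket/output/shard-00000.tar", "wb") as f:
    f.write(processed)
```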

Writing to disk between each component means you introduce

That said, I agree the properties you mention are nice.

I wonder if introducing the possibility to have this intermediary space be in memory would mitigate some of the issues it introduces. Do you think that's possible?

Do you already support reading and writing from/to remote storage (S3, GCS, HDFS, ...) through fsspec for the initial input and final output?

rom1504 commented 1 year ago

Just to clarify what I'm saying: I think one very important point to think about is the granularity of the data.

I found that having shards be the unit that components manipulate is convenient, and makes it possible to reduce issues with temporary data, redundancy, etc. Spark usually calls this partitioning. Other distributed frameworks have similar concepts. I've been assuming you have that too, but is that actually true? Are you using shards, or are you working at the level of the whole dataset?

I'm wondering how that question (shards) plays with your concept of intermediary storage between components.

RobbeSneyders commented 1 year ago

> What we usually do in img2dataset and video2dataset is to do all intermediary processing in memory, read the input from remote storage with fsspec, and write the output to remote storage with fsspec. That allows scaling to any amount of data (for example, many dozens of TBs).

This is exactly what a single Fondant component does. It uses fsspec to read the input data from, and write the output data to, remote storage.

I might have created some confusion by mentioning local disk in this regard, since the local runner can use local disk as "remote storage" as well. Within a component, the local disk is not explicitly used, although data might spill from memory to disk when running low on memory.

> I've been assuming you have that too, but is that actually true? Are you using shards, or are you working at the level of the whole dataset?

We indeed have shards, and we call them partitions as well. The data on the remote storage is partitioned, and the partitions can be read, processed, and written one by one.

A component actually creates a (Dask) computation graph of its work, which can be executed on the partitions separately. The graph is currently executed inside the component, which means that it can only use the single machine the component is running on.
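A rough sketch of what that looks like with Dask (the paths and the `download` function are made up for illustration):

```python
import dask.dataframe as dd


def download(url: str) -> bytes:
    ...  # hypothetical per-row work, e.g. fetching an image


# Reading the partitioned dataset only builds a lazy task graph; nothing runs yet.
df = dd.read_parquet("s3://my-bucket/dataset", columns=["image_url"])
df["image"] = df["image_url"].map(download, meta=("image", "object"))

# Executing the graph processes the partitions one by one; by default this
# happens on the cores of the single machine the component runs on.
df.to_parquet("s3://my-bucket/dataset_out")
```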


With this context in mind, the approach we would take to enable distributed execution is to enable distribution within a single component, by submitting the computation graph as a job to a Kubernetes / Dask / Spark cluster instead. The component will still read its input data from, and write its output data to, remote storage.
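Concretely, under this approach the only change to the sketch above would be pointing the graph at a cluster before computing it, for example via a Dask distributed client (the scheduler address is hypothetical):

```python
import dask.dataframe as dd
from dask.distributed import Client

# Registering a client makes subsequent Dask computations run on the
# cluster's workers instead of inside the component's own container.
client = Client("tcp://dask-scheduler:8786")  # hypothetical scheduler address

df = dd.read_parquet("s3://my-bucket/dataset")
df.to_parquet("s3://my-bucket/dataset_out")   # now executed on the cluster
```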

If we wanted to implement img2dataset and video2dataset using Fondant, we should evaluate which parts need to be implemented in a single component and which parts can be split into separate components.

rom1504 commented 1 year ago

Ok, I see. Doing distribution inside a component means one component = one job. I think that limits the scope of potential components.

I am interested in a different kind of "component" (currently called a subsampler in video2dataset), which looks like:

I understand your plans for Fondant. I guess the next step is for you all to try implementing these jobs / components and see how well that works on some scaled-up examples.

Basically you could just have an img2dataset Fondant component if you like. For the current implementation it probably makes sense.


RobbeSneyders commented 1 year ago

I think there are benefits both to having them within a single component and to splitting them across components.

Not all steps need access to all the data though. Some of the subsamplers only use the index (e.g. the frame subsampler), only audio data (e.g. the whisper subsampler), or only image data (e.g. the caption subsampler). Since Fondant only reads the data that a component needs, this reduces the cost of writing and reading the data between components.
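For example, a whisper-like component that only needs the audio subset would, under the assumptions of the earlier sketches, translate to a column-projected read along these lines (path and column name are made up):

```python
import dask.dataframe as dd

# Only the declared column is read from the parquet files (column projection),
# so an audio-only component never pays for the image or video bytes.
audio = dd.read_parquet("s3://my-bucket/dataset", columns=["audio"])
```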