ml6team / fondant

Production-ready data processing made easy and shareable
https://fondant.ai/en/stable/
Apache License 2.0

Distributed processing and scaling up #500

Closed rom1504 closed 1 year ago

rom1504 commented 1 year ago

Hello!

I like the goals you have with fondant. I also believe data processing at scale is quite important for ML. I have similar goals with img2dataset, clip-retrieval, video2dataset and cc2dataset and these tools worked pretty well at scale.

As you've seen, a lot of different filtering and transformation steps are possible, and making those modular and reusable is nice. That's true for text and images, and even more so for bigger modalities such as video, 3D, and bio data.

You made the choice here to package everything with Docker, use Kubeflow and Dask, and have a well-defined component structure. I find the architecture to be quite similar to https://github.com/jina-ai/jina

I am wondering what you found in terms of:

  • speed to spin up a pipeline: if you use N components, each using its own docker file, is that still fast?
  • overhead of docker: does each component use some minimum amount of RAM?
  • scaling up: in my tools I've tried to keep things as minimal as possible to be able to scale to billions (or even trillions in the case of cc2dataset) of samples. I've found that optimizing for network, CPU, and RAM constraints usually forces some specific designs (e.g. for img2dataset, shuffling the dataset beforehand to avoid killing external hosts, having workers which each have a thread pool, giving different levels of parallelism, ...). How are things working for you there? Is Dask reliable enough? Is Kubeflow able to handle large scale? I'm particularly curious whether you think your architecture will keep working at any scale, or whether it will need to be adapted and where.

Thank you for any insights!

PS: maybe this should go to the discussion tab; feel free to move it there if it makes more sense

RobbeSneyders commented 1 year ago

Hi @rom1504, thanks for starting this discussion!

I like the goals you have with fondant. I also believe data processing at scale is quite important for ML. I have similar goals with img2dataset, clip-retrieval, video2dataset and cc2dataset and these tools worked pretty well at scale.

Thanks for your work on those! We've used some of your tools and included them in some of our components. We opted to copy in part of the tools instead of using them as a dependency because of some dependency conflict issues. I'll open an issue on one of your repos with more info.

You made the choice here to package everything with Docker, use Kubeflow and Dask, and have a well-defined component structure. I find the architecture to be quite similar to https://github.com/jina-ai/jina

There are indeed some similarities with Jina, but I think the angle is a bit different. Jina focuses on hosting your models and allows you to chain them into pipelines. Fondant focuses on reusable data processing, with the possibility to host models inside components. It will be interesting to see how much those angles converge.

Our focus on reusable data processing is also the reason for Docker and our well-defined component structure. Both help with the interoperability and reusability of the components.

I am wondering what you found in terms of:

  • speed to spin up a pipeline: if you use N components, each using its own docker file, is that still fast?

There are a couple of aspects to this:

  • overhead of docker: does each component use some minimum amount of RAM?

When running on Linux, there is virtually no overhead. On Mac or Windows there might be, but that's not really relevant for scaling.

The only place where there might be overhead is on the networking side. We've noticed that we don't achieve the same performance downloading images as img2dataset, but we haven't had the time to properly investigate where the issue lies. We think it might be due to docker, but if it is, it might be resolved by proper network configuration (although that might not be possible on every orchestrator).

If you want to get a feel for the points mentioned above yourself, I recommend giving our local runner a spin (see our Getting started docs).

  • scaling up: in my tools I've tried to keep things as minimal as possible to be able to scale to billions (or even trillions in the case of cc2dataset) of samples. I've found that optimizing for network, CPU, and RAM constraints usually forces some specific designs (e.g. for img2dataset, shuffling the dataset beforehand to avoid killing external hosts, having workers which each have a thread pool, giving different levels of parallelism, ...). How are things working for you there? Is Dask reliable enough? Is Kubeflow able to handle large scale? I'm particularly curious whether you think your architecture will keep working at any scale, or whether it will need to be adapted and where.

Design

Proper design will still be important to be able to scale pipelines and components. We've seen this, for instance, when extracting URLs from Common Crawl, where we had to combine a lot of steps into a single component to prevent large data movement, even though smaller parts of the flow could have been reusable for other use cases. Or when doing global deduplication, where we had to cluster the data in a first component and then do local deduplication per cluster in a second component, splitting a single logical step into two components.
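A rough sketch of that two-component deduplication split (the exact-hash bucketing below is a stand-in for illustration; a real pipeline would more likely cluster on embeddings or MinHash signatures, and the column names are invented):

```python
import hashlib

import dask.dataframe as dd


def bucket_of(text: str, n_buckets: int = 1024) -> int:
    # Deterministic hash so identical texts land in the same bucket on every worker.
    return int(hashlib.md5(text.encode("utf-8")).hexdigest(), 16) % n_buckets


# Component 1 (illustrative): assign each row to a bucket and shuffle so that
# potential duplicates end up in the same partition.
def cluster(dataframe: dd.DataFrame) -> dd.DataFrame:
    dataframe["bucket"] = dataframe["text"].map(bucket_of, meta=("bucket", "int64"))
    return dataframe.shuffle(on="bucket")


# Component 2 (illustrative): deduplicate within each partition, which after the
# shuffle amounts to a global deduplication.
def deduplicate(dataframe: dd.DataFrame) -> dd.DataFrame:
    return dataframe.map_partitions(lambda pdf: pdf.drop_duplicates(subset="text"))
```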

Supporting nested pipelines (including a pipeline as a component in a larger pipeline) might offer some benefits here in the future. But mainly the reusability of components will. Since a component can be implemented once and reused many times, it only needs to be designed properly once.

Dask

We currently get two things from Dask:

  • loading the data in chunks
  • executing the work on those chunks in parallel

You can see both of these combined in our PandasTransformComponent, where you only need to implement your transform function on a single Pandas dataframe. We use Dask to load the data in chunks as Pandas dataframes and execute them in parallel. See this simple example.
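As a rough sketch of what that looks like for a component author (the import path, constructor handling, and column names are assumptions for illustration; the docs have the authoritative version), the user-facing code only ever sees plain pandas:

```python
import pandas as pd

# Assumed import path for the base class mentioned above.
from fondant.component import PandasTransformComponent


class FilterImageResolution(PandasTransformComponent):
    """Hypothetical component that keeps only sufficiently large images."""

    def __init__(self, *, min_width: int, min_height: int) -> None:
        self.min_width = min_width
        self.min_height = min_height

    def transform(self, dataframe: pd.DataFrame) -> pd.DataFrame:
        # `dataframe` is one chunk of the dataset, handed over as a plain pandas
        # DataFrame; the framework runs this method on chunks in parallel.
        # Column names are made up for the example.
        mask = (dataframe["images_width"] >= self.min_width) & (
            dataframe["images_height"] >= self.min_height
        )
        return dataframe[mask]
```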

Concurrency on a single core is not handled by Dask, and currently needs to be implemented manually, but we might offer an abstraction for this in Fondant in the future.
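Until such an abstraction exists, one way to get that intra-chunk concurrency for I/O-bound work is a plain thread pool inside the transform; the download helper and column names below are invented for illustration:

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Optional

import pandas as pd
import requests


def fetch(url: str) -> Optional[bytes]:
    """Illustrative download helper; real code would add retries and backoff."""
    try:
        return requests.get(url, timeout=10).content
    except requests.RequestException:
        return None


def transform(dataframe: pd.DataFrame) -> pd.DataFrame:
    # Dask already parallelises across chunks; the thread pool adds concurrency
    # within a single chunk, which helps for network-bound work like downloads.
    with ThreadPoolExecutor(max_workers=16) as pool:
        dataframe["images_data"] = list(pool.map(fetch, dataframe["images_url"]))
    return dataframe
```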

Dask might not always be the best choice for every data type or transformation, so we made sure to encapsulate the Dask-related code in specific dataIO classes. This allows us to support additional frameworks in the future.
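Purely to illustrate the idea (the class names below are invented, not Fondant's actual dataIO classes), the shape of such an encapsulation could look like this, with components talking to a small load/write interface and only one implementation knowing about Dask:

```python
from abc import ABC, abstractmethod
from typing import Any

import dask.dataframe as dd


class DataIO(ABC):
    """Hypothetical load/write interface the component executor talks to."""

    @abstractmethod
    def load(self) -> Any:
        """Return the dataset in whatever representation the backend uses."""

    @abstractmethod
    def write(self, dataframe: Any) -> None:
        """Persist the (possibly transformed) dataset."""


class DaskDataIO(DataIO):
    """Dask-backed implementation; another engine could sit behind the same interface."""

    def __init__(self, input_path: str, output_path: str) -> None:
        self.input_path = input_path
        self.output_path = output_path

    def load(self) -> dd.DataFrame:
        return dd.read_parquet(self.input_path)

    def write(self, dataframe: dd.DataFrame) -> None:
        dataframe.to_parquet(self.output_path)
```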

Kubeflow pipelines

Kubeflow pipelines is only one of the orchestrators that we support, but we can use it as an example, as the other orchestrators we (will) support are very similar. The defining features are:

This is also currently the limit to our scaling.

I can see paths to move beyond this single-machine limit though:


This became quite long :sweat_smile:, but it was helpful for me as well to write this down in a (hopefully) structured way. Looking forward to your feedback.

RobbeSneyders commented 1 year ago

Turning this into a discussion so we can track the progress towards distributed execution in #549