zenml-io / zenml

ZenML 🙏: The bridge between ML and Ops. https://zenml.io.
https://zenml.io
Apache License 2.0
4.07k stars 437 forks source link

Pandas materializer is slow when using large datasets with remote artifact stores #2213

Open strickvl opened 10 months ago

strickvl commented 10 months ago

Open Source Contributors Welcomed!

Please comment below if you would like to work on this issue!

Contact Details [Optional]

support@zenml.io

What happened?

There is a performance issue with the Pandas materializer in ZenML, particularly when dealing with large datasets materialized on remote artifact stores such as S3. The loading process is extremely slow, which becomes evident when handling datasets of significant size.

Steps to Reproduce

The issue was encountered when trying to load a large dataset (~30m rows) from a Snowflake database. Initially, the pipeline was tested with a limit of 100 on the SQL query, which worked fine. However, removing the limit to fetch the entire dataset leads to a memory outage and slow performance. Here's one way you might like to test this:

import pandas as pd
from zenml import step, pipeline

def create_large_dataset(num_rows=30_000_000):
    # Creating a DataFrame with 30 million rows and a few columns
    df = pd.DataFrame({
        'id': range(num_rows),
        'value': [f'value_{i % 1000}' for i in range(num_rows)],  # Cyclic repetition of values
        'number': range(num_rows),
        'date': pd.date_range(start='2020-01-01', periods=num_rows, freq='T')  # Minute frequency
    })
    return df

@step
def my_step() -> pd.DataFrame:
    df = create_large_dataset()
    return df

# Run this using a remote artifact store
@pipeline
def my_pipeline(my_step):
    my_step()

my_pipeline()

This code generates a DataFrame with 30 million rows, each row containing an ID, a cyclically repeating string value, a number, and a timestamp. You can adjust the create_large_dataset function to tailor the dataset to your specific needs.

Expected Behavior

The expectation is for the Pandas materializer to efficiently handle large datasets, even when they are stored on remote artifact stores. The materializer should be able to load the entire dataset without significant performance degradation or memory issues.

Potential Solutions

Benchmark the current implementation to identify bottlenecks. Investigate optimizations in data loading, possibly through chunking or more efficient memory management. Consider alternative approaches or tools that are better suited for handling large datasets. Explore the possibility of improving the integration with the remote artifact stores to optimize data transfer and loading.

Additional Context

This issue is critical for users who work with large-scale data in ZenML, as it affects the efficiency and feasibility of data pipelines.

strickvl commented 10 months ago

Potentially fixed/addressed by #2212.

htahir1 commented 10 months ago

2212 Was not a good solution as evidenced by tests, therefore this issue is still open for thoughts!

benitomartin commented 10 months ago

I would like to work on this issue. Is it still open or anyone is working on it?

htahir1 commented 10 months ago

@benitomartin no it’s open still .. you might want to check out the conversation at #2212 before starting though

benitomartin commented 10 months ago

OK I will check. I was thinking on using modin. Modin is a DataFrame for datasets from 1MB to 1TB+ (here a quick summary Modin

Screenshot 2024-01-09 222301

It would require only pip install modin

and changing the pandas import import modin.pandas as pd

Is this a possibility?

htahir1 commented 10 months ago

@benitomartin yes it’s a possibllity.. we just need to decide which backend (dask,ray etc) to support..

if we do support modin maybe we can ask our users to return a modin data frame instead and have a modin materializer as part of ZenML? That way we can just give a warning in the pandas materializer if the performance is low and ask the user to return a modin dataframe .. I prefer this way rather than doing magical back and forth conversion under the hood.. what do you think

benitomartin commented 10 months ago

If the idea is just to save and load the data, I think modin would do it faster or maybe polars, and they are very user friendly. If a person is working with large dataset, he would need a tool to work with them. And you can always transform to pandas if you want

So it is in my opinion a user friendly solution.

With pip install "modin[all]" it will install Modin with Ray and Dask engines

I'm not familiar with Ray or Dask, and I don't know how this will affect or interact with ZenML, but I guess it could work

htahir1 commented 10 months ago

I think it makes sense to make a modin integration into ZenML with a modin dataframe/series materializer. In order to do this, you can follow the integrations guide from step 3.

The materializer itself can be in src/zenml/integrations/modin/materializers/modin_materializer.py, and can be something like:

from zenml.materializers.pandas_materializer import PandasMaterializer
import modin.pandas as pd

class ModinMaterializer(BaseMaterializer):
    """Materializer to read data to and from modin.pandas."""

    ASSOCIATED_TYPES: ClassVar[Tuple[Type[Any], ...]] = (
        pd.DataFrame,
        pd.Series,
    )
    ASSOCIATED_ARTIFACT_TYPE: ClassVar[ArtifactType] = ArtifactType.DATA

    # Rest can be the same as the pandas materializer: https://github.com/zenml-io/zenml/blob/main/src/zenml/materializers/pandas_materializer.py

And in the pandas materializer you can check in the save and load if the data is above a threshold (lets say >1M rows) and then we raise a warning that the user should switch to modin by doing zenml integration install modin or pip install modin[all] and returning a modin dataframe instead

WDYT about this plan?

benitomartin commented 10 months ago

I think it sounds good. Allow me a few days to play around ZenML and Modin, as I have not tested anything so far. Then I will open a PR and If I'm stuck I will come back to you

benitomartin commented 10 months ago

I made a first test running the pipeline file with pandas and modin. I attach a table comparison

Mainly, using modin requires ray (or other framework) initialization. In my test this means that saving the df, takes longer in modin compare to pandas. Reading the file is faster with modin. However, from 5'000'000 rows (threshold) modin is beating pandas (26 vs 25 seconds) and for the 28'000'000 rows 2 minutes vs 1minute 10seconds. Details attached

Modin performance

In case you want me to prepare the integration file, I guess I have to create a folder and an init file with the ModinMaterializer and include the PandasMaterializer code, right?

I can include this in the load method after the raise ImportError and the else statement

raise ImportError(
                    "You have an old version of a `PandasMaterializer` "
                    "data artifact stored in the artifact store "
                    "as a `.parquet` file, which requires `pyarrow` "
                    "for reading, You can install `pyarrow` by running "
                    "'`pip install pyarrow fastparquet`'."
                )
        else:
            with fileio.open(self.csv_path, mode="rb") as f:
                df = pd.read_csv(f, index_col=0, parse_dates=True)
`    # Check if the DataFrame is too large and suggest using modin
    if len(df) > YOUR_THRESHOLD_SIZE:
        warning_message = (
            "Warning: The size of the DataFrame is large. "
            "Consider using `modin` library for more efficient loading. "
            "You can install it with '`pip install modin`'  or '`pip install modin[all]'`"
        )
        logger.warning(warning_message)`
htahir1 commented 10 months ago

@benitomartin great job! Great results so far. I would ask the following:

benitomartin commented 10 months ago

Yes I can go through the same process with AWS/GCP. Just I need a few days for it. But so wanted to share the first insight with you

The ray initialisation comes automatically, so I guess this is the default as I do not have ray installed but according to modin docs, they detect automatically the engine. I do not know Dask but I can try to set up and check ( but after the buckets load check)

Let me first see how it performs with AWS/GCP and then I check the rest.

Once we have all clear I can create the materializer integration and update the documents 💪

htahir1 commented 10 months ago

@benitomartin Thank you! let me know if i can support in any way

benitomartin commented 10 months ago

I will test GCP first today or latest tomorrow and attach here the tables with the times, like I did locally last time

benitomartin commented 10 months ago

I ran some tests today using GCP, see attached

gcloud

At 5'000'000 rows overall pandas and modin perform equal. However at 28'000'000 modin beats pandas overall but the reading time is longer. This is probably due to the missing materializer. If not, maybe saving with modin and reading with modin could be combined?

I also tried to initialize ray in advance, but whenever I ran the pipeline, another initialization takes place

This is the code I have used

from google.cloud import storage
from zenml import step, pipeline
import pandas as pd
# import modin.pandas as pd

import numpy as np
from datetime import datetime, timedelta
import time

# Set your GCS bucket name
GCS_BUCKET_NAME = "zenml-big-dataframe"
FILE_NAME = "my_data.csv"
FILE_PATH = "data/my_data.csv"

@step(enable_cache=False)
def save_df_to_gcs(num_rows: int = 10000) -> str:
    data = {
        "ID": range(1, num_rows + 1),
        "Value": np.random.rand(num_rows),
        "Date": [datetime.now() - timedelta(days=x % 365) for x in range(num_rows)],
    }
    df = pd.DataFrame(data)

    csv_file_path = FILE_NAME 

    df.to_csv(csv_file_path, index=False)

    # Initialize a client
    client = storage.Client()

    # Reference to the target bucket
    bucket = client.get_bucket(GCS_BUCKET_NAME)

    # Define the object name in the bucket (destination)
    blob = bucket.blob(FILE_PATH)

    # Upload the local file to GCS
    blob.upload_from_filename(csv_file_path)

    # Return the GCS path to the saved CSV file
    return f"gs://{GCS_BUCKET_NAME}/{FILE_PATH}"

@step(enable_cache=False)
def read_df_from_gcs(gcs_path: str) -> pd.DataFrame:

    # REad csv
    return pd.read_csv(gcs_path)

@pipeline(enable_cache=False)
def my_pipeline_gcs():
    start_time = time.time()

    gcs_path = save_df_to_gcs()  # 10k rows
    # gcs_path = save_df_to_gcs(num_rows=1000000)  # 1M rows
    # gcs_path = save_df_to_gcs(num_rows=5000000)  # 5M rows
    # gcs_path = save_df_to_gcs(num_rows=28000000)   # 28M rows
    read_df_from_gcs(gcs_path=gcs_path)

    end_time = time.time()
    elapsed_time = end_time - start_time
    print(f"Pipeline execution time: {elapsed_time} seconds")

# Run the pipeline when the script is executed directly
if __name__ == "__main__":
    my_pipeline_gcs()

I the local tests I ran this code (i forgot to share it last time)

from zenml import step, pipeline
# import pandas as pd
import modin.pandas as pd
import time
import numpy as np
from datetime import datetime, timedelta
import time

@step(enable_cache=False)
def save_df(num_rows: int = 10000) -> pd.DataFrame:
    data = {
        "ID": range(1, num_rows + 1),
        "Value": np.random.rand(num_rows),
        "Date": [datetime.now() - timedelta(days=x % 365) for x in range(num_rows)],
    }
    return pd.DataFrame(data)

@step(enable_cache=False)
def read_df(df: pd.DataFrame) -> pd.DataFrame:
    return df

@pipeline(enable_cache=False)
def my_pipeline():
    start_time = time.time()

    df = save_df()  # 10k rows
    # df = save_df(num_rows=1000000)  # 1M rows
    # df = save_df(num_rows=28000000)   # 28M rows
    reloaded_df = read_df(df=df)

    end_time = time.time()
    elapsed_time = end_time - start_time
    print(f"Pipeline execution time: {elapsed_time} seconds")

# Run the pipeline when the script is executed directly
if __name__ == "__main__":
    my_pipeline()

Let me know the next steps, I can try AWS and see what happens. But I think yo need to decide how to implement the materializer. We can have a quick call in zoom if you want to discuss the next steps

Files for local and gcs test can be seen here

Local: https://docs.google.com/spreadsheets/d/12qRGGBPdHGARav5z_1PoA1U7NrTECqdgZPcXOqbhr4I/edit?usp=sharing

GCS https://docs.google.com/spreadsheets/d/17CzytX9Mp4A7GLb65LQ-mFC6vW0-XOrNrs0fA588izU/edit?usp=sharing

htahir1 commented 10 months ago

Wow @benitomartin this is great! I am on the move right now but I can give some quick feedback (later tomorrow I'll add to this) -> Looks like you're testing reading/writing in a test but you should set probably already do this in a materializer with a remote artifact store (These two links will help):

benitomartin commented 10 months ago

Yes actually I was wondering If I have to connect gcs with zenml somehow, but could not manage to go through the documentation.This way I do not need to download the files

I will try to check tomorrow or tuesday and try to test with the integration of gcs and zenml.

One remark, when I run the pipelines I get the comment, that I shall run zenml up. It took me sometime to find the login command, just "default" as username and no password. It would be good to add this to the last comment of the pipeline, so that new users like me do not need to search for it

htahir1 commented 10 months ago

@benitomartin Good remark, the username and password are actually printed but only after the user is redirected! But maybe @strickvl has more ideas how to improve this

Regarding the task at hand, I think you are almost there. You just need to create the materializer and artifact store in your zenml server. I think the above two linked docs will give you enough info how to do it. Good luck and ping here if you need us!

strickvl commented 10 months ago

One remark, when I run the pipelines I get the comment, that I shall run zenml up. It took me sometime to find the login command, just "default" as username and no password. It would be good to add this to the last comment of the pipeline, so that new users like me do not need to search for it

Hi @benitomartin. Just wanted to check exactly when you're not seeing information about using default as the username etc since when you run zenml up exactly this instruction gets output to the terminal.

CleanShot 2024-01-15 at 10 46 01@2x

benitomartin commented 10 months ago

Hi @strickvl

I do not get this message. Maybe because I am using --bocking in windows. See below the whole pipeline warnings when running it with my code above with gcloud and modin.

modin_gcloud_pipeline

When I run zenml up --blocking, I do not get the username and password notification. So maybe you need to configure it for the ---blocking flag and windows users by adding the windows option for the pipeline run and the notification after running zenml up --blocking?

strickvl commented 10 months ago

Ah got it. I opened a PR that will fix this issue going forward.

htahir1 commented 9 months ago

@benitomartin I just remembered that another community member @christianversloot already added a polars method in this PR: https://github.com/zenml-io/zenml/pull/2229! So we're actually quite ahead at the curve already. Maybe we just need the warning now?

benitomartin commented 9 months ago

Ah ok. Sorry I have been busy these days and could not manage to move forward. @strickvl already added the warning in #2290, so I guess this issue is finished?

htahir1 commented 9 months ago

Yes! your tests were very useful though. Thank you. Id leave it up to @strickvl to close this

benitomartin commented 9 months ago

Ok! I will have a look If I can move forward with other contribution