ml6team / fondant

Production-ready data processing made easy and shareable
https://fondant.ai/en/stable/
Apache License 2.0
341 stars, 25 forks

Out of memory caused by partition write operations #406

Closed mrchtr closed 1 year ago

mrchtr commented 1 year ago

Issue

The issue arises when we write the dask dataframe to parquet files. The process creates a separate parquet file for each partition and column, and we construct an execution graph whose final steps are these write operations. However, dask does not execute the graph partition by partition: the write step in the computation graph depends on the columns of every partition, so results from many partitions can be held in memory at the same time. This leads to inefficient memory usage and out-of-memory errors (related to issue 10480).


We need to optimize and declutter the write operations for better memory management and stable execution of fondant components.

Option 1: Declutter execution graph

We could consider using delayed objects to simplify the execution graph. This involves wrapping each column write process of each partition into a delayed object, which could potentially enable sequential execution.


Pseudo code for wrapping the write process within a delayed object:

import os

from dask import delayed

def write_partition_to_parquet(partition, partition_index, output_dir):
    # `partition` is a pandas dataframe by the time the delayed task runs
    os.makedirs(output_dir, exist_ok=True)
    partition.to_parquet(os.path.join(output_dir, f"partition_{partition_index}.parquet"))

# this needs to be done for each column, e.g. COLUMN_df.to_delayed()
delayed_tasks = []
for partition_index, partition in enumerate(index_df.to_delayed()):
    delayed_task = delayed(write_partition_to_parquet)(partition, partition_index, os.path.join(output_dir, "id"))
    delayed_tasks.append(delayed_task)

However, I haven't found a practical solution along these lines yet. Dask still does not execute the graph sequentially. The graph also still looks suboptimal, because we end up with number_of_columns x number_of_partitions computation steps at the end.

Maybe this could be a short-term solution if we can successfully execute the subgraphs sequentially.
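
One way to probe whether strictly sequential execution helps is to run the delayed write tasks on dask's single-threaded scheduler. The following is a minimal sketch under assumptions, not a tested fix for fondant: `load_partition` and `write_column` are hypothetical stand-ins, and the write step only records its arguments instead of calling to_parquet.

```python
import dask
import pandas as pd
from dask import delayed

written = []  # records (column, partition_index) for every simulated write


@delayed
def load_partition(partition_index):
    # Hypothetical stand-in for an expensive partition computation
    return pd.DataFrame({"id": [partition_index], "url": [f"u{partition_index}"]})


@delayed
def write_column(partition, column, partition_index):
    # Stand-in for partition[[column]].to_parquet(...); it only records the call
    written.append((column, partition_index))
    return len(partition[[column]])


tasks = []
for partition_index in range(3):
    # Build the partition once so both column writes share the same upstream task
    partition = load_partition(partition_index)
    for column in ("id", "url"):
        tasks.append(write_column(partition, column, partition_index))

# scheduler="synchronous" runs every task in the calling thread, one at a time
row_counts = dask.compute(*tasks, scheduler="synchronous")
```

The graph still contains number_of_columns x number_of_partitions write tasks, so this only changes how they are scheduled, not how many there are.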

Option 2: Scalable workaround

I have been exploring a potentially more stable solution that combines the dask client with a LocalCluster. Instead of calling dataframe.compute(), I have been experimenting with dataframe.persist(). With a client attached, persist() triggers asynchronous partition-wise computation and returns immediately. The returned dataframe is backed by dask futures, one per partition, which can be collected with futures_of().

A dask future is a representation of an asynchronous computation. It can have different statuses, such as "pending" when waiting to complete and "finished" when the computation has successfully finished.

To use this approach, I would iterate over the finished futures, retrieve the computed pandas dataframe for each partition, and then write each of its columns to a separate parquet file. In my experiments, this method has proven to be quite stable. It effectively releases memory and can spill data to disk when I'm close to out-of-memory limits.
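
The consume-as-they-finish loop described above can be sketched with the standard library alone. This is an analogue built on concurrent.futures, not dask code: `process_partition`, `column_paths`, and `COLUMNS` are hypothetical stand-ins, and the parquet write is replaced by collecting the target paths. dask.distributed provides an `as_completed` helper that plays the same role for dask futures.

```python
import os
import time
from concurrent.futures import ThreadPoolExecutor, as_completed

COLUMNS = ("id", "url")  # hypothetical column names, mirroring the test data


def process_partition(partition_idx):
    # Stand-in for an expensive partition computation
    time.sleep(0.01)
    return {"id": [partition_idx], "url": [f"url-{partition_idx}"]}


def column_paths(output_dir, partition_idx):
    # One file per column and partition: <output_dir>/<column>/<partition_idx>.parquet
    return [
        os.path.join(output_dir, column, f"{partition_idx}.parquet")
        for column in COLUMNS
    ]


def run(output_dir="./tmp", num_partitions=4):
    handled, planned_paths = [], []
    with ThreadPoolExecutor(max_workers=2) as executor:
        future_to_partition = {
            executor.submit(process_partition, idx): idx
            for idx in range(num_partitions)
        }
        # Iterate over a snapshot so entries can be dropped while looping
        for future in as_completed(list(future_to_partition)):
            # pop() drops our reference to the future once it has been handled
            partition_idx = future_to_partition.pop(future)
            partition = future.result()
            assert set(partition) == set(COLUMNS)
            # A real implementation would write each column to parquet here
            planned_paths.extend(column_paths(output_dir, partition_idx))
            handled.append(partition_idx)
    return handled, planned_paths
```

Partitions are handled in completion order rather than index order, which is why the mapping from future to partition index is needed to name the output files.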

I have included the source code at the end of the issue for running the experiment locally. You have the flexibility to adjust the number of threads, workers, and the size of the data generated for the simulation.

This approach seems to be the most stable solution, but it does require some foundational changes to the fondant core. We would have to incorporate the usage of the dask client.

Test code

Requirements:

dask==2023.5.0
distributed==2023.5.0
pandas==2.0.2
dask[diagnostics] # optional
pyarrow==12.0.1

Diagnostics dashboard

If you run the code locally, you can access localhost:8787/status to get additional insights into the execution.

import os
import sys
import time
from concurrent.futures import ThreadPoolExecutor

import dask.dataframe as dd
import pandas as pd
from dask.distributed import Client, LocalCluster, futures_of, wait

# Arguments
output_dir = './tmp'
num_workers = 1
thread_per_worker = 1
memory_limit_per_worker = 2e9  # 2 GB
num_thread_saving_parquets = 1

# Helper functions
def artificial_delay(milliseconds):
    end_time = time.time() + milliseconds / 1000
    while time.time() < end_time:
        _ = 2 + 2  # Perform a simple computation

def process_partition(df, gb_to_produce = 1):
    """Dummy method for processing partition. Large objects are created to simulate oom."""
    print(df.head())
    print("process partition")
    artificial_delay(10000)
    counter = 0
    large_string = "t" * (1024 * 1024 * 128)  # 128 MiB (~134 MB)
    print(sys.getsizeof(large_string))

    to_append = []
    while counter < (10) * gb_to_produce: # x Gb in Total
        counter += 1
        to_append.append([counter, large_string])

    return pd.DataFrame(data=to_append, columns=["id", "url"])

if __name__ == "__main__":
    # Record the start time
    start_time = time.time()

    worker_kwargs = {
        "n_workers": num_workers,
        "memory_limit": memory_limit_per_worker,
        "threads_per_worker": thread_per_worker
    }

    cluster = LocalCluster(**worker_kwargs)
    client = Client(cluster)

    dataframe = dd.from_dict({"id": [0, 1, 2, 3], "url": ["a", "b", "c", "d"]}, npartitions=4)

    dataframe = dataframe.map_partitions(process_partition, meta=dataframe)

    # persist() submits the graph to the local client and returns immediately. The partitions of
    # the returned dataframe are backed by futures that are computed asynchronously.
    dataframe = dataframe.persist()

    # List of Dask futures
    futures_list = futures_of(dataframe)

    # Create a dictionary to map futures to partition indices
    # TODO: if a partition is larger than the available memory and gets reassigned to a new
    # worker, the future's identifier might change
    future_to_partition = {}
    for partition_idx, future in enumerate(futures_list):
        future_to_partition[future] = partition_idx

    with ThreadPoolExecutor(max_workers=num_thread_saving_parquets) as executor:
        while futures_list:
            # Check the status of all futures
            completed, not_completed = wait(futures_list, return_when="FIRST_COMPLETED")

            # Process completed futures
            for future in completed:

                partition_idx = future_to_partition[future]

                # Pandas result for partition is available - and can be accessed immediately
                pandas_df = future.result()

                # Save the pandas dataframe column wise to parquet
                for column_name in dataframe.columns:
                    # Extract the column data as a Pandas Series
                    column_series = pandas_df[column_name]

                    # Create a Pandas DataFrame with the single column
                    column_df = column_series.to_frame(name=column_name)

                    # TODO check filepath is aligned with fondant
                    # Create a subdirectory for each column
                    column_directory = os.path.join(output_dir, column_name)
                    os.makedirs(column_directory, exist_ok=True)

                    # One parquet file per partition inside the column directory
                    output_filename = f"{partition_idx}.parquet"
                    output_path = os.path.join(column_directory, output_filename)

                    # Save the Pandas DataFrame as a Parquet file
                    column_df.to_parquet(output_path)

                # Remove the completed future from the list
                futures_list.remove(future)

            # TODO:
            # Sleep or perform other work while waiting for more futures to complete
            # Check what could be useful here!

    end_time = time.time()
    elapsed_time = end_time - start_time
    print(f"Elapsed time after saving to parquets: {elapsed_time} seconds")

    # Shut down the client and the cluster
    client.close()
    cluster.close()

mrchtr commented 1 year ago

It seems that the local cluster solves this for now. @RobbeSneyders should we close the issue?