Eventual-Inc / Daft

Distributed data engine for Python/SQL designed for the cloud, powered by Rust
https://getdaft.io
Apache License 2.0
2.34k stars 164 forks source link

A lazy-executing `ray.data.Datasource` for `daft.DataFrame` #3374

Open NellyWhads opened 1 day ago

NellyWhads commented 1 day ago

Is your feature request related to a problem?

The current APIs to convert a daft.DataFrame to a ray.Dataset include:

daft.DataFrame.to_ray_dataset daft.DataFrame.write_parquet -> Load directly using ray.data.read_parquet daft.DataFrame.to_torch_map_dataset daft.DataFrame.to_torch_iter_dataset

All of these implementations require materialization of the entire dataframe. This is of particular concern for large datasets with images and other dense objects which cannot fit into system/ray object memory.

Describe the solution you'd like

I would like to see a ray.data.Datasource which creates a ray.data.Dataset instead of a ray.data.MaterializedDataset. The implementation should lazily-reference the appropriate block/partition of the dataframe and materialize it.

I'm more than willing to continue trying things out, but need some guidance from the devs on what may be a reasonable next step.

Describe alternatives you've considered

See links in the description.

The only "working" alternative solution I have so far is the following, rather crude, implementation:

class DaftMaterializedDataSource(Datasource):
    def __init__(self, results: List[RayMaterializedResult], schema: pa.Schema):
        self._results = results
        self._schema = schema

    def estimate_inmemory_data_size(self) -> None:
        return None

    def get_read_tasks(
        self,
        parallelism: int,
    ) -> List[ReadTask]:
        assert len(self._results) == parallelism, f"Expected {parallelism=} and {len(self._results)=} to match."

        return [
            ReadTask(
                read_fn=lambda result=result: [ray.get(_make_ray_block_from_micropartition.remote(result.partition()))],
                metadata=BlockMetadata(
                    num_rows=None,
                    size_bytes=None,
                    schema=self._schema,
                    input_files=None,
                    exec_stats=None,
                ),
            )
            for result in self._results
        ]

    def get_name(self) -> str:
        return "DaftMaterialized"

# Make a dataframe with a strong pyarrow-compatible schema (ie. no `daft.DataType.python()` columns)
# Add some udf methods to create new columns to make it easy to trace the lazy execution
df = ...

datasource = DaftMaterializedDataSource(
    # This is effectively a pedantic way to call `run()`
    results=list(daft.context.get_context().runner().run_iter(df._builder)),
    schema=df.schema().to_pyarrow_schema(),
)

dataset = ray.data.read_datasource(datasource=datasource, override_num_blocks=num_partitions)

dataset.show()

Additional Context

I have found the following limitations after a few implementation attempts:

  1. Partitioning logic can only be applied to a dataframe and its logical plan. This means that we can't use the Datasource parallelism configuration to set the partitions on the dataframe in a meaningful way. If the dataframe was not partitioned before applying a set of logical operations, the posthumous partitioning call will have no practical effect. This makes a lot of sense as implemented, since global operations like groupby would need access to the entire dataframe at a time. I don't know how this could be circumvented.
  2. The df._builder (perhaps other properties as well) cannot be serialized to a ray object ref. This prevent us from passing a dataframe into the constructor of a ray.data.Datasource.
  3. RayMaterializedResult can only be produced from a generator. ray.data.Datasource.get_read_tasks() needs to create a list of tasks, which thereby requires a call to list(daft.context.get_context().runner().run_iter(df._builder)), which effectively materializes the entire dataframe.
  4. Scheduler doesn't seem to provide a way listing/accessing individual threads. This leads to the same limitation as above.

Would you like to implement a fix?

Yes

desmondcheongzx commented 23 hours ago

Hi @NellyWhads, thanks for raising this issue! I'll probably take a closer look later this week to see what it would take to make this happen. Our initial impression is that this should be very doable. If you'd like to work on this we can also give you detailed pointers once we've taken that closer look.

NellyWhads commented 22 hours ago

Thanks @desmondcheongzx - I'm not particularly tied to having my name on the implementation. Though I have a working familiarity with ray, I imagine the active maintainers here may have some more experience and insight. If you're willing to implement a fully working solution, I'd be equally as appreciative!