airbytehq / PyAirbyte

PyAirbyte brings the power of Airbyte to every Python developer.
https://docs.airbyte.com/pyairbyte

💡 Feature Request: Add `to_arrow()` on `Dataset` class #204

Closed oberalppaolof closed 2 months ago

oberalppaolof commented 5 months ago

Arrow is a crucial library, and supporting it natively is a big opportunity. It would let us avoid going through `to_pandas()`.

avirajsingh7 commented 3 months ago

Hey @aaronsteers can you assign me this one?

aaronsteers commented 3 months ago

@avirajsingh7 - It's yours! Thanks for volunteering!

avirajsingh7 commented 3 months ago

@aaronsteers Is it the right idea to convert the data into pandas and then into Arrow?

    def get_arrow_table(self, stream_name: str) -> pa.Table:
        table_name = self._read_processor.get_sql_table_name(stream_name)
        engine = self.get_sql_engine()

        df = pd.read_sql_table(table_name, engine, schema=self.schema_name)
        return pa.Table.from_pandas(df)

Or should I process the data from the table using `_read_processor`?

aaronsteers commented 3 months ago

@avirajsingh7 - I think that is a decent default implementation. However, this approach will require that the entire dataset fits into memory.

I've done some research on my side and I think what we want here is to return a PyArrow `Dataset` object instead of a PyArrow `Table` object.

For large datasets that will not fit into memory, I believe the Dataset construct will allow us to break the entire table's data into chunks when necessary, avoiding crashing in scenarios where RAM is limited or the data size is just huge.

In terms of the actual implementation, I don't think we need to have very complex handling at this time. As long as the return type will allow us to refactor in the future, we should be good with an implementation that just puts the whole thing in memory, similar to your example above.

The Python example here shows a "zero copy" DuckDB version that returns a dataset object:

But for the generic implementation, we might just have to go through pandas.

avirajsingh7 commented 3 months ago

@aaronsteers `pd.read_sql_table` has a `chunksize` option.


We can read the actual data as a series of smaller pandas DataFrames, create a PyArrow table from each one, and then return a PyArrow dataset built from those tables to users:


    import pandas as pd
    import pyarrow as pa
    import pyarrow.dataset as ds

    def get_arrow_dataset(self, stream_name: str) -> ds.Dataset:
        table_name = self._read_processor.get_sql_table_name(stream_name)
        engine = self.get_sql_engine()

        # Read the table as an iterator of DataFrames of up to 500,000 rows each.
        chunks = pd.read_sql_table(
            table_name, engine, schema=self.schema_name, chunksize=500000
        )

        # Infer the Arrow schema from the first chunk and reuse it for the rest.
        first_chunk = next(chunks)
        combined_schema = pa.Schema.from_pandas(first_chunk)
        # Keep the first chunk too, not just the ones read in the loop.
        arrow_tables = [pa.Table.from_pandas(first_chunk, schema=combined_schema)]

        for chunk in chunks:
            # Convert each remaining chunk to an Arrow Table.
            arrow_tables.append(pa.Table.from_pandas(chunk, schema=combined_schema))

        return ds.dataset(arrow_tables)

Once we have the dataset, users can access and manipulate it as needed.

What are your views on this approach?

avirajsingh7 commented 3 months ago

@aaronsteers I've raised a PR for this one.