apache / arrow-adbc

Database connectivity API standard and libraries for Apache Arrow
https://arrow.apache.org/adbc/
Apache License 2.0

[Python] Keeping cursors alive when using cursor.fetch_record_batch() #1893

Open davlee1972 opened 1 month ago

davlee1972 commented 1 month ago

What feature or improvement would you like to see?

When returning a record batch reader, the associated cursor has to be kept alive until the reader is fully consumed.

Something like the code below won't work, because the cursor goes out of scope and its data stream is closed before the record batch reader is read.

def get_batch_reader(self):
    cursor = self.conn.cursor()
    cursor.execute("select abc from xyz")
    return cursor.fetch_record_batch()  # cursor goes out of scope after return, closing its stream

fetch_record_batch() needs to hold a reference to the cursor (or a copy of it) and keep it alive until the batch reader is consumed or goes out of scope.

DuckDB gets away with this because it manages cursors internally.

results = duckdb.sql("SELECT * FROM my_arrow_table").fetch_record_batch(chunk_size)
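
Until fetch_record_batch() keeps the cursor alive itself, a minimal caller-side workaround (a sketch only, assuming an open ADBC DB-API connection object named conn) is to return the cursor alongside the reader so the caller controls both lifetimes:

import contextlib

def get_batch_reader(conn):
    # Hand back both objects so the cursor stays alive while the reader is consumed.
    cursor = conn.cursor()
    cursor.execute("select abc from xyz")
    return cursor, cursor.fetch_record_batch()

cursor, reader = get_batch_reader(conn)
with contextlib.closing(cursor):  # close the cursor only after the stream is drained
    table = reader.read_all()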

davlee1972 commented 4 weeks ago

I created a temporary hack for pyarrow.RecordBatchReader to fix this.

Here's an example producing the error "Attempt to read from a stream that has already been closed":

def get_reader():
    cursor = snowflake.source.conn.cursor()
    cursor.execute("select * from FUND_FLOWS limit 10")
    return cursor.fetch_record_batch()

my_reader = get_reader()
my_reader.read_all()
---------------------------------------------------------------------------
ArrowInvalid                              Traceback (most recent call last)
Cell In[6], line 7
      4     return cursor.fetch_record_batch()
      6 my_reader = get_reader()
----> 7 my_reader.read_all()

File ~/miniconda3/lib/python3.9/site-packages/pyarrow/ipc.pxi:757, in pyarrow.lib.RecordBatchReader.read_all()

File ~/miniconda3/lib/python3.9/site-packages/pyarrow/error.pxi:91, in pyarrow.lib.check_status()

ArrowInvalid: Attempt to read from a stream that has already been closed

Here's my temporary hack to create replacement classes for RecordBatchReader: [attached as an image]
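
Since the attached image isn't reproduced here, the following is only a sketch of one way such a replacement could work (fetch_record_batch_owned is a hypothetical helper, not the poster's actual code; it rebuilds the stream with pyarrow.RecordBatchReader.from_batches so the generator closure owns the cursor, and it assumes the same ADBC DB-API connection as above):

import pyarrow as pa

def fetch_record_batch_owned(cursor):
    # Wrap the cursor's stream in a new RecordBatchReader whose generator
    # closure keeps the cursor referenced until the stream is exhausted.
    inner = cursor.fetch_record_batch()

    def batches():
        try:
            for batch in inner:
                yield batch
        finally:
            cursor.close()  # release the cursor once the reader is drained or discarded

    return pa.RecordBatchReader.from_batches(inner.schema, batches())

def get_reader():
    cursor = snowflake.source.conn.cursor()
    cursor.execute("select * from FUND_FLOWS limit 10")
    return fetch_record_batch_owned(cursor)

my_reader = get_reader()
my_reader.read_all()  # works: the wrapping generator keeps the cursor alive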