sfu-db / connector-x

Fastest library to load data from DB to DataFrames in Rust and Python
https://sfu-db.github.io/connector-x
MIT License
2.01k stars 162 forks source link

Python read_sql() crash: Too many open files when used with multiprocessing #565

Open miohtama opened 10 months ago

miohtama commented 10 months ago

What language are you using?

Python

What version are you using?

connectorx                      0.3.2    

What database are you using?

PosgreSQL.

Ubuntu Linux 22.04.

What dataframe are you using?

Arrow 2

Can you describe your bug?

I am running a loop that exports data from the database in slices.

The query I am using looks like:


    import connectorx as cx

    while cursor < end:

        query = \
            f"SELECT id, pair_id, amount0_in, amount0_out, amount1_in, amount1_out, denormalised_ts, tx_index, log_index, tx_hash, denorm_block_number, trader_address " \
            f"from swap " \
            f"WHERE id >= {cursor} and id < {cursor + slice} and denorm_chain_id = {chain_id} " \
            f"ORDER BY pair_id, denormalised_ts"

        pa_table: Table = cx.read_sql(
            db_url,
            query,
            partition_on="id",
            partition_num=worker_threads,
            return_type="arrow2"
        )

       # Do some transformation on pyarrow.Table here using multiprocessing

       cursor += slice

The loop is using multiprocessing module, but this is not touching ConnectionX, so I suspect some kind of interaction between these two.

After running a script for a while I get:

thread 'r2d2-worker-2' panicked at 'called `Result::unwrap()` on an `Err` value: Os { code: 24, kind: Uncategorized, message: "Too many open files" }', /github/home/.cargo/registry/src/index.crates.io-6f17d22bba15001f/postgres-0.19.4/src/config.rs:342:14
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace
^[[B^[[B^[[B^[[B

Traceback (most recent call last):
  File "/root/mikko/oracle/scripts/generic/export-trades.py", line 89, in <module>
    stats = write_trades_cache(
  File "/root/mikko/oracle/dex_ohlcv/writer/parquet_trades.py", line 877, in write_trades_cache
    stats += write_uniswap_v2_trades_connectorx(
  File "/root/mikko/oracle/dex_ohlcv/writer/parquet_trades.py", line 520, in write_uniswap_v2_trades_connectorx
    pa_table: Table = cx.read_sql(
  File "/root/.cache/pypoetry/virtualenvs/dex-ohlcv-bhZziL_o-py3.10/lib/python3.10/site-packages/connectorx/__init__.py", line 297, in read_sql
    result = _read_sql(
RuntimeError: timed out waiting for connection

What are the steps to reproduce the behavior?

Run the export script that issues read_sql multiple times for long time.

I checked using lsof and it seems like (nameless?) FIFO pipes are increasing with each loop.

If there are ways to "reset" ConnectorX Python bindings and internals, I can see if this would help e.g. by manually purging/deleting any OS resources ConnectorX might hold.

miohtama commented 10 months ago

I rewrote the loop to not to do fork() within the loop and the resource leakage is gone. However the downside for this is that as I cannot rely fork() to pass down data from parent process to child processes, I need to serialise the data between the parent process and child processes (child cannot just read it from the forked memory) and this slows down the data ETL.