pola-rs / polars

Dataframes powered by a multithreaded, vectorized query engine, written in Rust
https://docs.pola.rs

`collect_async` is blocking #18718

Open winstxnhdw opened 6 days ago

winstxnhdw commented 6 days ago

Checks

Reproducible example

```python
from typing import Awaitable, Iterable, TypedDict

from polars import DataFrame, LazyFrame, max_horizontal, min_horizontal

class Word(TypedDict):
    id: int
    text: str
    start_time: float
    end_time: float

class Speaker(TypedDict):
    speaker: str
    start_time: float
    end_time: float

def get_intersection(transcription: Iterable[Word], diarisation: Iterable[Speaker]) -> Awaitable[DataFrame]:
    intersection_expression = min_horizontal(
        'end_time',
        'end_time_right',
    ) - max_horizontal(
        'start_time',
        'start_time_right',
    )

    return (
        LazyFrame(transcription)
        .join(LazyFrame(diarisation), how='cross')
        .with_columns(intersection_expression.alias('intersection'))
        .collect_async()
    )
```

Log output

No response

Issue description

In polars, most of the CPU-bound work happens in Rust, where the Python GIL is released. Ideally, `collect_async` should take advantage of this so polars can maximise CPU usage. As it stands, `collect_async` blocks the main event loop and stops a single-worker server from handling any more requests until the DataFrame is created.

EDIT: DataFrame creation is blocking the event loop. We can fix this by running it in a separate thread.

Expected behavior

`collect_async` should not block the main event loop; it should behave like a real async function, allowing Python to context-switch and run other GIL-dropping tasks concurrently.

Installed versions

```
Polars:               1.7.0
Index type:           UInt32
Platform:             Linux-5.15.153.1-microsoft-standard-WSL2-x86_64-with-glibc2.40
Python:               3.12.5 (main, Aug 9 2024, 08:20:41) [GCC 14.2.1 20240805]

----Optional dependencies----
adbc_driver_manager
altair
cloudpickle
connectorx
deltalake
fastexcel
fsspec                2024.9.0
gevent
great_tables
matplotlib            3.9.2
nest_asyncio
numpy                 1.26.4
openpyxl
pandas                2.2.2
pyarrow               16.1.0
pydantic              2.9.1
pyiceberg
sqlalchemy            2.0.34
torch                 2.3.1+cu121
xlsx2csv
xlsxwriter
```
Object905 commented 6 days ago

Well, I happen to have implemented that 😃

About "until the DataFrame is created" — is the bottleneck here the creation of the LazyFrames? Because LazyFrames are actually always created eagerly from Python (in `__init__` it creates a regular DataFrame by iterating over all the objects, then calls `.lazy()`), and `collect_async` later only runs the join + with_columns part asynchronously. To support something like this, LazyFrame would have to support "scanning" Python iterables/generators, etc., which would actually be a great feature.

It does use the polars threadpool here, which runs collect in the background and only resolves the future with the GIL acquired. It should work with `scan_parquet`, `scan_csv`, and the other `scan_*` functions, because they are executed on the Rust side inside `.collect` and not eagerly.

winstxnhdw commented 6 days ago

> About "until the DataFrame is created" — is the bottleneck here the creation of the LazyFrames?

Hmm.. Is there a way I can verify this? The LazyFrame only has ~50 rows, but it blocks the event loop for more than 10 seconds, which I think is an unusually long time for creating such a trivial DataFrame.

> It does use the polars threadpool here, which runs collect in the background and then only resolves the future with the GIL acquired.

I see that you are spawning the pool in Rust. I am not sure about PyO3 but in my experience, I found that spawning a separate background thread in a C++ extension would still block the event loop as Python still has to constantly poll the job for completion (which is blocking unless you spawn a thread from Python and wrap the future).

Object905 commented 6 days ago

Well it's a different story then if it's only 50 rows. Care to provide data to reproduce?

Theoretically it shouldn't block the event loop, because it only resolves the future by calling this callback here at the end of the execution of `.collect` on the Rust side, and it does not hold the GIL during the actual collect.

winstxnhdw commented 6 days ago

I will get the data for you tomorrow.

> does not hold the GIL during the actual collect

Not holding the GIL is one thing, but it still needs to run in a separate thread for it to not block the event loop. If resolving the future involves some kind of polling of the results, the event loop will still be blocked.

Object905 commented 6 days ago

Internally it may poll for the results, but if it were to poll by blocking the whole loop, that would defeat the whole purpose of a Future.

Object905 commented 6 days ago

> Hmm.. Is there a way I can verify this? (LazyFrame creation)

You can try creating the LazyFrames on separate lines and timing only that (or even just print-debug), because `__init__` is sync.

winstxnhdw commented 5 days ago

You are right. The LazyFrame's creation is indeed responsible for blocking the event loop. I took your advice and tried using `scan_ndjson`, but it was still blocking the event loop.

```python
from typing import IO, Awaitable

from polars import DataFrame, max_horizontal, min_horizontal, scan_ndjson

def get_intersection(transcription: IO[bytes], diarisation: IO[bytes]) -> Awaitable[DataFrame]:
    intersection_expression = min_horizontal(
        'end_time',
        'end_time_right',
    ) - max_horizontal(
        'start_time',
        'start_time_right',
    )

    return (
        scan_ndjson(transcription)
        .join(scan_ndjson(diarisation), how='cross')
        .with_columns(intersection_expression.alias('intersection'))
        .collect_async()
    )
```
deanm0000 commented 5 days ago

When is the blocking happening? The output of the function is an awaitable so it seems like we need more info.

Are you doing something simple like this?

```python
df_awaitable = get_intersection(tr, di)
df = await df_awaitable
```

Or more like this?

```python
asyncio.get_running_loop().run_until_complete(df_awaitable)
```

something else?

winstxnhdw commented 5 days ago

I am doing the former. I expect Awaitables to fully yield to the event loop. It defeats the purpose of `collect_async` if I also have to run it in a thread pool; I could just use `collect` then.

deanm0000 commented 5 days ago

You still have the issue that you're scanning an IO object, so even though the polars binary would be non-blocking, it's getting its data from Python, which is blocking.

Try either making the inputs eager DFs or files.

In the case where the input is `LazyFrame(transcription: dict)`, you're going to have to wait for polars to parse through all the dicts to send them to Arrow memory.

In the case where the input is `scan_ndjson(transcription: IO)`, every time the scan looks at data it needs Python, which (I think) would be blocking.

So maybe try one of these

```python
def get_intersection(transcription: pl.DataFrame, diarisation: pl.DataFrame) -> Awaitable[DataFrame]:
    transcription = transcription.lazy()
    diarisation = diarisation.lazy()
```

or

```python
def get_intersection(transcription: Path, diarisation: Path) -> Awaitable[DataFrame]:
    transcription = pl.scan_ndjson(transcription)
    diarisation = pl.scan_ndjson(diarisation)
```
winstxnhdw commented 5 days ago

The problem is that polars should be non-blocking end-to-end. This should be possible, since reading from IO is exactly where async/await shines.

For now, `collect_async` is redundant in practice; running `collect` in a thread pool and wrapping the future has the same effect. I suggest we add this to the documentation to reduce the amount of time wasted on this issue.