pola-rs / polars

Dataframes powered by a multithreaded, vectorized query engine, written in Rust
https://docs.pola.rs

`write_parquet` followed by `read_parquet` may lead to a deadlock #19380

Open MTandHJ opened 1 month ago

MTandHJ commented 1 month ago


Reproducible example

```python
# (excerpt from a larger dataset class)
# each chunk is written to its own parquet file ...
                    chunk.write_parquet(
                        os.path.join(
                            path,
                            self.DEFAULT_PARQUET_FILE.format(chunk=k)
                        )
                    )
        # ... and then immediately read back, chunk by chunk
        for k in range(num_chunks):
            parquet_file = path.format(chunk=k)
            yield pl.read_parquet(
                parquet_file, columns=columns
            )
```

Log output

None.

Issue description

I use polars to preprocess .csv data and write it into parquet files. However, I found that if I immediately read such a file back (using read_parquet), the process hangs when multiprocess sampling (num_workers > 0) is used in a torch DataLoader. Moreover, when I restart the script (so the data no longer needs to be processed), loading works normally. Hence, maybe something started by write_parquet is not completely terminated?

Thanks!

Expected behavior

None.

Installed versions

```
--------Version info---------
Polars:               1.9.0
Index type:           UInt32
Platform:             Linux-5.15.0-118-generic-x86_64-with-glibc2.31
Python:               3.9.18 (main, Sep 11 2023, 13:41:44) [GCC 11.2.0]

----Optional dependencies----
adbc_driver_manager   <not installed>
altair                <not installed>
cloudpickle           <not installed>
connectorx            <not installed>
deltalake             <not installed>
fastexcel             <not installed>
fsspec                2023.10.0
gevent                <not installed>
great_tables          <not installed>
matplotlib            3.8.3
nest_asyncio          1.6.0
numpy                 1.22.4
openpyxl              <not installed>
pandas                2.2.1
pyarrow               <not installed>
pydantic              <not installed>
pyiceberg             <not installed>
sqlalchemy            <not installed>
torch                 2.2.2
xlsx2csv              <not installed>
xlsxwriter            <not installed>
```
coastalwhite commented 1 month ago

Can you provide a somewhat reproducible example of this happening?

MTandHJ commented 1 month ago

The following code hangs:

```python
import torchdata.datapipes as dp
import polars as pl
from torchdata.dataloader2 import DataLoader2, MultiProcessingReadingService

class DataSet(dp.iter.IterDataPipe):

    def __init__(self):
        super().__init__()

        df = pl.DataFrame(
            {
                'a': range(10),
                'b': range(10),
                'c': range(10),
            }
        )

        df.write_parquet("test.parquet")
        print(">>> Save parquet")

    def __iter__(self):
        df = pl.read_parquet("test.parquet")
        print(">>> Load parquet")
        for row in df.iter_rows():
            print(">>> Yield row")
            yield row

dataloader = DataLoader2(
    DataSet(),
    reading_service=MultiProcessingReadingService(num_workers=4)
)

for row in dataloader:
    print(row)
```

Outputs:

```
>>> Save parquet
```

(the process then hangs)

The following code runs normally:

```python
import torchdata.datapipes as dp
import polars as pl
from torchdata.dataloader2 import DataLoader2, MultiProcessingReadingService

class DataSet(dp.iter.IterDataPipe):

    def __init__(self):
        super().__init__()

        df = pl.DataFrame(
            {
                'a': range(10),
                'b': range(10),
                'c': range(10),
            }
        )

        # test.parquet is already saved
        # df.write_parquet("test.parquet")
        # print(">>> Save parquet")

    def __iter__(self):
        df = pl.read_parquet("test.parquet")
        print(">>> Load parquet")
        for row in df.iter_rows():
            print(">>> Yield row")
            yield row

dataloader = DataLoader2(
    DataSet(),
    reading_service=MultiProcessingReadingService(num_workers=4)
)

for row in dataloader:
    print(row)
```

Outputs:

```
>>> Load parquet
>>> Yield row
>>> Yield row
>>> Yield row
>>> Yield row
>>> Yield row
>>> Yield row
>>> Yield row
>>> Load parquet
>>> Yield row
>>> Yield row
>>> Yield row
>>> Load parquet
>>> Load parquet
>>> Yield row
>>> Yield row
>>> Yield row
>>> Yield row
>>> Yield row
>>> Yield row
>>> Yield row
>>> Yield row
>>> Yield row
>>> Yield row
>>> Yield row
>>> Yield row
>>> Yield row
>>> Yield row
>>> Yield row
>>> Yield row
>>> Yield row
>>> Yield row
>>> Yield row
>>> Yield row
>>> Yield row
>>> Yield row
>>> Yield row
>>> Yield row
>>> Yield row
>>> Yield row
>>> Yield row
>>> Yield row
>>> Yield row
>>> Yield row
(0, 0, 0)
(0, 0, 0)
(0, 0, 0)
(0, 0, 0)
(1, 1, 1)
(1, 1, 1)
(1, 1, 1)
(1, 1, 1)
(2, 2, 2)
(2, 2, 2)
(2, 2, 2)
(2, 2, 2)
(3, 3, 3)
(3, 3, 3)
(3, 3, 3)
(3, 3, 3)
(4, 4, 4)
(4, 4, 4)
(4, 4, 4)
(4, 4, 4)
(5, 5, 5)
(5, 5, 5)
(5, 5, 5)
(5, 5, 5)
(6, 6, 6)
(6, 6, 6)
(6, 6, 6)
(6, 6, 6)
(7, 7, 7)
(7, 7, 7)
(7, 7, 7)
(7, 7, 7)
(8, 8, 8)
(8, 8, 8)
(8, 8, 8)
(8, 8, 8)
(9, 9, 9)
(9, 9, 9)
(9, 9, 9)
(9, 9, 9)
```
MTandHJ commented 1 month ago

> Can you provide a somewhat reproducible example of this happening?

Besides, if num_workers is set to 0:

```python
dataloader = DataLoader2(
    DataSet(),
    reading_service=MultiProcessingReadingService(num_workers=0)
)
```

Outputs:

```
>>> Load parquet
>>> Yield row
(0, 0, 0)
>>> Yield row
(1, 1, 1)
>>> Yield row
(2, 2, 2)
>>> Yield row
(3, 3, 3)
>>> Yield row
(4, 4, 4)
>>> Yield row
(5, 5, 5)
>>> Yield row
(6, 6, 6)
>>> Yield row
(7, 7, 7)
>>> Yield row
(8, 8, 8)
>>> Yield row
(9, 9, 9)
```
ritchie46 commented 1 month ago

You are using multiprocessing. That is likely the source of your deadlocks:

https://docs.pola.rs/user-guide/misc/multiprocessing/
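For reference, the fix that page recommends is to start worker processes with the "spawn" method rather than fork, so children begin from a clean interpreter state instead of inheriting copies of held locks. A minimal sketch of that idea using only the standard library (the `read_rows` helper is made up for illustration):

```python
import multiprocessing

import polars as pl

def read_rows(path: str):
    # Runs in a freshly spawned process, so there is no inherited lock state.
    return pl.read_parquet(path).rows()

if __name__ == "__main__":
    df = pl.DataFrame({"a": range(10), "b": range(10), "c": range(10)})
    df.write_parquet("test.parquet")

    # "spawn" starts each worker from a clean interpreter instead of
    # fork()-copying the parent process (including any held mutexes).
    ctx = multiprocessing.get_context("spawn")
    with ctx.Pool(4) as pool:
        for rows in pool.map(read_rows, ["test.parquet"] * 4):
            print(rows[0])
```

torch.utils.data.DataLoader exposes a multiprocessing_context argument for the same purpose; whether torchdata's MultiProcessingReadingService accepts an equivalent may depend on the version.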

MTandHJ commented 1 month ago

> You are using multiprocessing. That is likely the source of your deadlocks:
>
> https://docs.pola.rs/user-guide/misc/multiprocessing/

Thanks! This explains how the deadlock arises:

> For reading the file with pl.read_parquet the file has to be locked. Then os.fork() is called, copying the state of the parent process, including mutexes. Thus all child processes will copy the file lock in an acquired state, leaving them hanging indefinitely waiting for the file lock to be released, which never happens.
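A tiny standalone illustration of that failure mode (hypothetical and not Polars-specific: a plain threading.Lock held at fork() time stays acquired in the child's copy):

```python
import multiprocessing
import threading

lock = threading.Lock()

def child():
    lock.acquire()  # blocks forever: the inherited copy is already "held"
    print("child acquired the lock")  # never reached

if __name__ == "__main__":
    lock.acquire()  # parent holds the lock...
    # Unix-only: the "fork" start method copies the parent's memory.
    p = multiprocessing.get_context("fork").Process(target=child)
    p.start()       # ...so the child gets the lock in its acquired state
    p.join(timeout=2)
    print("child still hanging:", p.is_alive())  # True: deadlocked
    p.kill()        # clean up the deadlocked child
```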

Somewhat differently, in my case the file lock seems to be caused by write_parquet rather than read_parquet. It would be helpful if the lock state could be manually released after saving (is there any risk in doing so?).

Thanks again!