apache / arrow

Apache Arrow is the universal columnar format and multi-language toolbox for fast data interchange and in-memory analytics
https://arrow.apache.org/
Apache License 2.0
14.49k stars 3.52k forks source link

[Python][C++] Segfault reading bz2 compressed csv file with CSVStreamingReader #43604

Open jpfeuffer opened 2 months ago

jpfeuffer commented 2 months ago

Describe the bug, including details regarding any error messages, version, and platform.

I am getting reproducible segfaults when trying to close a CSVWriter in the simplest code possible.

import pyarrow as pa
from pyarrow import csv
import argparse

parser = argparse.ArgumentParser(description="Converts a potentially bz2/gz compressed csv file to a tsv/smi file with no header and just 'smiles' and 'id' columns.")
parser.add_argument("input_file", help="Path to the input bz2 file")
parser.add_argument("output_file", help="Path to the output csv file")
parser.add_argument("--chunksize", type=int, default=2**30, help="Size of block to process at a time")
parser.add_argument("--num_chunks", type=int, default=0, help="Number of chunks to process")

args = parser.parse_args()

input_file = args.input_file
output_file = args.output_file
chunksize = args.chunksize

readopt = csv.ReadOptions(block_size=chunksize)
parseopt = csv.ParseOptions(delimiter="\t")
convertopt = csv.ConvertOptions(column_types={"smiles": pa.string(), "id": pa.string()}, include_columns=["smiles", "id"])

reader = csv.open_csv(
    input_file,
    read_options=readopt,
    parse_options=parseopt,
    convert_options=convertopt
)

writeopt = csv.WriteOptions(include_header=False, delimiter="\t", quoting_style="none")
# schema is just smiles and id column, both strings
schema = pa.schema([
    ('smiles', pa.string()),
    ('id', pa.string())
])

with csv.CSVWriter(sink=output_file, schema=schema, write_options=writeopt) as writer:

    cnt = 0
    while cnt < args.num_chunks or args.num_chunks == 0:
        try:
            writer.write_batch(reader.read_next_batch())
            cnt += 1
            print("Finished chunk ", cnt)
        except StopIteration:
            break

    print("Done. Closing file.")

Output for num_chunks = 2 e.g.: It hangs for a long time after "Closing file."

Finished chunk  1
Finished chunk  2
Done. Closing file.
Segmentation fault (core dumped)

linux aarch64, pyarrow 17 (tried both pip and conda-forge), also tried many combinations of filesystem, pa.OSFile, batch_sizes, closing manually vs contexts, etc. My input file is b2zipped.

The culprit seems to be the limiting of the number of chunks that I added for testing. Writing ALL chunks seems to work.

Component(s)

Python

jorisvandenbossche commented 2 months ago

@jpfeuffer thanks for the report!

Could you also share a (snippet of) the csv file (or a similar one with dummy data) to see if we can reproduce this?

jpfeuffer commented 2 months ago

Unfortunately nothing small enough to upload. And since it is not working I cannot produce a subset easily. (Not sure a 22 MB subset for GitHub would be very representative either).

Data is from here. Warning 15GB. https://ftp.enamine.net/download/REAL/Enamine_REAL_HAC_11_21_666M_CXSMILES.cxsmiles.bz2

jpfeuffer commented 2 months ago

By the way, I just saw that even if I do not limit the chunks (and the process succeeds without segfault), the written file is always corrupt statrting from a few thousand lines in, with lines being mixed together etc.

Is something wrong with my code? Regular decompression works fine.

jpfeuffer commented 2 months ago

Hmm so I found the "error". If I use exactly the schema from the first read batch, it works (no segfaults, no garbled output).

cnt = 0
writer_init = False
while cnt < args.num_chunks or args.num_chunks == 0:
    try:
        b = reader.read_next_batch()
        if not writer_init:
            print(b.schema)
            writer = csv.CSVWriter(
                output_file,
                write_options=writeopt,
                schema=b.schema
            )
        writer.write_batch(b)
        cnt += 1
        print("Finished chunk ", cnt)
    except StopIteration:
        break
print("Done. Closing file.")
if writer_init:
    writer.close()

But if I print the schema:

smiles: string
id: string

So, what is different from manually specifying the schema like I did in the beginning?

schema = pa.schema([
    ('smiles', pa.string()),
    ('id', pa.string())
])
jorisvandenbossche commented 2 months ago

@jpfeuffer I was able to reproduce the segfault with the big file you provided. Running with gdb gives the following backtrace:

``` Finished chunk 1 Finished chunk 2 Finished chunk 3 Finished chunk 4 Done. Closing file. [Thread 0x7fff49c8f640 (LWP 911090) exited] Thread 22 "python" received signal SIGSEGV, Segmentation fault. [Switching to Thread 0x7fff4a7ff640 (LWP 911089)] 0x00007ffff604f87d in arrow::internal::Executor::Spawn, arrow::Future >, arrow::Result > >(arrow::Future >, bool)::{lambda(arrow::Result > const&)#2}::operator()(arrow::Result > const&)::{lambda()#1}>(arrow::internal::Executor::DoTransfer, arrow::Future >, arrow::Result > >(arrow::Future >, bool)::{lambda(arrow::Result > const&)#2}::operator()(arrow::Result > const&)::{lambda()#1}&&) (this=this@entry=0x5555560f7a70, func=...) at /home/joris/scipy/repos/arrow/cpp/src/arrow/util/thread_pool.h:85 85 return SpawnReal(TaskHints{}, std::forward(func), StopToken::Unstoppable(), (gdb) bt #0 0x00007ffff604f87d in arrow::internal::Executor::Spawn, arrow::Future >, arrow::Result > >(arrow::Future >, bool)::{lambda(arrow::Result > const&)#2}::operator()(arrow::Result > const&)::{lambda()#1}>(arrow::internal::Executor::DoTransfer, arrow::Future >, arrow::Result > >(arrow::Future >, bool)::{lambda(arrow::Result > const&)#2}::operator()(arrow::Result > const&)::{lambda()#1}&&) (this=this@entry=0x5555560f7a70, func=...) at /home/joris/scipy/repos/arrow/cpp/src/arrow/util/thread_pool.h:85 #1 0x00007ffff604fb83 in arrow::internal::Executor::DoTransfer, arrow::Future >, arrow::Result > >(arrow::Future >, bool)::{lambda(arrow::Result > const&)#2}::operator()(arrow::Result > const&) (result=..., __closure=0x7fff3c008778) at /home/joris/scipy/repos/arrow/cpp/src/arrow/util/thread_pool.h:247 #2 arrow::Future >::WrapResultOnComplete::Callback, arrow::Future >, arrow::Result > >(arrow::Future >, bool)::{lambda(arrow::Result > const&)#2}>::operator()(arrow::FutureImpl const&) && ( impl=..., this=0x7fff3c008778) at /home/joris/scipy/repos/arrow/cpp/src/arrow/util/future.h:442 #3 arrow::internal::FnOnce::FnImpl >::WrapResultOnComplete::Callback, arrow::Future >, arrow::Result > >(arrow::Future >, bool)::{lambda(arrow::Result > const&)#2}> >::invoke(arrow::FutureImpl const&) (this=0x7fff3c008770, a#0=...) at /home/joris/scipy/repos/arrow/cpp/src/arrow/util/functional.h:152 #4 0x00007ffff6283c01 in arrow::internal::FnOnce::operator()(arrow::FutureImpl const&) && (a#0=..., this=0x7fff3c009980) at /home/joris/conda/envs/arrow-dev/x86_64-conda-linux-gnu/include/c++/12.3.0/bits/unique_ptr.h:191 #5 arrow::ConcreteFutureImpl::RunOrScheduleCallback (in_add_callback=, callback_record=..., self=...) at /home/joris/scipy/repos/arrow/cpp/src/arrow/util/future.cc:110 #6 arrow::ConcreteFutureImpl::RunOrScheduleCallback (self=..., callback_record=..., in_add_callback=) at /home/joris/scipy/repos/arrow/cpp/src/arrow/util/future.cc:100 #7 0x00007ffff6283fd5 in arrow::ConcreteFutureImpl::DoMarkFinishedOrFailed (this=, state=) at /home/joris/scipy/repos/arrow/cpp/src/arrow/util/future.cc:148 #8 0x00007ffff628428a in arrow::ConcreteFutureImpl::DoMarkFinished (this=) at /home/joris/scipy/repos/arrow/cpp/src/arrow/util/future.cc:39 #9 0x00007ffff604ed8c in arrow::Future >::DoMarkFinished (res=arrow::Result>(...), this=0x7fff4a7fecb0) at /home/joris/scipy/repos/arrow/cpp/src/arrow/util/future.h:658 #10 arrow::Future >::MarkFinished (this=this@entry=0x7fff4a7fecb0, res=arrow::Result>(...)) at /home/joris/scipy/repos/arrow/cpp/src/arrow/util/future.h:403 #11 0x00007ffff60563d6 in arrow::BackgroundGenerator >::WorkerTask (state=...) at /home/joris/scipy/repos/arrow/cpp/src/arrow/util/async_generator.h:1797 #12 0x00007ffff605686c in arrow::BackgroundGenerator >::State::DoRestartTask(std::shared_ptr >::State>, arrow::util::Mutex::Guard)::{lambda()#1}::operator()() const (__closure=) at /home/joris/scipy/repos/arrow/cpp/src/arrow/util/async_generator.h:1666 #13 arrow::internal::FnOnce::FnImpl >::State::DoRestartTask(std::shared_ptr >::State>, arrow::util::Mutex::Guard)::{lambda()#1}>::invoke() (this=) at /home/joris/scipy/repos/arrow/cpp/src/arrow/util/functional.h:152 #14 0x00007ffff62c568c in arrow::internal::FnOnce::operator()() && (this=0x7fff4a7fedc0) at /home/joris/conda/envs/arrow-dev/x86_64-conda-linux-gnu/include/c++/12.3.0/bits/unique_ptr.h:191 #15 arrow::internal::WorkerLoop (it=..., state=...) at /home/joris/scipy/repos/arrow/cpp/src/arrow/util/thread_pool.cc:457 #16 operator() (__closure=) at /home/joris/scipy/repos/arrow/cpp/src/arrow/util/thread_pool.cc:618 #17 std::__invoke_impl > (__f=...) at /home/joris/conda/envs/arrow-dev/x86_64-conda-linux-gnu/include/c++/12.3.0/bits/invoke.h:61 #18 std::__invoke > (__fn=...) at /home/joris/conda/envs/arrow-dev/x86_64-conda-linux-gnu/include/c++/12.3.0/bits/invoke.h:96 #19 std::thread::_Invoker > >::_M_invoke<0> (this=) at /home/joris/conda/envs/arrow-dev/x86_64-conda-linux-gnu/include/c++/12.3.0/bits/std_thread.h:279 #20 std::thread::_Invoker > >::operator() (this=) at /home/joris/conda/envs/arrow-dev/x86_64-conda-linux-gnu/include/c++/12.3.0/bits/std_thread.h:286 #21 std::thread::_State_impl > > >::_M_run(void) (this=) at /home/joris/conda/envs/arrow-dev/x86_64-conda-linux-gnu/include/c++/12.3.0/bits/std_thread.h:231 #22 0x00007ffff4cf0e95 in std::execute_native_thread_routine (__p=) at ../../../../../libstdc++-v3/src/c++11/thread.cc:104 #23 0x00007ffff7c94ac3 in start_thread (arg=) at ./nptl/pthread_create.c:442 #24 0x00007ffff7d26850 in clone3 () at ../sysdeps/unix/sysv/linux/x86_64/clone3.S:81 ```

If I unpack the archive, and read the csv file directly (so without the automatic decompression), it doesn't segfault.

Based on that, trying to create a simpler reproducer with some generated data:

import pyarrow as pa
from pyarrow import csv
from pyarrow.tests.util import rands

size = 500_000
random_strings = [rands(10) for _ in range(size//100)]*100
table = pa.table({"col1": range(size), "col2": random_strings})

with pa.CompressedOutputStream("data.csv.bz2", "bz2") as out:
    csv.write_csv(table, out)

Reading this compressed file with a similar script as your original file:

import pyarrow as pa
from pyarrow import csv

input_file = "data.csv.bz2"
output_file = "test_out.csv"
num_chunks = 2

schema = pa.schema([('col1', pa.string()), ('col2', pa.string())])
convertopt = csv.ConvertOptions(column_types=schema)

with csv.open_csv(input_file, convert_options=convertopt) as reader:
    with csv.CSVWriter(output_file, schema=schema) as writer:

        cnt = 0
        while cnt < num_chunks or num_chunks == 0:
            try:
                writer.write_batch(reader.read_next_batch())
                cnt += 1
                print("Finished chunk ", cnt)
            except StopIteration:
                break

        print("Done. Closing file.")

also segfaults (similarly, if I increase the num_chunks so that the file is read until the end, it doesn't segfault).

jpfeuffer commented 2 months ago

That's great (I guess 😅). Btw: Did you see my comment with the update? It seems like it has something to do with the schema used.

jorisvandenbossche commented 2 months ago

Yes, was still trying to reproduce that observation ;) Now, as you mentioned, the schema is exactly the same, so it should be something else in the restructured code that causes the change.

By the way, I just saw that even if I do not limit the chunks (and the process succeeds without segfault), the written file is always corrupt statrting from a few thousand lines in, with lines being mixed together etc.

That's not something I see with my reproducer above, I think (with mixed together, you mean that there are newline markers missing?)

jorisvandenbossche commented 2 months ago

By the way, I just saw that even if I do not limit the chunks (and the process succeeds without segfault), the written file is always corrupt statrting from a few thousand lines in, with lines being mixed together etc.

That's not something I see with my reproducer above, I think (with mixed together, you mean that there are newline markers missing?)

Also, with the original file, after how rows/chunks do you see this? (to see if I can reproduce this part with the original file)

jpfeuffer commented 2 months ago

By the way, I just saw that even if I do not limit the chunks (and the process succeeds without segfault), the written file is always corrupt statrting from a few thousand lines in, with lines being mixed together etc.

That's not something I see with my reproducer above, I think (with mixed together, you mean that there are newline markers missing?)

Mmh I am not sure if it was 'just' missing newlines. It felt like it was even more garbled as if the concurrent streams are not flushed at the right time.

It was about after 1500 rows. I think if you do a few chunks and then tail the file you should see it.

jorisvandenbossche commented 2 months ago

With the original bz2 file, reading 2 chunks with a 2**30 blocksize (as in your original example), and then running tail -n 50 test_out.csv gives:

``` CCNC(=S)NCCCOCC1CCCO1 m_8____134546____83696 O=C(NCCCOCC1CCCO1)C(Br)Br m_22____4836780____20825978 O=C(NCCCOCC1CCCO1)C(Cl)Cl m_22____4836780____20853970 CCC(CC)NCCCOCC1CCCO1 m_270004____8288964____8798310 N#CCC(=O)NCCCOCC1CCCO1 m_22____4836780____8794444 CC(NCCCOCC1CCCO1)C1CC1 m_269862____7193234____7200230 COCC(=O)NCCCOCC1CCCO1 m_527____154721____157834 CC(C)C(C)NCCCOCC1CCCO1 m_270004____8288964____8334612 C(CNC1CCSC1)COCC1CCCO1 m_270004____8288964____8157326 CCS(=O)(=O)NCCCOCC1CCCO1 m_40____134551____6289440 CN(C)C(=O)NCCCOCC1CCCO1 m_68____1082804____22160914 CCNC(N)=NCCCOCC1CCCO1 m_264822____3500278____3499528 CNC(=NCCCOCC1CCCO1)NC m_264822____3500278____3499530 NC(=O)CCNCCCOCC1CCCO1 m_273610____11522760____11515976 CSCC(=O)NCCCOCC1CCCO1 m_527____154721____237535 CCC(O)CNCCCOCC1CCCO1 m_273610____11522760____11516276 CCCC(C)NCCCOCC1CCCO1 m_269862____7193234____7200560 CN(C)C(N)=NCCCOCC1CCCO1 m_264822____3500278____3499612 NCCC(=O)NCCCOCC1CCCO1 m_240690____7353008____3025680 CCC(C)CNCCCOCC1CCCO1 m_270004____8288964____9139664 CC(N)C(=O)NCCCOCC1CCCO1 m_240690____7353008____3025692 O=C(CCBr)NCCCOCC1CCCO1 m_270062____7616918____20844442 O=C(CCO)NCCCOCC1CCCO1 m_487____151097____12879784 O=S(=O)(CCl)NCCCOCC1CCCO1 m_40____134551____10300412 CC(C)(C)CNCCCOCC1CCCO1 m_270004____8288964____7548792 NC(=O)C(=O)NCCCOCC1CCCO1 m_22____4836780____12053348 NOCC(=O)NCCCOCC1CCCO1 m_240690____7353008____13151248 CC(CCO)NCCCOCC1CCCO1 m_270004____8288964____13397824 CNCC(=O)NCCCOCC1CCCO1 m_240690____7353008____7368582 CSCC(C)NCCCOCC1CCCO1 m_270004____8288964____15781876 C=CCCCNCCCOCC1CCCO1 m_270004____8288964____17507834 COCC(C)NCCCOCC1CCCO1 m_270004____8288964____8844972 C=C(Cl)C(=O)NCCCOCC1CCCO1 m_22____4836780____20839824 C1=NN=C(NCCCOCC2CCCO2)S1 s_27____134549____6894080 ClCCCCNCCCOCC1CCCO1 m_270004____8288964____24721168 CCNC(=O)NCCCOCC1CCCO1 m_2554____899626____951600 C(CNC1CCNC1)COCC1CCCO1 m_271302____10888820____10888454 NC1CC(NCCCOCC2CCCO2)C1 m_271302____10888820____15791668 C(CNC1=NCCN1)COCC1CCCO1 m_58668____2550466____2679600 C(CNCC1CCC1)COCC1CCCO1 m_270004____8288964____24715052 C[C@@H](O)C(=O)NCCCOCC1CCCO1 m_22____4836780____17541882 C(CNC1CCOC1)COCC1CCCO1 m_270004____8288964____8157394 C(CNCC1CNC1)COCC1CCCO1 m_271302____10888820____25575958 C1=CCC(NCCCOCC2CCCO2)C1 m_207____134553____26295706 [2H]C([2H])([2H])N(C)C(=O)NCCCOCC1CCCO1 m_2708____906386____23466438 C(CNCC1CCN1)COCC1CCCO1 m_271302____10888820____25576026 C1=COC(NCCCOCC2CCCO2)=N1 s_27____134549____11243710 CC(C)(N)CNCCCOCC1CCCO1 m_271302____10888820____8904400 NC1(CNCCCOCC2CCCO2)CC1 m_271302____10888820____25579862 CNS(=O)(=O)NCCCOCC1CCCO1 m_40____134551____9420296 ```

Not being familiar with the file, but that looks OK?

Also, do you see that issue with garbled written file with the simplified reproducer with generated data that I posted above?

jorisvandenbossche commented 2 months ago

(FWIW, the incremental open_csv function is single threaded, see https://arrow.apache.org/docs/python/csv.html#incremental-reading, so there should be no concurrent streams)

jorisvandenbossche commented 2 months ago

I also still see the segfault when using the restructured read/write code from https://github.com/apache/arrow/issues/43604#issuecomment-2275356020 (using the schema of the first batch to create the writer)

jorisvandenbossche commented 2 months ago

Further simplication (no need to also write the data out, just reading a subset is enough to trigger the segfault):

Generate some data:

import pyarrow as pa
from pyarrow import csv
from pyarrow.tests.util import rands

size = 500_000
random_strings = [rands(10) for _ in range(size//100)]*100
table = pa.table({"col1": range(size), "col2": random_strings})

with pa.CompressedOutputStream("data.csv.bz2", "bz2") as out:
    csv.write_csv(table, out)

Read part of the file:

import pyarrow as pa
from pyarrow import csv

input_file = "data.csv.bz2"
output_file = "test_out.csv"
num_chunks = 2

schema = pa.schema([('col1', pa.string()), ('col2', pa.string())])
convertopt = csv.ConvertOptions(column_types=schema)

with csv.open_csv(input_file, convert_options=convertopt) as reader:
    cnt = 0
    while cnt < num_chunks or num_chunks == 0:
        try:
            batch = reader.read_next_batch()
            cnt += 1
            print("Finished reading chunk ", cnt)
        except StopIteration:
            break

    print("Done. Closing file.")

Backtrace from running the above with gdb:

``` Finished reading chunk 1 Finished reading chunk 2 Done. Closing file. [Thread 0x7fff49c8f640 (LWP 1033591) exited] Thread 22 "python" received signal SIGSEGV, Segmentation fault. [Switching to Thread 0x7fff4a7ff640 (LWP 1033590)] 0x00007ffff604f87d in arrow::internal::Executor::Spawn, arrow::Future >, arrow::Result > >(arrow::Future >, bool)::{lambda(arrow::Result > const&)#2}::operator()(arrow::Result > const&)::{lambda()#1}>(arrow::internal::Executor::DoTransfer, arrow::Future >, arrow::Result > >(arrow::Future >, bool)::{lambda(arrow::Result > const&)#2}::operator()(arrow::Result > const&)::{lambda()#1}&&) (this=this@entry=0x5555560eec40, func=...) at /home/joris/scipy/repos/arrow/cpp/src/arrow/util/thread_pool.h:85 85 return SpawnReal(TaskHints{}, std::forward(func), StopToken::Unstoppable(), (gdb) bt #0 0x00007ffff604f87d in arrow::internal::Executor::Spawn, arrow::Future >, arrow::Result > >(arrow::Future >, bool)::{lambda(arrow::Result > const&)#2}::operator()(arrow::Result > const&)::{lambda()#1}>(arrow::internal::Executor::DoTransfer, arrow::Future >, arrow::Result > >(arrow::Future >, bool)::{lambda(arrow::Result > const&)#2}::operator()(arrow::Result > const&)::{lambda()#1}&&) (this=this@entry=0x5555560eec40, func=...) at /home/joris/scipy/repos/arrow/cpp/src/arrow/util/thread_pool.h:85 #1 0x00007ffff604fb83 in arrow::internal::Executor::DoTransfer, arrow::Future >, arrow::Result > >(arrow::Future >, bool)::{lambda(arrow::Result > const&)#2}::operator()(arrow::Result > const&) (result=..., __closure=0x7fff3c006e88) at /home/joris/scipy/repos/arrow/cpp/src/arrow/util/thread_pool.h:247 #2 arrow::Future >::WrapResultOnComplete::Callback, arrow::Future >, arrow::Result > >(arrow::Future >, bool)::{lambda(arrow::Result > const&)#2}>::operator()(arrow::FutureImpl const&) && ( impl=..., this=0x7fff3c006e88) at /home/joris/scipy/repos/arrow/cpp/src/arrow/util/future.h:442 #3 arrow::internal::FnOnce::FnImpl >::WrapResultOnComplete::Callback, arrow::Future >, arrow::Result > >(arrow::Future >, bool)::{lambda(arrow::Result > const&)#2}> >::invoke(arrow::FutureImpl const&) (this=0x7fff3c006e80, a#0=...) at /home/joris/scipy/repos/arrow/cpp/src/arrow/util/functional.h:152 #4 0x00007ffff6283c01 in arrow::internal::FnOnce::operator()(arrow::FutureImpl const&) && (a#0=..., this=0x7fff3c006170) at /home/joris/conda/envs/arrow-dev/x86_64-conda-linux-gnu/include/c++/12.3.0/bits/unique_ptr.h:191 #5 arrow::ConcreteFutureImpl::RunOrScheduleCallback (in_add_callback=, callback_record=..., self=...) at /home/joris/scipy/repos/arrow/cpp/src/arrow/util/future.cc:110 #6 arrow::ConcreteFutureImpl::RunOrScheduleCallback (self=..., callback_record=..., in_add_callback=) at /home/joris/scipy/repos/arrow/cpp/src/arrow/util/future.cc:100 #7 0x00007ffff6283fd5 in arrow::ConcreteFutureImpl::DoMarkFinishedOrFailed (this=, state=) at /home/joris/scipy/repos/arrow/cpp/src/arrow/util/future.cc:148 #8 0x00007ffff628428a in arrow::ConcreteFutureImpl::DoMarkFinished (this=) at /home/joris/scipy/repos/arrow/cpp/src/arrow/util/future.cc:39 #9 0x00007ffff604ed8c in arrow::Future >::DoMarkFinished (res=arrow::Result>(...), this=0x7fff4a7fecb0) at /home/joris/scipy/repos/arrow/cpp/src/arrow/util/future.h:658 #10 arrow::Future >::MarkFinished (this=this@entry=0x7fff4a7fecb0, res=arrow::Result>(...)) at /home/joris/scipy/repos/arrow/cpp/src/arrow/util/future.h:403 #11 0x00007ffff60563d6 in arrow::BackgroundGenerator >::WorkerTask (state=...) at /home/joris/scipy/repos/arrow/cpp/src/arrow/util/async_generator.h:1797 #12 0x00007ffff605686c in arrow::BackgroundGenerator >::State::DoRestartTask(std::shared_ptr >::State>, arrow::util::Mutex::Guard)::{lambda()#1}::operator()() const (__closure=) at /home/joris/scipy/repos/arrow/cpp/src/arrow/util/async_generator.h:1666 #13 arrow::internal::FnOnce::FnImpl >::State::DoRestartTask(std::shared_ptr >::State>, arrow::util::Mutex::Guard)::{lambda()#1}>::invoke() (this=) at /home/joris/scipy/repos/arrow/cpp/src/arrow/util/functional.h:152 #14 0x00007ffff62c568c in arrow::internal::FnOnce::operator()() && (this=0x7fff4a7fedc0) at /home/joris/conda/envs/arrow-dev/x86_64-conda-linux-gnu/include/c++/12.3.0/bits/unique_ptr.h:191 #15 arrow::internal::WorkerLoop (it=..., state=...) at /home/joris/scipy/repos/arrow/cpp/src/arrow/util/thread_pool.cc:457 #16 operator() (__closure=) at /home/joris/scipy/repos/arrow/cpp/src/arrow/util/thread_pool.cc:618 #17 std::__invoke_impl > (__f=...) at /home/joris/conda/envs/arrow-dev/x86_64-conda-linux-gnu/include/c++/12.3.0/bits/invoke.h:61 #18 std::__invoke > (__fn=...) at /home/joris/conda/envs/arrow-dev/x86_64-conda-linux-gnu/include/c++/12.3.0/bits/invoke.h:96 #19 std::thread::_Invoker > >::_M_invoke<0> (this=) at /home/joris/conda/envs/arrow-dev/x86_64-conda-linux-gnu/include/c++/12.3.0/bits/std_thread.h:279 #20 std::thread::_Invoker > >::operator() (this=) at /home/joris/conda/envs/arrow-dev/x86_64-conda-linux-gnu/include/c++/12.3.0/bits/std_thread.h:286 #21 std::thread::_State_impl > > >::_M_run(void) (this=) at /home/joris/conda/envs/arrow-dev/x86_64-conda-linux-gnu/include/c++/12.3.0/bits/std_thread.h:231 #22 0x00007ffff4cf0e95 in std::execute_native_thread_routine (__p=) at ../../../../../libstdc++-v3/src/c++11/thread.cc:104 #23 0x00007ffff7c94ac3 in start_thread (arg=) at ./nptl/pthread_create.c:442 #24 0x00007ffff7d26850 in clone3 () at ../sysdeps/unix/sysv/linux/x86_64/clone3.S:81 ```

The backtrace points to the threadpool / task spawning code (not very familiar with this part).

cc @pitrou

pitrou commented 2 months ago

Ok, I looked a bit into this and it's quite complex.

The crash happens at shutdown after the CSV streaming reader was destroyed. Since the streaming reader does some readahead by default (ReadOptions.use_threads = true), some read tasks are still running on the thread pool. This is driven by the ReadaheadGenerator which doesn't appear to be able to cleanup when all references to it are lost. @westonpace

pitrou commented 2 months ago

Hmm, I might have a fix actually.