apache / arrow

Apache Arrow is a multi-language toolbox for accelerated data interchange and in-memory processing
https://arrow.apache.org/
Apache License 2.0

[Python] BUG: Process hangs indefinitely on UnicodeDecodeError when use_threads=True in pyarrow.csv.read_csv #43892

Open dxdc opened 3 weeks ago

dxdc commented 3 weeks ago

Summary

When pyarrow.csv.read_csv is called with ReadOptions(use_threads=True) and a UnicodeDecodeError is raised, Python hangs indefinitely during interpreter shutdown. The issue reproduces consistently across multiple Python and pyarrow versions.

NOTE: I originally reported this here #43741 but now I have a working file that can be tested.

I hope that someone familiar with the internals of the pyarrow.csv module, particularly with the threading and shutdown procedures, can help identify and resolve this issue.

Steps to Reproduce

  1. Run the following Python script:

    import atexit
    import pyarrow.csv as pv
    
    @atexit.register
    def on_exit():
        print("Program exited successfully.")
    
    # setting use_threads to False does not hang python
    read_options = pv.ReadOptions(encoding="big5", use_threads=True)
    parse_options = pv.ParseOptions(delimiter="|")
    
    with open("sample.txt", "rb") as f:
        try:
            table = pv.read_csv(f, read_options=read_options, parse_options=parse_options)
        except Exception as e:
            print(f"An error occurred: {e}")
            raise
  2. Use a file (sample.txt) that contains data in an encoding (e.g., Big5, Shift-JIS) likely to trigger a UnicodeDecodeError.

    NOTE: Minor edits to this file result in the issue no longer being reproducible.

  3. Observe that the script prints "Program exited successfully." but then hangs indefinitely during the Python shutdown process.
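A candidate sample file can be sanity-checked with the stdlib codec, independent of pyarrow, to confirm it actually triggers the decode error. The bytes below are illustrative only (they mirror the 0x96 byte from the traceback); as noted above, the exact contents needed to reproduce the hang itself are sensitive to minor edits:

```python
# Sanity-check that a byte sequence is rejected by the big5 codec,
# independent of pyarrow. 0x96 mirrors the byte reported in the
# traceback; the exact bytes of a reproducing file may differ.
payload = b"col1|col2\n" + b"\x96"  # 0x96 starts an invalid big5 sequence

try:
    payload.decode("big5")
    print("decoded cleanly")
except UnicodeDecodeError as e:
    print("decode failed:", e.reason)
```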

Expected Behavior

The script should exit cleanly after execution, even if a UnicodeDecodeError occurs.

Actual Behavior

The script hangs indefinitely during the logging shutdown process after encountering a UnicodeDecodeError. This behavior is consistent when use_threads=True is set.

Output

The output includes a traceback ending with a UnicodeDecodeError, followed by a hang during the logging shutdown process. Below is the detailed pdb step trace captured during shutdown:

File "pyarrow/_csv.pyx", line 1261, in pyarrow._csv.read_csv
File "pyarrow/_csv.pyx", line 1270, in pyarrow._csv.read_csv
File "pyarrow/error.pxi", line 155, in pyarrow.lib.pyarrow_internal_check_status
File "pyarrow/error.pxi", line 89, in pyarrow.lib.check_status
File "pyarrow/io.pxi", line 1973, in pyarrow.lib._cb_transform
File "pyarrow/io.pxi", line 2014, in pyarrow.lib.Transcoder.call
UnicodeDecodeError: 'big5' codec can't decode byte 0x96 in position 68: illegal multibyte sequence
Program exited successfully.
--Return--

/path/to/your/script.py(15)on_exit()->None
-> pdb.set_trace()
(Pdb) n
--Call--
/usr/lib/python3.11/logging/__init__.py(2170)shutdown()
-> def shutdown(handlerList=_handlerList):
(Pdb) n
/usr/lib/python3.11/logging/__init__.py(2177)shutdown()
-> print(handlerList)
(Pdb) n
[<weakref at 0x10ac5d210; to '_StderrHandler' at 0x10abf3690>]
/usr/lib/python3.11/logging/__init__.py(2178)shutdown()
-> for wr in reversed(handlerList[:]):
(Pdb) n
/usr/lib/python3.11/logging/__init__.py(2181)shutdown()
-> try:
(Pdb) n
/usr/lib/python3.11/logging/__init__.py(2182)shutdown()
-> h = wr()
(Pdb) n
/usr/lib/python3.11/logging/__init__.py(2183)shutdown()
-> if h:
(Pdb) n
/usr/lib/python3.11/logging/__init__.py(2184)shutdown()
-> try:
(Pdb) n
/usr/lib/python3.11/logging/__init__.py(2185)shutdown()
-> h.acquire()
(Pdb) n
/usr/lib/python3.11/logging/__init__.py(2186)shutdown()
-> h.flush()
(Pdb) n
/usr/lib/python3.11/logging/__init__.py(2187)shutdown()
-> h.close()
(Pdb) n
/usr/lib/python3.11/logging/__init__.py(2195)shutdown()
-> h.release()
(Pdb) n
/usr/lib/python3.11/logging/__init__.py(2178)shutdown()
-> for wr in reversed(handlerList[:]):
(Pdb) n
^C #### if I don't CTRL-C it hangs forever

Environment

Additional Information

Suggested Priority

High - The hang is significant as it prevents Python from exiting cleanly, which could impact various applications relying on pyarrow for multi-threaded CSV processing.
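Until the underlying hang is fixed, one possible mitigation (a sketch, not an official pyarrow recommendation) is to isolate the read in a child process so that a wedged shutdown can be terminated from the parent. Here hung_read is a hypothetical stand-in for a pv.read_csv call that hangs:

```python
# Sketch of a process-isolation workaround: run the risky read in a child
# process so the parent can kill it if it wedges. hung_read() is a
# hypothetical stand-in for a pv.read_csv call that hangs at shutdown.
import multiprocessing as mp
import time


def hung_read():
    time.sleep(60)  # simulate a read whose shutdown never completes


if __name__ == "__main__":
    p = mp.Process(target=hung_read)
    p.start()
    p.join(timeout=0.5)  # real code would use a generous timeout
    if p.is_alive():
        p.terminate()  # the parent stays responsive even if the child hangs
        p.join()
        print("terminated hung reader")
```

The child's result would normally be passed back via a Queue or a file; the point is only that the parent process keeps control of its own shutdown.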

Please let me know if additional information is required.

Component(s)

Python

dxdc commented 4 days ago

@amol- @rok

any chance you're able to take a look at this issue?

It's very simple to reproduce. The simplest case is running this stripped-down script with this file:

import pyarrow.csv as pv

# setting use_threads to False does not hang python
read_options = pv.ReadOptions(encoding="big5", use_threads=True)
parse_options = pv.ParseOptions(delimiter="|")

with open("sample.txt", "rb") as f:
    table = pv.read_csv(f, read_options=read_options, parse_options=parse_options)

There is a bug in the interaction between threads and pyarrow. I now have an additional file I can use for testing on my side, and I'm willing to dig into it if you have a sense of where the issue may lie.

amol- commented 4 days ago

It seems it might have something to do with the Read operation not being properly aborted.

The TransformInputStream::Read method doesn't do anything special to handle the case where the transformer has failed, so it isn't immediately obvious where the read operation would get aborted in that case.

The following patch seemed to fix the issue for me

diff --git a/cpp/src/arrow/io/transform.cc b/cpp/src/arrow/io/transform.cc
index 3fdf5a7a9..a8c40ee53 100644
--- a/cpp/src/arrow/io/transform.cc
+++ b/cpp/src/arrow/io/transform.cc
@@ -102,7 +102,14 @@ Result<int64_t> TransformInputStream::Read(int64_t nbytes, void* out) {
     const bool have_eof = (buf->size() == 0);
     // Even if EOF is met, let the transform function run a last time
     // (for example to flush internal buffers)
-    ARROW_ASSIGN_OR_RAISE(buf, impl_->transform_(std::move(buf)));
+    // Abort the stream before propagating the transform error, and
+    // reassign buf on success so the code below stays valid.
+    auto transform_result = impl_->transform_(std::move(buf));
+    if (!transform_result.ok()) {
+      RETURN_NOT_OK(this->Abort());
+      return transform_result.status();
+    }
+    buf = transform_result.MoveValueUnsafe();
     avail_size += buf->size();
     avail.push_back(std::move(buf));
     if (have_eof) {

but someone who is more confident with the IO part of the codebase should check this in more detail

dxdc commented 4 days ago

@amol- Thanks for your quick analysis. I had a feeling the issue ran deeper in the codebase, and your findings point in that direction.

On my end, setting use_threads=False resolves the issue, so I believe it's rooted in thread management. I did manage to reproduce the error on a particular file that failed intermittently, but unfortunately I no longer have access to it.

amol- commented 4 days ago

On my end, setting use_threads = False seems to resolve the issue, so I believe it's rooted in thread management.

Yes, it does have to do with threading, or rather with threads getting hung up waiting for some async future to complete. Since you can't exactly kill threads, on shutdown the thread pool has to politely ask the threads to quit, but if a thread is stuck in some syscall (like waiting on a mutex) it will never notice that it should quit and will hang there forever.
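The mechanism described above can be illustrated in pure Python: a thread blocked on a lock never gets a chance to observe a shutdown request with a deadline. (The thread is marked as a daemon here only so the demo itself can exit.)

```python
# Minimal illustration of the hang mechanism: a thread blocked on a mutex
# cannot observe a "please quit" request. Marked daemon so this demo exits.
import threading

lock = threading.Lock()
lock.acquire()  # held by the main thread forever


def worker():
    lock.acquire()  # blocks indefinitely, like a thread stuck in a syscall


t = threading.Thread(target=worker, daemon=True)
t.start()
t.join(timeout=0.5)  # a polite shutdown request with a deadline
print("worker still alive:", t.is_alive())
```

In the non-daemon case (which is what a native thread pool resembles), this blocked thread is exactly what keeps the process from ever exiting.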

Yesterday evening I didn't have time to investigate more closely the relationship between the components involved in async reading and the thread pool, but I'll get back to it as soon as I can.