tensorflow / io

Dataset, streaming, and file system extensions maintained by TensorFlow SIG-IO
Apache License 2.0

Exception when not fully consuming generator/iterator that feeds into ArrowStreamDataset #1792

Status: Open · kszlim opened this issue 1 year ago

kszlim commented 1 year ago

Repro code:

import tensorflow as tf
import tensorflow_io.arrow as arrow_io
import pyarrow as pa

if __name__ == "__main__":
    data = pa.array(list(range(5000)), type=pa.float32())
    names = ["data"]
    # Lazily yield 100 identical record batches of 5000 rows each.
    batches = (pa.RecordBatch.from_arrays([data], names=names) for _ in range(100))
    ds = arrow_io.ArrowStreamDataset.from_record_batches(
        batches,
        output_types=(tf.float32,),
        batch_size=5000,
        batch_mode="drop_remainder",
    )
    # Only the first 10 batches are consumed; the generator is never exhausted.
    for sample in ds.take(10):
        print(sample)

Exception in thread Thread-1 (run_server):
Traceback (most recent call last):
  File ".../venv/lib/python3.10/site-packages/tensorflow_io/python/ops/arrow_dataset_ops.py", line 572, in run_server
    writer.write_batch(batch)
  File "pyarrow/ipc.pxi", line 484, in pyarrow.lib._CRecordBatchWriter.write_batch
  File "/usr/lib/python3.10/socket.py", line 723, in write
    return self._sock.send(b)
BrokenPipeError: [Errno 32] Broken pipe

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/lib/python3.10/threading.py", line 1016, in _bootstrap_inner
    self.run()
  File "/usr/lib/python3.10/threading.py", line 953, in run
    self._target(*self._args, **self._kwargs)
  File ".../venv/lib/python3.10/site-packages/tensorflow_io/python/ops/arrow_dataset_ops.py", line 578, in run_server
    outfile.close()
  File "/usr/lib/python3.10/socket.py", line 723, in write
    return self._sock.send(b)
BrokenPipeError: [Errno 32] Broken pipe
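The tracebacks suggest that the background `run_server` thread is still writing record batches to a socket when the consumer (here `ds.take(10)`) stops reading and closes its end, and that the `outfile.close()` in its cleanup then fails a second time while flushing buffered bytes. The sketch below is a standard-library-only analogue of that pattern, not tensorflow_io's actual code: `serve_chunks` and `read_then_hang_up` are hypothetical names, raw bytes stand in for Arrow record batches, and `socket.socketpair()` stands in for whatever connection the library really uses. It illustrates one possible way for a producer to treat an early disconnect as a normal end of stream, by catching `ConnectionError` (the common base class of `BrokenPipeError` and `ConnectionResetError`) around both the writes and the final close.

```python
import socket
import threading


def serve_chunks(sock, chunks):
    """Hypothetical producer: write each chunk to `sock` until the iterator is
    exhausted or the reader disconnects.

    Returns True if every chunk was written, False if the reader hung up early.
    """
    outfile = sock.makefile("wb")  # buffered writer over the socket
    completed = False
    try:
        for chunk in chunks:
            outfile.write(chunk)
        outfile.flush()
        completed = True
    except ConnectionError:
        # BrokenPipeError / ConnectionResetError: the reader stopped early
        # (e.g. after take(10)). Treat it as a normal end of the stream.
        pass
    finally:
        try:
            # close() flushes any still-buffered bytes, which can fail again
            # for the same reason (compare the second traceback above).
            outfile.close()
        except ConnectionError:
            pass
        sock.close()
    return completed


def read_then_hang_up(sock, nbytes):
    """Hypothetical consumer: read at most `nbytes`, then close the socket."""
    received = 0
    while received < nbytes:
        data = sock.recv(min(65536, nbytes - received))
        if not data:  # producer closed the stream
            break
        received += len(data)
    sock.close()
    return received


if __name__ == "__main__":
    producer_sock, consumer_sock = socket.socketpair()
    # ~64 MiB: far more than the reader wants or the socket buffers can hold.
    chunks = (bytes(65536) for _ in range(1000))
    result = {}

    def run_producer():
        result["completed"] = serve_chunks(producer_sock, chunks)

    worker = threading.Thread(target=run_producer)
    worker.start()
    received = read_then_hang_up(consumer_sock, 10 * 65536)
    worker.join()
    print(received, result["completed"])  # 655360 False
```

Swallowing the error is only one design choice; an alternative would be for the consumer to signal the producer to stop (for example with a `threading.Event`) before closing its socket, so the producer never attempts the failing write at all.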

kszlim commented 1 year ago

Has anyone else run into this?