Open wjzhou opened 6 months ago
Maybe adding a buffer layer here is better: https://arrow.apache.org/docs/python/generated/pyarrow.input_stream.html#pyarrow.input_stream ?
CompressedInputStream just manages the decompression; it doesn't buffer the physical I/O.
@mapleFU Thank you for providing the alternative.
The buffer_size parameter in pyarrow_s3fs.open_input_stream(path_src, buffer_size=10_000_000) will do the same thing as wrapping the stream with pyarrow.input_stream.
My point is, when reading a gz file instead of a plain csv file, the read speed slows down by about 10x. For example, assuming 5 ms latency per request with negligible transfer time, 65K reads max out at about 13 MB/s (65,536 bytes / 0.005 s ≈ 13 MB/s). For me, reading slowed down from around 100 MB/s to around 5 MB/s.
Hmmm, so the input buffer of s3fs is useless? It doesn't do underlying buffering?
The default 65K might be a little too small for today's computers; e.g., when not using a gz file, the default block size for csv streaming is 1 MB.
I agree fixed-size buffering is weird, but I think the CompressedInputStream's buffer size is just the "decompress-input-buffer-size" rather than the "s3-io-size".
This could be added in C++ first, making the chunk size an input argument, e.g. max(kChunkSize, input-chunk-size)? Or maybe try again with an input stream wrapped with a buffer_size: https://arrow.apache.org/docs/python/generated/pyarrow.input_stream.html#pyarrow-input-stream or f = pa.BufferedInputStream(raw, buffer_size=1024*1024)?
Setting buffer_size=10_000_000 explicitly in pyarrow_s3fs.open_input_stream works; it's just that the default is None.
Actually, after thinking about it, I think the actual problem is that when CompressedInputStream is in use, it converts an upper-layer Read(1_000_000, out) into multiple lower-layer calls of Read(64 * 1024, out), e.g.:
Result<int64_t> Read(int64_t nbytes, void* out) {
  ...
  while (nbytes - total_read > 0 && decompressor_has_data) {
    ...
    ARROW_ASSIGN_OR_RAISE(decompressor_has_data, RefillDecompressed());
    ...
  }
}

Result<bool> RefillDecompressed() {
  ...
  RETURN_NOT_OK(EnsureCompressedData());
  ...
}

Status EnsureCompressedData() {
  ...
  raw_->Read(kChunkSize,
             compressed_for_non_zero_copy_->mutable_data_as<void>()));
  // where kChunkSize is 64K
  ...
}
Or maybe try again with an input stream wrapped with a buffer_size:
Once we know the reason for the problem, adding a BufferedInputStream is a good solution.
But for a new pyarrow user, it is hard to know that pa.BufferedInputStream is needed.
For example, the following code is totally fine if path_src is not a compressed file; it will issue one read to load the whole file:
path_src = "file.csv"
with pyarrow_s3fs.open_input_stream(path_src) as f:
    buff = f.read()
However, if path_src = "file.csv.gz", the user needs to add the BufferedInputStream wrapper in order to prevent the 64K ranged reads.
So we should enhance the "csv" module and adjust the I/O size before opening the CompressedInputStream. We should do a minor refactor here: https://github.com/apache/arrow/blob/657c4faf21700c0899703a4759bde76235c38199/cpp/src/arrow/dataset/file_csv.cc#L278-L294
This also arises when the format is JSON. @pitrou @lidavidm should we change OpenCompressed to a WrapCompressed() that wraps the input, or change it to OpenBufferedAndCompressed(std::optional<Compression>, std::optional<int64_t> blockSize)?
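A rough sketch of what an OpenBufferedAndCompressed() helper could look like. The function itself is hypothetical (named after the suggestion above) and the signature is adapted slightly: it takes a caller-owned Codec* instead of an optional Compression to keep the sketch small; only the existing BufferedInputStream::Create and CompressedInputStream::Make factories are assumed.

#include <cstdint>
#include <memory>
#include <optional>

#include "arrow/io/buffered.h"
#include "arrow/io/compressed.h"
#include "arrow/memory_pool.h"
#include "arrow/result.h"
#include "arrow/util/compression.h"

// Hypothetical helper sketching the S3 <-- Buffered <-- Compress wrapping.
// The caller keeps `codec` alive for the lifetime of the returned stream.
arrow::Result<std::shared_ptr<arrow::io::InputStream>> OpenBufferedAndCompressed(
    std::shared_ptr<arrow::io::InputStream> raw, arrow::util::Codec* codec,
    std::optional<int64_t> block_size,
    arrow::MemoryPool* pool = arrow::default_memory_pool()) {
  std::shared_ptr<arrow::io::InputStream> stream = std::move(raw);
  if (block_size.has_value()) {
    // Buffer the physical reads (e.g. S3 range requests) at the block size,
    // so the decompressor's 64K reads are served from memory.
    ARROW_ASSIGN_OR_RAISE(
        stream, arrow::io::BufferedInputStream::Create(*block_size, pool, stream));
  }
  if (codec != nullptr) {
    // Decompress on top of the buffered stream.
    ARROW_ASSIGN_OR_RAISE(
        stream, arrow::io::CompressedInputStream::Make(codec, stream, pool));
  }
  return stream;
}

With both arguments set this yields the S3 <-- Buffered <-- Compress ordering discussed further down; with no codec it reduces to plain buffering.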
The bottom line is that CompressedInputStream::kChunkSize is inadequate for some input file backends (such as S3). I think we should add a new InputStream method that advertises a preferred chunk size:
/// \brief Return the preferred chunk size for reading at least `nbytes`
///
/// Different file backends have different performance characteristics
/// (especially on the latency / bandwidth spectrum).
/// This method informs the caller on a well-performing read size
/// for the given logical read size.
///
/// Implementations of this method are free to ignore the input `nbytes`
/// when computing the return value. The return value might be smaller,
/// larger or equal to the input value.
///
/// This method should be deterministic: multiple calls on the same object
/// with the same input argument will return the same value. Therefore,
/// calling it once on a given file should be sufficient.
///
/// There are two ways for callers to use this method:
/// 1) callers which support readahead into an internal buffer will
/// use the return value as a hint for their internal buffer's size;
/// 2) callers which require exact read sizes will use the return value as
/// an advisory chunk size when reading.
///
/// \param[in] nbytes the logical number of bytes desired by the caller
/// \return an advisory physical chunk size, in bytes
virtual int64_t preferred_read_size(int64_t nbytes) const;
Then the CompressedInputStream implementation can call preferred_read_size on its input to decide its compressed buffer size.
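A minimal sketch of that call site. Since preferred_read_size() does not exist yet, a stand-in interface is used here instead of the real InputStream, and the max() floor follows the earlier max(kChunkSize, input-chunk-size) suggestion.

#include <algorithm>
#include <cstdint>

constexpr int64_t kChunkSize = 64 * 1024;  // current hard-coded compressed chunk size

// Stand-in for InputStream once it grows the proposed method.
struct FileWithReadHint {
  virtual int64_t preferred_read_size(int64_t nbytes) const = 0;
  virtual ~FileWithReadHint() = default;
};

// How CompressedInputStream could size its compressed buffer: keep 64K as a
// floor, but let a high-latency backend such as S3 advertise a much larger
// physical read size.
int64_t CompressedChunkSize(const FileWithReadHint& raw) {
  return std::max<int64_t>(kChunkSize, raw.preferred_read_size(kChunkSize));
}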
@felipecrv What do you think?
@pitrou Note that:
- Without compression, file_csv will be S3 <-- Buffered <-- Csv reader. Here the buffer provides a block_size io size for the csv reader.
- Otherwise, if compression is enabled, file_csv will be S3 <-- Compress <-- Buffered <-- Csv reader. I prefer changing it to S3 <-- Buffered <-- Compress <-- Csv reader.
Or we could even remove the additional buffering and have S3 <-- Compress <-- CSV reader. I have forgotten why I added the buffered layer in this case...
Then the CompressedInputStream implementation can call preferred_read_size on its input to decide its compressed buffer size.
@felipecrv What do you think?
Could work well, but a more flexible alternative to this could be a Readable::Read version that can mutate the size requested by the caller to match what was actually read. I've seen this idea in the protobuf stream interfaces [1]. Or a min/max constraint to let the implementation decide the best fit [2]:
size_t read(void* buffer, size_t minBytes, size_t maxBytes);
[1] https://github.com/protocolbuffers/protobuf/blob/main/src/google/protobuf/io/zero_copy_stream.h
[2] https://github.com/capnproto/capnproto/blob/d0c1ad5442831b1e441daa74622f3ea9f92a305c/c%2B%2B/src/kj/io.h#L41-L43
Or a min/max constraint to let the implementation decide the best fit [2]
size_t read(void* buffer, size_t minBytes, size_t maxBytes);
That's another possibility, but it forces the caller to allocate maxBytes even if the result could be as small as minBytes.
Note that those are not either/or. We could start with one and add the other if needed/desired.
Anyway, a block_size is a config for the input. Without compressed input, that config would also be used. Wouldn't we adapt based on that?
That's another possibility, but it forces the caller to allocate maxBytes even if the result could be as small as minBytes.
But the caller chooses maxBytes :) The existing Read(n, ptr) calls become Read(min_bytes, /*max_bytes=*/n, ptr) where min_bytes is the minimum amount of data that lets the caller make progress on whatever algorithm it's implementing.
The stream tries to fill the buffer as much as possible if that is desirable, as in the case of S3 calls.
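A small self-contained sketch of how a caller would use the min/max variant from [2] (the interface here is hypothetical, not Arrow's current Readable API):

#include <cstdint>
#include <vector>

// Hypothetical stream interface with the min/max read constraint from [2].
struct MinMaxReadable {
  // Returns the number of bytes actually read: at least min_bytes (unless the
  // stream hits EOF first) and at most max_bytes; the implementation (e.g. an
  // S3 stream) decides the best fit within that range.
  virtual int64_t Read(int64_t min_bytes, int64_t max_bytes, void* out) = 0;
  virtual ~MinMaxReadable() = default;
};

// Example caller: a decompressor that only needs 64K to make progress, but
// accepts a much larger physical read if the stream prefers it.
int64_t FillCompressedBuffer(MinMaxReadable& raw, std::vector<uint8_t>& buffer) {
  const int64_t min_bytes = 64 * 1024;         // enough for the caller to make progress
  const int64_t max_bytes = 16 * 1024 * 1024;  // caller-chosen upper bound
  buffer.resize(static_cast<size_t>(max_bytes));  // caller must allocate max_bytes up front
  const int64_t n = raw.Read(min_bytes, max_bytes, buffer.data());
  buffer.resize(static_cast<size_t>(n));
  return n;
}

The up-front resize to max_bytes is exactly the allocation concern raised above.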
Note that those are not either/or. We could start with one and add the other if needed/desired.
Which one do you think is simpler?
But the caller chooses maxBytes :)
Right, but they have no idea what value could be reasonable. The caller could think "ok, I know that some filesystems really like large chunk sizes, so I'm going to allocate 100 MiB", only to get 64 kiB as a result.
Or conversely, the caller could think "ok, let's be conservative so as not to waste memory unduly, so I'm going to allocate 512 kiB", and S3 will perform awfully.
The point here is to let the file implementation inform the caller before making any impacting decision (such as allocating a large memory area).
Which one do you think is simpler?
Both are simple conceptually and should be simple implementation-wise. They have different implications though.
Anyway, shouldn't it be [S3 <-- Buffered] <-- Compress <-- Csv reader for this:
Describe the bug, including details regarding any error messages, version, and platform.
I'm using pyarrow.csv.open_csv to stream-read a 15 GB gz csv file over S3. The speed is unusably slow.
Here, even when I set block_size=5_000_000, the reader is issuing 65K ranged reads over S3.
This is bad for two reasons:
After digging into the code, I found that this kChunkSize is hard-coded in CompressedInputStream: https://github.com/apache/arrow/blob/f6127a6d18af12ce18a0b8b1eac02346721cc399/cpp/src/arrow/io/compressed.cc#L432
Currently, my workaround is to use a buffered stream:
file_src = pyarrow_s3fs.open_input_stream(path_src, buffer_size=10_000_000)
But it is not obvious from the doc. Could we set this value higher? Or at least add some doc to clarify the usage.
Component(s)
C++, Python