piskvorky / smart_open

Utils for streaming large files (S3, HDFS, gzip, bz2...)
MIT License

Ranged read http header leaves TO bytes blank, causing S3 filesystems to read full file length #725

Open davidparks21 opened 2 years ago

davidparks21 commented 2 years ago

Problem description

smart_open is effectively crashing SeaweedFS/S3 and Ceph/S3 filesystems when doing many small reads over large files (e.g. a 32 KB read on a 4 GB file).

On a SeaweedFS/S3 filesystem (also demonstrated on Ceph/S3), using the code shown in the reproduce section below, I am reading a small segment of data (32 KB in this example) from a large file (4 GB in this example). This simple request causes SeaweedFS to move the full file for just a 32 KB range read. It appears that this is expected behavior based on a reading of the protocol specifications. Notably, boto3 does not trigger the same behavior.

When I run the code and look at the HTTP traffic being generated we see the following GET request:

GET /hengenlab/CAF77/Neural_Data/highpass_750/Headstages_256_Channels_int16_2021-02-02_14-28-24.bin HTTP/1.1
Host: seaweed-filer.seaweedfs:8333
Accept-Encoding: identity
Range: bytes=0-
User-Agent: Boto3/1.24.59 Python/3.8.2 Linux/5.4.0-125-generic Botocore/1.27.59
X-Amz-Date: 20220921T204638Z
X-Amz-Content-SHA256: e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855
Authorization: AWS4-HMAC-SHA256 Credential=KL2PPBIZ4OYKR420C28D/20220921/us-east-1/s3/aws4_request, SignedHeaders=host;range;x-amz-content-sha256;x-amz-date, Signature=8009c4fdc85311977066c6988047a72658579e02c02b544fa8d48d8d8b9e8d57
amz-sdk-invocation-id: eace9bc1-a1d1-4244-84ee-1caa164bc294
amz-sdk-request: attempt=1

Notably, Range: bytes=0- is the culprit. My assumption is that smart_open intends to open the file for streaming and read data from the stream as dictated by calls to f.read(...).

When performing a ranged read with just boto3 code the header looks like this:

GET /hengenlab/CAF77/Neural_Data/highpass_750/Headstages_256_Channels_int16_2021-02-02_14-28-24.bin?partNumber=1 HTTP/1.1
Host: seaweed-filer.seaweedfs:8333
Accept-Encoding: identity
Range: bytes=1-32768
User-Agent: Boto3/1.24.59 Python/3.8.2 Linux/5.4.0-125-generic Botocore/1.27.59
X-Amz-Date: 20220921T205137Z
X-Amz-Content-SHA256: e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855
Authorization: AWS4-HMAC-SHA256 Credential=KL2PPBIZ4OYKR420C28D/20220921/us-east-1/s3/aws4_request, SignedHeaders=host;range;x-amz-content-sha256;x-amz-date, Signature=5d3398396c217f4a87284479bc6bc947344c256a30552fe98b6057167d7143fb
amz-sdk-invocation-id: 489f165b-cb4d-4ca0-bc49-cc1e70618518
amz-sdk-request: attempt=1

Using boto3 does not cause any issue. With smart_open, the SeaweedFS/S3 filesystem interprets the missing to-bytes value as a request for the full file. It then moves the full 4 GB data file from a volume server to an S3 server, where just 32 KB are passed to the end-user job. This very quickly oversaturates the network.

The protocol specifications seem to agree that SeaweedFS/S3 is interpreting this Range header correctly. After all, how could the filesystem know that the user won't need to read the whole file, given this header?

If the last-byte-pos value is absent, or if the value is greater than or equal to the current length of the entity-body, last-byte-pos is taken to be equal to one less than the current length of the entity-body in bytes.

https://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html#sec14.35
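
For illustration, the difference between the two requests can be reproduced with plain boto3 (the endpoint, bucket and key below are placeholders):

import boto3

s3 = boto3.client('s3', endpoint_url='https://swfs-s3.endpoint')  # placeholder endpoint

# Open-ended range, equivalent to what smart_open sends: per the spec, the
# server must be prepared to return everything from byte 0 to the end.
full = s3.get_object(Bucket='bucket', Key='path/to/4gb/file.bin', Range='bytes=0-')

# Bounded range: the server only has to produce the first 32 KiB.
partial = s3.get_object(Bucket='bucket', Key='path/to/4gb/file.bin', Range='bytes=0-32767')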

Steps/code to reproduce the problem

This code triggers the full (4 GB) file transfer (internally, not to the end user application) for a small 32k ranged read.

import smart_open

while True:
    with smart_open.open('s3://bucket/path/to/4gb/file.bin', 'rb') as f:
        b = f.read(32768)
        print(f'Read {len(b)}')

This boto3 version of the code does not trigger the same issue:

import boto3

while True:
    obj = boto3.resource('s3', endpoint_url='https://swfs-s3.endpoint').Object('bucket', 'path/to/4gb/file.bin')
    stream = obj.get(Range='bytes=1-32768')['Body']
    res = stream.read()
    print(f'Read: {len(res)} bytes')

I'd like to open a discussion to determine how to properly interpret the S3 protocol and figure out whether an issue like this should be in the domain of the filesystem(s) or should be changed in smart_open.

Versions

>>> print(platform.platform())
Linux-5.19.0-76051900-generic-x86_64-with-glibc2.17
>>> print("Python", sys.version)
Python 3.8.10 (default, Jun  4 2021, 15:09:15) 
[GCC 7.5.0]
>>> print("smart_open", smart_open.__version__)
smart_open 6.0.0
davidparks21 commented 2 years ago

I've been looking through the smart_open code today to find a solution to this problem. I notice that the range is set by default in Reader.__init__(), where a seek(0) is run. This seek doesn't have the to-bytes information, so the range header is set to bytes=from- at this point. There is a defer_seek=True option in transport_params which skips this seek.

If you skip that first seek, the first call to read() will end up setting the same range:

class _SeekableRawReader(object):
    # ...
    def read(self, size=-1):
        """Read from the continuous connection with the remote peer."""
        if self._body is None:
            # This is necessary for the very first read() after __init__().
            self._open_body()

self._open_body() is called without parameters, which has the same effect: the range header is set to bytes=from-.

I guess the most obvious solution, then, is to set the to-bytes in the call to self._open_body() and to use the defer_seek transport param. I've tested that this works and solves the problem on SeaweedFS.
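
To make the idea concrete in isolation (this is an illustrative helper, not smart_open's actual code; smart_open 6.x builds its range string inside _open_body()):

def bounded_range_header(position, size):
    """Build a Range value that covers exactly one read() call.

    position -- current offset within the object
    size     -- number of bytes requested; <= 0 means "to the end of the object"
    """
    if size is None or size <= 0:
        # Fall back to the open-ended form smart_open sends today.
        return f'bytes={position}-'
    # HTTP byte ranges are inclusive, so a 32768-byte read at offset 0 is bytes=0-32767.
    return f'bytes={position}-{position + size - 1}'

print(bounded_range_header(0, 32768))  # bytes=0-32767
print(bounded_range_header(0, -1))     # bytes=0-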

However, I question why defer_seek defaults to False, or why it's there at all if the seek operation is going to effectively crash at least 2 major S3 filesystems (Ceph and SeaweedFS tested). So I'd still like to discuss how best to proceed. I'm happy to put in a pull request for this.

davidparks21 commented 2 years ago

Here's the effect on SeaweedFS, just to illustrate the impact this issue has. At about 04:00 I was running a continuous loop reading 100 bytes from a 4 GB file; it issued a few calls per second. No appreciable bandwidth was reported during that time. This was using the patch discussed above along with defer_seek=True in transport params.

At 04:10 I simply changed to defer_seek=False and ran the same loop reading 100 bytes. You can see the effect that just a few 100-byte reads per second have: bandwidth into SeaweedFS immediately jumps to over 5 GB/sec, because on each request the full file is transferred from storage to the server servicing the S3 request (which ultimately discards the 4 GB file and serves just the 100 bytes requested).

[image: SeaweedFS bandwidth graph]

Here's the effect on a Ceph cluster that was (nearly) unused for the same test.

[image: Ceph bandwidth graph]

Ceph does handle the situation better. The example above was running about 3 reads/s in a loop, each reading 200 KB, so well under 1 MB/sec of end-user IO. But Ceph monitoring shows a throughput of 150 MB/sec, two orders of magnitude over what is actually being requested by the user. Ceph doesn't become inoperable as SWFS does at the scale of this test, but this drastically limits scalability once we get to higher-performance IO (for example, high-IO ML jobs).

cadnce commented 2 years ago

Nice hunting champ! 👑

I was looking at where defer_seek was introduced and why it defaults to False. Looks like this was it: https://github.com/RaRe-Technologies/smart_open/pull/495

If I'm reading it correctly, it seems like it's just there to prevent unexpected behaviour when accessing content_length or validating that the object exists, before the object is actually read from storage.

davidparks21 commented 2 years ago

So I only succeeded in solving the problem for a single read smaller than the buffer size (128 * 1024 bytes by default). There's a more fundamental issue at play.

smart_open opens the file for streaming, which explicitly does NOT generate multiple HTTP requests. Instead, multiple calls to f.read(...) keep pulling data from the StreamingBody on the same open HTTP request. This has the obvious benefit of reducing network overhead (avoiding repeated HTTP headers). However, in that scenario there is no way for smart_open to set an appropriate range at the time the file is opened, unless the user explicitly restricts the file to a specific range. This has the negative side effect of forcing the filesystem to queue up the full file (SeaweedFS/S3) or a large chunk of it (Ceph/S3).

So we're damned if we do and damned if we don't.
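
For context, here is the streaming pattern in plain boto3 terms (bucket/key are placeholders); the point is that successive read() calls reuse a single HTTP response:

import boto3

s3 = boto3.client('s3', endpoint_url='https://swfs-s3.endpoint')  # placeholder endpoint
body = s3.get_object(Bucket='bucket', Key='path/to/4gb/file.bin',
                     Range='bytes=0-')['Body']

# Both reads come out of the same open HTTP response: no extra headers or
# IOPS, but the server was already asked for everything from byte 0 onwards.
first = body.read(32768)
second = body.read(32768)
body.close()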

One solution is to give the user the ability to specify a range in the open request. If they don't, we fall back on multiple HTTP requests, which can set the range correctly.

Another solution would be for the filesystem to only queue up what it needs as bytes are read. While we might be able to get one filesystem to implement this, ensuring that all S3 filesystems operate that way is probably wishful thinking.

I can't think of another way to handle this situation in a way that both protects the filesystem and keeps network overhead to a minimum. Can anyone else?

davidparks21 commented 2 years ago

After sleeping on this, here's the solution I propose implementing (I'm working towards this and a pull request).

We have two use cases that are at odds here:

Use case 1: The file is opened for streaming and many read calls will be performed. This is common if you open a file and pass it to many underlying libraries that were optimized for local IO access. With many small reads of the file, repeating the HTTP headers would be an unacceptable overhead; it would also translate into an explosion of IOPS, which would not be good for the S3 filesystem either.

Use case 2: A single small ranged read of a large file. This use case is common in many applications such as machine learning, where a large binary data file contains many small samples. The file is opened, read once, and closed, often at a very high frequency. For example, my use case is to read up to 20 samples/s of 25 MB segments of a multi-GB file, up to 500 MB/s, to feed a single GPU, and I often run tens of GPUs. In this use case, the overhead of an unspecified byte range is a showstopper because it doesn't allow the S3 filesystem to optimize the internal queuing of the file.
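
For concreteness, the partial-read pattern in use case 2 looks roughly like this (the URL, offset and sample size are placeholders):

import smart_open

SAMPLE_SIZE = 25 * 1024 * 1024  # ~25 MB per sample, as in the ML workload above

def read_sample(url, offset, size=SAMPLE_SIZE):
    # Open, seek, read once, close -- repeated at high frequency across many workers.
    with smart_open.open(url, 'rb') as f:
        f.seek(offset)
        return f.read(size)

sample = read_sample('s3://bucket/path/to/large/file.bin', offset=123 * SAMPLE_SIZE)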

These two use cases are fundamentally at odds, so any solution is a balancing act between them. I propose we remove defer_seek from transport_params (it's unnecessary; see below) and add a transport parameter called optimize_reads which takes one of three user-specified values: auto (the default), optimize_for_streaming, and optimize_for_partial_reads.

Description of these modes:

Tasks necessary to implement this (assuming no one raises a concern here, I'll work on a pull request with these changes):

  1. seek will not open the body; it will only update the position.
  2. Remove defer_seek (deprecate and make it a no-op); it's unnecessary when seek doesn't open the body. There are 14 points in s3.py where self._content_length is referenced; all of them need to support a potential None value. This is not unreasonable.
  3. Add optimize_reads to transport_params as discussed above.
  4. Add a read call counter to Reader / _SeekableRawReader to support the auto option described above.
  5. Add a conditional step in Reader / _SeekableRawReader that closes the body when optimize_for_partial_reads == True or (auto == True and read_call_counter > 1).

mpenkov commented 2 years ago

Sounds reasonable, but let's use shorter values for the options, e.g. optimize: [reading|streaming|auto]

@jcushman Can you comment on the defer seek part?

mpenkov commented 2 years ago

After thinking about this some more, I can think of another alternative. We can specify the range to always read at most N bytes, where N is configurable via transport parameters. For the streaming case, N would be sufficiently large, e.g. several MB (we could make this the default). For the reading case, it would be smaller, e.g. several kB. I think this would satisfy both use cases.

I think this would be simpler than the optimize for partial reads/streaming/auto option. What's your opinion?

davidparks21 commented 2 years ago

This works as long as the first read request still sets the byte range to exactly what is requested in that first read. This covers the open, single-read, close use case, where even a modest inefficiency in the byte range would reduce scalability when pushing the system to its limits, as happens in ML.

I like your solution better for the streaming case. Although it necessitates more HTTP headers, setting the value to a reasonably large number (in the MB range; maybe 10 MB is good?) limits the damage that can be done if the file is obscenely large. In fact, looking at the Ceph / SeaweedFS comparison here, it's likely Ceph already implements similar logic in its S3 layer (we see 1 MB/s of 200 KB end-user reads actually transfer 150 MB/s internally), which means Ceph doesn't crash under current conditions; it just performs sub-optimally. SeaweedFS doesn't have this logic and internally queues a few orders of magnitude more data. Setting this limit would let smart_open ensure that filesystems like SeaweedFS behave more like Ceph in the worst-case scenario (e.g. multiple reads of a small segment of a massive file).

So then I'm implementing this set of changes:

  1. seek will not open the body; it will only update the position. A call to seek will still close an open body, as it does now.
  2. Remove defer_seek (do you prefer deprecating it and making it a no-op, or simple removal?); it's unnecessary when seek doesn't open the body. There are 14 points in s3.py where self._content_length is referenced; all of them need to support a potential None value. This appears manageable.
  3. Add stream_size (naming preference?) to transport_params, defaulting to 10 MB (10,485,760 bytes?).
  4. Add tracking of the byte range specified by stream_size to _SeekableRawReader; close the body when a read violates the range.
  5. On the first call to read, always set the byte-range header to exactly the read size requested. The second and subsequent calls to read follow 4 above.
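
As a standalone sketch of items 4 and 5 (the stream_size name and the logic follow this discussion only; none of this is an existing smart_open API):

def plan_request(position, read_size, first_read, window_end, stream_size=10 * 1024 * 1024):
    """Decide whether a new HTTP request is needed and which Range to send.

    Returns (needs_new_request, new_window_end, range_header_or_None).
    """
    if first_read:
        # Item 5: the very first read asks for exactly the bytes requested.
        end = position + read_size - 1
        return True, end, f'bytes={position}-{end}'
    if window_end is None or position + read_size - 1 > window_end:
        # Item 4: the read falls outside the promised range, so close the old
        # body and open a new one covering the next stream_size bytes.
        end = position + max(read_size, stream_size) - 1
        return True, end, f'bytes={position}-{end}'
    # Still within the promised range: keep reading from the open body.
    return False, window_end, None
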
mpenkov commented 2 years ago

Yes, that makes sense.

  1. Seeking should not close the body when the seek position is within the buffer
  2. I think we can remove it, and provide documentation for people who may have been using that parameter.
  3. 10MB default makes sense. Maybe call it download_bufsize?
  4. Close the old body and open a new body to satisfy the read request, right?
  5. Yes

Sounds like a fair bit of work, have fun ;)

davidparks21 commented 2 years ago

A few details:

  1. Seek does need to close the body even within the 10 MB range; that range is not a buffer that has been downloaded, it's an open HTTP request that is streaming bytes, and it has no seek functionality.
  2. Ok.
  3. The term buf doesn't quite fit, since it's not actually a read buffer (we're not downloading anything in that 10 MB range until the user requests it). It's a range limit that makes a guarantee to the S3 server that we won't exceed that range on this HTTP request. There is already a buffer_size (DEFAULT_BUFFER_SIZE = 128 * 1024) minimum read size, which I'm not changing. We want to distinguish between the two as much as possible. We could use something like stream_range? (See the sketch after this list.)
  4. Yes.
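
To make the buffer_size / stream_range distinction concrete, usage might eventually look like the following. stream_range is only the proposed name from this thread and does not exist in any released smart_open; buffer_size is the existing read-ahead parameter.

import smart_open

params = {
    'buffer_size': 128 * 1024,          # existing: minimum blocking read-ahead into a local buffer
    'stream_range': 10 * 1024 * 1024,   # proposed: cap on the byte range promised per HTTP request
}

with smart_open.open('s3://bucket/path/to/file.bin', 'rb', transport_params=params) as f:
    chunk = f.read(32768)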

It is a bit of work, but I would have built this functionality myself anyway, so I'm much happier having it in a widely supported platform. Plus, the two smart_open/S3 writer classes don't need any change, so this is a lot less work than if I went it alone. I'm quite a fan of what you've done with smart_open; it's really a nice solution for remote filesystem access. I'm pretty far along already and hoping to have something presentable in the next day or two.

davidparks21 commented 2 years ago

Actually, while I'm at it, the DEFAULT_BUFFER_SIZE of 128k feels a bit arbitrary. That buffer doesn't reduce network overhead, since the streaming body stays open; it just forces a minimum, blocking read-ahead. A better choice for DEFAULT_BUFFER_SIZE might be the common Ethernet MTU (maximum transmission unit) of 1500 bytes, meaning we read ahead only what would most likely fit in a single internet-bound network packet. Anything above that seems sub-optimal to me.

cadnce commented 2 years ago

Now I'm a big fan of smart_open 💯, but the other day I discovered s3fs, gcsfs and fsspec. I wonder how those would deal with this problem. Admittedly their interfaces are not as nice.

But it almost makes me wonder whether smart_open would just become the magical wrapper interface, or whether that takes away too much from what this project was doing. 🤷‍♀️

davidparks21 commented 2 years ago

I doubt the FUSE-based filesystem will be especially efficient for streaming and high-performance reads of large files, but I haven't really put it to the test; I vaguely recall trying it out a few years ago, but that didn't go anywhere. I'm not using GCP, so no comment on gcsfs, but it's only one filesystem, and having the multi-filesystem back end of smart_open is nice. I don't know fsspec; it looks interesting, but the GitHub page didn't look too inspired. I tried pyfilesystem a long while back, but they heavily prioritized a consistent interface over performance, and I found both their S3 and tar filesystems sorely lacking on the performance side, with no indication they were willing to address the issues. smart_open is the only one I'm aware of that provides a cross-filesystem framework flexible enough to deal with high-performance demands, and the drop-in replacement for the built-in open is a major value prop.

davidparks21 commented 2 years ago

I've finished a first pass at this optimization. We've got a clean set of unit tests (your existing unit tests were fantastic and a huge help in catching edge cases). I kept all your unit tests except those specific to defer_seek or _SeekableRawReader. I added a few unit tests to check the details of the stream_range addition. I also updated a few unrelated unit tests that were failing because they were not cleaning up after themselves.

It ended up being nearly a full re-write of the s3.Reader class. I simplified things some by removing the _SeekableRawReader and incorporating that functionality into the Reader class.

The commit is currently on my fork at the link below. I could test it under load in our lab for a while before sending over a pull request, or I can send a pull request now on the basis of passing unit tests; let me know what works best for you.

https://github.com/RaRe-Technologies/smart_open/commit/414bd948b50c217e729b35e82504d5852f8fbb1e

davidparks21 commented 2 years ago

An in-the-wild load test looks pretty good. I'm running two GPU-based ML jobs performing small random reads over a 2 TB dataset (3.5 GB data files). Namespace bandwidth (top) shows end-user data received by the jobs, and Ceph/S3 bandwidth (bottom) shows what Ceph is reporting (this is on an otherwise unused S3 region, independent of the cluster's main filesystem). Ceph is reporting ~350 MB/sec of throughput vs. ~650 MB/sec received by the end-user jobs, which is ideal; the difference is due to cache hits in front of Ceph/S3. Before this update, the same job would have brought Ceph/S3 to its knees.

cadnce commented 2 years ago

That looks good! What are the actual file types you’re reading, parquet?

davidparks21 commented 2 years ago

> That looks good! What are the actual file types you’re reading, parquet?

These are raw binary electrophysiology data: 25 kHz recordings of electrical potential at 512 recording sites in freely behaving mice, capable of resolving individual neuron spiking, à la Neuralink (probably slightly higher resolution here). The format is a proprietary Whitematter/eCube format, but it's just a binary dump of int16's. I'm at UC Santa Cruz and do this jointly with Wash U. in St. Louis, with the multi-institution cluster run by UCSD.

cadnce commented 2 years ago

> That looks good! What are the actual file types you’re reading, parquet?

> These are raw binary electrophysiology data: 25 kHz recordings of electrical potential at 512 recording sites in freely behaving mice, capable of resolving individual neuron spiking, à la Neuralink (probably slightly higher resolution here). The format is a proprietary Whitematter/eCube format, but it's just a binary dump of int16's. I'm at UC Santa Cruz and do this jointly with Wash U. in St. Louis, with the multi-institution cluster run by UCSD.

Fascinating, thanks for the reply; it sounds like you have some interesting research going on. The reason for asking was that I had half assumed it was a standard format like Parquet or ORC, where you'd actually be expecting two small reads (footer, then row group header) followed by some long reads.

davidparks21 commented 2 years ago

I briefly considered being more predictive about this kind of pattern, but there wasn't a readily obvious way to do that. At this point it would be easy to implement more predictive logic just by manipulating the stream_range value at the top of the read call, where you make a prediction about future access. You could tighten or relax the bounds based on that prediction simply by setting that value, and you would influence how much data the S3 server queues up behind the scenes the next time the user seeks or reads beyond the current range and needs a new HTTP request. The hard part would be defining the logic and characterizing its accuracy in the wild. Maybe this has been done somewhere?
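
A sketch of what such a heuristic could look like (purely illustrative; the thresholds and the idea of feeding past read sizes into the next stream_range are hypothetical):

def predict_stream_range(read_history, default_range=10 * 1024 * 1024):
    """Guess how wide the next promised byte range should be.

    Small, repeated reads suggest index/header scanning, so keep the range
    tight; anything larger falls back to the default streaming window.
    """
    if not read_history:
        return None  # first read: request exactly what was asked for
    recent = read_history[-3:]
    if all(size <= 64 * 1024 for size in recent):
        return max(recent) * 4          # stay tight for metadata-style access
    return default_range                # otherwise use the default window

print(predict_stream_range([]))                  # None
print(predict_stream_range([4096, 8192, 1024]))  # 32768
print(predict_stream_range([4096, 1_000_000]))   # 10485760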

Depending on the recording rig, we do use formats like HDF5 (we support 5 different hardware platforms in our code base). HDF5 does a lot of index scanning and then a ton of sequential reads, which is a terror to stream over a high-latency/high-bandwidth S3 link (it's impossible to set the user block size optimally for all use cases). In this situation, my general-purpose solution is to provide a data transformation service that reformats the data from its original form (typically optimized for write performance and anti-optimized for read performance) into a pure binary format explicitly optimized for the particular use case at hand (the user specifies their requirements in the data transformation step). This data transformation step lets me return to the optimal open-read-close paradigm when operating at scale.