aiidateam / disk-objectstore

An implementation of an efficient "object store" (actually, a key-value store) writing files on disk and not requiring a running server
https://disk-objectstore.readthedocs.io
MIT License

Implement `whence=2` for the `utils.PackedObjectReader.seek` method #136

Closed sphuber closed 1 year ago

sphuber commented 2 years ago

This value for whence is used, for example, by the constructor of the File class of the h5py package. Currently, users have to implement a workaround by first reading the file entirely into memory or writing it to a temporary file on disk.
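
For illustration, a minimal sketch of that workaround (assuming an already-initialised Container and the hashkey of a stored HDF5 file; the helper name is mine):

import io

import h5py


def open_h5_from_container(container, hashkey):
    """Read the packed object fully into memory and hand the buffer to h5py."""
    with container.get_object_stream(hashkey) as stream:
        buffer = io.BytesIO(stream.read())  # BytesIO supports seek(..., whence=2)
    return h5py.File(buffer, "r")  # h5py accepts seekable file-like objects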

giovannipizzi commented 2 years ago

The issue with implementing this is that, for compressed objects, seeking backwards cannot be implemented efficiently (at least not in the easy manner): every time you seek backwards, you need to get the total size, convert it into a positive offset, and seek back from the beginning (i.e., decompress the whole file again).
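
As a small illustration of why (using zlib directly, just as an example codec): emulating a backwards seek on a compressed stream means restarting the decompressor and re-decompressing everything up to the target offset.

import zlib

data = bytes(range(256)) * 4096  # ~1 MB of uncompressed payload
compressed = zlib.compress(data)

def read_at(offset, length):
    decomp = zlib.decompressobj()  # restart from the very beginning every time
    out = decomp.decompress(compressed, offset + length)
    return out[offset:offset + length]  # everything before `offset` was decompressed and discarded

assert read_at(100, 10) == data[100:110]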

I suggest creating a mid-size HDF5 file and checking the performance of random access in the file.

I think there might be slightly more clever ways to do it, like keeping a "cache" of points to restart from in the compressed stream, e.g. every 1 MB or so; each cache entry would need to include the uncompressed and compressed offsets, plus the part of the buffer that the decompressor needs from the previous chunk of the file. But this might require some testing and performance tuning.
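
The primitive such a cache could rely on (my assumption about the approach, not existing code in the package) is that zlib decompressor objects can be snapshotted and resumed:

import zlib

data = b"some payload " * 100_000
compressed = zlib.compress(data)

decomp = zlib.decompressobj()
first_mb = decomp.decompress(compressed, 1024 * 1024)  # produce up to 1 MB of output
checkpoint = decomp.copy()  # decompression state at (roughly) the 1 MB mark
# Later, seeking back to any offset >= 1 MB could resume from `checkpoint.copy()`
# plus the corresponding compressed-stream position, instead of from byte 0.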

giovannipizzi commented 2 years ago

(i.e.: let's merge it only once we know that the performance hit is reasonable, I think)

sphuber commented 2 years ago

(i.e.: let's merge it only once we know that the performance hit is reasonable, I think)

I would say that anything more efficient than fully copying the file to a temporary file on the local file system first would already be a win, wouldn't it?

zhubonan commented 2 years ago

I think that, in any case, storing a large HDF5 file compressed may hinder read performance. Would it even be possible to read a compressed stream from an offset in the middle?

ramirezfranciscof commented 2 years ago

This value for whence is used, for example, by the constructor of the File class of the h5py package.

Is there any reference to understand what exactly this does? I tried the h5py docs but did not find anything related to the whence keyword.

sphuber commented 2 years ago

whence is the second argument of the seek method on a file handle: https://docs.python.org/3/library/io.html#io.IOBase.seek . It determines relative to what position you are seeking. The File constructor must be doing something like:

class File:

    def __init__(self, handle):
        handle.seek(-10, 2)  # whence=2: seek to 10 bytes before the end of the stream
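
For completeness, the three whence values on an ordinary seekable stream (io.SEEK_SET == 0, io.SEEK_CUR == 1, io.SEEK_END == 2) behave like this:

import io

stream = io.BytesIO(b"0123456789")
stream.seek(3, 0)     # 3 bytes from the start            -> position 3
stream.seek(2, 1)     # 2 bytes forward from the current  -> position 5
stream.seek(-4, 2)    # 4 bytes back from the end         -> position 6
print(stream.read())  # b'6789'
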
giovannipizzi commented 2 years ago

I would say that anything more efficient than fully copying the file to a temporary file on the local file system first would already be a win, wouldn't it?

I'm not sure. Imagine you read a file of N bytes backwards, 1 byte at a time. If you extract the file, you decompress it once and then read backwards (relatively quickly). If you don't, you will decompress N bytes to read the last one, then N-1 bytes to read the second byte from the end, then N-2, and so on. This means you decompress O(N^2) bytes, which I'm sure, at some (not so large) file size, starts to be worse than extracting and storing the file.
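
A back-of-the-envelope number for the scale of this (purely illustrative):

N = 100 * 1024 * 1024  # a 100 MiB uncompressed object
total = N * (N + 1) // 2  # bytes decompressed when reading it backwards one byte at a time
print(f"{total / 1e15:.1f} PB")  # ~5.5 PB of decompression work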

sphuber commented 2 years ago

Just out of interest, because I have never really looked in detail into how these compression algorithms work: where do you get the heuristic that to read the last byte of N bytes, you need to decompress just those N bytes? Wouldn't this also depend on the algorithm with which it was compressed? Is it even possible for all algorithms to arbitrarily decompress a subchunk of the compressed stream? I seem to vaguely remember that to decompress, you would have to start at the beginning, or at some specific point, for it all to make sense.

Anyway, when I opened the issue I went back in the history of the original implementation of whence=1 to see why I didn't also do whence=2, because I was sure we had discussed this and there was a reason, but I couldn't find any (not in commits, nor in issues/PRs). But you clearly highlighted the reason why. Maybe it is indeed best to leave this unimplemented and just document that copying the entire file to a temporary file on disk is the workaround.

zhubonan commented 2 years ago

Pinging @espenfl, as the vasprun parser used by aiida-vasp also uses whence=2. This makes it impossible to re-parse results from packed objects.

zhubonan commented 2 years ago

Anyway, when I opened the issue I went back in the history of the original implementation of whence=1 to see why I didn't also do whence=2, because I was sure we had discussed this and there was a reason, but I couldn't find any (not in commits, nor in issues/PRs). But you clearly highlighted the reason why. Maybe it is indeed best to leave this unimplemented and just document that copying the entire file to a temporary file on disk is the workaround.

I think directly implementing whence=2 would be difficult for a compressed stream. However, PackedObjectReader just reads the bytes as they are; the decompression is actually done by the ZlibLikeBaseStreamDecompresser that wraps it (when reading from a packed compressed file). So there is no limitation to enabling whence=2 for PackedObjectReader itself, I think?
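
For illustration, a minimal sketch of how whence=2 could reduce to the already-supported whence=0 once the object's length inside the pack is known (attribute names here are hypothetical, not the actual PackedObjectReader internals):

class EndSeekableReaderSketch:
    """Hypothetical reader over a region [offset, offset + length) of a pack file."""

    def __init__(self, pack_handle, offset, length):
        self._fhandle = pack_handle  # open pack file
        self._offset = offset        # start of the object inside the pack
        self._length = length        # length of the object inside the pack
        self._pos = 0

    def seek(self, target, whence=0):
        if whence == 2:
            target = self._length + target  # offset relative to the end -> absolute offset
            whence = 0
        if whence != 0:
            raise NotImplementedError("only whence=0 and whence=2 in this sketch")
        if target < 0:
            raise ValueError("negative seek position")
        self._pos = min(target, self._length)
        self._fhandle.seek(self._offset + self._pos)
        return self._pos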

To get around the problem with compressed streams, I think copying to an in-memory buffer also works for smaller files. Perhaps we can implement some kind of logic like the following:

  1. If the object is small, just copy it to an in-memory buffer.
  2. If the object is large, unpack the object into the loose folder and read from the loose file. Of course, if such a file already exists we can open it directly. The loose file will then be deleted in the next maintenance.

But this should be implemented at the ZlibLikeBaseStreamDecompresser level, not in PackedObjectReader, I think. We might have to pass additional information to ZlibLikeBaseStreamDecompresser to make it work (see the sketch below).
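
A rough sketch of that small/large logic (the threshold and the helper name are hypothetical; a real implementation would unpack to the loose folder via the Container rather than to a temporary file):

import io
import tempfile

SMALL_OBJECT_THRESHOLD = 10 * 1024 * 1024  # hypothetical cut-off, to be tuned


def fully_seekable_stream(container, hashkey, size):
    """Return a stream supporting whence=2: in memory if small, on disk if large."""
    with container.get_object_stream(hashkey) as stream:
        if size <= SMALL_OBJECT_THRESHOLD:
            return io.BytesIO(stream.read())
        tmp = tempfile.TemporaryFile()
        while True:
            chunk = stream.read(524288)
            if not chunk:
                break
            tmp.write(chunk)
        tmp.seek(0)
        return tmp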

giovannipizzi commented 1 year ago

As a future reference to myself: I was thinking of keeping copies of the decompressor object in memory, e.g. every 1 MB or 10 MB of decompressed data, and, when seeking, only decompressing from the closest previous decompressor copy.

I tried this implementation, but for the few tests I did, it looks like it is actually not very fast (or at least, it does not really speed up heavy random access very much).

For future reference, I started from 8e4db6c306fdd5e9762ed82a1db3fbea3e186f3e

and I did these changes:

diff --git a/disk_objectstore/utils.py b/disk_objectstore/utils.py
index 0dcf5c6..121f219 100644
--- a/disk_objectstore/utils.py
+++ b/disk_objectstore/utils.py
@@ -641,6 +641,7 @@ class ZlibLikeBaseStreamDecompresser(abc.ABC):
     """

     _CHUNKSIZE = 524288
+    _BUFFER_CACHE_SIZE = 1 * 1024 * 1024 ## Should be larger than the _CHUNKSIZE

     def __init__(self, compressed_stream: StreamSeekBytesType) -> None:
         """Create the class from a given compressed bytestream.
@@ -653,6 +654,9 @@ class ZlibLikeBaseStreamDecompresser(abc.ABC):
         self._internal_buffer = b""
         self._pos = 0

+        self._cache_unconsumed = {0: (0, self._decompressor.copy())}
+        self._last_cache_unconsumed = b"" # Only set when manually seeking
+
     @property
     def mode(self) -> str:
         return getattr(self._compressed_stream, "mode", "rb")
@@ -713,6 +717,11 @@ class ZlibLikeBaseStreamDecompresser(abc.ABC):
                     "the compressed stream: there must be a problem in the incoming buffer"
                 )

+            caching_position = ((self._pos + len(self._internal_buffer)) // self._BUFFER_CACHE_SIZE) * self._BUFFER_CACHE_SIZE
+            if caching_position > max(self._cache_unconsumed.keys()):
+                self._cache_unconsumed[self._pos + len(self._internal_buffer)] = (self._compressed_stream.tell(), self._decompressor.copy())
+                data = self._cache_unconsumed[self._pos + len(self._internal_buffer)]
+
         # Note that we could be here also with len(self._internal_buffer) < size,
         # if we used 'break' because the internal buffer reached EOF.
         to_return, self._internal_buffer = (
@@ -783,10 +792,23 @@ class ZlibLikeBaseStreamDecompresser(abc.ABC):
             self._pos = 0
             return 0

-        if target < self.tell():
-            # I cannot go backwards. In this case, I am forced to go back to zero and restart
-            # (I always know how to go back to zero). Otherwise, I just continue from where I am.
-            self.seek(0)
+        # I cannot go backwards, and in general, if I seek forward, I try to
+        # skip anything I don't really need.
+        # In this case, I go back to the latest cached position before 
+        # the current one (worst case is zero, setup in the __init__).
+        cached_positions = sorted(self._cache_unconsumed)
+        for pos in cached_positions:
+            if pos <= target:
+                cache_pos = pos
+            else:
+                break
+
+        self._pos = cache_pos
+        self._internal_buffer = b""
+        compressed_stream_pos, decompressor = self._cache_unconsumed[cache_pos]
+        self._decompressor = decompressor.copy() # I need to copy again otherwise it will fail
+        self._compressed_stream.seek(compressed_stream_pos)
+        #print(f'seek, {target=}, {cache_pos=}, need to decompress only {target-cache_pos}')

         # Read target bytes, but at most `read_chunk_size` at a time to avoid memory overflow
         while self.tell() < target:

The main run script is

#!/usr/bin/env python
import os
import random
import time
import string

from disk_objectstore import Container

RANDOM_BYTES = True # If False, only use ASCII characters
TOT_SIZE = 100_000_000
#TOT_SIZE = 1_000
piece_length = 10
num_pieces = 10000
COMPRESS = True

def get_random_string(length):
    letters = (string.ascii_letters + string.digits)
    return ''.join(random.choice(letters) for i in range(length)).encode()

def main():
    print("Creating new empty container")
    container = Container("/tmp/test-container")
    container.init_container(clear=True)

    print(f"Generating {TOT_SIZE / 1024 / 1024:.2f} MB file in memory")
    if RANDOM_BYTES:
        content = os.urandom(TOT_SIZE)
    else:
        content = get_random_string(TOT_SIZE)
        #print(content)

    # Store a few small random sequence of bytes in random positions, to be
    # checked later
    to_check = []

    print(f"Fetching {num_pieces} random pieces for later check")
    for piece_idx in range(num_pieces):
        offset = random.randint(0, TOT_SIZE - 1)
        to_check.append((offset, content[offset:offset+piece_length]))

    print("Adding object directly to pack, compressed")        
    hashkey = container.add_objects_to_pack([content], compress=COMPRESS)[0]

    print("Running speed test for all pieces")
    t = time.monotonic()
    with container.get_object_stream(hashkey) as stream:
        stream.read()
        print(f"Time to uncompress all: {time.monotonic() - t} s")
        t = time.monotonic()
        stream.seek(0)
        #print(stream._cache_unconsumed)
        for offset, check_val in to_check:
            stream.seek(offset, 0)
            piece = stream.read(piece_length)
            assert piece == check_val, f"{piece=} {check_val=}"
        #print(stream._cache_unconsumed)
    print(f"Time to read {num_pieces=} random-offset {piece_length}-bytes pieces: {time.monotonic() - t} s")

if __name__ == "__main__":
    main()  # pylint: disable=no-value-for-parameter

The results show that (at least with this implementation) this is not very convenient: for a 95.37 MB file, decompressing everything in memory takes ~0.18 s, but reading num_pieces=10000 pieces with random access takes ~7.2 s. (As a comparison, if the object is stored uncompressed it takes ~0.08 s.) So indeed the approach suggested by @zhubonan is better: we decompress the file and cache it on disk, and then random access is fast. The only case where this is not convenient is when one just wants to read the first few bytes and ignore the rest.

For reference, the decompression time seems to depend heavily on the data. E.g. if I use random data (incompressible; it might actually take more space), fetching 1000 random pieces from a 9.54 MB file takes ~0.44 s, while if I only use ASCII letters (uppercase + lowercase) and digits, it takes ~2.7 s.