Rogdham / python-xz

Pure Python implementation of the XZ file format with random access support
MIT License

Parallelize decoding #3

Open mxmlnkn opened 2 years ago

mxmlnkn commented 2 years ago

Xz is pretty slow compared to other compression formats. It would be really cool if python-xz could be parallelized such that it prefetches the next blocks and decodes them in parallel. I think this would be a helpful feature and unique selling point for python-xz. I don't think there is a parallelized XZ decoder for Python at all, or is there?

I'm doing something similar in indexed_bzip2. But, I am aware that this adds complexity and problems:

I implemented a very rudimentary sketch on top of python-xz using multiprocessing.pool.Pool. It has the same design as indexed_bzip2, which is:

With this, I was able to speed up the decompression of a 3.1 GiB xz file (4 GiB decompressed) consisting of 171 blocks by a factor of ~7 on an 8-core CPU (16 virtual cores):

However, at this point I'm becoming uncertain whether this might be easier to implement inside python-xz itself or whether the wrapper is a sufficient ad-hoc solution. It only uses public methods and members of XZFile, so it should be stable across non-major version changes.
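To illustrate the general idea of decoding independent units in parallel without depending on python-xz, here is a minimal sketch that uses only the standard library. It assumes the data is stored as independently compressed chunks (separate xz streams here, rather than blocks inside one stream), and the helper names `compress_chunks` and `decompress_parallel` are hypothetical. It uses a thread pool on the assumption that `lzma.decompress` releases the GIL while decoding; the wrapper in this issue uses a process pool instead.

```python
import lzma
from concurrent.futures import ThreadPoolExecutor
from typing import List


def compress_chunks(data: bytes, chunk_size: int) -> List[bytes]:
    """Compress each chunk as its own xz stream so chunks decode independently."""
    return [
        lzma.compress(data[i:i + chunk_size], format=lzma.FORMAT_XZ)
        for i in range(0, len(data), chunk_size)
    ]


def decompress_parallel(chunks: List[bytes], workers: int = 4) -> bytes:
    """Decode all chunks concurrently; map() yields results in input order."""
    with ThreadPoolExecutor(max_workers=workers) as pool:
        return b"".join(pool.map(lzma.decompress, chunks))


payload = bytes(range(256)) * 4096  # 1 MiB of sample data
chunks = compress_chunks(payload, chunk_size=256 * 1024)
restored = decompress_parallel(chunks)
```

Whether threads or processes scale better for a given file is something to measure rather than assume.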

Rudimentary unfinished sketch / proof of concept:

decompress-xz-parallel.py

```python3
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import bisect
import io
import lzma
import math
import multiprocessing.pool
import os
import resource
import sys
import time
from typing import Iterable

import xz

from parallel_xz_decoder import ParallelXZReader


def benchmark_python_xz_serial(filename):
    print("== Benchmark serial xz file decompression ==")

    size = 0
    t0 = time.time()
    with xz.open(filename, 'rb') as file:
        t1 = time.time()
        while True:
            readSize = len(file.read(32 * 1024 * 1024))
            if readSize == 0:
                break
            size += readSize

            if time.time() - t1 > 5:
                t1 = time.time()
                print(f"{t1 - t0:.2f}s {resource.getrusage(resource.RUSAGE_SELF).ru_maxrss // 1024} MiB RSS")
        file.close()

    t1 = time.time()
    print(f"Reading {size} B took: {t1-t0:.3f}s")


def test_python_xz_parallel(filename):
    print("== Test parallel xz file decompression ==")

    size = 0
    t0 = time.time()
    with xz.open(filename, 'rb') as file, ParallelXZReader(filename, os.cpu_count()) as pfile:
        t1 = time.time()
        while True:
            readData = file.read(8 * 1024 * 1024)
            parallelReadData = pfile.read(len(readData))
            print("Read from:", file, pfile)
            if readData != parallelReadData:
                print("inequal", len(readData), len(parallelReadData))
            assert readData == parallelReadData
            readSize = len(readData)
            if readSize == 0:
                break
            size += readSize

            if time.time() - t1 > 5:
                t1 = time.time()
                print(f"{t1 - t0:.2f}s {resource.getrusage(resource.RUSAGE_SELF).ru_maxrss // 1024} MiB RSS")
        file.close()

    t1 = time.time()
    print(f"Reading {size} B took: {t1-t0:.3f}s")


def benchmark_python_xz_parallel(filename):
    print("== Benchmark parallel xz file decompression ==")

    size = 0
    t0 = time.time()
    with ParallelXZReader(filename, os.cpu_count()) as file:
        t1 = time.time()
        while True:
            readSize = len(file.read(8 * 1024 * 1024))
            if readSize == 0:
                break
            size += readSize

            if time.time() - t1 > 5:
                t1 = time.time()
                print(f"{t1 - t0:.2f}s {resource.getrusage(resource.RUSAGE_SELF).ru_maxrss // 1024} MiB RSS")
        file.close()

    t1 = time.time()
    print(f"Reading {size} B took: {t1-t0:.3f}s")


if __name__ == '__main__':
    print("xz version:", xz.__version__)
    filename = sys.argv[1]
    benchmark_python_xz_serial(filename)
    test_python_xz_parallel(filename)
    benchmark_python_xz_parallel(filename)

    # TODO test with multistream xz
```

parallel_xz_decoder.py

```python3
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import bisect
import io
import lzma
import math
import multiprocessing.pool
import os
import resource
import sys
import time
from typing import Iterable, List

import xz


# TODO Add tests for everything


def overrides(parentClass):
    """Simple decorator that checks that a method with the same name exists in the parent class"""

    def overrider(method):
        assert method.__name__ in dir(parentClass)
        assert callable(getattr(parentClass, method.__name__))
        return method

    return overrider


class LruCache(dict):
    def __init__(self, size: int = 10):
        self.size = size
        self.lastUsed: List[int] = []

    def _refresh(self, key):
        if key in self.lastUsed:
            self.lastUsed.remove(key)
        self.lastUsed.append(key)

    def __setitem__(self, key, value):
        super().__setitem__(key, value)
        self._refresh(key)
        while super().__len__() > self.size:
            super().__delitem__(self.lastUsed.pop(0))

    def __getitem__(self, key):
        value = super().__getitem__(key)
        self._refresh(key)
        return value


class Prefetcher:
    def __init__(self, memorySize):
        self.lastFetched = []
        self.memorySize = memorySize

    def fetch(self, value):
        if value in self.lastFetched:
            self.lastFetched.remove(value)
        self.lastFetched.append(value)
        while len(self.lastFetched) > self.memorySize:
            self.lastFetched.pop(0)

    def prefetch(self, maximumToPrefetch) -> Iterable:
        if not self.lastFetched or maximumToPrefetch <= 0:
            return []

        consecutiveCount = 0
        values = self.lastFetched[::-1]
        for i, j in zip(values[0:-1], values[1:]):
            if i == j + 1:
                consecutiveCount += 1
            else:
                break

        # I want an exponential progression like: logStep**consecutiveCount with the boundary conditions:
        # logStep**0 = 1 (mathematically true for any logStep because consecutiveCount was chosen to fit)
        # logStep**maxConsecutiveCount = maximumToPrefetch
        #   => logStep = exp(ln(maximumToPrefetch)/maxConsecutiveCount)
        #   => logStep**consecutiveCount = exp(ln(maximumToPrefetch) * consecutiveCount/maxConsecutiveCount)
        prefetchCount = int(round(math.exp(math.log(maximumToPrefetch) * consecutiveCount / (self.memorySize - 1))))
        return range(self.lastFetched[-1] + 1, self.lastFetched[-1] + 1 + prefetchCount)


class ParallelXZReader(io.BufferedIOBase):
    # TODO test if a simple thread pool would also parallelize equally well
    """Uses a process pool to prefetch and cache decoded xz blocks"""

    def __init__(self, filename, parallelization):
        print("Parallelize:", parallelization)
        self.parallelization = parallelization - 1  # keep one core for on-demand decompression
        self.pool = multiprocessing.pool.Pool(self.parallelization)
        self.offset = 0
        self.filename = filename
        self.fileobj = xz.open(filename, 'rb')
        self.blockCache = LruCache(2 * parallelization)
        self.prefetcher = Prefetcher(4)

        assert self.fileobj.seekable() and self.fileobj.readable()

        print(self.fileobj.stream_boundaries)
        print(self.fileobj.block_boundaries)  # contains uncompressed offsets and therefore sizes -> perfect!

    def _findBlock(self, offset: int):
        blockNumber = bisect.bisect_right(self.fileobj.block_boundaries, offset)
        print("Look for offset:", offset, "found:", blockNumber)
        if blockNumber <= 0:
            return blockNumber - 1, 0, 0
        if blockNumber >= len(self.fileobj.block_boundaries) or blockNumber <= 0:
            return blockNumber - 1, offset - self.fileobj.block_boundaries[blockNumber - 1], -1

        blockSize = self.fileobj.block_boundaries[blockNumber] - self.fileobj.block_boundaries[blockNumber - 1]
        offsetInBlock = offset - self.fileobj.block_boundaries[blockNumber - 1]
        assert offsetInBlock >= 0
        assert offsetInBlock < blockSize
        return blockNumber - 1, offsetInBlock, blockSize

    def _blockSize(self, blockNumber):
        blockNumber += 1
        if blockNumber >= len(self.fileobj.block_boundaries) or blockNumber <= 0:
            return -1
        return self.fileobj.block_boundaries[blockNumber] - self.fileobj.block_boundaries[blockNumber - 1]

    @staticmethod
    def _decodeBlock(filename, offset, size):
        with xz.open(filename, 'rb') as file:
            file.seek(offset)
            return file.read(size)

    def __enter__(self):
        return self

    def __exit__(self, exception_type, exception_value, exception_traceback):
        self.close()

    @overrides(io.BufferedIOBase)
    def close(self) -> None:
        self.fileobj.close()
        self.pool.close()

    @overrides(io.BufferedIOBase)
    def fileno(self) -> int:
        # This is a virtual Python level file object and therefore does not have a valid OS file descriptor!
        raise io.UnsupportedOperation()

    @overrides(io.BufferedIOBase)
    def seekable(self) -> bool:
        return True

    @overrides(io.BufferedIOBase)
    def readable(self) -> bool:
        return True

    @overrides(io.BufferedIOBase)
    def writable(self) -> bool:
        return False

    @overrides(io.BufferedIOBase)
    def read(self, size: int = -1) -> bytes:
        print("\nread", size, "from", self.offset)
        result = bytes()
        blocks = []
        blockNumber, firstBlockOffset, blockSize = self._findBlock(self.offset)
        print("Found block:", blockNumber, blockSize, firstBlockOffset)
        if blockNumber >= len(self.fileobj.block_boundaries) or blockNumber < 0:
            return result

        pendingBlocks = sum(not block.ready() for block in self.blockCache.values())

        availableSize = blockSize - firstBlockOffset
        while True:
            # Fetch Block
            self.prefetcher.fetch(blockNumber)
            if blockNumber in self.blockCache:
                fetchedBlock = self.blockCache[blockNumber]
            else:
                print("fetch block:", blockNumber, "sized", self._blockSize(blockNumber))
                fetchedBlock = self.pool.apply_async(
                    ParallelXZReader._decodeBlock,
                    (self.filename, self.fileobj.block_boundaries[blockNumber], self._blockSize(blockNumber)),
                )
                self.blockCache[blockNumber] = fetchedBlock
                pendingBlocks += 1

            blocks.append(fetchedBlock)
            if size <= availableSize or blockSize == -1:
                break
            size -= availableSize
            self.offset += availableSize

            # Get metadata for next block
            blockNumber += 1
            if blockNumber >= len(self.fileobj.block_boundaries):
                break
            blockSize = self._blockSize(blockNumber)
            offsetInBlock = self.offset - self.fileobj.block_boundaries[blockNumber - 1]
            availableSize = blockSize - offsetInBlock

        # TODO apply prefetch suggestion
        maxToPrefetch = self.parallelization - pendingBlocks
        toPrefetch = self.prefetcher.prefetch(self.parallelization)
        print("Prefetch suggestion:", toPrefetch)
        for blockNumber in toPrefetch:
            if blockNumber < len(self.fileobj.block_boundaries) and blockNumber not in self.blockCache:
                fetchedBlock = self.pool.apply_async(
                    ParallelXZReader._decodeBlock,
                    (self.filename, self.fileobj.block_boundaries[blockNumber], self._blockSize(blockNumber)),
                )
                self.blockCache[blockNumber] = fetchedBlock
                pendingBlocks += 1
        print("pending blocks:", pendingBlocks)

        print("Got blocks:", blocks)

        while blocks:
            block = blocks.pop(0)
            # Note that it is perfectly safe to call AsyncResult.get multiple times!
            toAppend = block.get()
            print(f"Append view ({firstBlockOffset},{ size}) of block of length {len(toAppend)}")
            if firstBlockOffset > 0:
                toAppend = toAppend[firstBlockOffset:]
            if not blocks:
                toAppend = toAppend[:size]
            firstBlockOffset = 0

            result += toAppend

        if blockNumber == 21:
            print("Result:", len(result))

        # TODO fall back to reading directly from fileobj if prefetch suggests nothing at all to improve latency!
        # self.fileobj.seek(self.offset)
        # result = self.fileobj.read(size)

        self.offset += len(result)
        return result

    @overrides(io.BufferedIOBase)
    def seek(self, offset: int, whence: int = io.SEEK_SET) -> int:
        # len(self.fileobj) is the total uncompressed size of the file
        if whence == io.SEEK_CUR:
            self.offset += offset
        elif whence == io.SEEK_END:
            self.offset = len(self.fileobj) + offset
        elif whence == io.SEEK_SET:
            self.offset = offset

        if self.offset < 0:
            raise ValueError("Trying to seek before the start of the file!")
        if self.offset >= len(self.fileobj):
            return self.offset

        return self.offset

    @overrides(io.BufferedIOBase)
    def tell(self) -> int:
        return self.offset
```
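The exponential prefetch progression described in the comments of `Prefetcher.prefetch` can be checked in isolation. The following is a small standalone sketch of just that formula; the function name `prefetch_count` and its parameters are hypothetical and only mirror the variables used in the class above.

```python
import math


def prefetch_count(consecutive: int, memory_size: int, maximum: int) -> int:
    """Number of blocks to prefetch after `consecutive` sequential accesses.

    Grows exponentially from 1 (no sequential history) up to `maximum`
    (all `memory_size` remembered accesses were sequential).
    """
    if maximum <= 0:
        return 0
    exponent = consecutive / (memory_size - 1)
    return int(round(math.exp(math.log(maximum) * exponent)))


# With a history of 4 accesses there are at most 3 consecutive pairs.
counts = [prefetch_count(c, memory_size=4, maximum=8) for c in range(4)]
print(counts)  # [1, 2, 4, 8]
```

So a single random access triggers only one speculative block, while a fully sequential access pattern ramps up to the maximum.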

Manual Shell Execution

base64 /dev/urandom | head -c $(( 4*1024*1024*1024  )) > large
xz -T 0 --keep large
python3 decompress-xz-parallel.py large.xz

mxmlnkn commented 2 years ago

What I am missing interface-wise is the size of the last block. I can get all other sizes from the difference of neighboring blocks but not so for the last block. Strictly speaking, if we are talking about block boundaries, then the end of the last block should also be returned even if it is not the beginning of a new block. Then, I could infer the decompressed size of the last block from this.

Edit: OK, I can simply use file.seek(0, io.SEEK_END) to get the size and everything is fine.
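As a small illustration of that workaround: given the list of block start offsets and the total uncompressed size, every block size, including the last one, follows from differences of neighbors. The helper name `block_sizes` is hypothetical; `boundaries` stands in for a list shaped like `file.block_boundaries`, and `total_size` for the value returned by `file.seek(0, io.SEEK_END)`.

```python
from typing import List, Sequence


def block_sizes(boundaries: Sequence[int], total_size: int) -> List[int]:
    """Uncompressed size of each block from block start offsets and total size."""
    # The end of each block is the start of the next; the last block ends at EOF.
    ends = list(boundaries[1:]) + [total_size]
    return [end - start for start, end in zip(boundaries, ends)]


sizes = block_sizes([0, 100, 300], total_size=450)
print(sizes)  # [100, 200, 150]
```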

Rogdham commented 2 years ago

After some thinking on this issue, I believe that this issue is complex, and would add too much complexity for this project: there are many ways to do parallelism in Python (threading, asyncio, gevent, etc.) and I feel like this will be too much for a single use case (parallel decompression).

This is in the same vein as my choice not to support other features, such as decompressing a stream on the fly: python-xz needs the whole xz file before being able to open it (the first thing it does is seek to the very end).

So I would suggest creating your own library to support this specific use case. If you decide to base it on python-xz, I would be happy and, to some extent, would try to provide support, for example by improving the API of python-xz.


With that being said, I agree that the API to get the blocks and their positions/sizes is not optimal at all. I copied that from the lzmaffi library. Indeed, the first value in the list will always be 0, which is not really useful.

I feel like a better API would be to have an attribute returning all the blocks as a list of objects, and from each object you would be able to get the size and the position.

I can see two possible APIs; let me know what you think file.blocks should return:

  1. A dict whose keys are block positions, and whose values are XZBlock instances (use len(block) to get their length)
>>> for pos, block in file.blocks.items():
...     print(pos, len(block))
0 100
100 200
  2. A list of XZBlock instances, with an attribute added to each one to get the position
>>> for block in file.blocks:
...     print(block.something, len(block))  # attribute name `something` to be decided
0 100
100 200
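To compare the two shapes side by side, here is a minimal sketch that builds both from the same boundary list. Everything in it is hypothetical: `Block` is only a stand-in for XZBlock, the attribute name `position` is a placeholder for the undecided name, and neither helper is part of python-xz.

```python
from dataclasses import dataclass
from typing import Dict, List


@dataclass(frozen=True)
class Block:
    """Hypothetical stand-in for XZBlock: uncompressed position and length."""
    position: int
    length: int

    def __len__(self) -> int:
        return self.length


def blocks_as_list(boundaries: List[int], total_size: int) -> List[Block]:
    """Option 2: a list of blocks, each knowing its own position."""
    ends = boundaries[1:] + [total_size]
    return [Block(start, end - start) for start, end in zip(boundaries, ends)]


def blocks_as_dict(boundaries: List[int], total_size: int) -> Dict[int, Block]:
    """Option 1: a dict mapping each block position to its block."""
    return {block.position: block for block in blocks_as_list(boundaries, total_size)}


as_list = blocks_as_list([0, 100], total_size=300)
as_dict = blocks_as_dict([0, 100], total_size=300)
for pos, block in as_dict.items():
    print(pos, len(block))
```

Either shape can be derived from the other, so the choice is mostly about which one reads more naturally for a getter named `blocks`.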

> OK, I can simply use file.seek(0, io.SEEK_END) to get the size and everything is fine.

For the time being, you can use len(file), which should work as well :wink:

mxmlnkn commented 2 years ago

I have made do with what is currently available, so it isn't that pressing anymore. Feel free to close this issue if you want. I improved and benchmarked the already posted sketch and added it to (the public interface of) ratarmountcore, from which it could even be used by other programs. The only thing missing is a check whether the block fits into memory and, if not, a fallback to on-demand reading and seeking. I'm not sure whether the ParallelXZReader class is worth a separate package, although that would make it much easier for people to find. After some tuning and fixes, the speedups on my 12-core are pretty consistent (for 0B per file, it is not decompression bandwidth bound):

[Figure: parallel-xz-ratarmount-comparison]

But about the proposed interfaces, based on the getter name blocks, the second one feels more correct, because it really is only a list of blocks. Compared with indexed_bz2 and indexed_zstd, which is based on the former, the first interface would be a tad more consistent except for the missing "size" offset as the last element.

piskvorky commented 1 year ago

@mxmlnkn @Rogdham @pauldmccarthy seeing the conversation above: is there any appetite to unify the indexed_gzip + indexed_bzip2 + python-xz approach?

I mean in the sense of a unified API solution to random seeking. Python's built-in gzip, bz2, and lzma share the inefficient DecompressReader.seek() implementation. So rather than replace Python's three built-ins with three distinct libraries (which are great btw, thanks) as we do now, factor out the seek points / indexing functionality, and use that? I'm not interested in parallel decompression, only in better support for seeking (which TBH feels like it should have been a part of core Python already).
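To make the "seek points" idea concrete, here is a minimal format-agnostic sketch using only the standard library. It assumes each seek point starts an independently decodable unit (here, one xz stream per chunk), and the names `build_index` and `read_at` are hypothetical; none of the libraries mentioned in this thread expose this exact API.

```python
import bisect
import lzma
from typing import List, Tuple


def build_index(chunks: List[bytes]) -> Tuple[bytes, List[int], List[int]]:
    """Concatenate independently compressed chunks and record seek points."""
    blob = b""
    raw_offsets, packed_offsets = [0], [0]
    for chunk in chunks:
        blob += lzma.compress(chunk)
        raw_offsets.append(raw_offsets[-1] + len(chunk))   # uncompressed offset
        packed_offsets.append(len(blob))                   # compressed offset
    return blob, raw_offsets, packed_offsets


def read_at(blob: bytes, raw_offsets: List[int], packed_offsets: List[int],
            offset: int, size: int) -> bytes:
    """Read `size` bytes at uncompressed `offset`, decoding only the needed chunks."""
    result = b""
    end = min(offset + size, raw_offsets[-1])
    while offset < end:
        # Last seek point at or before the requested offset.
        i = bisect.bisect_right(raw_offsets, offset) - 1
        chunk = lzma.decompress(blob[packed_offsets[i]:packed_offsets[i + 1]])
        start = offset - raw_offsets[i]
        piece = chunk[start:start + (end - offset)]
        result += piece
        offset += len(piece)
    return result


blob, raw, packed = build_index([b"a" * 10, b"b" * 10, b"c" * 10])
middle = read_at(blob, raw, packed, offset=5, size=10)
```

A backward seek then costs one index lookup plus decoding a single unit, instead of restarting decompression from the beginning of the file.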

And maybe even submit that API back to CPython, to replace the seek(0) atrocity in DecompressReader if the user chooses to do so.

In my imagination, the benefit would be a) lower maintenance / less duplicated effort, and b) greater reach of the outstanding work done on your libraries. But maybe I'm way off, please let me know.

mxmlnkn commented 1 year ago

> @mxmlnkn @Rogdham @pauldmccarthy seeing the conversation above: is there any appetite to unify the indexed_gzip + indexed_bzip2 + python-xz approach?

I want to interject that I'm still heavily working on pragzip, which is the same code base as indexed_bzip2. Single-threaded pragzip should already have feature parity to indexed_gzip but I still have some todos for parallel decompression. Pragzip also comes with its own command line tool replacement for GNU gzip btw. Pragzip has its own Github repository for better visibility and it has its own PyPI package because it has different system dependencies (zlib) than indexed_bzip2 and because it is still frequently updated with changes that are completely irrelevant to indexed_bzip2.

But, the underlying infrastructure for parallel block-based decompression with random-seek capabilities is sufficiently generic to work with both bzip2 and gzip already and probably can also be extended to work with xz and zstandard. The interfaces of pragzip and indexed_bzip2 Python modules are also mostly similar. But, I'm only a single person doing this mostly in my free time, so the speed of progress is limited.

> And maybe even submit that API back to CPython, to replace the seek(0) atrocity in DecompressReader if the user chooses to do so.

I have zero experience with that... It sounds difficult and (just speculating) it might not work because C++17, which indexed_bzip2 and pragzip heavily use, is not allowed inside CPython? Also, other Python interpreters like PyPy, Codon(?), and others would still require the packages to be maintained.

> In my imagination, the benefit would be a) lower maintenance / less duplicated effort, and b) greater reach of the outstanding work done on your libraries. But maybe I'm way off, please let me know.

a) For only the single-threaded random-seek support, I don't think it would save that much effort. Each compression format index requires slightly different data and the main problem is to use existing compression libraries correctly (zstandard library, Python lzma, ...), i.e., highly divergent glue code. As for a generic infrastructure for random seeking plus block-parallel decompression, see my first paragraph.

b) I would like greater reach and adoption :). But, to be honest, I would also like some attribution and direct contact with users. And, if it's inside CPython, then it becomes basically invisible inside the whole amalgam code base of CPython.

A unified library like libarchive would be nice though. In an ideal world, it might even be integrated into libarchive, but starting a separate project is always easier to get done, and libarchive is written in C, which I don't want to write.

pauldmccarthy commented 1 year ago

Hi @piskvorky @mxmlnkn @Rogdham, in principle I like the idea of a unified library, but unfortunately I just don't have the time to contribute more beyond discussion/review (indexed_gzip is solely in maintenance mode for the time being, and will likely soon be supplanted by the great work being done by @mxmlnkn anyway :smile:).

Rogdham commented 1 year ago

I have nothing against a unified library as well, and I agree there are many things that could be factored (e.g. the python-xz code for file-objects is generic already).

The good thing is that having a few different file formats to consider would make it pretty clear what can be factored and what can not.

Feel free to ping me if my knowledge about the XZ file format can be of any help!