facebook / zstd

Zstandard - Fast real-time compression algorithm
http://www.zstd.net

Scatter-gather buffer support #2345

Open ned14 opened 3 years ago

ned14 commented 3 years ago

Lee Howes asked me to post this feedback here.

To provide some context, we use zstd to compress all the trades which occur in the United States daily, which we store into a live custom database for the US SEC and the major financial institutions as per Congressional mandate. This is a lot of data, for example a single channel from OPRA which supplies options and futures might be 1Tb-2Tb per day, and OPRA has forty something channels, and that is just one exchange. We need to provide queries across this live dataset up to a few seconds from just now, but also into the past for several years, and we need to return the first result for any arbitrary query within a millisecond 99.9% of the time, and maintain many sigmas of availability.

I'll firstly say, as I told Lee, that there is literally no other game in town than zstd for this problem. You're unique. This is because we have a power law distribution of sizes of blocks to individually compress, so a tiny proportion are massive (e.g. AMZN), but there are millions which are tiny. We therefore need to train a shared dictionary for those millions of small blocks to get their aggregate size down (as each has a lot of shared structure, so they share dictionaries well), but also fire as many CPU cores as possible to parallelise compression. We also have a time-space tradeoff, so for some exchanges it is worth doing a fast compression at a low zstd level to get data off to S3 quickly, and later recompress that data with a higher zstd level to achieve better storage density. zstd gives us all the runtime knobs and twiddles to manually customise our compression per exchange, and that's fabulous for us.

zstd is close to perfect to our needs, but its current main weakness for us is in dictionary training:

  1. You must assemble your training set into a single, contiguous, block of memory. This requires us to copy a hundred Gb or so of memory needlessly. If the API instead took a list of pointers each to a training item, or even better again, it took in a sequence of gather buffers (i.e. each training block might itself be distributed across memory), this would be a lot more efficient.

  2. There is no way of dictionary training giving progress, which means the routine appears to just hang for an unpredictable time. Internally, the routine loops dictionary generation as it tries to figure out an optimal set. It would be very useful if it could call a callback function per round, ideally with the ability for the callback routine to say whether the training is "good enough" i.e. we might run a timer, and choose whatever is the best after one minute. It would be also useful if the warnings currently printable could instead be given to the callback function, so we can log them into our proper logger.

  3. Perhaps this is already supported, but dictionary training ought to be embarrassingly parallelisable. I'd like the ability to control the threads doing the training however, for example in C++ 23 all concurrency ought to act via an Executor. Breaking out the internals into public APIs so we can compose our own training implementations would be great.

Whilst we, in our current use case for zstd, don't need this, in general a scatter-gather based API design is a better call than a single buffer API. This lets one compress, or decompress, frames gathering in the data discontiguously distributed across memory, or scattering it discontiguously across memory. As you may know given you're under the same management structure, Eric Niebler designed Ranges for C++. These naturally act upon discontiguous distributions of data across memory which are composed into as-if apparently contiguous byte sequences, and we on WG21 are going to be extending his work (along with Eric's FB team) into a wider, standardised, framework for C++ whereby everything naturally works in terms of scatter-gather buffer sequences, in order to avoid unnecessary memory copies. If zstd were similarly scatter-gather buffer orientated, including its dictionary training API, that would be superb.

Finally I'd like to thank those supporting this repo for taking the time to engage so fully with the users. On multiple occasions I found treasures of discussion and advice inside the commentary in this issue tracker which has accumulated over the years. So few engineers bother to so fully engage with users, to the detriment of us and the ecosystem, but you guys here do so, and it's a real credit to you. So thank you!

terrelln commented 3 years ago

Thanks for the feedback @ned14, it is super helpful!

Our dictionary building API and documentation need improvement. Basically everything I'm going to suggest here should be covered in a top-level doc-block in the zdict.h header. We also should improve the API for clarity (better parameter names, fewer implementation details), and add better support for progress info with callbacks as you suggest.

You must assemble your training set into a single, contiguous, block of memory. This requires us to copy a hundred Gb or so of memory needlessly. If the API instead took a list of pointers each to a training item, or even better again, it took in a sequence of gather buffers (i.e. each training block might itself be distributed across memory), this would be a lot more efficient.

Currently, all of our training algorithms require memory in a single contiguous buffer. Two of these algorithms cannot be written in any other way (legacy, cover), and one (fastcover) could potentially be rewritten to avoid it. So there wouldn't be an efficiency gain for our current algorithms, because we'd need to concatenate into one buffer internally.

Ignoring efficiency, do you think a gather buffer API would be more ergonomic than the current API?

Dictionary training is an inherently lossy process. We don't need GBs of data to train a dictionary. Currently we will train on whatever the user passes us, but we wouldn't recommend training on more than 100 MB (10 MB is probably enough). If you have more data than that, then randomly sample it down to 10 MB. That is one thing we could potentially do with a gather buffer API, we could randomly sample 100 MB of it and only move that into a contiguous buffer.
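The sampling step described above can be sketched in plain C. This is a minimal sketch, not zstd code: `sample_down`, `next_random`, and the caller-provided `order` scratch array are hypothetical names. The output layout (samples stored back to back, plus an array of sizes) is the same shape that `ZDICT_trainFromBuffer` takes as `samplesBuffer` and `samplesSizes`. Skipping samples that no longer fit biases the selection slightly towards small samples, which may or may not matter for a given data set.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/* Hypothetical helper: xorshift64 PRNG, so the sketch does not touch rand(). */
static uint64_t next_random(uint64_t *state)
{
    uint64_t x = *state;
    x ^= x << 13;
    x ^= x >> 7;
    x ^= x << 17;
    *state = x;
    return x;
}

/* Hypothetical helper: visit the samples in random order and copy whole
 * samples into `out` until `budget` bytes are used. `order` is scratch space
 * for `count` indices, `out` must hold `budget` bytes, and `out_sizes` must
 * hold `count` entries. Returns the number of samples copied. */
size_t sample_down(const void *const *samples, const size_t *sizes,
                   size_t count, size_t budget, uint64_t seed,
                   size_t *order, void *out, size_t *out_sizes)
{
    uint64_t state = seed;
    unsigned char *dst = out;
    size_t used = 0, picked = 0;

    assert(seed != 0); /* xorshift never leaves the all-zero state */

    /* Fisher-Yates shuffle of the sample indices (modulo bias ignored). */
    for (size_t i = 0; i < count; i++)
        order[i] = i;
    for (size_t i = count; i > 1; i--) {
        size_t j = (size_t)(next_random(&state) % i);
        size_t tmp = order[i - 1];
        order[i - 1] = order[j];
        order[j] = tmp;
    }

    /* Take samples in shuffled order, skipping any that would not fit. */
    for (size_t i = 0; i < count; i++) {
        size_t len = sizes[order[i]];
        if (len > budget - used)
            continue;
        memcpy(dst + used, samples[order[i]], len);
        out_sizes[picked++] = len;
        used += len;
    }
    return picked;
}
```

With a gather-style input like this, only the sampled bytes are ever copied, rather than the whole training set.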

There is no way of dictionary training giving progress, which means the routine appears to just hang for an unpredictable time. Internally, the routine loops dictionary generation as it tries to figure out an optimal set. It would be very useful if it could call a callback function per round, ideally with the ability for the callback routine to say whether the training is "good enough" i.e. we might run a timer, and choose whatever is the best after one minute. It would be also useful if the warnings currently printable could instead be given to the callback function, so we can log them into our proper logger.

That's good feedback. I'm unhappy with the current way that the dictionary builder displays its progress via stderr. It would be nice to migrate away from stderr and pass status info in a structured way into a callback, and let the callback stop training short, like you suggest.
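One possible shape for such a callback, as a sketch only: none of these names (`train_status`, `train_callback`, `run_training`, `demo_*`) exist in zstd, and the training round itself is abstracted behind a function pointer so the example stays self-contained. The callback receives structured status after each round and returns zero to stop early, in which case the best result so far is kept.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical status record passed to the callback after every round. */
typedef struct {
    unsigned round;        /* rounds completed so far (1-based) */
    unsigned total_rounds; /* rounds planned */
    double best_score;     /* best score seen so far, higher is better */
    unsigned best_round;   /* round that produced best_score */
    const char *warning;   /* warning text for the caller's logger, or NULL */
} train_status;

/* Return nonzero to continue, zero to stop and keep the best result so far. */
typedef int (*train_callback)(const train_status *status, void *user);

/* Stand-in for one round of dictionary generation; returns its score. */
typedef double (*train_round)(unsigned round, void *user);

train_status run_training(unsigned total_rounds, train_round do_round,
                          train_callback on_progress, void *user)
{
    train_status st = { 0, total_rounds, 0.0, 0, NULL };

    assert(do_round != NULL);
    for (unsigned r = 1; r <= total_rounds; r++) {
        double score = do_round(r, user);
        st.round = r;
        if (r == 1 || score > st.best_score) {
            st.best_score = score;
            st.best_round = r;
        }
        if (on_progress != NULL && !on_progress(&st, user))
            break; /* the caller decided the result is good enough */
    }
    return st;
}

/* Demo state: canned scores, and a round budget standing in for a timer. */
typedef struct {
    const double *scores;
    unsigned max_rounds;
} demo_state;

double demo_round(unsigned round, void *user)
{
    return ((const demo_state *)user)->scores[round - 1];
}

int demo_progress(const train_status *status, void *user)
{
    return status->round < ((const demo_state *)user)->max_rounds;
}
```

A real callback would more likely compare a clock against a deadline than count rounds; the round budget here just keeps the sketch deterministic.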

I would gladly accept PRs that did this. Otherwise I hope we can find some time to improve the API at some point, but I can't guarantee it will be terribly soon.

Perhaps this is already supported, but dictionary training ought to be embarrassingly parallelisable. I'd like the ability to control the threads doing the training however, for example in C++ 23 all concurrency ought to act via an Executor. Breaking out the internals into public APIs so we can compose our own training implementations would be great.

We're working on exposing zstd's threading internals (compression not dictionary building currently) to allow users to pass in their own thread pools. The API we're looking at is something like this.

zstd is close to perfect to our needs, but its current main weakness for us is in dictionary training:

I can make several suggestions on how to improve dictionary training speed significantly without a (large) loss of compression ratio.

  1. Reduce the amount of training data. I would cap it at 100MB, and wouldn't expect a noticeable compression ratio loss from doing so, but would expect a large speed gain if the size was larger than that before. You should also try 10MB.
  2. Try the fastcover dictionary builder if you aren't already using it. It might perform slightly worse than cover, but it should be pretty close, since it is using the same high level algorithm, but with some approximations. You can play with some parameters to tradeoff training speed for dictionary quality. Some details are in these release notes.
    • You can tune the parameter f where higher means slower and maybe a better dictionary, and lower means faster and maybe a worse dictionary.
    • You can try a higher accel for faster training.
    • You can try to reduce steps.

Internally, we use the cover dictionary builder, because we value a small (1-3%) compression ratio improvement more than we mind a really long training time (days in total). We train on ~5000 samples, but that is likely too many for use cases where the data is larger (10KB+), and it slows down training unnecessarily. We just haven't optimized for training time much.

Let me know if you have any further questions or suggestions! I will leave this issue open to track dictionary builder API/doc improvements.

ned14 commented 3 years ago

Thanks for the feedback @ned14, it is super helpful!

You should thank Lee for reaching out to me and asking me. I probably would have sent you something like this eventually anyway, but he definitely initiated it, not me.

You must assemble your training set into a single, contiguous, block of memory. This requires us to copy a hundred Gb or so of memory needlessly. If the API instead took a list of pointers each to a training item, or even better again, it took in a sequence of gather buffers (i.e. each training block might itself be distributed across memory), this would be a lot more efficient.

Currently, all of our training algorithms require memory in a single contiguous buffer. Two of these algorithms cannot be written in any other way (legacy, cover), and one (fastcover) could potentially be rewritten to avoid it. So there wouldn't be an efficiency gain for our current algorithms, because we'd need to concatenate into one buffer internally.

One of my areas of expertise is bulk i/o, hence me being recruited to solve this specific problem due to my work on https://github.com/ned14/llfio. One thing I've learned is this: always write everything in scatter-gather buffers. Sure, it's a bit more painful to implement and debug, but the composability, flexibility and performance gains are worth it.

Obviously I cannot say what will come in C++ 26, but some of us intend a preallocated locked DMA buffer based design using RIO on Windows and io_uring on Linux to achieve saturation of a 100Gbit NIC from a single kernel thread. In this, i/o buffers are allocated in fragments out of a number of 2MB pages locked into memory for the exclusive purposes of DMA. i/o occurs only within the locked pages, and those buffer fragments have sizes and offsets not under our control. As you can probably surmise, the ability to have zstd scatter decompress without additional memory copying into those arbitrarily sized fragment buffers would be extremely important to maximum efficiency.
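To make the cost concrete: with a single-buffer decompression API, a caller holding a list of fragment buffers has to decompress into a contiguous staging buffer and then scatter the result, roughly as below. This is a sketch of that extra copy step only; `frag` and `scatter_copy` are hypothetical names (`frag` mirrors POSIX `struct iovec`), and a scatter-aware decoder would make this pass unnecessary.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical fragment descriptor, equivalent in spirit to struct iovec. */
typedef struct {
    void *base;
    size_t len;
} frag;

/* Copy `src_len` bytes from a contiguous staging buffer into a list of
 * arbitrarily sized fragments, filling them in order. Returns the number of
 * bytes copied, which is less than `src_len` if the fragments run out. */
size_t scatter_copy(const frag *frags, size_t nfrags,
                    const void *src, size_t src_len)
{
    const unsigned char *in = src;
    size_t done = 0;

    assert(frags != NULL || nfrags == 0);
    for (size_t i = 0; i < nfrags && done < src_len; i++) {
        size_t n = src_len - done;
        if (n > frags[i].len)
            n = frags[i].len;
        memcpy(frags[i].base, in + done, n);
        done += n;
    }
    return done;
}
```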

Ignoring efficiency, do you think a gather buffer API would be more ergonomic than the current API?

In C++, you just do:

byte buffer[...];
fh.read(0 /*offset*/, /*scatter buffer list*/ {buffer});

This is because C++ can compose scatter gather buffers from initialiser lists supplied with known size input, so that "just works".

For C, you need two functions just like readv() and read(). One takes a pointer to an array of struct iovec, or the moral equivalent thereof, the other takes a single buffer and internally simply calls the former. Any decent compiler will optimise out the loops iterating the buffer lists, and you'll get the same performance as now.

So to answer your question, I think it would be identically ergonomic to right now. Just do what POSIX does in readv() and read(), they're a fine API design.
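The readv()/read() pairing can be sketched in C as follows. The names `const_buf`, `fnv1a_v`, and `fnv1a` are hypothetical, and an FNV-1a checksum stands in for the real work (compression or training) so the example stays self-contained. The point is only the shape: the vector function does the work, and the single-buffer function is a thin wrapper over a one-element list.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical read-only buffer descriptor, like a const struct iovec. */
typedef struct {
    const void *base;
    size_t len;
} const_buf;

/* Vector form: processes the buffers as if they were one contiguous byte
 * sequence. Here the "processing" is a 32-bit FNV-1a hash. */
uint32_t fnv1a_v(const const_buf *bufs, size_t count)
{
    uint32_t h = 2166136261u; /* FNV-1a offset basis */

    assert(bufs != NULL || count == 0);
    for (size_t i = 0; i < count; i++) {
        const unsigned char *p = bufs[i].base;
        for (size_t j = 0; j < bufs[i].len; j++) {
            h ^= p[j];
            h *= 16777619u;   /* FNV-1a prime */
        }
    }
    return h;
}

/* Single-buffer form: wraps the buffer in a one-element list and forwards,
 * the same relationship read() has to readv(). */
uint32_t fnv1a(const void *data, size_t len)
{
    const const_buf one = { data, len };
    return fnv1a_v(&one, 1);
}
```

Existing single-buffer callers keep their signature, and the result is identical however the input is split across buffers.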

Dictionary training is an inherently lossy process. We don't need GBs of data to train a dictionary. Currently we will train on whatever the user passes us, but we wouldn't recommend training on more than 100 MB (10 MB is probably enough). If you have more data than that, then randomly sample it down to 10 MB. That is one thing we could potentially do with a gather buffer API, we could randomly sample 100 MB of it and only move that into a contiguous buffer.

Me personally, I'd prefer if you could instead train multiple dictionaries using more CPU. In our situation, for all our small blocks, statistically speaking they ought to be categorisable into groups of close similarity, and each of those groups would be fairly dissimilar. You then create a shared dictionary for each. During compression of a given block, it gets compressed with all generated dictionaries in parallel, and we retain the smallest one.

Now, I can easily think of an O(N^2) algorithm for training categorisation, possibly with a few heuristics to tamp N downwards to keep everything sane. But surely you could use a Bayesian classifier or something similar here to avoid quadratic complexity?

Perhaps this is already supported, but dictionary training ought to be embarrassingly parallelisable. I'd like the ability to control the threads doing the training however, for example in C++ 23 all concurrency ought to act via an Executor. Breaking out the internals into public APIs so we can compose our own training implementations would be great.

We're working on exposing zstd's threading internals (compression not dictionary building currently) to allow users to pass in their own thread pools. The API we're looking at is something like this.

I can see why you'd choose that approach.

Me personally, I'd flip the API design such that all threading and dynamic memory allocation is done by dependency injection. I appreciate that that would be a massive breaking change for your current API, and indeed all current zstd using code would need to be refactored. So it's a showstopper.

What you've done will do, but please always ask yourself when designing C callbacks: "will this work well when executing on a GPU?". Because all C++ 23 conforming programs are expected to work perfectly on GPUs, they're a primary target now, and C will follow where C++ goes. And, for reference, runtime definable callbacks have really awful performance on GPUs, so you want to avoid any where the compiler's optimiser can't elide them after static analysis (tip: consider const pointers to function callbacks in a const table, like a polymorphic C++ class' vtable. Those are intentionally designed to be elidable).

I can make several suggestions on how to improve dictionary training speed significantly without a (large) loss of compression ratio.

  1. Reduce the amount of training data. I would cap it at 100MB, and wouldn't expect a noticeable compression ratio loss from doing so, but would expect a large speed gain if the size was larger than that before. You should also try 10MB.
  2. Try the fastcover dictionary builder if you aren't already using it. It might perform slightly worse than cover, but it should be pretty close, since it is using the same high level algorithm, but with some approximations. You can play with some parameters to tradeoff training speed for dictionary quality. Some details are in these release notes.

We're already on fastcover, and I shall look into capping input size. Thanks for your suggestions and feedback!

terrelln commented 3 years ago

Me personally, I'd prefer if you could instead train multiple dictionaries using more CPU. In our situation, for all our small blocks, statistically speaking they ought to be categorisable into groups of close similarity, and each of those groups would be fairly dissimilar. You then create a shared dictionary for each. During compression of a given block, it gets compressed with all generated dictionaries in parallel, and we retain the smallest one.

Now, I can easily think of an O(N^2) algorithm for training categorisation, possibly with a few heuristics to tamp N downwards to keep everything sane. But surely you could use a Bayesian classifier or something similar here to avoid quadratic complexity?

I worked with @ot and a few others on a project internally that used multiple dictionaries for LZ4. The approach we took is:

  1. Pick k (we use 4). A k-mer is a k-byte substring. We hash the k-mer and take the hash mod KmerDimension (we use 2^20) to get the kmerId.
  2. Call the fingerprint vector of source i F_i. It is the list of all kmerIds in the source (size(source_i) - k + 1 of them). You can skip over k-mers to make it smaller, like taking every other k-mer.
  3. Given a cluster C_j of documents F_1, ..., F_n we will call its fingerprint vector the top KmersPerCluster (we use MaxDictSize / k) k-mers of all the documents in the cluster.
  4. Cluster these documents using Lloyd's algorithm. The closeness function between each fingerprint vector F_i and each cluster fingerprint C_j is the number of overlapping k-mers (higher is better).
  5. Once you have your clusters build a dictionary for each cluster.
  6. The fingerprint for a dictionary is the same as the fingerprint for a source.
  7. When compressing a source, choose the dictionary with the most k-mers in common.
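The fingerprinting and selection steps (1, 2, 6 and 7) can be sketched in C as below; the Lloyd's clustering of steps 3 to 5 is not shown. Everything here is an assumption made for illustration rather than the internal project's code: the multiplicative hash, the bitset representation of a fingerprint, and the names `fingerprint`, `kmer_id`, `fingerprint_build`, `fingerprint_overlap`, and `choose_dictionary`. Only k = 4 and KmerDimension = 2^20 come from the description above.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

#define KMER_K 4u                      /* k: bytes per k-mer */
#define KMER_DIMENSION (1u << 20)      /* KmerDimension: number of kmerIds */
#define FP_WORDS (KMER_DIMENSION / 64u)

/* Fingerprint as a set of kmerIds, stored as a 128 KiB bitset (assumption:
 * the text describes a list; a set is enough for counting overlaps). */
typedef struct {
    uint64_t bits[FP_WORDS];
} fingerprint;

/* Hash one 4-byte k-mer to a kmerId in [0, KMER_DIMENSION). */
static uint32_t kmer_id(const unsigned char *p)
{
    uint32_t v;
    memcpy(&v, p, KMER_K);             /* alignment-safe 4-byte load */
    v *= 2654435761u;                  /* multiplicative hash (assumption) */
    v ^= v >> 16;                      /* fold high bits into the low bits */
    return v % KMER_DIMENSION;
}

/* Build the fingerprint of a dictionary (or any source). */
void fingerprint_build(fingerprint *fp, const void *data, size_t len)
{
    const unsigned char *p = data;
    memset(fp, 0, sizeof *fp);
    for (size_t i = 0; i + KMER_K <= len; i++) {
        uint32_t id = kmer_id(p + i);
        fp->bits[id / 64u] |= (uint64_t)1 << (id % 64u);
    }
}

/* Count how many k-mers of the source are present in the fingerprint. */
size_t fingerprint_overlap(const fingerprint *fp, const void *data, size_t len)
{
    const unsigned char *p = data;
    size_t hits = 0;
    for (size_t i = 0; i + KMER_K <= len; i++) {
        uint32_t id = kmer_id(p + i);
        hits += (size_t)((fp->bits[id / 64u] >> (id % 64u)) & 1u);
    }
    return hits;
}

/* Step 7: pick the dictionary sharing the most k-mers with the source.
 * Ties go to the lowest index. */
size_t choose_dictionary(const fingerprint *dicts, size_t ndicts,
                         const void *src, size_t len)
{
    size_t best = 0, best_overlap = 0;
    assert(ndicts > 0);
    for (size_t i = 0; i < ndicts; i++) {
        size_t overlap = fingerprint_overlap(&dicts[i], src, len);
        if (overlap > best_overlap) {
            best_overlap = overlap;
            best = i;
        }
    }
    return best;
}
```

Selection costs one pass over the source per dictionary, which is far cheaper than compressing the source with every dictionary and keeping the smallest output.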

The speed/accuracy of the clustering & selection can be traded off in two main ways:

This is something that we want to implement inside of zstd at some point in the future. Whenever we get an intern to work on it :).

One thing I've learned is this: always write everything in scatter-gather buffers

I'll keep that in mind! When we look at refreshing the dictionary building API we can at least provide the scatter-gather API, even if we can't take advantage of it internally yet.

Me personally, I'd flip the API design such that all threading and dynamic memory allocation is done by dependency injection. I appreciate that that would be a massive breaking change for your current API, and indeed all current zstd using code would need to be refactored. So it's a showstopper.

We provide ZSTD_customMem to inject dynamic memory allocation functions. And we also allow static allocation with our ZSTD_initStatic* functions. With the thread-pool being injected as well, that would remove all explicit threading inside of zstd.

tip: consider const pointers to function callbacks in a const table, like a polymorphic C++ class' vtable. Those are intentionally designed to be elidable

Good to know, I hadn't thought about making them const, but that is the intention.

ned14 commented 3 years ago

This is something that we want to implement inside of zstd at some point in the future. Whenever we get an intern to work on it :).

Google Summer of Code is another option.

One thing I've learned is this: always write everything in scatter-gather buffers

I'll keep that in mind! When we look at refreshing the dictionary building API we can at least provide the scatter-gather API, even if we can't take advantage of it internally yet.

Well, maybe. If I see a scatter-gather API, I assume it does not copy everything into an internally allocated buffer. If it did, I would get upset at expectations violated.

tip: consider const pointers to function callbacks in a const table, like a polymorphic C++ class' vtable. Those are intentionally designed to be elidable

Good to know, I hadn't thought about making them const, but that is the intention.

Just so I'm being clear here, and you understand what I'm saying, when one initialises a context one would pass in a const pointer to something like this:

struct ZCtx_callbacks {
  void * (*allocate)(size_t);
  void (*free)(void *);
  void (*launchthread)(void (*f)(void));
  ...
};

If the user allocates that as a global static const structure:

static const struct ZCtx_callbacks mycallbacks = { .allocate = myallocate, .free = myfree };

Then the compiler can hard assume that the pointers within that structure will never, ever change during the lifetime of the program. It then can rewrite your code for you to completely eliminate that pointer table because it will inline the implementations through devirtualisation, so there are no longer any indirect function calls at all.

That, in turn, then executes quickly on GPUs, because in codegen the indirect function calls have been completely eliminated. If you need to see an example of this technique in action, OpenSSL uses it. Hopefully that clarifies things.
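A fuller sketch of the pattern, for illustration only: `ZCtx`, `zctx_create`, `zctx_destroy`, and the counting allocator are hypothetical and not zstd APIs, and the `launchthread` member is left out for brevity. Whether the indirect calls are actually elided depends on the compiler being able to see both the const table and the call sites, for example within one translation unit or with link-time optimisation.

```c
#include <assert.h>
#include <stddef.h>
#include <stdlib.h>

/* Callback table, after the struct in the comment above. */
struct ZCtx_callbacks {
    void *(*allocate)(size_t);
    void (*free)(void *);
};

/* Hypothetical context that performs all allocation through the table. */
typedef struct {
    const struct ZCtx_callbacks *cb;
    void *workspace;
    size_t workspace_size;
} ZCtx;

/* Example user-supplied allocator that counts live allocations. */
static size_t live_allocations = 0;

static void *counting_allocate(size_t size)
{
    void *p = malloc(size);
    if (p != NULL)
        live_allocations++;
    return p;
}

static void counting_free(void *p)
{
    if (p != NULL)
        live_allocations--;
    free(p);
}

/* Global static const table: the pointers never change after load, which
 * is what gives the optimiser the chance to devirtualise the calls. */
static const struct ZCtx_callbacks counting_callbacks = {
    .allocate = counting_allocate,
    .free = counting_free,
};

ZCtx *zctx_create(const struct ZCtx_callbacks *cb, size_t workspace_size)
{
    ZCtx *ctx;

    assert(cb != NULL && cb->allocate != NULL && cb->free != NULL);
    ctx = cb->allocate(sizeof *ctx);
    if (ctx == NULL)
        return NULL;
    ctx->cb = cb;
    ctx->workspace_size = workspace_size;
    ctx->workspace = cb->allocate(workspace_size);
    if (ctx->workspace == NULL) {
        cb->free(ctx);
        return NULL;
    }
    return ctx;
}

void zctx_destroy(ZCtx *ctx)
{
    if (ctx == NULL)
        return;
    const struct ZCtx_callbacks *cb = ctx->cb; /* read before freeing ctx */
    cb->free(ctx->workspace);
    cb->free(ctx);
}
```

The context stores only a pointer to the const table, so every context created with `&counting_callbacks` shares the same immutable set of functions.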