
[C++] Util: Compression supports a Compression/Decompression Context #37169

Open · mapleFU opened this issue 1 year ago

mapleFU commented 1 year ago

Describe the enhancement requested

In compression, zstd supports a compression/decompression context [1]. Reusing such a context across calls avoids repeated internal allocations, which improves system memory usage. RocksDB utilizes such a context [2] together with a context cache [3].
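
For reference, the raw zstd reuse pattern this refers to, as a minimal sketch (hypothetical CompressMany helper; error handling elided):

#include <zstd.h>

#include <cstdint>
#include <vector>

// Reusing one ZSTD_CCtx across many calls avoids repeated internal
// allocations; this is the behavior the proposal wants to expose.
void CompressMany(const std::vector<std::vector<uint8_t>>& inputs) {
  ZSTD_CCtx* cctx = ZSTD_createCCtx();  // allocated once, reused below
  std::vector<uint8_t> out;
  for (const auto& in : inputs) {
    out.resize(ZSTD_compressBound(in.size()));
    size_t n = ZSTD_compress2(cctx, out.data(), out.size(),
                              in.data(), in.size());
    (void)n;  // check ZSTD_isError(n) in real code
  }
  ZSTD_freeCCtx(cctx);
}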

So here I propose adding support for a context:

class ARROW_EXPORT CodecContext {
  std::unique_ptr<CompressionContext> compress_ctx;
  std::unique_ptr<DecompressionContext> decompress_ctx;
};

/// \brief Compression codec options
class ARROW_EXPORT CodecOptions {
 public:
  int compression_level;
  std::unique_ptr<CodecContext> context;  // proposed new member
};

(This requires that compression and decompression never run concurrently; if they can run concurrently, we may need another interface, like:

  /// \brief One-shot decompression function
  ///
  /// output_buffer_len must be correct and therefore be obtained in advance.
  /// The actual decompressed length is returned.
  ///
  /// \note One-shot decompression is not always compatible with streaming
  /// compression.  Depending on the codec (e.g. LZ4), different formats may
  /// be used.
  virtual Result<int64_t> Decompress(int64_t input_len, const uint8_t* input,
                                     int64_t output_buffer_len,
                                     uint8_t* output_buffer,
                                     DecompressionCtx* ctx) = 0;

  /// \brief One-shot compression function
  ///
  /// output_buffer_len must first have been computed using MaxCompressedLen().
  /// The actual compressed length is returned.
  ///
  /// \note One-shot compression is not always compatible with streaming
  /// decompression.  Depending on the codec (e.g. LZ4), different formats may
  /// be used.
  virtual Result<int64_t> Compress(int64_t input_len, const uint8_t* input,
                                   int64_t output_buffer_len,
                                   uint8_t* output_buffer,
                                   CompressionCtx* ctx) = 0;
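
For illustration, a hypothetical call site for this interface (CompressionCtx and the extra parameter are proposed here, not existing Arrow API):

// A context is created once and reused across many Compress() calls.
CompressionCtx ctx;
int64_t max_len = codec->MaxCompressedLen(input_len, input);
ARROW_ASSIGN_OR_RAISE(int64_t actual_len,
                      codec->Compress(input_len, input, max_len,
                                      output_buffer, &ctx));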

[1] https://raw.githack.com/facebook/zstd/release/doc/zstd_manual.html
[2] https://github.com/facebook/rocksdb/blob/a09c141dde51372d14bcfd3affdd242f1248c761/util/compression.h#L362
[3] https://github.com/facebook/rocksdb/blob/a09c141dde51372d14bcfd3affdd242f1248c761/util/compression_context_cache.h#L27

Component(s)

C++

wgtmac commented 1 year ago

This should be useful. What about directly adding the concrete context to the sub-class, to avoid another abstraction like CodecContext?

For example:

class ARROW_EXPORT ZstdCodecOptions : public CodecOptions {
  ZSTD_CStream* cstream_;
  ZSTD_DStream* dstream_;
};
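
If we go that way, the subclass would presumably also need to own the streams' lifetime; a sketch, assuming zstd's create/free functions:

class ARROW_EXPORT ZstdCodecOptions : public CodecOptions {
 public:
  ZstdCodecOptions()
      : cstream_(ZSTD_createCStream()), dstream_(ZSTD_createDStream()) {}
  ~ZstdCodecOptions() {
    ZSTD_freeCStream(cstream_);  // both free functions accept nullptr
    ZSTD_freeDStream(dstream_);
  }
  // Non-copyable: the raw stream pointers are uniquely owned.
  ZstdCodecOptions(const ZstdCodecOptions&) = delete;
  ZstdCodecOptions& operator=(const ZstdCodecOptions&) = delete;

 private:
  ZSTD_CStream* cstream_;
  ZSTD_DStream* dstream_;
};
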
mapleFU commented 1 year ago

Both of them LGTM. @kou @pitrou Which kind of interface would you prefer? Or should I post this to the dev mailing list?

kou commented 1 year ago

Can we put them in Codec itself instead of CodecOptions? It seems that Codec and CodecOptions have a one-to-one relation.

mapleFU commented 1 year ago

@kou

I have another question: does Codec guarantee that Compress will not be called in parallel? If not, maybe we can only use something like:

  /// \brief One-shot decompression function
  ///
  /// output_buffer_len must be correct and therefore be obtained in advance.
  /// The actual decompressed length is returned.
  ///
  /// \note One-shot decompression is not always compatible with streaming
  /// compression.  Depending on the codec (e.g. LZ4), different formats may
  /// be used.
  virtual Result<int64_t> Decompress(int64_t input_len, const uint8_t* input,
                                     int64_t output_buffer_len,
                                     uint8_t* output_buffer,
                                     DecompressionCtx* ctx) = 0;

  /// \brief One-shot compression function
  ///
  /// output_buffer_len must first have been computed using MaxCompressedLen().
  /// The actual compressed length is returned.
  ///
  /// \note One-shot compression is not always compatible with streaming
  /// decompression.  Depending on the codec (e.g. LZ4), different formats may
  /// be used.
  virtual Result<int64_t> Compress(int64_t input_len, const uint8_t* input,
                                   int64_t output_buffer_len,
                                   uint8_t* output_buffer,
                                   CompressionCtx* ctx) = 0;

And if we can guarantee that Compress and Decompress will not run in parallel, we can put the context in Codec.

kou commented 1 year ago

Compress and Decompress aren't multi-thread safe.

Can we create a Codec for each thread and put compress/decompress contexts to Codec?
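
A minimal sketch of that idea, using the existing Codec::Create factory with a thread_local codec (hypothetical CompressOnThisThread helper; assumes ZSTD support is compiled in):

#include <cstdint>
#include <memory>

#include "arrow/util/compression.h"

// One Codec (and therefore one set of contexts) per thread; no locking
// is needed because a Codec instance is never shared across threads.
arrow::Result<int64_t> CompressOnThisThread(int64_t input_len,
                                            const uint8_t* input,
                                            int64_t output_len,
                                            uint8_t* output) {
  thread_local std::unique_ptr<arrow::util::Codec> codec =
      arrow::util::Codec::Create(arrow::Compression::ZSTD).ValueOrDie();
  return codec->Compress(input_len, input, output_len, output);
}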

pitrou commented 1 year ago

Why would we expose this as an API, rather than use it internally?

mapleFU commented 1 year ago

RocksDB uses it internally (like https://github.com/facebook/rocksdb/blob/6a3da5635e1013f03930453481f49724f2319252/util/compression.h#L437 )

Personally, I would also prefer to use it internally, but that means we need an "object pool" for contexts. I haven't measured the cost of that. Maybe I can try to draft a PoC for it :-)
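
For illustration, a minimal sketch of such a pool (hypothetical ZstdCCtxPool, loosely modeled on RocksDB's CompressionContextCache):

#include <zstd.h>

#include <mutex>
#include <vector>

// Hypothetical internal pool: callers check a context out, use it, and
// return it, so contexts are reused instead of created per call.
class ZstdCCtxPool {
 public:
  ZSTD_CCtx* Acquire() {
    std::lock_guard<std::mutex> lock(mutex_);
    if (pool_.empty()) return ZSTD_createCCtx();
    ZSTD_CCtx* ctx = pool_.back();
    pool_.pop_back();
    return ctx;
  }
  void Release(ZSTD_CCtx* ctx) {
    // Clear sticky state before the context goes back into the pool.
    ZSTD_CCtx_reset(ctx, ZSTD_reset_session_and_parameters);
    std::lock_guard<std::mutex> lock(mutex_);
    pool_.push_back(ctx);
  }
  ~ZstdCCtxPool() {
    for (auto* ctx : pool_) ZSTD_freeCCtx(ctx);
  }

 private:
  std::mutex mutex_;
  std::vector<ZSTD_CCtx*> pool_;
};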

mapleFU commented 1 year ago

Oh, I can always learn something I hadn't noticed from Arrow's code.

Codec doesn't use a context. But we can create a Decompressor from a Codec; the decompressor holds a stream, which is the same thing as a context. Let me try to benchmark it.
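
For reference, a sketch of that existing path (hypothetical DecompressWithReusedContext wrapper; Codec::Create, MakeDecompressor(), and Decompressor::Decompress() are the real Arrow APIs):

#include <cstdint>

#include "arrow/result.h"
#include "arrow/status.h"
#include "arrow/util/compression.h"

// The Decompressor owns the zstd stream/context, so repeated
// Decompress() calls on it reuse the same context.
arrow::Status DecompressWithReusedContext(int64_t input_len,
                                          const uint8_t* input,
                                          int64_t output_len,
                                          uint8_t* output) {
  ARROW_ASSIGN_OR_RAISE(auto codec,
                        arrow::util::Codec::Create(arrow::Compression::ZSTD));
  ARROW_ASSIGN_OR_RAISE(auto decompressor, codec->MakeDecompressor());
  ARROW_ASSIGN_OR_RAISE(auto result,
                        decompressor->Decompress(input_len, input,
                                                 output_len, output));
  // result has bytes_read / bytes_written / need_more_output fields.
  return arrow::Status::OK();
}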

mapleFU commented 1 year ago

From the zstd manual [1]:

For parallel execution, use one separate ZSTD_CStream per thread.

note : since v1.4.0, ZSTD_CStream and ZSTD_CCtx are the same thing.

Parameters are sticky : when starting a new compression on the same context, it will re-use the same sticky parameters as previous compression session. When in doubt, it's recommended to fully initialize the context before usage. Use ZSTD_CCtx_reset() to reset the context and ZSTD_CCtx_setParameter(), ZSTD_CCtx_setPledgedSrcSize(), or ZSTD_CCtx_loadDictionary() and friends to set more specific parameters, the pledged source size, or load a dictionary.

It seems we can create a decompressor that makes full use of the context and saves some memory. However, it has a different API from Codec.
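
As the quoted manual text suggests, a reused context should be reset between unrelated sessions; a sketch of that pattern (cctx, dst, src, and the sizes are assumed to be declared elsewhere):

// Reusing one cctx for a new, unrelated compression session: reset the
// session and sticky parameters, then re-apply what this session needs.
ZSTD_CCtx_reset(cctx, ZSTD_reset_session_and_parameters);
ZSTD_CCtx_setParameter(cctx, ZSTD_c_compressionLevel, 3);
size_t n = ZSTD_compress2(cctx, dst, dst_capacity, src, src_len);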

pitrou commented 1 year ago

You could first investigate whether it makes a significant difference for our use cases.

mapleFU commented 1 year ago

Oops, I've ported the implementation in https://github.com/mapleFU/arrow/commit/0010894d38ecd07c94ac048b98071a375cd26a7a

It doesn't show any remarkable speed optimization, but it does decrease some memory allocations.

pitrou commented 1 year ago

Did you benchmark it on some workload(s)?

mapleFU commented 1 year ago

Yeah, I created a "non-streaming" decompress in https://github.com/mapleFU/arrow/commit/36f2f31dbdaea8ace2fb3be695fc610f830d50c6

The speed shows no benefit or regression, but it decreases some allocations.

pitrou commented 1 year ago

If it can't be measured, then IMHO it's not worth the added complexity and maintenance...

kou commented 1 year ago

It seems that https://github.com/apache/arrow/compare/main...mapleFU:arrow:arrow-zstd-compress uses the context in ZSTDCodec::Decompress() but adds benchmarks for ZSTDDecompressor::Decompress(), not ZSTDCodec::Decompress().

mapleFU commented 1 year ago

Nope, the benchmark uses auto& decompressor = codec...

kou commented 1 year ago

Oh, sorry.