marin-ma opened 1 year ago
cc: @FelixYBW
@pedroerp Any plan to unify the codecs in Velox? It doesn't make sense that we have 3 codec definitions in current Velox. In Gluten shuffle we currently use Arrow's. In Spill we use Folly's.
It's the first step to adding de-/compression accelerators in Velox.
@mbasmanova @oerling
CC: @majetideepak
CC: @xiaoxmeng
@yaqi-zhao Based on discussion in #7445 , as Arrow codec currently doesn't support async compression, I would suggest we unify the compression API first, and extend the API to add interfaces for async.
I have tried to add an interface for async decompression (class AsyncDecompressor) in PR https://github.com/facebookincubator/velox/pull/6176. What worries me now is how to design an interface for different hardware, since the storage of the async status may vary across hardware. Another concern is that the upper-level caller may need massive changes to adopt the async interface.
Could you initiate a design proposal to facilitate the review? Thanks! @yaqi-zhao , @marin-ma
@george-gu-2021 I only have an initial idea about how to design the interface for IAA; you can refer to class AsyncDecompressor in PR https://github.com/facebookincubator/velox/pull/6176:
```cpp
class AsyncDecompressor {
 public:
  explicit AsyncDecompressor() {}
  virtual ~AsyncDecompressor() = default;

  virtual int decompress(
      const char* src,
      uint64_t srcLength,
      char* dest,
      uint64_t destLength) = 0;
  virtual bool waitResult(int job_id) = 0;
  virtual void releaseJob(int job_id) = 0;
};
```
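As a purely illustrative example of how this interface could be exercised, here is a minimal mock implementation. The base class is repeated so the sketch is self-contained, and `MockAsyncDecompressor` with its copy-based "decompression" is a hypothetical stand-in for a real accelerator, not anything from the PR:

```cpp
#include <algorithm>
#include <cstdint>
#include <cstring>
#include <unordered_map>

// Interface from the comment above, repeated for a self-contained sketch.
class AsyncDecompressor {
 public:
  virtual ~AsyncDecompressor() = default;
  virtual int decompress(
      const char* src, uint64_t srcLength, char* dest, uint64_t destLength) = 0;
  virtual bool waitResult(int job_id) = 0;
  virtual void releaseJob(int job_id) = 0;
};

// Hypothetical mock: "decompression" is a plain copy that completes
// immediately; real hardware would finish the job asynchronously.
class MockAsyncDecompressor : public AsyncDecompressor {
 public:
  int decompress(const char* src, uint64_t srcLength, char* dest,
                 uint64_t destLength) override {
    std::memcpy(dest, src, std::min(srcLength, destLength));
    int jobId = nextJobId_++;
    done_[jobId] = true;  // Hardware would flip this on completion.
    return jobId;
  }
  bool waitResult(int jobId) override { return done_.at(jobId); }
  void releaseJob(int jobId) override { done_.erase(jobId); }

 private:
  int nextJobId_ = 0;
  std::unordered_map<int, bool> done_;
};
```

The shape of the contract is: `decompress` returns a job id immediately, the caller later blocks on `waitResult(jobId)`, and `releaseJob(jobId)` frees the slot.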
With the concern I raised above, I don't know how to design a universal API for all hardware. Maybe @marin-ma @FelixYBW already have some insights.
@yaqi-zhao It took me some time to review #6176. Here are some initial thoughts:

- QplJobPool should be moved into the common module, which can be addressed after moving the unified compression API into the common module.
- PageReader should not contain anything directly related to QPL.
- If IAACodec uses any external resources, it should be responsible for holding the resources and managing their lifetime; the codec is then considered "stateful", which is similar to a streaming compressor. Hence, we can move the states related to QPL from PageReader into the IAACodec, and avoid creating the codec each time we call its API from different functions: https://github.com/facebookincubator/velox/pull/6176/files#diff-58ef2bd79ff4173b5a384a3016d6f7bd7d4b237876251cc9d9910e162993eaaaR1147-R1151

for different hardware since the storage of the async status may be varied for different hardware

Could you elaborate what kind of async status should be handled? I would think the status is either successful or failed. If it fails, the caller should be responsible for handling the status, deciding whether to fall back or throw an exception (or return an error code). But anyhow the status should be general, not qpl-specific.
@george-gu-2021 @yaqi-zhao The detailed implementation of the async compression API is somewhat beyond the scope of this discussion. I would suggest we unify the compression module in the first step, and then extend it with different compression codecs in the common module, thereby making the IAA codec available for shuffle, spill, etc.
Besides, we can have not only the async IAA codec but also a synchronous one.
@majetideepak @xiaoxmeng
Do you have any insights on the proposal of replacing dwrf/folly compression with Arrow? The Arrow compression API is already mature and can satisfy the requirements of other modules, so I'm planning to work on a PR to replace the codecs under velox/common/compression with the Arrow codec implementation. There are two paths we can choose; one is that Codec returns the Arrow return code, and the caller handles it, such as throwing an exception for arrow::Status::Invalid. I would appreciate your suggestions and comments. Thanks!
I would suggest we unify the compression module in the first step
That would be nice. I didn't realize we have 3 implementations.
@marin-ma I'm a bit concerned about using Arrow's implementation directly. At a minimum we'd need to re-write it to match the coding style and design patterns of Velox and replace Status with a Velox-specific one. Can we maybe start by replacing dwrf/folly compression with velox/common/compression? Are there any differences in these 2 implementations?
@mbasmanova
Can we maybe start by replacing dwrf/folly compression with velox/common/compression?
Thank you for your advice!
Are there any differences in these 2 implementations?
Yes. The supported codecs are different, which have been listed in the issue description. And dwrf supports configuring a few codec-specific parameters, while folly doesn't.
@marin-ma
Yes. The supported codecs are different, which have been listed in the issue description. And dwrf supports configuring a few codec-specific parameters, while folly doesn't.
Got it. What are the "few codec-specific parameters"? If we need these, then maybe you can start looking at replacing these 2 implementations with a single unified one.
@mbasmanova
Got it. What are the "few codec-specific parameters"?
CompressionOptions, defined here:
https://github.com/facebookincubator/velox/blob/9cfa1a6fd4262c355e71f3a1a31f599d682f1597/velox/dwio/common/compression/Compression.h#L68-L90
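For readers skimming the thread, codec-specific options of this kind are typically a tagged set of per-format structs. The sketch below is illustrative only; its struct and field names are assumptions, and the linked header is the authoritative definition:

```cpp
#include <cstdint>

// Illustrative shape only; see the linked Compression.h for the actual
// CompressionOptions in velox/dwio/common. Names here are assumptions.
struct CompressionOptionsSketch {
  struct ZlibOptions {
    int windowBits;        // zlib window size; trades memory for ratio.
    int compressionLevel;  // 1 (fast) .. 9 (best compression).
  };
  struct ZstdOptions {
    int compressionLevel;
  };
  union {
    ZlibOptions zlib;
    ZstdOptions zstd;
  } format;
};
```

This is the kind of per-codec knob that folly's codec interface does not expose, which is why the two implementations cannot simply be swapped.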
@marin-ma thanks for putting together the proposal. A suggested sequence of steps could be:
@pedroerp What do you think about defining an async (de)compressor like this?
```cpp
class AsyncDecompressor {
 public:
  explicit AsyncDecompressor() {}
  virtual ~AsyncDecompressor() = default;

  virtual folly::Future<int> decompress(
      const char* src,
      uint64_t srcLength,
      char* dest,
      uint64_t destLength) = 0;
};
```
I would think returning SemiFuture instead of Future would be better. Then the caller can decide whether to use .via() to execute in another Executor, or .get() to execute inline. The API would look like:
```cpp
folly::SemiFuture<uint64_t> decompressAsync(
    const char* src,
    uint64_t srcLength,
    char* dest,
    uint64_t destLength);
```
And the implementation for the IAA decompressor:
```cpp
folly::SemiFuture<uint64_t> GzipIAADecompressor::decompressAsync(
    const char* src,
    uint64_t srcLength,
    char* dest,
    uint64_t destLength) {
  // Acquire HW resources.
  ...
  // Submit job.
  status = qpl_submit_job(job);
  // If it fails, return a SemiFuture ready with an exception.
  // If it succeeds, add a `waitResult` callback.
  if (status != QPL_STS_OK) {
    qpl_job_pool.releaseJob(deflate_job.first);
    return folly::makeSemiFutureWith([status]() -> uint64_t {
      throw std::runtime_error(
          "Cannot submit job, error status " + std::to_string(status));
    });
  }
  return folly::makeSemiFuture().deferValue(
      [this, job](auto&&) -> uint64_t { return waitResult(job); });
}
```
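Since the folly sketch above depends on QPL internals, the same submit-then-defer shape can be shown with only the standard library. Everything here (the function name and its copy-based body) is a hypothetical stand-in; `std::launch::deferred` plays the role of `deferValue`, in that the work runs when the caller calls `.get()`:

```cpp
#include <cstdint>
#include <cstring>
#include <future>
#include <stdexcept>

// Hypothetical standard-library analogue of decompressAsync above.
std::future<uint64_t> decompressAsyncSketch(
    const char* src, uint64_t srcLength, char* dest, uint64_t destLength) {
  if (destLength < srcLength) {
    // Analogue of returning a SemiFuture that is ready with an exception.
    std::promise<uint64_t> p;
    p.set_exception(std::make_exception_ptr(
        std::runtime_error("destination buffer too small")));
    return p.get_future();
  }
  // Deferred task: runs inline when the caller calls .get(), mirroring
  // the `waitResult` callback attached via deferValue.
  return std::async(std::launch::deferred,
                    [src, srcLength, dest]() -> uint64_t {
    std::memcpy(dest, src, srcLength);  // Stand-in for waiting on hardware.
    return srcLength;
  });
}
```

Both the success path and the error path surface through `.get()`, which is also how the folly version would behave for an inline caller.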
For the caller, in the stage of pre-decompressing the page:

```cpp
...
auto future = decompressor->decompressAsync(
    (const char*)pageData,
    compressedSize,
    uncompressedData->asMutable<char>(),
    uncompressedSize);
if (future.isReady() && future.hasException()) {
  return false;
}
// Store the SemiFuture.
decompressionFuture_ = std::move(future);
...
```
For the caller, getting the decompressed result:

```cpp
// If decompressionFuture_ has a value:
auto decompressedSize =
    std::move(decompressionFuture_).get();  // Executes `waitResult` inline.
```
Late to the discussion, but +1. We could consider keeping arrow::Status as is and let the client handle it. There is some discussion on adding Status to Velox here: https://github.com/facebookincubator/velox/discussions/6323. It is easier to start the migration with leaf components such as compression.
CC: @karteekmurthys
I plan to submit a series of PRs for the implementation of compression codecs API. The submission sequence may change based on practical considerations. The proposed order is:
- Add Compression Codec API. Add support for LZ4_FRAME, LZ4_RAW, and LZ4_HADOOP codecs. Include unit tests.
- Add support for GZIP/ZLIB Codec
- Add support for ZSTD codec.
- Add support for SNAPPY Codec.
- Integrate LZO Codec, reuse the existing implementation, primarily for use in the parquet reader. Currently, only decompression is available for LZO.
- Replace the decompression methods in dwio and parquet reader with the new API. Move the API to velox/common/compression and remove velox/dwio/common/compression.
- Replace the compression in the arrow parquet writer with the new API. Remove dependencies related to arrow compression.
- Add Asynchronous Compression API.
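To make the first item of this plan concrete, the batch codec API could take roughly the following shape. The class names and the trivial identity codec below are assumptions for illustration (loosely modeled on Arrow's batch Codec), not the actual contents of the PRs:

```cpp
#include <cstdint>
#include <cstring>

// Hypothetical batch codec interface; names are assumptions, not the
// API from the PRs above.
class Codec {
 public:
  virtual ~Codec() = default;
  // Upper bound on compressed size for the given input size.
  virtual uint64_t maxCompressedLength(uint64_t inputLength) = 0;
  // Both return the number of bytes written to `output`.
  virtual uint64_t compress(const char* input, uint64_t inputLength,
                            char* output, uint64_t outputLength) = 0;
  virtual uint64_t decompress(const char* input, uint64_t inputLength,
                              char* output, uint64_t outputLength) = 0;
};

// Trivial identity codec standing in for real LZ4/ZSTD/GZIP/SNAPPY
// implementations, just to show the round trip the unified API supports.
class IdentityCodec : public Codec {
 public:
  uint64_t maxCompressedLength(uint64_t inputLength) override {
    return inputLength;
  }
  uint64_t compress(const char* input, uint64_t inputLength, char* output,
                    uint64_t /*outputLength*/) override {
    std::memcpy(output, input, inputLength);
    return inputLength;
  }
  uint64_t decompress(const char* input, uint64_t inputLength, char* output,
                      uint64_t /*outputLength*/) override {
    std::memcpy(output, input, inputLength);
    return inputLength;
  }
};
```

Each codec listed in the plan would then be one more subclass behind the same interface, which is what lets shuffle, spill, and the readers share a single compression module.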
I've created some PRs to add the unified API, which are now ready for review. The initial patch #7589 has been created for the first goal. Since goals 2-5 depend on goal 1, I've submitted a draft PR #7603 that implements the remaining compression codecs. If necessary, I can split it into smaller PRs for easier review. I would greatly appreciate your review and feedback on these PRs when you have a moment. Thank you for your time. @pedroerp @mbasmanova @majetideepak
@marin-ma @FelixYBW Hi, sorry I didn't see this in time. Just some background: the Arrow codec was only used in the Parquet writer (in the datasink, not datasource), which we recently moved into Velox (Velox used to call into the Arrow Parquet writer). The Parquet reader (in the data source) was an in-house one that only uses the Velox codec, and with https://github.com/facebookincubator/velox/pull/5914 and https://github.com/facebookincubator/velox/issues/6105 it is now using the common dwio::common::PagedInputStream, which in turn uses "velox/dwio/common/compression/Compression.h". For the writer, the plan was to replace Arrow codecs with Velox ones where Velox already supports them; this was described in https://github.com/facebookincubator/velox/issues/6901. And yes, adding the missing async codec interface/implementation in Velox would be great!
Thank you, @yingsu00 . We should use the same one for spill and shuffle as well. Can you review #7589?
We could consider keeping Arrow::status as is and let the client handle it. There is some discussion on adding Status to Velox here #6323
I just merged today a first cut of a Status class for Velox which we could use if we don't want to rely on exceptions here.
Thank you. @marin-ma will update the PR.
@FelixYBW @marin-ma Is there any progress in this issue? I see the PR has no update for a long time.
This PR is pending on the second PR to add the Result return class described in https://github.com/facebookincubator/velox/pull/8084.
@marin-ma It seems the PR #8084 has been merged one month ago. Is there any progress on the PR #7589?
There are 3 PRs planned, as shown in the comments of #8084 and copied below. #8084 is the first one; we are waiting for the second one.
The first one introduces a Status object that carries the (success or error) status of an operation, based on arrow::Status. The second will add a Result object able to represent a value returned from an operation along with a Status code; this is also based on arrow::Result. The third one will refactor our Parquet writer to use these objects and remove more dependencies from Arrow.
@FelixYBW @marin-ma I saw the PR(https://github.com/facebookincubator/velox/pull/7589) has been marked as stale. Will the PR still be reviewed and merged in the future?
@yaqi-zhao According to https://github.com/facebookincubator/velox/pull/8084#issuecomment-1908834455, the Result implementation is in design. We're waiting for its landing first.
@marin-ma Got it, thanks!
@yaqi-zhao is that blocking this work? That fell off of my radar, but I can take a stab at it soon if this is needed here.
@pedroerp Yes, https://github.com/facebookincubator/velox/pull/6176 depends on this issue. Here is the previous comment asking for https://github.com/facebookincubator/velox/pull/6176 to wait for https://github.com/facebookincubator/velox/issues/7471 to be merged first.
I see. I think my question is whether this issue depends on the "Result" implementation design.
I'm not sure. According to @marin-ma's previous comments, this issue is pending on the "Result" implementation design.
@pedroerp Should I go ahead with https://github.com/facebookincubator/velox/pull/7589 and use the Status return code instead of throwing exceptions? If it's ok with you, we may consider changing the API to use the Result type in the future once the Result implementation is ready.
@marin-ma yes, we should use Status and Expected to avoid throwing exceptions in hot paths. @mbasmanova has recently added a few examples of how that can be used.
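As a rough illustration of the non-throwing style suggested here, a decompress entry point can report errors through a Status-like value instead of exceptions. The minimal Status class and the function below are hypothetical, modeled loosely on arrow::Status rather than on Velox's actual Status/Expected types:

```cpp
#include <cstdint>
#include <cstring>
#include <string>
#include <utility>

// Minimal, hypothetical Status sketch; Velox's real Status class
// (from the PRs above) is richer than this.
class Status {
 public:
  static Status ok() { return Status(true, ""); }
  static Status invalid(std::string msg) {
    return Status(false, std::move(msg));
  }
  bool isOk() const { return ok_; }
  const std::string& message() const { return msg_; }

 private:
  Status(bool ok, std::string msg) : ok_(ok), msg_(std::move(msg)) {}
  bool ok_;
  std::string msg_;
};

// Hot-path decompress that reports failure via Status instead of throwing.
// The identity copy is a stand-in for a real codec; `written` receives the
// number of bytes produced.
Status decompressChecked(const char* src, uint64_t srcLength, char* dest,
                         uint64_t destLength, uint64_t& written) {
  if (destLength < srcLength) {
    return Status::invalid("destination buffer too small");
  }
  std::memcpy(dest, src, srcLength);
  written = srcLength;
  return Status::ok();
}
```

The caller checks `isOk()` and decides locally whether to fall back, propagate, or (outside the hot path) convert the failure into an exception.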
Description
Context
Within the current Velox implementation, there are three distinct modules utilizing different compression codecs and methods:
Parquet datasource - Uses Arrow codec
DWRF datasource - Uses a self-defined streaming compressor/decompressor
Spill - Uses Folly Codec
Proposal
It would be better to unify the compression strategy across these modules. Given that the Arrow compression API has already been integrated into Velox along with the parquet writer, it would be more efficient to embrace the Arrow compression API instead of creating a new Codec API from scratch:

- Arrow provides a Codec interface for batch data compression and a De-/Compressor interface for streaming data compression. It also supports configuring the parameters for different compression algorithms with CodecOptions.
- Using arrow::Status/Result for return codes, the API is general for use and easy to extend.

We have already implemented compression codecs with QAT/IAA accelerators and successfully used them in our project Gluten. With the unified Arrow compression API, we can directly contribute these enhancements to Velox.