Closed. LinusKardellInfobric closed this 5 months ago.
I like the second option. We can increase the `inputBuffer` via `bufferSize` if we want to get more data in `ReadAsync`.
I think in that option `ReadAsync` would also need to start by calling `DecompressStream` without reading any new input. That would check whether any data is still buffered in the decompression context ~~or in `inputBuffer`~~ that didn't fit in `output` on the previous call (at least if the previous call ended with `output.pos == output.size`).
Maybe something like (again, something I just quickly scratched together without testing):
```csharp
public override async ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken cancellationToken = default)
{
    EnsureNotDisposed();
    if (buffer.Length == 0)
        return 0; // nothing to fill; without this the loop below might never return

    var output = new ZSTD_outBuffer_s { pos = 0, size = (nuint)buffer.Length };
    while (true)
    {
        // Check if we already have some data available to decompress
        // (still buffered in the decompression context or left in inputBuffer)
        lastDecompressResult = DecompressStream(ref output, buffer.Span);
        if (output.pos > 0)
        {
            return (int)output.pos;
        }

        // Otherwise, read some more data
        if (input.pos >= input.size)
        {
            int bytesRead;
            if ((bytesRead = await innerStream.ReadAsync(inputBuffer, 0, inputBufferSize, cancellationToken)
                    .ConfigureAwait(false)) == 0)
            {
                if (checkEndOfStream && lastDecompressResult != 0)
                {
                    throw new EndOfStreamException("Premature end of stream");
                }

                return 0;
            }

            input.size = (nuint)bytesRead;
            input.pos = 0;
        }
    }
}
```
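The control flow of the loop above can be checked in isolation with a self-contained sketch. It is a simplified model, not ZstdSharp code. `ToyDecoder` and `DrainFirstReader` are hypothetical names, and the decoder only copies bytes instead of decompressing them. Like `ZSTD_DCtx_s`, though, it can consume all of its input while still holding decoded bytes that did not fit in the caller's buffer. That is the case the drain-first `DecompressStream` call is meant to handle.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for the zstd decompression context. It "decompresses"
// by copying, but it can consume all input while keeping decoded bytes that
// did not fit in the output buffer, as ZSTD_DCtx_s can.
public sealed class ToyDecoder
{
    private readonly Queue<byte> pending = new Queue<byte>();

    // Consumes input[inputPos..], then writes as many decoded bytes as fit in
    // output. Returns the number of bytes written to output.
    public int Decode(ReadOnlySpan<byte> input, ref int inputPos, Span<byte> output)
    {
        while (inputPos < input.Length)
            pending.Enqueue(input[inputPos++]);
        int written = 0;
        while (written < output.Length && pending.Count > 0)
            output[written++] = pending.Dequeue();
        return written;
    }
}

// Sketch of the second option: return as soon as any decoded bytes exist and
// touch the inner stream only when there is nothing to return.
public sealed class DrainFirstReader
{
    private readonly Stream innerStream;
    private readonly ToyDecoder decoder = new ToyDecoder();
    private readonly byte[] inputBuffer;
    private int inputPos;
    private int inputSize;

    public DrainFirstReader(Stream innerStream, int bufferSize = 4096)
    {
        this.innerStream = innerStream;
        inputBuffer = new byte[bufferSize];
    }

    public async ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken cancellationToken = default)
    {
        if (buffer.Length == 0)
            return 0;
        while (true)
        {
            // Drain first: output may still be buffered in the decoder (or input
            // left in inputBuffer) from a previous call whose buffer was full.
            int written = decoder.Decode(inputBuffer.AsSpan(0, inputSize), ref inputPos, buffer.Span);
            if (written > 0)
                return written;

            // Nothing decoded: read more input once the current input is used up.
            if (inputPos >= inputSize)
            {
                int bytesRead = await innerStream.ReadAsync(inputBuffer.AsMemory(), cancellationToken)
                    .ConfigureAwait(false);
                if (bytesRead == 0)
                    return 0; // the inner stream has ended
                inputSize = bytesRead;
                inputPos = 0;
            }
        }
    }
}
```

With a network stream underneath, each call returns once an inner read has produced output. A reader waiting for one message therefore is not blocked until the whole buffer fills. The cost is the drawback noted for the second option: a call may return fewer bytes than the inner stream could already supply.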
`input.pos >= input.size` checks that all input is consumed and we need to read more.
True, but if I understand correctly, the `ZSTD_DCtx_s` may still have consumed all compressed data from `input` while being unable to fit all the decompressed data in `output` on the previous call.
We can add this before `ReadAsync`:

```csharp
// frame is completely decoded so flush it before next read
if (lastDecompressResult == 0 && output.pos > 0)
{
    break;
}
```
I'm afraid I don't see the purpose of checking if a frame has ended there.
A frame ends at the flush/close of the compression stream, so a flush in compression will cause a flush in decompression.
Flushing the compression would end the current block (`ZSTD_e_flush`), but not necessarily the current frame (`ZSTD_e_end`). It is also possible that when a frame ends, the sender immediately starts a new frame, so the start of the new frame is received together with the end of the previous one. Data may also remain buffered in the `ZSTD_DCtx_s`.
Should I make a pull request with my solution?
Flush before `ReadAsync` then?

```csharp
if (output.pos > 0)
{
    break;
}
```
I'm trying to use this library to add compression to a network protocol. My idea is to wrap the protocol in a ZSTD-compressed stream, and each time one end sends a message, it would flush out a compressed block. The problem is that `DecompressionStream.ReadAsync` doesn't return the data it has already decompressed, even when no more data is currently available on the inner stream. `ReadAsync` only returns once `output.pos >= output.size`, or once `innerStream.ReadAsync` returns 0 (when the stream has ended).

So there seems to be no way to fetch all currently available data from a `DecompressionStream` when reading from a never-ending stream. The only workaround would be reading one byte at a time from the `DecompressionStream`, which would presumably be inefficient. There also seems to be no other way to access streaming decompression in this library without using unsafe code.

I don't see a clean way to check whether more data is available. `Stream` has no generic `DataAvailable`, and there is no way to use a timeout or cancellation with `NetworkStream`. So the only solutions I can think of are:
1. Have `ReadAsync` not await `innerStream.ReadAsync` if it already has some decompressed data. It could instead leave the read in progress, to be picked up by a subsequent call. Something like:

   That's just something I quickly threw together and haven't tested. `Read` would also need changes to make mixed sync/async reads safe, and I have no idea how to handle the in-progress read in `Dispose`.
2. Have `ReadAsync` never call `innerStream.ReadAsync` when `output.pos > 0`, which means it would call `innerStream.ReadAsync` at most once per call. The drawback is that `ReadAsync` might not fill the output buffer even when more data is available.
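The first option, which leaves an inner read in flight, is harder to get right. What follows is a hypothetical sketch of that idea, not the snippet the issue refers to. Decoding is again replaced by a plain copy into a queue, and `PendingReadReader` and its `pendingRead` field are made-up names. The reader keeps filling the caller's buffer while inner reads complete immediately. It returns early only if the buffer is full, or if it already has data and the next inner read is still pending.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Threading;
using System.Threading.Tasks;

#nullable enable

// Hypothetical sketch of the first option. Decoding is a plain copy here (no zstd).
public sealed class PendingReadReader
{
    private readonly Stream innerStream;
    private readonly byte[] inputBuffer;
    private readonly Queue<byte> decoded = new Queue<byte>(); // stands in for the decompression context
    private Task<int>? pendingRead; // inner read started by an earlier call, if any

    public PendingReadReader(Stream innerStream, int bufferSize = 4096)
    {
        this.innerStream = innerStream;
        inputBuffer = new byte[bufferSize];
    }

    public async Task<int> ReadAsync(Memory<byte> buffer, CancellationToken cancellationToken = default)
    {
        int written = 0;
        while (true)
        {
            // Hand out whatever has already been decoded.
            written += Drain(buffer.Span.Slice(written));
            if (written == buffer.Length)
                return written;

            // All decoded data is handed out, so inputBuffer is free for a new read.
            pendingRead ??= innerStream.ReadAsync(inputBuffer, 0, inputBuffer.Length, cancellationToken);
            if (written > 0 && !pendingRead.IsCompleted)
                return written; // don't wait: the read stays in flight for the next call

            int bytesRead = await pendingRead.ConfigureAwait(false);
            pendingRead = null;
            if (bytesRead == 0)
                return written; // the inner stream has ended
            for (int i = 0; i < bytesRead; i++)
                decoded.Enqueue(inputBuffer[i]);
        }
    }

    // Synchronous helper, because async methods cannot declare Span<byte> locals.
    private int Drain(Span<byte> output)
    {
        int n = 0;
        while (n < output.Length && decoded.Count > 0)
            output[n++] = decoded.Dequeue();
        return n;
    }
}
```

This sketch leaves the concerns raised above unresolved. A synchronous `Read` would have to wait for `pendingRead` before touching `inputBuffer`. `Dispose` would have to decide what to do with a read that is still running. A pending read also keeps the `CancellationToken` of the call that started it, not the token of the current call.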