luben / zstd-jni

JNI binding for Zstd

Add Zstd.isLastBlock() for decompression #271

Closed skyguard1 closed 9 months ago

skyguard1 commented 11 months ago

As mentioned in this issue, network data is transmitted in segments, and Netty, as a general-purpose framework, cannot require developers to adopt a protocol of its own. Would it therefore be possible to provide a method that determines the size of a compressed data block during block decompression? We may not need streaming compression, but transmitting compressed blocks is a very common requirement. My suggestion is to add one of two methods to Zstd: public boolean isLastBlock(ByteBuffer src) or public int blockSize(ByteBuffer src). The intent is not to expose the details of the block, only to determine the size of the compressed data block. Thanks a lot

luben commented 11 months ago

I don't think blockSize is possible, as the native library hides these details. As for isLastBlock on ZstdInputStream, returning true if the last block was just processed by read, there is one complication: there are cases when we return all bytes of the last block but have not yet been notified that it was the last block in the frame. This happens when the read buffer exactly coincides with the remainder of the frame, so we only learn that it was the last block on the next read from the native decompressor, which processes 0 bytes. But we cannot return 0 from read in that case, as that would break the InputStream contract, under which read returns 0 only for a zero-length buffer and signals the end of the stream with -1.

luben commented 11 months ago

@skyguard1, I looked at the source, and maybe I can expose a different read method that can also return 0 bytes read and that reports when the last block in the frame has been reached. Would you be willing to use that?

skyguard1 commented 11 months ago

Hi @luben, thanks for the reply. I would prefer to add the method to Zstd, because developers using ZstdDecoder will process tens of thousands of requests and send millions of packets in a few minutes. Using ZstdInputStream would therefore mean frequently creating and closing streams, which adds a lot of unnecessary overhead (or risks a memory leak if a stream is not closed when idle). In fact, ZstdDecoder already uses Zstd.decompress(dst, src) directly. I suggest adding a method to Zstd that determines whether the buffer holds a complete compressed data block (just as there is already a method, Zstd.decompressedSize(src), to get the decompressed size). Is this possible? Thanks a lot

luben commented 11 months ago

decompressedSize will return the number from the frame header. If there are 2 frames in the buffer you give to Zstd.decompress it will fail as the output space will not be enough. I have to take a look to see if it's possible. Though I don't see how Zstd.decompress is cheaper than ZstdInputStream.

skyguard1 commented 11 months ago

Appreciate it. I don't know much about Zstd's frame format, but I see in RFC 8478 that the block header carries Last_Block and Block_Size fields, so maybe you can use those. In other open source implementations, I've seen Zstd used a lot more than ZstdInputStream, so I thought it would make sense to add this. Thanks a lot
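To make the RFC 8478 fields mentioned above concrete, here is a minimal sketch that walks the block headers of a single frame and reports whether the buffer already holds the whole frame. It is not part of zstd-jni: `ZstdFrameScanner` and `completeFrameLength` are hypothetical names, the header layout follows the frame format in RFC 8478 as I read it, and skippable frames and reserved-bit validation are left out.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

// Hypothetical helper, not a zstd-jni API.
final class ZstdFrameScanner {
    static final int MAGIC = 0xFD2FB528; // Magic_Number, stored little-endian

    /** Returns the byte length of the first frame, or -1 if more bytes are needed. */
    static int completeFrameLength(ByteBuffer src) {
        ByteBuffer b = src.duplicate().order(ByteOrder.LITTLE_ENDIAN);
        int start = b.position();
        if (b.remaining() < 5) return -1;
        if (b.getInt() != MAGIC) throw new IllegalArgumentException("not a zstd frame");

        int descriptor = b.get() & 0xFF;                  // Frame_Header_Descriptor
        int fcsFlag = descriptor >>> 6;                   // Frame_Content_Size_flag
        boolean singleSegment = (descriptor & 0x20) != 0; // Single_Segment_flag
        boolean hasChecksum = (descriptor & 0x04) != 0;   // Content_Checksum_flag
        int dictFlag = descriptor & 0x03;                 // Dictionary_ID_flag

        int windowBytes = singleSegment ? 0 : 1;
        int dictBytes = dictFlag == 3 ? 4 : dictFlag;     // 0, 1, 2 or 4 bytes
        int fcsBytes = fcsFlag == 0 ? (singleSegment ? 1 : 0) : (1 << fcsFlag);
        int headerRest = windowBytes + dictBytes + fcsBytes;
        if (b.remaining() < headerRest) return -1;
        b.position(b.position() + headerRest);

        boolean last = false;
        while (!last) {
            if (b.remaining() < 3) return -1;
            // Block_Header: 3 bytes, little-endian
            int header = (b.get() & 0xFF) | (b.get() & 0xFF) << 8 | (b.get() & 0xFF) << 16;
            last = (header & 1) != 0;                     // Last_Block
            int type = (header >>> 1) & 3;                // Block_Type
            int size = header >>> 3;                      // Block_Size
            if (type == 3) throw new IllegalArgumentException("reserved block type");
            int payload = (type == 1) ? 1 : size;         // an RLE block stores one byte
            if (b.remaining() < payload) return -1;
            b.position(b.position() + payload);
        }
        if (hasChecksum) {
            if (b.remaining() < 4) return -1;
            b.position(b.position() + 4);
        }
        return b.position() - start;
    }

    public static void main(String[] args) {
        // Hand-built frame: single segment, content size 5, one raw last block of 5 bytes.
        byte[] frame = {0x28, (byte) 0xB5, 0x2F, (byte) 0xFD, 0x20, 0x05, 0x29, 0x00, 0x00, 1, 2, 3, 4, 5};
        System.out.println(completeFrameLength(ByteBuffer.wrap(frame)));
        System.out.println(completeFrameLength(ByteBuffer.wrap(frame, 0, frame.length - 1)));
    }
}
```

A scanner like this only answers "is one whole frame buffered yet"; it does not decompress anything and would have to be kept in step with the format by hand.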

luben commented 11 months ago

The block level info is not exposed by the zstd native library

skyguard1 commented 11 months ago

I submitted an issue for this: https://github.com/facebook/zstd/issues/3709

luben commented 11 months ago

Can you give me an end-to-end description of your setup and what you want to achieve?

skyguard1 commented 11 months ago

The scenario is like this:

Client => ZstdEncoder => Network => ZstdDecoder => Server

Step 1: The client application writes data and compresses it with ZstdEncoder.
Step 2: The client transmits the data packet over the network. Because of the network protocol, the data packet is split into multiple segments for transmission.
Step 3: The data packet arrives at the server's host and reaches the application layer after being unpacked by the transport layer.
Step 4: ZstdDecoder reads the data packet and decompresses it.
Step 5: The server application gets the decompressed data.

The problem is step 4. When ZstdDecoder gets a data packet, it cannot tell whether it has received the last block, so decompression fails with an error if the data received so far is incomplete. In another application this would not be a problem: you can write the data block size in the encoder and read it in the decoder. But Netty is a framework. To stay general-purpose it cannot define its own protocol, nor can it require developers to be compatible with one. That's the problem. Please see: https://github.com/netty/netty/pull/10422

What I want is to add isLastBlock(), or another method that can determine the size of compressed data blocks, to Zstd to solve the problem above. Thanks
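For reference, the "write the data block size in the encoder, and read it in the decoder" approach that an ordinary application could use might look like the sketch below. The 4-byte big-endian length prefix and the names `LengthPrefixedFraming`, `encode` and `feed` are assumptions made for illustration; they come from neither Netty nor zstd-jni.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical framing helper: each packet is sent as [4-byte length][payload].
final class LengthPrefixedFraming {
    /** Sender side: prepend the payload length. */
    static byte[] encode(byte[] payload) {
        return ByteBuffer.allocate(4 + payload.length).putInt(payload.length).put(payload).array();
    }

    // Bytes received so far that do not yet form a complete packet.
    private final ByteArrayOutputStream pending = new ByteArrayOutputStream();

    /** Receiver side: accept one network segment, return every packet completed by it. */
    List<byte[]> feed(byte[] segment) {
        pending.write(segment, 0, segment.length);
        ByteBuffer buf = ByteBuffer.wrap(pending.toByteArray());
        List<byte[]> packets = new ArrayList<>();
        while (buf.remaining() >= 4) {
            int length = buf.getInt(buf.position()); // peek at the prefix
            if (buf.remaining() < 4 + length) break; // packet still incomplete
            buf.getInt();                            // consume the prefix
            byte[] packet = new byte[length];
            buf.get(packet);
            packets.add(packet);
        }
        // Keep the unconsumed tail for the next segment.
        pending.reset();
        pending.write(buf.array(), buf.position(), buf.remaining());
        return packets;
    }

    public static void main(String[] args) {
        byte[] wire = encode("hello".getBytes(StandardCharsets.UTF_8));
        LengthPrefixedFraming decoder = new LengthPrefixedFraming();
        System.out.println(decoder.feed(Arrays.copyOfRange(wire, 0, 6)).size());
        System.out.println(decoder.feed(Arrays.copyOfRange(wire, 6, wire.length)).size());
    }
}
```

The receiver never needs to look inside the payload, which is why this works for compressed data too, but it does require both ends to agree on the prefix.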

luben commented 11 months ago

Can one segment contain the end of one packet and the beginning of the next packet?

skyguard1 commented 11 months ago

Generally not. When a request arrives, a ChannelPipeline is triggered. The ChannelPipeline is a linked list and contains multiple decoders, including ZstdDecoder. Generally, users compress a piece of data and then send it, so Netty needs to ensure that a compressed data block can be decompressed. Thanks

skyguard1 commented 11 months ago

The zstd library plans to add this feature in v1.5.6. I will follow the progress.

luben commented 11 months ago

I don't think you need any knowledge about the Zstd blocks. I don't think you even need to know when one frame is finished.

Tell me something: at step 3, how does your network receiver know it has received all segments belonging to a data packet, even without compression?

IMHO, you can use quite a simple setup. Logically, you establish the connection (OutputStream on the sender side, InputStream on the receiver side). Then you wrap them in Zstd Output/Input streams. The decompressed payload goes to the protocol handler that stitches and splits segments into distinct data packets. This way you can reuse the back-buffers inside the Zstd native library, and it will improve compression.

Take a look at the (Direct)Buffer classes - this may be easier to integrate in Netty than Input/Output streams.

skyguard1 commented 11 months ago

Thanks for the suggestion. For uncompressed data, Netty provides several options, such as FixedLengthFrameDecoder, LengthFieldBasedFrameDecoder and DelimiterBasedFrameDecoder, or users can extend ByteToMessageDecoder to implement their own Decoder. For compressed data blocks the user's delimiter cannot be read, so it is often necessary to determine how many bytes of compressed data have been read. The Decoders mentioned above operate on the data after decompression, so the compression Decoder has to work differently. Thanks

luben commented 11 months ago

You can stuff the segments coming from the network into the decompressor input, take out the decompressed data that is available and give it to the decoder. I don't think you need to know anything about the block and frame boundaries. The decompressor is not a replacement for the decoder; you still need a decoder. On the sender side, you just have to call flush after you write each data packet.

skyguard1 commented 11 months ago

I'm not sure if I understand your idea, but compression is usually the last step before network transmission, so ZstdDecoder is also at the front of the ChannelPipeline. Other decoders get their data after ZstdDecoder has decompressed it. Therefore, ZstdDecoder cannot use other decoders to decode. Thanks

luben commented 11 months ago

Yes, you need to run the Decompressor before any other decoder. Zstd does not need anything from the later (Decoder) stages. What do you think it needs from them?

skyguard1 commented 11 months ago

I may have misunderstood your solution. I thought your solution was to use the Decoder to judge the integrity of the data block before decompression. As for "How your network receiver knows it has received all segments belonging to a data packet, even without compression?": ZstdDecoder and the other compression Decoders (zlib, lz4, snappy, brotli) all face the same problem. I am using DirectBuffer for decompression, but I still do not understand how to judge the size of the compressed data block without adding methods to Zstd; otherwise an Exception will be thrown when using Zstd.decompress(dst, src), which is the function I currently need.

skyguard1 commented 11 months ago

For example, java.util.zip.Inflater.finished() returns true if the end of the compressed data stream has been reached in jdk, which is what Netty uses when decompressing with Zlib. This is what I need.
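As a rough illustration of the JDK behaviour referred to here, the sketch below feeds a zlib stream to `java.util.zip.Inflater` in two segments and checks `finished()` after each one. The class and helper names (`InflaterFinishedDemo`, `deflate`, `feed`) are made up for this example, and it shows only the JDK API, not how Netty's zlib decoders are actually written.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.zip.DataFormatException;
import java.util.zip.Deflater;
import java.util.zip.Inflater;

// Hypothetical demo class; only Deflater and Inflater are real JDK APIs.
final class InflaterFinishedDemo {
    /** One-shot zlib compression, standing in for the sender. */
    static byte[] deflate(byte[] input) {
        Deflater deflater = new Deflater();
        deflater.setInput(input);
        deflater.finish();
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] buf = new byte[256];
        while (!deflater.finished()) {
            out.write(buf, 0, deflater.deflate(buf));
        }
        deflater.end();
        return out.toByteArray();
    }

    /** Feeds one segment and returns true once the end of the stream was consumed. */
    static boolean feed(Inflater inflater, byte[] segment, ByteArrayOutputStream out) {
        inflater.setInput(segment);
        byte[] buf = new byte[256];
        try {
            while (!inflater.finished()) {
                int n = inflater.inflate(buf);
                if (n == 0) break; // no output possible: more input is needed
                out.write(buf, 0, n);
            }
        } catch (DataFormatException e) {
            throw new IllegalArgumentException("corrupt zlib data", e);
        }
        return inflater.finished();
    }

    public static void main(String[] args) {
        byte[] wire = deflate("hello hello hello".getBytes(StandardCharsets.UTF_8));
        int mid = wire.length / 2;
        Inflater inflater = new Inflater();
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        System.out.println(feed(inflater, Arrays.copyOfRange(wire, 0, mid), out));
        System.out.println(feed(inflater, Arrays.copyOfRange(wire, mid, wire.length), out));
        inflater.end();
    }
}
```

The point of interest is that a truncated input is not an error here: the inflater simply reports that it is not finished yet and waits for the next segment.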

luben commented 11 months ago

No, the decompressor does not know about packets, packets are decoded after it, in the Decoder that knows about packets.

My suggestion is to look into the streaming classes: they will decompress the network input, and you can send the decompressed stream to the decoder (in segments, as InputStream.read() may not return the whole packet). If you use DirectBuffers, take a look at ZstdDirectBufferDecompressingStream(NoFinalizer). You can have a buffer pool (say 16KB per buffer); you read from Zstd into a buffer and send it to the decoder (I guess similar to how you probably have a buffer pool for reading from the network).

On the sender side, you should also use streaming compression and call flush after the packet is written, so that it goes over the wire. There is ZstdDirectBufferCompressingStream(NoFinalizer) if you work with DirectBuffers on the sender side as well.
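The flush-per-packet pattern described above can be tried out without zstd-jni. In the sketch below, the JDK's zlib classes stand in for the Zstd streaming classes purely so that the example is self-contained; `FlushPerPacketDemo`, `Sender` and `Receiver` are hypothetical names, and the zstd-jni streams may differ in detail.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.zip.DataFormatException;
import java.util.zip.DeflaterOutputStream;
import java.util.zip.Inflater;

// zlib is used here only as a stand-in for the Zstd streaming classes.
final class FlushPerPacketDemo {
    /** Sender: one long-lived compressing stream, flushed after every packet. */
    static final class Sender {
        private final ByteArrayOutputStream wire = new ByteArrayOutputStream();
        // syncFlush = true makes flush() emit everything written so far
        private final DeflaterOutputStream compressor = new DeflaterOutputStream(wire, true);

        byte[] send(byte[] packet) {
            try {
                wire.reset();
                compressor.write(packet);
                compressor.flush();
                return wire.toByteArray();
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }
    }

    /** Receiver: one long-lived decompressor fed with whatever segment arrives. */
    static final class Receiver {
        private final Inflater inflater = new Inflater();

        byte[] receive(byte[] segment) {
            inflater.setInput(segment);
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            byte[] buf = new byte[4096];
            try {
                int n;
                while ((n = inflater.inflate(buf)) > 0) {
                    out.write(buf, 0, n); // hand this on to the protocol decoder
                }
            } catch (DataFormatException e) {
                throw new IllegalArgumentException("corrupt zlib data", e);
            }
            return out.toByteArray();
        }
    }

    public static void main(String[] args) {
        Sender sender = new Sender();
        Receiver receiver = new Receiver();
        byte[] compressed = sender.send("first".getBytes(StandardCharsets.UTF_8));
        // Deliver the packet in two arbitrary segments, as the network might.
        byte[] a = receiver.receive(Arrays.copyOfRange(compressed, 0, 3));
        byte[] b = receiver.receive(Arrays.copyOfRange(compressed, 3, compressed.length));
        System.out.println(new String(a, StandardCharsets.UTF_8) + new String(b, StandardCharsets.UTF_8));
    }
}
```

Neither side tracks frame or block boundaries here; deciding where one packet ends is left to whatever decoder consumes the decompressed bytes.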

skyguard1 commented 11 months ago

Thanks for the suggestion. However, ZstdEncoder has been released for more than 2 years. If ZstdDirectBufferCompressingStream is used instead, compatibility cannot be guaranteed. We need to ensure that the data blocks compressed by ZstdEncoder can be decompressed. Please see: https://github.com/netty/netty/blob/4.1/codec/src/main/java/io/netty/handler/codec/compression/ZstdEncoder.java ZstdEncoder uses Zstd.compress(ByteBuffer dstBuf, ByteBuffer srcBuf, int level) for compression.

skyguard1 commented 11 months ago

Since you have closed https://github.com/luben/zstd-jni/issues/165, I am not sure whether you will agree, but I still hope you can consider returning a value less than 0 from ZstdDecompressCtx.decompress() when srcSize is not correct, instead of throwing ZstdException. I know this is not the best solution. You would not have to modify decompress() itself: you could add a new method and state in its comment that it does not throw an exception when the compressed data is incomplete. Of course, there is another solution, as you mentioned in https://github.com/luben/zstd-jni/issues/183: use the return value of ZstdInputStream.read() to judge whether the frame has been read completely. But I'm not sure whether the data block compressed with Zstd is supported, this needs to be verified

luben commented 11 months ago

"But I'm not sure whether the data block compressed with Zstd is supported, this needs to be verified"

Yes, they are completely inter-operable. On the sender side you can compress with Zstd.compress(...) and on the receiver side decompress using the streaming interfaces, and leave the packet reconstruction for the next decoder in the pipeline.

skyguard1 commented 11 months ago

When I use ZstdInputStream for decompression and the compressed data block is incomplete, com.github.luben.zstd.ZstdInputStreamNoFinalizer.readInternal() throws ZstdIOException: Truncated source. Code: https://github.com/luben/zstd-jni/blob/master/src/main/java/com/github/luben/zstd/ZstdInputStreamNoFinalizer.java#L177 Here is the test case:

protected void testDecompressionOfBatchedFlow(final ByteBuf expected, final ByteBuf data) throws Exception {
    final int compressedLength = data.readableBytes();
    int written = 0, length = rand.nextInt(100);
    // Feed the compressed data to the channel in random-sized segments of fewer than 100 bytes
    while (written + length < compressedLength) {
        ByteBuf compressedBuf = data.retainedSlice(written, length);
        channel.writeInbound(compressedBuf);
        written += length;
        length = rand.nextInt(100);
    }
    // Write the remaining bytes as the final segment
    ByteBuf compressedBuf = data.slice(written, compressedLength - written);
    assertTrue(channel.writeInbound(compressedBuf.retain()));

    ByteBuf decompressedBuf = readDecompressed(channel);
    assertEquals(expected, decompressedBuf);

    decompressedBuf.release();
    data.release();
}

Decompression processing:

private void consumeAndDecompress(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws IOException {
    // A new ZstdInputStream is created for every inbound buffer
    is = new ByteBufInputStream(in, false);
    zstdIs = new ZstdInputStream(is);
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    // Copy everything the stream yields for this buffer
    IOUtils.copy(zstdIs, bos);
    byte[] decompressed = bos.toByteArray();
    ByteBuf byteBuf = ctx.alloc().buffer(decompressed.length);
    byteBuf.writeBytes(decompressed, 0, decompressed.length);
    out.add(byteBuf);
}

Ignoring the Netty-related ByteBuf operations, the test case writes the compressed data block in segments; ZstdDecoder reads them with ZstdInputStream, writes the decompressed data into a byte array and returns it.

luben commented 11 months ago

You can use zstdInputStream.setContinuous(true). This way it will not throw but will return -1; the next time you call .read() it will continue and return some bytes if there is new data coming from the upstream.

Also, the idea behind the Streams is that you can use them to decompress multiple data packets. Your example looks more like 'one-shot' decompression, which is not what streams are for.

luben commented 11 months ago

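What you want is something like this (pseudocode):

class ... {
  // "in" is the end of the pipe that network segments are written to,
  // "out" is the end that the available output is read from
  in = new PipeInputStream()
  out = new PipeOutputStream(in)
  pool = BufferPool

  void decompress(buf, output) {
    in.write_all(buf)               // push the segment that just arrived
    chunk = pool.get()
    while (out.read(chunk) > 0) {   // drain whatever is available so far
      output.add(chunk)
      chunk = pool.get()
    }
  }
}

skyguard1 commented 11 months ago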

Do you plan to expose frameFinished in ZstdInputStream? When isContinuous is true, read() will still return -1 even if the frame is not finished. Please provide a method like ZstdInputStream.isFrameFinished() so that callers can tell whether the frame is finished when isContinuous is true.

luben commented 11 months ago

Why do you need isFinished? If read() returns -1, it means that the frame is not finished.

skyguard1 commented 11 months ago

When decompression is complete, ZstdInputStream.read() also returns -1. Since an incomplete compressed data block returns -1 as well when isContinuous is true, an indicator is needed to show whether the frame is finished, so that it can be distinguished whether the decompression is really complete.

luben commented 11 months ago

Why do you need to know if the frame is finished or not? Your decoder coming after the decompressor should be able to decode the data packet boundaries.

skyguard1 commented 11 months ago

ZstdDecoder needs to know when decompression is complete. The current situation is like this:

  1. The compressed data block is not transmitted in segments (for example, the data block is small, or it is transmitted locally): ZstdInputStream.read() returns -1 when decompression is complete.
  2. The compressed data block is transmitted in segments: ZstdInputStream.read() returns -1 when isContinuous is true and more data is needed, but ZstdInputStream.read() also returns -1 after decompression is complete.

ZstdDecoder maintains a State internally. In fact, every Netty compression decoder maintains a State. When decompression is complete, the State becomes finished, and at that point you know that the decompressor has done its work. When isContinuous is true, the return value of ZstdInputStream.read() cannot tell you whether decompression is complete.

luben commented 11 months ago

The Zstd decompressor does not need to know where a data packet ends; this is the responsibility of the protocol decoder that comes after it. You have a connection with the sender, and you continue receiving and decompressing the data that you receive. If the connection is closed, you know that there is nothing more to come, and you can discard the decompressor after consuming all the data it produces.

skyguard1 commented 11 months ago

Thanks for the suggestion. ZstdDecoder cannot judge whether decompression is complete based on whether the connection is closed; for example, the connection may be closed abnormally. If more data arrives after decompression is complete, ZstdDecoder will skip those bytes instead of decompressing them. FINISHED means that the decompression is complete; it does not mean that the connection is closed. From my point of view, it would help if ZstdInputStream.isFrameFinished() could be provided, so that the developer can decide whether the frame is finished.

luben commented 11 months ago

If the connection is closed or aborted, where is the subsequent data coming from? Show me a protocol that can continue one message over a different connection.

ZstdInputStream.isFrameFinished() - one read may give you data from multiple frames. isFrameFinished may return false because one frame has finished but another frame has started.

skyguard1 commented 11 months ago

This involves Netty's decoder design. Generally speaking, the user writes data after compressing the data block, and Netty's decoder ensures that the compressed data can be decompressed. However, some users will write other data after writing compressed data blocks, and we cannot rule this situation out. In Netty's decoder, one decode() call only decompresses one compressed data block, so we have to consider this boundary case: even if there is subsequent data, we can pass it to the next decoder to parse. See: https://github.com/netty/netty/blob/4.1/codec/src/main/java/io/netty/handler/codec/compression/JZlibDecoder.java#L128 Thanks