ReactiveX / RxNetty

Reactive Extension (Rx) Adaptor for Netty
Apache License 2.0

Decompressing Responses... #573

Open ambiorix2099 opened 7 years ago

ambiorix2099 commented 7 years ago

First off, a million thanks for creating and maintaining RxNetty.

I'm using RxNetty version 0.5.2-rc.5 and am looking for an idiomatic way to decompress an HttpClientResponse<ByteBuf>. It seems like a typical use case, yet I haven't been able to find any clear examples. I've tried simply adding a channel handler via the addChannelHandlerLast method.

I have also tried adding a transformer via transformResponseContent; see below for the util class I created to return the transformer. The problem is that it only works for responses that fit entirely in a single packet. I'm guessing it fails because the ByteBuf I'm reading into the GZIPInputStream is not chunked at the expected gzip byte boundaries (if that makes sense). In any case, I would greatly appreciate any help. Thanks.

public class RxNettyUtils {
    private RxNettyUtils() {}

    public static Observable.Transformer<ByteBuf, String> decompressingDecodingBytBufTransformer(Charset charset) {
        return byteBufObservable -> byteBufObservable.flatMap(byteBuf -> {
            try (
                    ByteBufInputStream byteBufStream = new ByteBufInputStream(byteBuf);
                    GZIPInputStream gzipByteStream = new GZIPInputStream(byteBufStream);
                    InputStreamReader characterStream = new InputStreamReader(gzipByteStream, charset);
            ) {
                return StringObservable.byLine(Observable.just(CharStreams.toString(characterStream)));
            } catch (Exception e) {
                return Observable.error(e);
            }
        });
    }

    public static SSLEngine newSslEngine(String host, int port) {
        try {
            final SSLEngine sslEngine = SSLContext.getDefault().createSSLEngine(host, port);
            sslEngine.setUseClientMode(true);
            return sslEngine;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}
brharrington commented 7 years ago

I haven't tried in 0.5.x, but in 0.4.x you can just use the HttpContentDecompressor provided by netty. For example:

https://github.com/Netflix/iep/blob/master/iep-rxhttp/src/main/java/com/netflix/iep/http/RxHttp.java#L579

I'm guessing something similar would work in 0.5.x.

ambiorix2099 commented 7 years ago

Wow! Thanks for the quick response.

I may well have to fall back to 0.4.x, but 0.5.x allows you to add a channel handler via the HttpClient's fluent builder, using pipelineConfigurator. I forgot to mention in my original post that I had already tried something like this:

.pipelineConfigurator(p -> p.addAfter("_rx_netty_http-client-codec", "decompressor", new HttpContentDecompressor()))

I see a bunch of other named and unnamed channel handlers scroll up from the DEBUG logs. The above was my attempt at putting the HttpContentDecompressor after the one named "_rx_netty_http-client-codec", because someone in another conversation thread mentioned that decompression should come after that handler.

The only reason I'm hoping to stick with 0.5.2-rc.5 is because I had already figured out how to perform an SSL request, by simply passing a new SSLEngine to the fluent builder's secure method. Doing the same with 0.4.x seemed less straightforward.

If you have a good example of performing a 0.4.x request with BOTH secure HTTP and Decompression, I would gladly cut over to 0.4.x.

Again, my Googling skills may be betraying me at the moment, but it seems like I've looked everywhere for an example of what I'm trying to do here. I'm trying to avoid a super deep dive into the source code.

jamesgorman2 commented 7 years ago

Yep. The problem is that, to transform the event stream, you need to use the same GZIPInputStream across events so that it sees the bytes as one concatenated stream. The trick is making sure you pass the same stream in each time and release everything at the end (e.g. using Observable.using). We need something like this for streaming data in the very near future, so I can post a snippet or gist when I have something for that (see also #411).
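To illustrate the point about sharing one GZIPInputStream, here is a minimal JDK-only sketch (no RxNetty or RxJava involved). The class and method names (GzipChunkDemo, split, decompressTogether, decompressesAlone) are made up for this illustration, and plain byte arrays stand in for the ByteBuf chunks. It shows that a single packet of a gzipped body usually cannot be decompressed on its own, while the same packets fed through one GZIPInputStream decompress fine.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.SequenceInputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

// Hypothetical demo class; byte[] chunks stand in for the ByteBufs of a response.
final class GzipChunkDemo {

    /** Gzips a string; stands in for the server's compressed body. */
    static byte[] gzip(String text) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(out)) {
            gz.write(text.getBytes(StandardCharsets.UTF_8));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return out.toByteArray();
    }

    /** Splits a payload into fixed-size chunks, imitating network packets. */
    static List<byte[]> split(byte[] data, int chunkSize) {
        List<byte[]> chunks = new ArrayList<>();
        for (int i = 0; i < data.length; i += chunkSize) {
            chunks.add(Arrays.copyOfRange(data, i, Math.min(data.length, i + chunkSize)));
        }
        return chunks;
    }

    /** Feeds every chunk, in order, through ONE GZIPInputStream. */
    static String decompressTogether(List<byte[]> chunks) {
        List<InputStream> parts = new ArrayList<>();
        for (byte[] chunk : chunks) {
            parts.add(new ByteArrayInputStream(chunk));
        }
        InputStream joined = new SequenceInputStream(Collections.enumeration(parts));
        try (InputStream gz = new GZIPInputStream(joined)) {
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            byte[] buf = new byte[4096];
            int n;
            while ((n = gz.read(buf)) != -1) {
                out.write(buf, 0, n);
            }
            return new String(out.toByteArray(), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Tries the per-chunk approach: a fresh GZIPInputStream for one chunk only. */
    static boolean decompressesAlone(byte[] chunk) {
        try {
            decompressTogether(Collections.singletonList(chunk));
            return true;
        } catch (UncheckedIOException e) {
            return false; // truncated stream or missing gzip header
        }
    }
}
```

In an Rx pipeline the same idea would mean creating the stream once per response and releasing it when the response completes, rather than once per emitted ByteBuf.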

The quick way if you want the whole content together (rather than as a stream) is to do:

request.content()
  .compose(io.reactivex.netty.util.CollectBytes.all())
  .compose(decompressingDecodingBytBufTransformer(...))

with the caveat that decompressingDecodingBytBufTransformer should release the ByteBuf to avoid mem leaks:

public static Observable.Transformer<ByteBuf, String> decompressingDecodingBytBufTransformer(Charset charset) {
        return byteBufObservable -> byteBufObservable.flatMap(byteBuf -> {
            try (
                    ByteBufInputStream byteBufStream = new ByteBufInputStream(byteBuf);
                    GZIPInputStream gzipByteStream = new GZIPInputStream(byteBufStream);
                    InputStreamReader characterStream = new InputStreamReader(gzipByteStream, charset);
            ) {
                String contentAsString = CharStreams.toString(characterStream);
                return StringObservable.byLine(Observable.just(contentAsString));
            } catch (Exception e) {
                return Observable.error(e);
            } finally {
                // Release on both the success and the error path to avoid leaking the buffer.
                byteBuf.release();
            }
        });
    }
ambiorix2099 commented 7 years ago

@jamesgorman2

It worked! I'm so happy, I literally feel like crying. You guys are awesome!

ambiorix2099 commented 7 years ago

@jamesgorman2 Just wanted to thank you again for pointing me down the right path.

I'm not new to programming, but I'm new to RxJava and wrapping my head around stream processing. The compose and using operators are not covered in most of the Rx tutorials you see out there :-)

I'll be taking a stab at processing the gzip content I'm attempting to retrieve, as a stream. I have a good idea now of how I might do it with the using operator. Once I get something working, and it's not too naive, I'll share it as a gist.

On another note: Your workaround works, but unfortunately and as you alluded to, I am unable to begin emitting any items into a downstream observable until ALL the bytes are fetched into the upstream ByteBuf. Moreover, I see the bytes being quickly fetched, but then it takes forever to emit items demarcated by line endings. I understand this probably has more to do with what is happening in StringObservable.byLine.

I'm saying all this to ask this: Is streaming through bytes, for the purpose of emitting observable items really what I need to be doing? I'm drawn to its elegance but, at least in this instance and performance-wise, it seems like I'm much better off with simply downloading the entire file, then reading it line-by-line, the traditional way. From there, I guess, I could get back to the business of emitting the file lines as observable items.

EDIT: Yeah... I ditched StringObservable. Re-chunking streams mid-flight doesn't seem to scale well, and it's really not necessary when you know the number of bytes beforehand.
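For reference, the "traditional way" mentioned above might look roughly like the following JDK-only sketch. It assumes the whole gzipped body has already been collected into a byte array; the class and method names (GzipLines, readLines, gzip) are invented for this example and are not part of RxNetty.

```java
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

// Hypothetical helper; assumes the full compressed body is already in memory.
final class GzipLines {

    /** Decompresses a fully collected gzip payload and returns its lines. */
    static List<String> readLines(byte[] gzipped, Charset charset) {
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(
                new GZIPInputStream(new ByteArrayInputStream(gzipped)), charset))) {
            List<String> lines = new ArrayList<>();
            String line;
            // BufferedReader does the line splitting, so no re-chunking is needed.
            while ((line = reader.readLine()) != null) {
                lines.add(line);
            }
            return lines;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Test helper: gzips a string so the example is self-contained. */
    static byte[] gzip(String text) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(out)) {
            gz.write(text.getBytes(StandardCharsets.UTF_8));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return out.toByteArray();
    }
}
```

The resulting list could then be handed to something like Observable.from(lines) if the lines still need to be emitted as observable items.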

jamesgorman2 commented 7 years ago

No worries @ambiorix2099. My usual considerations when deciding whether to collect or chunk are: (a) can I reduce my time to first byte for the caller? and (b) can I reduce the amount of stuff I'm storing in memory at any one time?

In our code base we spend a lot of time dealing with JSON. For some calls we need the whole payload, so we just collect everything up and parse it at the end (also where it's small, so it's not worth the bother of stream processing). For others we are being sent a large stream of data and don't want all that data hanging around in memory (usually it's small, but sometimes it can be 100s of MB per client in total). We also want to be able to ACK our writes as soon as they happen. This means that if there is a failure, the client will have the best possible data about what we have ACK'd and will have to resend the least amount of data (and this is for sending data over the open internet, so it's when, not if, really).

To do this we use rxjava-json to stream objects from within a JSON document. We then write to disk stream-wise and start responding as soon as our writes are complete (or fail). This means in many cases the client will be receiving their ACKs before they have finished sending. We're looking to move to MessagePack+zip for better compression in the near future, but the way it works will stay the same.

We also use this principle for GETs, using tiny-rxjava-jdbc to read our DBs and stream rows into rxjava-json so we can get bytes downstream faster and store fewer rows of DB response (and handle backpressure).[1]

A lot of our assumptions are based on bursty traffic, so we want to reduce the risk of a memory blowout. If your likely load profile is much smoother (and you don't need to look to cutting time to first byte) then you don't need to worry about it.

On a more general question of style, my only other recommendation would be to take the conversion to String and breaking by line out of your decompressor. That way you can swap a collecting vs a chunking decompressor without changing the surrounding code (and reuse it elsewhere).

request.content()
  .compose(decompressingDecodingBytBufTransformer())
  .compose(bs -> StringObservable.decode(bs, charset))
  .compose(StringObservable::byLine);

    public static Observable.Transformer<ByteBuf, byte[]> decompressingDecodingBytBufTransformer() {
        return byteBufObservable ->
            byteBufObservable.compose(CollectBytes.all())
            .flatMap(byteBuf -> {
            try (
                    ByteBufInputStream byteBufStream = new ByteBufInputStream(byteBuf);
                    GZIPInputStream gzipByteStream = new GZIPInputStream(byteBufStream);
            ) {
                // Guava's ByteStreams.toByteArray reads the stream to the end;
                // any other read-fully helper would do here.
                byte[] content = ByteStreams.toByteArray(gzipByteStream);
                return Observable.just(content);
            } catch (Exception e) {
                return Observable.error(e);
            } finally {
                // Release on both the success and the error path.
                byteBuf.release();
            }
        });
    }

[1] you can also use rxjava-jdbc which inspired our code. The main difference is you get ORM vs transaction handling.

NiteshKant commented 7 years ago

@ambiorix2099 can this be closed?