ReactiveX / RxNetty

Reactive Extension (Rx) Adaptor for Netty
Apache License 2.0

What could cause the RxNetty client to return an empty stream ? #511

Open Crystark opened 8 years ago

Crystark commented 8 years ago

I'm using a fairly simple RxNetty client:

    public Service(String host, int port, int maxConnection) {
        endpoint = RxNetty
            .<ByteBuf, ByteBuf> newHttpClientBuilder(host, port)
            .withMaxConnections(maxConnection)
            .build();
    }

    public Observable<Response> submit(Observable<Request> o) {
        return o
            .flatMap(br -> endpoint.submit(HttpClientRequest.createPost(br.uri()).withContent(br.bytes())))
            .flatMap(r -> r.getContent()
                .doOnNext(bb -> bb.retain().touch("ByteBuf from proxy response"))
                .switchIfEmpty(Observable.just(null))
                .map(c -> new Response(r.getStatus(), c, r.getHeaders().entries()))
            );
    }

The thing is, I often get an empty Observable from endpoint.submit and I'm not sure what could be causing that. I expect it to always return an HttpClientResponse<ByteBuf>.

What might cause it to return an empty stream?

Note: I checked and my input Observable (o) is never empty.

NiteshKant commented 8 years ago

I expect it to always return a HttpClientResponse

Yes, HttpClient.submit() should either give a response or error, but it should not be empty. Is this reproducible?

Crystark commented 8 years ago

No, I haven't been able to reproduce it yet; I've only observed it in production. It isn't a recent regression: I tried switching to a previous RxNetty version and still see the same behavior.

Crystark commented 8 years ago

I've worked around it for now using switchIfEmpty(DEFAULT_RESPONSE.doOnNext(r -> metricFallback.increment())) so I can track how often it happens. Visually, we observed approximately 5 of those per second at 1500 QPS, so it seems rare.
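For readers who want to try the "substitute a default and count it" idea without an RxJava dependency, here is a minimal sketch using only the JDK's java.util.concurrent.Flow interfaces (Java 9+). It is an analogue of the switchIfEmpty workaround, not RxNetty or RxJava code: the names FallbackOnEmpty, Collector, NOOP and run are hypothetical, and the sketch assumes the downstream has already requested at least one item when the upstream completes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical helper: forwards items unchanged, but if the upstream completes
// without emitting anything, it bumps a counter and emits a fallback item first.
final class FallbackOnEmpty<T> implements Flow.Subscriber<T> {
    private final Flow.Subscriber<? super T> downstream;
    private final T fallback;
    private final AtomicLong emptyCount;
    private boolean sawItem; // signals are delivered serially, so a plain field is enough

    FallbackOnEmpty(Flow.Subscriber<? super T> downstream, T fallback, AtomicLong emptyCount) {
        this.downstream = downstream;
        this.fallback = fallback;
        this.emptyCount = emptyCount;
    }

    @Override public void onSubscribe(Flow.Subscription s) { downstream.onSubscribe(s); }

    @Override public void onNext(T item) { sawItem = true; downstream.onNext(item); }

    @Override public void onError(Throwable t) { downstream.onError(t); }

    @Override public void onComplete() {
        if (!sawItem) {
            // Assumption: downstream demand is at least 1 at this point.
            emptyCount.incrementAndGet();
            downstream.onNext(fallback);
        }
        downstream.onComplete();
    }

    // Test sink that records every item it receives.
    static final class Collector<E> implements Flow.Subscriber<E> {
        final List<E> items = new ArrayList<>();
        @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
        @Override public void onNext(E item) { items.add(item); }
        @Override public void onError(Throwable t) { throw new IllegalStateException(t); }
        @Override public void onComplete() { }
    }

    // Subscription that ignores demand; the upstream is driven by hand in run().
    static final Flow.Subscription NOOP = new Flow.Subscription() {
        @Override public void request(long n) { }
        @Override public void cancel() { }
    };

    // Simulates an upstream emitting the given items and then completing.
    static List<String> run(AtomicLong counter, String... upstreamItems) {
        Collector<String> sink = new Collector<>();
        FallbackOnEmpty<String> s = new FallbackOnEmpty<>(sink, "DEFAULT", counter);
        s.onSubscribe(NOOP);
        for (String item : upstreamItems) {
            s.onNext(item);
        }
        s.onComplete();
        return sink.items;
    }

    public static void main(String[] args) {
        AtomicLong counter = new AtomicLong();
        System.out.println(run(counter));           // empty upstream: fallback is used
        System.out.println(run(counter, "a", "b")); // non-empty upstream: passes through
        System.out.println("empty completions: " + counter.get());
    }
}
```

The counter only moves when a stream completes with zero items, so errors stay visible as errors and are not folded into the fallback metric.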

Just so you know, I've changed the code a bit to make sure where the empty stream comes from:

    public Observable<Response> submit(Observable<Request> o) {
        return o
            .switchIfEmpty(Observable.<Request> empty().doOnCompleted(() -> L.warn("BEFORE")))
            .flatMap(br -> endpoint.submit(HttpClientRequest.createPost(br.uri()).withContent(br.bytes()))
                .switchIfEmpty(Observable.<HttpClientResponse<ByteBuf>> empty().doOnCompleted(() -> L.warn("INSIDE"))))
            .switchIfEmpty(Observable.<HttpClientResponse<ByteBuf>> empty().doOnCompleted(() -> L.warn("AFTER")))
            .flatMap(r -> r.getContent()
                .doOnNext(bb -> bb.retain().touch("ByteBuf from proxy response"))
                .switchIfEmpty(Observable.just(null))
                .map(c -> new Response(r.getStatus(), c, r.getHeaders().entries()))
            );
    }

This always shows

    INSIDE
    AFTER
    INSIDE
    AFTER
    INSIDE
    AFTER

Crystark commented 8 years ago

Also FYI, this is the same RxNetty client that has the pool exhausted problem (#503)

Crystark commented 8 years ago

I haven't had time to spend on this matter lately, but I just thought I'd update this thread with my observed metrics. I get those empty Observables about 0.1% of the time (50 per second at 50k QPS).
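As a quick sanity check on the two figures reported in this thread, the arithmetic can be sketched as below. The class and method names (EmptyRate, percent) are hypothetical, and the inputs are simply the numbers quoted above: 5 per second at 1500 QPS works out to roughly 0.33%, and 50 per second at 50k QPS to 0.1%.

```java
// Hypothetical helper for turning "N empty responses per second at Q QPS" into a percentage.
final class EmptyRate {
    static double percent(long emptyPerSecond, long requestsPerSecond) {
        return 100.0 * emptyPerSecond / requestsPerSecond;
    }

    public static void main(String[] args) {
        // Earlier visual estimate from this thread: 5 per second at 1500 QPS.
        System.out.printf("%.2f%%%n", percent(5, 1_500));
        // Later measured figure from this thread: 50 per second at 50k QPS.
        System.out.printf("%.2f%%%n", percent(50, 50_000));
    }
}
```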

guptaabhiishek commented 5 years ago

We have switched our HTTP client to RxNetty, and intermittently we get an empty Observable as the response. Do you still see this?

jamesgorman2 commented 5 years ago

Hi Abhishek, two questions:

Do you have confirmation the server is sending bytes? Do you have reproduction code?

guptaabhiishek commented 5 years ago

Hi Abhishek, two questions: Do you have confirmation the server is sending bytes? Do you have reproduction code?

I'm afraid I can't share code, as that is against our company policy. This behavior only occurs when I run tests in CI (Jenkins): the client Observable randomly returns empty, although the server is still processing the request, which I can see in the server log. I can't reproduce it locally, as it happens at random on any of our REST API calls.