ReactiveX / RxNetty

Reactive Extension (Rx) Adaptor for Netty
Apache License 2.0

RxNetty Http server does not always unsubscribe on client disconnect #582

Open karel1980 opened 7 years ago

karel1980 commented 7 years ago

We've noticed that in the HTTP server examples a client disconnection is not always detected. The observable passed to RxNetty is subscribed to, but the subscription is not cancelled until our observable emits something. See this example:

/*
 * Copyright 2015 Netflix, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

package io.reactivex.netty.examples.http.sse;

import io.netty.buffer.ByteBuf;
import io.netty.handler.logging.LogLevel;
import io.reactivex.netty.examples.ExamplesEnvironment;
import io.reactivex.netty.protocol.http.server.HttpServer;
import rx.Observable;

import java.util.concurrent.TimeUnit;

import static io.reactivex.netty.protocol.http.sse.ServerSentEvent.*;

/**
 * A <a href="http://www.w3.org/TR/eventsource/">Server-Sent Events</a> "Hello World" example server.
 *
 * This server sends a Server-Sent Events stream for any request that it receives. The stream publishes one event
 * per second for five events, with the event data as "Interval => [count starting with 0]", and then stays open
 * without emitting anything further.
 */
public final class HelloSseServer {

    public static void main(final String[] args) {

        ExamplesEnvironment env = ExamplesEnvironment.newEnvironment(HelloSseServer.class);
        HttpServer<ByteBuf, ByteBuf> server;

        server = HttpServer.newServer(49499)
                           .enableWireLogging("sse-server", LogLevel.DEBUG)
                           .start((req, resp) ->
                                          resp.transformToServerSentEvents()
                                              .writeAndFlushOnEach(getInterval()
                                                                             .onBackpressureBuffer(10)
                                                                             .map(aLong -> withData(
                                                                                     "Interval => " + aLong)))
                           );

        /*Wait for shutdown if not called from the client (passed an arg)*/
        if (env.shouldWaitForShutdown(args)) {
            server.awaitShutdown();
        }

        /*If not waiting for shutdown, assign the ephemeral port used to a field so that it can be read and used by
        the caller, if any.*/
        env.registerServerAddress(server.getServerAddress());
    }

    private static Observable<Long> getInterval() {
        return Observable.interval(1, TimeUnit.SECONDS)
                .take(5)
                .concatWith(Observable.never())
                .doOnEach(notification -> log(notification, false))
                .doOnSubscribe(() -> log("subscribe", true))
                .doOnUnsubscribe(() -> log("unsubscribe", true));
    }

    private static void log(Object something, boolean dumpstacktrace) {
        System.out.println(Thread.currentThread().getName());
        System.out.println(" - " + something);

        if (dumpstacktrace) {
            RuntimeException runtimeException = new RuntimeException();
            runtimeException.fillInStackTrace();
            runtimeException.printStackTrace();
        }
    }
}

In getInterval() the observable emits 5 values and then stays silent because of Observable.never(). If a client connects and disconnects before the 5th value is emitted, we can see the unsubscribe being logged. If a client connects but doesn't disconnect until after the 5th value, the unsubscribe never happens.
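To make the two cases easier to compare, here is a toy model of the behaviour described above. It uses only the JDK, not RxJava or RxNetty, and it is built on an assumption: that a disconnect is only noticed when the server next tries to write an item. That assumption matches what we observe, but it is not a statement about how RxNetty is implemented. The class name, the simulate() method and its parameters are all made up for this illustration.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model only: no RxJava or RxNetty types are involved. */
final class WriteDrivenUnsubscribeModel {

    /**
     * Simulates a source that emits one value per tick for the first emitCount ticks and then
     * stays silent (like concatWith(Observable.never())).
     *
     * ASSUMPTION of this model: the disconnect at disconnectTick is not reported to anyone;
     * it is only discovered when the next write is attempted.
     */
    static List<String> simulate(int emitCount, int disconnectTick, int totalTicks) {
        List<String> log = new ArrayList<>();
        boolean connected = true;
        boolean subscribed = true;
        log.add("subscribe");

        for (int tick = 1; tick <= totalTicks; tick++) {
            if (tick == disconnectTick) {
                connected = false; // client goes away silently
            }
            boolean sourceEmits = subscribed && tick <= emitCount;
            if (!sourceEmits) {
                continue; // nothing to write, so nothing checks the connection
            }
            if (connected) {
                log.add("write " + (tick - 1));
            } else {
                subscribed = false; // failed write is the only trigger for unsubscribing
                log.add("unsubscribe");
            }
        }
        return log;
    }

    public static void main(String[] args) {
        // Client leaves before the 5th value: the next emission triggers the unsubscribe.
        System.out.println(simulate(5, 3, 10));
        // Client leaves after the 5th value: no further emission, so no unsubscribe.
        System.out.println(simulate(5, 7, 10));
    }
}
```

In this model the first call logs an unsubscribe and the second never does, which is the same pattern we see with the server above.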

karel1980 commented 7 years ago

@jamesgorman2 do you need more information to reproduce this? I could try to turn this into a unit test if that would help.

jamesgorman2 commented 7 years ago

That'd be great @karel1980