aesteve / vertx-sse

Add support for Server-Sent-Events in Vert.x Web
Apache License 2.0

IllegalStateException when client closes connection #9

Open junglie85 opened 7 years ago

junglie85 commented 7 years ago

Hopefully you're open to being asked questions in the issues? I'm really enjoying this library, but I'm trying to figure out how to handle a remote client disconnecting gracefully, rather than getting the following exception:

java.lang.IllegalStateException: Response has already been written
    at io.vertx.core.http.impl.HttpServerResponseImpl.checkWritten(HttpServerResponseImpl.java:588)
    at io.vertx.core.http.impl.HttpServerResponseImpl.write(HttpServerResponseImpl.java:613)
    at io.vertx.core.http.impl.HttpServerResponseImpl.write(HttpServerResponseImpl.java:296)
    at io.vertx.core.http.impl.HttpServerResponseImpl.write(HttpServerResponseImpl.java:55)
    at io.vertx.ext.web.handler.sse.impl.SSEConnectionImpl.writeHeader(SSEConnectionImpl.java:143)
    at io.vertx.ext.web.handler.sse.impl.SSEConnectionImpl.withHeader(SSEConnectionImpl.java:131)
    at io.vertx.ext.web.handler.sse.impl.SSEConnectionImpl.id(SSEConnectionImpl.java:101)
    at uk.ashleybye.rxweb.verticles.ApiVerticle$testSse$1$1.accept(ApiVerticle.kt:71)
    at uk.ashleybye.rxweb.verticles.ApiVerticle$testSse$1$1.accept(ApiVerticle.kt:20)
    at io.reactivex.internal.observers.LambdaObserver.onNext(LambdaObserver.java:60)
    at io.reactivex.internal.operators.observable.ObservableObserveOn$ObserveOnObserver.drainNormal(ObservableObserveOn.java:200)
    at io.reactivex.internal.operators.observable.ObservableObserveOn$ObserveOnObserver.run(ObservableObserveOn.java:252)
    at io.reactivex.internal.schedulers.ScheduledRunnable.run(ScheduledRunnable.java:61)
    at io.reactivex.internal.schedulers.ScheduledRunnable.call(ScheduledRunnable.java:52)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:748)

I can't figure out from the tests how you're going about it. Perhaps I should use EventSource rather than just pushing data from the Observable directly? But that still seems like it would not affect the client connection.

Anyway, this is my rather noddy test example (excuse Kotlin vice Java, but it should be reasonably clear what's going on):

private fun testSse(): SSEHandler {
    val sse = SSEHandler.create()

    sse.connectHandler { connection ->
        val address = connection.request().connection().remoteAddress()

        Observable
                .interval(0, 1000, TimeUnit.MILLISECONDS)
                .subscribeOn(Schedulers.io())
                .observeOn(Schedulers.io())
                .subscribe(
                        { value ->
                            val dateTime = LocalDateTime.now()
                            connection.id("$value", "$address")
                            connection.data("${connection.lastId()} | " +
                                    "${dateTime.hour}:" +
                                    "${dateTime.minute}:" +
                                    "${dateTime.second}." +
                                    "${dateTime.nano}")
                        },
                        { error ->
                            error.printStackTrace()
                        },
                        {
                            sse.closeHandler { sseConnection ->
                                sseConnection?.close()
                            }
                        }
                )
    }

    return sse
}

As an aside (probably another question really), why does connection.id(...) send the details to the client? And why does lastId() return null?

http localhost:8080/sse Accept:text/event-stream --stream
HTTP/1.1 200 OK
Cache-Control: no-cache
Connection: keep-alive
Content-Type: text/event-stream
Transfer-Encoding: chunked

id: 0
data: 0:0:0:0:0:0:0:1:60576

data: null | 7:31:37.835000000

id: 1
data: 0:0:0:0:0:0:0:1:60576

data: null | 7:31:38.826000000

aesteve commented 7 years ago

"Hopefully you're open to being asked questions in the issues?"

Absolutely, sorry for being so late to answer.

The first step for me to look into the issue would be to try to reproduce it without the Observable.

What I'd like to try first is simply:

sse.closeHandler { sseConnection ->
    sseConnection?.close()
}

I think that alone could reproduce the issue, simply because these few lines of code try to close a connection that has already been closed by the client, hence the IllegalStateException.

But I'd need to reproduce the issue in Java, without any fancy Observable involved. If you have time to do so, that'd be awesome; otherwise I'll try to check myself when I have the time.
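As a starting point for such a reproduction, here is a minimal plain-Java sketch with no Observable and no Vert.x. It models the pattern visible in the stack trace above, where the exception surfaces from a write (SSEConnectionImpl.id down to HttpServerResponseImpl.write) issued after the client has gone. FakeConnection and Ticker are hypothetical stand-ins invented for illustration, not vertx-sse classes, and the exception message is copied from the report only to make the parallel obvious.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical stand-in for an SSE connection; NOT the vertx-sse SSEConnection. */
final class FakeConnection {
    private final List<String> written = new ArrayList<>();
    private boolean closed;
    private Runnable onClose = () -> {};

    /** Registers a callback that runs once when the client goes away. */
    void closeHandler(Runnable handler) {
        onClose = handler;
    }

    /** Writes a payload, failing like the reported trace if the connection is closed. */
    void data(String payload) {
        if (closed) {
            throw new IllegalStateException("Response has already been written");
        }
        written.add(payload);
    }

    /** Simulates the remote client dropping the connection. */
    void clientDisconnects() {
        closed = true;
        onClose.run();
    }

    List<String> written() {
        return written;
    }
}

/** Producer that stops writing as soon as the connection reports a close. */
final class Ticker {
    private final AtomicBoolean active = new AtomicBoolean(true);
    private final FakeConnection connection;

    Ticker(FakeConnection connection) {
        this.connection = connection;
        // Register the close callback up front, before any data is produced.
        connection.closeHandler(() -> active.set(false));
    }

    /** Returns true if the tick was written, false if the producer had been stopped. */
    boolean tick(long value) {
        if (!active.get()) {
            return false;
        }
        connection.data("tick " + value);
        return true;
    }
}
```

In this model, calling data(...) directly after clientDisconnects() throws, while going through Ticker skips the write. For the RxJava version in the question, the analogous (untested) step would be to keep the Disposable returned by subscribe(...) and call dispose() on it from a close handler registered outside the subscriber. Observable.interval never completes, so a handler registered in the completion callback is never installed.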

For the lastId stuff, same thing, I'd need to reproduce the issue in a unit test to give it a proper look.

ctranxuan commented 6 years ago

lastId is null because you have to pass it as a header:

curl -v "http://localhost:8080/sse" -H "Last-Event-ID: 2"

It's the last ID the client has received. If the client wants to reconnect and get the events it missed, it has to send that ID when reconnecting. Otherwise, the client will get only the current event, and in that case the lastEventId is null.
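To make the reconnect behaviour concrete, here is a small Java sketch of how a server might use that header value to replay missed events. EventLog is a hypothetical in-memory helper invented for this example, not part of vertx-sse, and it assumes event IDs are simply positions in the log. Only the "id:" / "data:" frame layout comes from the SSE wire format shown in the output earlier in this thread.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical in-memory event log; NOT part of vertx-sse. */
final class EventLog {
    private final List<String> events = new ArrayList<>();

    /** Appends an event; its ID is assumed to be its index in the log. */
    void append(String data) {
        events.add(data);
    }

    /**
     * Returns the SSE frames the client has not seen yet.
     *
     * @param lastEventId raw Last-Event-ID header value, or null on a first connection
     */
    List<String> framesAfter(String lastEventId) {
        List<String> frames = new ArrayList<>();
        if (lastEventId == null) {
            return frames; // no header sent: nothing to replay
        }
        int last;
        try {
            last = Integer.parseInt(lastEventId.trim());
        } catch (NumberFormatException e) {
            return frames; // unparseable ID: treated like a first connection
        }
        // Clamp so that negative or too-large IDs cannot overflow or go out of range.
        int first = (last < 0) ? 0 : Math.min(last, events.size() - 1) + 1;
        for (int id = first; id < events.size(); id++) {
            frames.add("id: " + id + "\ndata: " + events.get(id) + "\n\n");
        }
        return frames;
    }
}
```

With four events logged (IDs 0 to 3), a reconnect carrying "Last-Event-ID: 2" as in the curl call above would get back only the frame for ID 3, while a connection without the header gets nothing replayed and just receives new events as they arrive.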