quarkusio / quarkus

Quarkus: Supersonic Subatomic Java.
https://quarkus.io
Apache License 2.0
13.58k stars 2.63k forks source link

Server-Sent Events not working correctly when quarkus-undertow used #32352

Closed wojciechkopciewicz closed 1 year ago

wojciechkopciewicz commented 1 year ago

Describe the bug

It looks like when quarkus-undertow is used, SseEventSink object does not reflect current sink state - e.g. when source is closed, the sink should reflect it.

Expected behavior

SseEventSink.isClosed method should return true when SseEventSource is closed.

Actual behavior

SseEventSink.isClosed method returns false when SseEventSource is closed.

How to Reproduce?

Example test

Output of uname -a or ver

n/a

Output of java -version

openjdk version "17.0.5" 2022-10-18

GraalVM version (if different from Java)

No response

Quarkus version or git rev

2.16.5.Final (seen from 2.12.0)

Build tool (ie. output of mvnw --version or gradlew --version)

No response

Additional information

No response

hc42 commented 1 year ago

Since this is still an issue here is a dirty workaround. Not a masterpiece but maybe this is also helpful for tracking the original problem. (Tested with quarkus-resteasy 2.16.7 (classic / non-reactive))

class Endpoint... {

    // Demo endpoint method

    @GET
    @Path("....")
    @Produces(MediaType.SERVER_SENT_EVENTS)
    @SseElementType(MediaType.APPLICATION_JSON)
    public Multi<DataEventJson> geData(
       ...params.... ,
        @Context HttpServletRequest httpRequest) {

      Multi<DataEventJson> eventStream = controller.getData(....params) ;

      // Working with quarkus 2.11.x : return multi;
      return registerConnectionCloseHandler(multi, httpRequest);
   }  

    /**
     * Workaround for Quarkus 2.16.7 - SSE connection close
     *
     * @param multi to unsubscribe on HTTP connection close
     * @param httpRequest Quarkus injected HttpServletRequest to listen for close
     * @return multi with unsubscribe listener
     * @throws RuntimeException if unexpected HttpServletRequest implementation
     */
    private <T> Multi<T> registerConnectionCloseHandler(final Multi<T> multi, final HttpServletRequest httpRequest) {

        // httpRequest is Quarkus injected proxy object
        // 1. do ->getAsyncContext()->getRequest() to unwrap proxy
        // 2. cast to HttpServletRequestImpl to access HttpServerExchange
        // 3. register custom connection completion handler
        final HttpServerExchange exchange = Optional.ofNullable(httpRequest)
                .map(ServletRequest::getAsyncContext)
                .map(AsyncContext::getRequest)
                .filter(HttpServletRequestImpl.class::isInstance)
                .map(HttpServletRequestImpl.class::cast)
                .map(HttpServletRequestImpl::getExchange)
                .orElseThrow(() -> new RuntimeException("Unexpected httpRequest type " +
                        Optional.ofNullable(httpRequest).map(Object::getClass).orElse(null)));

        final Set<Subscription> subscribers = new HashSet<>();
        exchange.addExchangeCompleteListener(e -> subscribers.forEach(Subscription::cancel));

        return multi.onSubscription().invoke(subscribers::add);
    }

}
geoand commented 1 year ago

I assume this problem does not exist when using RESTEasy Reactive, can anyone confirm?

geoand commented 1 year ago

Closing for lack of feedback

SHSolution commented 1 year ago

I do also have this issue with Quarkus 2.16.7.Final. I've seen a lot of posts and issues meanwhile about SseEventSinks never reaching isClosed() == true state, but never was solved. There were 2 commits addressing that problem and they are also included in 2.16.7.Final (actually earlier already), but the problem still exists. Or it might be another problem leading to the same behavior. I cannot understand how it cannot be recognized that writing to a not anymore existing client will lead to an exception somewhere and gets reported/notified upwards. I have the impression SSE is used very rarely - otherwise I don't know why this problem is still existing... Is there a chance that that gets finally solved in the very near future? Let me know what you need to be able to fix it.

SHSolution commented 1 year ago

Meanwhile, thanks to @hc42 for his workaround. I used this as an inspiration to react on the close/loss of connection from client side. Which led to this:

[io.qua.htt.access-log] (vert.x-eventloop-thread-1) [26831ms] 192.168.192.1 - - "GET /some/uri/sse/subscribe HTTP/1.1" 200 131 "-" "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/117.0.0.0 Safari/537.36"
2023-09-22 11:00:20,089 INFO  [My ExchangeCompleteListenerClass] (vert.x-eventloop-thread-1) Received close request for subscription with id: 1aba9684-a02a-417b-8f0d-645db3912764

That enables me to remove dropped/lost subscriptions from my list, but would prefer that this would be handled how it's meant to be.

PrimevalCoder commented 7 months ago

I assume this problem does not exist when using RESTEasy Reactive, can anyone confirm?

That is right, this happens only with RESTEasy Classic. The issue still persists in Quarkus 3.6.5 and can be reproduced as follows:

  1. Create an SSE back-end resource like so:

    @Path("/greeting")
    public class EventResource {
    
    @Inject
    GreetingService greetingService;
    
    @GET
    @Produces(MediaType.SERVER_SENT_EVENTS)
    @Path("{name}")
    public void sendGreeting(@PathParam("name") String name, @Context SseEventSink sink, @Context
    Sse sse) {
        greetingService.registerSink(name, sink);
    }
    }
  2. On the front-end, bind an EventSource to the SSE endpoint (in this example, you can bind to /greeting/John and /greeting/Lisa). Assume GreetingService will store the sinks for later usage, for example into an ApplicationScoped bean.
  3. Call .close() on one of the EventSources on the front-end (or close the browser tab). You can, for instance, have a button to call this function.
  4. Grab the corresponding SseEventSink on the back-end and check its .isClosed() value. It will return false.

And indeed, this doesn't occur with RESTEasy Reactive. With that, .isClosed() returns true as expected. In this example I used a browser client (web page with Javascript/Angular), but feel free to try other methods as well.