spring-projects / spring-boot

Spring Boot
https://spring.io/projects/spring-boot
Apache License 2.0

SseEmitter onComplete/onError not always called #12321

Closed Oduig closed 6 years ago

Oduig commented 6 years ago

Problem

When using SseEmitter and closing tabs in the browser, the onCompletion and onError callbacks are not called for every subscription.

Code to subscribe:

    SseEmitter emitter = new SseEmitter(sseTimeoutMs);

    Consumer<String> subscription = message -> {
      SseEventBuilder event = SseEmitter.event().name("message").data(message);
      trySend(emitter, event);
    };

    subscriptions.add(subscription);
    System.out.println("Subscription added: there are " + subscriptions.size() + " subscribers");

    emitter.onCompletion(() -> {
      subscriptions.remove(subscription);
      System.out.println("Subscription completed: there are " + subscriptions.size() + " subscribers");
    });
    emitter.onError(error -> {
      subscriptions.remove(subscription);
      System.out.println("Subscription crashed: there are " + subscriptions.size() + " subscribers");
    });
    emitter.onTimeout(() -> {
      subscriptions.remove(subscription);
      System.out.println("Subscription timed out: there are " + subscriptions.size() + " subscribers");
    });

Note that an SSE stream is not supposed to know when the client has terminated it. The server only finds out when a message fails to send, and only then can the stream be closed. This is what I tried to do:

    private void trySend(SseEmitter emitter, SseEmitter.SseEventBuilder event) {
      try {
        emitter.send(event);
      } catch (Exception ex) {
        // This is normal behavior when a client disconnects.
        try {
          emitter.completeWithError(ex);
          System.out.println("Marked SseEmitter as complete with an error.");
        } catch (Exception completionException) {
          System.out.println("Failed to mark SseEmitter as complete on error.");
        }
      }
    }

See below for the output log. I observe that when a send call fails, the SseEmitter is not terminated correctly. In addition, it is not possible to mark it as terminated by hand: the completeWithError call succeeds in some cases and fails in another, but in neither case does it seem to reach the onCompletion or onError listeners.

Is this a bug? What is the proper way to implement an SseEmitter?
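As a possible workaround while this is unclear, the cleanup could be done directly where the failed send is caught, instead of relying on the callbacks. The sketch below is plain Java so it runs without Spring: `SubscriptionRegistry` and `Sink` are hypothetical names, and `Sink` is only a stand-in for SseEmitter, not part of any Spring API. Whether this is the recommended approach is not something this thread confirms.

```java
import java.io.IOException;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

/** Hypothetical registry that drops a subscriber as soon as sending to it fails. */
final class SubscriptionRegistry {

    /** Hypothetical stand-in for SseEmitter, so the sketch has no Spring dependency. */
    interface Sink {
        void send(String message) throws IOException;
    }

    // Iteration works on a snapshot, so removing while broadcasting is safe.
    private final List<Sink> sinks = new CopyOnWriteArrayList<>();

    void add(Sink sink) {
        sinks.add(sink);
    }

    /** Idempotent: removing a sink that is already gone returns false and does nothing. */
    boolean remove(Sink sink) {
        return sinks.remove(sink);
    }

    int size() {
        return sinks.size();
    }

    /** Sends to every sink and removes the ones whose send throws. */
    void broadcast(String message) {
        for (Sink sink : sinks) {
            try {
                sink.send(message);
            } catch (IOException | RuntimeException ex) {
                // Assumed to mean the client disconnected; clean up here
                // rather than waiting for a completion or error callback.
                remove(sink);
            }
        }
    }
}
```

Because `remove` is idempotent, the same cleanup could also stay registered in onCompletion, onError and onTimeout: if a callback does fire later, the second removal is a no-op and the subscriber count stays correct.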

Steps to reproduce

  1. Clone the git repo below
  2. Start the application
  3. Open localhost:8088 in four tabs, then close the first three
  4. After the remaining tab's stream completes (1 minute), close the last tab
  5. The log should show the following:
    Subscription added: there are 1 subscribers
    Subscription added: there are 2 subscribers
    Subscription added: there are 3 subscribers
    Subscription added: there are 4 subscribers
    Marked SseEmitter as complete with an error.
    Marked SseEmitter as complete with an error.
    Failed to mark SseEmitter as complete on error.
    Subscription added: there are 5 subscribers
    Subscription added: there are 6 subscribers
    Subscription completed: there are 5 subscribers
    Subscription completed: there are 4 subscribers
    Subscription completed: there are 3 subscribers
    Subscription timed out: there are 2 subscribers
    Subscription completed: there are 2 subscribers

Resources

philwebb commented 6 years ago

The SseEmitter is part of the Spring Framework. Spring Boot just exposes support. The Spring Framework issue tracker is here