ralscha / sse-eventbus

EventBus library for sending events from a Spring application to the web browser with SSE
Apache License 2.0

SSEEmitter object - not destroyed #19

Closed: kaushikranjan closed this issue 3 years ago

kaushikranjan commented 3 years ago

After an SseEmitter object is instantiated, if a client failure occurs during send(), subsequent calls to complete() or completeWithError() have no effect. Even if the SseEmitter object is removed from the Map that keeps track of active emitters, so that in theory it should be destroyed because nothing references it any more, the onTimeout callback is still triggered after the configured timeout.

Is this the expected behaviour?

ralscha commented 3 years ago

There is not really an expected behavior. What you are describing sounds like a bug. If I understand you correctly, the library should destroy the emitter object when it removes it from the map in the unregisterClient method.

Something like this?

    public void unregisterClient(String clientId) {
        unsubscribeFromAllEvents(clientId);
        Client client = this.clients.remove(clientId);
        if (client != null) {
            client.sseEmitter().complete......
        }
    }

kaushikranjan commented 3 years ago

Exactly. I first hit this issue in my own implementation, which manages SseEmitters in a ConcurrentHashMap. After struggling a lot to find the root cause, I switched to your sse-eventbus implementation, but the problem persists there as well.

Can you give it a shot?

Steps to reproduce :

  1. Create the SseEmitter instance with a timeout of 120_000L.
  2. Create an executor that publishes an event every 15_000L.
  3. Subscribe to the emitter endpoint via an EventSource and then disconnect the client. The next emitter.send() will fail; we catch the exception and remove the emitter from the list of clients.

After 120 seconds, you will see SseEmitter.onTimeout() being invoked even though the emitter has been removed from the list of clients, which in theory should not happen.
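
One possible explanation for the steps above is that the application's map holds only one of the references to the emitter. The following plain-Java sketch does not use Spring: `FakeEmitter` and `FakeContainer` are hypothetical stand-ins for SseEmitter and for the server's async timeout timer. It assumes the server keeps its own reference to the timeout callback, which this thread does not confirm.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

class TimeoutAfterRemovalDemo {
    // Registers an emitter, drops it from the map, and reports whether the timeout still fired.
    static boolean timeoutFiresAfterRemoval(long timeoutMillis) {
        Map<String, FakeEmitter> clients = new ConcurrentHashMap<>();
        FakeContainer container = new FakeContainer();

        FakeEmitter emitter = new FakeEmitter();
        clients.put("client-1", emitter);
        container.startAsync(emitter, timeoutMillis);

        // Simulates "send failed, so we remove the client from our own map".
        clients.remove("client-1");

        container.shutdownAndWait();
        return emitter.timedOut.get();
    }

    public static void main(String[] args) {
        System.out.println("timeout fired after removal: " + timeoutFiresAfterRemoval(100));
    }
}

// Hypothetical stand-in for SseEmitter: it only records that its timeout callback ran.
class FakeEmitter {
    final AtomicBoolean timedOut = new AtomicBoolean(false);

    void onTimeout() {
        timedOut.set(true);
    }
}

// Hypothetical stand-in for the server's async timeout handling.
class FakeContainer {
    private final ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();

    // The scheduled task keeps the emitter reachable, independent of the application's map.
    void startAsync(FakeEmitter emitter, long timeoutMillis) {
        timer.schedule(emitter::onTimeout, timeoutMillis, TimeUnit.MILLISECONDS);
    }

    // Lets the already scheduled timeout run, then stops the timer thread.
    void shutdownAndWait() {
        timer.shutdown();
        try {
            timer.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

In this model, removing the map entry changes nothing about the scheduled timeout, because the timer never looks at the map.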

ralscha commented 3 years ago

Thanks for the info. I will take a look.

ralscha commented 3 years ago

I can't reproduce it with Spring Boot 1.5.22 and Tomcat. What HTTP server do you use? Tomcat, Jetty or Undertow? What Spring Boot version?

ralscha commented 3 years ago

I created a Git repository with a demo application so we can investigate this issue:
https://github.com/ralscha/sse-eventbus-issue

kaushikranjan commented 3 years ago

Thanks for getting back to me. Yeah, I am not able to reproduce it with your code, which is very strange :)

I am on Spring Boot 2.3.3.RELEASE and will migrate to 2.4.2 in a couple of days. The HTTP server is Jetty.

Let me see if the migration helps. I will keep this thread open so that we can discuss next steps if I am still stuck.

kaushikranjan commented 3 years ago

I tried to run sse-eventbus in your project as well as in mine. I refactored your code to point to:

java: 1.8, spring-boot-version: 2.3.3, spring-version: 5.2.10.RELEASE

It doesn't work for me. The only difference is that your code runs on Tomcat, while mine runs on Jetty.

kaushikranjan commented 3 years ago

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
        <exclusions>
            <exclusion>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-tomcat</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-jetty</artifactId>
    </dependency>

I am able to reproduce the issue after switching to the Jetty server.

ralscha commented 3 years ago

Undertow works the same as Jetty.

I tried to solve it by calling the complete() method.

    public void unregisterClient(String clientId) {
        unsubscribeFromAllEvents(clientId);
        Client client = this.clients.remove(clientId);
        client.sseEmitter().complete();
    }

This does not work: the source code of the complete() method in the Spring Framework shows that the method does nothing when a previous send failed.

    public synchronized void complete() {
        // Ignore, after send failure
        if (this.sendFailed) {
            return;
        }
        this.complete = true;
        if (this.handler != null) {
            this.handler.complete();
        }
    }
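
To make the effect of that early return concrete, here is a small self-contained model. `GuardedEmitter` and its `Handler` interface are hypothetical simplifications that copy only the guard from the quoted method; this is not Spring's class, and `markSendFailed()` is an invented helper that stands in for a failed send().

```java
import java.util.concurrent.atomic.AtomicInteger;

class CompleteGuardDemo {
    // Returns how many times the handler was notified after calling complete().
    static int handlerCalls(boolean sendFailedFirst) {
        AtomicInteger calls = new AtomicInteger();
        GuardedEmitter emitter = new GuardedEmitter(calls::incrementAndGet);
        if (sendFailedFirst) {
            emitter.markSendFailed(); // models a send() that failed because the client is gone
        }
        emitter.complete();
        return calls.get();
    }

    public static void main(String[] args) {
        System.out.println("handler calls, normal case: " + handlerCalls(false));
        System.out.println("handler calls, after failed send: " + handlerCalls(true));
    }
}

// Simplified model of the guard shown above; not the real emitter.
class GuardedEmitter {
    interface Handler {
        void complete();
    }

    private final Handler handler;
    private boolean sendFailed;
    private boolean complete;

    GuardedEmitter(Handler handler) {
        this.handler = handler;
    }

    // Hypothetical helper: records that an earlier send failed.
    synchronized void markSendFailed() {
        this.sendFailed = true;
    }

    synchronized void complete() {
        if (this.sendFailed) {
            return; // same early return as in the quoted method
        }
        this.complete = true;
        if (this.handler != null) {
            this.handler.complete();
        }
    }

    synchronized boolean isComplete() {
        return this.complete;
    }
}
```

In the failed-send case the handler is never notified, which matches the observation that calling complete() from unregisterClient has no effect.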

Another solution that works is overwriting the callbacks, although I'm not sure if that is a good idea.

    public void unregisterClient(String clientId) {
        unsubscribeFromAllEvents(clientId);
        Client client = this.clients.remove(clientId);
        client.sseEmitter().onCompletion(() -> {});
        client.sseEmitter().onTimeout(() -> {});
    }
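
A rough model of why overwriting can silence a timeout that still fires: the sketch below assumes the emitter keeps each callback behind a replaceable wrapper, so whoever holds the wrapper ends up running whatever was registered last. `ReplaceableCallback` is a hypothetical class written for this illustration; the thread only establishes that overwriting works, not how the emitter implements it.

```java
import java.util.concurrent.atomic.AtomicInteger;

class SilencedCallbackDemo {
    // Returns how often the application's timeout handler ran when the timeout fired.
    static int timeoutHandlerRuns(boolean overwriteBeforeTimeout) {
        AtomicInteger runs = new AtomicInteger();
        ReplaceableCallback timeoutCallback = new ReplaceableCallback();
        timeoutCallback.setDelegate(runs::incrementAndGet);

        // The server side keeps this reference regardless of the application's client map.
        Runnable heldByServer = timeoutCallback;

        if (overwriteBeforeTimeout) {
            timeoutCallback.setDelegate(() -> {}); // same idea as onTimeout(() -> {})
        }

        heldByServer.run(); // the timeout fires later
        return runs.get();
    }

    public static void main(String[] args) {
        System.out.println("handler runs without overwrite: " + timeoutHandlerRuns(false));
        System.out.println("handler runs with overwrite: " + timeoutHandlerRuns(true));
    }
}

// Hypothetical wrapper whose target can be swapped after registration.
class ReplaceableCallback implements Runnable {
    private volatile Runnable delegate = () -> {};

    void setDelegate(Runnable delegate) {
        this.delegate = delegate;
    }

    @Override
    public void run() {
        this.delegate.run();
    }
}
```

Note that in this model the timeout itself still fires; only the application's handler is replaced by a no-op.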

There is a listener that is called each time the bus removes a stale client. You only have to implement it and configure it via SseEventBusConfigurer:

    SseEventBusListener.afterClientsUnregistered(String clientId)

kaushikranjan commented 3 years ago

Do you have any idea why SseEmitter behaves differently on Tomcat vs. Jetty?

ralscha commented 3 years ago

No idea. This is a class from the Spring Framework and is managed by it and the underlying HTTP server.

Would it help if I added a new listener method that receives the whole client object?

SseEventBusListener.afterClientsUnregistered(Set<Client> clients)

Then in your code you could listen for this event and, for example, overwrite the timeout handler:

    @Configuration
    public class SseEventBusConfiguration implements SseEventBusConfigurer {

      @Override
      public SseEventBusListener listener() {
        return new SseEventBusListener() {
          @Override
          public void afterClientsUnregistered(Set<Client> clients) {
            for (Client client : clients) {
              // do something here
            }
          }
        };
      }
    }

These are interface default methods so it's easy to add without breaking existing code.

kaushikranjan commented 3 years ago

It's OK, ralscha. I have decided to migrate to Tomcat to fix this bug. I did some digging, and it looks like this comes down to how servlets work out of the box in Tomcat vs. Jetty: unlike Tomcat, Jetty doesn't monitor client unavailability when send() fails.

kaushikranjan commented 3 years ago

@ralscha - Thank you for your time and effort in helping me identify the issue.