spring-projects / spring-framework

Spring Framework
https://spring.io/projects/spring-framework
Apache License 2.0

Provide a way to customize ResponseBodyEmitter's earlySendAttempts behaviour #26267

Closed celcius112 closed 3 years ago

celcius112 commented 3 years ago

Hello everyone,

I have a small feature request concerning an endpoint that uses an SseEmitter, with an implementation that looks like this:

    @GetMapping("/poll")
    public SseEmitter poll() {
        var sseEmitter = myService.registerAndStartPolling();

        myService.getOrderedListOfSomething()
                .forEach(something -> sseEmitter.send(something));

        return sseEmitter;
    }

Here I create an SseEmitter up front so that polling can start early, but I still need to emit a few initial values. I would like these initial values to end up being emitted before the ones received from the early polling. In my case this could be achieved easily by comparing a certain value (the creation date-time).

However, ResponseBodyEmitter's earlySendAttempts is a first-come-first-emitted Set by default. In my case I could replace it with a TreeSet, for instance, or use an overridable preProcessEarlySendAttempts(Set<DataWithMediaType> earlySendAttempts) method that would be called in ResponseBodyEmitter#initialize.
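
To make the proposed ordering concrete, here is a minimal sketch in plain Java of how a TreeSet-backed buffer would replay items by creation time rather than by arrival order. The `Item` type, its `createdAt` field, and the `replayOrder` helper are hypothetical names for illustration; nothing here is part of ResponseBodyEmitter.

```java
import java.time.Instant;
import java.util.Comparator;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.stream.Collectors;

class EarlySendOrdering {

    // Hypothetical payload type: only createdAt matters for the ordering.
    static final class Item {
        final String name;
        final Instant createdAt;

        Item(String name, Instant createdAt) {
            this.name = name;
            this.createdAt = createdAt;
        }
    }

    // Order by creation time, then by name. Without the tie-breaker a TreeSet
    // would silently drop a second item that has the same timestamp.
    static final Comparator<Item> BY_CREATION =
            Comparator.comparing((Item i) -> i.createdAt).thenComparing(i -> i.name);

    // Returns the names in the order a sorted buffer would replay them.
    static List<String> replayOrder(List<Item> arrivalOrder) {
        Set<Item> buffer = new TreeSet<>(BY_CREATION);
        buffer.addAll(arrivalOrder);
        return buffer.stream().map(i -> i.name).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        Instant t0 = Instant.parse("2020-12-01T10:00:00Z");
        List<Item> arrival = List.of(
                new Item("polled-1", t0.plusSeconds(30)),  // arrived first, created last
                new Item("initial-1", t0),
                new Item("initial-2", t0.plusSeconds(10)));
        System.out.println(replayOrder(arrival));  // prints [initial-1, initial-2, polled-1]
    }
}
```

A sorted buffer only reorders what it already holds when it is flushed; it cannot wait for items that have not been added yet.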

rstoyanchev commented 3 years ago

Can't you emit those values before the ones from the service?

    @GetMapping("/poll")
    public SseEmitter poll() {
        var sseEmitter = myService.registerAndStartPolling();

        sseEmitter.send(...);
        sseEmitter.send(...);
        sseEmitter.send(...);

        myService.getOrderedListOfSomething()
                .forEach(something -> sseEmitter.send(something));

        return sseEmitter;
    }

celcius112 commented 3 years ago

Hello and thank you for the answer. Actually, registerAndStartPolling subscribes to a topic from a pub/sub service, and every value received on that topic is emitted. As such, we have no control over when values arrive and are emitted. Hence it is entirely possible that values are emitted before getOrderedListOfSomething is called.

rstoyanchev commented 3 years ago

Sure, but why can't those elements be pushed before the others? I can imagine a number of ways. For example, have the SseEmitter created outside and passed in, or pass those initial elements into the service so it can insert them, or make the service deferred and expose a start method.
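
As an illustration of the last option, a deferred service with a start method might look roughly like the sketch below. `Container`, `DeferredPoller`, and `demo` are hypothetical stand-ins written in plain Java so the example runs on its own; they are not Spring types, and the `Consumer` plays the role of `sseEmitter::send`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class DeferredPollingSketch {

    // Hypothetical stand-in for a pub/sub container.
    static class Container<T> {
        private final List<Consumer<T>> subscribers = new ArrayList<>();

        void addSubscriber(Consumer<T> subscriber) {
            subscribers.add(subscriber);
        }

        void publish(T message) {
            subscribers.forEach(s -> s.accept(message));
        }
    }

    // Registration is deferred: nothing is subscribed until start() is called.
    static class DeferredPoller<T> {
        private final Container<T> container;
        private final Consumer<T> sink;

        DeferredPoller(Container<T> container, Consumer<T> sink) {
            this.container = container;
            this.sink = sink;
        }

        void start() {
            container.addSubscriber(sink);
        }
    }

    static List<String> demo() {
        List<String> sent = new ArrayList<>();   // stands in for the response stream
        Container<String> container = new Container<>();
        DeferredPoller<String> poller = new DeferredPoller<>(container, sent::add);

        container.publish("too-early");          // published before start(): never delivered
        sent.add("initial-1");                   // initial elements go out first
        poller.start();                          // only now does polling begin
        container.publish("polled-1");
        return sent;
    }

    public static void main(String[] args) {
        System.out.println(demo());  // prints [initial-1, polled-1]
    }
}
```

The ordering is guaranteed, at the cost that anything published before start() is never seen by this subscriber.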

celcius112 commented 3 years ago

You are totally right about these solutions. The fault is mine for not providing enough functional context. The main issue with getOrderedListOfSomething is that it is relatively "slow": while we would like the subscription to be registered as early as possible in the request, we would also like the result of getOrderedListOfSomething to be emitted before the published values. Since we want polling to start as soon as possible, and since getOrderedListOfSomething is quite slow, we cannot call it before creating the emitter and subscribing it to a topic.

Flattened out, it would look like this:

    @GetMapping("/poll")
    public SseEmitter poll() {
        var sseEmitter = new SseEmitter();
        var subscriber = new Subscriber();
        subscriber.onMessage(sseEmitter::send);

        // subscribe very early
        container.addSubscriber(subscriber);

        // this is long, but at least we are already subscribed
        myService.getOrderedListOfSomething()
                .forEach(sseEmitter::send);

        return sseEmitter;
    }

There are of course other solutions, ones that would require a bit more work on our side. Having access to earlySendAttempts was, however, the easiest and most straightforward solution (laziness is a virtue!). We would not complain if the feature request were rejected :)

rstoyanchev commented 3 years ago

Okay, I think I understand better. If I can summarize, you want to do two things in parallel: a) obtain items from a slow service and stream those to the response first, and b) start polling for remote messages but stream those to the response second.

I don't think this can be solved with a sorted Set or a protected method. Something needs to ensure that the results from (a) become available first, before anything can be streamed. For example, we might have some perfectly ordered items in the sorted Set, but we can't stream them yet, not until the results of (a) have also been added.

You can do this sort of synchronization and ordering in your own code. That's probably what you meant by "more work on our side". For what it's worth, this sort of concurrent code and ordering is very well suited for the declarative style.

For example this:

    @GetMapping("/poll")
    public Flux<Foo> poll() {
        // ... myService, container created somehow

        Flux<Foo> flux1 = Mono.fromCallable(myService::getOrderedListOfSomething)
                .subscribeOn(Schedulers.boundedElastic()) // perform the slow call concurrently
                .flatMapIterable(Function.identity());

        Flux<Foo> flux2 = Flux.create(sink -> {
            Subscriber subscriber = new Subscriber();
            subscriber.onMessage(sink::next);
            container.addSubscriber(subscriber);   // presumably this is async already?
        });

        return flux1.concatWith(flux2);
    }

Spring MVC supports returning a Flux as an alternative to using SseEmitter in an imperative style. The above can also be enhanced further with some buffering through the onBackpressureBuffer family of operators, in case the second source needs to absorb more than a certain number of items initially.
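
For comparison, the same gating can be sketched imperatively. As far as Reactor's documented concat semantics go, concatWith subscribes to its second source only after the first completes, so a variant that subscribes to the topic immediately has to buffer live messages until the initial items have been written. The following is a minimal plain-Java sketch of that idea; `GatedRelay`, `onMessage`, `emitInitial`, and `demo` are hypothetical names, and the `Consumer` stands in for `sseEmitter::send`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class GatedRelaySketch {

    // Buffers live messages until the initial batch has been written, then
    // flushes the buffer and forwards later messages directly.
    static class GatedRelay<T> {
        private final Consumer<T> downstream;              // stands in for sseEmitter::send
        private final List<T> pending = new ArrayList<>(); // unbounded: an assumption of this sketch
        private boolean open = false;

        GatedRelay(Consumer<T> downstream) {
            this.downstream = downstream;
        }

        // Called by the pub/sub subscriber for every live message.
        synchronized void onMessage(T message) {
            if (open) {
                downstream.accept(message);
            } else {
                pending.add(message);
            }
        }

        // Called once with the result of the slow call. Holding the lock while
        // writing keeps the initial items, the backlog, and later messages in order.
        synchronized void emitInitial(List<T> initial) {
            initial.forEach(downstream);
            pending.forEach(downstream);
            pending.clear();
            open = true;
        }
    }

    static List<String> demo() {
        List<String> sent = new ArrayList<>();
        GatedRelay<String> relay = new GatedRelay<>(sent::add);

        relay.onMessage("polled-1");                          // arrives during the slow call
        relay.emitInitial(List.of("initial-1", "initial-2")); // slow call finishes
        relay.onMessage("polled-2");                          // forwarded directly from now on
        return sent;
    }

    public static void main(String[] args) {
        System.out.println(demo());  // prints [initial-1, initial-2, polled-1, polled-2]
    }
}
```

In a controller, the subscriber would be registered with `relay::onMessage` before the slow call and `emitInitial` invoked with its result; a real implementation would also need to bound the buffer and handle send failures.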