spring-projects / spring-framework

Spring Framework
https://spring.io/projects/spring-framework
Apache License 2.0
56.32k stars 38.01k forks source link

SSE in Java Spring Boot giving uneven chunks #32866

Closed cdaman123 closed 4 months ago

cdaman123 commented 4 months ago

I have an SSE endpoint in flask which emit SSE data chunks which are according MDN standards which ends in two \n characters, and when I received them on my frontend they received on the same format.

But I need to write a proxy for the same endpoint in spring boot which make a call on flask SSE endpoint and return same events to client. When I hit that endpoint using the same script, I received uneven chunks, for example in some chunks data: received and the value of that event received in the next chunk.

Please help me with this issue.

Sample Flask APP

import json
import time

from flask import Flask, Response
from flask_cors import CORS, cross_origin

app = Flask(__name__)

cors = CORS(
    app,
    origins="*",
)

def markdown_event():
    yield "data: "+ json.dumps({"data": "{\"RESPONSE\": \"well_structured_markdown_formatted_response_with_citation_without_appending_references_and_sources_and_additional_questions\", \"HEADING\": \"\", \"NEXT_HEADING\": \"suggestive_questions\", \"REFERENCE\": \"id_list\", \"IS_SORRY\": \"\", \"IS_RELEVANT\": \"\", \"LANGUAGE_OF_QUERY\": \"\", \"TEXT_CLASS\": \"\"}", "type": "schema"}) + "\n\n"
    chunks = ["{\"", "well", "_struct", "ured", "_mark", "down", "_formatted", "_response", "_with", "_c", "itation", "_without", "_app", "ending", "_references", "_and", "_sources", "_and", "_additional", "_questions", "\":", " \"", "Hello", " there", " [", "2", "]", " how", " can", " i", " be", " sure", " that", " [", "1", ",", "2", "]", " citation", " are", " being", " covered", " [", "1", " -", " ", "4", "],", " more", " citation", " formats", ":", " [", "1", ",", " ", "3", " -", " ", "5", "].", " Some", " more", " [", "1", "][", "4", "](", "3", ").", " The", " years", " are", " like", " [", "199", "5", " -", " ", "199", "8", "]", " []", " String", ":", " '", "Hello", " there", " [", "4", "]", " how", " can", " i", " be", " sure", " that", " (", "1", ",", "2", ")", " citation", " are", " being", ".", " Hello", " [", "202", "1", "]", " the", " ", " college", " year", " was", " [", "199", "2", "-", "96", "].", "\\", "n", "\\n", "The", " task", " is", " simple", " you", " should", " remove", " citations", " from", " the", " random", " above", " string", " in", " format", " like", " [", "xxx", "]", " [", "xxx", " -", " xxx", "],", " [", "xxx", " ,", " xxx", "],", " where", " every", " x", " is", " a", " digit", ";", " and", " not", " years", " in", " the", " following", " formats", " [", "xxxx", "],", " [", "xxxx", " -", " xxxx", "],", " [", "xxxx", " -", " xx", "]\\", "n", "\\n", "The", " output", " of", " the", " above", " string", " would", " be", ":\\", "n", "Hello", " there", " ", " how", " can", " i", " be", " sure", " that", " ", " citation", " are", " being", " covered", " ,", " more", " citation", " formats", ":", " .", " Some", " more", " .", " The", " years", " are", " like", " [", "199", "5", " -", " ", "199", "8", "]", " []", " String", ":", " '", "Hello", " there", " ", " how", " can", " i", " be", " sure", " that", " ", " citation", " are", " being", ".", " Hello", " [", "202", "1", "]", " the", " ", " college", " year", " was", " [", "199", "2", "-", "96", "].", "\\", "n", "\\n", "See", " that", " was", " easy", ".\",", " \"", "reference", "_ids", "_mapping", "\":", " [{\"", "citation", "_number", "\":", " ", "1", ",", " \"", "reference", "_id", "\":", " \"", "01", "HT", "G", "AA", "28", "DB", "H", "8", "A", "1", "ND", "P", "XY", "RK", "SW", "GE", "\"},", " {\"", "citation", "_number", "\":", " ", "2", ",", " \"", "reference", "_id", "\":", " \"", "01", "HP", "7", "CV", "V", "38", "J", "BC", "7", "D", "RA", "4", "EW", "QM", "FS", "71", "\"},", " {\"", "citation", "_number", "\":", " ", "3", ",", " \"", "reference", "_id", "\":", " \"", "01", "HP", "72", "M", "DS", "19", "J", "TF", "J", "Y", "WF", "6", "G", "5", "A", "AY", "7", "W", "\"}", "],", " \"", "suggest", "ive", "_questions", "\":", " [\"", "What", " are", " the", " benefits", " of", " Q", "IV", " in", " children", " aged", " ", "6", " months", " to", " ", "17", " years", "?\",", " \"", "Can", " Q", "IV", " be", " administered", " to", " children", " aged", " ", "6", " months", " and", " above", "?\",", " \"", "What", " is", " the", " safety", " profile", " of", " Q", "IV", " in", " children", " aged", " ", "6", "-", "35", " months", "?", "\"]}"]

    for chunk in chunks:
        yield "data: "+json.dumps({"data": chunk, "type":"function_call"}) + "\n\n"
        time.sleep(0.1)

@app.route("/")
def index():
    return "Hello, go to /markdown endpoint"

@app.route("/markdown", methods=["GET", "POST"])
def markdown():
    return Response(markdown_event(), mimetype="text/event-stream")

if __name__ == "__main__":
    app.run(port=5500)

cmd to run above flask application:

python -m flask --app=app run --port=5500 --host=0.0.0.0

Sample client :

async function sse() {
    var requestOptions = {
        method: "POST",
        redirect: "follow",
    };
    const response = await fetch(
        "http://localhost:5000/proxy-sse-2",
        // "http://localhost:5500/markdown",
        requestOptions
    );
    console.log(response.status)
    console.log(response.headers)
    count = 1
    const reader = response.body
        .pipeThrough(new TextDecoderStream())
        .getReader();
    while (true) {
        const { value, done } = await reader.read();
        if (done) break;
        console.log("Chunk: ", count, " *********************************");
        const a = String.raw`${value}`;
        console.log("Received", a);
        console.log("--------------------------------");
        count += 1;
    }
    console.log(count);
}
sse()

Java spring boot controller with SseEmitter:

package com.spring.demo.controller;

import java.io.IOException;
import java.time.LocalDateTime;

import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.reactive.function.client.WebClient;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter.SseEventBuilder;

@RestController
public class HelloWorldController {

    private final String targetSseUrl = "http://localhost:5500/";

    private final WebClient webClient;

    public HelloWorldController(WebClient.Builder webClientBuilder) {
        this.webClient = webClientBuilder.baseUrl(targetSseUrl).build();
    }

    @PostMapping("/proxy-sse")
    public SseEmitter proxySse() {
        SseEmitter emitter = new SseEmitter(Long.MAX_VALUE);

        // Call the original SSE endpoint and forward events to the client
        webClient.post()
                .uri("/markdown")
                .accept(MediaType.TEXT_EVENT_STREAM)
                .retrieve()
                .bodyToFlux(String.class)
                .subscribe(arg0 -> {
                    try {
                        System.err.println(arg0);
                        emitter.send(SseEmitter.event().data(arg0));
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }, emitter::completeWithError, emitter::complete);

        return emitter;
    }

    @PostMapping("/local-sse")
    public SseEmitter proxySse1() {
        SseEmitter emitter = new SseEmitter(Long.MAX_VALUE);

        emitter.onTimeout(() -> System.out.println("SSE connection timed out"));
        emitter.onCompletion(() -> System.out.println("SSE connection closed"));

        new Thread(() -> {
            int count = 0;
            while (true) {
                count++;
                try {
                    String data = "Event at: " + LocalDateTime.now();
                    SseEventBuilder eventBuilder = SseEmitter.event().data(data);
                    emitter.send(eventBuilder);
                    Thread.sleep(100); // Send event every 100ms (adjust as needed)
                } catch (Exception e) {
                    emitter.completeWithError(e);
                    break;
                }
                if (count == 100) { // Number of events to be send (adjust as needed)
                    emitter.complete();
                    break;
                }
            }
        }).start();

        return emitter;
    }
}

Above code block have two endpoints, one which make a call on flask endpoint and emit data received from that and another endpoint generate data and omit to client.

When I make a call on flask endpoint directly I received complete chunks but when I make call on any of the java endpoint I received uneven broken chunks.

Result for all endpoints:

  1. flask endpoint
  2. java proxy SSE endpoint
  3. java local SSE endpoint

Please help me how to fix this issue.

bclozel commented 4 months ago

Sorry but can't justify spending the time on this - it seems the issue resides in the integration between both services and not specifically in Spring Framework. We can reopen this issue if you manage to provide a minimal sample application (something we can download or git clone and run) that only involves Spring and produces invalid SSE streams that browsers will not accept.

cdaman123 commented 4 months ago

Hi

@bclozel thanks for reply. I tried Spring's SseEmitter with JavaScript's default EventSource, and it worked as expected.

However, I noticed that SseEventBuilder.build() in Spring creates three separate packets for a single event containing only data. For instance, if I try to send an event with the value "Event at: 2024-05-22T12:10:50.985317511", the returned Set from SseEventBuilder.build() has a size of 3, containing "data:", "Event at: 2024-05-22T12:10:50.985317511", and "\n\n". This behavior causes issues with my custom event handling logic.

Code can be found below: https://github.com/spring-projects/spring-framework/blob/dae4366325f645fd83b035e229027549dcf61c50/spring-webmvc/src/main/java/org/springframework/web/servlet/mvc/method/annotation/SseEmitter.java#L199-L281

sample application repo : https://github.com/cdaman123/spring-sse-sample-app

bclozel commented 4 months ago

I tried Spring's SseEmitter with JavaScript's default EventSource, and it worked as expected.

This points to an issue with your integration, not in Spring Framework.

However, I noticed that SseEventBuilder.build() in Spring creates three separate packets for a single event containing only data.

We're concatenating String instances, but in the end this is flushed at once on the network so this shouldn't be a problem.

cdaman123 commented 4 months ago

We're concatenating String instances, but in the end this is flushed at once on the network so this shouldn't be a problem.

@bclozel I checked the new version of spring-framework, all chunks are flushed at once, but strings are not concatenated instead of stored in a SET.

https://github.com/spring-projects/spring-framework/blob/489d18a1691d00c7487ec6034f7065da28fcb3b8/spring-webmvc/src/main/java/org/springframework/web/servlet/mvc/method/annotation/SseEmitter.java#L235-L245

My question is why they are stored in a SET instead of being appended to a string.

bclozel commented 4 months ago

I think this issue has run its course. You did not provide any sample application showing our SSE support having a problem. We can't justify spending the time to discuss the details of the implementation unless this is an enhancement request.

ryanrupp commented 4 months ago

@bclozel @cdaman123 I was troubleshooting something else with SSE and noticed this behavior in the logging/debugging. I think it's possible this was fixed in 6.0.12 with this change (edit - that said, not sure eagerly flushing should create a correctness problem really but maybe for the original example)

Which explains the

I checked the new version of spring-framework, all chunks are flushed at once

FWIW the issue I ran into (just dropping it here but it's pretty unrelated but maybe someone will find this in a search) is that I was trying to re-use an instance of SseEventBuilder i.e. one event broadcast to many emitters. The sort of non-obvious part here was that the implementation of SseEventBuilder#build mutates the contents therefore you cannot re-use the same event/builder to multiple calls of send. See implementation here, resulted in extra newline characters which confused some clients.

cdaman123 commented 4 months ago

@ryanrupp I checked this behavior with spring version 6.0.12 but it is not completely fixed, just due to flushing all chunk at once, at client side the frequency of complete event increased but not completely fixed.

@bclozel any plan for consider this as enhancement request?

bclozel commented 4 months ago

@cdaman123 there's still nothing actionable for us in this issue. This was first reported as a bug because of broken chunks so I don't understand which enhancement you're talking about. I think @ryanrupp is mentioning a different case as your initial sample was not reusing the same instance of SseEventBuilder for multiple messages.

cdaman123 commented 4 months ago

Hi @bclozel I am talking about sending a complete event as a single string ('data:message\n\n')instead of breaking them in 'data:', 'message' and '\n\n'.

bclozel commented 4 months ago

This should be the case already. See ResponseBodyEmitter and:


    /**
     * Write a set of data and MediaType pairs in a batch.
     * <p>Compared to {@link #send(Object, MediaType)}, this batches the write operations
     * and flushes to the network at the end.
     * @param items the object and media type pairs to write
     * @throws IOException raised when an I/O error occurs
     * @throws java.lang.IllegalStateException wraps any other errors
     * @since 6.0.12
     */
    public synchronized void send(Set<DataWithMediaType> items) throws IOException {