ajndkr / lanarky

The web framework for building LLM microservices
https://lanarky.ajndkr.com/
MIT License
976 stars, 74 forks

StreamingResponse SSE text/event-stream requires "data:" prefix and "\n\n" terminator to work properly #121

Closed auxon closed 12 months ago

auxon commented 1 year ago

Let's say you have a FastAPI endpoint that returns this:

return StreamingResponse.from_chain(
    chain,
    request.messages[-1].content,
    as_json=True,
    media_type="text/event-stream",
)

StreamingResponse.from_chain works with fetch, but not with other clients such as Postman, which now supports SSE. This is because the callbacks don't add the "data:" prefix or the "\n\n" terminator that the text/event-stream format expects. So if you test an endpoint in Postman, or consume it with a client like Axios, no events are delivered until the stream is done. The fix is to change the async streaming response callbacks to something like this:


class AsyncStreamingResponseCallback(AsyncLanarkyCallback):
    """Async Callback handler for StreamingResponse."""

    send: Send = Field(...)

    def _construct_message(self, content: str) -> Message:
        """Constructs a Message from a string."""
        return {
            "type": "http.response.body",
            "body": f"data: {content}\n\n".encode("utf-8"),
            "more_body": True,
        }

class AsyncStreamingJSONResponseCallback(AsyncStreamingResponseCallback):
    """Async Callback handler for StreamingJSONResponse."""

    send: Send = Field(...)

    def _construct_message(self, content: StreamingJSONResponse) -> Message:
        """Constructs a Message from a dictionary."""
        return {
            "type": "http.response.body",
            "body": f"data: {json.dumps(content.dict(), ensure_ascii=False, allow_nan=False, indent=None, separators=(',', ':'))}\n\n".encode("utf-8"),
            "more_body": True,
        }

I'll get around to making a pull request for this (after some more testing), but wanted to flag it right away.
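
For reference, the framing described above can be sketched without any lanarky dependency. This is a minimal sketch, assuming only the text/event-stream rules (each payload line carries a "data:" field and a blank line ends the event); `format_sse_event` and `format_sse_json` are hypothetical helper names, not lanarky APIs.

```python
import json


def format_sse_event(data: str) -> bytes:
    """Frame a string as one SSE event (hypothetical helper, not part of lanarky)."""
    # Every line of the payload gets its own "data:" field,
    # and one extra newline (a blank line) terminates the event.
    framed = "".join(f"data: {line}\n" for line in data.split("\n"))
    return (framed + "\n").encode("utf-8")


def format_sse_json(payload: dict) -> bytes:
    """Serialize a dict compactly and frame it as one SSE event."""
    body = json.dumps(payload, ensure_ascii=False, separators=(",", ":"))
    return format_sse_event(body)


if __name__ == "__main__":
    print(format_sse_json({"token": "Hello"}))
```

Splitting on newlines matters for plain-text tokens: a token that itself contains "\n\n" would otherwise end the event early on the client.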

auxon commented 1 year ago

@ajndkr Assign this one to me if you like.

ajndkr commented 1 year ago

@auxon interesting! how would the parsing on the client side work if a user wants to use fetch?

auxon commented 1 year ago

@ajndkr Hey, sorry for the delay - had a busy week at work. I actually tried to commit the fix to my fork but there was some weird issue with the pre-commit hooks, with some SSL issue, trying to get it working. I tried again a few days later, but it was still a problem. May have to do it on my MacBook to get it working.

As for the parsing with fetch - well if it streams token by token you can just use a regex to extract the content, but in my case I actually stream the events in the event-stream from the Python API to an ASP.NET server to another ASP.NET server before processing it in the frontend React UI, and by then the stream doesn't arrive token by token anymore. So, I have to do this (removed UI specific things):

            const controller = new AbortController();
            const signal = controller.signal;
            const response = await fetch(endpoint, {
              method: 'POST',
              headers: {
                'Connection': 'keep-alive',
                'Content-Type': 'application/json',
              },
              signal: signal,
              body: body,
            });
            const data = response.body;
            const reader = data.getReader();
            const decoder = new TextDecoder();
            let done = false;
            let isFirst = true;
            let text = '';
            let docSources: Source[] = [];
            let chunkValue = '';
            while (!done) {
              if (stopConversationRef.current === true) {  // UI signal to abort (cancel generation)  
                controller.abort();
                done = true;
                break;
              }
              const { value, done: doneReading } = await reader.read();
              done = doneReading;
              if(done) {
                break;
              }
              // stream: true keeps multi-byte characters that are split across chunks intact
              chunkValue += decoder.decode(value as Uint8Array, { stream: true });
              let endPos = chunkValue.lastIndexOf('\n\n');
              if (endPos !== -1) {
                let processChunk = chunkValue.substring(0, endPos + 2);
                chunkValue = chunkValue.substring(endPos + 2); // keep the characters after '\n\n'
                const regex = /data:\s?({.*?})\n\n/g;
                const matches = [...processChunk.matchAll(regex)];
                const jsonStrings = matches.map((match) => match[1]);
                if (jsonStrings.length > 0) {
                  text += jsonStrings.map((jsonString) => {
                    if (jsonString.startsWith('{"token":')) {
                      const jsonObject = JSON.parse(jsonString);
                      const tokenValue = jsonObject.token;
                      return tokenValue;
                    } else if (jsonString.startsWith('{"source_documents":')) {
                      const jsonObject = JSON.parse(jsonString);
                      const sourceDocs = jsonObject.source_documents;
                      docSources = sourceDocs.map((sourceDoc: { page_content: string, metadata: { source: string; }; }) => 
                        ({ url: sourceDoc.metadata.source, page_content: sourceDoc.page_content })
                      );
                    }
                    else
                    {
                      return jsonString;
                    }
                  }).join('');
                }
                // Bunch of React code to update UI with text and docSources.
              }
            } 

Then the UI is updated with the text as it changes and docSources at the very end.
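
The same buffering idea can be sketched in Python for a non-browser consumer. This is a rough sketch, assuming the server emits `data: {...}` events terminated by a blank line and that payloads carry either a `token` or a `source_documents` key as in the snippet above; `iter_sse_payloads` and `collect` are hypothetical names.

```python
import codecs
import json
from typing import Iterable, Iterator, List, Tuple


def iter_sse_payloads(chunks: Iterable[bytes]) -> Iterator[dict]:
    """Yield JSON payloads from 'data:' events, buffering across chunk boundaries."""
    decoder = codecs.getincrementaldecoder("utf-8")()  # tolerates split multi-byte chars
    buffer = ""
    for chunk in chunks:
        buffer += decoder.decode(chunk)
        # Only complete events (terminated by a blank line) are processed;
        # whatever follows the last "\n\n" stays in the buffer for the next chunk.
        while "\n\n" in buffer:
            event, buffer = buffer.split("\n\n", 1)
            data_lines = []
            for line in event.split("\n"):
                if line.startswith("data:"):
                    value = line[len("data:"):]
                    # Drop the single optional space after the colon.
                    data_lines.append(value[1:] if value.startswith(" ") else value)
            if data_lines:
                yield json.loads("\n".join(data_lines))


def collect(chunks: Iterable[bytes]) -> Tuple[str, List[dict]]:
    """Accumulate streamed tokens and the final source documents."""
    text, sources = "", []
    for payload in iter_sse_payloads(chunks):
        if "token" in payload:
            text += payload["token"]
        elif "source_documents" in payload:
            sources = payload["source_documents"]
    return text, sources
```

Parsing each event as JSON, rather than matching it with a regex, avoids depending on key order or on how the braces nest.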

ajndkr commented 1 year ago

> As for the parsing with fetch - well if it streams token by token you can just use a regex to extract the content, but in my case I actually stream the events in the event-stream from the Python API to an ASP.NET server to another ASP.NET server before processing it in the frontend React UI, and by then the stream doesn't arrive token by token anymore. So, I have to do this (removed UI specific things):

I see! okay... let's maybe do the following:

instead of:

def _construct_message(self, content: StreamingJSONResponse) -> Message:
        """Constructs a Message from a dictionary."""
        return {
            "type": "http.response.body",
            "body": f"data: {json.dumps(content.dict(), ensure_ascii=False, allow_nan=False, indent=None, separators=(',', ':'))}\n\n".encode("utf-8"),
            "more_body": True,
        }

It might be better to do something like:

def _construct_message(self, content: StreamingJSONResponse) -> Message:
        """Constructs a Message from a dictionary."""
        return {
            "type": "http.response.body",
            "body": f"{self.body_prefix}{json.dumps(content.dict(), ensure_ascii=False, allow_nan=False, indent=None, separators=(',', ':'))}{self.body_suffix}".encode("utf-8"),
            "more_body": True,
        }

provides flexibility for other users imo.
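
A standalone sketch of that idea, assuming `body_prefix` and `body_suffix` are plain string fields that default to empty; `JSONMessageBuilder` is a hypothetical stand-in for the callback class and is not lanarky's actual implementation.

```python
import json
from dataclasses import dataclass


@dataclass
class JSONMessageBuilder:
    """Hypothetical stand-in for the callback; only the framing logic is shown."""

    body_prefix: str = ""  # e.g. "data: " for SSE clients
    body_suffix: str = ""  # e.g. "\n\n" for SSE clients

    def construct_message(self, content: dict) -> dict:
        """Build an ASGI-style body message wrapped in the configured prefix/suffix."""
        payload = json.dumps(
            content, ensure_ascii=False, allow_nan=False, separators=(",", ":")
        )
        return {
            "type": "http.response.body",
            "body": f"{self.body_prefix}{payload}{self.body_suffix}".encode("utf-8"),
            "more_body": True,
        }


if __name__ == "__main__":
    sse = JSONMessageBuilder(body_prefix="data: ", body_suffix="\n\n")
    print(sse.construct_message({"token": "Hello"})["body"])
```

With empty defaults, existing fetch-based clients keep receiving bare JSON, while SSE clients opt in by setting both fields.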

> I actually tried to commit the fix to my fork but there was some weird issue with the pre-commit hooks, with some SSL issue, trying to get it working. I tried again a few days later, but it was still a problem. May have to do it on my MacBook to get it working.

about this, don't worry about pre-commit. if the code check fails, i will run the linters on my end and fix any errors.

ajndkr commented 12 months ago

@auxon feel free to test the new feature from main branch. It will be part of v0.8 release (will make a new release by next week or so)