reduxjs / redux-toolkit

The official, opinionated, batteries-included toolset for efficient Redux development
https://redux-toolkit.js.org
MIT License

Handle StreamingResponse and Server-Sent Events #3701

Open saschahofmann opened 10 months ago

saschahofmann commented 10 months ago

Use Case

I want to display a progress bar for a process running in the backend. I am also writing the API, so it can be anything: a streaming response, SSE, or even a WebSocket. I managed to make RTK Query work with all three of these approaches, but only for GET requests. In my case the process is triggered by a POST request, and I have been unable to make that work.

using onCacheEntryAdded

I would like to see RTK Query handle streaming responses and/or SSE. I made a simple version work with a streaming response using the onCacheEntryAdded option, similar to how it's used in the docs for WebSockets. It feels quite quirky because I have to set the original queryFn: () => ({ data: null }) and then write the whole fetching logic inside the onCacheEntryAdded function.

      async onCacheEntryAdded(arg, { cacheDataLoaded, updateCachedData }) {
        // Wait until the placeholder queryFn result has been cached
        await cacheDataLoaded;
        // Would be nice if I had access to the baseQuery here
        const response = await fetch(
          `${process.env.NEXT_PUBLIC_API_URL}/risk_profiles/`
        );
        const reader = response.body
          .pipeThrough(new TextDecoderStream())
          .getReader();
        let done, value;
        while (!done) {
          ({ value, done } = await reader.read());
          if (done) break; // final read has no value
          // Overwrite the cached data with the latest chunk
          updateCachedData(() => value);
        }
      }
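As a side note, the read loop in the snippet above can be factored out of RTK Query entirely into a plain helper that drains a text stream and reports each chunk. A minimal sketch, with hypothetical names and a synthetic stream standing in for response.body:

```javascript
// Hypothetical helper: drain a text ReadableStream, reporting each decoded chunk.
async function readTextStream(body, onChunk) {
  const reader = body.pipeThrough(new TextDecoderStream()).getReader();
  const chunks = [];
  while (true) {
    const { value, done } = await reader.read();
    if (done) break; // stream finished; `value` is undefined here
    chunks.push(value);
    onChunk(value);
  }
  return chunks;
}

// Usage with a synthetic stream in place of a real `response.body`:
const demoBody = new ReadableStream({
  start(controller) {
    controller.enqueue(new TextEncoder().encode('25%'));
    controller.enqueue(new TextEncoder().encode('100%'));
    controller.close();
  },
});
readTextStream(demoBody, (chunk) => console.log(chunk));
```

A helper like this could then be called from onCacheEntryAdded, with onChunk delegating to updateCachedData.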

Without having looked at the code, I could imagine that maybe a returned ReadableStream response could be handled differently. Unfortunately, I can imagine two ways consecutive responses should be handled

a) By overwriting the previous value
b) By appending to a list

I'd say b) is the more general case and could be the default.

In any case, the above works for a GET request. Unfortunately, isLoading is already false once the queryFn has run, and I haven't found a way to switch it back to true.

The bigger issue is that this doesn't work for mutations like POST requests. While the mutation builder has an onCacheEntryAdded property, it has no access to updateCachedData. Is there a specific reason for this? I think adding it to the props would be enough for me to hack together a solution for my use case. Even now I might be able to use the dispatch function, but I would love to see an easier way to do this in RTK Query.

dvargas92495 commented 9 months ago

I'm also interested in this being officially supported. My current workaround involves passing stream handlers as an API arg:

const [apiStream] = useApiStreamMutation();
...
const onSubmit = () => {
    apiStream({
        data,
        // @ts-ignore - ideally I don't need this ignore
        onChunk: (chunk) => {
            // do something with each chunk
        }
    }).unwrap().then((chunks) => {
       // do something with all chunks - requires type casting to an array
    })
}

which I use when I extend the endpoint definition:

apiStream: {
  query: (
    queryArg: ApiStreamApiArg & CustomStreamHandlers<ApiStreamResponse>
  ) => ({
    url: `/api/stream`,
    method: "POST",
    body: queryArg.data,
    // must be async so each read can be awaited
    responseHandler: async (response) => {
        const reader = response.body.getReader();
        let done, value;
        while (!done) {
          ({ value, done } = await reader.read());
          if (done) break; // final read has no value
          queryArg.onChunk(value);
        }
    },
  }),
}

This is not ideal, because I have to do a bunch of manual type casting, serialization ignoring, and type ignoring to get this to work. Ideally there'd be a solution provided by rtkq out of the box.
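For the SSE case mentioned earlier in the thread: SSE bodies are line-oriented, with events separated by blank lines and payloads carried on data: lines. A minimal parser sketch that an onChunk handler could apply to decoded text (function name is hypothetical, and real-world code would also need to buffer partial events across chunk boundaries):

```javascript
// Hypothetical helper: extract the `data:` payloads from a decoded SSE chunk.
function parseSseChunk(text) {
  return text
    .split('\n\n') // events are separated by a blank line
    .flatMap((event) => event.split('\n'))
    .filter((line) => line.startsWith('data:'))
    .map((line) => line.slice('data:'.length).trim());
}

parseSseChunk('data: 10%\n\ndata: 50%\n\n'); // → ['10%', '50%']
```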

markerikson commented 9 months ago

@dvargas92495 : what would a notional API change look like here?

dvargas92495 commented 9 months ago

There are four things that I care about: knowing when streaming starts, handling each chunk as it arrives, being able to cancel mid-stream, and getting the full accumulated data once the stream completes.

Here's one possible way I can think of specifying this, but I'm not at all married to it:

const [apiStream, {
    isStreaming,
    data, // or chunk
    cancelStreaming,
}] = useApiStreamMutation();

const onSubmit = () => {
    apiStream().unwrap().then((fullData: (typeof data)[]) => {
        notifyUserWeAreDoneStreaming();
        handleAllData(fullData);
    })
}

React.useEffect(() => {
   if (isStreaming) {
       notifyUserWeStartedStreaming();
   }
}, [isStreaming]);

React.useEffect(() => {
   if (data) {
       handleChunkFromApi(data);
   }
}, [data]);

const onUserClickCancel = () => {
   cancelStreaming();
}
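The proposed cancelStreaming maps fairly directly onto ReadableStream semantics: cancelling the reader makes the pending read resolve as done, which ends the loop. A rough sketch with hypothetical names and a synthetic stream, no RTK Query involved:

```javascript
// Hypothetical sketch of cancelStreaming semantics: a read loop plus a
// cancel handle that unblocks the loop early.
function startStreaming(body, onChunk) {
  const reader = body.getReader();
  const received = [];
  const finished = (async () => {
    while (true) {
      const { value, done } = await reader.read();
      if (done) break; // also resolves as done after reader.cancel()
      received.push(value);
      onChunk(value);
    }
    return received; // everything received before completion/cancellation
  })();
  return { finished, cancelStreaming: () => reader.cancel() };
}
```

In hook form, finished would back the promise returned by unwrap(), and cancelStreaming would be the handle exposed alongside isStreaming.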

simon-lighthouse commented 1 month ago

This is increasingly relevant as a growing number of applications interact with LLMs. In these instances, we expect to send POST requests/mutations to endpoints where we receive only a streamed response. @dvargas92495's sample would work very well.

dvargas92495 commented 1 month ago

@markerikson would your team be open to proposals to help move this forward or is this something you guys want to handle internally as part of the Post 2.0 effort?

markerikson commented 1 month ago

@dvargas92495 we're always open to proposals and potential PRs, yeah.