supabase / supabase-flutter

Flutter integration for Supabase. This package makes it simple for developers to build secure and scalable products.
https://supabase.com/
MIT License
661 stars 154 forks source link

Add support for SSE response on functions client. #894

Closed dshukertjr closed 2 months ago

dshukertjr commented 2 months ago

Is your feature request related to a problem? Please describe. Yes. Currently, there seems to be no possibility to invoke an Edge Function that returns a server-sent events using the client library.

Describe the solution you'd like I would like to see the implementation of an additional function, which could return a Stream of server-sent events instead of standard promise.

Describe alternatives you've considered One alternative I considered was to simply await the full result. However, this would leave the user waiting for a long period, which is not an ideal user experience.

Additional context The motivation is a project, where we are looking to implement a ChatGPT interface within our Flutter application, and we need to stream the response of the API call to OpenAI to the user in real-time.

Implementing this functionality on the backend as an Edge Function was straightforward enough - I followed the tutorial on Streaming Data in Edge Functions by Supabase (https://www.youtube.com/watch?v=9N66JBRLNYU).

Copied from https://github.com/supabase/functions-js/issues/67

nietsmmar commented 2 months ago

Because this feature does not exist yet I used a fork of dart_sse_client which provides web-support and used it together with fetch_client library.

This is super hacky but works for (most) part. But now since the upgrade from

supabase-flutter package > 2.4.0 gotrue package > 2.5.1

it does not work anymore. I always get 415 Unsupported Media Type as a response from my Edge-Function. I think it is some change in gotrue package to version 2.6.0 although I couldn't figure out which.

@dshukertjr It seems like this issue here is just a change of two lines, regarding the js-PR. I would really appreciate to finally be able to use streaming in flutter from the official supabase package. As of now it doesn't even work with the workaround anymore.

Thanks for all the great work.

dshukertjr commented 2 months ago

@nietsmmar I did some quick research, and for the Dart library, it's not going to be just updating two lines of code, but a few more than that. I want to properly test it out, and with some other tasks that I have, it might take some days for me to get to this one. Sorry for the inconvenience.

nietsmmar commented 2 months ago

@dshukertjr Thank you for checking it out! Really appreciate it! Looking forward to the fix :)

oliverbytes commented 2 months ago

I'm eagerly waiting as well. Thanks so much to anyone doing the work.

nietsmmar commented 1 month ago

@dshukertjr I initialized it now like this:

final client = FetchClient(mode: RequestMode.cors);
  await Supabase.initialize(
    url: config.api.url,
    anonKey: config.api.anonKey,
    debug: kDebugMode,
    httpClient: kIsWeb ? client : null,
  );
final response = await supabaseClient.functions.invoke(
  'chat_stream',
  body: {
    'body': ...
  },
  headers: {'x-region': 'eu-central-1'},
);
final stream = ((response.data) as ByteStream).transform(const Utf8Decoder());

try {
  await for (final String element in stream) {
    final chunk = json.decode(element);

    // I only get complete streamed data here at once

    yield chunk;
  }
} catch (e) {
  yield* Stream.error(
    "myCustomError",
  );
}

But I just get the full stream at once in the end as one element in my await for. Am I doing something wrong? I am using Web but with fetch_client it should work to give me every segment.

dshukertjr commented 1 month ago

@nietsmmar What does your edge function code look like?

nietsmmar commented 1 month ago

@nietsmmar What does your edge function code look like?

Looks like this (worked before with dart_sse_client, I am getting a stream from openAI that I want to forward):

const body = new ReadableStream({
  async start(controller) {
    for await (const chunk of stream) {
      var finish = false;
      if (chunk['choices'][0]['finishReason'] === 'stop') {
        finish = true;
      }
      var token = chunk['choices'][0]['delta']['content'] ?? '';
      var chunkJson = {
        token: token,
        finish: finish
      };
      console.log(chunkJson);
      controller.enqueue(new TextEncoder().encode("data:" + JSON.stringify(chunkJson) + "\r\n\r\n"));
      if (finish) {
        controller.close();
        return;
      }
    }
  },
  cancel() {
    // ...
  },
});
dshukertjr commented 1 month ago

@nietsmmar Just to double check, what version of functions_client are you using? You can find this within your pubspec.lock file.

nietsmmar commented 1 month ago

@nietsmmar Just to double check, what version of functions_client are you using? You can find this within your pubspec.lock file.

My pubspec.lock says:

  functions_client:
    dependency: transitive
    description:
      name: functions_client
      sha256: a70b0dd9a1c35d05d1141557f7e49ffe4de5f450ffde31755a9eeeadca03b8ee
      url: "https://pub.dev"
    source: hosted
    version: "2.1.0"
dshukertjr commented 1 month ago

@nietsmmar Hmm, alright. Do you have the 'Content-Type': 'text/event-stream', set on your edge function?

nietsmmar commented 1 month ago

@nietsmmar Hmm, alright. Do you have the 'Content-Type': 'text/event-stream', set on your edge function?

I am returning this:

return new Response(content, {
    headers: {
      ...corsHeaders,
      "Content-Type": "text/event-stream",
    },
  });
dshukertjr commented 1 month ago

@nietsmmar This is the test script that I used. Another difference that I see is Connection: 'keep-alive', in the headers. Would you be able to try adding it? If that doesn't work, would you be able to use my test script and see if that works on your end?

Deno.serve(async (req) => {
  const { input } = await req.json()

  const headers = new Headers({
    'Content-Type': 'text/event-stream',
    Connection: 'keep-alive',
  })

  // Create a stream
  const stream = new ReadableStream({
    async start(controller) {
      const encoder = new TextEncoder()

      try {
        for await (const char of input) {
          controller.enqueue(encoder.encode(char))
        }
      } catch (err) {
        console.error('Stream error:', err)
      } finally {
        controller.close()
      }
    },
  })

  // Return the stream to the user
  return new Response(stream, {
    headers,
  })
})
nietsmmar commented 1 month ago

@dshukertjr I tried adding kee-alive but it didn't change anything.

I copied your test script and created a new function with the exact code. Then I called it like this:

final response = await supabaseClient.functions.invoke(
  'stream_test',
  body: {
    'input': 'this is a test, hello?',
  },
  headers: {'x-region': 'eu-central-1'},
);
final stream =
    ((response.data) as ByteStream).transform(const Utf8Decoder());

try {
  await for (final String element in stream) {
    print('start');
    print(element);
    print('end');
    //yield ...
  }
} catch (e) {
  yield* Stream.error(
    'myCustomError',
  );
}

And this is my print output:

start
this is a test, hello?
end
dshukertjr commented 1 month ago

@nietsmmar Could you check to see you can listen to SSE using both your function and mine on an iOS or Android device/emulator?

nietsmmar commented 1 month ago

@dshukertjr I first tried building for linux as this was easier for me (not having an android emulator running right now). There it works for both our functions.

What version of fetch_client are you using? I am using 1.0.2.

dshukertjr commented 1 month ago

@nietsmmar I'm using fetch_client v1.0.2 as well. What browser are you using? I tested it on Google Chrome, and nothing else.

nietsmmar commented 1 month ago

@dshukertjr I also only testet in Google Chrome. Version 124.0.6367.91 (Official Build) (64-bit)

This is really weird. I also did run flutter clean etc. can there still be something old cached that I am missing?

dshukertjr commented 1 month ago

@nietsmmar Does the same thing happen if you run your edge functions locally, because that's what I've been doing.

Here is the Dart code that I'm using the listen to stream BTW.

final res = await supabase.functions
    .invoke('sse', body: {'input': 'sample text'});

(res.data as ByteStream)
    .transform(const Utf8Decoder())
    .listen((val) {
      print(val);
});
nietsmmar commented 1 month ago

@dshukertjr I did run my edge function locally too.

I just tried your exact dart-code. And there I get the whole message at once too.

I did try building in dev and in release. But in release the same happens.

I also tried wrapping runApp with runWithClient like this:

runWithClient(() => 
    runApp(
        child: const MyApp(),
    ),
); FetchClient.new,);

But that also didn't change the behavior.

nietsmmar commented 1 month ago

@dshukertjr You are sure it works on your end? I updated to newest fetch_client versions etc. but I still can't make it work.

I still get

start
{input}
end
dshukertjr commented 1 month ago

@nietsmmar Yeah, this is the deployed version of a simple SSE app that works for me. Can you try it out to see if it works on your end? https://sse.dshukertjr.dev/

Here is a screen recording of what I see.

https://github.com/supabase/supabase-flutter/assets/18113850/3178659e-e412-4a24-b5a2-d9e897f1a92f

nietsmmar commented 1 month ago

@dshukertjr yes it works. I am so puzzled. I literally copied your code. 🤯 Do you have the code on a github repo or something so I could clone and try myself?

dshukertjr commented 1 month ago

@nietsmmar Good to hear that it works on your end as well!

The code is here, but this repo is my "junk yard", so it's a bit messy, and you will find bunch of unrelated code as well 😂 https://github.com/dshukertjr/flutter_playground/tree/main

nietsmmar commented 1 month ago

@nietsmmar Good to hear that it works on your end as well!

The code is here, but this repo is my "junk yard", so it's a bit messy, and you will find bunch of unrelated code as well 😂 https://github.com/dshukertjr/flutter_playground/tree/main

thanks a lot! I will be on vacation a few days but will try your exact code when I am back.

nietsmmar commented 1 month ago

@dshukertjr oh wow! It is this:

const delay = Math.random() * 100
await new Promise((resolve) => setTimeout(resolve, delay))

when there is no delay, the messages just get send together. I did not know about that. So your first backendcode you posted above did not have any delay and everything came just as one message. So I always thought it won't work.

dshukertjr commented 1 month ago

@nietsmmar Ah, my bad 🙈

nietsmmar commented 1 month ago

@dshukertjr So it is normal, that parts can be sent together? I never had this with my older implementation.

As I am processing each chunk of stream seperately in my client. I need to distinguish them myself then. (which is okay)

Thanks again for your help to make this all work! Thanks a lot!

nietsmmar commented 1 month ago

@dshukertjr I am also having a bit of a problem when throwing an error while streaming.

When doing supabaseClient.functions.invoke(...)... and catching error I only get:

js_primitives.dart:28 Instance of 'minified:RO'

In my Edge-Function I throw a 500 Error with JSON-Response like:

{
    "error": "Custom Error message",
    "reasonPhrase": "Custom Reasonphrase."
}

Does the thrown error have some specific format to be parsed correctly from the supabase package?

nietsmmar commented 4 weeks ago

I do send my chunks like this:

var chunkJson = {
  token: token,
  finish: finish
};

controller.enqueue(new TextEncoder().encode(JSON.stringify(chunkJson)));

But sometimes I get this as one chunk: {"token":" gest","finish":fa

And in the next chunk I get: lse}

As I decode these json-strings in my client my code breaks because each chunk itself is no proper JSON anymore. Is this normal behaviour that I should expect in this stream? I find it difficult to properly process my data like that.

Thanks!