dotnet / runtime

.NET is a cross-platform runtime for cloud, mobile, desktop, and IoT apps.
https://docs.microsoft.com/dotnet/core/
MIT License
15.37k stars 4.75k forks source link

[API Proposal]: Support parsing server-sent events (SSE) #98105

Closed stephentoub closed 5 months ago

stephentoub commented 9 months ago

Background and motivation

SSE is becoming more and more popular, especially with prominent services like OpenAI relying on it for streaming responses. The format is very simple, but it still takes some amount of code to properly handle parsing the SSE format. We should have a built-in helper in either System.Net or System.Formats that take care of it for the developer (optionally then other higher-level helpers could be layered on top).

API Proposal

namespace System.Formats.Sse;

public readonly struct SseItem<T>
{
    public SseItem(T data, string eventType);
    public T Data { get; }
    public string EventType { get; }

    public const string EventTypeDefault = "message"; // defined as the default by SSE spec
}

public static class SseParser
{
    public static SseEnumerable<string> Parse(Stream sseStream); // equivalent to the other overload using Encoding.UTF8.GetString
    public static SseEnumerable<T> Parse<T>(Stream sseStream, SseItemParser<T> itemParser);
}

public delegate T SseItemParser<out T>(string eventType, ReadOnlySpan<byte> data);

public sealed class SseEnumerable<T> : IAsyncEnumerable<SseItem<T>>, IEnumerable<SseItem<T>>
{
    public IEnumerator<SseItem<T>> GetEnumerator();
    public IAsyncEnumerator<SseItem<T>> GetAsyncEnumerator(CancellationToken cancellationToken = default);

    public string LastEventId { get; }
    public TimeSpan ReconnectionInterval { get; }
}

Open Issues

API Usage

HttpClient client = ...;
using Stream responseStream = await client.GetStreamAsync(...);
await foreach (SseItem<string> item in SseParser.Parse(responseStream, (_, bytes) => Encoding.Utf8.GetString(bytes)))
{
    Console.WriteLine(item.Data);
}
HttpClient client = ...;
using Stream responseStream = await client.GetStreamAsync(...);
await foreach (SseItem<T> item in SseParser.Parse(responseStream, (_, bytes) => JsonSerializer.Deserialize<T>(bytes)))
{
    Console.WriteLine(item.Data);
}

Alternative Designs

Risks

ghost commented 9 months ago

Tagging subscribers to this area: @dotnet/ncl See info in area-owners.md if you want to be subscribed.

Issue Details
### Background and motivation SSE is becoming more and more popular, especially with prominent services like OpenAI relying on it for streaming responses. The format is very simple, but it still takes some amount of code to properly handle parsing the SSE format. We should have a built-in helper in either System.Net or System.Formats that take care of it for the developer (optionally then other higher-level helpers could be layered on top). ### API Proposal ```csharp namespace System.Formats.Sse; public struct SseItem { public string? Event { get; set; } public T Data { get; set; } } public static class SseParser { public static IAsyncEnumerable> ParseAsync(Stream sseStream, Func, T> itemParser, CancellationToken cancellationToken = default); } ``` ### API Usage ```csharp HttpClient client = ...; using Stream responseStream = await client.GetStreamAsync(...); await foreach (SseItem item in SseParser.ParseAsync(responseStream, Encoding.Utf8.GetString)) { Console.WriteLine(item.Data); } ``` ### Alternative Designs - Not having an SseItem and just having an IAsyncEnumerable of a tuple - Always returning a byte[] instead of a callback that parses a span into a T - Making it specific to System.Net, e.g. putting it somewhere in the System.Net.Http library. ### Risks - The proposed API relies on ref structs being able to be used as a generic parameters and on Func being annotated accordingly. This is on track to happen for .NET 9 / C# 13, but if it doesn't, a different shape might be needed. - We should have equivalent support in ASP.NET for serializing out an SSE stream, and it should use whatever shared `SseItem` type is in the shared libraries, so we need to ensure it's designed accordingly.
Author: stephentoub
Assignees: -
Labels: `api-suggestion`, `area-System.Net`
Milestone: 9.0.0
ericstj commented 8 months ago

helper in either System.Net or System.Formats that take care of it for the developer

Is the proposal here to have a separate assembly for this or place it in an existing assembly? The way this is designed it seems like you have a preference. Might be good to add some thoughts around why not for the alternatives.

What do you think are next steps here? cc @dotnet/ncl @bartonjs

stephentoub commented 7 months ago

Is the proposal here to have a separate assembly for this or place it in an existing assembly?

Ideally I think it would be a separate assembly that just worked in terms of Stream and that had a netstandard2.0 asset, so that it could be used downlevel by AI-related components that need it.

What do you think are next steps here?

  1. Prototype it and make sure it meets the needs of various known existing consumers, e.g. https://github.com/microsoft/semantic-kernel/tree/9b8a218f5b6df9dcc72d69d989aff9b904cdecf0/dotnet/src/InternalUtilities/src/Text and https://github.com/Azure/azure-sdk-for-net/blob/main/sdk/openai/Azure.AI.OpenAI/src/Helpers/SseReader.cs.
  2. Sync with ASP.NET folks on what we should be doing here for production and not just consumption: we might want both sides in this library, both reader and writer.
  3. Review, implement, test, ship it.
trrwilson commented 7 months ago

First: it's very cool to see a proposal to standardize server-sent events in this manner. It'll be a huge help to have this for consumers like OpenAI, which leans heavily on the pattern for its streamed REST response payloads.

With that, I'd suggest the API proposal be expanded to proactively cover the entirety of the spec (as outlined at https://html.spec.whatwg.org/multipage/server-sent-events.html#event-stream-interpretation):

public struct SseItem<T>
{
    public string EventName { get; set; } = "message"; // per spec, "message" is the implicit default and null is impossible
    public T Data { get; set; }
    public string LastEventId { get; set; } // "id" in spec
    public TimeSpan ReconnectionInterval { get; set; } // "retry" in spec
}

Notably, the parsing logic then needs to record and propagate LastEventId as the note mentions:

If an event doesn't have an "id" field, but an earlier event did set the event source's last event ID string, then the event's lastEventId field will be set to the value of whatever the last seen "id" field was.

I haven't yet encountered use of the reconnection-related fields (id/retry) but then I also hadn't previously seen a reliance on event until it was later introduced -- just covering it all at once seems prudent.

eiriktsarpalis commented 6 months ago

Any reason why the envelope type is not a readonly struct?

stephentoub commented 6 months ago

Any reason why the envelope type is not a readonly struct?

It could be, then adding a constructor for the data. It's a simple POCO.

dlyz commented 6 months ago

I've implemented something close to some of the editions of this proposal for my own purposes and now sharing it here in case someone finds it useful. Note that the code hasn't been heavily tested, so it probably contains few bugs. Anyway I tried to implement it as close to the spec as I could (including event dispatch instructions for browsers) and to make it as efficient as I could (to a reasonable extent).

Notes about incompatibilities with the proposal:

As a bonus you can find in the gist demo of streaming request to the OpenAI API, few tests (examples from the spec), and implementation of SseResult<TValue> : IResult for ASP.NET.

stephentoub commented 6 months ago

I've implemented something close to some of the editions of this proposal for my own purposes and now sharing it here in case someone finds it useful.

Thanks! I have an implementation ready to go once this is approved, but appreciate your sharing none-the-less. It's great to know that the general shape has worked well for you.

LastEventId and ReconnectionInterval is not part of the stream. Firstly, it matches the edition of the proposal on the moment I started to implement parser. Secondly, it is not obvious how thread safe it should be, when exactly this properties of the stream object should be updated, etc. And it might be not obvious for the user too.

I think it's ok. The properties are possibly modified during a call to MoveNextAsync, and so you shouldn't be reading them during such a call, but that maps to normal concurrency rules around such objects. If someone just writes a normal foreach loop, they'll naturally fall into a pit of success in this regard.

LastEventId is a part of the event item and I think it may be useful because LastEventId may mean 'EventId' in some scenarios or because it will be more consistent in case of reconnect implementation.

I think they're effectively one in the same, as the spec details that the last event ID is retained and used for subsequent events that don't include their own ID: "Initialize event's type attribute to "message", its data attribute to data, its origin attribute to the serialization of the origin of the event stream's final URL (i.e., the URL after redirects), and its lastEventId attribute to the last event ID string of the event source" (in this regard the enumerable is effectively the equivalent of the EventSource). And by putting it on the enumerable rather than on each event, we streamline the event a bit and pass less data around. It's also expected that a consumer doesn't actually need the last event ID on each message; it's only necessary when a reconnect is desired.

dlyz commented 6 months ago

I think it's ok. The properties are possibly modified during a call to MoveNextAsync, and so you shouldn't be reading them during such a call

Yeah, probably you are right, there is no need to overthink this for some weird non-practical use-cases.

It's also expected that a consumer doesn't actually need the last event ID on each message; it's only necessary when a reconnect is desired.

I assumed id may be used not only for reconnects, but also have some application level meaning, but I am not an expert in SSE, maybe id shouldn't be used this way.

The second concern was about consistency, we should guarantee that the user has processed the event with lastEventId from the stream object before the possible reconnect. But on second thought I don't see any problems with that if we don't count application-level event processing errors that should not probably lead to reconnect.

stephentoub commented 6 months ago

@davidfowl, has anyone looked into what ASP.NET would want here? I'm imagining it would need some kind of SseFormatter for writing out events, which would be trivial, much more so than the parser, and that could be added later if desirable (might be easier for that part to just be in ASP.NET). I'm more interested in the SseItem struct here, as I'd hope that could be shared. It occurs to me that could influence the open question about whether there's a Retry and Id on the struct and whether they're optional, though as optional they could also be added later (with a new constructor).

davidfowl commented 6 months ago

No nobody has looked, but what you said is right, though I don't know if we need it, the format is simple enough. I think we'd want to support returning a SseResult<T> or SseEnumerable<T> natively from a minimal API or controller (via IResult). We could probably also adopt this client side in SignalR and replace our home grown impl (is this .NET 9 only?)

cc @captainsafia @BrennanConroy

Krzysztof318 commented 6 months ago

In the sample, always is returned a generic SseItem<T> what if sse source returns different types of objects?

stephentoub commented 6 months ago

In the sample, always is returned a generic SseItem<T> what if sse source returns different types of objects?

Then either a) you can make T a discriminated union, b) you can make T be object and have the consumer type test, c) you can just have T be byte[] containing the UTF8 bytes and transform it at the consumer (the delegate just calls ToArray), or d) you can have T be string and transform it at the consumer (the delegate just calls Encoding.UTF8.GetString). The proposed Parse overload that doesn't take a delegate does (d).

stephentoub commented 6 months ago

though I don't know if we need it, the format is simple enough

Yeah

I think we'd want to support returning a SseResult<T>

Is that the same as SseItem<T>? I could rename it that if we like it better.

or SseEnumerable<T> natively from a minimal API or controller (via IResult)

I was thinking we'd want to support IAsyncEnunerable<SseItem<T>> being returned.

We could probably also adopt this client side in SignalR and replace our home grown impl (is this .NET 9 only?)

I forgot signalr had an implementation. That'd be great. The plan here, and the draft PR, creates a nupkg which includes downlevel assets.

davidfowl commented 6 months ago

Is that the same as SseItem? I could rename it that if we like it better.

I mean something that implemented the ASP.NET Core IResult, similar to @dlyz implemented https://gist.github.com/dlyz/1c2f892e482f599093bdb9021e20c26f#file-04_sseresult-cs-L12.

I was thinking we'd want to support IAsyncEnunerable<SseItem> being returned.

Yes, we would want to support this natively as well. One point of ambiguity is if the T should be JSON serialized in the ASP.NET Core case. We need to do something with it. If you don't want this behavior, is there a way to opt out?

BrennanConroy commented 6 months ago

Looks like this should be fairly easy to adopt in client-side SignalR, our parser delegate would just be data.ToArray() since we need to pass on a ReadOnlyMemory to another layer for actual parsing.

I do find it very odd that AI folks are investing in SSE though. WebSockets and Http response streaming are great for streaming data. They also both allow binary data, whereas SSE is text only.

terrajobst commented 5 months ago

Video

namespace System.Net.ServerSentEvents;

public readonly struct SseItem<T>
{
    public SseItem(T data, string eventType);
    public T Data { get; }
    public string EventType { get; }
}

public delegate T SseItemParser<out T>(string eventType, ReadOnlySpan<byte> data);

public static class SseParser
{
    public const string EventTypeDefault = "message";

    public static SseParser<string> Create(Stream sseStream);
    public static SseParser<T> Create<T>(Stream sseStream, SseItemParser<T> itemParser);
}

public sealed class SseParser<T>
{
    public IEnumerable<SseItem<T>> Enumerate();
    public IAsyncEnumerable<SseItem<T>> EnumerateAsync();

    public string LastEventId { get; }
    public TimeSpan ReconnectionInterval { get; }
}
davidfowl commented 5 months ago

@captainsafia @BrennanConroy Can we make sure to add follow up issues on ASP.NET Core for SignalR and minimal APIs?

davidfowl commented 5 months ago

I do find it very odd that AI folks are investing in SSE though. WebSockets and Http response streaming are great for streaming data. They also both allow binary data, whereas SSE is text only.

I agree!