ballerina-platform / ballerina-library

The Ballerina Library
https://ballerina.io/learn/api-docs/ballerina/
Apache License 2.0
138 stars 56 forks source link

Proposal: HTTP Server Sent Events Support #6687

Open MohamedSabthar opened 4 days ago

MohamedSabthar commented 4 days ago

Summary

The Ballerina http package offers service and client interfaces to produce and consume HTTP services. However, neither the service nor the client currently provides an interface to work with Server-Sent Events (SSE). This proposal aims to add SSE support to the http package.

Goals

Motivation

Implementing SSE support in the Ballerina http package would significantly enhance its capabilities, offering benefits such as real-time data streaming and reliable event delivery. In the current trend, many foundation model providers exclusively use SSEs for their streaming APIs, making this an essential feature to support.

Description

Server-Sent Events (SSE) is a standardized technology for pushing real-time updates from a server to a client (unidirectional). At a high level, SSE works by keeping an open connection between the server and the client, allowing the server to send updates whenever they are available. The connection is kept alive using ‘Connection: keep-alive’ headers to ensure it stays open. If the connection drops, the client automatically tries to reconnect and resumes receiving events from the point of interruption. This makes SSE a reliable choice for applications needing live updates. The following sections describe the API changes to the existing http:Client and http:Service to incorporate SSE, along with high-level implementation details.

The SseEvent Type

This type will be introduced in the http package. Which represents a Server Sent Event format.

# Represents a Server Sent Event emitted from a service.
public type SseEvent record {|
    # Name of the event
    string event?; 
    # Id of the event
    string id?;
    # Data part of the event
    anydata data; 
    # The reconnect time on failure in milliseconds.
    int 'retry?;
    # Comment added to the event
    string comment?;
|};

API Changes related to http:Service

Changes in the Resource Function Signature

The stream<http:SseEvent, error?> type will be allowed as the return type in get resource methods of http:Service.

Users can write similar code as follows to send SSE:

public type FooEvent record {|
    *http:SseEvent;
     json data; 
|};

service / on new http:Listener(9090) {
 resource function get 'stream() returns stream<http:SseEvent, error?> {
        stream<FooEvent, error?> eventStream = new (new EventStreamGenerator());
        return eventStream;
    }
}

Changes in the http:Caller and http:ResponseMessage

The http:ResponseMessage union type will be modified to accommodate stream<http:SseEvent, error?>, which changes the http:Caller respond method signature.

# The types of messages that are accepted by HTTP `listener` when sending out the outbound response.
public type ResponseMessage anydata|Response|mime:Entity[]|stream<byte[], io:Error?>|stream<http:SseEvent, error?>;

remote isolated function respond(ResponseMessage|StatusCodeResponse|error message = ()) returns ListenerError? { }

With this proposed change, users will be able to write similar code as follows:

service / on new http:Listener(9090) {
 resource function get 'stream(http:Caller caller) returns error? {
        stream<FooEvent, error?> eventStream = new (new EventStreamGenerator());
        check caller->respond(eventStream);
    }
}

Processing Streams and Sending Events

The following pseudocode describes how to process the stream and send events over the wire when a stream is returned/responded from the resource methods.

  1. Obtain the stream from the resource method.
  2. Call the next() method of the stream and obtain the result:
  3. If the result is of type http:SseEvent:
    1. Convert each field value to a string.
    2. Prepare a string payload by combining the fields and values according to the event-format.
    3. Set the following headers
      • Content-Type: 'text/event-stream'
      • Cache-Control: 'no-cache'
      • Connection: 'keep-alive'
    4. Write the payload to the wire along with the headers.
    5. Move to step 2
  4. If the result is of type nil:
    1. Close the stream.
    2. Close the connection.
  5. If the result is of type error:
    1. Close the stream.
    2. Close the connection.

API Changes in http:Client

Changes to the get Resource Function Signature

The get resource function signatures will be modified to support stream<http:SseEvent, error?> as it’s return type.

isolated resource function get [PathParamType ...path](map<string|string[]>? headers = (), typedesc<Response|anydata|stream<http:SseEvent, error?>> targetType = <>, *QueryParams params) returns targetType|ClientError = @java:Method { }

remote isolated function get(string path, map<string|string[]>? headers = (), typedesc<Response|anydata|stream<http:SseEvent, error?>> targetType = <>) returns targetType|ClientError = @java:Method { }

With the above changes, users will be able to consume the Server Sent Events as a Ballerina stream. The following is an example of user written code.

public function main() returns error? {
    http:Client clientEp = check new ("http://localhost:8080");
    stream<http:SseEvent, error?> events = check clientEp->/events;

    // ... consume stream
    check from http:SseEvent event in events 
    do {
        string? eventName = event?.event;
        if eventName is string && eventName == "ping" {
            // … process ping event
        }
        // … process other events
        if eventName is string && eventName == "end" {
           // close the stream on agreed event type, in this case event type name is `end`
           check events.close();
        }
    };
}

http:Client Behavior on Consuming SSE

This section describes the client behaviors when consuming an event stream with the proposed API:

  1. If the underlying Netty client receives a response without headers containing the following key-value pairs: Content-Type: 'text/event-stream' Connection: 'keep-alive' Then, an http:ClientError is returned at the API level instead of returning a stream.

  2. If the underlying Netty client receives an HTTP 204 response from the server during event transmission, a nil value is returned from the stream, causing the stream to end. Further reconnection attempts to the server will not be made.

  3. If the underlying Netty client encounters network failures or receives HTTP 500 responses, it attempts to reconnect to the server based on values (retry and id fields) consumed from the previous SseEvent. Upon reconnecting, the client automatically sets the Last-Event-ID header if an event ID was previously received from the server. At the API level, no errors are thrown if the client successfully reconnects to the server; the stream continues without errors. Otherwise, an error is returned from the stream, ending the stream and closing the connection.

  4. The http:Client will attempt to reconnect to the server based on the retry configuration provided via the existing client configuration http:RetryConfig. If the number of retry attempts (RetryConfig.count) is exhausted, the stream ends with an error. If the interval value of http:RetryConfig is set to -1, then the retry interval value received from the previous SseEvent will be used when reconnecting; otherwise, the interval configuration provided via RetryConfig is used.

  5. If the stream.close() function is manually called by the user, the underlying connection will be closed.

Consuming Events and Producing Streams

This section explains high-level implementation details on how http:Client produces streams from consumed events.

For each HTTP connection related to Server Sent Events created by the http:Client, a stream generator object may need to be created and maintained internally. This object should contain a blocking queue to store events and maintain a reference to the underlying connection. The object's next function will consume events from the blocking queue, while the close method closes the underlying HTTP connection. Furthermore, this object will track the id and retry values obtained from the latest SseEvent, which are referenced during client reconnection in case of failure.

References

TharmiganK commented 4 days ago

@MohamedSabthar I have the following concerns,

  1. IIUC there will be only one response from the server(one status code and headers are written only once), and the data will be pushed time-to-time. If that is the case, it is not possible to change the status code based on the stream.next() return type. I believe when there is an error or if we reach the end of stream, we should close the connection. We need to think how the client should act upon such scenarios since the client does not know whether there is an error or that is the end of events unless the server sent some specific event type to notify the client. Can we check on that? Check whether this changes the client side consuming part also

  2. We allow setting different payload types to the response object. In that case, are we going to allow creating an http:Response, set the event stream and return? I believe the SSE supported response should have text/event-stream content type and content type directives should be ignored (the charset only supports the UTF-8).

MohamedSabthar commented 3 days ago

@MohamedSabthar I have the following concerns,

  1. IIUC there will be only one response from the server(one status code and headers are written only once), and the data will be pushed time-to-time. If that is the case, it is not possible to change the status code based on the stream.next() return type. I believe when there is an error or if we reach the end of stream, we should close the connection. We need to think how the client should act upon such scenarios since the client does not know whether there is an error or that is the end of events unless the server sent some specific event type to notify the client. Can we check on that? Check whether this changes the client side consuming part also
  2. We allow setting different payload types to the response object. In that case, are we going to allow creating an http:Response, set the event stream and return? I believe the SSE supported response should have text/event-stream content type and content type directives should be ignored (the charset only supports the UTF-8).

@TharmiganK Yes, you're correct – we can't send the status code twice, I've updated the proposal by removing them. Ideally, the server should specify an event type indicating event completion. Upon receiving such an event type, the client should close the stream, preventing unnecessary reconnection attempts.

And for the second comment, yes, we can introduce a getter and setter in the http.Response object to get and set the event streams. I'll add a separate section in the proposal for that.

xlight05 commented 2 days ago

Is it normal to support only SSE in GET requests? Seems like libraries like fastapi supports this in POST eps as well.

MohamedSabthar commented 3 hours ago

Is it normal to support only SSE in GET requests? Seems like libraries like fastapi supports this in POST eps as well.

SSE are traditionally initiated with GET requests, and the Browser EventSource API doesn't seem to allow POST requests. While the standard doesn't explicitly say POST cannot be used, it also doesn't specify any details on how to handle reconnection attempts with POST requests.

Ideally, POST requests are intended for one-time data submissions, and resending the same POST data on every reconnection would be inefficient. However, I can see a few use cases where SSE could be used with POST:

We could enable POST requests by mirroring the API changes mentioned in the proposal for GET, but without reconnection attempts. @shafreenAnfar, @TharmiganK what are your thoughts on this?

TharmiganK commented 1 hour ago

I thought that these servers sends a redirect response for the POST / PUT requests. But it seems they are directly returning the events as a success response. In that case, +1 to support this for all the resources. If we do so, the retry attempts will do the same request again which is the decision from the user's side, they can even disable the retry attempt by not providing any values for retry.

Lets say we do not have retry, and then if the server close connection/ if there are any network failures, there will be a client error when we consume the stream. In the case of OpenAPI and anthropic, they sent a specific event to close the connection. So the we can manually call stream.close() based on the event type to close the connection. Anyway we need to have SSE connectors for OpenAI and anthropic, which should handle this stream consumption behaviours.