microsoft / typespec

https://typespec.io/
MIT License

Define streaming APIs (server-sent events, log streams, etc.) #154

Open timotheeguerin opened 2 years ago

timotheeguerin commented 2 years ago

This proposal adds decorators and types to TypeSpec to support defining streams and events. These are useful for protocols such as JSONL over HTTP and server-sent events (SSE).

Goals

  1. A focus on primitives that should be generally useful
  2. Defining SSE endpoints
  3. Defining JSONL streaming endpoints
  4. Support defining event-based interfaces

Non-goals

  1. Fully support AsyncAPI (good for the future... maybe)
  2. Describe AMQP/MQTT/etc. semantics in any detail
  3. Define anything RPC-specific; all examples so far support only the HTTP and Protobuf transports.

New Libraries

@typespec/stream

Namespace: TypeSpec.Stream

@streamOf(T)

Applied to a data type, called the stream protocol type, identifies that type as representing a stream whose data is described by T. The stream protocol type describes the underlying data model of the stream. For example, in HTTP this is likely a model that the OpenAPI emitter understands, such as one whose body is a string. It may be empty, or a scalar, when there is no useful stream protocol type.

Stream<T>

A type representing a stream. This type is defined as:

@streamOf(T)
model Stream<T> { }

This type is handy for two purposes:

  1. Describing a stream when the underlying data type is not relevant.
  2. Serving as the base type for custom streams, such as SSEStream described later.

@typespec/event

Namespace: TypeSpec.Event

@events

Applied to a union, defines a set of events. When the event union has named union variants, and the underlying event protocol supports the notion of event types or kinds out-of-band (i.e. not part of the event envelope or payload), then the variant names must identify the event type. Otherwise, the event type may be ignored.
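A minimal TypeScript sketch of the variant-name rule, assuming SSE as the event protocol (its `event:` field carries the event type out-of-band). The union shape, the `Completion` type, and the `toEvent` helper are hypothetical illustrations, not part of the proposed libraries.

```typescript
// Hypothetical client-side view of an @events union with two named
// variants, `completion` and `error`. Each variant name doubles as the
// out-of-band event type (for SSE, the value of the `event:` field).
interface Completion {
  text: string;
}

type Events =
  | { type: "completion"; data: Completion }
  | { type: "error"; data: string };

// Pick the variant whose name matches the out-of-band event type, then
// decode the JSON payload. Unknown event types return undefined.
function toEvent(eventType: string, payload: string): Events | undefined {
  switch (eventType) {
    case "completion":
      return { type: "completion", data: JSON.parse(payload) as Completion };
    case "error":
      return { type: "error", data: JSON.parse(payload) as string };
    default:
      return undefined;
  }
}
```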

@contentType(string)

When applied to a data type, specifies the content type of the event envelope or of the event body (the two may differ). When applied to a model property, marks that envelope property as carrying the content type of the event data. It is an error to apply @contentType to a model property unless the model also has a corresponding @data property.

@data

Applied to a model property, identifies the payload of the event. Any models containing a member with an @data decorator are considered event envelopes. The @data property may be nested arbitrarily deep within the model.
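The following hedged TypeScript sketch illustrates the envelope rules by showing how a consumer might unpack an envelope. It assumes that the envelope's `contentType` property is the one marked with `@contentType` and that its `payload` property is the one marked with `@data`. The property names and the `unpackData` helper are hypothetical.

```typescript
// Hypothetical envelope: `contentType` plays the role of the property
// marked with @contentType, and `payload` the property marked with @data.
interface Envelope {
  id: string;
  contentType: string;
  payload: string; // raw event data as received on the wire
}

// Decode the @data payload according to the envelope's content type.
// JSON media types (including "+json" suffixes) are parsed; anything
// else, such as text/plain, is returned as the raw string.
function unpackData(envelope: Envelope): unknown {
  const mediaType = (envelope.contentType.split(";")[0] ?? "").trim().toLowerCase();
  if (mediaType === "application/json" || mediaType.endsWith("+json")) {
    return JSON.parse(envelope.payload);
  }
  return envelope.payload;
}
```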

@typespec/server-sent-events

Namespace: TypeSpec.ServerSentEvents

SSEStream<T>

Creates an SSE stream, which is itself an HTTP stream.

model SSEStream<T> is HttpStream<T, "text/event-stream">;

@terminalEvent

Because SSE gives the service no clean way to end a stream (a standard EventSource client automatically reconnects when the connection closes), the service needs a way to tell clients to disconnect. A terminal event accomplishes this: upon receiving the terminal event, the client is expected to disconnect.

The terminal event MAY contain useful data, so clients should be able to receive this additional data.
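The following TypeScript sketch shows the client side of this contract over a complete `text/event-stream` body. It parses events using a simplified version of the WHATWG event-stream format, delivers them until the terminal event arrives, and keeps the terminal event's data. The `done` event name and both helper functions are hypothetical. A real client would parse incrementally and close the HTTP connection at the point where this sketch stops.

```typescript
interface SseEvent {
  event: string; // value of the `event:` field, "message" if absent
  data: string;  // all `data:` lines joined with "\n"
}

// Simplified event-stream parser for a complete body with \n or \r\n line
// endings. Events are separated by blank lines; `id:` and `retry:` fields
// are ignored and lines starting with ":" are comments.
function parseSse(body: string): SseEvent[] {
  const blocks = body.split(/\r?\n\r?\n/);
  blocks.pop(); // text after the last blank line is not a complete event
  const events: SseEvent[] = [];
  for (const block of blocks) {
    let event = "";
    const data: string[] = [];
    for (const line of block.split(/\r?\n/)) {
      if (line.startsWith(":")) continue;
      const colon = line.indexOf(":");
      const field = colon === -1 ? line : line.slice(0, colon);
      let value = colon === -1 ? "" : line.slice(colon + 1);
      if (value.startsWith(" ")) value = value.slice(1);
      if (field === "event") event = value;
      else if (field === "data") data.push(value);
    }
    // Blocks without any data line are not dispatched.
    if (data.length > 0) events.push({ event: event || "message", data: data.join("\n") });
  }
  return events;
}

// Deliver events until the terminal event arrives. The terminal event is
// returned separately because it may carry useful data; the caller is
// expected to disconnect at that point.
function readUntilTerminal(
  events: SseEvent[],
  isTerminal: (e: SseEvent) => boolean,
): { received: SseEvent[]; terminal?: SseEvent } {
  const received: SseEvent[] = [];
  for (const e of events) {
    if (isTerminal(e)) return { received, terminal: e };
    received.push(e);
  }
  return { received };
}
```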

Updates to existing libraries

@typespec/http

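HttpStream<T, ContentType>

// TWireType is the wire (body) type of the stream; it defaults to string.
model HttpStream<T, ContentType extends valueof string, TWireType = string> extends Stream<T> {
  @header contentType: typeof ContentType;
  @body body: TWireType;
}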

JsonlStream<T>

model JsonlStream<T> is HttpStream<T, "application/jsonl">;

This is the default stream representation for HTTP, so JsonlStream<T> is equivalent to Stream<T>.
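Because a JSONL response arrives in arbitrary chunks, a client has to buffer partial lines. The following is a minimal TypeScript sketch. It assumes the body has already been decoded to text, for example with `TextDecoder`. The `JsonlDecoder` class is hypothetical and not part of any TypeSpec library.

```typescript
// Incrementally decode a JSONL (newline-delimited JSON) stream. Chunks may
// split a record anywhere, so the incomplete trailing text is buffered
// until the next newline arrives.
class JsonlDecoder<T> {
  private buffer = "";

  // Feed one text chunk; returns every record completed by this chunk.
  push(chunk: string): T[] {
    this.buffer += chunk;
    const lines = this.buffer.split("\n");
    this.buffer = lines.pop() ?? ""; // last piece has no newline yet
    return lines
      .map((line) => line.trim()) // tolerates \r\n line endings
      .filter((line) => line.length > 0)
      .map((line) => JSON.parse(line) as T);
  }

  // Call at end of stream to flush a final record without a trailing newline.
  end(): T[] {
    const rest = this.buffer.trim();
    this.buffer = "";
    return rest.length > 0 ? [JSON.parse(rest) as T] : [];
  }
}
```

With `fetch`, each chunk from `response.body` could be decoded with `TextDecoder.decode(chunk, { stream: true })` before being passed to `push`.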

@typespec/protobuf

These are proposed for the future, but will not be done as part of this change.

// Today
@stream(StreamMode.Out)
getSignIdsforKioskId(@field(1) kiosk_id: int32): GetSignIdResponse;

// Updated  
getSignIdsforKioskId(@field(1) kiosk_id: int32): Stream<GetSignIdResponse>;

// Today
@stream(StreamMode.In)
getSignIdsforManyKioskIds(@field(1) kiosk_id: int32): GetSignIdResponse;

// Updated 
getSignIdsforManyKioskIds(stream: Stream<{@field(1) kiosk_id: int32}>): GetSignIdResponse;

Examples

JSONL

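import "@typespec/http";
import "@typespec/streams";
using TypeSpec.Http;
using Streams;

// `Events` is the union defined in the SSE example below. The two
// operations are alternative spellings; a spec would declare only one.

// implicitly uses JSONL
@post op createCompletionStream(): Stream<Events>;

// explicitly JSONL
model JSONLStream<T> is Stream<T> {
  @header contentType: "application/jsonl";
  @body body: string;
}

@post op createCompletionStream(): JSONLStream<Events>;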

SSE

import "@typespec/event";
import "@typespec/stream";
import "@typespec/sse";

@post op createCompletionStream(): SSEStream<Events>;

model Completion {
  text: string;
}

@Event.events
union Events {
  completion: Completion;
  error: string;
} 

This defines two SSE event types, completion and error. Both are implicitly JSON, so, for example, the error string is sent with surrounding quotes on the wire.
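The following TypeScript sketch makes the quoting concrete by showing how a server might frame the two variants of `Events`. It assumes the variant name is written to the SSE `event:` field and the JSON-encoded payload to a single `data:` line. The `toSseFrame` helper is hypothetical.

```typescript
interface Completion {
  text: string;
}

// Mirror of the `Events` union: the tag is the variant name.
type Events =
  | { kind: "completion"; value: Completion }
  | { kind: "error"; value: string };

// Frame one event for text/event-stream. JSON.stringify without an indent
// argument never emits a raw newline, so a single data: line suffices.
function toSseFrame(e: Events): string {
  return `event: ${e.kind}\ndata: ${JSON.stringify(e.value)}\n\n`;
}
```

For example, the error variant with the value rate limited goes out as `data: "rate limited"`, quotes included.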

SSE (OpenAI Variant)

@post
op createCompletionStream(): SSEStream<CompletionResult>;

@Event.events
union CompletionResult {
  @Event.contentType("text/plain")
  @ServerSentEvents.terminalEvent
  "[DONE]";

  Completion;
}

This defines two events, neither of which uses an event type. One is the literal string [DONE], marked as a terminal event (so the client should disconnect when it receives this event); the other is a JSON object.
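The following is a hedged TypeScript sketch of a client for this variant. It assumes each event carries exactly one `data:` line, which holds for the example above but is not required by SSE in general. The helper names are hypothetical. The sentinel is matched case-sensitively against `[DONE]`, the spelling OpenAI's API sends.

```typescript
interface Completion {
  text: string;
}

// Sentinel sent as a text/plain terminal event.
const TERMINAL = "[DONE]";

// Extract the data of each event, assuming one `data:` line per event.
function dataPayloads(body: string): string[] {
  return body
    .split(/\r?\n/)
    .filter((line) => line.startsWith("data:"))
    .map((line) => line.slice("data:".length).replace(/^ /, ""));
}

// Decode JSON completions until the terminal sentinel is seen; `done`
// reports whether it arrived, meaning the client can disconnect.
function collectCompletions(body: string): { completions: Completion[]; done: boolean } {
  const completions: Completion[] = [];
  for (const data of dataPayloads(body)) {
    if (data === TERMINAL) return { completions, done: true };
    completions.push(JSON.parse(data) as Completion);
  }
  return { completions, done: false };
}
```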

SSE JS Async Iterator

import "@typespec/event";
import "@typespec/server-sent-events";

@post op createCompletionStream(): SSEStream<IteratorResult<string>>;

@Event.events
union IteratorResult<T> {
  @ServerSentEvents.terminalEvent
  { done: true };

  { done: false, @Event.data value: T };
} 

These two events have no event type; they are discriminated by the done property. When done is true, the client can close the connection. When done is false, the event data is contained in value, so emitters may unpack the event data if they wish.
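One way an emitter might unpack the event data is with a TypeScript async generator that turns the `{ done, value }` events into a plain `AsyncIterable<T>`. This is a sketch: the `IteratorEvent` type and `unpackValues` function are hypothetical, and the events are assumed to be already parsed from the SSE stream. Async generators require ES2018 or later.

```typescript
// Parsed form of the two IteratorResult<T> events.
type IteratorEvent<T> = { done: true } | { done: false; value: T };

// Yield each value until the terminal { done: true } event, then stop; at
// that point the caller can close the connection.
async function* unpackValues<T>(events: AsyncIterable<IteratorEvent<T>>): AsyncGenerator<T> {
  for await (const e of events) {
    if (e.done) return;
    yield e.value;
  }
}
```

Returning from inside the `for await` loop also calls `return()` on the source iterator, which gives an underlying stream reader a chance to release the connection.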

markcowl commented 2 years ago

Notes from design meeting

timotheeguerin commented 2 years ago

Options

1. @stream decorator

// Helper model
model Stream<T> {
  @stream stream: T[]; // or not an array?
}

// Stream response string (each pair shows two equivalent spellings)
op foo(): {@stream foo: string[]};
op foo(): {...Stream<string>};
// Stream input string
op foo(@stream foo: string[]): void;
op foo(...Stream<string>): void;

// Stream custom message
model MyMessage {id: string, value: string}
op foo(): {@stream foo: MyMessage[]};
op foo(): {...Stream<MyMessage>};
// Stream input custom message
op foo(@stream foo: MyMessage[]): void;
op foo(...Stream<MyMessage>): void;

2. Stream intrinsic type

Similar to returning a primitive type (string, number, etc.) as the body.

// Helper model
@intrinsic
model Stream<T> {
  stream: T;
}

// Stream response string
op foo(): Stream<string>;

// Stream custom message
model MyMessage {id: string, value: string}
op foo(): Stream<MyMessage>;
op foo(@body stream: Stream<MyMessage>): void;

Does this still make sense for WebSockets?

op foo(@body stream: Stream<MyMessage>): Stream<MyMessage>;

Examples

Messaging

model Message {
  id: string;
  from: string;
  group: string;
  content: string;
}
// Option 1
op messages(@stream messages: Message[]): {@stream messages: Message[]};
// Option 2
op messages(@body messages: Stream<Message>): {@body messages: Stream<Message>};

Web PubSub

https://docs.microsoft.com/en-us/azure/azure-web-pubsub/reference-json-webpubsub-subprotocol

alias WebPubSubRequest = JoinGroupRequest | LeaveGroupRequest | SendToGroupRequest;

model JoinGroupRequest {
  type: "joinGroup";
  group: string;
  ackId: int32;
}

model LeaveGroupRequest {
  type: "leaveGroup";
  group: string;
  ackId: int32;
}

model SendToGroupRequest {
  type: "sendToGroup";
  group: string;
  ackId: int32;
  noEcho: boolean;
  dataType: "json" | "text" | "binary";
  data: string | {};
}

alias WebPubSubResponse = AckResponse | MessageResponse | ConnectedSystemResponse | DisconnectedSystemResponse;

model AckResponse {
  type: "ack";
  ackId: int32;
  success: boolean;
  error?: {
    name: "Forbidden" | "InternalServerError" | "Duplicate";
    message: string;
  };
}

model MessageResponse {
  type: "message";
  from: "group";
  group: string;
  dataType: "json" | "text" | "binary";
  data: string | {};
  fromUserId: string;
}

model ConnectedSystemResponse {
  type: "system";
  event: "connected";
  userId: string;
  connectionId: string;
}

model DisconnectedSystemResponse {
  type: "system";
  event: "disconnected";
  message: string;
}
// Option 1
op webpubsub(@stream messages: WebPubSubRequest[]): {@stream messages: WebPubSubResponse[]};
// Option 2
op webpubsub(@body messages: Stream<WebPubSubRequest>): Stream<WebPubSubResponse>;

Server-sent events

model ServerSideEvent {
  id: string;
  event: string;
  data: {};
}

// Option 1
op messages(): {
  @header("Cache-Control") cacheControl: "no-store";
  @header("Content-Type")  contentType: "text/event-stream";
  @stream events: ServerSideEvent[];
}

// Option 2
op messages(): {
  @header("Cache-Control") cacheControl: "no-store";
  @header("Content-Type")  contentType: "text/event-stream";
  @body events: Stream<ServerSideEvent>;
}
// Option 2 alt with named model
model MyType is Stream<ServerSideEvent> {
  @header("Cache-Control") cacheControl: "no-store";
  @header("Content-Type")  contentType: "text/event-stream";
}
op messages(): MyType;
timotheeguerin commented 2 years ago

Case 1: Simple string return

op readLogs(@query fromBytes: int64): string;
// ------------ convert to stream -------------

op readLogs(@query fromBytes: int64): Stream<string>;

Case 2: Include header

op readLogs(@query fromBytes: int64): {@body body: string, @header contentType: "text/plain"};
// ------------ convert to stream -------------

op readLogs(@query fromBytes: int64): {@body stream: Stream<string>, @header contentType: "text/event-stream"};

Case 3: Metadata extract

op readLogs(@query fromBytes: int64): {data: string, lineCount: number, @header contentType: "application/json"};
// ------------ convert to stream -------------

// Option 1: Wrap everything in the Stream type and let the properties that belong in the body form the streamed items.
op readLogs(@query fromBytes: int64): Stream<{data: string, lineCount: number, @header contentType: "application/json"}>;

// Option 2: Extract body manually
op readLogs(@query fromBytes: int64): {@body stream: Stream<{data: string, lineCount: number}>, @header contentType: "application/stream+json"};

// Option 3: @stream
op readLogs(@query fromBytes: int64): {@stream stream: {data: string, lineCount: number}, @header contentType: "application/stream+json"};

Case 4: Convert array to stream

op readItems(@query skipCount: number): Foo[];
// ------------ convert to stream -------------

// Now stream the foo items.
op readItems(@query skipCount: number): Stream<Foo>;

Case 5: Convert input array to stream

op postItems(@body items: Foo[]): void;
// ------------ convert to stream -------------

// Now stream the foo items into the server.
op postItems(@body items: Stream<Foo>): void;
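If the input stream uses the default JSONL representation, a client could produce the request body for this operation by serializing items one per line. The following is a minimal TypeScript sketch; the shape of `Foo` and the `toJsonl` helper are hypothetical.

```typescript
// Hypothetical shape for Foo, for illustration only.
interface Foo {
  id: number;
  name: string;
}

// Serialize items as JSONL: one JSON document per line, each followed by
// "\n". JSON.stringify escapes newlines inside strings, so every record
// stays on a single line.
function* toJsonl<T>(items: Iterable<T>): Generator<string> {
  for (const item of items) {
    yield JSON.stringify(item) + "\n";
  }
}
```

Each yielded string could then be encoded with `TextEncoder` and enqueued into a `ReadableStream` used as the request body.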
markcowl commented 2 years ago

Notes from Design Meeting 4/14

heaths commented 1 year ago

This came up in an API Stewardship Board review today. While researching it, I also found application/x-ndjson (newline-delimited JSON), e.g.:

{"foo":"bar","baz":1}
{"foo":"qux","baz":2}

It seems to be gaining traction, and it is certainly more compact than SSE.

iscai-msft commented 1 year ago

I want to bump this thread too. I'm working with the batch team, and they have some operations that we generate as a streamed response (Swagger example here).

bterlson commented 4 months ago

I've updated the original post with the latest proposal. Changelog from internal review: