dodiameer opened 2 years ago
Don't have time to write the docs myself, at least right now — but here are a few things to know about readable streams and multipart/form-data:
The ReadableStream interface doesn't provide a read() method directly; instead, you call getReader() to get a reader that has a read() method. Readers come in two modes. The default mode is easier to handle, but the BYOB ("bring your own buffer") mode is better for saving memory in some situations. If you already have a Buffer object that you want to put the data into, use BYOB mode; otherwise just use default mode.
Each time you call read(), you get a "chunk". Chunks are not guaranteed to end at a convenient place. They could end right in the middle of a UTF-8 byte sequence, e.g. the last byte of one chunk could be 0xE2, the next chunk could be a single byte containing 0x80, and the last chunk could start with 0xA2. If you assemble those properly, you'll be able to reassemble the 0xE2 0x80 0xA2 sequence, which is UTF-8 for the • character. Do it wrong and you could end up with two or three � characters instead.
When parsing multipart/form-data, you'll be scanning for the boundary sequence between parts, which is 0x0D 0x0A 0x2D 0x2D (boundary) (zero or more 0x20) 0x0D 0x0A, or, for the final boundary, 0x0D 0x0A 0x2D 0x2D (boundary) 0x2D 0x2D (zero or more 0x20) 0x0D 0x0A (assuming your incoming data is in UTF-8). Remember that chunks can be split anywhere, so don't assume that the boundary sequence will be found in a single chunk; it could even be split across multiple chunks.
Whoever produces sample code for this will need to make sure it doesn't fall into the most common pitfall of stream handling, which is assuming that chunk boundaries will be nice. A chunk boundary that splits a UTF-8 sequence or a multipart/form-data boundary marker will break naive code, and the examples should show how to do it right.
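To make the UTF-8 pitfall above concrete, here is a minimal sketch (plain JavaScript, Node 18+ or a browser) of the wrong and right ways to decode chunks that split the • byte sequence; the sample bytes are chosen to match the description above:

```javascript
// The bullet character • is the three bytes 0xE2 0x80 0xA2; here they
// arrive split across three separate chunks.
const chunks = [
  new Uint8Array([0x68, 0x69, 0x20, 0xe2]), // "hi " + first byte of •
  new Uint8Array([0x80]),                   // middle byte of •
  new Uint8Array([0xa2, 0x21]),             // last byte of • + "!"
];

// Wrong: decoding each chunk independently turns every partial
// sequence into U+FFFD replacement characters.
const naive = chunks.map((c) => new TextDecoder().decode(c)).join('');

// Right: one stateful decoder with { stream: true } carries partial
// sequences across chunk boundaries.
const decoder = new TextDecoder();
let correct = '';
for (const c of chunks) correct += decoder.decode(c, { stream: true });
correct += decoder.decode(); // flush any trailing partial sequence

console.log(naive);   // "hi ���!"
console.log(correct); // "hi •!"
```

The same principle (never decode a chunk in isolation) is what `TextDecoderStream` implements for you when piping.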
Having said that, here's a general guideline to handling large file uploads in a form with other data:
I'd love to use an existing library for this rather than rolling our own multipart form data parser, even if we abstract it somehow. Unfortunately the ones that I know of (busboy, formidable) are wedded to Node AFAICT.
Anyone know of something more modern, that can take a Response object (or a Headers and a ReadableStream, I guess) and do something useful with it?
Streaming a large dataset with a pipe to a response writable would be handy. Documentation with an example of a large file being read by an endpoint, converted to a readable stream, and then fetched by a page is a common use case that avoids filling up memory. I tried the usual code that would work in Express or Hyper-Express, piping a readableStream, but I get a dest.on is not a function type error:
TypeError: dest.on is not a function
    at Stream.pipe (node:internal/streams/legacy:33:8)
Something like the piping examples Express or Hyper-Express give would be great: https://github.com/kartikk221/hyper-express/blob/master/docs/Examples.md
It makes it easy to pipe one stream to another, etc.:
readStream.pipe(transformStream).pipe(response).on('finish', () => {
  console.log(`Finished transforming the contents of ${filePath} and piping the output to the response.`);
});
This seems like a common use case.
I hope this is relevant here.
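The dest.on error above comes from handing a web-standard stream to Node's legacy Stream#pipe, which expects a Node writable. With web streams, the chaining that .pipe() does is spelled pipeThrough/pipeTo instead. A minimal sketch under that assumption (the source data and the uppercase transform are illustrative; in a real endpoint the source would be fs.createReadStream and the result would be returned as a Response body):

```javascript
import { Readable } from 'node:stream';

// A Node readable (stand-in for fs.createReadStream) converted
// to a web ReadableStream:
const nodeStream = Readable.from(['hello ', 'streams']);
const webStream = Readable.toWeb(nodeStream);

// A TransformStream standing in for whatever per-chunk work is needed:
const upper = new TransformStream({
  transform(chunk, controller) {
    controller.enqueue(String(chunk).toUpperCase());
  },
});

// pipeThrough() chains transforms; pipeTo() drains into a sink.
// In SvelteKit you could instead do: return new Response(piped)
let text = '';
await webStream.pipeThrough(upper).pipeTo(
  new WritableStream({ write(chunk) { text += chunk; } })
);
console.log(text); // "HELLO STREAMS"
```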
I am importing/using an npm package in a server endpoint and getting the error response.body.getReader is not a function.
From the conversation I see that this might be related; how would I, with the new feature, enable the package to use ReadableStream?
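That error usually means response.body is not a web ReadableStream in the environment the package runs in (older Node versions and some fetch polyfills expose a Node stream instead). For reference, this is the basic read loop such a package expects to be able to run, sketched against a stand-in Response (works on Node 18+ and modern browsers):

```javascript
// A Response built from a string stands in for a fetch() result;
// its .body is a web ReadableStream of Uint8Array chunks.
const response = new Response('streamed text');
const reader = response.body.getReader();

const decoder = new TextDecoder();
let text = '';
while (true) {
  const { value, done } = await reader.read();
  if (done) break;
  text += decoder.decode(value, { stream: true }); // chunk-safe decoding
}
text += decoder.decode(); // flush

console.log(text); // "streamed text"
```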
I've been testing Server-Sent Events (SSEs) with Kit recently. It's almost working properly, but the non-streamable Response objects obviously prevent the connections from staying open.
In a comment on a related issue, Rich said SSEs are a special case of ReadableStreams. Now that ReadableStreams appear to be implemented, how can we use this to make SSE connections stay open?
Ideally, we would be able to have access to all the res and req methods so that we can properly detect and respond to stream states, too, but simply keeping the connection open would be a great leap forward.
Thank you @Rich-Harris and the rest of the team for all the work you've done on this project.
@JuliaBonita For SSE it is enough to return a ReadableStream as the response body. I can give a more complex example based on TransformStream, which subscribes to an EventEmitter and unsubscribes when the connection is closed.
+server.js
import { createSSE } from './sse';
import { bus } from './bus';

/** @type {import('./$types').RequestHandler} */
export async function GET({ request }) {
  // does not have to be a number, this is just an example
  const last_event_id = Number(request.headers.get('last-event-id')) || 0;
  const { readable, subscribe } = createSSE(last_event_id);
  subscribe(bus, 'time');
  subscribe(bus, 'date');
  return new Response(readable, {
    headers: {
      'cache-control': 'no-cache',
      'content-type': 'text/event-stream',
    }
  });
}
sse.js
export function createSSE(last_id = 0, retry = 0) {
  let id = last_id;
  const { readable, writable } = new TransformStream({
    start(controller) {
      controller.enqueue(': hello\n\n');
      if (retry > 0) controller.enqueue(`retry: ${retry}\n\n`);
    },
    transform({ event, data }, controller) {
      let msg = `id: ${++id}\n`;
      if (event) msg += `event: ${event}\n`;
      if (typeof data === 'string') {
        msg += 'data: ' + data.trim().replace(/\n+/gm, '\ndata: ') + '\n';
      } else {
        msg += `data: ${JSON.stringify(data)}\n`;
      }
      controller.enqueue(msg + '\n');
    }
  });

  const writer = writable.getWriter();
  return {
    readable,
    /**
     * @param {import('node:events').EventEmitter} eventEmitter
     * @param {string} event
     */
    async subscribe(eventEmitter, event) {
      function listener(/** @type {any} */ data) {
        writer.write({ event, data });
      }
      eventEmitter.on(event, listener);
      await writer.closed.catch(() => { });
      eventEmitter.off(event, listener);
    }
  };
}
bus.js
import { EventEmitter } from 'node:events';

export const bus = new EventEmitter();

setInterval(() => {
  bus.emit('time', new Date().toLocaleTimeString());
}, 2e3);

setInterval(() => {
  bus.emit('date', new Date().toLocaleDateString());
}, 5e3);
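The transform in sse.js above turns { event, data } objects into SSE wire frames. As a standalone sketch of that framing logic (the same rules extracted into a plain function, so it can be inspected outside Kit), note in particular how multi-line data must become multiple data: lines:

```javascript
// Same framing rules as the TransformStream above, as a plain function.
let id = 0;
function frame({ event, data }) {
  let msg = `id: ${++id}\n`;
  if (event) msg += `event: ${event}\n`;
  if (typeof data === 'string') {
    // Each newline in the payload starts a new "data:" line,
    // as the SSE wire format requires.
    msg += 'data: ' + data.trim().replace(/\n+/gm, '\ndata: ') + '\n';
  } else {
    msg += `data: ${JSON.stringify(data)}\n`;
  }
  return msg + '\n'; // blank line terminates the event
}

const framed = frame({ event: 'time', data: 'a\nb' });
console.log(JSON.stringify(framed));
// "id: 1\nevent: time\ndata: a\ndata: b\n\n"
```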
@repsac-by Thank you very much for your help. I appreciate this and will spend more time learning the Streams API if necessary, but it's a totally different API and much more complex than the simple SSE API. The simplicity of the SSE API for simple server-push applications is the most significant feature that differentiates it from WebSockets. If we have to use the more complicated Streams API in Kit, then it seems we might as well use WebSockets and assume our apps will use bi-directional channels even if we only need data pushed from the server to the client.
My assumption (maybe incorrect) was that Kit would include a way to use the actual SSE API with the same simple functionality that we have with vanilla Node/Express. Your example is excellent for the Streams API, but it still requires a significant learning curve to achieve the equivalent functionality and reliability that is trivially easy to achieve with SSEs, which is sufficient for the vast majority of text-based server-push applications.
So before I go down the more complicated Streams API path, can you please confirm that there is no efficient way to implement SSE in Kit to keep connections open and manage the connections like we can easily do in Node/Express?
I just looked at the source code for the Streams API "Hello World" example. It requires 80 lines of code (including markup) just to display "Hello World" on the page. Swimming through all the Streams API docs is a deep rabbit hole. Clearly the Streams API is not ergonomic, which is why very few people actually understand or use it. It's also overkill for virtually all SSE text-based applications.
I bet over 95% of the Svelte community will never take the time to learn the Streams API well enough to use it effectively, which makes Kit much less likely to be used in large-scale apps. So I hope there's a way to use actual SSEs in Kit (including proper event handling on the server like with Express).
@JuliaBonita So, here is the ReadableStream for SSE in a few lines.
export async function GET() {
  const readable = new ReadableStream({
    start(ctr) {
      this.interval = setInterval(() => ctr.enqueue('data: ping\n\n'), 3000);
    },
    cancel() {
      clearInterval(this.interval);
    }
  });
  return new Response(readable, {
    headers: {
      'content-type': 'text/event-stream',
    }
  });
}
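A quick way to sanity-check a stream like this outside Kit is to read a chunk and then cancel, which triggers the cancel() hook that clears the interval. A sketch under two small changes from the endpoint above: the interval handle lives in a closure variable rather than on this, and the interval is shortened to 10 ms for demonstration:

```javascript
// Interval-backed ReadableStream, read once, then cancelled.
let interval;
const readable = new ReadableStream({
  start(controller) {
    interval = setInterval(() => controller.enqueue('data: ping\n\n'), 10);
  },
  cancel() {
    clearInterval(interval); // runs when the consumer cancels
  },
});

const reader = readable.getReader();
const { value } = await reader.read(); // waits for the first tick
console.log(value); // "data: ping\n\n"
await reader.cancel(); // invokes cancel(), stopping the interval
```

In Kit, the consumer cancelling corresponds to the client disconnecting, which is why the cleanup belongs in cancel().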
@repsac-by Thank you. The second example helps to show that the Streams API is basically a transport mechanism to stream any data flow, including higher level protocols like SSEs. This should be enough for me to figure out how to apply it to my app.
In the future, I suspect Svelte devs will ask for a more simplified interface for streams and SSEs like Express provides, but for now, at least I can see a path forward. Thank you.
@repsac-by Thnx. Your "SSE in a few lines" comment helped me to push ticker data from server to client. More here: https://stackoverflow.com/questions/74330190/how-to-respond-with-a-stream-in-a-sveltekit-server-load-function/74336207#74336207
@voscausa you forgot something
/** @type {import('./$types').RequestHandler} */
export function GET({ request }) {
+  const ac = new AbortController();
  console.log("GET api: yahoo-finance-ticker")
  const stream = new ReadableStream({
    start(controller) {
      tickerListener.on("ticker", (ticker) => {
        console.log(ticker.price);
        controller.enqueue(String(ticker.price));
-      })
+      }, { signal: ac.signal });
+    },
+    cancel() {
+      ac.abort();
    },
  })
  return new Response(stream, {
    headers: {
      'content-type': 'text/event-stream',
    }
  });
}
OK. Thanks for the update. It solved the exception when changing the page route.
@repsac-by Now it's working fine. I also included abort in the +page.svelte route. But now I do not understand the { signal: ac.signal } in the api below.
routes/yahoo/+page.svelte
<script>
  import { onDestroy } from "svelte";

  let result = "";
  const ac = new AbortController();
  const signal = ac.signal;

  async function getStream() {
    const pairs = ["BTC-USD", "EURUSD=X"].join(",");
    const response = await fetch(`/api/yahoo-finance-ticker?pairs=${pairs}&logging=true`, {
      signal,
    });
    const reader = response.body.pipeThrough(new TextDecoderStream()).getReader();
    while (true) {
      const { value, done } = await reader.read();
      console.log("resp", done, value);
      if (done) break;
      result += `${value}<br>`;
    }
  }

  getStream();

  onDestroy(() => {
    ac.abort();
    console.log("ticker fetch aborted");
  });
</script>
routes/api/yahoo-finance-ticker/+server.js
import YahooFinanceTicker from "yahoo-finance-ticker";

/** @type {import('./$types').RequestHandler} */
export function GET({ url }) {
  const ac = new AbortController();
  const ticker = new YahooFinanceTicker();
  ticker.setLogging(Boolean(url.searchParams.get('logging') ?? 'false'));
  const pairs = (url.searchParams.get('pairs') ?? '').split(",");
  console.log("GET api: yahoo-finance-ticker")
  const stream = new ReadableStream({
    start(controller) {
      (async () => {
        const tickerListener = await ticker.subscribe(pairs);
        tickerListener.on("ticker", (ticker) => {
          console.log(ticker.price);
          controller.enqueue(String(ticker.price));
        }, { signal: ac.signal }); // ?? { signal: ac.signal } @repsac-by
      })().catch(err => console.error(err));
    },
    cancel() {
      console.log("cancel and abort");
      ticker.unsubscribe();
      ac.abort();
    },
  })
  return new Response(stream, {
    headers: {
      'content-type': 'text/event-stream',
    }
  });
}
@voscausa You can read the MDN docs on the AbortController interface to understand what the signal does. Passing the signal into your fetch call essentially associates the call with your AbortController instance, so that ac.abort() will abort the fetch.
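One thing worth flagging about the server snippet above (an editorial observation, not from the thread): Node's EventEmitter#on takes only (eventName, listener), so a { signal: ac.signal } third argument is silently ignored there. The signal-aware emitter API is the static events.on async iterator, which stops with an AbortError when the signal aborts. A sketch of that pattern with a hypothetical 'tick' event:

```javascript
import { EventEmitter, on } from 'node:events';

const bus = new EventEmitter();
const ac = new AbortController();
const received = [];

// events.on() yields [args] arrays for each emit, until the signal aborts.
const consumer = (async () => {
  for await (const [value] of on(bus, 'tick', { signal: ac.signal })) {
    received.push(value);
    if (received.length === 2) ac.abort(); // stop listening after two events
  }
})().catch((err) => {
  if (err.name !== 'AbortError') throw err; // abort surfaces as AbortError
});

bus.emit('tick', 1);
bus.emit('tick', 2);
await consumer;
console.log(received);
```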
Just found out about unstable_parseMultipartFormData in the Remix docs.
The implementation can be found here: https://github.com/remix-run/remix/blob/main/packages/remix-server-runtime/formData.ts
Pretty sure that can be used in SvelteKit as well.
unstable_parseMultipartFormData looks like what was needed. I'll fiddle around and see if I can come up with something that works.
+1 for this feature
Would be really great to have a small agreed-upon example here while waiting for official docs!
Here is how I get SSE working for me
routes/sse/+server.ts
function delay(ms: number): Promise<void> {
  return new Promise((res) => setTimeout(res, ms));
}

export function GET() {
  const encoder = new TextEncoder();
  const readable = new ReadableStream({
    async start(controller) {
      for (let i = 0; i < 20; i++) {
        controller.enqueue(encoder.encode('hello'));
        await delay(1000);
      }
      controller.close();
    }
  });
  return new Response(readable, {
    headers: {
      'content-type': 'text/event-stream',
    }
  });
}
routes/sse/+page.svelte
<script lang="ts">
  import { onMount } from 'svelte';

  let result: string[] = []; // must be `let`, not `const`, so reassignment triggers reactivity

  async function subscribe() {
    const response = await fetch('/sse');
    const reader = response.body!.pipeThrough(new TextDecoderStream()).getReader();
    while (true) {
      const { value, done } = await reader.read();
      if (done) break;
      result.push(value);
      result = result;
    }
  }

  onMount(subscribe);
</script>

{#each result as str}
  <p>{str}</p>
{/each}
any updates on this?
Example using EventSource in the client.
<!-- src/routes/sse/+page.svelte -->
<script lang="ts">
  import { onMount } from 'svelte';

  let messages: string[] = [];

  onMount(() => {
    const eventSource = new EventSource('/sse');
    eventSource.addEventListener('message', (event) => {
      if (!event.data) return;
      messages.push(event.data);
      messages = messages;
    });
    eventSource.addEventListener('close', () => eventSource.close());
    // TODO Cleanup event listeners
  });
</script>

<ol>
  {#each messages as message (message)}
    <li>{message}</li>
  {/each}
</ol>
When the server responds with a custom event 'close', the client closes the EventSource and does not reconnect.
// src/routes/sse/+server.ts
const duration = 5 * 1000; // 5 seconds

export const GET = () => {
  const encoder = new TextEncoder();
  const readable = new ReadableStream({
    start: async (controller) => {
      const startedAt = new Date();
      while (true) {
        const message = new Date().toISOString();
        controller.enqueue(encoder.encode(`data: ${message}\n\n`));
        const elapsedFor = Date.now() - startedAt.valueOf();
        if (elapsedFor > duration) break;
        await new Promise((resolve) => setTimeout(resolve, 1000));
      }
      controller.enqueue(encoder.encode('event: close\ndata:\n\n'));
      controller.close();
    }
  });
  return new Response(readable, { headers: { 'content-type': 'text/event-stream' } });
};
The server does not check for client disconnection in this example. Reference the following issue: https://github.com/sveltejs/kit/issues/11751
I need a way to detect when a client has disconnected, otherwise I'll be wasting I/O time on emitting data to a stream that nobody's ever going to read.
Describe the problem
As of #5291, SvelteKit now supports using ReadableStream to read the request body in endpoints and supports returning it as a response body; however, there are currently no examples in the docs of how to do common things, for example reading files submitted in a multipart/form-data request as a stream and writing them to disk, reading files from disk and returning them as a stream, etc.
Describe the proposed solution
Adding examples for common use-cases of streaming in the docs.
Alternatives considered
No response
Importance
nice to have
Additional Information
I would help with this myself, but I honestly couldn't figure it out, and I wouldn't trust the code I write to be up to the standard of the docs. I specifically wanted to handle super-large (over 1 GB) file uploads that would be in a multipart/form-data request with other fields, but couldn't figure it out.
Also, I feel I should mention this: in #3419, Rich mentions having utility functions for handling multipart/octet streams, so if the maintainers are planning on adding them, it would make sense to not add documentation until they are added, as they would simplify any code that could be written without them.
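Until such utilities or official docs exist, here is a deliberately simplified sketch of the hard part discussed earlier in this thread: finding multipart boundaries that may be split across chunks. It is text-only, ignores per-part headers and the final -- terminator, and buffers each part in memory (a real streaming parser would retain only a delimiter-length tail between reads); all names are illustrative:

```javascript
// Split a streamed multipart body on its boundary delimiter, where the
// incoming chunks may cut the delimiter anywhere. Unmatched input stays
// in `buf` until enough bytes arrive to decide.
function* splitMultipart(chunks, boundary) {
  const delim = `\r\n--${boundary}`;
  let buf = '';
  for (const chunk of chunks) {
    buf += chunk;
    let idx;
    while ((idx = buf.indexOf(delim)) !== -1) {
      yield buf.slice(0, idx);          // everything before the delimiter
      buf = buf.slice(idx + delim.length);
    }
    // anything left in buf may be a partial delimiter; keep buffering
  }
}

// Chunks chosen so the "\r\n--XBOUND" delimiter is cut mid-marker twice:
const chunks = ['\r\n--XB', 'OUNDpart one\r\n--X', 'BOUNDpart two\r\n--XBOUND--'];
const parts = [...splitMultipart(chunks, 'XBOUND')].filter(Boolean);
console.log(parts); // [ 'part one', 'part two' ]
```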