denosaurs / event

📆 Strictly typed event emitter with asynciterator support
https://deno.land/x/event
MIT License
42 stars 5 forks source link

Question on Internals #8

Closed ben-laird closed 4 days ago

ben-laird commented 1 week ago

Hello!

I found your repo on JSR and I'm very impressed with it, especially with your use of Transform Streams. I started investigating using Transform Streams in my own work and I can't seem to get it working. Lmk if this issue should be moved somewhere else, I figured this was the best way to contact you.

Consider this test case ```ts import { assertEquals } from "jsr:@std/assert"; class DuplexStream extends TransformStream {} Deno.test("Streams", async (t) => { const stream = new DuplexStream(); const expected = [ "Hello there", "Here's some string data", "All done", ]; await t.step("Writing", async () => { const w = stream.writable.getWriter(); for await (const element of expected) { await w.ready; // Error: Promise resolution is still pending but the event loop has already resolved. await w.write(element); } await w.ready; w.releaseLock(); }); await t.step("Reading", async () => { const r = stream.readable.values(); const actual = await Array.fromAsync(r); assertEquals(actual, expected); }); }); ```

When running deno test on the test case above, it responds with an error as soon as it hits await w.write(element);:

error: Promise resolution is still pending but the event loop has already resolved.

Refactoring to use a WritableStream only (test 2 below) or to pipe a ReadableStream through (test 3 below) solves the error, but these approaches are undesirable. What am I doing wrong here? I've tried looking for guides on Transform Streams and consulting MDN's docs on them, and nothing has been helpful. I'm curious to see how you avoided the same errors in your own repo, because it seems inevitable given you use only a TransformStream and don't use it as a pipe for any Readable Stream.

Test 2 ```ts import { assertEquals } from "jsr:@std/assert"; Deno.test("Streams 2", async (t) => { const collected: string[] = []; const stream = new WritableStream({ write(ch) { collected.push(ch); }, }); const expected = [ "Hello there", "Here's some string data", "All done", ]; await t.step("Writing", async () => { const w = stream.getWriter(); for await (const element of expected) { await w.ready; await w.write(element); } await w.ready; w.releaseLock(); }); await t.step("Reading", () => { assertEquals(collected, expected); }); }); ```
Test 3 ```ts import { assertEquals } from "jsr:@std/assert"; class DuplexStream extends TransformStream {} Deno.test("Streams 3", async (t) => { const stream = new DuplexStream(); const expected = [ "Hello there", "Here's some string data", "All done", ]; await t.step("Writing & Reading", async () => { const r = ReadableStream.from(expected).pipeThrough(stream).values(); const actual = await Array.fromAsync(r); assertEquals(actual, expected); }); }); ```

Thanks so much for your time and for making such a high-quality library. I've starred this repo and hope to contribute to it when I can!

crowlKats commented 1 week ago

Hey, so the issue is that you are writing contents to the stream, but not reading anything. basically, TransformStream doesnt buffer elements up, so writing and reading need to happen "simultaneously". the easiest way to achieve this in your first test is to not await the "Writing" step, but assigning it to a variable, and awaiting it after the "Reading" step. I know this might be a bit awkward and not ideal, but thats the only solution.

however, if you do want to buffer n-amount of elements, i believe you can try around with https://developer.mozilla.org/en-US/docs/Web/API/TransformStream/TransformStream#highwatermark (though not entirely sure, i rarely use this option and do not remember its inner workings from the top of my head)

ben-laird commented 4 days ago

Hey! Sorry for the late reply. Your solution worked perfectly; using the third argument to TransformStream to provide a queuing strategy to the underlying ReadableStream was the right call. The promises I was awaiting were queued up for the TransformStream to process, but they were never settled, hence the error I got. It's also why your use of TransformStream works: there's a reader attached to the ReadableStream side immediately, negating the need for a queue.

Thanks so much again for your help, and I hope to offer my own help on this repo if you'd like and when I can. I'll close this as completed.