oven-sh / bun

Incredibly fast JavaScript runtime, bundler, test runner, and package manager – all in one
https://bun.sh
Other
73.3k stars 2.69k forks source link

Implement `stream.Writable.toWeb` #3927

Open vjpr opened 1 year ago

vjpr commented 1 year ago

What version of Bun is running?

0.7.2

What platform is your computer?

No response

What steps can reproduce the bug?

console.log('process.stdout', stream.Writable.toWeb(process.stdout))

TypeError: h().newWritableStreamFromStreamWritable is not a function. (In 'h().newWritableStreamFromStreamWritable(X)', 'h().newWritableStreamFromStreamWritable' is undefined)
      at node:stream:2:106081
      at /xxx/src/index.ts:172:32

What is the expected behavior?

No response

What do you see instead?

No response

Additional information

No response

paperdave commented 1 year ago

damn, i cant believe this is just {}

image

though for what youre doing you should use Bun.stdout.*, though i guess thats not a writablestream

varshneydevansh commented 1 year ago

If this is still active, can I look into this? May I know if there are any initial indicators that I should be aware of?

javalsai commented 9 months ago

If this is still active, can I look into this? May I know if there are any initial indicators that I should be aware of?

EDIT: Ignore all this and skip to my next reply. EDIT AGAIN: This does apply, I just didn't realize smth

I don't know if you are still interesting, but if someone really needs to get that function to work, this workaround worked for me:

import { type Writable } from 'node:stream'

function WriteStreamToWeb(streamWritable: Writable): WritableStream {
    return new WritableStream({
        start() { },
        write(chunk) {
            streamWritable.write(chunk)
        },
        close() {
            streamWritable.end()
        },
        abort() { },
    })
}

and ofc the JS counterpart

function WriteStreamToWeb(streamWritable) {
    return new WritableStream({
        start() { },
        write(chunk) {
            streamWritable.write(chunk)
        },
        close() {
            streamWritable.end()
        },
        abort() { },
    })
}
javalsai commented 9 months ago

If this is still active, can I look into this? May I know if there are any initial indicators that I should be aware of?

I don't know if you are still interesting, but if someone really needs to get that function to work, this workaround worked for me:

import { type Writable } from 'node:stream'

function WriteStreamToWeb(streamWritable: Writable): WritableStream {
    return new WritableStream({
        start() { },
        write(chunk) {
            streamWritable.write(chunk)
        },
        close() {
            streamWritable.end()
        },
        abort() { },
    })
}

and ofc the JS counterpart

function WriteStreamToWeb(streamWritable) {
    return new WritableStream({
        start() { },
        write(chunk) {
            streamWritable.write(chunk)
        },
        close() {
            streamWritable.end()
        },
        abort() { },
    })
}

Well, I just found out that Writable.toWeb might do this job, and there's also Writable.fromWeb and Readable equivalents, plus a generic .from function that can take transformers, passthroughs...

varshneydevansh commented 9 months ago

@javalsai so should I work on this?

javalsai commented 9 months ago

@javalsai so should I work on this?

I don't think it's necessary, Writable.toWeb seems to do the same as stream.Writable.toWeb.

You can take a look at why the native Writable differs from stream.Writable tho. It makes sense to me that stream would just use native Writable and Readable, specially if it's a custom implementation of node:stream for Bun; but that doesn't seem to be the case.

I don't know about Zig or how the Bun source code is structured, but if you decide to take a look and find something interesting feel free to share here!

javalsai commented 9 months ago

@javalsai so should I work on this?

I don't think it's necessary, Writable.toWeb seems to do the same as stream.Writable.toWeb.

You can take a look at why the native Writable differs from stream.Writable tho. It makes sense to me that stream would just use native Writable and Readable, specially if it's a custom implementation of node:stream for Bun; but that doesn't seem to be the case.

I don't know about Zig or how the Bun source code is structured, but if you decide to take a look and find something interesting feel free to share here!

Wait no, I imported Writable and Readable from node:stream (it was late when I was working on this and didn't realize it worked because the stream data was being cached by my function), so there's no native Writable and Readable

javalsai commented 9 months ago

Messing with the code I found that this file might be relevant: https://github.com/oven-sh/bun/blob/main/src/js/node/stream.js

Specially lines 3283, 3369, 4025 and 4017 (what is that function supposed to do?) (I think that what happens is that webStreamsAdapters is missing functions like newStreamWritableFromWritableStream and newWritableStreamFromStreamWritable (there's probably more))

javalsai commented 9 months ago

Ig somthing like this would fix Writable.toWeb on most cases:

diff --git a/src/js/node/stream.js b/src/js/node/stream.js
index 5297c033c..c2c0d0ddd 100644
--- a/src/js/node/stream.js
+++ b/src/js/node/stream.js
@@ -4013,16 +4013,29 @@ var require_writable = __commonJS({
     Writable.prototype[EE.captureRejectionSymbol] = function (err) {
       this.destroy(err);
     };
-    var webStreamsAdapters;
-    function lazyWebStreams() {
-      if (webStreamsAdapters === void 0) webStreamsAdapters = {};
-      return webStreamsAdapters;
-    }
+    var webStreamsAdapters = {
+        newStreamWritableFromWritableStream() {
+            // TODO:
+        },
+        newWritableStreamFromStreamWritable(streamWritable) {
+            // ! This is very likely incomplete
+            return new WritableStream({
+                start() { },
+                write(chunk) {
+                    streamWritable.write(chunk)
+                },
+                close() {
+                    streamWritable.end()
+                },
+                abort() { },
+            })
+        }
+    };
     Writable.fromWeb = function (writableStream, options) {
-      return lazyWebStreams().newStreamWritableFromWritableStream(writableStream, options);
+      return webStreamsAdapters.newStreamWritableFromWritableStream(writableStream, options);
     };
     Writable.toWeb = function (streamWritable) {
-      return lazyWebStreams().newWritableStreamFromStreamWritable(streamWritable);
+      return webStreamsAdapters.newWritableStreamFromStreamWritable(streamWritable);
     };
   },
 });

NOTE: I also found that Duplex has lazyWebStreams to an empty webStreamsAdapters, so I'm guessing it also misses functionality for fromWeb and toWeb

varshneydevansh commented 9 months ago

Okay I will look into this @javalsai and will create the PR

javalsai commented 9 months ago

Okay I will look into this @javalsai and will create the PR

Sure, good luck! And ask if you want help with anything, I'd love to see this fixed on next release. Try to mimic the checks that the reader webStreamsAdapters do, mainly to provide the same compatibility. I might take a look at Duplexafter too, as it seems to have the same issue.

varshneydevansh commented 9 months ago

I'll try my best thanks for helping this much =)

varshneydevansh commented 8 months ago

Hi @javalsai I completed the development setup and understood what needs to be done to an extent but I just couldn't understand how do I test it?

javalsai commented 8 months ago

Hi @javalsai I completed the development setup and understood what needs to be done to an extent but I just couldn't understand how do I test it?

I don't really know how bun structures everything, but my guess is that you'll just have to implement the functions Writable functions to behave like they do on node (just follow a similar logic to Readable which is implemented in bun) and then compile it and test some code that relies on it.

Maybe this will be useful? https://github.com/oven-sh/bun/blob/main/CONTRIBUTING.md

Also, check if there has been any change on this since then.

I'ma make a quick JS file that works on node and not on bun for you to test this.

javalsai commented 8 months ago

@varshneydevansh this should play with those adapter functions enough to test them (make sure to make /tmp/readfile or just change it to any existing file or device...)

const { createReadStream, createWriteStream } = require('node:fs')
const { Readable, Writable } = require('node:stream')

const readStream = createReadStream('/tmp/readfile')
const writeStream = createWriteStream('/tmp/writefile')

const readWebStream = Readable.toWeb(readStream)
const writeWebStream = Writable.toWeb(writeStream)

const readStreamAgain = Readable.fromWeb(readWebStream)
const writeStreamAgain = Writable.fromWeb(writeWebStream)

console.log(readStreamAgain, writeStreamAgain)

Running that with node will adapt the stream 2 times in both directions and print the streams (will appear as object data on console) Running with bun will fail on calling the Writable toWeb/fromWeb functions (Readable should work as they are implemented), the error should be something like TypeError: undefined is not a function

paperdave commented 8 months ago

Maybe this will be useful? sorry this file is really out of date

follow this to setup a devenv: https://bun.sh/docs/project/contributing

the code you need to edit is src/js/stream.js. this code is bundled version of the readable-stream package that has a ton of random edits to have some hooks into native code.

Ig somthing like this would fix...

Yeah that looks about right. we can also inline these function calls into the actual toWeb function insetad of having it be a function that calls another one.

Also, we should handle abort, right? does node do that? i assume it's something like doing stream.emit("error", ...)

varshneydevansh commented 8 months ago

Thank you for the help Dave and Sai.

image

I was reading this earlier -

https://nodejs.org/dist/latest-v17.x/docs/api/stream.html#class-streamwritable

javalsai commented 8 months ago

Also, we should handle abort, right? does node do that? i assume it's something like doing stream.emit("error", ...)

Sure, that's was just a hotfix that I'm using temporarily, might be useful to get the basic structure of it. But I think it should be implemented in a format similar to the Readable counterpart (source of fromWeb and toWeb).

Also, don't forget that Duplex has those functions empty just like Readable, but maybe that can be done by just separating it into readable and writable and adapting each. I think web duplex equivalent is just a object with a readable and writable property (https://developer.mozilla.org/en-US/docs/Web/API/TransformStream#instance_properties, I think I got it working in bun by putting it as a object tho).

In fact, I ended up with these utils working:

export function writable_ToWeb(streamWritable: Writable): WritableStream {
    return new WritableStream({
        start() { },
        write(chunk) {
            streamWritable.write(chunk)
        },
        close() {
            streamWritable.end()
        },
        abort() { },
    })
}

export function transformerToWeb(transformer: Transform) {
    return {
        readable: Readable.toWeb(transformer) as ReadableStream,
        writable: writable_ToWeb(transformer),
    }
}

Ik these are Transformers, not Duplexes, but they seem to be the same concept

varshneydevansh commented 8 months ago

This is the Duplex one where the same change is needed? -

https://github.com/oven-sh/bun/blob/353f724a9cd12d1749e974e92bbc47d9138a6601/src/js/node/stream.js#L4442C1-L4452C7

    var webStreamsAdapters;
    function lazyWebStreams() {
      if (webStreamsAdapters === void 0) webStreamsAdapters = {};
      return webStreamsAdapters;
    }
    Duplex.fromWeb = function (pair, options) {
      return lazyWebStreams().newStreamDuplexFromReadableWritablePair(pair, options);
    };
    Duplex.toWeb = function (duplex) {
      return lazyWebStreams().newReadableWritablePairFromDuplex(duplex);
    };

and this is the place of the Readable one? -

https://github.com/oven-sh/bun/blob/353f724a9cd12d1749e974e92bbc47d9138a6601/src/js/node/stream.js#L3364C1-L3369C7

    Readable.fromWeb = function (readableStream, options) {
      return webStreamsAdapters.newStreamReadableFromReadableStream(readableStream, options);
    };
    Readable.toWeb = function (streamReadable, options) {
      return webStreamsAdapters.newReadableStreamFromStreamReadable(streamReadable, options);
    };

This is what I have done for the Writable -


    Writable.fromWeb = function (writableStream, options) {
      return new WritableStream({
        start() { },
        write(chunk) {
             writableStream.write(chunk)
        },
        close() {
           writableStream.end()
        },
        abort(reason) {
          const err = new Error(`Abort: ${reason}`);
          writableStream.destroy(err);
        },
      });
    };
    Writable.toWeb = function (streamWritable) {
      return new WritableStream({
        start() { },
        write(chunk) {
          try {
            streamWritable.write(chunk);
          } catch (e) {
            this.controller.error(e);
          }
        },
        close() {
          streamWritable.end();
        },
        abort(reason) {
          streamWritable.destroy(new Error(`Abort: ${reason}`));
        },
      });
    }; 
javalsai commented 8 months ago

@varshneydevansh don't worry about duplexes if you're not sure on how to do it, they're not close to being as important as the writable functions.

Also, the Writable.toWeb looks good to me for it to work at least (It'll prob need some future work for options or smth), but the Writable.fromWeb takes a web stream and returns a Writable instance, exactly the opposite function. Let me take a look at it bcs I think I made it also work, but deleted it soon after due to being dead code...

Two last things:

Nice work btw!

javalsai commented 8 months ago

Or maybe just omit fromWeb for now, it seems a little bit overcomplicated:

guest271314 commented 7 months ago

Bun.stdout should have a writable property that us a WHATWG WritableStream. That's what Deno does and that's what Duplex.toWeb() achieves.

We should be able to skip the Node.js middle part altogether for Bun.stdout.

//  stdout_stream.js
import { Duplex } from "node:stream";
import { exit, stdin, stdout } from "node:process";

const runtime = navigator.userAgent;

const { readable } = /Node|Bun/.test(runtime)
  ? Duplex.toWeb(stdin)
  : Deno.stdin;
const { writable } = /Node|Bun/.test(runtime)
  ? Duplex.toWeb(stdout)
  : Deno.stdout;

const encoder = new TextEncoder();
const decoder = new TextDecoder();

function encodeMessage(message) {
  return encoder.encode(JSON.stringify(message));
}

async function* getMessage() {
  for await (let message of readable) {
    yield encoder.encode(decoder.decode(message).toUpperCase());
  }
}
async function sendMessage(message) {
  // Closing WritableStream causes host to exit
  await new Blob([
    message,
  ])
    .stream()
    .pipeTo(writable, { preventClose: true });
}

try {
  for await (const message of getMessage()) {
    await sendMessage(message);
  }
} catch (e) {
  exit();
}
guest271314 commented 7 months ago

The Node.js compatibility documentation page should probably be updated to include Duplex.toWeb() doesn't work.

varshneydevansh commented 7 months ago

The Node.js compatibility documentation page should probably be updated to include Duplex.toWeb() doesn't work.

This is what I try to do today -

Writable.fromWeb = function (writableStream, options = {}) {
  const { encoding = 'utf-8', highWaterMark = 16384 } = options;

  return new WritableStream({
    start(controller) {
      this.controller = controller;
    },
    write(chunk) {
      try {
        writableStream.write(chunk, encoding);
      } catch (error) {
        this.controller.error(error);
      }

      // Backpressure handling:
      if (writableStream.writable && writableStream.writableEnded) {
        this.controller.close();
      } else if (writableStream.writableHighWaterMark !== undefined &&
                 writableStream.bufferedAmount >= writableStream.writableHighWaterMark) {
        this.controller.desiredSize = 0;
      }
    },
    close() {
      writableStream.end();
    },
    abort(reason) {
      const err = new Error(`Abort: ${reason}`);
      writableStream.destroy(err);
    },
  });
};

Writable.toWeb = function (streamWritable, options = {}) {
  const { highWaterMark = 16384 } = options;

  return new WritableStream({
    start(controller) {
      this.controller = controller;
    },
    write(chunk) {
      try {
        streamWritable.write(chunk);
      } catch (error) {
        this.controller.error(error);
      }

      // Backpressure handling:
      if (streamWritable.bufferedAmount >= highWaterMark) {
        this.controller.desiredSize = 0;
      }
    },
    close() {
      streamWritable.end();
    },
    abort(reason) {
      const err = new Error(`Abort: ${reason}`);
      streamWritable.destroy(err);
    },
  });
};

Is this better than what I earlier did?

guest271314 commented 7 months ago

This is what I wound up doing for now. The rest of the code is runtime agnostic and runs using node, deno, and bun

const runtime = navigator.userAgent;

let readable, writable, exit;

if (runtime.startsWith("Deno")) {
  ({ readable } = Deno.stdin);
  ({ writable } = Deno.stdout);
  exit = Deno.exit;
}

if (runtime.startsWith("Node")) {
   const { stdin, stdout, exit:_exit } = await import("node:process");
   const { Duplex } = await import("node:stream");
   ({ readable } = Duplex.toWeb(stdin));
   ({ writable } = Duplex.toWeb(stdout));
   exit = process.exit;
}

if (runtime.startsWith("Bun")) {
   readable = Bun.stdin.stream();
   writable = new WritableStream({
     async write(value) {
       await Bun.write(Bun.stdout, value);
     }
   }, new CountQueuingStrategy({ highWaterMark: Infinity }));
   exit = process.exit;
}
javalsai commented 7 months ago

This is what I wound up doing for now. The rest of the code is runtime agnostic and runs using node, deno, and bun

const runtime = navigator.userAgent;

let readable, writable, exit;

if (runtime.startsWith("Deno")) {
  ({ readable } = Deno.stdin);
  ({ writable } = Deno.stdout);
  exit = Deno.exit;
}

if (runtime.startsWith("Node")) {
   const { stdin, stdout, exit:_exit } = await import("node:process");
   const { Duplex } = await import("node:stream");
   ({ readable } = Duplex.toWeb(stdin));
   ({ writable } = Duplex.toWeb(stdout));
   exit = process.exit;
}

if (runtime.startsWith("Bun")) {
   readable = Bun.stdin.stream();
   writable = new WritableStream({
     async write(value) {
       await Bun.write(Bun.stdout, value);
     }
   }, new CountQueuingStrategy({ highWaterMark: Infinity }));
   exit = process.exit;
}

He was working on some changes on Bun's node:streams implementation, not some program that uses it. I'm not sure how the official functions should behave, so I didn't answer him, but they don't look bad to me.

varshneydevansh commented 7 months ago

Is this reasonable? https://github.com/varshneydevansh/bun/commit/acf43f6d8ff0c265114a71c320b46e3a930a9115


    /**
     * @param {WritableStream} writableStream
     * @param {{
     *   decodeStrings? : boolean,
     *   highWaterMark? : number,
     *   objectMode? : boolean,
     *   signal? : AbortSignal,
     * }} [options]
     * @returns {Writable}
     */
    function newStreamWritableFromWritableStream(writableStream, options = {}) {
      if (!isWritableStream(writableStream)) {
        throw new ERR_INVALID_ARG_TYPE("writableStream", "WritableStream", writableStream);
      }

      validateObject(options, "options");
      const { highWaterMark, decodeStrings = true, objectMode = flase, signal } = options;

      validateBoolean(objectMode, "options.objectMode");
      validateBoolean(decodeStrings, "options.decodeStrings");

      const writer = writableStream.getWriter();
      let closed = false;

      const writable = new Writable({
        highWaterMark,
        objectMode,
        decodeStrings,
        signal,

        writev(chunks, callback) {
          function done(error) {
            error = error.filter(e => e);
            try {
              callback(error.length === 0 ? undefined : error);
            } catch (error) {
              // In a next tick because this is happening within
              // a promise context, and if there are any errors
              // thrown we don't want those to cause an unhandled
              // rejection. Let's just escape the promise and
              // handle it separately.
              process.nextTick(() => destroy(writable, error));
            }
          }
          // Wrapping on a new Promise is necessary to not expose the SafePromise
          // prototype to user-land.
          primordials.SafePromiseAll = (promises, mapFn) =>
            new Promise((a, b) => SafePromise.all(arrayToSafePromiseIterable(promises, mapFn)).then(a, b));

          PromisePrototypeThen(
            writer.ready,
            () => {
              return PromisePrototypeThen(
                SafePromiseAll(chunks, data => writer.write(data.chunk)),
                done,
                done,
              );
            },
            done,
          );
        },

        write(chunk, encoding, callback) {
          if (typeof chunk === "string" && decodeStrings && !objectMode) {
            const enc = normalizeEncoding(encoding);

            if (enc === "utf8") {
              chunk = encoder.encode(chunk);
            } else {
              chunk = Buffer.from(chunk, encoding);
              chunk = new Uint8Array(
                TypedArrayPrototypeGetBuffer(chunk),
                TypedArrayPrototypeGetByteOffset(chunk),
                TypedArrayPrototypeGetByteLength(chunk),
              );
            }
          }

          function done(error) {
            try {
              callback(error);
            } catch (error) {
              destroy(writable, error);
            }
          }

          PromisePrototypeThen(
            writer.ready,
            () => {
              return PromisePrototypeThen(writer.write(chunk), done, done);
            },
            done,
          );
        },

        destroy(error, callback) {
          function done() {
            try {
              callback(error);
            } catch (error) {
              // In a next tick because this is happening within
              // a promise context, and if there are any errors
              // thrown we don't want those to cause an unhandled
              // rejection. Let's just escape the promise and
              // handle it separately.
              process.nextTick(() => {
                throw error;
              });
            }
          }

          if (!closed) {
            if (error != null) {
              PromisePrototypeThen(writer.abort(error), done, done);
            } else {
              PromisePrototypeThen(writer.close(), done, done);
            }
            return;
          }

          done();
        },

        final(callback) {
          function done(error) {
            try {
              callback(error);
            } catch (error) {
              // In a next tick because this is happening within
              // a promise context, and if there are any errors
              // thrown we don't want those to cause an unhandled
              // rejection. Let's just escape the promise and
              // handle it separately.
              process.nextTick(() => destroy(writable, error));
            }
          }

          if (!closed) {
            PromisePrototypeThen(writer.close(), done, done);
          }
        },
      });

      PromisePrototypeThen(
        writer.closed,
        () => {
          // If the WritableStream closes before the stream.Writable has been
          // ended, we signal an error on the stream.Writable.
          closed = true;
          if (!isWritableEnded(writable)) destroy(writable, new ERR_STREAM_PREMATURE_CLOSE());
        },
        error => {
          // If the WritableStream errors before the stream.Writable has been
          // destroyed, signal an error on the stream.Writable.
          closed = true;
          destroy(writable, error);
        },
      );

      return writable;
    }

    /**
     * @typedef {import('../../stream').Writable} Writable
     * @typedef {import('../../stream').Readable} Readable
     * @typedef {import('./writablestream').WritableStream} WritableStream
     * @typedef {import('./readablestream').ReadableStream} ReadableStream
     */

    /**
     * @typedef {import('../abort_controller').AbortSignal} AbortSignal
     */

    /**
     * @param {Writable} streamWritable
     * @returns {WritableStream}
     */
    function newWritableStreamFromStreamWritable(streamWritable) {
      // Not using the internal/streams/utils isWritableNodeStream utility
      // here because it will return false if streamWritable is a Duplex
      // whose writable option is false. For a Duplex that is not writable,
      // we want it to pass this check but return a closed WritableStream.
      // We check if the given stream is a stream.Writable or http.OutgoingMessage
      const checkIfWritableOrOutgoingMessage =
        streamWritable && typeof streamWritable?.write === "function" && typeof streamWritable?.on === "function";
      if (!checkIfWritableOrOutgoingMessage) {
        throw new ERR_INVALID_ARG_TYPE("streamWritable", "stream.Writable", streamWritable);
      }

      if (isDestroyed(streamWritable) || !isWritable(streamWritable)) {
        const writable = new WritableStream();
        writable.close();
        return writable;
      }

      const highWaterMark = streamWritable.writableHighWaterMark;
      const strategy = streamWritable.writableObjectMode
        ? new CountQueuingStrategy({ highWaterMark })
        : { highWaterMark };

      let controller;
      let backpressurePromise;
      let closed;

      function onDrain() {
        if (backpressurePromise !== undefined) backpressurePromise.resolve();
      }

      const cleanup = finished(streamWritable, error => {
        error = handleKnownInternalErrors(error);

        cleanup();
        // This is a protection against non-standard, legacy streams
        // that happen to emit an error event again after finished is called.
        streamWritable.on("error", () => {});
        if (error != null) {
          if (backpressurePromise !== undefined) backpressurePromise.reject(error);
          // If closed is not undefined, the error is happening
          // after the WritableStream close has already started.
          // We need to reject it here.
          if (closed !== undefined) {
            closed.reject(error);
            closed = undefined;
          }
          controller.error(error);
          controller = undefined;
          return;
        }

        if (closed !== undefined) {
          closed.resolve();
          closed = undefined;
          return;
        }
        controller.error(new AbortError());
        controller = undefined;
      });

      streamWritable.on("drain", onDrain);

      return new WritableStream(
        {
          start(c) {
            controller = c;
          },

          async write(chunk) {
            if (streamWritable.writableNeedDrain || !streamWritable.write(chunk)) {
              backpressurePromise = createDeferredPromise();
              return SafePromisePrototypeFinally(backpressurePromise.promise, () => {
                backpressurePromise = undefined;
              });
            }
          },

          abort(reason) {
            destroy(streamWritable, reason);
          },

          close() {
            if (closed === undefined && !isWritableEnded(streamWritable)) {
              closed = createDeferredPromise();
              streamWritable.end();
              return closed.promise;
            }

            controller = undefined;
            return PromiseResolve();
          },
        },
        strategy,
      );
    }

    var webStreamsAdapters = {
      newStreamWritableFromWritableStream,

      newWritableStreamFromStreamWritable,
    };

    Writable.fromWeb = function (writableStream, options) {
      return webStreamsAdapters.newStreamWritableFromWritableStream(writableStream, options);
    };
    Writable.toWeb = function (streamWritable, options) {
      return webStreamsAdapters.newWritableStreamFromStreamWritable(streamWritable);
    };
javalsai commented 7 months ago

Is this reasonable? https://github.com/varshneydevansh/bun/commit/acf43f6d8ff0c265114a71c320b46e3a930a9115


    /**
     * @param {WritableStream} writableStream
     * @param {{
     *   decodeStrings? : boolean,
     *   highWaterMark? : number,
     *   objectMode? : boolean,
     *   signal? : AbortSignal,
     * }} [options]
     * @returns {Writable}
     */
    function newStreamWritableFromWritableStream(writableStream, options = {}) {
      if (!isWritableStream(writableStream)) {
        throw new ERR_INVALID_ARG_TYPE("writableStream", "WritableStream", writableStream);
      }

      validateObject(options, "options");
      const { highWaterMark, decodeStrings = true, objectMode = flase, signal } = options;

      validateBoolean(objectMode, "options.objectMode");
      validateBoolean(decodeStrings, "options.decodeStrings");

      const writer = writableStream.getWriter();
      let closed = false;

      const writable = new Writable({
        highWaterMark,
        objectMode,
        decodeStrings,
        signal,

        writev(chunks, callback) {
          function done(error) {
            error = error.filter(e => e);
            try {
              callback(error.length === 0 ? undefined : error);
            } catch (error) {
              // In a next tick because this is happening within
              // a promise context, and if there are any errors
              // thrown we don't want those to cause an unhandled
              // rejection. Let's just escape the promise and
              // handle it separately.
              process.nextTick(() => destroy(writable, error));
            }
          }
          // Wrapping on a new Promise is necessary to not expose the SafePromise
          // prototype to user-land.
          primordials.SafePromiseAll = (promises, mapFn) =>
            new Promise((a, b) => SafePromise.all(arrayToSafePromiseIterable(promises, mapFn)).then(a, b));

          PromisePrototypeThen(
            writer.ready,
            () => {
              return PromisePrototypeThen(
                SafePromiseAll(chunks, data => writer.write(data.chunk)),
                done,
                done,
              );
            },
            done,
          );
        },

        write(chunk, encoding, callback) {
          if (typeof chunk === "string" && decodeStrings && !objectMode) {
            const enc = normalizeEncoding(encoding);

            if (enc === "utf8") {
              chunk = encoder.encode(chunk);
            } else {
              chunk = Buffer.from(chunk, encoding);
              chunk = new Uint8Array(
                TypedArrayPrototypeGetBuffer(chunk),
                TypedArrayPrototypeGetByteOffset(chunk),
                TypedArrayPrototypeGetByteLength(chunk),
              );
            }
          }

          function done(error) {
            try {
              callback(error);
            } catch (error) {
              destroy(writable, error);
            }
          }

          PromisePrototypeThen(
            writer.ready,
            () => {
              return PromisePrototypeThen(writer.write(chunk), done, done);
            },
            done,
          );
        },

        destroy(error, callback) {
          function done() {
            try {
              callback(error);
            } catch (error) {
              // In a next tick because this is happening within
              // a promise context, and if there are any errors
              // thrown we don't want those to cause an unhandled
              // rejection. Let's just escape the promise and
              // handle it separately.
              process.nextTick(() => {
                throw error;
              });
            }
          }

          if (!closed) {
            if (error != null) {
              PromisePrototypeThen(writer.abort(error), done, done);
            } else {
              PromisePrototypeThen(writer.close(), done, done);
            }
            return;
          }

          done();
        },

        final(callback) {
          function done(error) {
            try {
              callback(error);
            } catch (error) {
              // In a next tick because this is happening within
              // a promise context, and if there are any errors
              // thrown we don't want those to cause an unhandled
              // rejection. Let's just escape the promise and
              // handle it separately.
              process.nextTick(() => destroy(writable, error));
            }
          }

          if (!closed) {
            PromisePrototypeThen(writer.close(), done, done);
          }
        },
      });

      PromisePrototypeThen(
        writer.closed,
        () => {
          // If the WritableStream closes before the stream.Writable has been
          // ended, we signal an error on the stream.Writable.
          closed = true;
          if (!isWritableEnded(writable)) destroy(writable, new ERR_STREAM_PREMATURE_CLOSE());
        },
        error => {
          // If the WritableStream errors before the stream.Writable has been
          // destroyed, signal an error on the stream.Writable.
          closed = true;
          destroy(writable, error);
        },
      );

      return writable;
    }

    /**
     * @typedef {import('../../stream').Writable} Writable
     * @typedef {import('../../stream').Readable} Readable
     * @typedef {import('./writablestream').WritableStream} WritableStream
     * @typedef {import('./readablestream').ReadableStream} ReadableStream
     */

    /**
     * @typedef {import('../abort_controller').AbortSignal} AbortSignal
     */

    /**
     * @param {Writable} streamWritable
     * @returns {WritableStream}
     */
    function newWritableStreamFromStreamWritable(streamWritable) {
      // Not using the internal/streams/utils isWritableNodeStream utility
      // here because it will return false if streamWritable is a Duplex
      // whose writable option is false. For a Duplex that is not writable,
      // we want it to pass this check but return a closed WritableStream.
      // We check if the given stream is a stream.Writable or http.OutgoingMessage
      const checkIfWritableOrOutgoingMessage =
        streamWritable && typeof streamWritable?.write === "function" && typeof streamWritable?.on === "function";
      if (!checkIfWritableOrOutgoingMessage) {
        throw new ERR_INVALID_ARG_TYPE("streamWritable", "stream.Writable", streamWritable);
      }

      if (isDestroyed(streamWritable) || !isWritable(streamWritable)) {
        const writable = new WritableStream();
        writable.close();
        return writable;
      }

      const highWaterMark = streamWritable.writableHighWaterMark;
      const strategy = streamWritable.writableObjectMode
        ? new CountQueuingStrategy({ highWaterMark })
        : { highWaterMark };

      let controller;
      let backpressurePromise;
      let closed;

      function onDrain() {
        if (backpressurePromise !== undefined) backpressurePromise.resolve();
      }

      const cleanup = finished(streamWritable, error => {
        error = handleKnownInternalErrors(error);

        cleanup();
        // This is a protection against non-standard, legacy streams
        // that happen to emit an error event again after finished is called.
        streamWritable.on("error", () => {});
        if (error != null) {
          if (backpressurePromise !== undefined) backpressurePromise.reject(error);
          // If closed is not undefined, the error is happening
          // after the WritableStream close has already started.
          // We need to reject it here.
          if (closed !== undefined) {
            closed.reject(error);
            closed = undefined;
          }
          controller.error(error);
          controller = undefined;
          return;
        }

        if (closed !== undefined) {
          closed.resolve();
          closed = undefined;
          return;
        }
        controller.error(new AbortError());
        controller = undefined;
      });

      streamWritable.on("drain", onDrain);

      return new WritableStream(
        {
          start(c) {
            controller = c;
          },

          async write(chunk) {
            if (streamWritable.writableNeedDrain || !streamWritable.write(chunk)) {
              backpressurePromise = createDeferredPromise();
              return SafePromisePrototypeFinally(backpressurePromise.promise, () => {
                backpressurePromise = undefined;
              });
            }
          },

          abort(reason) {
            destroy(streamWritable, reason);
          },

          close() {
            if (closed === undefined && !isWritableEnded(streamWritable)) {
              closed = createDeferredPromise();
              streamWritable.end();
              return closed.promise;
            }

            controller = undefined;
            return PromiseResolve();
          },
        },
        strategy,
      );
    }

    var webStreamsAdapters = {
      newStreamWritableFromWritableStream,

      newWritableStreamFromStreamWritable,
    };

    Writable.fromWeb = function (writableStream, options) {
      return webStreamsAdapters.newStreamWritableFromWritableStream(writableStream, options);
    };
    Writable.toWeb = function (streamWritable, options) {
      return webStreamsAdapters.newWritableStreamFromStreamWritable(streamWritable);
    };

I'd say that if it looks fine, it should be good to go. Might be good if someone has some tests for it for edge cases. But that can always be worked on later. I might ask on the discord server if you don't mind.

varshneydevansh commented 7 months ago

Actually, only thing which I am just kinda not able to get(maybe I am not trying enough) is to point Bun to my local build so that I can test these by running the file with my local build rather to the curl -fsSL https://bun.sh/install | bash

I will try to test this in the morning.

javalsai commented 7 months ago

Actually, only thing which I am just kinda not able to get(maybe I am not trying enough) is to point Bun to my local build so that I can test these by running the file with my local build rather to the curl -fsSL https://bun.sh/install | bash

I will try to test this in the morning.

What are you trying to do exactly? Add your local build executable as a simple bun command?

varshneydevansh commented 7 months ago

What are you trying to do exactly? Add your local build executable as a simple bun command?

const { createReadStream, createWriteStream } = require('node:fs')
const { Readable, Writable } = require('node:stream')

const readStream = createReadStream('/tmp/readfile')
const writeStream = createWriteStream('/tmp/writefile')

const readWebStream = Readable.toWeb(readStream)
const writeWebStream = Writable.toWeb(writeStream)

const readStreamAgain = Readable.fromWeb(readWebStream)
const writeStreamAgain = Writable.fromWeb(writeWebStream)

console.log(readStreamAgain, writeStreamAgain)
with Bun -

 ✘  devansh  bun test_bun_modi.js 
3 | 
4 | const readStream = createReadStream('/tmp/readfile')
5 | const writeStream = createWriteStream('/tmp/writefile')
6 | 
7 | const readWebStream = Readable.toWeb(readStream)
8 | const writeWebStream = Writable.toWeb(writeStream)
                           ^
TypeError: undefined is not a function
      at node:stream:2919:65
      at /home/devansh/test_bun_modi.js:8:24
 devansh  node test_bun_modi.js
Readable {
  _events: {
    close: undefined,
    error: undefined,
    data: undefined,
    end: undefined,
    readable: undefined
  },
  _readableState: ReadableState {
    highWaterMark: 16384,
    buffer: [],
    bufferIndex: 0,
    length: 0,
    pipes: [],
    awaitDrainWriters: null,
    [Symbol(kState)]: 1052940
  },
  _read: [Function: read],
  _destroy: [Function: destroy],
  _maxListeners: undefined,
  [Symbol(shapeMode)]: true,
  [Symbol(kCapture)]: false
} Writable {
  _events: {
    close: undefined,
    error: undefined,
    prefinish: undefined,
    finish: undefined,
    drain: undefined
  },
  _writableState: WritableState {
    highWaterMark: 16384,
    length: 0,
    corked: 0,
    onwrite: [Function: bound onwrite],
    writelen: 0,
    bufferedIndex: 0,
    pendingcb: 0,
    [Symbol(kState)]: 17580812,
    [Symbol(kBufferedValue)]: null
  },
  _write: [Function: write],
  _writev: [Function: writev],
  _destroy: [Function: destroy],
  _final: [Function: final],
  _maxListeners: undefined,
  [Symbol(shapeMode)]: true,
  [Symbol(kCapture)]: false
}
node:events:496
      throw er; // Unhandled 'error' event
      ^

Error: EISDIR: illegal operation on a directory, read
Emitted 'error' event on Readable instance at:
    at emitErrorNT (node:internal/streams/destroy:169:8)
    at emitErrorCloseNT (node:internal/streams/destroy:128:3)
    at process.processTicksAndRejections (node:internal/process/task_queues:82:21) {
  errno: -21,
  code: 'EISDIR',
  syscall: 'read'
}

Node.js v20.11.0

 [~]
 ✘  devansh  bun/build/bun-debug test_bun_modi.js 
[SYS] read(3, 4096) = 4096 (0.051ms)
[fs] close(3[/home/devansh/bun/build/bun-debug])
[SYS] openat(-100, /home/devansh/bunfig.toml) = 18446744073709551614
[SYS] openat(-100, test_bun_modi.js) = 3
[SYS] fstat(3) = 0
[fs] close(3[/home/devansh/test_bun_modi.js])
[fs] close(4[/])
[fs] close(5[/home])
[fs] close(6[/home/devansh])
[SYS] openat(-100, /home/devansh/test_bun_modi.js) = 12
[fs] openat(2147483647, /home/devansh/test_bun_modi.js) = 12[/home/devansh/test_bun_modi.js]
[fs] close(12[/home/devansh/test_bun_modi.js])
[alloc] new() = src.bun.js.node.node_fs_binding.NodeJSFS@20000311400
[alloc] new() = src.bun.js.node.node_fs_binding.NodeJSFS@20000312800
[SYS] openat(-100, /tmp/readfile) = 12
[SYS] openat(-100, /tmp/writefile) = 14
[SYS] dup(12) = 15
[SYS] fstat(16) = 0
[fs] close(16[/tmp/readfile])
[SYS] openat(-100, /home/devansh/test_bun_modi.js) = 16
[fs] openat(2147483647, /home/devansh/test_bun_modi.js) = 16[/home/devansh/test_bun_modi.js]
[fs] close(16[/home/devansh/test_bun_modi.js])
3 | 
4 | const readStream = createReadStream('/tmp/readfile')
5 | const writeStream = createWriteStream('/tmp/writefile')
6 | 
7 | const readWebStream = Readable.toWeb(readStream)
8 | const writeWebStream = Writable.toWeb(writeStream)
                           ^
ReferenceError: Can't find variable: isDestroyed
      at newWritableStreamFromStreamWritable (node:stream:3928:97)
      at /home/devansh/test_bun_modi.js:8:24
      at asyncFunctionResume (:1:21)
      at promiseReactionJobWithoutPromiseUnwrapAsyncContext (:1:21)
      at promiseReactionJob (:1:21)

bun/build/bun-debug test_bun_modi.js

I wasn't paying attention even after reading this https://bun.sh/docs/project/contributing#building-bun