whatwg / streams

Streams Standard
https://streams.spec.whatwg.org/

'Splitter' stream? #1030

Open kcoop opened 4 years ago

kcoop commented 4 years ago

I need a 'splitter' WritableStream that divvies up its input based on some criterion, sequentially generating multiple output ReadableStreams, each created by a factory function that is called whenever the input calls for it.

The specific scenario is a log file splitter, which splits a readable on an input pattern: whenever the pattern is seen, subsequent chunks become available from a new ReadableStream that, after some transforms, is ultimately piped to a writable file stream (output1.log, output2.log, etc.).

A usage example of the API I had in mind looks something like this:

let counter = 0;
readable.pipeTo(new WritableStream(new StreamSplitter(
  (chunk: string) => { return chunk.includes(thePattern); },
  () => {
    counter++;
    return new ReadableStream()
        .pipeThrough(createMyRemainingTransforms())
        .pipeTo(createMyWritableFileStream(counter));
  }
)));

First, is there a better approach to this? And second, I'm scratching my head implementing StreamSplitter. My thought was to create it something like this, but I'm stuck on how to create a writable/readable pair such that writes to a writer are automatically forwarded to the readable:

export class StreamSplitter implements UnderlyingSink {

    private _writable: WritableStream;

    constructor(readonly shouldSplit: (chunk:any) => boolean, readonly createReadable: () => ReadableStream)
    {
    }

    private createNewReadable() {
        if (this._writable) {
            this._writable.getWriter().close();
        }
        const readable = this.createReadable();
        // ???
        // How do we create a writable and readable pair
        // such that every write to the writable goes to the readable?

    }

    start(controller: any) {
        this.createNewReadable();
    }

    write(chunk: any, controller: any) {
        if (this.shouldSplit(chunk)) {
            this.createNewReadable();
        }
        this._writable.getWriter().write(chunk);
    }

    close() {
        this._writable.getWriter().close();
    }

    abort(error: any) {
        this._writable.getWriter().abort(error);
    }
}

Any thoughts? Love the piping metaphor, but still very confused.

domenic commented 4 years ago

I'm stuck on how to create a writable/readable pair such that writes to a writer are automatically forwarded to the readable:

This is precisely the intended use case for a transform stream. The example at https://streams.spec.whatwg.org/#example-ts-sync-mapper might help get you started.
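A minimal sketch of what that means in practice, assuming a runtime where `TransformStream` is a global (modern browsers, Node 18+): a `TransformStream` constructed with no transformer is an identity pair, so every chunk written to its `writable` side comes out unchanged on its `readable` side. `passthroughDemo` is a hypothetical name used only for illustration.

```typescript
// A TransformStream created without a transformer is an identity transform:
// chunks written to `writable` come out unchanged on `readable`.
async function passthroughDemo(): Promise<string[]> {
  const { readable, writable } = new TransformStream<string, string>();
  const writer = writable.getWriter();
  const reader = readable.getReader();

  // Queue the writes and the close without awaiting them one by one:
  // a write may not complete until the readable side has been read.
  const writesDone = Promise.all([
    writer.write("first line"),
    writer.write("second line"),
    writer.close(),
  ]);

  // Drain the readable side until it reports that it is done.
  const received: string[] = [];
  while (true) {
    const result = await reader.read();
    if (result.done) break;
    received.push(result.value);
  }
  await writesDone;
  return received;
}

passthroughDemo().then((chunks) => console.log(chunks));
```

In the splitter, the sink would hold on to the writer and hand the readable side to whoever consumes that segment.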

kcoop commented 4 years ago

Thanks, domenic! That makes sense, but how do I create a transform stream from an existing readable and writable?

kcoop commented 4 years ago

In the code above, I want to create a generic writable, build the readable from a factory function, and then stitch the two together.

domenic commented 4 years ago

Usually you put the transform in the middle:

existingReadable.pipeThrough(transform).pipeTo(existingWritable);

It's a bit hard for me to understand the code above because it doesn't appear to be valid JavaScript.
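To make the `existingReadable.pipeThrough(transform).pipeTo(existingWritable)` chain concrete, here is a small sketch with in-memory ends. `fromArray`, `toArray`, and `createUpperCaseTransform` are hypothetical helpers standing in for the existing readable, the existing writable, and the transform; they are not part of the Streams API.

```typescript
// Hypothetical helper: a ReadableStream that emits the given items, then closes.
function fromArray<T>(items: T[]): ReadableStream<T> {
  return new ReadableStream<T>({
    start(controller) {
      for (const item of items) controller.enqueue(item);
      controller.close();
    },
  });
}

// Hypothetical helper: a WritableStream that appends every chunk to `target`.
function toArray<T>(target: T[]): WritableStream<T> {
  return new WritableStream<T>({
    write(chunk) {
      target.push(chunk);
    },
  });
}

// Hypothetical transform standing in for createMyRemainingTransforms().
// A factory is used because a TransformStream can only be piped through once.
function createUpperCaseTransform(): TransformStream<string, string> {
  return new TransformStream<string, string>({
    transform(chunk, controller) {
      controller.enqueue(chunk.toUpperCase());
    },
  });
}

async function pipeChainDemo(): Promise<string[]> {
  const collected: string[] = [];
  // The transform sits in the middle; pipeTo() resolves after the source
  // has closed and the destination has been closed in turn.
  await fromArray(["error: disk full", "info: retrying"])
    .pipeThrough(createUpperCaseTransform())
    .pipeTo(toArray(collected));
  return collected;
}

pipeChainDemo().then((lines) => console.log(lines));
```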

kcoop commented 4 years ago

It's TypeScript. Please don't hold that against me. :-) Below is a translation (still using a class, sorry).

The goal again is to have a function that allows me to pass in a readable, a test function, and a 'split readable' generator function, and have it do the wiring up so that the generator creates a new readable every time the test function returns true, and routes subsequent writes to a writer associated with that new readable.

Here is my uneducated stab at it, rewritten in JS. It consists of an UnderlyingSink passed to the constructor of a WritableStream. When it gets a write, it uses the shouldSplit boolean function to check whether it needs to create a new readable/writable pair. When it does, the idea is to:

a) generate a readable stream from the factory function provided,
b) create a generic writable stream that we keep around and pass writes through to, and
c) link the readable and writable together so that writes to the writable go to the readable.

Am I just Doing it Wrong? Is there a simpler way?

class StreamSplitter {

    constructor(shouldSplit, createReadable) {
        this.shouldSplit = shouldSplit;
        this.createReadable = createReadable;
    }

    createNewReadable() {
        if (this._writable) {
            this._writable.getWriter().close();
        }
        const readable = this.createReadable();
        // ???
        // How do we create a writable and readable pair
        // such that every write to the writable goes to the readable?
    }

    start(controller) {
        this.createNewReadable();
    }

    write(chunk, controller) {
        if (this.shouldSplit(chunk)) {
            this.createNewReadable();
        }
        this._writable.getWriter().write(chunk);
    }

    close() {
        this._writable.getWriter().close();
    }

    abort(error) {
        this._writable.getWriter().abort(error);
    }
}

MattiasBuelens commented 3 years ago

A bit late to the party, but I gave it a go. 😁

This is basically a regular pipe loop, but the writer can be switched out in the middle.

let counter = 0;
let readable = new ReadableStream(/* ... */); // this is probably an input argument
let writable = createOutputStream(counter);
let reader = readable.getReader();
let writer = writable.getWriter();
try {
  while (true) {
    await writer.ready;
    const { done, value } = await reader.read();
    if (done) {
      // all chunks have been read, we're done
      await writer.close();
      break;
    }
    if (shouldSplit(value)) {
      // create and switch to a new output stream
      writer.close().catch(() => {});
      writer.releaseLock();
      counter++;
      writable = createOutputStream(counter);
      writer = writable.getWriter();
    }
    // write chunk to current output stream
    writer.write(value).catch(() => {});
  }
} catch (e) {
  // read or write failed, propagate to the other end
  reader.cancel(e).catch(() => {});
  writer.abort(e).catch(() => {});
} finally {
  // just in case someone else wants to use the readable afterwards
  reader.releaseLock();
  writer.releaseLock();
}

function shouldSplit(chunk) {
  return chunk.includes(thePattern);
}

function createOutputStream(counter) {
  const transform = new TransformStream();
  transform.readable
    .pipeThrough(createMyRemainingTransforms())
    .pipeTo(createMyWritableFileStream(counter))
    .catch(() => {}); // errors also surface through the writer in the loop
  return transform.writable;
  // Equivalent, but less obvious:
  // const { readable, writable } = createMyRemainingTransforms();
  // readable.pipeTo(createMyWritableFileStream(counter));
  // return writable;
}

There's probably a way to replace the loop with some clever .pipeTo() calls, but I'll leave that as an exercise for the reader. 😛
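One way to package the loop above as a reusable function, sketched in TypeScript. `splitPipe` and its parameters are hypothetical names; beyond the loop in the comment, this version waits for every earlier output to finish closing before it resolves, and rethrows the first error to the caller.

```typescript
// Hypothetical helper generalizing the loop above: pipe `source` into a
// sequence of outputs, switching to a fresh output whenever `shouldSplit`
// returns true for a chunk. The matching chunk starts the new output.
async function splitPipe<T>(
  source: ReadableStream<T>,
  shouldSplit: (chunk: T) => boolean,
  createOutput: (index: number) => WritableStream<T>,
): Promise<void> {
  let index = 0;
  const closing: Promise<void>[] = []; // close() promises of finished outputs
  const reader = source.getReader();
  let writer = createOutput(index).getWriter();
  try {
    while (true) {
      await writer.ready; // respect backpressure from the current output
      const result = await reader.read();
      if (result.done) {
        // Close the last output and wait for the earlier ones to finish closing.
        await Promise.all([...closing, writer.close()]);
        return;
      }
      if (shouldSplit(result.value)) {
        const closed = writer.close();
        closed.catch(() => {}); // avoid an unhandled rejection; Promise.all still sees it
        closing.push(closed);
        writer.releaseLock();
        index++;
        writer = createOutput(index).getWriter();
      }
      // Write failures are reported through writer.ready on the next iteration.
      writer.write(result.value).catch(() => {});
    }
  } catch (e) {
    // A read or write failed: propagate the error to the other end.
    reader.cancel(e).catch(() => {});
    writer.abort(e).catch(() => {});
    throw e;
  } finally {
    reader.releaseLock();
    writer.releaseLock();
  }
}
```

With the helpers from the comment above, the original loop would become roughly `await splitPipe(readable, shouldSplit, createOutputStream)`.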

yume-chan commented 2 years ago

I don't know what you really want to do, because your code doesn't type check.

The basic principle of ReadableStream is that only its creator can push data into it. So instead of letting the user of StreamSplitter create the stream, StreamSplitter should create the stream itself and hand it to the user.
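To illustrate that principle: the only handle that can enqueue chunks into a `ReadableStream` is its controller, and the constructor passes that controller to the underlying source's `start()` method. A minimal sketch, where `createPushableStream` and `readAll` are hypothetical helpers, captures the controller and exposes push/end functions to the creator only.

```typescript
// Hypothetical helper: only code holding the controller can enqueue chunks,
// and the constructor hands the controller to start(), so we capture it there.
function createPushableStream<T>(): {
  readable: ReadableStream<T>;
  push: (chunk: T) => void;
  end: () => void;
} {
  let controller!: ReadableStreamDefaultController<T>;
  const readable = new ReadableStream<T>({
    start(c) {
      controller = c; // start() runs synchronously inside the constructor
    },
  });
  return {
    readable,
    push: (chunk) => controller.enqueue(chunk),
    end: () => controller.close(),
  };
}

// Hypothetical helper: read every chunk from a stream into an array.
async function readAll<T>(stream: ReadableStream<T>): Promise<T[]> {
  const reader = stream.getReader();
  const chunks: T[] = [];
  while (true) {
    const result = await reader.read();
    if (result.done) return chunks;
    chunks.push(result.value);
  }
}

// The creator keeps `push`/`end`; consumers only ever see `readable`.
const { readable, push, end } = createPushableStream<string>();
push("line 1");
push("line 2");
end();
readAll(readable).then((lines) => console.log(lines));
```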

Applied to StreamSplitter, it looks like this:

export class StreamSplitter<T = any> implements UnderlyingSink<T> {
    // Keep the writer rather than the stream: getWriter() locks the stream,
    // so calling it again on every write would throw a TypeError.
    private _writer?: WritableStreamDefaultWriter<T>;

    constructor(readonly shouldSplit: (chunk: T) => boolean, readonly createReadable: (readable: ReadableStream<T>) => void) {
    }

    private createNewReadable() {
        if (this._writer) {
            // Finish the previous segment; its own pipe reports any error.
            this._writer.close().catch(() => {});
        }

        // Create a passthrough stream pair
        // Writing to `writable` will pipe the data to `readable`
        const { readable, writable } = new TransformStream<T, T>();
        this._writer = writable.getWriter();

        // Pass the `readable` to the user of `StreamSplitter`
        this.createReadable(readable);
    }

    start(controller: any) {
        this.createNewReadable();
    }

    write(chunk: T, controller: any) {
        if (this.shouldSplit(chunk)) {
            this.createNewReadable();
        }
        // Returning the write promise lets the outer WritableStream apply backpressure.
        return this._writer!.write(chunk);
    }

    close() {
        return this._writer!.close();
    }

    abort(error: any) {
        return this._writer!.abort(error);
    }
}

declare const readable: ReadableStream<string>;
declare const thePattern: string;
declare function createMyRemainingTransforms(): TransformStream<string, string>;
declare function createMyWritableFileStream(counter: number): WritableStream<string>;

let counter = 0;
readable.pipeTo(new WritableStream(new StreamSplitter(
    (chunk: string) => { return chunk.includes(thePattern); },
    (readable) => {
        counter++;
        return readable
            .pipeThrough(createMyRemainingTransforms())
            .pipeTo(createMyWritableFileStream(counter));
    }
)));
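One thing the usage above leaves out is completion: the outer `pipeTo()` can settle before each segment's own `pipeTo()` has finished writing. A self-contained sketch, assuming in-memory arrays in place of log files; `splitterSink` is a hypothetical compact variant of the `StreamSplitter` sink (one writer per segment, since calling `getWriter()` twice on the same stream throws), and `splitLog` is likewise illustrative.

```typescript
// Hypothetical sink: starts a new passthrough segment whenever shouldSplit(chunk) is true.
function splitterSink<T>(
  shouldSplit: (chunk: T) => boolean,
  onSegment: (segment: ReadableStream<T>) => void,
): UnderlyingSink<T> {
  let writer: WritableStreamDefaultWriter<T> | undefined;
  const nextSegment = () => {
    if (writer) writer.close().catch(() => {}); // the segment's pipe reports errors
    const { readable, writable } = new TransformStream<T, T>();
    writer = writable.getWriter(); // acquire the lock once per segment
    onSegment(readable);
  };
  return {
    start() {
      nextSegment();
    },
    write(chunk) {
      if (shouldSplit(chunk)) nextSegment();
      return writer!.write(chunk); // propagate backpressure to the outer pipe
    },
    close() {
      return writer!.close();
    },
    abort(reason) {
      return writer!.abort(reason);
    },
  };
}

// Hypothetical driver: split `lines` on `pattern` and collect each segment.
async function splitLog(lines: string[], pattern: string): Promise<string[][]> {
  const segments: string[][] = [];
  const segmentPipes: Promise<void>[] = [];
  const source = new ReadableStream<string>({
    start(controller) {
      for (const line of lines) controller.enqueue(line);
      controller.close();
    },
  });
  await source.pipeTo(
    new WritableStream<string>(
      splitterSink<string>(
        (line) => line.includes(pattern),
        (segment) => {
          const collected: string[] = [];
          segments.push(collected);
          // Keep each segment's pipe promise so we can wait for it below.
          segmentPipes.push(
            segment.pipeTo(new WritableStream<string>({ write(chunk) { collected.push(chunk); } })),
          );
        },
      ),
    ),
  );
  await Promise.all(segmentPipes); // all segments fully written
  return segments;
}

splitLog(["boot ok", "=== rotate", "disk warn", "=== rotate", "shutdown"], "=== rotate")
  .then((segments) => console.log(segments));
```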