jlongster / transducers.js

A small library for generalized transformation of data (inspired by Clojure's transducers)
BSD 2-Clause "Simplified" License

How could we use transducers with Node.js streams? #22

Open jeffbski opened 9 years ago

jeffbski commented 9 years ago

I love the concept of transducers and your implementation is excellent and performant.

I was trying to figure out how we might use transducers with Node.js streams, but I'm getting hung up on the fact that the iterator protocol doesn't really have the concept of data not being ready yet.

From my understanding, if we call iter.next() on an iterator that reads from a stream and no data is available, it will either end or yield undefined. We could add a readiness check and only call next() once data is available, but if the transducer chain filters that value out, I assume the transducer will immediately go back for another chunk, which might not be available yet.

So it feels like there is something missing to throttle and allow this to work with async streams.

I know that other languages simply block when data is not available, but it would be nice to come up with a strategy that could work with ES5 as well until generators are commonly available.

Do you have any thoughts? I would really love to consolidate things using transducers for everything.

Thanks.

kevinbeaty commented 9 years ago

Here is one option: transduce-stream. It should work with either transducers.js or transducers-js (I plan on testing and verifying this weekend; so far I've only tried it with transducers-js).

The secret is to use transducers with push streams: instead of using reduce/transduce to pull values from an array/iterator through a transformer, use the transformer directly and call step as you have values.
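
As a rough illustration of that push approach (a sketch only, assuming transducers.js's compose/map/filter and the @@transducer/init, @@transducer/step and @@transducer/result method names; the collector transformer and someStream below are made up for the example, not library APIs):

var t = require('transducers.js');
var Readable = require('stream').Readable;

// someStream stands in for any object-mode readable stream of values
var someStream = new Readable({ objectMode: true, read: function() {} });

var xf = t.compose(
  t.map(function(x) { return x * 2; }),
  t.filter(function(x) { return x > 2; })
);

// a bare-bones downstream transformer that just collects values into an array
var collector = {
  '@@transducer/init': function() { return []; },
  '@@transducer/step': function(acc, value) { acc.push(value); return acc; },
  '@@transducer/result': function(acc) { return acc; }
};

var xform = xf(collector);              // wrap the collector with the transform
var acc = xform['@@transducer/init'](); // start with an empty accumulator

// instead of pulling from an iterator, push each value through as it arrives
someStream.on('data', function(chunk) {
  acc = xform['@@transducer/step'](acc, chunk);
  // a full integration would also check here for a reduced (early-terminated)
  // value and stop consuming the stream
});
someStream.on('end', function() {
  console.log(xform['@@transducer/result'](acc)); // => [ 4, 6 ]
});

someStream.push(1); someStream.push(2); someStream.push(3);
someStream.push(null); // end the stream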

jeffbski commented 9 years ago

@kevinbeaty excellent. Thanks for mentioning this package, I will check it out.

jlongster commented 9 years ago

Right, as @kevinbeaty said, how data is iterated and how a new data structure is built up is not relevant to transducers. I include functionality in this library for working with native types for convenience, but for other data structures (like streams) you have to do the integration yourself or use a different library.

It's not too hard to use transducers manually, so it's pretty easy to integrate. @kevinbeaty's library looks nice, and it should work with my library.

pjeby commented 9 years ago

Unfortunately, the "secret" of using push streams doesn't work if you want to use, say, cat, on a stream of streams. cat uses reduce(), which expects to iterate and step by itself, instead of delegating the reduction to the container. If there were a @@transducer/reduce protocol that reduce() looked for before doing iteration, then asynchronous iteration of a source stream would work.

That would allow one to do the kind of .flatten() and .flatMap() operations on async streams that Bacon, Kefir, RxJS, Highland, etc. can do. It'd probably even let you do flatMapLatest() or flatMapWithConcurrencyLimit(), if you were clever enough with your intervening transforms.

(Basically, with that one addition to the protocol, async streams become first-class players in the transduction game. There's no backpressure capability per se, but you can stop receiving data by returning a reduced result from a downstream /step operation.)
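
A sketch of what that hook might look like (purely hypothetical: @@transducer/reduce is the proposed addition, not an existing part of the protocol, and the fallback path is simplified):

// Hypothetical: reduce() checks for a @@transducer/reduce method before
// falling back to synchronous iteration.
function reduce(coll, xform, init) {
  if (coll && typeof coll['@@transducer/reduce'] === 'function') {
    // the collection drives its own reduction, which is free to be
    // asynchronous (e.g. return a promise) for sources like streams
    return coll['@@transducer/reduce'](xform, init);
  }
  // existing behavior: synchronous iteration (array case shown for brevity;
  // a real implementation also handles iterators and unwraps reduced
  // values for early termination)
  var acc = init;
  for (var i = 0; i < coll.length; i++) {
    acc = xform['@@transducer/step'](acc, coll[i]);
  }
  return acc;
}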

Is there a place to propose an official addition to the spec? Is the spec even a thing? I saw something that was supposedly a link to the spec that turned out to be a lengthy GH issue discussion thread, but nothing that looked like an actual spec spec, if you know what I mean.

dypsilon commented 9 years ago

@pjeby have you made any progress with the "reduce" addition and stream integration? I would like to port my code from awkward object streams to transducers and have no idea yet whether it's even possible.

NodeGuy commented 7 years ago

Alas, what an oversight.

shaunc commented 7 years ago

@pjeby -- Could you explain your idea a bit more? Who is supposed to "know" about the subobjects in your collection/stream? Where would "reduce()" look for the "@@transducer/reduce" protocol? If it looks to the cat transformer, then that doesn't help us much. Should it look to the upstream accumulator? (Or should cat itself, which has access to it?)

pjeby commented 7 years ago

Gosh, it's been a while, so I'm actually fuzzy now in my recollection of how transducers work, not to mention this discussion. But IIRC, shouldn't the concatenation of streams implement reduction by reduction on its concatenated streams? i.e., shouldn't it be solved by recursion on reduce()?

shaunc commented 7 years ago

That sounds good... but insofar as I understand it myself, the idea of transducers (at least in the concrete implementations in JS) is to abstract "reduce" by separating it into three parts: initializing the "accumulator", stepping through the input and accumulating, and collecting the results.

(See, incidentally, https://github.com/jlongster/transducers.js/issues/46 implemented by https://github.com/jlongster/transducers.js/pull/48. Upstream and downstream might be opaque, but every transducer should have the opportunity to control "its own" accumulator).

A transducer needn't concern itself with the source of its objects, but it needs to "know" about the objects themselves, or be told somehow. This could be a property of the upstream "accumulator", as I was thinking earlier. More straightforward, though it would make the pipeline "typed", would be to pass another transformer -- a transformer for the subobjects -- as an optional argument to cat. This would allow cat to run through those objects (possibly transforming them), and then we wouldn't need another special property.

Alternately, if we wanted each "cat" pipeline to be general, having a special property containing a sub-object-transformer might be the way to go?

Either way, the sub-object transformer could perhaps be cat itself, so we could flatten trees of streams if we really wanted to. :)

pjeby commented 7 years ago

Right. What I'm saying is that the problem is the part where reduction currently hardcodes reliance on the standard, synchronous iteration protocol. If instead there were a higher-level protocol for "reducing a sequence", then the default implementation would still fall back to synchronous iteration, but an asynchronous "collection" (e.g. a stream) would be able to provide an asynchronous reduction algorithm.

(In principle, one could hardcode an additional asynchronous iteration protocol, but making reduction itself a protocol gives freedom to experiment with different kinds of asynchronous streams and results.)
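
Under the same assumptions, a sketch of the other half: an asynchronous "collection" wrapping a Node stream that supplies its own reduction (streamReducible and @@transducer/reduce are hypothetical, not part of any library):

// Hypothetical: a stream wrapper that implements the proposed reduction
// protocol and resolves a promise once the stream ends.
function streamReducible(stream) {
  return {
    '@@transducer/reduce': function(xform, init) {
      return new Promise(function(resolve, reject) {
        var acc = init;
        stream.on('data', function(chunk) {
          acc = xform['@@transducer/step'](acc, chunk);
          // a full version would check for a reduced value here and
          // pause or destroy the stream to stop receiving data early
        });
        stream.on('end', function() {
          resolve(xform['@@transducer/result'](acc));
        });
        stream.on('error', reject);
      });
    }
  };
}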

mzpkjs commented 4 years ago

@pjeby: I am curious, did you manage to implement such a protocol? I have lately been working on one with no success whatsoever, so any insight would be helpful.

The way of transducing seems to be tightly coupled to both the source you "loop" through and the target you reduce to. E.g. in my case, transducers for Iterator -> Iterator, Array -> Iterator, and Iterator -> Array all need to be implemented differently.

For now, I am trying to mitigate this with some kind of double dispatch, picking the "right" transduce implementation based on both the target and the source.
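
One possible shape for that kind of dispatch (a sketch only; transduceArray, transduceIterator and transduceAsyncIterator are hypothetical stand-ins for source-specific implementations):

// Hypothetical dispatch on the source type; each branch delegates to a
// source-specific transduce implementation.
function transduceFrom(xf, reducer, init, source) {
  if (Array.isArray(source)) {
    return transduceArray(xf, reducer, init, source);
  } else if (source != null && typeof source[Symbol.asyncIterator] === 'function') {
    return transduceAsyncIterator(xf, reducer, init, source); // returns a promise
  } else if (source != null && typeof source[Symbol.iterator] === 'function') {
    return transduceIterator(xf, reducer, init, source);
  }
  throw new TypeError('no transduce implementation for this source');
}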

Could anyone share their experience on this topic?

jaawerth commented 4 years ago

@mzpkjs Async iteration was actually added to the ES Spec in 2018 along with async generators, and is already supported on a number of platforms, including modern browsers and Node >= 12.x! The tldr is that the .next() method on an async iterator returns a promise that resolves to { value, done }.
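
A minimal illustration of that protocol:

async function* letters() { yield 'a'; yield 'b'; }
const it = letters();
it.next().then(console.log); // { value: 'a', done: false }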

In those newer Node releases, there's also Readable.from, which accepts an iterable OR an async iterable and returns a readable stream, and Readable.prototype[Symbol.asyncIterator], which lets you iterate a stream with a for await...of loop in an async function, or anything else that works with the protocol.

It should be possible to transduce in terms of these when appropriate... definitely worth looking into!

pjeby commented 4 years ago

Nice to know about; it sounds like the new Readable partially obsoletes yieldable-streams, which was originally written for node 0.10 (not to be confused with node 10 :wink:).

jaawerth commented 4 years ago

Yep! It's the native JS interop glue that's been missing from streams. You can do things like:

const { Readable, PassThrough } = require('stream');

const sleep = (ms) => new Promise(resolve => setTimeout(resolve, ms));
async function* delayedRange(delay, start = 1, end = Infinity) {
  for (let i = start; i <= end; i++) {
    // just to show you can both await and yield from an async generator
    await sleep(delay);
    yield i;
  }
}

// create stream from iterable or async iterable
const src1 = Readable.from(delayedRange(1000, 1, 5));
// pipe to another stream
const src2 = new PassThrough({ objectMode: true });
src1.pipe(src2);

// iterate a readable stream from an async function with for..await
(async () => {
  for await (const chunk of src2) {
    console.log(chunk);
  }
})();

The beauty is that a transduce function that operates on async iterables could easily be plugged into a streams interface via either an adapter or native support, without worrying about platform-specific implementation details (e.g. Node streams vs. the new DOM Streams API).
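
For example, a rough sketch of such a transduce over async iterables (asyncTransduce and arrayReducer are hypothetical names, not existing APIs; xf stands for any transducer built with compose/map/filter, and the reducer follows the @@transducer protocol):

// Hypothetical: reduce any async iterable through a transducer.
async function asyncTransduce(xf, reducer, init, asyncIterable) {
  const xform = xf(reducer);
  let acc = init;
  for await (const value of asyncIterable) {
    acc = xform['@@transducer/step'](acc, value);
    // a complete version would also check for a reduced (early-terminated)
    // value and break out of the loop here
  }
  return xform['@@transducer/result'](acc);
}

// e.g. collecting values from a readable stream like src2 above into an array:
const arrayReducer = {
  '@@transducer/init': () => [],
  '@@transducer/step': (acc, v) => { acc.push(v); return acc; },
  '@@transducer/result': (acc) => acc
};
// asyncTransduce(xf, arrayReducer, [], src2).then(console.log);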