loveencounterflow opened this issue 10 years ago
It sounds like you might want something like this: https://www.npmjs.org/package/map-stream
Also, you could build your own with through. The simplest way to do this with through is to keep a count of how many chunks are in flight, and don't end the stream while some of them are still unfinished. This won't guarantee they come out in order, but they will come out (unless someone calls their callback twice, but you can detect that with a similar strategy).
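Roughly, a sketch of that counting approach with through could look like this (`asyncWork` is just a placeholder for whatever async thing you do per chunk):

```js
var through = require('through');

// keep a counter of chunks that are still being worked on, and only
// end the stream once the upstream `end` has fired AND the counter is zero
function asyncMap (asyncWork) {
  var pending = 0;
  var ended   = false;

  return through(function write (chunk) {
    var self = this;
    pending++;
    asyncWork(chunk, function (err, result) {
      if (err) return self.emit('error', err);
      self.queue(result);                      // results may come out in any order
      if (--pending === 0 && ended) self.queue(null);
    });
  }, function end () {
    ended = true;
    if (pending === 0) this.queue(null);       // nothing in flight, end right away
  });
}
```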
Alternatively, use through2. It's the same idea as through but built on streams 2; the only difference is that the data/end handlers take a callback that you invoke when they are done. You can create this effect using pause and resume, but that requires the upstream stream to be well behaved (which through streams are by default, but not all classic streams are). To be honest, through2 is simpler for this. It will also ensure that the data comes out in order, which may be what you want.
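For completeness, the through2 version is shorter, because the callback itself signals that the chunk is done (`lookup` is again just a placeholder for your async work):

```js
var through2 = require('through2');

// the next chunk is only handed to the transform after `callback()` has
// been invoked, so chunks are processed one at a time and stay in order
function serialMap (lookup) {
  return through2.obj(function (chunk, enc, callback) {
    var self = this;
    lookup(chunk, function (err, result) {
      if (err) return callback(err);
      self.push(result);
      callback();
    });
  });
}
```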
The first approach will process chunks in parallel, the second serially.
thanks a lot for your recommendations, which mesh well with what i've found out so far. i've already dabbled with `through2`, which looks very interesting to me.
on a related note, i've just found out that an ordinary `fs` readstream gets terminated prematurely when all i do is peek into the data using a function built with event-stream's `map`; this was fixed when i replaced the `map`-based solution with one using event-stream's `through`. not sure why there should be this difference, but it would be great if such pitfalls, if they result from a design decision, could be mentioned in the docs. right now i'm suspecting `map` may be the main / only culprit in my pipes.
i have the following problem: i read keys from a (level) DB; for each key i see, i want to open another readstream, look for more data related to that key, and write it back into the stream. my data collection is rather big, so there are certainly many chunks of raw bytes read and written before one run is finished. this tends to hide the obnoxious fact that, sometimes, some pieces of data appear to get lost in the process.
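in pseudo-code, what i'm trying to achieve is something like this (sketched with `through2` as suggested above; `relatedKeyFor`, the range option, and `destination` are made up for illustration, my real code lives in Pipedreams):

```js
var through2 = require('through2');

// for every key from the primary stream, open a secondary readstream,
// collect everything related to that key, and only then pass it on
db.createKeyStream()
  .pipe(through2.obj(function (key, enc, callback) {
    var self    = this;
    var related = [];
    db.createReadStream({ gte: relatedKeyFor(key) })   // hypothetical range helper
      .on('data',  function (entry) { related.push(entry); })
      .on('error', callback)
      .on('end',   function () {
        self.push({ key: key, related: related });
        callback();                                    // done with this key
      });
  }))
  .pipe(destination);
```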
i've written a library called Pipedreams to help me abstract away the dirty details of stream handling; it mainly relies on `through` to get its stuff done. i've been building up and using that library for a couple of months now, and since i knew even less about streams back when i started, it surely contains bugs (when you compare the code of individual functions in that library, you can see the different strata of comprehension / skill i had at my disposal when writing each one; time for an overhaul, i guess). i have illustrated my specific use case over at https://gist.github.com/loveencounterflow/65fd8ec711cf78950aa0; i'm using short files there instead of levelDB, but the problem remains the same as far as i can see.
as i see it, the problem seems to be that when i open a readstream and pipe it through transformers, everything goes fine as long as the transformers are all synchronous. but when one transformer has to do something asynchronous, like opening a file to read lines, then its results may arrive late at the pipe's destination stream. it is as if the end or close event of the source stream is passed straight through to the destination, which then closes too, leaving some data stranded. the gist's output clearly shows that all processing is indeed performed, but it never reaches its destination. i suspect that if the sample files were much longer (so they'd cause multiple chunks to be emitted), the earlier processing results would indeed show up at the destination.
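to illustrate what i believe is happening (a guess, sketched with `through`; `transform`, `source` and `destination` are stand-ins for my real code):

```js
var through = require('through');

// the write handler queues its result asynchronously; by the time the
// deferred callback runs, the upstream `end` may already have propagated,
// so the result may never reach the destination
var leaky = through(function write (line) {
  var self = this;
  setImmediate(function () {          // stands in for opening another file etc.
    self.queue(transform(line));      // possibly too late: stream may have ended
  });
});

source.pipe(leaky).pipe(destination);
```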
we all know that Node won't exit by itself as long as there's still something on the event loop, so you can use `setTimeout( fn, 1e6 )` to force the process not to terminate. i tried `setImmediate` to make data passing 'more asynchronous' (you see i'm guessing here; what does it really mean that `through` is 'synchronous'? does that mean it is inherently unsuited for asynchronous tasks? do i have to use `ES.readable` instead?), and indeed with that trick i managed to get some (but crucially not all) data items through.

from my intense dabbling with NodeJS streams for the better part of this year, i'd say that when i open a readstream and call `s.pause()` on it (say from within a piped transformer), then that should make NodeJS hang indefinitely until i call `s.resume()`; otherwise i haven't paused at all. that seemingly does not work; also, you have to be careful where to put the `pause` call to avoid getting an `unable to get back to old streams mode at this point` error.

i should be very grateful if some knowledgeable people could fill me in on my doubts, misunderstandings, and lacunae. streams are great!