datopian / datapipes

Data Pipes for CSV
https://datapipes.okfnlabs.org/
MIT License

Use event-stream #110

Open rufuspollock opened 10 years ago

rufuspollock commented 10 years ago

switch to using @dominictarr's event-stream (https://github.com/dominictarr/event-stream) much more heavily

dominictarr commented 10 years ago

I wrote event-stream before @substack convinced everyone that utility libraries are worse than using finer-grained modules.

I suggest using through and map-stream instead, plus other stuff that comes up in npm search stream.
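
For instance, a minimal sketch of the two together (the uppercase and delay transforms are just placeholders):

var through = require('through')
var map = require('map-stream')

//through: synchronous transform; queue() emits downstream
var upper = through(function (data) {
  this.queue(String(data).toUpperCase())
})

//map-stream: async transform, one callback per chunk
var delay = map(function (data, callback) {
  setTimeout(function () { callback(null, data) }, 10)
})

process.stdin.pipe(upper).pipe(delay).pipe(process.stdout)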

rufuspollock commented 10 years ago

@dominictarr good advice.

BTW, how does one indicate in map-stream that one has read enough of the input? E.g. imagine I wanted to implement a map that does stuff with the first 50 rows and then does not want any more input. Does one throw an exception? (I could ask this more generally about through - what event should I trigger to say "hey, I don't want the readable stream part any more"?)

dominictarr commented 10 years ago

ah, you call destroy() on the source stream.
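
A sketch of that pattern with classic streams (data.csv and the 50-chunk cutoff are just placeholders):

var fs = require('fs')
var map = require('map-stream')

var source = fs.createReadStream('data.csv')
var count = 0

source.pipe(map(function (chunk, callback) {
  callback(null, chunk)
  if (++count >= 50) source.destroy() //stop the source once we have enough
}))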

rufuspollock commented 10 years ago

@dominictarr that's what I thought, but I couldn't spot destroy in v0.10 streams (have I missed it?). If v0.10 does not have destroy, what should you do instead?

dominictarr commented 10 years ago

hmm, oh, I think 0.10 streams have unpipe?

rufuspollock commented 10 years ago

@dominictarr sorry for the lag here - this has been super-useful and we are most definitely going to use those libraries (btw: huge props for your work here!).

Just wanted to confirm: is unpipe the equivalent of destroy?

The basic issue for us here is: we want to allow operations on streaming data. Imagine I want to do the following:
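
Something like this sketch - request, csv() and head() here are assumed stage names, not actual datapipes code:

//hypothetical pipeline: fetch a big CSV over HTTP, parse it,
//and keep only the first 10 rows
request('http://example.com/big.csv')
  .pipe(csv())
  .pipe(head(10))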

Now it is quite likely we may only need to read the first few MBs (or less) of this big file to get the 10 items that head needs. So the head operation (the last item in the chain), once it has its 10 items, needs to tell the previous item in the pipeline to stop sending data, that in turn needs to tell the csv parser, and that in turn tells the request to terminate.

How can we do that? Previously one called destroy, but is it now unpipe?

rufuspollock commented 10 years ago

Note: we probably also need to use "Object Mode" on our streams: http://nodejs.org/api/stream.html#stream_object_mode
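
For reference, a minimal object-mode transform under streams2 (the pass-through body is a placeholder):

var Transform = require('stream').Transform

var rows = new Transform({objectMode: true}) //chunks are arbitrary JS values
rows._transform = function (row, encoding, callback) {
  callback(null, row) //pass each row object through unchanged
}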

dominictarr commented 10 years ago

Oh, sorry - unpipe is not the same as destroy.

node streams do not support this use-case very well, unfortunately. Basically, what you need is for the error or end case to be propagated up the stream: if the consuming stream can no longer accept input - either because it only wanted the beginning, or because there has been an error - then you need to tell the source to quit.

With node streams you need to hook this up manually, by doing something like:

//when the consumer is done, propagate that back to the source
dest.on('end', function () {
  source.destroy() //readable sources have destroy() rather than end()
})

Although, I wish there didn't need to be so many types of streams. When I need this sort of functionality I use https://github.com/dominictarr/pull-stream - it's a much simpler stream pattern that supports backpressure and propagates errors/aborts, and it works great for object streams. When I need to interface pull-streams with node streams I use https://github.com/dominictarr/stream-to-pull-stream or https://github.com/dominictarr/pull-stream-to-stream
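
For the head-style case above, a rough sketch with pull-stream plus stream-to-pull-stream (big.csv is a placeholder; take/collect are pull-stream built-ins):

var fs = require('fs')
var pull = require('pull-stream')
var toPull = require('stream-to-pull-stream')

pull(
  toPull.source(fs.createReadStream('big.csv')), //node readable -> pull source
  pull.take(10),                                 //abort propagates to the source
  pull.collect(function (err, chunks) {          //gather the first 10 chunks
    if (err) throw err
    console.log(chunks.length)
  })
)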

also, you can do some other neat stuff - for example, you can define a transform stream out of other transform streams, like this:

var pull = require('pull-stream')

//combine 3 transformations into a single transform
var combinedTransformation = pull(decode, map, encode)
//then pipe through that combined transformation
pull(source, combinedTransformation, sink)

You can't really do this in a simple way with node streams... there is https://github.com/dominictarr/stream-combiner/ and https://github.com/naomik/bun but those modules are a little complex.
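
For comparison, a stream-combiner sketch with the same placeholder transforms as above:

var Combine = require('stream-combiner')

//wrap several transforms into one duplex stream;
//errors from any stage re-emit on the combined stream
var combined = Combine(decode, map, encode)
source.pipe(combined).pipe(sink)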

hopefully the future platform that comes along and makes node.js look like ruby will have streams that work well as both data streams and object streams.

dominictarr commented 10 years ago

most core stream2 streams will still have destroy, otherwise they wouldn't be backwards compatible. Here is fs.ReadStream#destroy: https://github.com/joyent/node/blob/master/lib/fs.js#L1542
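
So for file sources the classic pattern still works; a sketch, with data.csv as a placeholder:

var fs = require('fs')

var rs = fs.createReadStream('data.csv')
rs.once('data', function () {
  rs.destroy() //fs.ReadStream keeps destroy() under streams2
})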