Open flesler opened 6 years ago
@flesler There was some discussion of this on the other thread, but the core issue is that PG's connection is an event emitter rather than a readable.
At its heart, though, lies a socket, which does offer a Readable interface with backpressure. I don't see why pg couldn't be refactored into a stream instead, but you would need to change things all the way up. (Not that I'm offering to do this work 😄...)
If I'm reading the code right, once we get to the data fetching part of the query starting here, it's basically a processing chain with a switch:
The main difficulty would be in the demuxing of events that the connection does. I imagine you could maintain the event-emitter interface with all the events and additionally provide a Readable that emits rows. Then in the connection you route all non-row messages straight to the event emitter (no backpressure) and you keep all the row events in the stream. You would then conceptually do something like:
socket
  .pipe(packetReaderTransform())
  .pipe(connection.rowTransform())
  .pipe(cursor.recordRow()) // Sidebar: not sure I understand why _rows is needed
  .pipe(myTransformer())
So then backpressure would flow from myTransformer all the way to connection. connection could eagerly process non-row messages until it hits a row and stalls, at which point backpressure would flow up and into the socket.
The nice thing about this setup is that you could maintain your OOB error handling through the event emitter the way it's currently done.
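To make the demuxing idea a bit more concrete, here is a rough, self-contained sketch using only Node's built-in stream and events modules. It is not pg's actual code or API: rowDemux, fakeMessages, and the { type, fields } message shape are all made up for illustration, and a real implementation would sit behind the socket and the packet parser. Rows continue down the pipe and are subject to backpressure from the slow consumer, while everything else is routed to a plain event emitter.

```javascript
const { EventEmitter } = require('node:events');
const { Readable, Transform, Writable } = require('node:stream');
const { pipeline } = require('node:stream/promises');

// Hypothetical demuxer (not part of pg): row messages are pushed downstream,
// every other message is emitted on the side-channel emitter and dropped
// from the stream, so it never waits on the consumer.
function rowDemux(events) {
  return new Transform({
    objectMode: true,
    transform(msg, _encoding, callback) {
      if (msg.type === 'row') {
        callback(null, msg.fields); // stays in the stream
      } else {
        events.emit(msg.type, msg); // out-of-band, no backpressure
        callback();
      }
    },
  });
}

// Stand-in for the parsed socket output. The message shape is an assumption.
function fakeMessages() {
  return Readable.from([
    { type: 'notice', text: 'hello' },
    { type: 'row', fields: [1, 'a'] },
    { type: 'row', fields: [2, 'b'] },
    { type: 'commandComplete', text: 'SELECT 2' },
  ]);
}

async function demo() {
  const events = new EventEmitter();
  const rows = [];
  const sideChannel = [];
  events.on('notice', (m) => sideChannel.push(m.type));
  events.on('commandComplete', (m) => sideChannel.push(m.type));

  // Slow consumer with a tiny buffer: while it is busy, the transform stops
  // being fed, and that pressure propagates back to the source.
  const slowSink = new Writable({
    objectMode: true,
    highWaterMark: 1,
    write(row, _encoding, callback) {
      rows.push(row);
      setImmediate(callback);
    },
  });

  await pipeline(fakeMessages(), rowDemux(events), slowSink);
  return { rows, sideChannel };
}
```

One caveat with this sketch: a non-row message queued behind a row still waits until that row has been consumed, which matches the "process eagerly until it hits a row and stalls" behaviour described above.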
So, yeah, somebody do this...
I would like to stream a large number of rows from a database without the stream being powered behind the scenes by multiple queries through a cursor. Currently, the cursor-based approach is about 5 times slower than just loading 100K rows and processing them once they're ready.
The pg library already emits a 'row' event for each row. In fact, if there's no need for the whole set of rows at the end, it won't even buffer them in memory (which is a big plus).
Initially reported at: https://github.com/tgriesser/knex/issues/2535
Thanks