azuqua / cassanknex

A CQL query builder written in the spirit of Knexjs
MIT License
50 stars 10 forks source link

How can I handle streams backpressure? #71

Closed idandagan1 closed 3 years ago

idandagan1 commented 4 years ago

Hi, Let's say I have an express app and I'd like to use streams.

for example:

const express = require('express');
const app = express();

app.use((req, res, next) => {
  client.stream('SELECT time, val FROM temperature WHERE station_id=', [ 'abc' ])
    .on('readable', () => {
      let row;
      while (row = this.read()) {
         res.write(row); // THIS SHOULD BE PIPED
      }
    })
    .on('end', function () {})
    .on('error', function (err) {});
})

So in Node.js v10.0.0, they added support for pipeline which uses the benefits of pipe for automatic pause/resume logic - but also forwarding errors and properly cleaning up streams if needed.

How can I handle backpressure?

UnbounDev commented 4 years ago

@idandagan1 that's a good question... the cassandra-client docs only list examples similar to your own, but the stream code does return a ReadableStream (actually a custom type that inherits from stream.Readable), so you'd think that it should be possible to include that type in a nodejs pipeline.

All that being said, there's some interesting logic going on w/ that custom type and I'm not certain if that would in some way inhibit your use case.

I would suggest two possible directions for the case:

  1. Ask your question on the https://github.com/jorgebay/node-cassandra-cql repo and see if someone has figured this out for express+pipelines+cassandra-client.
  2. As a work around, it may be possible to create a helper function that wraps the cassandra-client stream method, creating and returning a stream for use in your express response pipeline (I've tended to use highland as a utility for this type of thing).
idandagan1 commented 3 years ago

I eventually used directly cassandra client:

  const cassanknex = require('cassanknex');
  const cassandraClient = cassanknex.getClient();

  cassandraClient.stream('SELECT * FROM table', { prepare: true });