mariadb-corporation / mariadb-connector-nodejs

MariaDB Connector/Node.js is used to connect applications developed on Node.js to MariaDB and MySQL databases. MariaDB Connector/Node.js is LGPL licensed (GNU Lesser General Public License v2.1).

queryStream does not handle backpressure #161

Closed · akesser closed this issue 3 years ago

akesser commented 3 years ago

I use queryStream to load many rows (>500,000) from a table, in combination with stream.pipeline, to create packages of variable size. The packages are then handled slowly (simulated with a two-second timeout). The two streams used in stream.pipeline (transform and write) respect backpressure; queryStream does not seem to.
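For context, here is a minimal sketch of that setup (the batch size, table name, and stream bodies are illustrative, not my actual code):

const stream = require('stream');

// Transform that batches incoming rows into packages; it respects
// backpressure because it only emits via push() and signals readiness
// through cb().
const batcher = new stream.Transform({
  objectMode: true,
  transform(row, _enc, cb) {
    this.rows = this.rows || [];
    this.rows.push(row);
    if (this.rows.length >= 1000) { // illustrative package size
      this.push(this.rows);
      this.rows = [];
    }
    cb();
  },
  flush(cb) {
    if (this.rows && this.rows.length) this.push(this.rows);
    cb();
  }
});

// Writable that handles each package slowly; delaying cb() is what
// propagates backpressure upstream.
const slowWriter = new stream.Writable({
  objectMode: true,
  write(_batch, _enc, cb) {
    setTimeout(cb, 2000); // two-second delay per package
  }
});

// conn is assumed to be an open mariadb connection
stream.pipeline(conn.queryStream('SELECT * FROM big_table'), batcher, slowWriter, (err) => {
  if (err) console.error(err);
});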

Since commit https://github.com/mariadb-corporation/mariadb-connector-nodejs/commit/ddb208003cd942b3551cc0cd7cd28df29163db3e was merged, handleNewRows(row) in stream.js does an unconditional push to the underlying Readable stream that is returned to the user:

handleNewRows(row) {
  this.inStream.push(row);
}

If I undo the changes from the commit mentioned above and add one line, backpressure is handled correctly:

class Stream extends Query {
  constructor(cmdOpts, connOpts, sql, values, socket) {
    super(
      () => {},
      () => {},
      cmdOpts,
      connOpts,
      sql,
      values
    );
    this.socket = socket;
    this.inStream = new Readable({
      objectMode: true,
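      // read() fires when the consumer wants more rows; resuming the socket lets data flow again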
      read: () => {
        this.socket.resume();
      }
    });

    this.on('fields', function (meta) {
      this.inStream.emit('fields', meta);
    });

    this.on('error', function (err) {
      this.inStream.emit('error', err);
    });

    this.on('end', function (err) {
      if (err) this.inStream.emit('error', err);
      this.socket.resume(); // <- This line was added by me otherwise the connection is not closed
      this.inStream.push(null);
    });
  }

  handleNewRows(row) {
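    // push() returns false when the Readable's internal buffer is full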
    if (!this.inStream.push(row)) {
      this.socket.pause();
    }
  }
}

The following image shows the different sizes of process.memoryUsage().rss for the two implementations: [screenshot: RSS memory usage comparison of the two implementations]

Additionally, if I understand it correctly, because the data from the net socket is handled via the 'data' event, the amount and rate of data read can only be controlled through socket.pause() and socket.resume(). Is the system able to apply backpressure over the TCP connection in this case?
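A rough sketch of the mechanism in question (slowConsumer is a hypothetical handler): while a socket is paused, 'data' events stop, unread bytes accumulate in the kernel receive buffer, the advertised TCP window shrinks, and the server's writes eventually block, so backpressure can propagate over the connection for as long as the socket stays paused.

const net = require('net');

const socket = net.connect(3306, 'db.example.com'); // illustrative endpoint

socket.on('data', (chunk) => {
  socket.pause(); // stop 'data' events; the TCP window closes as kernel buffers fill
  slowConsumer(chunk).then(() => socket.resume()); // resume once the consumer has drained
});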

Thanks for your help.

rusher commented 3 years ago

I'll need to dig a little on that.

On one hand, the previous implementation (the one shown in green) handles backpressure well, but when pipelining results, any error during result handling makes the connection stall indefinitely.

On the other hand, the current implementation may use more memory if client-side result handling takes longer than the database needs to return data, but the connection stays healthy.

There might be a solution that avoids leaving the connection hanging.

rusher commented 3 years ago

For the moment, I tend to think that providing a method to close the stream would be the best solution.

Like:

const queryStream = conn.queryStream('SELECT * FROM seq_1_to_10000');

stream.pipeline(queryStream, transformStream, someWriterStream, (err) => {
  if (err) queryStream.close(); // <= call this to ensure the connection does not stall
  ...
});

rusher commented 3 years ago

The initial choice was to keep the connection in a good state whatever the situation, but that works against the point of streaming, which is to avoid loading everything into memory.

The correction will adopt an implementation like the one you describe, and will provide a method to close the query stream when an error occurs, so that the connection is handled properly. The documentation will make this use case explicit.
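A possible shape of that fix (a sketch only, not the actual 3.0.1 code): pause the socket when the Readable's buffer fills, resume it from read(), and expose a close() method so the connection can recover after a pipeline error.

handleNewRows(row) {
  // sketch: honor backpressure instead of force-pushing
  if (!this.inStream.push(row)) this.socket.pause();
}

close() {
  // sketch: let remaining packets drain so the command can complete,
  // then end the user-facing Readable
  this.socket.resume();
  this.inStream.push(null);
}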

akesser commented 3 years ago

I'm not sure whether stream.pipeline propagates errors to the other streams, but if it does, could a listener in the Stream extends Query class help inform the socket or the queryStream about that event?

Something like:

this.inStream.on('error', (err) => {
  this.emit('error', err); // or some other way to propagate the event
});
rusher commented 3 years ago

The correction will be released in 3.0.1-RC.

If any error occurs while using the result-set stream, the query stream will have to be closed. The previous example:

const queryStream = connection.queryStream("SELECT * FROM mysql.user");
stream.pipeline(queryStream, transformStream, someWriterStream);

is now

const queryStream = connection.queryStream("SELECT * FROM mysql.user");
stream.pipeline(queryStream, transformStream, someWriterStream, (err) => { queryStream.close(); });

There is no way to ensure that automagically, so the documentation explicitly explains this, permitting real streaming.
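For completeness, a fuller usage sketch built on the example above (transformStream and someWriterStream stand in for application streams; the connection options are illustrative):

const stream = require('stream');
const mariadb = require('mariadb');

async function run() {
  const connection = await mariadb.createConnection({ host: 'localhost', user: 'myUser' }); // illustrative options
  const queryStream = connection.queryStream('SELECT * FROM mysql.user');
  stream.pipeline(queryStream, transformStream, someWriterStream, (err) => {
    queryStream.close(); // ensure the connection does not stall if any stage failed
    connection.end();
  });
}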