juanluispaz / ts-sql-query

Type-safe SQL query builder like QueryDSL or JOOQ in Java or Linq in .Net for TypeScript with MariaDB, MySql, Oracle, PostgreSql, Sqlite and SqlServer support.
https://ts-sql-query.readthedocs.io/
MIT License

ReadStream from executeSelect #96

Open Smert opened 1 year ago

Smert commented 1 year ago

Hi! I want to export data from MySQL to large csv/json files. I can't store all exported records in RAM and want to use streaming.

It would be convenient to have an executeSelectStream method, or an executeSelectIterator method that returns AsyncIterable<T>.

juanluispaz commented 1 year ago

This is a very interesting feature that I would like to add to ts-sql-query; I was waiting for someone to request it. I will need to investigate this around all the databases and connectors, but I'm aware not all connectors support it.

Can you prepare a small example of what this would look like using just the MySql connector (without ts-sql-query)? Put the query, using the connector, in one function, and write another function that calls the first one and consumes the data.

Smert commented 1 year ago

Readable doesn't carry a record type:

function getPostsStream(): Readable {
  return connection.query('SELECT * FROM posts').stream();
}

getPostsStream()
  .on('data', post => {
    console.log(post);
  });

AsyncIterable has a record type, but there are no destroy/abort/close methods:

function getPostsAsyncIterable(): AsyncIterable<PostRecord> {
  return connection.query('SELECT * FROM posts').stream();
}

for await (const post of getPostsAsyncIterable()) {
  console.log(post);
}

// It's possible to create a stream from AsyncIterable
Readable.from(getPostsAsyncIterable())
  .on('data', post => {
    console.log(post);
  });

juanluispaz commented 1 year ago

Hi!

I've been reviewing this in detail, looking at how to move forward across all supported databases (MariaDB, MySql, Oracle, PostgreSql, Sqlite and SqlServer). The connectors fall into three groups: those that can support this feature (mariadb, mssql, mysql2, oracledb, pg, postgres, mysql, msnodesqlv8, tedious); those that can support it but require more investigation into how to get there (better-sqlite3, sqlite3, sqlite); and those where it is not supported (any-db, LoopBack, prisma).

To support this, I'm thinking of using AsyncIterable. An AsyncIterable can still be destroyed/aborted/closed through the return method of its iterator (more info, detailed example). If the return method exists, for await calls it automatically when the loop is exited early, so it can perform any clean-up.
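As a rough illustration of that clean-up path, here is a minimal sketch. FakeCursor, streamRows and firstTwo are hypothetical names invented for this example; they are not part of ts-sql-query or of any connector. The sketch relies only on standard behaviour: when a for await loop is left with break, the iterator's return method is called, which runs the finally block of an async generator.

```typescript
// Hypothetical stand-in for a database cursor; not a real connector API.
class FakeCursor<T> {
  closed = false;
  private index = 0;
  private readonly rows: readonly T[];

  constructor(rows: readonly T[]) {
    this.rows = rows;
  }

  // Resolves to undefined when no rows remain.
  async next(): Promise<T | undefined> {
    return this.index < this.rows.length ? this.rows[this.index++] : undefined;
  }

  // In a real connector this would release the connection.
  async close(): Promise<void> {
    this.closed = true;
  }
}

// The finally block runs when the rows are exhausted and also when the
// consumer leaves the loop early (for await then calls return()).
async function* streamRows<T>(cursor: FakeCursor<T>): AsyncGenerator<T, void, undefined> {
  try {
    for (let row = await cursor.next(); row !== undefined; row = await cursor.next()) {
      yield row;
    }
  } finally {
    await cursor.close();
  }
}

// Consumes only two rows, then breaks out of the loop.
async function firstTwo<T>(cursor: FakeCursor<T>): Promise<T[]> {
  const result: T[] = [];
  for await (const row of streamRows(cursor)) {
    result.push(row);
    if (result.length === 2) break; // triggers return() -> finally -> close()
  }
  return result;
}
```

With this shape the consumer never has to call a close method explicitly; leaving the loop is enough to free the cursor.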

I prefer the AsyncIterable interface over Readable because AsyncIterable is a simpler, standard type that is easier to reason about in the interceptors. Readable, meanwhile, is a moving target that changes between node versions (and is not the same on the web), and not all database connectors support it by default, so it would need to be implemented or emulated in some way.

The new methods will be:

Both methods accept an optional argument that specifies the buffer size, that is, the number of elements loaded into memory (but not yet consumed by the iterator) before the connection is paused, so as to avoid high memory pressure on the server.
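To make the buffer-size idea concrete, here is a minimal sketch of how an event-based source could be exposed as an AsyncIterable that pauses the source once the buffer is full. RowSource, bufferedRows and FakeSource are hypothetical names made up for this example; real connectors expose different events and pause/resume calls, and cancellation and error handling are left out to keep it short.

```typescript
// Hypothetical event-style row source; real connectors expose different APIs.
interface RowSource<T> {
  onRow(listener: (row: T) => void): void;
  onEnd(listener: () => void): void;
  pause(): void;
  resume(): void;
}

// Exposes the source as an AsyncIterable that keeps at most `bufferSize`
// unconsumed rows in memory by pausing the source when the buffer is full.
async function* bufferedRows<T>(
  source: RowSource<T>,
  bufferSize: number
): AsyncGenerator<T, void, undefined> {
  const buffer: T[] = [];
  let ended = false;
  let paused = false;
  let wake: (() => void) | undefined;

  source.onRow(row => {
    buffer.push(row);
    if (buffer.length >= bufferSize && !paused) {
      paused = true;
      source.pause(); // buffer is full: stop reading rows
    }
    wake?.();
  });
  source.onEnd(() => {
    ended = true;
    wake?.();
  });
  source.resume(); // start the flow once the listeners are registered

  while (true) {
    if (buffer.length > 0) {
      yield buffer.shift() as T;
      if (paused && buffer.length < bufferSize) {
        paused = false;
        source.resume(); // there is room again: let the source continue
      }
    } else if (ended) {
      return;
    } else {
      // Nothing buffered yet: wait for the next row or the end event.
      await new Promise<void>(resolve => { wake = resolve; });
      wake = undefined;
    }
  }
}

// In-memory source used only to exercise the sketch.
class FakeSource<T> implements RowSource<T> {
  emittedCount = 0;
  private paused = true;
  private ended = false;
  private rowListener: (row: T) => void = () => {};
  private endListener: () => void = () => {};
  private readonly rows: readonly T[];

  constructor(rows: readonly T[]) {
    this.rows = rows;
  }

  onRow(listener: (row: T) => void): void { this.rowListener = listener; }
  onEnd(listener: () => void): void { this.endListener = listener; }
  pause(): void { this.paused = true; }

  resume(): void {
    this.paused = false;
    // Emit synchronously until paused or exhausted.
    while (!this.paused && this.emittedCount < this.rows.length) {
      this.rowListener(this.rows[this.emittedCount++]);
    }
    if (this.emittedCount === this.rows.length && !this.ended) {
      this.ended = true;
      this.endListener();
    }
  }
}
```

In this sketch the consumer drives the pace: the source is resumed only after a row has been taken out of the buffer, so a slow consumer keeps memory use bounded by the buffer size.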

For connectors that don't support this feature, I'm not sure what to do. One option is to fall back to executeSelectMany but return the result as an AsyncIterable, as requested; the other is to throw an Error. Maybe the best option is to throw an error by default but allow enabling a flag that falls back to executeSelectMany.
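A minimal sketch of the "throw by default, opt in to the fallback" idea could look like the following. SelectQuery, StreamOptions, allowFallbackToSelectMany and selectStreamWithFallback are hypothetical names chosen for this example; nothing here is the actual ts-sql-query API.

```typescript
// Hypothetical query shape, only for illustration; not the ts-sql-query API.
interface SelectQuery<T> {
  executeSelectMany(): Promise<T[]>;
  // Present only when the underlying connector can stream rows.
  streamRows?: () => AsyncIterable<T>;
}

interface StreamOptions {
  // Hypothetical flag: opt in to loading every row in memory.
  allowFallbackToSelectMany?: boolean;
}

// Exposes an already loaded array through the AsyncIterable interface.
async function* arrayAsAsyncIterable<T>(rows: readonly T[]): AsyncGenerator<T, void, undefined> {
  for (const row of rows) {
    yield row;
  }
}

async function selectStreamWithFallback<T>(
  query: SelectQuery<T>,
  options: StreamOptions = {}
): Promise<AsyncIterable<T>> {
  if (query.streamRows) {
    return query.streamRows(); // real streaming is available
  }
  if (!options.allowFallbackToSelectMany) {
    throw new Error(
      "Streaming is not supported by this connector; " +
      "enable allowFallbackToSelectMany to load all rows in memory instead"
    );
  }
  // Explicitly requested fallback: everything is loaded before iterating.
  return arrayAsAsyncIterable(await query.executeSelectMany());
}
```

The caller's loop is the same in both cases; only the memory behaviour differs, which is why the fallback is opt-in in this sketch.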

Be aware that while the AsyncIterable is not closed (the return method has not been called), the connection stays busy, which means that other queries, or the end of a transaction, cannot be executed.

What do you think about this?

juanluispaz commented 1 year ago

Elaborating a little bit more:

The new methods will be:

They can be used in this way:

let companies = await connection
    .selectFrom(tCompany)
    .select({
        id: tCompany.id,
        name: tCompany.name
    })
    .orderBy('id')
    .executeSelectStream()

for await(let company of companies) {
    console.log('company', company)
}

If there is an error executing the query, the executeSelectStream function must be the one that throws it; any error raised inside the for loop must come from fetching the next records (not from the query itself). It is going to be interesting to make all connectors follow this behaviour.

executeSelectStream will return a promise, to allow any initialisation (like getting the connection from an uninitialised pool) and to wait for the query to be validated.
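One way this contract could be approximated on top of a lazy connector is to fetch the first result before handing the iterable to the caller, so that a query error rejects the promise instead of surfacing inside the loop. This is only a sketch under that assumption; openStream and lazyQuery are hypothetical names for this example, not ts-sql-query functions.

```typescript
// Hypothetical lazy source: like some connectors, nothing runs until the
// first next() call, so an invalid query only fails at that point.
async function* lazyQuery(sql: string): AsyncGenerator<{ id: number }, void, undefined> {
  if (!sql.toUpperCase().startsWith("SELECT")) {
    throw new Error(`invalid query: ${sql}`);
  }
  yield { id: 1 };
  yield { id: 2 };
}

// Requests the first result eagerly, so query errors reject the returned
// promise; the remaining rows are still fetched lazily by the consumer.
async function openStream<T>(lazy: AsyncIterable<T>): Promise<AsyncIterable<T>> {
  const iterator = lazy[Symbol.asyncIterator]();
  const first = await iterator.next(); // a query error is thrown here

  async function* rows(): AsyncGenerator<T, void, undefined> {
    try {
      for (let result = first; !result.done; result = await iterator.next()) {
        yield result.value;
      }
    } finally {
      // Forward clean-up to the underlying iterator, if it supports it.
      await iterator.return?.();
    }
  }
  return rows();
}
```

Note that in this sketch the underlying iterator is already open when the promise resolves, so a caller that never iterates the result leaves it open.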

What do you think?

Smert commented 1 year ago
1. I think in general the stream should not be wrapped in a promise. Examples:

    // nodejs fs
    const stream = fs.createReadStream('/dev/input/event0');

    // mysql
    const stream = connection.query('SELECT * FROM posts').stream();

    // nodejs http
    const stream = http.request({
      port: 1337,
      host: '127.0.0.1',
      method: 'CONNECT',
      path: 'www.google.com:80',
    });

2. We can use composition with a helper function instead of executeSelectStreamMultiple:
```typescript
async function * collectAsyncIterable<T>(
  asyncIterable: AsyncIterable<T>,
  chunkSize: number
): AsyncIterable<T[]> {
  let chunk: T[] = [];
  for await (const item of asyncIterable) {
    chunk.push(item);
    if (chunk.length === chunkSize) {
      yield chunk;
      chunk = [];
    }
  }

  if (chunk.length) {
    yield chunk;
  }
}

const companies = await connection
    .selectFrom(tCompany)
    .select({
        id: tCompany.id,
        name: tCompany.name
    })
    .orderBy('id')
    .executeSelectStream();

for await (const chunk of collectAsyncIterable(companies, 1000)) {
    console.log('companies', chunk)
}
```

juanluispaz commented 1 year ago

Hi,

For connectors that only iterate over one row at a time (like MySql), collectAsyncIterable will have no impact; but there are connectors (like postgres or pg) that allow getting chunks of rows at a time, which is more performant. That is why I'm including executeSelectStreamMultiple; if I want to implement that function for mysql, I will need to do something like collectAsyncIterable.

Regarding the stream in a promise, there are several things to take into consideration:

  1. Some connectors, like oracledb, separate the query execution (async) from how the data is then processed (Iterator).
  2. In the case of postgres or mysql (the connector), the execution of the query is deferred until the iterator requests the first value; an error in the SQL is therefore thrown at the first yield.
  3. In the case of oracledb, pg, mssql and tedious, the model is different because it is based on events, which will need to be transformed into an AsyncIterable.
  4. There is async logic that can run before the actual query; for example, if you use a pool query runner for your mysql connection, getting the connection from the pool is asynchronous, and the query has not been executed yet.
  5. There is more async logic at the beginning of a transaction; in ts-sql-query, transactions are not opened in the database until the first time you need them (not at the time you mark the DBConnection as being in a transaction), which avoids holding connections in a complex system when that is not required.

It is possible to think of a way to avoid returning a Promise (at the cost of a lot of complexity in the pool management), but my issue here will be:

One consequence of having the iterator already open shows up if you don't consume the result. That is already a known issue in mssql and oracledb; the MySql/MariaDB connectors don't document it, but judging by their code it can be an issue there too, because the query is already queued.

Regarding your examples:

// nodejs fs
const stream = fs.createReadStream('/dev/input/event0');

That is a specific low-level operation that reads a file as a stream. The analogous situation in ts-sql-query would be: go to the file system; if the file doesn't exist, create it with default content (get the connection from the pool); then run a virus scanner whose software lives on another machine (open the transaction); and only then do you get access to the content of the file.

Not easy; there is too much to take into consideration.

What do you think?

silviaperezh commented 1 year ago

Hello, I have seen your comments and wanted to contribute my humble opinion. I use the library, and in the last few days I had to work on report generation, so the possibility of having a selectStream seems fantastic to me. As for who should report a possible error in the execution of a selectStream: in my opinion it should be the producer (the repository), so that the consumer does not have to worry about it. That also seems more reasonable because it is still an error that occurs in the SQL, so it should be raised by the SQL statement itself. I hope you agree with my opinion. Juan Luis, thanks for your library, it's great. Yours sincerely, Silvia.

arnaudelub commented 1 year ago

Hey, I'd like to share my thoughts on this too: if the error comes from the query, then the consumer should not be the one throwing it, but rather the producer. That makes more sense to me; I agree with @silviaperezh.

> For connectors that don't support this feature, I'm not sure what to do. One option is to fall back to executeSelectMany but return the result as an AsyncIterable, as requested; the other is to throw an Error. Maybe the best option is to throw an error by default but allow enabling a flag that falls back to executeSelectMany.

If you want to avoid useless open issues (we all know that a lot of people just don't read docs), I would throw an error for this one and use the flag to clearly state that you want the fallback to executeSelectMany, thus avoiding misleading behavior.

Thanks for the great job @juanluispaz and the great lib, keep it up!!!!!

Smert commented 1 year ago

Good

Smert commented 2 months ago

Bump