ClickHouse / clickhouse-js

Official JS client for ClickHouse DB
https://clickhouse.com
Apache License 2.0

Receiving [ERR_STREAM_PREMATURE_CLOSE] in the logs when intentionally closing the ResultSet stream before end #263

Closed (spinlud closed this issue 2 months ago)

spinlud commented 2 months ago

Describe the bug

I have a stream generated from a ResultSet of a query. When consuming the stream, I need to end it prematurely if a certain condition is met:

import {pipeline} from 'stream/promises';
import {createWriteStream} from 'fs';
import {createClient} from '@clickhouse/client';

async function* myAsyncGenerator() {
    const clickHouseClient = createClient({
        // ...
    });

    const resultSet = await clickHouseClient.query({        
        query: `select * from my_table limit 10`,
        format: 'JSONCompactStringsEachRowWithNamesAndTypes',
    });

    const stream = resultSet.stream();

    let i = 0;

    for await (const rows of stream) {
        for (const row of rows) {
            i++;
            let jsonRow = await row.json();

            // If a condition is true, I want to stop consuming the stream prematurely
            if (i === 3) {
                return stream.destroy();
            }

            yield jsonRow.join(',') + '\n\n';
        }
    }
}

(async () => {
    const asyncGen = myAsyncGenerator();
    const outputStream = createWriteStream('output.txt');
    await pipeline(asyncGen, outputStream);
})();

This code terminates successfully (exit code 0), but it logs the following [ERR_STREAM_PREMATURE_CLOSE] error to the console:


Expected behaviour

I could be wrong, but this [ERR_STREAM_PREMATURE_CLOSE] error doesn't seem to originate from the stream object returned by the call to the database. I suspect instead that it originates from another stream that is writing into this one (e.g. a stream from the socket that is writing the data?).

In this case I am voluntarily closing the stream before consuming all the data, so I don't want to see this [ERR_STREAM_PREMATURE_CLOSE] logged to the console. It also doesn't seem to be an actual error, just the log of one, because the program terminates with success (exit code 0).

Is there any way to prevent this [ERR_STREAM_PREMATURE_CLOSE] error log from appearing in the console?

Error log

Error [ERR_STREAM_PREMATURE_CLOSE]: Premature close
    at new NodeError (node:internal/errors:405:5)
    at Transform.onclose (node:internal/streams/end-of-stream:159:30)
    at Transform.emit (node:events:529:35)
    at Transform.emit (node:domain:489:12)
    at emitCloseNT (node:internal/streams/destroy:132:10)
    at processTicksAndRejections (node:internal/process/task_queues:81:21) {
  code: 'ERR_STREAM_PREMATURE_CLOSE'
}

Process finished with exit code 0


slvrtrn commented 2 months ago

If the desired logic cannot be implemented via the ClickHouse query itself, you could just use an AbortController.

import { pipeline } from 'stream/promises'
import { createWriteStream } from 'fs'
import { createClient } from '@clickhouse/client'

async function* myAsyncGenerator() {
  const clickHouseClient = createClient({
    // ...
  })

  const abortController = new AbortController()
  const resultSet = await clickHouseClient.query({
    query: `select * from system.numbers limit 10`,
    format: 'JSONCompactStringsEachRowWithNamesAndTypes',
    abort_signal: abortController.signal,
  })

  let i = 0
  for await (const rows of resultSet.stream()) {
    for (const row of rows) {
      i++

      // If a condition is true, I want to stop consuming the stream prematurely
      if (i === 5) {
        console.log('aborting')
        abortController.abort()
        break
      }

      const result = row.json<string[]>().join(',') + '\n\n'
      console.log('Yielding row', result)
      yield result
    }
  }
}
;(async () => {
  const asyncGen = myAsyncGenerator()
  const outputStream = createWriteStream('output.txt')
  await pipeline(asyncGen, outputStream)
})()

console output:

➜  examples git:(main) ✗ ts-node node/test.ts
Yielding row number

Yielding row UInt64

Yielding row 0

Yielding row 1

aborting

The output.txt file contents:

(screenshot of output.txt: the rows yielded before the abort)

NB: You might want to add https://clickhouse.com/docs/en/operations/settings/settings#cancel-http-readonly-queries-on-client-close. Also, see the abort_request example.
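
For reference, a minimal sketch of wiring that setting into the client; this assumes it can be passed via the clickhouse_settings option (the key is the server setting named in the link above):

import { createClient } from '@clickhouse/client'

// Assumption: asking the server to cancel read-only HTTP queries when the
// client closes the connection, so an aborted request does not keep
// running server-side.
const client = createClient({
  clickhouse_settings: {
    cancel_http_readonly_queries_on_client_close: 1,
  },
})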

spinlud commented 2 months ago

Hi, thanks for the help!

I have tried your solution with the AbortController, but I got this error instead:

(screenshot of the error)

Also, I have noticed that you are breaking only the inner loop when aborting, but not the outer one. Is there any specific reason for not breaking the outer loop as well?

slvrtrn commented 2 months ago

You are right. I tried adjusting the async generator approach again, which was not trivial. Maybe something like this will do the trick? Please note the backpressure comment: you'll likely need to handle that yourself with such a manual approach.

import { createWriteStream } from 'fs'
import { createClient, ClickHouseLogLevel } from '@clickhouse/client'

;(async () => {
  process.on('unhandledRejection', (err) => {
    console.error('unhandledRejection:', err)
  })
  process.on('uncaughtException', (err) => {
    console.error('uncaughtException:', err)
  })

  const client = createClient({
    log: {
      // TRACE logging is enabled so the socket lifecycle is visible in the output below
      level: ClickHouseLogLevel.TRACE,
    },
  })

  const abortController = new AbortController()
  const resultSet = await client.query({
    query: `select * from system.numbers limit 10`,
    format: 'JSONCompactStringsEachRowWithNamesAndTypes',
    abort_signal: abortController.signal,
  })

  let i = 0
  const stream = resultSet.stream<string[]>()
  const outputStream = createWriteStream('output.txt')

  await new Promise((resolve, reject) => {
    stream
      .on('data', (rows) => {
        for (const row of rows) {
          if (i++ === 5) {
            console.log('Reached the condition, ending stream...')
            abortController.abort()
            return
          }
          const result = row.json().join(',') + '\n\n'
          console.log('Yielding row', result)
          // important: need to add backpressure handling here
          outputStream.write(result)
        }
      })
      .on('error', (err) => {
        console.error('Error in stream', err)
        reject(err)
      })
      .on('end', () => {
        console.log('End of stream, resolve...')
        resolve(0)
      })
  })

  console.log('Closing client...')
  await client.close()
})()

Prints:

Yielding row number

Yielding row UInt64

Yielding row 0

Yielding row 1

Yielding row 2

Reached the condition, ending stream...
[2024-05-08T14:37:53.701Z][TRACE][@clickhouse/client][Connection] Socket 0ae5c20c-868e-471c-8b2f-43f7a2ffb68e was released
End of stream, resolve...
Closing client...
[2024-05-08T14:37:53.701Z][TRACE][@clickhouse/client][Connection] Socket 0ae5c20c-868e-471c-8b2f-43f7a2ffb68e was closed or ended, 'free' listener removed

This output looks right to me - the socket is properly released on the abort event and is back in the keep-alive pool, and it is only destroyed when we close the client.
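
As a follow-up on the backpressure comment above, here is one possible way to handle it; a minimal sketch, not an official pattern from the client docs: check the return value of outputStream.write() and pause the source stream until the writable emits 'drain'.

import { createWriteStream } from 'fs'
import { createClient } from '@clickhouse/client'

;(async () => {
  const client = createClient({
    // ...
  })

  const resultSet = await client.query({
    query: `select * from system.numbers limit 10000`,
    format: 'JSONCompactStringsEachRowWithNamesAndTypes',
  })

  const stream = resultSet.stream<string[]>()
  const outputStream = createWriteStream('output.txt')

  await new Promise((resolve, reject) => {
    stream
      .on('data', (rows) => {
        for (const row of rows) {
          // write() returns false once the writable's internal buffer is full;
          // pause the source and resume it when the writable drains.
          if (!outputStream.write(row.json().join(',') + '\n')) {
            stream.pause()
            outputStream.once('drain', () => stream.resume())
          }
        }
      })
      .on('error', (err) => reject(err))
      .on('end', () => resolve(0))
  })

  await client.close()
})()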

slvrtrn commented 2 months ago

And with just one socket and a second consecutive request, everything seems OK, too.

import { createWriteStream } from 'fs'
import { createClient } from '@clickhouse/client'
import { ClickHouseLogLevel } from '@clickhouse/client-common'

;(async () => {
  process.on('unhandledRejection', (err) => {
    console.error('unhandledRejection:', err)
  })
  process.on('uncaughtException', (err) => {
    console.error('uncaughtException:', err)
  })

  const client = createClient({
    max_open_connections: 1,
    log: {
      level: ClickHouseLogLevel.TRACE,
    },
  })

  const abortController = new AbortController()
  const resultSet = await client.query({
    query: `select * from system.numbers limit 10`,
    format: 'JSONCompactStringsEachRowWithNamesAndTypes',
    abort_signal: abortController.signal,
  })

  let i = 0
  const stream = resultSet.stream<string[]>()
  const outputStream = createWriteStream('output.txt')

  await new Promise((resolve, reject) => {
    stream
      .on('data', (rows) => {
        for (const row of rows) {
          if (i++ === 5) {
            console.log('Reached the condition, ending stream...')
            abortController.abort()
            return
          }
          const result = row.json().join(',') + '\n\n'
          console.log('Yielding row', result)
          // important: need to add backpressure handling here
          outputStream.write(result)
        }
      })
      .on('error', (err) => {
        console.error('Error in stream', err)
        reject(err)
      })
      .on('end', () => {
        console.log('End of stream, resolve...')
        resolve(0)
      })
  })

  const rs = await client.query({
    query: 'SELECT 1 AS number',
    format: 'JSONEachRow',
  })
  console.log(
    'Verifying that we can query using the same socket one more time...',
    await rs.json(),
  )

  console.log('Closing client...')
  await client.close()
})()

Prints:

Yielding row number

Yielding row UInt64

Yielding row 0

Yielding row 1

Yielding row 2

Reached the condition, ending stream...
[2024-05-08T14:47:24.767Z][TRACE][@clickhouse/client][Connection] Socket 1bbbdc08-9d1d-466e-8248-a2d2c62cffc3 was released
End of stream, resolve...
[2024-05-08T14:47:24.767Z][TRACE][@clickhouse/client][Connection] Reusing socket 1bbbdc08-9d1d-466e-8248-a2d2c62cffc3
...
[2024-05-08T14:47:24.774Z][TRACE][@clickhouse/client][Connection] Socket 1bbbdc08-9d1d-466e-8248-a2d2c62cffc3 was released
Verifying that we can query using the same socket one more time... [ { number: 1 } ]
Closing client...
[2024-05-08T14:47:24.774Z][TRACE][@clickhouse/client][Connection] Socket 1bbbdc08-9d1d-466e-8248-a2d2c62cffc3 was closed or ended, 'free' listener removed

slvrtrn commented 2 months ago

I think I understand what is happening now.

Due to the query being executed in the async generator function body, an unfinished request goes out of scope, and so does the AbortController that is used internally to cancel the request on errors in the Node.js connection source.

That internal AbortController is used to prevent the underlying sockets from getting stuck while dialing an unreachable host, which can happen even after the request has timed out (and that was the only sensible solution I could find to that issue).

A fun fact about the AbortController is that it fires the abort signal when it goes out of scope. This also explains why my stream example does not produce any errors: there is no extra function there, so the query (and the request) does not go out of scope (though I believe that example is still incorrect).

Then, the error is printed here.

I will check if I can make this less annoying and more transparent to the user, so that calling just the ResultSet.close method is enough: the response stream should be properly discarded while keeping the socket alive, and without unnecessary error messages in the console. That last part is very outdated and is basically tech debt.
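
A hypothetical sketch of what that could look like for the original use case; resultSet.close() here is the method proposed above, not an API confirmed to exist at the time of writing:

import { createClient } from '@clickhouse/client'

;(async () => {
  const client = createClient({
    // ...
  })

  const resultSet = await client.query({
    query: `select * from system.numbers limit 10`,
    format: 'JSONCompactStringsEachRowWithNamesAndTypes',
  })

  let i = 0
  outer: for await (const rows of resultSet.stream<string[]>()) {
    for (const row of rows) {
      if (i++ === 5) {
        // hypothetical: discard the rest of the response, keep the socket alive
        resultSet.close()
        break outer
      }
      console.log(row.json().join(','))
    }
  }

  await client.close()
})()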

slvrtrn commented 2 months ago

Should be fixed in 1.0.2.

import { pipeline } from 'stream/promises'
import { createWriteStream } from 'fs'
import { createClient } from '@clickhouse/client'

async function* myAsyncGenerator() {
  const clickHouseClient = createClient({
    // ...
  })

  const resultSet = await clickHouseClient.query({
    query: `SELECT * FROM system.numbers LIMIT 10`,
    format: 'JSONCompactStringsEachRowWithNamesAndTypes',
  })

  const stream = resultSet.stream()

  let i = 0

  for await (const rows of stream) {
    for (const row of rows) {
      // If a condition is true, I want to stop consuming the stream prematurely
      if (i++ === 5) {
        return stream.destroy()
      }

      yield row.json<string[]>().join(',') + '\n'
    }
  }
}

;(async () => {
  const asyncGen = myAsyncGenerator()
  const outputStream = createWriteStream('output.txt')
  await pipeline(asyncGen, outputStream)
})()

Yields the following file:

number
UInt64
0
1
2

Without unnecessary errors in the logs.

Please feel free to re-open this issue or create a new one if there are still any problems.