Closed vincenzon closed 1 year ago
Does it behave the same if you follow this endless JSON stream example?
That's the exact scenario we tested: continuously streaming large inserts for prolonged periods of time.
Please note that we don't await the insert promise here. The chunks of the data are periodically dumped into the stream, and it is never closed until we hit Ctrl+C. Then, finally, we finalize the stream and await the promise.
I have not, but I will. The reason I avoided this method was this note in the example:
// NB: results will not appear in ClickHouse immediately as it is buffered internally,
// and by default flushing happens very infrequently for smaller datasets,
// as ClickHouse awaits for the input stream to be terminated.
Any idea what "infrequently" means, and is there a way to control how often the buffer is flushed?
I moved the allocation of the stream and the insert operation outside of the function pushing the data, so it is like:
const stream = new Stream.Readable({ objectMode: true, read: () => {} });
clientCH.insert({
  table: 'data_table',
  values: stream, // data, (Note, using the stream or the raw data results in the same)
  format: "JSONEachRow",
});
const pushData1 = (data: Array<RowData>) => {
  stream.push(data);
};
const pushData2 = (data: Array<RowData>) => {
  data.forEach(d => stream.push(d));
};
When I repeatedly call pushData1, memory usage increases without bound. I also tried pushData2, thinking that push was somehow holding on to the data array, but that did not help either.
Thanks for trying. Does it eventually crash the application when run with a really low heap limit, such as --max-old-space-size=128?
@vincenzon would it be possible to provide a minimalistic code snippet to reproduce the problem?
Thanks for the example.
Here's my conclusion: generating the data in while(true) does not work correctly with Node's GC.
See the corrected implementation:
import { createClient } from "@clickhouse/client";
import type Stream from 'stream'
import { Readable } from "stream";
const table = "test_ch_conn";
async function connect() {
  const url = new URL('http://default:@localhost:8123');
  const client = createClient({
    host: url.origin,
    username: url.username,
    password: url.password,
  });
  await client.exec({
    query: `
      CREATE TABLE IF NOT EXISTS ${table}
      (id UInt64, name String, val Float64)
      ENGINE MergeTree()
      ORDER BY (id)
    `,
  });
  return client
};
type Row = { id: number; name: string; val: number };
let id = 0;
function pushData(stream: Stream.Readable, n: number): void {
  for (let i = 0; i < n; i++) {
    const row: Row = { id: id++, name: `row ${id}`, val: id / 3.1 }
    // stream might be closed by SIGINT handler
    if (!stream.closed) {
      // note that we push individual rows, not arrays of rows
      stream.push(row)
    }
  }
};
const run = async () => {
  console.log(`Running...`);
  const client = await connect();
  const stream = new Readable({
    objectMode: true,
    read: () => {},
  });
  const insertPromise = client.insert({
    table,
    values: stream,
    format: "JSONEachRow",
  }).catch(e => {
    console.error(e)
    process.exit(1)
  });
  const n = 10000;
  const w = 100 * n;
  // replace while true with setInterval, so GC can do its job
  setInterval(() => {
    if (id % w === 0) {
      console.log(`id ${id}, mem: ${process.memoryUsage().heapUsed}`);
    }
    pushData(stream, n)
  }, 50)
  async function cleanup() {
    stream.push(null) // close the stream and finalize the write
    await insertPromise
    await client.close()
    console.info('Stream is closed')
    process.exit(0)
  }
  process.on('SIGINT', cleanup)
  process.on('SIGTERM', cleanup)
};
process.on('uncaughtException', (err) => logAndQuit(err))
process.on('unhandledRejection', (err) => logAndQuit(err))
function logAndQuit(err: unknown) {
  console.error({ msg: 'Unhandled exception', err })
  process.exit(1)
}
run().catch(e => {
  console.error(e)
  process.exit(1)
});
Please note that setInterval, especially with such a low value, is only meant as an example; you probably wouldn't want to do that in production :) but the value is low enough to mimic an endless loop (the next interval call is never started before the end of the previous one, IIRC).
The problem is that the loop was holding back the GC on the generated arrays. Thus, we retained too many garbage strings over time and crashed.
With the updated source, no leaks:
Running...
id 0, mem: 102240024
id 1000000, mem: 114551232
id 2000000, mem: 116838336
id 3000000, mem: 107154520
id 4000000, mem: 105199264
^C
Stream is closed
As for your other question:
Any idea what "infrequently" means, and is there a way to control how often the buffer is flushed?
It's... a complicated topic.
I'd say it depends mainly on the size of the incoming data. Here are a few quotes from our CH Core team engineers:
There's no way to make new data appear in table immediately after sending it from client, because:
- it may be buffered on client side
- even if you flush buffers on client side, it still may be buffered by kernel on client's host
- even if you force kernel to flush network buffers, it takes time to transfer data by network
- server's host also have network buffers in kernel; clickhouse-server also has its own complex buffering on top
- it takes time to process received data and insert it
You should not expect any data to appear before you received response to the insert query. It's only guaranteed that new data will become visible on the same replica immediately after client receives response.
As stream is open, we process insert query until stream will be closed, so in our read buffer in input format from http we will get eof and will be sure that there will be no data anymore. In row input format we process rows in batches: we process max_insert_block_size (about 65k by default) rows and only after it we create a new block and send it further in pipeline, so sometimes you see that data was inserted after we processed max_insert_block_size rows.
There are some related settings like max_read_buffer_size, max_block_size, max_insert_block_size, min_insert_block_size_rows, min_insert_block_size_bytes, max_compress_block_size, etc. But these settings are low-level and it's highly not recommended to change them. Moreover, tuning these settings will not completely solve the problem because of buffering in the kernel, for example.
My main takeaway from this: I need to provide clearer streaming examples (with more comments and the caveats outlined).
@vincenzon, please let us know if we can help you more with your implementation.
Thanks for the response @slvrtrn,
Unfortunately, I don't think it is setInterval vs while (true) that is at play here. If you change the setInterval time from 50 down to 1 (I did this in my example and pushed it for you to try), you will see that memory blows up. So it is not setInterval per se, but rather the speed of the queries.
In my actual code I don't have a while loop; what I have is much closer to the setInterval case. In my code the inserts are triggered by an asynchronous process. That process awaits the analogue of the push call in the example. It does push messages very quickly, like the revised example (1ms instead of 50ms). It's still not clear to me why the memory usage expands so quickly.
I appreciate the commentary on the inserts. For my particular case it is not important that the latency between the push into the stream and the eventual appearance in the table be optimized. As long as that latency is reasonably fast (order minutes) it would be fine. My reading of the comments is that an amount of data less than max_insert_block_size could be buffered indefinitely until the stream is actually closed. This means, for me, during busy times it is not necessary to close / reconnect the stream, but when things slow down, a close / reconnect will guarantee the latency is never more than a threshold I can code to. So that works for me.
If you have any ideas on how to control memory when inserts are happening very quickly, I'd like to hear about them.
I see. Indeed, I should've tried setInterval(() => {}, 1)... my bad.
As long as that latency is reasonably fast (order minutes) it would be fine. My reading of the comments is that an amount of data less than max_insert_block_size could be buffered indefinitely until the stream is actually closed. This means, for me, during busy times it is not necessary to close / reconnect the stream, but when things slow down, a close / reconnect will guarantee the latency is never more than a threshold I can code to. So that works for me.
Sounds like a plan for now :)
We will brainstorm what is possibly going wrong in this case (cause there are very few pieces in the source that cause this issue, and they don't look too suspicious to me).
For now, to me, it looks like there might not be enough time for Node to perform the GC operation in case of very big inserts happening too fast.
(edit: no, this is not the case, cause if (global.gc) { global.gc() } does not seem to help with the --expose-gc parameter added; something else is holding the memory back)
But again, we will check it out.
Cheers.
@vincenzon I think I got it right this time.
What we were missing is backpressure in case of extremely large inserts with extremely little latency. Please have a look at the improved implementation (with while (true)):
import { createClient } from "@clickhouse/client";
import type Stream from 'stream'
import { Readable } from "stream";
const table = "test_ch_conn";
async function connect() {
  const url = new URL('http://default:@localhost:8123');
  const client = createClient({
    host: url.origin,
    username: url.username,
    password: url.password,
  });
  await client.exec({
    query: `
      CREATE TABLE IF NOT EXISTS ${table}
      (id UInt64, name String, val Float64)
      ENGINE MergeTree()
      ORDER BY (id)
    `,
  });
  return client
};
type Row = { id: number; name: string; val: number };
let id = 0;
async function pushRow(stream: Stream.Readable, row: Row) {
  // stream might be closed by SIGINT handler
  if (!stream.closed) {
    // note that we push individual rows, not arrays of rows
    // console.log(`Trying to push row #${row.id}`)
    const backpressure = !stream.push(row)
    // if the destination is overloaded - try again a bit later
    if (backpressure) {
      // console.log(`Too much data! Retry row #${row.id}`)
      await new Promise(resolve => {
        setTimeout(resolve, 1)
      })
      await pushRow(stream, row)
    }
    // console.log(`Pushed row #${row.id}`)
  }
}
async function pushRows(stream: Stream.Readable, n: number): Promise<void> {
  for (let i = 0; i < n; i++) {
    const row: Row = { id: id++, name: `row ${id}`, val: id / 3.1 }
    await pushRow(stream, row)
  }
};
const run = async () => {
  console.log(`Running...`);
  const client = await connect();
  const stream = new Readable({
    objectMode: true,
    read: () => {},
  });
  const insertPromise = client.insert({
    table,
    values: stream,
    format: "JSONEachRow",
  }).catch(e => {
    console.error(e)
    process.exit(1)
  });
  async function cleanup() {
    stream.push(null) // close the stream and finalize the write
    await insertPromise
    await client.close()
    console.info('Stream is closed')
    process.exit(0)
  }
  process.on('SIGINT', cleanup)
  process.on('SIGTERM', cleanup)
  const n = 10000;
  const w = 100 * n;
  while (true) {
    if (id % w === 0) {
      console.log(`id ${id}, mem: ${process.memoryUsage().heapUsed}`);
    }
    await pushRows(stream, n)
  }
};
process.on('uncaughtException', (err) => logAndQuit(err))
process.on('unhandledRejection', (err) => logAndQuit(err))
function logAndQuit(err: unknown) {
  console.error({ msg: 'Unhandled exception', err })
  process.exit(1)
}
run().catch(e => {
  console.error(e)
  process.exit(1)
});
I've got a pretty nice result here without memory leaks:
➜ node -r ts-node/register --max_old_space_size=128 --expose-gc ./src/main.ts
Running...
id 0, mem: 104351616
id 1000000, mem: 101595544
id 2000000, mem: 101055944
id 3000000, mem: 104570360
id 4000000, mem: 101126504
id 5000000, mem: 100460584
id 6000000, mem: 103365192
id 7000000, mem: 105620712
id 8000000, mem: 103390560
id 9000000, mem: 100426936
id 10000000, mem: 104352112
id 11000000, mem: 104218032
If we uncomment a log entry about backpressure, we will see a lot of actual retries happening, even in the beginning, and in the case of a local ClickHouse, a mere millisecond is enough to wait:
➜ node -r ts-node/register --max_old_space_size=128 --expose-gc ./src/main.ts
Running...
id 0, mem: 106729328
Too much data! Retry row #335
Too much data! Retry row #662
Too much data! Retry row #991
Too much data! Retry row #1311
Too much data! Retry row #1631
Too much data! Retry row #1953
Too much data! Retry row #2275
Too much data! Retry row #2597
Too much data! Retry row #2919
Too much data! Retry row #3240
Too much data! Retry row #3558
Too much data! Retry row #3876
Too much data! Retry row #4194
Too much data! Retry row #4512
What was happening here: the superfluous records kept piling up in memory because the destination stream could not keep up, which eventually led to an OOM crash.
Now I think taking the result of stream.push into account and acting accordingly might work in your case.
We must have been reading the same stuff. I was trying to push(null) and allocate a new stream when there was back pressure but couldn't get that to work. I'll give your solution a go!
I noticed you resend the row when there is back pressure. This is not necessary. The indication of back pressure (return of false) does not indicate a failure to do the insert, just that the buffers are reaching thresholds. You can see that resending duplicates rows by running the query:
select count(*) as nRow, count(distinct id) as nId
from default.test_ch_conn
and you'll see nId < nRow.
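To make the point about the return value of push concrete, here is a minimal sketch that uses only Node's built-in stream module (no ClickHouse involved). The tiny highWaterMark of 2 and the row shape are assumptions picked for illustration. It pushes five rows into an object-mode Readable and then reads everything back: push starts returning false once the internal buffer reaches the high-water mark, yet every row is still queued exactly once.

```typescript
import { Readable } from "stream";

// Object-mode stream with a deliberately tiny buffer threshold (illustrative value)
const demoStream = new Readable({ objectMode: true, highWaterMark: 2, read() {} });

// Push five rows and remember what push() returned for each of them
const pushResults: boolean[] = [];
for (let i = 0; i < 5; i++) {
  pushResults.push(demoStream.push({ id: i }));
}

// Read everything back: rows pushed while push() returned false are still there
const receivedIds: number[] = [];
let row: { id: number } | null;
while ((row = demoStream.read()) !== null) {
  receivedIds.push(row.id);
}

console.log(pushResults); // false appears once the buffer is at or above highWaterMark
console.log(receivedIds); // every id shows up once, so a retry would duplicate rows
```

So a false return is only a hint to slow down; pushing the same row again after it, as the retry in the example above does, enqueues it a second time.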
I think I need to add handling of the drain event to adjust properly for the back pressure. Looking into that now and will post here the finished result.
Nice catch!
EDIT: yeah, the drain event handler could've been more robust in this case, as we possibly can improve the performance by avoiding any "sleeps". However, the "sleep" might be handy as we can fine-tune the delay if the application does something other than data ingestion.
EDIT2: wrapping every insert in a Promise has a huge negative performance impact, though. So, in the end, drain seems like the best solution.
It turns out the Readable stream does not emit drain events when back pressure has eased. I ended up with a solution that keeps the client open, but creates a new stream every few seconds to push the inserts into. I found a long-lived stream led to memory issues and/or excessive delivery delays (as mentioned in the comments above).
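The arrangement described here (one long-lived client, a fresh stream every few seconds) could look roughly like the sketch below. This is not the code actually used in this thread: RotatingInserter, InsertFn and the Row shape are hypothetical names, and the insert function is injected so the snippet does not depend on any particular client. With @clickhouse/client it would presumably wrap a call like client.insert({ table, values: stream, format: "JSONEachRow" }), as in the examples above.

```typescript
import { Readable } from "stream";

type Row = { id: number };

// Hypothetical shape: takes a stream of rows, resolves when that insert has completed
type InsertFn = (values: Readable) => Promise<unknown>;

class RotatingInserter {
  private stream: Readable | null = null;
  private pending: Promise<unknown> = Promise.resolve();

  constructor(private readonly insert: InsertFn) {}

  // Lazily opens a stream (and starts an insert for it) on the first row after a rotation
  push(row: Row): void {
    if (this.stream === null) {
      this.stream = new Readable({ objectMode: true, read() {} });
      this.pending = this.insert(this.stream);
    }
    this.stream.push(row);
  }

  // Ends the current stream and waits for its insert to finish;
  // rows pushed in the meantime go to a new stream
  async rotate(): Promise<void> {
    const current = this.stream;
    const pending = this.pending;
    if (current === null) return;
    this.stream = null;
    current.push(null); // signal end of input so the insert can complete
    await pending;
  }
}
```

A timer such as setInterval(() => inserter.rotate().catch(console.error), 5000) would then bound how long rows can sit in an open stream; the 5-second value is arbitrary. Rotation on its own does not slow the producer down, so it probably needs to be paired with some form of throttling if rows arrive faster than they can be sent.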
With the new arrangement things are so far looking good, so I'm closing this.
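On the missing drain events: drain belongs to Writable streams, which is why a Readable never emits it. On the Readable side, the comparable signal is the read() callback, which Node invokes when the consumer is ready for more data. Below is a minimal sketch of waiting on that signal instead of sleeping or resending. DemandStream, whenReady and waiting are hypothetical names, the default highWaterMark of 16 is arbitrary, and the approach has not been verified against @clickhouse/client.

```typescript
import { Readable } from "stream";

class DemandStream<T> {
  readonly stream: Readable;
  private waiters: Array<() => void> = [];

  constructor(highWaterMark = 16) {
    this.stream = new Readable({
      objectMode: true,
      highWaterMark,
      // Called by Node when the consumer wants more data: wake up paused producers
      read: () => {
        const waiters = this.waiters;
        this.waiters = [];
        waiters.forEach((resume) => resume());
      },
    });
  }

  // Number of producers currently paused (exposed for inspection)
  get waiting(): number {
    return this.waiters.length;
  }

  // Queues the row exactly once; false means "buffer is full, pause before pushing more"
  push(row: T): boolean {
    return this.stream.push(row);
  }

  // Call right after push() returned false; resolves on the next read() request
  whenReady(): Promise<void> {
    return new Promise<void>((resolve) => this.waiters.push(resolve));
  }
}
```

A producer loop would call push for each row and only do await demand.whenReady() when push returned false, which keeps the fast path synchronous and avoids allocating a Promise per row.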
I am inserting data by repeatedly calling:
This eventually runs out of memory. Logging process.memoryUsage() shows the heap grows without ever decreasing. Very similar code was running without this issue using another Node ClickHouse client (@apla/node-clickhouse), so I assume I am missing something in my translation to this client.
Is there a way to prevent this excessive memory consumption?