ClickHouse / clickhouse-js

Official JS client for ClickHouse DB
https://clickhouse.com
Apache License 2.0

Error: socket hang up #294

Closed: uginroot closed this issue 2 months ago

uginroot commented 2 months ago

Describe the bug

This is similar to the problem that was mostly solved in version 0.3.0 (https://github.com/ClickHouse/clickhouse-js/issues/150): Error: socket hang up

This problem only appears at very high insertion rates across dozens of tables.

I am reading data from Kafka in 10 threads, and each addToInsertQueue call targets a different table depending on the message received from Kafka.

If I add await sleep(50) before calling addToInsertQueue, the problem appears much less often: about once every 2-3 hours. If there is only one table, the problem appears about once a week.
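
For reference, sleep is not defined in the issue; a typical helper (an assumption, not the OP's code) would be:

const sleep = (ms: number): Promise<void> =>
    new Promise((resolve) => setTimeout(resolve, ms));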

I deliberately use a machine with 2 CPUs and 4 GB of memory. One of the CPUs is at 100%, and memory usage is at 1.7 GB.

Steps to reproduce

  1. Create many tables (around 100)
  2. Insert batches of rows into many tables
  3. Wait 5-10 minutes

Expected behaviour

No uncaughtException messages

Code example

import { ClickHouseLogLevel, createClient, NodeClickHouseClient } from "@clickhouse/client";
import Stream from "stream";

const connection: NodeClickHouseClient = createClient({
    url: `http://remote-host:8123`,
    password: "password",
    username: "default",
    database: "default",
    keep_alive: {
        enabled: true,
        idle_socket_ttl: 1000,
    },
    log: {
        level: ClickHouseLogLevel.TRACE,
    },
});

type ConnBaseResult = ReturnType<ReturnType<typeof createClient>["insert"]>;

type BatchContext = {
    timeStart: number;
    insertStream: Stream.Readable;
    insertPromise: ConnBaseResult;
    timeoutId?: NodeJS.Timeout;
    rowsCount: number;
};

export abstract class ClickhouseAbstractBatchInsertRepository<T extends Record<string, any>> {
    private batchContext: BatchContext | undefined = undefined;

    protected readonly insertQueueCapacity = 10_000;

    protected readonly autoCommitInsertQueueInterval = 9_000;

    public constructor(private readonly clickhouseConnection: NodeClickHouseClient) {}

    public async addToInsertQueue(row: T) {
        if (this.batchContext === undefined) {
            const insertStream = new Stream.Readable({
                objectMode: true, // required for JSON* family formats
                read() {
                    //
                },
            });
            this.batchContext = {
                timeStart: Date.now(),
                insertStream: insertStream,
                insertPromise: this.clickhouseConnection.insert({
                    table: this.getTableName(),
                    values: insertStream,
                    format: "JSONEachRow",
                }),
                timeoutId: setTimeout(() => {
                    void this.commit();
                }, this.autoCommitInsertQueueInterval),
                rowsCount: 0,
            };
        }
        this.batchContext.insertStream.push(row);
        this.batchContext.rowsCount++;
        void this._commit();
    }

    public async commit(): Promise<void> {
        await this._commit(true);
    }

    private async _commit(force = false): Promise<void> {
        const batchContext = this.batchContext;
        if (batchContext === undefined) {
            return;
        }

        if (!force && batchContext.rowsCount < this.insertQueueCapacity) {
            return;
        }

        this.batchContext = undefined;
        clearTimeout(batchContext.timeoutId);

        const exec = async (batchContext: BatchContext) => {
            if (batchContext.rowsCount === 0) {
                return;
            }

            // Execute the query to insert all rows in JSON format
            const startTime = Date.now(); // timing marker (unused in this trimmed example)
            batchContext.insertStream.push(null);
            await batchContext.insertPromise;
            batchContext.insertStream.destroy();
        };

        await exec(batchContext);
    }

    protected abstract getTableName(): string;
}

// 1. Create many child classes, one per table (each extending ClickhouseAbstractBatchInsertRepository)
// 2. Call addToInsertQueue for each row

Error log

Insert: HTTP request error.
Error: socket hang up
    at connResetException (node:internal/errors:717:14)
    at Socket.socketOnEnd (node:_http_client:526:23)
    at Socket.emit (node:events:525:35)
    at Socket.emit (node:domain:489:12)
    at endReadableNT (node:internal/streams/readable:1359:12)
    at process.processTicksAndRejections (node:internal/process/task_queues:82:21)


uginroot commented 2 months ago

I think this is the cause: https://medium.com/dkatalis/eventloop-in-nodejs-macrotasks-and-microtasks-164417e619b9

slvrtrn commented 2 months ago

Yes, considering your logs and your scenario, it indeed looks like when the event loop is blocked for too long, the timer that handles idle socket removal does not work as expected (see https://github.com/ClickHouse/clickhouse-js/blob/e78b6f04417bd978b636aa3165ea824ed65049ba/packages/client-node/src/connection/node_base_connection.ts#L521-L529). It also explains why await sleep(50) helps (probably await sleep(0) would have worked, too).
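
For illustration (not from the thread), a minimal sketch of how a blocked event loop delays such a timer; all names are made up:

// A 1000 ms timer scheduled before CPU-bound work fires only after
// the event loop is free again, so an idle-socket TTL can be missed
// by seconds.
const scheduledAt = Date.now();
setTimeout(() => {
    console.log(`timer fired after ${Date.now() - scheduledAt} ms`); // well over 1000 under load
}, 1000);

// Simulate CPU-bound work, e.g. parsing a large Kafka batch.
const busyUntil = Date.now() + 3000;
while (Date.now() < busyUntil) {
    // busy-wait, blocking the event loop
}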

uginroot commented 2 months ago

@slvrtrn What helped me was adding this code before the request:

// Yield to the event loop: defer the continuation to a macrotask
await new Promise((resolve) => setTimeout(resolve, 0));

slvrtrn commented 2 months ago

Side note: instead of batching on the client side, you could also try https://clickhouse.com/docs/en/optimize/asynchronous-inserts, even without waiting for an ack.

There are examples in the repo as well: https://github.com/ClickHouse/clickhouse-js/blob/main/examples/async_insert_without_waiting.ts

What helped me was adding this code before the request:

This checks out. Probably makes sense to add it at the start of the NodeBaseConnection#request method.

uginroot commented 2 months ago

I tried that approach, but I think it is wrong for my case: it sent a request to the server on every call to addToInsertQueue, and the insertion speed on one worker dropped from 800 rows per second to 300. The ClickHouse server was also more heavily loaded (CPU went from 7% to 30% on 8 cores).

Maybe I did something wrong.

I don't have the ability to accumulate records, so with this approach I have to call insert for each event.

slvrtrn commented 2 months ago

Maybe I did something wrong.

IIRC, Kafka consumers poll multiple messages at a time (and kafkajs has this), depending on the batch size in bytes, of course.

So maybe with async_insert the interface could be

public async addToInsertQueue(rows: Array<T>) {
  //
}

instead.
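
A rough sketch of what that batched variant could look like combined with async inserts (the method body is illustrative, not from the thread; async_insert and wait_for_async_insert are standard ClickHouse settings passed via clickhouse_settings):

public async addToInsertQueue(rows: Array<T>): Promise<void> {
    if (rows.length === 0) {
        return;
    }
    // Let the server buffer and flush the batch; do not wait for the ack.
    await this.clickhouseConnection.insert({
        table: this.getTableName(),
        values: rows,
        format: "JSONEachRow",
        clickhouse_settings: {
            async_insert: 1,
            wait_for_async_insert: 0,
        },
    });
}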

Anyway, would you like to open a PR with a zero timeout in request, if it is confirmed to resolve the issue? Otherwise, I can add it later.

uginroot commented 2 months ago

It took me a few days to figure out the cause of this error. For now, it is enough for me to apply my variant locally, but I hope it will be fixed upstream later.

By the way, I found out that calling sleep(0) before insert does not really help; or rather, it helps somewhat: it crashes about once every 30 minutes instead of once every 5-10 minutes.

Only a sleep(0) before each addToInsertQueue call helps; otherwise, judging by the logs, the timeout is still not respected. The difference is not 2000 ms, as in the logs above, but within 100 ms.

slvrtrn commented 2 months ago

The fix is included in 1.4.1.

uginroot commented 2 months ago

Thank you! I checked; there are no more errors like that.

uginroot commented 2 months ago

@slvrtrn As it turns out, the error still occurs, but now about 1-2 times a day instead of a few minutes after startup.

I dug into the source code of the http.Request and http.Agent implementations, and I think it is related to the callbacks for the socket (http.Request) and destroy (http.Agent) events being invoked via process.nextTick after the emit call.

I think you should consider how to get rid of setTimeout in the request method completely or, as a simpler solution, add a mutex implementation.

class Mutex {
  constructor() {
    // Tail of the queue: resolves once the most recently queued
    // holder releases the lock.
    this._lastPromise = Promise.resolve();
  }

  async lock() {
    let resolveUnlock;

    const unlockPromise = new Promise((resolve) => {
      resolveUnlock = resolve;
    });

    // Queue behind the previous holder; the next caller will also
    // wait for this holder's unlock.
    const previousPromise = this._lastPromise;
    this._lastPromise = previousPromise.then(() => unlockPromise);

    // Wait only for the previous holder, not for our own unlock.
    await previousPromise;
    return resolveUnlock;
  }
}

this.mutex = new Mutex();

async function request() {
  const unlock = await this.mutex.lock();
  await sleep(0);

  return new Promise((resolve, reject) => {
    // ...
    httpRequest.on('socket', (socket) => {
      // ...
      unlock(); // release the mutex once the socket has been assigned
    });
    // ...
  });
}

slvrtrn commented 2 months ago

@uginroot, aside from that, could you try adding backpressure handling to the stream that is provided to the insert method?

I checked the code from the OP again; it could be that

this.batchContext.insertStream.push(row);
this.batchContext.rowsCount++;

is called too often, and the event loop gets overloaded with the various events this emits. In that case, push will return false (and you should wait for the drain event before pushing more values).

Also, is there a good reason why this promise is dangling?

void this._commit();
uginroot commented 2 months ago

I added this code; I'll see whether it works:

import { once } from "events";
// ...
while (!this.batchContext.insertStream.push(row)) {
    await once(this.batchContext.insertStream, "drain");
}

Regarding void this._commit(): I simplified the code a bit to make it shorter, but the essence remains the same.

But I've already checked and made sure that it's not that.

The problem is that the onSocket call happens before the cleanup runs, but the cleanup still runs afterwards, and then I try to write to a closed socket.

I tried to come up with a nice and simple solution without using setTimeout, but the only approaches that work involve storing sockets in an array (like here) and removing them before calling this.createClientRequest(params), combined with a Mutex.
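
Purely as an illustration of that idea (hypothetical names, not the client's actual internals):

import { Socket } from "net";

// Track sockets that were released back to the keep-alive pool.
const idleSockets = new Set<Socket>();

function markIdle(socket: Socket): void {
    idleSockets.add(socket);
}

// Destroy all idle sockets before creating a new request, so a write
// can never race a server-side close of a stale connection.
function beforeRequest(): void {
    for (const socket of idleSockets) {
        socket.destroy();
    }
    idleSockets.clear();
}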


By the way, if you add your own http.Agent, the problem doesn't appear anymore:

http_agent: new http.Agent({
    keepAlive: true,
    keepAliveMsecs: 1000,
    timeout: 1000,
    maxSockets: 20,
}),
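
For context, wiring such an agent into the client might look like this (a sketch; the http_agent option is the one used above, and the host is a placeholder):

import http from "http";
import { createClient } from "@clickhouse/client";

const clientWithCustomAgent = createClient({
    url: "http://remote-host:8123",
    http_agent: new http.Agent({
        keepAlive: true,
        keepAliveMsecs: 1000,
        timeout: 1000,
        maxSockets: 20,
    }),
});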
slvrtrn commented 2 months ago

while (!this.batchContext.insertStream.push(row)) {
    await once(this.batchContext.insertStream, "drain");
}

Shouldn't it be

if (!this.batchContext.insertStream.push(row)) {
  await once(this.batchContext.insertStream, "drain")
}

instead?

Because the docs say:

Returns: true if additional chunks of data may continue to be pushed; false otherwise.

In any case, as I mentioned earlier, you could also try writing an entire batch received from your message broker (see EachBatch) without wrapping essentially every single row in a promise (with your addToInsertQueue), as that seems wasteful; i.e., addToInsertQueue(row: T) could become addToInsertQueue(rows: T[]). Or you could try async inserts instead (again, not with single rows, but with the entire batch from that EachBatch method, as sending rows one by one has too much overhead).

uginroot commented 2 months ago

I guess we have to use eachBatch to ensure that all events are recorded.

slvrtrn commented 2 months ago

if you add your own http.Agent, the problem doesn't appear anymore

You mentioned that the issue now happens 1-2 times a day. Maybe it hasn't been triggered yet when using a "custom" agent? Also, this is almost identical to how the internal HTTP agent is instantiated:

https://github.com/ClickHouse/clickhouse-js/blob/main/packages/client-node/src/connection/node_http_connection.ts#L11-L14

keepAliveMsecs: 1000,

this is the default value.

timeout: 1000,

IIRC this essentially calls socket.setTimeout; by default, this is already set by the client at the socket level (here) and regulated via the request_timeout client option.
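
For reference, a sketch of those two knobs on the client side (values are placeholders; defaults may differ between versions):

const client = createClient({
    url: "http://localhost:8123", // placeholder host
    request_timeout: 30_000, // ms; applied at the socket level
    keep_alive: {
        enabled: true,
        idle_socket_ttl: 2_500, // ms; close idle sockets after this
    },
});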