tulios / kafkajs

A modern Apache Kafka client for node.js
https://kafka.js.org
MIT License

Lock contention on high load #303

Closed: tulios closed this issue 5 years ago

tulios commented 5 years ago

From a comment on issue #177

Since it only happens in specific load conditions, it's hard for me to reproduce it right away, but looking at the logs, I think it's from a producer.send call. You can find the following snippet on how I'm initializing the producer and how I'm calling the send function.

// Incoming data ... Can be considered something like this:
// (queueMessage is not awaited, so all of these sends are kicked off in the same tick)
for (let i = 0; i < 100000; i++)
    queueMessage(message, 'specific-topic');

// Producer chunk ...
const { Kafka } = require('kafkajs'); // import not shown in the original snippet

const kafkaClient = new Kafka({
    clientId: `SomeID`,
    brokers: ['10.240.0.6:9092', '10.240.0.7:9092', '10.240.0.20:9092'],
    connectionTimeout: 5000,
    requestTimeout: 60000,
    maxInFlightRequests: 200,
    retry: {
        initialRetryTime: 1000,
        retries: 5
    }
});

const producer = kafkaClient.producer();
// producer.connect() must be awaited before the first send (not shown in this snippet)

async function queueMessage(message, topic, retries = 0) {
    try {
        await producer.send({
            topic,
            // TODO -> KafkaJS probable bug? key=null should be present
            messages: [{key: null, value: JSON.stringify(message)}],
            acks: 1,
            compression: kafkaUtils.SNAPPY // presumably an app-level alias for kafkajs's CompressionTypes.Snappy
        });
    } catch (e) {
        if (retries < 2)
            setTimeout(() => queueMessage(message, topic, retries + 1), 2000 * (retries + 1));
    }
}

I'm not using any Promise.all, and the incoming flow is a stream of small messages, so I can't actually batch them together before sending; it's more like streaming a lot of messages in real time, for a messaging application.

And I'm using KafkaJS 1.4.7.

@AlirezaSadeghi I moved the investigation to a new issue
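
For anyone hitting something similar: below is a rough sketch of how a stream of small messages like the one above could be buffered and flushed as larger batches, so that many messages share one producer.send instead of creating one promise each. It's only an illustration: enqueue, flush, maxBatchSize, flushEveryMs, and the localhost broker are made-up names and values, not kafkajs API or anything from the original report.

// Sketch only: buffer small messages per topic and flush them as one send.
// enqueue/flush/maxBatchSize/flushEveryMs are illustrative names, not kafkajs API.
const { Kafka } = require('kafkajs');

const kafka = new Kafka({ clientId: 'batching-sketch', brokers: ['localhost:9092'] });
const producer = kafka.producer();

const buffers = new Map(); // topic -> array of { key, value }
const maxBatchSize = 500;  // flush as soon as a topic has this many messages
const flushEveryMs = 50;   // also flush on a timer so nothing lingers

function enqueue(topic, message) {
    if (!buffers.has(topic)) buffers.set(topic, []);
    const buffer = buffers.get(topic);
    buffer.push({ key: null, value: JSON.stringify(message) });
    if (buffer.length >= maxBatchSize) flush(topic).catch(console.error);
}

async function flush(topic) {
    const batch = buffers.get(topic);
    if (!batch || batch.length === 0) return;
    buffers.set(topic, []); // messages arriving during the send go into the next batch
    await producer.send({ topic, messages: batch, acks: 1 });
}

async function start() {
    await producer.connect();
    setInterval(() => {
        for (const topic of buffers.keys()) flush(topic).catch(console.error);
    }, flushEveryMs);
}

start().catch(console.error);

The flush resets the per-topic buffer before awaiting the send, so messages arriving during a flush simply land in the next batch.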

AlirezaSadeghi commented 5 years ago

Hi @tulios

Um, I'm still having all sorts of problems here and there in the processes: sometimes it's the lock contention error, sometimes it's the following errors:

{"level":"WARN","timestamp":"2019-03-11T01:21:22.932Z","logger":"kafkajs","message":"[RequestQueue] Response without match","clientId":"ps-8682-router","broker":"10.240.0.7:9092","correlationId":4278511}

{"level":"ERROR","timestamp":"2019-03-11T01:21:22.947Z","logger":"kafkajs","message":"[Producer] Request Metadata(key: 3, version: 4) timed out","retryCount":0,"retryTime":359}

It's also worth mentioning that I see these two error types far more often than the lock contention.

In our use case, a consumer consumes messages, say a million, and sends them out (downstream, system-generated); then, from the clients, the process receives 10x more messages (upstream, client-generated) that need to be produced to some other Kafka topics.

What I suppose happens is that, due to the high load, lots of events are created in the same event loop tick (load on the producer side), delaying the consumer's periodic heartbeat until the broker loses track of the consumer. The consumer then tries to reconnect to the cluster but is faced with the "Response without match" error, which I don't actually know the cause of. It seems to me that the producers get overloaded (because they have to handle a huge, spiky load), then they retry, which only makes things worse, so consumer sessions get invalidated because the CPU is completely exhausted and there's no time left for a heartbeat.

I don't know if my intuition is right here; it's just something I have in mind, but I'd be glad if you could provide any insights.
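
One way to test that theory would be to cap how many sends are in flight at once, so a spike can't flood the event loop and starve the heartbeat. A minimal sketch follows; maxInFlight, acquire/release, and sendLimited are just illustrative names, and producer is assumed to be an already-connected kafkajs producer.

// Sketch only: a tiny counting semaphore around producer.send.
const maxInFlight = 50;   // illustrative cap, not a tuned value
let inFlight = 0;
const pending = [];       // resolvers waiting for a free slot

function acquire() {
    if (inFlight < maxInFlight) {
        inFlight++;
        return Promise.resolve();
    }
    return new Promise(resolve => pending.push(resolve));
}

function release() {
    const next = pending.shift();
    if (next) {
        next();       // hand the slot straight to the next waiter
    } else {
        inFlight--;
    }
}

async function sendLimited(producer, topic, message) {
    await acquire();
    try {
        await producer.send({
            topic,
            messages: [{ key: null, value: JSON.stringify(message) }],
            acks: 1
        });
    } finally {
        release();
    }
}

Callers could still fire sendLimited without awaiting it; only maxInFlight sends would actually be outstanding at any time, and the rest would wait in memory rather than as open requests.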

tulios commented 5 years ago

Hi @AlirezaSadeghi, this is a high-priority issue for me, so I'll bug you a bit this week about it. I'm assuming the "Response without match" errors are happening on version 1.5.0, right? I have a fix for it (it's related to #309).

Are the producers and consumers on the same machine (or in the same Node.js process)? The requests are timing out and the lock is timing out, so this is likely related to event-loop exhaustion. I'll investigate the issue this week and get back to you with more data.

Just as a temporary mitigation, you can increase connectionTimeout and requestTimeout.
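
For example (illustrative values only, not tuned recommendations):

// Same config as above, with the two timeouts raised; the numbers only show
// the direction to go, they are not recommended values.
const { Kafka } = require('kafkajs');

const kafkaClient = new Kafka({
    clientId: 'SomeID',
    brokers: ['10.240.0.6:9092', '10.240.0.7:9092', '10.240.0.20:9092'],
    connectionTimeout: 30000, // was 5000
    requestTimeout: 120000,   // was 60000
    maxInFlightRequests: 200,
    retry: {
        initialRetryTime: 1000,
        retries: 5
    }
});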

AlirezaSadeghi commented 5 years ago

That would be great! I'll do whatever I can to help fix this issue, because it's really hurting the reliability of our messaging mechanism.

About the consumers and producers: yes, they are in the same process. There's basically one consumer and one producer in the same process (i.e. app.js starts the producer first and then starts the consumer; the consumer starts consuming messages and sending them out, and then messages start coming back into the process, at which point the producer sends them back to a Kafka topic).

Initially I was using 1.4.7; I'm giving 1.5.0 a try now to see how it behaves. I'll also increase those two timeouts again, but they are already at 60 seconds or so.
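
For context, the shape is roughly the sketch below: one process hosts both clients, consuming from one topic and producing back to another. The topic and group names are made up, and the socket hop to and from the clients is collapsed into a direct produce for brevity.

// Simplified sketch of the topology described above; names are made up.
const { Kafka } = require('kafkajs');

const kafka = new Kafka({ clientId: 'single-process-sketch', brokers: ['localhost:9092'] });
const producer = kafka.producer();
const consumer = kafka.consumer({ groupId: 'some-group' });

async function start() {
    await producer.connect();
    await consumer.connect();
    await consumer.subscribe({ topic: 'downstream-topic' });

    await consumer.run({
        eachMessage: async ({ message }) => {
            // Consume, then produce from the same process: both clients share
            // one event loop, so a produce spike can delay consumer heartbeats.
            await producer.send({
                topic: 'upstream-topic',
                messages: [{ key: null, value: message.value }]
            });
        }
    });
}

start().catch(console.error);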

tulios commented 5 years ago

Hey @AlirezaSadeghi, I'm still looking at this. I've run some tests and have a lead on the problem, but I haven't found the time to debug it fully.

tulios commented 5 years ago

@AlirezaSadeghi I have done a lot of CPU profiling and couldn't find anything yet. One thing I noticed while debugging is that in your example you never await queueMessage, which means you are generating 100k promises in the same tick. Was that a typo, or is that how the system actually works?

tulios commented 5 years ago

@AlirezaSadeghi I think I've improved things for you. It turns out I was a victim of my own recommendations: the lock release mechanism was using Promise.all, and looking at your example, the code could reach 1k+ locked resources, which would overload the system. PR #323 should improve this; I've tried the change with your example, and it worked fine.

AlirezaSadeghi commented 5 years ago

> @AlirezaSadeghi I have done a lot of CPU profiling and couldn't find anything yet. One thing I noticed while debugging is that in your example you never await queueMessage, which means you are generating 100k promises in the same tick. Was that a typo, or is that how the system actually works?

Um, yeah, that's how the system works. Messages are received through sockets in parallel, and lots of emits might be called in a single tick; that's why I didn't put the await there when calling queueMessage. I don't think we can reproduce what happens if we add the await keyword and everything is handled sequentially.

Also, thanks for the fix. Any idea how long it would take for this to end up in a release? If not, I'll just take the code from master and run it in production to see how it performs.

tulios commented 5 years ago

If you can, I would recommend running the code from master for a while, but I think the improvement is significant enough to release 1.5.2. I want to make sure I won't release 1.5.4 right afterward with another fix, so I like to let it settle for a while. But we can get a new version out next week.

tulios commented 5 years ago

@AlirezaSadeghi 1.5.2 was released with the fix.