confluentinc / confluent-kafka-javascript

Confluent's Apache Kafka JavaScript client
https://www.npmjs.com/package/@confluentinc/kafka-javascript
MIT License

Errors are not sent back to the caller #51

Open dermasmid opened 1 month ago

dermasmid commented 1 month ago

I'm using the KafkaJS variant with producer.send, and I was observing logs like this that were not sent back to the caller.

{
  timestamp: 1717074105728,
  fac: 'REQTMOUT',
  message: '[thrd:sonic-cluster-broker-1.kafka.internal.triplestack.io:9092/1]: sonic-cluster-broker-1.kafka.internal.triplestack.io:9092/1: Timed out 2 in-flight, 0 retry-queued, 0 out-queue, 0 partially-sent requests'
}

milindl commented 1 month ago

Hey @dermasmid, you should be getting a thrown error if you are awaiting producer.send, or a promise rejection if you are using .catch() on the promise returned by producer.send.

It will look similar to this:

Error: Local: Message timed out
    at Function.createLibrdkafkaError [as create] (/home/milindl/git/confluent-kafka-js/lib/error.js:456:10)
    at Producer.<anonymous> (/home/milindl/git/confluent-kafka-js/lib/producer.js:91:31) {
  name: 'KafkaJSError',
  retriable: false,
  fatal: false,
  abortable: false,
  code: -192,
  type: 'ERR__MSG_TIMED_OUT'
}

where my code looked like this:

producer.send({
    topic: 'mytopic',
    messages: [
        { value: 'v222', partition: 0 },
        { value: 'v11', partition: 0, key: 'x' },
    ]
}).catch(console.error)
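
The equivalent awaited form surfaces the same error as a thrown exception; a minimal sketch reusing the placeholder topic and messages from above:

    // Inside an async function: await the send and catch the rejection directly,
    // instead of attaching .catch() to the returned promise.
    try {
        await producer.send({
            topic: 'mytopic',
            messages: [
                { value: 'v222', partition: 0 },
                { value: 'v11', partition: 0, key: 'x' },
            ]
        });
    } catch (err) {
        // err is the KafkaJSError shown earlier (e.g. type 'ERR__MSG_TIMED_OUT')
        console.error(err);
    }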

The logs are an additional warning that gives you details on how the messages timed out (some of them may have timed out in flight, some may have timed out in the local queue the library maintains before batching and sending messages, and so on).

If you do want to get every log message, you can use a custom logger, like in here - https://github.com/confluentinc/confluent-kafka-javascript/issues/41#issuecomment-2106643673 .

dermasmid commented 1 month ago

I did not get the error thrown to me, I'm pretty sure. I think the error might have been suppressed on the librdkafka side.

milindl commented 4 weeks ago

Ah, okay, there is one more possibility. For any produce request, there are two timeouts: socket.timeout.ms and delivery.timeout.ms.

The first one is on a per-protocol-request basis. If a request is in flight for more than socket.timeout.ms, we get the REQTMOUT log. However, the library automatically retries the message with some backoff until delivery.timeout.ms is exhausted, and the user-level error is thrown only if the message completely failed delivery, including retries (if a retry succeeds, we have recovered from the error and no user action is needed). By default in librdkafka, socket.timeout.ms is 60 seconds and delivery.timeout.ms is 300 seconds.

I am guessing that's what happened here - the actual protocol request timed out, but the library dealt with it through this mechanism of internal retries.

Is it possible to share the logs around the REQTMOUT thing? I might be able to confirm that theory.

If you don't want any internal retry behaviour, you can disable it by setting the allowed number of retries to 0 in the config:

kafkaJS: {
    brokers: ['brokerName'],
    // ...,
    retry: {
        retries: 0
    }
},

milindl commented 4 weeks ago

Also, both socket.timeout.ms and delivery.timeout.ms are configurable outside the kafkaJS block of the config. More details about what they do are on this page.
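
A rough sketch of what that could look like: the two property names are standard librdkafka settings and the values shown are just librdkafka's defaults; placing them on the producer config alongside the kafkaJS block is an assumption here, so check the library's configuration docs for the exact placement.

    const producer = kafka.producer({
        kafkaJS: {
            // ... kafkaJS-style options, e.g. the retry block above ...
        },
        // librdkafka-level properties, at the same level as the kafkaJS block:
        'socket.timeout.ms': 60000,     // per protocol request; exceeding it produces the REQTMOUT log
        'delivery.timeout.ms': 300000,  // total time allowed for delivery, including internal retries
    });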

dermasmid commented 4 weeks ago

I can share more logs. Just note that I did actually lose some messages, it wasn't just the log; in fact, I went to look at what happened because I saw I was missing data.

milindl commented 4 weeks ago

Oh, that should definitely not happen. Please do share the logs.

Is it something you can reproduce on occasion, too? If so, it would be very good if you could turn on the full debug logs. They are quite verbose, though.
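
For reference, a sketch of what turning on full debug logging could look like, assuming librdkafka-level properties such as debug are passed alongside the kafkaJS block as described earlier ('all' is the most verbose librdkafka setting; narrower values like 'broker,protocol,msg' are also valid):

    // KafkaJS-style entry point of this package
    const { Kafka } = require('@confluentinc/kafka-javascript').KafkaJS;

    const kafka = new Kafka({
        kafkaJS: {
            brokers: ['brokerName'],
            // ...
        },
        // librdkafka-level property: enable all debug contexts (very verbose)
        'debug': 'all',
    });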

dermasmid commented 4 weeks ago

message: [LibrdKafkaError: Local: Timed out] {
  errno: -1,
  code: -1,
  message: 'timed out',
  origin: 'local'
}

{
  timestamp: 1717074108080,
  fac: 'FAIL',
  message: '[thrd:sonic-cluster-broker-0.kafka.internal.triplestack.io:9092/0]: sonic-cluster-broker-0.kafka.internal.triplestack.io:9092/0: 2 request(s) timed out: disconnect (after 1862053ms in state UP)'
}

{
  timestamp: 1717074108080,
  fac: 'REQTMOUT',
  message: '[thrd:sonic-cluster-broker-0.kafka.internal.triplestack.io:9092/0]: sonic-cluster-broker-0.kafka.internal.triplestack.io:9092/0: Timed out 2 in-flight, 0 retry-queued, 0 out-queue, 0 partially-sent requests'
}

{
  timestamp: 1717074108080,
  fac: 'REQTMOUT',
  message: '[thrd:sonic-cluster-broker-0.kafka.internal.triplestack.io:9092/0]: sonic-cluster-broker-0.kafka.internal.triplestack.io:9092/0: Timed out ProduceRequest in flight (after 60181ms, timeout #1)'
}

{
  timestamp: 1717074108076,
  fac: 'REQTMOUT',
  message: '[thrd:sonic-cluster-broker-0.kafka.internal.triplestack.io:9092/0]: sonic-cluster-broker-0.kafka.internal.triplestack.io:9092/0: Timed out ProduceRequest in flight (after 60182ms, timeout #0)'
}

message: [LibrdKafkaError: Local: Timed out] {
  errno: -1,
  code: -1,
  message: 'timed out',
  origin: 'local'
}

message: [LibrdKafkaError: Local: Timed out] {
  errno: -1,
  code: -1,
  message: 'timed out',
  origin: 'local'
}

{
  timestamp: 1717074106466,
  fac: 'FAIL',
  message: '[thrd:sonic-cluster-broker-2.kafka.internal.triplestack.io:9092/2]: sonic-cluster-broker-2.kafka.internal.triplestack.io:9092/2: 1 request(s) timed out: disconnect (after 1790796ms in state UP)'
}

{
  timestamp: 1717074106466,
  fac: 'REQTMOUT',
  message: '[thrd:sonic-cluster-broker-2.kafka.internal.triplestack.io:9092/2]: sonic-cluster-broker-2.kafka.internal.triplestack.io:9092/2: Timed out 1 in-flight, 0 retry-queued, 0 out-queue, 0 partially-sent requests'
}

{
  timestamp: 1717074106465,
  fac: 'REQTMOUT',
  message: '[thrd:sonic-cluster-broker-2.kafka.internal.triplestack.io:9092/2]: sonic-cluster-broker-2.kafka.internal.triplestack.io:9092/2: Timed out ProduceRequest in flight (after 60739ms, timeout #0)'
}

{
  timestamp: 1717074106371,
  fac: 'FAIL',
  message: '[thrd:sonic-cluster-broker-1.kafka.internal.triplestack.io:9092/1]: sonic-cluster-broker-1.kafka.internal.triplestack.io:9092/1: 1 request(s) timed out: disconnect (after 1792292ms in state UP)'
}

{
  timestamp: 1717074106371,
  fac: 'REQTMOUT',
  message: '[thrd:sonic-cluster-broker-1.kafka.internal.triplestack.io:9092/1]: sonic-cluster-broker-1.kafka.internal.triplestack.io:9092/1: Timed out 1 in-flight, 0 retry-queued, 0 out-queue, 0 partially-sent requests'
}

{
  timestamp: 1717074106370,
  fac: 'REQTMOUT',
  message: '[thrd:sonic-cluster-broker-1.kafka.internal.triplestack.io:9092/1]: sonic-cluster-broker-1.kafka.internal.triplestack.io:9092/1: Timed out ProduceRequest in flight (after 60572ms, timeout #0)'
}

message: [LibrdKafkaError: Local: Timed out] {
  errno: -1,
  code: -1,
  message: 'timed out',
  origin: 'local'
}

message: [LibrdKafkaError: Local: Timed out] {
  errno: -1,
  code: -1,
  message: 'timed out',
  origin: 'local'
}

{
  timestamp: 1717074105826,
  fac: 'FAIL',
  message: '[thrd:sonic-cluster-broker-1.kafka.internal.triplestack.io:9092/1]: sonic-cluster-broker-1.kafka.internal.triplestack.io:9092/1: 1 request(s) timed out: disconnect (after 1017523ms in state UP)'
}

{
  timestamp: 1717074105826,
  fac: 'REQTMOUT',
  message: '[thrd:sonic-cluster-broker-1.kafka.internal.triplestack.io:9092/1]: sonic-cluster-broker-1.kafka.internal.triplestack.io:9092/1: Timed out 1 in-flight, 0 retry-queued, 0 out-queue, 0 partially-sent requests'
}

{
  timestamp: 1717074105822,
  fac: 'REQTMOUT',
  message: '[thrd:sonic-cluster-broker-1.kafka.internal.triplestack.io:9092/1]: sonic-cluster-broker-1.kafka.internal.triplestack.io:9092/1: Timed out ProduceRequest in flight (after 60036ms, timeout #0)'
}

{
  timestamp: 1717074105729,
  fac: 'FAIL',
  message: '[thrd:sonic-cluster-broker-1.kafka.internal.triplestack.io:9092/1]: sonic-cluster-broker-1.kafka.internal.triplestack.io:9092/1: 2 request(s) timed out: disconnect (after 1016854ms in state UP)'
}

{
  timestamp: 1717074105728,
  fac: 'REQTMOUT',
  message: '[thrd:sonic-cluster-broker-1.kafka.internal.triplestack.io:9092/1]: sonic-cluster-broker-1.kafka.internal.triplestack.io:9092/1: Timed out 2 in-flight, 0 retry-queued, 0 out-queue, 0 partially-sent requests'
}

{
  timestamp: 1717074105728,
  fac: 'REQTMOUT',
  message: '[thrd:sonic-cluster-broker-1.kafka.internal.triplestack.io:9092/1]: sonic-cluster-broker-1.kafka.internal.triplestack.io:9092/1: Timed out ProduceRequest in flight (after 60513ms, timeout #1)'
}

{
  timestamp: 1717074105725,
  fac: 'REQTMOUT',
  message: '[thrd:sonic-cluster-broker-1.kafka.internal.triplestack.io:9092/1]: sonic-cluster-broker-1.kafka.internal.triplestack.io:9092/1: Timed out ProduceRequest in flight (after 60513ms, timeout #0)'
}

milindl commented 3 weeks ago

Thanks for the logs, I took a look.

Where is this particular log line being logged from - is it somewhere within your code?

message: [LibrdKafkaError: Local: Timed out] {
  errno: -1,
  code: -1,
  message: 'timed out',
  origin: 'local'
}

This is the error that should have been thrown by the producer.send() function on await. It doesn't have the other fields (timestamp/fac) that are present in the library's logger output.

Is it possible to share the code which is producing, too? It'll probably help. Thanks.

dermasmid commented 3 weeks ago

This is coming from librdkafka; I had extracted it from GCP, that's why it might look a little different. You can see the timestamp is there too.

milindl commented 3 weeks ago

Okay. Could you attach the relevant bits of code, too? I can reproduce the logging, but it results in a thrown exception for me (or the other case I talked about, where the message is retried until it's sent).

Dhruv-Garg79 commented 3 weeks ago

I faced a similar issue yesterday in production: the requests to Kafka take an indefinite time, without any timeout or error thrown. I was also using the KafkaJS version.

try {
    const res = await MyKafka.getInstance().producer.send({
        topic: topic,
        messages: messages,
    });
    ctx.logger.debug('sent message to kafka %s %j', topic, res);
} catch (error) {
    ctx.trace.recordException(error);
}

milindl commented 2 weeks ago

Thanks @Dhruv-Garg79. Are you getting the same logging as above about the timeout?