This behavior would have changed with #1274, which provides more context. The intention of the mechanism is to allow a consumer to restart if the reason for the crash was a retriable error. Previously, if any error was thrown from inside a retry block, it would end up throwing a KafkaJSNumberOfRetriesExceeded error, which was always considered retriable. Now, if the original error is not retriable, it instead throws a KafkaJSNonRetriableError - essentially it's reflecting the retriability of the original cause. I would say it's a bug fix, since a non-retriable error magically became retriable just because it happened to be thrown from inside a retrier.
await retry(async () => {
  throw new TypeError('boom') // This would have resulted in a restart, even though it would never work
})

throw new TypeError('boom') // This would *not* have resulted in a restart, because it wasn't wrapped in a retrier
Non-retriable errors never led to a restart - unless they happened to be thrown from inside a retrier. This doesn't really make sense to me. The new approach is consistent, whether the error comes from a retrier or not.
await retry(async () => {
  throw new TypeError('boom') // This will now not cause a restart
})

await retry(async () => {
  throw new KafkaJSConnectionError('boom') // This is retriable and will eventually cause a restart
})
Is it definitely intended that restartOnFailure no longer allows customisation of behaviour for KafkaJSNumberOfRetriesExceeded and will this change be retained?
restartOnFailure still controls the behavior for retriable errors, whether they are coming from inside a retrier or not. It does not control the behavior of non-retriable errors, however. For example, if there's a TypeError being thrown from somewhere, it will cause a crash and not a restart, whether it's in a retrier or not.
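For reference, restartOnFailure is supplied through the client's retry configuration (a minimal sketch; the client id, broker address and the example condition are placeholders, not a recommendation):

const { Kafka } = require('kafkajs')

const kafka = new Kafka({
  clientId: 'example-client', // placeholder
  brokers: ['localhost:29092'], // placeholder
  retry: {
    retries: 5,
    // Invoked once a retriable error has exhausted its retries.
    // Return true to restart the consumer, false to let it crash.
    restartOnFailure: async (error) => {
      return error.name === 'KafkaJSNumberOfRetriesExceeded' // example condition only
    },
  },
})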
I believe it's saying specifically that consumers will restart on KafkaJSNumberOfRetriesExceeded but that is no longer the case
It should be updated to say that consumers will restart on retriable errors.
I'd assumed that previously, KafkaJSNumberOfRetriesExceeded would lead to a restart because a restart would potentially resolve whatever was causing the retries to fail. Is that not the case?
Precisely. If the error is retriable, but we ran out of attempts, the assumption is that if we throw away current state and restart, the issue may resolve itself. If the error is not retriable, then the assumption is that restarting won't help. See #69 for the original feature request and related pull request.
is it the case that any logic relating to restarting because of KafkaJSNumberOfRetriesExceeded would now be expected to be in a consumer crash listener?
If you want to restart the consumer on any error, whether it's a transient network error or a TypeError, then at the moment I don't see any other way than listening for the crash event and manually recreating the consumer. But then I would ask what the point would be of restarting on an error that won't be solved by restarting.
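For completeness, a rough sketch of that manual approach (the group id, topic and runConsumer wrapper are hypothetical, and kafka is the client instance from the sketch above):

const runConsumer = async () => {
  const consumer = kafka.consumer({ groupId: 'example-group' }) // placeholder group id

  consumer.on(consumer.events.CRASH, async ({ payload }) => {
    // payload.restart is false when kafkajs will not recover on its own
    if (!payload.restart) {
      await consumer.disconnect()
      await runConsumer() // manually recreate the consumer
    }
  })

  await consumer.connect()
  await consumer.subscribe({ topic: 'example-topic' }) // placeholder topic
  await consumer.run({ eachMessage: async () => {} })
}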
Thanks @Nevon! Makes sense and thank you for the detailed reply.
I don't want to restart consumers on non-retriable errors such as a TypeError :) What I think I am seeing on 1.16.0 is a connection error that does not prompt a restart but instead goes to a crash, which then just leaves the consumer sitting there doing nothing.
What I am doing: running a consumer against a local broker (localhost:29092) that is unreachable, so that connection attempts fail with ECONNREFUSED.
What happens on 1.15.0 is that the consumer enters the retry + restart cycle.
What happens on 1.16.0 is that the consumer does one set of retries and then crashes with no restart.
Here are the relevant logs from 1.16.0:
ERROR [2022-02-22T15:33:52.003Z]: Failed to connect to seed broker, trying another broker from the list: Connection error: connect ECONNREFUSED ::1:29092
retryCount: 5
retryTime: 8620
code: "kafkajs"
namespace: "BrokerPool"
ERROR [2022-02-22T15:33:52.006Z]: Crash: KafkaJSNonRetriableError: Connection error: connect ECONNREFUSED ::1:29092
stack: "KafkaJSNonRetriableError: Connection error: connect ECONNREFUSED ::1:29092\n at /Users/rickc/dev/project/node_modules/kafkajs/src/retry/index.js:53:18\n at processTicksAndRejections (node:internal/process/task_queues:96:5)"
code: "kafkajs"
namespace: "Consumer"
Crash listener receives:
{
id: 3,
type: 'consumer.crash',
payload: {
error: KafkaJSNonRetriableError: Connection error: connect ECONNREFUSED ::1:29092
at /Users/rickc/dev/project/node_modules/kafkajs/src/retry/index.js:53:18
at processTicksAndRejections (node:internal/process/task_queues:96:5) {
retriable: false,
helpUrl: undefined,
originalError: [KafkaJSNumberOfRetriesExceeded]
},
restart: false
},
timestamp: 1645544032012
}
Then nothing happens - no further logs, no restart, the consumer just sits there doing nothing. It doesn't trigger the restartOnFailure method. That's why I thought I might need to do a manual restart.
To compare, here is the same scenario with 1.15.0:
ERROR [2022-02-22T15:19:36.762Z]: Failed to connect to seed broker, trying another broker from the list: Connection error: connect ECONNREFUSED ::1:29092
retryCount: 5
retryTime: 13530
code: "kafkajs"
namespace: "BrokerPool"
ERROR [2022-02-22T15:19:36.762Z]: Crash: KafkaJSNumberOfRetriesExceeded: Connection error: connect ECONNREFUSED ::1:29092
retryCount: 5
stack: "KafkaJSNonRetriableError\n Caused by: KafkaJSConnectionError: Connection error: connect ECONNREFUSED ::1:29092\n at Socket.onError (/Users/rickc/dev/project/node_modules/kafkajs/src/network/connection.js:152:23)\n at Socket.emit (node:events:390:28)\n at Socket.emit (node:domain:475:12)\n at emitErrorNT (node:internal/streams/destroy:164:8)\n at emitErrorCloseNT (node:internal/streams/destroy:129:3)\n at processTicksAndRejections (node:internal/process/task_queues:83:21)"
code: "kafkajs"
namespace: "Consumer"
It then enters restartOnFailure where our code returns true
Then the crash listener gets:
{
id: 3,
type: 'consumer.crash',
payload: {
error: KafkaJSNonRetriableError
Caused by: KafkaJSConnectionError: Connection error: connect ECONNREFUSED ::1:29092
at Socket.onError (/Users/rickc/dev/project/node_modules/kafkajs/src/network/connection.js:152:23)
at Socket.emit (node:events:390:28)
at Socket.emit (node:domain:475:12)
at emitErrorNT (node:internal/streams/destroy:164:8)
at emitErrorCloseNT (node:internal/streams/destroy:129:3)
at processTicksAndRejections (node:internal/process/task_queues:83:21) {
name: 'KafkaJSNumberOfRetriesExceeded',
retriable: false,
helpUrl: undefined,
originalError: [KafkaJSConnectionError],
retryCount: 5,
retryTime: 13530
},
restart: true
},
timestamp: 1645543176763
}
The consumer then restarts.
So - am I seeing expected behaviour on 1.16.0? Thanks for your time!
Thanks for the detailed information. No, I would say that's not intended behavior, since a KafkaJSConnectionError should be a retriable error, so I would expect that the consumer restarts in that case.
Looking at the message from the crash listener, I wonder if this is a case of nested retriers somewhere, because the error that gets thrown is a KafkaJSNonRetriableError, which is expected when a retrier encounters an error that's not retriable, but the originalError is a KafkaJSNumberOfRetriesExceeded error, not a KafkaJSConnectionError - which is what I would have expected to see.
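To make the nested-retrier idea concrete, a purely conceptual sketch (outerRetry and innerRetry are made-up names, not actual kafkajs internals):

await outerRetry(async () => {
  await innerRetry(async () => {
    throw new KafkaJSConnectionError('connect ECONNREFUSED') // retriable root cause
  })
  // innerRetry runs out of attempts and throws KafkaJSNumberOfRetriesExceeded,
  // which is itself marked as non-retriable. outerRetry then sees a
  // non-retriable error and wraps it in a KafkaJSNonRetriableError whose
  // originalError is the KafkaJSNumberOfRetriesExceeded - hiding the
  // KafkaJSConnectionError that actually caused the crash.
})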
@Nevon Thanks again.
I've done some further testing today and have found that the issue in 1.16.0 seems to be intermittent to some extent.
I replicated the same steps as mentioned in my last post:
First time I got the same result and logs as before - leaving the consumer in a crashed state with no restart.
Second time I got a different result. Here are the logs:
ERROR [2022-02-23T09:11:22.639Z]: Failed to connect to seed broker, trying another broker from the list: Connection error: connect ECONNREFUSED ::1:29092
retryCount: 5
retryTime: 11396
code: "kafkajs"
namespace: "BrokerPool"
ERROR [2022-02-23T09:11:22.640Z]: Crash: KafkaJSNumberOfRetriesExceeded: Connection error: connect ECONNREFUSED ::1:29092
retryCount: 5
stack: "KafkaJSNonRetriableError\n Caused by: KafkaJSConnectionError: Connection error: connect ECONNREFUSED ::1:29092\n at Socket.onError (/Users/rickc/dev/project/node_modules/kafkajs/src/network/connection.js:149:23)\n at Socket.emit (node:events:390:28)\n at Socket.emit (node:domain:475:12)\n at emitErrorNT (node:internal/streams/destroy:164:8)\n at emitErrorCloseNT (node:internal/streams/destroy:129:3)\n at processTicksAndRejections (node:internal/process/task_queues:83:21)"
code: "kafkajs"
namespace: "Consumer"
It then went into restartOnFailure, which returned true. Here's a log from within that function:
INFO [2022-02-23T09:11:22.640Z]: Assessing kafkajs error to determine whether consumer should restart.
namespace: "Consumer"
sourceError: {
"name": "KafkaJSNumberOfRetriesExceeded",
"retriable": false,
"originalError": {
"name": "KafkaJSConnectionError",
"retriable": true,
"broker": "localhost:29092",
"code": "ECONNREFUSED"
},
"retryCount": 5,
"retryTime": 11396
}
Because we returned true, it then crashes with a restartable error and then restarts:
--in CRASH---> {
id: 1,
type: 'consumer.crash',
payload: {
error: KafkaJSNonRetriableError
Caused by: KafkaJSConnectionError: Connection error: connect ECONNREFUSED ::1:29092
at Socket.onError (/Users/rickc/dev/project/node_modules/kafkajs/src/network/connection.js:149:23)
at Socket.emit (node:events:390:28)
at Socket.emit (node:domain:475:12)
at emitErrorNT (node:internal/streams/destroy:164:8)
at emitErrorCloseNT (node:internal/streams/destroy:129:3)
at processTicksAndRejections (node:internal/process/task_queues:83:21) {
name: 'KafkaJSNumberOfRetriesExceeded',
retriable: false,
helpUrl: undefined,
originalError: [KafkaJSConnectionError],
retryCount: 5,
retryTime: 11396
},
restart: true
},
timestamp: 1645607482640
}
So - it looks like somehow, sometimes the originalError is KafkaJSConnectionError and sometimes it's KafkaJSNumberOfRetriesExceeded. I'm not that familiar with the codebase so not sure what might be causing that. Some kind of race condition perhaps?
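For what it's worth, a small helper sketch (hypothetical, based only on the error shapes in these logs) for digging the root cause out of the crash payload, which makes the two shapes easier to compare when logging them:

// Hypothetical helper: follow the originalError chain down to the root cause.
const rootCause = (error) => {
  let current = error
  while (current && current.originalError) {
    current = current.originalError
  }
  return current
}

// e.g. in the crash listener, rootCause(payload.error) should surface the
// underlying KafkaJSConnectionError in both cases, assuming the nested
// KafkaJSNumberOfRetriesExceeded carries its own originalError.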
I've had this exact same issue after updating to 1.16. Downgrading back to 1.15 seems to have fixed the issue for me. I'm running some tests and will update this as soon as I'm done.
Edit (01/03/2022): It was indeed the version.
We also had this problem with 1.16, though I could not reproduce it locally with docker-compose. In our k8s cluster, the consumers sometimes stop restarting and become unable to receive messages. After downgrading to 1.15 it started working again. Maybe it is some edge case with a specific error that is not handled well, but digging through the logs we did not find anything unusual, except some connection errors that should not cause this issue.
Could those affected by this try https://github.com/tulios/kafkajs/tree/fix-retries-1299 and let me know if it fixes things? Since it's very difficult to reproduce this in a reliable way, I'm not 100% confident that this solves it, but based on the logs I believe it will.
Has anyone tried my potential fix above yet, so I can know whether it actually fixes things?
@Nevon hey! Silly question maybe, but what in your view is the best way to pull your branch into an existing project that uses kafkajs in order to test it?
npm install tulios/kafkajs#fix-retries-1299 --save or yarn add tulios/kafkajs#fix-retries-1299 will do it.
Thanks - am making a ticket in our backlog to do some testing on this and will try to get it played ASAP
We also have a backlog item to test this on our k8s environment. We will probably need to leave it running a couple days to confirm the fix.
Checking back in, as I've got some time for kafkajs this week. Anyone tried it out yet?
I've been on leave for a couple of weeks unfortunately. Will be aiming to get our backlog ticket related to this into our next sprint. It's top of our backlog.
I am experiencing this problem as well with 1.16.
My setup is like this: several consumers running against the same cluster. With 1.16, some consumers hit restartOnFailure (as they did in 1.15) and keep going through it if I return true, but some (most of them) never do, as a non-retriable error is raised instead, confirming the intermittent behaviour. Testing https://github.com/tulios/kafkajs/tree/fix-retries-1299 I see consistent behaviour: all consumers hit restartOnFailure, so the condition can be handled by my code.
Should we expect a fix in 1.16.1 or is it going straight into 1.17?
@Nevon Hi - finally had a chance to test this for you. I agree with @uwburn - I am seeing consistent behaviour with your changes. Tested over 10 times. Thanks for your help!
Merged the fix. It'll go out in v2.0.0, which I'm planning to ship this week, if all goes well.
Describe the bug
Following changes in v1.16.0 we've experienced a breaking change within our application relating to https://github.com/tulios/kafkajs/pull/1274. I'm opening this issue to: 1) check my understanding of what is happening, 2) check whether the v1.16.0 approach is likely to change, and 3) determine what changes I may need to make in our application.
1.15.0 / old behaviour
At v1.15.0 and below, KafkaJSNumberOfRetriesExceeded was classed as an error that would cause the consumer to restart. Potentially, this could lead to an infinite restart loop. However, my understanding was that the restartOnFailure option was available to allow users to customise that behaviour to the needs of the application, per the docs here: https://kafka.js.org/docs/configuration#restartonfailure. This is what we have done in our application. In our restartOnFailure, we specifically watch for KafkaJSNumberOfRetriesExceeded and we only allow the consumer to restart a specific number of times based on this error before we then gracefully shut down and exit.
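For illustration, a rough sketch of that kind of restart-limiting restartOnFailure (the limit and the shutdown helper are made-up placeholders, not our actual code):

let restarts = 0
const maxRestarts = 5 // placeholder limit

const restartOnFailure = async (error) => {
  if (error.name === 'KafkaJSNumberOfRetriesExceeded' && restarts < maxRestarts) {
    restarts += 1
    return true // let kafkajs restart the consumer
  }
  await gracefulShutdown() // hypothetical helper that disconnects and exits the process
  return false
}
// passed to the client via its retry config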
1.16.0 / new behaviour
In v1.16.0 this no longer works. KafkaJSNumberOfRetriesExceeded leads directly to a consumer crash and does not trigger the restartOnFailure method. Based on this I have the following questions:
1) Is it definitely intended that restartOnFailure no longer allows customisation of behaviour for KafkaJSNumberOfRetriesExceeded, and will this change be retained? I don't want to modify our consumers to work with v1.16.0 if it might change again in the near future!
2) Is the documentation for restartOnFailure still valid? https://kafka.js.org/docs/configuration#restartonfailure I believe it's saying specifically that consumers will restart on KafkaJSNumberOfRetriesExceeded, but that is no longer the case.
3) I'd assumed that previously, KafkaJSNumberOfRetriesExceeded would lead to a restart because a restart would potentially resolve whatever was causing the retries to fail. Is that not the case? What is the benefit of a restart compared to retry?
4) Is it the case that any logic relating to restarting because of KafkaJSNumberOfRetriesExceeded would now be expected to be in a consumer crash listener? i.e. we'd need to "manually" restart rather than using the consumer's built-in restart/restartOnFailure behaviour.
Thanks for the awesome library and I hope the above makes sense!
Environment: