dapr / components-contrib

Community driven, reusable components for distributed apps
Apache License 2.0

Kafka Subscriber stops subscribing and doesn't recover automatically #3264

Open olitomlinson opened 9 months ago

olitomlinson commented 9 months ago

Dapr runtime 1.12.2 on EKS

We have a Kafka PubSub subscriber that was working fine, until we got the following two error messages in the sidecar:

Immediately after, the following logs were output (it looks like the Kafka client is attempting to reinitialise):

time="2023-12-12T12:04:57.361666911Z" level=debug msg="consumer/broker/0 closed dead subscription to ds-applicationtenant-created/0\n" app_id=dbx component="general-purpose-pubsub (pubsub.kafka/v1)" instance=sciplat-svc-REDACTED-manager-deployment-859fd76f4c-ksgjg scope=dapr.contrib type=log ver=1.12.2
time="2023-12-12T12:04:57.364608263Z" level=debug msg="consumer/broker/1 closed dead subscription to ds-applicationuser-updated/0\n" app_id=dbx component="general-purpose-pubsub (pubsub.kafka/v1)" instance=sciplat-svc-REDACTED-manager-deployment-859fd76f4c-ksgjg scope=dapr.contrib type=log ver=1.12.2
time="2023-12-12T12:04:57.364694874Z" level=debug msg="consumergroup/session/sarama-9a9a9c70-d096-41a5-9ce2-d57c5d3c3b91/74 heartbeat loop stopped\n" app_id=dbx component="general-purpose-pubsub (pubsub.kafka/v1)" instance=sciplat-svc-REDACTED-manager-deployment-859fd76f4c-ksgjg scope=dapr.contrib type=log ver=1.12.2
time="2023-12-12T12:04:57.364714625Z" level=debug msg="consumergroup/session/sarama-9a9a9c70-d096-41a5-9ce2-d57c5d3c3b91/74 released\n" app_id=dbx component="general-purpose-pubsub (pubsub.kafka/v1)" instance=sciplat-svc-REDACTED-manager-deployment-859fd76f4c-ksgjg scope=dapr.contrib type=log ver=1.12.2
time="2023-12-12T12:04:57.364731435Z" level=debug msg="Starting loop to consume." app_id=dbx component="general-purpose-pubsub (pubsub.kafka/v1)" instance=sciplat-svc-REDACTED-manager-deployment-859fd76f4c-ksgjg scope=dapr.contrib type=log ver=1.12.2
time="2023-12-12T12:04:57.364793657Z" level=debug msg="client/metadata fetching metadata for [ds-applicationuser-updated] from broker REDACTED-kafka-kafka-0.REDACTED-kafka-kafka-brokers.dotm-services.svc:9092\n" app_id=dbx component="general-purpose-pubsub (pubsub.kafka/v1)" instance=sciplat-svc-REDACTED-manager-deployment-859fd76f4c-ksgjg scope=dapr.contrib type=log ver=1.12.2
time="2023-12-12T12:04:57.366867612Z" level=debug msg="client/metadata fetching metadata for [ds-applicationuser-updated] from broker REDACTED-kafka-kafka-0.REDACTED-kafka-kafka-brokers.dotm-services.svc:9092\n" app_id=dbx component="general-purpose-pubsub (pubsub.kafka/v1)" instance=sciplat-svc-REDACTED-manager-deployment-859fd76f4c-ksgjg scope=dapr.contrib type=log ver=1.12.2
time="2023-12-12T12:04:57.373229493Z" level=debug msg="client/coordinator requesting coordinator for ds-application.dbx from REDACTED-kafka-kafka-0.REDACTED-kafka-kafka-brokers.dotm-services.svc:9092\n" app_id=dbx component="general-purpose-pubsub (pubsub.kafka/v1)" instance=sciplat-svc-REDACTED-manager-deployment-859fd76f4c-ksgjg scope=dapr.contrib type=log ver=1.12.2
time="2023-12-12T12:04:57.37419421Z" level=debug msg="client/coordinator coordinator for ds-application.dbx is #2 (REDACTED-kafka-kafka-2.REDACTED-kafka-kafka-brokers.dotm-services.svc:9092)\n" app_id=dbx component="general-purpose-pubsub (pubsub.kafka/v1)" instance=sciplat-svc-REDACTED-manager-deployment-859fd76f4c-ksgjg scope=dapr.contrib type=log ver=1.12.2
time="2023-12-12T12:04:57.47748149Z" level=debug msg="consumer/broker/1 accumulated 1 new subscriptions\n" app_id=dbx component="general-purpose-pubsub (pubsub.kafka/v1)" instance=sciplat-svc-REDACTED-manager-deployment-859fd76f4c-ksgjg scope=dapr.contrib type=log ver=1.12.2
time="2023-12-12T12:04:57.477578492Z" level=debug msg="consumer/broker/1 added subscription to ds-applicationuser-updated/0\n" app_id=dbx component="general-purpose-pubsub (pubsub.kafka/v1)" instance=sciplat-svc-REDACTED-manager-deployment-859fd76f4c-ksgjg scope=dapr.contrib type=log ver=1.12.2
time="2023-12-12T12:14:42.452646345Z" level=debug msg="ClientID is the default of 'sarama', you should consider setting it to something application-specific." app_id=dbx component="general-purpose-pubsub (pubsub.kafka/v1)" instance=sciplat-svc-REDACTED-manager-deployment-859fd76f4c-ksgjg scope=dapr.contrib type=log ver=1.12.2
time="2023-12-12T12:14:42.452767447Z" level=debug msg="client/metadata fetching metadata for all topics from broker REDACTED-kafka-kafka-0.REDACTED-kafka-kafka-brokers.dotm-services.svc:9092\n" app_id=dbx component="general-purpose-pubsub (pubsub.kafka/v1)" instance=sciplat-svc-REDACTED-manager-deployment-859fd76f4c-ksgjg scope=dapr.contrib type=log ver=1.12.2
time="2023-12-12T12:14:42.456015284Z" level=debug msg="Connected to broker at REDACTED-kafka-kafka-0.REDACTED-kafka-kafka-brokers.dotm-services.svc:9092 (registered as #0)\n" app_id=dbx component="general-purpose-pubsub (pubsub.kafka/v1)" instance=sciplat-svc-REDACTED-manager-deployment-859fd76f4c-ksgjg scope=dapr.contrib type=log ver=1.12.2
time="2023-12-12T12:14:51.967077716Z" level=debug msg="client/metadata fetching metadata for all topics from broker REDACTED-kafka-kafka-0.REDACTED-kafka-kafka-brokers.dotm-services.svc:9092\n" app_id=dbx component="general-purpose-pubsub (pubsub.kafka/v1)" instance=sciplat-svc-REDACTED-manager-deployment-859fd76f4c-ksgjg scope=dapr.contrib type=log ver=1.12.2

So, given that nothing had happened by this point, we decided to restart the subscribing pod.

Success! The message was consumed immediately. Interestingly, it had expired (not sure if this is a red herring, though):

time="2023-12-12T12:23:51.437188279Z" level=info msg="dapr initialized. Status: Running. Init Elapsed 15267ms" app_id=dbx instance=sciplat-svc-REDACTED-manager-deployment-859fd76f4c-c949w scope=dapr.runtime type=log ver=1.12.2
time="2023-12-12T12:23:51.538416198Z" level=debug msg="consumer/broker/1 accumulated 1 new subscriptions\n" app_id=dbx component="general-purpose-pubsub (pubsub.kafka/v1)" instance=sciplat-svc-REDACTED-manager-deployment-859fd76f4c-c949w scope=dapr.contrib type=log ver=1.12.2
time="2023-12-12T12:23:51.538541168Z" level=debug msg="consumer/broker/1 added subscription to ds-applicationuser-updated/0\n" app_id=dbx component="general-purpose-pubsub (pubsub.kafka/v1)" instance=sciplat-svc-REDACTED-manager-deployment-859fd76f4c-c949w scope=dapr.contrib type=log ver=1.12.2
time="2023-12-12T12:23:51.538436258Z" level=debug msg="consumer/broker/0 accumulated 1 new subscriptions\n" app_id=dbx component="general-purpose-pubsub (pubsub.kafka/v1)" instance=sciplat-svc-REDACTED-manager-deployment-859fd76f4c-c949w scope=dapr.contrib type=log ver=1.12.2
time="2023-12-12T12:23:51.538727619Z" level=debug msg="consumer/broker/0 added subscription to ds-applicationtenant-created/0\n" app_id=dbx component="general-purpose-pubsub (pubsub.kafka/v1)" instance=sciplat-svc-REDACTED-manager-deployment-859fd76f4c-c949w scope=dapr.contrib type=log ver=1.12.2
time="2023-12-12T12:23:51.539936474Z" level=debug msg="Processing Kafka message: ds-applicationtenant-created/0/9 [key=]" app_id=dbx component="general-purpose-pubsub (pubsub.kafka/v1)" instance=sciplat-svc-REDACTED-manager-deployment-859fd76f4c-c949w scope=dapr.contrib type=log ver=1.12.2
time="2023-12-12T12:23:51.540069626Z" level=warning msg="dropping expired pub/sub event ede85cb8-9f54-4705-8420-4fca90f40466 as of 2023-12-12T12:21:09Z" app_id=dbx instance=sciplat-svc-REDACTED-manager-deployment-859fd76f4c-c949w scope=dapr.runtime.processor.pubsub type=log ver=1.12.2

So my question is: did the message expiring (via the TTL built into Dapr pub/sub's ttlInSeconds) cause the entire sidecar to stop consuming from that partition?

My initial thinking was yes, it must be related to the TTL. However, the processing error occurred at 12:04:57Z and the message didn't expire until 12:21:09Z, so there is a window of over 15 minutes in which the message had not yet expired but the Kafka client was not doing anything.

So if it's not the TTL, what is the problem?

olitomlinson commented 9 months ago

Going back to the two original error messages:

encountered a retriable error while publishing a subscribed message to topic tenant-created, err: Post \"http://127.0.0.1:8087/subscription/tenant-created\": context canceled : tracked down to here

Too many failed attempts at processing Kafka message: ds-applicationtenant-created/0/9 [key=]. Error: context canceled. : tracked down to here

What does `context canceled` mean as the detail here? It's not very helpful!

My guess, given the first log message, is that it's trying to relay the message to the app channel, but the app channel is not available for some reason; maybe a connection to the app channel timed out?
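For what it's worth, the `context canceled` wording comes straight from Go's context and net/http machinery: any HTTP request issued with a context that has already been cancelled fails with exactly that error string. A minimal, self-contained sketch (the URL is just the one from the log above; nothing needs to be listening on it for the demo):

```go
package main

import (
	"context"
	"fmt"
	"net/http"
	"strings"
)

func main() {
	// Create a context and cancel it before the request completes,
	// mimicking a caller that gives up on a delivery mid-flight.
	ctx, cancel := context.WithCancel(context.Background())
	cancel()

	req, err := http.NewRequestWithContext(ctx, http.MethodPost,
		"http://127.0.0.1:8087/subscription/tenant-created",
		strings.NewReader(`{"hello":"world"}`))
	if err != nil {
		panic(err)
	}

	_, err = http.DefaultClient.Do(req)
	// Prints something like:
	//   Post "http://127.0.0.1:8087/subscription/tenant-created": context canceled
	fmt.Println(err)
}
```

So the error by itself only says that whoever owned the request's context gave up before the app responded; it doesn't say why.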

So, assuming the app channel goes away for some reason (maybe it's under load or busy, and unable to serve any more requests for a period of time), what is the recourse here?

My assumption is that if the app channel can't serve any more requests, and the retries have therefore been exhausted, then there is little point in the sidecar remaining in the Kafka consumer group. My guess is it should gracefully eject itself from the consumer group and then begin attempting to rejoin; otherwise the sidecar just gets stuck with no further redelivery attempts.

Another assumption is that while all this is happening, the Kafka client inside Dapr is still sending heartbeats to Kafka even though it's not attempting any further redeliveries, which would explain why the sidecar hasn't been forcefully ejected from the consumer group by the Kafka broker.
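For reference, the leave-and-rejoin behaviour described above is close to the standard Sarama consume loop. A minimal sketch, assuming a plain github.com/IBM/sarama consumer group rather than Dapr's actual wrapper (broker address, group ID, and topic are placeholders):

```go
package main

import (
	"context"
	"log"
	"time"

	"github.com/IBM/sarama"
)

// handler is a minimal sarama.ConsumerGroupHandler for illustration.
type handler struct{}

func (handler) Setup(sarama.ConsumerGroupSession) error   { return nil }
func (handler) Cleanup(sarama.ConsumerGroupSession) error { return nil }
func (handler) ConsumeClaim(s sarama.ConsumerGroupSession, c sarama.ConsumerGroupClaim) error {
	for msg := range c.Messages() {
		// Deliver msg to the app here; on repeated failure, return an error
		// so the session ends and the outer loop decides what to do next.
		s.MarkMessage(msg, "")
	}
	return nil
}

func main() {
	cfg := sarama.NewConfig()
	cfg.Version = sarama.V2_1_0_0 // consumer groups need a recent protocol version

	cg, err := sarama.NewConsumerGroup([]string{"localhost:9092"}, "my-group", cfg)
	if err != nil {
		log.Fatal(err)
	}
	defer cg.Close()

	ctx := context.Background()
	for {
		// Consume blocks for the lifetime of one session; when it returns
		// (rebalance, broker error, handler error) we back off and rejoin
		// rather than silently giving up.
		if err := cg.Consume(ctx, []string{"ds-applicationtenant-created"}, handler{}); err != nil {
			log.Printf("consume session ended with error: %v; rejoining after backoff", err)
			time.Sleep(5 * time.Second)
		}
		if ctx.Err() != nil {
			return
		}
	}
}
```

The key point is the outer for loop: every time a session dies, the client leaves the group and rejoins, so a transient app-channel outage doesn't leave the subscriber permanently stalled.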

ItalyPaleAle commented 9 months ago

Looks like there are a bunch of undocumented options for Kafka that allow configuring the retry behavior: https://github.com/dapr/components-contrib/blob/79adc565c17ad8936048896591cd205a6609ad67/common/component/kafka/kafka.go#L123-L128

I don't see this documented anywhere, and it's unclear what the defaults are.

LiamClarkeNZ commented 9 months ago

@ItalyPaleAle it defaults to true :)

https://github.com/dapr/components-contrib/blob/02c6b21ec6216ffce45d47cc1dbb1a6e674f370f/pubsub/kafka/kafka.go#L108

github-actions[bot] commented 8 months ago

This issue has been automatically marked as stale because it has not had activity in the last 30 days. It will be closed in the next 7 days unless it is tagged (pinned, good first issue, help wanted or triaged/resolved) or other activity occurs. Thank you for your contributions.

olitomlinson commented 8 months ago

do not close

berndverst commented 7 months ago

This issue is pervasive in all of our pubsub components. Once a consumer errors out, nothing reinitializes that consumer.

https://github.com/dapr/components-contrib/blob/9acfcc16b81f9d19c47dc4572d0a00c4b24518eb/common/component/kafka/consumer.go#L352C1-L354C6

But the same is also true for Azure Event Hubs: https://github.com/dapr/components-contrib/issues/3325

olitomlinson commented 7 months ago

@berndverst Ouch!

At the very least, if a consumer errors out I would expect the Dapr process/sidecar to exit so that it can be recycled by whatever orchestration is in place (k8s restarting the pod being the common example).

Having a stalled subscriber, particularly for something like Kafka, is very painful indeed (more painful than with a traditional transactional broker like Service Bus).

berndverst commented 7 months ago

> @berndverst Ouch!
>
> At the very least, if a consumer errors out I would expect the Dapr process/sidecar to exit so that it can be recycled by whatever orchestration is in place (k8s restarting the pod being the common example).
>
> Having a stalled subscriber, particularly for something like Kafka, is very painful indeed (more painful than with a traditional transactional broker like Service Bus).

Not sure what is going on there. Not low-hanging fruit after all. I will not be looking into this, but contributions are welcome.

https://github.com/dapr/components-contrib/blob/9acfcc16b81f9d19c47dc4572d0a00c4b24518eb/common/component/kafka/consumer.go#L341-L354

almogtavor commented 2 months ago

@berndverst Is there any plan to address this issue? A stalled subscriber makes the Kafka connector practically unusable in a production environment: if the consumer errors out and doesn't recover automatically, it severely impacts reliability. At the very least, the sidecar process should make an os.Exit call when it encounters a permanent error, allowing the pod to be restarted by Kubernetes. That would help maintain service availability until a permanent fix is implemented. Without it, the Kafka connector cannot be considered production-ready.
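Something along those lines could be as simple as treating an exhausted consumer as fatal. A minimal sketch of the fail-fast idea (consumeLoop here is a hypothetical stand-in for the component's real consume loop, not existing Dapr code):

```go
package main

import (
	"errors"
	"log"
	"os"
	"time"
)

// consumeLoop is a hypothetical stand-in for the component's consume loop.
// It returns an error only when retries have been exhausted.
func consumeLoop() error {
	time.Sleep(time.Second)
	return errors.New("too many failed attempts at processing Kafka message")
}

func main() {
	if err := consumeLoop(); err != nil {
		// Treat an exhausted consumer as fatal: log and exit non-zero so the
		// orchestrator (e.g. Kubernetes) restarts the pod instead of leaving
		// a stalled subscriber running indefinitely.
		log.Printf("kafka consumer stopped permanently: %v", err)
		os.Exit(1)
	}
}
```

Exiting non-zero lets the Kubernetes restart policy recycle the pod, which is exactly the recovery path described above; a proper in-process rejoin would still be the better long-term fix.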

I've also seen that Dapr's Kafka consumer uses Sarama, which has related issues: https://github.com/IBM/sarama/issues/2621 and https://github.com/IBM/sarama/issues/2682