knative-extensions / eventing-kafka

Kafka integrations with Knative Eventing.
Apache License 2.0
77 stars 82 forks source link

[consolidated] "cannot map host to channel" errors in kafka-ch-dispatcher even when KafkaChannels Ready #1141

Closed maschmid closed 2 years ago

maschmid commented 2 years ago

Describe the bug On one of my test runs, the kafka-ch-dispatcher apparently stopped working, replying a 404 to any attempt of sending events, only logging:

{"level":"info","ts":"2022-03-21T15:48:07.155Z","logger":"kafkachannel-dispatcher","caller":"channel/message_receiver.go:169","msg":"cannot map host to channel: broker-kne-trigger-kn-channel.kc-broker-v1-dlq.svc.cluster.local","knative.dev/pod":"kafka-ch-dispatcher-5f459b6fd5-ttbnz"}

despite the broker-kne-trigger KafkaChannel being Ready

Restarting the kafka-ch-dispatcher makes it working correctly again.

It seems to me as if the KafkaChannel Informer just stopped informing the kafka-ch-dispatcher about KafkaChannels, but I cannot prove this yet.

Expected behavior kafka-ch-dispatcher should not stop accepting events

To Reproduce Unknown, the cluster was used to run eventing and eventing-kafka e2e tests, they passed successfully on the cluster, before starting to produce this broken behavior.

Knative release version 1.0.0

Additional context Installed via OpenShift Serverless operator 1.21 (dev build), on OCP 4.6

SIGQUIT the kafka-ch-dispatcher to see the goroutine dump: https://gist.github.com/maschmid/674db98a89a438828c8450be461474e8

maschmid commented 2 years ago

kafka-ch-dispatcher logs https://gist.github.com/maschmid/2ac108e707e7a2ac66c17304ca4128dc kafka-ch-controller logs https://gist.github.com/maschmid/8db67562724070b947f15c8e1541285c

maschmid commented 2 years ago

This bit looks suspicious to me, could it be that the Close hangs and thus blocks the informer forever?

knative.dev/eventing-kafka/pkg/common/consumer.(*customConsumerGroup).Close(0xc0009fe960)
    /opt/app-root/src/go/src/knative.dev/eventing-kafka/pkg/common/consumer/consumer_factory.go:63 +0x35
knative.dev/eventing-kafka/pkg/channel/consolidated/dispatcher.(*KafkaDispatcher).unsubscribe(0xc0000fcfa0, {{0xc0008d5d40, 0xc0009f0680}, {0xc0008d5d20, 0xc0009f0680}}, {{0xc0006b5050, 0x24}, {0xc000a73830, 0x0, 0x0, ...}})
    /opt/app-root/src/go/src/knative.dev/eventing-kafka/pkg/channel/consolidated/dispatcher/dispatcher.go:350 +0x5a2
knative.dev/eventing-kafka/pkg/channel/consolidated/dispatcher.(*KafkaDispatcher).CleanupChannel(0xc0000fcfa0, {0xc0008d5d20, 0x8}, {0xc0008d5d40, 0x10}, {0xc00094d287, 0xc0004ab088})
    /opt/app-root/src/go/src/knative.dev/eventing-kafka/pkg/channel/consolidated/dispatcher/dispatcher.go:268 +0x31b
knative.dev/eventing-kafka/pkg/channel/consolidated/reconciler/dispatcher.(*Reconciler).CleanupChannel(...)
    /opt/app-root/src/go/src/knative.dev/eventing-kafka/pkg/channel/consolidated/reconciler/dispatcher/kafkachannel.go:197
knative.dev/eventing-kafka/pkg/channel/consolidated/reconciler/dispatcher.NewController.func2({0x1c5be40, 0xc000d4e380})
    /opt/app-root/src/go/src/knative.dev/eventing-kafka/pkg/channel/consolidated/reconciler/dispatcher/kafkachannel.go:139 +0x90
k8s.io/client-go/tools/cache.ResourceEventHandlerFuncs.OnDelete(...)
    /opt/app-root/src/go/src/knative.dev/eventing-kafka/vendor/k8s.io/client-go/tools/cache/controller.go:245
k8s.io/client-go/tools/cache.FilteringResourceEventHandler.OnDelete({0xc00026b380, {0x1fb3740, 0xc00066ca80}}, {0x1c5be40, 0xc000d4e380})
    /opt/app-root/src/go/src/knative.dev/eventing-kafka/vendor/k8s.io/client-go/tools/cache/controller.go:288 +0x64
k8s.io/client-go/tools/cache.(*processorListener).run.func1()
    /opt/app-root/src/go/src/knative.dev/eventing-kafka/vendor/k8s.io/client-go/tools/cache/shared_informer.go:779 +0xdf
k8s.io/apimachinery/pkg/util/wait.BackoffUntil.func1(0x7f77c548d1d8)
    /opt/app-root/src/go/src/knative.dev/eventing-kafka/vendor/k8s.io/apimachinery/pkg/util/wait/wait.go:155 +0x67
k8s.io/apimachinery/pkg/util/wait.BackoffUntil(0xc00011d738, {0x1f83200, 0xc0009063c0}, 0x1, 0xc000594720)
    /opt/app-root/src/go/src/knative.dev/eventing-kafka/vendor/k8s.io/apimachinery/pkg/util/wait/wait.go:156 +0xb6
k8s.io/apimachinery/pkg/util/wait.JitterUntil(0x0, 0x3b9aca00, 0x0, 0x0, 0xc00011d788)
    /opt/app-root/src/go/src/knative.dev/eventing-kafka/vendor/k8s.io/apimachinery/pkg/util/wait/wait.go:133 +0x89
k8s.io/apimachinery/pkg/util/wait.Until(...)
    /opt/app-root/src/go/src/knative.dev/eventing-kafka/vendor/k8s.io/apimachinery/pkg/util/wait/wait.go:90
k8s.io/client-go/tools/cache.(*processorListener).run(0xc0004c0200)
    /opt/app-root/src/go/src/knative.dev/eventing-kafka/vendor/k8s.io/client-go/tools/cache/shared_informer.go:771 +0x6b
k8s.io/apimachinery/pkg/util/wait.(*Group).Start.func1()
    /opt/app-root/src/go/src/knative.dev/eventing-kafka/vendor/k8s.io/apimachinery/pkg/util/wait/wait.go:73 +0x5a
created by k8s.io/apimachinery/pkg/util/wait.(*Group).Start
    /opt/app-root/src/go/src/knative.dev/eventing-kafka/vendor/k8s.io/apimachinery/pkg/util/wait/wait.go:71 +0x88
maschmid commented 2 years ago

Note the errors in the logs

{"level":"info","ts":"2022-03-21T12:08:21.841Z","logger":"kafkachannel-dispatcher","caller":"dispatcher/dispatcher.go:329","msg":"Unsubscribing from channel","knative.dev/pod":"kafka-ch-dispatcher-5f459b6fd5-ttbnz","channel":"serverless-tests/smoke-kc","subscription":"278a6cb5-4e6b-4b32-8a6a-d1f2f1ec8089"}
{"level":"error","ts":"2022-03-21T12:09:20.890Z","logger":"kafkachannel-dispatcher","caller":"consumer/consumer_factory.go:111","msg":"error while checking if offsets are initialized","knative.dev/pod":"kafka-ch-dispatcher-5f459b6fd5-ttbnz","knative.dev/controller":"knative.dev.eventing-kafka.pkg.channel.consolidated.reconciler.dispatcher.Reconciler","knative.dev/kind":"messaging.knative.dev.KafkaChannel","knative.dev/traceid":"2f082ef8-6723-4db3-baf1-f9cfdf2db9cb","knative.dev/key":"serverless-tests/smoke-kc","topics":["knative-messaging-kafka.serverless-tests.smoke-kc"],"groupId":"kafka.serverless-tests.smoke-kc.278a6cb5-4e6b-4b32-8a6a-d1f2f1ec8089","channel":"serverless-tests/smoke-kc","error":"failed to check if offsets are initialized timed out waiting for the condition","stacktrace":"knative.dev/eventing-kafka/pkg/common/consumer.kafkaConsumerGroupFactoryImpl.startExistingConsumerGroup.func1\n\t/opt/app-root/src/go/src/knative.dev/eventing-kafka/pkg/common/consumer/consumer_factory.go:111"}
{"level":"info","ts":"2022-03-21T12:09:20.890Z","logger":"kafkachannel-dispatcher","caller":"controller/controller.go:550","msg":"Reconcile succeeded","knative.dev/pod":"kafka-ch-dispatcher-5f459b6fd5-ttbnz","knative.dev/controller":"knative.dev.eventing-kafka.pkg.channel.consolidated.reconciler.dispatcher.Reconciler","knative.dev/kind":"messaging.knative.dev.KafkaChannel","knative.dev/traceid":"6e6b837a-b671-4049-8684-92fc0ac5e2a9","knative.dev/key":"serverless-tests/smoke-kc","duration":0.000031326}
{"level":"warn","ts":"2022-03-21T12:09:20.890Z","logger":"kafkachannel-dispatcher","caller":"dispatcher/dispatcher.go:314","msg":"Error in consumer group","knative.dev/pod":"kafka-ch-dispatcher-5f459b6fd5-ttbnz","error":"failed to check if offsets are initialized timed out waiting for the condition"}

Mostly just a guess, but we're possibly waiting for

https://github.com/knative-sandbox/eventing-kafka/blob/release-1.0/pkg/common/consumer/consumer_factory.go#L63

but that won't come as we've returned (due to "error while checking if offsets are initialized") here:

https://github.com/knative-sandbox/eventing-kafka/blob/release-1.0/pkg/common/consumer/consumer_factory.go#L115

thus never sending to releasedCh as we would here? https://github.com/knative-sandbox/eventing-kafka/blob/release-1.0/pkg/common/consumer/consumer_factory.go#L122

github-actions[bot] commented 2 years ago

This issue is stale because it has been open for 90 days with no activity. It will automatically close after 30 more days of inactivity. Reopen the issue with /reopen. Mark the issue as fresh by adding the comment /remove-lifecycle stale.