confluentinc / confluent-kafka-dotnet

Confluent's Apache Kafka .NET client
https://github.com/confluentinc/confluent-kafka-dotnet/wiki
Apache License 2.0

Consumer subscribers are not being called. #1832

Open niemyjski opened 2 years ago

niemyjski commented 2 years ago

Description

We've been writing a Foundatio message bus implementation around Kafka and noticed that our tests are extremely flaky in some cases (https://github.com/FoundatioFx/Foundatio.Kafka/actions shows all the test failures). The common factor so far is that when we have multiple consumers listening to the same topic, the consumers are never notified of a topic message. 1.9.0 helped a lot with reliability locally, but we still get failures at random. I have a 5900X with a lot of resources locally compared to the build server.

How to reproduce

  1. Clone https://github.com/FoundatioFx/Foundatio.Kafka
  2. Run docker compose up in the cloned folder.
  3. Open the solution and run dotnet test.

The test KafkaMessageBusTests.CanSendMessageToMultipleSubscribersAsync seems to be the easiest one to reproduce this error with (after a few runs), and it is the simplest.

[Fact]
public override async Task CanSendMessageToMultipleSubscribersAsync() {
    var messageBus = GetMessageBus();
    if (messageBus == null)
        return;

    try {
        // all three subscribers should receive the single published message
        var countdown = new AsyncCountdownEvent(3);
        await messageBus.SubscribeAsync<SimpleMessageA>(msg => {
            Assert.Equal("Hello", msg.Data);
            countdown.Signal();
        });
        await messageBus.SubscribeAsync<SimpleMessageA>(msg => {
            Assert.Equal("Hello", msg.Data);
            countdown.Signal();
        });
        await messageBus.SubscribeAsync<SimpleMessageA>(msg => {
            Assert.Equal("Hello", msg.Data);
            countdown.Signal();
        });
        await messageBus.PublishAsync(new SimpleMessageA {
            Data = "Hello"
        });

        await countdown.WaitAsync(TimeSpan.FromSeconds(2));
        Assert.Equal(0, countdown.CurrentCount); // zero means every subscriber signaled
    } finally {
        await CleanupMessageBusAsync(messageBus);
    }
}

Under the hood, each call to subscribe ensures the topic exists, then creates a consumer subscriber listening in a loop, but only if an existing listener isn't already running (at most one listener per bus instance). I've included logs of varying detail.
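A rough sketch of that pattern (illustrative only; EnsureTopicCreatedAsync, OnMessageAsync, and the field names are placeholders rather than the actual Foundatio.Kafka code):

private int _listenerStarted; // 0 = not started, 1 = running

private async Task EnsureListeningAsync(CancellationToken cancellationToken) {
    await EnsureTopicCreatedAsync(); // create the topic if it doesn't exist yet

    // at most one consume loop per bus instance
    if (Interlocked.Exchange(ref _listenerStarted, 1) == 1)
        return;

    _ = Task.Run(() => {
        using var consumer = new ConsumerBuilder<string, byte[]>(_consumerConfig).Build();
        consumer.Subscribe(_topic);

        while (!cancellationToken.IsCancellationRequested) {
            // Consume(TimeSpan) returns null on timeout rather than throwing
            var result = consumer.Consume(TimeSpan.FromMilliseconds(100));
            if (result != null)
                OnMessageAsync(result); // fan out to all registered subscribers
        }
    }, cancellationToken);
}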

Checklist

Please provide the following information:

Test logs with handlers not commented out (https://github.com/FoundatioFx/Foundatio.Kafka/blob/main/src/Foundatio.Kafka/Messaging/KafkaMessageBus.cs#L167-L173)

I saw a similar issue and wondered if I should not be using these handlers (https://github.com/ah-/rdkafka-dotnet/issues/61). While googling, I also came across this issue, which may or may not be related (https://github.com/confluentinc/confluent-kafka-python/issues/970).

See the following gist for all the unit test logs (both passing and failing, with varying levels of debug logging). GitHub wouldn't let me post them here as it said the comment was too long: https://gist.github.com/niemyjski/bac539002aa046738d6e029d0d1ba688

Broker logs from recent failure

kafka                   | [2022-06-08 20:11:53,310] INFO Creating topic test_1ab960c172c84b5caf31c969d87c5a4f with configuration {} and initial partition assignment Map(0 -> ArrayBuffer(1)) (kafka.zk.AdminZkClient)
kafka                   | [2022-06-08 20:11:53,319] INFO [Controller id=1] New topics: [Set(test_1ab960c172c84b5caf31c969d87c5a4f)], deleted topics: [Set()], new partition replica assignment [Set(TopicIdReplicaAssignment(test_1ab960c172c84b5caf31c969d87c5a4f,Some(CJssgWbAThWmPO9sa96mYg),Map(test_1ab960c172c84b5caf31c969d87c5a4f-0 -> ReplicaAssignment(replicas=1, addingReplicas=, removingReplicas=))))] (kafka.controller.KafkaController)
kafka                   | [2022-06-08 20:11:53,319] INFO [Controller id=1] New partition creation callback for test_1ab960c172c84b5caf31c969d87c5a4f-0 (kafka.controller.KafkaController)
kafka                   | [2022-06-08 20:11:53,319] INFO [Controller id=1 epoch=1] Changed partition test_1ab960c172c84b5caf31c969d87c5a4f-0 state from NonExistentPartition to NewPartition with assigned replicas 1 (state.change.logger)
kafka                   | [2022-06-08 20:11:53,319] INFO [Controller id=1 epoch=1] Sending UpdateMetadata request to brokers Set() for 0 partitions (state.change.logger)
kafka                   | [2022-06-08 20:11:53,320] INFO [Controller id=1 epoch=1] Sending UpdateMetadata request to brokers Set() for 0 partitions (state.change.logger)
kafka                   | [2022-06-08 20:11:53,328] INFO [Controller id=1 epoch=1] Changed partition test_1ab960c172c84b5caf31c969d87c5a4f-0 from NewPartition to OnlinePartition with state LeaderAndIsr(leader=1, leaderEpoch=0, isr=List(1), leaderRecoveryState=RECOVERED, zkVersion=0) (state.change.logger)
kafka                   | [2022-06-08 20:11:53,328] INFO [Controller id=1 epoch=1] Sending LeaderAndIsr request to broker 1 with 1 become-leader and 0 become-follower partitions (state.change.logger)
kafka                   | [2022-06-08 20:11:53,328] INFO [Controller id=1 epoch=1] Sending UpdateMetadata request to brokers Set(1) for 1 partitions (state.change.logger)
kafka                   | [2022-06-08 20:11:53,328] INFO [Controller id=1 epoch=1] Sending UpdateMetadata request to brokers Set() for 0 partitions (state.change.logger)
kafka                   | [2022-06-08 20:11:53,328] INFO [Broker id=1] Handling LeaderAndIsr request correlationId 1317 from controller 1 for 1 partitions (state.change.logger)
kafka                   | [2022-06-08 20:11:53,329] INFO [ReplicaFetcherManager on broker 1] Removed fetcher for partitions Set(test_1ab960c172c84b5caf31c969d87c5a4f-0) (kafka.server.ReplicaFetcherManager)
kafka                   | [2022-06-08 20:11:53,329] INFO [Broker id=1] Stopped fetchers as part of LeaderAndIsr request correlationId 1317 from controller 1 epoch 1 as part of the become-leader transition for 1 partitions (state.change.logger)
kafka                   | [2022-06-08 20:11:53,330] INFO [LogLoader partition=test_1ab960c172c84b5caf31c969d87c5a4f-0, dir=/bitnami/kafka/data] Loading producer state till offset 0 with message format version 2 (kafka.log.UnifiedLog$)
kafka                   | [2022-06-08 20:11:53,331] INFO Created log for partition test_1ab960c172c84b5caf31c969d87c5a4f-0 in /bitnami/kafka/data/test_1ab960c172c84b5caf31c969d87c5a4f-0 with properties {} (kafka.log.LogManager)
kafka                   | [2022-06-08 20:11:53,331] INFO [Partition test_1ab960c172c84b5caf31c969d87c5a4f-0 broker=1] No checkpointed highwatermark is found for partition test_1ab960c172c84b5caf31c969d87c5a4f-0 (kafka.cluster.Partition)
kafka                   | [2022-06-08 20:11:53,331] INFO [Partition test_1ab960c172c84b5caf31c969d87c5a4f-0 broker=1] Log loaded for partition test_1ab960c172c84b5caf31c969d87c5a4f-0 with initial high watermark 0 (kafka.cluster.Partition)
kafka                   | [2022-06-08 20:11:53,331] INFO [Broker id=1] Leader test_1ab960c172c84b5caf31c969d87c5a4f-0 starts at leader epoch 0 from offset 0 with high watermark 0 ISR [1] addingReplicas [] removingReplicas []. Previous leader epoch was -1. (state.change.logger)
kafka                   | [2022-06-08 20:11:53,348] INFO [Broker id=1] Finished LeaderAndIsr request in 20ms correlationId 1317 from controller 1 for 1 partitions (state.change.logger)
kafka                   | [2022-06-08 20:11:53,349] INFO [Broker id=1] Add 1 partitions and deleted 0 partitions from metadata cache in response to UpdateMetadata request sent by controller 1 epoch 1 with correlation id 1318 (state.change.logger)
niemyjski commented 2 years ago

@edenhill this has me at a loss, as the logs don't give much to go on. From the logs, it seems like group assignment may not be happening, so the subscriber doesn't get notified, or there's an issue with topic metadata propagation. I'm still a huge Kafka noob, but any and all advice would be greatly appreciated.
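For reference, these are the kinds of hooks I can wire up to see whether group assignment actually happens (standard ConsumerBuilder handlers in Confluent.Kafka; the Console logging is just for illustration):

var consumer = new ConsumerBuilder<string, byte[]>(consumerConfig)
    .SetPartitionsAssignedHandler((c, partitions) =>
        Console.WriteLine($"Assigned: {string.Join(", ", partitions)}"))
    .SetPartitionsRevokedHandler((c, partitions) =>
        Console.WriteLine($"Revoked: {string.Join(", ", partitions)}"))
    .SetErrorHandler((c, error) =>
        Console.WriteLine($"Consumer error: {error.Reason} (fatal={error.IsFatal})"))
    .Build();

Setting Debug = "cgrp,topic" on the ConsumerConfig should also make librdkafka log the group-join and assignment steps.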

niemyjski commented 2 years ago

One of the highly concurrent tests just failed with:

20:20.45457 E:KafkaMessageBus - Error committing message test_5b929ca603a24f2f954b91c796547eeb [[0]] @56: Broker: Unknown topic or partition
20:20.45457 E:KafkaMessageBus - Error committing message test_5b929ca603a24f2f954b91c796547eeb [[0]] @56: Broker: Unknown topic or partition
20:20.45457 E:KafkaMessageBus - Error committing message test_5b929ca603a24f2f954b91c796547eeb [[0]] @57: Broker: Unknown topic or partition
20:20.45457 E:KafkaMessageBus - Error committing message test_5b929ca603a24f2f954b91c796547eeb [[0]] @51: Broker: Unknown topic or partition
20:20.87403 E:KafkaMessageBus - Error consuming test_5b929ca603a24f2f954b91c796547eeb GroupId=44b59d379c0b4de28ee49f6d142eb630 message: Failed to query logical offset END: Broker: Unknown topic or partition
20:20.88929 E:KafkaMessageBus - Error consuming test_5b929ca603a24f2f954b91c796547eeb GroupId=4ac196fe056b4c49bedc0d24a774747f message: Failed to query logical offset END: Broker: Unknown topic or partition
mhowlett commented 2 years ago

Under the hood, each call to subscribe will ensure topic exists

metadata doesn't propagate synchronously. my first thought is that what you're seeing may in some way be due to this (given the error in the previous comment). if you try setting some arbitrary delay after topic creation (maybe 2s or something), does this help?
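something like this, assuming the topic is created via Confluent.Kafka's admin client (TopicSpecification lives in Confluent.Kafka.Admin; the 2s value is arbitrary):

await adminClient.CreateTopicsAsync(new[] {
    new TopicSpecification { Name = topic, NumPartitions = 1, ReplicationFactor = 1 }
});

// give the broker a moment to propagate the new topic's metadata
// before any consumer subscribes to it
await Task.Delay(TimeSpan.FromSeconds(2));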

niemyjski commented 2 years ago

@mhowlett I added a one-second delay and it helped, but it still caused some test failures. I then added a two-second delay via the referenced commit, and it took a lot of runs before it failed locally. Two seconds on the build server still caused an immediate failure.

If this is the issue, can we get a wait option on the create topic APIs? If there is any chance you could debug this locally, that would be a massive help. I'd be willing to meet up and work with you as well.
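A polling helper along these lines is roughly what I mean by a wait option (a sketch only; WaitForTopicAsync is not part of the library, and the timeout values are arbitrary):

static async Task WaitForTopicAsync(IAdminClient adminClient, string topic, TimeSpan timeout) {
    var deadline = DateTime.UtcNow + timeout;
    while (DateTime.UtcNow < deadline) {
        var metadata = adminClient.GetMetadata(topic, TimeSpan.FromSeconds(5));
        var topicMetadata = metadata.Topics.FirstOrDefault(t => t.Topic == topic);

        // the topic is usable once it reports no error and has partitions assigned
        if (topicMetadata != null && topicMetadata.Error.Code == ErrorCode.NoError
            && topicMetadata.Partitions.Count > 0)
            return;

        await Task.Delay(250);
    }

    throw new TimeoutException($"Topic '{topic}' was not visible within {timeout}.");
}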

niemyjski commented 2 years ago

@mhowlett I still haven't been able to narrow this down even on 1.9.0

mhowlett commented 2 years ago

is this a single broker cluster?

niemyjski commented 2 years ago

Yes, a full reproduction is available in the linked project above via a simple clone and run. I used a single-broker cluster because of resource constraints when running on GitHub Actions and locally.

niemyjski commented 2 years ago

I made some changes to try it out on Mac ARM (in the latest commit), but it's way less stable than Windows, and it seems like my broker becomes unresponsive locally, even via kcat, while running tests (might be related?).

mhowlett commented 2 years ago

getting Broker: Unknown topic or partition after you've waited for topic creation on a single-broker cluster seems very odd

mhowlett commented 2 years ago

consistent with a broker issue..

niemyjski commented 2 years ago

I agree, it's all very odd. I would have thought a single broker would be the most reliable setup for these tests, since there's less communication and synchronization when there's only a single broker.

niemyjski commented 2 years ago

@mhowlett Do you think 1.9.1 will help, or what are the next steps?

niemyjski commented 2 years ago

@mhowlett do you think any of my issues could be related to the issues you are running into with .NET 6?

mhowlett commented 2 years ago

no

mhowlett commented 2 years ago

i'm triaging this as investigate further/low. we appreciate the testing and there is some chance this may be reflective of an actual issue.

niemyjski commented 2 years ago

Thanks, I just tried the latest RC and still have failures, and the tests seem slower to run locally.

https://github.com/FoundatioFx/Foundatio.Kafka/actions/runs/3348053132

niemyjski commented 2 years ago

@mhowlett were you able to figure out if this is part of a larger issue?

niemyjski commented 1 year ago

The latest 2.0.2 release seems better, but we're still getting failures:

43:46.62225 E:KafkaMessageBus - Error consuming test_ef3b4c5799cd43a5b174f2ed3aee1d43 GroupId=5e9f8df1a6604840bc4ed9642045cb43 message: Failed to query logical offset END: Broker: Unknown topic or partition
43:46.62443 E:KafkaMessageBus - Error consuming test_ef3b4c5799cd43a5b174f2ed3aee1d43 GroupId=0843dbcf82254112bf558efef3a773f4 message: Failed to query logical offset END: Broker: Unknown topic or partition
43:46.62447 E:KafkaMessageBus - Error consuming test_ef3b4c5799cd43a5b174f2ed3aee1d43 GroupId=51da660955af4f0aabf1a3838550b0b1 message: Failed to query logical offset END: Broker: Unknown topic or partition
43:46.62448 E:KafkaMessageBus - Error consuming test_ef3b4c5799cd43a5b174f2ed3aee1d43 GroupId=9dee71745289471aadb14bf5ccc874bc message: Failed to query logical offset END: Broker: Unknown topic or partition
43:46.62449 E:KafkaMessageBus - Error consuming test_ef3b4c5799cd43a5b174f2ed3aee1d43 GroupId=468a61b9b37641b092f930e799057293 message: Failed to query logical offset END: Broker: Unknown topic or partition
43:46.62450 E:KafkaMessageBus - Error consuming test_ef3b4c5799cd43a5b174f2ed3aee1d43 GroupId=d65ac7ebe61841558e02e80385800493 message: Failed to query logical offset END: Broker: Unknown topic or partition
43:46.92477 E:KafkaMessageBus - Error consuming test_ef3b4c5799cd43a5b174f2ed3aee1d43 GroupId=d65ac7ebe61841558e02e80385800493 message: Failed to query logical offset END: Broker: Unknown topic or partition

niemyjski commented 1 week ago

Just bumping to see if there is any idea what might be causing this. The following PR is tracking every new release and still has this issue: https://github.com/FoundatioFx/Foundatio.Kafka/pull/8