confluentinc / confluent-kafka-dotnet

Confluent's Apache Kafka .NET client
https://github.com/confluentinc/confluent-kafka-dotnet/wiki
Apache License 2.0

Getting Error "Unknown topic or partition" after upgrading to 1.5.0 from consumer #1366

Open Georgio123 opened 4 years ago

Georgio123 commented 4 years ago

Description

When there are no messages for a topic and the consumer starts first, we get the error "Unknown topic or partition" from consumer.Consume(). Downgrading to 1.4.4 works, as the consumer creates the topic if it does not exist.

This creates issues, as we start the producer and consumer while there are no messages yet. Is this the new behavior going forward, or is it a side effect?

How to reproduce

Start consumer first with no topic, version 1.5.0
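
A minimal repro sketch matching this description; the broker address, group id, and topic name are illustrative placeholders:

using System;
using Confluent.Kafka;

var config = new ConsumerConfig
{
    BootstrapServers = "localhost:9092", // placeholder broker
    GroupId = "repro-group"              // placeholder group id
};

using var consumer = new ConsumerBuilder<Ignore, string>(config).Build();
consumer.Subscribe("topic-that-does-not-exist-yet");

// Per the report, with 1.5.0 this call surfaces a ConsumeException:
// "Broker: Unknown topic or partition".
var result = consumer.Consume(TimeSpan.FromSeconds(10));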

Checklist

Please provide the following information:

mhowlett commented 4 years ago

that is an explicit change: https://github.com/edenhill/librdkafka/releases. you can re-enable it with AllowAutoCreateTopics.
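
A minimal sketch of re-enabling it on the consumer side; the broker address, group id, and topic name are placeholders:

using Confluent.Kafka;

var config = new ConsumerConfig
{
    BootstrapServers = "localhost:9092",
    GroupId = "example-group",
    AllowAutoCreateTopics = true // maps to allow.auto.create.topics
};

using var consumer = new ConsumerBuilder<Ignore, string>(config).Build();
consumer.Subscribe("example-topic"); // topic may not exist yet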

Georgio123 commented 4 years ago

thank you

Georgio123 commented 4 years ago

I spoke prematurely, before testing it. I set AllowAutoCreateTopics=true in the ConsumerConfig but still get the error "Unknown topic or partition". The topics are created this time, though.

How can I avoid getting "Unknown topic or partition" at consumer.Consume?

Thanks

mhowlett commented 4 years ago

I would need to look into this.

if you ignore the error and continue to call Consume, do you get messages? if so, do that.
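
A sketch of that ignore-and-retry approach, assuming a consumer already subscribed as in the earlier sketches; only the transient unknown-topic error is swallowed:

using System;
using System.Threading;
using Confluent.Kafka;

var cts = new CancellationTokenSource();
while (!cts.IsCancellationRequested)
{
    try
    {
        var result = consumer.Consume(cts.Token);
        Console.WriteLine($"Received: {result.Message.Value}");
    }
    catch (ConsumeException ex) when (ex.Error.Code == ErrorCode.UnknownTopicOrPart)
    {
        // Transient while the topic is being (auto-)created; keep polling.
    }
}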

Georgio123 commented 4 years ago

If I ignore it, I get another "Unknown topic or partition", the topics are created, and then I get messages. Sleeping before Subscribe makes no difference, i.e. I still get the topic error.

I did a workaround where, before the Subscribe, I use AdminClient to create the topics if they are not there, Thread.Sleep for a few seconds, and then Subscribe. At the beginning I did not have the Thread.Sleep and got "Unknown topic or partition". With the sleep added, all is clean and I get messages.
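
A sketch of that workaround; the topic name and sizing are placeholders, and the already-exists error is tolerated so the step is idempotent:

using System;
using System.Linq;
using System.Threading;
using Confluent.Kafka;
using Confluent.Kafka.Admin;

using var adminClient = new AdminClientBuilder(
    new AdminClientConfig { BootstrapServers = "localhost:9092" }).Build();

try
{
    await adminClient.CreateTopicsAsync(new[]
    {
        new TopicSpecification { Name = "example-topic", NumPartitions = 3, ReplicationFactor = 1 }
    });
}
catch (CreateTopicsException e)
    when (e.Results.All(r => r.Error.Code == ErrorCode.TopicAlreadyExists))
{
    // Topic already exists; nothing to do.
}

// Give the brokers a moment to propagate metadata, as described above.
Thread.Sleep(TimeSpan.FromSeconds(3));
consumer.Subscribe("example-topic");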

ThomasTosik commented 4 years ago

Our story is similar.

Now we also think about creating the topics somewhere else (with AdminClient etc) so our consumers do not get this error anymore. The topics are created by the producers though, when they produce something.

mhowlett commented 4 years ago

here's the related kip: https://cwiki.apache.org/confluence/display/KAFKA/KIP-487%3A+Client-side+Automatic+Topic+Creation+on+Producer#:~:text=Kafka%20has%20a%20feature%20that,broker%20enables%20topic%20auto%20creation.

In the early days of Kafka, this was one of only a few ways to create topics. However, with the addition of AdminClient, this functionality is no longer the recommended way to create topics.

so generally, the recommendation is to not rely on auto topic creation.

... a consume request should not necessarily create a topic, since a consumer can retry until the topic is created and can actually consume.

so from the above comments, it seems the current implementation in librdkafka may not behave as it ideally should if the topic does not exist. I will need to check all of this to have an opinion on what, if anything, should be changed (todo).

wwarby commented 4 years ago

Not sure if mine is the same issue or related in any way, but I've just run into massive instability after upgrading some high-throughput C# apps from v1.4.4 to v1.5.0. In my case the error thrown is

dotnet: rdkafka_roundrobin_assignor.c:97: rd_kafka_roundrobin_assignor_assign_cb: Assertion `next < rd_list_cnt(&eligible_topic->members)' failed

... and results in an all-out application crash. Rolled back to 1.4.4 and that solved the problem.

In my case, I'm consuming data from topics that already exist and I don't have topic auto-creation enabled - I'm running multiple instances of the C# app, each instantiating a single consumer to join the consumer group.

mhowlett commented 4 years ago

thanks @wwarby - that's not good, opening as a new issue.

derekgreer commented 4 years ago

I'm seeing the same issue. The change in auto-create behavior didn't catch me off guard as I saw the release notes beforehand and went about configuring my consumer to enable auto creation. What I'm seeing, however, is that it kind of works. That is to say, given the broker is configured to allow auto-creation of topics and the consumer has allow.auto.create.topics set to true, upon calling consume the first time, it throws an exception, but if you try the second time it works. This definitely seems like a defect. Here is my code:

var allowUnknownTopic = true;

while (!ct.IsCancellationRequested)
{
    try
    {
        var consumeResult = _consumer.Consume(ct);
        if (consumeResult != null)
            ConsumerOnMessage(consumeResult);
    }
    catch (Exception e)
    {
        // TODO This accounts for a kafka client bug where allow.auto.create.topics
        // is true, but Consume still throws the first try. Swallow only the first
        // "Unknown topic or partition" error when auto-create is enabled; rethrow
        // everything else.
        if (!(_connectionSettings.AllowTopicAutoCreate.HasValue
              && _connectionSettings.AllowTopicAutoCreate.Value
              && allowUnknownTopic
              && e.Message.Equals("Broker: Unknown topic or partition")))
            throw;

        allowUnknownTopic = false;
    }
}

mhowlett commented 4 years ago

thanks @derekgreer - noted in #1369

Dujma commented 3 years ago

I'm still experiencing the same issue after upgrading to 1.5.2, with the latest version of https://hub.docker.com/r/confluentinc/cp-kafka/ running as the Kafka broker. The topic itself gets created, but the exception is thrown. This only happens on the first run. I'm running the application on .NET Core 2.1. EDIT: AllowAutoCreateTopics is set to true.

LiorBanaiKama commented 3 years ago

Same issue when using 1.5.2. It appears this version is not really tested... I guess we need to go back to 1.3.x or 1.4.x.

TheFireCookie commented 3 years ago

Same, we tried 1.5.2 and we, too, still have an error

Astaelan commented 3 years ago

I recently upgraded a service using the 0.11.6 client, which was working fine. This service was using .NET Core 2.1 and is being upgraded to .NET Core 3.1, which created an issue with the 0.11.6 package not finding librdkafka at runtime startup. We checked dependencies and noticed the newer 1.x packages were brought up to be compliant with newer standard targets, so we thought this might resolve the lib problem.

I upgraded to 1.5.2 and the lib problem went away, but I was unable to receive any messages. I tried setting AllowAutoCreateTopics to true, with no effect. The assigned-partitions callback handler indicated we got partitions assigned (0) as the only consumer, but still we got no messages.

So, based on what was said in this ticket, I downgraded to 1.4.4, removed the option for auto-create since it was no longer available, and still had the same problem: partitions were assigned but no messages got through. The lib problem remains resolved in 1.4.4.

From the message a couple above this, I decided to try downgrading again, to 1.3.0, and now it works: partitions are assigned and I'm receiving messages. The lib problem remains resolved in 1.3.0 for us.

It would seem there was some change after 1.3.0 that causes Subscribe not to work correctly, regardless of the auto-creation of topics. In this particular case the topics already existed and it still didn't matter. I confirmed with the callback that I was getting partitions assigned the same way in all cases. If I can provide any additional information that will assist with this, please let me know, as we are currently stuck on 1.3.0 for this upgrade.

plachor commented 3 years ago

In our case, after upgrading to 1.5.2 (with auto-create disabled on the consumer) we noticed that our integration tests fail due to timeouts. I have not had enough time to gather any concrete conclusions yet, but I will dig into this topic and return with conclusions.

Just to narrow my search, a few questions (quoted in the reply below):

thanks

mhowlett commented 3 years ago

is it possible that a longer timeout (MessageTimeoutMs and an enormous MessageSendMaxRetries) on the Producer can be used by the library to offload some additional work in the background and thus delay topic creation on produce?

these settings are irrelevant here - topics are auto-created as a result of metadata requests sent to the broker. the client will make a metadata request to discover information about any topic it needs to know about but doesn't yet have metadata for, as soon as it knows it needs it (see the sketch after these answers).

is it possible that multiple instances of consumer and producer in the same process can affect (delay) creation of a topic by a producer?

no, they are independent.

will a consumer instance receive this error only once per non-existent topic?

I'm quite sure it will, but there's some chance I misread the code.

will a consumer instance, after receiving this "Unknown topic or partition" error, eventually receive messages once the topic is created, or do we need to perform a manual re-subscription after some time?

yes, it should automatically receive messages, and I just verified this behavior.
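
A sketch illustrating the metadata mechanism from the first answer above: requesting metadata for a specific topic is what prompts the broker to auto-create it (when broker-side auto-creation is enabled). Broker address and topic name are placeholders:

using System;
using Confluent.Kafka;

using var adminClient = new AdminClientBuilder(
    new AdminClientConfig { BootstrapServers = "localhost:9092" }).Build();

// This metadata request for "example-topic" is what triggers creation,
// provided auto.create.topics.enable=true on the broker.
var metadata = adminClient.GetMetadata("example-topic", TimeSpan.FromSeconds(10));
Console.WriteLine($"Topics in response: {metadata.Topics.Count}");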


I did a quick test of subscribing to a non-existent topic with AllowAutoCreateTopics set to true. The initial metadata request before the join (which instructs Kafka to create the topic) responds with 'Broker: Leader not available' for the partition of interest. That is reasonable - topic creation is asynchronous and involves coordinating multiple brokers, so the metadata request shouldn't block on it. From that, I think librdkafka, via here: https://github.com/edenhill/librdkafka/blob/v1.5.2/src/rdkafka_cgrp.c#L3656, is generating an 'Unknown topic or partition' broker error here: https://github.com/edenhill/librdkafka/blob/v1.5.2/src/rdkafka_cgrp.c#L3580 (I could be misreading this, but leader-not-available is somehow resulting in unknown-topic-or-partition), and that error is getting pushed to the user. Librdkafka requests metadata again fairly soon after, the topic is seen, and all is good.

If I disable auto topic creation on the broker, instead of 'Broker: Leader not available' I see 'Broker: Unknown topic or partition' in the metadata response for the partition of interest - this also makes total sense. The client repeatedly tries to get metadata until the topic exists. We get a single 'Unknown topic or partition' broker error pushed to the user, as in the other case.

I think this additional error notification to the user was put into librdkafka as a feature at the same time AllowAutoCreateTopics was added. I think this inappropriately exposes the 'leader not available' metadata response to the user as a problem, whereas it should just stay silent in that case.

This is harmless - it's just a notification that you're not expecting, and it is misleading.

Note: I don't think librdkafka should ever be bringing broker errors into existence as it does here: https://github.com/edenhill/librdkafka/blob/v1.5.2/src/rdkafka_cgrp.c#L3581 - my assumption when seeing a broker error is that it originated from the broker and nothing has been meddled with. I should be able to rely on this - anything inventing broker errors does more harm by taking away this guarantee than it is potentially helpful.

Note 2: I don't really like this method of communicating errors to the application. I think (based on all considerations I'm aware of at the moment) that we should have a synchronous (awaitable) Subscribe which throws an exception on a non-existent topic, after some amount of internal retry logic. We should also retain the non-blocking Subscribe, for users who want the associated semantics (no notification if the topic doesn't exist).
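
A hypothetical sketch of what such an awaitable Subscribe could look like, layered over the existing API; the helper name, polling interval, and use of AdminClient metadata are assumptions, not an existing or planned interface:

using System;
using System.Linq;
using System.Threading.Tasks;
using Confluent.Kafka;

static async Task SubscribeAwaitableAsync(
    IConsumer<Ignore, string> consumer, IAdminClient admin,
    string topic, TimeSpan timeout)
{
    consumer.Subscribe(topic);
    var deadline = DateTime.UtcNow + timeout;
    while (DateTime.UtcNow < deadline)
    {
        // Internal retry: poll metadata until the topic shows up.
        var md = admin.GetMetadata(topic, TimeSpan.FromSeconds(5));
        var t = md.Topics.FirstOrDefault(x => x.Topic == topic);
        if (t != null && t.Error.Code == ErrorCode.NoError)
            return; // topic exists; the subscription will pick it up

        await Task.Delay(TimeSpan.FromMilliseconds(500));
    }

    // Surface non-existence as an exception rather than an error notification.
    throw new KafkaException(new Error(ErrorCode.UnknownTopicOrPart));
}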

plachor commented 3 years ago

Thanks for your answers, I'll need to investigate my issues further.


And to your last note: we would like to preserve the current behavior where consumers can subscribe to non-existing topics (and receive messages once the topic is created).

Our application consists of many instances that can consume any topic, but each can only produce to the topics it owns. They know the exact desired configuration for their own topics (partitions, retention). That's why it is great for us that consumers, which lack that knowledge, will not create those topics with broker defaults.

The existing behavior does not require us to keep track of not-yet-existing topics, as consumers can simply subscribe to them (the other way around would add more complexity on our end). However, maybe there should be a flag indicating whether a non-existing topic should throw or not in the consume loop (we can have many uninitialized topics).

edenhill commented 3 years ago

The initial metadata request before the join (which instructs Kafka to create the topic), responds with 'Broker: Leader not available' for the partition of interest.

If that propagates as an error (Unknown topic or partition) it is a bug; we should treat this as a temporary error, which I'm quite sure the topic object code does, but maybe not the consumer group code?

mhowlett commented 3 years ago

@plachor:

And to your last note: we would like to preserve this current behavior where consumers can subscribe to non-existing topics (and receive messages once topic is created).

I believe this should work (and we have no intention of deprecating it). You will get an error, but if you ignore it, the consumer will pick up the new topic when it becomes available.

plachor commented 3 years ago

My timeouts in 1.5.2 might be related to https://github.com/edenhill/librdkafka/issues/3159#issuecomment-736274668

jaco129 commented 3 years ago

I am also having this issue after upgrading to 1.6.x. However, unlike the behavior described above for 1.5.x, the topic is never created and isn't returned in metadata until something else creates it. The consumer appears to not even attempt to send a create request when AllowAutoCreateTopics is set to true.

This happens with the 1.6.2 and 1.6.3 NuGet packages. Downgrading to 1.5.3 results in the behavior described above, with the error on initial subscription, but the topic is still actually created.

Consumer Config:

client.software.name = confluent-kafka-dotnet
client.software.version = 1.6.1
metadata.broker.list = localhost:29092
metadata.max.age.ms = 1000
debug = topic,cgrp,fetch,consumer,conf
error_cb = 0x108313344
default_topic_conf = 0x7f8d27c38ee0
security.protocol = plaintext
group.id = ReadProjectionGroup
enable.auto.offset.store = false
allow.auto.create.topics = true

massada commented 3 years ago

@mhowlett

The librdkafka tests for auto-created topics seem to indicate that the error is expected and temporary. Shouldn't the client handle this?

https://github.com/edenhill/librdkafka/blob/4837e34bc5d173b10934e874adadf39f98dd2b94/tests/0109-auto_create_topics.cpp#L165

massada commented 3 years ago

Did a quick test using the 1.5.3 branch with the following changes to the driver.

In the Consumer constructor:

...

var allowAutoCreateTopicsString = config.FirstOrDefault(prop => prop.Key == "allow.auto.create.topics").Value;
if (allowAutoCreateTopicsString != null)
{
    this.allowAutoCreateTopics = bool.Parse(allowAutoCreateTopicsString);
}

...

In the Consume(int millisecondsTimeout) method:

...

var msg = Util.Marshal.PtrToStructure<rd_kafka_message>(msgPtr);

// Temporary unknown-topic errors are okay for auto-created topics
if (msg.err == ErrorCode.UnknownTopicOrPart && this.allowAutoCreateTopics)
{
    return null;
}

...

Enabled auto-create: ConsumerConfig.AllowAutoCreateTopics = true;

And all was good. Would this work as a fix?

stukselbax commented 3 years ago

It would be great if the error message disappeared when it is the expected behavior. Will the maintainers accept the solution proposed by @massada?

nervgh commented 2 years ago

I have the same issue with node-rdkafka@2.11.0.

rubailovks commented 1 year ago

(quoting @derekgreer's earlier comment and workaround code in full - see above)

Hello. Set "TOPIC_CREATE_BACKOFF_MS" = "10000". It works!

SleeplessChallenger commented 1 year ago

Hey, I had the same issue with Kafka in docker-compose. Use KAFKA_AUTO_CREATE_TOPICS_ENABLE=false:

environment:
      - KAFKA_BROKER_ID=1
      - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
      - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      - KAFKA_INTER_BROKER_LISTENER_NAME=PLAINTEXT
      - KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1
      - KAFKA_AUTO_CREATE_TOPICS_ENABLE=false

anchitj commented 4 months ago

Is this still an issue?

ayeshawaheed7 commented 1 month ago

@anchitj, if you're using the confluentinc/cp-kafka image, please ensure it's updated to the latest version. This should resolve the issue.