confluentinc / confluent-kafka-dotnet

Confluent's Apache Kafka .NET client
https://github.com/confluentinc/confluent-kafka-dotnet/wiki
Apache License 2.0

Kafka Produce not throwing exception when unable to connect to kafka cluster #1025

Open shrutiwadnerkar opened 5 years ago

shrutiwadnerkar commented 5 years ago

Description

I am trying to get an exception when the producer isn't able to connect to the Kafka cluster. The code outputs the error to the console, but I need an error code or exception for the same.

How to reproduce

```csharp
// Note: the bootstrap server address ("localhst") is intentionally unreachable.
ProducerConfig _configKafka = new ProducerConfig { BootstrapServers = "localhst:9092/" };
ProducerBuilder<string, string> _kafkaProducer = new ProducerBuilder<string, string>(_configKafka);
using (var kafkaProducer = _kafkaProducer.Build())
{
    try
    {
        var dr = await kafkaProducer.ProduceAsync("Kafka_Messages",
            new Message<string, string> { Key = null, Value = $"message " });
        Console.WriteLine(!dr.Error.IsError
            ? $"Delivered message to {dr.TopicPartitionOffset}"
            : $"Delivery Error: {dr.Error.Reason}");
    }
    // The type arguments must match the producer's <string, string>;
    // catching ProduceException<Null, string> here would never match.
    catch (ProduceException<string, string> e)
    {
        Console.WriteLine($"Delivery failed: {e.Error.Reason}");
    }
}
```

Checklist

Please provide the following information:

bartelink commented 5 years ago

DeliveryReport suggests you are using 0.11.x - I believe the v1 DLLs do throw, but v0 never has

In wrappers I use/maintain, I throw when using v0:

mhowlett commented 5 years ago

by default the producer will attempt to deliver messages for 5 minutes (default value of message.timeout.ms) before the produce request fails. you won't get an exception - even if all brokers are down - until that time has elapsed. you may want to configure that setting downwards.
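A minimal sketch of that suggestion (the broker address and topic name are illustrative, and the broker is assumed to be unreachable): lowering `MessageTimeoutMs` makes `ProduceAsync` surface a `ProduceException` after a few seconds instead of the five-minute default.

```csharp
using System;
using System.Threading.Tasks;
using Confluent.Kafka;

class QuickTimeoutExample
{
    public static async Task Main()
    {
        var config = new ProducerConfig
        {
            BootstrapServers = "localhost:9092",   // assumed address, unreachable in this scenario
            MessageTimeoutMs = 5000                // fail delivery after 5s instead of 5 minutes
        };

        using var producer = new ProducerBuilder<string, string>(config).Build();
        try
        {
            await producer.ProduceAsync("test_topic",
                new Message<string, string> { Key = "k", Value = "v" });
        }
        catch (ProduceException<string, string> e)
        {
            // With no reachable broker, this reports Local_MsgTimedOut after ~5s.
            Console.WriteLine($"delivery failed: {e.Error.Code} - {e.Error.Reason}");
        }
    }
}
```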

ghost commented 4 years ago

Hi @mhowlett @bartelink, when I provide the 'message.timeout.ms' value it throws a Local_MsgTimedOut error, but it does not give the underlying error. I can see the logs saying the broker is down, but how do I get that message in the catch block?

```
PS D:\projects\confluent-kafka-dotnet\examples\my-producer> dotnet run
%3|1588085333.182|FAIL|rdkafka#producer-1| [thrd:sasl_plaintext://localhost:6667/bootstrap]: sasl_plaintext://localhost:6667/bootstrap: Connect to ipv4#127.0.0.1:6667 failed: No connection could be made because the target machine actively refused it... (after 2018ms in state CONNECT)
%3|1588085333.183|ERROR|rdkafka#producer-1| [thrd:sasl_plaintext://localhost:6667/bootstrap]: sasl_plaintext://localhost:6667/bootstrap: Connect to ipv4#127.0.0.1:6667 failed: No connection could be made because the target machine actively refused it... (after 2018ms in state CONNECT)
%3|1588085333.187|ERROR|rdkafka#producer-1| [thrd:sasl_plaintext://localhost:6667/bootstrap]: 1/1 brokers are down
failed to deliver message: Local: Message timed out [Local_MsgTimedOut]
PS D:\projects\confluent-kafka-dotnet\examples\my-producer>
```

```csharp
using System;
using System.Threading.Tasks;
using Confluent.Kafka;

class Program
{
    public static async Task Main(string[] args)
    {
        var config = new ProducerConfig
        {
            BootstrapServers = "localhost:6667",
            SecurityProtocol = SecurityProtocol.SaslPlaintext,
            SaslMechanism = SaslMechanism.Gssapi,
            SaslKerberosServiceName = "kafka",
            //Debug = "security,broker,topic,protocol,msg",
            MessageTimeoutMs = 10000
        };

        // If serializers are not specified, default serializers from
        // `Confluent.Kafka.Serializers` will be automatically used where
        // available. Note: by default strings are encoded as UTF8.
        using (var producer = new ProducerBuilder<string, string>(config).Build())
        {
            try
            {
                string key = "null";
                string val = "text";
                // Note: Awaiting the asynchronous produce request below prevents flow of execution
                // from proceeding until the acknowledgement from the broker is received (at the
                // expense of low throughput).
                var deliveryReport = await producer.ProduceAsync(
                    "test_topic", new Message<string, string> { Key = key, Value = val });

                Console.WriteLine($"delivered to: {deliveryReport.TopicPartitionOffset}");
            }
            catch (ProduceException<string, string> e)
            {
                Console.WriteLine($"failed to deliver message: {e.Message} [{e.Error.Code}]");
            }
        }
    }
}
```

mhowlett commented 4 years ago

you can't get any more information than that out of a produce exception in this case.
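One way to observe the underlying connection errors in code rather than just in the console log (a sketch, not an answer given in this thread; the address and topic name are illustrative) is to register an error handler on the builder. librdkafka routes client-level errors such as "all brokers are down" through that callback:

```csharp
using System;
using Confluent.Kafka;

class ErrorHandlerExample
{
    public static void Main()
    {
        var config = new ProducerConfig
        {
            BootstrapServers = "localhost:9092",   // assumed address
            MessageTimeoutMs = 10000
        };

        using var producer = new ProducerBuilder<string, string>(config)
            // Invoked for client-level errors (e.g. Local_AllBrokersDown, Local_Transport)
            // that otherwise only appear as log lines on the console.
            .SetErrorHandler((_, error) =>
                Console.WriteLine($"client error: {error.Code} - {error.Reason} (fatal: {error.IsFatal})"))
            .Build();

        producer.Produce("test_topic",
            new Message<string, string> { Key = null, Value = "text" });

        // Wait for outstanding delivery reports before disposing.
        producer.Flush(TimeSpan.FromSeconds(15));
    }
}
```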

divyas1912 commented 4 years ago

I am also facing the same issue with the Confluent.Kafka 1.4.0 library: the Kafka producer does not throw an exception, it only logs the error to the console. I tried specifying a message timeout of 10000. I have to stop the client from publishing messages to the topic during a Kafka cluster failure, so basically I need to catch the exception and throw it on the client.

divyas1912 commented 4 years ago

Is there any solution to this issue?

ghost commented 4 years ago

That's the experience I had as well. There is no solution other than relying on the timeout and log messages; remember this .NET Kafka client is a wrapper around the librdkafka library written in C, and those errors are not propagated to .NET as exceptions. In Java it works as expected; you do get the exceptions.

mhowlett commented 4 years ago

every produce call has a corresponding delivery report (exposed as an exception in the ProduceAsync case, via a callback in the Produce case), which gives as detailed information as possible about any error that occurred - the java client does not provide any additional information.
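The callback variant described above can be sketched as follows (the address and topic name are illustrative assumptions):

```csharp
using System;
using Confluent.Kafka;

class DeliveryCallbackExample
{
    public static void Main()
    {
        var config = new ProducerConfig { BootstrapServers = "localhost:9092" }; // assumed address

        using var producer = new ProducerBuilder<string, string>(config).Build();

        // The per-message delivery handler carries the same error detail that
        // ProduceAsync exposes via ProduceException.
        Action<DeliveryReport<string, string>> handler = r =>
            Console.WriteLine(r.Error.IsError
                ? $"delivery error: {r.Error.Code} - {r.Error.Reason}"
                : $"delivered to {r.TopicPartitionOffset}");

        producer.Produce("test_topic",
            new Message<string, string> { Key = "k", Value = "v" }, handler);

        // Wait for outstanding delivery reports before disposing.
        producer.Flush(TimeSpan.FromSeconds(10));
    }
}
```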

a produce call will not fail immediately if a broker goes down, because in general you will be connected to a cluster, and this is "normal operating behavior" - the partition leader will just change to another broker and the client will automatically discover that and keep going. in the event the cluster is not contactable, the client will attempt to reconnect as per the timeout configuration.

error handling in a distributed system is difficult, and the idea is that the client abstracts as much of this complexity away as possible.

finally, there are some cases (in particular on startup), where I think we should give some more direct indication of problems.

divyas1912 commented 4 years ago

Thanks for the update. Yes, I have tried ProduceAsync, but the question is: once the Kafka cluster is back up, does the client reconnect automatically, or does the client need to reconnect?

anchitj commented 5 months ago

Closing as the original questions are answered. Feel free to reopen if still an issue.