confluentinc / confluent-kafka-dotnet

Confluent's Apache Kafka .NET client
https://github.com/confluentinc/confluent-kafka-dotnet/wiki
Apache License 2.0

Elapsed time to recover connectivity between client / brokers #1642

Open jsolana opened 3 years ago

jsolana commented 3 years ago

Description

Hi all,

I have an app that produces messages. This is the code:

producerConfig.MaxInFlight = 1;
producerConfig.MessageTimeoutMs = 0;
producerConfig.SocketTimeoutMs = 10000;

producerBuilder.SetErrorHandler((_, e) => OnError?.Invoke(new StreamingError { IsFatal = e.IsFatal, Reason = e.Reason }));
var p = producerBuilder.Build();

p.Produce(this.topic, getMessage.Invoke(message),
    r =>
    {
        // Delivery report callback: forward any delivery error to the application.
        if (r.Error.IsError)
            OnError?.Invoke(new StreamingError { IsFatal = r.Error.IsFatal, Reason = r.Error.Reason });
    });

// Omitted for clarity

p.Flush(cancellationToken);
// Continue

Other producer configuration parameters (SASL configuration omitted):

"BatchNumMessages": "1000",
"LingerMs": "1000",
"CompressionType" : "zstd",
"Acks" : "-1",
"MaxInflight": "1",
"EnableIdempotence" : true,
"SslEndpointIdentificationAlgorithm": "https",
"SecurityProtocol": "SaslSsl",
"SaslMechanism": "ScramSha512",

How to reproduce

I want to confirm whether this is the expected behavior:

1) I establish a connection between the client and the brokers.
2) Just before producing the message, I lose connectivity (I force this by shutting down the VPN connection), and then execution continues.
3) The thread keeps waiting in the Flush method.
4) Every 10 seconds a REQTMOUT message is printed.
5) Every 120 seconds the producerBuilder.SetErrorHandler code is executed with a connection timeout error.

This is the log:

%4|1624532390.033|REQTMOUT|rdkafka#producer-1| [thrd:sasl_ssl://XXX]: sasl_ssl://XXXX:9096/2: Timed out 0 in-flight, 0 retry-queued, 11 out-queue, 0 partially-sent requests
%4|1624532401.043|REQTMOUT|rdkafka#producer-1| [thrd:sasl_ssl://XXX]: sasl_ssl://XXXX:9096/2: Timed out 0 in-flight, 0 retry-queued, 11 out-queue, 0 partially-sent requests
%4|1624532412.053|REQTMOUT|rdkafka#producer-1| [thrd:sasl_ssl://XXX]: sasl_ssl://XXXX:9096/2: Timed out 0 in-flight, 0 retry-queued, 11 out-queue, 0 partially-sent requests
%4|1624532423.062|REQTMOUT|rdkafka#producer-1| [thrd:sasl_ssl://XXX]: sasl_ssl://XXXX:9096/2: Timed out 0 in-flight, 0 retry-queued, 11 out-queue, 0 partially-sent requests
%4|1624532434.072|REQTMOUT|rdkafka#producer-1| [thrd:sasl_ssl://XXX]: sasl_ssl://XXXX:9096/2: Timed out 0 in-flight, 0 retry-queued, 11 out-queue, 0 partially-sent requests
%4|1624532445.083|REQTMOUT|rdkafka#producer-1| [thrd:sasl_ssl://XXX]: sasl_ssl://XXXX:9096/2: Timed out 0 in-flight, 0 retry-queued, 11 out-queue, 0 partially-sent requests
%4|1624532456.093|REQTMOUT|rdkafka#producer-1| [thrd:sasl_ssl://XXX]: sasl_ssl://XXXX:9096/2: Timed out 0 in-flight, 0 retry-queued, 11 out-queue, 0 partially-sent requests
%4|1624532467.103|REQTMOUT|rdkafka#producer-1| [thrd:sasl_ssl://XXX]: sasl_ssl://XXXX:9096/2: Timed out 0 in-flight, 0 retry-queued, 11 out-queue, 0 partially-sent requests
%4|1624532478.112|REQTMOUT|rdkafka#producer-1| [thrd:sasl_ssl://XXX]: sasl_ssl://XXXX:9096/2: Timed out 0 in-flight, 0 retry-queued, 11 out-queue, 0 partially-sent requests
%4|1624532489.121|REQTMOUT|rdkafka#producer-1| [thrd:sasl_ssl://XXX]: sasl_ssl://XXXX:9096/2: Timed out 0 in-flight, 0 retry-queued, 11 out-queue, 0 partially-sent requests
%3|1624532499.985|FAIL|rdkafka#producer-1| [thrd:sasl_ssl://XXX]: sasl_ssl://XXXX:9096/2: Connect to ipv4#14.7.1.9:9096 failed: Connection timed out (after 130991ms in state CONNECT, 1 identical error(s) suppressed)
%3|1624532499.985|FAIL|rdkafka#producer-1| [thrd:sasl_ssl://XXX]: sasl_ssl://XXXX:9096/1: Connect to ipv4#14.7.1.46:9096 failed: Connection timed out (after 130865ms in state CONNECT, 1 identical error(s) suppressed)
ts=2021-06-24 11:01:39,985 level=ERROR logger=NinjectModule`1 - An error occurrs sending data to sink: sasl_ssl://XXXX:9096/2: Connect to ipv4#14.7.1.9:9096 failed: Connection timed out (after 130991ms in state CONNECT, 1 identical error(s) suppressed)
ts=2021-06-24 11:01:39,985 level=ERROR logger=NinjectModule`1 - An error occurrs sending data to sink: sasl_ssl://XXXX:9096/1: Connect to ipv4#14.7.1.46:9096 failed: Connection timed out (after 130865ms in state CONNECT, 1 identical error(s) suppressed)
%3|1624532499.989|FAIL|rdkafka#producer-1| [thrd:sasl_ssl://XXX]: sasl_ssl://XXXX.com:9096/3: Connect to ipv4#14.7.1.81:9096 failed: Connection timed out (after 129869ms in state CONNECT, 1 identical error(s) suppressed)
ts=2021-06-24 11:01:39,989 level=ERROR logger=NinjectModule`1 - An error occurrs sending data to sink: sasl_ssl://XXXX.com:9096/3: Connect to ipv4#14.7.1.81:9096 failed: Connection timed out (after 129869ms in state CONNECT, 1 identical error(s) suppressed)
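As a cross-check of the intervals described above, the timestamps in this log can be subtracted directly. The sketch below is only illustrative arithmetic on values copied from the log lines; the class and method names (`LogTiming`, `Gaps`, `ConnectStartedAt`) are hypothetical helpers, not part of Confluent.Kafka.

```csharp
using System;
using System.Linq;

public static class LogTiming
{
    // Epoch timestamps (seconds) of the first three REQTMOUT lines in the log.
    public static readonly decimal[] ReqTimeouts =
        { 1624532390.033m, 1624532401.043m, 1624532412.053m };

    // Timestamp of the first FAIL line and its reported time in state CONNECT.
    public const decimal FailAt = 1624532499.985m;
    public const int ConnectStateMs = 130991;

    // Gaps in seconds between consecutive timestamps.
    public static decimal[] Gaps(decimal[] ts) =>
        ts.Zip(ts.Skip(1), (a, b) => b - a).ToArray();

    // Time at which the failed attempt entered the CONNECT state.
    public static decimal ConnectStartedAt(decimal failAt, int elapsedMs) =>
        failAt - elapsedMs / 1000m;
}

public static class Program
{
    public static void Main()
    {
        foreach (var gap in LogTiming.Gaps(LogTiming.ReqTimeouts))
            Console.WriteLine($"REQTMOUT gap: {gap} s");

        var started = LogTiming.ConnectStartedAt(LogTiming.FailAt, LogTiming.ConnectStateMs);
        Console.WriteLine($"CONNECT state entered at: {started}");
    }
}
```

On these values the REQTMOUT lines are about 11 seconds apart, and the failed connection attempt spent about 131 seconds in state CONNECT, having started roughly 21 seconds before the first REQTMOUT line shown.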

If I recover connectivity, I have to wait as long as 120 seconds (until the next connection timeout happens) before the Flush call can finish (the message is produced) and execution continues.

I assume that the interval between the "Connection timed out (after 129869ms in state CONNECT, 1 identical error(s) suppressed)" error and the connection recovery is configured with delivery.timeout.ms. Am I right, or is there another way to configure the connection timeout?

Thanks,

mhowlett commented 3 years ago

If I recover connectivity, I have to wait as long as 120 seconds (until the next connection timeout happens) before the Flush call can finish (the message is produced) and execution continues.

I have not looked into this in detail, but reconnect.backoff.max.ms is 10000, so I would expect the maximum time you should need to wait would be of that order (not 120 seconds). @edenhill ?
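For reference, a minimal sketch of where the settings discussed in this thread live on the .NET `ProducerConfig`. It is not a confirmed fix for the 120-second wait. The bootstrap address is a placeholder, the key and value types are arbitrary, and the `ReconnectBackoffMs` value of 100 is what I believe the librdkafka default to be (the 10000 for `ReconnectBackoffMaxMs` is the value quoted above). In librdkafka, delivery.timeout.ms is documented as an alias for message.timeout.ms, which the original code already sets through `MessageTimeoutMs`.

```csharp
using Confluent.Kafka;

var config = new ProducerConfig
{
    // Placeholder: the issue does not show the real bootstrap servers.
    BootstrapServers = "localhost:9092",

    // Values taken from the original post.
    MaxInFlight = 1,
    MessageTimeoutMs = 0,      // message.timeout.ms / delivery.timeout.ms; 0 means no limit
    SocketTimeoutMs = 10000,   // socket.timeout.ms

    // reconnect.backoff.ms and reconnect.backoff.max.ms, written out explicitly.
    // Assumption: these match the library defaults, so setting them changes nothing.
    ReconnectBackoffMs = 100,
    ReconnectBackoffMaxMs = 10000,
};

// Building the producer does not block waiting for a broker connection.
using var producer = new ProducerBuilder<string, string>(config).Build();
```

Writing the backoff values out explicitly only makes the relevant knobs visible. Whether lowering them shortens the recovery time observed here is exactly the open question in this thread.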