IBM / sarama

Sarama is a Go library for Apache Kafka.
MIT License

kafka: Failed to produce message to topic myRsp: write tcp [ip1]:[port1]->[ip2]:[port2]: write: broken pipe #2900

Open yitian108 opened 1 month ago

yitian108 commented 1 month ago
Description

Hi there, I encountered an old issue, similar to #1565 and #2004, when using AsyncProducer.

Versions
| Sarama | Kafka | Go |
|--------|-------|----|
| v1.42.2 | 3.7.0 | 1.18 |
Configuration
cfg.Producer.Return.Successes = true
cfg.Producer.Return.Errors = true
cfg.Producer.RequiredAcks = sarama.WaitForAll

cfg.Producer.Retry.Max = 1
cfg.Net.DialTimeout = 5 * time.Second
cfg.Net.ReadTimeout = 5 * time.Second
Logs

```
[sarama]2024/05/17 16:47:14 Connected to broker at 192.168.2.148:13644 (registered as #3)
[sarama]2024/05/17 16:56:57 client/metadata fetching metadata for all topics from broker 192.168.2.147:13644
[sarama]2024/05/17 16:56:57 client/metadata fetching metadata for all topics from broker 192.168.2.147:13644
[sarama]2024/05/17 16:56:57 client/metadata fetching metadata for all topics from broker 192.168.2.147:13644
[sarama]2024/05/17 16:57:00 client/metadata fetching metadata for all topics from broker 192.168.2.147:13644
[sarama]2024/05/17 16:57:00 client/metadata got error from broker 2 while fetching metadata: EOF
[sarama]2024/05/17 16:57:00 Closed connection to broker 192.168.2.147:13644
[sarama]2024/05/17 16:57:00 client/brokers deregistered broker #2 at 192.168.2.147:13644
[sarama]2024/05/17 16:57:01 client/metadata fetching metadata for all topics from broker 192.168.2.148:13644
[sarama]2024/05/17 16:57:01 client/brokers registered new broker #2 at 192.168.2.147:13644
[sarama]2024/05/17 17:06:18 producer/leader/myRsp/3 state change to [retrying-1]
[sarama]2024/05/17 17:06:18 producer/leader/myRsp/3 abandoning broker 1
[sarama]2024/05/17 17:06:18 producer/broker/1 input chan closed
[sarama]2024/05/17 17:06:18 producer/broker/1 shut down
[sarama]2024/05/17 17:06:18 client/metadata fetching metadata for [myRsp] from broker 192.168.2.147:13644
[sarama]2024/05/17 17:06:18 producer/leader/myRsp/3 selected broker 1
[sarama]2024/05/17 17:06:18 producer/broker/1 state change to [open] on myRsp/3
[sarama]2024/05/17 17:06:18 producer/leader/myRsp/3 state change to [flushing-1]
[sarama]2024/05/17 17:06:18 producer/leader/myRsp/3 state change to [normal]
[sarama]2024/05/17 17:06:18 producer/broker/1 state change to [closing] because write tcp 100.78.225.15:58112->192.168.2.146:13644: write: broken pipe
[sarama]2024/05/17 17:06:18 Closed connection to broker 192.168.2.146:13644
[sarama]2024/05/17 17:06:57 client/metadata fetching metadata for all topics from broker 192.168.2.146:13644
[sarama]2024/05/17 17:06:57 Connected to broker at 192.168.2.146:13644 (registered as #1)
```

Additional Context

The issue occurred at 17:06:18.

yitian108 commented 1 month ago

@dnwe @k-wall, would you mind having a look at this?

k-wall commented 1 month ago

I'm not using Sarama much right now. Have you examined the logs from the Broker side?

yitian108 commented 1 month ago

Thanks @k-wall for replying. Unfortunately, there are no broker logs from the time the issue occurred.

Generally, if I set the configuration like this:

cfg.Producer.Retry.Max = 3
cfg.Net.DialTimeout = 30 * time.Second
cfg.Net.ReadTimeout = 30 * time.Second

the AsyncProducer sends messages successfully. However, these larger values lengthen the fault detection time, and the business side cannot accept that delay.

yitian108 commented 1 month ago

@dnwe @k-wall, I reproduced the issue on 27 May. The packet capture shows that the connection between 192.168.2.148 (server) and 100.100.134.251 (client) was closed by the Kafka server at 15:54:54.104013 (frame NO. 16770976), and the client acknowledged this at 15:54:54.144786 (frame NO. 16771072). See the screenshot "capture".

From this moment on, the client should no longer use the closed connection, but at 16:01:20 it tried to send a message over that stale connection. See the screenshot "send failed log".

What surprises me is that the producer sometimes keeps using the old connection. Why does this happen, and which parameter controls it?

puellanivis commented 1 month ago

I don’t think there’s any way to know that the underlying socket has been FIN closed from the remote side, except to attempt to write to it and receive a broken pipe result?
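The write-side behaviour described here can be reproduced with the Go standard library alone. The following is a minimal sketch, not Sarama code: a loopback server closes each connection right after accepting it, and the client keeps writing without ever reading until a write fails. The helper names (`startClosingServer`, `writeUntilError`, `probe`) and the sleep intervals are assumptions made for the illustration.

```go
package main

import (
	"fmt"
	"net"
	"time"
)

// startClosingServer listens on a loopback port and closes every accepted
// connection immediately, which sends a FIN to the client.
func startClosingServer() (net.Listener, error) {
	ln, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		return nil, err
	}
	go func() {
		for {
			c, err := ln.Accept()
			if err != nil {
				return // listener closed
			}
			c.Close()
		}
	}()
	return ln, nil
}

// writeUntilError writes small payloads without reading and returns how many
// writes reported success before the first write error (nil if none failed).
func writeUntilError(conn net.Conn, maxAttempts int) (int, error) {
	for i := 0; i < maxAttempts; i++ {
		if _, err := conn.Write([]byte("ping")); err != nil {
			return i, err
		}
		time.Sleep(20 * time.Millisecond) // give the peer's reply time to arrive
	}
	return maxAttempts, nil
}

// probe runs the experiment once against a fresh server.
func probe() (succeeded int, writeErr error, setupErr error) {
	ln, err := startClosingServer()
	if err != nil {
		return 0, nil, err
	}
	defer ln.Close()

	conn, err := net.Dial("tcp", ln.Addr().String())
	if err != nil {
		return 0, nil, err
	}
	defer conn.Close()

	time.Sleep(100 * time.Millisecond) // let the server's FIN reach the client
	succeeded, writeErr = writeUntilError(conn, 50)
	return succeeded, writeErr, nil
}

func main() {
	n, writeErr, setupErr := probe()
	if setupErr != nil {
		fmt.Println("setup failed:", setupErr)
		return
	}
	fmt.Printf("writes reported as successful: %d, first write error: %v\n", n, writeErr)
}
```

On many systems the first write after the remote close still reports success, and only a later write returns an error such as broken pipe or connection reset. The exact count and error text depend on the platform, so a successful write alone does not prove the peer received the data.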

yitian108 commented 1 month ago

Thank you, @puellanivis, for your response. As you can see, frame NO. 16771072 has been acknowledged with a FIN from the remote server side. This means that the client should receive a notification that the connection has been closed (finished).

Alternatively, is there a way to quickly establish a new connection upon receiving a broken pipe without having to set cfg.Producer.Retry.Max to 2 or a larger value? Setting cfg.Producer.Retry.Max to a higher value will result in business timeouts.

puellanivis commented 1 month ago

If the problem is that the client is closing down the connection, then yes, it should be silently handling this disconnect scenario, but if it’s a result of the client calling a function to explicitly shut down the connection to Kafka, then the correct action is not to attempt a reconnect, but rather to avoid sending on that same client object that you’ve told to close down.

If the problem is on the server side closing the connection, there’s not really a way for the client to know that this connection has already been lost (I think). I’m also unsure if a retry would attempt a reconnect before sending. 🤷‍♀️

I’m kind of starting to reach outside of my scope of knowledge.