confluentinc / confluent-kafka-python

Confluent's Kafka Python Client
http://docs.confluent.io/current/clients/confluent-kafka-python

Publisher keeps retrying even after delivering failure report? #526

Closed paol closed 5 years ago

paol commented 5 years ago

Description

I'm trying to create unit tests that inject network faults in order to test our fault tolerance code, and I ran into this behaviour that I can't make sense of. This is the timeline of the test:

  1. send 4 messages
  2. sleep 5s
  3. bring down the network in the Kafka broker (single node running in docker) with ip link set eth0 down
  4. send 4 messages
  5. sleep 20s
  6. bring up the network in the Kafka broker with ip link set eth0 up
  7. send 4 messages
  8. consume available messages to verify what was effectively received

Because the message timeout is set to 10s (see config below) I would expect the middle 4 messages to be lost. Sure enough, while sleeping in step 5 the delivery callback gets called indicating failure of delivery of the preceding 4 messages. (The error is KafkaError{code=_MSG_TIMED_OUT,val=-192,str="Local: Message timed out"})

But in spite of this, by the end of the test all 12 messages were received by the broker! The middle 4 messages are out of order - they arrive last. It looks as if the producer kept retrying the messages even after delivering the failure report.
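For context, the producer side of the test looks roughly like the sketch below; the broker address, topic name, and callback body are placeholders rather than the actual test config, which isn't reproduced here:

```python
from confluent_kafka import Producer

# Sketch of the test producer -- broker address and topic are placeholders.
conf = {
    'bootstrap.servers': 'localhost:9092',  # single-node broker running in Docker
    'message.timeout.ms': 10000,            # 10 s message timeout, as described above
}

def on_delivery(err, msg):
    # Invoked from poll()/flush(); for the middle batch err is
    # KafkaError{code=_MSG_TIMED_OUT,...}
    if err is not None:
        print('delivery failed: {}'.format(err))
    else:
        print('delivered to {} [{}] @ {}'.format(msg.topic(), msg.partition(), msg.offset()))

p = Producer(conf)
for i in range(4):
    p.produce('test-topic', value='msg-{}'.format(i), callback=on_delivery)
p.poll(0)      # serve delivery callbacks
# ... network fault injection and sleeps happen between the batches ...
p.flush(30)
```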

Checklist

Please provide the following information:

edenhill commented 5 years ago

There were some corner cases with retry-handling that could cause behaviour like this. It is fixed on the librdkafka master branch (upcoming v1.0.0 branch), so it would be great if you could try to reproduce it on librdkafka master.

paol commented 5 years ago

Thanks. Is there a packaged version of 1.0.0rc5 somewhere?

edenhill commented 5 years ago

We're preparing a new RC this week, sit tight

paol commented 5 years ago

Ok, I managed to produce a wheel from the master branch with the scripts in /tools. The reported versions are now ('1.0.0', 1048576) and ('1.0.0', 16777471).
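(Those two tuples appear to be what confluent_kafka.version() and confluent_kafka.libversion() report, i.e. the Python client and the underlying librdkafka build:)

```python
import confluent_kafka

print(confluent_kafka.version())     # Python client version, e.g. ('1.0.0', 1048576)
print(confluent_kafka.libversion())  # bundled librdkafka version, e.g. ('1.0.0', 16777471)
```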

The result is exactly the same as before: all messages reach the broker, the ones sent during the network fault are received last.

Unrelated: in this version the Consumer doesn't seem to be able to recover after the fault. poll() returns KafkaError{code=_NOT_IMPLEMENTED,val=-170,str="Failed to fetch offsets: Local: Not implemented"}

edenhill commented 5 years ago

Message ordering can't be guaranteed with max.in.flight>1 and retries>0 in case of failure, unless the idempotent producer is enabled (enable.idempotence=true). There are some issues with message timeouts that are being worked out on the idemp_msg_tmout branch, which is not yet ready for merging.
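For example, turning on the idempotent producer from Python looks roughly like this (broker address is a placeholder):

```python
from confluent_kafka import Producer

p = Producer({
    'bootstrap.servers': 'localhost:9092',  # placeholder
    'enable.idempotence': True,  # preserves ordering across retries; implies acks=all
})
```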

As for the Consumer error; that looks interesting, what's your consumer config?

paol commented 5 years ago

Yeah, the ordering isn't really my main concern. The problem is that we're going to have difficulty testing our retransmission logic if librdkafka "lies" about having given up sending the message. Next I'm going to try to mess with the Producer configs to see if I can make the problem go away.

Consumer config only has 'bootstrap.servers' and 'group.id'. As far as the consumer is concerned, the test proceeds like this:

  1. before doing anything else, we subscribe() and poll() once, to get the consumer connected
  2. rest of the test proceeds as described above
  3. we poll() to get the awaiting messages

The error happens in step 3.
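Roughly, the consumer side of the test looks like this (topic name, group id, and broker address are placeholders):

```python
from confluent_kafka import Consumer

c = Consumer({
    'bootstrap.servers': 'localhost:9092',  # placeholder
    'group.id': 'fault-test',               # placeholder
})
c.subscribe(['test-topic'])

# step 1: poll once up front so the consumer joins the group before the test starts
c.poll(5.0)

# ... the produce / fault-injection steps run here ...

# step 3: drain whatever actually reached the broker
while True:
    msg = c.poll(5.0)
    if msg is None:
        break
    if msg.error():
        # this is where the _NOT_IMPLEMENTED error shows up
        print('consumer error: {}'.format(msg.error()))
        continue
    print('received: {}'.format(msg.value()))
c.close()
```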

edenhill commented 5 years ago

If a message was transmitted to the broker before the connection went down, no ack was received, and the message later times out, there is no way for the producer to know whether the message was actually persisted by the broker. You can use rd_kafka_message_status() to get the level of known persistence for each message.

edenhill commented 5 years ago

Can you reproduce the consumer issue with debug=cgrp,broker,topic,protocol set on the consumer and provide us with the logs?
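i.e. a consumer config along these lines, with the rest of the settings unchanged:

```python
consumer_conf = {
    'bootstrap.servers': 'localhost:9092',   # placeholder
    'group.id': 'fault-test',                # placeholder
    'debug': 'cgrp,broker,topic,protocol',   # verbose logs for these contexts
}
```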

paol commented 5 years ago

> Can you reproduce the consumer issue with debug=cgrp,broker,topic,protocol set on the consumer and provide us with the logs?

consumer.log

In the log, line 222 immediately precedes the poll() returning KafkaError{code=_NOT_IMPLEMENTED,val=-170,str="Failed to fetch offsets: Local: Not implemented"}

paol commented 5 years ago

> If a message was transmitted to the broker before the connection went down

That's not the case. As you can see from the timeline in the top post, the messages are produced after the network is already down. (I just added a 5 s delay after killing the network to make sure it wasn't a race condition; no change.)

> You can use rd_kafka_message_status() to get the level of known persistence for each message.

Is that in the Python API? I can't find it.

edenhill commented 5 years ago

We're adding msg.status() in the upcoming v1.0.0 release. (cc @rnpridgeon )
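Once that lands, a delivery callback will be able to check the persistence level of each message even when the delivery report carries an error. A sketch against the v1.0.0 client API (constant names as exposed by the 1.0 client):

```python
from confluent_kafka import (PERSISTENCE_STATUS_NOT_PERSISTED,
                             PERSISTENCE_STATUS_POSSIBLY_PERSISTED,
                             PERSISTENCE_STATUS_PERSISTED)

def on_delivery(err, msg):
    # Even when err is set (e.g. _MSG_TIMED_OUT), msg.status() says whether the
    # message definitely, possibly, or never reached the broker.
    status = msg.status()
    if status == PERSISTENCE_STATUS_NOT_PERSISTED:
        print('not persisted -- safe to retransmit')
    elif status == PERSISTENCE_STATUS_POSSIBLY_PERSISTED:
        print('possibly persisted -- retransmitting may create a duplicate')
    elif status == PERSISTENCE_STATUS_PERSISTED:
        print('persisted, even though the delivery report shows an error')
```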

edenhill commented 5 years ago

NOT_IMPLEMENTED is a subsequent error from the connection timing out, which is also passed to any queued requests. We should hide this error from the user since it is temporary.

paol commented 5 years ago

I took some time to get to the bottom of this with wireshark, and it's not a Kafka problem at all. It was just TCP retransmission persisting and eventually succeeding in delivering the lost packets.

This was annoyingly hard to figure out because the retransmissions are not visible on the client side, only by tapping the network interface of the Docker container running Kafka. I guess it's a quirk of the bridge networking setup that Docker uses.

Should I close this, or do you want to keep it open to track the client error?

edenhill commented 5 years ago

Whoa, good troubleshooting effort! Glad you found it.

You may close this issue, thanks.