Context
When the @cached_property KafkaProducer client is initialised, it looks through the broker addresses provided in bootstrap_servers and retrieves cluster metadata from the Kafka Metadata API.
After an upgrade on K8s, every unit has a new IP (i.e. no longer the ones originally defined in bootstrap_servers), so after losing connection to all three brokers, the producer client fails and exits.
Changes Made
fix: retry producer clients if all IPs change
When the KafkaProducer fails, retry re-initialising the client so that it picks up the new broker IPs
If at least one broker connects, the new client can discover and connect to the other new brokers
Adding some connection retry args to the client also helps
chore: use tenacity for retries
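The retry-and-rebuild idea above can be sketched roughly as follows. This is a minimal illustration, not the code from this PR: the PR uses tenacity, while the sketch uses a plain loop to stay dependency-free. `ProducerApp`, `NoBrokersReachable`, `refresh_producer` and `flaky_factory` are hypothetical names, and the factory stands in for constructing a real KafkaProducer.

```python
import time
from functools import cached_property
from typing import Callable, Optional


class NoBrokersReachable(Exception):
    """Hypothetical stand-in for the error raised when no broker answers."""


class ProducerApp:
    """Holds a lazily built producer that can be rebuilt after a failure."""

    def __init__(self, make_producer: Callable[[], object],
                 attempts: int = 5, wait_seconds: float = 0.0) -> None:
        self._make_producer = make_producer  # hypothetical client factory
        self._attempts = attempts
        self._wait_seconds = wait_seconds

    @cached_property
    def producer(self) -> object:
        # Built on first access, then cached on the instance.
        return self._make_producer()

    def refresh_producer(self) -> object:
        """Drop the cached client and rebuild it, retrying on failure."""
        last_error: Optional[Exception] = None
        for _ in range(self._attempts):
            # Invalidate the cached_property so the next access rebuilds it.
            self.__dict__.pop("producer", None)
            try:
                return self.producer
            except NoBrokersReachable as error:
                last_error = error
                time.sleep(self._wait_seconds)
        raise RuntimeError("producer could not be re-established") from last_error


# Usage: a factory that fails twice (brokers still restarting), then succeeds.
calls = {"count": 0}


def flaky_factory() -> str:
    calls["count"] += 1
    if calls["count"] < 3:
        raise NoBrokersReachable("all bootstrap servers refused the connection")
    return f"producer-{calls['count']}"


app = ProducerApp(flaky_factory)
client = app.refresh_producer()
```

Popping the cached value before each attempt matters: without it, `self.producer` would keep returning the old client that still points at the stale broker IPs.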
New Behavior
2024-02-22 01:02:35,288 INFO [kafka.conn] (kafka-python-producer-1-network-thread) (close) <BrokerConnection node_id=2 host=kafka-2.kafka-endpoints:9093 <connected> [IPv4 ('10.1.138.118', 9093)]>: Closing connection. KafkaConnectionError: Socket EVENT_READ without in-flight-requests
2024-02-22 01:02:35,288 WARNING [kafka.client] (kafka-python-producer-1-network-thread) (_conn_state_change) Node 2 connection failed -- refreshing metadata
2024-02-22 01:02:35,320 INFO [kafka.conn] (kafka-python-producer-1-network-thread) (connect) <BrokerConnection node_id=2 host=kafka-2.kafka-endpoints:9093 <connecting> [IPv4 ('10.1.138.118', 9093)]>: connecting to kafka-2.kafka-endpoints:9093 [('10.1.138.118', 9093) IPv4]
2024-02-22 01:02:35,321 ERROR [kafka.conn] (kafka-python-producer-1-network-thread) (connect) Connect attempt to <BrokerConnection node_id=2 host=kafka-2.kafka-endpoints:9093 <connecting> [IPv4 ('10.1.138.118', 9093)]> returned error 111. Disconnecting.
2024-02-22 01:02:35,321 INFO [kafka.conn] (kafka-python-producer-1-network-thread) (close) <BrokerConnection node_id=2 host=kafka-2.kafka-endpoints:9093 <connecting> [IPv4 ('10.1.138.118', 9093)]>: Closing connection. KafkaConnectionError: 111 ECONNREFUSED
2024-02-22 01:02:35,321 WARNING [kafka.client] (kafka-python-producer-1-network-thread) (_conn_state_change) Node 2 connection failed -- refreshing metadata
2024-02-22 01:03:00,345 INFO [kafka.conn] (kafka-python-producer-1-network-thread) (connect) <BrokerConnection node_id=2 host=kafka-2.kafka-endpoints:9093 <connecting> [IPv4 ('10.1.138.74', 9093)]>: connecting to kafka-2.kafka-endpoints:9093 [('10.1.138.74', 9093) IPv4]
2024-02-22 01:03:00,345 ERROR [kafka.conn] (kafka-python-producer-1-network-thread) (connect) Connect attempt to <BrokerConnection node_id=2 host=kafka-2.kafka-endpoints:9093 <connecting> [IPv4 ('10.1.138.74', 9093)]> returned error 111. Disconnecting.
2024-02-22 01:03:00,345 INFO [kafka.conn] (kafka-python-producer-1-network-thread) (close) <BrokerConnection node_id=2 host=kafka-2.kafka-endpoints:9093 <connecting> [IPv4 ('10.1.138.74', 9093)]>: Closing connection. KafkaConnectionError: 111 ECONNREFUSED
2024-02-22 01:03:00,345 WARNING [kafka.client] (kafka-python-producer-1-network-thread) (_conn_state_change) Node 2 connection failed -- refreshing metadata
2024-02-22 01:03:02,389 INFO [kafka.conn] (kafka-python-producer-1-network-thread) (connect) <BrokerConnection node_id=2 host=kafka-2.kafka-endpoints:9093 <connecting> [IPv4 ('10.1.138.74', 9093)]>: connecting to kafka-2.kafka-endpoints:9093 [('10.1.138.74', 9093) IPv4]
2024-02-22 01:03:02,401 INFO [kafka.conn] (kafka-python-producer-1-network-thread) (_try_authenticate_scram) <BrokerConnection node_id=2 host=kafka-2.kafka-endpoints:9093 <authenticating> [IPv4 ('10.1.138.74', 9093)]>: Authenticated as relation-11 via SCRAM-SHA-512
2024-02-22 01:03:02,401 INFO [kafka.conn] (kafka-python-producer-1-network-thread) (connect) <BrokerConnection node_id=2 host=kafka-2.kafka-endpoints:9093 <authenticating> [IPv4 ('10.1.138.74', 9093)]>: Connection complete.
2024-02-22 01:03:35,476 INFO [kafka.conn] (kafka-python-producer-1-network-thread) (close) <BrokerConnection node_id=1 host=kafka-1.kafka-endpoints:9093 <connected> [IPv4 ('10.1.138.117', 9093)]>: Closing connection. KafkaConnectionError: Socket EVENT_READ without in-flight-requests
2024-02-22 01:03:35,476 WARNING [kafka.client] (kafka-python-producer-1-network-thread) (_conn_state_change) Node 1 connection failed -- refreshing metadata
2024-02-22 01:03:35,610 INFO [__main__] (MainThread) (produce_message) Message published to topic=HOT-TOPIC, message content: {"timestamp": 1708563815.607964, "_id": "b6c657f7f32f4c5e8b53d2e55b3d24d6", "origin": "producer-0 (10.1.138.123)", "content": "Message #304"}
2024-02-22 01:03:36,114 INFO [__main__] (MainThread) (produce_message) Message published to topic=HOT-TOPIC, message content: {"timestamp": 1708563816.110774, "_id": "8a9e4c97b54b4251966a123268401858", "origin": "producer-0 (10.1.138.123)", "content": "Message #305"}
2024-02-22 01:03:36,621 INFO [__main__] (MainThread) (produce_message) Message published to topic=HOT-TOPIC, message content: {"timestamp": 1708563816.614666, "_id": "ae07d51e82234ef2a367486aa7e0c313", "origin": "producer-0 (10.1.138.123)", "content": "Message #306"}
2024-02-22 01:03:37,124 INFO [__main__] (MainThread) (produce_message) Message published to topic=HOT-TOPIC, message content: {"timestamp": 1708563817.122551, "_id": "042f9339fafd472e873178f22966220c", "origin": "producer-0 (10.1.138.123)", "content": "Message #307"}
2024-02-22 01:03:37,625 INFO [kafka.conn] (kafka-python-producer-1-network-thread) (connect) <BrokerConnection node_id=1 host=kafka-1.kafka-endpoints:9093 <connecting> [IPv4 ('10.1.138.117', 9093)]>: connecting to kafka-1.kafka-endpoints:9093 [('10.1.138.117', 9093) IPv4]
2024-02-22 01:04:13,385 INFO [kafka.conn] (kafka-python-producer-1-network-thread) (close) <BrokerConnection node_id=0 host=kafka-0.kafka-endpoints:9093 <connected> [IPv4 ('10.1.138.68', 9093)]>: Closing connection. KafkaConnectionError: Socket EVENT_READ without in-flight-requests
2024-02-22 01:04:13,385 WARNING [kafka.client] (kafka-python-producer-1-network-thread) (_conn_state_change) Node 0 connection failed -- refreshing metadata
2024-02-22 01:04:37,625 ERROR [__main__] (MainThread) (_refresh_producer) All original broker IPs unreachable, re-establishing client...
2024-02-22 01:04:37,625 WARNING [kafka.producer.record_accumulator] (kafka-python-producer-1-network-thread) (done) Produced messages to topic-partition TopicPartition(topic='HOT-TOPIC', partition=2) with base offset -1 log start offset None and error None.
2024-02-22 01:04:37,625 WARNING [kafka.producer.record_accumulator] (kafka-python-producer-1-network-thread) (abort_expired_batches) Expired 1 batches in accumulator
2024-02-22 01:04:37,626 ERROR [kafka.conn] (kafka-python-producer-1-network-thread) (connect) Connection attempt to <BrokerConnection node_id=1 host=kafka-1.kafka-endpoints:9093 <connecting> [IPv4 ('10.1.138.117', 9093)]> timed out
2024-02-22 01:04:37,626 INFO [kafka.conn] (kafka-python-producer-1-network-thread) (close) <BrokerConnection node_id=1 host=kafka-1.kafka-endpoints:9093 <connecting> [IPv4 ('10.1.138.117', 9093)]>: Closing connection. KafkaConnectionError: timeout
2024-02-22 01:04:37,626 WARNING [kafka.client] (kafka-python-producer-1-network-thread) (_conn_state_change) Node 1 connection failed -- refreshing metadata
2024-02-22 01:04:37,627 INFO [kafka.conn] (kafka-python-producer-1-network-thread) (close) <BrokerConnection node_id=2 host=kafka-2.kafka-endpoints:9093 <connected> [IPv4 ('10.1.138.74', 9093)]>: Closing connection.
2024-02-22 01:04:37,628 INFO [kafka.conn] (MainThread) (connect) <BrokerConnection node_id=bootstrap-0 host=kafka-2.kafka-endpoints:9093 <connecting> [IPv4 ('10.1.138.74', 9093)]>: connecting to kafka-2.kafka-endpoints:9093 [('10.1.138.74', 9093) IPv4]
2024-02-22 01:04:37,628 INFO [kafka.conn] (MainThread) (check_version) Probing node bootstrap-0 broker version
2024-02-22 01:04:37,629 INFO [kafka.conn] (MainThread) (_wrap_ssl) <BrokerConnection node_id=bootstrap-0 host=kafka-2.kafka-endpoints:9093 <handshake> [IPv4 ('10.1.138.74', 9093)]>: Loading SSL CA from /tmp/ca.pem
2024-02-22 01:04:37,639 INFO [kafka.conn] (MainThread) (_try_authenticate_scram) <BrokerConnection node_id=bootstrap-0 host=kafka-2.kafka-endpoints:9093 <authenticating> [IPv4 ('10.1.138.74', 9093)]>: Authenticated as relation-11 via SCRAM-SHA-512
2024-02-22 01:04:37,639 INFO [kafka.conn] (MainThread) (connect) <BrokerConnection node_id=bootstrap-0 host=kafka-2.kafka-endpoints:9093 <authenticating> [IPv4 ('10.1.138.74', 9093)]>: Connection complete.
2024-02-22 01:04:37,742 INFO [kafka.conn] (MainThread) (check_version) Broker version identified as 2.5.0
2024-02-22 01:04:37,742 INFO [kafka.conn] (MainThread) (check_version) Set configuration api_version=(2, 5, 0) to skip auto check_version requests on startup
2024-02-22 01:04:37,760 INFO [kafka.conn] (kafka-python-producer-2-network-thread) (connect) <BrokerConnection node_id=2 host=kafka-2.kafka-endpoints:9093 <connecting> [IPv4 ('10.1.138.74', 9093)]>: connecting to kafka-2.kafka-endpoints:9093 [('10.1.138.74', 9093) IPv4]
2024-02-22 01:04:37,761 INFO [kafka.conn] (kafka-python-producer-2-network-thread) (_wrap_ssl) <BrokerConnection node_id=2 host=kafka-2.kafka-endpoints:9093 <handshake> [IPv4 ('10.1.138.74', 9093)]>: Loading SSL CA from /tmp/ca.pem
2024-02-22 01:04:37,769 INFO [kafka.conn] (kafka-python-producer-2-network-thread) (_try_authenticate_scram) <BrokerConnection node_id=2 host=kafka-2.kafka-endpoints:9093 <authenticating> [IPv4 ('10.1.138.74', 9093)]>: Authenticated as relation-11 via SCRAM-SHA-512
2024-02-22 01:04:37,770 INFO [kafka.conn] (kafka-python-producer-2-network-thread) (connect) <BrokerConnection node_id=2 host=kafka-2.kafka-endpoints:9093 <authenticating> [IPv4 ('10.1.138.74', 9093)]>: Connection complete.
2024-02-22 01:04:37,770 INFO [kafka.conn] (kafka-python-producer-2-network-thread) (close) <BrokerConnection node_id=bootstrap-0 host=kafka-2.kafka-endpoints:9093 <connected> [IPv4 ('10.1.138.74', 9093)]>: Closing connection.
2024-02-22 01:04:38,773 INFO [__main__] (MainThread) (produce_message) Message published to topic=HOT-TOPIC, message content: {"timestamp": 1708563817.624921, "_id": "9ee0ac987a5d4002a65df2f78853ce1b", "origin": "producer-0 (10.1.138.123)", "content": "Message #308"}
2024-02-22 01:04:39,275 INFO [kafka.conn] (kafka-python-producer-2-network-thread) (connect) <BrokerConnection node_id=1 host=kafka-1.kafka-endpoints:9093 <connecting> [IPv4 ('10.1.138.78', 9093)]>: connecting to kafka-1.kafka-endpoints:9093 [('10.1.138.78', 9093) IPv4]
2024-02-22 01:04:39,275 INFO [kafka.conn] (kafka-python-producer-2-network-thread) (_wrap_ssl) <BrokerConnection node_id=1 host=kafka-1.kafka-endpoints:9093 <handshake> [IPv4 ('10.1.138.78', 9093)]>: Loading SSL CA from /tmp/ca.pem
2024-02-22 01:04:39,285 INFO [kafka.conn] (kafka-python-producer-2-network-thread) (_try_authenticate_scram) <BrokerConnection node_id=1 host=kafka-1.kafka-endpoints:9093 <authenticating> [IPv4 ('10.1.138.78', 9093)]>: Authenticated as relation-11 via SCRAM-SHA-512
2024-02-22 01:04:39,286 INFO [kafka.conn] (kafka-python-producer-2-network-thread) (connect) <BrokerConnection node_id=1 host=kafka-1.kafka-endpoints:9093 <authenticating> [IPv4 ('10.1.138.78', 9093)]>: Connection complete.