dpkp / kafka-python

Python client for Apache Kafka
http://kafka-python.readthedocs.io/
Apache License 2.0

How to handle timeout aka socket disconnected like in #1323 #2328

Open schumischumi opened 2 years ago

schumischumi commented 2 years ago

Hi,

I have the problem that my consuming app runs all the time, but the producing app only runs about 3-4 hours a day. The result is that, after all messages are consumed, I run into the client timeout described in #1323. I understand that a disconnect is completely normal behavior once connections.max.idle.ms is reached, but the error messages are a problem for the logging.

The question is: how can I handle this error so it doesn't flood my logs? Of course I could raise connections.max.idle.ms to something like 21 hours, but that doesn't seem right.

I use these versions: AMQ Streams 2.1 with Kafka 3.1.0, and kafka-python==2.0.2 or kafka-python3==3.0.0.

error logs:

ERROR:kafka.conn:<BrokerConnection node_id=0 host=broker-0-kafka-test.apps.cluster.intern:443 <connected> [IPv4 ('<removed>', 443)]>: socket disconnected
WARNING:kafka.client:Node 0 connection failed -- refreshing metadata
ERROR:kafka.consumer.fetcher:Fetch to node 0 failed: KafkaConnectionError: socket disconnected
ERROR:kafka.conn:<BrokerConnection node_id=3 host=broker-3-kafka-test.apps.cluster.intern:443 <connected> [IPv4 ('<removed>', 443)]>: socket disconnected
WARNING:kafka.client:Node 3 connection failed -- refreshing metadata
ERROR:kafka.conn:<BrokerConnection node_id=1 host=broker-1-kafka-test.apps.cluster.intern:443 <connected> [IPv4 ('<removed>', 443)]>: socket disconnected
WARNING:kafka.client:Node 1 connection failed -- refreshing metadata
ERROR:kafka.conn:<BrokerConnection node_id=4 host=broker-4-kafka-test.apps.cluster.intern:443 <connected> [IPv4 ('<removed>', 443)]>: socket disconnected
WARNING:kafka.client:Node 4 connection failed -- refreshing metadata
ERROR:kafka.consumer.fetcher:Fetch to node 4 failed: KafkaConnectionError: socket disconnected
ERROR:kafka.conn:<BrokerConnection node_id=2 host=broker-2-kafka-test.apps.cluster.intern:443 <connected> [IPv4 ('<removed>', 443)]>: socket disconnected
WARNING:kafka.client:Node 2 connection failed -- refreshing metadata
ERROR:kafka.conn:<BrokerConnection node_id=coordinator-1 host=broker-1-kafka-test.apps.cluster.intern:443 <connected> [IPv4 ('<removed>', 443)]>: socket disconnected
WARNING:kafka.client:Node coordinator-1 connection failed -- refreshing metadata
ERROR:kafka.coordinator:Error sending HeartbeatRequest_v1 to node coordinator-1 [KafkaConnectionError: socket disconnected]
WARNING:kafka.coordinator:Marking the coordinator dead (node coordinator-1) for group mygroup: KafkaConnectionError: socket disconnected.
WARNING:kafka.coordinator:Marking the coordinator dead (node coordinator-1) for group mygroup: Node Disconnected.
WARNING:kafka.coordinator.consumer:Auto offset commit failed for group mygroup: NodeNotReadyError: coordinator-1
WARNING:kafka.coordinator.consumer:Auto offset commit failed for group mygroup: NodeNotReadyError: coordinator-1
WARNING:kafka.coordinator.consumer:Auto offset commit failed for group mygroup: NodeNotReadyError: coordinator-1
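One way to keep these routine idle-disconnect messages out of the logs (a sketch only, not an official recommendation: it hides genuine connection failures from the same modules, too) is to raise the level of the noisy kafka-python loggers that appear in the output above:

```python
import logging

# Logger names are taken from the log output above. CRITICAL suppresses
# both the ERROR and WARNING lines that an idle disconnect produces --
# but note this also hides real connection problems from these modules.
for name in ("kafka.conn", "kafka.client",
             "kafka.coordinator", "kafka.coordinator.consumer",
             "kafka.consumer.fetcher"):
    logging.getLogger(name).setLevel(logging.CRITICAL)
```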

code

CONSUMER = kafka3.KafkaConsumer(topic,
                                bootstrap_servers=bootstrap_servers,
                                group_id=group_id)

def main():
    """Main function to poll and handle messages."""
    log.info("Polling for messages ... ")
    try:
        # Iterating a kafka-python consumer yields ConsumerRecord objects;
        # it never yields None, and records have no error() method, so
        # connection problems surface as raised exceptions instead.
        for msg in CONSUMER:
            message = msg.value
            # handle message
            ...
    except Exception as exception:
        log.error("Error while handling message: %s", exception)
        sys.exit(1)
    finally:
        # Close once, on the way out; closing inside a while loop would
        # leave a closed consumer for the next iteration.
        CONSUMER.close()
enenuki commented 1 year ago

Any updates on this? I have a similar problem.

schumischumi commented 1 year ago

Yes. We switched to the Confluent Kafka client because of this problem, and because it gets updated more often.

lokeshpkumar commented 1 year ago

@schumischumi After switching to the Confluent Kafka client, is your new consumer stable? That is, does it pick up messages even after, say, 20-30 hours of idle time? We are facing a similar issue with kafka-python, where the client socket gets disconnected every 5 minutes (from the remote side), and if the consumer is left idle for more than 5-6 hours it does not process any new messages.

schumischumi commented 1 year ago

Well, it still disconnects, but our Kafka admins told me that this is totally normal and OK, because the consumer should reconnect by itself, which it does with Confluent Kafka. With kafka-python it not only disconnected, but also crashed the app. We first encountered the problem after we switched the Kafka broker from Cloudera to another one (I guess the plain Apache one), but I think the "problem" was the newer version that came with the new broker. In the application I maintained, I implemented a workaround by running the container as a cron job on Kubernetes (i.e. every minute, if not already running) and running this loop inside the app:

- start loop 1 of 5
- try to consume messages
- if there are messages, consume no more than 100 of them
- if you consumed 100 messages, or if there are no more messages, start the next loop iteration
- once all 5 loops are done, exit and end the container

So if there are no messages, or the maximum is reached, the container ends itself gracefully without crashing and is restarted a minute later. It's not a beautiful solution, but it works for this specific app and it fixes the problem until the refactoring.
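The loop described above might look roughly like this (a sketch under my assumptions: MAX_ROUNDS, BATCH_SIZE, run_bounded, and handle_message are illustrative names, and only kafka-python's documented KafkaConsumer.poll() and close() methods are used):

```python
MAX_ROUNDS = 5    # illustrative: the "5 loops" from the workaround above
BATCH_SIZE = 100  # illustrative: at most 100 messages per round

def run_bounded(consumer, handle_message):
    """Consume at most MAX_ROUNDS * BATCH_SIZE messages, then exit.

    `consumer` is expected to be a kafka-python KafkaConsumer; only its
    poll() and close() methods are used here.
    """
    for _ in range(MAX_ROUNDS):
        # KafkaConsumer.poll() returns {TopicPartition: [ConsumerRecord, ...]}
        # and an empty dict when nothing arrives within the timeout.
        batches = consumer.poll(timeout_ms=1000, max_records=BATCH_SIZE)
        for records in batches.values():
            for record in records:
                handle_message(record.value)
    # End gracefully so the Kubernetes cron job can restart the container.
    consumer.close()
```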

A colleague of mine implemented and tested the Confluent client for another project, and after some tinkering with the parameters and the authentication it worked fine. I can't provide you with the parameters because I quit my job there, but if you want, I can ask my (former) colleague whether he wants to share the details.

lokeshpkumar commented 1 year ago

@schumischumi Thanks a lot for the information. If you can, please do provide the parameters; that would help a lot.