confluentinc / confluent-kafka-python

Confluent's Kafka Python Client
http://docs.confluent.io/current/clients/confluent-kafka-python

Segmentation fault happens when trying to commit a message after the max.poll.interval.ms passed without polling #1707

Closed abdallahashraf22 closed 3 months ago

abdallahashraf22 commented 7 months ago

Description

When trying to commit a message after not polling for longer than max.poll.interval.ms, I get a non-recoverable segmentation fault that cannot be caught by an exception handler, causing the Python interpreter to exit.

How to reproduce

import json
import logging

from confluent_kafka import Consumer

logger = logging.getLogger(__name__)

# cls._bootstrap_servers, group_id, topic_name and
# processing_output_function come from the surrounding application
consumer = Consumer({
    "bootstrap.servers": cls._bootstrap_servers,
    "group.id": group_id,
    "enable.auto.commit": False,
    "auto.offset.reset": "earliest"
})
consumer.subscribe([topic_name])
while True:
    logger.info("polling...")
    message = consumer.poll(timeout=10.0)
    if message is None:
        continue
    consumer.pause(consumer.assignment())
    logger.info("consumer has been paused")
    try:
        output = message.value()
        logger.info(f"received message from kafka {output}")
        output = json.loads(output)
        logger.info("-------Consumed Message------")
        logger.info(output)
        # this processing takes longer than max.poll.interval.ms,
        # which is 5 minutes by default
        processing_output_function(output)
    except TypeError as e:
        logger.error(f"json type error: {e}")
    except json.decoder.JSONDecodeError as e:
        logger.error(f"json decode error: {e}")
    except Exception as e:
        logger.error(f"JAIS General Exception: {e}")
    finally:
        logger.info("committing message")
        consumer.commit(message)
        logger.info("message has been committed")
        consumer.resume(consumer.assignment())
        logger.info("consumer has been resumed")

In the snippet above, if processing_output_function(output) takes longer than max.poll.interval.ms, that loop iteration ends correctly and commits the message; then, on the next iteration, I consume a message that says

Application maximum poll interval (300000ms) exceeded by 231ms

Decoding this fails with a json.decoder.JSONDecodeError exception. When the finally block then goes to commit the message, the log prints "committing message" and a segmentation fault occurs.

I never reach the "message has been committed" line.

I'm not sure whether this is working as intended, but it seems wrong that it kills the Python interpreter with no way to catch the failure on exit. For now I've increased max.poll.interval.ms as a workaround, but I can fetch logs later if requested.


abdallahashraf22 commented 7 months ago

Confirmed the problem is not related to the processing output function; I reproduced it with time.sleep:

import json
import logging
import time

from confluent_kafka import Consumer

logger = logging.getLogger(__name__)

config = {
    "bootstrap.servers": "some host",
    "group.id": "some group id",
    "enable.auto.commit": False,
    "auto.offset.reset": "earliest",
    "max.poll.interval.ms": 60000,
}
consumer = Consumer(config)
consumer.subscribe(["some_topic"])
while True:
    logger.info("polling...")
    message = consumer.poll(timeout=10.0)
    if message is None:
        continue
    consumer.pause(consumer.assignment())
    logger.info("consumer has been paused")
    try:
        output = message.value()
        logger.info(f"received message from kafka {output}")
        output = json.loads(output)
        if isinstance(output, dict) and output.get("task") == "sleep":
            time.sleep(65)  # sleep past max.poll.interval.ms (60s here)
        logger.info("-------Consumed Message------")
        logger.info(output)
    except TypeError as e:
        logger.error(f"json type error: {e}")
    except json.decoder.JSONDecodeError as e:
        logger.error(f"json decode error: {e}")
    except Exception as e:
        logger.error(f"General Exception: {e}")
    finally:
        logger.info("committing message")
        consumer.commit(message)
        logger.info("message has been committed")
        consumer.resume(consumer.assignment())
        logger.info("consumer has been resumed")

logs

INFO:     __main__|polling...
INFO:     __main__|polling...
INFO:     __main__|polling...
INFO:     __main__|consumer has been paused
INFO:     __main__|received message from kafka b'{'id': 'testing 2', 'task': 'sleep', 'text': 'something'}'
INFO:     __main__|committing message
INFO:     __main__|message has been committed
INFO:     __main__|received message from kafka b'Application maximum poll interval (60000ms) exceeded by 500ms'
ERROR:    __main__|json decode error: Expecting value: line 1 column 1 (char 0)
INFO:     __main__|committing message
Segmentation fault (core dumped)
pranavrth commented 4 months ago

I am able to reproduce this issue.

The output of consumer.poll(timeout=10.0) can be a valid message or can contain an error. In your case it contains an error, and you are trying to commit that error message. Error messages don't have a valid topic name or offset, hence the SegFault when trying to commit.

Check the valid usage in the Consumer example:

    # excerpt from the Consumer example; `conf` and `topics` are the
    # consumer configuration and topic list from that example
    import sys

    from confluent_kafka import Consumer, KafkaException

    c = Consumer(conf)
    c.subscribe(topics)

    try:
        while True:
            msg = c.poll(timeout=1.0)
            if msg is None:
                continue
            if msg.error():
                raise KafkaException(msg.error())
            else:
                # Proper message
                sys.stderr.write('%% %s [%d] at offset %d with key %s:\n' %
                                 (msg.topic(), msg.partition(), msg.offset(),
                                  str(msg.key())))
                print(msg.value())
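
Applied to the loop from this issue, a minimal sketch (broker address, topic, and group id are placeholders) that skips error events instead of committing them:

import json
import logging

from confluent_kafka import Consumer

logger = logging.getLogger(__name__)

consumer = Consumer({
    "bootstrap.servers": "localhost:9092",  # placeholder
    "group.id": "example-group",            # placeholder
    "enable.auto.commit": False,
    "auto.offset.reset": "earliest",
})
consumer.subscribe(["some_topic"])

while True:
    message = consumer.poll(timeout=10.0)
    if message is None:
        continue
    if message.error():
        # Error events (e.g. the max.poll.interval.ms notification) carry
        # no valid topic/partition/offset and must never be committed.
        logger.error(f"consumer error: {message.error()}")
        continue
    try:
        output = json.loads(message.value())
        logger.info(output)
    except json.JSONDecodeError as e:
        logger.error(f"json decode error: {e}")
    finally:
        # Only real messages reach this point, so committing is safe.
        consumer.commit(message)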
pranavrth commented 4 months ago

I think we shouldn't throw a SegFault in this case, even though this is not the correct usage.

abdallahashraf22 commented 3 months ago

Ah, I see. I had the wrong expectations of the error behaviour, sorry.

And yeah, I agree it still shouldn't reach the Python level as a segmentation fault; committing an error event should raise an exception the application can catch.
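
For illustration, assuming the fix makes commit() raise a KafkaException for such error events (an assumption about the fix, not its confirmed behaviour), the commit could then be guarded like this:

from confluent_kafka import KafkaException

try:
    consumer.commit(message)
except KafkaException as e:
    # assumed post-fix behaviour: committing an error event raises a
    # catchable exception instead of crashing the interpreter
    logger.error(f"commit failed: {e}")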

Thanks for the fix. Once this PR gets merged, feel free to close the issue.