confluentinc / confluent-kafka-python

Confluent's Kafka Python Client
http://docs.confluent.io/current/clients/confluent-kafka-python

suspicious memory leak in producer (rdkafka) #1361

Open tigerinus opened 2 years ago

tigerinus commented 2 years ago

Description

I have a microservice that consumes messages from Kafka, does some work on them, and publishes the results back to Kafka.

However, it quickly gets OOMKilled after starting.

With the help of a memory profiler, I managed to figure out that rdk:broker0 is the biggest contributor to memory usage (in my example, it's a 384MiB pod in Kubernetes).

[image: memory profiler report]

As seen in this report, no Python object tracked by the GC holds more than 2MB; it's rdk:broker0 that holds 4460 allocations and 165MiB of unreleased memory.
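One rough way to cross-check that finding without a full profiler is to compare what the Python allocator has handed out with the peak resident set size of the whole process. The sketch below uses only the standard library; the helper name `memory_snapshot` is hypothetical, and the `resource` module is assumed to be available (Unix only). A large and growing gap between the two numbers would point at native allocations such as those made by librdkafka threads.

```python
import resource
import sys
import tracemalloc


def memory_snapshot() -> dict:
    """Return Python-level allocations and peak process RSS, both in bytes."""
    # tracemalloc only sees allocations made after tracing has started,
    # so call this once early in the program to start the trace.
    if not tracemalloc.is_tracing():
        tracemalloc.start()
    python_current, python_peak = tracemalloc.get_traced_memory()

    peak_rss = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
    # ru_maxrss is reported in kilobytes on Linux and in bytes on macOS.
    if sys.platform != "darwin":
        peak_rss *= 1024

    return {
        "python_current_bytes": python_current,
        "python_peak_bytes": python_peak,
        "peak_rss_bytes": peak_rss,
    }
```

Logging this snapshot once per second next to the published-message count would show whether the process footprint grows while Python-level allocations stay flat.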

Here is the KafkaProducerService code that calls Producer:

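# irrelevant code omitted; the imports and logger below are assumed from usage

import logging
from datetime import datetime

from confluent_kafka import KafkaException, Producer

log = logging.getLogger(__name__)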

class KafkaProducerService:
    '''
    main class
    '''
    def __init__(self, bootstrap_servers: str, topic: str) -> None:
        self.__producer = Producer({
            'bootstrap.servers': bootstrap_servers
        })

        self.__topic = topic
        self.__count = 0
        self.__previous_count_timestamp = datetime.now().timestamp()

    def publish(self, key: str, value: bytes, headers: dict) -> None:
        '''
        publish message
        '''
        try:
            self.__producer.produce(
                self.__topic,
                key=key,
                value=value,
                headers=headers
            )
        except BufferError as error:
            raise RuntimeError(
                "internal producer message queue is full"
            ) from error
        except KafkaException as error:
            raise RuntimeError(
                "error adding to producer message queue"
            ) from error

        num_messages_to_be_delivered = len(self.__producer)
        if num_messages_to_be_delivered > 1000:
            log.debug("wait for %s messages to be delivered to Kafka...",
                      num_messages_to_be_delivered)
            try:
                num_message = self.__producer.flush()
            except KafkaException as error:
                raise RuntimeError(
                    "error when flushing producer message queue to Kafka"
                ) from error

            log.debug("%d messages still in the producer queue", num_message)

        self.__count += 1
        self.__count_published()

    def __count_published(self) -> None:
        current_count_timestamp = datetime.now().timestamp()
        if current_count_timestamp - self.__previous_count_timestamp >= 1:
            self.__previous_count_timestamp = current_count_timestamp

            if self.__count == 0:
                return

            log.info("%d messages published (%s messages pending for delivery)",
                     self.__count, len(self.__producer))
            self.__count = 0

How to reproduce

  1. Install https://bloomberg.github.io/memray
  2. Using the snippet above, build a Python script that keeps re-publishing Kafka messages back to the same topic.
    • each message should be bigger than 50k
  3. Run the script with memray for about 5 minutes.
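The steps above could be sketched roughly as follows. This is a simplified stand-in, not the reporter's actual script: it generates payloads instead of consuming them from the topic, the function name `republish` and the 64 KiB payload size are assumptions, and the producer is passed in so that anything exposing `produce()`, `flush()` and `len()` will work.

```python
import os

# Assumption: any size above the ~50k mentioned in step 2 should do.
PAYLOAD_SIZE = 64 * 1024


def republish(producer, topic: str, iterations: int, flush_every: int = 1000) -> int:
    """Publish `iterations` large messages, flushing when the queue grows.

    Mirrors the flush-after-1000-pending pattern of the snippet above.
    Returns the number of messages handed to the producer.
    """
    payload = os.urandom(PAYLOAD_SIZE)
    sent = 0
    for i in range(iterations):
        producer.produce(topic, key=str(i), value=payload)
        sent += 1
        # len(producer) is the number of messages still awaiting delivery.
        if len(producer) > flush_every:
            producer.flush()
    # Final flush so nothing is left in the local queue on exit.
    producer.flush()
    return sent
```

With the real client, `producer` would be a `confluent_kafka.Producer({'bootstrap.servers': ...})`, and the script could be run under the profiler with `memray run script.py`, then rendered with `memray flamegraph` on the resulting capture file.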

Checklist

Please provide the following information:

>>> confluent_kafka.version()
('1.8.2', 17302016)
>>> confluent_kafka.libversion()
('1.6.0', 17170687)
Apache Kafka broker version: 3.0.0

Client configuration: default, except bootstrap.servers

Operating system: reproduced on both Alpine and Debian (Bullseye)
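For readers comparing versions across comments, the integer in each tuple above can be decoded by hand. librdkafka documents its version integer as hex `MM.mm.rr.xx`, where `xx` is a pre-release id and `0xff` marks a final release; the client's own integer appears to use the same byte layout, which is an assumption here. The helper name `decode_version` is hypothetical.

```python
def decode_version(version_int: int) -> tuple:
    """Split a hex-packed version integer into (major, minor, revision, extra)."""
    major = (version_int >> 24) & 0xFF
    minor = (version_int >> 16) & 0xFF
    revision = (version_int >> 8) & 0xFF
    extra = version_int & 0xFF  # pre-release id; 0xff means final release in librdkafka
    return major, minor, revision, extra


print(decode_version(17302016))  # (1, 8, 2, 0)
print(decode_version(17170687))  # (1, 6, 0, 255)
```

Both integers agree with the version strings reported next to them: client 1.8.2 and librdkafka 1.6.0.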

tigerinus commented 2 years ago

If I call flush() on every publish instead of every 1000 messages, memory still leaks, just much more slowly. The pod will still get OOMKilled eventually, in about 30 minutes.

tigerinus commented 2 years ago

I've added some verbose logging to capture the number of unpublished messages remaining in the producer's internal queue every second (updated the snippet above):

2022-06-03 13:24:29,463 MainProcess(9) INFO kafka_consumer::__count_consumed - 1 messages consumed - last offset: 62597, last timestamp: 2022-05-29 18:02:19.732000 (1653847339732)
2022-06-03 13:24:35,550 KafkaProducerWorker(10) INFO kafka_producer::__count_published - 999 messages published (0 messages pending for delivery)
2022-06-03 13:24:42,028 KafkaProducerWorker(10) INFO kafka_producer::__count_published - 1001 messages published (0 messages pending for delivery)
2022-06-03 13:24:48,462 KafkaProducerWorker(10) INFO kafka_producer::__count_published - 1001 messages published (0 messages pending for delivery)
2022-06-03 13:24:54,916 KafkaProducerWorker(10) INFO kafka_producer::__count_published - 1001 messages published (0 messages pending for delivery)
2022-06-03 13:24:54,983 MainProcess(9) INFO kafka_consumer::__count_consumed - 1 messages consumed - last offset: 62598, last timestamp: 2022-05-29 18:02:27.617000 (1653847347617)
2022-06-03 13:25:01,379 KafkaProducerWorker(10) INFO kafka_producer::__count_published - 998 messages published (0 messages pending for delivery)
2022-06-03 13:25:07,885 KafkaProducerWorker(10) INFO kafka_producer::__count_published - 1001 messages published (0 messages pending for delivery)
2022-06-03 13:25:14,369 KafkaProducerWorker(10) INFO kafka_producer::__count_published - 1001 messages published (0 messages pending for delivery)
2022-06-03 13:25:20,849 KafkaProducerWorker(10) INFO kafka_producer::__count_published - 1001 messages published (0 messages pending for delivery)
2022-06-03 13:25:27,665 KafkaProducerWorker(10) INFO kafka_producer::__count_published - 1001 messages published (0 messages pending for delivery)
2022-06-03 13:25:34,190 KafkaProducerWorker(10) INFO kafka_producer::__count_published - 1001 messages published (0 messages pending for delivery)
2022-06-03 13:25:40,643 KafkaProducerWorker(10) INFO kafka_producer::__count_published - 1001 messages published (0 messages pending for delivery)
2022-06-03 13:25:46,975 KafkaProducerWorker(10) INFO kafka_producer::__count_published - 996 messages published (0 messages pending for delivery)
2022-06-03 13:25:53,397 KafkaProducerWorker(10) INFO kafka_producer::__count_published - 1001 messages published (0 messages pending for delivery)
2022-06-03 13:25:53,610 MainProcess(9) INFO kafka_consumer::__count_consumed - 1 messages consumed - last offset: 62599, last timestamp: 2022-05-29 18:02:29.433000 (1653847349433)
2022-06-03 13:25:59,878 KafkaProducerWorker(10) INFO kafka_producer::__count_published - 996 messages published (0 messages pending for delivery)
2022-06-03 13:26:06,369 KafkaProducerWorker(10) INFO kafka_producer::__count_published - 999 messages published (0 messages pending for delivery)
2022-06-03 13:26:12,721 KafkaProducerWorker(10) INFO kafka_producer::__count_published - 1001 messages published (0 messages pending for delivery)

Most of the time there are 0 messages pending for delivery, i.e. all messages are published in time. Thus the high memory usage is unlikely to be caused by messages remaining in the queue.

Atheuz commented 2 years ago

@tigerinus is this also present in 1.9.0?

mhowlett commented 2 years ago

worth trying 1.9.0, but I don't recall this coming up.

it's unusual to call flush except on producer shutdown. perhaps try a poll-based solution instead (something along the lines of https://github.com/confluentinc/confluent-kafka-python/blob/master/examples/asyncio_example.py)
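A minimal sketch of what a poll-based publish could look like, which is simpler than the linked asyncio example and not taken from it. The function name `publish_with_poll` and the retry count are assumptions; `produce()`, `poll()` and the `BufferError` raised on a full local queue are the client's documented behaviour. The producer is passed in as a parameter.

```python
def publish_with_poll(producer, topic: str, key: str, value: bytes,
                      max_retries: int = 3) -> None:
    """Enqueue one message and serve delivery events without calling flush()."""
    for _ in range(max_retries):
        try:
            producer.produce(topic, key=key, value=value)
        except BufferError:
            # Local queue is full: serve delivery reports for up to one
            # second to free up space, then try again.
            producer.poll(1.0)
            continue
        # Non-blocking poll so delivery reports are served regularly.
        producer.poll(0)
        return
    raise RuntimeError("internal producer message queue is still full after retries")
```

With this pattern, `flush()` would be called only once, when the service shuts down.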

with that said, i'm going to preemptively label this a bug, even though I haven't looked into it as I don't see why this should leak, and I believe you that it does.

shawn35gh commented 2 years ago

Note from a customer: the issue is also present on 1.9.0.

Although, worth pointing out that I still get the leak without calling flush.

mhowlett commented 1 year ago

does the issue persist if you specify a delivery callback method?
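For anyone wanting to test this, a delivery callback is any callable taking `(err, msg)`. The sketch below is one possible shape; the class name `DeliveryTracker` is hypothetical and nothing in the thread confirms that adding a callback changes the leak.

```python
class DeliveryTracker:
    """Callable that counts delivery reports; pass an instance as on_delivery."""

    def __init__(self) -> None:
        self.delivered = 0
        self.failed = 0

    def __call__(self, err, msg) -> None:
        # err is None on success, otherwise an error object describing
        # why the message could not be delivered.
        if err is not None:
            self.failed += 1
        else:
            self.delivered += 1
```

It would be passed per message, e.g. `producer.produce(topic, key=key, value=value, on_delivery=tracker)`. The callback only runs from within `poll()` or `flush()`, so one of those still has to be called regularly.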

aviv-elmakias commented 3 months ago

I'm having the same issue with my FastAPI application using the latest version of the lib. Just creating a producer, without sending any messages, causes it to leak from the same spot.