confluentinc / confluent-kafka-python

Confluent's Kafka Python Client
http://docs.confluent.io/current/clients/confluent-kafka-python

Consumer not consuming from all assigned partitions #1477

Open na-ka-na opened 1 year ago

na-ka-na commented 1 year ago

Description

We had 100 partitions and 100 consumers, and things worked as expected. But last night I increased the number of partitions to 1000 and the number of consumers to 200.

As expected, each of the 200 consumers was assigned 5 partitions. I confirmed this from the consumer logs and from ./kafka-consumer-groups.sh --describe.

Partitions 0-99 were consumed fully, as expected. But only about a fifth of partitions 100-999 were consumed.

Each partition was either fully consumed or not consumed at all, not a single message. It is as if the consumer didn't even "connect" to those partitions or bother to consume from them. It is really bizarre! E.g. see below:

GROUP           TOPIC      PARTITION  CURRENT-OFFSET  LOG-END-OFFSET    LAG              CONSUMER-ID
ETLJob          etl_runs      745        -               124             -               ETLJob-b9fed09c-6b77-4098-8bdf-42859c8f57de
ETLJob          etl_runs      749        121             121             0               ETLJob-b9fed09c-6b77-4098-8bdf-42859c8f57de
ETLJob          etl_runs      746        -               136             -               ETLJob-b9fed09c-6b77-4098-8bdf-42859c8f57de
ETLJob          etl_runs      747        -               138             -               ETLJob-b9fed09c-6b77-4098-8bdf-42859c8f57de
ETLJob          etl_runs      748        -               144             -               ETLJob-b9fed09c-6b77-4098-8bdf-42859c8f57de
ETLJob          etl_runs      93         24312           24312           0               ETLJob-2190c0c3-d562-4e50-89bc-ec3a54c94e89
ETLJob          etl_runs      90         24034           24034           0               ETLJob-2190c0c3-d562-4e50-89bc-ec3a54c94e89
ETLJob          etl_runs      94         25189           25189           0               ETLJob-2190c0c3-d562-4e50-89bc-ec3a54c94e89
ETLJob          etl_runs      91         24027           24027           0               ETLJob-2190c0c3-d562-4e50-89bc-ec3a54c94e89
ETLJob          etl_runs      92         24887           24887           0               ETLJob-2190c0c3-d562-4e50-89bc-ec3a54c94e89
ETLJob          etl_runs      763        -               134             -               ETLJob-beb213ff-4bcb-43d3-b5b7-c3853f5b0e56
ETLJob          etl_runs      760        -               142             -               ETLJob-beb213ff-4bcb-43d3-b5b7-c3853f5b0e56
ETLJob          etl_runs      764        -               133             -               ETLJob-beb213ff-4bcb-43d3-b5b7-c3853f5b0e56
ETLJob          etl_runs      761        -               148             -               ETLJob-beb213ff-4bcb-43d3-b5b7-c3853f5b0e56
ETLJob          etl_runs      762        138             138             0               ETLJob-beb213ff-4bcb-43d3-b5b7-c3853f5b0e56
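The pattern in the table above is what identifies the stuck partitions: CURRENT-OFFSET is `-` while LOG-END-OFFSET is nonzero. A small hypothetical helper (the parsing assumptions are mine; column order is taken from the table) to pull those partition ids out of `kafka-consumer-groups.sh --describe` output:

```python
def stuck_partitions(describe_output: str) -> list[int]:
    """Return partition ids that have messages but were never consumed,
    i.e. rows where CURRENT-OFFSET is '-' and LOG-END-OFFSET > 0."""
    stuck = []
    for line in describe_output.strip().splitlines()[1:]:  # skip header row
        cols = line.split()
        # Columns: GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID
        partition, current, log_end = cols[2], cols[3], cols[4]
        if current == "-" and int(log_end) > 0:
            stuck.append(int(partition))
    return stuck
```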

How to reproduce

Don't know

na-ka-na commented 1 year ago

Our consumer loop is as follows:

from typing import Callable, List

from confluent_kafka import Consumer, KafkaException, Message

    # From the consumer class's __init__:
        self.consumer = Consumer(self.conf, logger=self.logger)
        self.consumer.subscribe(
            topics=self.topics,
            on_assign=self._print_topic_subscription_fn("Assigned"),
            on_revoke=self._print_topic_subscription_fn("Revoked"),
            on_lost=self._print_topic_subscription_fn("Lost"),
        )

    def _consume_loop(self, func: Callable[[List[Message]], None], batch_size: int = 1):
        while True:
            messages = self.consumer.consume(num_messages=batch_size, timeout=2.0)
            if not messages:
                continue

            errored_messages = []
            for message in messages:
                if message.error():
                    errored_messages.append(message)
                    self.logger.error(
                        f"Client {self._client_id} got message with error, message={message}, "
                        f"error={message.error()}"
                    )
            if errored_messages:
                raise KafkaException(errored_messages[0].error())

            # At-most-once: commit before processing the batch.
            if self.semantics == ConsumerSemantics.AT_MOST_ONCE:
                self.consumer.commit()

            with timeout(seconds=self.max_batch_processing_seconds):
                func(messages)

            # At-least-once: commit only after the batch was processed.
            if self.semantics == ConsumerSemantics.AT_LEAST_ONCE:
                self.consumer.commit()

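The loop references two helpers that aren't shown in the issue: `ConsumerSemantics` and the `timeout` context manager. A minimal sketch of what they might look like (names taken from the snippet; the implementations are assumed, and the signal-based timeout is Unix-only and main-thread-only):

```python
import enum
import signal
from contextlib import contextmanager

class ConsumerSemantics(enum.Enum):
    AT_MOST_ONCE = "at_most_once"    # commit before processing the batch
    AT_LEAST_ONCE = "at_least_once"  # commit after processing the batch

@contextmanager
def timeout(seconds: int):
    """Raise TimeoutError if the body runs longer than `seconds`."""
    def _handler(signum, frame):
        raise TimeoutError(f"batch processing exceeded {seconds}s")
    old = signal.signal(signal.SIGALRM, _handler)
    signal.alarm(seconds)
    try:
        yield
    finally:
        signal.alarm(0)                  # cancel the pending alarm
        signal.signal(signal.SIGALRM, old)
```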
na-ka-na commented 1 year ago

I managed to work around the issue: I forced the consumer to consume one message from each stuck partition, and after that the consumers recognized all partitions.

This is weird; I'm not sure what the underlying issue is, but there is surely some problem.
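The comment above doesn't show how the forced consume was done. One way to do it with the confluent_kafka API is to bypass `subscribe()` entirely and `assign()` the stuck partitions directly; this is a hedged sketch, not the author's actual code, and the row format, topic name, group id, and broker address are illustrative:

```python
def pick_stuck(partition_rows):
    """From (partition, current_offset, log_end_offset) tuples, return the
    partition ids that have messages but no consumed offset."""
    return [p for p, cur, end in partition_rows if cur is None and end > 0]

def kick_partitions(conf, topic, partitions):
    # Local import so pick_stuck() is usable without the client installed.
    from confluent_kafka import Consumer, TopicPartition, OFFSET_BEGINNING

    consumer = Consumer(conf)
    try:
        # Static assignment skips the rebalance machinery; read one message
        # from each stuck partition so the group "wakes up" on it.
        consumer.assign(
            [TopicPartition(topic, p, OFFSET_BEGINNING) for p in partitions]
        )
        for _ in partitions:
            consumer.consume(num_messages=1, timeout=10.0)
    finally:
        consumer.close()

if __name__ == "__main__":
    stuck = pick_stuck([(745, None, 124), (746, None, 136), (749, 121, 121)])
    print(stuck)  # prints [745, 746]
    # With a broker available, something like:
    # kick_partitions({"bootstrap.servers": "localhost:9092",
    #                  "group.id": "ETLJob"}, "etl_runs", stuck)
```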

dumitrugrl commented 1 year ago

How exactly did you "force the consumer to consume one message from stuck partitions"? Thank you!

SmartXingZhou commented 1 year ago

We encountered the same issue and suspect it is a bug on the client side, but we have no firm evidence yet.

ccarvalho commented 2 months ago

@na-ka-na or @SmartXingZhou , any update about this problem?

I'm having the same issue, but with the C# library, v1.8.2.