apache/beam

Apache Beam is a unified programming model for Batch and Streaming data processing.
https://beam.apache.org/
Apache License 2.0

[Bug]: ReadFromKafka does not work without max_num_records parameter #31400

Open · gomerudo opened 6 months ago

gomerudo commented 6 months ago

What happened?

When running a streaming job (with DirectRunner locally and with DataflowRunner on GCP) that uses the apache_beam.io.kafka.ReadFromKafka connector without max_num_records, the job never processes any records. Instead it gets trapped in an infinite loop of creating consumers that subscribe and get assigned a partition and offset, but never read anything. We are forcing auto.offset.reset = earliest.

We verified that when max_num_records is set, the job runs and processes the records correctly, both locally and on Dataflow. This makes us conclude that this is not a GCP issue but rather a Beam one.

We can see the infinite loop in the logs, and Lenses never reports any active members for the consumer group.


We have tried the default Kafka consumer configuration as well as several custom ones; I'm sharing the latest:

            pipeline
            | "ReadFromStream" >> apache_beam.io.kafka.ReadFromKafka(
                consumer_config={  # Also tested with a single broker
                    "bootstrap.servers": "kafka-1782273228-1-1908664276.wmt-uscentral.kafka-v2-qarth-reg-stg.ms-df-messaging.prod-wmt-uscentral-4.prod.walmart.com:9092,kafka-1782274279-1-1908664354.wmt-uscentral.kafka-v2-qarth-reg-stg.ms-df-messaging.prod-wmt-uscentral-5.prod.walmart.com:9092,kafka-1782274320-1-1908664432.wmt-uscentral.kafka-v2-qarth-reg-stg.ms-df-messaging.prod-wmt-uscentral-6.prod.walmart.com:9092",
                    "auto.offset.reset": "earliest",
                    "fetch.max.bytes": "52428800",
                    "fetch.min.bytes": "1",
                    "fetch.max.wait.ms": "1000",
                    "max.poll.interval.ms": "20000",
                    "max.poll.records": "10",
                    "request.timeout.ms": "30000",
                    "session.timeout.ms": "45000",
                    "timeout.ms": "10000",
                    "group.id": "test-group-id",
                    "heartbeat.interval.ms": "200",
                    "reconnect.backoff.ms": "100",
                    "reconnect.backoff.max.ms": "10000",
                },
                topics=["some-topic-i-cannot-share"],
                with_metadata=True,
                # max_num_records=1000,  # Setting this makes the job process records (testing only)
            )
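
For completeness, the sketch below shows roughly how the transform is wired into a full pipeline. It is a simplified reproduction, not our actual job: the broker addresses, the trimmed consumer_config, and the final logging step are illustrative placeholders, and the options assume a local DirectRunner run with streaming enabled.

    import logging

    import apache_beam
    import apache_beam.io.kafka
    from apache_beam.options.pipeline_options import PipelineOptions


    def run():
        # Streaming pipeline; locally this runs on the DirectRunner, on GCP we
        # pass the usual DataflowRunner options instead.
        options = PipelineOptions(streaming=True)

        with apache_beam.Pipeline(options=options) as pipeline:
            (
                pipeline
                | "ReadFromStream" >> apache_beam.io.kafka.ReadFromKafka(
                    consumer_config={
                        "bootstrap.servers": "broker-1:9092,broker-2:9092",  # placeholder
                        "auto.offset.reset": "earliest",
                        "group.id": "test-group-id",
                    },
                    topics=["some-topic-i-cannot-share"],
                    with_metadata=True,
                    # max_num_records=1000,  # The job only makes progress when this is set.
                )
                | "LogRecords" >> apache_beam.Map(lambda record: logging.info("%s", record))
            )


    if __name__ == "__main__":
        logging.getLogger().setLevel(logging.INFO)
        run()

With max_num_records commented out, the LogRecords step never receives anything; with it set, records show up as expected.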

This does not seem to be a problem with our Kafka topic, since custom Python clients (using kafka-python) run successfully with the exact same Kafka configuration.
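
For reference, the kind of kafka-python check we mean looks roughly like the sketch below; the broker addresses and topic name are placeholders, and only a subset of the settings that overlap with the Beam consumer_config is shown.

    from kafka import KafkaConsumer

    # Plain kafka-python consumer used to verify the topic outside of Beam.
    # Broker addresses and topic name are placeholders.
    consumer = KafkaConsumer(
        "some-topic-i-cannot-share",
        bootstrap_servers=["broker-1:9092", "broker-2:9092"],
        group_id="test-group-id",
        auto_offset_reset="earliest",
        fetch_max_bytes=52428800,
        max_poll_records=10,
        session_timeout_ms=45000,
    )

    for message in consumer:
        print(message.topic, message.partition, message.offset, message.value)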

Beam SDK language: Python
Beam SDK version: 2.52.0

Any feedback is greatly appreciated.

Issue Priority

Priority: 2 (default / most bugs should be filed as P2)

Issue Components

liferoad commented 5 months ago

Tracked by the support ticket now.