faust-streaming / faust

Python Stream Processing. A Faust fork
https://faust-streaming.github.io/faust/

Certain topic partitions are not sending fetch requests since initial start #629

Open Syntaf opened 3 months ago

Syntaf commented 3 months ago

Checklist

FWIW, I have already dug through https://github.com/faust-streaming/faust/issues/166 and feel this issue is different enough in its origins that I'm creating a new ticket to discuss and dig through this error.

My interpretation of this error message is that it indicates a particular TopicPartition (topic <-> partition pair) has not executed a fetch request since the workers initially started.

We've been getting a large number of these errors any time a rebalance occurs on our faust pods, but have so far confirmed that there's been no negative impact on our ability to consume events or process streams. I've spent more than the last week trying to track down why this might be happening, but I'm at a loss at this point.

From what I understand, this message normally shouldn't occur because of a background task on the Fetcher service: this task populates the Fetcher's internal fetch queue with calls to the broker for each topic and partition in the current assignment.

Once the fetch queue is consumed, a TopicPartition's timestamp field is updated to record the last time a fetch request was performed for that topic and partition. A minimal sketch of the pattern as I read it is below.
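To make the mechanism concrete (all names here are hypothetical stand-ins for illustration, not faust's actual internals): a background loop fetches for each assigned partition and stamps the time of the last completed fetch.

    # Hypothetical sketch of the mechanism described above -- these names
    # are made up for illustration and are not faust's actual internals.
    import asyncio
    import time

    class FetcherSketch:
        def __init__(self, assignment):
            self.assignment = assignment  # set of (topic, partition) pairs
            self.last_fetch = {}          # (topic, partition) -> monotonic timestamp

        async def fetch_loop(self):
            # Background task: keep requesting records from the broker for
            # every topic-partition in the current assignment.
            while True:
                for tp in self.assignment:
                    await self._send_fetch_request(tp)  # broker I/O, elided
                    # Record when this partition last completed a fetch; the
                    # Recovery-side check compares against this value.
                    self.last_fetch[tp] = time.monotonic()
                await asyncio.sleep(0.5)

        async def _send_fetch_request(self, tp):
            ...  # placeholder for the actual fetch request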

The error itself, however, comes from a different background task on the Recovery service; this task runs on startup or rebalance to build tables out of a topic's changelog, calling verify_event_path along the way. That method eventually checks whether each partition has performed a fetch since its initial start, and if one hasn't, this error message is emitted.
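Continuing the hypothetical FetcherSketch above, the check that produces the error has roughly this shape (again, a made-up stand-in for the real verify_event_path, not faust's actual code):

    # Continues the FetcherSketch above -- a made-up stand-in for the real
    # verify_event_path staleness test, not faust's actual code.
    import logging
    import time

    def verify_fetch_progress(fetcher, started_at, log=logging.getLogger(__name__)):
        for tp in fetcher.assignment:
            if tp not in fetcher.last_fetch:
                # No fetch has completed for this partition since the worker
                # started: this is the condition behind the error message.
                minutes = (time.monotonic() - started_at) / 60.0
                log.error(
                    "Aiokafka has not sent fetch request for %r since start "
                    "(started %.2f minutes ago)", tp, minutes,
                )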

Expected behavior

Each topic-partition in the current assignment will eventually be fetched by the Fetcher's request routine.

Actual behavior

Occasionally faust reports that certain topic-partitions have not performed a fetch request since their initial start:

[^---AIOKafkaConsumerThread]: Aiokafka has not sent fetch request for ["<redacted>",0] since start (started 7.12 minutes ago)

Unknowns

I'm stumped as to how our faust consumers are seemingly processing everything as usual, yet certain topic partitions never end up performing fetches after the workers initially start. Is this the result of a race condition, or of some unique detail about how our configuration is set up?

    app = faust.App(
        id=settings.KAFKA_CONFIG["KAFKA_CONSUMER_GROUP"],
        autodiscover=True,
        broker=settings.KAFKA_CONFIG["KAFKA_BROKERS"],
        broker_credentials=broker_credentials,
        key_serializer="raw",
        value_serializer="raw",
        worker_redirect_stdouts=False,
        processing_guarantee="at_least_once",
        web_enabled=False,
        # creates a monitoring topic if not set, we can't make topics in our cluster!
        topic_disable_leader=True,
        topic_partitions=6,
        monitor=get_monitor_for_environment(settings.ENVIRONMENT),
    )

Any thoughts or recommendations here are appreciated! We're seeing tens of thousands of these error messages every week, and it's unnecessarily eating into our Sentry quotas since they're reported as errors.
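As a stopgap for the quota side of this, a before_send hook in the Sentry SDK can drop just this message before it's sent. This is only a sketch: it assumes the events arrive via Sentry's Python logging integration, the event's message location ("logentry") may vary by SDK version, and the DSN is a placeholder.

    # Sketch of a stopgap: drop only this noisy error before it reaches
    # Sentry. Assumes the event came in via the logging integration; the
    # event's message location ("logentry") may vary by SDK version.
    import sentry_sdk

    def drop_fetch_since_start(event, hint):
        message = (event.get("logentry") or {}).get("message") or ""
        if "has not sent fetch request" in message:
            return None  # returning None discards the event
        return event

    sentry_sdk.init(dsn="<your-dsn>", before_send=drop_fetch_since_start)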

Versions