getsentry / arroyo

A library to build streaming applications that consume from and produce to Kafka.
https://getsentry.github.io/arroyo/
Apache License 2.0
39 stars 6 forks source link

Processor does not respect min_commit_frequency_sec with policy ONCE_PER_SECOND #365

Closed mj0nez closed 1 month ago

mj0nez commented 1 month ago

Environment

What version are you running?

Steps to Reproduce

Hi, we have a more like batch throughput, with a high ingestion rate and long periods of no incoming messages. I have noticed that the consumer group offset never reaches 0 during those idle periods, unless we force a flush with a graceful shutdown. Meanwhile all messages were consumed because the output topic contains the exact number of expected messages.

I tried to find the issue on my own but ran out of luck. The following example should reproduce the issue:

import time
from datetime import datetime
from unittest.mock import MagicMock

from arroyo.backends.kafka import KafkaPayload
from arroyo.backends.local.backend import (
    LocalBroker,
    LocalConsumer,
    LocalProducer,
)
from arroyo.backends.local.storages.memory import MemoryMessageStorage
from arroyo.commit import ONCE_PER_SECOND
from arroyo.processing import StreamProcessor
from arroyo.processing.strategies import (
    CommitOffsets,
    ProcessingStrategyFactory,
    Produce,
)
from arroyo.types import Message, Partition, Topic, Value

class SimpleStrategyFactory(ProcessingStrategyFactory[KafkaPayload]):
    def __init__(
        self,
        producer,
        producer_topic,
    ) -> None:
        self._producer = producer
        self._producer_topic = producer_topic

    def create_with_partitions(
        self,
        commit,
        _,
    ):
        return Produce(
            producer=self._producer,
            topic=self._producer_topic,
            next_step=CommitOffsets(commit=commit),
        )

def test_all_messages_are_processed():
    ## setup
    broker_storage: MemoryMessageStorage[KafkaPayload] = MemoryMessageStorage()
    broker: LocalBroker[KafkaPayload] = LocalBroker(
        message_storage=broker_storage
    )

    output_topic = Topic("produce")
    input_topic = Topic("consume")
    broker.create_topic(input_topic, 1)
    broker.create_topic(output_topic, 1)

    consumer: LocalConsumer[KafkaPayload] = broker.get_consumer(
        "consumer-group"
    )
    producer: LocalProducer[KafkaPayload] = broker.get_producer()

    processor = StreamProcessor(
        consumer=consumer,
        topic=input_topic,
        processor_factory=SimpleStrategyFactory(
            producer=producer, producer_topic=output_topic
        ),
        commit_policy=ONCE_PER_SECOND,
    )

    n_messages = 3

    # preload the storage with some messages
    for m in range(n_messages):
        broker_storage.produce(
            partition=Partition(Topic("consume"), 0),
            payload=KafkaPayload(None, bytes(m), []),
            timestamp=datetime.now(),
        )

    # we manually run the processor for a while
    for _ in range(n_messages * 5):
        processor._run_once()

    # we should sleep for s > commit_policy.min_commit_interval
    # and run once more to see if we trigger a offset commit
    time.sleep(2)
    processor._run_once()

    # check offsets were committed
    assert consumer.commit_offsets_calls > 1

    # check offset is equal to the number of messages
    assert (
        broker._LocalBroker__offsets["consumer-group"][
            Partition(input_topic, 0)
        ]
        == n_messages
    )

Expected Result

If min_commit_frequency_sec has passed between calls of ProcessingStrategy.poll, I would expect the consumer to commit its offsets.

Actual Result

Although no new messages were ingested, and the processor was running (observed via arroyo.consumer.run.count) it held on to its offsets and did not commit them to the broker but does output these log lines:

2024-05-16T18:25:37.998903Z [info     ] Processing {Partition(topic=Topic(name='preselection'), index=11): 200} service_name=stream-processor
2024-05-16 18:25:39,173 - arroyo.processing.processor - DEBUG - Waited 0.0004 seconds for offsets to be committed to <arroyo.backends.kafka.consumer.KafkaConsumer object at 0x7f7659269a10>.
2024-05-16 18:25:39,173 - arroyo.processing.processor - DEBUG - Waited 0.0004 seconds for offsets to be committed to <arroyo.backends.kafka.consumer.KafkaConsumer object at 0x7f7659269a10>.

The first log line is generated by a function wrapped in a RunTask that is before CommitOffsets in the strategy, and is exactly the offset that is pending commit.

During debugging, I have checked if the CommitOffsets strategy correctly calls on submit and poll, which it does. I have also verified that if I add a self.__commit({}) call to the the StreamProcessor’s run_once method, as new branch when it does not get a message (here), the issue does not persist.

mj0nez commented 1 month ago

I guess I've found the reason... The StreamingProcessor correctly calls the strategy and therefore also our commit step, but right before Healthcheck and Commit there is a Produce step which only polls the next step if the queue is not empty. I guess we should add a guard clause to the poll method because otherwise this results in a deadlock if no new messages are coming in. I’ve discovered this because our container orchestrator constantly marked the pod as unhealthy.

untitaker commented 1 month ago

nailed it, thanks!