getsentry / arroyo

A library to build streaming applications that consume from and produce to Kafka.
https://getsentry.github.io/arroyo/
Apache License 2.0

Unfold removes committable from values in ValueBatch #369

Open mj0nez opened 1 month ago

mj0nez commented 1 month ago

Hi, I’m not sure if this is really a bug, but I stumbled over how the Unfold strategy generates messages. Consider a use case where our streaming process consists of the following steps:

0) consume -> 1) batch -> 2) process in batch -> 3) unbatch -> 4) process message-wise -> 5) commit

When the ValuesBatch is unbatched in step 3, the Unfold strategy creates a new Message instance with a new Value for each payload and submits it to step 4. Unfortunately, only the last message of the batch gets a committable, although every payload may already have had one. From what I gathered from the BatchStep’s flow, I had assumed that unbatching would simply fan out the original messages and submit them, unchanged, one after the other to the following step.
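
To make the difference concrete, here is a minimal sketch in plain Python. It is a simplified model of the behavior described above, not arroyo's actual implementation: `Msg`, `unfold_last_only` and `unfold_preserving` are hypothetical names, and a partition is modeled as a `(topic, index)` tuple.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Tuple

# Hypothetical stand-in for a partition: (topic name, partition index).
PartitionKey = Tuple[str, int]


@dataclass
class Msg:
    """Hypothetical stand-in for a message: a payload plus its committable."""
    payload: bytes
    committable: Dict[PartitionKey, int] = field(default_factory=dict)


def unfold_last_only(batch: List[Msg]) -> List[Msg]:
    """Model of the reported behavior: only the last message carries offsets."""
    merged: Dict[PartitionKey, int] = {}
    for msg in batch:
        for part, offset in msg.committable.items():
            # Keep the highest offset seen per partition.
            merged[part] = max(offset, merged.get(part, offset))
    out = [Msg(msg.payload) for msg in batch]  # committable left empty
    if out:
        out[-1].committable = merged
    return out


def unfold_preserving(batch: List[Msg]) -> List[Msg]:
    """Model of the expected behavior: each message keeps its own committable."""
    return [Msg(msg.payload, dict(msg.committable)) for msg in batch]


P0 = ("test", 0)
batch = [Msg(b"a", {P0: 4}), Msg(b"b", {P0: 5})]
last_only = unfold_last_only(batch)
preserved = unfold_preserving(batch)
```

In this model, `last_only[0]` has an empty committable while `preserved[0]` still knows it belongs to offset 4's commit position, which is the information step 4 is missing.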

I believe the current behavior, while useful for reducing the number of commits, does not belong in a generic strategy, or is at least a bit hidden. My reasoning for moving it out of the strategy, or at least adding a note to both classes, is that downstream steps like number 4 can no longer provide the Partition and offset when raising an InvalidMessage exception. Furthermore, if an exception occurs in the batch’s last message, the commit information is lost altogether.
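
As a rough illustration of why step 4 gets stuck, the sketch below tries to recover a message's partition and offset from its committable. The helper `locate` is hypothetical and not part of arroyo; it assumes a committable maps a partition to the next offset to consume (message offset plus one), which matches the values in the reproduction below.

```python
from typing import Dict, Tuple

# Hypothetical stand-in for a partition: (topic name, partition index).
PartitionKey = Tuple[str, int]


def locate(committable: Dict[PartitionKey, int]) -> Tuple[PartitionKey, int]:
    """Return (partition, offset) for a message with a single-entry committable.

    Assumes the committable stores the next offset to consume, so the
    message's own offset is that value minus one.
    """
    if len(committable) != 1:
        raise LookupError(
            f"cannot identify message: committable has {len(committable)} entries"
        )
    ((partition, next_offset),) = committable.items()
    return partition, next_offset - 1
```

With a preserved committable such as `{("test", 0): 5}` this yields `(("test", 0), 4)`, but with the empty committable that non-final messages receive it can only raise, so there is nothing to pass to InvalidMessage.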

Environment

arroyo 2.17.4

Steps to Reproduce

from datetime import datetime
from unittest.mock import MagicMock

from arroyo.backends.kafka import KafkaPayload
from arroyo.processing.strategies import BatchStep, UnbatchStep
from arroyo.types import BrokerValue, Message, Partition, Topic

topic = Topic("test")
partition = Partition(topic, 0)

def test_unfold():
    commit = MagicMock()

    unbatch = UnbatchStep(next_step=commit)

    batch = BatchStep(max_batch_size=2, max_batch_time=3, next_step=unbatch)

    for i in range(1, 6):
        batch.submit(
            message=Message(
                BrokerValue(
                    KafkaPayload(None, b"1", []),
                    Partition(topic, 0),
                    i,
                    datetime.now(),
                )
            )
        )

    # The last message of each batch receives the committable.
    assert commit.method_calls[1].args[0].committable == {
        Partition(topic=topic, index=0): 3
    }
    assert commit.method_calls[3].args[0].committable == {
        Partition(topic=topic, index=0): 5
    }

    # This assertion fails: the first message of the second batch
    # does not carry its committable after unbatching.
    assert commit.method_calls[2].args[0].committable == {
        Partition(topic=topic, index=0): 4
    }

untitaker commented 1 month ago

Yeah this seems like surprising behavior, feel free to change it. I am not sure if the issue is isolated to unbatch or if batch already loses the commit data.

mj0nez commented 1 month ago

I took a shot, feel free to take it apart :)

onewland commented 2 weeks ago

This has a PR in review: https://github.com/getsentry/arroyo/pull/371

(posting to take it out of our support queue)