getsentry / arroyo

A library to build streaming applications that consume from and produce to Kafka.
https://getsentry.github.io/arroyo/
Apache License 2.0

Really Confusing Error #372

Open chokosabe opened 1 month ago

chokosabe commented 1 month ago

I created this strategy based on the examples:

```python
import json
import logging
from typing import Callable, Mapping, TypeVar

from arroyo.backends.kafka.consumer import KafkaPayload
from arroyo.processing.strategies import CommitOffsets, RunTask
from arroyo.processing.strategies.abstract import (
    ProcessingStrategy,
    ProcessingStrategyFactory,
)
from arroyo.types import Commit, Message, Partition

logger = logging.getLogger(__name__)

T = TypeVar('T')

class DBPersistStrategyFactory(ProcessingStrategyFactory[KafkaPayload]):
    """
    A factory that builds a processing strategy for persisting messages to a database.

    The processing function should handle the logic for transforming and saving the messages.
    """

    def __init__(self, processing_function: Callable[[Message[KafkaPayload]], None]):
        self.__processing_function = processing_function

    def create_with_partitions(
        self,
        commit: Commit,
        partitions: Mapping[Partition, int],
    ) -> ProcessingStrategy[KafkaPayload]:

        return RunTask(
            function=self.__processing_function,
            next_step=CommitOffsets(commit),
        )
```
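As far as we understand arroyo, `RunTask` calls `function` on each submitted message and forwards the result (same partition and offset, new payload) to `next_step`, and `CommitOffsets` commits the offsets of whatever reaches it. The sketch below models that chain in plain Python with hypothetical `Toy*` classes. It is not arroyo's implementation: the real strategies also implement `join`, `close`, `terminate`, backpressure and other details omitted here. It only illustrates why a processing function that returns `None` is fine in front of `CommitOffsets`.

```python
from dataclasses import dataclass
from typing import Any, Callable, Dict, List

# Hypothetical stand-ins for arroyo's Message, RunTask and CommitOffsets.
# They illustrate the data flow only; they are NOT arroyo's real classes.


@dataclass
class ToyMessage:
    partition: int
    offset: int
    payload: Any


class ToyCommitOffsets:
    """Terminal step: remembers the next offset to commit per partition."""

    def __init__(self, commit: Callable[[Dict[int, int]], None]) -> None:
        self._commit = commit
        self._pending: Dict[int, int] = {}

    def submit(self, message: ToyMessage) -> None:
        # Kafka convention: the committed offset is the *next* offset to read.
        self._pending[message.partition] = message.offset + 1

    def poll(self) -> None:
        # Flush pending offsets through the commit callback.
        if self._pending:
            self._commit(dict(self._pending))
            self._pending.clear()


class ToyRunTask:
    """Applies `function` to each message, then forwards the result."""

    def __init__(
        self, function: Callable[[ToyMessage], Any], next_step: ToyCommitOffsets
    ) -> None:
        self._function = function
        self._next_step = next_step

    def submit(self, message: ToyMessage) -> None:
        result = self._function(message)
        # Same partition/offset; the payload becomes the function's return value.
        self._next_step.submit(ToyMessage(message.partition, message.offset, result))

    def poll(self) -> None:
        self._next_step.poll()


saved: List[Any] = []
commits: List[Dict[int, int]] = []


def persist(message: ToyMessage) -> None:
    # Hypothetical stand-in for the DB write done by the processing function.
    saved.append(message.payload)


strategy = ToyRunTask(persist, ToyCommitOffsets(commits.append))
strategy.submit(ToyMessage(partition=0, offset=41, payload=b"hello"))
strategy.poll()
print(saved, commits)  # [b'hello'] [{0: 42}]
```

Note that nothing in this chain can produce "received message without active processing strategy": that check lives in the processor that owns the strategy, not in the strategy itself.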

When I try to run it in tests, I get this error:

```
                except InvalidMessage as e:
                    self._handle_invalid_message(e)

                else:
                    # Resume if we are currently in a paused state
                    if self.__is_paused:
                        self.__metrics_buffer.incr_counter("arroyo.consumer.resume", 1)
                        self.__consumer.resume([*self.__consumer.tell().keys()])
                        self.__is_paused = False

                    # Clear backpressure timestamp if it is set
                    self._clear_backpressure()

                    self.__message = None
        else:
            if self.__message is not None:
>               raise InvalidStateError(
                    "received message without active processing strategy"
                )
E               arroyo.processing.processor.InvalidStateError: received message without active processing strategy

/usr/local/lib/python3.10/site-packages/arroyo/processing/processor.py:457: InvalidStateError
```

My question is: what is an "active processing strategy"?!

Thanks. I really need some sane, basic examples; the two that are currently there are not enough.

untitaker commented 1 month ago

Can you post the full example where you use the processing strategy? The error message is confusing because you're tripping an internal assertion that users are never supposed to see. I don't think our examples can trigger it.
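Some context on what "active processing strategy" means, as far as we can tell from the traceback and arroyo's design: the stream processor does not build a strategy up front. It calls the factory's `create_with_partitions` when the consumer reports a partition assignment, and tears the strategy down when partitions are revoked. The traceback shows the branch where the processor holds a message but has no strategy. One plausible cause, an assumption until the full test is posted, is a test that feeds messages to the processor (for example through a mocked consumer) without the consumer ever invoking the assignment callback. The sketch below is a deliberately simplified, hypothetical model of that lifecycle (`ToyStreamProcessor`, `ToyStrategy` and the local `InvalidStateError` are not arroyo code):

```python
from typing import List, Optional


class InvalidStateError(RuntimeError):
    """Local class that only mirrors the name of arroyo's error."""


class ToyStrategy:
    """Hypothetical strategy that just records what it receives."""

    def __init__(self, partitions: List[int]) -> None:
        self.partitions = partitions
        self.received: List[str] = []

    def submit(self, message: str) -> None:
        self.received.append(message)


class ToyStreamProcessor:
    """Simplified model: a strategy exists only while partitions are assigned."""

    def __init__(self) -> None:
        self.created: List[ToyStrategy] = []
        self._strategy: Optional[ToyStrategy] = None

    def on_partitions_assigned(self, partitions: List[int]) -> None:
        # Roughly where arroyo would call factory.create_with_partitions(...).
        self._strategy = ToyStrategy(partitions)
        self.created.append(self._strategy)

    def on_partitions_revoked(self) -> None:
        # The strategy is discarded when the partitions go away.
        self._strategy = None

    def handle(self, message: str) -> None:
        # A message with no strategy means the assignment step never happened.
        if self._strategy is None:
            raise InvalidStateError("received message without active processing strategy")
        self._strategy.submit(message)


processor = ToyStreamProcessor()
errors: List[str] = []
try:
    processor.handle("too early")  # no partitions assigned yet
except InvalidStateError as exc:
    errors.append(str(exc))

processor.on_partitions_assigned([0])
processor.handle("ok")  # accepted: a strategy now exists
```

In this model the fix is on the test side: make sure whatever stands in for the Kafka consumer triggers the assignment callback before it starts returning messages.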